// Package events publishes structured log messages to a RabbitMQ queue.
package events

import (
	"encoding/json"
	"github.com/isayme/go-amqp-reconnect/rabbitmq"
	"github.com/streadway/amqp"
	"golang.org/x/exp/slog"
	"os"
	"time"
)

// LOG_QUEUE is the name of the environment variable that overrides the
// queue log messages are published to.
const LOG_QUEUE = "AMQP_LOG_QUEUE"

// Package-level AMQP state shared by the functions in this file.
var (
	// logChannel is the channel Log publishes on; Start assigns it.
	logChannel *rabbitmq.Channel
	// connection caches the RabbitMQ connection once it has been dialled.
	connection *rabbitmq.Connection
	// channel caches the AMQP channel once it has been opened.
	channel *rabbitmq.Channel
)

var (
	// ID is the identifier of the uploaded object that log messages are
	// attributed to.
	ID string

	// Worker names the worker that log messages are attributed to.
	Worker string
)

// Event status levels, for use as the level of a logged message.
const (
	ERROR = 0
	WARN  = 1
	INFO  = 2
)

// LogMessage is the payload that Log encodes as JSON and publishes for each
// logged event.
type LogMessage struct {
	// ID is the identifier of the upload the message is attributed to.
	ID string `json:"upload_id"`
	// Level is the event status of the message.
	Level int64 `json:"level"`
	// Worker names the worker the message is attributed to.
	Worker string `json:"worker"`
	// Message is the text being logged.
	Message string `json:"message"`
	// Time is when the message was logged.
	Time time.Time `json:"logged_at"`
}

// Start the logging system
John Harris's avatar
John Harris committed
func Start(logger *slog.Logger, worker string) *rabbitmq.Channel {
	Worker = worker
John Harris's avatar
John Harris committed
	logChannel = connect(logger, defaultQueue())
	ResetUploadID()

	return logChannel
}

// ResetUploadID resets the upload ID back to unset and returns the value
// reported by ResetID.
//
// Deprecated: Please use ResetID().
func ResetUploadID() string {
	placeholder := ResetID()
	return placeholder
}

// ResetID marks the package-level ID as unset and returns the placeholder
// value it now holds.
func ResetID() string {
	const unset = "unset"
	ID = unset
	return unset
}

// Log publishes a message to the queue to notify that
// something has changed
John Harris's avatar
John Harris committed
func Log(logger *slog.Logger, level int64, message string) {

	logMessage := LogMessage{
		ID:      ID,
		Level:   level,
		Worker:  Worker,
		Message: message,
		Time:    time.Now(),
	}

John Harris's avatar
John Harris committed
	data, err := json.Marshal(logMessage)
	// Why do we not abort if there is an error here?
	if err != nil {
John Harris's avatar
John Harris committed
		logger.Error("Failed to produce JSON from message.", slog.String("error", err.Error()))
	}

	logChannel.Publish(
		"",             // publish to an exchange
		defaultQueue(), // routing to 0 or more queues
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "application/json",
			ContentEncoding: "",
John Harris's avatar
John Harris committed
			Body:            data,
			DeliveryMode:    amqp.Persistent, // 1=non-persistent, 2=persistent
			Priority:        0,               // 0-9
		},
	)

John Harris's avatar
John Harris committed
	logger.Debug("Published message.", slog.String("queue", defaultQueue()),
		slog.String("payload", string(data)))
}

// defaultQueue returns the name of the queue log messages are published to:
// the value of the environment variable named by LOG_QUEUE when it is set and
// non-empty, otherwise "upload_logs".
func defaultQueue() string {
	// Read the variable once so the check and the returned value agree.
	if queue := os.Getenv(LOG_QUEUE); queue != "" {
		return queue
	}
	return "upload_logs"
}

// Connect to the RabbitMQ cluster
// and return the channel to use
John Harris's avatar
John Harris committed
func connect(logger *slog.Logger, queue string) *rabbitmq.Channel {
	setQueue(logger, queue)
	return channel
}

// getConnection returns the AMQP connection
John Harris's avatar
John Harris committed
func getConnection(logger *slog.Logger) *rabbitmq.Connection {
	if connection == nil {
		rabbit := os.Getenv("AMQP_CONNECTION")
		var err error
		connection, err = rabbitmq.Dial(rabbit)
		if err != nil {
John Harris's avatar
John Harris committed
			logger.Error("Failed to connect to RabbitMQ.", slog.String("error", err.Error()))
		} else {
			logger.Info("Connected to RabbitMQ.")
		}
	}
	return connection
}

// getChannel sets the channel to use for sending messages
John Harris's avatar
John Harris committed
func getChannel(logger *slog.Logger) *rabbitmq.Channel {
	if channel == nil {
		var err error
John Harris's avatar
John Harris committed
		channel, err = getConnection(logger).Channel()
		if err != nil {
John Harris's avatar
John Harris committed
			logger.Error("Failed to create/connect to channel.", slog.String("error", err.Error()))
		} else {
			logger.Info("Created channel.")
John Harris's avatar
John Harris committed
// Binds the queue to the channel.
func setQueue(logger *slog.Logger, q string) {
	_, err := getChannel(logger).QueueDeclare(
		q,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
John Harris's avatar
John Harris committed
		logger.Error("Failed to create/connect to queue.", slog.String("queue", q),
			slog.String("error", err.Error()))
	} else {
		logger.Info("Created queue.", slog.String("queue", q))