// Package events publishes structured log messages for upload workers to a
// RabbitMQ queue.
package events

import (
	"encoding/json"
	"os"
	"time"

	"github.com/isayme/go-amqp-reconnect/rabbitmq"
	"github.com/streadway/amqp"
	"golang.org/x/exp/slog"
)

// LOG_QUEUE is the name of the environment variable that, when set to a
// non-empty value, overrides the default queue name.
const LOG_QUEUE = "AMQP_LOG_QUEUE"

// Package-level AMQP state. connection and channel are created lazily by
// getConnection/getChannel; logChannel is the channel Log publishes on and is
// assigned by Start.
var logChannel *rabbitmq.Channel
var connection *rabbitmq.Connection
var channel *rabbitmq.Channel

// ID is the id of the Uploaded object
var ID string

// Worker defines which worker the log should be attributed to
var Worker string

// Event Status
const (
	ERROR = iota
	WARN
	INFO
)

// LogMessage outlines the required fields for a message to be logged
type LogMessage struct {
	ID      string    `json:"upload_id"`
	Level   int64     `json:"level"`
	Worker  string    `json:"worker"`
	Message string    `json:"message"`
	Time    time.Time `json:"logged_at"`
}

// Start the logging system.
//
// It records worker as the package-level Worker, connects to RabbitMQ,
// declares the default queue, and resets ID to "unset". The returned channel
// is the one Log publishes on; it is nil when the connection or channel could
// not be established (the failure is reported through logger).
func Start(logger *slog.Logger, worker string) *rabbitmq.Channel {
	Worker = worker
	logChannel = connect(logger, defaultQueue())
	ResetID()
	return logChannel
}

// ResetUploadID resets the Upload ID back to unset
//
// Deprecated: Please use ResetID()
func ResetUploadID() string {
	return ResetID()
}

// ResetID resets the ID back to unset and returns the value it was set to.
func ResetID() string {
	ID = "unset"
	return "unset"
}

// Log publishes a message to the queue to notify that
// something has changed.
//
// The message is stamped with the current package-level ID and Worker and the
// current time, encoded as JSON, and published to the default exchange with
// the default queue name as routing key. Failures are reported through logger
// and the message is dropped; Log never returns an error.
func Log(logger *slog.Logger, level int64, message string) {
	// Start was never called, or it failed to obtain a channel. Publishing
	// on a nil channel would panic, so report and drop the message instead.
	if logChannel == nil {
		logger.Error("Failed to publish message.", slog.String("error", "log channel is not initialised"))
		return
	}

	logMessage := LogMessage{
		ID:      ID,
		Level:   level,
		Worker:  Worker,
		Message: message,
		Time:    time.Now(),
	}

	data, err := json.Marshal(logMessage)
	if err != nil {
		logger.Error("Failed to produce JSON from message.", slog.String("error", err.Error()))
		// Abort: there is no payload to send, and publishing an empty
		// body would only confuse consumers.
		return
	}

	queue := defaultQueue()
	err = logChannel.Publish(
		"",    // publish to an exchange
		queue, // routing to 0 or more queues
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "application/json",
			ContentEncoding: "",
			Body:            data,
			DeliveryMode:    amqp.Persistent, // 1=non-persistent, 2=persistent
			Priority:        0,               // 0-9
		},
	)
	if err != nil {
		logger.Error("Failed to publish message.", slog.String("queue", queue), slog.String("error", err.Error()))
		return
	}

	logger.Debug("Published message.", slog.String("queue", queue), slog.String("payload", string(data)))
}

// defaultQueue returns the queue name taken from the LOG_QUEUE environment
// variable, falling back to "upload_logs" when it is unset or empty.
func defaultQueue() string {
	if queue := os.Getenv(LOG_QUEUE); queue != "" {
		return queue
	}
	return "upload_logs"
}

// Connect to the RabbitMQ cluster
// and return the channel to use.
//
// The returned channel is nil when the connection or the channel could not be
// established.
func connect(logger *slog.Logger, queue string) *rabbitmq.Channel {
	setQueue(logger, queue)
	return channel
}

// getConnection returns the AMQP connection, dialling the address held in the
// AMQP_CONNECTION environment variable on first use. It returns nil when the
// dial fails; the next call will try again.
func getConnection(logger *slog.Logger) *rabbitmq.Connection {
	if connection != nil {
		return connection
	}

	conn, err := rabbitmq.Dial(os.Getenv("AMQP_CONNECTION"))
	if err != nil {
		logger.Error("Failed to connect to RabbitMQ.", slog.String("error", err.Error()))
		return nil
	}

	connection = conn
	logger.Info("Connected to RabbitMQ.")
	return connection
}

// getChannel returns the channel to use for sending messages, opening it on
// first use. It returns nil when no connection is available or the channel
// cannot be opened.
func getChannel(logger *slog.Logger) *rabbitmq.Channel {
	if channel != nil {
		return channel
	}

	conn := getConnection(logger)
	if conn == nil {
		// getConnection has already logged the failure; calling Channel on
		// a nil connection would panic.
		return nil
	}

	ch, err := conn.Channel()
	if err != nil {
		logger.Error("Failed to create/connect to channel.", slog.String("error", err.Error()))
		return nil
	}

	channel = ch
	logger.Info("Created channel.")
	return channel
}

// setQueue declares the durable queue q on the shared channel, creating the
// channel (and connection) first if necessary. Failures are logged.
func setQueue(logger *slog.Logger, q string) {
	ch := getChannel(logger)
	if ch == nil {
		// Without a channel the declaration cannot be attempted.
		logger.Error("Failed to create/connect to queue.", slog.String("queue", q), slog.String("error", "no channel available"))
		return
	}

	_, err := ch.QueueDeclare(
		q,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		logger.Error("Failed to create/connect to queue.", slog.String("queue", q), slog.String("error", err.Error()))
		return
	}

	logger.Info("Created queue.", slog.String("queue", q))
}