Newer
Older
package events
import (
"encoding/json"
"github.com/isayme/go-amqp-reconnect/rabbitmq"
"github.com/streadway/amqp"
)
const LOG_QUEUE = "AMQP_LOG_QUEUE"
var logChannel *rabbitmq.Channel
var connection *rabbitmq.Connection
var channel *rabbitmq.Channel
// ID is the id of the Uploaded object
var ID string
// Worker defined which worker the log should be attributed to
var Worker string
// Event Status
const (
ERROR = iota
WARN
INFO
)
// LogMessage outlines the required fields for a message to be logged
type LogMessage struct {
ID string `json:"upload_id"`
Level int64 `json:"level"`
Worker string `json:"worker"`
Message string `json:"message"`
Time time.Time `json:"logged_at"`
}
// Start the logging system
func Start(logger *slog.Logger, worker string) *rabbitmq.Channel {
ResetUploadID()
return logChannel
}
// Deprecated: Please use ResetID()
// ResetUploadID resets the Upload ID back to unset
func ResetUploadID() string {
return ResetID()
}
// ResetID resets the ID back to unset
func ResetID() string {
ID = "unset"
return "unset"
}
// Log publishes a message to the queue to notify that
// something has changed
func Log(logger *slog.Logger, level int64, message string) {
logMessage := LogMessage{
ID: ID,
Level: level,
Worker: Worker,
Message: message,
Time: time.Now(),
}
data, err := json.Marshal(logMessage)
// Why do we not abort if there is an error here?
logger.Error("Failed to produce JSON from message.", slog.String("error", err.Error()))
}
logChannel.Publish(
"", // publish to an exchange
defaultQueue(), // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
},
)
logger.Debug("Published message.", slog.String("queue", defaultQueue()),
slog.String("payload", string(data)))
}
func defaultQueue() string {
if os.Getenv(LOG_QUEUE) != "" {
return os.Getenv(LOG_QUEUE)
} else {
return "upload_logs"
}
}
// Connect to the RabbitMQ cluster
// and return the channel to use
func connect(logger *slog.Logger, queue string) *rabbitmq.Channel {
setQueue(logger, queue)
return channel
}
// getConnection returns the AMQP connection
func getConnection(logger *slog.Logger) *rabbitmq.Connection {
if connection == nil {
rabbit := os.Getenv("AMQP_CONNECTION")
var err error
connection, err = rabbitmq.Dial(rabbit)
if err != nil {
logger.Error("Failed to connect to RabbitMQ.", slog.String("error", err.Error()))
} else {
logger.Info("Connected to RabbitMQ.")
}
}
return connection
}
// getChannel sets the channel to use for sending messages
func getChannel(logger *slog.Logger) *rabbitmq.Channel {
logger.Error("Failed to create/connect to channel.", slog.String("error", err.Error()))
} else {
logger.Info("Created channel.")
// Binds the queue to the channel.
func setQueue(logger *slog.Logger, q string) {
_, err := getChannel(logger).QueueDeclare(
q,
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Error("Failed to create/connect to queue.", slog.String("queue", q),
slog.String("error", err.Error()))
} else {
logger.Info("Created queue.", slog.String("queue", q))