package events import ( "encoding/json" "fmt" "log" "os" "time" "github.com/isayme/go-amqp-reconnect/rabbitmq" "github.com/streadway/amqp" ) const LOG_QUEUE = "AMQP_LOG_QUEUE" var logChannel *rabbitmq.Channel var connection *rabbitmq.Connection var channel *rabbitmq.Channel // ID is the id of the Uploaded object var ID string // Worker defined which worker the log should be attributed to var Worker string // Event Status const ( ERROR = iota WARN INFO ) // LogMessage outlines the required fields for a message to be logged type LogMessage struct { ID string `json:"upload_id"` Level int64 `json:"level"` Worker string `json:"worker"` Message string `json:"message"` Time time.Time `json:"logged_at"` } // Start the logging system func Start(worker string) *rabbitmq.Channel { Worker = worker logChannel = connect(defaultQueue()) ResetUploadID() return logChannel } // Deprecated: Please use ResetID() // ResetUploadID resets the Upload ID back to unset func ResetUploadID() string { return ResetID() } // ResetID resets the ID back to unset func ResetID() string { ID = "unset" return "unset" } // Log publishes a message to the queue to notify that // something has changed func Log(level int64, message string) { logMessage := LogMessage{ ID: ID, Level: level, Worker: Worker, Message: message, Time: time.Now(), } body, err := json.Marshal(logMessage) if err != nil { outputError(fmt.Errorf("json unmarshal failed: %s", err)) } // Publish logChannel.Publish( "", // publish to an exchange defaultQueue(), // routing to 0 or more queues false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "application/json", ContentEncoding: "", Body: body, DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent Priority: 0, // 0-9 }, ) outputLog(fmt.Sprintf("Publishing message to: '%s'\n %v\n\n", defaultQueue(), string(body))) } func defaultQueue() string { if os.Getenv(LOG_QUEUE) != "" { return os.Getenv(LOG_QUEUE) } else { return "upload_logs" } } // Connect to the RabbitMQ cluster // and return the channel to use func connect(queue string) *rabbitmq.Channel { setQueue(queue) return channel } // getConnection returns the AMQP connection func getConnection() *rabbitmq.Connection { if connection == nil { rabbit := os.Getenv("AMQP_CONNECTION") var err error connection, err = rabbitmq.Dial(rabbit) if err != nil { outputError(fmt.Errorf("%s: %s", err, "Failed to connect to RabbitMQ")) } outputLog("Connected to RabbitMQ") } return connection } // getChannel sets the channel to use for sending messages func getChannel() *rabbitmq.Channel { if channel == nil { var err error channel, err = getConnection().Channel() if err != nil { outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Channel")) } outputLog("Created Channel") } return channel } // setQueue binds the queue to the channel func setQueue(q string) { _, err := getChannel().QueueDeclare( q, true, false, false, false, nil, ) if err != nil { outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Queue")) } outputLog("Created Queue") } // OutputLog prints a message to stdout func outputLog(message string) { log.Printf("[Event] %s", message) } // OutputError prints to stderr func outputError(err error) { log.Printf("[Event][Error] %s:", err.Error()) }