Skip to content
Snippets Groups Projects
log.go 3.49 KiB
Newer Older
package events

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/isayme/go-amqp-reconnect/rabbitmq"
	"github.com/streadway/amqp"
)

const LOG_QUEUE = "AMQP_LOG_QUEUE"

var logChannel *rabbitmq.Channel
var connection *rabbitmq.Connection
var channel *rabbitmq.Channel

// ID is the id of the Uploaded object
var ID string

// Worker defined which worker the log should be attributed to
var Worker string

// Event Status
const (
	ERROR = iota
	WARN
	INFO
)

// LogMessage outlines the required fields for a message to be logged
type LogMessage struct {
	ID      string    `json:"upload_id"`
	Level   int64     `json:"level"`
	Worker  string    `json:"worker"`
	Message string    `json:"message"`
	Time    time.Time `json:"logged_at"`
}

// Start the logging system
func Start(worker string) *rabbitmq.Channel {
	Worker = worker
	logChannel = connect(defaultQueue())
	ResetUploadID()

	return logChannel
}

// Deprecated: Please use ResetID()
// ResetUploadID resets the Upload ID back to unset
func ResetUploadID() string {
	return ResetID()
}

// ResetID resets the ID back to unset
func ResetID() string {
	ID = "unset"
	return "unset"
}

// Log publishes a message to the queue to notify that
// something has changed
func Log(level int64, message string) {

	logMessage := LogMessage{
		ID:      ID,
		Level:   level,
		Worker:  Worker,
		Message: message,
		Time:    time.Now(),
	}

	body, err := json.Marshal(logMessage)

	if err != nil {
		outputError(fmt.Errorf("json unmarshal failed: %s", err))
	}

	// Publish
	logChannel.Publish(
		"",             // publish to an exchange
		defaultQueue(), // routing to 0 or more queues
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "application/json",
			ContentEncoding: "",
			Body:            body,
			DeliveryMode:    amqp.Persistent, // 1=non-persistent, 2=persistent
			Priority:        0,               // 0-9
		},
	)

	outputLog(fmt.Sprintf("Publishing message to: '%s'\n %v\n\n", defaultQueue(), string(body)))
}

func defaultQueue() string {
	if os.Getenv(LOG_QUEUE) != "" {
		return os.Getenv(LOG_QUEUE)
	} else {
		return "upload_logs"
	}
}

// Connect to the RabbitMQ cluster
// and return the channel to use
func connect(queue string) *rabbitmq.Channel {
	setQueue(queue)
	return channel
}

// getConnection returns the AMQP connection
func getConnection() *rabbitmq.Connection {
	if connection == nil {
		rabbit := os.Getenv("AMQP_CONNECTION")
		var err error
		connection, err = rabbitmq.Dial(rabbit)
		if err != nil {
			outputError(fmt.Errorf("%s: %s", err, "Failed to connect to RabbitMQ"))
		}
		outputLog("Connected to RabbitMQ")
	}
	return connection
}

// getChannel sets the channel to use for sending messages
func getChannel() *rabbitmq.Channel {
	if channel == nil {
		var err error
		channel, err = getConnection().Channel()
		if err != nil {
			outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Channel"))
		}
		outputLog("Created Channel")
	}

	return channel
}

// setQueue binds the queue to the channel
func setQueue(q string) {
	_, err := getChannel().QueueDeclare(
		q,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Queue"))
	}
	outputLog("Created Queue")
}

// OutputLog prints a message to stdout
func outputLog(message string) {
	log.Printf("[Event] %s", message)
}

// OutputError prints to stderr
func outputError(err error) {
	log.Printf("[Event][Error] %s:", err.Error())
}