Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package events
import (
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/isayme/go-amqp-reconnect/rabbitmq"
"github.com/streadway/amqp"
)
const LOG_QUEUE = "AMQP_LOG_QUEUE"
var logChannel *rabbitmq.Channel
var connection *rabbitmq.Connection
var channel *rabbitmq.Channel
// ID is the id of the Uploaded object
var ID string
// Worker defined which worker the log should be attributed to
var Worker string
// Event Status
const (
ERROR = iota
WARN
INFO
)
// LogMessage outlines the required fields for a message to be logged
type LogMessage struct {
ID string `json:"upload_id"`
Level int64 `json:"level"`
Worker string `json:"worker"`
Message string `json:"message"`
Time time.Time `json:"logged_at"`
}
// Start the logging system
func Start(worker string) *rabbitmq.Channel {
Worker = worker
logChannel = connect(defaultQueue())
ResetUploadID()
return logChannel
}
// Deprecated: Please use ResetID()
// ResetUploadID resets the Upload ID back to unset
func ResetUploadID() string {
return ResetID()
}
// ResetID resets the ID back to unset
func ResetID() string {
ID = "unset"
return "unset"
}
// Log publishes a message to the queue to notify that
// something has changed
func Log(level int64, message string) {
logMessage := LogMessage{
ID: ID,
Level: level,
Worker: Worker,
Message: message,
Time: time.Now(),
}
body, err := json.Marshal(logMessage)
if err != nil {
outputError(fmt.Errorf("json unmarshal failed: %s", err))
}
// Publish
logChannel.Publish(
"", // publish to an exchange
defaultQueue(), // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
Body: body,
DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
},
)
outputLog(fmt.Sprintf("Publishing message to: '%s'\n %v\n\n", defaultQueue(), string(body)))
}
func defaultQueue() string {
if os.Getenv(LOG_QUEUE) != "" {
return os.Getenv(LOG_QUEUE)
} else {
return "upload_logs"
}
}
// Connect to the RabbitMQ cluster
// and return the channel to use
func connect(queue string) *rabbitmq.Channel {
setQueue(queue)
return channel
}
// getConnection returns the AMQP connection
func getConnection() *rabbitmq.Connection {
if connection == nil {
rabbit := os.Getenv("AMQP_CONNECTION")
var err error
connection, err = rabbitmq.Dial(rabbit)
if err != nil {
outputError(fmt.Errorf("%s: %s", err, "Failed to connect to RabbitMQ"))
}
outputLog("Connected to RabbitMQ")
}
return connection
}
// getChannel sets the channel to use for sending messages
func getChannel() *rabbitmq.Channel {
if channel == nil {
var err error
channel, err = getConnection().Channel()
if err != nil {
outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Channel"))
}
outputLog("Created Channel")
}
return channel
}
// setQueue binds the queue to the channel
func setQueue(q string) {
_, err := getChannel().QueueDeclare(
q,
true,
false,
false,
false,
nil,
)
if err != nil {
outputError(fmt.Errorf("%s: %s", err, "Failed to create/connect to Queue"))
}
outputLog("Created Queue")
}
// OutputLog prints a message to stdout
func outputLog(message string) {
log.Printf("[Event] %s", message)
}
// OutputError prints to stderr
func outputError(err error) {
log.Printf("[Event][Error] %s:", err.Error())
}