Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# frozen_string_literal: true
require 'puma'
require 'puma/plugin'
require 'socket'
require 'nexus_semantic_logger/datadog_singleton'
# Forked from puma-plugin-statsd.
# Uses the same datadog settings as nexus_semantic_logger.
# To use, add to puma.rb:
# plugin :nexus_puma_statsd
# Small StatsD client that sends one metric per datagram, either over UDP or,
# when DD_STATSD_SOCKET_PATH is set, over a Unix datagram socket.
class StatsdConnector
  # Maps the metric type symbols accepted by #send to their StatsD type codes.
  STATSD_TYPES = { count: 'c', gauge: 'g' }.freeze
  # Separator used between segments of a metric name.
  METRIC_DELIMETER = "."

  attr_reader :host, :port

  # Reads the connection settings from the environment:
  # * DD_AGENT_HOST         - UDP host, defaults to '127.0.0.1'
  # * DD_STATSD_PORT        - UDP port, defaults to 8125
  # * DD_STATSD_SOCKET_PATH - Unix socket path; blank (the default) selects UDP
  def initialize
    @host = ENV.fetch('DD_AGENT_HOST', '127.0.0.1')
    @port = ENV.fetch('DD_STATSD_PORT', 8125)
    @socket_path = ENV.fetch('DD_STATSD_SOCKET_PATH', '')
  end

  # Sends a single metric as "<metric_name>:<value>|<type code>", followed by
  # "|#<tags>" when tags are given. A fresh socket is opened and closed for
  # every call.
  #
  # NOTE: this method shadows Object#send on instances of this class; use
  # __send__ if dynamic dispatch is ever needed.
  #
  # metric_name - full metric name
  # value       - metric value, interpolated as-is
  # type        - a key of STATSD_TYPES (:count or :gauge)
  # tags        - pre-joined tag string, or nil for no tags
  #
  # Raises KeyError when type is not a key of STATSD_TYPES.
  def send(metric_name:, value:, type:, tags: nil)
    socket = nil
    data = "#{metric_name}:#{value}|#{STATSD_TYPES.fetch(type)}"
    data = "#{data}|##{tags}" unless tags.nil?

    if @socket_path.to_s.strip.empty?
      socket = UDPSocket.new
      socket.send(data, 0, host, port)
    else
      socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
      socket.connect(Socket.pack_sockaddr_un(@socket_path))
      socket.sendmsg_nonblock(data)
    end
  ensure
    # socket is still nil when building the payload or creating the socket
    # raised; closing unconditionally would replace that error with a
    # NoMethodError.
    socket&.close
  end
end
# Wrap puma's stats in a safe API
class PumaStats
# stats - the stats hash to wrap; the reader methods of this class look up
#         symbol keys in it.
def initialize(stats)
@stats = stats
end
def clustered?
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
end
# Returns the :workers entry of the stats hash, or 1 when the key is absent.
def workers
  @stats.fetch(:workers) { 1 }
end
# Returns the :booted_workers entry of the stats hash, or 1 when the key is
# absent.
def booted_workers
  @stats.fetch(:booted_workers) { 1 }
end
# Returns the :old_workers entry of the stats hash, or 0 when the key is
# absent.
def old_workers
  @stats.fetch(:old_workers) { 0 }
end
# Returns the :running stat. When clustered, this is the sum of :running over
# each worker's :last_status (a worker without the key counts as 0);
# otherwise it is read from the top-level stats hash, defaulting to 0.
def running
  return @stats.fetch(:running, 0) unless clustered?

  @stats[:worker_status].reduce(0) do |total, worker|
    total + worker[:last_status].fetch(:running, 0)
  end
end
# Returns the :backlog stat. When clustered, this is the sum of :backlog over
# each worker's :last_status (a worker without the key counts as 0);
# otherwise it is read from the top-level stats hash, defaulting to 0.
def backlog
  return @stats.fetch(:backlog, 0) unless clustered?

  @stats[:worker_status].reduce(0) do |total, worker|
    total + worker[:last_status].fetch(:backlog, 0)
  end
end
# Returns the :pool_capacity stat. When clustered, this is the sum of
# :pool_capacity over each worker's :last_status (a worker without the key
# counts as 0); otherwise it is read from the top-level stats hash,
# defaulting to 0.
def pool_capacity
  return @stats.fetch(:pool_capacity, 0) unless clustered?

  @stats[:worker_status].reduce(0) do |total, worker|
    total + worker[:last_status].fetch(:pool_capacity, 0)
  end
end
# Returns the :max_threads stat. When clustered, this is the sum of
# :max_threads over each worker's :last_status (a worker without the key
# counts as 0); otherwise it is read from the top-level stats hash,
# defaulting to 0.
def max_threads
  return @stats.fetch(:max_threads, 0) unless clustered?

  @stats[:worker_status].reduce(0) do |total, worker|
    total + worker[:last_status].fetch(:max_threads, 0)
  end
end
# Returns the :requests_count stat. When clustered, this is the sum of
# :requests_count over each worker's :last_status (a worker without the key
# counts as 0); otherwise it is read from the top-level stats hash,
# defaulting to 0.
def requests_count
  return @stats.fetch(:requests_count, 0) unless clustered?

  @stats[:worker_status].reduce(0) do |total, worker|
    total + worker[:last_status].fetch(:requests_count, 0)
  end
end
end
Puma::Plugin.create do
# We can start doing something when we have a launcher:
def start(launcher)
@launcher = launcher
@log_writer =
if Gem::Version.new(Puma::Const::PUMA_VERSION) >= Gem::Version.new(6)
@launcher.log_writer
else
@launcher.events
end
@statsd = ::StatsdConnector.new
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Fetch global metric prefix from env variable
@metric_prefix = ENV.fetch('DD_STATSD_METRIC_PREFIX', nil)
if @metric_prefix && !@metric_prefix.end_with?(::StatsdConnector::METRIC_DELIMETER)
@metric_prefix += ::StatsdConnector::METRIC_DELIMETER
end
register_hooks
end
private
# Hands the stats_loop method to in_background as its block.
def register_hooks
  loop_runner = method(:stats_loop)
  in_background(&loop_runner)
end
def environment_variable_tags
# Tags are separated by spaces, and while they are normally a tag and
# value separated by a ':', they can also just be tagged without any
# associated value.
#
# Examples: simple-tag-0 tag-key-1:tag-value-1
#
tags = []
global_tags = NexusSemanticLogger::DatadogSingleton.instance.global_tags
tags += global_tags unless global_tags.nil?
tags << "pod_name:#{ENV['HOSTNAME']}"
end
# Standardised datadog tag attributes, so that we can share the metric
# tags with the application running
#
# https://docs.datadoghq.com/agent/docker/?tab=standard#global-options
#
ENV["DD_TAGS"].split(/\s+|,/).each do |t|
tags << t
end
end
# Support the Unified Service Tagging from Datadog, so that we can share
# the metric tags with the application running
#
# https://docs.datadoghq.com/getting_started/tagging/unified_service_tagging
if ENV.key?('DD_SERVICE')
tags << "service:#{ENV['DD_SERVICE']}"
if ENV.key?('DD_VERSION')
tags << "version:#{ENV['DD_VERSION']}"
end
# Support the origin detection over UDP from Datadog, it allows DogStatsD
# to detect where the container metrics come from, and tag metrics automatically.
#
# https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
if ENV.key?('DD_ENTITY_ID')
tags << "dd.internal.entity_id:#{ENV['DD_ENTITY_ID']}"
end
# Return nil if we have no environment variable tags. This way we don't
# send an unnecessary '|' on the end of each stat
return nil if tags.empty?
tags.join(",")
end
# Returns puma_metric with @metric_prefix prepended; a nil prefix contributes
# nothing.
def prefixed_metric_name(puma_metric)
  @metric_prefix.to_s + puma_metric.to_s
end
# Send data to statsd every few seconds
def stats_loop
tags = environment_variable_tags
begin
stats = ::PumaStats.new(Puma.stats_hash)
@statsd.send(metric_name: prefixed_metric_name("puma.workers"), value: stats.workers, type: :gauge, tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.booted_workers"), value: stats.booted_workers,
type: :gauge, tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.old_workers"), value: stats.old_workers, type: :gauge,
tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.running"), value: stats.running, type: :gauge, tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.backlog"), value: stats.backlog, type: :gauge, tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.pool_capacity"), value: stats.pool_capacity, type: :gauge,
tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.max_threads"), value: stats.max_threads, type: :gauge,
tags: tags)
@statsd.send(metric_name: prefixed_metric_name("puma.requests_count"), value: stats.requests_count,
type: :gauge, tags: tags)
@log_writer.unknown_error(e, nil, "! statsd: notify stats failed")