Skip to content
Snippets Groups Projects
nexus_puma_statsd.rb 6.28 KiB
Newer Older
# frozen_string_literal: true
require 'puma'
require 'puma/plugin'
require 'socket'
require 'nexus_semantic_logger/datadog_singleton'

# Forked from puma-plugin-statsd.
# Uses the same datadog settings as nexus_semantic_logger.
# To use, add to puma.rb:
#   plugin :nexus_puma_statsd
class StatsdConnector
  STATSD_TYPES = { count: 'c', gauge: 'g' }
  METRIC_DELIMETER = "."

  attr_reader :host, :port

  def initialize
    @host = ENV.fetch('DD_AGENT_HOST', '127.0.0.1')
    @port = ENV.fetch('DD_STATSD_PORT', 8125)
    @socket_path = ENV.fetch('DD_STATSD_SOCKET_PATH', '')
  end

  def send(metric_name:, value:, type:, tags: nil)
    data = "#{metric_name}:#{value}|#{STATSD_TYPES.fetch(type)}"
    data = "#{data}|##{tags}" unless tags.nil?

    if @socket_path.to_s.strip.empty?
      socket = UDPSocket.new
      socket.send(data, 0, host, port)
    else
      socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
      socket.connect(Socket.pack_sockaddr_un(@socket_path))
      socket.sendmsg_nonblock(data)
    end
  ensure
    socket.close
  end
end

# Wrap puma's stats in a safe API
class PumaStats
  def initialize(stats)
    @stats = stats
  end

  def clustered?
John Harris's avatar
John Harris committed
    @stats.key?(:workers)
  end

  def workers
    @stats.fetch(:workers, 1)
  end

  def booted_workers
    @stats.fetch(:booted_workers, 1)
  end

  def old_workers
    @stats.fetch(:old_workers, 0)
  end

  def running
    if clustered?
      @stats[:worker_status].map { |s| s[:last_status].fetch(:running, 0) }.inject(0, &:+)
    else
      @stats.fetch(:running, 0)
    end
  end

  def backlog
    if clustered?
      @stats[:worker_status].map { |s| s[:last_status].fetch(:backlog, 0) }.inject(0, &:+)
    else
      @stats.fetch(:backlog, 0)
    end
  end

  def pool_capacity
    if clustered?
      @stats[:worker_status].map { |s| s[:last_status].fetch(:pool_capacity, 0) }.inject(0, &:+)
    else
      @stats.fetch(:pool_capacity, 0)
    end
  end

  def max_threads
    if clustered?
      @stats[:worker_status].map { |s| s[:last_status].fetch(:max_threads, 0) }.inject(0, &:+)
    else
      @stats.fetch(:max_threads, 0)
    end
  end

  def requests_count
    if clustered?
      @stats[:worker_status].map { |s| s[:last_status].fetch(:requests_count, 0) }.inject(0, &:+)
    else
      @stats.fetch(:requests_count, 0)
    end
  end
end

Puma::Plugin.create do
  # We can start doing something when we have a launcher:
  def start(launcher)
    @launcher = launcher
    @log_writer =
      if Gem::Version.new(Puma::Const::PUMA_VERSION) >= Gem::Version.new(6)
        @launcher.log_writer
      else
        @launcher.events
      end

    @statsd = ::StatsdConnector.new
John Harris's avatar
John Harris committed
    @log_writer.debug("statsd: enabled (host: #{@statsd.host})")

    # Fetch global metric prefix from env variable
    @metric_prefix = ENV.fetch('DD_STATSD_METRIC_PREFIX', nil)
    if @metric_prefix && !@metric_prefix.end_with?(::StatsdConnector::METRIC_DELIMETER)
      @metric_prefix += ::StatsdConnector::METRIC_DELIMETER
    end

    register_hooks
  end

  private

  def register_hooks
    in_background(&method(:stats_loop))
  end

  def environment_variable_tags
    # Tags are separated by spaces, and while they are normally a tag and
    # value separated by a ':', they can also just be tagged without any
    # associated value.
    #
    # Examples: simple-tag-0 tag-key-1:tag-value-1
    #
    tags = []
    global_tags = NexusSemanticLogger::DatadogSingleton.instance.global_tags
    tags += global_tags unless global_tags.nil?

John Harris's avatar
John Harris committed
    if ENV.key?('HOSTNAME')
      tags << "pod_name:#{ENV['HOSTNAME']}"
    end

    # Standardised datadog tag attributes, so that we can share the metric
    # tags with the application running
    #
    # https://docs.datadoghq.com/agent/docker/?tab=standard#global-options
    #
John Harris's avatar
John Harris committed
    if ENV.key?("DD_TAGS")
      ENV["DD_TAGS"].split(/\s+|,/).each do |t|
        tags << t
      end
    end

    # Support the Unified Service Tagging from Datadog, so that we can share
    # the metric tags with the application running
    #
    # https://docs.datadoghq.com/getting_started/tagging/unified_service_tagging
John Harris's avatar
John Harris committed
    if ENV.key?('DD_ENV')
      tags << "env:#{ENV['DD_ENV']}"
John Harris's avatar
John Harris committed
    if ENV.key?('DD_SERVICE')
      tags << "service:#{ENV['DD_SERVICE']}"
John Harris's avatar
John Harris committed
    if ENV.key?('DD_VERSION')
      tags << "version:#{ENV['DD_VERSION']}"
    end

    # Support the origin detection over UDP from Datadog, it allows DogStatsD
    # to detect where the container metrics come from, and tag metrics automatically.
    #
    # https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp
John Harris's avatar
John Harris committed
    if ENV.key?('DD_ENTITY_ID')
      tags << "dd.internal.entity_id:#{ENV['DD_ENTITY_ID']}"
    end

    # Return nil if we have no environment variable tags. This way we don't
    # send an unnecessary '|' on the end of each stat
    return nil if tags.empty?

    tags.join(",")
  end

  def prefixed_metric_name(puma_metric)
    "#{@metric_prefix}#{puma_metric}"
  end

  # Send data to statsd every few seconds
  def stats_loop
    tags = environment_variable_tags

John Harris's avatar
John Harris committed
    sleep(5)
John Harris's avatar
John Harris committed
      @log_writer.debug("statsd: notify statsd")
      begin
        stats = ::PumaStats.new(Puma.stats_hash)
        @statsd.send(metric_name: prefixed_metric_name("puma.workers"), value: stats.workers, type: :gauge, tags: tags)
John Harris's avatar
John Harris committed
        @statsd.send(metric_name: prefixed_metric_name("puma.booted_workers"), value: stats.booted_workers,
type: :gauge, tags: tags)
        @statsd.send(metric_name: prefixed_metric_name("puma.old_workers"), value: stats.old_workers, type: :gauge,
tags: tags)
        @statsd.send(metric_name: prefixed_metric_name("puma.running"), value: stats.running, type: :gauge, tags: tags)
        @statsd.send(metric_name: prefixed_metric_name("puma.backlog"), value: stats.backlog, type: :gauge, tags: tags)
John Harris's avatar
John Harris committed
        @statsd.send(metric_name: prefixed_metric_name("puma.pool_capacity"), value: stats.pool_capacity, type: :gauge,
tags: tags)
        @statsd.send(metric_name: prefixed_metric_name("puma.max_threads"), value: stats.max_threads, type: :gauge,
tags: tags)
        @statsd.send(metric_name: prefixed_metric_name("puma.requests_count"), value: stats.requests_count,
type: :gauge, tags: tags)
      rescue StandardError => e
John Harris's avatar
John Harris committed
        @log_writer.unknown_error(e, nil, "! statsd: notify stats failed")
John Harris's avatar
John Harris committed
        sleep(2)