#!/usr/bin/env ruby
# frozen_string_literal: true

require "bundler/setup"
require "benchmark/ips"
require "tmpdir"
require "socket"
require "statsd-instrument"
require "datadog/statsd"
require "forwardable"
require "vernier"

module Datadog
  class Statsd
    # Reopened for the benchmarks only.
    class Telemetry
      # Prints the byte and packet counters held in this object's instance
      # variables, followed by a separator line, and returns an empty array.
      # NOTE(review): presumably this replaces the gem's own #flush so that
      # no telemetry metrics are emitted during the benchmark - confirm
      # against the dogstatsd-ruby version in use.
      #
      # @return [Array] always empty
      def flush
        counter_names = %w[bytes packets].product(%w[sent dropped dropped_queue dropped_writer])
        counter_names.each do |kind, measure|
          counter = "#{kind}_#{measure}"
          current = instance_variable_get("@#{counter}")
          puts "#{counter}: #{current}"
        end
        puts "-" * 10

        []
      end
    end
  end
end

# This is a shim to make the Datadog client compatible with the StatsD client
# interface. It's not a complete implementation, but it's enough to run the
# benchmarks.
#
# Datadog::Statsd takes the metric value as a positional argument and the
# options (such as tags) as a trailing hash, so every method below forwards
# the value positionally rather than as a `value:` keyword.
class DatadogShim
  extend Forwardable

  # Stand-in for a sink: #flush accepts the `blocking:` keyword and does
  # nothing.
  class NullSink
    def flush(blocking: false)
    end
  end

  def_delegator :@client, :close

  # @param [Datadog::Statsd] client
  def initialize(client)
    @client = client
  end

  # @return [NullSink] memoized no-op sink
  def sink
    @sink ||= NullSink.new
  end

  # Datadog::Statsd#increment reads the amount from the `:by` option.
  def increment(stat, value = 1, tags: nil)
    @client.increment(stat, by: value, tags: tags)
  end

  # With a block the block is timed by the client; without one the given
  # value is reported directly as a timing.
  def measure(stat, value = nil, tags: nil, &block)
    if block
      @client.time(stat, tags: tags, &block)
    else
      @client.timing(stat, value, tags: tags)
    end
  end

  def histogram(stat, value = nil, tags: nil, &block)
    @client.histogram(stat, value, tags: tags, &block)
  end

  def gauge(stat, value, tags: nil)
    @client.gauge(stat, value, tags: tags)
  end

  def set(stat, value, tags: nil)
    @client.set(stat, value, tags: tags)
  end

  def event(title, text, tags: nil)
    @client.event(title, text, tags: tags)
  end

  def service_check(name, status, tags: nil)
    @client.service_check(name, status, tags: tags)
  end
end

# Emits the three events that make up one benchmark iteration: an untagged
# counter increment, an untagged timed block, and a tagged gauge.
#
# @param client an object responding to #increment, #measure and #gauge
# @return whatever the client's #gauge call returns
def send_metrics(client)
  client.increment("StatsD.increment", 10)

  client.measure("StatsD.measure") do
    1 + 1
  end

  gauge_tags = ["foo:bar", "quc"]
  client.gauge("StatsD.gauge", 12.0, tags: gauge_tags)
end

# Emits a counter, a timed block and a gauge for each of SERIES_COUNT
# distinct series. Every series gets its own "series:<index>" tag next to
# two fixed tags, so each pass produces SERIES_COUNT different tag sets.
#
# @param client an object responding to #increment, #measure and #gauge
# @return [Integer] SERIES_COUNT
def send_metrics_high_cardinality(client)
  SERIES_COUNT.times do |series_index|
    series_tags = ["series:#{series_index}", "foo:bar", "baz:quc"]

    client.increment("StatsD.increment", 10, tags: series_tags)
    client.measure("StatsD.measure", tags: series_tags) do
      1 + 1
    end
    client.gauge("StatsD.gauge", 12.0, tags: series_tags)
  end
end

# Filesystem locations: the Unix datagram socket lives under the current
# working directory, the log files under the system temp directory.
SOCKET_PATH = File.join(Dir.pwd, "tmp/metric.sock")
LOG_DIR = File.join(Dir.tmpdir, "statsd-instrument-benchmarks")

# Workload knobs, overridable through environment variables of the same name.
THREAD_COUNT = Integer(ENV.fetch("THREAD_COUNT") { 5 })
ITERATIONS = Integer(ENV.fetch("ITERATIONS") { 10_000 })
SERIES_COUNT = Integer(ENV.fetch("SERIES_COUNT") { 0 })

# Number of events emitted per series on each iteration (counter, timing, gauge).
EVENTS_PER_ITERATION = 3

# Profiling is switched on by the mere presence of the variable; its value is
# not inspected.
ENABLE_PROFILING = ENV.key?("ENABLE_PROFILING")

# 32 KiB; used both to size the UDS receive buffer and as the jumbo packet size.
UDS_MAX_SEND_SIZE = 32 * 1024

FileUtils.mkdir_p(LOG_DIR)
puts "Logs are stored in #{LOG_DIR}"

# Runs one throughput benchmark and prints the resulting events per second.
#
# A local UDP listener and a Unix datagram socket at SOCKET_PATH are set up
# as targets, the client under test is built, THREAD_COUNT threads each emit
# metrics ITERATIONS times, and the elapsed time (including client shutdown)
# is turned into a rate.
#
# @param name [String] label used in the console output and in the log file name
# @param env [Hash] for the StatsD client, entries merged over ENV before the
#   client is built; for the Datadog client, keyword options passed to
#   Datadog::Statsd.new
# @param datadog_client [Boolean] when true, benchmark Datadog::Statsd
#   (wrapped in DatadogShim) instead of the StatsD::Instrument client
# @return [nil]
def benchmark_implementation(name, env = {}, datadog_client = false)
  log_filename = File.join(LOG_DIR, "#{File.basename($PROGRAM_NAME)}-#{name}.log".tr(" ", "_"))
  FileUtils.mkdir_p(File.dirname(log_filename))

  # Set up a UDP listener to which we can send StatsD packets
  receiver = UDPSocket.new
  receiver.bind("localhost", 0)

  FileUtils.mkdir_p(File.dirname(SOCKET_PATH))
  FileUtils.rm_f(SOCKET_PATH)
  receiver_uds = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
  receiver_uds.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true)
  receiver_uds.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, UDS_MAX_SEND_SIZE * THREAD_COUNT)
  receiver_uds.bind(Socket.pack_sockaddr_un(SOCKET_PATH))
  # with UDS we have to take data out of the socket, otherwise it will fill up
  # and we will block writing to it (which is what we are testing)
  consume = Thread.new do
    loop do
      receiver_uds.recv(UDS_MAX_SEND_SIZE)
    rescue
      # Ignored: the payload is discarded and a failed read must not stop
      # the drain loop.
    end
  end

  # The log level belongs to the Logger, not to File.open. The file handle is
  # intentionally left open: the logger stays installed as StatsD.logger
  # after this method returns.
  log_file = File.open(log_filename, "w+")
  StatsD.logger = Logger.new(log_file, level: Logger::WARN)

  # Only build the client that is actually benchmarked.
  udp_client = if datadog_client
    DatadogShim.new(Datadog::Statsd.new(receiver.addr[2], receiver.addr[1], **env))
  else
    StatsD::Instrument::Environment.new(ENV.to_h.merge(
      "STATSD_ADDR" => "#{receiver.addr[2]}:#{receiver.addr[1]}",
      "STATSD_IMPLEMENTATION" => "dogstatsd",
      "STATSD_ENV" => "production",
    ).merge(env)).client
  end

  series = SERIES_COUNT.zero? ? 1 : SERIES_COUNT
  events_sent = THREAD_COUNT * EVENTS_PER_ITERATION * ITERATIONS * series
  puts "===== #{name} throughput (#{THREAD_COUNT} threads) - total events: #{events_sent} ====="
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = THREAD_COUNT.times.map do
    Thread.new do
      # Exactly ITERATIONS passes, so the work done matches events_sent.
      ITERATIONS.times do
        if SERIES_COUNT.zero?
          send_metrics(udp_client)
        else
          send_metrics_high_cardinality(udp_client)
        end
      end
    end
  end

  threads.each(&:join)
  udp_client.shutdown if udp_client.respond_to?(:shutdown)
  udp_client.close if datadog_client

  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

  # Insert thousands separators by working on the reversed string.
  puts "events: #{(events_sent / duration).round(1).to_s.reverse.gsub(/(\d{3})(?=\d)/, '\\1,').reverse}/s"
ensure
  # Release the listener resources even when the run raised. The consumer
  # thread is stopped first so it never reads from a closed socket.
  consume&.kill
  receiver&.close
  receiver_uds&.close
end

# Runs the given block; when ENABLE_PROFILING is set the block is wrapped in
# a Vernier profiling session whose result is written to +profile_path+. The
# profiler is stopped even if the block raises.
#
# @param profile_path [String] output file for the Vernier profile
# @return the value of the block
def with_optional_profile(profile_path)
  return yield unless ENABLE_PROFILING

  Vernier.start_profile(out: profile_path)
  begin
    yield
  ensure
    Vernier.stop_profile
  end
end

with_optional_profile("tmp/benchmark_profile_udp_sync.json") do
  benchmark_implementation("UDP sync", "STATSD_BUFFER_CAPACITY" => "0")
end

with_optional_profile("tmp/benchmark_profile_udp_async.json") do
  benchmark_implementation("UDP batched")
end

with_optional_profile("tmp/benchmark_profile_uds_small_packet.json") do
  benchmark_implementation("UDS batched with small packet", "STATSD_SOCKET_PATH" => SOCKET_PATH)
end

with_optional_profile("tmp/benchmark_profile_uds_batched_async.json") do
  benchmark_implementation(
    "UDS batched with jumbo packet",
    "STATSD_SOCKET_PATH" => SOCKET_PATH,
    "STATSD_MAX_PACKET_SIZE" => UDS_MAX_SEND_SIZE.to_s,
  )
end

with_optional_profile("tmp/benchmark_udp_batched_with_aggregation.json") do
  benchmark_implementation(
    "UDP batched with aggregation and 5 second interval",
    "STATSD_ENABLE_AGGREGATION" => "true",
    "STATSD_AGGREGATION_FLUSH_INTERVAL" => "5",
  )
end

with_optional_profile("tmp/benchmark_uds_with_aggregation.json") do
  benchmark_implementation(
    "UDS batched with aggregation and 5 second interval",
    "STATSD_ENABLE_AGGREGATION" => "true",
    "STATSD_AGGREGATION_FLUSH_INTERVAL" => "5",
    "STATSD_SOCKET_PATH" => SOCKET_PATH,
    "STATSD_MAX_PACKET_SIZE" => UDS_MAX_SEND_SIZE.to_s,
  )
end

with_optional_profile("tmp/benchmark_profile_datadog.json") do
  benchmark_implementation(
    "Datadog client with UDP and delayed serialization",
    {
      buffer_max_payload_size: 4096,
      delay_serialization: true,
      telemetry_flush_interval: 1,
      # The Datadog implementation will drop metrics if the queue is full.
      # To imitate the behavior of the implementation of this gem, we set
      # the queue size to infinity to avoid dropping metrics.
      sender_queue_size: Float::INFINITY,
    },
    true,
  )
end

# Uses its own output file so this profile does not overwrite the one from
# the previous Datadog run.
with_optional_profile("tmp/benchmark_profile_datadog_dropping.json") do
  benchmark_implementation(
    "UDP - Datadog gem - using: delay_serialization, multi-threaded, allow dropping samples",
    {
      buffer_max_payload_size: 4096,
      delay_serialization: true,
      telemetry_flush_interval: 1,
      sender_queue_size: 250_000,
    },
    true,
  )
end
