package redis_sentinel

import (
	"bufio"
	"bytes"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go/network"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/testutil"
)

const masterName = "mymaster"
const sentinelServicePort = "26379"

func TestRedisSentinelConnectIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	net, err := network.New(t.Context())
	require.NoError(t, err)
	defer func() {
		require.NoError(t, net.Remove(t.Context()), "terminating network failed")
	}()

	redis := createRedisContainer(net.Name)
	require.NoError(t, redis.Start(), "failed to start container")
	defer redis.Terminate()

	firstSentinel := createSentinelContainer(redis.Name, net.Name, wait.ForAll(
		wait.ForLog("+monitor master"),
		wait.ForListeningPort(sentinelServicePort),
	))
	require.NoError(t, firstSentinel.Start(), "failed to start container")
	defer firstSentinel.Terminate()

	secondSentinel := createSentinelContainer(redis.Name, net.Name, wait.ForAll(
		wait.ForLog("+sentinel sentinel"),
		wait.ForListeningPort(sentinelServicePort),
	))
	require.NoError(t, secondSentinel.Start(), "failed to start container")
	defer secondSentinel.Terminate()

	addr := fmt.Sprintf("tcp://%s:%s", secondSentinel.Address, secondSentinel.Ports[sentinelServicePort])

	r := &RedisSentinel{
		Servers: []string{addr},
	}
	require.NoError(t, r.Init(), "failed to run Init function")

	var acc testutil.Accumulator
	require.NoError(t, acc.GatherError(r.Gather))

	require.True(t, acc.HasMeasurement("redis_sentinel_masters"), "redis_sentinel_masters measurement is missing")
	require.True(t, acc.HasMeasurement("redis_sentinel_sentinels"), "redis_sentinel_sentinels measurement is missing")
	require.True(t, acc.HasMeasurement("redis_sentinel"), "redis_sentinel measurement is missing")
}

func TestRedisSentinelMasters(t *testing.T) {
	now := time.Now()

	globalTags := map[string]string{
		"port":   "6379",
		"source": "redis.io",
	}

	expectedTags := map[string]string{
		"port":   "6379",
		"source": "redis.io",
		"master": masterName,
	}

	// has_quorum is a custom field
	expectedFields := map[string]interface{}{
		"config_epoch":            0,
		"down_after_milliseconds": 30000,
		"failover_timeout":        180000,
		"flags":                   "master",
		"info_refresh":            8819,
		"ip":                      "127.0.0.1",
		"last_ok_ping_reply":      174,
		"last_ping_reply":         174,
		"last_ping_sent":          0,
		"link_pending_commands":   0,
		"link_refcount":           1,
		"num_other_sentinels":     1,
		"num_slaves":              0,
		"parallel_syncs":          1,
		"port":                    6379,
		"quorum":                  2,
		"role_reported":           "master",
		"role_reported_time":      83138826,
		"has_quorum":              true,
	}

	expectedMetrics := []telegraf.Metric{
		metric.New(measurementMasters, expectedTags, expectedFields, now),
	}

	sentinelMastersOutput := map[string]string{
		"config_epoch":            "0",
		"down_after_milliseconds": "30000",
		"failover_timeout":        "180000",
		"flags":                   "master",
		"info_refresh":            "8819",
		"ip":                      "127.0.0.1",
		"last_ok_ping_reply":      "174",
		"last_ping_reply":         "174",
		"last_ping_sent":          "0",
		"link_pending_commands":   "0",
		"link_refcount":           "1",
		"name":                    "mymaster",
		"num_other_sentinels":     "1",
		"num_slaves":              "0",
		"parallel_syncs":          "1",
		"port":                    "6379",
		"quorum":                  "2",
		"role_reported":           "master",
		"role_reported_time":      "83138826",
		"runid":                   "ff3dadd1cfea3043de4d25711d93f01a564562f7",
	}

	sentinelTags, sentinelFields, sentinelErr := convertSentinelMastersOutput(globalTags, sentinelMastersOutput, nil)
	require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)

	actualMetrics := []telegraf.Metric{
		metric.New(measurementMasters, sentinelTags, sentinelFields, now),
	}

	testutil.RequireMetricsEqual(t, expectedMetrics, actualMetrics, testutil.IgnoreTime())
}

func TestRedisSentinels(t *testing.T) {
	now := time.Now()

	globalTags := make(map[string]string)

	expectedTags := map[string]string{
		"sentinel_ip":   "127.0.0.1",
		"sentinel_port": "26380",
		"master":        masterName,
	}
	expectedFields := map[string]interface{}{
		"name":                    "adfd343f6b6ecc77e2b9636de6d9f28d4b827521",
		"flags":                   "sentinel",
		"link_pending_commands":   0,
		"link_refcount":           1,
		"last_ping_sent":          0,
		"last_ok_ping_reply":      516,
		"last_ping_reply":         516,
		"down_after_milliseconds": 30000,
		"last_hello_message":      1905,
		"voted_leader":            "?",
		"voted_leader_epoch":      0,
	}

	expectedMetrics := []telegraf.Metric{
		metric.New(measurementSentinels, expectedTags, expectedFields, now),
	}

	sentinelsOutput := map[string]string{
		"name":                    "adfd343f6b6ecc77e2b9636de6d9f28d4b827521",
		"ip":                      "127.0.0.1",
		"port":                    "26380",
		"runid":                   "adfd343f6b6ecc77e2b9636de6d9f28d4b827521",
		"flags":                   "sentinel",
		"link_pending_commands":   "0",
		"link_refcount":           "1",
		"last_ping_sent":          "0",
		"last_ok_ping_reply":      "516",
		"last_ping_reply":         "516",
		"down_after_milliseconds": "30000",
		"last_hello_message":      "1905",
		"voted_leader":            "?",
		"voted_leader_epoch":      "0",
	}

	sentinelTags, sentinelFields, sentinelErr := convertSentinelSentinelsOutput(globalTags, masterName, sentinelsOutput)
	require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)

	actualMetrics := []telegraf.Metric{
		metric.New(measurementSentinels, sentinelTags, sentinelFields, now),
	}

	testutil.RequireMetricsEqual(t, expectedMetrics, actualMetrics)
}

func TestRedisSentinelReplicas(t *testing.T) {
	now := time.Now()

	globalTags := make(map[string]string)

	expectedTags := map[string]string{
		"replica_ip":   "127.0.0.1",
		"replica_port": "6380",
		"master":       masterName,
	}
	expectedFields := map[string]interface{}{
		"down_after_milliseconds": 30000,
		"flags":                   "slave",
		"info_refresh":            8476,
		"last_ok_ping_reply":      987,
		"last_ping_reply":         987,
		"last_ping_sent":          0,
		"link_pending_commands":   0,
		"link_refcount":           1,
		"master_host":             "127.0.0.1",
		"master_link_down_time":   0,
		"master_link_status":      "ok",
		"master_port":             6379,
		"name":                    "127.0.0.1:6380",
		"role_reported":           "slave",
		"role_reported_time":      10267432,
		"slave_priority":          100,
		"slave_repl_offset":       1392400,
	}

	expectedMetrics := []telegraf.Metric{
		metric.New(measurementReplicas, expectedTags, expectedFields, now),
	}

	replicasOutput := map[string]string{
		"down_after_milliseconds": "30000",
		"flags":                   "slave",
		"info_refresh":            "8476",
		"ip":                      "127.0.0.1",
		"last_ok_ping_reply":      "987",
		"last_ping_reply":         "987",
		"last_ping_sent":          "0",
		"link_pending_commands":   "0",
		"link_refcount":           "1",
		"master_host":             "127.0.0.1",
		"master_link_down_time":   "0",
		"master_link_status":      "ok",
		"master_port":             "6379",
		"name":                    "127.0.0.1:6380",
		"port":                    "6380",
		"role_reported":           "slave",
		"role_reported_time":      "10267432",
		"runid":                   "70e07dad9e450e2d35f1b75338e0a5341b59d710",
		"slave_priority":          "100",
		"slave_repl_offset":       "1392400",
	}

	sentinelTags, sentinelFields, sentinelErr := convertSentinelReplicaOutput(globalTags, masterName, replicasOutput)
	require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)

	actualMetrics := []telegraf.Metric{
		metric.New(measurementReplicas, sentinelTags, sentinelFields, now),
	}

	testutil.RequireMetricsEqual(t, expectedMetrics, actualMetrics)
}

func TestRedisSentinelInfoAll(t *testing.T) {
	now := time.Now()

	globalTags := map[string]string{
		"port":   "6379",
		"source": "redis.io",
	}

	expectedTags := map[string]string{
		"port":   "6379",
		"source": "redis.io",
	}

	expectedFields := map[string]interface{}{
		"lru_clock":     int64(15585808),
		"uptime_ns":     int64(901000000000),
		"redis_version": "5.0.5",

		"clients":                         int64(2),
		"client_recent_max_input_buffer":  int64(2),
		"client_recent_max_output_buffer": int64(0),
		"blocked_clients":                 int64(0),

		"used_cpu_sys":           float64(0.786872),
		"used_cpu_user":          float64(0.939455),
		"used_cpu_sys_children":  float64(0.000000),
		"used_cpu_user_children": float64(0.000000),

		"total_connections_received":     int64(2),
		"total_commands_processed":       int64(6),
		"instantaneous_ops_per_sec":      int64(0),
		"total_net_input_bytes":          int64(124),
		"total_net_output_bytes":         int64(10148),
		"instantaneous_input_kbps":       float64(0.00),
		"instantaneous_output_kbps":      float64(0.00),
		"rejected_connections":           int64(0),
		"sync_full":                      int64(0),
		"sync_partial_ok":                int64(0),
		"sync_partial_err":               int64(0),
		"expired_keys":                   int64(0),
		"expired_stale_perc":             float64(0.00),
		"expired_time_cap_reached_count": int64(0),
		"evicted_keys":                   int64(0),
		"keyspace_hits":                  int64(0),
		"keyspace_misses":                int64(0),
		"pubsub_channels":                int64(0),
		"pubsub_patterns":                int64(0),
		"latest_fork_usec":               int64(0),
		"migrate_cached_sockets":         int64(0),
		"slave_expires_tracked_keys":     int64(0),
		"active_defrag_hits":             int64(0),
		"active_defrag_misses":           int64(0),
		"active_defrag_key_hits":         int64(0),
		"active_defrag_key_misses":       int64(0),

		"sentinel_masters":                int64(2),
		"sentinel_running_scripts":        int64(0),
		"sentinel_scripts_queue_length":   int64(0),
		"sentinel_simulate_failure_flags": int64(0),
		"sentinel_tilt":                   int64(0),
	}

	expectedMetrics := []telegraf.Metric{
		metric.New(measurementSentinel, expectedTags, expectedFields, now),
	}

	sentinelInfoResponse, err := os.ReadFile("testdata/sentinel.info.response")
	require.NoErrorf(t, err, "could not init fixture: %v", err)

	rdr := bufio.NewReader(bytes.NewReader(sentinelInfoResponse))

	sentinelTags, sentinelFields, sentinelErr := convertSentinelInfoOutput(globalTags, rdr)
	require.NoErrorf(t, sentinelErr, "failed converting output: %v", sentinelErr)

	actualMetrics := []telegraf.Metric{
		metric.New(measurementSentinel, sentinelTags, sentinelFields, now),
	}

	testutil.RequireMetricsEqual(t, expectedMetrics, actualMetrics)
}

func createRedisContainer(networkName string) testutil.Container {
	return testutil.Container{
		Image:        "redis:7.0-alpine",
		Name:         "telegraf-test-redis-sentinel-redis",
		Networks:     []string{networkName},
		ExposedPorts: []string{"6379"},
		WaitingFor: wait.ForAll(
			wait.ForLog("Ready to accept connections"),
			wait.ForListeningPort("6379"),
		),
	}
}

func createSentinelContainer(redisAddress, networkName string, waitingFor wait.Strategy) testutil.Container {
	return testutil.Container{
		Image:        "bitnamilegacy/redis-sentinel:7.0",
		ExposedPorts: []string{sentinelServicePort},
		Networks:     []string{networkName},
		Env: map[string]string{
			"REDIS_MASTER_HOST": redisAddress,
		},
		WaitingFor: waitingFor,
	}
}
