package elasticsearch

import (
	"io"
	"net/http"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf/testutil"
)

// defaultTags returns a freshly allocated tag set describing the single test
// node; the node-stats assertions in this file use it as the expected tags.
func defaultTags() map[string]string {
	tags := make(map[string]string, 6)
	tags["cluster_name"] = "es-testcluster"
	tags["node_attribute_master"] = "true"
	tags["node_id"] = "SDFsfSDFsdfFSDSDfSFDSDF"
	tags["node_name"] = "test.host.com"
	tags["node_host"] = "test"
	tags["node_roles"] = "data,ingest,master"
	return tags
}
// defaultServerInfo returns a serverInfo whose nodeID is left empty while the
// masterID is set to the test node's ID.
func defaultServerInfo() serverInfo {
	var info serverInfo
	info.masterID = "SDFsfSDFsdfFSDSDfSFDSDF"
	return info
}

// transportMock is an http.RoundTripper that answers every request with a
// canned status code and body, so tests never touch the network.
type transportMock struct {
	statusCode int    // status code placed on every response
	body       string // payload served as the response body
}

// newTransportMock returns a RoundTripper that replies to every request with
// http.StatusOK and the given body.
func newTransportMock(body string) http.RoundTripper {
	return &transportMock{
		statusCode: http.StatusOK,
		body:       body,
	}
}

// RoundTrip implements http.RoundTripper. It ignores the request's method and
// URL and returns a JSON-typed response carrying the configured status code
// and body. The returned error is always nil.
func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
	// The RoundTripper contract requires the transport to always close the
	// request body. The nil checks keep the mock usable with body-less
	// requests and when called directly with a nil request.
	if r != nil && r.Body != nil {
		// The body is never read by the mock, so a close error carries no
		// information worth surfacing.
		_ = r.Body.Close()
	}

	res := &http.Response{
		Header:     make(http.Header),
		Request:    r,
		StatusCode: t.statusCode,
	}
	res.Header.Set("Content-Type", "application/json")
	res.Body = io.NopCloser(strings.NewReader(t.body))
	return res, nil
}

// checkNodeStatsResult asserts that acc holds each of the nine node-stats
// measurements with its expected fields, tagged with defaultTags().
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
	expected := []struct {
		measurement string
		fields      map[string]interface{}
	}{
		{"elasticsearch_indices", nodestatsIndicesExpected},
		{"elasticsearch_os", nodestatsOsExpected},
		{"elasticsearch_process", nodestatsProcessExpected},
		{"elasticsearch_jvm", nodestatsJvmExpected},
		{"elasticsearch_thread_pool", nodestatsThreadPoolExpected},
		{"elasticsearch_fs", nodestatsFsExpected},
		{"elasticsearch_transport", nodestatsTransportExpected},
		{"elasticsearch_http", nodestatsHTTPExpected},
		{"elasticsearch_breakers", nodestatsBreakersExpected},
	}

	tags := defaultTags()
	for _, want := range expected {
		acc.AssertContainsTaggedFields(t, want.measurement, want.fields, tags)
	}
}

// TestGather runs Gather against the mocked node-stats response and checks
// that the server is not flagged as master and that all node-stats
// measurements were collected.
func TestGather(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.client.Transport = newTransportMock(nodeStatsResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := acc.GatherError(es.Gather)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, acc)
}

// TestGatherIndividualStats restricts NodeStats to "jvm" and "process" and
// checks that only those two measurements are reported while every other
// node-stats measurement is absent.
func TestGatherIndividualStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.NodeStats = []string{"jvm", "process"}
	es.client.Transport = newTransportMock(nodeStatsResponseJVMProcess)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := acc.GatherError(es.Gather)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")

	// One row per measurement; present marks the ones that were requested.
	checks := []struct {
		measurement string
		fields      map[string]interface{}
		present     bool
	}{
		{"elasticsearch_indices", nodestatsIndicesExpected, false},
		{"elasticsearch_os", nodestatsOsExpected, false},
		{"elasticsearch_process", nodestatsProcessExpected, true},
		{"elasticsearch_jvm", nodestatsJvmExpected, true},
		{"elasticsearch_thread_pool", nodestatsThreadPoolExpected, false},
		{"elasticsearch_fs", nodestatsFsExpected, false},
		{"elasticsearch_transport", nodestatsTransportExpected, false},
		{"elasticsearch_http", nodestatsHTTPExpected, false},
		{"elasticsearch_breakers", nodestatsBreakersExpected, false},
	}

	tags := defaultTags()
	for _, c := range checks {
		if c.present {
			acc.AssertContainsTaggedFields(t, c.measurement, c.fields, tags)
			continue
		}
		acc.AssertDoesNotContainsTaggedFields(t, c.measurement, c.fields, tags)
	}
}

// TestGatherEnrichStats enables EnrichStats, gathers against the mocked
// enrich-stats response and expects exactly eight metrics in the accumulator.
func TestGatherEnrichStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.EnrichStats = true
	es.client.Transport = newTransportMock(enrichStatsResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := acc.GatherError(es.Gather)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")

	require.Len(t, acc.GetTelegrafMetrics(), 8)
}

// TestGatherNodeStats calls gatherNodeStats directly with a placeholder URL
// (the mock transport serves the same body for any request) and verifies the
// collected node-stats measurements.
func TestGatherNodeStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.client.Transport = newTransportMock(nodeStatsResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherNodeStats("junk", acc)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, acc)
}

// TestGatherClusterHealthEmptyClusterHealth gathers cluster health with an
// empty ClusterHealthLevel and expects the cluster-level measurement only;
// no per-index health measurements may be present.
func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.ClusterHealth = true
	es.ClusterHealthLevel = ""
	es.client.Transport = newTransportMock(clusterHealthResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherClusterHealth("junk", acc)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")

	clusterTags := map[string]string{"name": "elasticsearch_telegraf"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, clusterTags)

	// Per-index health must not have been emitted.
	absent := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, idx := range absent {
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			idx.fields,
			map[string]string{"index": idx.index})
	}
}

// TestGatherClusterHealthSpecificClusterHealth gathers cluster health with
// ClusterHealthLevel set to "cluster" and expects the cluster-level
// measurement only; no per-index health measurements may be present.
func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.ClusterHealth = true
	es.ClusterHealthLevel = "cluster"
	es.client.Transport = newTransportMock(clusterHealthResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherClusterHealth("junk", acc)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")

	clusterTags := map[string]string{"name": "elasticsearch_telegraf"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, clusterTags)

	// Per-index health must not have been emitted.
	absent := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, idx := range absent {
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			idx.fields,
			map[string]string{"index": idx.index})
	}
}

// TestGatherClusterHealthAlsoIndicesHealth gathers cluster health with
// ClusterHealthLevel set to "indices" and expects both the cluster-level
// measurement and one per-index measurement for each of "v1" and "v2".
func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.Servers = []string{server}
	es.ClusterHealth = true
	es.ClusterHealthLevel = "indices"
	es.client.Transport = newTransportMock(clusterHealthResponseWithIndices)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherClusterHealth("junk", acc)
	require.NoError(t, err)

	info := es.serverInfo[es.Servers[0]]
	require.False(t, info.isMaster(), "IsMaster set incorrectly")

	clusterTags := map[string]string{"name": "elasticsearch_telegraf"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", clusterHealthExpected, clusterTags)

	// Each index carries the cluster name in addition to its own name.
	present := []struct {
		index  string
		fields map[string]interface{}
	}{
		{"v1", v1IndexExpected},
		{"v2", v2IndexExpected},
	}
	for _, idx := range present {
		acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health_indices",
			idx.fields,
			map[string]string{"index": idx.index, "name": "elasticsearch_telegraf"})
	}
}

// TestGatherClusterStatsMaster walks through the master-node path step by
// step: it resolves the master ID via getCatMaster, stores it in the server's
// info, gathers node stats (after which the server must report isMaster), and
// finally gathers cluster stats and checks the resulting measurements.
func TestGatherClusterStatsMaster(t *testing.T) {
	// This needs multiple steps to replicate the multiple calls internally.
	es := newElasticsearchWithClient()
	es.ClusterStats = true
	es.Servers = []string{"http://example.com:9200"}
	es.serverInfo = make(map[string]serverInfo)
	info := serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

	// first get catMaster
	es.client.Transport = newTransportMock(IsMasterResult)
	masterID, err := es.getCatMaster("junk")
	require.NoError(t, err)
	info.masterID = masterID
	es.serverInfo["http://example.com:9200"] = info

	// The first space-separated token of the fixture is the expected master
	// ID. testify takes the expected value first so that a failure diff is
	// labeled correctly.
	isMasterResultTokens := strings.Split(IsMasterResult, " ")
	require.Equal(t, isMasterResultTokens[0], masterID, "catmaster is incorrect")

	// now get node status, which determines whether we're master
	var acc testutil.Accumulator
	es.Local = true
	es.client.Transport = newTransportMock(nodeStatsResponse)
	require.NoError(t, es.gatherNodeStats("junk", &acc))
	require.True(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, &acc)

	// now test the clusterstats method
	es.client.Transport = newTransportMock(clusterStatsResponse)
	require.NoError(t, es.gatherClusterStats("junk", &acc))

	tags := map[string]string{
		"cluster_name": "es-testcluster",
		"node_name":    "test.host.com",
		"status":       "red",
	}

	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
	acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
}

// TestGatherClusterStatsNonMaster walks through the non-master path: it
// resolves the master ID via getCatMaster, gathers node stats and verifies
// that the server is not flagged as master afterwards.
func TestGatherClusterStatsNonMaster(t *testing.T) {
	// This needs multiple steps to replicate the multiple calls internally.
	es := newElasticsearchWithClient()
	es.ClusterStats = true
	es.Servers = []string{"http://example.com:9200"}
	es.serverInfo = make(map[string]serverInfo)
	es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

	// first get catMaster
	es.client.Transport = newTransportMock(IsNotMasterResult)
	masterID, err := es.getCatMaster("junk")
	require.NoError(t, err)

	// The first space-separated token of the fixture is the expected master
	// ID. testify takes the expected value first so that a failure diff is
	// labeled correctly.
	isNotMasterResultTokens := strings.Split(IsNotMasterResult, " ")
	require.Equal(t, isNotMasterResultTokens[0], masterID, "catmaster is incorrect")

	// now get node status, which determines whether we're master
	var acc testutil.Accumulator
	es.Local = true
	es.client.Transport = newTransportMock(nodeStatsResponse)
	require.NoError(t, es.gatherNodeStats("junk", &acc))

	// ensure flag is clear so Cluster Stats would not be done
	require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
	checkNodeStatsResult(t, &acc)
}

// TestGatherClusterIndicesStats gathers index stats with IndicesInclude set
// to "_all" and expects the primaries measurement for the "twitter" index.
func TestGatherClusterIndicesStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.IndicesInclude = []string{"_all"}
	es.Servers = []string{server}
	es.client.Transport = newTransportMock(clusterIndicesResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherIndicesStats("junk", acc)
	require.NoError(t, err)

	twitterTags := map[string]string{"index_name": "twitter"}
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, twitterTags)
}

// TestGatherDateStampedIndicesStats sets NumMostRecentIndices to 2 with the
// include patterns "twitter*", "influx*" and "penguins", then checks which
// date-stamped indices end up in the primaries measurement.
func TestGatherDateStampedIndicesStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.IndicesInclude = []string{"twitter*", "influx*", "penguins"}
	es.NumMostRecentIndices = 2
	es.Servers = []string{server}
	es.client.Transport = newTransportMock(dateStampedIndicesResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}
	require.NoError(t, es.Init())

	statsURL := es.Servers[0] + "/" + strings.Join(es.IndicesInclude, ",") + "/_stats"
	acc := &testutil.Accumulator{}
	err := es.gatherIndicesStats(statsURL, acc)
	require.NoError(t, err)

	checks := []struct {
		index   string
		present bool
	}{
		// "twitter": only the two most recent indices are expected.
		{"twitter_2020_08_02", true},
		{"twitter_2020_08_01", true},
		{"twitter_2020_07_31", false},
		// "influx": only the two most recent indices are expected.
		{"influx2021.01.02", true},
		{"influx2021.01.01", true},
		{"influx2020.12.31", false},
		// "penguins" is not configured for sorting, but must still be included.
		{"penguins", true},
	}
	for _, c := range checks {
		tags := map[string]string{"index_name": c.index}
		if c.present {
			acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, tags)
			continue
		}
		acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries", clusterIndicesExpected, tags)
	}
}

// TestGatherClusterIndiceShardsStats gathers index stats with IndicesLevel
// set to "shards" and expects the primaries measurement for "twitter" plus
// one shard measurement each for the primary shard "0" and replica shard "1".
func TestGatherClusterIndiceShardsStats(t *testing.T) {
	const server = "http://example.com:9200"

	es := newElasticsearchWithClient()
	es.IndicesLevel = "shards"
	es.Servers = []string{server}
	es.client.Transport = newTransportMock(clusterIndicesShardsResponse)
	es.serverInfo = map[string]serverInfo{server: defaultServerInfo()}

	acc := &testutil.Accumulator{}
	err := es.gatherIndicesStats("junk", acc)
	require.NoError(t, err)

	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "twitter"})

	// Primary shard first, then the replica.
	shards := []struct {
		name   string
		kind   string
		fields map[string]interface{}
	}{
		{"0", "primary", clusterIndicesPrimaryShardsExpected},
		{"1", "replica", clusterIndicesReplicaShardsExpected},
	}
	for _, shard := range shards {
		shardTags := map[string]string{
			"index_name": "twitter",
			"node_id":    "oqvR8I1dTpONvwRM30etww",
			"shard_name": shard.name,
			"type":       shard.kind,
		}
		acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_shards", shard.fields, shardTags)
	}
}

// newElasticsearchWithClient builds a plugin instance via newElasticsearch and
// attaches a zero-value http.Client, whose Transport the tests then replace
// with a mock.
func newElasticsearchWithClient() *Elasticsearch {
	plugin := newElasticsearch()
	plugin.client = new(http.Client)
	return plugin
}
