package zabbix

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"net"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/datadope-io/go-zabbix/v2"
	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
	"github.com/influxdata/telegraf/testutil"
)

func TestSuccessfulReceive(t *testing.T) {
	hostname, err := os.Hostname()
	require.NoError(t, err)

	tests := []struct {
		name                  string
		prefix                string
		agentActive           bool
		skipMeasurementPrefix bool
		input                 []telegraf.Metric
		expected              []zabbix.Packet
	}{
		{
			name: "send one metric with one field and no extra tags, generates one zabbix metric",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "string values representing a float number should be sent in the exact same format",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": "3.1415",
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value",
							Value: "3.1415",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send one metric with one string field and no extra tags, generates one zabbix metric",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": "some value",
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value",
							Value: "some value",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "boolean values are converted to 1 (true) or 0 (false)",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"valueTrue":  true,
						"valueFalse": false,
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.valueTrue",
							Value: "true",
							Clock: 1522082244,
						},
						{
							Host:  "hostname",
							Key:   "name.valueFalse",
							Value: "false",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "metrics without host tag use the system hostname",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{},
					map[string]interface{}{
						"value": "x",
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  hostname,
							Key:   "name.value",
							Value: "x",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send one metric with extra tags, zabbix metric should be generated with a parameter",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
						"foo":  "bar",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value[bar]",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send one metric with two extra tags, zabbix parameters should be alphabetically ordered",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host":   "hostname",
						"zparam": "last",
						"aparam": "first",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value[first,last]",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send one metric with two fields and no extra tags, generates two zabbix metrics",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"valueA": int64(0),
						"valueB": int64(1),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.valueA",
							Value: "0",
							Clock: 1522082244,
						},
						{
							Host:  "hostname",
							Key:   "name.valueB",
							Value: "1",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send two metrics with one field and no extra tags, generates two zabbix metrics",
			input: []telegraf.Metric{
				metric.New(
					"nameA",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
				metric.New(
					"nameB",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "nameA.value",
							Value: "0",
							Clock: 1522082244,
						},
						{
							Host:  "hostname",
							Key:   "nameB.value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "send two metrics with different hostname, generates two zabbix metrics for different hosts",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameA",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameB",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostnameA",
							Key:   "name.value",
							Value: "0",
							Clock: 1522082244,
						},
						{
							Host:  "hostnameB",
							Key:   "name.value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name:   "if key_prefix is configured, zabbix metrics should have that prefix in the key",
			prefix: "telegraf.",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "telegraf.name.value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name:                  "if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key",
			skipMeasurementPrefix: true,
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name:        "if AgentActive is configured, zabbix metrics should be sent respecting that protocol",
			agentActive: true,
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostname",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1522082244, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "agent data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostname",
							Key:   "name.value",
							Value: "0",
							Clock: 1522082244,
						},
					},
				},
			},
		},
		{
			name: "metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends",
			input: []telegraf.Metric{
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameD",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(4444444444, 0),
				),
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameC",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(3333333333, 0),
				),
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameA",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(1111111111, 0),
				),
				metric.New(
					"name",
					map[string]string{
						"host": "hostnameB",
					},
					map[string]interface{}{
						"value": int64(0),
					},
					time.Unix(2222222222, 0),
				),
			},
			expected: []zabbix.Packet{
				{
					Request: "sender data",
					Data: []*zabbix.Metric{
						{
							Host:  "hostnameA",
							Key:   "name.value",
							Value: "0",
							Clock: 1111111111,
						},
						{
							Host:  "hostnameB",
							Key:   "name.value",
							Value: "0",
							Clock: 2222222222,
						},
						{
							Host:  "hostnameC",
							Key:   "name.value",
							Value: "0",
							Clock: 3333333333,
						},
						{
							Host:  "hostnameD",
							Key:   "name.value",
							Value: "0",
							Clock: 4444444444,
						},
					},
				},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Setup a Zabbix mock server and start listening
			server, err := newZabbixMockServer()
			require.NoError(t, err)
			defer server.close()

			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				server.listen()
			}()

			// Setup the plugin
			plugin := &Zabbix{
				Address:               server.addr(),
				KeyPrefix:             tt.prefix,
				HostTag:               "host",
				SkipMeasurementPrefix: tt.skipMeasurementPrefix,
				AgentActive:           tt.agentActive,
				LLDSendInterval:       config.Duration(10 * time.Minute),
				Log:                   testutil.Logger{},
			}
			require.NoError(t, plugin.Init())

			// Connect and write the data
			require.NoError(t, plugin.Connect())
			defer plugin.Close()
			require.NoError(t, plugin.Write(tt.input))

			// Wait for the data to arrive
			require.Eventually(t, func() bool {
				return server.count.Load() > 0
			}, 3*time.Second, 100*time.Millisecond, "nothing received")

			// Stop listening
			server.listener.Close()
			wg.Wait()

			// Check the received data
			server.Lock()
			defer server.Unlock()

			require.Empty(t, server.errs, "server had errors")
			requireRequestDataEqual(t, tt.expected, server.received, false)
		})
	}
}

func TestInvalidData(t *testing.T) {
	input := []telegraf.Metric{
		metric.New(
			"name",
			map[string]string{
				"host": "hostname",
			},
			map[string]interface{}{
				"value": []int{1, 2},
			},
			time.Unix(1522082244, 0),
		),
	}

	// Setup a Zabbix mock server and start listening
	server, err := newZabbixMockServer()
	require.NoError(t, err)
	defer server.close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		server.listen()
	}()

	// Setup the plugin
	plugin := &Zabbix{
		Address:         server.addr(),
		HostTag:         "host",
		LLDSendInterval: config.Duration(10 * time.Minute),
		Log:             testutil.Logger{},
	}
	require.NoError(t, plugin.Init())

	// Connect and write the data
	require.NoError(t, plugin.Connect())
	defer plugin.Close()
	require.NoError(t, plugin.Write(input))
	require.NoError(t, plugin.Close())

	// Stop listening
	server.listener.Close()
	wg.Wait()

	// Check the received data
	server.Lock()
	defer server.Unlock()

	require.Empty(t, server.errs, "server had errors")
	require.Empty(t, server.received)
}

// TestLLD tests how LLD metrics are sent simulating the time passing.
// LLD is sent each LLDSendInterval. Only new data.
// LLD data is cleared LLDClearInterval.
func TestLLD(t *testing.T) {
	// Telegraf metric which will be sent repeatedly
	m := metric.New(
		"name",
		map[string]string{"host": "hostA", "foo": "bar"},
		map[string]interface{}{"value": int64(0)},
		time.Unix(0, 0),
	)

	mNew := metric.New(
		"name",
		map[string]string{"host": "hostA", "foo": "moo"},
		map[string]interface{}{"value": int64(0)},
		time.Unix(0, 0),
	)

	// Expected Zabbix metric generated
	expected := []zabbix.Packet{
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
				{
					Host:  "hostA",
					Key:   "telegraf.lld.name.foo",
					Value: `{"data":[{"{#FOO}":"bar"}]}`,
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[moo]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
				{
					Host:  "hostA",
					Key:   "telegraf.lld.name.foo",
					Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`,
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
			},
		},
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value[bar]",
					Value: "0",
				},
				{
					Host:  "hostA",
					Key:   "telegraf.lld.name.foo",
					Value: `{"data":[{"{#FOO}":"bar"}]}`,
				},
			},
		},
	}

	// Setup a Zabbix mock server and start listening
	server, err := newZabbixMockServer()
	require.NoError(t, err)
	defer server.close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		server.listen()
	}()

	// Setup plugin
	plugin := &Zabbix{
		Address:          server.addr(),
		KeyPrefix:        "telegraf.",
		HostTag:          "host",
		LLDSendInterval:  config.Duration(10 * time.Minute),
		LLDClearInterval: config.Duration(1 * time.Hour),
		Log:              testutil.Logger{},
	}
	require.NoError(t, plugin.Init())

	// Connect and write the metrics
	require.NoError(t, plugin.Connect())
	defer plugin.Close()

	// First packet
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Second packet, while time has not surpassed LLDSendInterval
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)

	// Third packet, time has surpassed LLDSendInterval, metrics + LLD
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Fourth packet
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)

	// Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new.
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Sixth packet, new LLD info, but time has not surpassed LLDSendInterval
	require.NoError(t, plugin.Write([]telegraf.Metric{mNew}))

	// Simulate time passing for LLD clear
	plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDClearInterval)).Add(-time.Millisecond)

	// Seventh packet, time has surpassed LLDSendInterval and LLDClearInterval, metrics + LLD.
	// LLD will be cleared.
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Eighth packet, time host not surpassed LLDSendInterval, just metrics.
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond)

	// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
	require.NoError(t, plugin.Write([]telegraf.Metric{m}))

	// Wait for the metrics to be received
	require.Eventuallyf(t, func() bool {
		return server.count.Load() >= uint32(len(expected))
	}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())

	// Stop listening
	require.NoError(t, plugin.Close())
	server.listener.Close()
	wg.Wait()

	// Check the received metrics
	server.Lock()
	defer server.Unlock()

	require.Empty(t, server.errs, "server had errors")
	requireRequestDataEqual(t, expected, server.received, true)
}

// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled
func TestAutoRegister(t *testing.T) {
	now := time.Now()
	input := []telegraf.Metric{
		metric.New(
			"name",
			map[string]string{"host": "hostA"},
			map[string]interface{}{"value": int64(0)},
			now,
		),
		metric.New(
			"name",
			map[string]string{"host": "hostB"},
			map[string]interface{}{"value": int64(42)},
			now,
		),
	}

	expected := []zabbix.Packet{
		{
			Request: "sender data",
			Data: []*zabbix.Metric{
				{
					Host:  "hostA",
					Key:   "telegraf.name.value",
					Value: "0",
					Clock: now.Unix(),
				},
				{
					Host:  "hostB",
					Key:   "telegraf.name.value",
					Value: "42",
					Clock: now.Unix(),
				},
			},
		},
		{
			Request:      "active checks",
			Host:         "hostA",
			HostMetadata: "xxx",
		},
		{
			Request:      "active checks",
			Host:         "hostB",
			HostMetadata: "xxx",
		},
	}

	// Setup a Zabbix mock server and start listening
	server, err := newZabbixMockServer()
	require.NoError(t, err)
	defer server.close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		server.listen()
	}()

	// Setup plugin
	plugin := &Zabbix{
		Address:                    server.addr(),
		KeyPrefix:                  "telegraf.",
		HostTag:                    "host",
		Autoregister:               "xxx",
		AutoregisterResendInterval: config.Duration(time.Minute * 5),
		Log:                        testutil.Logger{},
	}
	require.NoError(t, plugin.Init())

	// Connect and write the metrics
	require.NoError(t, plugin.Connect())
	require.NoError(t, plugin.Write(input))

	// Wait for the metrics to be received
	require.Eventuallyf(t, func() bool {
		return server.count.Load() >= uint32(len(expected))
	}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())

	// Stop listening
	require.NoError(t, plugin.Close())
	server.listener.Close()
	wg.Wait()

	// Check the received metrics
	server.Lock()
	defer server.Unlock()

	require.Empty(t, server.errs, "server had errors")
	actual := server.received
	sort.SliceStable(expected, func(i, j int) bool { return expected[i].Host < expected[j].Host })
	sort.SliceStable(actual, func(i, j int) bool { return actual[i].Host < actual[j].Host })
	requireRequestDataEqual(t, expected, actual, false)
}

func TestBuildZabbixMetric(t *testing.T) {
	keyPrefix := "prefix."
	hostTag := "host"

	z := &Zabbix{
		KeyPrefix: keyPrefix,
		HostTag:   hostTag,
	}

	zm, err := z.buildZabbixMetric(metric.New(

		"name",
		map[string]string{hostTag: "hostA", "foo": "bar", "a": "b"},
		map[string]interface{}{},
		time.Now()),
		"value",
		1,
	)
	require.NoError(t, err)
	require.Equal(t, keyPrefix+"name.value[b,bar]", zm.Key)

	zm, err = z.buildZabbixMetric(metric.New(

		"name",
		map[string]string{hostTag: "hostA"},
		map[string]interface{}{},
		time.Now()),
		"value",
		1,
	)
	require.NoError(t, err)
	require.Equal(t, keyPrefix+"name.value", zm.Key)
}

func TestGetHostname(t *testing.T) {
	hostname, err := os.Hostname()
	require.NoError(t, err)

	tests := map[string]struct {
		HostTag string
		Host    string
		Tags    map[string]string
		Result  string
	}{
		"metric with host tag": {
			HostTag: "host",
			Tags: map[string]string{
				"host": "bar",
			},
			Result: "bar",
		},
		"metric with host tag changed": {
			HostTag: "source",
			Tags: map[string]string{
				"source": "bar",
			},
			Result: "bar",
		},
		"metric with no host tag": {
			Tags:   map[string]string{},
			Result: hostname,
		},
	}

	for desc, test := range tests {
		t.Run(desc, func(t *testing.T) {
			mt := metric.New(

				"name",
				test.Tags,
				map[string]interface{}{},
				time.Now(),
			)

			host, err := getHostname(test.HostTag, mt)
			require.NoError(t, err)
			require.Equal(t, test.Result, host)
		})
	}
}

func TestCases(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Get all testcase directories
	folders, err := os.ReadDir("testcases")
	require.NoError(t, err)

	// Register the plugin
	outputs.Add("zabbix", func() telegraf.Output {
		return &Zabbix{
			KeyPrefix:                  "telegraf.",
			HostTag:                    "host",
			AutoregisterResendInterval: config.Duration(time.Minute * 30),
			LLDSendInterval:            config.Duration(time.Minute * 10),
			LLDClearInterval:           config.Duration(time.Hour),
		}
	})

	for _, f := range folders {
		// Only handle folders
		if !f.IsDir() {
			continue
		}

		t.Run(f.Name(), func(t *testing.T) {
			testcasePath := filepath.Join("testcases", f.Name())
			configFilename := filepath.Join(testcasePath, "telegraf.conf")
			inputFilename := filepath.Join(testcasePath, "input.influx")
			expectedFilename := filepath.Join(testcasePath, "expected.out")
			expectedErrorFilename := filepath.Join(testcasePath, "expected.err")

			// Get parser to parse input and expected output
			parser := &influx.Parser{}
			require.NoError(t, parser.Init())

			// Load the input data
			input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
			require.NoError(t, err)

			// Read the expected output if any
			var expected []zabbix.Packet
			if _, err := os.Stat(expectedFilename); err == nil {
				buf, err := os.ReadFile(expectedFilename)
				require.NoError(t, err)
				require.NoError(t, json.Unmarshal(buf, &expected))
			}

			// Read the expected output if any
			var expectedError string
			if _, err := os.Stat(expectedErrorFilename); err == nil {
				expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename)
				require.NoError(t, err)
				require.Len(t, expectedErrors, 1)
				expectedError = expectedErrors[0]
			}

			// Configure the plugin
			cfg := config.NewConfig()
			require.NoError(t, cfg.LoadConfig(configFilename))
			require.Len(t, cfg.Outputs, 1)

			// Setup a Zabbix mock server and start listening
			server, err := newZabbixMockServer()
			require.NoError(t, err)
			defer server.close()

			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				server.listen()
			}()
			defer server.listener.Close()

			// Setup the plugin
			plugin := cfg.Outputs[0].Output.(*Zabbix)
			plugin.Address = server.addr()
			plugin.Log = testutil.Logger{}
			require.NoError(t, plugin.Init())

			// Connect and write the metric(s)
			require.NoError(t, plugin.Connect())
			defer plugin.Close()

			err = plugin.Write(input)
			if expectedError != "" {
				require.ErrorContains(t, err, expectedError)
				return
			}
			require.NoError(t, err)

			// Wait for the data to arrive
			require.Eventuallyf(t, func() bool {
				return server.count.Load() >= uint32(len(expected))
			}, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load())

			server.listener.Close()
			wg.Wait()

			// Check the received data
			server.Lock()
			defer server.Unlock()
			require.Empty(t, server.errs, "server had errors")
			requireRequestDataEqual(t, expected, server.received, false)
		})
	}
}

type zabbixMockServer struct {
	listener net.Listener

	received []zabbix.Packet
	errs     []error
	count    atomic.Uint32
	sync.Mutex
}

func newZabbixMockServer() (*zabbixMockServer, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	return &zabbixMockServer{listener: l}, nil
}

func (s *zabbixMockServer) addr() string {
	return s.listener.Addr().String()
}

func (s *zabbixMockServer) close() error {
	if s.listener != nil {
		return s.listener.Close()
	}
	return nil
}

func (s *zabbixMockServer) listen() {
	for {
		request, err := s.handle()
		if err != nil {
			if !errors.Is(err, net.ErrClosed) {
				s.Lock()
				s.errs = append(s.errs, err)
				s.Unlock()
			}
			return
		}
		s.Lock()
		s.received = append(s.received, request)
		s.Unlock()
		s.count.Store(uint32(len(s.received)))
	}
}

func (s *zabbixMockServer) handle() (zabbix.Packet, error) {
	conn, err := s.listener.Accept()
	if err != nil {
		return zabbix.Packet{}, err
	}
	defer conn.Close()

	if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
		return zabbix.Packet{}, err
	}

	// Obtain request from the mock zabbix server
	// Read protocol header and version
	header := make([]byte, 5)
	if _, err := conn.Read(header); err != nil {
		return zabbix.Packet{}, err
	}

	// Read data length
	dataLengthRaw := make([]byte, 8)
	if _, err := conn.Read(dataLengthRaw); err != nil {
		return zabbix.Packet{}, err
	}
	dataLength := binary.LittleEndian.Uint64(dataLengthRaw)

	// Read data content
	content := make([]byte, dataLength)
	if _, err := conn.Read(content); err != nil {
		return zabbix.Packet{}, err
	}

	// The zabbix output checks that there are not errors
	// Simulated response from the server
	resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n")
	if _, err := conn.Write(resp); err != nil {
		return zabbix.Packet{}, err
	}

	// Strip zabbix header and get JSON request
	var request zabbix.Packet
	if err := json.Unmarshal(content, &request); err != nil {
		return zabbix.Packet{}, err
	}

	return request, nil
}

type lldValue struct {
	Data []map[string]string `json:"data"`
}

func requireRequestDataEqual(t *testing.T, expected, actual []zabbix.Packet, ignoreClock bool) {
	t.Helper()
	require.Len(t, actual, len(expected))
	for i, expectedReq := range expected {
		actualReq := actual[i]
		require.Equalf(t, expectedReq.Request, actualReq.Request, "different request types in request %d", i)
		require.Equalf(t, expectedReq.Host, actualReq.Host, "different host in request %d", i)
		require.Equalf(t, expectedReq.HostMetadata, actualReq.HostMetadata, "different hostmetadata in request %d", i)

		// Check the elements
		require.Len(t, actualReq.Data, len(expectedReq.Data))

		less := func(a, b *zabbix.Metric) bool {
			if a.Key == b.Key {
				if a.Clock == b.Clock {
					return a.Value < b.Value
				}
				return a.Clock < b.Clock
			}
			return a.Key < b.Key
		}
		sort.SliceStable(actualReq.Data, func(i, j int) bool { return less(actualReq.Data[i], actualReq.Data[j]) })
		sort.SliceStable(expectedReq.Data, func(i, j int) bool { return less(expectedReq.Data[i], expectedReq.Data[j]) })
		for j, expectedData := range expectedReq.Data {
			actualData := actualReq.Data[j]
			require.Equalf(t, expectedData.Key, actualData.Key, "different key in request %d, data %d", i, j)
			require.Equalf(t, expectedData.Host, actualData.Host, "different host in request %d, data %d", i, j)
			if !ignoreClock {
				require.Equalf(t, expectedData.Clock, actualData.Clock, "different clock in request %d, data %d", i, j)
			}
			if strings.HasPrefix(expectedData.Value, "{") {
				var actualValue, expectedValue lldValue
				require.NoError(t, json.Unmarshal([]byte(actualData.Value), &actualValue))
				require.NoError(t, json.Unmarshal([]byte(expectedData.Value), &expectedValue))
				require.ElementsMatchf(t, expectedValue.Data, actualValue.Data, "different value in request %d, data %d", i, j)
			} else {
				require.Equalf(t, expectedData.Value, actualData.Value, "different value in request %d, data %d", i, j)
			}
		}
	}
}
