package timestream

import (
	"context"
	"errors"
	"fmt"
	"math"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
	"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/metric"
	common_aws "github.com/influxdata/telegraf/plugins/common/aws"
	"github.com/influxdata/telegraf/testutil"
)

const tsDBName = "testDb"

const testSingleTableName = "SingleTableName"
const testSingleTableDim = "namespace"

var time1 = time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC)

const time1Epoch = "1257890400"
const timeUnit = types.TimeUnitSeconds

var time2 = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)

const time2Epoch = "1257894000"

const metricName1 = "metricName1"
const metricName2 = "metricName2"

type mockTimestreamClient struct {
	WriteRecordsRequestCount int
}

func (*mockTimestreamClient) CreateTable(
	context.Context,
	*timestreamwrite.CreateTableInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.CreateTableOutput, error) {
	return nil, nil
}

func (m *mockTimestreamClient) WriteRecords(
	context.Context,
	*timestreamwrite.WriteRecordsInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.WriteRecordsOutput, error) {
	m.WriteRecordsRequestCount++
	return nil, nil
}

func (*mockTimestreamClient) DescribeDatabase(
	context.Context,
	*timestreamwrite.DescribeDatabaseInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.DescribeDatabaseOutput, error) {
	return nil, errors.New("hello from DescribeDatabase")
}

func TestConnectValidatesConfigParameters(t *testing.T) {
	originalWriteFactory := WriteFactory
	t.Cleanup(func() {
		WriteFactory = originalWriteFactory
	})

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return &mockTimestreamClient{}, nil
	}
	// checking base arguments
	noDatabaseName := Timestream{Log: testutil.Logger{}}
	require.ErrorContains(t, noDatabaseName.Connect(), "'database_name' key is required")

	noMappingMode := Timestream{
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.ErrorContains(t, noMappingMode.Connect(), "'mapping_mode' key is required")

	incorrectMappingMode := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  "foo",
		Log:          testutil.Logger{},
	}
	require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table")

	// multi-measure config validation multi table mode
	validConfigMultiMeasureMultiTableMode := Timestream{
		DatabaseName:                      tsDBName,
		MappingMode:                       MappingModeMultiTable,
		UseMultiMeasureRecords:            true,
		MeasureNameForMultiMeasureRecords: "multi-measure-name",
		Log:                               testutil.Logger{},
	}
	require.NoError(t, validConfigMultiMeasureMultiTableMode.Connect())

	invalidConfigMultiMeasureMultiTableMode := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		UseMultiMeasureRecords: true,
		// without MeasureNameForMultiMeasureRecords set we expect validation failure
		Log: testutil.Logger{},
	}
	require.Contains(t, invalidConfigMultiMeasureMultiTableMode.Connect().Error(), "MeasureNameForMultiMeasureRecords")

	// multi-measure config validation single table mode
	validConfigMultiMeasureSingleTableMode := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeSingleTable,
		SingleTableName:        testSingleTableName,
		UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as
		// measurement name (from telegraf metric) is used as multi-measure name in TS
		Log: testutil.Logger{},
	}
	require.NoError(t, validConfigMultiMeasureSingleTableMode.Connect())

	invalidConfigMultiMeasureSingleTableMode := Timestream{
		DatabaseName:                      tsDBName,
		MappingMode:                       MappingModeSingleTable,
		SingleTableName:                   testSingleTableName,
		UseMultiMeasureRecords:            true,
		MeasureNameForMultiMeasureRecords: "multi-measure-name",
		// value of MeasureNameForMultiMeasureRecords will be ignored and
		// measurement name (from telegraf metric) is used as multi-measure name in TS
		Log: testutil.Logger{},
	}
	err := invalidConfigMultiMeasureSingleTableMode.Connect()
	require.ErrorContains(t, err, "MeasureNameForMultiMeasureRecords")

	// multi-table arguments
	validMappingModeMultiTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeMultiTable,
		Log:          testutil.Logger{},
	}
	require.NoError(t, validMappingModeMultiTable.Connect())

	singleTableNameWithMultiTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeMultiTable,
		SingleTableName: testSingleTableName,
		Log:             testutil.Logger{},
	}
	require.Contains(t, singleTableNameWithMultiTable.Connect().Error(), "SingleTableName")

	singleTableDimensionWithMultiTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeMultiTable,
		SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
		Log: testutil.Logger{},
	}
	require.Contains(t, singleTableDimensionWithMultiTable.Connect().Error(),
		"SingleTableDimensionNameForTelegrafMeasurementName")

	// single-table arguments
	noTableNameMappingModeSingleTable := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeSingleTable,
		Log:          testutil.Logger{},
	}
	require.Contains(t, noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName")

	noDimensionNameMappingModeSingleTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeSingleTable,
		SingleTableName: testSingleTableName,
		Log:             testutil.Logger{},
	}
	require.Contains(t, noDimensionNameMappingModeSingleTable.Connect().Error(),
		"SingleTableDimensionNameForTelegrafMeasurementName")

	validConfigurationMappingModeSingleTable := Timestream{
		DatabaseName:    tsDBName,
		MappingMode:     MappingModeSingleTable,
		SingleTableName: testSingleTableName,
		SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
		Log: testutil.Logger{},
	}
	require.NoError(t, validConfigurationMappingModeSingleTable.Connect())

	// create table arguments
	createTableNoMagneticRetention := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		Log:                    testutil.Logger{},
	}
	require.Contains(t, createTableNoMagneticRetention.Connect().Error(),
		"CreateTableMagneticStoreRetentionPeriodInDays")

	createTableNoMemoryRetention := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		CreateTableMagneticStoreRetentionPeriodInDays: 3,
		Log: testutil.Logger{},
	}
	require.Contains(t, createTableNoMemoryRetention.Connect().Error(),
		"CreateTableMemoryStoreRetentionPeriodInHours")

	createTableValid := Timestream{
		DatabaseName:           tsDBName,
		MappingMode:            MappingModeMultiTable,
		CreateTableIfNotExists: true,
		CreateTableMagneticStoreRetentionPeriodInDays: 3,
		CreateTableMemoryStoreRetentionPeriodInHours:  3,
		Log: testutil.Logger{},
	}
	require.NoError(t, createTableValid.Connect())

	// describe table on start arguments
	describeTableInvoked := Timestream{
		DatabaseName:            tsDBName,
		MappingMode:             MappingModeMultiTable,
		DescribeDatabaseOnStart: true,
		Log:                     testutil.Logger{},
	}
	require.Contains(t, describeTableInvoked.Connect().Error(), "hello from DescribeDatabase")
}

func TestConnectReturnsWriteFactoryError(t *testing.T) {
	originalWriteFactory := WriteFactory
	t.Cleanup(func() {
		WriteFactory = originalWriteFactory
	})

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return nil, errors.New("unable to construct client")
	}

	plugin := Timestream{
		DatabaseName: tsDBName,
		MappingMode:  MappingModeMultiTable,
		Log:          testutil.Logger{},
	}

	require.ErrorContains(t, plugin.Connect(), "unable to construct client")
}

func TestWriteMultiMeasuresSingleTableMode(t *testing.T) {
	const recordCount = 100
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	localTime, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	inputs := make([]telegraf.Metric, 0, recordCount+1)
	for i := 1; i <= recordCount+1; i++ {
		localTime++

		fieldName1 := "value_supported1" + strconv.Itoa(i)
		fieldName2 := "value_supported2" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			"multi_measure_name",
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName1: float64(10),
				fieldName2: float64(20),
			},
			time.Unix(int64(localTime), 0),
		))
	}

	plugin := Timestream{
		MappingMode:            MappingModeSingleTable,
		SingleTableName:        "test-multi-single-table-mode",
		DatabaseName:           tsDBName,
		UseMultiMeasureRecords: true, // use multi
		Log:                    testutil.Logger{},
	}

	// validate config correctness
	require.NoError(t, plugin.Connect())

	// validate multi-record generation
	result := plugin.TransformMetrics(inputs)
	// 'inputs' has a total of 101 metrics transformed to 2 writeRecord calls to TS
	require.Len(t, result, 2, "Expected 2 WriteRecordsInput requests")

	var transformedRecords []types.Record
	for _, r := range result {
		transformedRecords = append(transformedRecords, r.Records...)
		// Assert that we use measure name from input
		require.Equal(t, "multi_measure_name", *r.Records[0].MeasureName)
	}
	// Expected 101 records
	require.Len(t, transformedRecords, recordCount+1, "Expected 101 records after transforming")
	// validate write to TS
	err = plugin.Write(inputs)
	require.NoError(t, err, "Write to Timestream failed")
	require.Equal(t, 2, mockClient.WriteRecordsRequestCount, "Expected 2 WriteRecords calls")
}

func TestWriteMultiMeasuresMultiTableMode(t *testing.T) {
	const recordCount = 100
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	localTime, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	inputs := make([]telegraf.Metric, 0, recordCount)
	for i := 1; i <= recordCount; i++ {
		localTime++

		fieldName1 := "value_supported1" + strconv.Itoa(i)
		fieldName2 := "value_supported2" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			"multi_measure_name",
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName1: float64(10),
				fieldName2: float64(20),
			},
			time.Unix(int64(localTime), 0),
		))
	}

	plugin := Timestream{
		MappingMode:                       MappingModeMultiTable,
		DatabaseName:                      tsDBName,
		UseMultiMeasureRecords:            true, // use multi
		MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
		Log:                               testutil.Logger{},
	}

	// validate config correctness
	err = plugin.Connect()
	require.NoError(t, err, "Invalid configuration")

	// validate multi-record generation
	result := plugin.TransformMetrics(inputs)
	// 'inputs' has a total of 101 metrics transformed to 2 writeRecord calls to TS
	require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")

	// Assert that we use measure name from config
	require.Equal(t, "config-multi-measure-name", *result[0].Records[0].MeasureName)

	var transformedRecords []types.Record
	for _, r := range result {
		transformedRecords = append(transformedRecords, r.Records...)
	}
	// Expected 100 records
	require.Len(t, transformedRecords, recordCount, "Expected 100 records after transforming")

	for _, input := range inputs {
		fmt.Println("Input", input)
		fmt.Println(*result[0].Records[0].MeasureName)
		break
	}

	// validate successful write to TS
	err = plugin.Write(inputs)
	require.NoError(t, err, "Write to Timestream failed")
	require.Equal(t, 1, mockClient.WriteRecordsRequestCount, "Expected 1 WriteRecords call")
}

func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"measureDouble": aws.Float64(10),
		},
		time1,
	)

	input2 := metric.New(
		metricName1,
		map[string]string{"tag2": "value2"},
		map[string]interface{}{
			"measureBigint": aws.Int32(20),
		},
		time1,
	)

	input3 := metric.New(
		metricName1,
		map[string]string{"tag3": "value3"},
		map[string]interface{}{
			"measureVarchar": "DUMMY",
		},
		time1,
	)

	input4 := metric.New(
		metricName1,
		map[string]string{"tag4": "value4"},
		map[string]interface{}{
			"measureBool": true,
		},
		time1,
	)

	input5 := metric.New(
		metricName1,
		map[string]string{"tag5": "value5"},
		map[string]interface{}{
			"measureMaxUint64": uint64(math.MaxUint64),
		},
		time1,
	)

	input6 := metric.New(
		metricName1,
		map[string]string{"tag6": "value6"},
		map[string]interface{}{
			"measureSmallUint64": uint64(123456),
		},
		time1,
	)

	expectedResultMultiTable := buildExpectedMultiRecords("config-multi-measure-name", metricName1)

	plugin := Timestream{
		MappingMode:                       MappingModeMultiTable,
		DatabaseName:                      tsDBName,
		UseMultiMeasureRecords:            true, // use multi
		MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
		Log:                               testutil.Logger{},
	}

	// validate config correctness
	err := plugin.Connect()
	require.NoError(t, err, "Invalid configuration")

	// validate multi-record generation with MappingModeMultiTable
	result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
	require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")

	require.EqualValues(t, expectedResultMultiTable, result[0])

	require.True(t, arrayContains(result, expectedResultMultiTable), "Expected that the list of requests to Timestream: %+v\n "+
		"will contain request: %+v\n\n", result, expectedResultMultiTable)

	// singleTableMode

	plugin = Timestream{
		MappingMode:            MappingModeSingleTable,
		SingleTableName:        "singleTableName",
		DatabaseName:           tsDBName,
		UseMultiMeasureRecords: true, // use multi
		Log:                    testutil.Logger{},
	}

	// validate config correctness
	err = plugin.Connect()
	require.NoError(t, err, "Invalid configuration")

	expectedResultSingleTable := buildExpectedMultiRecords(metricName1, "singleTableName")

	// validate multi-record generation with MappingModeSingleTable
	result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
	require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")

	require.EqualValues(t, expectedResultSingleTable, result[0])

	require.True(t, arrayContains(result, expectedResultSingleTable), "Expected that the list of requests to Timestream: %+v\n "+
		"will contain request: %+v\n\n", result, expectedResultSingleTable)
}

func buildExpectedMultiRecords(multiMeasureName, tableName string) *timestreamwrite.WriteRecordsInput {
	var totalRecords int
	recordDouble := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag1": "value1"},
				measureValues: map[string]string{"measureDouble": "10"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeDouble,
	)
	totalRecords += len(recordDouble)

	recordBigint := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag2": "value2"},
				measureValues: map[string]string{"measureBigint": "20"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeBigint,
	)
	totalRecords += len(recordBigint)

	recordVarchar := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag3": "value3"},
				measureValues: map[string]string{"measureVarchar": "DUMMY"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeVarchar,
	)
	totalRecords += len(recordVarchar)

	recordBool := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag4": "value4"},
				measureValues: map[string]string{"measureBool": "true"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeBoolean,
	)
	totalRecords += len(recordBool)

	recordMaxUint64 := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag5": "value5"},
				measureValues: map[string]string{"measureMaxUint64": "9223372036854775807"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeBigint,
	)
	totalRecords += len(recordMaxUint64)

	recordUint64 := buildMultiRecords(
		[]SimpleInput{
			{
				t:             time1Epoch,
				tableName:     metricName1,
				dimensions:    map[string]string{"tag6": "value6"},
				measureValues: map[string]string{"measureSmallUint64": "123456"},
			},
		},
		multiMeasureName,
		types.MeasureValueTypeBigint,
	)
	totalRecords += len(recordUint64)

	recordsMultiTableMode := make([]types.Record, 0, totalRecords)
	recordsMultiTableMode = append(recordsMultiTableMode, recordDouble...)
	recordsMultiTableMode = append(recordsMultiTableMode, recordBigint...)
	recordsMultiTableMode = append(recordsMultiTableMode, recordVarchar...)
	recordsMultiTableMode = append(recordsMultiTableMode, recordBool...)
	recordsMultiTableMode = append(recordsMultiTableMode, recordMaxUint64...)
	recordsMultiTableMode = append(recordsMultiTableMode, recordUint64...)

	expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(tableName),
		Records:          recordsMultiTableMode,
		CommonAttributes: &types.Record{},
	}
	return expectedResultMultiTable
}

type mockTimestreamErrorClient struct {
	ErrorToReturnOnWriteRecords error
}

func (*mockTimestreamErrorClient) CreateTable(
	context.Context,
	*timestreamwrite.CreateTableInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.CreateTableOutput, error) {
	return nil, nil
}

func (m *mockTimestreamErrorClient) WriteRecords(
	context.Context,
	*timestreamwrite.WriteRecordsInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.WriteRecordsOutput, error) {
	return nil, m.ErrorToReturnOnWriteRecords
}

func (*mockTimestreamErrorClient) DescribeDatabase(
	context.Context,
	*timestreamwrite.DescribeDatabaseInput,
	...func(*timestreamwrite.Options),
) (*timestreamwrite.DescribeDatabaseOutput, error) {
	return nil, nil
}

func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return &mockTimestreamErrorClient{
			ErrorToReturnOnWriteRecords: &types.ThrottlingException{Message: aws.String("Throttling Test")},
		}, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())
	input := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value": float64(1)},
		time1,
	)

	err := plugin.Write([]telegraf.Metric{input})

	require.Error(t, err, "Expected an error to be returned to Telegraf, "+
		"so that the write will be retried by Telegraf later.")
}

func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return &mockTimestreamErrorClient{
			ErrorToReturnOnWriteRecords: &types.RejectedRecordsException{Message: aws.String("RejectedRecords Test")},
		}, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		Log:          testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())
	input := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{"value": float64(1)},
		time1,
	)

	err := plugin.Write([]telegraf.Metric{input})

	require.NoError(t, err, "Expected to silently swallow the RejectedRecordsException, "+
		"as retrying this error doesn't make sense.")
}
func TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")
	const maxWriteRecordsCalls = 5
	const maxRecordsInWriteRecordsCall = 100
	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		// Spawn only one go routine to serve all 5 write requests
		MaxWriteGoRoutinesCount: 2,
		Log:                     testutil.Logger{},
	}

	require.NoError(t, plugin.Connect())

	inputs := make([]telegraf.Metric, 0, totalRecords)
	for i := 1; i <= totalRecords; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}

	err := plugin.Write(inputs)
	require.NoError(t, err, "Expected to write without any errors ")
	require.Equal(t, maxWriteRecordsCalls, mockClient.WriteRecordsRequestCount, "Expected 5 calls to WriteRecords")
}

func TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")
	const maxWriteRecordsCalls = 2
	const maxRecordsInWriteRecordsCall = 100
	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	mockClient := &mockTimestreamClient{0}

	WriteFactory = func(*common_aws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDBName,
		// Spawn 5 parallel go routines to serve 2 write requests
		// In this case only 2 of the 5 go routines will process the write requests
		MaxWriteGoRoutinesCount: 5,
		Log:                     testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())

	inputs := make([]telegraf.Metric, 0, totalRecords)
	for i := 1; i <= totalRecords; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}

	err := plugin.Write(inputs)
	require.NoError(t, err, "Expected to write without any errors ")
	require.Equal(t, maxWriteRecordsCalls, mockClient.WriteRecordsRequestCount, "Expected 5 calls to WriteRecords")
}

func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{}, // no fields here
		time1,
	)
	input2 := metric.New(
		metricName1,
		map[string]string{"tag2": "value2"},
		map[string]interface{}{
			"value": float64(10),
		},
		time1,
	)
	input3 := metric.New(
		metricName1,
		map[string]string{}, // record with no dimensions should appear in the results
		map[string]interface{}{
			"value": float64(20),
		},
		time1,
	)

	records := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{"tag2": "value2", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value": "10"},
		},

		{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{testSingleTableDim: metricName1},
			measureValues: map[string]string{"value": "20"},
		},
	})

	expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          records,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2, input3},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	recordsMulti := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{"tag2": "value2"},
			measureValues: map[string]string{"value": "10"},
		},
		{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{},
			measureValues: map[string]string{"value": "20"},
		},
	})

	expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(metricName1),
		Records:          recordsMulti,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2, input3},
		[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}

func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
	const maxRecordsInWriteRecordsCall = 100

	inputs := make([]telegraf.Metric, 0, maxRecordsInWriteRecordsCall+1)
	for i := 1; i <= maxRecordsInWriteRecordsCall+1; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}

	resultFields := make(map[string]string)
	for i := 1; i <= maxRecordsInWriteRecordsCall; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		resultFields[fieldName] = "10"
	}

	expectedResult1SingleTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: resultFields,
	})
	expectedResult2SingleTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{"value_supported" + strconv.Itoa(maxRecordsInWriteRecordsCall+1): "10"},
	})
	comparisonTest(t, MappingModeSingleTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable})

	expectedResult1MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: resultFields,
	})
	expectedResult2MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported" + strconv.Itoa(maxRecordsInWriteRecordsCall+1): "10"},
	})
	comparisonTest(t, MappingModeMultiTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
}

func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
	const maxRecordsInWriteRecordsCall = 100

	localTime, err := strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	inputs := make([]telegraf.Metric, 0, maxRecordsInWriteRecordsCall+1)
	for i := 1; i <= maxRecordsInWriteRecordsCall+1; i++ {
		localTime++

		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, metric.New(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time.Unix(int64(localTime), 0),
		))
	}

	localTime, err = strconv.Atoi(time1Epoch)
	require.NoError(t, err)

	var recordsFirstReq []types.Record

	for i := 1; i <= maxRecordsInWriteRecordsCall; i++ {
		localTime++

		recordsFirstReq = append(recordsFirstReq, buildRecord(SimpleInput{
			t:             strconv.Itoa(localTime),
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported" + strconv.Itoa(i): "10"},
		})...)
	}

	expectedResult1SingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsFirstReq,
		CommonAttributes: &types.Record{},
	}

	localTime++

	recordsSecondReq := buildRecord(
		SimpleInput{
			t:             strconv.Itoa(localTime),
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported" + strconv.Itoa(maxRecordsInWriteRecordsCall+1): "10"},
		},
	)

	expectedResult2SingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSecondReq,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable})
}

func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10), "value_supported2": float64(20),
		},
		time1,
	)

	input2 := metric.New(
		metricName2,
		map[string]string{"tag2": "value2"},
		map[string]interface{}{
			"value_supported3": float64(30),
		},
		time1,
	)

	recordsSingle := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
		},
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag2": "value2", testSingleTableDim: metricName2},
			measureValues: map[string]string{"value_supported3": "30"},
		},
	})

	expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSingle,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	expectedResult1MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
	})

	expectedResult2MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag2": "value2"},
		measureValues: map[string]string{"value_supported3": "30"},
	})

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
}

func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparate(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10),
		},
		time1,
	)
	input2 := metric.New(
		metricName2,
		map[string]string{"tag1": "value2"},
		map[string]interface{}{
			"value_supported1": float64(20),
		},
		time1,
	)

	recordsSingle := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported1": "10"},
		},
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value2", testSingleTableDim: metricName2},
			measureValues: map[string]string{"value_supported1": "20"},
		},
	})

	expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSingle,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	expectedResult1MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10"},
	})
	expectedResult2MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag1": "value2"},
		measureValues: map[string]string{"value_supported1": "20"},
	})

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
}

func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10), "value_supported2": float64(20),
		},
		time1,
	)
	input2 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported3": float64(30),
		},
		time2,
	)

	recordsSingle := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
		},
		{
			t:             time2Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported3": "30"},
		},
	})

	expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSingle,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	recordsMultiTable := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{"tag1": "value1"},
			measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
		},
		{
			t:             time2Epoch,
			tableName:     metricName1,
			dimensions:    map[string]string{"tag1": "value1"},
			measureValues: map[string]string{"value_supported3": "30"},
		},
	})

	expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(metricName1),
		Records:          recordsMultiTable,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}

func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10), "value_supported2": float64(20),
		},
		time1,
	)
	input2 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported3": float64(30),
		},
		time1,
	)

	expectedResultSingleTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20", "value_supported3": "30"},
	})

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	expectedResultMultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20", "value_supported3": "30"},
	})

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}

func TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTableMapping(t *testing.T) {
	input1 := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10), "value_supported2": float64(20),
		},
		time1,
	)
	input2 := metric.New(
		metricName2,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported3": float64(30),
		},
		time1,
	)

	recordsSingle := buildRecords([]SimpleInput{
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
		},
		{
			t:             time1Epoch,
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName2},
			measureValues: map[string]string{"value_supported3": "30"},
		},
	})

	expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSingle,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	expectedResult1MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
	})
	expectedResult2MultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName2,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported3": "30"},
	})

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{input1, input2},
		[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
}

func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) {
	metricWithUnsupportedField := metric.New(
		metricName1,
		map[string]string{"tag1": "value1"},
		map[string]interface{}{
			"value_supported1": float64(10), "value_unsupported": time.Now(),
		},
		time1,
	)
	expectedResultSingleTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     testSingleTableName,
		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
		measureValues: map[string]string{"value_supported1": "10"},
	})

	comparisonTest(t, MappingModeSingleTable,
		[]telegraf.Metric{metricWithUnsupportedField},
		[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})

	expectedResultMultiTable := buildExpectedInput(SimpleInput{
		t:             time1Epoch,
		tableName:     metricName1,
		dimensions:    map[string]string{"tag1": "value1"},
		measureValues: map[string]string{"value_supported1": "10"},
	})

	comparisonTest(t, MappingModeMultiTable,
		[]telegraf.Metric{metricWithUnsupportedField},
		[]*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
}

func TestCustomEndpoint(t *testing.T) {
	customEndpoint := "http://test.custom.endpoint.com"
	plugin := Timestream{
		MappingMode:      MappingModeMultiTable,
		DatabaseName:     tsDBName,
		Log:              testutil.Logger{},
		CredentialConfig: common_aws.CredentialConfig{EndpointURL: customEndpoint},
	}

	// validate config correctness
	err := plugin.Connect()
	require.NoError(t, err, "Invalid configuration")
	// Check customURL is used
	require.Equal(t, plugin.EndpointURL, customEndpoint)
}

func comparisonTest(t *testing.T,
	mappingMode string,
	telegrafMetrics []telegraf.Metric,
	timestreamRecords []*timestreamwrite.WriteRecordsInput,
) {
	var plugin Timestream
	switch mappingMode {
	case MappingModeSingleTable:
		plugin = Timestream{
			MappingMode:  mappingMode,
			DatabaseName: tsDBName,

			SingleTableName: testSingleTableName,
			SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
			Log: testutil.Logger{},
		}
	case MappingModeMultiTable:
		plugin = Timestream{
			MappingMode:  mappingMode,
			DatabaseName: tsDBName,
			Log:          testutil.Logger{},
		}
	}

	comparison(t, plugin, mappingMode, telegrafMetrics, timestreamRecords)
}

func comparison(t *testing.T,
	plugin Timestream,
	mappingMode string,
	telegrafMetrics []telegraf.Metric,
	timestreamRecords []*timestreamwrite.WriteRecordsInput) {
	result := plugin.TransformMetrics(telegrafMetrics)

	require.Len(t, result, len(timestreamRecords), "The number of transformed records was expected to be different")
	for _, tsRecord := range timestreamRecords {
		require.True(t, arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+
			"will contain request: \n%s\n\nUsed MappingMode: %s", result, tsRecord, mappingMode)
	}
}

func arrayContains(
	array []*timestreamwrite.WriteRecordsInput,
	element *timestreamwrite.WriteRecordsInput,
) bool {
	sortWriteInputForComparison(*element)

	for _, a := range array {
		sortWriteInputForComparison(*a)

		if reflect.DeepEqual(a, element) {
			return true
		}
	}
	return false
}

func sortWriteInputForComparison(element timestreamwrite.WriteRecordsInput) {
	// sort the records by MeasureName, as they are kept in an array, but the order of records doesn't matter
	sort.Slice(element.Records, func(i, j int) bool {
		return strings.Compare(*element.Records[i].MeasureName, *element.Records[j].MeasureName) < 0
	})
	// sort the dimensions in CommonAttributes
	if element.CommonAttributes != nil {
		sort.Slice(element.CommonAttributes.Dimensions, func(i, j int) bool {
			return strings.Compare(*element.CommonAttributes.Dimensions[i].Name,
				*element.CommonAttributes.Dimensions[j].Name) < 0
		})
	}
	// sort the dimensions in Records
	for _, r := range element.Records {
		sort.Slice(r.Dimensions, func(i, j int) bool {
			return strings.Compare(*r.Dimensions[i].Name, *r.Dimensions[j].Name) < 0
		})
	}
}

type SimpleInput struct {
	t             string
	tableName     string
	dimensions    map[string]string
	measureValues map[string]string
}

func buildExpectedInput(i SimpleInput) *timestreamwrite.WriteRecordsInput {
	tsDimensions := make([]types.Dimension, 0, len(i.dimensions))
	for k, v := range i.dimensions {
		tsDimensions = append(tsDimensions, types.Dimension{
			Name:  aws.String(k),
			Value: aws.String(v),
		})
	}

	tsRecords := make([]types.Record, 0, len(i.measureValues))
	for k, v := range i.measureValues {
		tsRecords = append(tsRecords, types.Record{
			MeasureName:      aws.String(k),
			MeasureValue:     aws.String(v),
			MeasureValueType: types.MeasureValueTypeDouble,
			Dimensions:       tsDimensions,
			Time:             aws.String(i.t),
			TimeUnit:         timeUnit,
		})
	}

	result := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDBName),
		TableName:        aws.String(i.tableName),
		Records:          tsRecords,
		CommonAttributes: &types.Record{},
	}

	return result
}

func buildRecords(inputs []SimpleInput) []types.Record {
	tsRecords := make([]types.Record, 0, len(inputs))
	for _, inp := range inputs {
		tsRecords = append(tsRecords, buildRecord(inp)...)
	}

	return tsRecords
}

func buildRecord(input SimpleInput) []types.Record {
	tsDimensions := make([]types.Dimension, 0, len(input.dimensions))
	for k, v := range input.dimensions {
		tsDimensions = append(tsDimensions, types.Dimension{
			Name:  aws.String(k),
			Value: aws.String(v),
		})
	}

	tsRecords := make([]types.Record, 0, len(input.measureValues))
	for k, v := range input.measureValues {
		tsRecords = append(tsRecords, types.Record{
			MeasureName:      aws.String(k),
			MeasureValue:     aws.String(v),
			MeasureValueType: types.MeasureValueTypeDouble,
			Dimensions:       tsDimensions,
			Time:             aws.String(input.t),
			TimeUnit:         timeUnit,
		})
	}

	return tsRecords
}

func buildMultiRecords(inputs []SimpleInput, multiMeasureName string, measureType types.MeasureValueType) []types.Record {
	tsRecords := make([]types.Record, 0, len(inputs))
	for _, input := range inputs {
		tsDimensions := make([]types.Dimension, 0, len(input.dimensions))
		for k, v := range input.dimensions {
			tsDimensions = append(tsDimensions, types.Dimension{
				Name:  aws.String(k),
				Value: aws.String(v),
			})
		}

		multiMeasures := make([]types.MeasureValue, 0, len(input.measureValues))
		for k, v := range input.measureValues {
			multiMeasures = append(multiMeasures, types.MeasureValue{
				Name:  aws.String(k),
				Value: aws.String(v),
				Type:  measureType,
			})
		}

		tsRecords = append(tsRecords, types.Record{
			MeasureName:      aws.String(multiMeasureName),
			MeasureValueType: "MULTI",
			MeasureValues:    multiMeasures,
			Dimensions:       tsDimensions,
			Time:             aws.String(input.t),
			TimeUnit:         timeUnit,
		})
	}

	return tsRecords
}
