//go:generate ../../../tools/readme_config_includer/generator
package quix

import (
	"crypto/tls"
	"crypto/x509"
	_ "embed"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/IBM/sarama"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	common_http "github.com/influxdata/telegraf/plugins/common/http"
	common_kafka "github.com/influxdata/telegraf/plugins/common/kafka"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/json"
)

// sampleConfig holds the contents of sample.conf, embedded at build time and
// handed out by SampleConfig.
//
//go:embed sample.conf
var sampleConfig string

// Quix is the output plugin that serializes metrics to JSON and publishes
// them to a Kafka topic through a sarama synchronous producer.
//
// Init requires Workspace, Topic and Token to be set; APIURL falls back to
// "https://portal-api.platform.quix.io" when empty and has a trailing slash
// removed. The Kafka topic written to is "<Workspace>-<Topic>".
type Quix struct {
	// Options read from the TOML configuration.
	APIURL    string          `toml:"url"`
	Workspace string          `toml:"workspace"`
	Topic     string          `toml:"topic"`
	Token     config.Secret   `toml:"token"`
	Log       telegraf.Logger `toml:"-"`
	// NOTE(review): presumably configures the HTTP client used when fetching
	// the broker settings; that code is not in this file -- confirm.
	common_http.HTTPClientConfig

	// Runtime state: producer is created in Connect, serializer and
	// kakfaTopic are set in Init.
	producer   sarama.SyncProducer
	serializer telegraf.Serializer
	kakfaTopic string
}

// SampleConfig returns the sample configuration embedded from sample.conf.
func (*Quix) SampleConfig() string {
	return sampleConfig
}

// Init normalizes the API URL, verifies that the mandatory options are
// present and prepares the Kafka topic name and the JSON serializer.
//
// It returns an error naming the first of 'topic', 'workspace' or 'token'
// that is not set.
func (q *Quix) Init() error {
	// Fall back to the default portal URL and drop a single trailing slash.
	apiURL := q.APIURL
	if apiURL == "" {
		apiURL = "https://portal-api.platform.quix.io"
	}
	q.APIURL = strings.TrimSuffix(apiURL, "/")

	// Mandatory options, checked in a fixed order.
	switch {
	case q.Topic == "":
		return errors.New("option 'topic' must be set")
	case q.Workspace == "":
		return errors.New("option 'workspace' must be set")
	case q.Token.Empty():
		return errors.New("option 'token' must be set")
	}

	// The Kafka topic is the workspace and topic joined by a dash.
	q.kakfaTopic = strings.Join([]string{q.Workspace, q.Topic}, "-")

	// Metrics are emitted as JSON with timestamps fixed to nanoseconds.
	q.serializer = &json.Serializer{TimestampUnits: config.Duration(time.Nanosecond)}

	return nil
}

// Connect fetches the Kafka broker settings via fetchBrokerConfig, translates
// them into a sarama configuration and creates the synchronous producer used
// by Write.
//
// It returns an error if the broker settings cannot be fetched, contain no
// usable broker address, name an unsupported security protocol or SASL
// mechanism, carry a CA certificate that cannot be parsed, or if the producer
// cannot be created.
func (q *Quix) Connect() error {
	// Fetch the Kafka broker configuration from the Quix HTTP endpoint
	quixConfig, err := q.fetchBrokerConfig()
	if err != nil {
		return fmt.Errorf("fetching broker config failed: %w", err)
	}

	// strings.Split never returns an empty slice for a non-empty separator
	// (an empty input yields [""]), so keep only the non-blank entries before
	// deciding whether any broker address was received.
	var brokers []string
	for _, broker := range strings.Split(quixConfig.BootstrapServers, ",") {
		if broker = strings.TrimSpace(broker); broker != "" {
			brokers = append(brokers, broker)
		}
	}
	if len(brokers) == 0 {
		return errors.New("no brokers received")
	}

	// Setup the Kafka producer config. A sync producer requires successes to
	// be reported back.
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true

	switch quixConfig.SecurityProtocol {
	case "SASL_SSL":
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.User = quixConfig.SaslUsername
		cfg.Net.SASL.Password = quixConfig.SaslPassword

		// Every supported mechanism configures itself completely; anything
		// else is rejected.
		switch quixConfig.SaslMechanism {
		case "SCRAM-SHA-512":
			cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
			cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
				return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA512}
			}
		case "SCRAM-SHA-256":
			cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
			cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
				return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
			}
		case "PLAIN":
			cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
		default:
			return fmt.Errorf("unsupported SASL mechanism: %s", quixConfig.SaslMechanism)
		}

		cfg.Net.TLS.Enable = true

		// Add the CA certificate sent by the server if there is any. Newer cloud
		// instances do not need this and we can go with the system certificates.
		if len(quixConfig.cert) > 0 {
			certPool := x509.NewCertPool()
			if !certPool.AppendCertsFromPEM(quixConfig.cert) {
				return errors.New("appending CA cert to pool failed")
			}
			cfg.Net.TLS.Config = &tls.Config{RootCAs: certPool}
		}
	case "PLAINTEXT":
		// No additional configuration required for plaintext communication
	default:
		return fmt.Errorf("unsupported security protocol: %s", quixConfig.SecurityProtocol)
	}

	// Setup the Kafka producer itself
	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return fmt.Errorf("creating producer failed: %w", err)
	}
	q.producer = producer

	return nil
}

// Write serializes each metric and publishes it to the plugin's Kafka topic
// through the synchronous producer, keyed with the constant "telegraf" and
// stamped with the metric's time.
//
// Metrics that cannot be serialized are logged and skipped. Messages rejected
// as too large or as carrying an invalid timestamp are logged and dropped,
// because sending the same message again cannot succeed. Any other send
// failure is returned immediately so it is not reported as a successful write;
// metrics sent earlier in the same call have already been published at that
// point.
func (q *Quix) Write(metrics []telegraf.Metric) error {
	for _, m := range metrics {
		serialized, err := q.serializer.Serialize(m)
		if err != nil {
			q.Log.Errorf("Error serializing metric: %v", err)
			continue
		}

		msg := &sarama.ProducerMessage{
			Topic:     q.kakfaTopic,
			Value:     sarama.ByteEncoder(serialized),
			Timestamp: m.Time(),
			Key:       sarama.StringEncoder("telegraf"),
		}

		if _, _, err = q.producer.SendMessage(msg); err != nil {
			// A retry cannot fix a message the broker refuses outright, so
			// drop it instead of failing the whole batch over and over.
			if errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidTimestamp) {
				q.Log.Errorf("Error sending message to Kafka: %v", err)
				continue
			}
			return fmt.Errorf("sending message to Kafka failed: %w", err)
		}
	}

	return nil
}

// Close shuts down the Kafka producer and returns its close error. It is a
// no-op returning nil when no producer has been created.
func (q *Quix) Close() error {
	if q.producer == nil {
		return nil
	}
	return q.producer.Close()
}

// init registers the plugin under the name "quix" with a factory that
// returns a fresh, zero-valued instance.
func init() {
	outputs.Add("quix", func() telegraf.Output {
		return new(Quix)
	})
}
