//go:generate ../../../tools/readme_config_includer/generator
package opentsdb

import (
	_ "embed"
	"errors"
	"fmt"
	"math"
	"net"
	"net/url"
	"regexp"
	"sort"
	"strconv"
	"strings"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/outputs"
)

//go:embed sample.conf
var sampleConfig string

var (
	allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)
	hyphenChars  = strings.NewReplacer(
		"@", "-",
		"*", "-",
		`%`, "-",
		"#", "-",
		"$", "-")
	defaultHTTPPath  = "/api/put"
	defaultSeparator = "_"
)

type OpenTSDB struct {
	Prefix string `toml:"prefix"`

	Host string `toml:"host"`
	Port int    `toml:"port"`

	HTTPBatchSize int    `toml:"http_batch_size"`
	HTTPPath      string `toml:"http_path"`

	Debug bool `toml:"debug"`

	Separator string `toml:"separator"`

	Log telegraf.Logger `toml:"-"`
}

func ToLineFormat(tags map[string]string) string {
	tagsArray := make([]string, 0, len(tags))
	for k, v := range tags {
		tagsArray = append(tagsArray, fmt.Sprintf("%s=%s", k, v))
	}
	sort.Strings(tagsArray)
	return strings.Join(tagsArray, " ")
}

func (*OpenTSDB) SampleConfig() string {
	return sampleConfig
}

func (o *OpenTSDB) Connect() error {
	if !strings.HasPrefix(o.Host, "http") && !strings.HasPrefix(o.Host, "tcp") {
		o.Host = "tcp://" + o.Host
	}
	// Test Connection to OpenTSDB Server
	u, err := url.Parse(o.Host)
	if err != nil {
		return fmt.Errorf("error in parsing host url: %w", err)
	}

	uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
	tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
	if err != nil {
		return fmt.Errorf("failed to resolve TCP address: %w", err)
	}
	connection, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
	}
	defer connection.Close()
	return nil
}

func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
	if len(metrics) == 0 {
		return nil
	}

	u, err := url.Parse(o.Host)
	if err != nil {
		return fmt.Errorf("error in parsing host url: %w", err)
	}

	if u.Scheme == "" || u.Scheme == "tcp" {
		return o.WriteTelnet(metrics, u)
	} else if u.Scheme == "http" || u.Scheme == "https" {
		return o.WriteHTTP(metrics, u)
	}
	return errors.New("unknown scheme in host parameter")
}

func (o *OpenTSDB) WriteHTTP(metrics []telegraf.Metric, u *url.URL) error {
	http := openTSDBHttp{
		Host:      u.Host,
		Port:      o.Port,
		Scheme:    u.Scheme,
		User:      u.User,
		BatchSize: o.HTTPBatchSize,
		Path:      o.HTTPPath,
		Debug:     o.Debug,
		log:       o.Log,
	}

	for _, m := range metrics {
		now := m.Time().UnixNano() / 1000000000
		tags := cleanTags(m.Tags())

		for fieldName, value := range m.Fields() {
			switch fv := value.(type) {
			case int64:
			case uint64:
			case float64:
				// JSON does not support these special values
				if math.IsNaN(fv) || math.IsInf(fv, 0) {
					continue
				}
			default:
				o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
				continue
			}

			metric := &HTTPMetric{
				Metric: sanitize(fmt.Sprintf("%s%s%s%s",
					o.Prefix, m.Name(), o.Separator, fieldName)),
				Tags:      tags,
				Timestamp: now,
				Value:     value,
			}

			if err := http.sendDataPoint(metric); err != nil {
				return err
			}
		}
	}

	return http.flush()
}

func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
	// Send Data with telnet / socket communication
	uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
	tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
	if err != nil {
		return fmt.Errorf("failed to resolve TCP address: %w", err)
	}
	connection, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		return fmt.Errorf("failed to connect to OpenTSDB: %w", err)
	}
	defer connection.Close()

	for _, m := range metrics {
		now := m.Time().UnixNano() / 1000000000
		tags := ToLineFormat(cleanTags(m.Tags()))

		for fieldName, value := range m.Fields() {
			switch fv := value.(type) {
			case int64:
			case uint64:
			case float64:
				// JSON does not support these special values
				if math.IsNaN(fv) || math.IsInf(fv, 0) {
					continue
				}
			default:
				o.Log.Debugf("OpenTSDB does not support metric value: [%s] of type [%T].", value, value)
				continue
			}

			metricValue, buildError := buildValue(value)
			if buildError != nil {
				o.Log.Errorf("OpenTSDB: %s", buildError.Error())
				continue
			}

			messageLine := fmt.Sprintf("put %s %v %s %s\n",
				sanitize(fmt.Sprintf("%s%s%s%s", o.Prefix, m.Name(), o.Separator, fieldName)),
				now, metricValue, tags)

			_, err = connection.Write([]byte(messageLine))
			if err != nil {
				return fmt.Errorf("telnet writing error: %w", err)
			}
		}
	}

	return nil
}

func cleanTags(tags map[string]string) map[string]string {
	tagSet := make(map[string]string, len(tags))
	for k, v := range tags {
		val := sanitize(v)
		if val != "" {
			tagSet[sanitize(k)] = val
		}
	}
	return tagSet
}

func buildValue(v interface{}) (string, error) {
	var retv string
	switch p := v.(type) {
	case int64:
		retv = IntToString(p)
	case uint64:
		retv = UIntToString(p)
	case float64:
		retv = FloatToString(p)
	default:
		return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
	}
	return retv, nil
}

func IntToString(inputNum int64) string {
	return strconv.FormatInt(inputNum, 10)
}

func UIntToString(inputNum uint64) string {
	return strconv.FormatUint(inputNum, 10)
}

func FloatToString(inputNum float64) string {
	return strconv.FormatFloat(inputNum, 'f', 6, 64)
}

func (*OpenTSDB) Close() error {
	return nil
}

func sanitize(value string) string {
	// Apply special hyphenation rules to preserve backwards compatibility
	value = hyphenChars.Replace(value)
	// Replace any remaining illegal chars
	return allowedChars.ReplaceAllLiteralString(value, "_")
}

func init() {
	outputs.Add("opentsdb", func() telegraf.Output {
		return &OpenTSDB{
			HTTPPath:  defaultHTTPPath,
			Separator: defaultSeparator,
		}
	})
}
