package influx_upstream

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/influxdata/line-protocol/v2/lineprotocol"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/parsers"
)

// maxErrorBufferSize caps how many bytes of an over-long input line are
// quoted in a ParseError message.
const maxErrorBufferSize = 1024

// ErrNoMetric is returned by ParseLine when the input parses successfully
// but yields no metric.
var ErrNoMetric = errors.New("no metric in line")

// TimeFunc supplies the timestamp assigned to metrics whose entry carries
// none.
type TimeFunc func() time.Time

// nthIndexAny returns the byte index in s of the nth (1-based) occurrence of
// any Unicode code point contained in chars. It returns -1 when n < 1 or when
// s holds fewer than n such occurrences.
func nthIndexAny(s, chars string, n int) int {
	if n < 1 {
		return -1
	}

	// Walk s one code point at a time so each match advances by exactly its
	// own width. Advancing by len(chars), the byte size of the whole candidate
	// set, would skip over adjacent matches when chars has more than one
	// byte. ContainsRune matches invalid UTF-8 the same way IndexAny does.
	seen := 0
	for i, r := range s {
		if !strings.ContainsRune(chars, r) {
			continue
		}
		seen++
		if seen == n {
			return i
		}
	}

	return -1
}

// ParseError indicates an error in the parsing of the text. It embeds the
// underlying *lineprotocol.DecodeError, whose Line, Column and Err fields are
// used when formatting the message.
type ParseError struct {
	*lineprotocol.DecodeError
	// buf holds the complete input that was being decoded, so Error can quote
	// the offending line. It is empty when the input is not available (the
	// stream parser passes nil), in which case no line is quoted.
	buf string
}

// Error formats the decode error as "metric parse error: <err> at <line>:<col>".
// When the original input is available, the offending line is appended as a
// quoted string. A line longer than maxErrorBufferSize is cut to at most
// maxErrorBufferSize bytes ending at the error column. "<-- here" is appended
// to mark the error, and "..." is prepended when leading bytes were dropped.
func (e *ParseError) Error() string {
	// When an error occurs within the stream decoder, we do not have access
	// to the internal buffer, so we cannot display the contents of the invalid
	// metric.
	if e.buf == "" {
		return fmt.Sprintf("metric parse error: %s at %d:%d", e.Err, e.Line, e.Column)
	}

	// Line is 1-based. The line starts right after the (Line-1)th newline, or
	// at byte 0 for the first line, because nthIndexAny yields -1 when n < 1.
	lineStart := nthIndexAny(e.buf, "\n", int(e.Line-1)) + 1
	buffer := e.buf[lineStart:]
	if eol := strings.IndexAny(buffer, "\n"); eol >= 0 {
		buffer = strings.TrimSuffix(buffer[:eol], "\r")
	}

	if len(buffer) > maxErrorBufferSize {
		// Column is 1-based and counted from the start of its own line (e.g.
		// "2:11" points into line 2). buffer begins at that line, so Column-1
		// is already an index into buffer; lineStart must not be subtracted.
		offset := e.Column - 1
		if offset > len(buffer) || offset < 0 {
			offset = len(buffer)
		}

		start := offset - maxErrorBufferSize
		// Only mark the start as elided when bytes were actually dropped.
		startEllipsis := start > 0
		if start < 0 {
			start = 0
		}

		// if we trimmed it the column won't line up. it'll always be the last character,
		// because the parser doesn't continue past it, but point it out anyway so
		// it's obvious where the issue is.
		buffer = buffer[start:offset] + "<-- here"
		if startEllipsis {
			buffer = "..." + buffer
		}
	}
	return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.Err, e.Line, e.Column, buffer)
}

// convertToParseError wraps rawErr in a *ParseError that carries input when
// rawErr is, or wraps, a *lineprotocol.DecodeError. Any other error
// (including nil) is returned unchanged.
func convertToParseError(input []byte, rawErr error) error {
	var decodeErr *lineprotocol.DecodeError
	if errors.As(rawErr, &decodeErr) {
		return &ParseError{DecodeError: decodeErr, buf: string(input)}
	}
	return rawErr
}

// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface. Init applies InfluxTimestampPrecision, installs
// time.Now as the default timestamp source and derives partial-entry
// handling from Type.
//
// Exported fields:
//   - InfluxTimestampPrecision: unit of timestamps in the input; 0 means
//     nanoseconds (see SetTimePrecision).
//   - DefaultTags: tags that Parse adds to every metric lacking them.
type Parser struct {
	InfluxTimestampPrecision config.Duration   `toml:"influx_timestamp_precision"`
	DefaultTags              map[string]string `toml:"-"`
	// If set to "series", Init enables allowPartial: entries whose tag or
	// field section stops with "empty tag name" / "expected field key", or
	// whose timestamp fails to decode, are still returned. Any other value
	// selects strict parsing.
	Type string `toml:"-"`

	// defaultTime supplies timestamps for entries without one, precision is
	// the timestamp unit handed to the decoder, and allowPartial relaxes the
	// errors listed above. They are set by Init and the Set* methods.
	defaultTime  TimeFunc
	precision    lineprotocol.Precision
	allowPartial bool
}

// SetTimeFunc replaces the function that supplies timestamps for entries
// that do not carry one. Init installs time.Now, so a custom function set
// before Init is overwritten.
func (p *Parser) SetTimeFunc(f TimeFunc) {
	p.defaultTime = f
}

// Parse decodes every line-protocol entry in input and returns the resulting
// metrics with DefaultTags applied. An input with no entries yields an empty,
// non-nil slice. The first failing entry aborts parsing. Errors wrapping a
// lineprotocol.DecodeError are returned as *ParseError quoting the input.
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
	dec := lineprotocol.NewDecoderWithBytes(input)

	result := []telegraf.Metric{}
	for dec.Next() {
		m, err := nextMetric(dec, p.precision, p.defaultTime, p.allowPartial)
		if err != nil {
			return nil, convertToParseError(input, err)
		}
		result = append(result, m)
	}

	p.applyDefaultTags(result)
	return result, nil
}

// ParseLine parses line and returns its first metric. It returns ErrNoMetric
// when the line parses cleanly but contains no entry. Any additional
// metrics in line are discarded.
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	metrics, err := p.Parse([]byte(line))
	switch {
	case err != nil:
		return nil, err
	case len(metrics) == 0:
		return nil, ErrNoMetric
	default:
		return metrics[0], nil
	}
}

// SetDefaultTags sets the tags that Parse adds to metrics lacking them. The
// map is stored as given, not copied.
func (p *Parser) SetDefaultTags(tags map[string]string) {
	p.DefaultTags = tags
}

// SetTimePrecision selects the unit used to interpret timestamps. Zero and
// time.Nanosecond both mean nanoseconds; time.Microsecond,
// time.Millisecond and time.Second are also accepted. Any other duration is
// rejected and the current precision is left unchanged.
func (p *Parser) SetTimePrecision(u time.Duration) error {
	var prec lineprotocol.Precision
	switch u {
	case 0, time.Nanosecond:
		prec = lineprotocol.Nanosecond
	case time.Microsecond:
		prec = lineprotocol.Microsecond
	case time.Millisecond:
		prec = lineprotocol.Millisecond
	case time.Second:
		prec = lineprotocol.Second
	default:
		return fmt.Errorf("invalid time precision: %d", u)
	}

	p.precision = prec
	return nil
}

// applyDefaultTags adds p.DefaultTags to each metric in metrics. It never
// overwrites a tag a metric already has.
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
	if len(p.DefaultTags) > 0 {
		for i := range metrics {
			p.applyDefaultTagsSingle(metrics[i])
		}
	}
}

// applyDefaultTagsSingle copies each default tag onto m unless m already
// carries a tag with that key.
func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
	for name, val := range p.DefaultTags {
		if m.HasTag(name) {
			continue
		}
		m.AddTag(name, val)
	}
}

// Init validates InfluxTimestampPrecision and prepares the parser. On
// success, timestamps for entries without one come from time.Now, and a
// Type of "series" enables partial-entry parsing. On failure nothing else
// is configured.
func (p *Parser) Init() error {
	prec := time.Duration(p.InfluxTimestampPrecision)
	if err := p.SetTimePrecision(prec); err != nil {
		return err
	}

	p.allowPartial = p.Type == "series"
	p.defaultTime = time.Now
	return nil
}

// init registers this parser as "influx_upstream". Each call to the factory
// yields a fresh, zero-valued *Parser.
func init() {
	parsers.Add("influx_upstream", func(string) telegraf.Parser { return &Parser{} })
}

// StreamParser is an InfluxDB Line Protocol parser that decodes metrics one
// at a time from an io.Reader (see NewStreamParser and Next). It is not safe
// for concurrent use in multiple goroutines.
//
// decoder is the source of entries, defaultTime stamps entries that carry no
// timestamp, precision is the timestamp unit, and lastError remembers the
// last decoder error Next returned, so that error is reported only once.
type StreamParser struct {
	decoder     *lineprotocol.Decoder
	defaultTime TimeFunc
	precision   lineprotocol.Precision
	lastError   error
}

// NewStreamParser returns a StreamParser that reads line protocol from r.
// Timestamps are interpreted as nanoseconds, and entries without one are
// stamped with time.Now, until changed via SetTimePrecision or SetTimeFunc.
func NewStreamParser(r io.Reader) *StreamParser {
	sp := &StreamParser{decoder: lineprotocol.NewDecoder(r)}
	sp.defaultTime = time.Now
	sp.precision = lineprotocol.Nanosecond
	return sp
}

// SetTimeFunc installs f as the source of timestamps for entries that carry
// none, replacing the time.Now default chosen by NewStreamParser. This is
// mostly useful in tests, or to give every such metric the same timestamp.
func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
	sp.defaultTime = f
}

// SetTimePrecision selects the unit used to interpret timestamps in the
// stream. Zero and time.Nanosecond both mean nanoseconds, matching
// Parser.SetTimePrecision; time.Microsecond, time.Millisecond and
// time.Second are also accepted. Minute and hour precision are rejected with
// a dedicated message. Any other duration returns an error. On error the
// current precision is left unchanged.
func (sp *StreamParser) SetTimePrecision(u time.Duration) error {
	switch u {
	case 0, time.Nanosecond:
		sp.precision = lineprotocol.Nanosecond
	case time.Microsecond:
		sp.precision = lineprotocol.Microsecond
	case time.Millisecond:
		sp.precision = lineprotocol.Millisecond
	case time.Second:
		sp.precision = lineprotocol.Second
	case time.Minute:
		return errors.New("time precision 'm' is not supported")
	case time.Hour:
		return errors.New("time precision 'h' is not supported")
	default:
		// Reject unknown units instead of silently keeping the old precision,
		// which would misinterpret every timestamp in the stream.
		return fmt.Errorf("invalid time precision: %d", u)
	}

	return nil
}

// Next decodes the next entry from the stream and returns io.EOF once the
// input is exhausted. An error for a single entry (a *ParseError when it
// wraps a decode error) does not end the stream, so Next may be called again
// to continue with the following entry. An error reported by the underlying
// decoder after its last entry is returned once; later calls yield io.EOF.
func (sp *StreamParser) Next() (telegraf.Metric, error) {
	if sp.decoder.Next() {
		m, err := nextMetric(sp.decoder, sp.precision, sp.defaultTime, false)
		if err != nil {
			return nil, convertToParseError(nil, err)
		}
		return m, nil
	}

	err := sp.decoder.Err()
	if err == nil || errors.Is(err, sp.lastError) {
		return nil, io.EOF
	}
	sp.lastError = err
	return nil, err
}

// nextMetric converts the entry the decoder is currently positioned on into a
// telegraf metric. It reads the measurement, then every tag, then every field,
// then the timestamp. Entries without a timestamp get defaultTime(). A nil
// defaultTime falls back to time.Now instead of panicking.
//
// When allowPartial is set (series parsing), errors are tolerated as follows:
//   - A tag section that fails with "empty tag name" ends tag decoding early.
//   - A field section that fails with "expected field key" ends field
//     decoding early.
//   - A timestamp error is ignored. The metric then gets whatever time
//     decoder.Time returned alongside the error.
func nextMetric(decoder *lineprotocol.Decoder, precision lineprotocol.Precision, defaultTime TimeFunc, allowPartial bool) (telegraf.Metric, error) {
	if defaultTime == nil {
		// Guards against use before Init or after SetTimeFunc(nil).
		defaultTime = time.Now
	}

	measurement, err := decoder.Measurement()
	if err != nil {
		return nil, err
	}
	m := metric.New(string(measurement), nil, nil, time.Time{})

	for {
		key, value, err := decoder.NextTag()
		if err != nil {
			// Allow empty tags for series parser. allowPartial is checked
			// first so the error text is only rendered when it can matter.
			if allowPartial && strings.Contains(err.Error(), "empty tag name") {
				break
			}
			return nil, err
		}
		if key == nil {
			break
		}
		m.AddTag(string(key), string(value))
	}

	for {
		key, value, err := decoder.NextField()
		if err != nil {
			// Allow empty fields for series parser.
			if allowPartial && strings.Contains(err.Error(), "expected field key") {
				break
			}
			return nil, err
		}
		if key == nil {
			break
		}
		m.AddField(string(key), value.Interface())
	}

	t, err := decoder.Time(precision, defaultTime())
	if err != nil && !allowPartial {
		return nil, err
	}
	m.SetTime(t)

	return m, nil
}
