package config

import (
	"bytes"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"reflect"
	"regexp"
	"runtime"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/coreos/go-semver/semver"
	"github.com/influxdata/toml"
	"github.com/influxdata/toml/ast"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	logging "github.com/influxdata/telegraf/logger"
	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/persister"
	"github.com/influxdata/telegraf/plugins/aggregators"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/plugins/parsers/csv"
	"github.com/influxdata/telegraf/plugins/processors"
	"github.com/influxdata/telegraf/plugins/secretstores"
	"github.com/influxdata/telegraf/plugins/serializers"
)

var (
	httpLoadConfigRetryInterval = 10 * time.Second

	// fetchURLRe is a regex to determine whether the requested file should
	// be fetched from a remote or read from the filesystem.
	fetchURLRe = regexp.MustCompile(`^\w+://`)

	// oldVarRe is a regex to reproduce pre v1.27.0 environment variable
	// replacement behavior
	oldVarRe = regexp.MustCompile(`\$(?i:(?P<named>[_a-z][_a-z0-9]*)|{(?:(?P<braced>[_a-z][_a-z0-9]*(?::?[-+?](.*))?)}|(?P<invalid>)))`)
	// OldEnvVarReplacement is a switch to allow going back to pre v1.27.0
	// environment variable replacement behavior
	OldEnvVarReplacement = false

	// NonStrictEnvVarHandling allows to disable strict and safe environment
	// variables handling. Strict handling cannot replace non-string settings
	// so this option must be used in those use-cases.
	NonStrictEnvVarHandling = false

	// PrintPluginConfigSource is a switch to enable printing of plugin sources
	PrintPluginConfigSource = false

	// Password specified via command-line
	Password Secret

	// telegrafVersion contains the parsed semantic Telegraf version
	telegrafVersion *semver.Version = semver.New("0.0.0-unknown")

	// List of (redacted) configuration Sources
	sources   []string
	sourcesMu sync.Mutex
)

const EmptySourcePath string = ""

// Config specifies the URL/user/password for the database that telegraf
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
	toml              *toml.Config
	errs              []error // config load errors.
	UnusedFields      map[string]bool
	unusedFieldsMutex *sync.Mutex

	Tags               map[string]string
	InputFilters       []string
	OutputFilters      []string
	SecretStoreFilters []string

	SecretStores      map[string]telegraf.SecretStore
	secretStoreSource map[string][]string

	Agent       *AgentConfig
	Inputs      []*models.RunningInput
	Outputs     []*models.RunningOutput
	Aggregators []*models.RunningAggregator
	// Processors have a slice wrapper type because they need to be sorted
	Processors        models.RunningProcessors
	AggProcessors     models.RunningProcessors
	fileProcessors    OrderedPlugins
	fileAggProcessors OrderedPlugins

	// Parsers are created by their inputs during gather. Config doesn't keep track of them
	// like the other plugins because they need to be garbage collected (See issue #11809)

	Deprecations map[string][]int64

	Persister *persister.Persister

	NumberSecrets uint64

	seenAgentTable     bool
	seenAgentTableOnce sync.Once
}

// OrderedPlugin is used to keep the order in which they appear in a file
type OrderedPlugin struct {
	Line   int
	plugin any
}
type OrderedPlugins []*OrderedPlugin

func (op OrderedPlugins) Len() int           { return len(op) }
func (op OrderedPlugins) Swap(i, j int)      { op[i], op[j] = op[j], op[i] }
func (op OrderedPlugins) Less(i, j int) bool { return op[i].Line < op[j].Line }

// NewConfig creates a new struct to hold the Telegraf config.
// For historical reasons, It holds the actual instances of the running plugins
// once the configuration is parsed.
func NewConfig() *Config {
	c := &Config{
		UnusedFields:      make(map[string]bool),
		unusedFieldsMutex: &sync.Mutex{},

		// Agent defaults:
		Agent: &AgentConfig{
			Interval:                   Duration(10 * time.Second),
			RoundInterval:              true,
			FlushInterval:              Duration(10 * time.Second),
			LogfileRotationMaxArchives: 5,
		},

		Tags:               make(map[string]string),
		Inputs:             make([]*models.RunningInput, 0),
		Outputs:            make([]*models.RunningOutput, 0),
		Processors:         make([]*models.RunningProcessor, 0),
		AggProcessors:      make([]*models.RunningProcessor, 0),
		SecretStores:       make(map[string]telegraf.SecretStore),
		secretStoreSource:  make(map[string][]string),
		fileProcessors:     make([]*OrderedPlugin, 0),
		fileAggProcessors:  make([]*OrderedPlugin, 0),
		InputFilters:       make([]string, 0),
		OutputFilters:      make([]string, 0),
		SecretStoreFilters: make([]string, 0),
		Deprecations:       make(map[string][]int64),
	}

	// Handle unknown version
	if internal.Version != "" && internal.Version != "unknown" {
		telegrafVersion = semver.New(internal.Version)
	}

	tomlCfg := &toml.Config{
		NormFieldName: toml.DefaultConfig.NormFieldName,
		FieldToKey:    toml.DefaultConfig.FieldToKey,
		MissingField:  c.missingTomlField,
	}
	c.toml = tomlCfg

	// Initialize the configuration source list
	sourcesMu.Lock()
	sources = make([]string, 0)
	sourcesMu.Unlock()

	return c
}

// AgentConfig defines configuration that will be used by the Telegraf agent
type AgentConfig struct {
	// Interval at which to gather information
	Interval Duration

	// RoundInterval rounds collection interval to 'interval'.
	//     ie, if Interval=10s then always collect on :00, :10, :20, etc.
	RoundInterval bool

	// Collected metrics are rounded to the precision specified. Precision is
	// specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
	// Valid time units are "ns", "us" (or "µs"), "ms", "s".
	//
	// By default, or when set to "0s", precision will be set to the same
	// timestamp order as the collection interval, with the maximum being 1s:
	//   ie, when interval = "10s", precision will be "1s"
	//       when interval = "250ms", precision will be "1ms"
	//
	// Precision will NOT be used for service inputs. It is up to each individual
	// service input to set the timestamp at the appropriate precision.
	Precision Duration

	// CollectionJitter is used to jitter the collection by a random amount.
	// Each plugin will sleep for a random time within jitter before collecting.
	// This can be used to avoid many plugins querying things like sysfs at the
	// same time, which can have a measurable effect on the system.
	CollectionJitter Duration

	// CollectionOffset is used to shift the collection by the given amount.
	// This can be used to avoid many plugins querying constraint devices
	// at the same time by manually scheduling them in time.
	CollectionOffset Duration

	// FlushInterval is the Interval at which to flush data
	FlushInterval Duration

	// FlushJitter Jitters the flush interval by a random amount.
	// This is primarily to avoid large write spikes for users running a large
	// number of telegraf instances.
	// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
	FlushJitter Duration

	// MetricBatchSize is the maximum number of metrics that is written to an
	// output plugin in one call.
	MetricBatchSize int

	// MetricBufferLimit is the max number of metrics that each output plugin
	// will cache. The buffer is cleared when a successful write occurs. When
	// full, the oldest metrics will be overwritten. This number should be a
	// multiple of MetricBatchSize. Due to current implementation, this could
	// not be less than 2 times MetricBatchSize.
	MetricBufferLimit int

	// Debug is the option for running in debug mode
	Debug bool `toml:"debug"`

	// Quiet is the option for running in quiet mode
	Quiet bool `toml:"quiet"`

	// Log target controls the destination for logs and can be one of "file",
	// "stderr" or, on Windows, "eventlog". When set to "file", the output file
	// is determined by the "logfile" setting
	LogTarget string `toml:"logtarget" deprecated:"1.32.0;1.40.0;use 'logformat' and 'logfile' instead"`

	// Log format controls the way messages are logged and can be one of "text",
	// "structured" or, on Windows, "eventlog".
	LogFormat string `toml:"logformat"`

	// Name of the file to be logged to or stderr if empty. Ignored for "eventlog" format.
	Logfile string `toml:"logfile"`

	// Message key for structured logs, to override the default of "msg".
	// Ignored if "logformat" is not "structured".
	StructuredLogMessageKey string `toml:"structured_log_message_key"`

	// The file will be rotated after the time interval specified.  When set
	// to 0 no time based rotation is performed.
	LogfileRotationInterval Duration `toml:"logfile_rotation_interval"`

	// The logfile will be rotated when it becomes larger than the specified
	// size.  When set to 0 no size based rotation is performed.
	LogfileRotationMaxSize Size `toml:"logfile_rotation_max_size"`

	// Maximum number of rotated archives to keep, any older logs are deleted.
	// If set to -1, no archives are removed.
	LogfileRotationMaxArchives int `toml:"logfile_rotation_max_archives"`

	// Pick a timezone to use when logging or type 'local' for local time.
	LogWithTimezone string `toml:"log_with_timezone"`

	Hostname     string
	OmitHostname bool

	// Method for translating SNMP objects. 'netsnmp' to call external programs,
	// 'gosmi' to use the built-in library.
	SnmpTranslator string `toml:"snmp_translator"`

	// Name of the file to load the state of plugins from and store the state to.
	// If uncommented and not empty, this file will be used to save the state of
	// stateful plugins on termination of Telegraf. If the file exists on start,
	// the state in the file will be restored for the plugins.
	Statefile string `toml:"statefile"`

	// Flag to always keep tags explicitly defined in the plugin itself and
	// ensure those tags always pass filtering.
	AlwaysIncludeLocalTags bool `toml:"always_include_local_tags"`

	// Flag to always keep tags explicitly defined in the global tags section
	// and ensure those tags always pass filtering.
	AlwaysIncludeGlobalTags bool `toml:"always_include_global_tags"`

	// Flag to skip running processors after aggregators
	// By default, processors are run a second time after aggregators. Changing
	// this setting to true will skip the second run of processors.
	SkipProcessorsAfterAggregators *bool `toml:"skip_processors_after_aggregators"`

	// Number of attempts to obtain a remote configuration via a URL during
	// startup. Set to -1 for unlimited attempts.
	ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"`

	// BufferStrategy is the metric buffer type to use for a given output plugin.
	// Supported types currently are "memory" and "disk_write_through" (alias: "disk").
	BufferStrategy string `toml:"buffer_strategy"`

	// BufferDirectory is the directory to store buffer files for serialized
	// to disk metrics when using the "disk_write_through" buffer strategy.
	BufferDirectory string `toml:"buffer_directory"`

	// BufferDiskSync controls writes durability when "disk" buffer strategy
	// is used. No sync offers better write performance at the risk of losing
	// metrics buffered in the last `flush_interval` in the event of a power
	// cut.
	BufferDiskSync *bool `toml:"buffer_disk_sync"`
}

// InputNames returns a list of strings of the configured inputs.
func (c *Config) InputNames() []string {
	name := make([]string, 0, len(c.Inputs))
	for _, input := range c.Inputs {
		name = append(name, input.Config.Name)
	}
	return PluginNameCounts(name)
}

// InputNamesWithSources returns a table representation of input names and their sources.
func (c *Config) InputNamesWithSources() string {
	plugins := make(pluginNames, 0, len(c.Inputs))
	for _, input := range c.Inputs {
		plugins = append(plugins, pluginPrinter{
			name:   input.Config.Name,
			source: input.Config.Source,
		})
	}
	return getPluginSourcesTable(plugins)
}

// AggregatorNames returns a list of strings of the configured aggregators.
func (c *Config) AggregatorNames() []string {
	name := make([]string, 0, len(c.Aggregators))
	for _, aggregator := range c.Aggregators {
		name = append(name, aggregator.Config.Name)
	}
	return PluginNameCounts(name)
}

// AggregatorNamesWithSources returns a table representation of aggregator names and their sources.
func (c *Config) AggregatorNamesWithSources() string {
	plugins := make(pluginNames, 0, len(c.Aggregators))
	for _, aggregator := range c.Aggregators {
		plugins = append(plugins, pluginPrinter{
			name:   aggregator.Config.Name,
			source: aggregator.Config.Source,
		})
	}
	return getPluginSourcesTable(plugins)
}

// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
	name := make([]string, 0, len(c.Processors))
	for _, processor := range c.Processors {
		name = append(name, processor.Config.Name)
	}
	return PluginNameCounts(name)
}

// ProcessorNamesWithSources returns a table representation of processor names and their sources.
func (c *Config) ProcessorNamesWithSources() string {
	plugins := make(pluginNames, 0, len(c.Processors))
	for _, processor := range c.Processors {
		plugins = append(plugins, pluginPrinter{
			name:   processor.Config.Name,
			source: processor.Config.Source,
		})
	}
	return getPluginSourcesTable(plugins)
}

// OutputNames returns a list of strings of the configured outputs.
func (c *Config) OutputNames() []string {
	name := make([]string, 0, len(c.Outputs))
	for _, output := range c.Outputs {
		name = append(name, output.Config.Name)
	}
	return PluginNameCounts(name)
}

// OutputNamesWithSources returns a table representation of output names and their sources.
func (c *Config) OutputNamesWithSources() string {
	plugins := make(pluginNames, 0, len(c.Outputs))
	for _, output := range c.Outputs {
		plugins = append(plugins, pluginPrinter{
			name:   output.Config.Name,
			source: output.Config.Source,
		})
	}
	return getPluginSourcesTable(plugins)
}

// SecretstoreNames returns a list of strings of the configured secret-stores.
func (c *Config) SecretstoreNames() []string {
	names := make([]string, 0, len(c.SecretStores))
	for name := range c.SecretStores {
		names = append(names, name)
	}
	return PluginNameCounts(names)
}

// SecretstoreNamesWithSources returns a table representation of secret store names and their sources.
func (c *Config) SecretstoreNamesWithSources() string {
	plugins := make(pluginNames, 0, len(c.SecretStores))
	for name, sources := range c.secretStoreSource {
		for _, source := range sources {
			plugins = append(plugins, pluginPrinter{
				name:   name,
				source: source,
			})
		}
	}
	return getPluginSourcesTable(plugins)
}

// PluginNameCounts returns a string of plugin names and their counts.
// PluginNameCounts returns a list of sorted plugin names and their count
func PluginNameCounts(plugins []string) []string {
	names := make(map[string]int)
	for _, plugin := range plugins {
		names[plugin]++
	}

	var namecount []string
	for name, count := range names {
		if count == 1 {
			namecount = append(namecount, name)
		} else {
			namecount = append(namecount, fmt.Sprintf("%s (%dx)", name, count))
		}
	}

	sort.Strings(namecount)
	return namecount
}

// ListTags returns a string of tags specified in the config,
// line-protocol style
func (c *Config) ListTags() string {
	tags := make([]string, 0, len(c.Tags))
	for k, v := range c.Tags {
		tags = append(tags, fmt.Sprintf("%s=%s", k, v))
	}

	sort.Strings(tags)

	return strings.Join(tags, " ")
}

func sliceContains(name string, list []string) bool {
	for _, b := range list {
		if b == name {
			return true
		}
	}
	return false
}

// WalkDirectory collects all toml files that need to be loaded
func WalkDirectory(path string) ([]string, error) {
	// Check permissions of the directly specified directories and error
	// out if those are not readable
	if _, err := os.ReadDir(path); err != nil {
		return nil, err
	}

	var files []string
	walkfn := func(thispath string, info os.FileInfo, err error) error {
		if info == nil || errors.Is(err, os.ErrPermission) {
			log.Printf("W! Telegraf is not permitted to read %s", thispath)
			return nil
		}

		if info.IsDir() {
			if strings.HasPrefix(info.Name(), "..") {
				// skip Kubernetes mounts, preventing loading the same config twice
				return filepath.SkipDir
			}

			return nil
		}
		name := info.Name()
		if len(name) < 6 || name[len(name)-5:] != ".conf" {
			return nil
		}
		files = append(files, thispath)
		return nil
	}
	return files, filepath.Walk(path, walkfn)
}

// GetDefaultConfigPath will try to find a default config file at these locations (in order):
//  1. $TELEGRAF_CONFIG_PATH
//  2. $HOME/.telegraf/telegraf.conf
//  3. /etc/telegraf/telegraf.conf and /etc/telegraf/telegraf.d/*.conf
func GetDefaultConfigPath() ([]string, error) {
	envfile := os.Getenv("TELEGRAF_CONFIG_PATH")
	homefile := os.ExpandEnv("${HOME}/.telegraf/telegraf.conf")
	etcfile := "/etc/telegraf/telegraf.conf"
	etcfolder := "/etc/telegraf/telegraf.d"

	if runtime.GOOS == "windows" {
		programFiles := os.Getenv("ProgramFiles")
		if programFiles == "" { // Should never happen
			programFiles = `C:\Program Files`
		}
		etcfile = programFiles + `\Telegraf\telegraf.conf`
		etcfolder = programFiles + `\Telegraf\telegraf.d\`
	}

	for _, path := range []string{envfile, homefile} {
		if isURL(path) {
			return []string{path}, nil
		}
		if _, err := os.Stat(path); err == nil {
			return []string{path}, nil
		}
	}

	// At this point we need to check if the files under /etc/telegraf are
	// populated and return them all.
	confFiles := make([]string, 0)
	if _, err := os.Stat(etcfile); err == nil {
		confFiles = append(confFiles, etcfile)
	}
	if _, err := os.Stat(etcfolder); err == nil {
		files, err := WalkDirectory(etcfolder)
		if err != nil {
			log.Printf("W! unable walk %q: %s", etcfolder, err)
		}
		confFiles = append(confFiles, files...)
	}
	if len(confFiles) > 0 {
		return confFiles, nil
	}

	// if we got here, we didn't find a file in a default location
	return nil, fmt.Errorf("no config file specified, and could not find one"+
		" in $TELEGRAF_CONFIG_PATH, %s, %s, or %s/*.conf", homefile, etcfile, etcfolder)
}

// isURL checks if string is valid url
func isURL(str string) bool {
	u, err := url.Parse(str)
	return err == nil && u.Scheme != "" && u.Host != ""
}

// LoadConfig loads the given config files and applies it to c
func (c *Config) LoadConfig(path string) error {
	if !c.Agent.Quiet {
		log.Printf("I! Loading config: %s", path)
	}

	data, _, err := LoadConfigFileWithRetries(path, c.Agent.ConfigURLRetryAttempts)
	if err != nil {
		return fmt.Errorf("loading config file %s failed: %w", path, err)
	}

	if err = c.LoadConfigData(data, path); err != nil {
		return fmt.Errorf("loading config file %s failed: %w", path, err)
	}

	return nil
}

func (c *Config) LoadAll(configFiles ...string) error {
	for _, fConfig := range configFiles {
		if err := c.LoadConfig(fConfig); err != nil {
			return err
		}
	}

	// Sort the processors according to their `order` setting while
	// using a stable sort to keep the file loading / file position order.
	sort.Stable(c.Processors)
	sort.Stable(c.AggProcessors)

	// Set snmp agent translator default
	if c.Agent.SnmpTranslator == "" {
		c.Agent.SnmpTranslator = "netsnmp"
	}

	// Check if there is enough lockable memory for the secret
	count := secretCount.Load()
	if count < 0 {
		log.Printf("E! Invalid secret count %d, please report this incident including your configuration!", count)
		count = 0
	}
	c.NumberSecrets = uint64(count)

	// Let's link all secrets to their secret-stores
	return c.LinkSecrets()
}

type cfgDataOptions struct {
	sourcePath string
}

type cfgDataOption func(*cfgDataOptions)

func WithSourcePath(path string) cfgDataOption {
	return func(o *cfgDataOptions) {
		o.sourcePath = path
	}
}

// LoadConfigData loads TOML-formatted config data
func (c *Config) LoadConfigData(data []byte, path string) error {
	tbl, err := parseConfig(data)
	if err != nil {
		return fmt.Errorf("error parsing data: %w", err)
	}

	// Parse tags tables first:
	for _, tableName := range []string{"tags", "global_tags"} {
		if val, ok := tbl.Fields[tableName]; ok {
			subTable, ok := val.(*ast.Table)
			if !ok {
				return fmt.Errorf("invalid configuration, bad table name %q", tableName)
			}
			if err = c.toml.UnmarshalTable(subTable, c.Tags); err != nil {
				return fmt.Errorf("error parsing table name %q: %w", tableName, err)
			}
		}
	}

	// Parse agent table:
	if val, ok := tbl.Fields["agent"]; ok {
		if c.seenAgentTable {
			c.seenAgentTableOnce.Do(func() {
				log.Printf("W! Overlapping settings in multiple agent tables are not supported: may cause undefined behavior")
			})
		}
		c.seenAgentTable = true

		subTable, ok := val.(*ast.Table)
		if !ok {
			return errors.New("invalid configuration, error parsing agent table")
		}
		if err = c.toml.UnmarshalTable(subTable, c.Agent); err != nil {
			return fmt.Errorf("error parsing [agent]: %w", err)
		}
		if c.Agent.CollectionOffset < 0 {
			return fmt.Errorf("agent collection_offset must not be negative, found %v", c.Agent.CollectionOffset)
		}
	}

	if !c.Agent.OmitHostname {
		if c.Agent.Hostname == "" {
			hostname, err := os.Hostname()
			if err != nil {
				return err
			}

			c.Agent.Hostname = hostname
		}

		c.Tags["host"] = c.Agent.Hostname
	}

	// Warn when explicitly setting the old snmp translator
	if c.Agent.SnmpTranslator == "netsnmp" {
		PrintOptionValueDeprecationNotice("agent", "snmp_translator", "netsnmp", telegraf.DeprecationInfo{
			Since:     "1.25.0",
			RemovalIn: "1.40.0",
			Notice:    "Use 'gosmi' instead",
		})
	}

	// Set up the persister if requested
	if c.Agent.Statefile != "" {
		c.Persister = &persister.Persister{
			Filename: c.Agent.Statefile,
		}
	}

	if len(c.UnusedFields) > 0 {
		return fmt.Errorf(
			"line %d: configuration specified the fields %q, but they were not used; "+
				"this is either a typo or this config option does not exist in this version",
			tbl.Line, keys(c.UnusedFields))
	}

	// Initialize the file-sorting slices
	c.fileProcessors = make(OrderedPlugins, 0)
	c.fileAggProcessors = make(OrderedPlugins, 0)

	// Parse all the rest of the plugins:
	for name, val := range tbl.Fields {
		subTable, ok := val.(*ast.Table)
		if !ok {
			return fmt.Errorf("invalid configuration, error parsing field %q as table", name)
		}

		switch name {
		case "agent", "global_tags", "tags":
		case "outputs":
			for pluginName, pluginVal := range subTable.Fields {
				switch pluginSubTable := pluginVal.(type) {
				// legacy [outputs.influxdb] support
				case *ast.Table:
					if err = c.addOutput(pluginName, path, pluginSubTable); err != nil {
						return fmt.Errorf("error parsing %s, %w", pluginName, err)
					}
				case []*ast.Table:
					for _, t := range pluginSubTable {
						if err = c.addOutput(pluginName, path, t); err != nil {
							return fmt.Errorf("error parsing %s array, %w", pluginName, err)
						}
					}
				default:
					return fmt.Errorf("unsupported config format: %s",
						pluginName)
				}
				if len(c.UnusedFields) > 0 {
					return fmt.Errorf(
						"plugin %s.%s: line %d: configuration specified the fields %q, but they were not used; "+
							"this is either a typo or this config option does not exist in this version",
						name, pluginName, subTable.Line, keys(c.UnusedFields))
				}
			}
		case "inputs", "plugins":
			for pluginName, pluginVal := range subTable.Fields {
				switch pluginSubTable := pluginVal.(type) {
				// legacy [inputs.cpu] support
				case *ast.Table:
					if err = c.addInput(pluginName, path, pluginSubTable); err != nil {
						return fmt.Errorf("error parsing %s, %w", pluginName, err)
					}
				case []*ast.Table:
					for _, t := range pluginSubTable {
						if err = c.addInput(pluginName, path, t); err != nil {
							return fmt.Errorf("error parsing %s, %w", pluginName, err)
						}
					}
				default:
					return fmt.Errorf("unsupported config format: %s",
						pluginName)
				}
				if len(c.UnusedFields) > 0 {
					return fmt.Errorf(
						"plugin %s.%s: line %d: configuration specified the fields %q, but they were not used; "+
							"this is either a typo or this config option does not exist in this version",
						name, pluginName, subTable.Line, keys(c.UnusedFields))
				}
			}
		case "processors":
			for pluginName, pluginVal := range subTable.Fields {
				switch pluginSubTable := pluginVal.(type) {
				case []*ast.Table:
					for _, t := range pluginSubTable {
						if err = c.addProcessor(pluginName, path, t); err != nil {
							return fmt.Errorf("error parsing %s, %w", pluginName, err)
						}
					}
				default:
					return fmt.Errorf("unsupported config format: %s",
						pluginName)
				}
				if len(c.UnusedFields) > 0 {
					return fmt.Errorf(
						"plugin %s.%s: line %d: configuration specified the fields %q, but they were not used; "+
							"this is either a typo or this config option does not exist in this version",
						name,
						pluginName,
						subTable.Line,
						keys(c.UnusedFields),
					)
				}
			}
		case "aggregators":
			for pluginName, pluginVal := range subTable.Fields {
				switch pluginSubTable := pluginVal.(type) {
				case []*ast.Table:
					for _, t := range pluginSubTable {
						if err = c.addAggregator(pluginName, path, t); err != nil {
							return fmt.Errorf("error parsing %s, %w", pluginName, err)
						}
					}
				default:
					return fmt.Errorf("unsupported config format: %s",
						pluginName)
				}
				if len(c.UnusedFields) > 0 {
					return fmt.Errorf(
						"plugin %s.%s: line %d: configuration specified the fields %q, but they were not used; "+
							"this is either a typo or this config option does not exist in this version",
						name, pluginName, subTable.Line, keys(c.UnusedFields))
				}
			}
		case "secretstores":
			for pluginName, pluginVal := range subTable.Fields {
				switch pluginSubTable := pluginVal.(type) {
				case []*ast.Table:
					for _, t := range pluginSubTable {
						if err = c.addSecretStore(pluginName, path, t); err != nil {
							return fmt.Errorf("error parsing %s, %w", pluginName, err)
						}
					}
				default:
					return fmt.Errorf("unsupported config format: %s", pluginName)
				}
				if len(c.UnusedFields) > 0 {
					msg := "plugin %s.%s: line %d: configuration specified the fields %q, but they were not used; " +
						"this is either a typo or this config option does not exist in this version"
					return fmt.Errorf(msg, name, pluginName, subTable.Line, keys(c.UnusedFields))
				}
			}

		// Assume it's an input for legacy config file support if no other
		// identifiers are present
		default:
			if err = c.addInput(name, path, subTable); err != nil {
				return fmt.Errorf("error parsing %s, %w", name, err)
			}
		}
	}

	// Sort the processor according to the order they appeared in this file
	// In a later stage, we sort them using the `order` option.
	sort.Sort(c.fileProcessors)
	for _, op := range c.fileProcessors {
		c.Processors = append(c.Processors, op.plugin.(*models.RunningProcessor))
	}

	sort.Sort(c.fileAggProcessors)
	for _, op := range c.fileAggProcessors {
		c.AggProcessors = append(c.AggProcessors, op.plugin.(*models.RunningProcessor))
	}

	return nil
}

// trimBOM trims the Byte-Order-Marks from the beginning of the file.
// this is for Windows compatibility only.
// see https://github.com/influxdata/telegraf/issues/1378
func trimBOM(f []byte) []byte {
	return bytes.TrimPrefix(f, []byte("\xef\xbb\xbf"))
}

// LoadConfigFile loads the content of a configuration file and returns it
// together with a flag denoting if the file is from a remote location such
// as a web server.
func LoadConfigFile(config string) ([]byte, bool, error) {
	return LoadConfigFileWithRetries(config, 0)
}

func LoadConfigFileWithRetries(config string, urlRetryAttempts int) ([]byte, bool, error) {
	if fetchURLRe.MatchString(config) {
		u, err := url.Parse(config)
		if err != nil {
			return nil, true, err
		}

		switch u.Scheme {
		case "https", "http":
			data, err := fetchConfig(u, urlRetryAttempts)
			if err != nil {
				return nil, true, err
			}
			sourcesMu.Lock()
			sources = append(sources, u.Redacted())
			sourcesMu.Unlock()
			return data, true, nil
		default:
			return nil, true, fmt.Errorf("scheme %q not supported", u.Scheme)
		}
	}

	// If it isn't a https scheme, try it as a file
	buffer, err := os.ReadFile(config)
	if err != nil {
		return nil, false, err
	}
	sourcesMu.Lock()
	sources = append(sources, config)
	sourcesMu.Unlock()

	mimeType := http.DetectContentType(buffer)
	if !strings.Contains(mimeType, "text/plain") {
		return nil, false, fmt.Errorf("provided config is not a TOML file: %s", config)
	}

	return buffer, false, nil
}

// GetSources returns the redacted list of configuration sources
func GetSources() []string {
	sourcesMu.Lock()
	defer sourcesMu.Unlock()
	return slices.Clone(sources)
}

func fetchConfig(u *url.URL, urlRetryAttempts int) ([]byte, error) {
	req, err := http.NewRequest("GET", u.String(), nil)
	if err != nil {
		return nil, err
	}

	if v, exists := os.LookupEnv("INFLUX_TOKEN"); exists {
		req.Header.Add("Authorization", "Token "+v)
	}
	req.Header.Add("Accept", "application/toml")
	req.Header.Set("User-Agent", internal.ProductToken())

	var totalAttempts int
	if urlRetryAttempts == -1 {
		totalAttempts = -1
		log.Printf("Using unlimited number of attempts to fetch HTTP config")
	} else if urlRetryAttempts == 0 {
		totalAttempts = 3
	} else if urlRetryAttempts > 0 {
		totalAttempts = urlRetryAttempts
	} else {
		return nil, fmt.Errorf("invalid number of attempts: %d", urlRetryAttempts)
	}

	attempt := 0
	for {
		body, err := requestURLConfig(req)
		if err == nil {
			return body, nil
		}

		log.Printf("Error getting HTTP config (attempt %d of %d): %s", attempt, totalAttempts, err)
		if urlRetryAttempts != -1 && attempt >= totalAttempts {
			return nil, err
		}

		time.Sleep(httpLoadConfigRetryInterval)
		attempt++
	}
}

func requestURLConfig(req *http.Request) ([]byte, error) {
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to HTTP config server: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to fetch HTTP config: %s", resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}

	return body, nil
}

// parseConfig loads a TOML configuration from a provided path and
// returns the AST produced from the TOML parser. When loading the file, it
// will find environment variables and replace them.
func parseConfig(contents []byte) (*ast.Table, error) {
	contents = trimBOM(contents)
	var err error
	contents, err = removeComments(contents)
	if err != nil {
		return nil, err
	}

	// Use non-strict mode
	if NonStrictEnvVarHandling {
		output, err := substituteEnvironmentNonStrict(contents, OldEnvVarReplacement)
		if err != nil {
			return nil, err
		}
		return toml.Parse(output)
	}

	// Use strict mode (default)
	return substituteEnvironmentStrict(contents, OldEnvVarReplacement)
}

func (c *Config) addAggregator(name, source string, table *ast.Table) error {
	enabled, err := c.matchesLabelSelection(table)
	if err != nil {
		return fmt.Errorf("invalid label in plugin aggregators.%s: %w", name, err)
	}
	if !enabled {
		return nil
	}

	creator, ok := aggregators.Aggregators[name]
	if !ok {
		// Handle removed, deprecated plugins
		if di, deprecated := aggregators.Deprecations[name]; deprecated {
			printHistoricPluginDeprecationNotice("aggregators", name, di)
			return errors.New("plugin deprecated")
		}
		return fmt.Errorf("undefined but requested aggregator: %s", name)
	}
	aggregator := creator()

	conf, err := c.buildAggregator(name, source, table)
	if err != nil {
		return err
	}

	if err := c.toml.UnmarshalTable(table, aggregator); err != nil {
		return err
	}

	if err := c.printUserDeprecation("aggregators", name, aggregator); err != nil {
		return err
	}

	c.Aggregators = append(c.Aggregators, models.NewRunningAggregator(aggregator, conf))
	return nil
}

func (c *Config) addSecretStore(name, source string, table *ast.Table) error {
	if len(c.SecretStoreFilters) > 0 && !sliceContains(name, c.SecretStoreFilters) {
		return nil
	}

	enabled, err := c.matchesLabelSelection(table)
	if err != nil {
		return fmt.Errorf("invalid label in plugin secretstores.%s: %w", name, err)
	}
	if !enabled {
		return nil
	}

	storeID := c.getFieldString(table, "id")
	if storeID == "" {
		return fmt.Errorf("%q secret-store without ID", name)
	}
	if !secretStorePattern.MatchString(storeID) {
		return fmt.Errorf("invalid secret-store ID %q, must only contain letters, numbers or underscore", storeID)
	}

	tags := map[string]string{
		"_id":         storeID,
		"secretstore": name,
	}

	creator, ok := secretstores.SecretStores[name]
	if !ok {
		// Handle removed, deprecated plugins
		if di, deprecated := secretstores.Deprecations[name]; deprecated {
			printHistoricPluginDeprecationNotice("secretstores", name, di)
			return errors.New("plugin deprecated")
		}
		return fmt.Errorf("undefined but requested secretstores: %s", name)
	}
	store := creator(storeID)

	if err := c.toml.UnmarshalTable(table, store); err != nil {
		return err
	}

	if err := c.printUserDeprecation("secretstores", name, store); err != nil {
		return err
	}

	logger := logging.New("secretstores", name, "")
	models.SetLoggerOnPlugin(store, logger)
	models.SetStatisticsOnPlugin(store, logger, tags)

	if err := store.Init(); err != nil {
		return fmt.Errorf("error initializing secret-store %q: %w", storeID, err)
	}

	if _, found := c.SecretStores[storeID]; found {
		return fmt.Errorf("duplicate ID %q for secretstore %q", storeID, name)
	}
	c.SecretStores[storeID] = store
	if _, found := c.secretStoreSource[name]; !found {
		c.secretStoreSource[name] = make([]string, 0)
	}
	c.secretStoreSource[name] = append(c.secretStoreSource[name], source)
	return nil
}

func (c *Config) LinkSecrets() error {
	for _, s := range unlinkedSecrets {
		resolvers := make(map[string]telegraf.ResolveFunc)
		for _, ref := range s.GetUnlinked() {
			// Split the reference and lookup the resolver
			storeID, key := splitLink(ref)
			store, found := c.SecretStores[storeID]
			if !found {
				return fmt.Errorf("unknown secret-store for %q", ref)
			}
			resolver, err := store.GetResolver(key)
			if err != nil {
				return fmt.Errorf("retrieving resolver for %q failed: %w", ref, err)
			}
			resolvers[ref] = resolver
		}
		// Inject the resolver list into the secret
		if err := s.Link(resolvers); err != nil {
			return fmt.Errorf("retrieving resolver failed: %w", err)
		}
	}
	return nil
}

func (c *Config) probeParser(parentCategory, parentName string, table *ast.Table) bool {
	dataFormat := c.getFieldString(table, "data_format")
	if dataFormat == "" {
		dataFormat = setDefaultParser(parentCategory, parentName)
	}

	creator, ok := parsers.Parsers[dataFormat]
	if !ok {
		return false
	}

	// Try to parse the options to detect if any of them is misspelled
	parser := creator("")
	//nolint:errcheck // We don't actually use the parser, so no need to check the error.
	c.toml.UnmarshalTable(table, parser)

	return true
}

func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
	conf := &models.ParserConfig{
		Parent: parentname,
	}

	conf.DataFormat = c.getFieldString(table, "data_format")
	if conf.DataFormat == "" {
		conf.DataFormat = setDefaultParser(parentcategory, parentname)
	} else if conf.DataFormat == "influx" {
		influxParserType := c.getFieldString(table, "influx_parser_type")
		if influxParserType == "upstream" {
			conf.DataFormat = "influx_upstream"
		}
	}
	conf.LogLevel = c.getFieldString(table, "log_level")

	creator, ok := parsers.Parsers[conf.DataFormat]
	if !ok {
		return nil, fmt.Errorf("undefined but requested parser: %s", conf.DataFormat)
	}
	parser := creator(parentname)

	// Handle reset-mode of CSV parsers to stay backward compatible (see issue #12022)
	if conf.DataFormat == "csv" && parentcategory == "inputs" {
		if parentname == "exec" {
			csvParser := parser.(*csv.Parser)
			csvParser.ResetMode = "always"
		}
	}

	if err := c.toml.UnmarshalTable(table, parser); err != nil {
		return nil, err
	}

	running := models.NewRunningParser(parser, conf)
	err := running.Init()
	return running, err
}

func (c *Config) probeSerializer(table *ast.Table) bool {
	dataFormat := c.getFieldString(table, "data_format")
	if dataFormat == "" {
		dataFormat = "influx"
	}

	creator, ok := serializers.Serializers[dataFormat]
	if !ok {
		return false
	}

	// Try to parse the options to detect if any of them is misspelled
	serializer := creator()
	//nolint:errcheck // We don't actually use the parser, so no need to check the error.
	c.toml.UnmarshalTable(table, serializer)

	return true
}

func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
	conf := &models.SerializerConfig{
		Parent: parentname,
	}
	conf.DataFormat = c.getFieldString(table, "data_format")
	if conf.DataFormat == "" {
		conf.DataFormat = "influx"
	}
	conf.LogLevel = c.getFieldString(table, "log_level")

	creator, ok := serializers.Serializers[conf.DataFormat]
	if !ok {
		return nil, fmt.Errorf("undefined but requested serializer: %s", conf.DataFormat)
	}
	serializer := creator()

	if err := c.toml.UnmarshalTable(table, serializer); err != nil {
		return nil, err
	}

	running := models.NewRunningSerializer(serializer, conf)
	err := running.Init()
	return running, err
}

func (c *Config) addProcessor(name, source string, table *ast.Table) error {
	enabled, err := c.matchesLabelSelection(table)
	if err != nil {
		return fmt.Errorf("invalid label in plugin processors.%s: %w", name, err)
	}
	if !enabled {
		return nil
	}
	creator, ok := processors.Processors[name]
	if !ok {
		// Handle removed, deprecated plugins
		if di, deprecated := processors.Deprecations[name]; deprecated {
			printHistoricPluginDeprecationNotice("processors", name, di)
			return errors.New("plugin deprecated")
		}
		return fmt.Errorf("undefined but requested processor: %s", name)
	}

	// For processors with parsers we need to compute the set of
	// options that is not covered by both, the parser and the processor.
	// We achieve this by keeping a local book of missing entries
	// that counts the number of misses. In case we have a parser
	// for the input both need to miss the entry. We count the
	// missing entries at the end.
	missCount := make(map[string]int)
	missCountThreshold := 0
	c.setLocalMissingTomlFieldTracker(missCount)
	defer c.resetMissingTomlFieldTracker()

	// Set up the processor running before the aggregators
	processorBeforeConfig, err := c.buildProcessor("processors", name, source, table)
	if err != nil {
		return err
	}
	processorBefore, count, err := c.setupProcessor(processorBeforeConfig.Name, creator, table)
	if err != nil {
		return err
	}
	rf := models.NewRunningProcessor(processorBefore, processorBeforeConfig)
	c.fileProcessors = append(c.fileProcessors, &OrderedPlugin{table.Line, rf})

	// Setup another (new) processor instance running after the aggregator
	processorAfterConfig, err := c.buildProcessor("aggprocessors", name, source, table)
	if err != nil {
		return err
	}
	processorAfter, _, err := c.setupProcessor(processorAfterConfig.Name, creator, table)
	if err != nil {
		return err
	}
	rf = models.NewRunningProcessor(processorAfter, processorAfterConfig)
	c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf})

	// Check the number of misses against the threshold. We need to double
	// the count as the processor setup is executed twice.
	missCountThreshold = 2 * count
	for key, count := range missCount {
		if count <= missCountThreshold {
			continue
		}
		if err := c.missingTomlField(nil, key); err != nil {
			return err
		}
	}

	return nil
}

func (c *Config) setupProcessor(name string, creator processors.StreamingCreator, table *ast.Table) (telegraf.StreamingProcessor, int, error) {
	var optionTestCount int

	streamingProcessor := creator()

	var processor interface{}
	if p, ok := streamingProcessor.(processors.HasUnwrap); ok {
		processor = p.Unwrap()
	} else {
		processor = streamingProcessor
	}

	// If the (underlying) processor has a SetParser or SetParserFunc function,
	// it can accept arbitrary data-formats, so build the requested parser and
	// set it.
	if t, ok := processor.(telegraf.ParserPlugin); ok {
		parser, err := c.addParser("processors", name, table)
		if err != nil {
			return nil, 0, fmt.Errorf("adding parser failed: %w", err)
		}
		t.SetParser(parser)
		optionTestCount++
	}

	if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
		if !c.probeParser("processors", name, table) {
			return nil, 0, errors.New("parser not found")
		}
		t.SetParserFunc(func() (telegraf.Parser, error) {
			return c.addParser("processors", name, table)
		})
		optionTestCount++
	}

	// If the (underlying) processor has a SetSerializer function it can accept
	// arbitrary data-formats, so build the requested serializer and set it.
	if t, ok := processor.(telegraf.SerializerPlugin); ok {
		serializer, err := c.addSerializer(name, table)
		if err != nil {
			return nil, 0, fmt.Errorf("adding serializer failed: %w", err)
		}
		t.SetSerializer(serializer)
		optionTestCount++
	}
	if t, ok := processor.(telegraf.SerializerFuncPlugin); ok {
		if !c.probeSerializer(table) {
			return nil, 0, errors.New("serializer not found")
		}
		t.SetSerializerFunc(func() (telegraf.Serializer, error) {
			return c.addSerializer(name, table)
		})
		optionTestCount++
	}

	if err := c.toml.UnmarshalTable(table, processor); err != nil {
		return nil, 0, fmt.Errorf("unmarshalling failed: %w", err)
	}

	err := c.printUserDeprecation("processors", name, processor)
	return streamingProcessor, optionTestCount, err
}

func (c *Config) addOutput(name, source string, table *ast.Table) error {
	if len(c.OutputFilters) > 0 && !sliceContains(name, c.OutputFilters) {
		return nil
	}

	enabled, err := c.matchesLabelSelection(table)
	if err != nil {
		return fmt.Errorf("invalid label in plugin outputs.%s: %w", name, err)
	}
	if !enabled {
		return nil
	}

	// For outputs with serializers we need to compute the set of
	// options that is not covered by both, the serializer and the input.
	// We achieve this by keeping a local book of missing entries
	// that counts the number of misses. In case we have a parser
	// for the input both need to miss the entry. We count the
	// missing entries at the end.
	missThreshold := 0
	missCount := make(map[string]int)
	c.setLocalMissingTomlFieldTracker(missCount)
	defer c.resetMissingTomlFieldTracker()

	creator, ok := outputs.Outputs[name]
	if !ok {
		// Handle removed, deprecated plugins
		if di, deprecated := outputs.Deprecations[name]; deprecated {
			printHistoricPluginDeprecationNotice("outputs", name, di)
			return errors.New("plugin deprecated")
		}
		return fmt.Errorf("undefined but requested output: %s", name)
	}
	output := creator()

	// If the output has a SetSerializer function, then this means it can write
	// arbitrary types of output, so build the serializer and set it.
	if t, ok := output.(telegraf.SerializerPlugin); ok {
		missThreshold = 1
		serializer, err := c.addSerializer(name, table)
		if err != nil {
			return err
		}
		t.SetSerializer(serializer)
	}

	if t, ok := output.(telegraf.SerializerFuncPlugin); ok {
		missThreshold = 1
		if !c.probeSerializer(table) {
			return errors.New("serializer not found")
		}
		t.SetSerializerFunc(func() (telegraf.Serializer, error) {
			return c.addSerializer(name, table)
		})
	}

	outputConfig, err := c.buildOutput(name, source, table)
	if err != nil {
		return err
	}

	if err := c.toml.UnmarshalTable(table, output); err != nil {
		return err
	}

	if err := c.printUserDeprecation("outputs", name, output); err != nil {
		return err
	}

	if c, ok := interface{}(output).(interface{ TLSConfig() (*tls.Config, error) }); ok {
		if _, err := c.TLSConfig(); err != nil {
			return err
		}
	}

	// Check the number of misses against the threshold
	for key, count := range missCount {
		if count <= missThreshold {
			continue
		}
		if err := c.missingTomlField(nil, key); err != nil {
			return err
		}
	}

	ro, err := models.NewRunningOutput(output, outputConfig, c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
	if err != nil {
		return err
	}
	c.Outputs = append(c.Outputs, ro)

	return nil
}

func (c *Config) addInput(name, source string, table *ast.Table) error {
	if len(c.InputFilters) > 0 && !sliceContains(name, c.InputFilters) {
		return nil
	}

	enabled, err := c.matchesLabelSelection(table)
	if err != nil {
		return fmt.Errorf("invalid label in plugin inputs.%s: %w", name, err)
	}
	if !enabled {
		return nil
	}

	// For inputs with parsers we need to compute the set of
	// options that is not covered by both, the parser and the input.
	// We achieve this by keeping a local book of missing entries
	// that counts the number of misses. In case we have a parser
	// for the input both need to miss the entry. We count the
	// missing entries at the end.
	missCount := make(map[string]int)
	missCountThreshold := 0
	c.setLocalMissingTomlFieldTracker(missCount)
	defer c.resetMissingTomlFieldTracker()

	creator, ok := inputs.Inputs[name]
	if !ok {
		// Handle removed, deprecated plugins
		if di, deprecated := inputs.Deprecations[name]; deprecated {
			printHistoricPluginDeprecationNotice("inputs", name, di)
			return errors.New("plugin deprecated")
		}

		return fmt.Errorf("undefined but requested input: %s", name)
	}
	input := creator()

	// If the input has a SetParser or SetParserFunc function, it can accept
	// arbitrary data-formats, so build the requested parser and set it.
	if t, ok := input.(telegraf.ParserPlugin); ok {
		missCountThreshold = 1
		parser, err := c.addParser("inputs", name, table)
		if err != nil {
			return fmt.Errorf("adding parser failed: %w", err)
		}
		t.SetParser(parser)
	}

	if t, ok := input.(telegraf.ParserFuncPlugin); ok {
		missCountThreshold = 1
		if !c.probeParser("inputs", name, table) {
			return errors.New("parser not found")
		}
		t.SetParserFunc(func() (telegraf.Parser, error) {
			return c.addParser("inputs", name, table)
		})
	}

	pluginConfig, err := c.buildInput(name, source, table)
	if err != nil {
		return err
	}

	if err := c.toml.UnmarshalTable(table, input); err != nil {
		return err
	}

	if err := c.printUserDeprecation("inputs", name, input); err != nil {
		return err
	}

	if c, ok := interface{}(input).(interface{ TLSConfig() (*tls.Config, error) }); ok {
		if _, err := c.TLSConfig(); err != nil {
			return err
		}
	}

	// Check the number of misses against the threshold
	for key, count := range missCount {
		if count <= missCountThreshold {
			continue
		}
		if err := c.missingTomlField(nil, key); err != nil {
			return err
		}
	}

	rp := models.NewRunningInput(input, pluginConfig)
	rp.SetDefaultTags(c.Tags)
	c.Inputs = append(c.Inputs, rp)

	return nil
}

// buildAggregator parses Aggregator specific items from the ast.Table,
// builds the filter and returns a
// models.AggregatorConfig to be inserted into models.RunningAggregator
func (c *Config) buildAggregator(name, source string, tbl *ast.Table) (*models.AggregatorConfig, error) {
	conf := &models.AggregatorConfig{
		Name:   name,
		Source: source,
		Delay:  time.Millisecond * 100,
		Period: time.Second * 30,
		Grace:  time.Second * 0,
	}

	if period, found := c.getFieldDuration(tbl, "period"); found {
		conf.Period = period
	}
	if delay, found := c.getFieldDuration(tbl, "delay"); found {
		conf.Delay = delay
	}
	if grace, found := c.getFieldDuration(tbl, "grace"); found {
		conf.Grace = grace
	}

	conf.DropOriginal = c.getFieldBool(tbl, "drop_original")
	conf.MeasurementPrefix = c.getFieldString(tbl, "name_prefix")
	conf.MeasurementSuffix = c.getFieldString(tbl, "name_suffix")
	conf.NameOverride = c.getFieldString(tbl, "name_override")
	conf.Alias = c.getFieldString(tbl, "alias")
	conf.LogLevel = c.getFieldString(tbl, "log_level")

	conf.Tags = make(map[string]string)
	if node, ok := tbl.Fields["tags"]; ok {
		if subtbl, ok := node.(*ast.Table); ok {
			if err := c.toml.UnmarshalTable(subtbl, conf.Tags); err != nil {
				return nil, fmt.Errorf("could not parse tags for input %s", name)
			}
		}
	}

	if c.hasErrs() {
		return nil, c.firstErr()
	}

	var err error
	conf.Filter, err = c.buildFilter("aggregators."+name, tbl)
	if err != nil {
		return conf, err
	}

	// Generate an ID for the plugin
	conf.ID, err = generatePluginID("aggregators."+name, tbl)
	return conf, err
}

// buildProcessor parses Processor specific items from the ast.Table,
// builds the filter and returns a
// models.ProcessorConfig to be inserted into models.RunningProcessor
func (c *Config) buildProcessor(category, name, source string, tbl *ast.Table) (*models.ProcessorConfig, error) {
	conf := &models.ProcessorConfig{
		Name:   name,
		Source: source,
	}

	conf.Order = c.getFieldInt64(tbl, "order")
	conf.Alias = c.getFieldString(tbl, "alias")
	conf.LogLevel = c.getFieldString(tbl, "log_level")

	if c.hasErrs() {
		return nil, c.firstErr()
	}

	var err error
	conf.Filter, err = c.buildFilter(category+"."+name, tbl)
	if err != nil {
		return conf, err
	}

	// Generate an ID for the plugin
	conf.ID, err = generatePluginID(category+"."+name, tbl)
	return conf, err
}

// buildFilter builds a Filter
// (tags, fields, namepass, namedrop, metricpass) to
// be inserted into the models.OutputConfig/models.InputConfig
// to be used for glob filtering on tags and measurements
func (c *Config) buildFilter(plugin string, tbl *ast.Table) (models.Filter, error) {
	f := models.Filter{}

	f.NamePass = c.getFieldStringSlice(tbl, "namepass")
	f.NamePassSeparators = c.getFieldString(tbl, "namepass_separator")
	f.NameDrop = c.getFieldStringSlice(tbl, "namedrop")
	f.NameDropSeparators = c.getFieldString(tbl, "namedrop_separator")

	oldPass := c.getFieldStringSlice(tbl, "pass")
	if len(oldPass) > 0 {
		PrintOptionDeprecationNotice(plugin, "pass", telegraf.DeprecationInfo{
			Since:     "0.10.4",
			RemovalIn: "1.35.0",
			Notice:    "use 'fieldinclude' instead",
		})
		f.FieldInclude = append(f.FieldInclude, oldPass...)
	}

	oldFieldPass := c.getFieldStringSlice(tbl, "fieldpass")
	if len(oldFieldPass) > 0 {
		PrintOptionDeprecationNotice(plugin, "fieldpass", telegraf.DeprecationInfo{
			Since:     "1.29.0",
			RemovalIn: "1.40.0",
			Notice:    "use 'fieldinclude' instead",
		})
		f.FieldInclude = append(f.FieldInclude, oldFieldPass...)
	}

	fieldInclude := c.getFieldStringSlice(tbl, "fieldinclude")
	if len(fieldInclude) > 0 {
		f.FieldInclude = append(f.FieldInclude, fieldInclude...)
	}

	oldDrop := c.getFieldStringSlice(tbl, "drop")
	if len(oldDrop) > 0 {
		PrintOptionDeprecationNotice(plugin, "drop", telegraf.DeprecationInfo{
			Since:     "0.10.4",
			RemovalIn: "1.35.0",
			Notice:    "use 'fieldexclude' instead",
		})
		f.FieldExclude = append(f.FieldExclude, oldDrop...)
	}

	oldFieldDrop := c.getFieldStringSlice(tbl, "fielddrop")
	if len(oldFieldDrop) > 0 {
		PrintOptionDeprecationNotice(plugin, "fielddrop", telegraf.DeprecationInfo{
			Since:     "1.29.0",
			RemovalIn: "1.40.0",
			Notice:    "use 'fieldexclude' instead",
		})
		f.FieldExclude = append(f.FieldExclude, oldFieldDrop...)
	}

	fieldExclude := c.getFieldStringSlice(tbl, "fieldexclude")
	if len(fieldExclude) > 0 {
		f.FieldExclude = append(f.FieldExclude, fieldExclude...)
	}

	f.TagPassFilters = c.getFieldTagFilter(tbl, "tagpass")
	f.TagDropFilters = c.getFieldTagFilter(tbl, "tagdrop")

	f.TagExclude = c.getFieldStringSlice(tbl, "tagexclude")
	f.TagInclude = c.getFieldStringSlice(tbl, "taginclude")

	f.MetricPass = c.getFieldString(tbl, "metricpass")

	if c.hasErrs() {
		return f, c.firstErr()
	}

	if err := f.Compile(); err != nil {
		return f, err
	}

	return f, nil
}

// buildInput parses input specific items from the ast.Table,
// builds the filter and returns a
// models.InputConfig to be inserted into models.RunningInput
func (c *Config) buildInput(name, source string, tbl *ast.Table) (*models.InputConfig, error) {
	cp := &models.InputConfig{
		Name:                    name,
		Source:                  source,
		AlwaysIncludeLocalTags:  c.Agent.AlwaysIncludeLocalTags,
		AlwaysIncludeGlobalTags: c.Agent.AlwaysIncludeGlobalTags,
	}
	cp.Interval, _ = c.getFieldDuration(tbl, "interval")
	cp.Precision, _ = c.getFieldDuration(tbl, "precision")
	cp.CollectionJitter, cp.CollectionJitterSet = c.getFieldDuration(tbl, "collection_jitter")
	cp.CollectionOffset, _ = c.getFieldDuration(tbl, "collection_offset")
	if cp.CollectionOffset < 0 {
		return nil, fmt.Errorf("negative collection_offset %q is not allowed", cp.CollectionOffset)
	}
	cp.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior")
	cp.TimeSource = c.getFieldString(tbl, "time_source")

	cp.MeasurementPrefix = c.getFieldString(tbl, "name_prefix")
	cp.MeasurementSuffix = c.getFieldString(tbl, "name_suffix")
	cp.NameOverride = c.getFieldString(tbl, "name_override")
	cp.Alias = c.getFieldString(tbl, "alias")
	cp.LogLevel = c.getFieldString(tbl, "log_level")

	cp.Tags = make(map[string]string)
	if node, ok := tbl.Fields["tags"]; ok {
		if subtbl, ok := node.(*ast.Table); ok {
			if err := c.toml.UnmarshalTable(subtbl, cp.Tags); err != nil {
				return nil, fmt.Errorf("could not parse tags for input %s", name)
			}
		}
	}

	if c.hasErrs() {
		return nil, c.firstErr()
	}

	var err error
	cp.Filter, err = c.buildFilter("inputs."+name, tbl)
	if err != nil {
		return cp, err
	}

	// Generate an ID for the plugin
	cp.ID, err = generatePluginID("inputs."+name, tbl)
	return cp, err
}

// buildOutput parses output specific items from the ast.Table,
// builds the filter and returns a
// models.OutputConfig to be inserted into models.RunningInput
// Note: error exists in the return for future calls that might require error
func (c *Config) buildOutput(name, source string, tbl *ast.Table) (*models.OutputConfig, error) {
	filter, err := c.buildFilter("outputs."+name, tbl)
	if err != nil {
		return nil, err
	}

	bufferStrategy := c.Agent.BufferStrategy
	if bufferStrategy == "disk" {
		bufferStrategy = "disk_write_through"
	}
	bufferDiskSync := true
	if c.Agent.BufferDiskSync != nil {
		bufferDiskSync = *c.Agent.BufferDiskSync
	}

	oc := &models.OutputConfig{
		Name:            name,
		Source:          source,
		Filter:          filter,
		BufferStrategy:  bufferStrategy,
		BufferDirectory: c.Agent.BufferDirectory,
		BufferDiskSync:  bufferDiskSync,
	}

	// TODO: support FieldPass/FieldDrop on outputs

	oc.FlushInterval, _ = c.getFieldDuration(tbl, "flush_interval")
	oc.FlushJitter, _ = c.getFieldDuration(tbl, "flush_jitter")
	oc.MetricBufferLimit = c.getFieldInt(tbl, "metric_buffer_limit")
	oc.MetricBatchSize = c.getFieldInt(tbl, "metric_batch_size")
	oc.Alias = c.getFieldString(tbl, "alias")
	oc.NameOverride = c.getFieldString(tbl, "name_override")
	oc.NameSuffix = c.getFieldString(tbl, "name_suffix")
	oc.NamePrefix = c.getFieldString(tbl, "name_prefix")
	oc.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior")
	oc.LogLevel = c.getFieldString(tbl, "log_level")

	if c.hasErrs() {
		return nil, c.firstErr()
	}

	if oc.BufferStrategy == "disk_write_through" {
		log.Printf("W! Using disk-write-through buffer strategy for plugin outputs.%s, this is an experimental feature", name)
	}

	// Generate an ID for the plugin
	oc.ID, err = generatePluginID("outputs."+name, tbl)
	return oc, err
}

func (c *Config) missingTomlField(_ reflect.Type, key string) error {
	switch key {
	// General options to ignore
	case "alias", "always_include_local_tags",
		"buffer_strategy", "buffer_directory", "buffer_disk_sync",
		"collection_jitter", "collection_offset",
		"data_format", "delay", "drop", "drop_original",
		"fielddrop", "fieldexclude", "fieldinclude", "fieldpass", "flush_interval", "flush_jitter",
		"grace",
		"interval",
		"log_level", "lvm", // What is this used for?
		"metric_batch_size", "metric_buffer_limit", "metricpass",
		"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
		"order",
		"pass", "period", "precision",
		"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior", "labels":

	// Secret-store options to ignore
	case "id":

	// Parser and serializer options to ignore
	case "data_type", "influx_parser_type":

	default:
		c.unusedFieldsMutex.Lock()
		c.UnusedFields[key] = true
		c.unusedFieldsMutex.Unlock()
	}
	return nil
}

func (c *Config) setLocalMissingTomlFieldTracker(counter map[string]int) {
	f := func(t reflect.Type, key string) error {
		// Check if we are in a root element that might share options among
		// each other. Those root elements are plugins of all types.
		// All other elements are subtables of their respective plugin and
		// should just be hit once anyway. Therefore, we mark them with a
		// high number to handle them correctly later.
		pt := reflect.PointerTo(t)
		root := pt.Implements(reflect.TypeOf((*telegraf.Input)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.ServiceInput)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.Output)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.Aggregator)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.Processor)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.StreamingProcessor)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.Parser)(nil)).Elem())
		root = root || pt.Implements(reflect.TypeOf((*telegraf.Serializer)(nil)).Elem())

		c, ok := counter[key]
		if !root {
			counter[key] = 100
		} else if !ok {
			counter[key] = 1
		} else {
			counter[key] = c + 1
		}
		return nil
	}
	c.toml.MissingField = f
}

func (c *Config) resetMissingTomlFieldTracker() {
	c.toml.MissingField = c.missingTomlField
}

func (*Config) getFieldString(tbl *ast.Table, fieldName string) string {
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if str, ok := kv.Value.(*ast.String); ok {
				return str.Value
			}
		}
	}

	return ""
}

func (c *Config) getFieldDuration(tbl *ast.Table, fieldName string) (time.Duration, bool) {
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if str, ok := kv.Value.(*ast.String); ok {
				d, err := time.ParseDuration(str.Value)
				if err != nil {
					c.addError(tbl, fmt.Errorf("error parsing duration: %w", err))
					return 0, false
				}
				return d, true
			}
		}
	}

	return 0, false
}

func (c *Config) getFieldBool(tbl *ast.Table, fieldName string) bool {
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			switch t := kv.Value.(type) {
			case *ast.Boolean:
				target, err := t.Boolean()
				if err != nil {
					c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value))
					return false
				}
				return target
			case *ast.String:
				target, err := strconv.ParseBool(t.Value)
				if err != nil {
					c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value))
					return false
				}
				return target
			default:
				c.addError(tbl, fmt.Errorf("unknown boolean value type %q, expecting boolean", kv.Value.Source()))
				return false
			}
		}
	}

	return false
}

func (c *Config) getFieldInt(tbl *ast.Table, fieldName string) int {
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if iAst, ok := kv.Value.(*ast.Integer); ok {
				i, err := iAst.Int()
				if err != nil {
					c.addError(tbl, fmt.Errorf("unexpected int type %q, expecting int", iAst.Value))
					return 0
				}
				return int(i)
			}
		}
	}

	return 0
}

func (c *Config) getFieldInt64(tbl *ast.Table, fieldName string) int64 {
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			if iAst, ok := kv.Value.(*ast.Integer); ok {
				i, err := iAst.Int()
				if err != nil {
					c.addError(tbl, fmt.Errorf("unexpected int type %q, expecting int", iAst.Value))
					return 0
				}
				return i
			}
			c.addError(tbl, fmt.Errorf("found unexpected format while parsing %q, expecting int", fieldName))
			return 0
		}
	}

	return 0
}

func (c *Config) getFieldStringSlice(tbl *ast.Table, fieldName string) []string {
	var target []string
	if node, ok := tbl.Fields[fieldName]; ok {
		if kv, ok := node.(*ast.KeyValue); ok {
			ary, ok := kv.Value.(*ast.Array)
			if !ok {
				c.addError(tbl, fmt.Errorf("found unexpected format while parsing %q, expecting string array/slice format", fieldName))
				return target
			}
			for _, elem := range ary.Value {
				if str, ok := elem.(*ast.String); ok {
					target = append(target, str.Value)
				}
			}
		}
	}

	return target
}

func (c *Config) getFieldTagFilter(tbl *ast.Table, fieldName string) []models.TagFilter {
	var target []models.TagFilter
	if node, ok := tbl.Fields[fieldName]; ok {
		if subTbl, ok := node.(*ast.Table); ok {
			for name, val := range subTbl.Fields {
				if kv, ok := val.(*ast.KeyValue); ok {
					ary, ok := kv.Value.(*ast.Array)
					if !ok {
						c.addError(tbl, fmt.Errorf("found unexpected format while parsing %q, expecting string array/slice format on each entry", fieldName))
						return nil
					}

					tagFilter := models.TagFilter{Name: name}
					for _, elem := range ary.Value {
						if str, ok := elem.(*ast.String); ok {
							tagFilter.Values = append(tagFilter.Values, str.Value)
						}
					}
					target = append(target, tagFilter)
				}
			}
		}
	}

	return target
}

func (*Config) getFieldMap(tbl *ast.Table, fieldName string) map[string]string {
	target := make(map[string]string)
	if node, ok := tbl.Fields[fieldName]; ok {
		if subTbl, ok := node.(*ast.Table); ok {
			for _, val := range subTbl.Fields {
				if kv, ok := val.(*ast.KeyValue); ok {
					if str, ok := kv.Value.(*ast.String); ok {
						target[kv.Key] = str.Value
					}
				}
			}
		}
	}

	return target
}

func (c *Config) matchesLabelSelection(tbl *ast.Table) (bool, error) {
	// Get the label definitions for the plugin and check them
	labels := c.getFieldMap(tbl, "labels")
	for k, v := range labels {
		if err := CheckLabelKeyValuePairs(k, v); err != nil {
			return false, err
		}
	}

	// Match the selection statement against the labels
	return pluginLabelSelector.matches(labels), nil
}

func keys(m map[string]bool) []string {
	result := make([]string, 0, len(m))
	for k := range m {
		result = append(result, k)
	}
	return result
}

func setDefaultParser(category, name string) string {
	// Legacy support, exec plugin originally parsed JSON by default.
	if category == "inputs" && name == "exec" {
		return "json"
	}

	return "influx"
}

func (c *Config) hasErrs() bool {
	return len(c.errs) > 0
}

func (c *Config) firstErr() error {
	if len(c.errs) == 0 {
		return nil
	}
	return c.errs[0]
}

func (c *Config) addError(tbl *ast.Table, err error) {
	c.errs = append(c.errs, fmt.Errorf("line %d:%d: %w", tbl.Line, tbl.Position, err))
}
