feat(outputs.opensearch): Add OpenSearch output plugin (#11958)

Co-authored-by: Josh Powers <powersj@fastmail.com>
Mandeep Kalra 2023-09-29 16:10:04 +05:30 committed by GitHub
parent 188c8d7bbf
commit 4e35ac8dc3
8 changed files with 1864 additions and 0 deletions

View File

@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.opensearch
package all
import _ "github.com/influxdata/telegraf/plugins/outputs/opensearch" // register plugin

View File

@ -0,0 +1,352 @@
# OpenSearch Output Plugin
This plugin writes to [OpenSearch](https://opensearch.org/) via HTTP
It supports OpenSearch releases 1.x and 2.x. Future compatibility with 1.x is
not guaranteed; development will instead focus on 2.x support. Consider using the
existing Elasticsearch plugin for 1.x.
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and fields or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
# Configuration for OpenSearch to send metrics to.
[[outputs.opensearch]]
## URLs
## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can
## be specified as part of the same cluster, but only one URL is used to
## write during each interval.
urls = ["http://node1.os.example.com:9200"]
## Index Name
## Target index name for metrics (OpenSearch will create the index if it does not exist).
## This is a Golang template (see https://pkg.go.dev/text/template)
## You can also specify
## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "field_name"}}`)
## and the timestamp (`{{.Time.Format "xxxxxxxxx"}}`).
## If the tag does not exist, the default tag value will be the empty string "".
## For example: "telegraf-{{.Time.Format "2006-01-02"}}-{{.Tag "host"}}" would set it to telegraf-2023-07-27-HostName
index_name = ""
## Timeout
## OpenSearch client timeout
# timeout = "5s"
## Sniffer
## Set to true to ask OpenSearch for a list of all cluster nodes,
## making it unnecessary to list all nodes in the urls config option
# enable_sniffer = false
## GZIP Compression
## Set to true to enable gzip compression
# enable_gzip = false
## Health Check Interval
## Set the interval to check if the OpenSearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
# health_check_interval = "10s"
## Set the timeout for periodic health checks.
# health_check_timeout = "1s"
## HTTP basic authentication details.
# username = ""
# password = ""
## HTTP bearer token authentication details
# auth_bearer_token = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Template Config
## Manage templates
## Set to true if you want telegraf to manage its index template.
## If enabled it will create a recommended index template for telegraf indexes
# manage_template = true
## Template Name
## The template name used for telegraf indexes
# template_name = "telegraf"
## Overwrite Templates
## Set to true if you want telegraf to overwrite an existing template
# overwrite_template = false
## Document ID
## If set to true, a unique ID hash will be sent as
## sha256(concat(timestamp,measurement,series-hash)) string. It will enable
## resending data and updating metric points, avoiding duplicated metrics with
## different IDs
# force_document_id = false
## Value Handling
## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error
## if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
## Pipeline Config
## To use an ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Pipeline Name
## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`)
## which will be used as the pipeline name (e.g. "{{.Tag "os_pipeline"}}").
## If the tag does not exist, the default pipeline will be used as the pipeline.
## If no default pipeline is set, no pipeline is used for the metric.
# default_pipeline = ""
```
### Required parameters
* `urls`: A list containing the full HTTP URL of one or more nodes from your
OpenSearch instance.
* `index_name`: The target index for metrics. You can use a date format,
for example `"telegraf-{{.Time.Format "2006-01-02"}}"` would set it to
"telegraf-2023-07-27". You can also specify the metric name (`{{ .Name }}`), a tag
value (`{{ .Tag "tag_name" }}`), and a field value (`{{ .Field "field_name" }}`).
If the tag does not exist, the default tag value will be the empty string "";
see the sketch below.
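The index name is rendered with Go's `text/template` package against each metric. Below is a minimal,
standalone sketch of that evaluation; the `fakeMetric` type is a hypothetical stand-in for illustration
only (the plugin executes the template against a `telegraf.Metric` directly).

```go
package main

import (
	"os"
	"text/template"
	"time"
)

// fakeMetric is a hypothetical stand-in exposing the accessors the
// index_name template uses: Name, Tag and Time.
type fakeMetric struct {
	name string
	tags map[string]string
	ts   time.Time
}

func (m fakeMetric) Name() string        { return m.name }
func (m fakeMetric) Time() time.Time     { return m.ts }
func (m fakeMetric) Tag(k string) string { return m.tags[k] } // missing tag -> ""

func main() {
	tmpl := template.Must(template.New("index").Parse(
		`telegraf-{{.Time.Format "2006-01-02"}}-{{.Tag "host"}}`))
	m := fakeMetric{
		name: "cpu",
		tags: map[string]string{"host": "node1"},
		ts:   time.Date(2023, 7, 27, 0, 0, 0, 0, time.UTC),
	}
	// Prints: telegraf-2023-07-27-node1
	_ = tmpl.Execute(os.Stdout, m)
}
```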
## Permissions
If you are using authentication within your OpenSearch cluster, you need to
create an account and a role with at least the `manage` privilege in the Cluster
Privileges category. Otherwise, your account will not be able to connect to your
OpenSearch cluster and send metrics to it. After that, you need to add the
`create_index` and `write` permissions for your specific index pattern, for
example as sketched below.
## OpenSearch indexes and templates
### Indexes per time-frame
This plugin can manage indexes per time-frame, as commonly done in other tools
with OpenSearch. The timestamp of the metric collected will be used to decide
the index destination. For more information about this usage on OpenSearch,
check [the docs][1].
[1]: https://opensearch.org/docs/latest/
### Template management
Index templates are used in OpenSearch to define settings and mappings for
the indexes and how the fields should be analyzed. For more information on how
this works, see [the docs][2].
This plugin can create a working template for use with telegraf metrics. It uses
the OpenSearch dynamic templates feature to set proper types for the tags and
metric fields. If the specified template already exists, it will not be overwritten
unless you configure this plugin to do so. Thus, you can customize this template
after its creation if necessary.
Example of an index template created by telegraf on OpenSearch 2.x:
```json
{
"telegraf-2022.10.02" : {
"aliases" : { },
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"disk" : {
"properties" : {
"free" : {
"type" : "long"
},
"inodes_free" : {
"type" : "long"
},
"inodes_total" : {
"type" : "long"
},
"inodes_used" : {
"type" : "long"
},
"total" : {
"type" : "long"
},
"used" : {
"type" : "long"
},
"used_percent" : {
"type" : "float"
}
}
},
"measurement_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"tag" : {
"properties" : {
"cpu" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"device" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"host" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"mode" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"path" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
},
"settings" : {
"index" : {
"creation_date" : "1664693522789",
"number_of_shards" : "1",
"number_of_replicas" : "1",
"uuid" : "TYugdmvsQfmxjzbGRJ8FIw",
"version" : {
"created" : "136247827"
},
"provided_name" : "telegraf-2022.10.02"
}
}
}
}
```
[2]: https://opensearch.org/docs/latest/opensearch/index-templates/
### Example events
This plugin will format the events in the following way:
```json
{
"@timestamp": "2017-01-01T00:00:00+00:00",
"measurement_name": "cpu",
"cpu": {
"usage_guest": 0,
"usage_guest_nice": 0,
"usage_idle": 71.85413456197966,
"usage_iowait": 0.256805341656516,
"usage_irq": 0,
"usage_nice": 0,
"usage_softirq": 0.2054442732579466,
"usage_steal": 0,
"usage_system": 15.04879301548127,
"usage_user": 12.634822807288275
},
"tag": {
"cpu": "cpu-total",
"host": "opensearhhost",
"dc": "datacenter1"
}
}
```
```json
{
"@timestamp": "2017-01-01T00:00:00+00:00",
"measurement_name": "system",
"system": {
"load1": 0.78,
"load15": 0.8,
"load5": 0.8,
"n_cpus": 2,
"n_users": 2
},
"tag": {
"host": "opensearhhost",
"dc": "datacenter1"
}
}
```
## Known issues
Integer values collected that are bigger than 2^63 and smaller than 1e21 (or in
the same window for their negative counterparts) are encoded by the golang
JSON encoder in decimal format, and that is not fully supported by OpenSearch
dynamic field mapping. This causes metrics with such values to be dropped if
a field mapping has not been created yet on the telegraf index. In that case
you will see an exception on the OpenSearch side like this:
```json
{
"error": {
"root_cause": [
{"type": "mapper_parsing_exception", "reason": "failed to parse"}
],
"type": "mapper_parsing_exception",
"reason": "failed to parse",
"caused_by": {
"type": "illegal_state_exception",
"reason": "No matching token for number_type [BIG_INTEGER]"
}
},
"status": 400
}
```
The correct field mapping will be created on the telegraf index as soon as a
supported JSON value is received by OpenSearch, and subsequent insertions
will work because the field mapping will already exist.
This issue is caused by the way OpenSearch tries to detect integer fields,
and by how golang encodes numbers in JSON. There is no clear workaround for this
at the moment.
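As a minimal illustration of the encoding side of this issue (not part of the plugin), Go's
`encoding/json` writes an unsigned integer above 2^63 as a plain decimal literal, which OpenSearch
then rejects as a `BIG_INTEGER` token when no field mapping exists yet:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// 2^63 does not fit in int64, but encoding/json still emits it as a
	// plain decimal number rather than a string or scientific notation.
	doc := map[string]interface{}{"counter": uint64(1) << 63}
	b, err := json.Marshal(doc)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"counter":9223372036854775808}
}
```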

View File

@ -0,0 +1,445 @@
//go:generate ../../../tools/readme_config_includer/generator
package opensearch
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
_ "embed"
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"
"strings"
"text/template"
"time"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/opensearch-project/opensearch-go/v2/opensearchutil"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
)
//go:embed sample.conf
var sampleConfig string
type Opensearch struct {
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
AuthBearerToken config.Secret `toml:"auth_bearer_token"`
EnableGzip bool `toml:"enable_gzip"`
EnableSniffer bool `toml:"enable_sniffer"`
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
ForceDocumentID bool `toml:"force_document_id"`
IndexName string `toml:"index_name"`
TemplateName string `toml:"template_name"`
ManageTemplate bool `toml:"manage_template"`
OverwriteTemplate bool `toml:"overwrite_template"`
pipelineName string
DefaultPipeline string `toml:"default_pipeline"`
UsePipeline string `toml:"use_pipeline"`
Timeout config.Duration `toml:"timeout"`
HealthCheckInterval config.Duration `toml:"health_check_interval"`
HealthCheckTimeout config.Duration `toml:"health_check_timeout"`
URLs []string `toml:"urls"`
Log telegraf.Logger `toml:"-"`
indexTmpl *template.Template
pipelineTmpl *template.Template
onSucc func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem)
onFail func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem, error)
configOptions httpconfig.HTTPClientConfig
osClient *opensearch.Client
}
//go:embed template.json
var indexTemplate string
type templatePart struct {
TemplatePattern string
}
func (*Opensearch) SampleConfig() string {
return sampleConfig
}
func (o *Opensearch) Init() error {
if len(o.URLs) == 0 || o.IndexName == "" {
return fmt.Errorf("opensearch urls or index_name is not defined")
}
// Determine if we should process NaN and inf values
valOptions := []string{"", "none", "drop", "replace"}
if err := choice.Check(o.FloatHandling, valOptions); err != nil {
return fmt.Errorf("config float_handling type: %w", err)
}
if o.FloatHandling == "" {
o.FloatHandling = "none"
}
indexTmpl, err := template.New("index").Parse(o.IndexName)
if err != nil {
return fmt.Errorf("error parsing index_name template: %w", err)
}
o.indexTmpl = indexTmpl
pipelineTmpl, err := template.New("index").Parse(o.UsePipeline)
if err != nil {
return fmt.Errorf("error parsing use_pipeline template: %w", err)
}
o.pipelineTmpl = pipelineTmpl
o.onSucc = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) {
o.Log.Debugf("Indexed to OpenSearch with status- [%d] Result- %s DocumentID- %s ", res.Status, res.Result, res.DocumentID)
}
o.onFail = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, err error) {
if err != nil {
o.Log.Errorf("error while OpenSearch bulkIndexing: %v", err)
} else {
o.Log.Errorf("error while OpenSearch bulkIndexing: %s: %s", res.Error.Type, res.Error.Reason)
}
}
if o.TemplateName == "" {
return fmt.Errorf("template_name configuration not defined")
}
return nil
}
func init() {
outputs.Add("opensearch", func() telegraf.Output {
return &Opensearch{
Timeout: config.Duration(time.Second * 5),
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
}
})
}
func (o *Opensearch) Connect() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
defer cancel()
if err := o.newClient(); err != nil {
return fmt.Errorf("error creating OpenSearch client: %w", err)
}
if o.ManageTemplate {
err := o.manageTemplate(ctx)
if err != nil {
return err
}
}
return nil
}
func (o *Opensearch) newClient() error {
username, err := o.Username.Get()
if err != nil {
return fmt.Errorf("getting username failed: %w", err)
}
defer config.ReleaseSecret(username)
password, err := o.Password.Get()
if err != nil {
return fmt.Errorf("getting password failed: %w", err)
}
defer config.ReleaseSecret(password)
clientConfig := opensearch.Config{
Addresses: o.URLs,
Username: string(username),
Password: string(password),
}
if o.configOptions.InsecureSkipVerify {
clientConfig.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
}
header := http.Header{}
if o.EnableGzip {
header.Add("Content-Encoding", "gzip")
header.Add("Content-Type", "application/json")
header.Add("Accept-Encoding", "gzip")
}
if !o.AuthBearerToken.Empty() {
token, err := o.AuthBearerToken.Get()
if err != nil {
return fmt.Errorf("getting token failed: %w", err)
}
if string(token) != "" {
header.Add("Authorization", "Bearer "+string(token))
}
}
clientConfig.Header = header
client, err := opensearch.NewClient(clientConfig)
o.osClient = client
return err
}
// getPointID generates a unique ID for a metric point by computing a
// SHA256 hash over the timestamp (ns), the measurement name, and the
// series hash
func getPointID(m telegraf.Metric) string {
var buffer bytes.Buffer
buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
buffer.WriteString(m.Name())
buffer.WriteString(strconv.FormatUint(m.HashID(), 10))
return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
}
func (o *Opensearch) Write(metrics []telegraf.Metric) error {
// get indexers based on unique pipeline values
indexers := getTargetIndexers(metrics, o)
if len(indexers) == 0 {
return fmt.Errorf("failed to instantiate OpenSearch bulkindexer")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout))
defer cancel()
for _, metric := range metrics {
var name = metric.Name()
// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName, err := o.GetIndexName(metric)
if err != nil {
return fmt.Errorf("generating indexname failed: %w", err)
}
// Handle NaN and inf field-values
fields := make(map[string]interface{})
for k, value := range metric.Fields() {
v, ok := value.(float64)
if !ok || o.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) {
fields[k] = value
continue
}
if o.FloatHandling == "drop" {
continue
}
if math.IsNaN(v) || math.IsInf(v, 1) {
fields[k] = o.FloatReplacement
} else {
fields[k] = -o.FloatReplacement
}
}
m := make(map[string]interface{})
m["@timestamp"] = metric.Time()
m["measurement_name"] = name
m["tag"] = metric.Tags()
m[name] = fields
body, err := json.Marshal(m)
if err != nil {
return fmt.Errorf("failed to marshal body: %w", err)
}
bulkIndxrItem := opensearchutil.BulkIndexerItem{
Action: "index",
Index: indexName,
Body: strings.NewReader(string(body)),
OnSuccess: o.onSucc,
OnFailure: o.onFail,
}
if o.ForceDocumentID {
bulkIndxrItem.DocumentID = getPointID(metric)
}
if o.UsePipeline != "" {
pipelineName, err := o.getPipelineName(metric)
if err != nil {
return fmt.Errorf("failed to evaluate pipeline name: %w", err)
}
if pipelineName != "" {
if indexers[pipelineName] != nil {
if err := indexers[pipelineName].Add(ctx, bulkIndxrItem); err != nil {
o.Log.Errorf("error adding metric entry to OpenSearch bulkIndexer: %v for pipeline %s", err, pipelineName)
}
continue
}
}
}
if err := indexers["default"].Add(ctx, bulkIndxrItem); err != nil {
o.Log.Errorf("error adding metric entry to OpenSearch default bulkIndexer: %v", err)
}
}
for _, bulkIndxr := range indexers {
if err := bulkIndxr.Close(ctx); err != nil {
return fmt.Errorf("error sending bulk request to OpenSearch: %w", err)
}
// Report the indexer statistics
stats := bulkIndxr.Stats()
if stats.NumAdded < uint64(len(metrics)) {
return fmt.Errorf("indexed [%d] documents with [%d] errors", stats.NumAdded, stats.NumFailed)
}
o.Log.Debugf("Successfully indexed [%d] documents", stats.NumAdded)
}
return nil
}
// BulkIndexer supports pipelines at the config level, so a separate indexer instance is needed for each unique pipeline
func getTargetIndexers(metrics []telegraf.Metric, osInst *Opensearch) map[string]opensearchutil.BulkIndexer {
var indexers = make(map[string]opensearchutil.BulkIndexer)
if osInst.UsePipeline != "" {
for _, metric := range metrics {
pipelineName, err := osInst.getPipelineName(metric)
if err != nil {
osInst.Log.Errorf("error while evaluating pipeline name: %v for pipeline %s", err, pipelineName)
}
if pipelineName != "" {
// BulkIndexer supports pipeline at config level not metric level
if _, ok := indexers[pipelineName]; ok {
continue
}
bulkIndxr, err := createBulkIndexer(osInst, pipelineName)
if err != nil {
osInst.Log.Errorf("error while intantiating OpenSearch NewBulkIndexer: %v for pipeline: %s", err, pipelineName)
} else {
indexers[pipelineName] = bulkIndxr
}
}
}
}
bulkIndxr, err := createBulkIndexer(osInst, "")
if err != nil {
osInst.Log.Errorf("error while intantiating OpenSearch NewBulkIndexer: %v for default pipeline", err)
} else {
indexers["default"] = bulkIndxr
}
return indexers
}
func createBulkIndexer(osInst *Opensearch, pipelineName string) (opensearchutil.BulkIndexer, error) {
var bulkIndexerConfig = opensearchutil.BulkIndexerConfig{
Client: osInst.osClient,
NumWorkers: 4, // The number of worker goroutines (default: number of CPUs)
FlushBytes: 5e+6, // The flush threshold in bytes (default: 5M)
}
if pipelineName != "" {
bulkIndexerConfig.Pipeline = pipelineName
}
return opensearchutil.NewBulkIndexer(bulkIndexerConfig)
}
func (o *Opensearch) GetIndexName(metric telegraf.Metric) (string, error) {
var buf bytes.Buffer
err := o.indexTmpl.Execute(&buf, metric)
if err != nil {
return "", fmt.Errorf("creating index name failed: %w", err)
}
var indexName = buf.String()
if strings.Contains(indexName, "{{") {
return "", fmt.Errorf("failed to evaluate valid indexname: %s", indexName)
}
o.Log.Debugf("indexName- %s", indexName)
return indexName, nil
}
func (o *Opensearch) getPipelineName(metric telegraf.Metric) (string, error) {
if o.UsePipeline == "" || !strings.Contains(o.UsePipeline, "{{") {
return o.UsePipeline, nil
}
var buf bytes.Buffer
err := o.pipelineTmpl.Execute(&buf, metric)
if err != nil {
return "", fmt.Errorf("creating pipeline name failed: %w", err)
}
var pipelineName = buf.String()
if strings.Contains(pipelineName, "{{") {
return "", fmt.Errorf("failed to evaluate valid pipelineName: %s", pipelineName)
}
o.Log.Debugf("PipelineTemplate- %s", pipelineName)
if pipelineName == "" {
pipelineName = o.DefaultPipeline
}
return pipelineName, nil
}
func (o *Opensearch) manageTemplate(ctx context.Context) error {
tempReq := opensearchapi.CatTemplatesRequest{
Name: o.TemplateName,
}
resp, err := tempReq.Do(ctx, o.osClient.Transport)
if err != nil {
return fmt.Errorf("template check failed, template name: %s, error: %w", o.TemplateName, err)
}
templateExists := resp.Body != http.NoBody
templatePattern := o.IndexName
if strings.Contains(templatePattern, "{{") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
}
if templatePattern == "" {
return fmt.Errorf("template cannot be created for dynamic index names without an index prefix")
}
if o.OverwriteTemplate || !templateExists || templatePattern != "" {
tp := templatePart{
TemplatePattern: templatePattern + "*",
}
t := template.Must(template.New("template").Parse(indexTemplate))
var tmpl bytes.Buffer
if err := t.Execute(&tmpl, tp); err != nil {
return err
}
indexTempReq := opensearchapi.IndicesPutTemplateRequest{
Name: o.TemplateName,
Body: strings.NewReader(tmpl.String()),
}
indexTempResp, err := indexTempReq.Do(ctx, o.osClient.Transport)
if err != nil {
return fmt.Errorf("creating index template %q failed: %w", o.TemplateName, err)
}
if indexTempResp.StatusCode != 200 {
return fmt.Errorf("creating index template %q failed with status code %d", o.TemplateName, indexTempResp.StatusCode)
}
o.Log.Debugf("Template %s created or updated", o.TemplateName)
} else {
o.Log.Debug("Found existing OpenSearch template. Skipping template management")
}
return nil
}
func (o *Opensearch) Close() error {
o.osClient = nil
return nil
}

View File

@ -0,0 +1,254 @@
package opensearch
import (
"net/http"
"net/http/httptest"
"testing"
"text/template"
"time"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
const servicePort = "9200"
const imageVersion1 = "1.1.0"
const imageVersion2 = "2.8.0"
func launchTestContainer(t *testing.T, imageVersion string) *testutil.Container {
container := testutil.Container{
Image: "opensearchproject/opensearch:" + imageVersion,
ExposedPorts: []string{servicePort},
Env: map[string]string{
"discovery.type": "single-node",
"DISABLE_INSTALL_DEMO_CONFIG": "true",
"DISABLE_SECURITY_PLUGIN": "true",
},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("Init AD version hash ring successfully"),
),
}
require.NoError(t, container.Start(), "failed to start container")
return &container
}
func TestGetIndexName(t *testing.T) {
e := &Opensearch{
Log: testutil.Logger{},
}
tests := []struct {
EventTime time.Time
Tags map[string]string
IndexName string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname",
"indexname",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{},
`indexname-{{.Time.Format "2006-01-02"}}`,
"indexname-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{},
`indexname-{{.Tag "tag2"}}-{{.Time.Format "2006-01-02"}}`,
"indexname--2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1"},
`indexname-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
"indexname-value1-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
`indexname-{{.Tag "tag1"}}-{{.Tag "tag2"}}-{{.Tag "tag3"}}-{{.Time.Format "2006-01-02"}}`,
"indexname-value1-value2--2014-12-01",
},
}
for _, test := range tests {
mockMetric := testutil.MockMetrics()[0]
mockMetric.SetTime(test.EventTime)
for key, val := range test.Tags {
mockMetric.AddTag(key, val)
}
e.indexTmpl, _ = template.New("index").Parse(test.IndexName)
indexName, err := e.GetIndexName(mockMetric)
require.NoError(t, err)
if indexName != test.Expected {
t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName)
}
}
}
func TestGetPipelineName(t *testing.T) {
e := &Opensearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
}
tests := []struct {
Tags map[string]string
PipelineTagKeys []string
UsePipeline string
Expected string
}{
{
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
`{{.Tag "es-pipeline"}}`,
"myDefaultPipeline",
},
{
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
``,
"",
},
{
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
`{{.Tag "es-pipeline"}}`,
"myOtherPipeline",
},
{
map[string]string{"tag1": "pipeline2", "es-pipeline": "myOtherPipeline"},
[]string{},
`{{.Tag "tag1"}}`,
"pipeline2",
},
}
for _, test := range tests {
e.UsePipeline = test.UsePipeline
e.pipelineTmpl, _ = template.New("index").Parse(test.UsePipeline)
mockMetric := testutil.MockMetrics()[0]
for key, val := range test.Tags {
mockMetric.AddTag(key, val)
}
pipelineName, err := e.getPipelineName(mockMetric)
require.NoError(t, err)
require.Equal(t, test.Expected, pipelineName)
}
}
func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/_bulk":
require.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
require.Equal(t, "gzip", r.Header.Get("Accept-Encoding"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
return
default:
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
require.NoError(t, err)
return
}
}))
defer ts.Close()
urls := []string{"http://" + ts.Listener.Addr().String()}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: true,
ManageTemplate: false,
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
var indexName, err = e.GetIndexName(testutil.MockMetrics()[0])
require.NoError(t, err)
e.IndexName = indexName
err = e.Connect()
require.NoError(t, err)
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/_bulk":
require.NotEqual(t, "gzip", r.Header.Get("Content-Encoding"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
return
default:
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
require.NoError(t, err)
return
}
}))
defer ts.Close()
urls := []string{"http://" + ts.Listener.Addr().String()}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: false,
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/_bulk":
require.Equal(t, "Bearer 0123456789abcdef", r.Header.Get("Authorization"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
return
default:
_, err := w.Write([]byte(`{"version": {"number": "7.8"}}`))
require.NoError(t, err)
return
}
}))
defer ts.Close()
urls := []string{"http://" + ts.Listener.Addr().String()}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: false,
Log: testutil.Logger{},
AuthBearerToken: config.NewSecret([]byte("0123456789abcdef")),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}

View File

@ -0,0 +1,330 @@
package opensearch
import (
"context"
"fmt"
"math"
"testing"
"text/template"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
require.NoError(t, e.Connect())
// Verify that we can successfully write data to Opensearch
require.NoError(t, e.Write(testutil.MockMetrics()))
}
func TestConnectAndWriteMetricWithNaNValueEmptyIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Opensearch
require.NoError(t, e.Init())
require.NoError(t, e.Connect())
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
require.Error(t, e.Write([]telegraf.Metric{m}), "error sending bulk request to Opensearch: "+
"json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueNoneIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "none",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueDropIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "drop",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
err := e.Connect()
require.NoError(t, err)
// Verify that metrics with NaN/inf/-inf values are written successfully when such fields are dropped
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}
func TestConnectAndWriteMetricWithNaNValueReplacementIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
floatHandle string
floatReplacement float64
expectError bool
}{
{
"none",
0.0,
true,
},
{
"drop",
0.0,
false,
},
{
"replace",
0.0,
false,
},
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
for _, test := range tests {
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: test.floatHandle,
FloatReplacement: test.floatReplacement,
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
}
func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
err := e.Init()
require.Error(t, err)
}
func TestTemplateManagementIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
err = e.manageTemplate(ctx)
require.NoError(t, err)
}
func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion1)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.Error(t, err)
}

View File

@ -0,0 +1,330 @@
package opensearch
import (
"context"
"fmt"
"math"
"testing"
"text/template"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWriteIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
require.NoError(t, e.Connect())
// Verify that we can successfully write data to Opensearch
require.NoError(t, e.Write(testutil.MockMetrics()))
}
func TestConnectAndWriteMetricWithNaNValueEmptyIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
// Verify that we can connect to Opensearch
require.NoError(t, e.Init())
require.NoError(t, e.Connect())
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
require.Error(t, e.Write([]telegraf.Metric{m}), "error sending bulk request to Opensearch: "+
"json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueNoneIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "none",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can fail for metric with unhandled NaN/inf/-inf values
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN")
}
}
func TestConnectAndWriteMetricWithNaNValueDropIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: "drop",
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
// Verify that we can connect to Opensearch
err := e.Connect()
require.NoError(t, err)
// Verify that metrics with NaN/inf/-inf values are written successfully when such fields are dropped
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
require.NoError(t, err)
}
}
func TestConnectAndWriteMetricWithNaNValueReplacementIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
floatHandle string
floatReplacement float64
expectError bool
}{
{
"none",
0.0,
true,
},
{
"drop",
0.0,
false,
},
{
"replace",
0.0,
false,
},
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
for _, test := range tests {
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: config.Duration(time.Second * 10),
HealthCheckTimeout: config.Duration(time.Second * 1),
FloatHandling: test.floatHandle,
FloatReplacement: test.floatReplacement,
Log: testutil.Logger{},
}
metrics := []telegraf.Metric{
testutil.TestMetric(math.NaN()),
testutil.TestMetric(math.Inf(1)),
testutil.TestMetric(math.Inf(-1)),
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
for _, m := range metrics {
err = e.Write([]telegraf.Metric{m})
if test.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
}
func TestTemplateManagementEmptyTemplateIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
err := e.Init()
require.Error(t, err)
}
func TestTemplateManagementIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `test-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.NoError(t, err)
err = e.manageTemplate(ctx)
require.NoError(t, err)
}
func TestTemplateInvalidIndexPatternIntegrationV2(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t, imageVersion2)
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Opensearch{
URLs: urls,
IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`,
Timeout: config.Duration(time.Second * 5),
EnableGzip: false,
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
Log: testutil.Logger{},
}
e.indexTmpl, _ = template.New("index").Parse(e.IndexName)
err := e.Connect()
require.Error(t, err)
}

View File

@ -0,0 +1,93 @@
# Configuration for OpenSearch to send metrics to.
[[outputs.opensearch]]
## URLs
## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can
## be specified as part of the same cluster, but only one URL is used to
## write during each interval.
urls = ["http://node1.os.example.com:9200"]
## Index Name
## Target index name for metrics (OpenSearch will create the index if it does not exist).
## This is a Golang template (see https://pkg.go.dev/text/template)
## You can also specify
## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "field_name"}}`)
## and the timestamp (`{{.Time.Format "xxxxxxxxx"}}`).
## If the tag does not exist, the default tag value will be the empty string "".
## For example: "telegraf-{{.Time.Format "2006-01-02"}}-{{.Tag "host"}}" would set it to telegraf-2023-07-27-HostName
index_name = ""
## Timeout
## OpenSearch client timeout
# timeout = "5s"
## Sniffer
## Set to true to ask OpenSearch for a list of all cluster nodes,
## making it unnecessary to list all nodes in the urls config option
# enable_sniffer = false
## GZIP Compression
## Set to true to enable gzip compression
# enable_gzip = false
## Health Check Interval
## Set the interval to check if the OpenSearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
# health_check_interval = "10s"
## Set the timeout for periodic health checks.
# health_check_timeout = "1s"
## HTTP basic authentication details.
# username = ""
# password = ""
## HTTP bearer token authentication details
# auth_bearer_token = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Template Config
## Manage templates
## Set to true if you want telegraf to manage its index template.
## If enabled it will create a recommended index template for telegraf indexes
# manage_template = true
## Template Name
## The template name used for telegraf indexes
# template_name = "telegraf"
## Overwrite Templates
## Set to true if you want telegraf to overwrite an existing template
# overwrite_template = false
## Document ID
## If set to true, a unique ID hash will be sent as
## sha256(concat(timestamp,measurement,series-hash)) string. It will enable
## resending data and updating metric points, avoiding duplicated metrics with
## different IDs
# force_document_id = false
## Value Handling
## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error
## if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
## Pipeline Config
## To use an ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Pipeline Name
## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`)
## which will be used as the pipeline name (e.g. "{{.Tag "os_pipeline"}}").
## If the tag does not exist, the default pipeline will be used as the pipeline.
## If no default pipeline is set, no pipeline is used for the metric.
# default_pipeline = ""

View File

@ -0,0 +1,55 @@
{
"index_patterns" : [ "{{.TemplatePattern}}" ],
"settings": {
"index": {
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000,
"auto_expand_replicas" : "0-1",
"codec" : "best_compression"
}
},
"mappings" : {
"properties" : {
"@timestamp" : { "type" : "date" },
"measurement_name" : { "type" : "keyword" }
},
"dynamic_templates": [
{
"tags": {
"match_mapping_type": "string",
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
}
}
},
{
"metrics_long": {
"match_mapping_type": "long",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"metrics_double": {
"match_mapping_type": "double",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"text_fields": {
"match": "*",
"mapping": {
"norms": false
}
}
}
]
}
}