chore(outputs.azure_data_explorer): Move code to common in preparation of new plugin (#16523)

This commit is contained in:
Abhishek Saharn 2025-04-11 19:47:06 +05:30 committed by GitHub
parent dba814d3d2
commit 18c54e8d32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 446 additions and 546 deletions

189
plugins/common/adx/adx.go Normal file
View File

@ -0,0 +1,189 @@
package adx
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)
const (
TablePerMetric = "tablepermetric"
SingleTable = "singletable"
// These control the amount of memory we use when ingesting blobs
bufferSize = 1 << 20 // 1 MiB
maxBuffers = 5
ManagedIngestion = "managed"
QueuedIngestion = "queued"
)
type Config struct {
Endpoint string `toml:"endpoint_url"`
Database string `toml:"database"`
Timeout config.Duration `toml:"timeout"`
MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
IngestionType string `toml:"ingestion_type"`
}
type Client struct {
cfg *Config
client *kusto.Client
ingestors map[string]ingest.Ingestor
logger telegraf.Logger
}
func (cfg *Config) NewClient(app string, log telegraf.Logger) (*Client, error) {
if cfg.Endpoint == "" {
return nil, errors.New("endpoint configuration cannot be empty")
}
if cfg.Database == "" {
return nil, errors.New("database configuration cannot be empty")
}
cfg.MetricsGrouping = strings.ToLower(cfg.MetricsGrouping)
if cfg.MetricsGrouping == SingleTable && cfg.TableName == "" {
return nil, errors.New("table name cannot be empty for SingleTable metrics grouping type")
}
if cfg.MetricsGrouping == "" {
cfg.MetricsGrouping = TablePerMetric
}
if !(cfg.MetricsGrouping == SingleTable || cfg.MetricsGrouping == TablePerMetric) {
return nil, errors.New("metrics grouping type is not valid")
}
if cfg.Timeout == 0 {
cfg.Timeout = config.Duration(20 * time.Second)
}
switch cfg.IngestionType {
case "":
cfg.IngestionType = QueuedIngestion
case ManagedIngestion, QueuedIngestion:
// Do nothing as those are valid
default:
return nil, fmt.Errorf("unknown ingestion type %q", cfg.IngestionType)
}
conn := kusto.NewConnectionStringBuilder(cfg.Endpoint).WithDefaultAzureCredential()
conn.SetConnectorDetails("Telegraf", internal.ProductToken(), app, "", false, "")
client, err := kusto.New(conn)
if err != nil {
return nil, err
}
return &Client{
cfg: cfg,
ingestors: make(map[string]ingest.Ingestor),
logger: log,
client: client,
}, nil
}
// Clean up and close the ingestor
func (adx *Client) Close() error {
var errs []error
for _, v := range adx.ingestors {
if err := v.Close(); err != nil {
// accumulate errors while closing ingestors
errs = append(errs, err)
}
}
if err := adx.client.Close(); err != nil {
errs = append(errs, err)
}
adx.client = nil
adx.ingestors = nil
if len(errs) == 0 {
return nil
}
// Combine errors into a single object and return the combined error
return kustoerrors.GetCombinedError(errs...)
}
func (adx *Client) PushMetrics(format ingest.FileOption, tableName string, metrics []byte) error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.cfg.Timeout))
defer cancel()
metricIngestor, err := adx.getMetricIngestor(ctx, tableName)
if err != nil {
return err
}
reader := bytes.NewReader(metrics)
mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON)
if metricIngestor != nil {
if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
return fmt.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %w", tableName, err)
}
}
return nil
}
func (adx *Client) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) {
if ingestor := adx.ingestors[tableName]; ingestor != nil {
return ingestor, nil
}
if adx.cfg.CreateTables {
if _, err := adx.client.Mgmt(ctx, adx.cfg.Database, createTableCommand(tableName)); err != nil {
return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
}
if _, err := adx.client.Mgmt(ctx, adx.cfg.Database, createTableMappingCommand(tableName)); err != nil {
return nil, err
}
}
// Create a new ingestor client for the table
var ingestor ingest.Ingestor
var err error
switch strings.ToLower(adx.cfg.IngestionType) {
case ManagedIngestion:
ingestor, err = ingest.NewManaged(adx.client, adx.cfg.Database, tableName)
case QueuedIngestion:
ingestor, err = ingest.New(adx.client, adx.cfg.Database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
default:
return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, ManagedIngestion, QueuedIngestion)
}
if err != nil {
return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
}
adx.ingestors[tableName] = ingestor
return ingestor, nil
}
func createTableCommand(table string) kusto.Statement {
builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`)
return builder
}
func createTableMappingCommand(table string) kusto.Statement {
builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `)
builder.AddLiteral(`'[{"column":"fields", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`)
return builder
}

View File

@ -0,0 +1,219 @@
package adx
import (
"bufio"
"context"
"encoding/json"
"io"
"testing"
"time"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
serializers_json "github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
)
func TestInitBlankEndpointData(t *testing.T) {
plugin := Config{
Endpoint: "",
Database: "mydb",
}
_, err := plugin.NewClient("TestKusto.Telegraf", nil)
require.Error(t, err)
require.Equal(t, "endpoint configuration cannot be empty", err.Error())
}
func TestQueryConstruction(t *testing.T) {
const tableName = "mytable"
const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const expectedMapping = `` +
`.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` +
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
require.Equal(t, expectedCreate, createTableCommand(tableName).String())
require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String())
}
func TestGetMetricIngestor(t *testing.T) {
plugin := Client{
logger: testutil.Logger{},
client: kusto.NewMockClient(),
cfg: &Config{
Database: "mydb",
IngestionType: QueuedIngestion,
},
ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}},
}
ingestor, err := plugin.getMetricIngestor(t.Context(), "test1")
require.NoError(t, err)
require.NotNil(t, ingestor)
}
func TestGetMetricIngestorNoIngester(t *testing.T) {
plugin := Client{
logger: testutil.Logger{},
client: kusto.NewMockClient(),
cfg: &Config{
IngestionType: QueuedIngestion,
},
ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}},
}
ingestor, err := plugin.getMetricIngestor(t.Context(), "test1")
require.NoError(t, err)
require.NotNil(t, ingestor)
}
func TestPushMetrics(t *testing.T) {
plugin := Client{
logger: testutil.Logger{},
client: kusto.NewMockClient(),
cfg: &Config{
Database: "mydb",
Endpoint: "https://ingest-test.westus.kusto.windows.net",
IngestionType: QueuedIngestion,
},
ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}},
}
metrics := []byte(`{"fields": {"value": 1}, "name": "test1", "tags": {"tag1": "value1"}, "timestamp": "2021-01-01T00:00:00Z"}`)
require.NoError(t, plugin.PushMetrics(ingest.FileFormat(ingest.JSON), "test1", metrics))
}
func TestPushMetricsOutputs(t *testing.T) {
testCases := []struct {
name string
inputMetric []telegraf.Metric
metricsGrouping string
createTables bool
ingestionType string
}{
{
name: "Valid metric",
inputMetric: testutil.MockMetrics(),
createTables: true,
metricsGrouping: TablePerMetric,
},
{
name: "Don't create tables'",
inputMetric: testutil.MockMetrics(),
createTables: false,
metricsGrouping: TablePerMetric,
},
{
name: "SingleTable metric grouping type",
inputMetric: testutil.MockMetrics(),
createTables: true,
metricsGrouping: SingleTable,
},
{
name: "Valid metric managed ingestion",
inputMetric: testutil.MockMetrics(),
createTables: true,
metricsGrouping: TablePerMetric,
ingestionType: ManagedIngestion,
},
}
var expectedMetric = map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
}
for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
ingestionType := "queued"
if tC.ingestionType != "" {
ingestionType = tC.ingestionType
}
serializer := &serializers_json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
cfg := &Config{
Endpoint: "https://someendpoint.kusto.net",
Database: "databasename",
MetricsGrouping: tC.metricsGrouping,
TableName: "test1",
CreateTables: tC.createTables,
IngestionType: ingestionType,
Timeout: config.Duration(20 * time.Second),
}
client, err := cfg.NewClient("telegraf", &testutil.Logger{})
require.NoError(t, err)
// Inject the ingestor
ingestor := &fakeIngestor{}
client.ingestors["test1"] = ingestor
tableMetricGroups := make(map[string][]byte)
mockmetrics := testutil.MockMetrics()
for _, m := range mockmetrics {
metricInBytes, err := serializer.Serialize(m)
require.NoError(t, err)
tableMetricGroups[m.Name()] = append(tableMetricGroups[m.Name()], metricInBytes...)
}
format := ingest.FileFormat(ingest.JSON)
for tableName, tableMetrics := range tableMetricGroups {
require.NoError(t, client.PushMetrics(format, tableName, tableMetrics))
createdFakeIngestor := ingestor
require.EqualValues(t, expectedMetric["metricName"], createdFakeIngestor.actualOutputMetric["name"])
require.EqualValues(t, expectedMetric["fields"], createdFakeIngestor.actualOutputMetric["fields"])
require.EqualValues(t, expectedMetric["tags"], createdFakeIngestor.actualOutputMetric["tags"])
timestampStr := createdFakeIngestor.actualOutputMetric["timestamp"].(string)
parsedTime, err := time.Parse(time.RFC3339Nano, timestampStr)
parsedTimeFloat := float64(parsedTime.UnixNano()) / 1e9
require.NoError(t, err)
require.InDelta(t, expectedMetric["timestamp"].(float64), parsedTimeFloat, testutil.DefaultDelta)
}
})
}
}
func TestAlreadyClosed(t *testing.T) {
plugin := Client{
logger: testutil.Logger{},
cfg: &Config{
IngestionType: QueuedIngestion,
},
client: kusto.NewMockClient(),
}
require.NoError(t, plugin.Close())
}
type fakeIngestor struct {
actualOutputMetric map[string]interface{}
}
func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
scanner := bufio.NewScanner(reader)
scanner.Scan()
firstLine := scanner.Text()
err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric)
if err != nil {
return nil, err
}
return &ingest.Result{}, nil
}
func (*fakeIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) {
return &ingest.Result{}, nil
}
func (*fakeIngestor) Close() error {
return nil
}

View File

@ -2,23 +2,15 @@
package azure_data_explorer
import (
"bytes"
"context"
_ "embed"
"errors"
"fmt"
"strings"
"time"
"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
common_adx "github.com/influxdata/telegraf/plugins/common/adx"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/json"
)
@ -27,75 +19,45 @@ import (
var sampleConfig string
type AzureDataExplorer struct {
Endpoint string `toml:"endpoint_url"`
Database string `toml:"database"`
Log telegraf.Logger `toml:"-"`
Timeout config.Duration `toml:"timeout"`
MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
IngestionType string `toml:"ingestion_type"`
serializer telegraf.Serializer
kustoClient *kusto.Client
metricIngestors map[string]ingest.Ingestor
Log telegraf.Logger `toml:"-"`
common_adx.Config
serializer telegraf.Serializer
client *common_adx.Client
}
const (
tablePerMetric = "tablepermetric"
singleTable = "singletable"
// These control the amount of memory we use when ingesting blobs
bufferSize = 1 << 20 // 1 MiB
maxBuffers = 5
)
const managedIngestion = "managed"
const queuedIngestion = "queued"
func (*AzureDataExplorer) SampleConfig() string {
return sampleConfig
}
// Initialize the client and the ingestor
func (adx *AzureDataExplorer) Connect() error {
conn := kusto.NewConnectionStringBuilder(adx.Endpoint).WithDefaultAzureCredential()
// Since init is called before connect, we can set the connector details here including the type. This will be used for telemetry and tracing.
conn.SetConnectorDetails("Telegraf", internal.ProductToken(), "", "", false, "")
client, err := kusto.New(conn)
if err != nil {
func (adx *AzureDataExplorer) Init() error {
serializer := &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
if err := serializer.Init(); err != nil {
return err
}
adx.kustoClient = client
adx.metricIngestors = make(map[string]ingest.Ingestor)
adx.serializer = serializer
return nil
}
func (adx *AzureDataExplorer) Connect() error {
var err error
if adx.client, err = adx.Config.NewClient("Kusto.Telegraf", adx.Log); err != nil {
return fmt.Errorf("creating new client failed: %w", err)
}
return nil
}
// Clean up and close the ingestor
func (adx *AzureDataExplorer) Close() error {
var errs []error
for _, v := range adx.metricIngestors {
if err := v.Close(); err != nil {
// accumulate errors while closing ingestors
errs = append(errs, err)
}
}
if err := adx.kustoClient.Close(); err != nil {
errs = append(errs, err)
}
adx.kustoClient = nil
adx.metricIngestors = nil
if len(errs) == 0 {
adx.Log.Info("Closed ingestors and client")
return nil
}
// Combine errors into a single object and return the combined error
return kustoerrors.GetCombinedError(errs...)
return adx.client.Close()
}
func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error {
if adx.MetricsGrouping == tablePerMetric {
if adx.MetricsGrouping == common_adx.TablePerMetric {
return adx.writeTablePerMetric(metrics)
}
return adx.writeSingleTable(metrics)
@ -116,14 +78,11 @@ func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) err
tableMetricGroups[tableName] = metricInBytes
}
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
defer cancel()
// Push the metrics for each table
format := ingest.FileFormat(ingest.JSON)
for tableName, tableMetrics := range tableMetricGroups {
if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil {
if err := adx.client.PushMetrics(format, tableName, tableMetrics); err != nil {
return err
}
}
@ -142,146 +101,18 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error
metricsArray = append(metricsArray, metricsInBytes...)
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
defer cancel()
// push metrics to a single table
format := ingest.FileFormat(ingest.JSON)
err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray)
err := adx.client.PushMetrics(format, adx.TableName, metricsArray)
return err
}
func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
var metricIngestor ingest.Ingestor
var err error
metricIngestor, err = adx.getMetricIngestor(ctx, tableName)
if err != nil {
return err
}
length := len(metricsArray)
adx.Log.Debugf("Writing %d metrics to table %q", length, tableName)
reader := bytes.NewReader(metricsArray)
mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON)
if metricIngestor != nil {
if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
}
}
return nil
}
func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) {
ingestor := adx.metricIngestors[tableName]
if ingestor == nil {
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
}
// create a new ingestor client for the table
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
if err != nil {
return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
}
adx.metricIngestors[tableName] = tempIngestor
adx.Log.Debugf("Ingestor for table %s created", tableName)
ingestor = tempIngestor
}
return ingestor, nil
}
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
if !adx.CreateTables {
adx.Log.Info("skipped table creation")
return nil
}
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableCommand(tableName)); err != nil {
return err
}
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingCommand(tableName)); err != nil {
return err
}
return nil
}
func (adx *AzureDataExplorer) Init() error {
if adx.Endpoint == "" {
return errors.New("endpoint configuration cannot be empty")
}
if adx.Database == "" {
return errors.New("database configuration cannot be empty")
}
adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping)
if adx.MetricsGrouping == singleTable && adx.TableName == "" {
return errors.New("table name cannot be empty for SingleTable metrics grouping type")
}
if adx.MetricsGrouping == "" {
adx.MetricsGrouping = tablePerMetric
}
if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) {
return errors.New("metrics grouping type is not valid")
}
if adx.IngestionType == "" {
adx.IngestionType = queuedIngestion
} else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}
serializer := &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond),
TimestampFormat: time.RFC3339Nano,
}
if err := serializer.Init(); err != nil {
return err
}
adx.serializer = serializer
return nil
}
func init() {
outputs.Add("azure_data_explorer", func() telegraf.Output {
return &AzureDataExplorer{
Timeout: config.Duration(20 * time.Second),
CreateTables: true,
Config: common_adx.Config{
CreateTables: true,
Timeout: config.Duration(20 * time.Second)},
}
})
}
// For each table create the ingestor
func createIngestorByTable(client *kusto.Client, database, tableName, ingestionType string) (ingest.Ingestor, error) {
switch strings.ToLower(ingestionType) {
case managedIngestion:
mi, err := ingest.NewManaged(client, database, tableName)
return mi, err
case queuedIngestion:
qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
return qi, err
}
return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion)
}
func createTableCommand(table string) kusto.Statement {
builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`)
return builder
}
func createTableMappingCommand(table string) kusto.Statement {
builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `)
builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `)
builder.AddLiteral(`'[{"column":"fields", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `)
builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`)
return builder
}

View File

@ -1,369 +1,30 @@
package azure_data_explorer
import (
"bufio"
"bytes"
"context"
"encoding/json"
"io"
"log"
"os"
"strings"
"testing"
"time"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
serializers_json "github.com/influxdata/telegraf/plugins/serializers/json"
common_adx "github.com/influxdata/telegraf/plugins/common/adx"
"github.com/influxdata/telegraf/testutil"
)
func TestWrite(t *testing.T) {
testCases := []struct {
name string
inputMetric []telegraf.Metric
metricsGrouping string
tableName string
expected map[string]interface{}
expectedWriteError string
createTables bool
ingestionType string
}{
{
name: "Valid metric",
inputMetric: testutil.MockMetrics(),
createTables: true,
tableName: "test1",
metricsGrouping: tablePerMetric,
expected: map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
},
},
{
name: "Don't create tables'",
inputMetric: testutil.MockMetrics(),
createTables: false,
tableName: "test1",
metricsGrouping: tablePerMetric,
expected: map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
},
},
{
name: "SingleTable metric grouping type",
inputMetric: testutil.MockMetrics(),
createTables: true,
tableName: "test1",
metricsGrouping: singleTable,
expected: map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
},
},
{
name: "Valid metric managed ingestion",
inputMetric: testutil.MockMetrics(),
createTables: true,
tableName: "test1",
metricsGrouping: tablePerMetric,
ingestionType: managedIngestion,
expected: map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
},
},
}
for _, tC := range testCases {
t.Run(tC.name, func(t *testing.T) {
serializer := &serializers_json.Serializer{}
require.NoError(t, serializer.Init())
ingestionType := "queued"
if tC.ingestionType != "" {
ingestionType = tC.ingestionType
}
localFakeIngestor := &fakeIngestor{}
plugin := AzureDataExplorer{
Endpoint: "someendpoint",
Database: "databasename",
Log: testutil.Logger{},
MetricsGrouping: tC.metricsGrouping,
TableName: tC.tableName,
CreateTables: tC.createTables,
kustoClient: kusto.NewMockClient(),
metricIngestors: map[string]ingest.Ingestor{
tC.tableName: localFakeIngestor,
},
serializer: serializer,
IngestionType: ingestionType,
}
errorInWrite := plugin.Write(testutil.MockMetrics())
if tC.expectedWriteError != "" {
require.EqualError(t, errorInWrite, tC.expectedWriteError)
} else {
require.NoError(t, errorInWrite)
expectedNameOfMetric := tC.expected["metricName"].(string)
createdFakeIngestor := localFakeIngestor
require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"])
expectedFields := tC.expected["fields"].(map[string]interface{})
require.Equal(t, expectedFields, createdFakeIngestor.actualOutputMetric["fields"])
expectedTags := tC.expected["tags"].(map[string]interface{})
require.Equal(t, expectedTags, createdFakeIngestor.actualOutputMetric["tags"])
expectedTime := tC.expected["timestamp"].(float64)
require.InDelta(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"], testutil.DefaultDelta)
}
})
}
}
func TestCreateAzureDataExplorerTable(t *testing.T) {
serializer := &serializers_json.Serializer{}
require.NoError(t, serializer.Init())
func TestInit(t *testing.T) {
plugin := AzureDataExplorer{
Endpoint: "someendpoint",
Database: "databasename",
Log: testutil.Logger{},
MetricsGrouping: tablePerMetric,
TableName: "test1",
CreateTables: false,
kustoClient: kusto.NewMockClient(),
metricIngestors: map[string]ingest.Ingestor{
"test1": &fakeIngestor{},
Log: testutil.Logger{},
client: &common_adx.Client{},
Config: common_adx.Config{
Endpoint: "someendpoint",
},
serializer: serializer,
IngestionType: queuedIngestion,
}
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()
err := plugin.createAzureDataExplorerTable(t.Context(), "test1")
output := buf.String()
if err == nil && !strings.Contains(output, "skipped table creation") {
t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.")
t.Fail()
}
err := plugin.Init()
require.NoError(t, err)
}
func TestWriteWithType(t *testing.T) {
metricName := "test1"
fakeClient := kusto.NewMockClient()
expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`}
mockMetrics := testutil.MockMetrics()
// Multi tables
mockMetricsMulti := []telegraf.Metric{
testutil.TestMetric(1.0, "test2"),
testutil.TestMetric(2.0, "test3"),
}
expectedResultMap2 := map[string]string{
"test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`,
"test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`,
}
// List of tests
testCases := []struct {
name string
inputMetric []telegraf.Metric
metricsGrouping string
tableNameToExpectedResult map[string]string
expectedWriteError string
createTables bool
ingestionType string
}{
{
name: "Valid metric",
inputMetric: mockMetrics,
createTables: true,
metricsGrouping: tablePerMetric,
tableNameToExpectedResult: expectedResultMap,
},
{
name: "Don't create tables'",
inputMetric: mockMetrics,
createTables: false,
metricsGrouping: tablePerMetric,
tableNameToExpectedResult: expectedResultMap,
},
{
name: "SingleTable metric grouping type",
inputMetric: mockMetrics,
createTables: true,
metricsGrouping: singleTable,
tableNameToExpectedResult: expectedResultMap,
},
{
name: "Valid metric managed ingestion",
inputMetric: mockMetrics,
createTables: true,
metricsGrouping: tablePerMetric,
tableNameToExpectedResult: expectedResultMap,
ingestionType: managedIngestion,
},
{
name: "Table per metric type",
inputMetric: mockMetricsMulti,
createTables: true,
metricsGrouping: tablePerMetric,
tableNameToExpectedResult: expectedResultMap2,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
serializer := &serializers_json.Serializer{}
require.NoError(t, serializer.Init())
for tableName, jsonValue := range testCase.tableNameToExpectedResult {
ingestionType := "queued"
if testCase.ingestionType != "" {
ingestionType = testCase.ingestionType
}
mockIngestor := &mockIngestor{}
plugin := AzureDataExplorer{
Endpoint: "someendpoint",
Database: "databasename",
Log: testutil.Logger{},
IngestionType: ingestionType,
MetricsGrouping: testCase.metricsGrouping,
TableName: tableName,
CreateTables: testCase.createTables,
kustoClient: fakeClient,
metricIngestors: map[string]ingest.Ingestor{
tableName: mockIngestor,
},
serializer: serializer,
}
err := plugin.Write(testCase.inputMetric)
if testCase.expectedWriteError != "" {
require.EqualError(t, err, testCase.expectedWriteError)
continue
}
require.NoError(t, err)
createdIngestor := plugin.metricIngestors[tableName]
if testCase.metricsGrouping == singleTable {
createdIngestor = plugin.metricIngestors[tableName]
}
records := mockIngestor.records[0] // the first element
require.NotNil(t, createdIngestor)
require.JSONEq(t, jsonValue, records)
}
})
}
}
func TestInitBlankEndpointData(t *testing.T) {
func TestConnectBlankEndpointData(t *testing.T) {
plugin := AzureDataExplorer{
Log: testutil.Logger{},
kustoClient: kusto.NewMockClient(),
metricIngestors: map[string]ingest.Ingestor{},
Log: testutil.Logger{},
}
errorInit := plugin.Init()
require.Error(t, errorInit)
require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error())
}
func TestQueryConstruction(t *testing.T) {
const tableName = "mytable"
const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const expectedMapping = `` +
`.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` +
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
require.Equal(t, expectedCreate, createTableCommand(tableName).String())
require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String())
}
type fakeIngestor struct {
actualOutputMetric map[string]interface{}
}
func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
scanner := bufio.NewScanner(reader)
scanner.Scan()
firstLine := scanner.Text()
err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric)
if err != nil {
return nil, err
}
return &ingest.Result{}, nil
}
func (*fakeIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) {
return &ingest.Result{}, nil
}
func (*fakeIngestor) Close() error {
return nil
}
type mockIngestor struct {
records []string
}
func (m *mockIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
bufbytes, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
metricjson := string(bufbytes)
m.SetRecords(strings.Split(metricjson, "\n"))
return &ingest.Result{}, nil
}
func (*mockIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) {
return &ingest.Result{}, nil
}
func (m *mockIngestor) SetRecords(records []string) {
m.records = records
}
// Name receives a copy of Foo since it doesn't need to modify it.
func (m *mockIngestor) Records() []string {
return m.records
}
func (*mockIngestor) Close() error {
return nil
require.ErrorContains(t, plugin.Connect(), "endpoint configuration cannot be empty")
}