From 18c54e8d3204c80633a02f26742acb6a0f05cd73 Mon Sep 17 00:00:00 2001 From: Abhishek Saharn <102726227+asaharn@users.noreply.github.com> Date: Fri, 11 Apr 2025 19:47:06 +0530 Subject: [PATCH] chore(outputs.azure_data_explorer): Move code to common in preparation of new plugin (#16523) --- plugins/common/adx/adx.go | 189 +++++++++ plugins/common/adx/adx_test.go | 219 +++++++++++ .../azure_data_explorer.go | 223 ++--------- .../azure_data_explorer_test.go | 361 +----------------- 4 files changed, 446 insertions(+), 546 deletions(-) create mode 100644 plugins/common/adx/adx.go create mode 100644 plugins/common/adx/adx_test.go diff --git a/plugins/common/adx/adx.go b/plugins/common/adx/adx.go new file mode 100644 index 000000000..b56c32f0c --- /dev/null +++ b/plugins/common/adx/adx.go @@ -0,0 +1,189 @@ +package adx + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/Azure/azure-kusto-go/kusto" + kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/Azure/azure-kusto-go/kusto/kql" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" +) + +const ( + TablePerMetric = "tablepermetric" + SingleTable = "singletable" + // These control the amount of memory we use when ingesting blobs + bufferSize = 1 << 20 // 1 MiB + maxBuffers = 5 + ManagedIngestion = "managed" + QueuedIngestion = "queued" +) + +type Config struct { + Endpoint string `toml:"endpoint_url"` + Database string `toml:"database"` + Timeout config.Duration `toml:"timeout"` + MetricsGrouping string `toml:"metrics_grouping_type"` + TableName string `toml:"table_name"` + CreateTables bool `toml:"create_tables"` + IngestionType string `toml:"ingestion_type"` +} + +type Client struct { + cfg *Config + client *kusto.Client + ingestors map[string]ingest.Ingestor + logger telegraf.Logger +} + +func (cfg *Config) NewClient(app string, log telegraf.Logger) (*Client, error) { + if cfg.Endpoint == "" { + return nil, errors.New("endpoint configuration cannot be empty") + } + if cfg.Database == "" { + return nil, errors.New("database configuration cannot be empty") + } + + cfg.MetricsGrouping = strings.ToLower(cfg.MetricsGrouping) + if cfg.MetricsGrouping == SingleTable && cfg.TableName == "" { + return nil, errors.New("table name cannot be empty for SingleTable metrics grouping type") + } + + if cfg.MetricsGrouping == "" { + cfg.MetricsGrouping = TablePerMetric + } + + if !(cfg.MetricsGrouping == SingleTable || cfg.MetricsGrouping == TablePerMetric) { + return nil, errors.New("metrics grouping type is not valid") + } + + if cfg.Timeout == 0 { + cfg.Timeout = config.Duration(20 * time.Second) + } + + switch cfg.IngestionType { + case "": + cfg.IngestionType = QueuedIngestion + case ManagedIngestion, QueuedIngestion: + // Do nothing as those are valid + default: + return nil, fmt.Errorf("unknown ingestion type %q", cfg.IngestionType) + } + + conn := kusto.NewConnectionStringBuilder(cfg.Endpoint).WithDefaultAzureCredential() + conn.SetConnectorDetails("Telegraf", internal.ProductToken(), app, "", false, "") + client, err := kusto.New(conn) + if err != nil { + return nil, err + } + return &Client{ + cfg: cfg, + ingestors: make(map[string]ingest.Ingestor), + logger: log, + client: client, + }, nil +} + +// Clean up and close the ingestor +func (adx *Client) Close() error { + var errs []error + for _, v := range adx.ingestors { + if err := v.Close(); err != nil { + // accumulate errors while closing ingestors + errs = append(errs, err) + } + } + if err := adx.client.Close(); err != nil { + errs = append(errs, err) + } + + adx.client = nil + adx.ingestors = nil + + if len(errs) == 0 { + return nil + } + // Combine errors into a single object and return the combined error + return kustoerrors.GetCombinedError(errs...) +} + +func (adx *Client) PushMetrics(format ingest.FileOption, tableName string, metrics []byte) error { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.cfg.Timeout)) + defer cancel() + metricIngestor, err := adx.getMetricIngestor(ctx, tableName) + if err != nil { + return err + } + + reader := bytes.NewReader(metrics) + mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON) + if metricIngestor != nil { + if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { + return fmt.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %w", tableName, err) + } + } + return nil +} + +func (adx *Client) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) { + if ingestor := adx.ingestors[tableName]; ingestor != nil { + return ingestor, nil + } + + if adx.cfg.CreateTables { + if _, err := adx.client.Mgmt(ctx, adx.cfg.Database, createTableCommand(tableName)); err != nil { + return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err) + } + + if _, err := adx.client.Mgmt(ctx, adx.cfg.Database, createTableMappingCommand(tableName)); err != nil { + return nil, err + } + } + + // Create a new ingestor client for the table + var ingestor ingest.Ingestor + var err error + switch strings.ToLower(adx.cfg.IngestionType) { + case ManagedIngestion: + ingestor, err = ingest.NewManaged(adx.client, adx.cfg.Database, tableName) + case QueuedIngestion: + ingestor, err = ingest.New(adx.client, adx.cfg.Database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) + default: + return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, ManagedIngestion, QueuedIngestion) + } + if err != nil { + return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err) + } + adx.ingestors[tableName] = ingestor + + return ingestor, nil +} + +func createTableCommand(table string) kusto.Statement { + builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `) + builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`) + + return builder +} + +func createTableMappingCommand(table string) kusto.Statement { + builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `) + builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `) + builder.AddLiteral(`'[{"column":"fields", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`) + + return builder +} diff --git a/plugins/common/adx/adx_test.go b/plugins/common/adx/adx_test.go new file mode 100644 index 000000000..c5dc4707e --- /dev/null +++ b/plugins/common/adx/adx_test.go @@ -0,0 +1,219 @@ +package adx + +import ( + "bufio" + "context" + "encoding/json" + "io" + "testing" + "time" + + "github.com/Azure/azure-kusto-go/kusto" + "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + serializers_json "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/testutil" +) + +func TestInitBlankEndpointData(t *testing.T) { + plugin := Config{ + Endpoint: "", + Database: "mydb", + } + + _, err := plugin.NewClient("TestKusto.Telegraf", nil) + require.Error(t, err) + require.Equal(t, "endpoint configuration cannot be empty", err.Error()) +} + +func TestQueryConstruction(t *testing.T) { + const tableName = "mytable" + const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` + const expectedMapping = `` + + `.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` + + `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` + + `"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'` + require.Equal(t, expectedCreate, createTableCommand(tableName).String()) + require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String()) +} + +func TestGetMetricIngestor(t *testing.T) { + plugin := Client{ + logger: testutil.Logger{}, + client: kusto.NewMockClient(), + cfg: &Config{ + Database: "mydb", + IngestionType: QueuedIngestion, + }, + ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}}, + } + + ingestor, err := plugin.getMetricIngestor(t.Context(), "test1") + require.NoError(t, err) + require.NotNil(t, ingestor) +} + +func TestGetMetricIngestorNoIngester(t *testing.T) { + plugin := Client{ + logger: testutil.Logger{}, + client: kusto.NewMockClient(), + cfg: &Config{ + IngestionType: QueuedIngestion, + }, + ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}}, + } + + ingestor, err := plugin.getMetricIngestor(t.Context(), "test1") + require.NoError(t, err) + require.NotNil(t, ingestor) +} + +func TestPushMetrics(t *testing.T) { + plugin := Client{ + logger: testutil.Logger{}, + client: kusto.NewMockClient(), + cfg: &Config{ + Database: "mydb", + Endpoint: "https://ingest-test.westus.kusto.windows.net", + IngestionType: QueuedIngestion, + }, + ingestors: map[string]ingest.Ingestor{"test1": &fakeIngestor{}}, + } + + metrics := []byte(`{"fields": {"value": 1}, "name": "test1", "tags": {"tag1": "value1"}, "timestamp": "2021-01-01T00:00:00Z"}`) + require.NoError(t, plugin.PushMetrics(ingest.FileFormat(ingest.JSON), "test1", metrics)) +} + +func TestPushMetricsOutputs(t *testing.T) { + testCases := []struct { + name string + inputMetric []telegraf.Metric + metricsGrouping string + createTables bool + ingestionType string + }{ + { + name: "Valid metric", + inputMetric: testutil.MockMetrics(), + createTables: true, + metricsGrouping: TablePerMetric, + }, + { + name: "Don't create tables'", + inputMetric: testutil.MockMetrics(), + createTables: false, + metricsGrouping: TablePerMetric, + }, + { + name: "SingleTable metric grouping type", + inputMetric: testutil.MockMetrics(), + createTables: true, + metricsGrouping: SingleTable, + }, + { + name: "Valid metric managed ingestion", + inputMetric: testutil.MockMetrics(), + createTables: true, + metricsGrouping: TablePerMetric, + ingestionType: ManagedIngestion, + }, + } + var expectedMetric = map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + } + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + ingestionType := "queued" + if tC.ingestionType != "" { + ingestionType = tC.ingestionType + } + + serializer := &serializers_json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + + cfg := &Config{ + Endpoint: "https://someendpoint.kusto.net", + Database: "databasename", + MetricsGrouping: tC.metricsGrouping, + TableName: "test1", + CreateTables: tC.createTables, + IngestionType: ingestionType, + Timeout: config.Duration(20 * time.Second), + } + client, err := cfg.NewClient("telegraf", &testutil.Logger{}) + require.NoError(t, err) + + // Inject the ingestor + ingestor := &fakeIngestor{} + client.ingestors["test1"] = ingestor + + tableMetricGroups := make(map[string][]byte) + mockmetrics := testutil.MockMetrics() + for _, m := range mockmetrics { + metricInBytes, err := serializer.Serialize(m) + require.NoError(t, err) + tableMetricGroups[m.Name()] = append(tableMetricGroups[m.Name()], metricInBytes...) + } + + format := ingest.FileFormat(ingest.JSON) + for tableName, tableMetrics := range tableMetricGroups { + require.NoError(t, client.PushMetrics(format, tableName, tableMetrics)) + createdFakeIngestor := ingestor + require.EqualValues(t, expectedMetric["metricName"], createdFakeIngestor.actualOutputMetric["name"]) + require.EqualValues(t, expectedMetric["fields"], createdFakeIngestor.actualOutputMetric["fields"]) + require.EqualValues(t, expectedMetric["tags"], createdFakeIngestor.actualOutputMetric["tags"]) + timestampStr := createdFakeIngestor.actualOutputMetric["timestamp"].(string) + parsedTime, err := time.Parse(time.RFC3339Nano, timestampStr) + parsedTimeFloat := float64(parsedTime.UnixNano()) / 1e9 + require.NoError(t, err) + require.InDelta(t, expectedMetric["timestamp"].(float64), parsedTimeFloat, testutil.DefaultDelta) + } + }) + } +} + +func TestAlreadyClosed(t *testing.T) { + plugin := Client{ + logger: testutil.Logger{}, + cfg: &Config{ + IngestionType: QueuedIngestion, + }, + client: kusto.NewMockClient(), + } + require.NoError(t, plugin.Close()) +} + +type fakeIngestor struct { + actualOutputMetric map[string]interface{} +} + +func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { + scanner := bufio.NewScanner(reader) + scanner.Scan() + firstLine := scanner.Text() + err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric) + if err != nil { + return nil, err + } + return &ingest.Result{}, nil +} + +func (*fakeIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) { + return &ingest.Result{}, nil +} + +func (*fakeIngestor) Close() error { + return nil +} diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 1c6cf4f1e..f48b757f5 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -2,23 +2,15 @@ package azure_data_explorer import ( - "bytes" - "context" _ "embed" - "errors" "fmt" - "strings" "time" - "github.com/Azure/azure-kusto-go/kusto" - kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" "github.com/Azure/azure-kusto-go/kusto/ingest" - "github.com/Azure/azure-kusto-go/kusto/kql" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/choice" + common_adx "github.com/influxdata/telegraf/plugins/common/adx" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers/json" ) @@ -27,75 +19,45 @@ import ( var sampleConfig string type AzureDataExplorer struct { - Endpoint string `toml:"endpoint_url"` - Database string `toml:"database"` - Log telegraf.Logger `toml:"-"` - Timeout config.Duration `toml:"timeout"` - MetricsGrouping string `toml:"metrics_grouping_type"` - TableName string `toml:"table_name"` - CreateTables bool `toml:"create_tables"` - IngestionType string `toml:"ingestion_type"` - serializer telegraf.Serializer - kustoClient *kusto.Client - metricIngestors map[string]ingest.Ingestor + Log telegraf.Logger `toml:"-"` + common_adx.Config + + serializer telegraf.Serializer + client *common_adx.Client } -const ( - tablePerMetric = "tablepermetric" - singleTable = "singletable" - // These control the amount of memory we use when ingesting blobs - bufferSize = 1 << 20 // 1 MiB - maxBuffers = 5 -) - -const managedIngestion = "managed" -const queuedIngestion = "queued" - func (*AzureDataExplorer) SampleConfig() string { return sampleConfig } // Initialize the client and the ingestor -func (adx *AzureDataExplorer) Connect() error { - conn := kusto.NewConnectionStringBuilder(adx.Endpoint).WithDefaultAzureCredential() - // Since init is called before connect, we can set the connector details here including the type. This will be used for telemetry and tracing. - conn.SetConnectorDetails("Telegraf", internal.ProductToken(), "", "", false, "") - client, err := kusto.New(conn) - if err != nil { +func (adx *AzureDataExplorer) Init() error { + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + if err := serializer.Init(); err != nil { return err } - adx.kustoClient = client - adx.metricIngestors = make(map[string]ingest.Ingestor) + adx.serializer = serializer + return nil +} +func (adx *AzureDataExplorer) Connect() error { + var err error + if adx.client, err = adx.Config.NewClient("Kusto.Telegraf", adx.Log); err != nil { + return fmt.Errorf("creating new client failed: %w", err) + } return nil } // Clean up and close the ingestor func (adx *AzureDataExplorer) Close() error { - var errs []error - for _, v := range adx.metricIngestors { - if err := v.Close(); err != nil { - // accumulate errors while closing ingestors - errs = append(errs, err) - } - } - if err := adx.kustoClient.Close(); err != nil { - errs = append(errs, err) - } - - adx.kustoClient = nil - adx.metricIngestors = nil - - if len(errs) == 0 { - adx.Log.Info("Closed ingestors and client") - return nil - } - // Combine errors into a single object and return the combined error - return kustoerrors.GetCombinedError(errs...) + return adx.client.Close() } func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error { - if adx.MetricsGrouping == tablePerMetric { + if adx.MetricsGrouping == common_adx.TablePerMetric { return adx.writeTablePerMetric(metrics) } return adx.writeSingleTable(metrics) @@ -116,14 +78,11 @@ func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) err tableMetricGroups[tableName] = metricInBytes } } - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) - defer cancel() // Push the metrics for each table format := ingest.FileFormat(ingest.JSON) for tableName, tableMetrics := range tableMetricGroups { - if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil { + if err := adx.client.PushMetrics(format, tableName, tableMetrics); err != nil { return err } } @@ -142,146 +101,18 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error metricsArray = append(metricsArray, metricsInBytes...) } - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) - defer cancel() - // push metrics to a single table format := ingest.FileFormat(ingest.JSON) - err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray) + err := adx.client.PushMetrics(format, adx.TableName, metricsArray) return err } -func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error { - var metricIngestor ingest.Ingestor - var err error - - metricIngestor, err = adx.getMetricIngestor(ctx, tableName) - if err != nil { - return err - } - - length := len(metricsArray) - adx.Log.Debugf("Writing %d metrics to table %q", length, tableName) - reader := bytes.NewReader(metricsArray) - mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON) - if metricIngestor != nil { - if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { - adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) - } - } - return nil -} - -func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) { - ingestor := adx.metricIngestors[tableName] - - if ingestor == nil { - if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil { - return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err) - } - // create a new ingestor client for the table - tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType) - if err != nil { - return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err) - } - adx.metricIngestors[tableName] = tempIngestor - adx.Log.Debugf("Ingestor for table %s created", tableName) - ingestor = tempIngestor - } - return ingestor, nil -} - -func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { - if !adx.CreateTables { - adx.Log.Info("skipped table creation") - return nil - } - - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableCommand(tableName)); err != nil { - return err - } - - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingCommand(tableName)); err != nil { - return err - } - - return nil -} - -func (adx *AzureDataExplorer) Init() error { - if adx.Endpoint == "" { - return errors.New("endpoint configuration cannot be empty") - } - if adx.Database == "" { - return errors.New("database configuration cannot be empty") - } - - adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping) - if adx.MetricsGrouping == singleTable && adx.TableName == "" { - return errors.New("table name cannot be empty for SingleTable metrics grouping type") - } - if adx.MetricsGrouping == "" { - adx.MetricsGrouping = tablePerMetric - } - if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) { - return errors.New("metrics grouping type is not valid") - } - - if adx.IngestionType == "" { - adx.IngestionType = queuedIngestion - } else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) { - return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) - } - - serializer := &json.Serializer{ - TimestampUnits: config.Duration(time.Nanosecond), - TimestampFormat: time.RFC3339Nano, - } - if err := serializer.Init(); err != nil { - return err - } - adx.serializer = serializer - return nil -} - func init() { outputs.Add("azure_data_explorer", func() telegraf.Output { return &AzureDataExplorer{ - Timeout: config.Duration(20 * time.Second), - CreateTables: true, + Config: common_adx.Config{ + CreateTables: true, + Timeout: config.Duration(20 * time.Second)}, } }) } - -// For each table create the ingestor -func createIngestorByTable(client *kusto.Client, database, tableName, ingestionType string) (ingest.Ingestor, error) { - switch strings.ToLower(ingestionType) { - case managedIngestion: - mi, err := ingest.NewManaged(client, database, tableName) - return mi, err - case queuedIngestion: - qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) - return qi, err - } - return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion) -} - -func createTableCommand(table string) kusto.Statement { - builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `) - builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`) - - return builder -} - -func createTableMappingCommand(table string) kusto.Statement { - builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `) - builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `) - builder.AddLiteral(`'[{"column":"fields", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`) - - return builder -} diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 42e2221cf..79fa4895d 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -1,369 +1,30 @@ package azure_data_explorer import ( - "bufio" - "bytes" - "context" - "encoding/json" - "io" - "log" - "os" - "strings" "testing" - "time" - "github.com/Azure/azure-kusto-go/kusto" - "github.com/Azure/azure-kusto-go/kusto/ingest" "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf" - serializers_json "github.com/influxdata/telegraf/plugins/serializers/json" + common_adx "github.com/influxdata/telegraf/plugins/common/adx" "github.com/influxdata/telegraf/testutil" ) -func TestWrite(t *testing.T) { - testCases := []struct { - name string - inputMetric []telegraf.Metric - metricsGrouping string - tableName string - expected map[string]interface{} - expectedWriteError string - createTables bool - ingestionType string - }{ - { - name: "Valid metric", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: tablePerMetric, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "Don't create tables'", - inputMetric: testutil.MockMetrics(), - createTables: false, - tableName: "test1", - metricsGrouping: tablePerMetric, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "SingleTable metric grouping type", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: singleTable, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "Valid metric managed ingestion", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: tablePerMetric, - ingestionType: managedIngestion, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - } - - for _, tC := range testCases { - t.Run(tC.name, func(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) - - ingestionType := "queued" - if tC.ingestionType != "" { - ingestionType = tC.ingestionType - } - - localFakeIngestor := &fakeIngestor{} - plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - MetricsGrouping: tC.metricsGrouping, - TableName: tC.tableName, - CreateTables: tC.createTables, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{ - tC.tableName: localFakeIngestor, - }, - serializer: serializer, - IngestionType: ingestionType, - } - - errorInWrite := plugin.Write(testutil.MockMetrics()) - - if tC.expectedWriteError != "" { - require.EqualError(t, errorInWrite, tC.expectedWriteError) - } else { - require.NoError(t, errorInWrite) - - expectedNameOfMetric := tC.expected["metricName"].(string) - - createdFakeIngestor := localFakeIngestor - - require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"]) - - expectedFields := tC.expected["fields"].(map[string]interface{}) - require.Equal(t, expectedFields, createdFakeIngestor.actualOutputMetric["fields"]) - - expectedTags := tC.expected["tags"].(map[string]interface{}) - require.Equal(t, expectedTags, createdFakeIngestor.actualOutputMetric["tags"]) - - expectedTime := tC.expected["timestamp"].(float64) - require.InDelta(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"], testutil.DefaultDelta) - } - }) - } -} - -func TestCreateAzureDataExplorerTable(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) +func TestInit(t *testing.T) { plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - MetricsGrouping: tablePerMetric, - TableName: "test1", - CreateTables: false, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{ - "test1": &fakeIngestor{}, + Log: testutil.Logger{}, + client: &common_adx.Client{}, + Config: common_adx.Config{ + Endpoint: "someendpoint", }, - serializer: serializer, - IngestionType: queuedIngestion, } - var buf bytes.Buffer - log.SetOutput(&buf) - defer func() { - log.SetOutput(os.Stderr) - }() - err := plugin.createAzureDataExplorerTable(t.Context(), "test1") - - output := buf.String() - - if err == nil && !strings.Contains(output, "skipped table creation") { - t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.") - t.Fail() - } + err := plugin.Init() + require.NoError(t, err) } -func TestWriteWithType(t *testing.T) { - metricName := "test1" - fakeClient := kusto.NewMockClient() - expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`} - mockMetrics := testutil.MockMetrics() - // Multi tables - mockMetricsMulti := []telegraf.Metric{ - testutil.TestMetric(1.0, "test2"), - testutil.TestMetric(2.0, "test3"), - } - expectedResultMap2 := map[string]string{ - "test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, - "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`, - } - // List of tests - testCases := []struct { - name string - inputMetric []telegraf.Metric - metricsGrouping string - tableNameToExpectedResult map[string]string - expectedWriteError string - createTables bool - ingestionType string - }{ - { - name: "Valid metric", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "Don't create tables'", - inputMetric: mockMetrics, - createTables: false, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "SingleTable metric grouping type", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: singleTable, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "Valid metric managed ingestion", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - ingestionType: managedIngestion, - }, - { - name: "Table per metric type", - inputMetric: mockMetricsMulti, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap2, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) - for tableName, jsonValue := range testCase.tableNameToExpectedResult { - ingestionType := "queued" - if testCase.ingestionType != "" { - ingestionType = testCase.ingestionType - } - mockIngestor := &mockIngestor{} - plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - IngestionType: ingestionType, - MetricsGrouping: testCase.metricsGrouping, - TableName: tableName, - CreateTables: testCase.createTables, - kustoClient: fakeClient, - metricIngestors: map[string]ingest.Ingestor{ - tableName: mockIngestor, - }, - serializer: serializer, - } - err := plugin.Write(testCase.inputMetric) - if testCase.expectedWriteError != "" { - require.EqualError(t, err, testCase.expectedWriteError) - continue - } - require.NoError(t, err) - createdIngestor := plugin.metricIngestors[tableName] - if testCase.metricsGrouping == singleTable { - createdIngestor = plugin.metricIngestors[tableName] - } - records := mockIngestor.records[0] // the first element - require.NotNil(t, createdIngestor) - require.JSONEq(t, jsonValue, records) - } - }) - } -} - -func TestInitBlankEndpointData(t *testing.T) { +func TestConnectBlankEndpointData(t *testing.T) { plugin := AzureDataExplorer{ - Log: testutil.Logger{}, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{}, + Log: testutil.Logger{}, } - - errorInit := plugin.Init() - require.Error(t, errorInit) - require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error()) -} - -func TestQueryConstruction(t *testing.T) { - const tableName = "mytable" - const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` - const expectedMapping = `` + - `.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` + - `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` + - `"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'` - require.Equal(t, expectedCreate, createTableCommand(tableName).String()) - require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String()) -} - -type fakeIngestor struct { - actualOutputMetric map[string]interface{} -} - -func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { - scanner := bufio.NewScanner(reader) - scanner.Scan() - firstLine := scanner.Text() - err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric) - if err != nil { - return nil, err - } - return &ingest.Result{}, nil -} - -func (*fakeIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) { - return &ingest.Result{}, nil -} - -func (*fakeIngestor) Close() error { - return nil -} - -type mockIngestor struct { - records []string -} - -func (m *mockIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { - bufbytes, err := io.ReadAll(reader) - if err != nil { - return nil, err - } - metricjson := string(bufbytes) - m.SetRecords(strings.Split(metricjson, "\n")) - return &ingest.Result{}, nil -} - -func (*mockIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) { - return &ingest.Result{}, nil -} - -func (m *mockIngestor) SetRecords(records []string) { - m.records = records -} - -// Name receives a copy of Foo since it doesn't need to modify it. -func (m *mockIngestor) Records() []string { - return m.records -} - -func (*mockIngestor) Close() error { - return nil + require.ErrorContains(t, plugin.Connect(), "endpoint configuration cannot be empty") }