From 9485e9040eb94c962ae16527d9ca420d4e4106e8 Mon Sep 17 00:00:00 2001 From: asaharn <102726227+asaharn@users.noreply.github.com> Date: Mon, 3 Oct 2022 21:00:13 +0530 Subject: [PATCH] feat(outputs.azure_data_explorer): Added support for streaming ingestion for ADX output plugin (#11874) --- plugins/outputs/azure_data_explorer/README.md | 18 +++ .../azure_data_explorer.go | 140 ++++++++++++++++-- .../azure_data_explorer_test.go | 131 ++++++++++++++++ .../outputs/azure_data_explorer/sample.conf | 6 + 4 files changed, 281 insertions(+), 14 deletions(-) diff --git a/plugins/outputs/azure_data_explorer/README.md b/plugins/outputs/azure_data_explorer/README.md index 35cc7489a..b29261935 100644 --- a/plugins/outputs/azure_data_explorer/README.md +++ b/plugins/outputs/azure_data_explorer/README.md @@ -41,6 +41,12 @@ of logs, metrics and time series data. ## Creates tables and relevant mapping if set to true(default). ## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role. # create_tables = true + + ## Ingestion method to use. + ## Available options are + ## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below + ## - queued -- queue up metrics data and process sequentially + # ingestion_type = "queued" ``` ## Metrics Grouping @@ -93,6 +99,18 @@ The corresponding table mapping would be like the following: **Note**: This plugin will automatically create Azure Data Explorer tables and corresponding table mapping as per the above mentioned commands. +## Ingestion type + +**Note**: +[Streaming ingestion](https://aka.ms/AAhlg6s) +has to be enabled on ADX [configure the ADX cluster] +in case of `managed` option. 
+Refer to the query below
`.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'` +const managedIngestion = "managed" +const queuedIngestion = "queued" func (*AzureDataExplorer) SampleConfig() string { return sampleConfig } +// Initialize the client and the ingestor func (adx *AzureDataExplorer) Connect() error { authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource(adx.Endpoint) if err != nil { @@ -78,18 +90,45 @@ func (adx *AzureDataExplorer) Connect() error { if err != nil { return err } + adx.kustoClient = client + adx.metricIngestors = make(map[string]ingest.Ingestor) + //Depticated adx.client = client adx.ingesters = make(map[string]localIngestor) adx.createIngestor = createRealIngestor + /***/ return nil } +// Clean up and close the ingestor func (adx *AzureDataExplorer) Close() error { + var errs []error + for _, v := range adx.metricIngestors { + if err := v.Close(); err != nil { + // accumulate errors while closing ingestors + errs = append(errs, err) + } + } + if err := adx.kustoClient.Close(); err != nil { + errs = append(errs, err) + } + + adx.kustoClient = nil + adx.metricIngestors = nil + + if len(errs) == 0 { + adx.Log.Info("Closed ingestors and client") + return nil + } + + //Deprecated adx.client = nil adx.ingesters = nil + /***/ - return nil + // Combine errors into a single object and return the combined error + return kustoerrors.GetCombinedError(errs...) 
} func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error { @@ -151,19 +190,58 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error } func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error { - ingestor, err := adx.getIngestor(ctx, tableName) - if err != nil { - return err + var ingestor localIngestor + var metricIngestor ingest.Ingestor + var err error + if adx.client != nil && adx.createIngestor != nil { + ingestor, err = adx.getIngestor(ctx, tableName) + if err != nil { + return err + } + } else { + metricIngestor, err = adx.getMetricIngestor(ctx, tableName) + if err != nil { + return err + } } + length := len(metricsArray) + adx.Log.Debugf("Writing %s metrics to table %q", length, tableName) reader := bytes.NewReader(metricsArray) mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON) - if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil { - adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) + if ingestor != nil { + //Deprecated + if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil { + adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) + } + } else if metricIngestor != nil { + if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { + adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) + } } return nil } +func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) { + ingestor := adx.metricIngestors[tableName] + + if ingestor == nil { + if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil { + return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err) + } + //create a new ingestor client for 
the table + tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType) + if err != nil { + return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err) + } + adx.metricIngestors[tableName] = tempIngestor + adx.Log.Debugf("Ingestor for table %s created", tableName) + ingestor = tempIngestor + } + return ingestor, nil +} + +// Deprecated: getMetricIngestor introduced to use inget.Ingestor instead of localIngestor func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) { ingestor := adx.ingesters[tableName] @@ -182,19 +260,33 @@ func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) return ingestor, nil } +/***/ + func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { if !adx.CreateTables { adx.Log.Info("skipped table creation") return nil } createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName)) - if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil { - return err + if adx.client != nil { + if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil { + return err + } + } else if adx.kustoClient != nil { + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil { + return err + } } createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName)) - if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil { - return err + if adx.client != nil { + if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil { + return err + } + } else if adx.kustoClient != nil { + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil { + return err + } } return 
nil @@ -219,6 +311,12 @@ func (adx *AzureDataExplorer) Init() error { return errors.New("Metrics grouping type is not valid") } + if adx.IngestionType == "" { + adx.IngestionType = queuedIngestion + } else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) { + return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) + } + serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "") if err != nil { return err @@ -236,6 +334,7 @@ func init() { }) } +// Deprecated: createIngestorByTable should be used with ingestionType and ingest.Ingestor func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) { ingestor, err := ingest.New(client.(*kusto.Client), database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) if ingestor != nil { @@ -243,3 +342,16 @@ func createRealIngestor(client localClient, database string, tableName string) ( } return nil, err } + +// For each table create the ingestor +func createIngestorByTable(client *kusto.Client, database string, tableName string, ingestionType string) (ingest.Ingestor, error) { + switch strings.ToLower(ingestionType) { + case managedIngestion: + mi, err := ingest.NewManaged(client, database, tableName) + return mi, err + case queuedIngestion: + qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) + return qi, err + } + return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion) +} diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 83a09fa4f..83a1b1324 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "strings" "testing" "time" @@ -193,6 +194,108 @@ func TestWrite(t *testing.T) { } } +/***/ 
+ +func TestWriteWithType(t *testing.T) { + metricName := "test1" + fakeClient := kusto.NewMockClient() + expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`} + mockMetrics := testutil.MockMetrics() + // Multi tables + mockMetricsMulti := []telegraf.Metric{ + testutil.TestMetric(1.0, "test2"), + testutil.TestMetric(2.0, "test3"), + } + expectedResultMap2 := map[string]string{"test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`} + // List of tests + testCases := []struct { + name string + inputMetric []telegraf.Metric + metricsGrouping string + tableNameToExpectedResult map[string]string + expectedWriteError string + createTables bool + ingestionType string + }{ + { + name: "Valid metric", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "Don't create tables'", + inputMetric: mockMetrics, + createTables: false, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "SingleTable metric grouping type", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: singleTable, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "Valid metric managed ingestion", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + ingestionType: managedIngestion, + }, + { + name: "Table per metric type", + inputMetric: mockMetricsMulti, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap2, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + serializer, err := telegrafJson.NewSerializer(time.Second, "", "") + 
require.NoError(t, err) + for tableName, jsonValue := range testCase.tableNameToExpectedResult { + ingestionType := "queued" + if testCase.ingestionType != "" { + ingestionType = testCase.ingestionType + } + mockIngestor := &mockIngestor{} + plugin := AzureDataExplorer{ + Endpoint: "someendpoint", + Database: "databasename", + Log: testutil.Logger{}, + IngestionType: ingestionType, + MetricsGrouping: testCase.metricsGrouping, + TableName: tableName, + CreateTables: testCase.createTables, + kustoClient: fakeClient, + metricIngestors: map[string]ingest.Ingestor{ + tableName: mockIngestor, + }, + serializer: serializer, + } + err := plugin.Write(testCase.inputMetric) + if testCase.expectedWriteError != "" { + require.EqualError(t, err, testCase.expectedWriteError) + continue + } + require.NoError(t, err) + createdIngestor := plugin.metricIngestors[tableName] + if testCase.metricsGrouping == singleTable { + createdIngestor = plugin.metricIngestors[tableName] + } + records := mockIngestor.records[0] // the first element + require.NotNil(t, createdIngestor) + require.JSONEq(t, jsonValue, records) + } + }) + } +} + func TestInitBlankEndpoint(t *testing.T) { plugin := AzureDataExplorer{ Log: testutil.Logger{}, @@ -232,3 +335,31 @@ func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...inge } return &ingest.Result{}, nil } + +type mockIngestor struct { + records []string +} + +func (m *mockIngestor) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) { + bufbytes, _ := io.ReadAll(reader) + metricjson := string(bufbytes) + m.SetRecords(strings.Split(metricjson, "\n")) + return &ingest.Result{}, nil +} + +func (m *mockIngestor) FromFile(ctx context.Context, fPath string, options ...ingest.FileOption) (*ingest.Result, error) { + return &ingest.Result{}, nil +} + +func (m *mockIngestor) SetRecords(records []string) { + m.records = records +} + +// Name receives a copy of Foo since it doesn't need to modify 
it. +func (m *mockIngestor) Records() []string { + return m.records +} + +func (m *mockIngestor) Close() error { + return nil +} diff --git a/plugins/outputs/azure_data_explorer/sample.conf b/plugins/outputs/azure_data_explorer/sample.conf index 5f8965b30..f708c7808 100644 --- a/plugins/outputs/azure_data_explorer/sample.conf +++ b/plugins/outputs/azure_data_explorer/sample.conf @@ -23,3 +23,9 @@ ## Creates tables and relevant mapping if set to true(default). ## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role. # create_tables = true + + ## Ingestion method to use. + ## Available options are + ## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below + ## - queued -- queue up metrics data and process sequentially + # ingestion_type = "queued"