diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 002ca236d..23ea1ba66 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -7,7 +7,6 @@ import ( _ "embed" "errors" "fmt" - "io" "strings" "time" @@ -37,14 +36,7 @@ type AzureDataExplorer struct { TableName string `toml:"table_name"` CreateTables bool `toml:"create_tables"` IngestionType string `toml:"ingestion_type"` - //Deprecated: client of type *kusto.Client, ingestors of type ingest.Ingestor introduced - client localClient - ingesters map[string]localIngestor - /***/ - serializer serializers.Serializer - //Deprecated - createIngestor ingestorFactory - /***/ + serializer serializers.Serializer kustoClient *kusto.Client metricIngestors map[string]ingest.Ingestor } @@ -57,16 +49,6 @@ const ( maxBuffers = 5 ) -type localIngestor interface { - FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) -} - -type localClient interface { - Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) -} - -type ingestorFactory func(localClient, string, string) (localIngestor, error) - const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` + `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` + @@ -96,11 +78,6 @@ func (adx *AzureDataExplorer) Connect() error { } adx.kustoClient = client adx.metricIngestors = make(map[string]ingest.Ingestor) - //Depticated - adx.client = client - adx.ingesters = make(map[string]localIngestor) - adx.createIngestor = createRealIngestor - /***/ return nil } @@ -125,12 +102,6 @@ func (adx *AzureDataExplorer) Close() error { adx.Log.Info("Closed ingestors and client") return nil } - - //Deprecated - adx.client = nil - adx.ingesters = nil - /***/ - // Combine errors into a single object and return the combined error return kustoerrors.GetCombinedError(errs...) } @@ -194,31 +165,19 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error } func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error { - var ingestor localIngestor var metricIngestor ingest.Ingestor var err error - if adx.client != nil && adx.createIngestor != nil { - ingestor, err = adx.getIngestor(ctx, tableName) - if err != nil { - return err - } - } else { - metricIngestor, err = adx.getMetricIngestor(ctx, tableName) - if err != nil { - return err - } + + metricIngestor, err = adx.getMetricIngestor(ctx, tableName) + if err != nil { + return err } length := len(metricsArray) adx.Log.Debugf("Writing %s metrics to table %q", length, tableName) reader := bytes.NewReader(metricsArray) mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON) - if ingestor != nil { - //Deprecated - if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil { - adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) - } - } else if metricIngestor != nil { + if metricIngestor != nil { if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) } @@ -245,54 +204,20 @@ func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName s return ingestor, nil } -// Deprecated: getMetricIngestor introduced to use inget.Ingestor instead of localIngestor -func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) { - ingestor := adx.ingesters[tableName] - - if ingestor == nil { - if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil { - return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err) - } - //create a new ingestor client for the table - tempIngestor, err := adx.createIngestor(adx.client, adx.Database, tableName) - if err != nil { - return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err) - } - adx.ingesters[tableName] = tempIngestor - ingestor = tempIngestor - } - return ingestor, nil -} - -/***/ - func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { if !adx.CreateTables { adx.Log.Info("skipped table creation") return nil } createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName)) - if adx.client != nil { - if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil { - return err - } - } else if adx.kustoClient != nil { - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil { - return err - } + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil { + return err } - createTableMappingStmt := - kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})). - UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName)) - if adx.client != nil { - if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil { - return err - } - } else if adx.kustoClient != nil { - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil { - return err - } + createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})). + UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName)) + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil { + return err } return nil @@ -343,15 +268,6 @@ func init() { }) } -// Deprecated: createIngestorByTable should be used with ingestionType and ingest.Ingestor -func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) { - ingestor, err := ingest.New(client.(*kusto.Client), database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) - if ingestor != nil { - return ingestor, nil - } - return nil, err -} - // For each table create the ingestor func createIngestorByTable(client *kusto.Client, database string, tableName string, ingestionType string) (ingest.Ingestor, error) { switch strings.ToLower(ingestionType) { diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 8d211f0f0..c012fb633 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -2,11 +2,12 @@ package azure_data_explorer import ( "bufio" + "bytes" "context" "encoding/json" - "errors" - "fmt" "io" + "log" + "os" "strings" "testing" "time" @@ -20,37 +21,22 @@ import ( "github.com/influxdata/telegraf/testutil" ) -const createTableCommandExpected = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` -const createTableMappingCommandExpected = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` + - `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` + - `"Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` + - `"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", ` + - `"Properties":{"Path":"$[\'timestamp\']"}}]'` - func TestWrite(t *testing.T) { testCases := []struct { name string inputMetric []telegraf.Metric - client *fakeClient - createIngestor ingestorFactory metricsGrouping string tableName string expected map[string]interface{} expectedWriteError string createTables bool + ingestionType string }{ { - name: "Valid metric", - inputMetric: testutil.MockMetrics(), - createTables: true, - client: &fakeClient{ - queries: make([]string, 0), - internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { - f.queries = append(f.queries, query.String()) - return &kusto.RowIterator{}, nil - }, - }, - createIngestor: createFakeIngestor, + name: "Valid metric", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", metricsGrouping: tablePerMetric, expected: map[string]interface{}{ "metricName": "test1", @@ -64,18 +50,10 @@ func TestWrite(t *testing.T) { }, }, { - name: "Don't create tables'", - inputMetric: testutil.MockMetrics(), - createTables: false, - client: &fakeClient{ - queries: make([]string, 0), - internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { - require.Fail(t, "Mgmt shouldn't be called when create_tables is false") - f.queries = append(f.queries, query.String()) - return &kusto.RowIterator{}, nil - }, - }, - createIngestor: createFakeIngestor, + name: "Don't create tables'", + inputMetric: testutil.MockMetrics(), + createTables: false, + tableName: "test1", metricsGrouping: tablePerMetric, expected: map[string]interface{}{ "metricName": "test1", @@ -89,41 +67,10 @@ func TestWrite(t *testing.T) { }, }, { - name: "Error in Mgmt", - inputMetric: testutil.MockMetrics(), - createTables: true, - client: &fakeClient{ - queries: make([]string, 0), - internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { - return nil, errors.New("Something went wrong") - }, - }, - createIngestor: createFakeIngestor, - metricsGrouping: tablePerMetric, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - expectedWriteError: "creating table for \"test1\" failed: Something went wrong", - }, - { - name: "SingleTable metric grouping type", - inputMetric: testutil.MockMetrics(), - createTables: true, - client: &fakeClient{ - queries: make([]string, 0), - internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { - f.queries = append(f.queries, query.String()) - return &kusto.RowIterator{}, nil - }, - }, - createIngestor: createFakeIngestor, + name: "SingleTable metric grouping type", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", metricsGrouping: singleTable, expected: map[string]interface{}{ "metricName": "test1", @@ -136,6 +83,24 @@ func TestWrite(t *testing.T) { "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), }, }, + { + name: "Valid metric managed ingestion", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", + metricsGrouping: tablePerMetric, + ingestionType: managedIngestion, + expected: map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + }, + }, } for _, tC := range testCases { @@ -146,6 +111,12 @@ func TestWrite(t *testing.T) { }) require.NoError(t, err) + ingestionType := "queued" + if tC.ingestionType != "" { + ingestionType = tC.ingestionType + } + + localFakeIngestor := &fakeIngestor{} plugin := AzureDataExplorer{ Endpoint: "someendpoint", Database: "databasename", @@ -153,10 +124,12 @@ func TestWrite(t *testing.T) { MetricsGrouping: tC.metricsGrouping, TableName: tC.tableName, CreateTables: tC.createTables, - client: tC.client, - ingesters: map[string]localIngestor{}, - createIngestor: tC.createIngestor, - serializer: serializer, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{ + tC.tableName: localFakeIngestor, + }, + serializer: serializer, + IngestionType: ingestionType, } errorInWrite := plugin.Write(testutil.MockMetrics()) @@ -167,16 +140,9 @@ func TestWrite(t *testing.T) { require.NoError(t, errorInWrite) expectedNameOfMetric := tC.expected["metricName"].(string) - expectedNameOfTable := expectedNameOfMetric - createdIngestor := plugin.ingesters[expectedNameOfMetric] - if tC.metricsGrouping == singleTable { - expectedNameOfTable = tC.tableName - createdIngestor = plugin.ingesters[expectedNameOfTable] - } + createdFakeIngestor := localFakeIngestor - require.NotNil(t, createdIngestor) - createdFakeIngestor := createdIngestor.(*fakeIngestor) require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"]) expectedFields := tC.expected["fields"].(map[string]interface{}) @@ -187,22 +153,42 @@ func TestWrite(t *testing.T) { expectedTime := tC.expected["timestamp"].(float64) require.Equal(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"]) - - if tC.createTables { - createTableString := fmt.Sprintf(createTableCommandExpected, expectedNameOfTable) - require.Equal(t, createTableString, tC.client.queries[0]) - - createTableMappingString := fmt.Sprintf(createTableMappingCommandExpected, expectedNameOfTable, expectedNameOfTable) - require.Equal(t, createTableMappingString, tC.client.queries[1]) - } else { - require.Empty(t, tC.client.queries) - } } }) } } -/***/ +func TestCreateAzureDataExplorerTable(t *testing.T) { + serializer, _ := telegrafJson.NewSerializer(time.Second, "", "") + plugin := AzureDataExplorer{ + Endpoint: "someendpoint", + Database: "databasename", + Log: testutil.Logger{}, + MetricsGrouping: tablePerMetric, + TableName: "test1", + CreateTables: false, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{ + "test1": &fakeIngestor{}, + }, + serializer: serializer, + IngestionType: queuedIngestion, + } + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + + err := plugin.createAzureDataExplorerTable(context.Background(), "test1") + + output := buf.String() + + if err == nil && !strings.Contains(output, "skipped table creation") { + t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.") + t.Fail() + } +} func TestWriteWithType(t *testing.T) { metricName := "test1" @@ -307,12 +293,11 @@ func TestWriteWithType(t *testing.T) { } } -func TestInitBlankEndpoint(t *testing.T) { +func TestInitBlankEndpointData(t *testing.T) { plugin := AzureDataExplorer{ - Log: testutil.Logger{}, - client: &fakeClient{}, - ingesters: map[string]localIngestor{}, - createIngestor: createFakeIngestor, + Log: testutil.Logger{}, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{}, } errorInit := plugin.Init() @@ -320,22 +305,10 @@ func TestInitBlankEndpoint(t *testing.T) { require.Equal(t, "Endpoint configuration cannot be empty", errorInit.Error()) } -type fakeClient struct { - queries []string - internalMgmt func(client *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) -} - -func (f *fakeClient) Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { - return f.internalMgmt(f, ctx, db, query, options...) -} - type fakeIngestor struct { actualOutputMetric map[string]interface{} } -func createFakeIngestor(localClient, string, string) (localIngestor, error) { - return &fakeIngestor{}, nil -} func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { scanner := bufio.NewScanner(reader) scanner.Scan() @@ -347,6 +320,14 @@ func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...inge return &ingest.Result{}, nil } +func (f *fakeIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) { + return &ingest.Result{}, nil +} + +func (f *fakeIngestor) Close() error { + return nil +} + type mockIngestor struct { records []string }