feat(outputs.azure_data_explorer): Added support for streaming ingestion for ADX output plugin (#11874)

This commit is contained in:
asaharn 2022-10-03 21:00:13 +05:30 committed by GitHub
parent 758f2cba7a
commit 9485e9040e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 281 additions and 14 deletions

View File

@ -41,6 +41,12 @@ of logs, metrics and time series data.
## Creates tables and relevant mapping if set to true (default).
## Skips table and mapping creation if set to false; this is useful for running Telegraf with the lowest possible permissions, i.e. the table ingestor role.
# create_tables = true
## Ingestion method to use.
## Available options are
## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below
## - queued -- queue up metrics data and process sequentially
# ingestion_type = "queued"
```
## Metrics Grouping
@ -93,6 +99,18 @@ The corresponding table mapping would be like the following:
**Note**: This plugin will automatically create Azure Data Explorer tables and
corresponding table mapping as per the above mentioned commands.
## Ingestion type
**Note**:
[Streaming ingestion](https://aka.ms/AAhlg6s)
has to be enabled on the ADX cluster
when the `managed` option is used.
Refer to the query below to check whether streaming ingestion is enabled:
```kql
.show database <DB-Name> policy streamingingestion
```
## Authentication
### Supported Authentication Methods

View File

@ -12,12 +12,14 @@ import (
"time"
"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
@ -34,10 +36,17 @@ type AzureDataExplorer struct {
MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
client localClient
ingesters map[string]localIngestor
serializer serializers.Serializer
createIngestor ingestorFactory
IngestionType string `toml:"ingestion_type"`
//Deprecated: client of type *kusto.Client, ingestors of type ingest.Ingestor introduced
client localClient
ingesters map[string]localIngestor
/***/
serializer serializers.Serializer
//Deprecated
createIngestor ingestorFactory
/***/
kustoClient *kusto.Client
metricIngestors map[string]ingest.Ingestor
}
const (
@ -60,11 +69,14 @@ type ingestorFactory func(localClient, string, string) (localIngestor, error)
const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
const managedIngestion = "managed"
const queuedIngestion = "queued"
// SampleConfig returns the embedded sample configuration for this plugin.
func (*AzureDataExplorer) SampleConfig() string {
return sampleConfig
}
// Connect creates the Kusto client from environment-based Azure credentials
// scoped to the configured endpoint and prepares the per-table ingestor maps.
// NOTE(review): this span is a flattened diff hunk; the interleaved header line
// below belongs to the diff rendering, not the program.
func (adx *AzureDataExplorer) Connect() error {
authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource(adx.Endpoint)
if err != nil {
@ -78,18 +90,45 @@ func (adx *AzureDataExplorer) Connect() error {
if err != nil {
return err
}
adx.kustoClient = client
adx.metricIngestors = make(map[string]ingest.Ingestor)
// Deprecated: legacy client/ingestor fields kept for backward compatibility
adx.client = client
adx.ingesters = make(map[string]localIngestor)
adx.createIngestor = createRealIngestor
/***/
return nil
}
// Close closes every per-table ingestor and the Kusto client, accumulating
// all close errors and returning them combined; returns nil only when every
// close succeeded.
func (adx *AzureDataExplorer) Close() error {
	var errs []error
	for _, v := range adx.metricIngestors {
		if err := v.Close(); err != nil {
			// accumulate errors so every ingestor still gets a close attempt
			errs = append(errs, err)
		}
	}
	if err := adx.kustoClient.Close(); err != nil {
		errs = append(errs, err)
	}
	adx.kustoClient = nil
	adx.metricIngestors = nil
	//Deprecated: clear the legacy client/ingestor fields as well
	adx.client = nil
	adx.ingesters = nil
	/***/
	if len(errs) == 0 {
		adx.Log.Info("Closed ingestors and client")
		return nil
	}
	// Combine errors into a single object and return the combined error.
	// (The original had an unconditional `return nil` before this line,
	// which silently discarded all accumulated close errors.)
	return kustoerrors.GetCombinedError(errs...)
}
func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error {
@ -151,19 +190,58 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error
}
func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
ingestor, err := adx.getIngestor(ctx, tableName)
if err != nil {
return err
var ingestor localIngestor
var metricIngestor ingest.Ingestor
var err error
if adx.client != nil && adx.createIngestor != nil {
ingestor, err = adx.getIngestor(ctx, tableName)
if err != nil {
return err
}
} else {
metricIngestor, err = adx.getMetricIngestor(ctx, tableName)
if err != nil {
return err
}
}
length := len(metricsArray)
adx.Log.Debugf("Writing %s metrics to table %q", length, tableName)
reader := bytes.NewReader(metricsArray)
mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON)
if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
if ingestor != nil {
//Deprecated
if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
}
} else if metricIngestor != nil {
if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
}
}
return nil
}
// getMetricIngestor returns the cached ingestor for tableName, lazily creating
// the ADX table/mapping and a new ingestor (managed or queued, per the
// configured ingestion type) on first use.
func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) {
	ingestor := adx.metricIngestors[tableName]
	if ingestor == nil {
		if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
			// %w (not %v) so callers can unwrap the underlying error
			return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
		}
		// create a new ingestor client for the table
		tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
		if err != nil {
			return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
		}
		adx.metricIngestors[tableName] = tempIngestor
		adx.Log.Debugf("Ingestor for table %s created", tableName)
		ingestor = tempIngestor
	}
	return ingestor, nil
}
// Deprecated: getMetricIngestor was introduced to use ingest.Ingestor instead of localIngestor.
func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) {
ingestor := adx.ingesters[tableName]
@ -182,19 +260,33 @@ func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string)
return ingestor, nil
}
/***/
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
if !adx.CreateTables {
adx.Log.Info("skipped table creation")
return nil
}
createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName))
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err
if adx.client != nil {
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err
}
} else if adx.kustoClient != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err
}
}
createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
return err
if adx.client != nil {
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
return err
}
} else if adx.kustoClient != nil {
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
return err
}
}
return nil
@ -219,6 +311,12 @@ func (adx *AzureDataExplorer) Init() error {
return errors.New("Metrics grouping type is not valid")
}
if adx.IngestionType == "" {
adx.IngestionType = queuedIngestion
} else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) {
return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
}
serializer, err := json.NewSerializer(time.Nanosecond, time.RFC3339Nano, "")
if err != nil {
return err
@ -236,6 +334,7 @@ func init() {
})
}
// Deprecated: createIngestorByTable should be used with ingestionType and ingest.Ingestor
func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) {
ingestor, err := ingest.New(client.(*kusto.Client), database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
if ingestor != nil {
@ -243,3 +342,16 @@ func createRealIngestor(client localClient, database string, tableName string) (
}
return nil, err
}
// createIngestorByTable builds the ingestor used for a single table, choosing
// managed (streaming with queued fallback) or queued ingestion based on the
// configured ingestion type.
func createIngestorByTable(client *kusto.Client, database string, tableName string, ingestionType string) (ingest.Ingestor, error) {
	switch strings.ToLower(ingestionType) {
	case managedIngestion:
		return ingest.NewManaged(client, database, tableName)
	case queuedIngestion:
		return ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
	default:
		return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion)
	}
}

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"testing"
"time"
@ -193,6 +194,108 @@ func TestWrite(t *testing.T) {
}
}
/***/
// TestWriteWithType validates Write for both ingestion types ("queued" and
// "managed") and both metrics-grouping modes, using a mock ingestor to capture
// the serialized payload written per table.
func TestWriteWithType(t *testing.T) {
	metricName := "test1"
	fakeClient := kusto.NewMockClient()
	expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`}
	mockMetrics := testutil.MockMetrics()
	// Multi tables
	mockMetricsMulti := []telegraf.Metric{
		testutil.TestMetric(1.0, "test2"),
		testutil.TestMetric(2.0, "test3"),
	}
	expectedResultMap2 := map[string]string{"test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`}
	// List of tests
	testCases := []struct {
		name                      string
		inputMetric               []telegraf.Metric
		metricsGrouping           string
		tableNameToExpectedResult map[string]string
		expectedWriteError        string
		createTables              bool
		ingestionType             string
	}{
		{
			name:                      "Valid metric",
			inputMetric:               mockMetrics,
			createTables:              true,
			metricsGrouping:           tablePerMetric,
			tableNameToExpectedResult: expectedResultMap,
		},
		{
			name:                      "Don't create tables'",
			inputMetric:               mockMetrics,
			createTables:              false,
			metricsGrouping:           tablePerMetric,
			tableNameToExpectedResult: expectedResultMap,
		},
		{
			name:                      "SingleTable metric grouping type",
			inputMetric:               mockMetrics,
			createTables:              true,
			metricsGrouping:           singleTable,
			tableNameToExpectedResult: expectedResultMap,
		},
		{
			name:                      "Valid metric managed ingestion",
			inputMetric:               mockMetrics,
			createTables:              true,
			metricsGrouping:           tablePerMetric,
			tableNameToExpectedResult: expectedResultMap,
			ingestionType:             managedIngestion,
		},
		{
			name:                      "Table per metric type",
			inputMetric:               mockMetricsMulti,
			createTables:              true,
			metricsGrouping:           tablePerMetric,
			tableNameToExpectedResult: expectedResultMap2,
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			serializer, err := telegrafJson.NewSerializer(time.Second, "", "")
			require.NoError(t, err)
			for tableName, jsonValue := range testCase.tableNameToExpectedResult {
				// default to queued ingestion unless the case overrides it
				ingestionType := "queued"
				if testCase.ingestionType != "" {
					ingestionType = testCase.ingestionType
				}
				mockIngestor := &mockIngestor{}
				plugin := AzureDataExplorer{
					Endpoint:        "someendpoint",
					Database:        "databasename",
					Log:             testutil.Logger{},
					IngestionType:   ingestionType,
					MetricsGrouping: testCase.metricsGrouping,
					TableName:       tableName,
					CreateTables:    testCase.createTables,
					kustoClient:     fakeClient,
					metricIngestors: map[string]ingest.Ingestor{
						tableName: mockIngestor,
					},
					serializer: serializer,
				}
				errorInWrite := plugin.Write(testCase.inputMetric)
				if testCase.expectedWriteError != "" {
					require.EqualError(t, errorInWrite, testCase.expectedWriteError)
					continue
				}
				require.NoError(t, errorInWrite)
				// the ingestor must exist before we inspect what it captured;
				// (the original additionally re-assigned the identical map
				// lookup inside a singleTable branch — dead code, removed)
				createdIngestor := plugin.metricIngestors[tableName]
				require.NotNil(t, createdIngestor)
				require.NotEmpty(t, mockIngestor.records)
				records := mockIngestor.records[0] // the first element
				require.JSONEq(t, jsonValue, records)
			}
		})
	}
}
func TestInitBlankEndpoint(t *testing.T) {
plugin := AzureDataExplorer{
Log: testutil.Logger{},
@ -232,3 +335,31 @@ func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...inge
}
return &ingest.Result{}, nil
}
// mockIngestor is a test double for ingest.Ingestor that records every
// newline-separated payload line handed to FromReader.
type mockIngestor struct {
records []string
}
// FromReader captures the payload lines instead of sending them anywhere.
func (m *mockIngestor) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) {
bufbytes, _ := io.ReadAll(reader)
metricjson := string(bufbytes)
m.SetRecords(strings.Split(metricjson, "\n"))
return &ingest.Result{}, nil
}
// FromFile is a no-op stub present only to satisfy the ingest.Ingestor interface.
func (m *mockIngestor) FromFile(ctx context.Context, fPath string, options ...ingest.FileOption) (*ingest.Result, error) {
return &ingest.Result{}, nil
}
// SetRecords replaces the captured payload lines.
func (m *mockIngestor) SetRecords(records []string) {
m.records = records
}
// Records returns the payload lines captured by FromReader.
func (m *mockIngestor) Records() []string {
return m.records
}
// Close is a no-op; the mock holds no resources.
func (m *mockIngestor) Close() error {
return nil
}

View File

@ -23,3 +23,9 @@
## Creates tables and relevant mapping if set to true (default).
## Skips table and mapping creation if set to false; this is useful for running Telegraf with the lowest possible permissions, i.e. the table ingestor role.
# create_tables = true
## Ingestion method to use.
## Available options are
## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below
## - queued -- queue up metrics data and process sequentially
# ingestion_type = "queued"