feat: add option to skip table creation in azure data explorer output (#9942)

This commit is contained in:
AsafMah 2021-10-25 17:44:20 +03:00 committed by GitHub
parent 76d5e3e4c8
commit 9d5eb7dd68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 15 deletions

View File

@ -31,6 +31,10 @@ Azure Data Explorer is a distributed, columnar store, purpose built for any type
## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable"). ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
# table_name = "" # table_name = ""
## Creates tables and relevant mapping if set to true(default).
## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role.
# create_tables = true
``` ```
## Metrics Grouping ## Metrics Grouping
@ -85,7 +89,10 @@ These methods are:
[principal]: https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects [principal]: https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects
Whichever method, the designated Principal needs to be assigned the `Database User` role on the Database level in the Azure Data Explorer. This role will allow the plugin to create the required tables and ingest data into it. Whichever method, the designated Principal needs to be assigned the `Database User` role on the Database level in the Azure Data Explorer. This role will
allow the plugin to create the required tables and ingest data into it.
If `create_tables=false` then the designated principal only needs the `Database Ingestor` role at least.
### Configurations of the chosen Authentication Method ### Configurations of the chosen Authentication Method

View File

@ -27,6 +27,7 @@ type AzureDataExplorer struct {
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
MetricsGrouping string `toml:"metrics_grouping_type"` MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"` TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
client localClient client localClient
ingesters map[string]localIngestor ingesters map[string]localIngestor
serializer serializers.Serializer serializer serializers.Serializer
@ -57,7 +58,7 @@ func (adx *AzureDataExplorer) Description() string {
func (adx *AzureDataExplorer) SampleConfig() string { func (adx *AzureDataExplorer) SampleConfig() string {
return ` return `
## Azure Data Exlorer cluster endpoint ## Azure Data Explorer cluster endpoint
## ex: endpoint_url = "https://clustername.australiasoutheast.kusto.windows.net" ## ex: endpoint_url = "https://clustername.australiasoutheast.kusto.windows.net"
endpoint_url = "" endpoint_url = ""
@ -77,6 +78,9 @@ func (adx *AzureDataExplorer) SampleConfig() string {
## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable"). ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
# table_name = "" # table_name = ""
## Creates tables and relevant mapping if set to true(default).
## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role.
# create_tables = true
` `
} }
@ -198,6 +202,10 @@ func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string)
} }
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
if !adx.CreateTables {
adx.Log.Info("skipped table creation")
return nil
}
createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName)) createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName))
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil { if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
return err return err
@ -241,7 +249,8 @@ func (adx *AzureDataExplorer) Init() error {
func init() { func init() {
outputs.Add("azure_data_explorer", func() telegraf.Output { outputs.Add("azure_data_explorer", func() telegraf.Output {
return &AzureDataExplorer{ return &AzureDataExplorer{
Timeout: config.Duration(20 * time.Second), Timeout: config.Duration(20 * time.Second),
CreateTables: true,
} }
}) })
} }

View File

@ -31,10 +31,12 @@ func TestWrite(t *testing.T) {
tableName string tableName string
expected map[string]interface{} expected map[string]interface{}
expectedWriteError string expectedWriteError string
createTables bool
}{ }{
{ {
name: "Valid metric", name: "Valid metric",
inputMetric: testutil.MockMetrics(), inputMetric: testutil.MockMetrics(),
createTables: true,
client: &fakeClient{ client: &fakeClient{
queries: make([]string, 0), queries: make([]string, 0),
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
@ -56,8 +58,34 @@ func TestWrite(t *testing.T) {
}, },
}, },
{ {
name: "Error in Mgmt", name: "Don't create tables'",
inputMetric: testutil.MockMetrics(), inputMetric: testutil.MockMetrics(),
createTables: false,
client: &fakeClient{
queries: make([]string, 0),
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
require.Fail(t, "Mgmt shouldn't be called when create_tables is false")
f.queries = append(f.queries, query.String())
return &kusto.RowIterator{}, nil
},
},
createIngestor: createFakeIngestor,
metricsGrouping: tablePerMetric,
expected: map[string]interface{}{
"metricName": "test1",
"fields": map[string]interface{}{
"value": 1.0,
},
"tags": map[string]interface{}{
"tag1": "value1",
},
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
},
},
{
name: "Error in Mgmt",
inputMetric: testutil.MockMetrics(),
createTables: true,
client: &fakeClient{ client: &fakeClient{
queries: make([]string, 0), queries: make([]string, 0),
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
@ -79,8 +107,9 @@ func TestWrite(t *testing.T) {
expectedWriteError: "creating table for \"test1\" failed: Something went wrong", expectedWriteError: "creating table for \"test1\" failed: Something went wrong",
}, },
{ {
name: "SingleTable metric grouping type", name: "SingleTable metric grouping type",
inputMetric: testutil.MockMetrics(), inputMetric: testutil.MockMetrics(),
createTables: true,
client: &fakeClient{ client: &fakeClient{
queries: make([]string, 0), queries: make([]string, 0),
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) { internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
@ -114,6 +143,7 @@ func TestWrite(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
MetricsGrouping: tC.metricsGrouping, MetricsGrouping: tC.metricsGrouping,
TableName: tC.tableName, TableName: tC.tableName,
CreateTables: tC.createTables,
client: tC.client, client: tC.client,
ingesters: map[string]localIngestor{}, ingesters: map[string]localIngestor{},
createIngestor: tC.createIngestor, createIngestor: tC.createIngestor,
@ -149,11 +179,15 @@ func TestWrite(t *testing.T) {
expectedTime := tC.expected["timestamp"].(float64) expectedTime := tC.expected["timestamp"].(float64)
require.Equal(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"]) require.Equal(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"])
createTableString := fmt.Sprintf(createTableCommandExpected, expectedNameOfTable) if tC.createTables {
require.Equal(t, createTableString, tC.client.queries[0]) createTableString := fmt.Sprintf(createTableCommandExpected, expectedNameOfTable)
require.Equal(t, createTableString, tC.client.queries[0])
createTableMappingString := fmt.Sprintf(createTableMappingCommandExpected, expectedNameOfTable, expectedNameOfTable) createTableMappingString := fmt.Sprintf(createTableMappingCommandExpected, expectedNameOfTable, expectedNameOfTable)
require.Equal(t, createTableMappingString, tC.client.queries[1]) require.Equal(t, createTableMappingString, tC.client.queries[1])
} else {
require.Empty(t, tC.client.queries)
}
} }
}) })
} }
@ -185,10 +219,10 @@ type fakeIngestor struct {
actualOutputMetric map[string]interface{} actualOutputMetric map[string]interface{}
} }
func createFakeIngestor(client localClient, database string, tableName string) (localIngestor, error) { func createFakeIngestor(localClient, string, string) (localIngestor, error) {
return &fakeIngestor{}, nil return &fakeIngestor{}, nil
} }
func (f *fakeIngestor) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) { func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
scanner.Scan() scanner.Scan()
firstLine := scanner.Text() firstLine := scanner.Text()