refactor(outputs.azure_data_explorer): removed deprecated code (#11951)
This commit is contained in:
parent
ce52b174ec
commit
736967974b
|
|
@ -7,7 +7,6 @@ import (
|
|||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -37,14 +36,7 @@ type AzureDataExplorer struct {
|
|||
TableName string `toml:"table_name"`
|
||||
CreateTables bool `toml:"create_tables"`
|
||||
IngestionType string `toml:"ingestion_type"`
|
||||
//Deprecated: client of type *kusto.Client, ingestors of type ingest.Ingestor introduced
|
||||
client localClient
|
||||
ingesters map[string]localIngestor
|
||||
/***/
|
||||
serializer serializers.Serializer
|
||||
//Deprecated
|
||||
createIngestor ingestorFactory
|
||||
/***/
|
||||
serializer serializers.Serializer
|
||||
kustoClient *kusto.Client
|
||||
metricIngestors map[string]ingest.Ingestor
|
||||
}
|
||||
|
|
@ -57,16 +49,6 @@ const (
|
|||
maxBuffers = 5
|
||||
)
|
||||
|
||||
type localIngestor interface {
|
||||
FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error)
|
||||
}
|
||||
|
||||
type localClient interface {
|
||||
Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
|
||||
}
|
||||
|
||||
type ingestorFactory func(localClient, string, string) (localIngestor, error)
|
||||
|
||||
const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
|
||||
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` +
|
||||
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` +
|
||||
|
|
@ -96,11 +78,6 @@ func (adx *AzureDataExplorer) Connect() error {
|
|||
}
|
||||
adx.kustoClient = client
|
||||
adx.metricIngestors = make(map[string]ingest.Ingestor)
|
||||
//Depticated
|
||||
adx.client = client
|
||||
adx.ingesters = make(map[string]localIngestor)
|
||||
adx.createIngestor = createRealIngestor
|
||||
/***/
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -125,12 +102,6 @@ func (adx *AzureDataExplorer) Close() error {
|
|||
adx.Log.Info("Closed ingestors and client")
|
||||
return nil
|
||||
}
|
||||
|
||||
//Deprecated
|
||||
adx.client = nil
|
||||
adx.ingesters = nil
|
||||
/***/
|
||||
|
||||
// Combine errors into a single object and return the combined error
|
||||
return kustoerrors.GetCombinedError(errs...)
|
||||
}
|
||||
|
|
@ -194,31 +165,19 @@ func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error
|
|||
}
|
||||
|
||||
func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
|
||||
var ingestor localIngestor
|
||||
var metricIngestor ingest.Ingestor
|
||||
var err error
|
||||
if adx.client != nil && adx.createIngestor != nil {
|
||||
ingestor, err = adx.getIngestor(ctx, tableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
metricIngestor, err = adx.getMetricIngestor(ctx, tableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metricIngestor, err = adx.getMetricIngestor(ctx, tableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
length := len(metricsArray)
|
||||
adx.Log.Debugf("Writing %s metrics to table %q", length, tableName)
|
||||
reader := bytes.NewReader(metricsArray)
|
||||
mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON)
|
||||
if ingestor != nil {
|
||||
//Deprecated
|
||||
if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil {
|
||||
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
|
||||
}
|
||||
} else if metricIngestor != nil {
|
||||
if metricIngestor != nil {
|
||||
if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
|
||||
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
|
||||
}
|
||||
|
|
@ -245,54 +204,20 @@ func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName s
|
|||
return ingestor, nil
|
||||
}
|
||||
|
||||
// Deprecated: getMetricIngestor introduced to use inget.Ingestor instead of localIngestor
|
||||
func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) {
|
||||
ingestor := adx.ingesters[tableName]
|
||||
|
||||
if ingestor == nil {
|
||||
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
||||
return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err)
|
||||
}
|
||||
//create a new ingestor client for the table
|
||||
tempIngestor, err := adx.createIngestor(adx.client, adx.Database, tableName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err)
|
||||
}
|
||||
adx.ingesters[tableName] = tempIngestor
|
||||
ingestor = tempIngestor
|
||||
}
|
||||
return ingestor, nil
|
||||
}
|
||||
|
||||
/***/
|
||||
|
||||
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
|
||||
if !adx.CreateTables {
|
||||
adx.Log.Info("skipped table creation")
|
||||
return nil
|
||||
}
|
||||
createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName))
|
||||
if adx.client != nil {
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if adx.kustoClient != nil {
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createTableMappingStmt :=
|
||||
kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).
|
||||
UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
|
||||
if adx.client != nil {
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if adx.kustoClient != nil {
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).
|
||||
UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
|
||||
if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -343,15 +268,6 @@ func init() {
|
|||
})
|
||||
}
|
||||
|
||||
// Deprecated: createIngestorByTable should be used with ingestionType and ingest.Ingestor
|
||||
func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) {
|
||||
ingestor, err := ingest.New(client.(*kusto.Client), database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
|
||||
if ingestor != nil {
|
||||
return ingestor, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// For each table create the ingestor
|
||||
func createIngestorByTable(client *kusto.Client, database string, tableName string, ingestionType string) (ingest.Ingestor, error) {
|
||||
switch strings.ToLower(ingestionType) {
|
||||
|
|
|
|||
|
|
@ -2,11 +2,12 @@ package azure_data_explorer
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -20,37 +21,22 @@ import (
|
|||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
const createTableCommandExpected = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
|
||||
const createTableMappingCommandExpected = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", ` +
|
||||
`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", ` +
|
||||
`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` +
|
||||
`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", ` +
|
||||
`"Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
inputMetric []telegraf.Metric
|
||||
client *fakeClient
|
||||
createIngestor ingestorFactory
|
||||
metricsGrouping string
|
||||
tableName string
|
||||
expected map[string]interface{}
|
||||
expectedWriteError string
|
||||
createTables bool
|
||||
ingestionType string
|
||||
}{
|
||||
{
|
||||
name: "Valid metric",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
client: &fakeClient{
|
||||
queries: make([]string, 0),
|
||||
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
|
||||
f.queries = append(f.queries, query.String())
|
||||
return &kusto.RowIterator{}, nil
|
||||
},
|
||||
},
|
||||
createIngestor: createFakeIngestor,
|
||||
name: "Valid metric",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
tableName: "test1",
|
||||
metricsGrouping: tablePerMetric,
|
||||
expected: map[string]interface{}{
|
||||
"metricName": "test1",
|
||||
|
|
@ -64,18 +50,10 @@ func TestWrite(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "Don't create tables'",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: false,
|
||||
client: &fakeClient{
|
||||
queries: make([]string, 0),
|
||||
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
|
||||
require.Fail(t, "Mgmt shouldn't be called when create_tables is false")
|
||||
f.queries = append(f.queries, query.String())
|
||||
return &kusto.RowIterator{}, nil
|
||||
},
|
||||
},
|
||||
createIngestor: createFakeIngestor,
|
||||
name: "Don't create tables'",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: false,
|
||||
tableName: "test1",
|
||||
metricsGrouping: tablePerMetric,
|
||||
expected: map[string]interface{}{
|
||||
"metricName": "test1",
|
||||
|
|
@ -89,41 +67,10 @@ func TestWrite(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "Error in Mgmt",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
client: &fakeClient{
|
||||
queries: make([]string, 0),
|
||||
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
|
||||
return nil, errors.New("Something went wrong")
|
||||
},
|
||||
},
|
||||
createIngestor: createFakeIngestor,
|
||||
metricsGrouping: tablePerMetric,
|
||||
expected: map[string]interface{}{
|
||||
"metricName": "test1",
|
||||
"fields": map[string]interface{}{
|
||||
"value": 1.0,
|
||||
},
|
||||
"tags": map[string]interface{}{
|
||||
"tag1": "value1",
|
||||
},
|
||||
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
|
||||
},
|
||||
expectedWriteError: "creating table for \"test1\" failed: Something went wrong",
|
||||
},
|
||||
{
|
||||
name: "SingleTable metric grouping type",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
client: &fakeClient{
|
||||
queries: make([]string, 0),
|
||||
internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
|
||||
f.queries = append(f.queries, query.String())
|
||||
return &kusto.RowIterator{}, nil
|
||||
},
|
||||
},
|
||||
createIngestor: createFakeIngestor,
|
||||
name: "SingleTable metric grouping type",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
tableName: "test1",
|
||||
metricsGrouping: singleTable,
|
||||
expected: map[string]interface{}{
|
||||
"metricName": "test1",
|
||||
|
|
@ -136,6 +83,24 @@ func TestWrite(t *testing.T) {
|
|||
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Valid metric managed ingestion",
|
||||
inputMetric: testutil.MockMetrics(),
|
||||
createTables: true,
|
||||
tableName: "test1",
|
||||
metricsGrouping: tablePerMetric,
|
||||
ingestionType: managedIngestion,
|
||||
expected: map[string]interface{}{
|
||||
"metricName": "test1",
|
||||
"fields": map[string]interface{}{
|
||||
"value": 1.0,
|
||||
},
|
||||
"tags": map[string]interface{}{
|
||||
"tag1": "value1",
|
||||
},
|
||||
"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tC := range testCases {
|
||||
|
|
@ -146,6 +111,12 @@ func TestWrite(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ingestionType := "queued"
|
||||
if tC.ingestionType != "" {
|
||||
ingestionType = tC.ingestionType
|
||||
}
|
||||
|
||||
localFakeIngestor := &fakeIngestor{}
|
||||
plugin := AzureDataExplorer{
|
||||
Endpoint: "someendpoint",
|
||||
Database: "databasename",
|
||||
|
|
@ -153,10 +124,12 @@ func TestWrite(t *testing.T) {
|
|||
MetricsGrouping: tC.metricsGrouping,
|
||||
TableName: tC.tableName,
|
||||
CreateTables: tC.createTables,
|
||||
client: tC.client,
|
||||
ingesters: map[string]localIngestor{},
|
||||
createIngestor: tC.createIngestor,
|
||||
serializer: serializer,
|
||||
kustoClient: kusto.NewMockClient(),
|
||||
metricIngestors: map[string]ingest.Ingestor{
|
||||
tC.tableName: localFakeIngestor,
|
||||
},
|
||||
serializer: serializer,
|
||||
IngestionType: ingestionType,
|
||||
}
|
||||
|
||||
errorInWrite := plugin.Write(testutil.MockMetrics())
|
||||
|
|
@ -167,16 +140,9 @@ func TestWrite(t *testing.T) {
|
|||
require.NoError(t, errorInWrite)
|
||||
|
||||
expectedNameOfMetric := tC.expected["metricName"].(string)
|
||||
expectedNameOfTable := expectedNameOfMetric
|
||||
createdIngestor := plugin.ingesters[expectedNameOfMetric]
|
||||
|
||||
if tC.metricsGrouping == singleTable {
|
||||
expectedNameOfTable = tC.tableName
|
||||
createdIngestor = plugin.ingesters[expectedNameOfTable]
|
||||
}
|
||||
createdFakeIngestor := localFakeIngestor
|
||||
|
||||
require.NotNil(t, createdIngestor)
|
||||
createdFakeIngestor := createdIngestor.(*fakeIngestor)
|
||||
require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"])
|
||||
|
||||
expectedFields := tC.expected["fields"].(map[string]interface{})
|
||||
|
|
@ -187,22 +153,42 @@ func TestWrite(t *testing.T) {
|
|||
|
||||
expectedTime := tC.expected["timestamp"].(float64)
|
||||
require.Equal(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"])
|
||||
|
||||
if tC.createTables {
|
||||
createTableString := fmt.Sprintf(createTableCommandExpected, expectedNameOfTable)
|
||||
require.Equal(t, createTableString, tC.client.queries[0])
|
||||
|
||||
createTableMappingString := fmt.Sprintf(createTableMappingCommandExpected, expectedNameOfTable, expectedNameOfTable)
|
||||
require.Equal(t, createTableMappingString, tC.client.queries[1])
|
||||
} else {
|
||||
require.Empty(t, tC.client.queries)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/***/
|
||||
func TestCreateAzureDataExplorerTable(t *testing.T) {
|
||||
serializer, _ := telegrafJson.NewSerializer(time.Second, "", "")
|
||||
plugin := AzureDataExplorer{
|
||||
Endpoint: "someendpoint",
|
||||
Database: "databasename",
|
||||
Log: testutil.Logger{},
|
||||
MetricsGrouping: tablePerMetric,
|
||||
TableName: "test1",
|
||||
CreateTables: false,
|
||||
kustoClient: kusto.NewMockClient(),
|
||||
metricIngestors: map[string]ingest.Ingestor{
|
||||
"test1": &fakeIngestor{},
|
||||
},
|
||||
serializer: serializer,
|
||||
IngestionType: queuedIngestion,
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
log.SetOutput(&buf)
|
||||
defer func() {
|
||||
log.SetOutput(os.Stderr)
|
||||
}()
|
||||
|
||||
err := plugin.createAzureDataExplorerTable(context.Background(), "test1")
|
||||
|
||||
output := buf.String()
|
||||
|
||||
if err == nil && !strings.Contains(output, "skipped table creation") {
|
||||
t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.")
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteWithType(t *testing.T) {
|
||||
metricName := "test1"
|
||||
|
|
@ -307,12 +293,11 @@ func TestWriteWithType(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestInitBlankEndpoint(t *testing.T) {
|
||||
func TestInitBlankEndpointData(t *testing.T) {
|
||||
plugin := AzureDataExplorer{
|
||||
Log: testutil.Logger{},
|
||||
client: &fakeClient{},
|
||||
ingesters: map[string]localIngestor{},
|
||||
createIngestor: createFakeIngestor,
|
||||
Log: testutil.Logger{},
|
||||
kustoClient: kusto.NewMockClient(),
|
||||
metricIngestors: map[string]ingest.Ingestor{},
|
||||
}
|
||||
|
||||
errorInit := plugin.Init()
|
||||
|
|
@ -320,22 +305,10 @@ func TestInitBlankEndpoint(t *testing.T) {
|
|||
require.Equal(t, "Endpoint configuration cannot be empty", errorInit.Error())
|
||||
}
|
||||
|
||||
type fakeClient struct {
|
||||
queries []string
|
||||
internalMgmt func(client *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
|
||||
}
|
||||
|
||||
func (f *fakeClient) Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
|
||||
return f.internalMgmt(f, ctx, db, query, options...)
|
||||
}
|
||||
|
||||
type fakeIngestor struct {
|
||||
actualOutputMetric map[string]interface{}
|
||||
}
|
||||
|
||||
func createFakeIngestor(localClient, string, string) (localIngestor, error) {
|
||||
return &fakeIngestor{}, nil
|
||||
}
|
||||
func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
scanner.Scan()
|
||||
|
|
@ -347,6 +320,14 @@ func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...inge
|
|||
return &ingest.Result{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) {
|
||||
return &ingest.Result{}, nil
|
||||
}
|
||||
|
||||
func (f *fakeIngestor) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockIngestor struct {
|
||||
records []string
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue