From e324ef19851a7dc1fc71b0de5d3d2846547f2c43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20Cont=C3=A9?= <199027+tomconte@users.noreply.github.com> Date: Mon, 18 Oct 2021 16:06:35 +0200 Subject: [PATCH] feat: Azure Event Hubs output plugin (#9346) --- go.mod | 4 +- go.sum | 14 +- plugins/outputs/all/all.go | 1 + .../azure_monitor/azure_monitor_test.go | 12 ++ plugins/outputs/event_hubs/README.md | 25 +++ plugins/outputs/event_hubs/event_hubs.go | 148 ++++++++++++++++ plugins/outputs/event_hubs/event_hubs_test.go | 162 ++++++++++++++++++ 7 files changed, 362 insertions(+), 4 deletions(-) create mode 100644 plugins/outputs/event_hubs/README.md create mode 100644 plugins/outputs/event_hubs/event_hubs.go create mode 100644 plugins/outputs/event_hubs/event_hubs_test.go diff --git a/go.mod b/go.mod index 4933a0a4a..95992243e 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,11 @@ require ( cloud.google.com/go/pubsub v1.17.0 code.cloudfoundry.org/clock v1.0.0 // indirect collectd.org v0.5.0 - github.com/Azure/azure-amqp-common-go/v3 v3.0.1 // indirect + github.com/Azure/azure-amqp-common-go/v3 v3.1.0 // indirect github.com/Azure/azure-event-hubs-go/v3 v3.3.13 github.com/Azure/azure-kusto-go v0.4.0 github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go v52.5.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go v55.0.0+incompatible // indirect github.com/Azure/azure-storage-blob-go v0.14.0 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd github.com/Azure/go-amqp v0.13.12 // indirect diff --git a/go.sum b/go.sum index 6b55cd79a..db4f0ad82 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,10 @@ contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= -github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA= github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= +github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q= +github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0= github.com/Azure/azure-event-hubs-go/v3 v3.3.13 h1:aiI2RLjp0MzLCuFUXzR8b3h3bdPIc2c3vBYXRK8jX3E= github.com/Azure/azure-event-hubs-go/v3 v3.3.13/go.mod h1:dJ/WqDn0KEJkNznL9UT/UbXzfmkffCjSNl9x2Y8JI28= github.com/Azure/azure-kusto-go v0.4.0 h1:CivPswdkVzSXzEjzJTyOJ6e5RhI4IKvaszilyNGvs+A= @@ -80,8 +82,10 @@ github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v44.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v52.5.0+incompatible h1:/NLBWHCnIHtZyLPc1P7WIqi4Te4CC23kIQyK3Ep/7lA= +github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v52.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v55.0.0+incompatible h1:L4/vUGbg1Xkw5L20LZD+hJI5I+ibWSytqQ68lTCfLwY= +github.com/Azure/azure-sdk-for-go v55.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= github.com/Azure/azure-storage-blob-go v0.14.0 h1:1BCg74AmVdYwO3dlKwtFU1V0wU2PZdREkXvAmZJRUlM= @@ -113,6 +117,7 @@ github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQW github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= github.com/Azure/go-autorest/autorest/adal v0.9.11/go.mod h1:nBKAnTomx8gDtl+3ZCJv2v0KACFHWTB2drffI1B68Pk= github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= github.com/Azure/go-autorest/autorest/adal v0.9.16 h1:P8An8Z9rH1ldbOLdFpxYorgOt2sywL9V24dAwWHPuGc= github.com/Azure/go-autorest/autorest/adal v0.9.16/go.mod h1:tGMin8I49Yij6AQ+rvV+Xa/zwxYQB5hmsd6DkfAx2+A= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= @@ -132,6 +137,8 @@ github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935 github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= +github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= +github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= @@ -520,6 +527,8 @@ github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= @@ -2429,6 +2438,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 7248b4ddc..33b2f92dd 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -15,6 +15,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/discard" _ "github.com/influxdata/telegraf/plugins/outputs/dynatrace" _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" + _ "github.com/influxdata/telegraf/plugins/outputs/event_hubs" _ "github.com/influxdata/telegraf/plugins/outputs/exec" _ "github.com/influxdata/telegraf/plugins/outputs/execd" _ "github.com/influxdata/telegraf/plugins/outputs/file" diff --git a/plugins/outputs/azure_monitor/azure_monitor_test.go b/plugins/outputs/azure_monitor/azure_monitor_test.go index 803b0441a..db8243e82 100644 --- a/plugins/outputs/azure_monitor/azure_monitor_test.go +++ b/plugins/outputs/azure_monitor/azure_monitor_test.go @@ -373,3 +373,15 @@ func TestWrite(t *testing.T) { }) } } + +func TestMain(m *testing.M) { + // Set up a fake environment for adal.getMSIType() + // Root cause: https://github.com/Azure/go-autorest/commit/def88ef859fb980eff240c755a70597bc9b490d0 + err := os.Setenv("MSI_ENDPOINT", "fake.endpoint") + + if err != nil { + panic(err) + } + + os.Exit(m.Run()) +} diff --git a/plugins/outputs/event_hubs/README.md b/plugins/outputs/event_hubs/README.md new file mode 100644 index 000000000..c71c06f99 --- /dev/null +++ b/plugins/outputs/event_hubs/README.md @@ -0,0 +1,25 @@ +# Azure Event Hubs output plugin + +This plugin for [Azure Event Hubs](https://azure.microsoft.com/en-gb/services/event-hubs/) will send metrics to a single Event Hub within an Event Hubs namespace. Metrics are sent as message batches, each message payload containing one metric object. The messages do not specify a partition key, and will thus be automatically load-balanced (round-robin) across all the Event Hub partitions. + +## Metrics + +The plugin uses the Telegraf serializers to format the metric data sent in the message payloads. You can select any of the supported output formats, although JSON is probably the easiest to integrate with downstream components. + +## Configuration + +```toml +[[ outputs.event_hubs ]] +## The full connection string to the Event Hub (required) +## The shared access key must have "Send" permissions on the target Event Hub. +connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" + +## Client timeout (defaults to 30s) +# timeout = "30s" + +## Data format to output. +## Each data format has its own unique set of configuration options, read +## more about them here: +## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md +data_format = "json" +``` diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go new file mode 100644 index 000000000..3c87a84fb --- /dev/null +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -0,0 +1,148 @@ +package event_hubs + +import ( + "context" + "time" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +/* +** Wrapper interface for eventhub.Hub + */ + +type EventHubInterface interface { + GetHub(s string) error + Close(ctx context.Context) error + SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error +} + +type eventHub struct { + hub *eventhub.Hub +} + +func (eh *eventHub) GetHub(s string) error { + hub, err := eventhub.NewHubFromConnectionString(s) + + if err != nil { + return err + } + + eh.hub = hub + + return nil +} + +func (eh *eventHub) Close(ctx context.Context) error { + return eh.hub.Close(ctx) +} + +func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { + return eh.hub.SendBatch(ctx, iterator, opts...) +} + +/* End wrapper interface */ + +type EventHubs struct { + Log telegraf.Logger `toml:"-"` + ConnectionString string `toml:"connection_string"` + Timeout config.Duration + + Hub EventHubInterface + serializer serializers.Serializer +} + +const ( + defaultRequestTimeout = time.Second * 30 +) + +func (e *EventHubs) Description() string { + return "Configuration for Event Hubs output plugin" +} + +func (e *EventHubs) SampleConfig() string { + return ` + ## The full connection string to the Event Hub (required) + ## The shared access key must have "Send" permissions on the target Event Hub. + connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" + + ## Client timeout (defaults to 30s) + # timeout = "30s" + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "json" +` +} + +func (e *EventHubs) Init() error { + err := e.Hub.GetHub(e.ConnectionString) + + if err != nil { + return err + } + + return nil +} + +func (e *EventHubs) Connect() error { + return nil +} + +func (e *EventHubs) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + err := e.Hub.Close(ctx) + + if err != nil { + return err + } + + return nil +} + +func (e *EventHubs) SetSerializer(serializer serializers.Serializer) { + e.serializer = serializer +} + +func (e *EventHubs) Write(metrics []telegraf.Metric) error { + var events []*eventhub.Event + + for _, metric := range metrics { + payload, err := e.serializer.Serialize(metric) + + if err != nil { + e.Log.Debugf("Could not serialize metric: %v", err) + continue + } + + events = append(events, eventhub.NewEvent(payload)) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...)) + + if err != nil { + return err + } + + return nil +} + +func init() { + outputs.Add("event_hubs", func() telegraf.Output { + return &EventHubs{ + Hub: &eventHub{}, + Timeout: config.Duration(defaultRequestTimeout), + } + }) +} diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go new file mode 100644 index 000000000..9b17aef60 --- /dev/null +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -0,0 +1,162 @@ +package event_hubs + +import ( + "context" + "fmt" + "math/rand" + "os" + "testing" + "time" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +/* +** Wrapper interface mock for eventhub.Hub + */ + +type mockEventHub struct { + mock.Mock +} + +func (eh *mockEventHub) GetHub(s string) error { + args := eh.Called(s) + return args.Error(0) +} + +func (eh *mockEventHub) Close(ctx context.Context) error { + args := eh.Called(ctx) + return args.Error(0) +} + +func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { + args := eh.Called(ctx, iterator, opts) + return args.Error(0) +} + +/* End wrapper interface */ + +func TestInitAndWrite(t *testing.T) { + serializer, _ := json.NewSerializer(time.Second, "") + mockHub := &mockEventHub{} + e := &EventHubs{ + Hub: mockHub, + ConnectionString: "mock", + Timeout: config.Duration(time.Second * 5), + serializer: serializer, + } + + mockHub.On("GetHub", mock.Anything).Return(nil).Once() + err := e.Init() + require.NoError(t, err) + mockHub.AssertExpectations(t) + + metrics := testutil.MockMetrics() + + mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + err = e.Write(metrics) + require.NoError(t, err) + mockHub.AssertExpectations(t) +} + +/* +** Integration test (requires an Event Hubs instance) + */ + +func TestInitAndWriteIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if os.Getenv("EVENTHUB_CONNECTION_STRING") == "" { + t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + // Create a new, empty Event Hub + // NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace + mHub, err := eventhub.NewHubManagerFromConnectionString(os.Getenv("EVENTHUB_CONNECTION_STRING")) + require.NoError(t, err) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + name := fmt.Sprintf("testmetrics%05d", r.Intn(10000)) + + entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1)) + require.NoError(t, err) + + // Delete the test hub + defer func() { + err := mHub.Delete(ctx, entity.Name) + require.NoError(t, err) + }() + + testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name + + // Configure the plugin to target the newly created hub + serializer, _ := json.NewSerializer(time.Second, "") + + e := &EventHubs{ + Hub: &eventHub{}, + ConnectionString: testHubCS, + Timeout: config.Duration(time.Second * 5), + serializer: serializer, + } + + // Verify that we can connect to Event Hubs + err = e.Init() + require.NoError(t, err) + + // Verify that we can successfully write data to Event Hubs + metrics := testutil.MockMetrics() + err = e.Write(metrics) + require.NoError(t, err) + + /* + ** Verify we can read data back from the test hub + */ + + exit := make(chan string) + + // Create a hub client for receiving + hub, err := eventhub.NewHubFromConnectionString(testHubCS) + require.NoError(t, err) + + // The handler function will pass received messages via the channel + handler := func(ctx context.Context, event *eventhub.Event) error { + exit <- string(event.Data) + return nil + } + + // Set up the receivers + runtimeInfo, err := hub.GetRuntimeInformation(ctx) + require.NoError(t, err) + + for _, partitionID := range runtimeInfo.PartitionIDs { + _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1")) + require.NoError(t, err) + } + + // Wait to receive the same number of messages sent, with timeout + received := 0 +wait: + for _, metric := range metrics { + select { + case m := <-exit: + t.Logf("Received for %s: %s", metric.Name(), m) + received = received + 1 + case <-time.After(10 * time.Second): + t.Logf("Timeout") + break wait + } + } + + // Make sure received == sent + require.Equal(t, received, len(metrics)) +}