feat: Azure Event Hubs output plugin (#9346)

This commit is contained in:
Thomas Conté 2021-10-18 16:06:35 +02:00 committed by GitHub
parent c4c32025c8
commit e324ef1985
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 362 additions and 4 deletions

4
go.mod
View File

@ -9,11 +9,11 @@ require (
cloud.google.com/go/pubsub v1.17.0
code.cloudfoundry.org/clock v1.0.0 // indirect
collectd.org v0.5.0
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.3.13
github.com/Azure/azure-kusto-go v0.4.0
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.13.12 // indirect

14
go.sum
View File

@ -66,8 +66,10 @@ contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-event-hubs-go/v3 v3.3.13 h1:aiI2RLjp0MzLCuFUXzR8b3h3bdPIc2c3vBYXRK8jX3E=
github.com/Azure/azure-event-hubs-go/v3 v3.3.13/go.mod h1:dJ/WqDn0KEJkNznL9UT/UbXzfmkffCjSNl9x2Y8JI28=
github.com/Azure/azure-kusto-go v0.4.0 h1:CivPswdkVzSXzEjzJTyOJ6e5RhI4IKvaszilyNGvs+A=
@ -80,8 +82,10 @@ github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v44.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible h1:/NLBWHCnIHtZyLPc1P7WIqi4Te4CC23kIQyK3Ep/7lA=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible h1:L4/vUGbg1Xkw5L20LZD+hJI5I+ibWSytqQ68lTCfLwY=
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-blob-go v0.14.0 h1:1BCg74AmVdYwO3dlKwtFU1V0wU2PZdREkXvAmZJRUlM=
@ -113,6 +117,7 @@ github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQW
github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
github.com/Azure/go-autorest/autorest/adal v0.9.11/go.mod h1:nBKAnTomx8gDtl+3ZCJv2v0KACFHWTB2drffI1B68Pk=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.16 h1:P8An8Z9rH1ldbOLdFpxYorgOt2sywL9V24dAwWHPuGc=
github.com/Azure/go-autorest/autorest/adal v0.9.16/go.mod h1:tGMin8I49Yij6AQ+rvV+Xa/zwxYQB5hmsd6DkfAx2+A=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM=
@ -132,6 +137,8 @@ github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
@ -520,6 +527,8 @@ github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
@ -2429,6 +2438,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -15,6 +15,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/dynatrace"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/event_hubs"
_ "github.com/influxdata/telegraf/plugins/outputs/exec"
_ "github.com/influxdata/telegraf/plugins/outputs/execd"
_ "github.com/influxdata/telegraf/plugins/outputs/file"

View File

@ -373,3 +373,15 @@ func TestWrite(t *testing.T) {
})
}
}
func TestMain(m *testing.M) {
// Set up a fake environment for adal.getMSIType()
// Root cause: https://github.com/Azure/go-autorest/commit/def88ef859fb980eff240c755a70597bc9b490d0
err := os.Setenv("MSI_ENDPOINT", "fake.endpoint")
if err != nil {
panic(err)
}
os.Exit(m.Run())
}

View File

@ -0,0 +1,25 @@
# Azure Event Hubs output plugin
This plugin for [Azure Event Hubs](https://azure.microsoft.com/en-gb/services/event-hubs/) will send metrics to a single Event Hub within an Event Hubs namespace. Metrics are sent as message batches, each message payload containing one metric object. The messages do not specify a partition key, and will thus be automatically load-balanced (round-robin) across all the Event Hub partitions.
## Metrics
The plugin uses the Telegraf serializers to format the metric data sent in the message payloads. You can select any of the supported output formats, although JSON is probably the easiest to integrate with downstream components.
## Configuration
```toml
[[ outputs.event_hubs ]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
## Client timeout (defaults to 30s)
# timeout = "30s"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
```

View File

@ -0,0 +1,148 @@
package event_hubs
import (
"context"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
/*
** Wrapper interface for eventhub.Hub
*/
type EventHubInterface interface {
GetHub(s string) error
Close(ctx context.Context) error
SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error
}
type eventHub struct {
hub *eventhub.Hub
}
func (eh *eventHub) GetHub(s string) error {
hub, err := eventhub.NewHubFromConnectionString(s)
if err != nil {
return err
}
eh.hub = hub
return nil
}
func (eh *eventHub) Close(ctx context.Context) error {
return eh.hub.Close(ctx)
}
func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return eh.hub.SendBatch(ctx, iterator, opts...)
}
/* End wrapper interface */
type EventHubs struct {
Log telegraf.Logger `toml:"-"`
ConnectionString string `toml:"connection_string"`
Timeout config.Duration
Hub EventHubInterface
serializer serializers.Serializer
}
const (
defaultRequestTimeout = time.Second * 30
)
func (e *EventHubs) Description() string {
return "Configuration for Event Hubs output plugin"
}
func (e *EventHubs) SampleConfig() string {
return `
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
## Client timeout (defaults to 30s)
# timeout = "30s"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
`
}
func (e *EventHubs) Init() error {
err := e.Hub.GetHub(e.ConnectionString)
if err != nil {
return err
}
return nil
}
func (e *EventHubs) Connect() error {
return nil
}
func (e *EventHubs) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
err := e.Hub.Close(ctx)
if err != nil {
return err
}
return nil
}
func (e *EventHubs) SetSerializer(serializer serializers.Serializer) {
e.serializer = serializer
}
func (e *EventHubs) Write(metrics []telegraf.Metric) error {
var events []*eventhub.Event
for _, metric := range metrics {
payload, err := e.serializer.Serialize(metric)
if err != nil {
e.Log.Debugf("Could not serialize metric: %v", err)
continue
}
events = append(events, eventhub.NewEvent(payload))
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()
err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...))
if err != nil {
return err
}
return nil
}
func init() {
outputs.Add("event_hubs", func() telegraf.Output {
return &EventHubs{
Hub: &eventHub{},
Timeout: config.Duration(defaultRequestTimeout),
}
})
}

View File

@ -0,0 +1,162 @@
package event_hubs
import (
"context"
"fmt"
"math/rand"
"os"
"testing"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
/*
** Wrapper interface mock for eventhub.Hub
*/
type mockEventHub struct {
mock.Mock
}
func (eh *mockEventHub) GetHub(s string) error {
args := eh.Called(s)
return args.Error(0)
}
func (eh *mockEventHub) Close(ctx context.Context) error {
args := eh.Called(ctx)
return args.Error(0)
}
func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
args := eh.Called(ctx, iterator, opts)
return args.Error(0)
}
/* End wrapper interface */
func TestInitAndWrite(t *testing.T) {
serializer, _ := json.NewSerializer(time.Second, "")
mockHub := &mockEventHub{}
e := &EventHubs{
Hub: mockHub,
ConnectionString: "mock",
Timeout: config.Duration(time.Second * 5),
serializer: serializer,
}
mockHub.On("GetHub", mock.Anything).Return(nil).Once()
err := e.Init()
require.NoError(t, err)
mockHub.AssertExpectations(t)
metrics := testutil.MockMetrics()
mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
err = e.Write(metrics)
require.NoError(t, err)
mockHub.AssertExpectations(t)
}
/*
** Integration test (requires an Event Hubs instance)
*/
func TestInitAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
if os.Getenv("EVENTHUB_CONNECTION_STRING") == "" {
t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// Create a new, empty Event Hub
// NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace
mHub, err := eventhub.NewHubManagerFromConnectionString(os.Getenv("EVENTHUB_CONNECTION_STRING"))
require.NoError(t, err)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
name := fmt.Sprintf("testmetrics%05d", r.Intn(10000))
entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1))
require.NoError(t, err)
// Delete the test hub
defer func() {
err := mHub.Delete(ctx, entity.Name)
require.NoError(t, err)
}()
testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name
// Configure the plugin to target the newly created hub
serializer, _ := json.NewSerializer(time.Second, "")
e := &EventHubs{
Hub: &eventHub{},
ConnectionString: testHubCS,
Timeout: config.Duration(time.Second * 5),
serializer: serializer,
}
// Verify that we can connect to Event Hubs
err = e.Init()
require.NoError(t, err)
// Verify that we can successfully write data to Event Hubs
metrics := testutil.MockMetrics()
err = e.Write(metrics)
require.NoError(t, err)
/*
** Verify we can read data back from the test hub
*/
exit := make(chan string)
// Create a hub client for receiving
hub, err := eventhub.NewHubFromConnectionString(testHubCS)
require.NoError(t, err)
// The handler function will pass received messages via the channel
handler := func(ctx context.Context, event *eventhub.Event) error {
exit <- string(event.Data)
return nil
}
// Set up the receivers
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
require.NoError(t, err)
for _, partitionID := range runtimeInfo.PartitionIDs {
_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1"))
require.NoError(t, err)
}
// Wait to receive the same number of messages sent, with timeout
received := 0
wait:
for _, metric := range metrics {
select {
case m := <-exit:
t.Logf("Received for %s: %s", metric.Name(), m)
received = received + 1
case <-time.After(10 * time.Second):
t.Logf("Timeout")
break wait
}
}
// Make sure received == sent
require.Equal(t, received, len(metrics))
}