telegraf/plugins/outputs/event_hubs/event_hubs_test.go

159 lines
4.0 KiB
Go
Raw Normal View History

package event_hubs
import (
"context"
"fmt"
"math/rand"
"os"
"testing"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
/*
** Wrapper interface mock for eventhub.Hub
*/
// mockEventHub is a test double built on testify's mock.Mock. Its methods
// (GetHub, Close, SendBatch) record each call on the embedded mock and return
// the error that the test configured for that call.
type mockEventHub struct{ mock.Mock }
// GetHub records the call, with s as its only argument, on the embedded mock
// and returns the first configured return value as an error.
func (eh *mockEventHub) GetHub(s string) error {
	return eh.Called(s).Error(0)
}
// Close records the call, with ctx as its only argument, on the embedded mock
// and returns the first configured return value as an error.
func (eh *mockEventHub) Close(ctx context.Context) error {
	return eh.Called(ctx).Error(0)
}
// SendBatch records the call on the embedded mock and returns the first
// configured return value as an error.
//
// The variadic opts are forwarded to the mock as a single slice argument, so
// an expectation for this method has to match three arguments: ctx, iterator
// and the opts slice.
func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
	return eh.Called(ctx, iterator, opts).Error(0)
}
/* End wrapper interface */
// TestInitAndWrite runs Init and Write on an EventHubs plugin whose Hub is a
// mockEventHub, so no network access is needed. It checks that both calls
// succeed and that the expectations registered on the mock (one GetHub call,
// then one SendBatch call) were met.
func TestInitAndWrite(t *testing.T) {
	// Do not discard the constructor error: a broken serializer would
	// otherwise only show up later as an unrelated-looking failure in Write.
	serializer, err := json.NewSerializer(time.Second, "")
	require.NoError(t, err)

	mockHub := &mockEventHub{}
	e := &EventHubs{
		Hub:              mockHub,
		ConnectionString: "mock",
		Timeout:          config.Duration(time.Second * 5),
		serializer:       serializer,
	}

	// Register a single successful GetHub call before initialising the plugin.
	mockHub.On("GetHub", mock.Anything).Return(nil).Once()
	err = e.Init()
	require.NoError(t, err)
	mockHub.AssertExpectations(t)

	// Register a single successful SendBatch call before writing the metrics.
	metrics := testutil.MockMetrics()
	mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
	err = e.Write(metrics)
	require.NoError(t, err)
	mockHub.AssertExpectations(t)
}
/*
** Integration test (requires an Event Hubs instance)
*/
// TestInitAndWriteIntegration runs the plugin against a real Event Hubs
// namespace. It creates a temporary hub, writes the mock metrics to it through
// the plugin, reads the events back with a separate hub client and checks that
// as many events were received as metrics were written. The temporary hub is
// deleted when the test returns.
//
// The test is skipped unless the EVENTHUB_CONNECTION_STRING environment
// variable is set.
func TestInitAndWriteIntegration(t *testing.T) {
	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	if connStr == "" {
		t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING")
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	// Create a new, empty Event Hub
	// NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace
	mHub, err := eventhub.NewHubManagerFromConnectionString(connStr)
	require.NoError(t, err)

	// The suffix is formatted with %05d, so draw from the full five-digit
	// range to keep name collisions between runs unlikely.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	name := fmt.Sprintf("testmetrics%05d", r.Intn(100000))
	entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1))
	require.NoError(t, err)

	// Delete the test hub. A fresh context is used so that the hub is still
	// removed when the test context has already expired.
	defer func() {
		cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), time.Second*30)
		defer cleanupCancel()
		err := mHub.Delete(cleanupCtx, entity.Name)
		require.NoError(t, err)
	}()

	testHubCS := connStr + ";EntityPath=" + entity.Name

	// Configure the plugin to target the newly created hub
	serializer, err := json.NewSerializer(time.Second, "")
	require.NoError(t, err)
	e := &EventHubs{
		Hub:              &eventHub{},
		ConnectionString: testHubCS,
		Timeout:          config.Duration(time.Second * 5),
		serializer:       serializer,
	}

	// Verify that we can connect to Event Hubs
	err = e.Init()
	require.NoError(t, err)

	// Verify that we can successfully write data to Event Hubs
	metrics := testutil.MockMetrics()
	err = e.Write(metrics)
	require.NoError(t, err)

	/*
	 ** Verify we can read data back from the test hub
	 */
	exit := make(chan string)

	// Create a hub client for receiving
	hub, err := eventhub.NewHubFromConnectionString(testHubCS)
	require.NoError(t, err)
	// Release the receiving client's connections when the test returns.
	// This is best effort: a failure here is logged, not treated as a test
	// failure.
	defer func() {
		if err := hub.Close(ctx); err != nil {
			t.Logf("Closing receiver hub: %v", err)
		}
	}()

	// The handler function will pass received messages via the channel. It
	// also watches the test context so that it does not block forever on the
	// unbuffered channel once the test has stopped reading from it.
	handler := func(_ context.Context, event *eventhub.Event) error {
		select {
		case exit <- string(event.Data):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Set up the receivers
	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	require.NoError(t, err)
	for _, partitionID := range runtimeInfo.PartitionIDs {
		_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1"))
		require.NoError(t, err)
	}

	// Wait to receive the same number of messages sent, with timeout
	received := 0
wait:
	for _, metric := range metrics {
		select {
		case m := <-exit:
			t.Logf("Received for %s: %s", metric.Name(), m)
			received++
		case <-time.After(10 * time.Second):
			t.Logf("Timeout")
			break wait
		}
	}

	// Make sure received == sent (testify expects the expected value first)
	require.Equal(t, len(metrics), received)
}