diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 37a66068c..f12064e17 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -22,6 +22,7 @@ following works: - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/monitor/armmonitor/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/resources/armresources/LICENSE.txt) - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azblob/LICENSE.txt) +- github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azqueue/LICENSE.txt) - github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE) - github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE) - github.com/Azure/go-ansiterm [MIT License](https://github.com/Azure/go-ansiterm/blob/master/LICENSE) diff --git a/go.mod b/go.mod index efbdb140c..5b0759d2e 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.3.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 - github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe + github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 github.com/Azure/go-autorest/autorest v0.11.30 github.com/Azure/go-autorest/autorest/adal v0.9.24 github.com/Azure/go-autorest/autorest/azure/auth v0.5.13 @@ -261,6 +261,7 @@ require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect + github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.4.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect diff --git a/plugins/inputs/azure_storage_queue/README.md b/plugins/inputs/azure_storage_queue/README.md index 0e3e04b48..1467b7321 100644 --- a/plugins/inputs/azure_storage_queue/README.md +++ b/plugins/inputs/azure_storage_queue/README.md @@ -23,13 +23,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Gather Azure Storage Queue metrics [[inputs.azure_storage_queue]] - ## Required Azure Storage Account name + ## Azure Storage Account name and shared access key (required) account_name = "mystorageaccount" - - ## Required Azure Storage Account access key account_key = "storageaccountaccesskey" - ## Set to false to disable peeking age of oldest message (executes faster) + ## Disable peeking age of oldest message (faster) # peek_oldest_message_age = true ``` diff --git a/plugins/inputs/azure_storage_queue/azure_storage_queue.go b/plugins/inputs/azure_storage_queue/azure_storage_queue.go index d8100e7a5..60c29994c 100644 --- a/plugins/inputs/azure_storage_queue/azure_storage_queue.go +++ b/plugins/inputs/azure_storage_queue/azure_storage_queue.go @@ -5,11 +5,11 @@ import ( "context" _ "embed" "errors" - "net/url" + "fmt" "strings" "time" - "github.com/Azure/azure-storage-queue-go/azqueue" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -19,12 +19,13 @@ import ( var sampleConfig string type AzureStorageQueue struct { + EndpointURL string `toml:"endpoint"` StorageAccountName string `toml:"account_name"` StorageAccountKey string `toml:"account_key"` PeekOldestMessageAge bool `toml:"peek_oldest_message_age"` Log telegraf.Logger - serviceURL *azqueue.ServiceURL + client *azqueue.ServiceClient } func (*AzureStorageQueue) SampleConfig() string { @@ -32,6 +33,7 @@ func (*AzureStorageQueue) SampleConfig() string { } func (a *AzureStorageQueue) Init() error { + // Check settings if a.StorageAccountName == "" { return errors.New("account_name must be configured") } @@ -39,90 +41,81 @@ func (a *AzureStorageQueue) Init() error { if a.StorageAccountKey == "" { return errors.New("account_key must be configured") } + + // Prepare the client + if a.EndpointURL == "" { + a.EndpointURL = "https://" + a.StorageAccountName + ".queue.core.windows.net" + } + credentials, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) + if err != nil { + return fmt.Errorf("creating shared-key credentials failed: %w", err) + } + + client, err := azqueue.NewServiceClientWithSharedKeyCredential(a.EndpointURL, credentials, nil) + if err != nil { + return fmt.Errorf("creating client failed: %w", err) + } + a.client = client + return nil } func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error { - serviceURL, err := a.getServiceURL() - if err != nil { - return err - } + ctx := context.Background() - ctx := context.TODO() + a.Log.Debugf("Listing queues of storage account %q", a.StorageAccountName) - for marker := (azqueue.Marker{}); marker.NotDone(); { - a.Log.Debugf("Listing queues of storage account %q", a.StorageAccountName) - queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker, - azqueue.ListQueuesSegmentOptions{ - Detail: azqueue.ListQueuesSegmentDetails{Metadata: false}, - }) + // Iterate through the queues and generate metrics + pages := a.client.NewListQueuesPager(nil) + for pages.More() { + response, err := pages.NextPage(ctx) if err != nil { - return err + return fmt.Errorf("getting next page failed: %w", err) } - marker = queuesSegment.NextMarker - for _, queueItem := range queuesSegment.QueueItems { - a.Log.Debugf("Processing queue %q of storage account %q", queueItem.Name, a.StorageAccountName) - queueURL := serviceURL.NewQueueURL(queueItem.Name) - properties, err := queueURL.GetProperties(ctx) - if err != nil { - a.Log.Errorf("Error getting properties for queue %s: %s", queueItem.Name, err.Error()) + // Get the properties and the message properties for each of the queues + for _, queue := range response.Queues { + if queue.Name == nil { continue } - var peekedMessage *azqueue.PeekedMessage - if a.PeekOldestMessageAge { - messagesURL := queueURL.NewMessagesURL() - messagesResponse, err := messagesURL.Peek(ctx, 1) - if err != nil { - a.Log.Errorf("Error peeking queue %s: %s", queueItem.Name, err.Error()) - } else if messagesResponse.NumMessages() > 0 { - peekedMessage = messagesResponse.Message(0) - } + name := strings.TrimSpace(*queue.Name) + + // Access the queue and get the properties + c := a.client.NewQueueClient(*queue.Name) + props, err := c.GetProperties(ctx, nil) + if err != nil { + acc.AddError(fmt.Errorf("getting properties for queue %q failed: %w", name, err)) + continue + } + if props.ApproximateMessagesCount == nil { + acc.AddError(fmt.Errorf("unset message count for queue %q", name)) + continue } - a.gatherQueueMetrics(acc, queueItem, properties, peekedMessage) + // Setup the metric elements + tags := map[string]string{ + "account": a.StorageAccountName, + "queue": strings.TrimSpace(name), + } + fields := map[string]interface{}{ + "size": *props.ApproximateMessagesCount, + } + now := time.Now() + if a.PeekOldestMessageAge { + if r, err := c.PeekMessage(ctx, nil); err != nil { + acc.AddError(fmt.Errorf("peeking message for queue %q failed: %w", name, err)) + } else if len(r.Messages) > 0 && r.Messages[0] != nil && r.Messages[0].InsertionTime != nil { + msg := r.Messages[0] + fields["oldest_message_age_ns"] = now.Sub(*msg.InsertionTime).Nanoseconds() + } + } + acc.AddFields("azure_storage_queues", fields, tags, now) } } + return nil } -func (a *AzureStorageQueue) getServiceURL() (azqueue.ServiceURL, error) { - if a.serviceURL == nil { - _url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net") - if err != nil { - return azqueue.ServiceURL{}, err - } - - credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey) - if err != nil { - return azqueue.ServiceURL{}, err - } - - pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{}) - - serviceURL := azqueue.NewServiceURL(*_url, pipeline) - a.serviceURL = &serviceURL - } - return *a.serviceURL, nil -} - -func (a *AzureStorageQueue) gatherQueueMetrics( - acc telegraf.Accumulator, - queueItem azqueue.QueueItem, - properties *azqueue.QueueGetPropertiesResponse, - peekedMessage *azqueue.PeekedMessage, -) { - fields := make(map[string]interface{}) - tags := make(map[string]string) - tags["queue"] = strings.TrimSpace(queueItem.Name) - tags["account"] = a.StorageAccountName - fields["size"] = properties.ApproximateMessagesCount() - if peekedMessage != nil { - fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano() - } - acc.AddFields("azure_storage_queues", fields, tags) -} - func init() { inputs.Add("azure_storage_queue", func() telegraf.Input { return &AzureStorageQueue{PeekOldestMessageAge: true} diff --git a/plugins/inputs/azure_storage_queue/azure_storage_queue_test.go b/plugins/inputs/azure_storage_queue/azure_storage_queue_test.go new file mode 100644 index 000000000..e969ca15c --- /dev/null +++ b/plugins/inputs/azure_storage_queue/azure_storage_queue_test.go @@ -0,0 +1,148 @@ +package azure_storage_queue + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/azurite" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +func TestEmulatorIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Require the developers to explicitly accept the EULA of the emulator + if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" { + t.Skip(` + Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator + at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md + and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA + to 'yes'. + `) + } + + // Setup the Azure Event Hub emulator environment + // See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator + emulator, err := azurite.Run( + t.Context(), + "mcr.microsoft.com/azure-storage/azurite:3.28.0", + azurite.WithInMemoryPersistence(64.0), + ) + require.NoError(t, err, "failed to start Azurite container") + defer testcontainers.TerminateContainer(emulator) //nolint:errcheck // Ignore error as we can't do anything about it + + endpoint := emulator.MustServiceURL(t.Context(), azurite.QueueService) + "/" + azurite.AccountName + + // Create two queues and push some messages to get data + credentials, err := azqueue.NewSharedKeyCredential(azurite.AccountName, azurite.AccountKey) + require.NoError(t, err) + + client, err := azqueue.NewServiceClientWithSharedKeyCredential(endpoint, credentials, nil) + require.NoError(t, err) + + // Remember the oldest messages + oldest := make(map[string]time.Time, 2) + + // Add five messages to test queue one + _, err = client.CreateQueue(t.Context(), "test-one", nil) + require.NoError(t, err) + + qc := client.NewQueueClient("test-one") + for i := range 5 { + msg := fmt.Sprintf(`{"count": %d, "message": "foobar"}`, i) + resp, err := qc.EnqueueMessage(t.Context(), msg, nil) + require.NoError(t, err) + if i == 0 { + oldest["test-one"] = *resp.Date + time.Sleep(time.Second) + } + } + + // Add three messages to test queue two + _, err = client.CreateQueue(t.Context(), "test-two", nil) + require.NoError(t, err) + + qc = client.NewQueueClient("test-two") + for i := range 3 { + msg := fmt.Sprintf(`{"count": %d, "message": "tiger"}`, i) + resp, err := qc.EnqueueMessage(t.Context(), msg, nil) + require.NoError(t, err) + if i == 0 { + oldest["test-two"] = *resp.Date + time.Sleep(time.Second) + } + } + + // Setup plugin + plugin := &AzureStorageQueue{ + EndpointURL: endpoint, + StorageAccountName: azurite.AccountName, + StorageAccountKey: azurite.AccountKey, + PeekOldestMessageAge: true, + Log: &testutil.Logger{}, + } + + require.NoError(t, plugin.Init()) + + // Make sure we are connected + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + + expected := []telegraf.Metric{ + metric.New( + "azure_storage_queues", + map[string]string{ + "account": azurite.AccountName, + "queue": "test-one", + }, + map[string]interface{}{ + "oldest_message_age_ns": int64(0), + "size": int64(5), + }, + time.Unix(0, 0), + ), + metric.New( + "azure_storage_queues", + map[string]string{ + "account": azurite.AccountName, + "queue": "test-two", + }, + map[string]interface{}{ + "oldest_message_age_ns": int64(0), + "size": int64(3), + }, + time.Unix(0, 0), + ), + } + + // Test the metrics + options := []cmp.Option{ + testutil.IgnoreTime(), + testutil.IgnoreFields("oldest_message_age_ns"), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) + + // Test the oldest-message values + for _, m := range actual { + q, found := m.GetTag("queue") + require.True(t, found) + + actualAge, found := m.GetField("oldest_message_age_ns") + require.True(t, found) + + expectedAge := m.Time().Sub(oldest[q]) + require.Equal(t, expectedAge.Nanoseconds(), actualAge) + } +} diff --git a/plugins/inputs/azure_storage_queue/sample.conf b/plugins/inputs/azure_storage_queue/sample.conf index 54799105f..f80879f44 100644 --- a/plugins/inputs/azure_storage_queue/sample.conf +++ b/plugins/inputs/azure_storage_queue/sample.conf @@ -1,10 +1,8 @@ # Gather Azure Storage Queue metrics [[inputs.azure_storage_queue]] - ## Required Azure Storage Account name + ## Azure Storage Account name and shared access key (required) account_name = "mystorageaccount" - - ## Required Azure Storage Account access key account_key = "storageaccountaccesskey" - ## Set to false to disable peeking age of oldest message (executes faster) + ## Disable peeking age of oldest message (faster) # peek_oldest_message_age = true