chore(inputs.azure_storage_queue): Switch to maintained library (#16609)

This commit is contained in:
Sven Rebhan 2025-04-11 19:13:47 +02:00 committed by GitHub
parent 94595a10f3
commit 7dd1886248
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 217 additions and 78 deletions

View File

@ -22,6 +22,7 @@ following works:
- github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/monitor/armmonitor/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/resourcemanager/resources/armresources/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/storage/azblob [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azblob/LICENSE.txt)
- github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue [MIT License](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azqueue/LICENSE.txt)
- github.com/Azure/azure-storage-queue-go [MIT License](https://github.com/Azure/azure-storage-queue-go/blob/master/LICENSE)
- github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE)
- github.com/Azure/go-ansiterm [MIT License](https://github.com/Azure/go-ansiterm/blob/master/LICENSE)

3
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.3.2
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0
github.com/Azure/go-autorest/autorest v0.11.30
github.com/Azure/go-autorest/autorest/adal v0.9.24
github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
@ -261,6 +261,7 @@ require (
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect
github.com/Azure/go-amqp v1.4.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect

View File

@ -23,13 +23,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Gather Azure Storage Queue metrics
[[inputs.azure_storage_queue]]
## Required Azure Storage Account name
## Azure Storage Account name and shared access key (required)
account_name = "mystorageaccount"
## Required Azure Storage Account access key
account_key = "storageaccountaccesskey"
## Set to false to disable peeking age of oldest message (executes faster)
## Disable peeking age of oldest message (faster)
# peek_oldest_message_age = true
```

View File

@ -5,11 +5,11 @@ import (
"context"
_ "embed"
"errors"
"net/url"
"fmt"
"strings"
"time"
"github.com/Azure/azure-storage-queue-go/azqueue"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
@ -19,12 +19,13 @@ import (
var sampleConfig string
type AzureStorageQueue struct {
EndpointURL string `toml:"endpoint"`
StorageAccountName string `toml:"account_name"`
StorageAccountKey string `toml:"account_key"`
PeekOldestMessageAge bool `toml:"peek_oldest_message_age"`
Log telegraf.Logger
serviceURL *azqueue.ServiceURL
client *azqueue.ServiceClient
}
func (*AzureStorageQueue) SampleConfig() string {
@ -32,6 +33,7 @@ func (*AzureStorageQueue) SampleConfig() string {
}
func (a *AzureStorageQueue) Init() error {
// Check settings
if a.StorageAccountName == "" {
return errors.New("account_name must be configured")
}
@ -39,90 +41,81 @@ func (a *AzureStorageQueue) Init() error {
if a.StorageAccountKey == "" {
return errors.New("account_key must be configured")
}
// Prepare the client
if a.EndpointURL == "" {
a.EndpointURL = "https://" + a.StorageAccountName + ".queue.core.windows.net"
}
credentials, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return fmt.Errorf("creating shared-key credentials failed: %w", err)
}
client, err := azqueue.NewServiceClientWithSharedKeyCredential(a.EndpointURL, credentials, nil)
if err != nil {
return fmt.Errorf("creating client failed: %w", err)
}
a.client = client
return nil
}
func (a *AzureStorageQueue) Gather(acc telegraf.Accumulator) error {
serviceURL, err := a.getServiceURL()
if err != nil {
return err
}
ctx := context.Background()
ctx := context.TODO()
a.Log.Debugf("Listing queues of storage account %q", a.StorageAccountName)
for marker := (azqueue.Marker{}); marker.NotDone(); {
a.Log.Debugf("Listing queues of storage account %q", a.StorageAccountName)
queuesSegment, err := serviceURL.ListQueuesSegment(ctx, marker,
azqueue.ListQueuesSegmentOptions{
Detail: azqueue.ListQueuesSegmentDetails{Metadata: false},
})
// Iterate through the queues and generate metrics
pages := a.client.NewListQueuesPager(nil)
for pages.More() {
response, err := pages.NextPage(ctx)
if err != nil {
return err
return fmt.Errorf("getting next page failed: %w", err)
}
marker = queuesSegment.NextMarker
for _, queueItem := range queuesSegment.QueueItems {
a.Log.Debugf("Processing queue %q of storage account %q", queueItem.Name, a.StorageAccountName)
queueURL := serviceURL.NewQueueURL(queueItem.Name)
properties, err := queueURL.GetProperties(ctx)
if err != nil {
a.Log.Errorf("Error getting properties for queue %s: %s", queueItem.Name, err.Error())
// Get the properties and the message properties for each of the queues
for _, queue := range response.Queues {
if queue.Name == nil {
continue
}
var peekedMessage *azqueue.PeekedMessage
if a.PeekOldestMessageAge {
messagesURL := queueURL.NewMessagesURL()
messagesResponse, err := messagesURL.Peek(ctx, 1)
if err != nil {
a.Log.Errorf("Error peeking queue %s: %s", queueItem.Name, err.Error())
} else if messagesResponse.NumMessages() > 0 {
peekedMessage = messagesResponse.Message(0)
}
name := strings.TrimSpace(*queue.Name)
// Access the queue and get the properties
c := a.client.NewQueueClient(*queue.Name)
props, err := c.GetProperties(ctx, nil)
if err != nil {
acc.AddError(fmt.Errorf("getting properties for queue %q failed: %w", name, err))
continue
}
if props.ApproximateMessagesCount == nil {
acc.AddError(fmt.Errorf("unset message count for queue %q", name))
continue
}
a.gatherQueueMetrics(acc, queueItem, properties, peekedMessage)
// Setup the metric elements
tags := map[string]string{
"account": a.StorageAccountName,
"queue": strings.TrimSpace(name),
}
fields := map[string]interface{}{
"size": *props.ApproximateMessagesCount,
}
now := time.Now()
if a.PeekOldestMessageAge {
if r, err := c.PeekMessage(ctx, nil); err != nil {
acc.AddError(fmt.Errorf("peeking message for queue %q failed: %w", name, err))
} else if len(r.Messages) > 0 && r.Messages[0] != nil && r.Messages[0].InsertionTime != nil {
msg := r.Messages[0]
fields["oldest_message_age_ns"] = now.Sub(*msg.InsertionTime).Nanoseconds()
}
}
acc.AddFields("azure_storage_queues", fields, tags, now)
}
}
return nil
}
func (a *AzureStorageQueue) getServiceURL() (azqueue.ServiceURL, error) {
if a.serviceURL == nil {
_url, err := url.Parse("https://" + a.StorageAccountName + ".queue.core.windows.net")
if err != nil {
return azqueue.ServiceURL{}, err
}
credential, err := azqueue.NewSharedKeyCredential(a.StorageAccountName, a.StorageAccountKey)
if err != nil {
return azqueue.ServiceURL{}, err
}
pipeline := azqueue.NewPipeline(credential, azqueue.PipelineOptions{})
serviceURL := azqueue.NewServiceURL(*_url, pipeline)
a.serviceURL = &serviceURL
}
return *a.serviceURL, nil
}
func (a *AzureStorageQueue) gatherQueueMetrics(
acc telegraf.Accumulator,
queueItem azqueue.QueueItem,
properties *azqueue.QueueGetPropertiesResponse,
peekedMessage *azqueue.PeekedMessage,
) {
fields := make(map[string]interface{})
tags := make(map[string]string)
tags["queue"] = strings.TrimSpace(queueItem.Name)
tags["account"] = a.StorageAccountName
fields["size"] = properties.ApproximateMessagesCount()
if peekedMessage != nil {
fields["oldest_message_age_ns"] = time.Now().UnixNano() - peekedMessage.InsertionTime.UnixNano()
}
acc.AddFields("azure_storage_queues", fields, tags)
}
func init() {
inputs.Add("azure_storage_queue", func() telegraf.Input {
return &AzureStorageQueue{PeekOldestMessageAge: true}

View File

@ -0,0 +1,148 @@
package azure_storage_queue
import (
"fmt"
"os"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/azurite"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
func TestEmulatorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Require the developers to explicitly accept the EULA of the emulator
if os.Getenv("AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA") != "yes" {
t.Skip(`
Skipping due to unexcepted EULA. To run this test, please check the EULA of the emulator
at https://github.com/Azure/azure-event-hubs-emulator-installer/blob/main/EMULATOR_EULA.md
and accept it by setting the environment variable AZURE_EVENT_HUBS_EMULATOR_ACCEPT_EULA
to 'yes'.
`)
}
// Setup the Azure Event Hub emulator environment
// See https://learn.microsoft.com/en-us/azure/event-hubs/test-locally-with-event-hub-emulator
emulator, err := azurite.Run(
t.Context(),
"mcr.microsoft.com/azure-storage/azurite:3.28.0",
azurite.WithInMemoryPersistence(64.0),
)
require.NoError(t, err, "failed to start Azurite container")
defer testcontainers.TerminateContainer(emulator) //nolint:errcheck // Ignore error as we can't do anything about it
endpoint := emulator.MustServiceURL(t.Context(), azurite.QueueService) + "/" + azurite.AccountName
// Create two queues and push some messages to get data
credentials, err := azqueue.NewSharedKeyCredential(azurite.AccountName, azurite.AccountKey)
require.NoError(t, err)
client, err := azqueue.NewServiceClientWithSharedKeyCredential(endpoint, credentials, nil)
require.NoError(t, err)
// Remember the oldest messages
oldest := make(map[string]time.Time, 2)
// Add five messages to test queue one
_, err = client.CreateQueue(t.Context(), "test-one", nil)
require.NoError(t, err)
qc := client.NewQueueClient("test-one")
for i := range 5 {
msg := fmt.Sprintf(`{"count": %d, "message": "foobar"}`, i)
resp, err := qc.EnqueueMessage(t.Context(), msg, nil)
require.NoError(t, err)
if i == 0 {
oldest["test-one"] = *resp.Date
time.Sleep(time.Second)
}
}
// Add three messages to test queue two
_, err = client.CreateQueue(t.Context(), "test-two", nil)
require.NoError(t, err)
qc = client.NewQueueClient("test-two")
for i := range 3 {
msg := fmt.Sprintf(`{"count": %d, "message": "tiger"}`, i)
resp, err := qc.EnqueueMessage(t.Context(), msg, nil)
require.NoError(t, err)
if i == 0 {
oldest["test-two"] = *resp.Date
time.Sleep(time.Second)
}
}
// Setup plugin
plugin := &AzureStorageQueue{
EndpointURL: endpoint,
StorageAccountName: azurite.AccountName,
StorageAccountKey: azurite.AccountKey,
PeekOldestMessageAge: true,
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())
// Make sure we are connected
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
expected := []telegraf.Metric{
metric.New(
"azure_storage_queues",
map[string]string{
"account": azurite.AccountName,
"queue": "test-one",
},
map[string]interface{}{
"oldest_message_age_ns": int64(0),
"size": int64(5),
},
time.Unix(0, 0),
),
metric.New(
"azure_storage_queues",
map[string]string{
"account": azurite.AccountName,
"queue": "test-two",
},
map[string]interface{}{
"oldest_message_age_ns": int64(0),
"size": int64(3),
},
time.Unix(0, 0),
),
}
// Test the metrics
options := []cmp.Option{
testutil.IgnoreTime(),
testutil.IgnoreFields("oldest_message_age_ns"),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
// Test the oldest-message values
for _, m := range actual {
q, found := m.GetTag("queue")
require.True(t, found)
actualAge, found := m.GetField("oldest_message_age_ns")
require.True(t, found)
expectedAge := m.Time().Sub(oldest[q])
require.Equal(t, expectedAge.Nanoseconds(), actualAge)
}
}

View File

@ -1,10 +1,8 @@
# Gather Azure Storage Queue metrics
[[inputs.azure_storage_queue]]
## Required Azure Storage Account name
## Azure Storage Account name and shared access key (required)
account_name = "mystorageaccount"
## Required Azure Storage Account access key
account_key = "storageaccountaccesskey"
## Set to false to disable peeking age of oldest message (executes faster)
## Disable peeking age of oldest message (faster)
# peek_oldest_message_age = true