diff --git a/plugins/common/jolokia2/gatherer.go b/plugins/common/jolokia2/gatherer.go index 823d8b522..447fc602a 100644 --- a/plugins/common/jolokia2/gatherer.go +++ b/plugins/common/jolokia2/gatherer.go @@ -116,10 +116,10 @@ func (g *Gatherer) generatePoints(metric Metric, responses []ReadResponse) ([]po func mergeTags(metricTags, outerTags map[string]string) map[string]string { tags := make(map[string]string) for k, v := range outerTags { - tags[k] = v + tags[k] = strings.Trim(v, `'"`) } for k, v := range metricTags { - tags[k] = v + tags[k] = strings.Trim(v, `'"`) } return tags diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index 1e733a4ee..29889748c 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -6,8 +6,9 @@ import ( "net/http/httptest" "testing" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/testutil" ) func TestGatherQueuesMetrics(t *testing.T) { @@ -44,14 +45,14 @@ func TestGatherQueuesMetrics(t *testing.T) { records["enqueue_count"] = 0 records["dequeue_count"] = 0 + plugin := &ActiveMQ{ + Server: "localhost", + Port: 8161, + } + require.NoError(t, plugin.Init()) + var acc testutil.Accumulator - - activeMQ := new(ActiveMQ) - activeMQ.Server = "localhost" - activeMQ.Port = 8161 - require.NoError(t, activeMQ.Init()) - - activeMQ.GatherQueuesMetrics(&acc, queues) + plugin.GatherQueuesMetrics(&acc, queues) acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) } @@ -90,14 +91,14 @@ func TestGatherTopicsMetrics(t *testing.T) { records["enqueue_count"] = 1 records["dequeue_count"] = 0 + plugin := &ActiveMQ{ + Server: "localhost", + Port: 8161, + } + require.NoError(t, plugin.Init()) + var acc testutil.Accumulator - - activeMQ := new(ActiveMQ) - activeMQ.Server = "localhost" - activeMQ.Port = 8161 - require.NoError(t, activeMQ.Init()) - - activeMQ.GatherTopicsMetrics(&acc, topics) + plugin.GatherTopicsMetrics(&acc, topics) acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) } @@ -109,7 +110,6 @@ func TestGatherSubscribersMetrics(t *testing.T) { ` subscribers := Subscribers{} - require.NoError(t, xml.Unmarshal([]byte(s), &subscribers)) records := make(map[string]interface{}) @@ -130,14 +130,14 @@ func TestGatherSubscribersMetrics(t *testing.T) { records["enqueue_counter"] = 0 records["dequeue_counter"] = 0 + plugin := &ActiveMQ{ + Server: "localhost", + Port: 8161, + } + require.NoError(t, plugin.Init()) + var acc testutil.Accumulator - - activeMQ := new(ActiveMQ) - activeMQ.Server = "localhost" - activeMQ.Port = 8161 - require.NoError(t, activeMQ.Init()) - - activeMQ.GatherSubscribersMetrics(&acc, subscribers) + plugin.GatherSubscribersMetrics(&acc, subscribers) acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) } @@ -169,12 +169,9 @@ func TestURLs(t *testing.T) { URL: "http://" + ts.Listener.Addr().String(), Webadmin: "admin", } - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) var acc testutil.Accumulator - err = plugin.Gather(&acc) - require.NoError(t, err) - - require.Len(t, acc.GetTelegrafMetrics(), 0) + require.NoError(t, plugin.Gather(&acc)) + require.Empty(t, acc.GetTelegrafMetrics()) } diff --git a/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go b/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go index 05f082cf0..43ff6a53e 100644 --- a/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go +++ b/plugins/inputs/jolokia2_agent/jolokia2_agent_test.go @@ -8,12 +8,16 @@ import ( "net/http/httptest" "os" "testing" + "time" + "github.com/docker/go-connections/nat" "github.com/influxdata/toml" "github.com/influxdata/toml/ast" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" common "github.com/influxdata/telegraf/plugins/common/jolokia2" "github.com/influxdata/telegraf/plugins/inputs/jolokia2_agent" "github.com/influxdata/telegraf/testutil" @@ -670,6 +674,161 @@ func TestFillFields(t *testing.T) { require.Equal(t, map[string]interface{}{}, results) } +func TestIntegrationArtemis(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Start the docker container + container := testutil.Container{ + Image: "apache/activemq-artemis", + ExposedPorts: []string{"61616", "8161"}, + WaitingFor: wait.ForAll( + wait.ForLog("Artemis Console available at"), + wait.ForListeningPort(nat.Port("8161")), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Setup the plugin + port := container.Ports["8161"] + endpoint := "http://" + container.Address + ":" + port + "/console/jolokia/" + plugin := &jolokia2_agent.JolokiaAgent{ + URLs: []string{endpoint}, + Username: "artemis", + Password: "artemis", + Metrics: []common.MetricConfig{ + { + Name: "artemis", + Mbean: `org.apache.activemq.artemis:broker="*",component=addresses,address="*",subcomponent=queues,routing-type="anycast",queue="*"`, + TagKeys: []string{"queue", "subcomponent"}, + }, + }, + } + + // Setup the expectations + expected := []telegraf.Metric{ + metric.New( + "artemis", + map[string]string{ + "jolokia_agent_url": endpoint, + "queue": "ExpiryQueue", + "subcomponent": "queues", + }, + map[string]interface{}{ + "AcknowledgeAttempts": float64(0), + "Address": "ExpiryQueue", + "AutoDelete": false, + "ConfigurationManaged": false, + "ConsumerCount": float64(0), + "ConsumersBeforeDispatch": float64(0), + "DeadLetterAddress": "DLQ", + "DelayBeforeDispatch": float64(0), + "DeliveringCount": float64(0), + "DeliveringSize": float64(0), + "Durable": false, + "DurableDeliveringCount": float64(0), + "DurableDeliveringSize": float64(0), + "DurableMessageCount": float64(0), + "DurablePersistentSize": float64(0), + "DurableScheduledCount": float64(0), + "DurableScheduledSize": float64(0), + "Enabled": true, + "Exclusive": true, + "ExpiryAddress": "ExpiryQueue", + "FirstMessageAsJSON": "[{}]", + "GroupBuckets": float64(0), + "GroupCount": float64(0), + "GroupRebalance": false, + "GroupRebalancePauseDispatch": false, + "ID": float64(0), + "LastValue": false, + "MaxConsumers": float64(0), + "MessageCount": float64(0), + "MessagesAcknowledged": float64(0), + "MessagesAdded": float64(0), + "MessagesExpired": float64(0), + "MessagesKilled": float64(0), + "Name": "ExpiryQueue", + "Paused": false, + "PersistentSize": float64(0), + "PreparedTransactionMessageCount": float64(0), + "PurgeOnNoConsumers": false, + "RetroactiveResource": false, + "RingSize": float64(0), + "RoutingType": "ANYCAST", + "ScheduledCount": float64(0), + "ScheduledSize": float64(0), + "Temporary": false, + }, + time.Unix(0, 0), + ), + metric.New( + "artemis", + map[string]string{ + "jolokia_agent_url": endpoint, + "queue": "DLQ", + "subcomponent": "queues", + }, + map[string]interface{}{ + "AcknowledgeAttempts": float64(0), + "Address": "DLQ", + "AutoDelete": false, + "ConfigurationManaged": false, + "ConsumerCount": float64(0), + "ConsumersBeforeDispatch": float64(0), + "DeadLetterAddress": "DLQ", + "DelayBeforeDispatch": float64(0), + "DeliveringCount": float64(0), + "DeliveringSize": float64(0), + "Durable": false, + "DurableDeliveringCount": float64(0), + "DurableDeliveringSize": float64(0), + "DurableMessageCount": float64(0), + "DurablePersistentSize": float64(0), + "DurableScheduledCount": float64(0), + "DurableScheduledSize": float64(0), + "Enabled": true, + "Exclusive": true, + "ExpiryAddress": "ExpiryQueue", + "FirstMessageAsJSON": "[{}]", + "GroupBuckets": float64(0), + "GroupCount": float64(0), + "GroupRebalance": false, + "GroupRebalancePauseDispatch": false, + "ID": float64(0), + "LastValue": false, + "MaxConsumers": float64(0), + "MessageCount": float64(0), + "MessagesAcknowledged": float64(0), + "MessagesAdded": float64(0), + "MessagesExpired": float64(0), + "MessagesKilled": float64(0), + "Name": "DLQ", + "Paused": false, + "PersistentSize": float64(0), + "PreparedTransactionMessageCount": float64(0), + "PurgeOnNoConsumers": false, + "RetroactiveResource": false, + "RingSize": float64(0), + "RoutingType": "ANYCAST", + "ScheduledCount": float64(0), + "ScheduledSize": float64(0), + "Temporary": false, + }, + time.Unix(0, 0), + ), + } + + // Collect the metrics and compare + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsStructureEqual(t, expected, actual, testutil.IgnoreTime()) +} + func setupServer(resp string) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK)