fix(inputs.jolokia2_agent): Trim quotes around tags (#14132)
This commit is contained in:
parent
c142f48f72
commit
3d34c41154
|
|
@ -116,10 +116,10 @@ func (g *Gatherer) generatePoints(metric Metric, responses []ReadResponse) ([]po
|
||||||
func mergeTags(metricTags, outerTags map[string]string) map[string]string {
|
func mergeTags(metricTags, outerTags map[string]string) map[string]string {
|
||||||
tags := make(map[string]string)
|
tags := make(map[string]string)
|
||||||
for k, v := range outerTags {
|
for k, v := range outerTags {
|
||||||
tags[k] = v
|
tags[k] = strings.Trim(v, `'"`)
|
||||||
}
|
}
|
||||||
for k, v := range metricTags {
|
for k, v := range metricTags {
|
||||||
tags[k] = v
|
tags[k] = strings.Trim(v, `'"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
return tags
|
return tags
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,9 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGatherQueuesMetrics(t *testing.T) {
|
func TestGatherQueuesMetrics(t *testing.T) {
|
||||||
|
|
@ -44,14 +45,14 @@ func TestGatherQueuesMetrics(t *testing.T) {
|
||||||
records["enqueue_count"] = 0
|
records["enqueue_count"] = 0
|
||||||
records["dequeue_count"] = 0
|
records["dequeue_count"] = 0
|
||||||
|
|
||||||
|
plugin := &ActiveMQ{
|
||||||
|
Server: "localhost",
|
||||||
|
Port: 8161,
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
plugin.GatherQueuesMetrics(&acc, queues)
|
||||||
activeMQ := new(ActiveMQ)
|
|
||||||
activeMQ.Server = "localhost"
|
|
||||||
activeMQ.Port = 8161
|
|
||||||
require.NoError(t, activeMQ.Init())
|
|
||||||
|
|
||||||
activeMQ.GatherQueuesMetrics(&acc, queues)
|
|
||||||
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
|
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -90,14 +91,14 @@ func TestGatherTopicsMetrics(t *testing.T) {
|
||||||
records["enqueue_count"] = 1
|
records["enqueue_count"] = 1
|
||||||
records["dequeue_count"] = 0
|
records["dequeue_count"] = 0
|
||||||
|
|
||||||
|
plugin := &ActiveMQ{
|
||||||
|
Server: "localhost",
|
||||||
|
Port: 8161,
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
plugin.GatherTopicsMetrics(&acc, topics)
|
||||||
activeMQ := new(ActiveMQ)
|
|
||||||
activeMQ.Server = "localhost"
|
|
||||||
activeMQ.Port = 8161
|
|
||||||
require.NoError(t, activeMQ.Init())
|
|
||||||
|
|
||||||
activeMQ.GatherTopicsMetrics(&acc, topics)
|
|
||||||
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
|
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -109,7 +110,6 @@ func TestGatherSubscribersMetrics(t *testing.T) {
|
||||||
</subscribers>`
|
</subscribers>`
|
||||||
|
|
||||||
subscribers := Subscribers{}
|
subscribers := Subscribers{}
|
||||||
|
|
||||||
require.NoError(t, xml.Unmarshal([]byte(s), &subscribers))
|
require.NoError(t, xml.Unmarshal([]byte(s), &subscribers))
|
||||||
|
|
||||||
records := make(map[string]interface{})
|
records := make(map[string]interface{})
|
||||||
|
|
@ -130,14 +130,14 @@ func TestGatherSubscribersMetrics(t *testing.T) {
|
||||||
records["enqueue_counter"] = 0
|
records["enqueue_counter"] = 0
|
||||||
records["dequeue_counter"] = 0
|
records["dequeue_counter"] = 0
|
||||||
|
|
||||||
|
plugin := &ActiveMQ{
|
||||||
|
Server: "localhost",
|
||||||
|
Port: 8161,
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
plugin.GatherSubscribersMetrics(&acc, subscribers)
|
||||||
activeMQ := new(ActiveMQ)
|
|
||||||
activeMQ.Server = "localhost"
|
|
||||||
activeMQ.Port = 8161
|
|
||||||
require.NoError(t, activeMQ.Init())
|
|
||||||
|
|
||||||
activeMQ.GatherSubscribersMetrics(&acc, subscribers)
|
|
||||||
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
|
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,12 +169,9 @@ func TestURLs(t *testing.T) {
|
||||||
URL: "http://" + ts.Listener.Addr().String(),
|
URL: "http://" + ts.Listener.Addr().String(),
|
||||||
Webadmin: "admin",
|
Webadmin: "admin",
|
||||||
}
|
}
|
||||||
err := plugin.Init()
|
require.NoError(t, plugin.Init())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
err = plugin.Gather(&acc)
|
require.NoError(t, plugin.Gather(&acc))
|
||||||
require.NoError(t, err)
|
require.Empty(t, acc.GetTelegrafMetrics())
|
||||||
|
|
||||||
require.Len(t, acc.GetTelegrafMetrics(), 0)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,12 +8,16 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/go-connections/nat"
|
||||||
"github.com/influxdata/toml"
|
"github.com/influxdata/toml"
|
||||||
"github.com/influxdata/toml/ast"
|
"github.com/influxdata/toml/ast"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
common "github.com/influxdata/telegraf/plugins/common/jolokia2"
|
common "github.com/influxdata/telegraf/plugins/common/jolokia2"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/jolokia2_agent"
|
"github.com/influxdata/telegraf/plugins/inputs/jolokia2_agent"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -670,6 +674,161 @@ func TestFillFields(t *testing.T) {
|
||||||
require.Equal(t, map[string]interface{}{}, results)
|
require.Equal(t, map[string]interface{}{}, results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationArtemis(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the docker container
|
||||||
|
container := testutil.Container{
|
||||||
|
Image: "apache/activemq-artemis",
|
||||||
|
ExposedPorts: []string{"61616", "8161"},
|
||||||
|
WaitingFor: wait.ForAll(
|
||||||
|
wait.ForLog("Artemis Console available at"),
|
||||||
|
wait.ForListeningPort(nat.Port("8161")),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
require.NoError(t, container.Start(), "failed to start container")
|
||||||
|
defer container.Terminate()
|
||||||
|
|
||||||
|
// Setup the plugin
|
||||||
|
port := container.Ports["8161"]
|
||||||
|
endpoint := "http://" + container.Address + ":" + port + "/console/jolokia/"
|
||||||
|
plugin := &jolokia2_agent.JolokiaAgent{
|
||||||
|
URLs: []string{endpoint},
|
||||||
|
Username: "artemis",
|
||||||
|
Password: "artemis",
|
||||||
|
Metrics: []common.MetricConfig{
|
||||||
|
{
|
||||||
|
Name: "artemis",
|
||||||
|
Mbean: `org.apache.activemq.artemis:broker="*",component=addresses,address="*",subcomponent=queues,routing-type="anycast",queue="*"`,
|
||||||
|
TagKeys: []string{"queue", "subcomponent"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the expectations
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"artemis",
|
||||||
|
map[string]string{
|
||||||
|
"jolokia_agent_url": endpoint,
|
||||||
|
"queue": "ExpiryQueue",
|
||||||
|
"subcomponent": "queues",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"AcknowledgeAttempts": float64(0),
|
||||||
|
"Address": "ExpiryQueue",
|
||||||
|
"AutoDelete": false,
|
||||||
|
"ConfigurationManaged": false,
|
||||||
|
"ConsumerCount": float64(0),
|
||||||
|
"ConsumersBeforeDispatch": float64(0),
|
||||||
|
"DeadLetterAddress": "DLQ",
|
||||||
|
"DelayBeforeDispatch": float64(0),
|
||||||
|
"DeliveringCount": float64(0),
|
||||||
|
"DeliveringSize": float64(0),
|
||||||
|
"Durable": false,
|
||||||
|
"DurableDeliveringCount": float64(0),
|
||||||
|
"DurableDeliveringSize": float64(0),
|
||||||
|
"DurableMessageCount": float64(0),
|
||||||
|
"DurablePersistentSize": float64(0),
|
||||||
|
"DurableScheduledCount": float64(0),
|
||||||
|
"DurableScheduledSize": float64(0),
|
||||||
|
"Enabled": true,
|
||||||
|
"Exclusive": true,
|
||||||
|
"ExpiryAddress": "ExpiryQueue",
|
||||||
|
"FirstMessageAsJSON": "[{}]",
|
||||||
|
"GroupBuckets": float64(0),
|
||||||
|
"GroupCount": float64(0),
|
||||||
|
"GroupRebalance": false,
|
||||||
|
"GroupRebalancePauseDispatch": false,
|
||||||
|
"ID": float64(0),
|
||||||
|
"LastValue": false,
|
||||||
|
"MaxConsumers": float64(0),
|
||||||
|
"MessageCount": float64(0),
|
||||||
|
"MessagesAcknowledged": float64(0),
|
||||||
|
"MessagesAdded": float64(0),
|
||||||
|
"MessagesExpired": float64(0),
|
||||||
|
"MessagesKilled": float64(0),
|
||||||
|
"Name": "ExpiryQueue",
|
||||||
|
"Paused": false,
|
||||||
|
"PersistentSize": float64(0),
|
||||||
|
"PreparedTransactionMessageCount": float64(0),
|
||||||
|
"PurgeOnNoConsumers": false,
|
||||||
|
"RetroactiveResource": false,
|
||||||
|
"RingSize": float64(0),
|
||||||
|
"RoutingType": "ANYCAST",
|
||||||
|
"ScheduledCount": float64(0),
|
||||||
|
"ScheduledSize": float64(0),
|
||||||
|
"Temporary": false,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"artemis",
|
||||||
|
map[string]string{
|
||||||
|
"jolokia_agent_url": endpoint,
|
||||||
|
"queue": "DLQ",
|
||||||
|
"subcomponent": "queues",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"AcknowledgeAttempts": float64(0),
|
||||||
|
"Address": "DLQ",
|
||||||
|
"AutoDelete": false,
|
||||||
|
"ConfigurationManaged": false,
|
||||||
|
"ConsumerCount": float64(0),
|
||||||
|
"ConsumersBeforeDispatch": float64(0),
|
||||||
|
"DeadLetterAddress": "DLQ",
|
||||||
|
"DelayBeforeDispatch": float64(0),
|
||||||
|
"DeliveringCount": float64(0),
|
||||||
|
"DeliveringSize": float64(0),
|
||||||
|
"Durable": false,
|
||||||
|
"DurableDeliveringCount": float64(0),
|
||||||
|
"DurableDeliveringSize": float64(0),
|
||||||
|
"DurableMessageCount": float64(0),
|
||||||
|
"DurablePersistentSize": float64(0),
|
||||||
|
"DurableScheduledCount": float64(0),
|
||||||
|
"DurableScheduledSize": float64(0),
|
||||||
|
"Enabled": true,
|
||||||
|
"Exclusive": true,
|
||||||
|
"ExpiryAddress": "ExpiryQueue",
|
||||||
|
"FirstMessageAsJSON": "[{}]",
|
||||||
|
"GroupBuckets": float64(0),
|
||||||
|
"GroupCount": float64(0),
|
||||||
|
"GroupRebalance": false,
|
||||||
|
"GroupRebalancePauseDispatch": false,
|
||||||
|
"ID": float64(0),
|
||||||
|
"LastValue": false,
|
||||||
|
"MaxConsumers": float64(0),
|
||||||
|
"MessageCount": float64(0),
|
||||||
|
"MessagesAcknowledged": float64(0),
|
||||||
|
"MessagesAdded": float64(0),
|
||||||
|
"MessagesExpired": float64(0),
|
||||||
|
"MessagesKilled": float64(0),
|
||||||
|
"Name": "DLQ",
|
||||||
|
"Paused": false,
|
||||||
|
"PersistentSize": float64(0),
|
||||||
|
"PreparedTransactionMessageCount": float64(0),
|
||||||
|
"PurgeOnNoConsumers": false,
|
||||||
|
"RetroactiveResource": false,
|
||||||
|
"RingSize": float64(0),
|
||||||
|
"RoutingType": "ANYCAST",
|
||||||
|
"ScheduledCount": float64(0),
|
||||||
|
"ScheduledSize": float64(0),
|
||||||
|
"Temporary": false,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect the metrics and compare
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, plugin.Gather(&acc))
|
||||||
|
|
||||||
|
actual := acc.GetTelegrafMetrics()
|
||||||
|
testutil.RequireMetricsStructureEqual(t, expected, actual, testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
||||||
func setupServer(resp string) *httptest.Server {
|
func setupServer(resp string) *httptest.Server {
|
||||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue