From bb32a3cd437bff7e355a395d87624d6afe5293b2 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 8 Apr 2025 11:22:48 +0200 Subject: [PATCH] chore(outputs.zabbix): Cleanup unit-tests (#16668) --- .../zabbix/testcases/receive/expected.out | 172 ++ .../zabbix/testcases/receive/input.influx | 3 + .../zabbix/testcases/receive/telegraf.conf | 7 + plugins/outputs/zabbix/zabbix_test.go | 1388 ++++++++++------- 4 files changed, 1031 insertions(+), 539 deletions(-) create mode 100644 plugins/outputs/zabbix/testcases/receive/expected.out create mode 100644 plugins/outputs/zabbix/testcases/receive/input.influx create mode 100644 plugins/outputs/zabbix/testcases/receive/telegraf.conf diff --git a/plugins/outputs/zabbix/testcases/receive/expected.out b/plugins/outputs/zabbix/testcases/receive/expected.out new file mode 100644 index 000000000..0ff2c9f99 --- /dev/null +++ b/plugins/outputs/zabbix/testcases/receive/expected.out @@ -0,0 +1,172 @@ +[ + { + "request": "sender data", + "data": [ + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcename", + "value": "192.168.0.1", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.userid", + "value": "0", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.zabbix_instance", + "value": "my_instance", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.ip", + "value": "", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.details", + "value": "{\"host.proxyid\":[\"update\",1,2]}", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.action", + "value": "Update", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourceid", + "value": "13196", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.auditid", + "value": "cm8fvqdfv022zhikkndvnxv6x", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcetype", + "value": "Host", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.action", + "value": "Update", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcetype", + "value": "Host", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.ip", + "value": "", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.zabbix_instance", + "value": "my_instance", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.auditid", + "value": "cm8fvqdfv022zhikkndvnxv6x", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.details", + "value": "{\"host.proxyid\":[\"update\",5,1]}", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcename", + "value": "172.18.0.94", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.userid", + "value": "0", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourceid", + "value": "13196", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.userid", + "value": "0", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcename", + "value": "192.168.0.1", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.action", + "value": "Update", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourcetype", + "value": "Host", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.zabbix_instance", + "value": "my_instance", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.auditid", + "value": "cm8fvqdfv022zhikkndvnxv6x", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.resourceid", + "value": "13196", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.ip", + "value": "", + "clock": 1742386216 + }, + { + "host": "telegraf-kafka-host-sync", + "key": "telegraf.details", + "value": "{\"host.proxyid\":[\"update\",3,5]}", + "clock": 1742386216 + } + ], + "clock": 0, + "host": "", + "host_metadata": "" + } +] \ No newline at end of file diff --git a/plugins/outputs/zabbix/testcases/receive/input.influx b/plugins/outputs/zabbix/testcases/receive/input.influx new file mode 100644 index 000000000..5a6f662cd --- /dev/null +++ b/plugins/outputs/zabbix/testcases/receive/input.influx @@ -0,0 +1,3 @@ +kafka_consumer,host=telegraf-kafka-host-sync resourcename="192.168.0.1",userid="0",zabbix_instance="my_instance",ip="",details="{\"host.proxyid\":[\"update\",1,2]}",action="Update",resourceid="13196",auditid="cm8fvqdfv022zhikkndvnxv6x",resourcetype="Host" 1742386216793788721 +kafka_consumer,host=telegraf-kafka-host-sync action="Update",resourcetype="Host",ip="",zabbix_instance="my_instance",auditid="cm8fvqdfv022zhikkndvnxv6x",details="{\"host.proxyid\":[\"update\",5,1]}",resourcename="172.18.0.94",userid="0",resourceid="13196" 1742386216862414526 +kafka_consumer,host=telegraf-kafka-host-sync userid="0",resourcename="192.168.0.1",action="Update",resourcetype="Host",zabbix_instance="my_instance",auditid="cm8fvqdfv022zhikkndvnxv6x",resourceid="13196",ip="",details="{\"host.proxyid\":[\"update\",3,5]}" 1742386216978998820 diff --git a/plugins/outputs/zabbix/testcases/receive/telegraf.conf b/plugins/outputs/zabbix/testcases/receive/telegraf.conf new file mode 100644 index 000000000..606e11d6c --- /dev/null +++ b/plugins/outputs/zabbix/testcases/receive/telegraf.conf @@ -0,0 +1,7 @@ +[[outputs.zabbix]] + address = "dummy" + key_prefix = "telegraf." + lld_send_interval = "30s" + lld_clear_interval = "1h" + skip_measurement_prefix = true + agent_active = false \ No newline at end of file diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 9fb746479..b9c0b6621 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -3,82 +3,45 @@ package zabbix import ( "encoding/binary" "encoding/json" + "errors" "net" "os" + "path/filepath" "sort" "strings" + "sync" + "sync/atomic" "testing" "time" + "github.com/datadope-io/go-zabbix/v2" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) -type zabbixRequestData struct { - Host string `json:"host"` - Key string `json:"key"` - Value string `json:"value"` - Clock int64 `json:"clock"` -} - -type zabbixRequest struct { - Request string `json:"request"` - Data []zabbixRequestData `json:"data"` - Clock int `json:"clock"` - Host string `json:"host"` - HostMetadata string `json:"host_metadata"` -} - -type zabbixLLDValue struct { - Data []map[string]string `json:"data"` -} - -type result struct { - req zabbixRequest - err error -} - -type zabbixMockServer struct { - listener net.Listener - ignoreAcceptError bool -} - -func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) { - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil -} - -func (s *zabbixMockServer) addr() string { - return s.listener.Addr().String() -} - -func (s *zabbixMockServer) close() error { - if s.listener != nil { - return s.listener.Close() - } - return nil -} - -func TestZabbix(t *testing.T) { +func TestSuccessfulReceive(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) - tests := map[string]struct { - KeyPrefix string - AgentActive bool - SkipMeasurementPrefix bool - telegrafMetrics []telegraf.Metric - zabbixMetrics []zabbixRequestData + tests := []struct { + name string + prefix string + agentActive bool + skipMeasurementPrefix bool + input []telegraf.Metric + expected []zabbix.Packet }{ - "send one metric with one field and no extra tags, generates one zabbix metric": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send one metric with one field and no extra tags, generates one zabbix metric", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -88,18 +51,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "string values representing a float number should be sent in the exact same format": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "string values representing a float number should be sent in the exact same format", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -109,18 +79,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value", - Value: "3.1415", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value", + Value: "3.1415", + Clock: 1522082244, + }, + }, }, }, }, - "send one metric with one string field and no extra tags, generates one zabbix metric": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send one metric with one string field and no extra tags, generates one zabbix metric", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -130,18 +107,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value", - Value: "some value", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value", + Value: "some value", + Clock: 1522082244, + }, + }, }, }, }, - "boolean values are converted to 1 (true) or 0 (false)": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "boolean values are converted to 1 (true) or 0 (false)", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -152,37 +136,31 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.valueTrue", - Value: "true", - Clock: 1522082244, - }, - { - Host: "hostname", - Key: "name.valueFalse", - Value: "false", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.valueTrue", + Value: "true", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "name.valueFalse", + Value: "false", + Clock: 1522082244, + }, + }, }, }, }, - "invalid value data is ignored and not sent": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", - map[string]string{ - "host": "hostname", - }, - map[string]interface{}{ - "value": []int{1, 2}, - }, - time.Unix(1522082244, 0), - ), - }, - }, - "metrics without host tag use the system hostname": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "metrics without host tag use the system hostname", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{}, map[string]interface{}{ "value": "x", @@ -190,18 +168,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: hostname, - Key: "name.value", - Value: "x", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: hostname, + Key: "name.value", + Value: "x", + Clock: 1522082244, + }, + }, }, }, }, - "send one metric with extra tags, zabbix metric should be generated with a parameter": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send one metric with extra tags, zabbix metric should be generated with a parameter", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", "foo": "bar", @@ -212,18 +197,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value[bar]", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value[bar]", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "send one metric with two extra tags, zabbix parameters should be alphabetically ordered": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send one metric with two extra tags, zabbix parameters should be alphabetically ordered", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", "zparam": "last", @@ -235,18 +227,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value[first,last]", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value[first,last]", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "send one metric with two fields and no extra tags, generates two zabbix metrics": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send one metric with two fields and no extra tags, generates two zabbix metrics", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -257,24 +256,31 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.valueA", - Value: "0", - Clock: 1522082244, - }, - { - Host: "hostname", - Key: "name.valueB", - Value: "1", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.valueA", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "name.valueB", + Value: "1", + Clock: 1522082244, + }, + }, }, }, }, - "send two metrics with one field and no extra tags, generates two zabbix metrics": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("nameA", + { + name: "send two metrics with one field and no extra tags, generates two zabbix metrics", + input: []telegraf.Metric{ + metric.New( + "nameA", map[string]string{ "host": "hostname", }, @@ -283,7 +289,8 @@ func TestZabbix(t *testing.T) { }, time.Unix(1522082244, 0), ), - testutil.MustMetric("nameB", + metric.New( + "nameB", map[string]string{ "host": "hostname", }, @@ -293,24 +300,31 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "nameA.value", - Value: "0", - Clock: 1522082244, - }, - { - Host: "hostname", - Key: "nameB.value", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "nameA.value", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostname", + Key: "nameB.value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "send two metrics with different hostname, generates two zabbix metrics for different hosts": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "send two metrics with different hostname, generates two zabbix metrics for different hosts", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostnameA", }, @@ -319,7 +333,8 @@ func TestZabbix(t *testing.T) { }, time.Unix(1522082244, 0), ), - testutil.MustMetric("name", + metric.New( + "name", map[string]string{ "host": "hostnameB", }, @@ -329,25 +344,32 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostnameA", - Key: "name.value", - Value: "0", - Clock: 1522082244, - }, - { - Host: "hostnameB", - Key: "name.value", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostnameA", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + { + Host: "hostnameB", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "if key_prefix is configured, zabbix metrics should have that prefix in the key": { - KeyPrefix: "telegraf.", - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "if key_prefix is configured, zabbix metrics should have that prefix in the key", + prefix: "telegraf.", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -357,19 +379,26 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "telegraf.name.value", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "telegraf.name.value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key": { - SkipMeasurementPrefix: true, - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key", + skipMeasurementPrefix: true, + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -379,19 +408,26 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "value", - Value: "0", - Clock: 1522082244, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "if AgentActive is configured, zabbix metrics should be sent respecting that protocol": { - AgentActive: true, - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "if AgentActive is configured, zabbix metrics should be sent respecting that protocol", + agentActive: true, + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostname", }, @@ -401,18 +437,25 @@ func TestZabbix(t *testing.T) { time.Unix(1522082244, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostname", - Key: "name.value", - Value: "0", - Clock: 1522082244, + Request: "agent data", + Data: []*zabbix.Metric{ + { + Host: "hostname", + Key: "name.value", + Value: "0", + Clock: 1522082244, + }, + }, }, }, }, - "metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends": { - telegrafMetrics: []telegraf.Metric{ - testutil.MustMetric("name", + { + name: "metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends", + input: []telegraf.Metric{ + metric.New( + "name", map[string]string{ "host": "hostnameD", }, @@ -421,7 +464,8 @@ func TestZabbix(t *testing.T) { }, time.Unix(4444444444, 0), ), - testutil.MustMetric("name", + metric.New( + "name", map[string]string{ "host": "hostnameC", }, @@ -430,7 +474,8 @@ func TestZabbix(t *testing.T) { }, time.Unix(3333333333, 0), ), - testutil.MustMetric("name", + metric.New( + "name", map[string]string{ "host": "hostnameA", }, @@ -439,7 +484,8 @@ func TestZabbix(t *testing.T) { }, time.Unix(1111111111, 0), ), - testutil.MustMetric("name", + metric.New( + "name", map[string]string{ "host": "hostnameB", }, @@ -449,91 +495,156 @@ func TestZabbix(t *testing.T) { time.Unix(2222222222, 0), ), }, - zabbixMetrics: []zabbixRequestData{ + expected: []zabbix.Packet{ { - Host: "hostnameA", - Key: "name.value", - Value: "0", - Clock: 1111111111, - }, - { - Host: "hostnameB", - Key: "name.value", - Value: "0", - Clock: 2222222222, - }, - { - Host: "hostnameC", - Key: "name.value", - Value: "0", - Clock: 3333333333, - }, - { - Host: "hostnameD", - Key: "name.value", - Value: "0", - Clock: 4444444444, + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostnameA", + Key: "name.value", + Value: "0", + Clock: 1111111111, + }, + { + Host: "hostnameB", + Key: "name.value", + Value: "0", + Clock: 2222222222, + }, + { + Host: "hostnameC", + Key: "name.value", + Value: "0", + Clock: 3333333333, + }, + { + Host: "hostnameD", + Key: "name.value", + Value: "0", + Clock: 4444444444, + }, + }, }, }, }, } - for desc, test := range tests { - t.Run(desc, func(t *testing.T) { - // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. - server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup a Zabbix mock server and start listening + server, err := newZabbixMockServer() require.NoError(t, err) defer server.close() - z := &Zabbix{ + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + + // Setup the plugin + plugin := &Zabbix{ Address: server.addr(), - KeyPrefix: test.KeyPrefix, + KeyPrefix: tt.prefix, HostTag: "host", - SkipMeasurementPrefix: test.SkipMeasurementPrefix, - AgentActive: test.AgentActive, + SkipMeasurementPrefix: tt.skipMeasurementPrefix, + AgentActive: tt.agentActive, LLDSendInterval: config.Duration(10 * time.Minute), Log: testutil.Logger{}, } - require.NoError(t, z.Init()) + require.NoError(t, plugin.Init()) - resCh := make(chan result, 1) - go func() { - resCh <- server.listenForSingleRequest() - }() + // Connect and write the data + require.NoError(t, plugin.Connect()) + defer plugin.Close() + require.NoError(t, plugin.Write(tt.input)) - require.NoError(t, z.Write(test.telegrafMetrics)) + // Wait for the data to arrive + require.Eventually(t, func() bool { + return server.count.Load() > 0 + }, 3*time.Second, 100*time.Millisecond, "nothing received") - // By default, we use trappers - requestType := "sender data" - if test.AgentActive { - requestType = "agent data" - } + // Stop listening + server.listener.Close() + wg.Wait() - select { - case res := <-resCh: - require.NoError(t, res.err) - require.Equal(t, requestType, res.req.Request) - compareData(t, test.zabbixMetrics, res.req.Data) - case <-time.After(1 * time.Second): - require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") - } + // Check the received data + server.Lock() + defer server.Unlock() + + require.Empty(t, server.errs, "server had errors") + requireRequestDataEqual(t, tt.expected, server.received, false) }) } } +func TestInvalidData(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "name", + map[string]string{ + "host": "hostname", + }, + map[string]interface{}{ + "value": []int{1, 2}, + }, + time.Unix(1522082244, 0), + ), + } + + // Setup a Zabbix mock server and start listening + server, err := newZabbixMockServer() + require.NoError(t, err) + defer server.close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + + // Setup the plugin + plugin := &Zabbix{ + Address: server.addr(), + HostTag: "host", + LLDSendInterval: config.Duration(10 * time.Minute), + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Connect and write the data + require.NoError(t, plugin.Connect()) + defer plugin.Close() + require.NoError(t, plugin.Write(input)) + require.NoError(t, plugin.Close()) + + // Stop listening + server.listener.Close() + wg.Wait() + + // Check the received data + server.Lock() + defer server.Unlock() + + require.Empty(t, server.errs, "server had errors") + require.Empty(t, server.received) +} + // TestLLD tests how LLD metrics are sent simulating the time passing. // LLD is sent each LLDSendInterval. Only new data. // LLD data is cleared LLDClearInterval. func TestLLD(t *testing.T) { // Telegraf metric which will be sent repeatedly - m := testutil.MustMetric( + m := metric.New( "name", map[string]string{"host": "hostA", "foo": "bar"}, map[string]interface{}{"value": int64(0)}, time.Unix(0, 0), ) - mNew := testutil.MustMetric( + mNew := metric.New( "name", map[string]string{"host": "hostA", "foo": "moo"}, map[string]interface{}{"value": int64(0)}, @@ -541,43 +652,128 @@ func TestLLD(t *testing.T) { ) // Expected Zabbix metric generated - zabbixMetric := zabbixRequestData{ - Host: "hostA", - Key: "telegraf.name.value[bar]", - Value: "0", - Clock: 0, + expected := []zabbix.Packet{ + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + { + Host: "hostA", + Key: "telegraf.lld.name.foo", + Value: `{"data":[{"{#FOO}":"bar"}]}`, + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[moo]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + { + Host: "hostA", + Key: "telegraf.lld.name.foo", + Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`, + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + }, + }, + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value[bar]", + Value: "0", + }, + { + Host: "hostA", + Key: "telegraf.lld.name.foo", + Value: `{"data":[{"{#FOO}":"bar"}]}`, + }, + }, + }, } - // Expected Zabbix metric generated - zabbixMetricNew := zabbixRequestData{ - Host: "hostA", - Key: "telegraf.name.value[moo]", - Value: "0", - Clock: 0, - } - - // Expected Zabbix LLD metric generated - zabbixLLDMetric := zabbixRequestData{ - Host: "hostA", - Key: "telegraf.lld.name.foo", - Value: `{"data":[{"{#FOO}":"bar"}]}`, - Clock: 0, - } - - // Expected Zabbix LLD metric generated - zabbixLLDMetricNew := zabbixRequestData{ - Host: "hostA", - Key: "telegraf.lld.name.foo", - Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`, - Clock: 0, - } - - // Simulate a Zabbix server to get the data sent - server, err := newZabbixMockServer("127.0.0.1:", false) + // Setup a Zabbix mock server and start listening + server, err := newZabbixMockServer() require.NoError(t, err) defer server.close() - z := &Zabbix{ + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + + // Setup plugin + plugin := &Zabbix{ Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", @@ -585,302 +781,164 @@ func TestLLD(t *testing.T) { LLDClearInterval: config.Duration(1 * time.Hour), Log: testutil.Logger{}, } - require.NoError(t, z.Init()) + require.NoError(t, plugin.Init()) - resCh := make(chan []result, 1) - go func() { - resCh <- server.listenForNRequests(9) - }() + // Connect and write the metrics + require.NoError(t, plugin.Connect()) + defer plugin.Close() // First packet - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Second packet, while time has not surpassed LLDSendInterval - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Simulate time passing for a new LLD send - z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond) // Third packet, time has surpassed LLDSendInterval, metrics + LLD - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Fourth packet - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Simulate time passing for a new LLD send - z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond) // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - require.NoError(t, z.Write([]telegraf.Metric{mNew})) + require.NoError(t, plugin.Write([]telegraf.Metric{mNew})) // Simulate time passing for LLD clear - z.lldLastSend = time.Now().Add(-time.Duration(z.LLDClearInterval)).Add(-time.Millisecond) + plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDClearInterval)).Add(-time.Millisecond) // Seventh packet, time has surpassed LLDSendInterval and LLDClearInterval, metrics + LLD. // LLD will be cleared. - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) // Simulate time passing for a new LLD send - z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond) + plugin.lldLastSend = time.Now().Add(-time.Duration(plugin.LLDSendInterval)).Add(-time.Millisecond) // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. - require.NoError(t, z.Write([]telegraf.Metric{m})) + require.NoError(t, plugin.Write([]telegraf.Metric{m})) - var results []result - select { - case res := <-resCh: - require.Len(t, res, 9) - results = res - case <-time.After(9 * time.Second): - require.Fail(t, "Timeout while waiting for results") - } + // Wait for the metrics to be received + require.Eventuallyf(t, func() bool { + return server.count.Load() >= uint32(len(expected)) + }, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load()) - // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. - // First packet with metrics - require.NoError(t, results[0].err) - compareData(t, []zabbixRequestData{zabbixMetric}, results[0].req.Data) + // Stop listening + require.NoError(t, plugin.Close()) + server.listener.Close() + wg.Wait() - // Second packet, while time has not surpassed LLDSendInterval - require.NoError(t, results[1].err) - compareData(t, []zabbixRequestData{zabbixMetric}, results[1].req.Data) + // Check the received metrics + server.Lock() + defer server.Unlock() - // Third packet, time has surpassed LLDSendInterval, metrics + LLD - require.NoError(t, results[2].err) - require.Len(t, results[2].req.Data, 2, "Expected 2 metrics") - results[2].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[2].req.Data) - - // Fourth packet with metrics - require.NoError(t, results[3].err) - compareData(t, []zabbixRequestData{zabbixMetric}, results[3].req.Data) - - // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - require.NoError(t, results[4].err) - compareData(t, []zabbixRequestData{zabbixMetric}, results[4].req.Data) - - // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - require.NoError(t, results[5].err) - compareData(t, []zabbixRequestData{zabbixMetricNew}, results[5].req.Data) - - // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. - // Also, time has surpassed LLDClearInterval, so LLD is cleared. - require.NoError(t, results[6].err) - require.Len(t, results[6].req.Data, 2, "Expected 2 metrics") - results[6].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, results[6].req.Data) - - // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - require.NoError(t, results[7].err) - compareData(t, []zabbixRequestData{zabbixMetric}, results[7].req.Data) - - // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. - // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - require.NoError(t, results[8].err) - require.Len(t, results[8].req.Data, 2, "Expected 2 metrics") - results[8].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[8].req.Data) + require.Empty(t, server.errs, "server had errors") + requireRequestDataEqual(t, expected, server.received, true) } // TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled func TestAutoRegister(t *testing.T) { - // Simulate a Zabbix server to get the data sent - server, err := newZabbixMockServer("127.0.0.1:", false) + now := time.Now() + input := []telegraf.Metric{ + metric.New( + "name", + map[string]string{"host": "hostA"}, + map[string]interface{}{"value": int64(0)}, + now, + ), + metric.New( + "name", + map[string]string{"host": "hostB"}, + map[string]interface{}{"value": int64(42)}, + now, + ), + } + + expected := []zabbix.Packet{ + { + Request: "sender data", + Data: []*zabbix.Metric{ + { + Host: "hostA", + Key: "telegraf.name.value", + Value: "0", + Clock: now.Unix(), + }, + { + Host: "hostB", + Key: "telegraf.name.value", + Value: "42", + Clock: now.Unix(), + }, + }, + }, + { + Request: "active checks", + Host: "hostA", + HostMetadata: "xxx", + }, + { + Request: "active checks", + Host: "hostB", + HostMetadata: "xxx", + }, + } + + // Setup a Zabbix mock server and start listening + server, err := newZabbixMockServer() require.NoError(t, err) defer server.close() - z := &Zabbix{ + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + + // Setup plugin + plugin := &Zabbix{ Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", - SkipMeasurementPrefix: false, - AgentActive: false, Autoregister: "xxx", AutoregisterResendInterval: config.Duration(time.Minute * 5), Log: testutil.Logger{}, } - require.NoError(t, z.Init()) + require.NoError(t, plugin.Init()) - resCh := make(chan []result, 1) - go func() { - resCh <- server.listenForNRequests(3) - }() + // Connect and write the metrics + require.NoError(t, plugin.Connect()) + require.NoError(t, plugin.Write(input)) - err = z.Write([]telegraf.Metric{ - testutil.MustMetric( - "name", - map[string]string{"host": "hostA"}, - map[string]interface{}{"value": int64(0)}, - time.Now(), - ), - testutil.MustMetric( - "name", - map[string]string{"host": "hostB"}, - map[string]interface{}{"value": int64(0)}, - time.Now(), - ), - }) - require.NoError(t, err) + // Wait for the metrics to be received + require.Eventuallyf(t, func() bool { + return server.count.Load() >= uint32(len(expected)) + }, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load()) - var results []result - select { - case res := <-resCh: - require.Len(t, res, 3) - results = res - case <-time.After(3 * time.Second): - require.Fail(t, "Timeout while waiting for results") - } + // Stop listening + require.NoError(t, plugin.Close()) + server.listener.Close() + wg.Wait() - // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. - // Accept packet with the two metrics sent - require.NoError(t, results[0].err) + // Check the received metrics + server.Lock() + defer server.Unlock() - // Read the first auto-register packet - require.NoError(t, results[1].err) - require.Equal(t, "active checks", results[1].req.Request) - require.Equal(t, "xxx", results[1].req.HostMetadata) - - // Read the second auto-register packet - require.NoError(t, results[2].err) - require.Equal(t, "active checks", results[2].req.Request) - require.Equal(t, "xxx", results[2].req.HostMetadata) - - // Check we have received auto-registration for both hosts - hostsRegistered := []string{results[1].req.Host} - hostsRegistered = append(hostsRegistered, results[2].req.Host) - require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) -} - -// compareData compares generated data with expected data ignoring slice order if all Clocks are the same. -// This is useful for metrics with several fields that should produce several Zabbix values that -// could not be sorted by clock -func compareData(t *testing.T, expected, data []zabbixRequestData) { - t.Helper() - - var clock int64 - sameClock := true - - // Check if all clocks are the same - for i := 0; i < len(data); i++ { - if i == 0 { - clock = data[i].Clock - } else if clock != data[i].Clock { - sameClock = false - break - } - } - - // Zabbix requests with LLD data contains a JSON value with an array of dictionaries. - // That array order depends on the access to a map, so it does not have a defined order. - // To compare the data, we need to sort the array of dictionaries. - // Before comparing the requests, sort those values. - // To detect if a request contains LLD data, try to unmarshal it to a ZabbixLLDValue. - // If it could be unmarshalled, sort the slice and marshal it again. - for i := 0; i < len(data); i++ { - var lldValue zabbixLLDValue - - err := json.Unmarshal([]byte(data[i].Value), &lldValue) - if err == nil { - sort.Slice(lldValue.Data, func(i, j int) bool { - // Generate a global order based on the keys and values present in the map - keysValuesI := make([]string, 0, len(lldValue.Data[i])*2) - keysValuesJ := make([]string, 0, len(lldValue.Data[j])*2) - for k, v := range lldValue.Data[i] { - keysValuesI = append(keysValuesI, k, v) - } - for k, v := range lldValue.Data[j] { - keysValuesJ = append(keysValuesJ, k, v) - } - - sort.Strings(keysValuesI) - sort.Strings(keysValuesJ) - - return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "") - }) - sortedValue, err := json.Marshal(lldValue) - require.NoError(t, err) - - data[i].Value = string(sortedValue) - } - } - - if sameClock { - require.ElementsMatch(t, expected, data) - } else { - require.Equal(t, expected, data) - } -} - -func (s *zabbixMockServer) listenForNRequests(n int) []result { - results := make([]result, 0, n) - defer s.listener.Close() - for i := 0; i < n; i++ { - res := s.listenForSingleRequest() - results = append(results, res) - } - - return results -} - -func (s *zabbixMockServer) listenForSingleRequest() result { - conn, err := s.listener.Accept() - if err != nil { - if s.ignoreAcceptError { - return result{req: zabbixRequest{}, err: nil} - } - return result{req: zabbixRequest{}, err: err} - } - defer conn.Close() - - if err = conn.SetDeadline(time.Now().Add(time.Second)); err != nil { - return result{req: zabbixRequest{}, err: err} - } - - // Obtain request from the mock zabbix server - // Read protocol header and version - header := make([]byte, 5) - _, err = conn.Read(header) - if err != nil { - return result{req: zabbixRequest{}, err: err} - } - - // Read data length - dataLengthRaw := make([]byte, 8) - _, err = conn.Read(dataLengthRaw) - if err != nil { - return result{req: zabbixRequest{}, err: err} - } - - dataLength := binary.LittleEndian.Uint64(dataLengthRaw) - - // Read data content - content := make([]byte, dataLength) - _, err = conn.Read(content) - if err != nil { - return result{req: zabbixRequest{}, err: err} - } - - // The zabbix output checks that there are not errors - // Simulated response from the server - resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") - _, err = conn.Write(resp) - if err != nil { - return result{req: zabbixRequest{}, err: err} - } - - // Strip zabbix header and get JSON request - var request zabbixRequest - err = json.Unmarshal(content, &request) - if err != nil { - return result{req: zabbixRequest{}, err: err} - } - - return result{req: request, err: nil} + require.Empty(t, server.errs, "server had errors") + actual := server.received + sort.SliceStable(expected, func(i, j int) bool { return expected[i].Host < expected[j].Host }) + sort.SliceStable(actual, func(i, j int) bool { return actual[i].Host < actual[j].Host }) + requireRequestDataEqual(t, expected, actual, false) } func TestBuildZabbixMetric(t *testing.T) { @@ -892,7 +950,8 @@ func TestBuildZabbixMetric(t *testing.T) { HostTag: hostTag, } - zm, err := z.buildZabbixMetric(testutil.MustMetric( + zm, err := z.buildZabbixMetric(metric.New( + "name", map[string]string{hostTag: "hostA", "foo": "bar", "a": "b"}, map[string]interface{}{}, @@ -903,7 +962,8 @@ func TestBuildZabbixMetric(t *testing.T) { require.NoError(t, err) require.Equal(t, keyPrefix+"name.value[b,bar]", zm.Key) - zm, err = z.buildZabbixMetric(testutil.MustMetric( + zm, err = z.buildZabbixMetric(metric.New( + "name", map[string]string{hostTag: "hostA"}, map[string]interface{}{}, @@ -947,7 +1007,8 @@ func TestGetHostname(t *testing.T) { for desc, test := range tests { t.Run(desc, func(t *testing.T) { - metric := testutil.MustMetric( + metric := metric.New( + "name", test.Tags, map[string]interface{}{}, @@ -960,3 +1021,252 @@ func TestGetHostname(t *testing.T) { }) } } + +func TestCases(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Get all testcase directories + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Register the plugin + outputs.Add("zabbix", func() telegraf.Output { + return &Zabbix{ + KeyPrefix: "telegraf.", + HostTag: "host", + AutoregisterResendInterval: config.Duration(time.Minute * 30), + LLDSendInterval: config.Duration(time.Minute * 10), + LLDClearInterval: config.Duration(time.Hour), + } + }) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + + t.Run(f.Name(), func(t *testing.T) { + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + inputFilename := filepath.Join(testcasePath, "input.influx") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + // Get parser to parse input and expected output + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + // Load the input data + input, err := testutil.ParseMetricsFromFile(inputFilename, parser) + require.NoError(t, err) + + // Read the expected output if any + var expected []zabbix.Packet + if _, err := os.Stat(expectedFilename); err == nil { + buf, err := os.ReadFile(expectedFilename) + require.NoError(t, err) + require.NoError(t, json.Unmarshal(buf, &expected)) + } + + // Read the expected output if any + var expectedError string + if _, err := os.Stat(expectedErrorFilename); err == nil { + expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.Len(t, expectedErrors, 1) + expectedError = expectedErrors[0] + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Outputs, 1) + + // Setup a Zabbix mock server and start listening + server, err := newZabbixMockServer() + require.NoError(t, err) + defer server.close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + server.listen() + }() + defer server.listener.Close() + + // Setup the plugin + plugin := cfg.Outputs[0].Output.(*Zabbix) + plugin.Address = server.addr() + plugin.Log = testutil.Logger{} + require.NoError(t, plugin.Init()) + + // Connect and write the metric(s) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + err = plugin.Write(input) + if expectedError != "" { + require.ErrorContains(t, err, expectedError) + return + } + require.NoError(t, err) + + // Wait for the data to arrive + require.Eventuallyf(t, func() bool { + return server.count.Load() >= uint32(len(expected)) + }, 3*time.Second, 100*time.Millisecond, "expected %d got %d", len(expected), server.count.Load()) + + server.listener.Close() + wg.Wait() + + // Check the received data + server.Lock() + defer server.Unlock() + require.Empty(t, server.errs, "server had errors") + requireRequestDataEqual(t, expected, server.received, false) + }) + } +} + +type zabbixMockServer struct { + listener net.Listener + + received []zabbix.Packet + errs []error + count atomic.Uint32 + sync.Mutex +} + +func newZabbixMockServer() (*zabbixMockServer, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + return &zabbixMockServer{listener: l}, nil +} + +func (s *zabbixMockServer) addr() string { + return s.listener.Addr().String() +} + +func (s *zabbixMockServer) close() error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + +func (s *zabbixMockServer) listen() { + for { + request, err := s.handle() + if err != nil { + if !errors.Is(err, net.ErrClosed) { + s.Lock() + s.errs = append(s.errs, err) + s.Unlock() + } + return + } + s.Lock() + s.received = append(s.received, request) + s.Unlock() + s.count.Store(uint32(len(s.received))) + } +} + +func (s *zabbixMockServer) handle() (zabbix.Packet, error) { + conn, err := s.listener.Accept() + if err != nil { + return zabbix.Packet{}, err + } + defer conn.Close() + + if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil { + return zabbix.Packet{}, err + } + + // Obtain request from the mock zabbix server + // Read protocol header and version + header := make([]byte, 5) + if _, err := conn.Read(header); err != nil { + return zabbix.Packet{}, err + } + + // Read data length + dataLengthRaw := make([]byte, 8) + if _, err := conn.Read(dataLengthRaw); err != nil { + return zabbix.Packet{}, err + } + dataLength := binary.LittleEndian.Uint64(dataLengthRaw) + + // Read data content + content := make([]byte, dataLength) + if _, err := conn.Read(content); err != nil { + return zabbix.Packet{}, err + } + + // The zabbix output checks that there are not errors + // Simulated response from the server + resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") + if _, err := conn.Write(resp); err != nil { + return zabbix.Packet{}, err + } + + // Strip zabbix header and get JSON request + var request zabbix.Packet + if err := json.Unmarshal(content, &request); err != nil { + return zabbix.Packet{}, err + } + + return request, nil +} + +type lldValue struct { + Data []map[string]string `json:"data"` +} + +func requireRequestDataEqual(t *testing.T, expected, actual []zabbix.Packet, ignoreClock bool) { + t.Helper() + require.Len(t, actual, len(expected)) + for i, expectedReq := range expected { + actualReq := actual[i] + require.Equalf(t, expectedReq.Request, actualReq.Request, "different request types in request %d", i) + require.Equalf(t, expectedReq.Host, actualReq.Host, "different host in request %d", i) + require.Equalf(t, expectedReq.HostMetadata, actualReq.HostMetadata, "different hostmetadata in request %d", i) + + // Check the elements + require.Len(t, actualReq.Data, len(expectedReq.Data)) + + less := func(a, b *zabbix.Metric) bool { + if a.Key == b.Key { + if a.Clock == b.Clock { + return a.Value < b.Value + } + return a.Clock < b.Clock + } + return a.Key < b.Key + } + sort.SliceStable(actualReq.Data, func(i, j int) bool { return less(actualReq.Data[i], actualReq.Data[j]) }) + sort.SliceStable(expectedReq.Data, func(i, j int) bool { return less(expectedReq.Data[i], expectedReq.Data[j]) }) + for j, expectedData := range expectedReq.Data { + actualData := actualReq.Data[j] + require.Equalf(t, expectedData.Key, actualData.Key, "different key in request %d, data %d", i, j) + require.Equalf(t, expectedData.Host, actualData.Host, "different host in request %d, data %d", i, j) + if !ignoreClock { + require.Equalf(t, expectedData.Clock, actualData.Clock, "different clock in request %d, data %d", i, j) + } + if strings.HasPrefix(expectedData.Value, "{") { + var actualValue, expectedValue lldValue + require.NoError(t, json.Unmarshal([]byte(actualData.Value), &actualValue)) + require.NoError(t, json.Unmarshal([]byte(expectedData.Value), &expectedValue)) + require.ElementsMatchf(t, expectedValue.Data, actualValue.Data, "different value in request %d, data %d", i, j) + } else { + require.Equalf(t, expectedData.Value, actualData.Value, "different value in request %d, data %d", i, j) + } + } + } +}