feat(outputs): Add Zabbix plugin (#13739)

parent 26e3eaaf21
commit c8e12fa15a
@@ -108,6 +108,7 @@ following works:
- github.com/cpuguy83/dockercfg [MIT License](https://github.com/cpuguy83/dockercfg/blob/main/LICENSE)
- github.com/cpuguy83/go-md2man [MIT License](https://github.com/cpuguy83/go-md2man/blob/master/LICENSE.md)
- github.com/danieljoos/wincred [MIT License](https://github.com/danieljoos/wincred/blob/master/LICENSE)
- github.com/datadope-io/go-zabbix [MIT License](https://github.com/datadope-io/go-zabbix/blob/master/LICENSE)
- github.com/davecgh/go-spew [ISC License](https://github.com/davecgh/go-spew/blob/master/LICENSE)
- github.com/devigned/tab [MIT License](https://github.com/devigned/tab/blob/master/LICENSE)
- github.com/dgryski/go-rendezvous [MIT License](https://github.com/dgryski/go-rendezvous/blob/master/LICENSE)
go.mod
@@ -65,6 +65,7 @@ require (
	github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f
	github.com/coreos/go-systemd/v22 v22.5.0
	github.com/couchbase/go-couchbase v0.1.1
	github.com/datadope-io/go-zabbix/v2 v2.0.1
	github.com/digitalocean/go-libvirt v0.0.0-20221205150000-2939327a8519
	github.com/dimchansky/utfbom v1.1.1
	github.com/djherbis/times v1.6.0
go.sum
@@ -1041,6 +1041,8 @@ github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE=
github.com/danieljoos/wincred v1.2.0/go.mod h1:FzQLLMKBFdvu+osBrnFODiv32YGwCfx0SkRa/eYHgec=
github.com/datadope-io/go-zabbix/v2 v2.0.1 h1:kGlyzfFqbwhMph4Mo0hpYxxBHI14eHuV5TVy+7uNonE=
github.com/datadope-io/go-zabbix/v2 v2.0.1/go.mod h1:hRbQWszykTUPoR6g5fJXfNwPFZpP3nDNSZ9HrEKuKCM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.zabbix

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/zabbix" // register plugin
@@ -0,0 +1,413 @@
# Zabbix Output Plugin

This plugin sends metrics to [Zabbix](https://www.zabbix.com/) via
[traps][traps].

It has been tested with versions
[3.0](https://www.zabbix.com/documentation/3.0/en/manual/appendix/items/trapper),
[4.0](https://www.zabbix.com/documentation/4.0/en/manual/appendix/items/trapper) and
[6.0](https://www.zabbix.com/documentation/6.0/en/manual/appendix/items/trapper).

[traps]: https://www.zabbix.com/documentation/current/en/manual/appendix/items/trapper

It should work with newer versions as long as Zabbix does not change the
protocol.

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration

```toml @sample.conf
# Send metrics to Zabbix
[[outputs.zabbix]]
  ## Address and (optional) port of the Zabbix server
  address = "zabbix.example.com:10051"

  ## Send metrics as type "Zabbix agent (active)"
  # agent_active = false

  ## Add prefix to all keys sent to Zabbix.
  # key_prefix = "telegraf."

  ## Name of the tag that contains the host name. Used to set the host in Zabbix.
  ## If the tag is not found, use the hostname of the system running Telegraf.
  # host_tag = "host"

  ## Skip measurement prefix to all keys sent to Zabbix.
  # skip_measurement_prefix = false

  ## This field will be sent as HostMetadata to Zabbix Server to autoregister the host.
  ## To enable this feature, this option must be set to a value other than "".
  # autoregister = ""

  ## Interval to resend auto-registration data to Zabbix.
  ## Only applies if autoregister feature is enabled.
  ## This value is a lower limit, the actual resend should be triggered by the next flush interval.
  # autoregister_resend_interval = "30m"

  ## Interval to send LLD data to Zabbix.
  ## This value is a lower limit, the actual resend should be triggered by the next flush interval.
  # lld_send_interval = "10m"

  ## Interval to delete stored LLD known data and start capturing it again.
  ## This value is a lower limit, the actual resend should be triggered by the next flush interval.
  # lld_clear_interval = "1h"
```

### agent_active

The `request` value in the packet sent to Zabbix should be different depending
on whether the items configured in Zabbix are [Zabbix trapper][zabbixtrapper]
or [Zabbix agent (active)][zabbixagentactive].

`agent_active = false` will send data as _sender data_, expecting trapper items.

`agent_active = true` will send data as _agent data_, expecting active Zabbix
agent items.

[zabbixtrapper]: https://www.zabbix.com/documentation/6.4/en/manual/config/items/itemtypes/trapper?hl=Trapper
[zabbixagentactive]: https://www.zabbix.com/documentation/6.4/en/manual/config/items/itemtypes/zabbix_agent
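
As a rough sketch (the exact packet is assembled by the go-zabbix library), the
only difference between the two modes is the `request` value of the packet:

```json
{"request": "sender data", "data": ["..."]}
{"request": "agent data", "data": ["..."]}
```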

### key_prefix

We can set a prefix that should be added to all Zabbix keys.

This is configurable with the option `key_prefix`, set by default to
`telegraf.`.

Example of how the configuration `key_prefix = "telegraf."` will generate the
Zabbix keys given a Telegraf metric:

```diff
- measurement,host=hostname valueA=0,valueB=1
+ telegraf.measurement.valueA
+ telegraf.measurement.valueB
```

### skip_measurement_prefix

We can skip the measurement prefix added to all Zabbix keys.

Example with `skip_measurement_prefix = true` and `key_prefix = "telegraf."`:

```diff
- measurement,host=hostname valueA=0,valueB=1
+ telegraf.valueA
+ telegraf.valueB
```

Example with `skip_measurement_prefix = true` and `key_prefix = ""`:

```diff
- measurement,host=hostname valueA=0,valueB=1
+ valueA
+ valueB
```

### autoregister

If this field is set, Telegraf will send an
[autoregister request][autoregisterrequest] to Zabbix, using the content of
this field as the [HostMetadata][hostmetadata].

One request is sent for each of the different values seen by Telegraf for the
`host` tag.

[autoregisterrequest]: https://www.zabbix.com/documentation/current/en/manual/discovery/auto_registration?hl=autoregistration
[hostmetadata]: https://www.zabbix.com/documentation/current/en/manual/discovery/auto_registration?hl=autoregistration#using-host-metadata
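
For illustration only (the actual request is assembled by the go-zabbix sender;
the request type shown is an assumption based on the Zabbix autoregistration
protocol), such a request roughly carries the host and the configured metadata:

```json
{"request": "active checks", "host": "myhost", "host_metadata": "value of the autoregister option"}
```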

### autoregister_resend_interval

If `autoregister` is defined, this field sets the interval at which
autoregister requests are resent to Zabbix.

The [telegraf interval format][intervals_format] should be used.

The actual send of the autoregister request will happen in the next output
flush after this interval has been surpassed.

[intervals_format]: ../../../docs/CONFIGURATION.md#intervals

### lld_send_interval

To reduce the number of LLD requests sent to Zabbix (LLD processing is
[expensive][lldexpensive]), this plugin will send only one per
`lld_send_interval`.

When Telegraf is started, this plugin will start to collect the info needed to
generate these LLD packets (measurements, tag keys and values).

Once this interval is surpassed, the next flush of this plugin will add the
packet with the LLD data.

In the next interval, only new or modified LLDs will be sent.

[lldexpensive]: https://www.zabbix.com/documentation/4.2/en/manual/introduction/whatsnew420#:~:text=Daemons-,Separate%20processing%20for%20low%2Dlevel%20discovery,-Processing%20low%2Dlevel

### lld_clear_interval

When this interval is surpassed, the next flush will clear all the LLD data
collected.

This allows this plugin to forget about old data and resend LLDs to Zabbix, in
case the host has new discovery rules or a previous packet was lost.

If we have `flush_interval = "1m"`, `lld_send_interval = "10m"` and
`lld_clear_interval = "1h"` and Telegraf is started at 00:00, the first LLD will
be sent at 00:10. At 01:00 the LLD data will be deleted and at 01:10 LLD data
will be resent.
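
The same timeline, spelled out:

```text
00:00  Telegraf starts, LLD data starts being collected
00:10  first LLD packet sent        (lld_send_interval)
01:00  stored LLD data is cleared   (lld_clear_interval)
01:10  LLD data is sent again
```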

## Trap format

For each new metric generated by Telegraf, this output plugin will send one
trap for each field.

Given this Telegraf metric:

```text
measurement,host=hostname valueA=0,valueB=1
```

It will generate these Zabbix metrics:

```json
{"host": "hostname", "key": "telegraf.measurement.valueA", "value": "0"}
{"host": "hostname", "key": "telegraf.measurement.valueB", "value": "1"}
```

If the metric has tags (apart from `host`), they will be added, in alphabetical
order, using the format for LLD metrics:

```text
measurement,host=hostname,tagA=keyA,tagB=keyB valueA=0,valueB=1
```

Zabbix generated metrics:

```json
{"host": "hostname", "key": "telegraf.measurement.valueA[keyA,keyB]", "value": "0"}
{"host": "hostname", "key": "telegraf.measurement.valueB[keyA,keyB]", "value": "1"}
```

This order is based on the tag keys, not the tag values, so, for example, this
Telegraf metric:

```text
measurement,host=hostname,aaaTag=999,zzzTag=111 value=0
```

will generate this Zabbix metric:

```json
{"host": "hostname", "key": "telegraf.measurement.value[999,111]", "value": "0"}
```

## Zabbix low-level discovery

Zabbix needs an `item` created before receiving any metric. In some cases we do
not know in advance what we are going to send, for example, the name of a
container whose cpu and memory consumption we want to send.

For this case Zabbix provides [low-level discovery][lld], which allows creating
new items dynamically based on the parameters sent by the trap.

As explained previously, this output plugin will format the Zabbix key using
the tags seen in the Telegraf metric following the LLD format.

To create those _discovered items_ this plugin uses the same mechanism as the
Zabbix agent, collecting information about which tags have been seen for each
measurement and periodically sending a request to a discovery rule with the
collected data.

Keep in mind that, for metrics in this category, Zabbix will discard them until
the low-level discovery (LLD) data is sent.
Sending LLD to Zabbix is a heavy-weight process and is only done at the
interval set by the `lld_send_interval` setting.

[lld]: https://www.zabbix.com/documentation/current/manual/discovery/low_level_discovery

### Design

To explain how everything interconnects we will use an example with the
`net_response` input:

```toml
[[inputs.net_response]]
  protocol = "tcp"
  address = "example.com:80"
```

This input will generate this metric:

```text
$ telegraf -config example.conf -test
* Plugin: inputs.net_response, Collection 1
> net_response,server=example.com,port=80,protocol=tcp,host=myhost result_type="success",response_time=0.091026869 1522741063000000000
```

Here we have four tags: server, port, protocol and host (the host tag is
assumed to always be present and is treated differently).

The values those three parameters could take are unknown to Zabbix, so we
cannot create trapper items in Zabbix to receive those values (at least without
mixing that metric with another `net_response` metric with different tags).

To solve this problem we use a discovery rule in Zabbix that will receive the
different groups of tag values and create the traps to gather the metrics.

This plugin knows about three tags (excluding host) for the input
`net_response`, therefore it will generate this new Telegraf metric:

```text
lld,host=myhost net_response.port.protocol.server="{\"data\":[{\"{#PORT}\":\"80\",\"{#PROTOCOL}\":\"tcp\",\"{#SERVER}\":\"example.com\"}]}"
```

Once sent, the final package will be:

```json
{
    "request":"sender data",
    "data":[
        {
            "host":"myhost",
            "key":"telegraf.lld.net_response.port.protocol.server",
            "value":"{\"data\":[{\"{#PORT}\":\"80\",\"{#PROTOCOL}\":\"tcp\",\"{#SERVER}\":\"example.com\"}]}",
            "clock":1519043805
        }
    ],
    "clock":1519043805
}
```

The Zabbix key is generated by joining `lld`, the input name and the tag keys,
alphabetically sorted.
Some inputs could use different groups of tags for different fields; that is
why the tags are added to the key, to allow having different discovery rules
for the same input.

The tags used in `value` are changed to uppercase to match the format of Zabbix.

In the Zabbix server we should have a discovery rule associated with that key
(telegraf.lld.net_response.port.protocol.server) and one item prototype for
each field, in this case `result_type` and `response_time`.

The item prototypes will be Zabbix trappers with these keys (the data type
should also match, and some values will be better stored as _delta_):

```text
telegraf.net_response.response_time[{#PORT},{#PROTOCOL},{#SERVER}]
telegraf.net_response.result_type[{#PORT},{#PROTOCOL},{#SERVER}]
```

The macros in the item prototype keys should be alphabetically sorted so they
can match the keys generated by this plugin.

With those keys and the example trap, the host `myhost` will have two new items:

```text
telegraf.net_response.response_time[80,tcp,example.com]
telegraf.net_response.result_type[80,tcp,example.com]
```

This plugin, for each metric, will send traps to the Zabbix server following
the same structure (INPUT.FIELD[tags sorted]...), filling the items created by
the discovery rule.

In summary (see the sketch after this list):

- we need a discovery rule with the correct key and one item prototype for each
  field
- this plugin will generate traps to create items based on the metrics seen in
  Telegraf
- it will also send the traps to fill the newly created items
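
Putting the Zabbix side of the example together:

```text
discovery rule (trapper):   telegraf.lld.net_response.port.protocol.server
item prototypes (trapper):  telegraf.net_response.response_time[{#PORT},{#PROTOCOL},{#SERVER}]
                            telegraf.net_response.result_type[{#PORT},{#PROTOCOL},{#SERVER}]
```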

### Reducing the number of LLDs

This plugin remembers which LLDs have been sent to Zabbix and avoids generating
the same metrics again, to avoid the cost of LLD processing in Zabbix.

It will only send LLD data once each `lld_send_interval`.

But a packet could be lost, or a host could get new discovery rules, so every
`lld_clear_interval` the plugin will forget about the known data and start
collecting again.

### Note on inputs configuration

The tags each input exposes should be controlled, because an unexpected tag
could modify the trap key and will not match the trapper defined in Zabbix.

For example, in the docker input, each container label is a new tag.

To control this we can add to the input a config like:

```toml
taginclude = ["host", "container_name"]
```

This allows only the tags "host" and "container_name" to be used to generate
the key (losing the information provided in the other tags).
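
For example, a docker input limited to the tags used in the key could look like
this (a sketch; adjust to your setup):

```toml
[[inputs.docker]]
  ## Keep only the tags that should become part of the Zabbix key
  taginclude = ["host", "container_name"]
```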

## Examples of metrics converted to traps

### Without tags

```text
mem,host=myHost available_percent=14.684620843239944,used=14246531072i 1522764428000000000
```

```json
{
    "request":"sender data",
    "data":[
        {
            "host":"myHost",
            "key":"telegraf.mem.available_percent",
            "value":"14.684620843239944",
            "clock":1522764428
        },
        {
            "host":"myHost",
            "key":"telegraf.mem.used",
            "value":"14246531072",
            "clock":1522764428
        }
    ]
}
```

### With tags

```text
docker_container_net,host=myHost,container_name=laughing_babbage rx_errors=0i,tx_errors=0i 1522764038000000000
```

```json
{
    "request":"sender data",
    "data": [
        {
            "host":"myHost",
            "key":"telegraf.docker_container_net.rx_errors[laughing_babbage]",
            "value":"0",
            "clock":1522764038
        },
        {
            "host":"myHost",
            "key":"telegraf.docker_container_net.tx_errors[laughing_babbage]",
            "value":"0",
            "clock":1522764038
        }
    ]
}
```
@@ -0,0 +1,38 @@
package zabbix

import (
	"time"
)

// autoregisterAdd adds a host to the list of hosts to send autoregister data to Zabbix.
// Only store information if autoregister is enabled (config Autoregister is not empty).
func (z *Zabbix) autoregisterAdd(hostname string) {
	if z.Autoregister == "" {
		return
	}

	if _, exists := z.autoregisterLastSend[hostname]; !exists {
		z.autoregisterLastSend[hostname] = time.Time{}
	}
}

// autoregisterPush sends autoregister data to Zabbix for each host.
func (z *Zabbix) autoregisterPush() {
	if z.Autoregister == "" {
		return
	}

	// For each "host" tag seen, send an autoregister request to the Zabbix server.
	// z.AutoregisterResendInterval is the interval at which requests are resent.
	for hostname, timeLastSend := range z.autoregisterLastSend {
		if time.Since(timeLastSend) > time.Duration(z.AutoregisterResendInterval) {
			z.Log.Debugf("Autoregistering host %q", hostname)

			if err := z.sender.RegisterHost(hostname, z.Autoregister); err != nil {
				z.Log.Errorf("Autoregistering host %q: %v", hostname, err)
			}

			z.autoregisterLastSend[hostname] = time.Now()
		}
	}
}
@ -0,0 +1,122 @@
|
|||
package zabbix
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/datadope-io/go-zabbix/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
// TestZabbixAutoregisterDisabledAdd tests that Add does not store information if autoregister is disabled.
|
||||
func TestZabbixAutoregisterDisabledAdd(t *testing.T) {
|
||||
z := Zabbix{
|
||||
autoregisterLastSend: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
z.autoregisterAdd("hostname")
|
||||
require.Empty(t, z.autoregisterLastSend)
|
||||
}
|
||||
|
||||
// TestZabbixAutoregisterEnabledAdd tests that Add stores information if autoregister is enabled.
|
||||
func TestZabbixAutoregisterEnabledAdd(t *testing.T) {
|
||||
z := Zabbix{
|
||||
Autoregister: "autoregister",
|
||||
autoregisterLastSend: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
z.autoregisterAdd("hostname")
|
||||
require.Len(t, z.autoregisterLastSend, 1)
|
||||
|
||||
require.Contains(t, z.autoregisterLastSend, "hostname")
|
||||
}
|
||||
|
||||
// TestZabbixAutoregisterPush tests different cases of Push with a table oriented test.
|
||||
func TestZabbixAutoregisterPush(t *testing.T) {
|
||||
zabbixSender := &mockZabbixSender{}
|
||||
z := Zabbix{
|
||||
Log: testutil.Logger{},
|
||||
AutoregisterResendInterval: config.Duration(1 * time.Second),
|
||||
autoregisterLastSend: make(map[string]time.Time),
|
||||
sender: zabbixSender,
|
||||
}
|
||||
|
||||
// Test that nothing is sent if autoregister is disabled.
|
||||
z.autoregisterPush()
|
||||
require.Empty(t, z.autoregisterLastSend)
|
||||
|
||||
// Test that nothing is sent if autoregister is enabled but no host is added.
|
||||
z.Autoregister = "autoregister"
|
||||
z.autoregisterPush()
|
||||
require.Empty(t, z.autoregisterLastSend)
|
||||
|
||||
// Test that autoregister is sent if autoregister is enabled and a host is added.
|
||||
z.Autoregister = "autoregister"
|
||||
z.autoregisterAdd("hostname")
|
||||
z.autoregisterPush()
|
||||
require.Len(t, z.autoregisterLastSend, 1)
|
||||
require.Equal(t, "hostname", zabbixSender.hostname)
|
||||
require.Equal(t, "autoregister", zabbixSender.hostMetadata)
|
||||
|
||||
// Test that autoregister is not sent if the last send was less than AutoregisterResendInterval ago.
|
||||
z.Autoregister = "autoregister"
|
||||
z.autoregisterAdd("hostname")
|
||||
z.autoregisterLastSend["hostname"] = time.Now().Add(time.Hour)
|
||||
zabbixSender.Reset()
|
||||
z.autoregisterPush()
|
||||
require.Len(t, z.autoregisterLastSend, 1)
|
||||
require.Equal(t, "", zabbixSender.hostname)
|
||||
require.Equal(t, "", zabbixSender.hostMetadata)
|
||||
|
||||
// Test that autoregister is sent if last send was more than autoregisterSendPeriod ago.
|
||||
z.Autoregister = "autoregister"
|
||||
z.autoregisterAdd("hostname")
|
||||
z.autoregisterLastSend["hostname"] = time.Now().Add(-24 * time.Hour)
|
||||
zabbixSender.Reset()
|
||||
z.autoregisterPush()
|
||||
require.Len(t, z.autoregisterLastSend, 1)
|
||||
require.Equal(t, "hostname", zabbixSender.hostname)
|
||||
require.Equal(t, "autoregister", zabbixSender.hostMetadata)
|
||||
}
|
||||
|
||||
// mockZabbixSender is a mock of the zabbixSender interface.
|
||||
type mockZabbixSender struct {
|
||||
hostname string
|
||||
hostMetadata string
|
||||
sendMetrics []*zabbix.Metric
|
||||
sendPackets []*zabbix.Packet
|
||||
}
|
||||
|
||||
// Reset resets the mock.
|
||||
func (m *mockZabbixSender) Reset() {
|
||||
m.hostname = ""
|
||||
m.hostMetadata = ""
|
||||
m.sendMetrics = nil
|
||||
m.sendPackets = nil
|
||||
}
|
||||
|
||||
// RegisterHost is a mock of ZabbixAutoregisterSender.RegisterHost.
|
||||
func (m *mockZabbixSender) RegisterHost(hostname, hostMetadata string) error {
|
||||
m.hostname = hostname
|
||||
m.hostMetadata = hostMetadata
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send is a mock of zabbixSender.Send.
|
||||
func (m *mockZabbixSender) Send(packet *zabbix.Packet) (res zabbix.Response, err error) {
|
||||
m.sendPackets = append(m.sendPackets, packet)
|
||||
return zabbix.Response{}, nil
|
||||
}
|
||||
|
||||
func (m *mockZabbixSender) SendMetrics(metrics []*zabbix.Metric) (
|
||||
resActive zabbix.Response,
|
||||
resTrapper zabbix.Response,
|
||||
err error,
|
||||
) {
|
||||
m.sendMetrics = append(m.sendMetrics, metrics...)
|
||||
return zabbix.Response{}, zabbix.Response{}, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,234 @@
|
|||
package zabbix
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
const (
|
||||
lldName = "lld"
|
||||
empty = `{"data":[]}`
|
||||
)
|
||||
|
||||
type lldInfo struct {
|
||||
Hostname string
|
||||
Key string
|
||||
DataHash uint64
|
||||
Data map[uint64]map[string]string
|
||||
}
|
||||
|
||||
func (i *lldInfo) hash() uint64 {
|
||||
ids := make([]uint64, 0, len(i.Data))
|
||||
for id := range i.Data {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
|
||||
|
||||
h := fnv.New64a()
|
||||
// Write cannot fail
|
||||
_ = binary.Write(h, internal.HostEndianness, lldSeriesID(i.Hostname, i.Key))
|
||||
h.Write([]byte{0})
|
||||
_ = binary.Write(h, internal.HostEndianness, ids)
|
||||
|
||||
return h.Sum64()
|
||||
}
|
||||
|
||||
func (i *lldInfo) metric(hostTag string) (telegraf.Metric, error) {
|
||||
values := make([]map[string]string, 0, len(i.Data))
|
||||
for _, v := range i.Data {
|
||||
values = append(values, v)
|
||||
}
|
||||
data := map[string]interface{}{"data": values}
|
||||
buf, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return metric.New(
|
||||
lldName,
|
||||
map[string]string{
|
||||
hostTag: i.Hostname,
|
||||
},
|
||||
map[string]interface{}{
|
||||
i.Key: buf,
|
||||
},
|
||||
time.Now(),
|
||||
), nil
|
||||
}
|
||||
|
||||
type zabbixLLD struct {
|
||||
log telegraf.Logger
|
||||
|
||||
// current is the collection of metrics added during the recent period
|
||||
current map[uint64]lldInfo
|
||||
|
||||
// previous stores the hashes of metrics received during the previous period
|
||||
previous map[uint64]lldInfo
|
||||
|
||||
// lastClear stores the time of the last clear of the LLD data
|
||||
lastClear time.Time
|
||||
|
||||
// clearInterval: after this interval has passed, all data is considered new.
// The idea behind this parameter is to resend known LLDs at a low frequency in case
// a previous send was not processed by Zabbix.
|
||||
clearInterval config.Duration
|
||||
|
||||
// hostTag is the name of the tag that contains the host name
|
||||
hostTag string
|
||||
}
|
||||
|
||||
// Push returns a slice of metrics to send to Zabbix with the LLD data using the accumulated info.
|
||||
// The name of the metric will always be "lld" (const).
// It will have only one tag, with the host.
// It will have a single field, with the LLD key as the key name and the JSON data as the value.
|
||||
// Eg.: lld,host=hostA disk.device.fstype.mode.path="{\"data\":[...
|
||||
func (zl *zabbixLLD) Push() []telegraf.Metric {
|
||||
metrics := make([]telegraf.Metric, 0, len(zl.current))
|
||||
newPrevious := make(map[uint64]lldInfo, len(zl.current))
|
||||
|
||||
// Iterate over the data collected in the closing period and determine which
// data needs to be sent. This can be done by comparing the complete data
// hash with what was previously sent (i.e. during the last push). If different,
// send a new LLD.
|
||||
seen := make(map[uint64]bool, len(zl.current))
|
||||
for series, info := range zl.current {
|
||||
dataHash := info.hash()
|
||||
|
||||
// Get the hash of the complete data and remember the data for next period
|
||||
newPrevious[series] = lldInfo{
|
||||
Hostname: info.Hostname,
|
||||
Key: info.Key,
|
||||
DataHash: dataHash,
|
||||
}
|
||||
seen[series] = true
|
||||
|
||||
// Skip already sent data
|
||||
previous, found := zl.previous[series]
|
||||
if found && previous.DataHash == dataHash {
|
||||
continue
|
||||
}
|
||||
|
||||
// For unseen series or series with new tags, we should send/resend
|
||||
// the data for discovery
|
||||
m, err := info.metric(zl.hostTag)
|
||||
if err != nil {
|
||||
zl.log.Warnf("Marshaling to JSON LLD tags in Zabbix format: %v", err)
|
||||
continue
|
||||
}
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
|
||||
// Check if we have seen the LLD in this cycle and send an empty LLD otherwise
|
||||
for series, info := range zl.previous {
|
||||
if seen[series] {
|
||||
continue
|
||||
}
|
||||
m := metric.New(
|
||||
lldName,
|
||||
map[string]string{
|
||||
zl.hostTag: info.Hostname,
|
||||
},
|
||||
map[string]interface{}{
|
||||
info.Key: []byte(empty),
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
metrics = append(metrics, m)
|
||||
}
|
||||
|
||||
// Replace "previous" with the data of this period or clear previous
|
||||
// if enough time has passed
|
||||
if time.Since(zl.lastClear) < time.Duration(zl.clearInterval) {
|
||||
zl.previous = newPrevious
|
||||
} else {
|
||||
zl.previous = make(map[uint64]lldInfo, len(zl.previous))
|
||||
zl.lastClear = time.Now()
|
||||
}
|
||||
|
||||
// Clear current
|
||||
zl.current = make(map[uint64]lldInfo, len(zl.current))
|
||||
|
||||
return metrics
|
||||
}
|
||||
|
||||
// Add parses a metric and adds it to the LLD cache.
|
||||
func (zl *zabbixLLD) Add(in telegraf.Metric) error {
|
||||
// Extract all necessary information from the metric
|
||||
// Get the metric tags. The tag-list is already sorted by key name
|
||||
tagList := in.TagList()
|
||||
|
||||
// Iterate over the tags and extract
|
||||
// - the hostname contained in the host tag
|
||||
// - the key-values of the tags WITHOUT the host tag
|
||||
// - the LLD-key for sending the metric in the form
|
||||
// <metric>.<tag key 1>[,<tag key 2>...,<tag key N>]
|
||||
// - a hash for the metric
|
||||
var hostname string
|
||||
keys := make([]string, 0, len(tagList))
|
||||
data := make(map[string]string, len(tagList))
|
||||
for _, tag := range tagList {
|
||||
// Extract the host key and skip everything else
|
||||
if tag.Key == zl.hostTag {
|
||||
hostname = tag.Value
|
||||
continue
|
||||
}
|
||||
|
||||
// Collect the tag keys for generating the lld-key later
|
||||
if tag.Value != "" {
|
||||
keys = append(keys, tag.Key)
|
||||
}
|
||||
|
||||
// Prepare the data for lld-metric
|
||||
key := "{#" + strings.ToUpper(tag.Key) + "}"
|
||||
data[key] = tag.Value
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
// Ignore metrics without tags (apart from the host tag)
|
||||
return nil
|
||||
}
|
||||
key := in.Name() + "." + strings.Join(keys, ".")
|
||||
|
||||
// If hostname is not defined, use the hostname of the system
|
||||
if hostname == "" {
|
||||
var err error
|
||||
hostname, err = os.Hostname()
|
||||
if err != nil {
|
||||
return fmt.Errorf("no hostname found and unable to get hostname from system: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Try to lookup the Zabbix series in the already received metrics and
|
||||
// create a new one if not found
|
||||
series := lldSeriesID(hostname, key)
|
||||
if _, found := zl.current[series]; !found {
|
||||
zl.current[series] = lldInfo{
|
||||
Hostname: hostname,
|
||||
Key: key,
|
||||
Data: make(map[uint64]map[string]string),
|
||||
}
|
||||
}
|
||||
zl.current[series].Data[in.HashID()] = data
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func lldSeriesID(hostname, key string) uint64 {
|
||||
h := fnv.New64a()
|
||||
h.Write([]byte(hostname))
|
||||
h.Write([]byte{0})
|
||||
h.Write([]byte(key))
|
||||
h.Write([]byte{0})
|
||||
return h.Sum64()
|
||||
}
|
||||
File diff suppressed because it is too large
|
|
@ -0,0 +1,34 @@
|
|||
# Send metrics to Zabbix
|
||||
[[outputs.zabbix]]
|
||||
## Address and (optional) port of the Zabbix server
|
||||
address = "zabbix.example.com:10051"
|
||||
|
||||
## Send metrics as type "Zabbix agent (active)"
|
||||
# agent_active = false
|
||||
|
||||
## Add prefix to all keys sent to Zabbix.
|
||||
# key_prefix = "telegraf."
|
||||
|
||||
## Name of the tag that contains the host name. Used to set the host in Zabbix.
|
||||
## If the tag is not found, use the hostname of the system running Telegraf.
|
||||
# host_tag = "host"
|
||||
|
||||
## Skip measurement prefix to all keys sent to Zabbix.
|
||||
# skip_measurement_prefix = false
|
||||
|
||||
## This field will be sent as HostMetadata to Zabbix Server to autoregister the host.
|
||||
## To enable this feature, this option must be set to a value other than "".
|
||||
# autoregister = ""
|
||||
|
||||
## Interval to resend auto-registration data to Zabbix.
|
||||
## Only applies if autoregister feature is enabled.
|
||||
## This value is a lower limit, the actual resend should be triggered by the next flush interval.
|
||||
# autoregister_resend_interval = "30m"
|
||||
|
||||
## Interval to send LLD data to Zabbix.
|
||||
## This value is a lower limit, the actual resend should be triggered by the next flush interval.
|
||||
# lld_send_interval = "10m"
|
||||
|
||||
## Interval to delete stored LLD known data and start capturing it again.
|
||||
## This value is a lower limit, the actual resend should be triggered by the next flush interval.
|
||||
# lld_clear_interval = "1h"
|
||||
|
|
@ -0,0 +1,240 @@
|
|||
//go:generate ../../../tools/readme_config_includer/generator
|
||||
package zabbix
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/datadope-io/go-zabbix/v2"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
||||
// zabbixSender is an interface to send data to Zabbix.
|
||||
// It is implemented by Zabbix.Sender.
|
||||
// Created to be able to mock Zabbix.Sender in tests.
|
||||
type zabbixSender interface {
|
||||
Send(packet *zabbix.Packet) (res zabbix.Response, err error)
|
||||
SendMetrics(metrics []*zabbix.Metric) (resActive zabbix.Response, resTrapper zabbix.Response, err error)
|
||||
RegisterHost(hostname string, hostMetadata string) error
|
||||
}
|
||||
|
||||
// Zabbix allows pushing metrics to Zabbix software
|
||||
type Zabbix struct {
|
||||
Address string `toml:"address"`
|
||||
AgentActive bool `toml:"agent_active"`
|
||||
KeyPrefix string `toml:"key_prefix"`
|
||||
HostTag string `toml:"host_tag"`
|
||||
SkipMeasurementPrefix bool `toml:"skip_measurement_prefix"`
|
||||
LLDSendInterval config.Duration `toml:"lld_send_interval"`
|
||||
LLDClearInterval config.Duration `toml:"lld_clear_interval"`
|
||||
Autoregister string `toml:"autoregister"`
|
||||
AutoregisterResendInterval config.Duration `toml:"autoregister_resend_interval"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
// lldHandler handles low level discovery data
|
||||
lldHandler zabbixLLD
|
||||
// lldLastSend stores the time of the last LLD send, to know when to send it again
|
||||
lldLastSend time.Time
|
||||
// autoregisterLastSend stores the last time autoregister data was sent to Zabbix for each host.
|
||||
autoregisterLastSend map[string]time.Time
|
||||
// sender is the interface to send data to Zabbix.
|
||||
sender zabbixSender
|
||||
}
|
||||
|
||||
//go:embed sample.conf
|
||||
var sampleConfig string
|
||||
|
||||
func (*Zabbix) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Connect does nothing; Write() initiates the connection on each call.
// Checking if the Zabbix server is alive in this step would prevent Telegraf
// from starting if there is a temporary connection problem with the server.
|
||||
func (z *Zabbix) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init initializes the LLD handler and the autoregister map, copying config values to them.
|
||||
func (z *Zabbix) Init() error {
|
||||
// Add port to address if not present
|
||||
if _, _, err := net.SplitHostPort(z.Address); err != nil {
|
||||
z.Address = net.JoinHostPort(z.Address, "10051")
|
||||
}
|
||||
|
||||
z.sender = zabbix.NewSender(z.Address)
|
||||
// Initialize autoregisterLastSend map with size one, as the most common scenario is to have one host.
|
||||
z.autoregisterLastSend = make(map[string]time.Time, 1)
|
||||
z.lldLastSend = time.Now()
|
||||
|
||||
z.lldHandler = zabbixLLD{
|
||||
log: z.Log,
|
||||
hostTag: z.HostTag,
|
||||
clearInterval: z.LLDClearInterval,
|
||||
lastClear: time.Now(),
|
||||
current: make(map[uint64]lldInfo, 100),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *Zabbix) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write sends metrics to Zabbix server
|
||||
func (z *Zabbix) Write(metrics []telegraf.Metric) error {
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
zbxMetrics := make([]*zabbix.Metric, 0, len(metrics))
|
||||
|
||||
for _, metric := range metrics {
|
||||
hostname, err := getHostname(z.HostTag, metric)
|
||||
if err != nil {
|
||||
z.Log.Errorf("Error getting hostname for metric %v: %v", metric, err)
|
||||
continue
|
||||
}
|
||||
|
||||
zbxMetrics = append(zbxMetrics, z.processMetric(metric)...)
|
||||
|
||||
// Handle hostname for autoregister
|
||||
z.autoregisterAdd(hostname)
|
||||
|
||||
// Process LLD data
|
||||
err = z.lldHandler.Add(metric)
|
||||
if err != nil {
|
||||
z.Log.Errorf("Error processing LLD for metric %v: %v", metric, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Send LLD data if enough time has passed
|
||||
if time.Since(z.lldLastSend) > time.Duration(z.LLDSendInterval) {
|
||||
z.lldLastSend = time.Now()
|
||||
for _, lldMetric := range z.lldHandler.Push() {
|
||||
zbxMetrics = append(zbxMetrics, z.processMetric(lldMetric)...)
|
||||
}
|
||||
}
|
||||
|
||||
// Send metrics to Zabbix server
|
||||
err := z.sendZabbixMetrics(zbxMetrics)
|
||||
|
||||
// Send autoregister data after sending metrics.
|
||||
z.autoregisterPush()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// sendZabbixMetrics sends metrics to Zabbix server
|
||||
func (z *Zabbix) sendZabbixMetrics(zbxMetrics []*zabbix.Metric) error {
|
||||
if len(zbxMetrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sort metrics by time.
|
||||
// Avoid extra work in Zabbix when generating the trends.
|
||||
// If values are not sent in clock order, trend generation is forced to
|
||||
// make more database operations.
|
||||
// When a value is received with a new hour, trend is flushed to the
|
||||
// database.
|
||||
// If later a value is received with the previous hour, new trend is
|
||||
// flushed, old one is retrieved from database and updated.
|
||||
// When a new value with the new hour is received, old trend is flushed,
|
||||
// new trend retrieved from database and updated.
|
||||
sort.Slice(zbxMetrics, func(i, j int) bool {
|
||||
return zbxMetrics[i].Clock < zbxMetrics[j].Clock
|
||||
})
|
||||
|
||||
packet := zabbix.NewPacket(zbxMetrics, z.AgentActive)
|
||||
_, err := z.sender.Send(packet)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// processMetric converts a Telegraf metric to a list of Zabbix metrics.
|
||||
// Ignore metrics with no hostname.
|
||||
func (z Zabbix) processMetric(metric telegraf.Metric) []*zabbix.Metric {
|
||||
zbxMetrics := make([]*zabbix.Metric, 0, len(metric.FieldList()))
|
||||
|
||||
for _, field := range metric.FieldList() {
|
||||
zbxMetric, err := z.buildZabbixMetric(metric, field.Key, field.Value)
|
||||
if err != nil {
|
||||
z.Log.Errorf("Error converting telegraf metric to Zabbix format: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
zbxMetrics = append(zbxMetrics, zbxMetric)
|
||||
}
|
||||
|
||||
return zbxMetrics
|
||||
}
|
||||
|
||||
// buildZabbixMetric builds a Zabbix metric from a Telegraf metric, for one particular value.
|
||||
func (z Zabbix) buildZabbixMetric(metric telegraf.Metric, fieldName string, value interface{}) (*zabbix.Metric, error) {
|
||||
hostname, err := getHostname(z.HostTag, metric)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting hostname: %w", err)
|
||||
}
|
||||
|
||||
metricValue, err := internal.ToString(value)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error converting value: %w", err)
|
||||
}
|
||||
|
||||
key := z.KeyPrefix + metric.Name() + "." + fieldName
|
||||
if z.SkipMeasurementPrefix {
|
||||
key = z.KeyPrefix + fieldName
|
||||
}
|
||||
|
||||
// Ignore host tag.
|
||||
// We want to add tags to the key in alphabetical order. Eg.:
|
||||
// lld.dns_query.query_time_ms[DOMAIN,RECORD_TYPE,SERVER]
|
||||
// TagList already returns the tags in alphabetical order.
|
||||
tagValues := make([]string, 0, len(metric.TagList()))
|
||||
|
||||
for _, tag := range metric.TagList() {
|
||||
if tag.Key == z.HostTag {
|
||||
continue
|
||||
}
|
||||
|
||||
// Get tag values in the same order as the tag keys in the tags slice.
|
||||
tagValues = append(tagValues, tag.Value)
|
||||
}
|
||||
|
||||
if len(tagValues) != 0 {
|
||||
key = fmt.Sprintf("%v[%v]", key, strings.Join(tagValues, ","))
|
||||
}
|
||||
|
||||
return zabbix.NewMetric(hostname, key, metricValue, z.AgentActive, metric.Time().Unix()), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("zabbix", func() telegraf.Output {
|
||||
return &Zabbix{
|
||||
KeyPrefix: "telegraf.",
|
||||
HostTag: "host",
|
||||
AutoregisterResendInterval: config.Duration(time.Minute * 30),
|
||||
LLDSendInterval: config.Duration(time.Minute * 10),
|
||||
LLDClearInterval: config.Duration(time.Hour),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// getHostname returns the hostname from the tags, or the system hostname if not found.
|
||||
func getHostname(hostTag string, metric telegraf.Metric) (string, error) {
|
||||
if hostname, ok := metric.GetTag(hostTag); ok {
|
||||
return hostname, nil
|
||||
}
|
||||
|
||||
return os.Hostname()
|
||||
}
|
||||
|
|
@ -0,0 +1,920 @@
|
|||
package zabbix
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
type zabbixRequestData struct {
|
||||
Host string `json:"host"`
|
||||
Key string `json:"key"`
|
||||
Value string `json:"value"`
|
||||
Clock int64 `json:"clock"`
|
||||
}
|
||||
|
||||
type zabbixRequest struct {
|
||||
Request string `json:"request"`
|
||||
Data []zabbixRequestData `json:"data"`
|
||||
Clock int `json:"clock"`
|
||||
Host string `json:"host"`
|
||||
HostMetadata string `json:"host_metadata"`
|
||||
}
|
||||
|
||||
type zabbixLLDValue struct {
|
||||
Data []map[string]string `json:"data"`
|
||||
}
|
||||
|
||||
func TestZabbix(t *testing.T) {
|
||||
hostname, err := os.Hostname()
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := map[string]struct {
|
||||
KeyPrefix string
|
||||
AgentActive bool
|
||||
SkipMeasurementPrefix bool
|
||||
telegrafMetrics []telegraf.Metric
|
||||
zabbixMetrics []zabbixRequestData
|
||||
}{
|
||||
"send one metric with one field and no extra tags, generates one zabbix metric": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"string values representing a float number should be sent in the exact same format": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": "3.1415",
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value",
|
||||
Value: "3.1415",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send one metric with one string field and no extra tags, generates one zabbix metric": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": "some value",
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value",
|
||||
Value: "some value",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"boolean values are converted to 1 (true) or 0 (false)": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"valueTrue": true,
|
||||
"valueFalse": false,
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.valueTrue",
|
||||
Value: "true",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.valueFalse",
|
||||
Value: "false",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"invalid value data is ignored and not sent": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": []int{1, 2},
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{},
|
||||
},
|
||||
"metrics without host tag use the system hostname": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": "x",
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: hostname,
|
||||
Key: "name.value",
|
||||
Value: "x",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send one metric with extra tags, zabbix metric should be generated with a parameter": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
"foo": "bar",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value[bar]",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send one metric with two extra tags, zabbix parameters should be alfabetically orderer": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
"zparam": "last",
|
||||
"aparam": "first",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value[first,last]",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send one metric with two fields and no extra tags, generates two zabbix metrics": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"valueA": int64(0),
|
||||
"valueB": int64(1),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.valueA",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.valueB",
|
||||
Value: "1",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send two metrics with one field and no extra tags, generates two zabbix metrics": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("nameA",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
testutil.MustMetric("nameB",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "nameA.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "nameB.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"send two metrics with different hostname, generates two zabbix metrics for different hosts": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameA",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameB",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostnameA",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
{
|
||||
Host: "hostnameB",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"if key_prefix is configured, zabbix metrics should have that prefix in the key": {
|
||||
KeyPrefix: "telegraf.",
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "telegraf.name.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"if skip_measurement_prefix is configured, zabbix metrics should have to skip that prefix in the key": {
|
||||
SkipMeasurementPrefix: true,
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"if AgentActive is configured, zabbix metrics should be sent respecting that protocol": {
|
||||
AgentActive: true,
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostname",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1522082244, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostname",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 1522082244,
|
||||
},
|
||||
},
|
||||
},
|
||||
"metrics should be time sorted, oldest to newest, to avoid zabbix doing extra work when generating trends": {
|
||||
telegrafMetrics: []telegraf.Metric{
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameD",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(4444444444, 0),
|
||||
),
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameC",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(3333333333, 0),
|
||||
),
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameA",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(1111111111, 0),
|
||||
),
|
||||
testutil.MustMetric("name",
|
||||
map[string]string{
|
||||
"host": "hostnameB",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": int64(0),
|
||||
},
|
||||
time.Unix(2222222222, 0),
|
||||
),
|
||||
},
|
||||
zabbixMetrics: []zabbixRequestData{
|
||||
{
|
||||
Host: "hostnameA",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 1111111111,
|
||||
},
|
||||
{
|
||||
Host: "hostnameB",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 2222222222,
|
||||
},
|
||||
{
|
||||
Host: "hostnameC",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 3333333333,
|
||||
},
|
||||
{
|
||||
Host: "hostnameD",
|
||||
Key: "name.value",
|
||||
Value: "0",
|
||||
Clock: 4444444444,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for desc, test := range tests {
|
||||
t.Run(desc, func(t *testing.T) {
|
||||
// Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever.
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:")
|
||||
require.NoError(t, err)
|
||||
defer listener.Close()
|
||||
|
||||
z := &Zabbix{
|
||||
Address: listener.Addr().String(),
|
||||
KeyPrefix: test.KeyPrefix,
|
||||
HostTag: "host",
|
||||
SkipMeasurementPrefix: test.SkipMeasurementPrefix,
|
||||
AgentActive: test.AgentActive,
|
||||
LLDSendInterval: config.Duration(10 * time.Minute),
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, z.Init())
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
success := make(chan zabbixRequest, 1)
|
||||
|
||||
go func() {
|
||||
success <- listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0)
|
||||
}()
|
||||
|
||||
// By default we use trappers
|
||||
requestType := "sender data"
|
||||
if test.AgentActive {
|
||||
requestType = "agent data"
|
||||
}
|
||||
|
||||
select {
|
||||
case request := <-success:
|
||||
require.Equal(t, requestType, request.Request)
|
||||
compareData(t, test.zabbixMetrics, request.Data)
|
||||
case <-time.After(1 * time.Second):
|
||||
require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out")
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
require.NoError(t, z.Write(test.telegrafMetrics))
|
||||
|
||||
// Wait for zabbix server emulator to finish
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
}
|
||||

// TestLLD tests how LLD metrics are sent simulating the time passing.
// LLD is sent each LLDSendInterval. Only new data.
// LLD data is cleared each LLDClearInterval.
func TestLLD(t *testing.T) {
	// Telegraf metric which will be sent repeatedly
	m := testutil.MustMetric(
		"name",
		map[string]string{"host": "hostA", "foo": "bar"},
		map[string]interface{}{"value": int64(0)},
		time.Unix(0, 0),
	)

	mNew := testutil.MustMetric(
		"name",
		map[string]string{"host": "hostA", "foo": "moo"},
		map[string]interface{}{"value": int64(0)},
		time.Unix(0, 0),
	)

	// Expected Zabbix metric generated
	zabbixMetric := zabbixRequestData{
		Host:  "hostA",
		Key:   "telegraf.name.value[bar]",
		Value: "0",
		Clock: 0,
	}

	// Expected Zabbix metric generated
	zabbixMetricNew := zabbixRequestData{
		Host:  "hostA",
		Key:   "telegraf.name.value[moo]",
		Value: "0",
		Clock: 0,
	}

	// Expected Zabbix LLD metric generated
	zabbixLLDMetric := zabbixRequestData{
		Host:  "hostA",
		Key:   "telegraf.lld.name.foo",
		Value: `{"data":[{"{#FOO}":"bar"}]}`,
		Clock: 0,
	}

	// Expected Zabbix LLD metric generated
	zabbixLLDMetricNew := zabbixRequestData{
		Host:  "hostA",
		Key:   "telegraf.lld.name.foo",
		Value: `{"data":[{"{#FOO}":"bar"},{"{#FOO}":"moo"}]}`,
		Clock: 0,
	}

	// Simulate a Zabbix server to get the data sent
	listener, err := net.Listen("tcp", "127.0.0.1:")
	require.NoError(t, err)
	defer listener.Close()

	z := &Zabbix{
		Address:          listener.Addr().String(),
		KeyPrefix:        "telegraf.",
		HostTag:          "host",
		LLDSendInterval:  config.Duration(10 * time.Minute),
		LLDClearInterval: config.Duration(1 * time.Hour),
		Log:              testutil.Logger{},
	}
	require.NoError(t, z.Init())

	wg := sync.WaitGroup{}
	wg.Add(1)

	// Read the nine packets sent by the plugin, checking when LLD data is expected alongside the metrics.
	go func() {
		// First packet with metrics
		request := listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetric}, request.Data)

		// Second packet, while time has not surpassed LLDSendInterval
		request = listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetric}, request.Data)

		// Third packet, time has surpassed LLDSendInterval, metrics + LLD
		request = listenForZabbixMetric(t, listener, false)
		require.Len(t, request.Data, 2, "Expected 2 metrics")
		request.Data[1].Clock = 0 // Ignore LLD request clock
		compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data)

		// Fourth packet with metrics
		request = listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetric}, request.Data)

		// Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new.
		request = listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetric}, request.Data)

		// Sixth packet, new LLD info, but time has not surpassed LLDSendInterval
		request = listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data)

		// Seventh packet, time has surpassed LLDSendInterval, metrics + LLD.
		// Also, time has surpassed LLDClearInterval, so LLD is cleared.
		request = listenForZabbixMetric(t, listener, false)
		require.Len(t, request.Data, 2, "Expected 2 metrics")
		request.Data[1].Clock = 0 // Ignore LLD request clock
		compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data)

		// Eighth packet, time has not surpassed LLDSendInterval, just metrics.
		request = listenForZabbixMetric(t, listener, false)
		compareData(t, []zabbixRequestData{zabbixMetric}, request.Data)

		// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
		// Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval.
		request = listenForZabbixMetric(t, listener, false)
		require.Len(t, request.Data, 2, "Expected 2 metrics")
		request.Data[1].Clock = 0 // Ignore LLD request clock
		compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data)

		wg.Done()
	}()

	// First packet
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Second packet, while time has not surpassed LLDSendInterval
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond)

	// Third packet, time has surpassed LLDSendInterval, metrics + LLD
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Fourth packet
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond)

	// Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new.
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Sixth packet, new LLD info, but time has not surpassed LLDSendInterval
	require.NoError(t, z.Write([]telegraf.Metric{mNew}))

	// Simulate time passing for LLD clear
	z.lldLastSend = time.Now().Add(-time.Duration(z.LLDClearInterval)).Add(-time.Millisecond)

	// Seventh packet, time has surpassed LLDSendInterval and LLDClearInterval, metrics + LLD.
	// LLD will be cleared.
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Eighth packet, time has not surpassed LLDSendInterval, just metrics.
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Simulate time passing for a new LLD send
	z.lldLastSend = time.Now().Add(-time.Duration(z.LLDSendInterval)).Add(-time.Millisecond)

	// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
	require.NoError(t, z.Write([]telegraf.Metric{m}))

	// Wait for zabbix server emulator to finish
	wg.Wait()
}
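
// buildLLDValue is an illustrative helper, not part of the plugin: it shows how a
// Zabbix low-level discovery payload such as {"data":[{"{#FOO}":"bar"}]} (asserted
// above) could be assembled from the tag values discovered for one tag key, each
// value becoming a {#MACRO} entry.
func buildLLDValue(tagKey string, values []string) (string, error) {
	entries := make([]map[string]string, 0, len(values))
	for _, v := range values {
		entries = append(entries, map[string]string{
			"{#" + strings.ToUpper(tagKey) + "}": v,
		})
	}
	payload, err := json.Marshal(map[string]interface{}{"data": entries})
	return string(payload), err
}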

// TestAutoregister tests that autoregistration requests are sent to zabbix if enabled
func TestAutoregister(t *testing.T) {
	// Simulate a Zabbix server to get the data sent
	listener, err := net.Listen("tcp", "127.0.0.1:")
	require.NoError(t, err)
	defer listener.Close()

	z := &Zabbix{
		Address:                    listener.Addr().String(),
		KeyPrefix:                  "telegraf.",
		HostTag:                    "host",
		SkipMeasurementPrefix:      false,
		AgentActive:                false,
		Autoregister:               "xxx",
		AutoregisterResendInterval: config.Duration(time.Minute * 5),
		Log:                        testutil.Logger{},
	}
	require.NoError(t, z.Init())

	wg := sync.WaitGroup{}
	wg.Add(1)

	// Read first packet with two metrics, then the first autoregister packet and the second autoregister packet.
	go func() {
		// Accept packet with the two metrics sent
		_ = listenForZabbixMetric(t, listener, false)

		// Read the first autoregister packet
		request := listenForZabbixMetric(t, listener, false)
		require.Equal(t, "active checks", request.Request)
		require.Equal(t, "xxx", request.HostMetadata)

		hostsRegistered := []string{request.Host}

		// Read the second autoregister packet
		request = listenForZabbixMetric(t, listener, false)
		require.Equal(t, "active checks", request.Request)
		require.Equal(t, "xxx", request.HostMetadata)

		// Check we have received autoregistration for both hosts
		hostsRegistered = append(hostsRegistered, request.Host)
		require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered)

		wg.Done()
	}()

	err = z.Write([]telegraf.Metric{
		testutil.MustMetric(
			"name",
			map[string]string{"host": "hostA"},
			map[string]interface{}{"value": int64(0)},
			time.Now(),
		),
		testutil.MustMetric(
			"name",
			map[string]string{"host": "hostB"},
			map[string]interface{}{"value": int64(0)},
			time.Now(),
		),
	})
	require.NoError(t, err)

	// Wait for zabbix server emulator to finish
	wg.Wait()
}
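
// exampleAutoregisterRequest is illustrative only: autoregistration is implemented
// by sending one "active checks" request per host carrying the configured metadata,
// which is what the goroutine above asserts on.
var exampleAutoregisterRequest = zabbixRequest{
	Request:      "active checks",
	Host:         "hostA",
	HostMetadata: "xxx",
}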

// compareData compares generated data with expected data ignoring slice order if all Clocks are
// the same.
// This is useful for metrics with several fields that should produce several Zabbix values that
// cannot be sorted by clock.
func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) {
	t.Helper()

	var clock int64
	sameClock := true

	// Check if all clocks are the same
	for i := 0; i < len(data); i++ {
		if i == 0 {
			clock = data[i].Clock
		} else if clock != data[i].Clock {
			sameClock = false

			break
		}
	}

	// Zabbix requests with LLD data contain a JSON value with an array of dictionaries.
	// That array order depends on map iteration, so it does not have a defined order.
	// To compare the data, we need to sort the array of dictionaries.
	// Before comparing the requests, sort those values.
	// To detect if a request contains LLD data, try to unmarshal it to a zabbixLLDValue.
	// If it could be unmarshalled, sort the slice and marshal it again.
	for i := 0; i < len(data); i++ {
		var lldValue zabbixLLDValue

		err := json.Unmarshal([]byte(data[i].Value), &lldValue)
		if err == nil {
			sort.Slice(lldValue.Data, func(i, j int) bool {
				// Generate a global order based on the keys and values present in the map
				keysValuesI := make([]string, 0, len(lldValue.Data[i])*2)
				keysValuesJ := make([]string, 0, len(lldValue.Data[j])*2)
				for k, v := range lldValue.Data[i] {
					keysValuesI = append(keysValuesI, k, v)
				}
				for k, v := range lldValue.Data[j] {
					keysValuesJ = append(keysValuesJ, k, v)
				}

				sort.Strings(keysValuesI)
				sort.Strings(keysValuesJ)

				return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "")
			})
			sortedValue, err := json.Marshal(lldValue)
			require.NoError(t, err)

			data[i].Value = string(sortedValue)
		}
	}

	if sameClock {
		require.ElementsMatch(t, expected, data)
	} else {
		require.Equal(t, expected, data)
	}
}

// listenForZabbixMetric starts a TCP server listening for one Zabbix metric.
// ignoreAcceptError is used to ignore the error when the server is closed.
func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) zabbixRequest {
	t.Helper()

	conn, err := listener.Accept()
	if err != nil && ignoreAcceptError {
		return zabbixRequest{}
	}

	require.NoError(t, err)

	// Obtain the request sent to the mock Zabbix server.
	// Read protocol header and version
	header := make([]byte, 5)
	_, err = conn.Read(header)
	require.NoError(t, err)

	// Read data length
	dataLengthRaw := make([]byte, 8)
	_, err = conn.Read(dataLengthRaw)
	require.NoError(t, err)

	dataLength := binary.LittleEndian.Uint64(dataLengthRaw)

	// Read data content
	content := make([]byte, dataLength)
	_, err = conn.Read(content)
	require.NoError(t, err)

	// The zabbix output checks that there are no errors.
	// Simulated response from the server.
	resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n")
	_, err = conn.Write(resp)
	require.NoError(t, err)

	// Close connection after reading the client data
	conn.Close()

	// Decode the JSON request that followed the Zabbix header
	var request zabbixRequest
	require.NoError(t, json.Unmarshal(content, &request))

	return request
}
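
// frameZabbixPayload is an illustrative helper mirroring what listenForZabbixMetric
// parses on the other side of the connection: a "ZBXD\x01" protocol header, an
// 8-byte little-endian payload length, and then the JSON payload itself.
func frameZabbixPayload(payload []byte) []byte {
	frame := make([]byte, 0, 5+8+len(payload))
	frame = append(frame, []byte("ZBXD\x01")...)
	length := make([]byte, 8)
	binary.LittleEndian.PutUint64(length, uint64(len(payload)))
	frame = append(frame, length...)
	return append(frame, payload...)
}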

func TestBuildZabbixMetric(t *testing.T) {
	keyPrefix := "prefix."
	hostTag := "host"

	z := &Zabbix{
		KeyPrefix: keyPrefix,
		HostTag:   hostTag,
	}

	zm, err := z.buildZabbixMetric(testutil.MustMetric(
		"name",
		map[string]string{hostTag: "hostA", "foo": "bar", "a": "b"},
		map[string]interface{}{},
		time.Now()),
		"value",
		1,
	)
	require.NoError(t, err)
	require.Equal(t, fmt.Sprintf("%sname.value[b,bar]", keyPrefix), zm.Key)

	zm, err = z.buildZabbixMetric(testutil.MustMetric(
		"name",
		map[string]string{hostTag: "hostA"},
		map[string]interface{}{},
		time.Now()),
		"value",
		1,
	)
	require.NoError(t, err)
	require.Equal(t, fmt.Sprintf("%sname.value", keyPrefix), zm.Key)
}
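
// buildKeySketch is illustrative only (buildZabbixMetric is the plugin's real
// implementation): it reproduces the key layout asserted above, where the item key
// is prefix + measurement + "." + field and any non-host tag values are appended
// in brackets, ordered by tag key.
func buildKeySketch(prefix, measurement, field, hostTag string, tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		if k == hostTag {
			continue
		}
		keys = append(keys, k)
	}
	sort.Strings(keys)

	values := make([]string, 0, len(keys))
	for _, k := range keys {
		values = append(values, tags[k])
	}

	key := prefix + measurement + "." + field
	if len(values) > 0 {
		key += "[" + strings.Join(values, ",") + "]"
	}
	return key
}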

func TestGetHostname(t *testing.T) {
	hostname, err := os.Hostname()
	require.NoError(t, err)

	tests := map[string]struct {
		HostTag string
		Host    string
		Tags    map[string]string
		Result  string
	}{
		"metric with host tag": {
			HostTag: "host",
			Tags: map[string]string{
				"host": "bar",
			},
			Result: "bar",
		},
		"metric with host tag changed": {
			HostTag: "source",
			Tags: map[string]string{
				"source": "bar",
			},
			Result: "bar",
		},
		"metric with no host tag": {
			Tags:   map[string]string{},
			Result: hostname,
		},
	}

	for desc, test := range tests {
		t.Run(desc, func(t *testing.T) {
			metric := testutil.MustMetric(
				"name",
				test.Tags,
				map[string]interface{}{},
				time.Now(),
			)

			host, err := getHostname(test.HostTag, metric)
			require.NoError(t, err)
			require.Equal(t, test.Result, host)
		})
	}
}
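
// getHostnameSketch is illustrative only: it mirrors the behaviour this test
// exercises, preferring the metric's host tag and falling back to os.Hostname()
// when that tag is absent.
func getHostnameSketch(hostTag string, m telegraf.Metric) (string, error) {
	if host, ok := m.GetTag(hostTag); ok {
		return host, nil
	}
	return os.Hostname()
}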