feat(inputs.jti_openconfig_telemetry): Set timestamp from data (#12730)

This commit is contained in:
Joshua Powers 2023-02-27 10:39:25 -07:00 committed by GitHub
parent a90b6eb119
commit d40f46e7ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 6 deletions

View File

@ -54,6 +54,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
"/interfaces", "/interfaces",
] ]
## Timestamp Source
## Set to 'collection' for time of collection, and 'data' for using the time
## provided by the _timestamp field.
# timestamp_source = "collection"
## Optional TLS Config ## Optional TLS Config
# enable_tls = false # enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"

View File

@ -2,6 +2,7 @@
package jti_openconfig_telemetry package jti_openconfig_telemetry
import ( import (
"context"
_ "embed" _ "embed"
"fmt" "fmt"
"net" "net"
@ -10,7 +11,6 @@ import (
"sync" "sync"
"time" "time"
"context"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -34,6 +34,7 @@ type OpenConfigTelemetry struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
ClientID string `toml:"client_id"` ClientID string `toml:"client_id"`
TimestampSource string `toml:"timestamp_source"`
SampleFrequency config.Duration `toml:"sample_frequency"` SampleFrequency config.Duration `toml:"sample_frequency"`
StrAsTags bool `toml:"str_as_tags"` StrAsTags bool `toml:"str_as_tags"`
RetryDelay config.Duration `toml:"retry_delay"` RetryDelay config.Duration `toml:"retry_delay"`
@ -56,6 +57,17 @@ func (*OpenConfigTelemetry) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (m *OpenConfigTelemetry) Init() error {
switch m.TimestampSource {
case "", "collection":
case "data":
default:
return fmt.Errorf("unknown option for timestamp_source: %q", m.TimestampSource)
}
return nil
}
func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error { func (m *OpenConfigTelemetry) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
@ -279,13 +291,23 @@ func (m *OpenConfigTelemetry) collectData(
// Print final data collection // Print final data collection
m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups) m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups)
tnow := time.Now() timestamp := time.Now()
// Iterate through data groups and add them // Iterate through data groups and add them
for _, group := range dgroups { for _, group := range dgroups {
if m.TimestampSource == "data" {
// OpenConfig timestamp is in milliseconds since epoch
ts, ok := group.data["_timestamp"].(uint64)
if ok {
timestamp = time.UnixMilli(int64(ts))
} else {
m.Log.Warnf("invalid type %T for _timestamp %v", group.data["_timestamp"], group.data["_timestamp"])
}
}
if len(group.tags) == 0 { if len(group.tags) == 0 {
acc.AddFields(sensor.measurementName, group.data, tags, tnow) acc.AddFields(sensor.measurementName, group.data, tags, timestamp)
} else { } else {
acc.AddFields(sensor.measurementName, group.data, group.tags, tnow) acc.AddFields(sensor.measurementName, group.data, group.tags, timestamp)
} }
} }
} }
@ -363,8 +385,9 @@ func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
func init() { func init() {
inputs.Add("jti_openconfig_telemetry", func() telegraf.Input { inputs.Add("jti_openconfig_telemetry", func() telegraf.Input {
return &OpenConfigTelemetry{ return &OpenConfigTelemetry{
RetryDelay: config.Duration(time.Second), RetryDelay: config.Duration(time.Second),
StrAsTags: false, StrAsTags: false,
TimestampSource: "collection",
} }
}) })
} }

View File

@ -46,6 +46,14 @@ var dataWithStringValues = &telemetry.OpenConfigData{
{Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}}, {Key: "strKey[tag='tagValue']/strValue", Value: &telemetry.KeyValue_StrValue{StrValue: "10"}}},
} }
var dataWithTimestamp = &telemetry.OpenConfigData{
Path: "/sensor_with_timestamp",
Kv: []*telemetry.KeyValue{
{Key: "/sensor[tag='tagValue']/intKey", Value: &telemetry.KeyValue_IntValue{IntValue: 10}},
},
Timestamp: 1676560510002,
}
type openConfigTelemetryServer struct { type openConfigTelemetryServer struct {
telemetry.UnimplementedOpenConfigTelemetryServer telemetry.UnimplementedOpenConfigTelemetryServer
} }
@ -64,6 +72,8 @@ func (s *openConfigTelemetryServer) TelemetrySubscribe(
return stream.Send(dataWithMultipleTags) return stream.Send(dataWithMultipleTags)
case "/sensor_with_string_values": case "/sensor_with_string_values":
return stream.Send(dataWithStringValues) return stream.Send(dataWithStringValues)
case "/sensor_with_timestamp":
return stream.Send(dataWithTimestamp)
} }
return nil return nil
} }
@ -124,6 +134,30 @@ func TestOpenConfigTelemetryData(t *testing.T) {
acc.AssertContainsTaggedFields(t, "/sensor", fields, tags) acc.AssertContainsTaggedFields(t, "/sensor", fields, tags)
} }
func TestOpenConfigTelemetryData_timestamp(t *testing.T) {
var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_timestamp"}
require.NoError(t, cfg.Start(&acc))
timestamp := int64(1676560510002)
tags := map[string]string{
"device": "127.0.0.1",
"/sensor/@tag": "tagValue",
"system_id": "",
"path": "/sensor_with_timestamp",
}
fields := map[string]interface{}{
"/sensor/intKey": int64(10),
"_sequence": uint64(0),
"_timestamp": uint64(timestamp),
"_component_id": uint32(0),
"_subcomponent_id": uint32(0),
}
require.Eventually(t, func() bool { return acc.HasMeasurement("/sensor_with_timestamp") }, 5*time.Second, 10*time.Millisecond)
acc.AssertContainsTaggedFields(t, "/sensor_with_timestamp", fields, tags)
}
func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) { func TestOpenConfigTelemetryDataWithPrefix(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
cfg.Sensors = []string{"/sensor_with_prefix"} cfg.Sensors = []string{"/sensor_with_prefix"}

View File

@ -33,6 +33,11 @@
"/interfaces", "/interfaces",
] ]
## Timestamp Source
## Set to 'collection' for time of collection, and 'data' for using the time
## provided by the _timestamp field.
# timestamp_source = "collection"
## Optional TLS Config ## Optional TLS Config
# enable_tls = false # enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem" # tls_ca = "/etc/telegraf/ca.pem"