2022-05-24 21:49:47 +08:00
|
|
|
//go:generate ../../../tools/readme_config_includer/generator
|
2020-07-01 14:19:16 +08:00
|
|
|
package gnmi
|
2019-06-05 05:39:46 +08:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/tls"
|
2022-05-24 21:49:47 +08:00
|
|
|
_ "embed"
|
2019-06-05 05:39:46 +08:00
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
2020-03-18 03:56:51 +08:00
|
|
|
"math"
|
2019-06-05 05:39:46 +08:00
|
|
|
"net"
|
2019-09-25 02:05:56 +08:00
|
|
|
"path"
|
2019-06-05 05:39:46 +08:00
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
2022-04-26 21:38:02 +08:00
|
|
|
"github.com/google/gnxi/utils/xpath"
|
2021-07-28 05:28:26 +08:00
|
|
|
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
|
|
2019-06-05 05:39:46 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
2021-04-10 01:15:04 +08:00
|
|
|
"github.com/influxdata/telegraf/config"
|
2019-06-05 14:00:24 +08:00
|
|
|
"github.com/influxdata/telegraf/metric"
|
2020-06-26 02:44:22 +08:00
|
|
|
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
|
2019-06-05 05:39:46 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
|
|
|
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
|
|
|
|
)
|
|
|
|
|
|
2022-05-27 21:13:47 +08:00
|
|
|
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
|
2022-05-24 21:49:47 +08:00
|
|
|
//go:embed sample.conf
|
|
|
|
|
var sampleConfig string
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
// gNMI plugin instance
|
|
|
|
|
type GNMI struct {
|
2019-06-05 05:39:46 +08:00
|
|
|
Addresses []string `toml:"addresses"`
|
|
|
|
|
Subscriptions []Subscription `toml:"subscription"`
|
|
|
|
|
Aliases map[string]string `toml:"aliases"`
|
|
|
|
|
|
|
|
|
|
// Optional subscription configuration
|
|
|
|
|
Encoding string
|
|
|
|
|
Origin string
|
|
|
|
|
Prefix string
|
|
|
|
|
Target string
|
|
|
|
|
UpdatesOnly bool `toml:"updates_only"`
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
// gNMI target credentials
|
2019-06-05 05:39:46 +08:00
|
|
|
Username string
|
|
|
|
|
Password string
|
|
|
|
|
|
|
|
|
|
// Redial
|
2021-04-10 01:15:04 +08:00
|
|
|
Redial config.Duration
|
2019-06-05 05:39:46 +08:00
|
|
|
|
|
|
|
|
// GRPC TLS settings
|
|
|
|
|
EnableTLS bool `toml:"enable_tls"`
|
|
|
|
|
internaltls.ClientConfig
|
|
|
|
|
|
|
|
|
|
// Internal state
|
2021-07-28 05:28:26 +08:00
|
|
|
internalAliases map[string]string
|
|
|
|
|
acc telegraf.Accumulator
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
wg sync.WaitGroup
|
2022-02-07 23:18:53 +08:00
|
|
|
// Lookup/device+name/key/value
|
2022-04-22 21:40:20 +08:00
|
|
|
lookup map[string]map[string]map[string]interface{}
|
|
|
|
|
lookupMutex sync.Mutex
|
2019-09-24 06:39:50 +08:00
|
|
|
|
|
|
|
|
Log telegraf.Logger
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
// Subscription for a gNMI client
|
2019-06-05 05:39:46 +08:00
|
|
|
type Subscription struct {
|
|
|
|
|
Name string
|
|
|
|
|
Origin string
|
|
|
|
|
Path string
|
|
|
|
|
|
|
|
|
|
// Subscription mode and interval
|
2021-04-10 01:15:04 +08:00
|
|
|
SubscriptionMode string `toml:"subscription_mode"`
|
|
|
|
|
SampleInterval config.Duration `toml:"sample_interval"`
|
2019-06-05 05:39:46 +08:00
|
|
|
|
|
|
|
|
// Duplicate suppression
|
2021-04-10 01:15:04 +08:00
|
|
|
SuppressRedundant bool `toml:"suppress_redundant"`
|
|
|
|
|
HeartbeatInterval config.Duration `toml:"heartbeat_interval"`
|
2022-02-07 23:18:53 +08:00
|
|
|
|
|
|
|
|
// Mark this subscription as a tag-only lookup source, not emitting any metric
|
|
|
|
|
TagOnly bool `toml:"tag_only"`
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
2022-05-24 21:49:47 +08:00
|
|
|
func (*GNMI) SampleConfig() string {
|
|
|
|
|
return sampleConfig
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-05 05:39:46 +08:00
|
|
|
// Start the http listener service
|
2020-07-01 14:19:16 +08:00
|
|
|
func (c *GNMI) Start(acc telegraf.Accumulator) error {
|
2019-06-05 05:39:46 +08:00
|
|
|
var err error
|
|
|
|
|
var ctx context.Context
|
|
|
|
|
var tlscfg *tls.Config
|
2021-07-28 05:28:26 +08:00
|
|
|
var request *gnmiLib.SubscribeRequest
|
2019-06-05 05:39:46 +08:00
|
|
|
c.acc = acc
|
|
|
|
|
ctx, c.cancel = context.WithCancel(context.Background())
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Lock()
|
2022-02-07 23:18:53 +08:00
|
|
|
c.lookup = make(map[string]map[string]map[string]interface{})
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Unlock()
|
2019-06-05 05:39:46 +08:00
|
|
|
|
|
|
|
|
// Validate configuration
|
|
|
|
|
if request, err = c.newSubscribeRequest(); err != nil {
|
|
|
|
|
return err
|
2021-04-10 01:15:04 +08:00
|
|
|
} else if time.Duration(c.Redial).Nanoseconds() <= 0 {
|
2019-06-05 05:39:46 +08:00
|
|
|
return fmt.Errorf("redial duration must be positive")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse TLS config
|
|
|
|
|
if c.EnableTLS {
|
|
|
|
|
if tlscfg, err = c.ClientConfig.TLSConfig(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(c.Username) > 0 {
|
|
|
|
|
ctx = metadata.AppendToOutgoingContext(ctx, "username", c.Username, "password", c.Password)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Invert explicit alias list and prefill subscription names
|
2021-07-28 05:28:26 +08:00
|
|
|
c.internalAliases = make(map[string]string, len(c.Subscriptions)+len(c.Aliases))
|
2019-06-05 05:39:46 +08:00
|
|
|
for _, subscription := range c.Subscriptions {
|
2021-07-28 05:28:26 +08:00
|
|
|
var gnmiLongPath, gnmiShortPath *gnmiLib.Path
|
2019-09-25 02:05:56 +08:00
|
|
|
|
2019-06-15 02:29:06 +08:00
|
|
|
// Build the subscription path without keys
|
2019-09-25 02:05:56 +08:00
|
|
|
if gnmiLongPath, err = parsePath(subscription.Origin, subscription.Path, ""); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if gnmiShortPath, err = parsePath("", subscription.Path, ""); err != nil {
|
2019-06-15 02:29:06 +08:00
|
|
|
return err
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
2021-04-23 05:08:03 +08:00
|
|
|
longPath, _, err := c.handlePath(gnmiLongPath, nil, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("handling long-path failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
shortPath, _, err := c.handlePath(gnmiShortPath, nil, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("handling short-path failed: %v", err)
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
name := subscription.Name
|
2019-06-15 02:29:06 +08:00
|
|
|
|
|
|
|
|
// If the user didn't provide a measurement name, use last path element
|
2019-06-05 05:39:46 +08:00
|
|
|
if len(name) == 0 {
|
2019-09-25 02:05:56 +08:00
|
|
|
name = path.Base(shortPath)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
if len(name) > 0 {
|
2021-07-28 05:28:26 +08:00
|
|
|
c.internalAliases[longPath] = name
|
|
|
|
|
c.internalAliases[shortPath] = name
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
2022-02-07 23:18:53 +08:00
|
|
|
|
|
|
|
|
if subscription.TagOnly {
|
|
|
|
|
// Create the top-level lookup for this tag
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Lock()
|
2022-02-07 23:18:53 +08:00
|
|
|
c.lookup[name] = make(map[string]map[string]interface{})
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Unlock()
|
2022-02-07 23:18:53 +08:00
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
2021-07-28 05:28:26 +08:00
|
|
|
for alias, encodingPath := range c.Aliases {
|
|
|
|
|
c.internalAliases[encodingPath] = alias
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create a goroutine for each device, dial and subscribe
|
|
|
|
|
c.wg.Add(len(c.Addresses))
|
|
|
|
|
for _, addr := range c.Addresses {
|
|
|
|
|
go func(address string) {
|
|
|
|
|
defer c.wg.Done()
|
|
|
|
|
for ctx.Err() == nil {
|
|
|
|
|
if err := c.subscribeGNMI(ctx, address, tlscfg, request); err != nil && ctx.Err() == nil {
|
|
|
|
|
acc.AddError(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
2021-04-10 01:15:04 +08:00
|
|
|
case <-time.After(time.Duration(c.Redial)):
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}(addr)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
// Create a new gNMI SubscribeRequest
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) newSubscribeRequest() (*gnmiLib.SubscribeRequest, error) {
|
2019-06-05 05:39:46 +08:00
|
|
|
// Create subscription objects
|
2021-07-28 05:28:26 +08:00
|
|
|
subscriptions := make([]*gnmiLib.Subscription, len(c.Subscriptions))
|
2019-06-05 05:39:46 +08:00
|
|
|
for i, subscription := range c.Subscriptions {
|
|
|
|
|
gnmiPath, err := parsePath(subscription.Origin, subscription.Path, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2021-07-28 05:28:26 +08:00
|
|
|
mode, ok := gnmiLib.SubscriptionMode_value[strings.ToUpper(subscription.SubscriptionMode)]
|
2019-06-05 05:39:46 +08:00
|
|
|
if !ok {
|
|
|
|
|
return nil, fmt.Errorf("invalid subscription mode %s", subscription.SubscriptionMode)
|
|
|
|
|
}
|
2021-07-28 05:28:26 +08:00
|
|
|
subscriptions[i] = &gnmiLib.Subscription{
|
2019-06-05 05:39:46 +08:00
|
|
|
Path: gnmiPath,
|
2021-07-28 05:28:26 +08:00
|
|
|
Mode: gnmiLib.SubscriptionMode(mode),
|
2021-04-10 01:15:04 +08:00
|
|
|
SampleInterval: uint64(time.Duration(subscription.SampleInterval).Nanoseconds()),
|
2019-06-05 05:39:46 +08:00
|
|
|
SuppressRedundant: subscription.SuppressRedundant,
|
2021-04-10 01:15:04 +08:00
|
|
|
HeartbeatInterval: uint64(time.Duration(subscription.HeartbeatInterval).Nanoseconds()),
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Construct subscribe request
|
|
|
|
|
gnmiPath, err := parsePath(c.Origin, c.Prefix, c.Target)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-03 00:11:28 +08:00
|
|
|
if c.Encoding != "proto" && c.Encoding != "json" && c.Encoding != "json_ietf" && c.Encoding != "bytes" {
|
2019-06-05 05:39:46 +08:00
|
|
|
return nil, fmt.Errorf("unsupported encoding %s", c.Encoding)
|
|
|
|
|
}
|
|
|
|
|
|
2021-07-28 05:28:26 +08:00
|
|
|
return &gnmiLib.SubscribeRequest{
|
|
|
|
|
Request: &gnmiLib.SubscribeRequest_Subscribe{
|
|
|
|
|
Subscribe: &gnmiLib.SubscriptionList{
|
2019-06-05 05:39:46 +08:00
|
|
|
Prefix: gnmiPath,
|
2021-07-28 05:28:26 +08:00
|
|
|
Mode: gnmiLib.SubscriptionList_STREAM,
|
|
|
|
|
Encoding: gnmiLib.Encoding(gnmiLib.Encoding_value[strings.ToUpper(c.Encoding)]),
|
2019-06-05 05:39:46 +08:00
|
|
|
Subscription: subscriptions,
|
|
|
|
|
UpdatesOnly: c.UpdatesOnly,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SubscribeGNMI and extract telemetry data
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) subscribeGNMI(ctx context.Context, address string, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
|
2019-06-05 05:39:46 +08:00
|
|
|
var opt grpc.DialOption
|
|
|
|
|
if tlscfg != nil {
|
|
|
|
|
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlscfg))
|
|
|
|
|
} else {
|
|
|
|
|
opt = grpc.WithInsecure()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
client, err := grpc.DialContext(ctx, address, opt)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to dial: %v", err)
|
|
|
|
|
}
|
|
|
|
|
defer client.Close()
|
|
|
|
|
|
2021-07-28 05:28:26 +08:00
|
|
|
subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx)
|
2019-06-05 05:39:46 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to setup subscription: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err = subscribeClient.Send(request); err != nil {
|
2020-07-22 06:24:45 +08:00
|
|
|
// If io.EOF is returned, the stream may have ended and stream status
|
|
|
|
|
// can be determined by calling Recv.
|
|
|
|
|
if err != io.EOF {
|
|
|
|
|
return fmt.Errorf("failed to send subscription request: %v", err)
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
c.Log.Debugf("Connection to gNMI device %s established", address)
|
|
|
|
|
defer c.Log.Debugf("Connection to gNMI device %s closed", address)
|
2019-06-05 05:39:46 +08:00
|
|
|
for ctx.Err() == nil {
|
2021-07-28 05:28:26 +08:00
|
|
|
var reply *gnmiLib.SubscribeResponse
|
2019-06-05 05:39:46 +08:00
|
|
|
if reply, err = subscribeClient.Recv(); err != nil {
|
|
|
|
|
if err != io.EOF && ctx.Err() == nil {
|
2020-07-01 14:19:16 +08:00
|
|
|
return fmt.Errorf("aborted gNMI subscription: %v", err)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.handleSubscribeResponse(address, reply)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) handleSubscribeResponse(address string, reply *gnmiLib.SubscribeResponse) {
|
2020-12-01 01:12:10 +08:00
|
|
|
switch response := reply.Response.(type) {
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.SubscribeResponse_Update:
|
2020-12-01 01:12:10 +08:00
|
|
|
c.handleSubscribeResponseUpdate(address, response)
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.SubscribeResponse_Error:
|
2020-12-01 01:12:10 +08:00
|
|
|
c.Log.Errorf("Subscribe error (%d), %q", response.Error.Code, response.Error.Message)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
2020-12-01 01:12:10 +08:00
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
|
2020-12-01 01:12:10 +08:00
|
|
|
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmiLib.SubscribeResponse_Update) {
|
2019-06-05 05:39:46 +08:00
|
|
|
var prefix, prefixAliasPath string
|
|
|
|
|
grouper := metric.NewSeriesGrouper()
|
|
|
|
|
timestamp := time.Unix(0, response.Update.Timestamp)
|
|
|
|
|
prefixTags := make(map[string]string)
|
|
|
|
|
|
|
|
|
|
if response.Update.Prefix != nil {
|
2021-04-23 05:08:03 +08:00
|
|
|
var err error
|
|
|
|
|
if prefix, prefixAliasPath, err = c.handlePath(response.Update.Prefix, prefixTags, ""); err != nil {
|
|
|
|
|
c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
prefixTags["source"], _, _ = net.SplitHostPort(address)
|
|
|
|
|
prefixTags["path"] = prefix
|
|
|
|
|
|
|
|
|
|
// Parse individual Update message and create measurements
|
|
|
|
|
var name, lastAliasPath string
|
|
|
|
|
for _, update := range response.Update.Update {
|
|
|
|
|
// Prepare tags from prefix
|
|
|
|
|
tags := make(map[string]string, len(prefixTags))
|
|
|
|
|
for key, val := range prefixTags {
|
|
|
|
|
tags[key] = val
|
|
|
|
|
}
|
|
|
|
|
aliasPath, fields := c.handleTelemetryField(update, tags, prefix)
|
|
|
|
|
|
|
|
|
|
// Inherent valid alias from prefix parsing
|
|
|
|
|
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
|
|
|
|
|
aliasPath = prefixAliasPath
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Lookup alias if alias-path has changed
|
|
|
|
|
if aliasPath != lastAliasPath {
|
|
|
|
|
name = prefix
|
2021-07-28 05:28:26 +08:00
|
|
|
if alias, ok := c.internalAliases[aliasPath]; ok {
|
2019-06-05 05:39:46 +08:00
|
|
|
name = alias
|
|
|
|
|
} else {
|
2020-07-01 14:19:16 +08:00
|
|
|
c.Log.Debugf("No measurement alias for gNMI path: %s", name)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-02-07 23:18:53 +08:00
|
|
|
// Update tag lookups and discard rest of update
|
|
|
|
|
subscriptionKey := tags["source"] + "/" + tags["name"]
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Lock()
|
2022-02-07 23:18:53 +08:00
|
|
|
if _, ok := c.lookup[name]; ok {
|
|
|
|
|
// We are subscribed to this, so add the fields to the lookup-table
|
|
|
|
|
if _, ok := c.lookup[name][subscriptionKey]; !ok {
|
|
|
|
|
c.lookup[name][subscriptionKey] = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
for k, v := range fields {
|
|
|
|
|
c.lookup[name][subscriptionKey][path.Base(k)] = v
|
|
|
|
|
}
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Unlock()
|
2022-02-07 23:18:53 +08:00
|
|
|
// Do not process the data further as we only subscribed here for the lookup table
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply lookups if present
|
|
|
|
|
for subscriptionName, values := range c.lookup {
|
|
|
|
|
if annotations, ok := values[subscriptionKey]; ok {
|
|
|
|
|
for k, v := range annotations {
|
2022-04-21 21:45:37 +08:00
|
|
|
tags[subscriptionName+"/"+k] = fmt.Sprint(v)
|
2022-02-07 23:18:53 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2022-04-22 21:40:20 +08:00
|
|
|
c.lookupMutex.Unlock()
|
2022-02-07 23:18:53 +08:00
|
|
|
|
2019-06-05 05:39:46 +08:00
|
|
|
// Group metrics
|
2020-01-09 02:52:36 +08:00
|
|
|
for k, v := range fields {
|
|
|
|
|
key := k
|
2021-01-14 04:48:21 +08:00
|
|
|
if len(aliasPath) < len(key) && len(aliasPath) != 0 {
|
2020-01-09 02:52:36 +08:00
|
|
|
// This may not be an exact prefix, due to naming style
|
|
|
|
|
// conversion on the key.
|
2019-06-15 02:29:06 +08:00
|
|
|
key = key[len(aliasPath)+1:]
|
2021-01-14 04:48:21 +08:00
|
|
|
} else if len(aliasPath) >= len(key) {
|
2020-01-09 02:52:36 +08:00
|
|
|
// Otherwise use the last path element as the field key.
|
|
|
|
|
key = path.Base(key)
|
|
|
|
|
|
|
|
|
|
// If there are no elements skip the item; this would be an
|
|
|
|
|
// invalid message.
|
|
|
|
|
key = strings.TrimLeft(key, "/.")
|
|
|
|
|
if key == "" {
|
|
|
|
|
c.Log.Errorf("invalid empty path: %q", k)
|
|
|
|
|
continue
|
|
|
|
|
}
|
2019-06-15 02:29:06 +08:00
|
|
|
}
|
2020-01-09 02:52:36 +08:00
|
|
|
|
2021-04-23 05:08:03 +08:00
|
|
|
if err := grouper.Add(name, tags, timestamp, key, v); err != nil {
|
|
|
|
|
c.Log.Errorf("cannot add to grouper: %v", err)
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lastAliasPath = aliasPath
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add grouped measurements
|
2021-07-28 05:28:26 +08:00
|
|
|
for _, metricToAdd := range grouper.Metrics() {
|
|
|
|
|
c.acc.AddMetric(metricToAdd)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// HandleTelemetryField and add it to a measurement
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
|
2021-04-23 05:08:03 +08:00
|
|
|
gpath, aliasPath, err := c.handlePath(update.Path, tags, prefix)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Log.Errorf("handling path %q failed: %v", update.Path, err)
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
|
|
|
|
|
var value interface{}
|
|
|
|
|
var jsondata []byte
|
|
|
|
|
|
2019-09-25 02:05:56 +08:00
|
|
|
// Make sure a value is actually set
|
|
|
|
|
if update.Val == nil || update.Val.Value == nil {
|
2021-04-23 05:08:03 +08:00
|
|
|
c.Log.Infof("Discarded empty or legacy type value with path: %q", gpath)
|
2019-09-25 02:05:56 +08:00
|
|
|
return aliasPath, nil
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-05 05:39:46 +08:00
|
|
|
switch val := update.Val.Value.(type) {
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_AsciiVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.AsciiVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_BoolVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.BoolVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_BytesVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.BytesVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_DecimalVal:
|
2020-03-18 03:56:51 +08:00
|
|
|
value = float64(val.DecimalVal.Digits) / math.Pow(10, float64(val.DecimalVal.Precision))
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_FloatVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.FloatVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_IntVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.IntVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_StringVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.StringVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_UintVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
value = val.UintVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_JsonIetfVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
jsondata = val.JsonIetfVal
|
2021-07-28 05:28:26 +08:00
|
|
|
case *gnmiLib.TypedValue_JsonVal:
|
2019-06-05 05:39:46 +08:00
|
|
|
jsondata = val.JsonVal
|
|
|
|
|
}
|
|
|
|
|
|
2022-05-11 23:53:34 +08:00
|
|
|
name := strings.ReplaceAll(gpath, "-", "_")
|
2019-06-05 05:39:46 +08:00
|
|
|
fields := make(map[string]interface{})
|
|
|
|
|
if value != nil {
|
|
|
|
|
fields[name] = value
|
|
|
|
|
} else if jsondata != nil {
|
|
|
|
|
if err := json.Unmarshal(jsondata, &value); err != nil {
|
|
|
|
|
c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err))
|
|
|
|
|
} else {
|
|
|
|
|
flattener := jsonparser.JSONFlattener{Fields: fields}
|
2021-04-23 05:08:03 +08:00
|
|
|
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
|
|
|
|
|
c.acc.AddError(fmt.Errorf("failed to flatten JSON: %v", err))
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return aliasPath, fields
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse path to path-buffer and tag-field
|
2021-07-28 05:28:26 +08:00
|
|
|
func (c *GNMI) handlePath(gnmiPath *gnmiLib.Path, tags map[string]string, prefix string) (pathBuffer string, aliasPath string, err error) {
|
2019-06-05 05:39:46 +08:00
|
|
|
builder := bytes.NewBufferString(prefix)
|
|
|
|
|
|
|
|
|
|
// Prefix with origin
|
2021-07-28 05:28:26 +08:00
|
|
|
if len(gnmiPath.Origin) > 0 {
|
|
|
|
|
if _, err := builder.WriteString(gnmiPath.Origin); err != nil {
|
2021-04-23 05:08:03 +08:00
|
|
|
return "", "", err
|
|
|
|
|
}
|
|
|
|
|
if _, err := builder.WriteRune(':'); err != nil {
|
|
|
|
|
return "", "", err
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse generic keys from prefix
|
2021-07-28 05:28:26 +08:00
|
|
|
for _, elem := range gnmiPath.Elem {
|
2019-09-25 02:05:56 +08:00
|
|
|
if len(elem.Name) > 0 {
|
2021-04-23 05:08:03 +08:00
|
|
|
if _, err := builder.WriteRune('/'); err != nil {
|
|
|
|
|
return "", "", err
|
|
|
|
|
}
|
|
|
|
|
if _, err := builder.WriteString(elem.Name); err != nil {
|
|
|
|
|
return "", "", err
|
|
|
|
|
}
|
2019-09-25 02:05:56 +08:00
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
name := builder.String()
|
|
|
|
|
|
2021-07-28 05:28:26 +08:00
|
|
|
if _, exists := c.internalAliases[name]; exists {
|
2019-06-05 05:39:46 +08:00
|
|
|
aliasPath = name
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-15 02:29:06 +08:00
|
|
|
if tags != nil {
|
|
|
|
|
for key, val := range elem.Key {
|
2022-05-11 23:53:34 +08:00
|
|
|
key = strings.ReplaceAll(key, "-", "_")
|
2019-06-15 02:29:06 +08:00
|
|
|
|
|
|
|
|
// Use short-form of key if possible
|
|
|
|
|
if _, exists := tags[key]; exists {
|
|
|
|
|
tags[name+"/"+key] = val
|
|
|
|
|
} else {
|
|
|
|
|
tags[key] = val
|
|
|
|
|
}
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-23 05:08:03 +08:00
|
|
|
return builder.String(), aliasPath, nil
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
//ParsePath from XPath-like string to gNMI path structure
|
2021-07-28 05:28:26 +08:00
|
|
|
func parsePath(origin string, pathToParse string, target string) (*gnmiLib.Path, error) {
|
2022-04-26 21:38:02 +08:00
|
|
|
gnmiPath, err := xpath.ToGNMIPath(pathToParse)
|
2019-06-05 05:39:46 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2022-04-26 21:38:02 +08:00
|
|
|
gnmiPath.Origin = origin
|
|
|
|
|
gnmiPath.Target = target
|
|
|
|
|
return gnmiPath, err
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Stop listener and cleanup
|
2020-07-01 14:19:16 +08:00
|
|
|
func (c *GNMI) Stop() {
|
2019-06-05 05:39:46 +08:00
|
|
|
c.cancel()
|
|
|
|
|
c.wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Gather plugin measurements (unused)
|
2020-07-01 14:19:16 +08:00
|
|
|
func (c *GNMI) Gather(_ telegraf.Accumulator) error {
|
2019-06-05 05:39:46 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-07-01 14:19:16 +08:00
|
|
|
func New() telegraf.Input {
|
|
|
|
|
return &GNMI{
|
|
|
|
|
Encoding: "proto",
|
2021-04-10 01:15:04 +08:00
|
|
|
Redial: config.Duration(10 * time.Second),
|
2020-07-01 14:19:16 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2019-06-05 05:39:46 +08:00
|
|
|
func init() {
|
2020-07-01 14:19:16 +08:00
|
|
|
inputs.Add("gnmi", New)
|
|
|
|
|
// Backwards compatible alias:
|
|
|
|
|
inputs.Add("cisco_telemetry_gnmi", New)
|
2019-06-05 05:39:46 +08:00
|
|
|
}
|