2023-02-09 02:30:05 +08:00
|
|
|
package gnmi
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/tls"
|
2023-03-02 05:18:35 +08:00
|
|
|
"errors"
|
2023-02-09 02:30:05 +08:00
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"net"
|
|
|
|
|
"path"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
2023-04-27 23:30:21 +08:00
|
|
|
"github.com/influxdata/telegraf/internal/choice"
|
2023-02-09 02:30:05 +08:00
|
|
|
"github.com/influxdata/telegraf/metric"
|
2023-04-27 23:30:21 +08:00
|
|
|
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
|
2023-05-02 19:29:30 +08:00
|
|
|
"github.com/influxdata/telegraf/selfstat"
|
2023-02-09 02:30:05 +08:00
|
|
|
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
|
2023-04-27 23:30:21 +08:00
|
|
|
gnmiExt "github.com/openconfig/gnmi/proto/gnmi_ext"
|
2023-02-09 02:30:05 +08:00
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
|
"google.golang.org/protobuf/encoding/protojson"
|
2023-04-27 23:30:21 +08:00
|
|
|
"google.golang.org/protobuf/proto"
|
2023-02-09 02:30:05 +08:00
|
|
|
)
|
|
|
|
|
|
2023-04-27 23:30:21 +08:00
|
|
|
const eidJuniperTelemetryHeader = 1
|
|
|
|
|
|
2023-02-09 02:30:05 +08:00
|
|
|
type handler struct {
|
2023-05-24 22:14:06 +08:00
|
|
|
address string
|
|
|
|
|
aliases map[string]string
|
|
|
|
|
tagsubs []TagSubscription
|
|
|
|
|
maxMsgSize int
|
|
|
|
|
emptyNameWarnShown bool
|
|
|
|
|
vendorExt []string
|
|
|
|
|
tagStore *tagStore
|
|
|
|
|
trace bool
|
|
|
|
|
canonicalFieldNames bool
|
|
|
|
|
log telegraf.Logger
|
2023-02-09 02:30:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SubscribeGNMI and extract telemetry data
|
|
|
|
|
func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
|
|
|
|
|
var creds credentials.TransportCredentials
|
|
|
|
|
if tlscfg != nil {
|
|
|
|
|
creds = credentials.NewTLS(tlscfg)
|
|
|
|
|
} else {
|
|
|
|
|
creds = insecure.NewCredentials()
|
|
|
|
|
}
|
|
|
|
|
opts := []grpc.DialOption{
|
|
|
|
|
grpc.WithTransportCredentials(creds),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if h.maxMsgSize > 0 {
|
|
|
|
|
opts = append(opts, grpc.WithDefaultCallOptions(
|
|
|
|
|
grpc.MaxCallRecvMsgSize(h.maxMsgSize),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
client, err := grpc.DialContext(ctx, h.address, opts...)
|
|
|
|
|
if err != nil {
|
2023-03-02 05:18:35 +08:00
|
|
|
return fmt.Errorf("failed to dial: %w", err)
|
2023-02-09 02:30:05 +08:00
|
|
|
}
|
|
|
|
|
defer client.Close()
|
|
|
|
|
|
|
|
|
|
subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to setup subscription: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If io.EOF is returned, the stream may have ended and stream status
|
|
|
|
|
// can be determined by calling Recv.
|
2023-03-02 05:18:35 +08:00
|
|
|
if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) {
|
2023-02-09 02:30:05 +08:00
|
|
|
return fmt.Errorf("failed to send subscription request: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.log.Debugf("Connection to gNMI device %s established", h.address)
|
2023-05-02 19:29:30 +08:00
|
|
|
|
|
|
|
|
// Used to report the status of the TCP connection to the device. If the
|
|
|
|
|
// GNMI connection goes down, but TCP is still up this will still report
|
|
|
|
|
// connected until the TCP connection times out.
|
|
|
|
|
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address})
|
|
|
|
|
connectStat.Set(1)
|
|
|
|
|
|
2023-02-09 02:30:05 +08:00
|
|
|
defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
|
|
|
|
|
for ctx.Err() == nil {
|
|
|
|
|
var reply *gnmiLib.SubscribeResponse
|
|
|
|
|
if reply, err = subscribeClient.Recv(); err != nil {
|
2023-03-02 05:18:35 +08:00
|
|
|
if !errors.Is(err, io.EOF) && ctx.Err() == nil {
|
2023-05-02 19:29:30 +08:00
|
|
|
connectStat.Set(0)
|
2023-02-09 02:30:05 +08:00
|
|
|
return fmt.Errorf("aborted gNMI subscription: %w", err)
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if h.trace {
|
|
|
|
|
buf, err := protojson.Marshal(reply)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.log.Debugf("marshal failed: %v", err)
|
|
|
|
|
} else {
|
|
|
|
|
t := reply.GetUpdate().GetTimestamp()
|
|
|
|
|
h.log.Debugf("update_%v: %s", t, string(buf))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok {
|
2023-04-27 23:30:21 +08:00
|
|
|
h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension())
|
2023-02-09 02:30:05 +08:00
|
|
|
}
|
|
|
|
|
}
|
2023-05-02 19:29:30 +08:00
|
|
|
|
|
|
|
|
connectStat.Set(0)
|
2023-02-09 02:30:05 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
|
2023-04-27 23:30:21 +08:00
|
|
|
func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update, extension []*gnmiExt.Extension) {
|
2023-02-09 02:30:05 +08:00
|
|
|
var prefix, prefixAliasPath string
|
|
|
|
|
grouper := metric.NewSeriesGrouper()
|
|
|
|
|
timestamp := time.Unix(0, response.Update.Timestamp)
|
|
|
|
|
prefixTags := make(map[string]string)
|
|
|
|
|
|
2023-04-27 23:30:21 +08:00
|
|
|
// iter on each extension
|
|
|
|
|
for _, ext := range extension {
|
|
|
|
|
currentExt := ext.GetRegisteredExt().Msg
|
|
|
|
|
if currentExt == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// extension ID
|
|
|
|
|
switch ext.GetRegisteredExt().Id {
|
|
|
|
|
// Juniper Header extention
|
|
|
|
|
//EID_JUNIPER_TELEMETRY_HEADER = 1;
|
|
|
|
|
case eidJuniperTelemetryHeader:
|
|
|
|
|
// Decode it only if user requested it
|
|
|
|
|
if choice.Contains("juniper_header", h.vendorExt) {
|
|
|
|
|
juniperHeader := &jnprHeader.GnmiJuniperTelemetryHeaderExtension{}
|
|
|
|
|
// unmarshal extention
|
|
|
|
|
err := proto.Unmarshal(currentExt, juniperHeader)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.log.Errorf("unmarshal gnmi Juniper Header extention failed: %w", err)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
// Add only relevant Tags from the Juniper Header extention.
|
|
|
|
|
// These are requiered for aggregation
|
|
|
|
|
prefixTags["component_id"] = fmt.Sprint(juniperHeader.GetComponentId())
|
|
|
|
|
prefixTags["component"] = fmt.Sprint(juniperHeader.GetComponent())
|
|
|
|
|
prefixTags["sub_component_id"] = fmt.Sprint(juniperHeader.GetSubComponentId())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-02-09 02:30:05 +08:00
|
|
|
if response.Update.Prefix != nil {
|
2023-05-23 21:06:02 +08:00
|
|
|
var origin string
|
2023-02-09 02:30:05 +08:00
|
|
|
var err error
|
2023-05-23 21:06:02 +08:00
|
|
|
if origin, prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil {
|
2023-02-09 02:30:05 +08:00
|
|
|
h.log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
|
|
|
|
|
}
|
2023-05-23 21:06:02 +08:00
|
|
|
prefix = origin + prefix
|
2023-02-09 02:30:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prefixTags["source"], _, _ = net.SplitHostPort(h.address)
|
|
|
|
|
if prefix != "" {
|
|
|
|
|
prefixTags["path"] = prefix
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Process and remove tag-updates from the response first so we will
|
|
|
|
|
// add all available tags to the metrics later.
|
|
|
|
|
var valueUpdates []*gnmiLib.Update
|
|
|
|
|
for _, update := range response.Update.Update {
|
|
|
|
|
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
|
|
|
|
|
|
|
|
|
|
// Prepare tags from prefix
|
|
|
|
|
tags := make(map[string]string, len(prefixTags))
|
|
|
|
|
for key, val := range prefixTags {
|
|
|
|
|
tags[key] = val
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, fields := h.handleTelemetryField(update, tags, prefix)
|
|
|
|
|
|
|
|
|
|
var tagUpdate bool
|
|
|
|
|
for _, tagSub := range h.tagsubs {
|
|
|
|
|
if !equalPathNoKeys(fullPath, tagSub.fullPath) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update)
|
|
|
|
|
if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil {
|
|
|
|
|
h.log.Errorf("inserting tag failed: %w", err)
|
|
|
|
|
}
|
|
|
|
|
tagUpdate = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if !tagUpdate {
|
|
|
|
|
valueUpdates = append(valueUpdates, update)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse individual Update message and create measurements
|
|
|
|
|
var name, lastAliasPath string
|
|
|
|
|
for _, update := range valueUpdates {
|
|
|
|
|
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
|
|
|
|
|
|
|
|
|
|
// Prepare tags from prefix
|
|
|
|
|
tags := make(map[string]string, len(prefixTags))
|
|
|
|
|
for key, val := range prefixTags {
|
|
|
|
|
tags[key] = val
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
aliasPath, fields := h.handleTelemetryField(update, tags, prefix)
|
|
|
|
|
|
|
|
|
|
// Add the tags derived via tag-subscriptions
|
|
|
|
|
for k, v := range h.tagStore.lookup(fullPath, tags) {
|
|
|
|
|
tags[k] = v
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Inherent valid alias from prefix parsing
|
|
|
|
|
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
|
|
|
|
|
aliasPath = prefixAliasPath
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Lookup alias if alias-path has changed
|
|
|
|
|
if aliasPath != lastAliasPath {
|
|
|
|
|
name = prefix
|
|
|
|
|
if alias, ok := h.aliases[aliasPath]; ok {
|
|
|
|
|
name = alias
|
|
|
|
|
} else {
|
|
|
|
|
h.log.Debugf("No measurement alias for gNMI path: %s", name)
|
|
|
|
|
}
|
|
|
|
|
lastAliasPath = aliasPath
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check for empty names
|
|
|
|
|
if name == "" && !h.emptyNameWarnShown {
|
|
|
|
|
h.log.Warnf(emptyNameWarning, response.Update)
|
|
|
|
|
h.emptyNameWarnShown = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Group metrics
|
|
|
|
|
for k, v := range fields {
|
|
|
|
|
key := k
|
|
|
|
|
if len(aliasPath) < len(key) && len(aliasPath) != 0 {
|
|
|
|
|
// This may not be an exact prefix, due to naming style
|
|
|
|
|
// conversion on the key.
|
|
|
|
|
key = key[len(aliasPath)+1:]
|
|
|
|
|
} else if len(aliasPath) >= len(key) {
|
2023-05-24 22:14:06 +08:00
|
|
|
if h.canonicalFieldNames {
|
|
|
|
|
// Strip the origin is any for the field names
|
|
|
|
|
if parts := strings.SplitN(key, ":", 2); len(parts) == 2 {
|
|
|
|
|
key = parts[1]
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Otherwise use the last path element as the field key.
|
|
|
|
|
key = path.Base(key)
|
|
|
|
|
|
|
|
|
|
// If there are no elements skip the item; this would be an
|
|
|
|
|
// invalid message.
|
|
|
|
|
key = strings.TrimLeft(key, "/.")
|
|
|
|
|
if key == "" {
|
|
|
|
|
h.log.Errorf("invalid empty path: %q", k)
|
|
|
|
|
continue
|
|
|
|
|
}
|
2023-02-09 02:30:05 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
grouper.Add(name, tags, timestamp, key, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Add grouped measurements
|
|
|
|
|
for _, metricToAdd := range grouper.Metrics() {
|
|
|
|
|
acc.AddMetric(metricToAdd)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// HandleTelemetryField and add it to a measurement
|
|
|
|
|
func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
|
2023-05-23 21:06:02 +08:00
|
|
|
_, gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix)
|
2023-02-09 02:30:05 +08:00
|
|
|
if err != nil {
|
|
|
|
|
h.log.Errorf("handling path %q failed: %v", update.Path, err)
|
|
|
|
|
}
|
|
|
|
|
fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val)
|
|
|
|
|
if err != nil {
|
|
|
|
|
h.log.Errorf("error parsing update value %q: %v", update.Val, err)
|
|
|
|
|
}
|
|
|
|
|
return aliasPath, fields
|
|
|
|
|
}
|