fix(inputs.gnmi): Register connection statistics before creating client (#16171)

This commit is contained in:
Lauri 2024-11-12 19:02:21 +02:00 committed by GitHub
parent b4fdd52ff3
commit e5e52f0a4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 19 deletions

View File

@ -7,6 +7,7 @@ import (
_ "embed" _ "embed"
"errors" "errors"
"fmt" "fmt"
"net"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -272,8 +273,14 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
go func(addr string) { go func(addr string) {
defer c.wg.Done() defer c.wg.Done()
host, port, err := net.SplitHostPort(addr)
if err != nil {
acc.AddError(fmt.Errorf("unable to parse address %s: %w", addr, err))
return
}
h := handler{ h := handler{
address: addr, host: host,
port: port,
aliases: c.internalAliases, aliases: c.internalAliases,
tagsubs: c.TagSubscriptions, tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize), maxMsgSize: int(c.MaxMsgSize),

View File

@ -33,7 +33,8 @@ import (
const eidJuniperTelemetryHeader = 1 const eidJuniperTelemetryHeader = 1
type handler struct { type handler struct {
address string host string
port string
aliases map[*pathInfo]string aliases map[*pathInfo]string
tagsubs []tagSubscription tagsubs []tagSubscription
maxMsgSize int maxMsgSize int
@ -72,7 +73,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
opts = append(opts, grpc.WithKeepaliveParams(h.ClientParameters)) opts = append(opts, grpc.WithKeepaliveParams(h.ClientParameters))
} }
client, err := grpc.NewClient(h.address, opts...) // Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.host})
defer connectStat.Set(0)
address := net.JoinHostPort(h.host, h.port)
client, err := grpc.NewClient(address, opts...)
if err != nil { if err != nil {
return fmt.Errorf("failed to dial: %w", err) return fmt.Errorf("failed to dial: %w", err)
} }
@ -88,21 +96,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) { if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send subscription request: %w", err) return fmt.Errorf("failed to send subscription request: %w", err)
} }
h.log.Debugf("Connection to gNMI device %s established", h.address)
// Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address})
connectStat.Set(1) connectStat.Set(1)
h.log.Debugf("Connection to gNMI device %s established", address)
defer h.log.Debugf("Connection to gNMI device %s closed", h.address) defer h.log.Debugf("Connection to gNMI device %s closed", address)
for ctx.Err() == nil { for ctx.Err() == nil {
var reply *gnmi.SubscribeResponse var reply *gnmi.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil { if reply, err = subscribeClient.Recv(); err != nil {
if !errors.Is(err, io.EOF) && ctx.Err() == nil { if !errors.Is(err, io.EOF) && ctx.Err() == nil {
connectStat.Set(0)
return fmt.Errorf("aborted gNMI subscription: %w", err) return fmt.Errorf("aborted gNMI subscription: %w", err)
} }
break break
@ -121,8 +122,6 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension()) h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension())
} }
} }
connectStat.Set(0)
return nil return nil
} }
@ -164,11 +163,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
prefix := newInfoFromPath(response.Update.Prefix) prefix := newInfoFromPath(response.Update.Prefix)
// Add info to the tags // Add info to the tags
var err error headerTags["source"] = h.host
headerTags["source"], _, err = net.SplitHostPort(h.address)
if err != nil {
h.log.Errorf("unable to parse address %s: %v", h.address, err)
}
if !prefix.empty() { if !prefix.empty() {
headerTags["path"] = prefix.fullPath() headerTags["path"] = prefix.fullPath()
} }