diff --git a/plugins/inputs/gnmi/gnmi.go b/plugins/inputs/gnmi/gnmi.go index 4dfaa8e4e..3832669b8 100644 --- a/plugins/inputs/gnmi/gnmi.go +++ b/plugins/inputs/gnmi/gnmi.go @@ -7,6 +7,7 @@ import ( _ "embed" "errors" "fmt" + "net" "strings" "sync" "time" @@ -272,8 +273,14 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error { go func(addr string) { defer c.wg.Done() + host, port, err := net.SplitHostPort(addr) + if err != nil { + acc.AddError(fmt.Errorf("unable to parse address %s: %w", addr, err)) + return + } h := handler{ - address: addr, + host: host, + port: port, aliases: c.internalAliases, tagsubs: c.TagSubscriptions, maxMsgSize: int(c.MaxMsgSize), diff --git a/plugins/inputs/gnmi/handler.go b/plugins/inputs/gnmi/handler.go index 62c572bdd..03771fb8d 100644 --- a/plugins/inputs/gnmi/handler.go +++ b/plugins/inputs/gnmi/handler.go @@ -33,7 +33,8 @@ import ( const eidJuniperTelemetryHeader = 1 type handler struct { - address string + host string + port string aliases map[*pathInfo]string tagsubs []tagSubscription maxMsgSize int @@ -72,7 +73,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t opts = append(opts, grpc.WithKeepaliveParams(h.ClientParameters)) } - client, err := grpc.NewClient(h.address, opts...) + // Used to report the status of the TCP connection to the device. If the + // GNMI connection goes down, but TCP is still up this will still report + // connected until the TCP connection times out. + connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.host}) + defer connectStat.Set(0) + + address := net.JoinHostPort(h.host, h.port) + client, err := grpc.NewClient(address, opts...) if err != nil { return fmt.Errorf("failed to dial: %w", err) } @@ -88,21 +96,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) { return fmt.Errorf("failed to send subscription request: %w", err) } - - h.log.Debugf("Connection to gNMI device %s established", h.address) - - // Used to report the status of the TCP connection to the device. If the - // GNMI connection goes down, but TCP is still up this will still report - // connected until the TCP connection times out. - connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address}) connectStat.Set(1) + h.log.Debugf("Connection to gNMI device %s established", address) - defer h.log.Debugf("Connection to gNMI device %s closed", h.address) + defer h.log.Debugf("Connection to gNMI device %s closed", address) for ctx.Err() == nil { var reply *gnmi.SubscribeResponse if reply, err = subscribeClient.Recv(); err != nil { if !errors.Is(err, io.EOF) && ctx.Err() == nil { - connectStat.Set(0) return fmt.Errorf("aborted gNMI subscription: %w", err) } break @@ -121,8 +122,6 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension()) } } - - connectStat.Set(0) return nil } @@ -164,11 +163,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon prefix := newInfoFromPath(response.Update.Prefix) // Add info to the tags - var err error - headerTags["source"], _, err = net.SplitHostPort(h.address) - if err != nil { - h.log.Errorf("unable to parse address %s: %v", h.address, err) - } + headerTags["source"] = h.host if !prefix.empty() { headerTags["path"] = prefix.fullPath() }