fix(inputs.gnmi): Create selfstat to track connection state (#13149)
This commit is contained in:
parent
ca13259989
commit
f55d214082
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/internal/choice"
|
"github.com/influxdata/telegraf/internal/choice"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
|
jnprHeader "github.com/influxdata/telegraf/plugins/inputs/gnmi/extensions/jnpr_gnmi_extention"
|
||||||
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
|
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
|
||||||
gnmiExt "github.com/openconfig/gnmi/proto/gnmi_ext"
|
gnmiExt "github.com/openconfig/gnmi/proto/gnmi_ext"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
@ -97,11 +98,19 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
|
||||||
}
|
}
|
||||||
|
|
||||||
h.log.Debugf("Connection to gNMI device %s established", h.address)
|
h.log.Debugf("Connection to gNMI device %s established", h.address)
|
||||||
|
|
||||||
|
// Used to report the status of the TCP connection to the device. If the
|
||||||
|
// GNMI connection goes down, but TCP is still up this will still report
|
||||||
|
// connected until the TCP connection times out.
|
||||||
|
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address})
|
||||||
|
connectStat.Set(1)
|
||||||
|
|
||||||
defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
|
defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
|
||||||
for ctx.Err() == nil {
|
for ctx.Err() == nil {
|
||||||
var reply *gnmiLib.SubscribeResponse
|
var reply *gnmiLib.SubscribeResponse
|
||||||
if reply, err = subscribeClient.Recv(); err != nil {
|
if reply, err = subscribeClient.Recv(); err != nil {
|
||||||
if !errors.Is(err, io.EOF) && ctx.Err() == nil {
|
if !errors.Is(err, io.EOF) && ctx.Err() == nil {
|
||||||
|
connectStat.Set(0)
|
||||||
return fmt.Errorf("aborted gNMI subscription: %w", err)
|
return fmt.Errorf("aborted gNMI subscription: %w", err)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
|
@ -120,6 +129,8 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
|
||||||
h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension())
|
h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connectStat.Set(0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue