fix(inputs.jti_openconfig_telemetry): Reauthenticate connection (#13647)

This commit is contained in:
Joshua Powers 2023-07-31 05:22:38 -06:00 committed by GitHub
parent f5afcc169c
commit 0496741f4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 78 additions and 25 deletions

View File

@ -15,6 +15,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/influxdata/telegraf"
@ -44,10 +45,20 @@ type OpenConfigTelemetry struct {
Log telegraf.Logger
sensorsConfig []sensorConfig
grpcClientConns []*grpc.ClientConn
grpcClientConns []grpcConnection
wg *sync.WaitGroup
}
// grpcConnection bundles a gRPC client connection with the cancel function
// that is stored alongside it, so both can be released together via Close.
type grpcConnection struct {
// connection is the gRPC client connection that Close shuts down.
connection *grpc.ClientConn
// cancel is the context cancel function that Close invokes after closing
// the connection.
cancel context.CancelFunc
}
// Close shuts down the gRPC client connection and then invokes the stored
// cancel function. It is safe to call on an entry whose connection or cancel
// function was never set.
func (g *grpcConnection) Close() {
	// NOTE(review): Start appears to register an entry even when dialing
	// failed, in which case connection would be nil; guard against that so
	// shutdown does not panic.
	if g.connection != nil {
		// Best effort: there is nothing useful to do with a close error during
		// teardown, so it is deliberately discarded.
		_ = g.connection.Close()
	}
	if g.cancel != nil {
		g.cancel()
	}
}
var (
// Regex to match and extract data points from path value in received key
keyPathRegex = regexp.MustCompile(`/([^/]*)\[([A-Za-z0-9\-/]*=[^\[]*)]`)
@ -250,34 +261,49 @@ func (m *OpenConfigTelemetry) collectData(
defer m.wg.Done()
for {
stream, err := c.TelemetrySubscribe(ctx,
&telemetry.SubscriptionRequest{PathList: sensor.pathList})
stream, err := c.TelemetrySubscribe(
ctx,
&telemetry.SubscriptionRequest{PathList: sensor.pathList},
)
if err != nil {
rpcStatus, _ := status.FromError(err)
// If service is currently unavailable and may come back later, retry
if rpcStatus.Code() != codes.Unavailable {
acc.AddError(fmt.Errorf("could not subscribe to %q: %w", grpcServer, err))
if rpcStatus.Code() == codes.Unauthenticated {
if m.Username != "" && m.Password != "" && m.ClientID != "" {
err := m.authenticate(ctx, grpcServer, grpcClientConn)
if err == nil {
time.Sleep(1 * time.Second)
continue
}
acc.AddError(fmt.Errorf("could not re-authenticate: %w", err))
}
} else if rpcStatus.Code() != codes.Unavailable {
// If service is currently unavailable and may come back later, retry
acc.AddError(fmt.Errorf("could not subscribe to %s on %q: %w", sensor.measurementName, grpcServer, err))
return
}
// Retry with delay. If delay is not provided, use default
if time.Duration(m.RetryDelay) > 0 {
m.Log.Debugf("Retrying %s with timeout %v", grpcServer, time.Duration(m.RetryDelay))
m.Log.Debugf("Retrying %s from %s with timeout %v", sensor.measurementName, grpcServer, time.Duration(m.RetryDelay))
time.Sleep(time.Duration(m.RetryDelay))
continue
}
return
}
m.Log.Debugf("Sucessfully subscribed to %s on %s", sensor.measurementName, grpcServer)
for {
r, err := stream.Recv()
if err != nil {
// If we encounter error in the stream, break so we can retry
// the connection
acc.AddError(fmt.Errorf("failed to read from %q: %w", grpcServer, err))
acc.AddError(fmt.Errorf("failed to read from %s from %s: %w", sensor.measurementName, grpcServer, err))
time.Sleep(1 * time.Second)
break
}
m.Log.Debugf("Received from %s: %v", grpcServer, r)
m.Log.Debugf("Received from %s on %s: %v", sensor.measurementName, grpcServer, r)
// Create a point and add to batch
tags := make(map[string]string)
@ -288,7 +314,7 @@ func (m *OpenConfigTelemetry) collectData(
dgroups := m.extractData(r, grpcServer)
// Print final data collection
m.Log.Debugf("Available collection for %s is: %v", grpcServer, dgroups)
m.Log.Debugf("Available collection for %s on %s: %v", sensor.measurementName, grpcServer, dgroups)
timestamp := time.Now()
// Iterate through data groups and add them
@ -315,6 +341,28 @@ func (m *OpenConfigTelemetry) collectData(
}
}
// authenticate issues a LoginCheck call over grpcClientConn using the
// plugin's Username, Password and ClientID.
//
// It returns an error when the call itself fails or when the reply's Result
// field is false; server is only used to label the returned error. A nil
// return means the login check succeeded.
func (m *OpenConfigTelemetry) authenticate(ctx context.Context, server string, grpcClientConn *grpc.ClientConn) error {
	request := &authentication.LoginRequest{
		UserName: m.Username,
		Password: m.Password,
		ClientId: m.ClientID,
	}

	reply, err := authentication.NewLoginClient(grpcClientConn).LoginCheck(ctx, request)
	if err != nil {
		return fmt.Errorf("could not initiate login check for %s: %w", server, err)
	}

	// The call can succeed while the reply still reports a failed login, so
	// the Result flag has to be checked separately.
	if reply.Result {
		return nil
	}
	return fmt.Errorf("failed to authenticate the user for %s", server)
}
func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
// Build sensors config
if m.splitSensorConfig() == 0 {
@ -337,17 +385,28 @@ func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
// Connect to given list of servers and start collecting data
var grpcClientConn *grpc.ClientConn
var wg sync.WaitGroup
ctx := context.Background()
m.wg = &wg
for _, server := range m.Servers {
ctx, cancel := context.WithCancel(context.Background())
if len(m.Username) > 0 {
ctx = metadata.AppendToOutgoingContext(
ctx,
"username", m.Username,
"password", m.Password,
"clientid", m.ClientID,
)
}
// Extract device address and port
grpcServer, grpcPort, err := net.SplitHostPort(server)
if err != nil {
m.Log.Errorf("Invalid server address: %s", err.Error())
cancel()
continue
}
grpcClientConn, err = grpc.Dial(server, opt)
grpcClientConn, err = grpc.DialContext(ctx, server, opt)
if err != nil {
m.Log.Errorf("Failed to connect to %s: %s", server, err.Error())
} else {
@ -355,21 +414,15 @@ func (m *OpenConfigTelemetry) Start(acc telegraf.Accumulator) error {
}
// Add to the list of client connections
m.grpcClientConns = append(m.grpcClientConns, grpcClientConn)
connection := grpcConnection{
connection: grpcClientConn,
cancel: cancel,
}
m.grpcClientConns = append(m.grpcClientConns, connection)
if m.Username != "" && m.Password != "" && m.ClientID != "" {
lc := authentication.NewLoginClient(grpcClientConn)
loginReply, loginErr := lc.LoginCheck(ctx,
&authentication.LoginRequest{UserName: m.Username,
Password: m.Password, ClientId: m.ClientID})
if loginErr != nil {
m.Log.Errorf("Could not initiate login check for %s: %v", server, loginErr)
continue
}
// Check if the user is authenticated. Bail if auth error
if !loginReply.Result {
m.Log.Errorf("Failed to authenticate the user for %s", server)
if err := m.authenticate(ctx, server, grpcClientConn); err != nil {
m.Log.Errorf("error authenticating to %s: %w", grpcServer, err)
continue
}
}