fix: update GroundWork SDK and improve logging (#10255)

This commit is contained in:
Vladislav 2021-12-23 20:43:34 +03:00 committed by GitHub
parent 0b99c6c20a
commit 1d6d01a6db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 36 deletions

2
go.mod
View File

@ -69,7 +69,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gosnmp/gosnmp v1.33.0
github.com/grid-x/modbus v0.0.0-20210224155242-c4a3d042e99b
github.com/gwos/tcg/sdk v0.0.0-20211130162655-32ad77586ccf
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec
github.com/hashicorp/consul/api v1.9.1
github.com/hashicorp/go-uuid v1.0.2

4
go.sum
View File

@ -1146,8 +1146,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BM
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/gwos/tcg/sdk v0.0.0-20211130162655-32ad77586ccf h1:xSjgqa6SiBaSC4sTC4HniWRLww2vbl3u0KyMUYeryJI=
github.com/gwos/tcg/sdk v0.0.0-20211130162655-32ad77586ccf/go.mod h1:OjlJNRXwlEjznVfU3YtLWH8FyM7KWHUevXDI47UeZeM=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c h1:befb5xGUwNCoBuN/akLFCKekUzr0ixyws3aAX/7TaOk=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c/go.mod h1:OjlJNRXwlEjznVfU3YtLWH8FyM7KWHUevXDI47UeZeM=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec h1:ya+kv1eNnd5QhcHuaj5g5eMq5Ra3VCNaPY2ZI7Aq91o=

View File

@ -31,8 +31,12 @@ This plugin writes to a [GroundWork Monitor][1] instance. Plugin only supports G
## List of tags used by the plugin
* service - to define the name of the service you want to monitor.
* status - to define the status of the service.
* status - to define the status of the service. Supported statuses: "SERVICE_OK", "SERVICE_WARNING", "SERVICE_UNSCHEDULED_CRITICAL", "SERVICE_PENDING", "SERVICE_SCHEDULED_CRITICAL", "SERVICE_UNKNOWN".
* message - to provide any message you want.
* unitType - to use in monitoring contexts(subset of The Unified Code for Units of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* warning - to define warning threshold value.
* critical - to define critical threshold value.
## NOTE
The current version of GroundWork Monitor does not support metrics whose values are strings. Such metrics will be skipped and will not be added to the final payload. You can find more context in this pull request: [#10255]( https://github.com/influxdata/telegraf/pull/10255)

View File

@ -1,6 +1,7 @@
package groundwork
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -8,6 +9,7 @@ import (
"strconv"
"github.com/gwos/tcg/sdk/clients"
"github.com/gwos/tcg/sdk/logper"
"github.com/gwos/tcg/sdk/transit"
"github.com/hashicorp/go-uuid"
@ -85,6 +87,22 @@ func (g *Groundwork) Init() error {
IsDynamicInventory: true,
},
}
logper.SetLogger(
func(fields interface{}, format string, a ...interface{}) {
g.Log.Error(adaptLog(fields, format, a...))
},
func(fields interface{}, format string, a ...interface{}) {
g.Log.Warn(adaptLog(fields, format, a...))
},
func(fields interface{}, format string, a ...interface{}) {
g.Log.Info(adaptLog(fields, format, a...))
},
func(fields interface{}, format string, a ...interface{}) {
g.Log.Debug(adaptLog(fields, format, a...))
},
func() bool { return telegraf.Debug },
)
return nil
}
@ -105,7 +123,7 @@ func (g *Groundwork) Close() error {
}
func (g *Groundwork) Write(metrics []telegraf.Metric) error {
resourceToServicesMap := make(map[string][]transit.DynamicMonitoredService)
resourceToServicesMap := make(map[string][]transit.MonitoredService)
for _, metric := range metrics {
resource, service, err := g.parseMetric(metric)
if err != nil {
@ -115,18 +133,20 @@ func (g *Groundwork) Write(metrics []telegraf.Metric) error {
resourceToServicesMap[resource] = append(resourceToServicesMap[resource], *service)
}
var resources []transit.DynamicMonitoredResource
var resources []transit.MonitoredResource
for resourceName, services := range resourceToServicesMap {
resources = append(resources, transit.DynamicMonitoredResource{
resources = append(resources, transit.MonitoredResource{
BaseResource: transit.BaseResource{
BaseTransitData: transit.BaseTransitData{
BaseInfo: transit.BaseInfo{
Name: resourceName,
Type: transit.Host,
Type: transit.ResourceTypeHost,
},
},
Status: transit.HostUp,
LastCheckTime: transit.NewTimestamp(),
Services: services,
MonitoredInfo: transit.MonitoredInfo{
Status: transit.HostUp,
LastCheckTime: transit.NewTimestamp(),
},
Services: services,
})
}
@ -134,7 +154,7 @@ func (g *Groundwork) Write(metrics []telegraf.Metric) error {
if err != nil {
return err
}
requestJSON, err := json.Marshal(transit.DynamicResourcesWithServicesRequest{
requestJSON, err := json.Marshal(transit.ResourcesWithServicesRequest{
Context: &transit.TracerContext{
AppType: "TELEGRAF",
AgentID: g.AgentID,
@ -152,7 +172,7 @@ func (g *Groundwork) Write(metrics []telegraf.Metric) error {
_, err = g.client.SendResourcesWithMetrics(context.Background(), requestJSON)
if err != nil {
return fmt.Errorf("error while sending: %v", err)
return fmt.Errorf("error while sending: %w", err)
}
return nil
@ -172,7 +192,7 @@ func init() {
})
}
func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.DynamicMonitoredService, error) {
func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.MonitoredService, error) {
resource := g.DefaultHost
if value, present := metric.GetTag(g.ResourceTag); present {
resource = value
@ -214,16 +234,18 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.Dynam
lastCheckTime := transit.NewTimestamp()
lastCheckTime.Time = metric.Time()
serviceObject := transit.DynamicMonitoredService{
BaseTransitData: transit.BaseTransitData{
serviceObject := transit.MonitoredService{
BaseInfo: transit.BaseInfo{
Name: service,
Type: transit.Service,
Type: transit.ResourceTypeService,
Owner: resource,
},
Status: transit.MonitorStatus(status),
LastCheckTime: lastCheckTime,
LastPlugInOutput: message,
Metrics: nil,
MonitoredInfo: transit.MonitoredInfo{
Status: transit.MonitorStatus(status),
LastCheckTime: lastCheckTime,
LastPluginOutput: message,
},
Metrics: nil,
}
for _, value := range metric.FieldList() {
@ -234,7 +256,7 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.Dynam
Label: value.Key + "_wn",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: warning,
DoubleValue: &warning,
},
})
}
@ -244,15 +266,19 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.Dynam
Label: value.Key + "_cr",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: critical,
DoubleValue: &critical,
},
})
}
typedValue := new(transit.TypedValue)
err := typedValue.FromInterface(value.Value)
if err != nil {
return "", nil, err
typedValue := transit.NewTypedValue(value.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", value.Value, value.Key, value.Value)
continue
}
if typedValue.ValueType == transit.StringType {
g.Log.Warnf("string values are not supported, skipping field %s: %q", value.Key, value.Value)
continue
}
serviceObject.Metrics = append(serviceObject.Metrics, transit.TimeSeries{
@ -263,7 +289,7 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (string, *transit.Dynam
},
Value: typedValue,
Unit: transit.UnitType(unitType),
Thresholds: &thresholds,
Thresholds: thresholds,
})
}
@ -287,3 +313,46 @@ func validStatus(status string) bool {
}
return false
}
func adaptLog(fields interface{}, format string, a ...interface{}) string {
buf := &bytes.Buffer{}
if format != "" {
_, _ = fmt.Fprintf(buf, format, a...)
}
fmtField := func(k string, v interface{}) {
format := " %s:"
if len(k) == 0 {
format = " "
}
if _, ok := v.(int); ok {
format += "%d"
} else {
format += "%q"
}
_, _ = fmt.Fprintf(buf, format, k, v)
}
if ff, ok := fields.(interface {
LogFields() (map[string]interface{}, map[string][]byte)
}); ok {
m1, m2 := ff.LogFields()
for k, v := range m1 {
fmtField(k, v)
}
for k, v := range m2 {
fmtField(k, v)
}
} else if ff, ok := fields.(map[string]interface{}); ok {
for k, v := range ff {
fmtField(k, v)
}
} else if ff, ok := fields.([]interface{}); ok {
for _, v := range ff {
fmtField("", v)
}
}
out := buf.Bytes()
if len(out) > 1 {
out = append(bytes.ToUpper(out[0:1]), out[1:]...)
}
return string(out)
}

View File

@ -23,7 +23,6 @@ const (
func TestWrite(t *testing.T) {
// Generate test metric with default name to test Write logic
floatMetric := testutil.TestMetric(1.0, "Float")
stringMetric := testutil.TestMetric("Test", "String")
// Simulate Groundwork server that should receive custom metrics
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -48,11 +47,6 @@ func TestWrite(t *testing.T) {
obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue,
1.0,
)
require.Equal(
t,
obj.Resources[0].Services[1].Metrics[0].Value.StringValue,
"Test",
)
_, err = fmt.Fprintln(w, `OK`)
require.NoError(t, err)
@ -71,7 +65,7 @@ func TestWrite(t *testing.T) {
},
}
err := i.Write([]telegraf.Metric{floatMetric, stringMetric})
err := i.Write([]telegraf.Metric{floatMetric})
require.NoError(t, err)
defer server.Close()