feat(outputs.groundwork): Improve metric parsing to extend output (#11443)

This commit is contained in:
Pavlo Sumkin 2022-07-20 22:05:01 +03:00 committed by GitHub
parent 46f059ebfd
commit 678e6e7a8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 243 additions and 136 deletions

2
go.mod
View File

@ -79,7 +79,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/gosnmp/gosnmp v1.34.0
github.com/grid-x/modbus v0.0.0-20211113184042-7f2251c342c9
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec
github.com/hashicorp/consul/api v1.12.0
github.com/hashicorp/go-uuid v1.0.2

4
go.sum
View File

@ -1197,8 +1197,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BM
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c h1:befb5xGUwNCoBuN/akLFCKekUzr0ixyws3aAX/7TaOk=
github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c/go.mod h1:OjlJNRXwlEjznVfU3YtLWH8FyM7KWHUevXDI47UeZeM=
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c h1:pVr0TkSFnMP4BWSsEak/4bxD8/K+foJ9V8DGyZ6PIDE=
github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c/go.mod h1:4yzxLBACr76Is0AMAkE0F/fqWBk28p2tzeO06yDGR/Y=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec h1:ya+kv1eNnd5QhcHuaj5g5eMq5Ra3VCNaPY2ZI7Aq91o=

View File

@ -38,19 +38,26 @@ GW8+
## List of tags used by the plugin
* group - to define the name of the group you want to monitor, can be changed
with config.
* host - to define the name of the host you want to monitor, can be changed with
config.
* service - to define the name of the service you want to monitor.
* status - to define the status of the service. Supported statuses:
* __group__ - to define the name of the group you want to monitor,
can be changed with config.
* __host__ - to define the name of the host you want to monitor,
can be changed with config.
* __service__ - to define the name of the service you want to monitor.
* __status__ - to define the status of the service. Supported statuses:
"SERVICE_OK", "SERVICE_WARNING", "SERVICE_UNSCHEDULED_CRITICAL",
"SERVICE_PENDING", "SERVICE_SCHEDULED_CRITICAL", "SERVICE_UNKNOWN".
* message - to provide any message you want.
* unitType - to use in monitoring contexts(subset of The Unified Code for Units
of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* warning - to define warning threshold value.
* critical - to define critical threshold value.
* __message__ - to provide any message you want,
it overrides __message__ field value.
* __unitType__ - to use in monitoring contexts (subset of The Unified Code for
Units of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB".
* __critical__ - to define the default critical threshold value,
it overrides value_cr field value.
* __warning__ - to define the default warning threshold value,
it overrides value_wn field value.
* __value_cr__ - to define critical threshold value,
it overrides __critical__ tag value and __value_cr__ field value.
* __value_wn__ - to define warning threshold value,
it overrides __warning__ tag value and __value_wn__ field value.
## NOTE

View File

@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/gwos/tcg/sdk/clients"
"github.com/gwos/tcg/sdk/logper"
@ -48,28 +49,28 @@ func (*Groundwork) SampleConfig() string {
func (g *Groundwork) Init() error {
if g.Server == "" {
return errors.New("no 'url' provided")
return errors.New(`no "url" provided`)
}
if g.AgentID == "" {
return errors.New("no 'agent_id' provided")
return errors.New(`no "agent_id" provided`)
}
if g.Username == "" {
return errors.New("no 'username' provided")
return errors.New(`no "username" provided`)
}
if g.Password == "" {
return errors.New("no 'password' provided")
return errors.New(`no "password" provided`)
}
if g.DefaultAppType == "" {
return errors.New("no 'default_app_type' provided")
return errors.New(`no "default_app_type" provided`)
}
if g.DefaultHost == "" {
return errors.New("no 'default_host' provided")
return errors.New(`no "default_host" provided`)
}
if g.ResourceTag == "" {
return errors.New("no 'resource_tag' provided")
return errors.New(`no "resource_tag" provided`)
}
if !validStatus(g.DefaultServiceState) {
return errors.New("invalid 'default_service_state' provided")
return errors.New(`invalid "default_service_state" provided`)
}
g.client = clients.GWClient{
@ -214,114 +215,167 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M
group, _ := metric.GetTag(g.GroupTag)
resource := g.DefaultHost
if value, present := metric.GetTag(g.ResourceTag); present {
resource = value
if v, ok := metric.GetTag(g.ResourceTag); ok {
resource = v
}
service := metric.Name()
if value, present := metric.GetTag("service"); present {
service = value
if v, ok := metric.GetTag("service"); ok {
service = v
}
status := g.DefaultServiceState
value, statusPresent := metric.GetTag("status")
if validStatus(value) {
status = value
}
message, _ := metric.GetTag("message")
unitType := string(transit.UnitCounter)
if value, present := metric.GetTag("unitType"); present {
unitType = value
}
var critical float64
value, criticalPresent := metric.GetTag("critical")
if criticalPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
critical = s
}
}
var warning float64
value, warningPresent := metric.GetTag("warning")
if warningPresent {
if s, err := strconv.ParseFloat(value, 64); err == nil {
warning = s
}
if v, ok := metric.GetTag("unitType"); ok {
unitType = v
}
lastCheckTime := transit.NewTimestamp()
lastCheckTime.Time = metric.Time()
serviceObject := transit.MonitoredService{
BaseInfo: transit.BaseInfo{
Name: service,
Type: transit.ResourceTypeService,
Owner: resource,
Name: service,
Type: transit.ResourceTypeService,
Owner: resource,
Properties: make(map[string]transit.TypedValue),
},
MonitoredInfo: transit.MonitoredInfo{
Status: transit.MonitorStatus(status),
LastCheckTime: lastCheckTime,
NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins
LastPluginOutput: message,
Status: transit.MonitorStatus(g.DefaultServiceState),
LastCheckTime: lastCheckTime,
NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins
},
Metrics: nil,
}
for _, value := range metric.FieldList() {
var thresholds []transit.ThresholdValue
if warningPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: value.Key + "_wn",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &warning,
},
})
knownKey := func(t string) bool {
if strings.HasSuffix(t, "_cr") ||
strings.HasSuffix(t, "_wn") ||
t == "critical" ||
t == "warning" ||
t == g.GroupTag ||
t == g.ResourceTag ||
t == "service" ||
t == "status" ||
t == "message" ||
t == "unitType" {
return true
}
if criticalPresent {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: value.Key + "_cr",
Value: &transit.TypedValue{
ValueType: transit.DoubleType,
DoubleValue: &critical,
},
})
return false
}
for _, tag := range metric.TagList() {
if knownKey(tag.Key) {
continue
}
serviceObject.Properties[tag.Key] = *transit.NewTypedValue(tag.Value)
}
for _, field := range metric.FieldList() {
if knownKey(field.Key) {
continue
}
typedValue := transit.NewTypedValue(value.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", value.Value, value.Key, value.Value)
switch field.Value.(type) {
case string, []byte:
g.Log.Warnf("string values are not supported, skipping field %s: %q", field.Key, field.Value)
continue
}
if typedValue.ValueType == transit.StringType {
g.Log.Warnf("string values are not supported, skipping field %s: %q", value.Key, value.Value)
typedValue := transit.NewTypedValue(field.Value)
if typedValue == nil {
g.Log.Warnf("could not convert type %T, skipping field %s: %v", field.Value, field.Key, field.Value)
continue
}
var thresholds []transit.ThresholdValue
addCriticalThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Critical,
Label: field.Key + "_cr",
Value: tv,
})
}
}
addWarningThreshold := func(v interface{}) {
if tv := transit.NewTypedValue(v); tv != nil {
thresholds = append(thresholds, transit.ThresholdValue{
SampleType: transit.Warning,
Label: field.Key + "_wn",
Value: tv,
})
}
}
if v, ok := metric.GetTag(field.Key + "_cr"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetTag("critical"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addCriticalThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_cr"); ok {
addCriticalThreshold(v)
}
if v, ok := metric.GetTag(field.Key + "_wn"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetTag("warning"); ok {
if v, err := strconv.ParseFloat(v, 64); err == nil {
addWarningThreshold(v)
}
} else if v, ok := metric.GetField(field.Key + "_wn"); ok {
addWarningThreshold(v)
}
serviceObject.Metrics = append(serviceObject.Metrics, transit.TimeSeries{
MetricName: value.Key,
MetricName: field.Key,
SampleType: transit.Value,
Interval: &transit.TimeInterval{
EndTime: lastCheckTime,
},
Interval: &transit.TimeInterval{EndTime: lastCheckTime},
Value: typedValue,
Unit: transit.UnitType(unitType),
Thresholds: thresholds,
})
}
if !statusPresent {
serviceStatus, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if m, ok := metric.GetTag("message"); ok {
serviceObject.LastPluginOutput = m
} else if m, ok := metric.GetField("message"); ok {
switch m := m.(type) {
case string:
serviceObject.LastPluginOutput = m
case []byte:
serviceObject.LastPluginOutput = string(m)
default:
serviceObject.LastPluginOutput = fmt.Sprintf("%v", m)
}
}
func() {
if s, ok := metric.GetTag("status"); ok && validStatus(s) {
serviceObject.Status = transit.MonitorStatus(s)
return
}
if s, ok := metric.GetField("status"); ok {
status := g.DefaultServiceState
switch s := s.(type) {
case string:
status = s
case []byte:
status = string(s)
}
if validStatus(status) {
serviceObject.Status = transit.MonitorStatus(status)
return
}
}
status, err := transit.CalculateServiceStatus(&serviceObject.Metrics)
if err != nil {
g.Log.Infof("could not calculate service status, reverting to default_service_state: %v", err)
serviceObject.Status = transit.MonitorStatus(g.DefaultServiceState)
status = transit.MonitorStatus(g.DefaultServiceState)
}
serviceObject.Status = serviceStatus
}
serviceObject.Status = status
}()
return metricMeta{resource: resource, group: group}, &serviceObject, nil
}

View File

@ -9,6 +9,7 @@ import (
"testing"
"github.com/gwos/tcg/sdk/clients"
"github.com/gwos/tcg/sdk/transit"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
@ -32,23 +33,25 @@ func TestWriteWithDefaults(t *testing.T) {
require.NoError(t, err)
// Decode body to use in assertions below
var obj groundworkObject
var obj transit.ResourcesWithServicesRequest
err = json.Unmarshal(body, &obj)
require.NoError(t, err)
// Check if server gets valid metrics object
// Check if server gets proper data
require.Equal(t, defaultTestAgentID, obj.Context.AgentID)
require.Equal(t, customAppType, obj.Context.AppType)
require.Equal(t, defaultHost, obj.Resources[0].Name)
require.Equal(t, transit.MonitorStatus("SERVICE_OK"), obj.Resources[0].Services[0].Status)
require.Equal(t, "IntMetric", obj.Resources[0].Services[0].Name)
require.Equal(t, int64(42), obj.Resources[0].Services[0].Metrics[0].Value.IntegerValue)
require.Equal(t, int64(42), *obj.Resources[0].Services[0].Metrics[0].Value.IntegerValue)
require.Equal(t, 0, len(obj.Groups))
_, err = fmt.Fprintln(w, `OK`)
_, err = fmt.Fprintln(w, "OK")
require.NoError(t, err)
}))
i := Groundwork{
Log: testutil.Logger{},
Server: server.URL,
AgentID: defaultTestAgentID,
DefaultHost: defaultHost,
@ -68,11 +71,13 @@ func TestWriteWithDefaults(t *testing.T) {
defer server.Close()
}
func TestWriteWithTags(t *testing.T) {
// Generate test metric with tags to test Write logic
func TestWriteWithFields(t *testing.T) {
// Generate test metric with fields to test Write logic
floatMetric := testutil.TestMetric(1.0, "FloatMetric")
floatMetric.AddTag("host", "Host01")
floatMetric.AddTag("group", "Group01")
floatMetric.AddField("value_cr", 3.0)
floatMetric.AddField("value_wn", 2.0)
floatMetric.AddField("message", "Test Message")
floatMetric.AddField("status", "SERVICE_WARNING")
// Simulate Groundwork server that should receive custom metrics
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -80,24 +85,23 @@ func TestWriteWithTags(t *testing.T) {
require.NoError(t, err)
// Decode body to use in assertions below
var obj groundworkObject
var obj transit.ResourcesWithServicesRequest
err = json.Unmarshal(body, &obj)
require.NoError(t, err)
// Check if server gets valid metrics object
require.Equal(t, defaultTestAgentID, obj.Context.AgentID)
require.Equal(t, defaultAppType, obj.Context.AppType)
require.Equal(t, "Host01", obj.Resources[0].Name)
require.Equal(t, "FloatMetric", obj.Resources[0].Services[0].Name)
require.Equal(t, 1.0, obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, "Group01", obj.Groups[0].GroupName)
require.Equal(t, "Host01", obj.Groups[0].Resources[0].Name)
// Check if server gets proper data
require.Equal(t, "Test Message", obj.Resources[0].Services[0].LastPluginOutput)
require.Equal(t, transit.MonitorStatus("SERVICE_WARNING"), obj.Resources[0].Services[0].Status)
require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, float64(3.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue)
require.Equal(t, float64(2.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue)
_, err = fmt.Fprintln(w, `OK`)
_, err = fmt.Fprintln(w, "OK")
require.NoError(t, err)
}))
i := Groundwork{
Log: testutil.Logger{},
Server: server.URL,
AgentID: defaultTestAgentID,
DefaultHost: defaultHost,
@ -119,29 +123,71 @@ func TestWriteWithTags(t *testing.T) {
defer server.Close()
}
type groundworkObject struct {
Context struct {
AgentID string `json:"agentId"`
AppType string `json:"appType"`
} `json:"context"`
Resources []struct {
Name string `json:"name"`
Services []struct {
Name string `json:"name"`
Metrics []struct {
Value struct {
DoubleValue float64 `json:"doubleValue"`
IntegerValue int64 `json:"integerValue"`
} `json:"value"`
}
} `json:"services"`
} `json:"resources"`
Groups []struct {
Type string `json:"type"`
GroupName string `json:"groupName"`
Resources []struct {
Name string `json:"name"`
Type string `json:"type"`
} `json:"resources"`
} `json:"groups"`
func TestWriteWithTags(t *testing.T) {
// Generate test metric with tags to test Write logic
floatMetric := testutil.TestMetric(1.0, "FloatMetric")
floatMetric.AddField("value_cr", 3.0)
floatMetric.AddField("value_wn", 2.0)
floatMetric.AddField("message", "Test Message")
floatMetric.AddField("status", "SERVICE_WARNING")
floatMetric.AddTag("value_cr", "9.0")
floatMetric.AddTag("value_wn", "6.0")
floatMetric.AddTag("message", "Test Tag")
floatMetric.AddTag("status", "SERVICE_PENDING")
floatMetric.AddTag("group-tag", "Group01")
floatMetric.AddTag("resource-tag", "Host01")
floatMetric.AddTag("service", "Service01")
floatMetric.AddTag("facility", "FACILITY")
floatMetric.AddTag("severity", "SEVERITY")
// Simulate Groundwork server that should receive custom metrics
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
// Decode body to use in assertions below
var obj transit.ResourcesWithServicesRequest
err = json.Unmarshal(body, &obj)
require.NoError(t, err)
// Check if server gets proper data
require.Equal(t, defaultTestAgentID, obj.Context.AgentID)
require.Equal(t, defaultAppType, obj.Context.AppType)
require.Equal(t, "Host01", obj.Resources[0].Name)
require.Equal(t, "Service01", obj.Resources[0].Services[0].Name)
require.Equal(t, "FACILITY", *obj.Resources[0].Services[0].Properties["facility"].StringValue)
require.Equal(t, "SEVERITY", *obj.Resources[0].Services[0].Properties["severity"].StringValue)
require.Equal(t, "Group01", obj.Groups[0].GroupName)
require.Equal(t, "Host01", obj.Groups[0].Resources[0].Name)
require.Equal(t, "Test Tag", obj.Resources[0].Services[0].LastPluginOutput)
require.Equal(t, transit.MonitorStatus("SERVICE_PENDING"), obj.Resources[0].Services[0].Status)
require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue)
require.Equal(t, float64(9.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue)
require.Equal(t, float64(6.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue)
_, err = fmt.Fprintln(w, "OK")
require.NoError(t, err)
}))
i := Groundwork{
Log: testutil.Logger{},
Server: server.URL,
AgentID: defaultTestAgentID,
DefaultHost: defaultHost,
DefaultAppType: defaultAppType,
GroupTag: "group-tag",
ResourceTag: "resource-tag",
client: clients.GWClient{
AppName: "telegraf",
AppType: defaultAppType,
GWConnection: &clients.GWConnection{
HostName: server.URL,
},
},
}
err := i.Write([]telegraf.Metric{floatMetric})
require.NoError(t, err)
defer server.Close()
}