From 678e6e7a8c328585cfc8424971f0cebfcae74a74 Mon Sep 17 00:00:00 2001 From: Pavlo Sumkin Date: Wed, 20 Jul 2022 22:05:01 +0300 Subject: [PATCH] feat(outputs.groundwork): Improve metric parsing to extend output (#11443) --- go.mod | 2 +- go.sum | 4 +- plugins/outputs/groundwork/README.md | 29 ++- plugins/outputs/groundwork/groundwork.go | 212 +++++++++++------- plugins/outputs/groundwork/groundwork_test.go | 132 +++++++---- 5 files changed, 243 insertions(+), 136 deletions(-) diff --git a/go.mod b/go.mod index e4278b8be..4b646c44e 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/gosnmp/gosnmp v1.34.0 github.com/grid-x/modbus v0.0.0-20211113184042-7f2251c342c9 - github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c + github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec github.com/hashicorp/consul/api v1.12.0 github.com/hashicorp/go-uuid v1.0.2 diff --git a/go.sum b/go.sum index 71269dacd..cce037676 100644 --- a/go.sum +++ b/go.sum @@ -1197,8 +1197,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.5/go.mod h1:UJ0EZAp832vCd54Wev9N1BM github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= -github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c h1:befb5xGUwNCoBuN/akLFCKekUzr0ixyws3aAX/7TaOk= -github.com/gwos/tcg/sdk v0.0.0-20211223101342-35fbd1ae683c/go.mod h1:OjlJNRXwlEjznVfU3YtLWH8FyM7KWHUevXDI47UeZeM= +github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c h1:pVr0TkSFnMP4BWSsEak/4bxD8/K+foJ9V8DGyZ6PIDE= +github.com/gwos/tcg/sdk v0.0.0-20220621192633-df0eac0a1a4c/go.mod h1:4yzxLBACr76Is0AMAkE0F/fqWBk28p2tzeO06yDGR/Y= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/harlow/kinesis-consumer v0.3.6-0.20210911031324-5a873d6e9fec h1:ya+kv1eNnd5QhcHuaj5g5eMq5Ra3VCNaPY2ZI7Aq91o= diff --git a/plugins/outputs/groundwork/README.md b/plugins/outputs/groundwork/README.md index f6853365e..305897da6 100644 --- a/plugins/outputs/groundwork/README.md +++ b/plugins/outputs/groundwork/README.md @@ -38,19 +38,26 @@ GW8+ ## List of tags used by the plugin -* group - to define the name of the group you want to monitor, can be changed - with config. -* host - to define the name of the host you want to monitor, can be changed with - config. -* service - to define the name of the service you want to monitor. -* status - to define the status of the service. Supported statuses: +* __group__ - to define the name of the group you want to monitor, + can be changed with config. +* __host__ - to define the name of the host you want to monitor, + can be changed with config. +* __service__ - to define the name of the service you want to monitor. +* __status__ - to define the status of the service. Supported statuses: "SERVICE_OK", "SERVICE_WARNING", "SERVICE_UNSCHEDULED_CRITICAL", "SERVICE_PENDING", "SERVICE_SCHEDULED_CRITICAL", "SERVICE_UNKNOWN". -* message - to provide any message you want. -* unitType - to use in monitoring contexts(subset of The Unified Code for Units - of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB". -* warning - to define warning threshold value. -* critical - to define critical threshold value. +* __message__ - to provide any message you want, + it overrides __message__ field value. +* __unitType__ - to use in monitoring contexts (subset of The Unified Code for + Units of Measure standard). Supported types: "1", "%cpu", "KB", "GB", "MB". +* __critical__ - to define the default critical threshold value, + it overrides value_cr field value. +* __warning__ - to define the default warning threshold value, + it overrides value_wn field value. +* __value_cr__ - to define critical threshold value, + it overrides __critical__ tag value and __value_cr__ field value. +* __value_wn__ - to define warning threshold value, + it overrides __warning__ tag value and __value_wn__ field value. ## NOTE diff --git a/plugins/outputs/groundwork/groundwork.go b/plugins/outputs/groundwork/groundwork.go index 28a62353d..7c0998ba0 100644 --- a/plugins/outputs/groundwork/groundwork.go +++ b/plugins/outputs/groundwork/groundwork.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "strconv" + "strings" "github.com/gwos/tcg/sdk/clients" "github.com/gwos/tcg/sdk/logper" @@ -48,28 +49,28 @@ func (*Groundwork) SampleConfig() string { func (g *Groundwork) Init() error { if g.Server == "" { - return errors.New("no 'url' provided") + return errors.New(`no "url" provided`) } if g.AgentID == "" { - return errors.New("no 'agent_id' provided") + return errors.New(`no "agent_id" provided`) } if g.Username == "" { - return errors.New("no 'username' provided") + return errors.New(`no "username" provided`) } if g.Password == "" { - return errors.New("no 'password' provided") + return errors.New(`no "password" provided`) } if g.DefaultAppType == "" { - return errors.New("no 'default_app_type' provided") + return errors.New(`no "default_app_type" provided`) } if g.DefaultHost == "" { - return errors.New("no 'default_host' provided") + return errors.New(`no "default_host" provided`) } if g.ResourceTag == "" { - return errors.New("no 'resource_tag' provided") + return errors.New(`no "resource_tag" provided`) } if !validStatus(g.DefaultServiceState) { - return errors.New("invalid 'default_service_state' provided") + return errors.New(`invalid "default_service_state" provided`) } g.client = clients.GWClient{ @@ -214,114 +215,167 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M group, _ := metric.GetTag(g.GroupTag) resource := g.DefaultHost - if value, present := metric.GetTag(g.ResourceTag); present { - resource = value + if v, ok := metric.GetTag(g.ResourceTag); ok { + resource = v } service := metric.Name() - if value, present := metric.GetTag("service"); present { - service = value + if v, ok := metric.GetTag("service"); ok { + service = v } - status := g.DefaultServiceState - value, statusPresent := metric.GetTag("status") - if validStatus(value) { - status = value - } - - message, _ := metric.GetTag("message") - unitType := string(transit.UnitCounter) - if value, present := metric.GetTag("unitType"); present { - unitType = value - } - - var critical float64 - value, criticalPresent := metric.GetTag("critical") - if criticalPresent { - if s, err := strconv.ParseFloat(value, 64); err == nil { - critical = s - } - } - - var warning float64 - value, warningPresent := metric.GetTag("warning") - if warningPresent { - if s, err := strconv.ParseFloat(value, 64); err == nil { - warning = s - } + if v, ok := metric.GetTag("unitType"); ok { + unitType = v } lastCheckTime := transit.NewTimestamp() lastCheckTime.Time = metric.Time() serviceObject := transit.MonitoredService{ BaseInfo: transit.BaseInfo{ - Name: service, - Type: transit.ResourceTypeService, - Owner: resource, + Name: service, + Type: transit.ResourceTypeService, + Owner: resource, + Properties: make(map[string]transit.TypedValue), }, MonitoredInfo: transit.MonitoredInfo{ - Status: transit.MonitorStatus(status), - LastCheckTime: lastCheckTime, - NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins - LastPluginOutput: message, + Status: transit.MonitorStatus(g.DefaultServiceState), + LastCheckTime: lastCheckTime, + NextCheckTime: lastCheckTime, // if not added, GW will make this as LastCheckTime + 5 mins }, Metrics: nil, } - for _, value := range metric.FieldList() { - var thresholds []transit.ThresholdValue - if warningPresent { - thresholds = append(thresholds, transit.ThresholdValue{ - SampleType: transit.Warning, - Label: value.Key + "_wn", - Value: &transit.TypedValue{ - ValueType: transit.DoubleType, - DoubleValue: &warning, - }, - }) + knownKey := func(t string) bool { + if strings.HasSuffix(t, "_cr") || + strings.HasSuffix(t, "_wn") || + t == "critical" || + t == "warning" || + t == g.GroupTag || + t == g.ResourceTag || + t == "service" || + t == "status" || + t == "message" || + t == "unitType" { + return true } - if criticalPresent { - thresholds = append(thresholds, transit.ThresholdValue{ - SampleType: transit.Critical, - Label: value.Key + "_cr", - Value: &transit.TypedValue{ - ValueType: transit.DoubleType, - DoubleValue: &critical, - }, - }) + return false + } + + for _, tag := range metric.TagList() { + if knownKey(tag.Key) { + continue + } + serviceObject.Properties[tag.Key] = *transit.NewTypedValue(tag.Value) + } + + for _, field := range metric.FieldList() { + if knownKey(field.Key) { + continue } - typedValue := transit.NewTypedValue(value.Value) - if typedValue == nil { - g.Log.Warnf("could not convert type %T, skipping field %s: %v", value.Value, value.Key, value.Value) + switch field.Value.(type) { + case string, []byte: + g.Log.Warnf("string values are not supported, skipping field %s: %q", field.Key, field.Value) continue } - if typedValue.ValueType == transit.StringType { - g.Log.Warnf("string values are not supported, skipping field %s: %q", value.Key, value.Value) + + typedValue := transit.NewTypedValue(field.Value) + if typedValue == nil { + g.Log.Warnf("could not convert type %T, skipping field %s: %v", field.Value, field.Key, field.Value) continue } + var thresholds []transit.ThresholdValue + addCriticalThreshold := func(v interface{}) { + if tv := transit.NewTypedValue(v); tv != nil { + thresholds = append(thresholds, transit.ThresholdValue{ + SampleType: transit.Critical, + Label: field.Key + "_cr", + Value: tv, + }) + } + } + addWarningThreshold := func(v interface{}) { + if tv := transit.NewTypedValue(v); tv != nil { + thresholds = append(thresholds, transit.ThresholdValue{ + SampleType: transit.Warning, + Label: field.Key + "_wn", + Value: tv, + }) + } + } + if v, ok := metric.GetTag(field.Key + "_cr"); ok { + if v, err := strconv.ParseFloat(v, 64); err == nil { + addCriticalThreshold(v) + } + } else if v, ok := metric.GetTag("critical"); ok { + if v, err := strconv.ParseFloat(v, 64); err == nil { + addCriticalThreshold(v) + } + } else if v, ok := metric.GetField(field.Key + "_cr"); ok { + addCriticalThreshold(v) + } + if v, ok := metric.GetTag(field.Key + "_wn"); ok { + if v, err := strconv.ParseFloat(v, 64); err == nil { + addWarningThreshold(v) + } + } else if v, ok := metric.GetTag("warning"); ok { + if v, err := strconv.ParseFloat(v, 64); err == nil { + addWarningThreshold(v) + } + } else if v, ok := metric.GetField(field.Key + "_wn"); ok { + addWarningThreshold(v) + } + serviceObject.Metrics = append(serviceObject.Metrics, transit.TimeSeries{ - MetricName: value.Key, + MetricName: field.Key, SampleType: transit.Value, - Interval: &transit.TimeInterval{ - EndTime: lastCheckTime, - }, + Interval: &transit.TimeInterval{EndTime: lastCheckTime}, Value: typedValue, Unit: transit.UnitType(unitType), Thresholds: thresholds, }) } - if !statusPresent { - serviceStatus, err := transit.CalculateServiceStatus(&serviceObject.Metrics) + if m, ok := metric.GetTag("message"); ok { + serviceObject.LastPluginOutput = m + } else if m, ok := metric.GetField("message"); ok { + switch m := m.(type) { + case string: + serviceObject.LastPluginOutput = m + case []byte: + serviceObject.LastPluginOutput = string(m) + default: + serviceObject.LastPluginOutput = fmt.Sprintf("%v", m) + } + } + + func() { + if s, ok := metric.GetTag("status"); ok && validStatus(s) { + serviceObject.Status = transit.MonitorStatus(s) + return + } + if s, ok := metric.GetField("status"); ok { + status := g.DefaultServiceState + switch s := s.(type) { + case string: + status = s + case []byte: + status = string(s) + } + if validStatus(status) { + serviceObject.Status = transit.MonitorStatus(status) + return + } + } + status, err := transit.CalculateServiceStatus(&serviceObject.Metrics) if err != nil { g.Log.Infof("could not calculate service status, reverting to default_service_state: %v", err) - serviceObject.Status = transit.MonitorStatus(g.DefaultServiceState) + status = transit.MonitorStatus(g.DefaultServiceState) } - serviceObject.Status = serviceStatus - } + serviceObject.Status = status + }() return metricMeta{resource: resource, group: group}, &serviceObject, nil } diff --git a/plugins/outputs/groundwork/groundwork_test.go b/plugins/outputs/groundwork/groundwork_test.go index 86b0870e1..144d5c3e0 100644 --- a/plugins/outputs/groundwork/groundwork_test.go +++ b/plugins/outputs/groundwork/groundwork_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/gwos/tcg/sdk/clients" + "github.com/gwos/tcg/sdk/transit" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -32,23 +33,25 @@ func TestWriteWithDefaults(t *testing.T) { require.NoError(t, err) // Decode body to use in assertions below - var obj groundworkObject + var obj transit.ResourcesWithServicesRequest err = json.Unmarshal(body, &obj) require.NoError(t, err) - // Check if server gets valid metrics object + // Check if server gets proper data require.Equal(t, defaultTestAgentID, obj.Context.AgentID) require.Equal(t, customAppType, obj.Context.AppType) require.Equal(t, defaultHost, obj.Resources[0].Name) + require.Equal(t, transit.MonitorStatus("SERVICE_OK"), obj.Resources[0].Services[0].Status) require.Equal(t, "IntMetric", obj.Resources[0].Services[0].Name) - require.Equal(t, int64(42), obj.Resources[0].Services[0].Metrics[0].Value.IntegerValue) + require.Equal(t, int64(42), *obj.Resources[0].Services[0].Metrics[0].Value.IntegerValue) require.Equal(t, 0, len(obj.Groups)) - _, err = fmt.Fprintln(w, `OK`) + _, err = fmt.Fprintln(w, "OK") require.NoError(t, err) })) i := Groundwork{ + Log: testutil.Logger{}, Server: server.URL, AgentID: defaultTestAgentID, DefaultHost: defaultHost, @@ -68,11 +71,13 @@ func TestWriteWithDefaults(t *testing.T) { defer server.Close() } -func TestWriteWithTags(t *testing.T) { - // Generate test metric with tags to test Write logic +func TestWriteWithFields(t *testing.T) { + // Generate test metric with fields to test Write logic floatMetric := testutil.TestMetric(1.0, "FloatMetric") - floatMetric.AddTag("host", "Host01") - floatMetric.AddTag("group", "Group01") + floatMetric.AddField("value_cr", 3.0) + floatMetric.AddField("value_wn", 2.0) + floatMetric.AddField("message", "Test Message") + floatMetric.AddField("status", "SERVICE_WARNING") // Simulate Groundwork server that should receive custom metrics server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -80,24 +85,23 @@ func TestWriteWithTags(t *testing.T) { require.NoError(t, err) // Decode body to use in assertions below - var obj groundworkObject + var obj transit.ResourcesWithServicesRequest err = json.Unmarshal(body, &obj) require.NoError(t, err) - // Check if server gets valid metrics object - require.Equal(t, defaultTestAgentID, obj.Context.AgentID) - require.Equal(t, defaultAppType, obj.Context.AppType) - require.Equal(t, "Host01", obj.Resources[0].Name) - require.Equal(t, "FloatMetric", obj.Resources[0].Services[0].Name) - require.Equal(t, 1.0, obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue) - require.Equal(t, "Group01", obj.Groups[0].GroupName) - require.Equal(t, "Host01", obj.Groups[0].Resources[0].Name) + // Check if server gets proper data + require.Equal(t, "Test Message", obj.Resources[0].Services[0].LastPluginOutput) + require.Equal(t, transit.MonitorStatus("SERVICE_WARNING"), obj.Resources[0].Services[0].Status) + require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue) + require.Equal(t, float64(3.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue) + require.Equal(t, float64(2.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue) - _, err = fmt.Fprintln(w, `OK`) + _, err = fmt.Fprintln(w, "OK") require.NoError(t, err) })) i := Groundwork{ + Log: testutil.Logger{}, Server: server.URL, AgentID: defaultTestAgentID, DefaultHost: defaultHost, @@ -119,29 +123,71 @@ func TestWriteWithTags(t *testing.T) { defer server.Close() } -type groundworkObject struct { - Context struct { - AgentID string `json:"agentId"` - AppType string `json:"appType"` - } `json:"context"` - Resources []struct { - Name string `json:"name"` - Services []struct { - Name string `json:"name"` - Metrics []struct { - Value struct { - DoubleValue float64 `json:"doubleValue"` - IntegerValue int64 `json:"integerValue"` - } `json:"value"` - } - } `json:"services"` - } `json:"resources"` - Groups []struct { - Type string `json:"type"` - GroupName string `json:"groupName"` - Resources []struct { - Name string `json:"name"` - Type string `json:"type"` - } `json:"resources"` - } `json:"groups"` +func TestWriteWithTags(t *testing.T) { + // Generate test metric with tags to test Write logic + floatMetric := testutil.TestMetric(1.0, "FloatMetric") + floatMetric.AddField("value_cr", 3.0) + floatMetric.AddField("value_wn", 2.0) + floatMetric.AddField("message", "Test Message") + floatMetric.AddField("status", "SERVICE_WARNING") + floatMetric.AddTag("value_cr", "9.0") + floatMetric.AddTag("value_wn", "6.0") + floatMetric.AddTag("message", "Test Tag") + floatMetric.AddTag("status", "SERVICE_PENDING") + floatMetric.AddTag("group-tag", "Group01") + floatMetric.AddTag("resource-tag", "Host01") + floatMetric.AddTag("service", "Service01") + floatMetric.AddTag("facility", "FACILITY") + floatMetric.AddTag("severity", "SEVERITY") + + // Simulate Groundwork server that should receive custom metrics + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + + // Decode body to use in assertions below + var obj transit.ResourcesWithServicesRequest + err = json.Unmarshal(body, &obj) + require.NoError(t, err) + + // Check if server gets proper data + require.Equal(t, defaultTestAgentID, obj.Context.AgentID) + require.Equal(t, defaultAppType, obj.Context.AppType) + require.Equal(t, "Host01", obj.Resources[0].Name) + require.Equal(t, "Service01", obj.Resources[0].Services[0].Name) + require.Equal(t, "FACILITY", *obj.Resources[0].Services[0].Properties["facility"].StringValue) + require.Equal(t, "SEVERITY", *obj.Resources[0].Services[0].Properties["severity"].StringValue) + require.Equal(t, "Group01", obj.Groups[0].GroupName) + require.Equal(t, "Host01", obj.Groups[0].Resources[0].Name) + require.Equal(t, "Test Tag", obj.Resources[0].Services[0].LastPluginOutput) + require.Equal(t, transit.MonitorStatus("SERVICE_PENDING"), obj.Resources[0].Services[0].Status) + require.Equal(t, float64(1.0), *obj.Resources[0].Services[0].Metrics[0].Value.DoubleValue) + require.Equal(t, float64(9.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[0].Value.DoubleValue) + require.Equal(t, float64(6.0), *obj.Resources[0].Services[0].Metrics[0].Thresholds[1].Value.DoubleValue) + + _, err = fmt.Fprintln(w, "OK") + require.NoError(t, err) + })) + + i := Groundwork{ + Log: testutil.Logger{}, + Server: server.URL, + AgentID: defaultTestAgentID, + DefaultHost: defaultHost, + DefaultAppType: defaultAppType, + GroupTag: "group-tag", + ResourceTag: "resource-tag", + client: clients.GWClient{ + AppName: "telegraf", + AppType: defaultAppType, + GWConnection: &clients.GWConnection{ + HostName: server.URL, + }, + }, + } + + err := i.Write([]telegraf.Metric{floatMetric}) + require.NoError(t, err) + + defer server.Close() }