diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index b22275f2b..dd13ced12 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -87,6 +87,7 @@ following works: - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) - github.com/cespare/xxhash [MIT License](https://github.com/cespare/xxhash/blob/master/LICENSE.txt) - github.com/cisco-ie/nx-telemetry-proto [Apache License 2.0](https://github.com/cisco-ie/nx-telemetry-proto/blob/master/LICENSE) +- github.com/clarify/clarify-go [Apache License 2.0](https://github.com/clarify/clarify-go/blob/master/LICENSE) - github.com/cloudevents/sdk-go [Apache License 2.0](https://github.com/cloudevents/sdk-go/blob/main/LICENSE) - github.com/containerd/containerd [Apache License 2.0](https://github.com/containerd/containerd/blob/master/LICENSE) - github.com/coocood/freecache [MIT License](https://github.com/coocood/freecache/blob/master/LICENSE) diff --git a/go.mod b/go.mod index a4c1fe733..fdb575862 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/bmatcuk/doublestar/v3 v3.0.0 github.com/caio/go-tdigest v3.1.0+incompatible github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df + github.com/clarify/clarify-go v0.2.4 github.com/coocood/freecache v1.2.3 github.com/coreos/go-semver v0.3.1 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f diff --git a/go.sum b/go.sum index eeb795848..534311fc7 100644 --- a/go.sum +++ b/go.sum @@ -398,6 +398,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df h1:GmrltUp5Qf5XhT+LmqMDizsgm/6VHTSxPWRdrq21yRo= github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM= +github.com/clarify/clarify-go v0.2.4 h1:4MH6UHS3PFSNeitAkS/k3ur6ASxZpiRa6EezkbCVLVs= +github.com/clarify/clarify-go v0.2.4/go.mod h1:bdKwACxI2WZMdlFQOun4J8H5wG0Dbn1bKWelgxsDJaM= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s= github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To= diff --git a/internal/type_conversions.go b/internal/type_conversions.go index e2506a906..464e464c1 100644 --- a/internal/type_conversions.go +++ b/internal/type_conversions.go @@ -53,6 +53,11 @@ func ToFloat64(value interface{}) (float64, error) { return strconv.ParseFloat(string(v), 64) case fmt.Stringer: return strconv.ParseFloat(v.String(), 64) + case bool: + if v { + return float64(1), nil + } + return float64(0), nil case int: return float64(v), nil case int8: @@ -91,6 +96,11 @@ func ToInt64(value interface{}) (int64, error) { return strconv.ParseInt(string(v), 10, 64) case fmt.Stringer: return strconv.ParseInt(v.String(), 10, 64) + case bool: + if v { + return int64(1), nil + } + return int64(0), nil case int: return int64(v), nil case int8: @@ -129,6 +139,11 @@ func ToUint64(value interface{}) (uint64, error) { return strconv.ParseUint(string(v), 10, 64) case fmt.Stringer: return strconv.ParseUint(v.String(), 10, 64) + case bool: + if v { + return uint64(1), nil + } + return uint64(0), nil case int: return uint64(v), nil case int8: diff --git a/plugins/outputs/all/clarify.go b/plugins/outputs/all/clarify.go new file mode 100644 index 000000000..486851872 --- /dev/null +++ b/plugins/outputs/all/clarify.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.clarify + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/clarify" // register plugin diff --git a/plugins/outputs/clarify/README.md b/plugins/outputs/clarify/README.md new file mode 100644 index 000000000..590be1bc2 --- /dev/null +++ b/plugins/outputs/clarify/README.md @@ -0,0 +1,87 @@ +# Clarify Output Plugin + +This plugin writes to [Clarify][clarify]. To use this plugin you will +need to obtain a set of [credentials][credentials]. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +## Configuration to publish Telegraf metrics to Clarify +[[outputs.clarify]] + ## Credentials File (Oauth 2.0 from Clarify integration) + credentials_file = "/path/to/clarify/credentials.json" + + ## Clarify username password (Basic Auth from Clarify integration) + username = "i-am-bob" + password = "secret-password" + + ## Timeout for Clarify operations + # timeout = "20s" + + ## Optional tags to be included when generating the unique ID for a signal in Clarify + # id_tags = [] + # clarify_id_tag = 'clarify_input_id' +``` + +You can use either a credentials file or username/password. +If both are present and valid in the configuration the +credentials file will be used. + +## How Telegraf Metrics map to Clarify signals + +Clarify signal names are formed by joining the Telegraf metric name and the +field key with a `.` character. Telegraf tags are added to signal labels. + +If you wish to specify a specific tag to use as the input id, set the config +option `clarify_id_tag` to the tag containing the id to be used. +If this tag is present and there is only one field present in the metric, +this tag will be used as the inputID in Clarify. If there are more fields +available in the metric, the tag will be ignored and normal id generation +will be used. + +If information from one or several tags is needed to uniquely identify a metric +field, the id_tags array can be added to the config with the needed tag names. +E.g: + +`id_tags = ['sensor']` + +Clarify only supports values that can be converted to floating point numbers. +Strings and invalid numbers are ignored. + +## Example + +The following input would be stored in Clarify with the values shown below: + +```text +temperature,host=demo.clarifylocal,sensor=TC0P value=49 1682670910000000000 +``` + +```json +"signal" { + "id": "temperature.value.TC0P" + "name": "temperature.value" + "labels": { + "host": ["demo.clarifylocal"], + "sensor": ["TC0P"] + } +} +"values" { + "times": ["2023-04-28T08:43:16+00:00"], + "series": { + "temperature.value.TC0P": [49] + } +} +``` + +[clarify]: https://clarify.io +[clarifydoc]: https://docs.clarify.io +[credentials]: https://docs.clarify.io/users/admin/integrations/credentials diff --git a/plugins/outputs/clarify/clarify.go b/plugins/outputs/clarify/clarify.go new file mode 100644 index 000000000..9b6503e03 --- /dev/null +++ b/plugins/outputs/clarify/clarify.go @@ -0,0 +1,190 @@ +//go:generate ../../../tools/readme_config_includer/generator + +package clarify + +import ( + "context" + _ "embed" + "errors" + "fmt" + "strings" + "time" + + "github.com/clarify/clarify-go" + "github.com/clarify/clarify-go/fields" + "github.com/clarify/clarify-go/views" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" +) + +type Clarify struct { + Username config.Secret `toml:"username"` + Password config.Secret `toml:"password"` + CredentialsFile string `toml:"credentials_file"` + Timeout config.Duration `toml:"timeout"` + IDTags []string `toml:"id_tags"` + ClarifyIDTag string `toml:"clarify_id_tag"` + Log telegraf.Logger `toml:"-"` + + client *clarify.Client +} + +var errIDTooLong = errors.New("id too long (>128)") +var errCredentials = errors.New("only credentials_file OR username/password can be specified") + +const defaultTimeout = config.Duration(20 * time.Second) +const allowedIDRunes = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_:.#+/` + +//go:embed sample.conf +var sampleConfig string + +func (c *Clarify) Init() error { + if c.Timeout <= 0 { + c.Timeout = defaultTimeout + } + // Not blocking as it doesn't do any http requests, just sets up the necessarry Oauth2 client. + ctx := context.Background() + switch { + case c.CredentialsFile != "": + if !c.Username.Empty() || !c.Password.Empty() { + return errCredentials + } + creds, err := clarify.CredentialsFromFile(c.CredentialsFile) + if err != nil { + return err + } + c.client = creds.Client(ctx) + return nil + case !c.Username.Empty() && !c.Password.Empty(): + username, err := c.Username.Get() + if err != nil { + return fmt.Errorf("getting username failed: %w", err) + } + password, err := c.Password.Get() + if err != nil { + config.ReleaseSecret(username) + return fmt.Errorf("getting password failed: %w", err) + } + creds := clarify.BasicAuthCredentials(string(username), string(password)) + config.ReleaseSecret(username) + config.ReleaseSecret(password) + c.client = creds.Client(ctx) + return nil + } + return errors.New("no credentials provided") +} + +func (c *Clarify) Connect() error { + return nil +} + +func (c *Clarify) Write(metrics []telegraf.Metric) error { + frame, signals := c.processMetrics(metrics) + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(c.Timeout)) + defer cancel() + + if _, err := c.client.Insert(frame).Do(ctx); err != nil { + return fmt.Errorf("inserting failed: %w", err) + } + + if _, err := c.client.SaveSignals(signals).Do(ctx); err != nil { + return fmt.Errorf("saving signals failed: %w", err) + } + + return nil +} + +func (c *Clarify) processMetrics(metrics []telegraf.Metric) (views.DataFrame, map[string]views.SignalSave) { + signals := make(map[string]views.SignalSave) + frame := views.DataFrame{} + + for _, m := range metrics { + for _, f := range m.FieldList() { + value, err := internal.ToFloat64(f.Value) + if err != nil { + c.Log.Warnf("Skipping field %q of metric %q: %s", f.Key, m.Name(), err.Error()) + continue + } + id, err := c.generateID(m, f) + if err != nil { + c.Log.Warnf("Skipping field %q of metric %q: %s", f.Key, m.Name(), err.Error()) + continue + } + ts := fields.AsTimestamp(m.Time()) + + if _, ok := frame[id]; ok { + frame[id][ts] = value + } else { + frame[id] = views.DataSeries{ts: value} + } + + s := views.SignalSave{} + s.Name = m.Name() + "." + f.Key + + for _, t := range m.TagList() { + labelName := strings.ReplaceAll(t.Key, " ", "-") + labelName = strings.ReplaceAll(labelName, "_", "-") + labelName = strings.ToLower(labelName) + s.Labels.Add(labelName, t.Value) + } + + signals[id] = s + } + } + return frame, signals +} + +func normalizeID(id string) string { + return strings.Map(func(r rune) rune { + if strings.ContainsRune(allowedIDRunes, r) { + return r + } + return '_' + }, id) +} + +func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) (string, error) { + var id string + if c.ClarifyIDTag != "" { + if cid, exist := m.GetTag(c.ClarifyIDTag); exist && len(m.FieldList()) == 1 { + id = cid + } + } + if id == "" { + parts := make([]string, 0, len(c.IDTags)+2) + parts = append(parts, m.Name(), f.Key) + + for _, idTag := range c.IDTags { + if k, found := m.GetTag(idTag); found { + parts = append(parts, k) + } + } + id = strings.Join(parts, ".") + } + id = normalizeID(id) + if len(id) > 128 { + return id, errIDTooLong + } + return id, nil +} + +func (c *Clarify) SampleConfig() string { + return sampleConfig +} + +func (c *Clarify) Close() error { + c.client = nil + return nil +} + +func init() { + outputs.Add("clarify", func() telegraf.Output { + return &Clarify{ + Timeout: defaultTimeout, + } + }) +} diff --git a/plugins/outputs/clarify/clarify_test.go b/plugins/outputs/clarify/clarify_test.go new file mode 100644 index 000000000..2341b1d3f --- /dev/null +++ b/plugins/outputs/clarify/clarify_test.go @@ -0,0 +1,318 @@ +package clarify + +import ( + "context" + "encoding/json" + "errors" + "math" + "testing" + "time" + + "github.com/clarify/clarify-go" + "github.com/clarify/clarify-go/jsonrpc" + "github.com/clarify/clarify-go/views" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +var errTimeout = errors.New("timeout: operation timed out") + +const validResponse = `{ + "signalsByInput" : { + "test1.value" : { + "id": "c8bvu9fqfsjctpv7b6fg", + "created" : true + } + } +}` + +type MockHandler struct { + jsonResult string + sleep time.Duration +} + +func (m *MockHandler) Do(ctx context.Context, _ jsonrpc.Request, result any) error { + err := json.Unmarshal([]byte(m.jsonResult), result) + if m.sleep > 0 { + timer := time.NewTimer(m.sleep) + select { + case <-ctx.Done(): + timer.Stop() + return errTimeout + case <-timer.C: + timer.Stop() + return nil + } + } + return err +} + +func TestGenerateID(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + IDTags: []string{"tag1", "tag2"}, + ClarifyIDTag: "clarify_input_id", + } + var idTests = []struct { + inMetric telegraf.Metric + outID []string + err error + }{ + { + testutil.MustMetric( + "cpu+='''..2!@#$abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", + map[string]string{ + "tag1": "78sx", + }, + map[string]interface{}{ + "time_idle": math.NaN(), + }, + time.Now()), + []string{"cpu.time_idle.78sx"}, + errIDTooLong, + }, + { + testutil.MustMetric( + "cpu@@", + map[string]string{ + "tag1": "78sx", + "tag2": "33t2", + }, + map[string]interface{}{ + "time_idle": math.NaN(), + }, + time.Now()), + []string{"cpu__.time_idle.78sx.33t2"}, + nil, + }, + { + testutil.MustMetric( + "temperature", + map[string]string{}, + map[string]interface{}{ + "cpu1": 12, + "cpu2": 13, + }, + time.Now()), + []string{"temperature.cpu1", "temperature.cpu2"}, + nil, + }, + { + testutil.MustMetric( + "legacy_measurement", + map[string]string{ + "clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2", + "xid": "78sx", + }, + map[string]interface{}{ + "value": 1337, + }, + time.Now()), + []string{"e5e82f63-3700-4997-835d-eb366b7294a2"}, + nil, + }, + } + for _, tt := range idTests { + for n, f := range tt.inMetric.FieldList() { + id, err := clfy.generateID(tt.inMetric, f) + if tt.err != nil { + require.ErrorIs(t, err, tt.err) + } else { + require.NoError(t, err) + require.True(t, slices.Contains(tt.outID, id), "\nexpected %+v\ngot %+v\n", tt.outID[n], id) + } + } + } +} + +func TestProcessMetrics(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + IDTags: []string{"tag1", "tag2", "node_id"}, + ClarifyIDTag: "clarify_input_id", + } + var idTests = []struct { + inMetric telegraf.Metric + outFrame views.DataFrame + outSignals map[string]views.SignalSave + }{ + { + testutil.MustMetric( + "cpu1", + map[string]string{ + "tag1": "78sx", + }, + map[string]interface{}{ + "time_idle": 1337.3, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "cpu1.time_idle.78sx": views.DataSeries{ + 1257894000000000: 1337.3, + }, + }, + map[string]views.SignalSave{ + "cpu1.time_idle.78sx": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "cpu1.time_idle", + Labels: map[string][]string{ + "tag1": {"78sx"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "cpu2", + map[string]string{ + "tag1": "78sx", + "tag2": "33t2", + }, + map[string]interface{}{ + "time_idle": 200, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "cpu2.time_idle.78sx.33t2": views.DataSeries{ + 1257894000000000: 200, + }, + }, + map[string]views.SignalSave{ + "cpu2.time_idle.78sx.33t2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "cpu2.time_idle", + Labels: map[string][]string{ + "tag1": {"78sx"}, + "tag2": {"33t2"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "temperature", + map[string]string{}, + map[string]interface{}{ + "cpu1": 12, + "cpu2": 13, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "temperature.cpu1": views.DataSeries{ + 1257894000000000: 12, + }, + "temperature.cpu2": views.DataSeries{ + 1257894000000000: 13, + }, + }, + map[string]views.SignalSave{ + "temperature.cpu1": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "temperature.cpu1", + }, + }, + "temperature.cpu2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "temperature.cpu2", + }, + }, + }, + }, + { + testutil.MustMetric( + "legacy_measurement", + map[string]string{ + "clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2", + "xid": "78sx", + }, + map[string]interface{}{ + "value": 123.333, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "e5e82f63-3700-4997-835d-eb366b7294a2": views.DataSeries{ + 1257894000000000: 123.333, + }, + }, + map[string]views.SignalSave{ + "e5e82f63-3700-4997-835d-eb366b7294a2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "legacy_measurement.value", + Labels: map[string][]string{ + "clarify-input-id": {"e5e82f63-3700-4997-835d-eb366b7294a2"}, + "xid": {"78sx"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "opc_metric", + map[string]string{ + "node_id": "ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1", + }, + map[string]interface{}{ + "value": 12345.6789, + "quality": "GOOD", + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": views.DataSeries{ + 1257894000000000: 12345.6789, + }, + }, + map[string]views.SignalSave{ + "opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "opc_metric.value", + Labels: map[string][]string{ + "node-id": {"ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1"}, + }, + }, + }, + }, + }, + } + for _, tt := range idTests { + of, os := clfy.processMetrics([]telegraf.Metric{tt.inMetric}) + require.EqualValues(t, tt.outFrame, of) + require.EqualValues(t, tt.outSignals, os) + } +} + +func TestTimeout(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + Timeout: config.Duration(1 * time.Millisecond), + client: clarify.NewClient("c8bvu9fqfsjctpv7b6fg", &MockHandler{ + sleep: 6 * time.Millisecond, + jsonResult: validResponse, + }), + } + + metrics := []telegraf.Metric{} + err := clfy.Write(metrics) + require.ErrorIs(t, err, errTimeout) +} + +func TestInit(t *testing.T) { + username := config.NewSecret([]byte("user")) + + clfy := &Clarify{ + Log: testutil.Logger{}, + Timeout: config.Duration(1 * time.Millisecond), + client: clarify.NewClient("c8bvu9fqfsjctpv7b6fg", &MockHandler{ + sleep: 6 * time.Millisecond, + jsonResult: validResponse, + }), + Username: username, + CredentialsFile: "file", + } + require.ErrorIs(t, clfy.Init(), errCredentials) +} diff --git a/plugins/outputs/clarify/sample.conf b/plugins/outputs/clarify/sample.conf new file mode 100644 index 000000000..436f3a60c --- /dev/null +++ b/plugins/outputs/clarify/sample.conf @@ -0,0 +1,15 @@ +## Configuration to publish Telegraf metrics to Clarify +[[outputs.clarify]] + ## Credentials File (Oauth 2.0 from Clarify integration) + credentials_file = "/path/to/clarify/credentials.json" + + ## Clarify username password (Basic Auth from Clarify integration) + username = "i-am-bob" + password = "secret-password" + + ## Timeout for Clarify operations + # timeout = "20s" + + ## Optional tags to be included when generating the unique ID for a signal in Clarify + # id_tags = [] + # clarify_id_tag = 'clarify_input_id'