feat: Add Clarify output plugin (#13220)

This commit is contained in:
Bernt-Johan Bergshaven 2023-05-22 20:08:55 +02:00 committed by GitHub
parent 7570feda01
commit df166cfbcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 634 additions and 0 deletions

View File

@ -87,6 +87,7 @@ following works:
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
- github.com/cespare/xxhash [MIT License](https://github.com/cespare/xxhash/blob/master/LICENSE.txt)
- github.com/cisco-ie/nx-telemetry-proto [Apache License 2.0](https://github.com/cisco-ie/nx-telemetry-proto/blob/master/LICENSE)
- github.com/clarify/clarify-go [Apache License 2.0](https://github.com/clarify/clarify-go/blob/master/LICENSE)
- github.com/cloudevents/sdk-go [Apache License 2.0](https://github.com/cloudevents/sdk-go/blob/main/LICENSE)
- github.com/containerd/containerd [Apache License 2.0](https://github.com/containerd/containerd/blob/master/LICENSE)
- github.com/coocood/freecache [MIT License](https://github.com/coocood/freecache/blob/master/LICENSE)

1
go.mod
View File

@ -53,6 +53,7 @@ require (
github.com/bmatcuk/doublestar/v3 v3.0.0
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df
github.com/clarify/clarify-go v0.2.4
github.com/coocood/freecache v1.2.3
github.com/coreos/go-semver v0.3.1
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f

2
go.sum
View File

@ -398,6 +398,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df h1:GmrltUp5Qf5XhT+LmqMDizsgm/6VHTSxPWRdrq21yRo=
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df/go.mod h1:rJDd05J5hqWVU9MjJ+5jw1CuLn/jRhvU0xtFEzzqjwM=
github.com/clarify/clarify-go v0.2.4 h1:4MH6UHS3PFSNeitAkS/k3ur6ASxZpiRa6EezkbCVLVs=
github.com/clarify/clarify-go v0.2.4/go.mod h1:bdKwACxI2WZMdlFQOun4J8H5wG0Dbn1bKWelgxsDJaM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=

View File

@ -53,6 +53,11 @@ func ToFloat64(value interface{}) (float64, error) {
return strconv.ParseFloat(string(v), 64)
case fmt.Stringer:
return strconv.ParseFloat(v.String(), 64)
case bool:
if v {
return float64(1), nil
}
return float64(0), nil
case int:
return float64(v), nil
case int8:
@ -91,6 +96,11 @@ func ToInt64(value interface{}) (int64, error) {
return strconv.ParseInt(string(v), 10, 64)
case fmt.Stringer:
return strconv.ParseInt(v.String(), 10, 64)
case bool:
if v {
return int64(1), nil
}
return int64(0), nil
case int:
return int64(v), nil
case int8:
@ -129,6 +139,11 @@ func ToUint64(value interface{}) (uint64, error) {
return strconv.ParseUint(string(v), 10, 64)
case fmt.Stringer:
return strconv.ParseUint(v.String(), 10, 64)
case bool:
if v {
return uint64(1), nil
}
return uint64(0), nil
case int:
return uint64(v), nil
case int8:

View File

@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.clarify
package all
import _ "github.com/influxdata/telegraf/plugins/outputs/clarify" // register plugin

View File

@ -0,0 +1,87 @@
# Clarify Output Plugin
This plugin writes to [Clarify][clarify]. To use this plugin you will
need to obtain a set of [credentials][credentials].
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
## Configuration to publish Telegraf metrics to Clarify
[[outputs.clarify]]
## Credentials File (Oauth 2.0 from Clarify integration)
credentials_file = "/path/to/clarify/credentials.json"
## Clarify username password (Basic Auth from Clarify integration)
username = "i-am-bob"
password = "secret-password"
## Timeout for Clarify operations
# timeout = "20s"
## Optional tags to be included when generating the unique ID for a signal in Clarify
# id_tags = []
# clarify_id_tag = 'clarify_input_id'
```
You can use either a credentials file or username/password.
If both are present and valid in the configuration the
credentials file will be used.
## How Telegraf Metrics map to Clarify signals
Clarify signal names are formed by joining the Telegraf metric name and the
field key with a `.` character. Telegraf tags are added to signal labels.
If you wish to specify a specific tag to use as the input id, set the config
option `clarify_id_tag` to the tag containing the id to be used.
If this tag is present and there is only one field present in the metric,
this tag will be used as the inputID in Clarify. If there are more fields
available in the metric, the tag will be ignored and normal id generation
will be used.
If information from one or several tags is needed to uniquely identify a metric
field, the id_tags array can be added to the config with the needed tag names.
E.g:
`id_tags = ['sensor']`
Clarify only supports values that can be converted to floating point numbers.
Strings and invalid numbers are ignored.
## Example
The following input would be stored in Clarify with the values shown below:
```text
temperature,host=demo.clarifylocal,sensor=TC0P value=49 1682670910000000000
```
```json
"signal" {
"id": "temperature.value.TC0P"
"name": "temperature.value"
"labels": {
"host": ["demo.clarifylocal"],
"sensor": ["TC0P"]
}
}
"values" {
"times": ["2023-04-28T08:43:16+00:00"],
"series": {
"temperature.value.TC0P": [49]
}
}
```
[clarify]: https://clarify.io
[clarifydoc]: https://docs.clarify.io
[credentials]: https://docs.clarify.io/users/admin/integrations/credentials

View File

@ -0,0 +1,190 @@
//go:generate ../../../tools/readme_config_includer/generator
package clarify
import (
"context"
_ "embed"
"errors"
"fmt"
"strings"
"time"
"github.com/clarify/clarify-go"
"github.com/clarify/clarify-go/fields"
"github.com/clarify/clarify-go/views"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)
type Clarify struct {
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
CredentialsFile string `toml:"credentials_file"`
Timeout config.Duration `toml:"timeout"`
IDTags []string `toml:"id_tags"`
ClarifyIDTag string `toml:"clarify_id_tag"`
Log telegraf.Logger `toml:"-"`
client *clarify.Client
}
var errIDTooLong = errors.New("id too long (>128)")
var errCredentials = errors.New("only credentials_file OR username/password can be specified")
const defaultTimeout = config.Duration(20 * time.Second)
const allowedIDRunes = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_:.#+/`
//go:embed sample.conf
var sampleConfig string
func (c *Clarify) Init() error {
if c.Timeout <= 0 {
c.Timeout = defaultTimeout
}
// Not blocking as it doesn't do any http requests, just sets up the necessarry Oauth2 client.
ctx := context.Background()
switch {
case c.CredentialsFile != "":
if !c.Username.Empty() || !c.Password.Empty() {
return errCredentials
}
creds, err := clarify.CredentialsFromFile(c.CredentialsFile)
if err != nil {
return err
}
c.client = creds.Client(ctx)
return nil
case !c.Username.Empty() && !c.Password.Empty():
username, err := c.Username.Get()
if err != nil {
return fmt.Errorf("getting username failed: %w", err)
}
password, err := c.Password.Get()
if err != nil {
config.ReleaseSecret(username)
return fmt.Errorf("getting password failed: %w", err)
}
creds := clarify.BasicAuthCredentials(string(username), string(password))
config.ReleaseSecret(username)
config.ReleaseSecret(password)
c.client = creds.Client(ctx)
return nil
}
return errors.New("no credentials provided")
}
func (c *Clarify) Connect() error {
return nil
}
func (c *Clarify) Write(metrics []telegraf.Metric) error {
frame, signals := c.processMetrics(metrics)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(c.Timeout))
defer cancel()
if _, err := c.client.Insert(frame).Do(ctx); err != nil {
return fmt.Errorf("inserting failed: %w", err)
}
if _, err := c.client.SaveSignals(signals).Do(ctx); err != nil {
return fmt.Errorf("saving signals failed: %w", err)
}
return nil
}
func (c *Clarify) processMetrics(metrics []telegraf.Metric) (views.DataFrame, map[string]views.SignalSave) {
signals := make(map[string]views.SignalSave)
frame := views.DataFrame{}
for _, m := range metrics {
for _, f := range m.FieldList() {
value, err := internal.ToFloat64(f.Value)
if err != nil {
c.Log.Warnf("Skipping field %q of metric %q: %s", f.Key, m.Name(), err.Error())
continue
}
id, err := c.generateID(m, f)
if err != nil {
c.Log.Warnf("Skipping field %q of metric %q: %s", f.Key, m.Name(), err.Error())
continue
}
ts := fields.AsTimestamp(m.Time())
if _, ok := frame[id]; ok {
frame[id][ts] = value
} else {
frame[id] = views.DataSeries{ts: value}
}
s := views.SignalSave{}
s.Name = m.Name() + "." + f.Key
for _, t := range m.TagList() {
labelName := strings.ReplaceAll(t.Key, " ", "-")
labelName = strings.ReplaceAll(labelName, "_", "-")
labelName = strings.ToLower(labelName)
s.Labels.Add(labelName, t.Value)
}
signals[id] = s
}
}
return frame, signals
}
func normalizeID(id string) string {
return strings.Map(func(r rune) rune {
if strings.ContainsRune(allowedIDRunes, r) {
return r
}
return '_'
}, id)
}
func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) (string, error) {
var id string
if c.ClarifyIDTag != "" {
if cid, exist := m.GetTag(c.ClarifyIDTag); exist && len(m.FieldList()) == 1 {
id = cid
}
}
if id == "" {
parts := make([]string, 0, len(c.IDTags)+2)
parts = append(parts, m.Name(), f.Key)
for _, idTag := range c.IDTags {
if k, found := m.GetTag(idTag); found {
parts = append(parts, k)
}
}
id = strings.Join(parts, ".")
}
id = normalizeID(id)
if len(id) > 128 {
return id, errIDTooLong
}
return id, nil
}
func (c *Clarify) SampleConfig() string {
return sampleConfig
}
func (c *Clarify) Close() error {
c.client = nil
return nil
}
func init() {
outputs.Add("clarify", func() telegraf.Output {
return &Clarify{
Timeout: defaultTimeout,
}
})
}

View File

@ -0,0 +1,318 @@
package clarify
import (
"context"
"encoding/json"
"errors"
"math"
"testing"
"time"
"github.com/clarify/clarify-go"
"github.com/clarify/clarify-go/jsonrpc"
"github.com/clarify/clarify-go/views"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
var errTimeout = errors.New("timeout: operation timed out")
const validResponse = `{
"signalsByInput" : {
"test1.value" : {
"id": "c8bvu9fqfsjctpv7b6fg",
"created" : true
}
}
}`
type MockHandler struct {
jsonResult string
sleep time.Duration
}
func (m *MockHandler) Do(ctx context.Context, _ jsonrpc.Request, result any) error {
err := json.Unmarshal([]byte(m.jsonResult), result)
if m.sleep > 0 {
timer := time.NewTimer(m.sleep)
select {
case <-ctx.Done():
timer.Stop()
return errTimeout
case <-timer.C:
timer.Stop()
return nil
}
}
return err
}
func TestGenerateID(t *testing.T) {
clfy := &Clarify{
Log: testutil.Logger{},
IDTags: []string{"tag1", "tag2"},
ClarifyIDTag: "clarify_input_id",
}
var idTests = []struct {
inMetric telegraf.Metric
outID []string
err error
}{
{
testutil.MustMetric(
"cpu+='''..2!@#$abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890",
map[string]string{
"tag1": "78sx",
},
map[string]interface{}{
"time_idle": math.NaN(),
},
time.Now()),
[]string{"cpu.time_idle.78sx"},
errIDTooLong,
},
{
testutil.MustMetric(
"cpu@@",
map[string]string{
"tag1": "78sx",
"tag2": "33t2",
},
map[string]interface{}{
"time_idle": math.NaN(),
},
time.Now()),
[]string{"cpu__.time_idle.78sx.33t2"},
nil,
},
{
testutil.MustMetric(
"temperature",
map[string]string{},
map[string]interface{}{
"cpu1": 12,
"cpu2": 13,
},
time.Now()),
[]string{"temperature.cpu1", "temperature.cpu2"},
nil,
},
{
testutil.MustMetric(
"legacy_measurement",
map[string]string{
"clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2",
"xid": "78sx",
},
map[string]interface{}{
"value": 1337,
},
time.Now()),
[]string{"e5e82f63-3700-4997-835d-eb366b7294a2"},
nil,
},
}
for _, tt := range idTests {
for n, f := range tt.inMetric.FieldList() {
id, err := clfy.generateID(tt.inMetric, f)
if tt.err != nil {
require.ErrorIs(t, err, tt.err)
} else {
require.NoError(t, err)
require.True(t, slices.Contains(tt.outID, id), "\nexpected %+v\ngot %+v\n", tt.outID[n], id)
}
}
}
}
func TestProcessMetrics(t *testing.T) {
clfy := &Clarify{
Log: testutil.Logger{},
IDTags: []string{"tag1", "tag2", "node_id"},
ClarifyIDTag: "clarify_input_id",
}
var idTests = []struct {
inMetric telegraf.Metric
outFrame views.DataFrame
outSignals map[string]views.SignalSave
}{
{
testutil.MustMetric(
"cpu1",
map[string]string{
"tag1": "78sx",
},
map[string]interface{}{
"time_idle": 1337.3,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)),
views.DataFrame{
"cpu1.time_idle.78sx": views.DataSeries{
1257894000000000: 1337.3,
},
},
map[string]views.SignalSave{
"cpu1.time_idle.78sx": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "cpu1.time_idle",
Labels: map[string][]string{
"tag1": {"78sx"},
},
},
},
},
},
{
testutil.MustMetric(
"cpu2",
map[string]string{
"tag1": "78sx",
"tag2": "33t2",
},
map[string]interface{}{
"time_idle": 200,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)),
views.DataFrame{
"cpu2.time_idle.78sx.33t2": views.DataSeries{
1257894000000000: 200,
},
},
map[string]views.SignalSave{
"cpu2.time_idle.78sx.33t2": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "cpu2.time_idle",
Labels: map[string][]string{
"tag1": {"78sx"},
"tag2": {"33t2"},
},
},
},
},
},
{
testutil.MustMetric(
"temperature",
map[string]string{},
map[string]interface{}{
"cpu1": 12,
"cpu2": 13,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)),
views.DataFrame{
"temperature.cpu1": views.DataSeries{
1257894000000000: 12,
},
"temperature.cpu2": views.DataSeries{
1257894000000000: 13,
},
},
map[string]views.SignalSave{
"temperature.cpu1": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "temperature.cpu1",
},
},
"temperature.cpu2": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "temperature.cpu2",
},
},
},
},
{
testutil.MustMetric(
"legacy_measurement",
map[string]string{
"clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2",
"xid": "78sx",
},
map[string]interface{}{
"value": 123.333,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)),
views.DataFrame{
"e5e82f63-3700-4997-835d-eb366b7294a2": views.DataSeries{
1257894000000000: 123.333,
},
},
map[string]views.SignalSave{
"e5e82f63-3700-4997-835d-eb366b7294a2": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "legacy_measurement.value",
Labels: map[string][]string{
"clarify-input-id": {"e5e82f63-3700-4997-835d-eb366b7294a2"},
"xid": {"78sx"},
},
},
},
},
},
{
testutil.MustMetric(
"opc_metric",
map[string]string{
"node_id": "ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1",
},
map[string]interface{}{
"value": 12345.6789,
"quality": "GOOD",
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)),
views.DataFrame{
"opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": views.DataSeries{
1257894000000000: 12345.6789,
},
},
map[string]views.SignalSave{
"opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": {
SignalSaveAttributes: views.SignalSaveAttributes{
Name: "opc_metric.value",
Labels: map[string][]string{
"node-id": {"ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1"},
},
},
},
},
},
}
for _, tt := range idTests {
of, os := clfy.processMetrics([]telegraf.Metric{tt.inMetric})
require.EqualValues(t, tt.outFrame, of)
require.EqualValues(t, tt.outSignals, os)
}
}
func TestTimeout(t *testing.T) {
clfy := &Clarify{
Log: testutil.Logger{},
Timeout: config.Duration(1 * time.Millisecond),
client: clarify.NewClient("c8bvu9fqfsjctpv7b6fg", &MockHandler{
sleep: 6 * time.Millisecond,
jsonResult: validResponse,
}),
}
metrics := []telegraf.Metric{}
err := clfy.Write(metrics)
require.ErrorIs(t, err, errTimeout)
}
func TestInit(t *testing.T) {
username := config.NewSecret([]byte("user"))
clfy := &Clarify{
Log: testutil.Logger{},
Timeout: config.Duration(1 * time.Millisecond),
client: clarify.NewClient("c8bvu9fqfsjctpv7b6fg", &MockHandler{
sleep: 6 * time.Millisecond,
jsonResult: validResponse,
}),
Username: username,
CredentialsFile: "file",
}
require.ErrorIs(t, clfy.Init(), errCredentials)
}

View File

@ -0,0 +1,15 @@
## Configuration to publish Telegraf metrics to Clarify
[[outputs.clarify]]
## Credentials File (Oauth 2.0 from Clarify integration)
credentials_file = "/path/to/clarify/credentials.json"
## Clarify username password (Basic Auth from Clarify integration)
username = "i-am-bob"
password = "secret-password"
## Timeout for Clarify operations
# timeout = "20s"
## Optional tags to be included when generating the unique ID for a signal in Clarify
# id_tags = []
# clarify_id_tag = 'clarify_input_id'