feat(outputs): Add Nebius Cloud Monitoring plugin (#13379)
This commit is contained in:
parent
a2f65d5728
commit
dada11e228
|
|
@ -0,0 +1,5 @@
|
||||||
|
//go:build !custom || outputs || outputs.nebius_cloud_monitoring
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import _ "github.com/influxdata/telegraf/plugins/outputs/nebius_cloud_monitoring" // register plugin
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
# Nebius Cloud Monitoring Output Plugin
|
||||||
|
|
||||||
|
This plugin will send custom metrics to
|
||||||
|
[Nebuis Cloud Monitoring](https://nebius.com/il/services/monitoring).
|
||||||
|
|
||||||
|
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||||
|
|
||||||
|
In addition to the plugin-specific configuration settings, plugins support
|
||||||
|
additional global and plugin configuration settings. These settings are used to
|
||||||
|
modify metrics, tags, and field or create aliases and configure ordering, etc.
|
||||||
|
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```toml @sample.conf
|
||||||
|
# Send aggregated metrics to Nebius.Cloud Monitoring
|
||||||
|
[[outputs.nebius_cloud_monitoring]]
|
||||||
|
## Timeout for HTTP writes.
|
||||||
|
# timeout = "20s"
|
||||||
|
|
||||||
|
## Nebius.Cloud monitoring API endpoint. Normally should not be changed
|
||||||
|
# endpoint = "https://monitoring.api.il.nebius.cloud/monitoring/v2/data/write"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Authentication
|
||||||
|
|
||||||
|
This plugin currently only supports Compute metadata based authentication
|
||||||
|
in Nebius Cloud Platform.
|
||||||
|
|
||||||
|
When plugin is working inside a Compute instance it will take IAM token and
|
||||||
|
Folder ID from instance metadata. In this plugin we use [Google Cloud notation]
|
||||||
|
This internal metadata endpoint is only accessible for VMs from the cloud.
|
||||||
|
|
||||||
|
[Google Cloud notation]: https://nebius.com/il/docs/compute/operations/vm-info/get-info#gce-metadata
|
||||||
|
|
@ -0,0 +1,244 @@
|
||||||
|
//go:generate ../../../tools/readme_config_includer/generator
|
||||||
|
package nebius_cloud_monitoring
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
_ "embed"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed sample.conf
|
||||||
|
var sampleConfig string
|
||||||
|
|
||||||
|
// NebiusCloudMonitoring allows publishing of metrics to the Nebius Cloud Monitoring custom metrics
|
||||||
|
// service
|
||||||
|
type NebiusCloudMonitoring struct {
|
||||||
|
Timeout config.Duration `toml:"timeout"`
|
||||||
|
Endpoint string `toml:"endpoint"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
metadataTokenURL string
|
||||||
|
metadataFolderURL string
|
||||||
|
folderID string
|
||||||
|
iamToken string
|
||||||
|
iamTokenExpirationTime time.Time
|
||||||
|
service string
|
||||||
|
|
||||||
|
client *http.Client
|
||||||
|
|
||||||
|
MetricOutsideWindow selfstat.Stat
|
||||||
|
}
|
||||||
|
|
||||||
|
type nebiusCloudMonitoringMessage struct {
|
||||||
|
TS string `json:"ts,omitempty"`
|
||||||
|
Labels map[string]string `json:"labels,omitempty"`
|
||||||
|
Metrics []nebiusCloudMonitoringMetric `json:"metrics"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type nebiusCloudMonitoringMetric struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Labels map[string]string `json:"labels"`
|
||||||
|
MetricType string `json:"type,omitempty"` // DGAUGE|IGAUGE|COUNTER|RATE. Default: DGAUGE
|
||||||
|
TS string `json:"ts,omitempty"`
|
||||||
|
Value float64 `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type metadataIamToken struct {
|
||||||
|
AccessToken string `json:"access_token"`
|
||||||
|
ExpiresIn int64 `json:"expires_in"`
|
||||||
|
TokenType string `json:"token_type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultRequestTimeout = time.Second * 20
|
||||||
|
defaultEndpoint = "https://monitoring.api.il.nebius.cloud/monitoring/v2/data/write"
|
||||||
|
/*
|
||||||
|
There is no DNS for metadata endpoint in Nebius Cloud yet.
|
||||||
|
So the only way is to hardcode reserved IP (https://en.wikipedia.org/wiki/Link-local_address)
|
||||||
|
*/
|
||||||
|
//nolint:gosec // G101: Potential hardcoded credentials - false positive
|
||||||
|
defaultMetadataTokenURL = "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
|
||||||
|
defaultMetadataFolderURL = "http://169.254.169.254/computeMetadata/v1/yandex/folder-id"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (*NebiusCloudMonitoring) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *NebiusCloudMonitoring) Init() error {
|
||||||
|
if a.Timeout <= 0 {
|
||||||
|
a.Timeout = config.Duration(defaultRequestTimeout)
|
||||||
|
}
|
||||||
|
if a.Endpoint == "" {
|
||||||
|
a.Endpoint = defaultEndpoint
|
||||||
|
}
|
||||||
|
if a.service == "" {
|
||||||
|
a.service = "custom"
|
||||||
|
}
|
||||||
|
if a.metadataTokenURL == "" {
|
||||||
|
a.metadataTokenURL = defaultMetadataTokenURL
|
||||||
|
}
|
||||||
|
if a.metadataFolderURL == "" {
|
||||||
|
a.metadataFolderURL = defaultMetadataFolderURL
|
||||||
|
}
|
||||||
|
|
||||||
|
a.client = &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
Timeout: time.Duration(a.Timeout),
|
||||||
|
}
|
||||||
|
tags := map[string]string{}
|
||||||
|
a.MetricOutsideWindow = selfstat.Register("nebius_cloud_monitoring", "metric_outside_window", tags)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect initializes the plugin and validates connectivity
|
||||||
|
func (a *NebiusCloudMonitoring) Connect() error {
|
||||||
|
a.Log.Debugf("Getting folder ID in %s", a.metadataFolderURL)
|
||||||
|
body, err := a.getResponseFromMetadata(a.client, a.metadataFolderURL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.folderID = string(body)
|
||||||
|
if a.folderID == "" {
|
||||||
|
return fmt.Errorf("unable to fetch folder id from URL %s: %w", a.metadataFolderURL, err)
|
||||||
|
}
|
||||||
|
a.Log.Infof("Writing to Nebius.Cloud Monitoring URL: %s", a.Endpoint)
|
||||||
|
a.Log.Infof("FolderID: %s", a.folderID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down an any active connections
|
||||||
|
func (a *NebiusCloudMonitoring) Close() error {
|
||||||
|
a.client = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes metrics to the remote endpoint
|
||||||
|
func (a *NebiusCloudMonitoring) Write(metrics []telegraf.Metric) error {
|
||||||
|
var nebiusCloudMonitoringMetrics []nebiusCloudMonitoringMetric
|
||||||
|
for _, m := range metrics {
|
||||||
|
for _, field := range m.FieldList() {
|
||||||
|
value, err := internal.ToFloat64(field.Value)
|
||||||
|
if err != nil {
|
||||||
|
a.Log.Errorf("skipping value: %w", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nebiusCloudMonitoringMetrics = append(
|
||||||
|
nebiusCloudMonitoringMetrics,
|
||||||
|
nebiusCloudMonitoringMetric{
|
||||||
|
Name: m.Name() + "_" + field.Key,
|
||||||
|
Labels: m.Tags(),
|
||||||
|
TS: m.Time().Format(time.RFC3339),
|
||||||
|
Value: value,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := json.Marshal(
|
||||||
|
nebiusCloudMonitoringMessage{
|
||||||
|
Metrics: nebiusCloudMonitoringMetrics,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
body = append(body, '\n')
|
||||||
|
return a.send(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *NebiusCloudMonitoring) getResponseFromMetadata(c *http.Client, metadataURL string) ([]byte, error) {
|
||||||
|
req, err := http.NewRequest("GET", metadataURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Metadata-Flavor", "Google")
|
||||||
|
resp, err := c.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
|
||||||
|
return nil, fmt.Errorf("unable to fetch instance metadata: [%s] %d",
|
||||||
|
metadataURL, resp.StatusCode)
|
||||||
|
}
|
||||||
|
return body, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *NebiusCloudMonitoring) getIAMTokenFromMetadata() (string, int, error) {
|
||||||
|
a.Log.Debugf("Getting new IAM token in %s", a.metadataTokenURL)
|
||||||
|
body, err := a.getResponseFromMetadata(a.client, a.metadataTokenURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
var metadata metadataIamToken
|
||||||
|
if err := json.Unmarshal(body, &metadata); err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
if metadata.AccessToken == "" || metadata.ExpiresIn == 0 {
|
||||||
|
return "", 0, fmt.Errorf("unable to fetch authentication credentials %s: %w", a.metadataTokenURL, err)
|
||||||
|
}
|
||||||
|
return metadata.AccessToken, int(metadata.ExpiresIn), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *NebiusCloudMonitoring) send(body []byte) error {
|
||||||
|
req, err := http.NewRequest("POST", a.Endpoint, bytes.NewBuffer(body))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
q := req.URL.Query()
|
||||||
|
q.Add("folderId", a.folderID)
|
||||||
|
q.Add("service", a.service)
|
||||||
|
req.URL.RawQuery = q.Encode()
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
isTokenExpired := a.iamTokenExpirationTime.Before(time.Now())
|
||||||
|
if a.iamToken == "" || isTokenExpired {
|
||||||
|
token, expiresIn, err := a.getIAMTokenFromMetadata()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
a.iamTokenExpirationTime = time.Now().Add(time.Duration(expiresIn) * time.Second)
|
||||||
|
a.iamToken = token
|
||||||
|
}
|
||||||
|
req.Header.Set("Authorization", "Bearer "+a.iamToken)
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
_, err = io.ReadAll(resp.Body)
|
||||||
|
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||||
|
return fmt.Errorf("failed to write batch: [%v] %s", resp.StatusCode, resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("nebius_cloud_monitoring", func() telegraf.Output {
|
||||||
|
return &NebiusCloudMonitoring{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,139 @@
|
||||||
|
package nebius_cloud_monitoring
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func readBody(r *http.Request) (nebiusCloudMonitoringMessage, error) {
|
||||||
|
decoder := json.NewDecoder(r.Body)
|
||||||
|
var message nebiusCloudMonitoringMessage
|
||||||
|
err := decoder.Decode(&message)
|
||||||
|
return message, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite(t *testing.T) {
|
||||||
|
testMetadataHTTPServer := httptest.NewServer(
|
||||||
|
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if strings.HasSuffix(r.URL.Path, "/token") {
|
||||||
|
token := metadataIamToken{
|
||||||
|
AccessToken: "token1",
|
||||||
|
ExpiresIn: 123,
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
err := json.NewEncoder(w).Encode(token)
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else if strings.HasSuffix(r.URL.Path, "/folder") {
|
||||||
|
_, err := io.WriteString(w, "folder1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
defer testMetadataHTTPServer.Close()
|
||||||
|
metadataTokenURL := "http://" + testMetadataHTTPServer.Listener.Addr().String() + "/token"
|
||||||
|
metadataFolderURL := "http://" + testMetadataHTTPServer.Listener.Addr().String() + "/folder"
|
||||||
|
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
url := "http://" + ts.Listener.Addr().String() + "/metrics"
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *NebiusCloudMonitoring
|
||||||
|
metrics []telegraf.Metric
|
||||||
|
handler func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "metric is converted to json value",
|
||||||
|
plugin: &NebiusCloudMonitoring{},
|
||||||
|
metrics: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cluster",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"cpu": 42.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
message, err := readBody(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, message.Metrics, 1)
|
||||||
|
require.Equal(t, "cluster_cpu", message.Metrics[0].Name)
|
||||||
|
require.Equal(t, 42.0, message.Metrics[0].Value)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "int64 metric is converted to json value",
|
||||||
|
plugin: &NebiusCloudMonitoring{},
|
||||||
|
metrics: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cluster",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": int64(9223372036854775806),
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
message, err := readBody(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, message.Metrics, 1)
|
||||||
|
require.Equal(t, "cluster_value", message.Metrics[0].Name)
|
||||||
|
require.Equal(t, float64(9.223372036854776e+18), message.Metrics[0].Value)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "int metric is converted to json value",
|
||||||
|
plugin: &NebiusCloudMonitoring{},
|
||||||
|
metrics: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cluster",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 9226,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
message, err := readBody(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, message.Metrics, 1)
|
||||||
|
require.Equal(t, "cluster_value", message.Metrics[0].Name)
|
||||||
|
require.Equal(t, float64(9226), message.Metrics[0].Value)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
tt.handler(t, w, r)
|
||||||
|
})
|
||||||
|
tt.plugin = &NebiusCloudMonitoring{
|
||||||
|
Endpoint: url,
|
||||||
|
metadataTokenURL: metadataTokenURL,
|
||||||
|
metadataFolderURL: metadataFolderURL,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, tt.plugin.Init())
|
||||||
|
require.NoError(t, tt.plugin.Connect())
|
||||||
|
require.NoError(t, tt.plugin.Write(tt.metrics))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
# Send aggregated metrics to Nebius.Cloud Monitoring
|
||||||
|
[[outputs.nebius_cloud_monitoring]]
|
||||||
|
## Timeout for HTTP writes.
|
||||||
|
# timeout = "20s"
|
||||||
|
|
||||||
|
## Nebius.Cloud monitoring API endpoint. Normally should not be changed
|
||||||
|
# endpoint = "https://monitoring.api.il.nebius.cloud/monitoring/v2/data/write"
|
||||||
Loading…
Reference in New Issue