chore(outputs.azure_monitor): Cleanup code (#16437)
This commit is contained in:
parent
0609b800dd
commit
5b8dc1595f
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/selfstat"
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
|
|
@ -30,55 +31,14 @@ import (
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
// AzureMonitor allows publishing of metrics to the Azure Monitor custom metrics
|
const (
|
||||||
// service
|
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
|
||||||
type AzureMonitor struct {
|
resourceIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s"
|
||||||
Timeout config.Duration
|
resourceIDScaleSetTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s"
|
||||||
NamespacePrefix string `toml:"namespace_prefix"`
|
maxRequestBodySize = 4000000
|
||||||
StringsAsDimensions bool `toml:"strings_as_dimensions"`
|
)
|
||||||
Region string `toml:"region"`
|
|
||||||
ResourceID string `toml:"resource_id"`
|
|
||||||
EndpointURL string `toml:"endpoint_url"`
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
|
||||||
|
|
||||||
url string
|
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||||
auth autorest.Authorizer
|
|
||||||
client *http.Client
|
|
||||||
|
|
||||||
cache map[time.Time]map[uint64]*aggregate
|
|
||||||
timeFunc func() time.Time
|
|
||||||
|
|
||||||
MetricOutsideWindow selfstat.Stat
|
|
||||||
}
|
|
||||||
|
|
||||||
// VirtualMachineMetadata contains information about a VM from the metadata service
|
|
||||||
type virtualMachineMetadata struct {
|
|
||||||
Compute struct {
|
|
||||||
Location string `json:"location"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
ResourceGroupName string `json:"resourceGroupName"`
|
|
||||||
SubscriptionID string `json:"subscriptionId"`
|
|
||||||
VMScaleSetName string `json:"vmScaleSetName"`
|
|
||||||
} `json:"compute"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *virtualMachineMetadata) ResourceID() string {
|
|
||||||
if m.Compute.VMScaleSetName != "" {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
resourceIDScaleSetTemplate,
|
|
||||||
m.Compute.SubscriptionID,
|
|
||||||
m.Compute.ResourceGroupName,
|
|
||||||
m.Compute.VMScaleSetName,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Sprintf(
|
|
||||||
resourceIDTemplate,
|
|
||||||
m.Compute.SubscriptionID,
|
|
||||||
m.Compute.ResourceGroupName,
|
|
||||||
m.Compute.Name,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
type dimension struct {
|
type dimension struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -95,92 +55,330 @@ type aggregate struct {
|
||||||
updated bool
|
updated bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
type AzureMonitor struct {
|
||||||
defaultRequestTimeout = time.Second * 5
|
Timeout config.Duration `toml:"timeout"`
|
||||||
defaultNamespacePrefix = "Telegraf/"
|
NamespacePrefix string `toml:"namespace_prefix"`
|
||||||
defaultAuthResource = "https://monitoring.azure.com/"
|
StringsAsDimensions bool `toml:"strings_as_dimensions"`
|
||||||
|
Region string `toml:"region"`
|
||||||
|
ResourceID string `toml:"resource_id"`
|
||||||
|
EndpointURL string `toml:"endpoint_url"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
vmInstanceMetadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-12-01"
|
url string
|
||||||
resourceIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s"
|
preparer autorest.Preparer
|
||||||
resourceIDScaleSetTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s"
|
client *http.Client
|
||||||
urlTemplate = "https://%s.monitoring.azure.com%s/metrics"
|
|
||||||
urlOverrideTemplate = "%s%s/metrics"
|
cache map[time.Time]map[uint64]*aggregate
|
||||||
maxRequestBodySize = 4000000
|
timeFunc func() time.Time
|
||||||
)
|
|
||||||
|
MetricOutsideWindow selfstat.Stat
|
||||||
|
}
|
||||||
|
|
||||||
func (*AzureMonitor) SampleConfig() string {
|
func (*AzureMonitor) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect initializes the plugin and validates connectivity
|
func (a *AzureMonitor) Init() error {
|
||||||
func (a *AzureMonitor) Connect() error {
|
|
||||||
a.cache = make(map[time.Time]map[uint64]*aggregate, 36)
|
a.cache = make(map[time.Time]map[uint64]*aggregate, 36)
|
||||||
|
|
||||||
if a.Timeout == 0 {
|
authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource("https://monitoring.azure.com/")
|
||||||
a.Timeout = config.Duration(defaultRequestTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
a.initHTTPClient()
|
|
||||||
|
|
||||||
var err error
|
|
||||||
var region string
|
|
||||||
var resourceID string
|
|
||||||
var endpointURL string
|
|
||||||
|
|
||||||
if a.Region == "" || a.ResourceID == "" {
|
|
||||||
// Pull region and resource identifier
|
|
||||||
region, resourceID, err = vmInstanceMetadata(a.client)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if a.Region != "" {
|
|
||||||
region = a.Region
|
|
||||||
}
|
|
||||||
if a.ResourceID != "" {
|
|
||||||
resourceID = a.ResourceID
|
|
||||||
}
|
|
||||||
if a.EndpointURL != "" {
|
|
||||||
endpointURL = a.EndpointURL
|
|
||||||
}
|
|
||||||
|
|
||||||
if resourceID == "" {
|
|
||||||
return errors.New("no resource ID configured or available via VM instance metadata")
|
|
||||||
} else if region == "" {
|
|
||||||
return errors.New("no region configured or available via VM instance metadata")
|
|
||||||
}
|
|
||||||
|
|
||||||
if endpointURL == "" {
|
|
||||||
a.url = fmt.Sprintf(urlTemplate, region, resourceID)
|
|
||||||
} else {
|
|
||||||
a.url = fmt.Sprintf(urlOverrideTemplate, endpointURL, resourceID)
|
|
||||||
}
|
|
||||||
|
|
||||||
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
|
|
||||||
|
|
||||||
a.auth, err = auth.NewAuthorizerFromEnvironmentWithResource(defaultAuthResource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("creating authorizer failed: %w", err)
|
||||||
}
|
}
|
||||||
|
a.preparer = autorest.CreatePreparer(authorizer.WithAuthorization())
|
||||||
a.Reset()
|
|
||||||
|
|
||||||
tags := map[string]string{
|
|
||||||
"region": region,
|
|
||||||
"resource_id": resourceID,
|
|
||||||
}
|
|
||||||
a.MetricOutsideWindow = selfstat.Register("azure_monitor", "metric_outside_window", tags)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AzureMonitor) initHTTPClient() {
|
func (a *AzureMonitor) Connect() error {
|
||||||
|
// If information is missing try to retrieve it from the Azure VM instance
|
||||||
|
if a.Region == "" || a.ResourceID == "" {
|
||||||
|
region, resourceID, err := vmInstanceMetadata(a.client)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting VM metadata failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Region == "" {
|
||||||
|
a.Region = region
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.ResourceID == "" {
|
||||||
|
a.ResourceID = resourceID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.ResourceID == "" {
|
||||||
|
return errors.New("no resource ID configured or available via VM instance metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.EndpointURL == "" {
|
||||||
|
if a.Region == "" {
|
||||||
|
return errors.New("no region configured or available via VM instance metadata")
|
||||||
|
}
|
||||||
|
a.url = fmt.Sprintf("https://%s.monitoring.azure.com%s/metrics", a.Region, a.ResourceID)
|
||||||
|
} else {
|
||||||
|
a.url = a.EndpointURL + a.ResourceID + "/metrics"
|
||||||
|
}
|
||||||
|
|
||||||
|
a.MetricOutsideWindow = selfstat.Register(
|
||||||
|
"azure_monitor",
|
||||||
|
"metric_outside_window",
|
||||||
|
map[string]string{
|
||||||
|
"region": a.Region,
|
||||||
|
"resource_id": a.ResourceID,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)
|
||||||
|
|
||||||
a.client = &http.Client{
|
a.client = &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
Proxy: http.ProxyFromEnvironment,
|
Proxy: http.ProxyFromEnvironment,
|
||||||
},
|
},
|
||||||
Timeout: time.Duration(a.Timeout),
|
Timeout: time.Duration(a.Timeout),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.Reset()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down an any active connections
|
||||||
|
func (a *AzureMonitor) Close() error {
|
||||||
|
a.client.CloseIdleConnections()
|
||||||
|
a.client = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add will append a metric to the output aggregate
|
||||||
|
func (a *AzureMonitor) Add(m telegraf.Metric) {
|
||||||
|
// Azure Monitor only supports aggregates 30 minutes into the past and 4
|
||||||
|
// minutes into the future. Future metrics are dropped when pushed.
|
||||||
|
tbucket := m.Time().Truncate(time.Minute)
|
||||||
|
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
|
||||||
|
a.MetricOutsideWindow.Incr(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Azure Monitor doesn't have a string value type, so convert string fields
|
||||||
|
// to dimensions (a.k.a. tags) if enabled.
|
||||||
|
if a.StringsAsDimensions {
|
||||||
|
for _, f := range m.FieldList() {
|
||||||
|
if v, ok := f.Value.(string); ok {
|
||||||
|
m.AddTag(f.Key, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range m.FieldList() {
|
||||||
|
fv, err := internal.ToFloat64(f.Value)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Azure Monitor does not support fields so the field name is appended
|
||||||
|
// to the metric name.
|
||||||
|
sanitizeKey := invalidNameCharRE.ReplaceAllString(f.Key, "_")
|
||||||
|
name := m.Name() + "-" + sanitizeKey
|
||||||
|
id := hashIDWithField(m.HashID(), f.Key)
|
||||||
|
|
||||||
|
// Create the time bucket if doesn't exist
|
||||||
|
if _, ok := a.cache[tbucket]; !ok {
|
||||||
|
a.cache[tbucket] = make(map[uint64]*aggregate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch existing aggregate
|
||||||
|
agg, ok := a.cache[tbucket][id]
|
||||||
|
if !ok {
|
||||||
|
dimensions := make([]dimension, 0, len(m.TagList()))
|
||||||
|
for _, tag := range m.TagList() {
|
||||||
|
dimensions = append(dimensions, dimension{
|
||||||
|
name: tag.Key,
|
||||||
|
value: tag.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
a.cache[tbucket][id] = &aggregate{
|
||||||
|
name: name,
|
||||||
|
dimensions: dimensions,
|
||||||
|
min: fv,
|
||||||
|
max: fv,
|
||||||
|
sum: fv,
|
||||||
|
count: 1,
|
||||||
|
updated: true,
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if fv < agg.min {
|
||||||
|
agg.min = fv
|
||||||
|
}
|
||||||
|
if fv > agg.max {
|
||||||
|
agg.max = fv
|
||||||
|
}
|
||||||
|
agg.sum += fv
|
||||||
|
agg.count++
|
||||||
|
agg.updated = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push sends metrics to the output metric buffer
|
||||||
|
func (a *AzureMonitor) Push() []telegraf.Metric {
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
for tbucket, aggs := range a.cache {
|
||||||
|
// Do not send metrics early
|
||||||
|
if tbucket.After(a.timeFunc().Add(-time.Minute)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, agg := range aggs {
|
||||||
|
// Only send aggregates that have had an update since the last push.
|
||||||
|
if !agg.updated {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make(map[string]string, len(agg.dimensions))
|
||||||
|
for _, tag := range agg.dimensions {
|
||||||
|
tags[tag.name] = tag.value
|
||||||
|
}
|
||||||
|
|
||||||
|
m := metric.New(agg.name,
|
||||||
|
tags,
|
||||||
|
map[string]interface{}{
|
||||||
|
"min": agg.min,
|
||||||
|
"max": agg.max,
|
||||||
|
"sum": agg.sum,
|
||||||
|
"count": agg.count,
|
||||||
|
},
|
||||||
|
tbucket,
|
||||||
|
)
|
||||||
|
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset clears the cache of aggregate metrics
|
||||||
|
func (a *AzureMonitor) Reset() {
|
||||||
|
for tbucket := range a.cache {
|
||||||
|
// Remove aggregates older than 30 minutes
|
||||||
|
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
|
||||||
|
delete(a.cache, tbucket)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Metrics updated within the latest 1m have not been pushed and should
|
||||||
|
// not be cleared.
|
||||||
|
if tbucket.After(a.timeFunc().Add(-1 * time.Minute)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for id := range a.cache[tbucket] {
|
||||||
|
a.cache[tbucket][id].updated = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes metrics to the remote endpoint
|
||||||
|
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
|
||||||
|
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
|
||||||
|
for _, m := range metrics {
|
||||||
|
amm, err := translate(m, a.NamespacePrefix)
|
||||||
|
if err != nil {
|
||||||
|
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
id := hashIDWithTagKeysOnly(m)
|
||||||
|
if azm, ok := azmetrics[id]; !ok {
|
||||||
|
azmetrics[id] = amm
|
||||||
|
} else {
|
||||||
|
azmetrics[id].Data.BaseData.Series = append(
|
||||||
|
azm.Data.BaseData.Series,
|
||||||
|
amm.Data.BaseData.Series...,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(azmetrics) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
buffer.Grow(maxRequestBodySize)
|
||||||
|
for _, m := range azmetrics {
|
||||||
|
// Azure Monitor accepts new batches of points in new-line delimited
|
||||||
|
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
|
||||||
|
buf, err := json.Marshal(m)
|
||||||
|
if err != nil {
|
||||||
|
a.Log.Errorf("Could not marshall metric to JSON: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Azure Monitor's maximum request body size of 4MB. Send batches that
|
||||||
|
// exceed this size via separate write requests.
|
||||||
|
if buffer.Len()+len(buf)+1 > maxRequestBodySize {
|
||||||
|
if err := a.send(buffer.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
buffer.Reset()
|
||||||
|
}
|
||||||
|
if _, err := buffer.Write(buf); err != nil {
|
||||||
|
return fmt.Errorf("writing to buffer failed: %w", err)
|
||||||
|
}
|
||||||
|
if err := buffer.WriteByte('\n'); err != nil {
|
||||||
|
return fmt.Errorf("writing to buffer failed: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.send(buffer.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AzureMonitor) send(body []byte) error {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
g := gzip.NewWriter(&buf)
|
||||||
|
if _, err := g.Write(body); err != nil {
|
||||||
|
return fmt.Errorf("zipping content failed: %w", err)
|
||||||
|
}
|
||||||
|
if err := g.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", a.url, &buf)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
req.Header.Set("Content-Type", "application/x-ndjson")
|
||||||
|
|
||||||
|
// Add the authorization header. WithAuthorization will automatically
|
||||||
|
// refresh the token if needed.
|
||||||
|
req, err = a.preparer.Prepare(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to fetch authentication credentials: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := a.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
a.client.CloseIdleConnections()
|
||||||
|
a.client = &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
Timeout: time.Duration(a.Timeout),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if respbody, err := io.ReadAll(resp.Body); err == nil {
|
||||||
|
return fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// vmMetadata retrieves metadata about the current Azure VM
|
// vmMetadata retrieves metadata about the current Azure VM
|
||||||
|
|
@ -217,131 +415,15 @@ func vmInstanceMetadata(c *http.Client) (region, resourceID string, err error) {
|
||||||
return region, resourceID, nil
|
return region, resourceID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close shuts down an any active connections
|
func hashIDWithField(id uint64, fk string) uint64 {
|
||||||
func (a *AzureMonitor) Close() error {
|
h := fnv.New64a()
|
||||||
a.client = nil
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
return nil
|
n := binary.PutUvarint(b, id)
|
||||||
}
|
h.Write(b[:n])
|
||||||
|
h.Write([]byte("\n"))
|
||||||
type azureMonitorMetric struct {
|
h.Write([]byte(fk))
|
||||||
Time time.Time `json:"time"`
|
h.Write([]byte("\n"))
|
||||||
Data *azureMonitorData `json:"data"`
|
return h.Sum64()
|
||||||
}
|
|
||||||
|
|
||||||
type azureMonitorData struct {
|
|
||||||
BaseData *azureMonitorBaseData `json:"baseData"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type azureMonitorBaseData struct {
|
|
||||||
Metric string `json:"metric"`
|
|
||||||
Namespace string `json:"namespace"`
|
|
||||||
DimensionNames []string `json:"dimNames"`
|
|
||||||
Series []*azureMonitorSeries `json:"series"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type azureMonitorSeries struct {
|
|
||||||
DimensionValues []string `json:"dimValues"`
|
|
||||||
Min float64 `json:"min"`
|
|
||||||
Max float64 `json:"max"`
|
|
||||||
Sum float64 `json:"sum"`
|
|
||||||
Count int64 `json:"count"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write writes metrics to the remote endpoint
|
|
||||||
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
|
|
||||||
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
|
|
||||||
for _, m := range metrics {
|
|
||||||
id := hashIDWithTagKeysOnly(m)
|
|
||||||
if azm, ok := azmetrics[id]; !ok {
|
|
||||||
amm, err := translate(m, a.NamespacePrefix)
|
|
||||||
if err != nil {
|
|
||||||
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
azmetrics[id] = amm
|
|
||||||
} else {
|
|
||||||
amm, err := translate(m, a.NamespacePrefix)
|
|
||||||
if err != nil {
|
|
||||||
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
azmetrics[id].Data.BaseData.Series = append(
|
|
||||||
azm.Data.BaseData.Series,
|
|
||||||
amm.Data.BaseData.Series...,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(azmetrics) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var body []byte //nolint:prealloc // There is no point in guessing the final capacity of this slice
|
|
||||||
for _, m := range azmetrics {
|
|
||||||
// Azure Monitor accepts new batches of points in new-line delimited
|
|
||||||
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
|
|
||||||
jsonBytes, err := json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Azure Monitor's maximum request body size of 4MB. Send batches that
|
|
||||||
// exceed this size via separate write requests.
|
|
||||||
if (len(body) + len(jsonBytes) + 1) > maxRequestBodySize {
|
|
||||||
err := a.send(body)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
body = nil
|
|
||||||
}
|
|
||||||
body = append(body, jsonBytes...)
|
|
||||||
body = append(body, '\n')
|
|
||||||
}
|
|
||||||
|
|
||||||
return a.send(body)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *AzureMonitor) send(body []byte) error {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
g := gzip.NewWriter(&buf)
|
|
||||||
if _, err := g.Write(body); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := g.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", a.url, &buf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Header.Set("Content-Encoding", "gzip")
|
|
||||||
req.Header.Set("Content-Type", "application/x-ndjson")
|
|
||||||
|
|
||||||
// Add the authorization header. WithAuthorization will automatically
|
|
||||||
// refresh the token if needed.
|
|
||||||
req, err = autorest.CreatePreparer(a.auth.WithAuthorization()).Prepare(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to fetch authentication credentials: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := a.client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
|
||||||
a.initHTTPClient()
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
respbody, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
|
|
||||||
return fmt.Errorf("failed to write batch: [%v] %s: %s", resp.StatusCode, resp.Status, string(respbody))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
|
func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 {
|
||||||
|
|
@ -451,175 +533,12 @@ func getIntField(m telegraf.Metric, key string) (int64, error) {
|
||||||
}
|
}
|
||||||
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
|
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add will append a metric to the output aggregate
|
|
||||||
func (a *AzureMonitor) Add(m telegraf.Metric) {
|
|
||||||
// Azure Monitor only supports aggregates 30 minutes into the past and 4
|
|
||||||
// minutes into the future. Future metrics are dropped when pushed.
|
|
||||||
t := m.Time()
|
|
||||||
tbucket := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location())
|
|
||||||
if tbucket.Before(a.timeFunc().Add(-time.Minute * 30)) {
|
|
||||||
a.MetricOutsideWindow.Incr(1)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Azure Monitor doesn't have a string value type, so convert string fields
|
|
||||||
// to dimensions (a.k.a. tags) if enabled.
|
|
||||||
if a.StringsAsDimensions {
|
|
||||||
for _, f := range m.FieldList() {
|
|
||||||
if v, ok := f.Value.(string); ok {
|
|
||||||
m.AddTag(f.Key, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, f := range m.FieldList() {
|
|
||||||
fv, ok := convert(f.Value)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Azure Monitor does not support fields so the field name is appended
|
|
||||||
// to the metric name.
|
|
||||||
name := m.Name() + "-" + sanitize(f.Key)
|
|
||||||
id := hashIDWithField(m.HashID(), f.Key)
|
|
||||||
|
|
||||||
_, ok = a.cache[tbucket]
|
|
||||||
if !ok {
|
|
||||||
// Time bucket does not exist and needs to be created.
|
|
||||||
a.cache[tbucket] = make(map[uint64]*aggregate)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch existing aggregate
|
|
||||||
var agg *aggregate
|
|
||||||
agg, ok = a.cache[tbucket][id]
|
|
||||||
if !ok {
|
|
||||||
agg := &aggregate{
|
|
||||||
name: name,
|
|
||||||
min: fv,
|
|
||||||
max: fv,
|
|
||||||
sum: fv,
|
|
||||||
count: 1,
|
|
||||||
}
|
|
||||||
for _, tag := range m.TagList() {
|
|
||||||
dim := dimension{
|
|
||||||
name: tag.Key,
|
|
||||||
value: tag.Value,
|
|
||||||
}
|
|
||||||
agg.dimensions = append(agg.dimensions, dim)
|
|
||||||
}
|
|
||||||
agg.updated = true
|
|
||||||
a.cache[tbucket][id] = agg
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if fv < agg.min {
|
|
||||||
agg.min = fv
|
|
||||||
}
|
|
||||||
if fv > agg.max {
|
|
||||||
agg.max = fv
|
|
||||||
}
|
|
||||||
agg.sum += fv
|
|
||||||
agg.count++
|
|
||||||
agg.updated = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func convert(in interface{}) (float64, bool) {
|
|
||||||
switch v := in.(type) {
|
|
||||||
case int64:
|
|
||||||
return float64(v), true
|
|
||||||
case uint64:
|
|
||||||
return float64(v), true
|
|
||||||
case float64:
|
|
||||||
return v, true
|
|
||||||
case bool:
|
|
||||||
if v {
|
|
||||||
return 1, true
|
|
||||||
}
|
|
||||||
return 0, true
|
|
||||||
default:
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
|
||||||
|
|
||||||
func sanitize(value string) string {
|
|
||||||
return invalidNameCharRE.ReplaceAllString(value, "_")
|
|
||||||
}
|
|
||||||
|
|
||||||
func hashIDWithField(id uint64, fk string) uint64 {
|
|
||||||
h := fnv.New64a()
|
|
||||||
b := make([]byte, binary.MaxVarintLen64)
|
|
||||||
n := binary.PutUvarint(b, id)
|
|
||||||
h.Write(b[:n])
|
|
||||||
h.Write([]byte("\n"))
|
|
||||||
h.Write([]byte(fk))
|
|
||||||
h.Write([]byte("\n"))
|
|
||||||
return h.Sum64()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push sends metrics to the output metric buffer
|
|
||||||
func (a *AzureMonitor) Push() []telegraf.Metric {
|
|
||||||
var metrics []telegraf.Metric
|
|
||||||
for tbucket, aggs := range a.cache {
|
|
||||||
// Do not send metrics early
|
|
||||||
if tbucket.After(a.timeFunc().Add(-time.Minute)) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, agg := range aggs {
|
|
||||||
// Only send aggregates that have had an update since the last push.
|
|
||||||
if !agg.updated {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tags := make(map[string]string, len(agg.dimensions))
|
|
||||||
for _, tag := range agg.dimensions {
|
|
||||||
tags[tag.name] = tag.value
|
|
||||||
}
|
|
||||||
|
|
||||||
m := metric.New(agg.name,
|
|
||||||
tags,
|
|
||||||
map[string]interface{}{
|
|
||||||
"min": agg.min,
|
|
||||||
"max": agg.max,
|
|
||||||
"sum": agg.sum,
|
|
||||||
"count": agg.count,
|
|
||||||
},
|
|
||||||
tbucket,
|
|
||||||
)
|
|
||||||
|
|
||||||
metrics = append(metrics, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset clears the cache of aggregate metrics
|
|
||||||
func (a *AzureMonitor) Reset() {
|
|
||||||
for tbucket := range a.cache {
|
|
||||||
// Remove aggregates older than 30 minutes
|
|
||||||
if tbucket.Before(a.timeFunc().Add(-time.Minute * 30)) {
|
|
||||||
delete(a.cache, tbucket)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Metrics updated within the latest 1m have not been pushed and should
|
|
||||||
// not be cleared.
|
|
||||||
if tbucket.After(a.timeFunc().Add(-time.Minute)) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for id := range a.cache[tbucket] {
|
|
||||||
a.cache[tbucket][id].updated = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("azure_monitor", func() telegraf.Output {
|
outputs.Add("azure_monitor", func() telegraf.Output {
|
||||||
return &AzureMonitor{
|
return &AzureMonitor{
|
||||||
|
NamespacePrefix: "Telegraf/",
|
||||||
|
Timeout: config.Duration(5 * time.Second),
|
||||||
timeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
NamespacePrefix: defaultNamespacePrefix,
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,9 @@ func TestAggregate(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
timeFunc: func() time.Time { return tt.addTime },
|
timeFunc: func() time.Time { return tt.addTime },
|
||||||
}
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
require.NoError(t, plugin.Connect())
|
require.NoError(t, plugin.Connect())
|
||||||
|
defer plugin.Close()
|
||||||
|
|
||||||
// Reset statistics
|
// Reset statistics
|
||||||
plugin.MetricOutsideWindow.Set(0)
|
plugin.MetricOutsideWindow.Set(0)
|
||||||
|
|
@ -319,10 +321,15 @@ func TestWrite(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
timeFunc: func() time.Time { return time.Unix(120, 0) },
|
timeFunc: func() time.Time { return time.Unix(120, 0) },
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Connect())
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
// Override with testing setup
|
// Override with testing setup
|
||||||
plugin.auth = autorest.NullAuthorizer{}
|
plugin.preparer = autorest.CreatePreparer(autorest.NullAuthorizer{}.WithAuthorization())
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
defer plugin.Close()
|
||||||
|
|
||||||
|
// Override with testing setup
|
||||||
|
plugin.preparer = autorest.CreatePreparer(autorest.NullAuthorizer{}.WithAuthorization())
|
||||||
|
|
||||||
err := plugin.Write(tt.metrics)
|
err := plugin.Write(tt.metrics)
|
||||||
if tt.errmsg != "" {
|
if tt.errmsg != "" {
|
||||||
|
|
@ -563,10 +570,12 @@ func TestWriteTimelimits(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
timeFunc: func() time.Time { return tref },
|
timeFunc: func() time.Time { return tref },
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Connect())
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
// Override with testing setup
|
// Override with testing setup
|
||||||
plugin.auth = autorest.NullAuthorizer{}
|
plugin.preparer = autorest.CreatePreparer(autorest.NullAuthorizer{}.WithAuthorization())
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
defer plugin.Close()
|
||||||
|
|
||||||
// Test writing
|
// Test writing
|
||||||
err := plugin.Write(tt.input)
|
err := plugin.Write(tt.input)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
package azure_monitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type azureMonitorMetric struct {
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Data *azureMonitorData `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorData struct {
|
||||||
|
BaseData *azureMonitorBaseData `json:"baseData"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorBaseData struct {
|
||||||
|
Metric string `json:"metric"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
DimensionNames []string `json:"dimNames"`
|
||||||
|
Series []*azureMonitorSeries `json:"series"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type azureMonitorSeries struct {
|
||||||
|
DimensionValues []string `json:"dimValues"`
|
||||||
|
Min float64 `json:"min"`
|
||||||
|
Max float64 `json:"max"`
|
||||||
|
Sum float64 `json:"sum"`
|
||||||
|
Count int64 `json:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// VirtualMachineMetadata contains information about a VM from the metadata service
|
||||||
|
type virtualMachineMetadata struct {
|
||||||
|
Compute struct {
|
||||||
|
Location string `json:"location"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
ResourceGroupName string `json:"resourceGroupName"`
|
||||||
|
SubscriptionID string `json:"subscriptionId"`
|
||||||
|
VMScaleSetName string `json:"vmScaleSetName"`
|
||||||
|
} `json:"compute"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *virtualMachineMetadata) ResourceID() string {
|
||||||
|
if m.Compute.VMScaleSetName != "" {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
resourceIDScaleSetTemplate,
|
||||||
|
m.Compute.SubscriptionID,
|
||||||
|
m.Compute.ResourceGroupName,
|
||||||
|
m.Compute.VMScaleSetName,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf(
|
||||||
|
resourceIDTemplate,
|
||||||
|
m.Compute.SubscriptionID,
|
||||||
|
m.Compute.ResourceGroupName,
|
||||||
|
m.Compute.Name,
|
||||||
|
)
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue