2020-12-17 03:11:05 +08:00
|
|
|
package prometheusremotewrite
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"fmt"
|
|
|
|
|
"hash/fnv"
|
|
|
|
|
"sort"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
"github.com/golang/snappy"
|
2021-12-01 04:50:00 +08:00
|
|
|
"github.com/prometheus/prometheus/prompb"
|
2021-03-26 00:18:50 +08:00
|
|
|
|
2020-12-17 03:11:05 +08:00
|
|
|
"github.com/influxdata/telegraf"
|
2021-12-01 04:50:00 +08:00
|
|
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
2020-12-17 03:11:05 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// MetricKey identifies a time series by the 64-bit FNV-1a hash of its label
// set, as computed by MakeMetricKey.
type MetricKey uint64
|
|
|
|
|
|
|
|
|
|
// MetricSortOrder selects whether SerializeBatch sorts the time series it
// emits.
type MetricSortOrder int

const (
	// NoSortMetrics leaves the time series in the order they come out of the
	// internal map, which is not deterministic.
	NoSortMetrics MetricSortOrder = iota
	// SortMetrics orders the time series by their label sets so that the
	// output is deterministic.
	SortMetrics
)
|
|
|
|
|
|
|
|
|
|
// StringHandling selects what the serializer does with string-valued fields.
type StringHandling int

const (
	// DiscardStrings means string fields are not turned into labels.
	DiscardStrings StringHandling = iota
	// StringAsLabel attaches every string field to the metric's time series
	// as an additional label.
	StringAsLabel
)
|
|
|
|
|
|
|
|
|
|
type FormatConfig struct {
|
|
|
|
|
MetricSortOrder MetricSortOrder
|
|
|
|
|
StringHandling StringHandling
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Serializer struct {
|
|
|
|
|
config FormatConfig
|
|
|
|
|
}
|
|
|
|
|
|
2022-11-09 03:04:12 +08:00
|
|
|
func NewSerializer(config FormatConfig) *Serializer {
|
|
|
|
|
return &Serializer{config: config}
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
|
|
|
|
return s.SerializeBatch([]telegraf.Metric{metric})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
|
|
|
|
var buf bytes.Buffer
|
2021-03-26 00:18:50 +08:00
|
|
|
var entries = make(map[MetricKey]prompb.TimeSeries)
|
2020-12-17 03:11:05 +08:00
|
|
|
for _, metric := range metrics {
|
|
|
|
|
commonLabels := s.createLabels(metric)
|
|
|
|
|
var metrickey MetricKey
|
2021-03-26 00:18:50 +08:00
|
|
|
var promts prompb.TimeSeries
|
2020-12-17 03:11:05 +08:00
|
|
|
for _, field := range metric.FieldList() {
|
|
|
|
|
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
|
|
|
|
|
metricName, ok := prometheus.SanitizeMetricName(metricName)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
switch metric.Type() {
|
|
|
|
|
case telegraf.Counter:
|
|
|
|
|
fallthrough
|
|
|
|
|
case telegraf.Gauge:
|
|
|
|
|
fallthrough
|
|
|
|
|
case telegraf.Untyped:
|
|
|
|
|
value, ok := prometheus.SampleValue(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
metrickey, promts = getPromTS(metricName, commonLabels, value, metric.Time())
|
|
|
|
|
case telegraf.Histogram:
|
|
|
|
|
switch {
|
|
|
|
|
case strings.HasSuffix(field.Key, "_bucket"):
|
|
|
|
|
// if bucket only, init sum, count, inf
|
|
|
|
|
metrickeysum, promtssum := getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, float64(0), metric.Time())
|
|
|
|
|
if _, ok = entries[metrickeysum]; !ok {
|
|
|
|
|
entries[metrickeysum] = promtssum
|
|
|
|
|
}
|
|
|
|
|
metrickeycount, promtscount := getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(0), metric.Time())
|
|
|
|
|
if _, ok = entries[metrickeycount]; !ok {
|
|
|
|
|
entries[metrickeycount] = promtscount
|
|
|
|
|
}
|
2021-03-26 00:18:50 +08:00
|
|
|
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1)
|
2020-12-17 03:11:05 +08:00
|
|
|
copy(labels, commonLabels)
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{
|
2020-12-17 03:11:05 +08:00
|
|
|
Name: "le",
|
|
|
|
|
Value: "+Inf",
|
|
|
|
|
})
|
|
|
|
|
metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(0), metric.Time())
|
|
|
|
|
if _, ok = entries[metrickeyinf]; !ok {
|
|
|
|
|
entries[metrickeyinf] = promtsinf
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
le, ok := metric.GetTag("le")
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
bound, err := strconv.ParseFloat(le, 64)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
count, ok := prometheus.SampleCount(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = make([]prompb.Label, len(commonLabels), len(commonLabels)+1)
|
2020-12-17 03:11:05 +08:00
|
|
|
copy(labels, commonLabels)
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{
|
2020-12-17 03:11:05 +08:00
|
|
|
Name: "le",
|
|
|
|
|
Value: fmt.Sprint(bound),
|
|
|
|
|
})
|
|
|
|
|
metrickey, promts = getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time())
|
|
|
|
|
case strings.HasSuffix(field.Key, "_sum"):
|
|
|
|
|
sum, ok := prometheus.SampleSum(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time())
|
|
|
|
|
case strings.HasSuffix(field.Key, "_count"):
|
|
|
|
|
count, ok := prometheus.SampleCount(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if no bucket generate +Inf entry
|
2021-03-26 00:18:50 +08:00
|
|
|
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1)
|
2020-12-17 03:11:05 +08:00
|
|
|
copy(labels, commonLabels)
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{
|
2020-12-17 03:11:05 +08:00
|
|
|
Name: "le",
|
|
|
|
|
Value: "+Inf",
|
|
|
|
|
})
|
|
|
|
|
metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time())
|
|
|
|
|
if minf, ok := entries[metrickeyinf]; !ok || minf.Samples[0].Value == 0 {
|
|
|
|
|
entries[metrickeyinf] = promtsinf
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time())
|
|
|
|
|
default:
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
case telegraf.Summary:
|
|
|
|
|
switch {
|
|
|
|
|
case strings.HasSuffix(field.Key, "_sum"):
|
|
|
|
|
sum, ok := prometheus.SampleSum(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time())
|
|
|
|
|
case strings.HasSuffix(field.Key, "_count"):
|
|
|
|
|
count, ok := prometheus.SampleCount(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time())
|
|
|
|
|
default:
|
|
|
|
|
quantileTag, ok := metric.GetTag("quantile")
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
quantile, err := strconv.ParseFloat(quantileTag, 64)
|
|
|
|
|
if err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
value, ok := prometheus.SampleValue(field.Value)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1)
|
2020-12-17 03:11:05 +08:00
|
|
|
copy(labels, commonLabels)
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{
|
2020-12-17 03:11:05 +08:00
|
|
|
Name: "quantile",
|
|
|
|
|
Value: fmt.Sprint(quantile),
|
|
|
|
|
})
|
|
|
|
|
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
|
|
|
|
|
}
|
|
|
|
|
default:
|
2021-02-17 07:19:50 +08:00
|
|
|
return nil, fmt.Errorf("unknown type %v", metric.Type())
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// A batch of metrics can contain multiple values for a single
|
|
|
|
|
// Prometheus sample. If this metric is older than the existing
|
|
|
|
|
// sample then we can skip over it.
|
|
|
|
|
m, ok := entries[metrickey]
|
|
|
|
|
if ok {
|
2022-02-17 00:00:37 +08:00
|
|
|
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
|
2020-12-17 03:11:05 +08:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
entries[metrickey] = promts
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
var promTS = make([]prompb.TimeSeries, len(entries))
|
|
|
|
|
var i int
|
2020-12-17 03:11:05 +08:00
|
|
|
for _, promts := range entries {
|
|
|
|
|
promTS[i] = promts
|
|
|
|
|
i++
|
|
|
|
|
}
|
|
|
|
|
|
2021-12-01 04:50:00 +08:00
|
|
|
if s.config.MetricSortOrder == SortMetrics {
|
2020-12-17 03:11:05 +08:00
|
|
|
sort.Slice(promTS, func(i, j int) bool {
|
|
|
|
|
lhs := promTS[i].Labels
|
|
|
|
|
rhs := promTS[j].Labels
|
|
|
|
|
if len(lhs) != len(rhs) {
|
|
|
|
|
return len(lhs) < len(rhs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for index := range lhs {
|
|
|
|
|
l := lhs[index]
|
|
|
|
|
r := rhs[index]
|
|
|
|
|
|
|
|
|
|
if l.Name != r.Name {
|
|
|
|
|
return l.Name < r.Name
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if l.Value != r.Value {
|
|
|
|
|
return l.Value < r.Value
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
})
|
|
|
|
|
}
|
2021-11-09 22:51:14 +08:00
|
|
|
pb := &prompb.WriteRequest{Timeseries: promTS}
|
|
|
|
|
data, err := pb.Marshal()
|
2020-12-17 03:11:05 +08:00
|
|
|
if err != nil {
|
2023-02-22 19:08:46 +08:00
|
|
|
return nil, fmt.Errorf("unable to marshal protobuf: %w", err)
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|
|
|
|
|
encoded := snappy.Encode(nil, data)
|
2023-04-03 21:19:43 +08:00
|
|
|
buf.Write(encoded)
|
2020-12-17 03:11:05 +08:00
|
|
|
return buf.Bytes(), nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
func hasLabel(name string, labels []prompb.Label) bool {
|
2020-12-17 03:11:05 +08:00
|
|
|
for _, label := range labels {
|
|
|
|
|
if name == label.Name {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label {
|
|
|
|
|
labels := make([]prompb.Label, 0, len(metric.TagList()))
|
2020-12-17 03:11:05 +08:00
|
|
|
for _, tag := range metric.TagList() {
|
|
|
|
|
// Ignore special tags for histogram and summary types.
|
|
|
|
|
switch metric.Type() {
|
|
|
|
|
case telegraf.Histogram:
|
|
|
|
|
if tag.Key == "le" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
case telegraf.Summary:
|
|
|
|
|
if tag.Key == "quantile" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
name, ok := prometheus.SanitizeLabelName(tag.Key)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-27 23:01:28 +08:00
|
|
|
// remove tags with empty values
|
|
|
|
|
if tag.Value == "" {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{Name: name, Value: tag.Value})
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if s.config.StringHandling != StringAsLabel {
|
|
|
|
|
return labels
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
addedFieldLabel := false
|
|
|
|
|
for _, field := range metric.FieldList() {
|
|
|
|
|
value, ok := field.Value.(string)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
name, ok := prometheus.SanitizeLabelName(field.Key)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If there is a tag with the same name as the string field, discard
|
|
|
|
|
// the field and use the tag instead.
|
|
|
|
|
if hasLabel(name, labels) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labels, prompb.Label{Name: name, Value: value})
|
2020-12-17 03:11:05 +08:00
|
|
|
addedFieldLabel = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if addedFieldLabel {
|
|
|
|
|
sort.Slice(labels, func(i, j int) bool {
|
|
|
|
|
return labels[i].Name < labels[j].Name
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return labels
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
func MakeMetricKey(labels []prompb.Label) MetricKey {
|
2020-12-17 03:11:05 +08:00
|
|
|
h := fnv.New64a()
|
|
|
|
|
for _, label := range labels {
|
2023-04-03 21:19:43 +08:00
|
|
|
h.Write([]byte(label.Name))
|
|
|
|
|
h.Write([]byte("\x00"))
|
|
|
|
|
h.Write([]byte(label.Value))
|
|
|
|
|
h.Write([]byte("\x00"))
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|
|
|
|
|
return MetricKey(h.Sum64())
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time) (MetricKey, prompb.TimeSeries) {
|
2020-12-17 03:11:05 +08:00
|
|
|
sample := []prompb.Sample{{
|
|
|
|
|
// Timestamp is int milliseconds for remote write.
|
|
|
|
|
Timestamp: ts.UnixNano() / int64(time.Millisecond),
|
|
|
|
|
Value: value,
|
|
|
|
|
}}
|
2021-03-26 00:18:50 +08:00
|
|
|
labelscopy := make([]prompb.Label, len(labels), len(labels)+1)
|
2020-12-17 03:11:05 +08:00
|
|
|
copy(labelscopy, labels)
|
2021-03-26 00:18:50 +08:00
|
|
|
labels = append(labelscopy, prompb.Label{
|
2020-12-17 03:11:05 +08:00
|
|
|
Name: "__name__",
|
|
|
|
|
Value: name,
|
|
|
|
|
})
|
2022-09-02 23:08:57 +08:00
|
|
|
|
|
|
|
|
// we sort the labels since Prometheus TSDB does not like out of order labels
|
|
|
|
|
sort.Slice(labels, func(i, j int) bool {
|
|
|
|
|
return labels[i].Name < labels[j].Name
|
|
|
|
|
})
|
|
|
|
|
|
2021-03-26 00:18:50 +08:00
|
|
|
return MakeMetricKey(labels), prompb.TimeSeries{Labels: labels, Samples: sample}
|
2020-12-17 03:11:05 +08:00
|
|
|
}
|