diff --git a/go.mod b/go.mod index 10cd98877..cc3692872 100644 --- a/go.mod +++ b/go.mod @@ -130,7 +130,6 @@ require ( github.com/linkedin/goavro/v2 v2.12.0 github.com/logzio/azure-monitor-metrics-receiver v1.0.1 github.com/lxc/lxd v0.0.0-20220920163450-e9b4b514106a - github.com/matttproud/golang_protobuf_extensions v1.0.4 github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a github.com/mdlayher/vsock v1.2.1 github.com/microsoft/ApplicationInsights-Go v0.4.4 diff --git a/go.sum b/go.sum index 9d8bb53d5..5de2736b1 100644 --- a/go.sum +++ b/go.sum @@ -1761,8 +1761,6 @@ github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4 github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a h1:JOlLsLUQnokTyWWwEvOVoKH3XUl6oDMP8jisO54l6J8= diff --git a/plugins/parsers/prometheus/common.go b/plugins/parsers/prometheus/common.go index 529e0c31b..458b92256 100644 --- a/plugins/parsers/prometheus/common.go +++ b/plugins/parsers/prometheus/common.go @@ -5,7 +5,7 @@ import ( dto "github.com/prometheus/client_model/go" ) -func ValueType(mt dto.MetricType) telegraf.ValueType { +func mapValueType(mt dto.MetricType) telegraf.ValueType { switch mt { case dto.MetricType_COUNTER: return telegraf.Counter @@ -20,7 +20,7 @@ func ValueType(mt dto.MetricType) telegraf.ValueType { } } -func GetTagsFromLabels(m *dto.Metric, defaultTags map[string]string) map[string]string { +func getTagsFromLabels(m *dto.Metric, defaultTags map[string]string) map[string]string { result := map[string]string{} for key, value := range defaultTags { diff --git a/plugins/parsers/prometheus/metric_v1.go b/plugins/parsers/prometheus/metric_v1.go new file mode 100644 index 000000000..e51d5ab07 --- /dev/null +++ b/plugins/parsers/prometheus/metric_v1.go @@ -0,0 +1,85 @@ +package prometheus + +import ( + "math" + "strconv" + "time" + + dto "github.com/prometheus/client_model/go" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +func (p *Parser) extractMetricsV1(prommetrics *dto.MetricFamily) []telegraf.Metric { + now := time.Now() + + // Convert each prometheus metrics to the corresponding telegraf metrics. + // You will get one telegraf metric with one field per prometheus metric + // for "simple" types like Gauge and Counter but a telegraf metric with + // multiple fields for "complex" types like Summary or Histogram. + var metrics []telegraf.Metric + metricName := prommetrics.GetName() + metricType := prommetrics.GetType() + for _, pm := range prommetrics.Metric { + // Extract the timestamp of the metric if it exists and should + // not be ignored. + t := now + if ts := pm.GetTimestampMs(); !p.IgnoreTimestamp && ts > 0 { + t = time.UnixMilli(ts) + } + + // Convert the labels to tags + tags := getTagsFromLabels(pm, p.DefaultTags) + + // Construct the metrics + switch metricType { + case dto.MetricType_SUMMARY: + summary := pm.GetSummary() + + // Collect the fields + fields := make(map[string]interface{}, len(summary.Quantile)+2) + fields["count"] = float64(summary.GetSampleCount()) + fields["sum"] = summary.GetSampleSum() + for _, q := range summary.Quantile { + if v := q.GetValue(); !math.IsNaN(v) { + fname := strconv.FormatFloat(q.GetQuantile(), 'g', -1, 64) + fields[fname] = v + } + } + metrics = append(metrics, metric.New(metricName, tags, fields, t, telegraf.Summary)) + case dto.MetricType_HISTOGRAM: + histogram := pm.GetHistogram() + + // Collect the fields + fields := make(map[string]interface{}, len(histogram.Bucket)+2) + fields["count"] = float64(pm.GetHistogram().GetSampleCount()) + fields["sum"] = pm.GetHistogram().GetSampleSum() + for _, b := range histogram.Bucket { + fname := strconv.FormatFloat(b.GetUpperBound(), 'g', -1, 64) + fields[fname] = float64(b.GetCumulativeCount()) + } + metrics = append(metrics, metric.New(metricName, tags, fields, t, telegraf.Histogram)) + default: + var fname string + var v float64 + if gauge := pm.GetGauge(); gauge != nil { + fname = "gauge" + v = gauge.GetValue() + } else if counter := pm.GetCounter(); counter != nil { + fname = "counter" + v = counter.GetValue() + } else if untyped := pm.GetUntyped(); untyped != nil { + fname = "value" + v = untyped.GetValue() + } + if fname != "" && !math.IsNaN(v) { + fields := map[string]interface{}{fname: v} + vtype := mapValueType(metricType) + metrics = append(metrics, metric.New(metricName, tags, fields, t, vtype)) + } + } + } + + return metrics +} diff --git a/plugins/parsers/prometheus/metric_v2.go b/plugins/parsers/prometheus/metric_v2.go new file mode 100644 index 000000000..565688660 --- /dev/null +++ b/plugins/parsers/prometheus/metric_v2.go @@ -0,0 +1,107 @@ +package prometheus + +import ( + "math" + "strconv" + "time" + + dto "github.com/prometheus/client_model/go" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +func (p *Parser) extractMetricsV2(prommetrics *dto.MetricFamily) []telegraf.Metric { + now := time.Now() + + // Convert each prometheus metric to a corresponding telegraf metric + // with one field each. The process will filter NaNs in values and skip + // the corresponding metrics. + var metrics []telegraf.Metric + metricName := prommetrics.GetName() + metricType := prommetrics.GetType() + for _, pm := range prommetrics.Metric { + // Extract the timestamp of the metric if it exists and should + // not be ignored. + t := now + if ts := pm.GetTimestampMs(); !p.IgnoreTimestamp && ts > 0 { + t = time.UnixMilli(ts) + } + + // Convert the labels to tags + tags := getTagsFromLabels(pm, p.DefaultTags) + + // Construct the metrics + switch metricType { + case dto.MetricType_SUMMARY: + summary := pm.GetSummary() + + // Add an overall metric containing the number of samples and and its sum + summaryFields := make(map[string]interface{}) + summaryFields[metricName+"_count"] = float64(summary.GetSampleCount()) + summaryFields[metricName+"_sum"] = summary.GetSampleSum() + metrics = append(metrics, metric.New("prometheus", tags, summaryFields, t, telegraf.Summary)) + + // Add one metric per quantile + for _, q := range summary.Quantile { + quantileTags := tags + quantileTags["quantile"] = strconv.FormatFloat(q.GetQuantile(), 'g', -1, 64) + quantileFields := map[string]interface{}{ + metricName: q.GetValue(), + } + m := metric.New("prometheus", quantileTags, quantileFields, t, telegraf.Summary) + metrics = append(metrics, m) + } + case dto.MetricType_HISTOGRAM: + histogram := pm.GetHistogram() + + // Add an overall metric containing the number of samples and and its sum + histFields := make(map[string]interface{}) + histFields[metricName+"_count"] = float64(histogram.GetSampleCount()) + histFields[metricName+"_sum"] = histogram.GetSampleSum() + metrics = append(metrics, metric.New("prometheus", tags, histFields, t, telegraf.Histogram)) + + // Add one metric per histogram bucket + var infSeen bool + for _, b := range histogram.Bucket { + bucketTags := tags + bucketTags["le"] = strconv.FormatFloat(b.GetUpperBound(), 'g', -1, 64) + bucketFields := map[string]interface{}{ + metricName + "_bucket": float64(b.GetCumulativeCount()), + } + m := metric.New("prometheus", bucketTags, bucketFields, t, telegraf.Histogram) + metrics = append(metrics, m) + + // Record if any of the buckets marks an infinite upper bound + infSeen = infSeen || math.IsInf(b.GetUpperBound(), +1) + } + + // Infinity bucket is required for proper function of histogram in prometheus + if !infSeen { + infTags := tags + infTags["le"] = "+Inf" + infFields := map[string]interface{}{ + metricName + "_bucket": float64(histogram.GetSampleCount()), + } + m := metric.New("prometheus", infTags, infFields, t, telegraf.Histogram) + metrics = append(metrics, m) + } + default: + v := math.Inf(1) + if gauge := pm.GetGauge(); gauge != nil { + v = gauge.GetValue() + } else if counter := pm.GetCounter(); counter != nil { + v = counter.GetValue() + } else if untyped := pm.GetUntyped(); untyped != nil { + v = untyped.GetValue() + } + if !math.IsNaN(v) { + fields := map[string]interface{}{metricName: v} + vtype := mapValueType(metricType) + metrics = append(metrics, metric.New("prometheus", tags, fields, t, vtype)) + } + } + } + + return metrics +} diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go index 6a481f471..0466e4cb9 100644 --- a/plugins/parsers/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser.go @@ -1,11 +1,16 @@ package prometheus import ( + "bytes" + "errors" "fmt" + "io" "net/http" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" ) type Parser struct { @@ -13,20 +18,67 @@ type Parser struct { MetricVersion int `toml:"prometheus_metric_version"` Header http.Header `toml:"-"` // set by the prometheus input DefaultTags map[string]string `toml:"-"` + Log telegraf.Logger `toml:"-"` } func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } -func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { - switch p.MetricVersion { - case 0, 2: - return p.parseV2(buf) - case 1: - return p.parseV1(buf) +func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) { + // Make sure we have a finishing newline but no trailing one + data = bytes.TrimPrefix(data, []byte("\n")) + if !bytes.HasSuffix(data, []byte("\n")) { + data = append(data, []byte("\n")...) } - return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion) + buf := bytes.NewBuffer(data) + + // Determine the metric transport-type derived from the response header and + // create a matching decoder. + format := expfmt.ResponseFormat(p.Header) + if format == expfmt.FmtUnknown { + p.Log.Warnf("Unknown format %q... Trying to continue...", p.Header.Get("Content-Type")) + } + decoder := expfmt.NewDecoder(buf, format) + + // Decode the input data into prometheus metrics + var metrics []telegraf.Metric + for { + var mf dto.MetricFamily + if err := decoder.Decode(&mf); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, fmt.Errorf("decoding response failed: %w", err) + } + + switch p.MetricVersion { + case 0, 2: + metrics = append(metrics, p.extractMetricsV2(&mf)...) + case 1: + metrics = append(metrics, p.extractMetricsV1(&mf)...) + default: + return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion) + } + } + return metrics, nil +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("no metrics in line") + } + + if len(metrics) > 1 { + return nil, fmt.Errorf("more than one metric in line") + } + + return metrics[0], nil } func init() { diff --git a/plugins/parsers/prometheus/parser_v1.go b/plugins/parsers/prometheus/parser_v1.go deleted file mode 100644 index 19932f8ee..000000000 --- a/plugins/parsers/prometheus/parser_v1.go +++ /dev/null @@ -1,147 +0,0 @@ -package prometheus - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "math" - "mime" - "net/http" - "time" - - "github.com/matttproud/golang_protobuf_extensions/pbutil" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" -) - -func (p *Parser) parseV1(buf []byte) ([]telegraf.Metric, error) { - var parser expfmt.TextParser - var metrics []telegraf.Metric - var err error - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - - if isProtobuf(p.Header) { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if errors.Is(ierr, io.EOF) { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %w", err) - } - } - - now := time.Now() - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := GetTagsFromLabels(m, p.DefaultTags) - - // reading fields - var fields map[string]interface{} - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - fields = makeQuantilesV1(m) - fields["count"] = float64(m.GetSummary().GetSampleCount()) - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields["sum"] = float64(m.GetSummary().GetSampleSum()) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - fields = makeBucketsV1(m) - fields["count"] = float64(m.GetHistogram().GetSampleCount()) - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields["sum"] = float64(m.GetHistogram().GetSampleSum()) - } else { - // standard metric - fields = getNameAndValueV1(m) - } - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - m := metric.New(metricName, tags, fields, t, ValueType(mf.GetType())) - metrics = append(metrics, m) - } - } - } - - return metrics, err -} - -func isProtobuf(header http.Header) bool { - mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) - if err != nil { - return false - } - - return mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" -} - -// Get Quantiles from summary metric -func makeQuantilesV1(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - for _, q := range m.GetSummary().Quantile { - if !math.IsNaN(q.GetValue()) { - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) - } - } - return fields -} - -// Get Buckets from histogram metric -func makeBucketsV1(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - for _, b := range m.GetHistogram().Bucket { - fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) - } - return fields -} - -// Get name and value from metric -func getNameAndValueV1(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields["gauge"] = float64(m.GetGauge().GetValue()) - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields["counter"] = float64(m.GetCounter().GetValue()) - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - //nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40 - fields["value"] = float64(m.GetUntyped().GetValue()) - } - } - return fields -} diff --git a/plugins/parsers/prometheus/parser_v2.go b/plugins/parsers/prometheus/parser_v2.go deleted file mode 100644 index edb096430..000000000 --- a/plugins/parsers/prometheus/parser_v2.go +++ /dev/null @@ -1,197 +0,0 @@ -package prometheus - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "io" - "math" - "mime" - "time" - - "github.com/matttproud/golang_protobuf_extensions/pbutil" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" -) - -func (p *Parser) parseV2(buf []byte) ([]telegraf.Metric, error) { - var parser expfmt.TextParser - var metrics []telegraf.Metric - var err error - - // Make sure we have a finishing newline but no trailing one - buf = bytes.TrimPrefix(buf, []byte("\n")) - if !bytes.HasSuffix(buf, []byte("\n")) { - buf = append(buf, []byte("\n")...) - } - - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type")) - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if errors.Is(ierr, io.EOF) { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %w", err) - } - } - - now := time.Now() - - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := GetTagsFromLabels(m, p.DefaultTags) - t := p.getTimestampV2(m, now) - - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), t) - metrics = append(metrics, telegrafMetrics...) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), t) - metrics = append(metrics, telegrafMetrics...) - } else { - // standard metric - // reading fields - fields := getNameAndValueV2(m, metricName) - // converting to telegraf metric - if len(fields) > 0 { - m := metric.New("prometheus", tags, fields, t, ValueType(mf.GetType())) - metrics = append(metrics, m) - } - } - } - } - - return metrics, err -} - -func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line)) - if err != nil { - return nil, err - } - - if len(metrics) < 1 { - return nil, fmt.Errorf("no metrics in line") - } - - if len(metrics) > 1 { - return nil, fmt.Errorf("more than one metric in line") - } - - return metrics[0], nil -} - -// Get Quantiles for summary metric & Buckets for histogram -func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { - metrics := make([]telegraf.Metric, 0, len(m.GetSummary().Quantile)+1) - fields := make(map[string]interface{}) - - fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) - fields[metricName+"_sum"] = m.GetSummary().GetSampleSum() - met := metric.New("prometheus", tags, fields, t, ValueType(metricType)) - metrics = append(metrics, met) - - for _, q := range m.GetSummary().Quantile { - newTags := tags - fields = make(map[string]interface{}) - - newTags["quantile"] = fmt.Sprint(q.GetQuantile()) - fields[metricName] = q.GetValue() - - quantileMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) - metrics = append(metrics, quantileMetric) - } - return metrics -} - -// Get Buckets from histogram metric -func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric { - metrics := make([]telegraf.Metric, 0, len(m.GetHistogram().Bucket)+2) - fields := make(map[string]interface{}) - - fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) - fields[metricName+"_sum"] = m.GetHistogram().GetSampleSum() - - met := metric.New("prometheus", tags, fields, t, ValueType(metricType)) - metrics = append(metrics, met) - - infSeen := false - for _, b := range m.GetHistogram().Bucket { - newTags := tags - fields = make(map[string]interface{}) - newTags["le"] = fmt.Sprint(b.GetUpperBound()) - fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) - - histogramMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) - metrics = append(metrics, histogramMetric) - if math.IsInf(b.GetUpperBound(), +1) { - infSeen = true - } - } - // Infinity bucket is required for proper function of histogram in prometheus - if !infSeen { - newTags := tags - newTags["le"] = "+Inf" - - fields = make(map[string]interface{}) - fields[metricName+"_bucket"] = float64(m.GetHistogram().GetSampleCount()) - - histogramInfMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType)) - metrics = append(metrics, histogramInfMetric) - } - return metrics -} - -// Get name and value from metric -func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - fields[metricName] = m.GetGauge().GetValue() - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - fields[metricName] = m.GetCounter().GetValue() - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - fields[metricName] = m.GetUntyped().GetValue() - } - } - return fields -} - -func (p *Parser) getTimestampV2(m *dto.Metric, now time.Time) time.Time { - var t time.Time - if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, m.GetTimestampMs()*1000000) - } else { - t = now - } - return t -}