XPath parser extension to allow parsing of JSON, MessagePack and Protocol-buffers (#9277)

This commit is contained in:
Sven Rebhan 2021-07-01 22:48:16 +02:00 committed by GitHub
parent 9b22161d92
commit 25413b2b6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 869 additions and 150 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
@ -1296,6 +1297,11 @@ func (c *Config) buildParser(name string, tbl *ast.Table) (parsers.Parser, error
}
logger := models.NewLogger("parsers", config.DataFormat, name)
models.SetLoggerOnPlugin(parser, logger)
if initializer, ok := parser.(telegraf.Initializer); ok {
if err := initializer.Init(); err != nil {
return nil, err
}
}
return parser, nil
}
@ -1366,24 +1372,36 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
//for XML parser
if node, ok := tbl.Fields["xml"]; ok {
if subtbls, ok := node.([]*ast.Table); ok {
pc.XMLConfig = make([]parsers.XMLConfig, len(subtbls))
for i, subtbl := range subtbls {
subcfg := pc.XMLConfig[i]
c.getFieldString(subtbl, "metric_name", &subcfg.MetricQuery)
c.getFieldString(subtbl, "metric_selection", &subcfg.Selection)
c.getFieldString(subtbl, "timestamp", &subcfg.Timestamp)
c.getFieldString(subtbl, "timestamp_format", &subcfg.TimestampFmt)
c.getFieldStringMap(subtbl, "tags", &subcfg.Tags)
c.getFieldStringMap(subtbl, "fields", &subcfg.Fields)
c.getFieldStringMap(subtbl, "fields_int", &subcfg.FieldsInt)
c.getFieldString(subtbl, "field_selection", &subcfg.FieldSelection)
c.getFieldBool(subtbl, "field_name_expansion", &subcfg.FieldNameExpand)
c.getFieldString(subtbl, "field_name", &subcfg.FieldNameQuery)
c.getFieldString(subtbl, "field_value", &subcfg.FieldValueQuery)
pc.XMLConfig[i] = subcfg
//for XPath parser family
if choice.Contains(pc.DataFormat, []string{"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf"}) {
c.getFieldString(tbl, "xpath_protobuf_file", &pc.XPathProtobufFile)
c.getFieldString(tbl, "xpath_protobuf_type", &pc.XPathProtobufType)
c.getFieldBool(tbl, "xpath_print_document", &pc.XPathPrintDocument)
// Determine the actual xpath configuration tables
node, xpathOK := tbl.Fields["xpath"]
if !xpathOK {
// Add this for backward compatibility
node, xpathOK = tbl.Fields[pc.DataFormat]
}
if xpathOK {
if subtbls, ok := node.([]*ast.Table); ok {
pc.XPathConfig = make([]parsers.XPathConfig, len(subtbls))
for i, subtbl := range subtbls {
subcfg := pc.XPathConfig[i]
c.getFieldString(subtbl, "metric_name", &subcfg.MetricQuery)
c.getFieldString(subtbl, "metric_selection", &subcfg.Selection)
c.getFieldString(subtbl, "timestamp", &subcfg.Timestamp)
c.getFieldString(subtbl, "timestamp_format", &subcfg.TimestampFmt)
c.getFieldStringMap(subtbl, "tags", &subcfg.Tags)
c.getFieldStringMap(subtbl, "fields", &subcfg.Fields)
c.getFieldStringMap(subtbl, "fields_int", &subcfg.FieldsInt)
c.getFieldString(subtbl, "field_selection", &subcfg.FieldSelection)
c.getFieldBool(subtbl, "field_name_expansion", &subcfg.FieldNameExpand)
c.getFieldString(subtbl, "field_name", &subcfg.FieldNameQuery)
c.getFieldString(subtbl, "field_value", &subcfg.FieldValueQuery)
pc.XPathConfig[i] = subcfg
}
}
}
}
@ -1551,13 +1569,15 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields",
"influx_uint_support", "interval", "json_name_key", "json_query", "json_strict",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_units", "json_timezone",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_units", "json_timezone", "json_v2",
"metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",
"name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision",
"prefix", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
"separator", "splunkmetric_hec_routing", "splunkmetric_multimetric", "tag_keys",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "template", "templates",
"value_field_name", "wavefront_source_override", "wavefront_use_strict", "xml", "json_v2":
"value_field_name", "wavefront_source_override", "wavefront_use_strict",
"xml", "xpath", "xpath_json", "xpath_msgpack", "xpath_protobuf", "xpath_print_document",
"xpath_protobuf_file", "xpath_protobuf_type":
// ignore fields that are common to all plugins.
default:

View File

@ -23,6 +23,7 @@ following works:
- github.com/alecthomas/units [MIT License](https://github.com/alecthomas/units/blob/master/COPYING)
- github.com/aliyun/alibaba-cloud-sdk-go [Apache License 2.0](https://github.com/aliyun/alibaba-cloud-sdk-go/blob/master/LICENSE)
- github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE)
- github.com/antchfx/jsonquery [MIT License](https://github.com/antchfx/jsonquery/blob/master/LICENSE)
- github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE)
- github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE)
- github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt)
@ -64,6 +65,7 @@ following works:
- github.com/docker/docker [Apache License 2.0](https://github.com/docker/docker/blob/master/LICENSE)
- github.com/docker/go-connections [Apache License 2.0](https://github.com/docker/go-connections/blob/master/LICENSE)
- github.com/docker/go-units [Apache License 2.0](https://github.com/docker/go-units/blob/master/LICENSE)
- github.com/doclambda/protobufquery [MIT License](https://github.com/doclambda/protobufquery/blob/master/LICENSE)
- github.com/dynatrace-oss/dynatrace-metric-utils-go [Apache License 2.0](https://github.com/dynatrace-oss/dynatrace-metric-utils-go/blob/master/LICENSE)
- github.com/eapache/go-resiliency [MIT License](https://github.com/eapache/go-resiliency/blob/master/LICENSE)
- github.com/eapache/go-xerial-snappy [MIT License](https://github.com/eapache/go-xerial-snappy/blob/master/LICENSE)
@ -131,6 +133,7 @@ following works:
- github.com/jaegertracing/jaeger [Apache License 2.0](https://github.com/jaegertracing/jaeger/blob/master/LICENSE)
- github.com/james4k/rcon [MIT License](https://github.com/james4k/rcon/blob/master/LICENSE)
- github.com/jcmturner/gofork [BSD 3-Clause "New" or "Revised" License](https://github.com/jcmturner/gofork/blob/master/LICENSE)
- github.com/jhump/protoreflect [Apache License 2.0](https://github.com/jhump/protoreflect/blob/master/LICENSE)
- github.com/jmespath/go-jmespath [Apache License 2.0](https://github.com/jmespath/go-jmespath/blob/master/LICENSE)
- github.com/jpillora/backoff [MIT License](https://github.com/jpillora/backoff/blob/master/LICENSE)
- github.com/json-iterator/go [MIT License](https://github.com/json-iterator/go/blob/master/LICENSE)

4
go.mod
View File

@ -21,6 +21,7 @@ require (
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1004
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9
github.com/antchfx/jsonquery v1.1.4
github.com/antchfx/xmlquery v1.3.5
github.com/antchfx/xpath v1.1.11
github.com/apache/thrift v0.13.0
@ -45,6 +46,7 @@ require (
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
github.com/dimchansky/utfbom v1.1.1
github.com/docker/docker v20.10.6+incompatible
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.1.0
github.com/eclipse/paho.mqtt.golang v1.3.0
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
@ -82,6 +84,7 @@ require (
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8
github.com/jackc/pgx/v4 v4.6.0
github.com/james4k/rcon v0.0.0-20120923215419-8fbb8268b60a
github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca
github.com/jmespath/go-jmespath v0.4.0
github.com/kardianos/service v1.0.0
github.com/karrick/godirwalk v1.16.1
@ -142,6 +145,7 @@ require (
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
gopkg.in/djherbis/times.v1 v1.2.0
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/gorethink/gorethink.v3 v3.0.5

15
go.sum
View File

@ -182,8 +182,11 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.1004/go.mod h1:pUKYbK5JQ+1Dfxk80P0q
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/antchfx/jsonquery v1.1.4 h1:+OlFO3QS9wjU0MKx9MgHm5f6o6hdd4e9mUTp0wTjxlM=
github.com/antchfx/jsonquery v1.1.4/go.mod h1:cHs8r6Bymd8j6HI6Ej1IJbjahKvLBcIEh54dfmo+E9A=
github.com/antchfx/xmlquery v1.3.5 h1:I7TuBRqsnfFuL11ruavGm911Awx9IqSdiU6W/ztSmVw=
github.com/antchfx/xmlquery v1.3.5/go.mod h1:64w0Xesg2sTaawIdNqMB+7qaW/bSqkQm+ssPaCMWNnc=
github.com/antchfx/xpath v1.1.7/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
github.com/antchfx/xpath v1.1.10/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
github.com/antchfx/xpath v1.1.11 h1:WOFtK8TVAjLm3lbgqeP0arlHpvCEeTANeWZ/csPpJkQ=
github.com/antchfx/xpath v1.1.11/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
@ -462,6 +465,8 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60 h1:27379cxrsKlr7hAnW/xrusefspUPjqHVRW1K/bZgfGw=
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60/go.mod h1:8Ia4zp86glrUhC29AAdK9hwTYh8RB6v0WRCtpplYqEg=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dropbox/godropbox v0.0.0-20180512210157-31879d3884b9 h1:NAvZb7gqQfLSNBPzVsvI7eZMosXtg2g2kxXrei90CtU=
github.com/dropbox/godropbox v0.0.0-20180512210157-31879d3884b9/go.mod h1:glr97hP/JuXb+WMYCizc4PIFuzw1lCR97mwbe1VVXhQ=
@ -760,6 +765,7 @@ github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEo
github.com/gophercloud/gophercloud v0.12.0/go.mod h1:gmC5oQqMDOMO1t1gq5DquX/yAU808e/4mzjjDA76+Ss=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
@ -927,6 +933,8 @@ github.com/james4k/rcon v0.0.0-20120923215419-8fbb8268b60a/go.mod h1:1qNVsDcmNQD
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca h1:a0GZUdb+qnutF8shJxr2qs2qT3fnF+ptxTxPB8+oIvk=
github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@ -1150,6 +1158,7 @@ github.com/newrelic/newrelic-telemetry-sdk-go v0.5.1 h1:9YEHXplqlVkOltThchh+RxeO
github.com/newrelic/newrelic-telemetry-sdk-go v0.5.1/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso=
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
@ -1853,7 +1862,9 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200513201620-d5fe73897c97/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200822203824-307de81be3f4/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@ -1963,6 +1974,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
@ -2051,8 +2063,9 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3 h1:sXmLre5bzIR6ypkjXCDI3jHPssRhc8KD/Ome589sc3U=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.18.8/go.mod h1:d/CXqwWv+Z2XEG1LgceeDmHQwpUJhROPx16SlxJgERY=
k8s.io/api v0.20.1/go.mod h1:KqwcCVogGxQY3nBlRpwt+wpAMF/KjaCc7RpywacvqUo=
k8s.io/api v0.20.4 h1:xZjKidCirayzX6tHONRQyTNDVIR55TYVqgATqo6ZULY=

View File

@ -19,7 +19,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
"github.com/influxdata/telegraf/plugins/parsers/xml"
"github.com/influxdata/telegraf/plugins/parsers/xpath"
)
type ParserFunc func() (Parser, error)
@ -159,16 +159,17 @@ type Config struct {
// Value configuration
ValueFieldName string `toml:"value_field_name"`
// XML configuration
XMLConfig []XMLConfig `toml:"xml"`
// XPath configuration
XPathPrintDocument bool `toml:"xpath_print_document"`
XPathProtobufFile string `toml:"xpath_protobuf_file"`
XPathProtobufType string `toml:"xpath_protobuf_type"`
XPathConfig []XPathConfig
// JSONPath configuration
JSONV2Config []JSONV2Config `toml:"json_v2"`
}
type XMLConfig struct {
xml.Config
}
type XPathConfig xpath.Config
type JSONV2Config struct {
json_v2.Config
@ -261,8 +262,15 @@ func NewParser(config *Config) (Parser, error) {
parser, err = NewPrometheusParser(config.DefaultTags)
case "prometheusremotewrite":
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
case "xml":
parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig)
case "xml", "xpath_json", "xpath_msgpack", "xpath_protobuf":
parser = &xpath.Parser{
Format: config.DataFormat,
ProtobufMessageDef: config.XPathProtobufFile,
ProtobufMessageType: config.XPathProtobufType,
PrintDocument: config.XPathPrintDocument,
DefaultTags: config.DefaultTags,
Configs: NewXPathParserConfigs(config.MetricName, config.XPathConfig),
}
case "json_v2":
parser, err = NewJSONPathParser(config.JSONV2Config)
default:
@ -382,30 +390,15 @@ func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, erro
}, nil
}
func NewXMLParser(metricName string, defaultTags map[string]string, xmlConfigs []XMLConfig) (Parser, error) {
func NewXPathParserConfigs(metricName string, cfgs []XPathConfig) []xpath.Config {
// Convert the config formats which is a one-to-one copy
configs := make([]xml.Config, len(xmlConfigs))
for i, cfg := range xmlConfigs {
configs[i].MetricName = metricName
configs[i].MetricQuery = cfg.MetricQuery
configs[i].Selection = cfg.Selection
configs[i].Timestamp = cfg.Timestamp
configs[i].TimestampFmt = cfg.TimestampFmt
configs[i].Tags = cfg.Tags
configs[i].Fields = cfg.Fields
configs[i].FieldsInt = cfg.FieldsInt
configs[i].FieldSelection = cfg.FieldSelection
configs[i].FieldNameQuery = cfg.FieldNameQuery
configs[i].FieldValueQuery = cfg.FieldValueQuery
configs[i].FieldNameExpand = cfg.FieldNameExpand
configs := make([]xpath.Config, 0, len(cfgs))
for _, cfg := range cfgs {
config := xpath.Config(cfg)
config.MetricName = metricName
configs = append(configs, config)
}
return &xml.Parser{
Configs: configs,
DefaultTags: defaultTags,
}, nil
return configs
}
func NewJSONPathParser(jsonv2config []JSONV2Config) (Parser, error) {

View File

@ -1,13 +1,24 @@
# XML
# XPath
The XML data format parser parses a [XML][xml] string into metric fields using [XPath][xpath] expressions. For supported
XPath functions check [the underlying XPath library][xpath lib].
The XPath data format parser parses different formats into metric fields using [XPath][xpath] expressions.
**NOTE:** The type of fields are specified using [XPath functions][xpath lib]. The only exception are *integer* fields
that need to be specified in a `fields_int` section.
For supported XPath functions check [the underlying XPath library][xpath lib].
### Configuration
**NOTE:** The type of fields are specified using [XPath functions][xpath lib]. The only exception are *integer* fields that need to be specified in a `fields_int` section.
### Supported data formats
| name | `data_format` setting | comment |
| --------------------------------------- | --------------------- | ------- |
| [Extensible Markup Language (XML)][xml] | `"xml"` | |
| [JSON][json] | `"xpath_json"` | |
| [MessagePack][msgpack] | `"xpath_msgpack"` | |
| [Protocol buffers][protobuf] | `"xpath_protobuf"` | [see additional parameters](protocol-buffers-additiona-settings)|
#### Protocol buffers additional settings
For using the protocol-buffer format you need to specify a protocol buffer definition file (`.proto`) in `xpath_protobuf_file`, Furthermore, you need to specify which message type you want to use via `xpath_protobuf_type`.
### Configuration (explicit)
In this configuration mode, you explicitly specify the field and tags you want to scrape out of your data.
```toml
[[inputs.file]]
files = ["example.xml"]
@ -18,44 +29,56 @@ that need to be specified in a `fields_int` section.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "xml"
## PROTOCOL BUFFER definitions
## Protocol buffer definition file
# xpath_protobuf_file = "sparkplug_b.proto"
## Name of the protocol buffer message type to use in a fully qualified form.
# xpath_protobuf_type = ""org.eclipse.tahu.protobuf.Payload""
## Print the internal XML document when in debug logging mode.
## This is especially useful when using the parser with non-XML formats like protocol buffers
## to get an idea on the expression necessary to derive fields etc.
# xpath_print_document = false
## Multiple parsing sections are allowed
[[inputs.file.xml]]
[[inputs.file.xpath]]
## Optional: XPath-query to select a subset of nodes from the XML document.
#metric_selection = "/Bus/child::Sensor"
# metric_selection = "/Bus/child::Sensor"
## Optional: XPath-query to set the metric (measurement) name.
#metric_name = "string('example')"
# metric_name = "string('example')"
## Optional: Query to extract metric timestamp.
## If not specified the time of execution is used.
#timestamp = "/Gateway/Timestamp"
# timestamp = "/Gateway/Timestamp"
## Optional: Format of the timestamp determined by the query above.
## This can be any of "unix", "unix_ms", "unix_us", "unix_ns" or a valid Golang
## time format. If not specified, a "unix" timestamp (in seconds) is expected.
#timestamp_format = "2006-01-02T15:04:05Z"
# timestamp_format = "2006-01-02T15:04:05Z"
## Tag definitions using the given XPath queries.
[inputs.file.xml.tags]
[inputs.file.xpath.tags]
name = "substring-after(Sensor/@name, ' ')"
device = "string('the ultimate sensor')"
## Integer field definitions using XPath queries.
[inputs.file.xml.fields_int]
[inputs.file.xpath.fields_int]
consumers = "Variable/@consumers"
## Non-integer field definitions using XPath queries.
## The field type is defined using XPath expressions such as number(), boolean() or string(). If no conversion is performed the field will be of type string.
[inputs.file.xml.fields]
[inputs.file.xpath.fields]
temperature = "number(Variable/@temperature)"
power = "number(Variable/@power)"
frequency = "number(Variable/@frequency)"
ok = "Mode != 'ok'"
```
A configuration can contain muliple *xml* subsections for e.g. the file plugin to process the xml-string multiple times.
Consult the [XPath syntax][xpath] and the [underlying library's functions][xpath lib] for details and help regarding XPath queries. Consider using an XPath tester such as [xpather.com][xpather] or [Code Beautify's XPath Tester][xpath tester] for help developing and debugging
A configuration can contain muliple *xpath* subsections for e.g. the file plugin to process the xml-string multiple times. Consult the [XPath syntax][xpath] and the [underlying library's functions][xpath lib] for details and help regarding XPath queries. Consider using an XPath tester such as [xpather.com][xpather] or [Code Beautify's XPath Tester][xpath tester] for help developing and debugging
your query.
## Configuration (batch)
Alternatively to the configuration above, fields can also be specified in a batch way. So contrary to specify the fields
in a section, you can define a `name` and a `value` selector used to determine the name and value of the fields in the
metric.
@ -69,21 +92,31 @@ metric.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "xml"
## Name of the protocol buffer type to use.
## This is only relevant when parsing protocol buffers and must contain the fully qualified
## name of the type e.g. "org.eclipse.tahu.protobuf.Payload".
# xpath_protobuf_type = ""
## Print the internal XML document when in debug logging mode.
## This is especially useful when using the parser with non-XML formats like protocol buffers
## to get an idea on the expression necessary to derive fields etc.
# xpath_print_document = false
## Multiple parsing sections are allowed
[[inputs.file.xml]]
[[inputs.file.xpath]]
## Optional: XPath-query to select a subset of nodes from the XML document.
metric_selection = "/Bus/child::Sensor"
## Optional: XPath-query to set the metric (measurement) name.
#metric_name = "string('example')"
# metric_name = "string('example')"
## Optional: Query to extract metric timestamp.
## If not specified the time of execution is used.
#timestamp = "/Gateway/Timestamp"
# timestamp = "/Gateway/Timestamp"
## Optional: Format of the timestamp determined by the query above.
## This can be any of "unix", "unix_ms", "unix_us", "unix_ns" or a valid Golang
## time format. If not specified, a "unix" timestamp (in seconds) is expected.
#timestamp_format = "2006-01-02T15:04:05Z"
# timestamp_format = "2006-01-02T15:04:05Z"
## Field specifications using a selector.
field_selection = "child::*"
@ -91,15 +124,15 @@ metric.
## These options are only to be used in combination with 'field_selection'!
## By default the node name and node content is used if a field-selection
## is specified.
#field_name = "name()"
#field_value = "."
# field_name = "name()"
# field_value = "."
## Optional: Expand field names relative to the selected node
## This allows to flatten out nodes with non-unique names in the subtree
#field_name_expansion = false
# field_name_expansion = false
## Tag definitions using the given XPath queries.
[inputs.file.xml.tags]
[inputs.file.xpath.tags]
name = "substring-after(Sensor/@name, ' ')"
device = "string('the ultimate sensor')"
@ -215,14 +248,14 @@ Config:
files = ["example.xml"]
data_format = "xml"
[[inputs.file.xml]]
[inputs.file.xml.tags]
[[inputs.file.xpath]]
[inputs.file.xpath.tags]
gateway = "substring-before(/Gateway/Name, ' ')"
[inputs.file.xml.fields_int]
[inputs.file.xpath.fields_int]
seqnr = "/Gateway/Sequence"
[inputs.file.xml.fields]
[inputs.file.xpath.fields]
ok = "/Gateway/Status = 'ok'"
```
@ -244,16 +277,16 @@ Config:
files = ["example.xml"]
data_format = "xml"
[[inputs.file.xml]]
[[inputs.file.xpath]]
metric_name = "name(/Gateway/Status)"
timestamp = "/Gateway/Timestamp"
timestamp_format = "2006-01-02T15:04:05Z"
[inputs.file.xml.tags]
[inputs.file.xpath.tags]
gateway = "substring-before(/Gateway/Name, ' ')"
[inputs.file.xml.fields]
[inputs.file.xpath.fields]
ok = "/Gateway/Status = 'ok'"
```
@ -273,7 +306,7 @@ Config:
files = ["example.xml"]
data_format = "xml"
[[inputs.file.xml]]
[[inputs.file.xpath]]
metric_selection = "/Bus/child::Sensor"
metric_name = "string('sensors')"
@ -281,13 +314,13 @@ Config:
timestamp = "/Gateway/Timestamp"
timestamp_format = "2006-01-02T15:04:05Z"
[inputs.file.xml.tags]
[inputs.file.xpath.tags]
name = "substring-after(@name, ' ')"
[inputs.file.xml.fields_int]
[inputs.file.xpath.fields_int]
consumers = "Variable/@consumers"
[inputs.file.xml.fields]
[inputs.file.xpath.fields]
temperature = "number(Variable/@temperature)"
power = "number(Variable/@power)"
frequency = "number(Variable/@frequency)"
@ -314,7 +347,7 @@ Config:
files = ["example.xml"]
data_format = "xml"
[[inputs.file.xml]]
[[inputs.file.xpath]]
metric_selection = "/Bus/child::Sensor"
metric_name = "string('sensors')"
@ -325,7 +358,7 @@ Config:
field_name = "name(@*[1])"
field_value = "number(@*[1])"
[inputs.file.xml.tags]
[inputs.file.xpath.tags]
name = "substring-after(@name, ' ')"
```
@ -340,6 +373,9 @@ Using the `metric_selection` option we select all `Sensor` nodes in the XML docu
For each selected *field-node* we use `field_name` and `field_value` to determining the field's name and value, respectively. The `field_name` derives the name of the first attribute of the node, while `field_value` derives the value of the first attribute and converts the result to a number.
[xpath lib]: https://github.com/antchfx/xpath
[json]: https://www.json.org/
[msgpack]: https://msgpack.org/
[protobuf]: https://developers.google.com/protocol-buffers
[xml]: https://www.w3.org/XML/
[xpath]: https://www.w3.org/TR/xpath/
[xpather]: http://xpather.com/

View File

@ -0,0 +1,65 @@
package xpath
import (
"strings"
"github.com/antchfx/jsonquery"
path "github.com/antchfx/xpath"
)
type jsonDocument struct{}
func (d *jsonDocument) Parse(buf []byte) (dataNode, error) {
return jsonquery.Parse(strings.NewReader(string(buf)))
}
func (d *jsonDocument) QueryAll(node dataNode, expr string) ([]dataNode, error) {
// If this panics it's a programming error as we changed the document type while processing
native, err := jsonquery.QueryAll(node.(*jsonquery.Node), expr)
if err != nil {
return nil, err
}
nodes := make([]dataNode, len(native))
for i, n := range native {
nodes[i] = n
}
return nodes, nil
}
func (d *jsonDocument) CreateXPathNavigator(node dataNode) path.NodeNavigator {
// If this panics it's a programming error as we changed the document type while processing
return jsonquery.CreateXPathNavigator(node.(*jsonquery.Node))
}
func (d *jsonDocument) GetNodePath(node, relativeTo dataNode, sep string) string {
names := make([]string, 0)
// If these panic it's a programming error as we changed the document type while processing
nativeNode := node.(*jsonquery.Node)
nativeRelativeTo := relativeTo.(*jsonquery.Node)
// Climb up the tree and collect the node names
n := nativeNode.Parent
for n != nil && n != nativeRelativeTo {
names = append(names, n.Data)
n = n.Parent
}
if len(names) < 1 {
return ""
}
// Construct the nodes
nodepath := ""
for _, name := range names {
nodepath = name + sep + nodepath
}
return nodepath[:len(nodepath)-1]
}
func (d *jsonDocument) OutputXML(node dataNode) string {
native := node.(*jsonquery.Node)
return native.OutputXML()
}

View File

@ -0,0 +1,39 @@
package xpath
import (
"bytes"
"fmt"
"github.com/tinylib/msgp/msgp"
"github.com/antchfx/jsonquery"
path "github.com/antchfx/xpath"
)
type msgpackDocument jsonDocument
func (d *msgpackDocument) Parse(buf []byte) (dataNode, error) {
var json bytes.Buffer
// Unmarshal the message-pack binary message to JSON and proceed with the jsonquery class
if _, err := msgp.UnmarshalAsJSON(&json, buf); err != nil {
return nil, fmt.Errorf("unmarshalling to json failed: %v", err)
}
return jsonquery.Parse(&json)
}
func (d *msgpackDocument) QueryAll(node dataNode, expr string) ([]dataNode, error) {
return (*jsonDocument)(d).QueryAll(node, expr)
}
func (d *msgpackDocument) CreateXPathNavigator(node dataNode) path.NodeNavigator {
return (*jsonDocument)(d).CreateXPathNavigator(node)
}
func (d *msgpackDocument) GetNodePath(node, relativeTo dataNode, sep string) string {
return (*jsonDocument)(d).GetNodePath(node, relativeTo, sep)
}
func (d *msgpackDocument) OutputXML(node dataNode) string {
return (*jsonDocument)(d).OutputXML(node)
}

View File

@ -1,4 +1,4 @@
package xml
package xpath
import (
"fmt"
@ -6,17 +6,32 @@ import (
"strings"
"time"
"github.com/antchfx/xmlquery"
"github.com/antchfx/xpath"
path "github.com/antchfx/xpath"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
type dataNode interface{}
type dataDocument interface {
Parse(buf []byte) (dataNode, error)
QueryAll(node dataNode, expr string) ([]dataNode, error)
CreateXPathNavigator(node dataNode) path.NodeNavigator
GetNodePath(node, relativeTo dataNode, sep string) string
OutputXML(node dataNode) string
}
type Parser struct {
Configs []Config
DefaultTags map[string]string
Log telegraf.Logger
Format string
ProtobufMessageDef string
ProtobufMessageType string
PrintDocument bool
Configs []Config
DefaultTags map[string]string
Log telegraf.Logger
document dataDocument
}
type Config struct {
@ -35,14 +50,42 @@ type Config struct {
FieldNameExpand bool `toml:"field_name_expansion"`
}
func (p *Parser) Init() error {
switch p.Format {
case "", "xml":
p.document = &xmlDocument{}
case "xpath_json":
p.document = &jsonDocument{}
case "xpath_msgpack":
p.document = &msgpackDocument{}
case "xpath_protobuf":
pbdoc := protobufDocument{
MessageDefinition: p.ProtobufMessageDef,
MessageType: p.ProtobufMessageType,
Log: p.Log,
}
if err := pbdoc.Init(); err != nil {
return err
}
p.document = &pbdoc
default:
return fmt.Errorf("unknown data-format %q for xpath parser", p.Format)
}
return nil
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
t := time.Now()
// Parse the XML
doc, err := xmlquery.Parse(strings.NewReader(string(buf)))
doc, err := p.document.Parse(buf)
if err != nil {
return nil, err
}
if p.PrintDocument {
p.Log.Debugf("XML document equivalent: %q", p.document.OutputXML(doc))
}
// Queries
metrics := make([]telegraf.Metric, 0)
@ -50,7 +93,7 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
if len(config.Selection) == 0 {
config.Selection = "/"
}
selectedNodes, err := xmlquery.QueryAll(doc, config.Selection)
selectedNodes, err := p.document.QueryAll(doc, config.Selection)
if err != nil {
return nil, err
}
@ -82,14 +125,14 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case 1:
config := p.Configs[0]
doc, err := xmlquery.Parse(strings.NewReader(line))
doc, err := p.document.Parse([]byte(line))
if err != nil {
return nil, err
}
selected := doc
if len(config.Selection) > 0 {
selectedNodes, err := xmlquery.QueryAll(doc, config.Selection)
selectedNodes, err := p.document.QueryAll(doc, config.Selection)
if err != nil {
return nil, err
}
@ -111,7 +154,7 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, config Config) (telegraf.Metric, error) {
func (p *Parser) parseQuery(starttime time.Time, doc, selected dataNode, config Config) (telegraf.Metric, error) {
var timestamp time.Time
var metricname string
@ -119,7 +162,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
// otherwise.
metricname = config.MetricName
if len(config.MetricQuery) > 0 {
v, err := executeQuery(doc, selected, config.MetricQuery)
v, err := p.executeQuery(doc, selected, config.MetricQuery)
if err != nil {
return nil, fmt.Errorf("failed to query metric name: %v", err)
}
@ -130,7 +173,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
// with the queried timestamp if an expresion was specified.
timestamp = starttime
if len(config.Timestamp) > 0 {
v, err := executeQuery(doc, selected, config.Timestamp)
v, err := p.executeQuery(doc, selected, config.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to query timestamp: %v", err)
}
@ -177,7 +220,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
tags := make(map[string]string)
for name, query := range config.Tags {
// Execute the query and cast the returned values into strings
v, err := executeQuery(doc, selected, query)
v, err := p.executeQuery(doc, selected, query)
if err != nil {
return nil, fmt.Errorf("failed to query tag '%s': %v", name, err)
}
@ -202,7 +245,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
fields := make(map[string]interface{})
for name, query := range config.FieldsInt {
// Execute the query and cast the returned values into integers
v, err := executeQuery(doc, selected, query)
v, err := p.executeQuery(doc, selected, query)
if err != nil {
return nil, fmt.Errorf("failed to query field (int) '%s': %v", name, err)
}
@ -228,7 +271,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
for name, query := range config.Fields {
// Execute the query and store the result in fields
v, err := executeQuery(doc, selected, query)
v, err := p.executeQuery(doc, selected, query)
if err != nil {
return nil, fmt.Errorf("failed to query field '%s': %v", name, err)
}
@ -247,14 +290,14 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
}
// Query all fields
selectedFieldNodes, err := xmlquery.QueryAll(selected, config.FieldSelection)
selectedFieldNodes, err := p.document.QueryAll(selected, config.FieldSelection)
if err != nil {
return nil, err
}
p.Log.Debugf("Number of selected field nodes: %d", len(selectedFieldNodes))
if len(selectedFieldNodes) > 0 && selectedFieldNodes[0] != nil {
for _, selectedfield := range selectedFieldNodes {
n, err := executeQuery(doc, selectedfield, fieldnamequery)
n, err := p.executeQuery(doc, selectedfield, fieldnamequery)
if err != nil {
return nil, fmt.Errorf("failed to query field name with query '%s': %v", fieldnamequery, err)
}
@ -262,13 +305,13 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
if !ok {
return nil, fmt.Errorf("failed to query field name with query '%s': result is not a string (%v)", fieldnamequery, n)
}
v, err := executeQuery(doc, selectedfield, fieldvaluequery)
v, err := p.executeQuery(doc, selectedfield, fieldvaluequery)
if err != nil {
return nil, fmt.Errorf("failed to query field value for '%s': %v", name, err)
}
path := name
if config.FieldNameExpand {
p := getNodePath(selectedfield, selected, "_")
p := p.document.GetNodePath(selectedfield, selected, "_")
if len(p) > 0 {
path = p + "_" + name
}
@ -295,30 +338,7 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected *xmlquery.Node, c
return metric.New(metricname, tags, fields, timestamp), nil
}
func getNodePath(node, relativeTo *xmlquery.Node, sep string) string {
names := make([]string, 0)
// Climb up the tree and collect the node names
n := node.Parent
for n != nil && n != relativeTo {
names = append(names, n.Data)
n = n.Parent
}
if len(names) < 1 {
return ""
}
// Construct the nodes
path := ""
for _, name := range names {
path = name + sep + path
}
return path[:len(path)-1]
}
func executeQuery(doc, selected *xmlquery.Node, query string) (r interface{}, err error) {
func (p *Parser) executeQuery(doc, selected dataNode, query string) (r interface{}, err error) {
// Check if the query is relative or absolute and set the root for the query
root := selected
if strings.HasPrefix(query, "/") {
@ -326,7 +346,7 @@ func executeQuery(doc, selected *xmlquery.Node, query string) (r interface{}, er
}
// Compile the query
expr, err := xpath.Compile(query)
expr, err := path.Compile(query)
if err != nil {
return nil, fmt.Errorf("failed to compile query '%s': %v", query, err)
}
@ -334,8 +354,8 @@ func executeQuery(doc, selected *xmlquery.Node, query string) (r interface{}, er
// Evaluate the compiled expression and handle returned node-iterators
// separately. Those iterators will be returned for queries directly
// referencing a node (value or attribute).
n := expr.Evaluate(xmlquery.CreateXPathNavigator(root))
if iter, ok := n.(*xpath.NodeIterator); ok {
n := expr.Evaluate(p.document.CreateXPathNavigator(root))
if iter, ok := n.(*path.NodeIterator); ok {
// We got an iterator, so take the first match and get the referenced
// property. This will always be a string.
if iter.MoveNext() {
@ -399,7 +419,7 @@ func splitLastPathElement(query string) []string {
return elements
}
func (p *Parser) debugEmptyQuery(operation string, root *xmlquery.Node, initialquery string) {
func (p *Parser) debugEmptyQuery(operation string, root dataNode, initialquery string) {
if p.Log == nil {
return
}
@ -415,7 +435,7 @@ func (p *Parser) debugEmptyQuery(operation string, root *xmlquery.Node, initialq
}
for i := len(parts) - 1; i >= 0; i-- {
q := parts[i]
nodes, err := xmlquery.QueryAll(root, q)
nodes, err := p.document.QueryAll(root, q)
if err != nil {
p.Log.Debugf("executing query %q in %s failed: %v", q, operation, err)
return

View File

@ -1,4 +1,4 @@
package xml
package xpath
import (
"io/ioutil"
@ -12,7 +12,6 @@ import (
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -127,6 +126,7 @@ func TestParseInvalidXML(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
_, err := parser.ParseLine(tt.input)
require.Error(t, err)
@ -163,6 +163,7 @@ func TestInvalidTypeQueriesFail(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
_, err := parser.ParseLine(tt.input)
require.Error(t, err)
@ -228,6 +229,7 @@ func TestInvalidTypeQueries(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -357,6 +359,7 @@ func TestParseTimestamps(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -561,6 +564,7 @@ func TestParseSingleValues(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -772,6 +776,7 @@ func TestParseSingleAttributes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -858,6 +863,7 @@ func TestParseMultiValues(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -970,6 +976,7 @@ func TestParseMultiNodes(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.Parse([]byte(tt.input))
require.NoError(t, err)
@ -1015,6 +1022,7 @@ func TestParseMetricQuery(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: tt.defaultTags, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
actual, err := parser.ParseLine(tt.input)
require.NoError(t, err)
@ -1080,11 +1088,10 @@ func TestEmptySelection(t *testing.T) {
},
}
logger := testutil.Logger{Name: "parsers.xml"}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := &Parser{Configs: tt.configs, DefaultTags: map[string]string{}, Log: logger}
parser := &Parser{Configs: tt.configs, DefaultTags: map[string]string{}, Log: testutil.Logger{Name: "parsers.xml"}}
require.NoError(t, parser.Init())
_, err := parser.Parse([]byte(tt.input))
require.Error(t, err)
@ -1110,14 +1117,26 @@ func TestTestCases(t *testing.T) {
name: "field selection batch",
filename: "testcases/multisensor_selection_batch.conf",
},
{
name: "openweathermap forecast",
filename: "testcases/openweathermap.conf",
},
{
name: "earthquakes quakeml",
filename: "testcases/earthquakes.conf",
},
{
name: "openweathermap forecast (xml)",
filename: "testcases/openweathermap_xml.conf",
},
{
name: "openweathermap forecast (json)",
filename: "testcases/openweathermap_json.conf",
},
{
name: "addressbook tutorial (protobuf)",
filename: "testcases/addressbook.conf",
},
{
name: "message-pack",
filename: "testcases/tracker_msgpack.conf",
},
}
parser := influx.NewParser(influx.NewMetricHandler())
@ -1132,9 +1151,29 @@ func TestTestCases(t *testing.T) {
// Load the xml-content
input, err := testutil.ParseRawLinesFrom(header, "File:")
require.NoError(t, err)
assert.Len(t, input, 1)
require.Len(t, input, 1)
filefields := strings.Fields(input[0])
require.GreaterOrEqual(t, len(filefields), 1)
datafile := filepath.FromSlash(filefields[0])
fileformat := ""
if len(filefields) > 1 {
fileformat = filefields[1]
}
// Load the protocol buffer information if required
var pbmsgdef, pbmsgtype string
if fileformat == "xpath_protobuf" {
input, err := testutil.ParseRawLinesFrom(header, "Protobuf:")
require.NoError(t, err)
require.Len(t, input, 1)
protofields := strings.Fields(input[0])
require.Len(t, protofields, 2)
pbmsgdef = protofields[0]
pbmsgtype = protofields[1]
}
datafile := filepath.FromSlash(input[0])
content, err := ioutil.ReadFile(datafile)
require.NoError(t, err)
@ -1145,7 +1184,14 @@ func TestTestCases(t *testing.T) {
expectedErrors, _ := testutil.ParseRawLinesFrom(header, "Expected Error:")
// Setup the parser and run it.
parser := &Parser{Configs: []Config{*cfg}, Log: testutil.Logger{Name: "parsers.xml"}}
parser := &Parser{
Format: fileformat,
ProtobufMessageDef: pbmsgdef,
ProtobufMessageType: pbmsgtype,
Configs: []Config{*cfg},
Log: testutil.Logger{Name: "parsers.xml"},
}
require.NoError(t, parser.Init())
outputs, err := parser.Parse(content)
if len(expectedErrors) == 0 {
require.NoError(t, err)

View File

@ -0,0 +1,161 @@
package xpath
import (
"fmt"
"sort"
"strings"
"github.com/influxdata/telegraf"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/dynamicpb"
"github.com/jhump/protoreflect/desc/protoparse"
path "github.com/antchfx/xpath"
"github.com/doclambda/protobufquery"
)
type protobufDocument struct {
MessageDefinition string
MessageType string
Log telegraf.Logger
msg *dynamicpb.Message
}
func (d *protobufDocument) Init() error {
// Check the message definition and type
if d.MessageDefinition == "" {
return fmt.Errorf("protocol-buffer message-definition not set")
}
if d.MessageType == "" {
return fmt.Errorf("protocol-buffer message-type not set")
}
// Load the file descriptors from the given protocol-buffer definition
parser := protoparse.Parser{}
fds, err := parser.ParseFiles(d.MessageDefinition)
if err != nil {
return fmt.Errorf("parsing protocol-buffer definition in %q failed: %v", d.MessageDefinition, err)
}
if len(fds) < 1 {
return fmt.Errorf("file %q does not contain file descriptors", d.MessageDefinition)
}
// Register all definitions in the file in the global registry
for _, fd := range fds {
if fd == nil {
continue
}
fileDescProto := fd.AsFileDescriptorProto()
fileDesc, err := protodesc.NewFile(fileDescProto, nil)
if err != nil {
return fmt.Errorf("creating file descriptor from proto failed: %v", err)
}
if err := protoregistry.GlobalFiles.RegisterFile(fileDesc); err != nil {
return fmt.Errorf("registering file descriptor %q failed: %v", fileDesc.Package(), err)
}
}
// Lookup given type in the loaded file descriptors
msgFullName := protoreflect.FullName(d.MessageType)
desc, err := protoregistry.GlobalFiles.FindDescriptorByName(msgFullName)
if err != nil {
d.Log.Infof("Could not find %q... Known messages:", msgFullName)
var known []string
protoregistry.GlobalFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
name := strings.TrimSpace(string(fd.FullName()))
if name != "" {
known = append(known, name)
}
return true
})
sort.Strings(known)
for _, name := range known {
d.Log.Infof(" %s", name)
}
return err
}
// Get a prototypical message for later use
msgDesc, ok := desc.(protoreflect.MessageDescriptor)
if !ok {
return fmt.Errorf("%q is not a message descriptor (%T)", msgFullName, desc)
}
d.msg = dynamicpb.NewMessage(msgDesc)
if d.msg == nil {
return fmt.Errorf("creating message template for %q failed", msgDesc.FullName())
}
return nil
}
func (d *protobufDocument) Parse(buf []byte) (dataNode, error) {
msg := d.msg.New()
// Unmarshal the received buffer
if err := proto.Unmarshal(buf, msg.Interface()); err != nil {
return nil, err
}
return protobufquery.Parse(msg)
}
func (d *protobufDocument) QueryAll(node dataNode, expr string) ([]dataNode, error) {
// If this panics it's a programming error as we changed the document type while processing
native, err := protobufquery.QueryAll(node.(*protobufquery.Node), expr)
if err != nil {
return nil, err
}
nodes := make([]dataNode, len(native))
for i, n := range native {
nodes[i] = n
}
return nodes, nil
}
func (d *protobufDocument) CreateXPathNavigator(node dataNode) path.NodeNavigator {
// If this panics it's a programming error as we changed the document type while processing
return protobufquery.CreateXPathNavigator(node.(*protobufquery.Node))
}
func (d *protobufDocument) GetNodePath(node, relativeTo dataNode, sep string) string {
names := make([]string, 0)
// If these panic it's a programming error as we changed the document type while processing
nativeNode := node.(*protobufquery.Node)
nativeRelativeTo := relativeTo.(*protobufquery.Node)
// Climb up the tree and collect the node names
n := nativeNode.Parent
for n != nil && n != nativeRelativeTo {
names = append(names, n.Name)
n = n.Parent
}
if len(names) < 1 {
return ""
}
// Construct the nodes
nodepath := ""
for _, name := range names {
nodepath = name + sep + nodepath
}
return nodepath[:len(nodepath)-1]
}
func (d *protobufDocument) OutputXML(node dataNode) string {
native := node.(*protobufquery.Node)
return native.OutputXML()
}
func init() {
}

View File

@ -0,0 +1,28 @@
# Example for parsing an example protocol buffer data.
#
# File:
# testcases/addressbook.dat xpath_protobuf
#
# Protobuf:
# testcases/addressbook.proto addressbook.AddressBook
#
# Expected Output:
# addresses,id=101,name=John\ Doe age=42i,email="john@example.com" 1621430181000000000
# addresses,id=102,name=Jane\ Doe age=40i 1621430181000000000
# addresses,id=201,name=Jack\ Doe age=12i,email="jack@example.com" 1621430181000000000
# addresses,id=301,name=Jack\ Buck age=19i,email="buck@example.com" 1621430181000000000
# addresses,id=1001,name=Janet\ Doe age=16i,email="janet@example.com" 1621430181000000000
#
metric_name = "'addresses'"
metric_selection = "//people"
[tags]
id = "id"
name = "name"
[fields_int]
age = "age"
[fields]
email = "email"

View File

@ -0,0 +1,17 @@
John Doeejohn@example.com *

Jane Doef (
3
Jack DoeÉjack@example.com *
555-555-5555
V
Jack Buck­buck@example.com *
555-555-0000*
555-555-0001*
555-555-0002
E
Janet Doeéjanet@example.com *
555-777-0000*
555-777-0001homeprivatefriends

View File

@ -0,0 +1,28 @@
syntax = "proto3";
package addressbook;
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;
uint32 age = 4;
enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}
message PhoneNumber {
string number = 1;
PhoneType type = 2;
}
repeated PhoneNumber phones = 5;
}
message AddressBook {
repeated Person people = 1;
repeated string tags = 2;
}

View File

@ -0,0 +1,127 @@
{
"cod": "200",
"message": 0.0179,
"cnt": 96,
"list": [
{
"dt": 1596632400,
"main": {
"temp": 280.16,
"feels_like": 280.41,
"temp_min": 280.16,
"temp_max": 280.16,
"pressure": 1010,
"sea_level": 1010,
"grnd_level": 1010,
"humidity": 70,
"temp_kf": 0
},
"weather": [
{
"id": 804,
"main": "Clouds",
"description": "overcast clouds",
"icon": "04n"
}
],
"clouds": {
"all": 100
},
"wind": {
"speed": 2.03,
"deg": 252,
"gust":5.46
},
"visibility": 10000,
"pop": 0.04,
"sys": {
"pod": "n"
},
"dt_txt": "2020-08-05 13:00:00"
},
{
"dt": 159663600,
"main": {
"temp": 281.16,
"feels_like": 281.41,
"temp_min": 281.16,
"temp_max": 281.16,
"pressure": 1011,
"sea_level": 1011,
"grnd_level": 1011,
"humidity": 71,
"temp_kf": 0
},
"weather": [
{
"id": 804,
"main": "Clouds",
"description": "overcast clouds",
"icon": "04n"
}
],
"clouds": {
"all": 100
},
"wind": {
"speed": 2.03,
"deg": 252,
"gust":5.46
},
"visibility": 10000,
"pop": 0.04,
"sys": {
"pod": "n"
},
"dt_txt": "2020-08-05 14:00:00"
},
{
"dt": 159667200,
"main": {
"temp": 282.16,
"feels_like": 282.41,
"temp_min": 282.16,
"temp_max": 282.16,
"pressure": 1012,
"sea_level": 1012,
"grnd_level": 1012,
"humidity": 71,
"temp_kf": 0
},
"weather": [
{
"id": 804,
"main": "Clouds",
"description": "overcast clouds",
"icon": "04n"
}
],
"clouds": {
"all": 100
},
"wind": {
"speed": 2.03,
"deg": 252,
"gust":5.46
},
"visibility": 10000,
"pop": 0.04,
"sys": {
"pod": "n"
},
"dt_txt": "2020-08-05 15:00:00"
}
],
"city": {
"id": 2643743,
"name": "London",
"coord": {
"lat": 51.5085,
"lon": -0.1258
},
"country": "GB",
"timezone": 0,
"sunrise": 1568958164,
"sunset": 1569002733
}
}

View File

@ -0,0 +1,29 @@
# Example for parsing openweathermap five-day-forecast data.
#
# File:
# testcases/openweathermap_5d.json xpath_json
#
# Expected Output:
# weather,city=London,country=GB humidity=70i,clouds=100i,wind_direction=252,wind_speed=2.03,temperature=137.86666666666667,precipitation=0 1596632400000000000
# weather,city=London,country=GB wind_direction=252,wind_speed=2.03,temperature=138.42222222222225,precipitation=0,clouds=100i,humidity=71i 159663600000000000
# weather,city=London,country=GB humidity=71i,clouds=100i,wind_direction=252,wind_speed=2.03,temperature=138.9777777777778,precipitation=0 159667200000000000
#
metric_name = "'weather'"
metric_selection = "//list/*"
timestamp = "dt"
timestamp_format = "unix"
[tags]
city = "/city/name"
country = "/city/country"
[fields_int]
humidity = "main/humidity"
clouds = "clouds/all"
[fields]
precipitation = "number(main/precipitation)"
wind_direction = "number(wind/deg)"
wind_speed = "number(wind/speed)"
temperature = "(number(main/temp) - 32.0)*(5.0 div 9.0)"

View File

@ -1,7 +1,7 @@
# Example for parsing openweathermap five-day-forecast data.
#
# File:
# testcases/openweathermap_5d.xml
# testcases/openweathermap_5d.xml xml
#
# Expected Output:
# weather,city=London,country=GB clouds=64i,humidity=96i,precipitation=5,temperature=16.89,wind_direction=253.5,wind_speed=4.9 1435654800000000000

View File

@ -0,0 +1 @@
„£geoË@BåsEËÀ^™ŽMîˆy¦device¨TrackerA¤infoƒ§quality­serial_number¬123abc456def£fixétimestampÎ`ÔV¨

View File

@ -0,0 +1,24 @@
# Example for parsing openweathermap five-day-forecast data.
#
# File:
# testcases/tracker.msg xpath_msgpack
#
# Expected Output:
# tracker,device=TrackerA,fixation=true serial="123abc456def",lat=37.78980863758897,lon=-122.39931057256935,quality=2i 1624528552000000000
#
metric_name = "'tracker'"
timestamp = "timestamp"
timestamp_format = "unix"
[tags]
device = "device"
fixation = "info/fix"
[fields_int]
quality = "info/quality"
[fields]
serial = "info/serial_number"
lat = "number(/geo/*[1])"
lon = "number(/geo/*[2])"

View File

@ -0,0 +1,65 @@
package xpath
import (
"strings"
"github.com/antchfx/xmlquery"
path "github.com/antchfx/xpath"
)
type xmlDocument struct{}
func (d *xmlDocument) Parse(buf []byte) (dataNode, error) {
return xmlquery.Parse(strings.NewReader(string(buf)))
}
func (d *xmlDocument) QueryAll(node dataNode, expr string) ([]dataNode, error) {
// If this panics it's a programming error as we changed the document type while processing
native, err := xmlquery.QueryAll(node.(*xmlquery.Node), expr)
if err != nil {
return nil, err
}
nodes := make([]dataNode, len(native))
for i, n := range native {
nodes[i] = n
}
return nodes, nil
}
func (d *xmlDocument) CreateXPathNavigator(node dataNode) path.NodeNavigator {
// If this panics it's a programming error as we changed the document type while processing
return xmlquery.CreateXPathNavigator(node.(*xmlquery.Node))
}
func (d *xmlDocument) GetNodePath(node, relativeTo dataNode, sep string) string {
names := make([]string, 0)
// If these panic it's a programming error as we changed the document type while processing
nativeNode := node.(*xmlquery.Node)
nativeRelativeTo := relativeTo.(*xmlquery.Node)
// Climb up the tree and collect the node names
n := nativeNode.Parent
for n != nil && n != nativeRelativeTo {
names = append(names, n.Data)
n = n.Parent
}
if len(names) < 1 {
return ""
}
// Construct the nodes
nodepath := ""
for _, name := range names {
nodepath = name + sep + nodepath
}
return nodepath[:len(nodepath)-1]
}
func (d *xmlDocument) OutputXML(node dataNode) string {
native := node.(*xmlquery.Node)
return native.OutputXML(false)
}