feat: Add tag batch-processing to XPath parser (#10585)
This commit is contained in:
parent
92d1b0efcf
commit
9387fb602d
|
|
@ -1623,6 +1623,10 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
|
|||
c.getFieldBool(subtbl, "field_name_expansion", &subcfg.FieldNameExpand)
|
||||
c.getFieldString(subtbl, "field_name", &subcfg.FieldNameQuery)
|
||||
c.getFieldString(subtbl, "field_value", &subcfg.FieldValueQuery)
|
||||
c.getFieldString(subtbl, "tag_selection", &subcfg.TagSelection)
|
||||
c.getFieldBool(subtbl, "tag_name_expansion", &subcfg.TagNameExpand)
|
||||
c.getFieldString(subtbl, "tag_name", &subcfg.TagNameQuery)
|
||||
c.getFieldString(subtbl, "tag_value", &subcfg.TagValueQuery)
|
||||
pc.XPathConfig[i] = subcfg
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -136,6 +136,19 @@ metric.
|
|||
## This allows to flatten out nodes with non-unique names in the subtree
|
||||
# field_name_expansion = false
|
||||
|
||||
## Tag specifications using a selector.
|
||||
## tag_selection = "child::*"
|
||||
## Optional: Queries to specify tag name and value.
|
||||
## These options are only to be used in combination with 'tag_selection'!
|
||||
## By default the node name and node content is used if a tag-selection
|
||||
## is specified.
|
||||
# tag_name = "name()"
|
||||
# tag_value = "."
|
||||
|
||||
## Optional: Expand tag names relative to the selected node
|
||||
## This allows to flatten out nodes with non-unique names in the subtree
|
||||
# tag_name_expansion = false
|
||||
|
||||
## Tag definitions using the given XPath queries.
|
||||
[inputs.file.xpath.tags]
|
||||
name = "substring-after(Sensor/@name, ' ')"
|
||||
|
|
@ -206,6 +219,21 @@ When *true*, field names selected with `field_selection` are expanded to a *path
|
|||
is necessary if we e.g. select all leaf nodes as fields and those leaf nodes do not have unique names. That is in case
|
||||
you have duplicate names in the fields you select you should set this to `true`.
|
||||
|
||||
### tag_selection, tag_name, tag_value (optional)
|
||||
|
||||
You can specify a [XPath][xpath] query to select a set of nodes forming the tags of the metric. The specified path can be absolute (starting with `/`) or relative to the currently selected node. Each node selected by `tag_selection` forms a new tag within the metric.
|
||||
|
||||
The *name* and the *value* of each tag can be specified using the optional `tag_name` and `tag_value` queries. The queries are relative to the selected tag if not starting with `/`. If not specified the tag's *name* defaults to the node name and the tag's *value* defaults to the content of the selected tag node.
|
||||
**NOTE**: `tag_name` and `tag_value` queries are only evaluated if a `tag_selection` is specified.
|
||||
|
||||
Specifying `tag_selection` is optional. This is an alternative way to specify tags especially for documents where the node names are not known a priori or if there is a large number of tags to be specified. These options can also be combined with the tag specifications above.
|
||||
|
||||
### tag_name_expansion (optional)
|
||||
|
||||
When *true*, tag names selected with `tag_selection` are expanded to a *path* relative to the *selected node*. This
|
||||
is necessary if we e.g. select all leaf nodes as tags and those leaf nodes do not have unique names. That is in case
|
||||
you have duplicate names in the tags you select you should set this to `true`.
|
||||
|
||||
## Examples
|
||||
|
||||
This `example.xml` file is used in the configuration examples below:
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
path "github.com/antchfx/xpath"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
|
|
@ -48,6 +49,11 @@ type Config struct {
|
|||
FieldNameQuery string `toml:"field_name"`
|
||||
FieldValueQuery string `toml:"field_value"`
|
||||
FieldNameExpand bool `toml:"field_name_expansion"`
|
||||
|
||||
TagSelection string `toml:"tag_selection"`
|
||||
TagNameQuery string `toml:"tag_name"`
|
||||
TagValueQuery string `toml:"tag_value"`
|
||||
TagNameExpand bool `toml:"tag_name_expansion"`
|
||||
}
|
||||
|
||||
func (p *Parser) Init() error {
|
||||
|
|
@ -89,6 +95,7 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
|||
|
||||
// Queries
|
||||
metrics := make([]telegraf.Metric, 0)
|
||||
p.Log.Debugf("Number of configs: %d", len(p.Configs))
|
||||
for _, config := range p.Configs {
|
||||
if len(config.Selection) == 0 {
|
||||
config.Selection = "/"
|
||||
|
|
@ -243,6 +250,69 @@ func (p *Parser) parseQuery(starttime time.Time, doc, selected dataNode, config
|
|||
return nil, fmt.Errorf("unknown format '%T' for tag '%s'", v, name)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle the tag batch definitions if any.
|
||||
if len(config.TagSelection) > 0 {
|
||||
tagnamequery := "name()"
|
||||
tagvaluequery := "."
|
||||
if len(config.TagNameQuery) > 0 {
|
||||
tagnamequery = config.TagNameQuery
|
||||
}
|
||||
if len(config.TagValueQuery) > 0 {
|
||||
tagvaluequery = config.TagValueQuery
|
||||
}
|
||||
|
||||
// Query all tags
|
||||
selectedTagNodes, err := p.document.QueryAll(selected, config.TagSelection)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Log.Debugf("Number of selected tag nodes: %d", len(selectedTagNodes))
|
||||
if len(selectedTagNodes) > 0 && selectedTagNodes[0] != nil {
|
||||
for _, selectedtag := range selectedTagNodes {
|
||||
n, err := p.executeQuery(doc, selectedtag, tagnamequery)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query tag name with query '%s': %v", tagnamequery, err)
|
||||
}
|
||||
name, ok := n.(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to query tag name with query '%s': result is not a string (%v)", tagnamequery, n)
|
||||
}
|
||||
v, err := p.executeQuery(doc, selectedtag, tagvaluequery)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query tag value for '%s': %v", name, err)
|
||||
}
|
||||
|
||||
if config.TagNameExpand {
|
||||
p := p.document.GetNodePath(selectedtag, selected, "_")
|
||||
if len(p) > 0 {
|
||||
name = p + "_" + name
|
||||
}
|
||||
}
|
||||
|
||||
// Check if field name already exists and if so, append an index number.
|
||||
if _, ok := tags[name]; ok {
|
||||
for i := 1; ; i++ {
|
||||
p := name + "_" + strconv.Itoa(i)
|
||||
if _, ok := tags[p]; !ok {
|
||||
name = p
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert the tag to be a string
|
||||
s, err := internal.ToString(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query tag value for '%s': result is not a string (%v)", name, v)
|
||||
}
|
||||
tags[name] = s
|
||||
}
|
||||
} else {
|
||||
p.debugEmptyQuery("tag selection", selected, config.TagSelection)
|
||||
}
|
||||
}
|
||||
|
||||
for name, v := range p.DefaultTags {
|
||||
tags[name] = v
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1196,6 +1196,10 @@ func TestTestCases(t *testing.T) {
|
|||
name: "message-pack",
|
||||
filename: "testcases/tracker_msgpack.conf",
|
||||
},
|
||||
{
|
||||
name: "field and tag batch (json)",
|
||||
filename: "testcases/field_tag_batch.conf",
|
||||
},
|
||||
}
|
||||
|
||||
parser := influx.NewParser(influx.NewMetricHandler())
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
# Example for reading batches of selecting fields AND tags.
|
||||
#
|
||||
# File:
|
||||
# testcases/field_tag_batch.json xpath_json
|
||||
#
|
||||
# Expected Output:
|
||||
# measurementName,machine=machineValue,source=sourceValue field1="1",field2="2" 1643760000000000000
|
||||
#
|
||||
|
||||
metric_name = "/measurement"
|
||||
timestamp = "/timestamp"
|
||||
timestamp_format = "2006-01-02T15:04:05Z"
|
||||
field_selection = "fields/child::*"
|
||||
tag_selection = "tags/child::*"
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"measurement": "measurementName",
|
||||
"timestamp": "2022-02-02T00:00:00Z",
|
||||
"tags": {
|
||||
"source": "sourceValue",
|
||||
"machine": "machineValue"
|
||||
},
|
||||
"fields": {
|
||||
"field1": 1.0,
|
||||
"field2": 2.0
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue