feat(outputs.parquet): Introduce Parquet output (#15602)

This commit is contained in:
Joshua Powers 2024-07-25 02:40:18 -06:00 committed by GitHub
parent 26df1e7205
commit a3eda34048
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 741 additions and 27 deletions

15
go.mod
View File

@ -37,10 +37,9 @@ require (
github.com/antchfx/jsonquery v1.3.3
github.com/antchfx/xmlquery v1.4.1
github.com/antchfx/xpath v1.3.1
github.com/apache/arrow/go/v13 v13.0.0
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0
github.com/apache/arrow/go/v18 v18.0.0-20240716144821-cf5d7c7ec3cf
github.com/apache/iotdb-client-go v1.2.0-tsbs
github.com/apache/thrift v0.19.0
github.com/apache/thrift v0.20.0
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/awnumar/memguard v0.22.5
@ -212,7 +211,7 @@ require (
go.starlark.net v0.0.0-20240520160348-046347dcd104
go.step.sm/crypto v0.50.0
golang.org/x/crypto v0.25.0
golang.org/x/mod v0.18.0
golang.org/x/mod v0.19.0
golang.org/x/net v0.27.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
@ -348,7 +347,7 @@ require (
github.com/golang-sql/sqlexp v0.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/flatbuffers v24.3.7+incompatible // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
@ -371,7 +370,7 @@ require (
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/packer-plugin-sdk v0.3.2 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
@ -392,7 +391,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/jzelinskie/whirlpool v0.0.0-20201016144138-0675e54bb004 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect
@ -499,7 +498,7 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20240529005216-23cca8864a10 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.22.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect
google.golang.org/genproto v0.0.0-20240708141625-4ad9e859172b // indirect

29
go.sum
View File

@ -827,18 +827,16 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0=
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI=
github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 h1:XbC214lVvnAnDzowGV7dYiv4f4Aa6jhtIby08OgbcUg=
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0/go.mod h1:VVbdJivCXZAJ6IhOSCSzk/RVQ/PlcitjskAWEST3Sc0=
github.com/apache/arrow/go/v18 v18.0.0-20240716144821-cf5d7c7ec3cf h1:9b4bG4uqvid0RH3MHWq2soXTfhPFbqbuNCqLRrl4ZGg=
github.com/apache/arrow/go/v18 v18.0.0-20240716144821-cf5d7c7ec3cf/go.mod h1:84kVJOfdiXAj9Zo8lvZ2uuJVzPn2vKlPdrSHU1zD2mE=
github.com/apache/iotdb-client-go v1.2.0-tsbs h1:hezGUydAkDSceCvsetYorI87S2e8HZ4hTQHmGZgOGDY=
github.com/apache/iotdb-client-go v1.2.0-tsbs/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY=
github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk=
github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I=
github.com/apache/thrift v0.20.0 h1:631+KvYbsBZxmuJjYwhezVsrfc/TbqtZV4QcxOX1fOI=
github.com/apache/thrift v0.20.0/go.mod h1:hOk1BQqcp2OLzGsyVXdfMk7YFlMxK3aoEVhjD06QhB8=
github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY=
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
@ -1382,8 +1380,8 @@ github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84=
github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg=
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v24.3.7+incompatible h1:BxGUkIQnOciBu33bd5BdvqY8Qvo0O/GR4SPhh7x9Ed0=
github.com/google/flatbuffers v24.3.7+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI=
github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/gnxi v0.0.0-20231026134436-d82d9936af15 h1:EETGSLGKBReUUYZdztSp45EzTE6CHw2qMKIfyPrgp6c=
@ -1593,8 +1591,9 @@ github.com/henrybear327/go-proton-api v1.0.0/go.mod h1:w63MZuzufKcIZ93pwRgiOtxMX
github.com/hetznercloud/hcloud-go/v2 v2.4.0 h1:MqlAE+w125PLvJRCpAJmEwrIxoVdUdOyuFUhE/Ukbok=
github.com/hetznercloud/hcloud-go/v2 v2.4.0/go.mod h1:l7fA5xsncFBzQTyw29/dw5Yr88yEGKKdc6BHf24ONS0=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4=
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@ -1766,8 +1765,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
@ -2619,8 +2618,8 @@ golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -3027,8 +3026,8 @@ golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -5,7 +5,7 @@ import (
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/IBM/nzgo/v12"
_ "github.com/SAP/go-hdb/driver"
_ "github.com/apache/arrow/go/v13/arrow/flight/flightsql/driver"
_ "github.com/apache/arrow/go/v18/arrow/flight/flightsql/driver"
_ "github.com/go-sql-driver/mysql"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/microsoft/go-mssqldb"

View File

@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.parquet
package all
import _ "github.com/influxdata/telegraf/plugins/outputs/parquet" // register plugin

View File

@ -0,0 +1,115 @@
# Parquet Output Plugin
This plugin writes metrics to parquet files. By default, the parquet
output groups metrics by metric name and writes those metrics all to the same
file. If a metric's schema does not match the file's schema, those metrics are dropped.
To learn more about Parquet, check out the [Parquet docs][] as well as a blog
post on [Querying Parquet][].
[Parquet docs]: https://parquet.apache.org/docs/
[Querying Parquet]: https://www.influxdata.com/blog/querying-parquet-millisecond-latency/
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and fields or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
# A plugin that writes metrics to parquet files
[[outputs.parquet]]
## Directory to write parquet files in. If a file already exists the output
## will attempt to continue using the existing file.
# directory = "."
## Files are rotated after the time interval specified. When set to 0 no time
## based rotation is performed.
# rotation_interval = "0h"
## Timestamp field name
## Field name to use to store the timestamp. If set to an empty string, then
## the timestamp is omitted.
# timestamp_field_name = "timestamp"
```
## Building Parquet Files
### Schema
Parquet files require a schema when writing files. To generate a schema,
Telegraf will go through all grouped metrics and generate an Apache Arrow schema
based on the union of all fields and tags. If a field and tag have the same name
then the field takes precedence.
The consequence of schema generation is that the very first flush interval in
which a metric is seen takes much longer, due to the additional loop through
the metrics to generate the schema. Subsequent flush intervals are
significantly faster.
When writing to a file, the schema is used to look for each value and if it is
not present a null value is added. The result is that if additional fields are
present after the first metric flush those fields are omitted.
### Write
The plugin makes use of the buffered writer. This may buffer some metrics into
memory before writing it to disk. This method is used as it can more compactly
write multiple flushes of metrics into a single Parquet row group.
Additionally, the Parquet format requires a proper footer, so close must be
called on the file to ensure it is properly formatted.
### Close
Parquet files must close properly or the file will not be readable. The parquet
format requires a footer at the end of the file and if that footer is not
present then the file cannot be read correctly.
If Telegraf were to crash while writing parquet files there is the possibility
of this occurring.
## File Rotation
If a file with the same target name exists at start, the existing file is
rotated to avoid over-writing it or conflicting schema.
File rotation is available via a time based interval that a user can optionally
set. Due to the usage of a buffered writer, a size based rotation is not
possible as the file may not actually get data at each interval.
## Explore Parquet Files
If a user wishes to explore a schema or data in a Parquet file quickly, then
consider the options below:
### CLI
The Arrow repo contains a Go CLI tool to read and parse Parquet files:
```s
go install github.com/apache/arrow/go/v18/parquet/cmd/parquet_reader@latest
parquet_reader <file>
```
### Python
Users can also use the [pyarrow][] library to quickly open and explore Parquet
files:
```python
import pyarrow.parquet as pq
table = pq.read_table('example.parquet')
```
Once created, a user can look at the various [pyarrow.Table][] functions to
further explore the data.
[pyarrow]: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
[pyarrow.Table]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table

View File

@ -0,0 +1,336 @@
//go:generate ../../../tools/readme_config_includer/generator
package parquet
import (
_ "embed"
"errors"
"fmt"
"os"
"strconv"
"time"
"github.com/apache/arrow/go/v18/arrow"
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/parquet"
"github.com/apache/arrow/go/v18/parquet/pqarrow"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
)
//go:embed sample.conf
var sampleConfig string
var defaultTimestampFieldName = "timestamp"
// metricGroup holds the per-metric-name output state: the parquet file
// currently being written, the Arrow schema derived from the first batch of
// metrics with that name, and the record builder plus buffered file writer
// used to append subsequent rows.
type metricGroup struct {
	filename string                // path of the parquet file for this metric name
	builder  *array.RecordBuilder  // reused builder producing Arrow records from metrics
	schema   *arrow.Schema         // fixed schema; later metrics must fit it
	writer   *pqarrow.FileWriter   // buffered writer; must be closed for a valid footer
}
// Parquet is an output plugin that writes metrics to parquet files, one file
// per metric name, optionally rotating files on a time interval.
type Parquet struct {
	// Directory to place parquet files in; defaults to "." in Init.
	Directory string `toml:"directory"`
	// RotationInterval enables time-based file rotation when non-zero.
	RotationInterval config.Duration `toml:"rotation_interval"`
	// TimestampFieldName is the column name for the metric timestamp; an
	// empty string omits the timestamp column entirely.
	TimestampFieldName string `toml:"timestamp_field_name"`
	Log telegraf.Logger `toml:"-"`

	// metricGroups maps metric name -> open file/schema/writer state.
	metricGroups map[string]*metricGroup
}
// SampleConfig returns the embedded sample configuration for the plugin.
func (*Parquet) SampleConfig() string {
	return sampleConfig
}
// Init validates the configured output directory, creating it when missing,
// and initializes the per-metric-name bookkeeping map.
//
// Bug fix versus the original: os.Stat errors other than "not exist" (for
// example a permission error) were ignored, leaving stat == nil and causing
// a nil-pointer dereference at stat.IsDir(). Such errors are now returned.
func (p *Parquet) Init() error {
	if p.Directory == "" {
		p.Directory = "."
	}

	stat, err := os.Stat(p.Directory)
	switch {
	case os.IsNotExist(err):
		// Directory does not exist yet; create it and any parents.
		if err := os.MkdirAll(p.Directory, 0750); err != nil {
			return fmt.Errorf("failed to create directory %q: %w", p.Directory, err)
		}
	case err != nil:
		// Any other stat failure means we cannot safely use the path.
		return fmt.Errorf("failed to stat directory %q: %w", p.Directory, err)
	case !stat.IsDir():
		return fmt.Errorf("provided directory %q is not a directory", p.Directory)
	}

	p.metricGroups = make(map[string]*metricGroup)

	return nil
}
// Connect is a no-op; files are opened lazily per metric name in Write.
func (p *Parquet) Connect() error {
	return nil
}
// Close finalizes every open parquet file. Closing is essential: without the
// footer written on close the files are unreadable. Each failure is logged
// and a single summary error is returned if any close failed.
func (p *Parquet) Close() error {
	var failed bool
	for _, group := range p.metricGroups {
		err := group.writer.Close()
		if err == nil {
			continue
		}
		failed = true
		p.Log.Errorf("failed to close file %q: %v", group.filename, err)
	}

	if !failed {
		return nil
	}
	return errors.New("failed closing one or more parquet files")
}
// Write buckets the incoming metrics by metric name and appends each bucket
// to its parquet file. The first time a metric name is seen, a schema is
// derived from the union of all fields and tags in the batch and a new file,
// builder and writer are created for it. Returns on the first error.
//
// Bug fix versus the original: the Arrow record was not released when
// WriteBuffered failed, leaking its buffers on the error path.
func (p *Parquet) Write(metrics []telegraf.Metric) error {
	// Group metrics by name; each group maps to one file/schema.
	groupedMetrics := make(map[string][]telegraf.Metric)
	for _, metric := range metrics {
		groupedMetrics[metric.Name()] = append(groupedMetrics[metric.Name()], metric)
	}

	now := time.Now()
	for name, metrics := range groupedMetrics {
		group, ok := p.metricGroups[name]
		if !ok {
			// First flush for this metric name: derive schema and open file.
			filename := fmt.Sprintf("%s/%s-%s-%s.parquet", p.Directory, name, now.Format("2006-01-02"), strconv.FormatInt(now.Unix(), 10))
			schema, err := p.createSchema(metrics)
			if err != nil {
				return fmt.Errorf("failed to create schema for file %q: %w", name, err)
			}
			writer, err := p.createWriter(name, filename, schema)
			if err != nil {
				return fmt.Errorf("failed to create writer for file %q: %w", name, err)
			}
			group = &metricGroup{
				builder:  array.NewRecordBuilder(memory.DefaultAllocator, schema),
				filename: filename,
				schema:   schema,
				writer:   writer,
			}
			p.metricGroups[name] = group
		}

		if p.RotationInterval != 0 {
			// rotateIfNeeded swaps group.writer in place when the file expired.
			if err := p.rotateIfNeeded(name); err != nil {
				return fmt.Errorf("failed to rotate file %q: %w", group.filename, err)
			}
		}

		record, err := p.createRecord(metrics, group.builder, group.schema)
		if err != nil {
			return fmt.Errorf("failed to create record for file %q: %w", group.filename, err)
		}
		// Release the record whether or not the buffered write succeeds.
		err = group.writer.WriteBuffered(record)
		record.Release()
		if err != nil {
			return fmt.Errorf("failed to write to file %q: %w", group.filename, err)
		}
	}

	return nil
}
// rotateIfNeeded rotates the file for the named metric group when its
// modification time is older than the configured rotation interval: the
// current writer is closed (writing the parquet footer) and a fresh writer
// is created, which renames the old file out of the way.
func (p *Parquet) rotateIfNeeded(name string) error {
	group := p.metricGroups[name]

	fileInfo, err := os.Stat(group.filename)
	if err != nil {
		return fmt.Errorf("failed to stat file %q: %w", group.filename, err)
	}

	// Not yet expired: nothing to do.
	expiry := fileInfo.ModTime().Add(time.Duration(p.RotationInterval))
	if time.Now().Before(expiry) {
		return nil
	}

	if err := group.writer.Close(); err != nil {
		return fmt.Errorf("failed to close file for rotation %q: %w", group.filename, err)
	}

	writer, err := p.createWriter(name, group.filename, group.schema)
	if err != nil {
		return fmt.Errorf("failed to create new writer for file %q: %w", group.filename, err)
	}
	group.writer = writer

	return nil
}
// createRecord fills the record builder column-by-column from the metrics
// and returns the resulting Arrow record. For every schema column, the value
// is looked up as a field first and then as a tag; when neither exists a
// null is appended so all columns stay the same length. The timestamp column
// is synthesized from the metric time as Unix nanoseconds. The caller owns
// the returned record and must call Release on it.
//
// Bug fix versus the original: the default case of the null-append switch
// formatted %T on "value", which is always nil on that path and printed the
// useless "unsupported type: <nil>"; the column type is reported instead.
func (p *Parquet) createRecord(metrics []telegraf.Metric, builder *array.RecordBuilder, schema *arrow.Schema) (arrow.Record, error) {
	for index, col := range schema.Fields() {
		for _, m := range metrics {
			// The timestamp column does not come from fields or tags.
			if p.TimestampFieldName != "" && col.Name == p.TimestampFieldName {
				builder.Field(index).(*array.Int64Builder).Append(m.Time().UnixNano())
				continue
			}

			// Try to get the value from a field first, then from a tag.
			var value any
			var ok bool
			value, ok = m.GetField(col.Name)
			if !ok {
				value, ok = m.GetTag(col.Name)
			}

			// If neither field nor tag exists, append a null value.
			if !ok {
				switch col.Type {
				case arrow.PrimitiveTypes.Int8:
					builder.Field(index).(*array.Int8Builder).AppendNull()
				case arrow.PrimitiveTypes.Int16:
					builder.Field(index).(*array.Int16Builder).AppendNull()
				case arrow.PrimitiveTypes.Int32:
					builder.Field(index).(*array.Int32Builder).AppendNull()
				case arrow.PrimitiveTypes.Int64:
					builder.Field(index).(*array.Int64Builder).AppendNull()
				case arrow.PrimitiveTypes.Uint8:
					builder.Field(index).(*array.Uint8Builder).AppendNull()
				case arrow.PrimitiveTypes.Uint16:
					builder.Field(index).(*array.Uint16Builder).AppendNull()
				case arrow.PrimitiveTypes.Uint32:
					builder.Field(index).(*array.Uint32Builder).AppendNull()
				case arrow.PrimitiveTypes.Uint64:
					builder.Field(index).(*array.Uint64Builder).AppendNull()
				case arrow.PrimitiveTypes.Float32:
					builder.Field(index).(*array.Float32Builder).AppendNull()
				case arrow.PrimitiveTypes.Float64:
					builder.Field(index).(*array.Float64Builder).AppendNull()
				case arrow.BinaryTypes.String:
					builder.Field(index).(*array.StringBuilder).AppendNull()
				case arrow.FixedWidthTypes.Boolean:
					builder.Field(index).(*array.BooleanBuilder).AppendNull()
				default:
					// "value" is nil here, so report the column type instead.
					return nil, fmt.Errorf("unsupported type: %s", col.Type)
				}
				continue
			}

			switch col.Type {
			case arrow.PrimitiveTypes.Int8:
				builder.Field(index).(*array.Int8Builder).Append(value.(int8))
			case arrow.PrimitiveTypes.Int16:
				builder.Field(index).(*array.Int16Builder).Append(value.(int16))
			case arrow.PrimitiveTypes.Int32:
				builder.Field(index).(*array.Int32Builder).Append(value.(int32))
			case arrow.PrimitiveTypes.Int64:
				builder.Field(index).(*array.Int64Builder).Append(value.(int64))
			case arrow.PrimitiveTypes.Uint8:
				builder.Field(index).(*array.Uint8Builder).Append(value.(uint8))
			case arrow.PrimitiveTypes.Uint16:
				builder.Field(index).(*array.Uint16Builder).Append(value.(uint16))
			case arrow.PrimitiveTypes.Uint32:
				builder.Field(index).(*array.Uint32Builder).Append(value.(uint32))
			case arrow.PrimitiveTypes.Uint64:
				builder.Field(index).(*array.Uint64Builder).Append(value.(uint64))
			case arrow.PrimitiveTypes.Float32:
				builder.Field(index).(*array.Float32Builder).Append(value.(float32))
			case arrow.PrimitiveTypes.Float64:
				builder.Field(index).(*array.Float64Builder).Append(value.(float64))
			case arrow.BinaryTypes.String:
				builder.Field(index).(*array.StringBuilder).Append(value.(string))
			case arrow.FixedWidthTypes.Boolean:
				builder.Field(index).(*array.BooleanBuilder).Append(value.(bool))
			default:
				return nil, fmt.Errorf("unsupported type: %T", value)
			}
		}
	}

	record := builder.NewRecord()
	return record, nil
}
// createSchema builds an Arrow schema from the union of all field and tag
// keys across the given metrics. A field key takes precedence over a tag
// with the same name (first occurrence wins since later keys are skipped,
// and fields are scanned before tags for each metric). Tags are always typed
// as strings. When TimestampFieldName is non-empty an int64 timestamp column
// is appended.
//
// Bug fix versus the original: the error message used the %s verb on
// field.Value, which is an "any" and typically numeric, producing output
// like "%!s(int64=5)"; %v renders any value correctly.
func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error) {
	rawFields := make(map[string]arrow.DataType)
	for _, metric := range metrics {
		for _, field := range metric.FieldList() {
			if _, ok := rawFields[field.Key]; !ok {
				arrowType, err := goToArrowType(field.Value)
				if err != nil {
					return nil, fmt.Errorf("error converting '%s=%v' field to arrow type: %w", field.Key, field.Value, err)
				}
				rawFields[field.Key] = arrowType
			}
		}
		for _, tag := range metric.TagList() {
			if _, ok := rawFields[tag.Key]; !ok {
				rawFields[tag.Key] = arrow.BinaryTypes.String
			}
		}
	}

	// Presize for all collected columns plus the optional timestamp column.
	fields := make([]arrow.Field, 0, len(rawFields)+1)
	for key, value := range rawFields {
		fields = append(fields, arrow.Field{
			Name: key,
			Type: value,
		})
	}

	if p.TimestampFieldName != "" {
		fields = append(fields, arrow.Field{
			Name: p.TimestampFieldName,
			Type: arrow.PrimitiveTypes.Int64,
		})
	}

	return arrow.NewSchema(fields, nil), nil
}
// createWriter opens a new parquet file writer for the given schema. If a
// file already exists at the target path it is renamed to a timestamped name
// first, since an existing file cannot be appended to and may carry a
// conflicting schema.
func (p *Parquet) createWriter(name string, filename string, schema *arrow.Schema) (*pqarrow.FileWriter, error) {
	if _, statErr := os.Stat(filename); statErr == nil {
		// Move the existing file out of the way before creating a new one.
		ts := time.Now()
		rotatedFilename := fmt.Sprintf("%s/%s-%s-%s.parquet", p.Directory, name, ts.Format("2006-01-02"), strconv.FormatInt(ts.Unix(), 10))
		if renameErr := os.Rename(filename, rotatedFilename); renameErr != nil {
			return nil, fmt.Errorf("failed to rename file %q: %w", filename, renameErr)
		}
	}

	file, err := os.Create(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to create file %q: %w", filename, err)
	}

	writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
	if err != nil {
		return nil, fmt.Errorf("failed to create parquet writer for file %q: %w", filename, err)
	}

	return writer, nil
}
// goToArrowType maps a Go value (as produced by telegraf fields) to the
// Arrow data type used in the schema. Plain int/uint are widened to their
// 64-bit Arrow counterparts. An error is returned for unsupported types.
func goToArrowType(value interface{}) (arrow.DataType, error) {
	switch value.(type) {
	case bool:
		return arrow.FixedWidthTypes.Boolean, nil
	case string:
		return arrow.BinaryTypes.String, nil
	case float64:
		return arrow.PrimitiveTypes.Float64, nil
	case float32:
		return arrow.PrimitiveTypes.Float32, nil
	case int64, int:
		return arrow.PrimitiveTypes.Int64, nil
	case int32:
		return arrow.PrimitiveTypes.Int32, nil
	case int16:
		return arrow.PrimitiveTypes.Int16, nil
	case int8:
		return arrow.PrimitiveTypes.Int8, nil
	case uint64, uint:
		return arrow.PrimitiveTypes.Uint64, nil
	case uint32:
		return arrow.PrimitiveTypes.Uint32, nil
	case uint16:
		return arrow.PrimitiveTypes.Uint16, nil
	case uint8:
		return arrow.PrimitiveTypes.Uint8, nil
	default:
		return nil, fmt.Errorf("unsupported type: %T", value)
	}
}
// init registers the parquet output plugin with Telegraf's output registry.
func init() {
	outputs.Add("parquet", func() telegraf.Output {
		plugin := &Parquet{
			TimestampFieldName: defaultTimestampFieldName,
		}
		return plugin
	})
}

View File

@ -0,0 +1,246 @@
package parquet
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestCases exercises the write path end-to-end with table-driven fixtures:
// each case writes metrics through the plugin, closes the file, then re-opens
// the produced parquet file and checks the row and column counts recorded in
// the file metadata. Note numColumns includes the implicit timestamp column.
func TestCases(t *testing.T) {
	type testcase struct {
		name       string
		metrics    []telegraf.Metric
		numRows    int
		numColumns int
	}

	var testcases = []testcase{
		{
			// One metric, one field => 1 row, 2 columns (value + timestamp).
			name: "basic single metric",
			metrics: []telegraf.Metric{
				testutil.MustMetric(
					"test",
					map[string]string{},
					map[string]interface{}{
						"value": 1.0,
					},
					time.Now(),
				),
			},
			numRows:    1,
			numColumns: 2,
		},
		{
			// Tag + field => 3 columns (tag, value, timestamp).
			name: "mix of tags and fields",
			metrics: []telegraf.Metric{
				testutil.MustMetric(
					"test",
					map[string]string{
						"tag": "tag",
					},
					map[string]interface{}{
						"value": 1.0,
					},
					time.Now(),
				),
				testutil.MustMetric(
					"test",
					map[string]string{
						"tag": "tag2",
					},
					map[string]interface{}{
						"value": 2.0,
					},
					time.Now(),
				),
			},
			numRows:    2,
			numColumns: 3,
		},
		{
			// Disjoint tag/field keys: schema is the union, missing values
			// become nulls => 5 columns (2 tags, 2 fields, timestamp).
			name: "null values",
			metrics: []telegraf.Metric{
				testutil.MustMetric(
					"test",
					map[string]string{
						"host": "tag",
					},
					map[string]interface{}{
						"value_old": 1.0,
					},
					time.Now(),
				),
				testutil.MustMetric(
					"test",
					map[string]string{
						"tag": "tag2",
					},
					map[string]interface{}{
						"value_new": 2.0,
					},
					time.Now(),
				),
			},
			numRows:    2,
			numColumns: 5,
		},
		{
			// One field per supported Go type => 14 fields + timestamp.
			name: "data types",
			metrics: []telegraf.Metric{
				testutil.MustMetric(
					"test",
					map[string]string{},
					map[string]interface{}{
						"int":     int(0),
						"int8":    int8(1),
						"int16":   int16(2),
						"int32":   int32(3),
						"int64":   int64(4),
						"uint":    uint(5),
						"uint8":   uint8(6),
						"uint16":  uint16(7),
						"uint32":  uint32(8),
						"uint64":  uint64(9),
						"float32": float32(10.0),
						"float64": float64(11.0),
						"string":  "string",
						"bool":    true,
					},
					time.Now(),
				),
			},
			numRows:    1,
			numColumns: 15,
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			testDir := t.TempDir()
			plugin := &Parquet{
				Directory:          testDir,
				TimestampFieldName: defaultTimestampFieldName,
			}
			require.NoError(t, plugin.Init())
			require.NoError(t, plugin.Connect())
			require.NoError(t, plugin.Write(tc.metrics))
			// Close must succeed or the parquet footer is missing and the
			// file is unreadable below.
			require.NoError(t, plugin.Close())

			// Read metrics from parquet file
			files, err := os.ReadDir(testDir)
			require.NoError(t, err)
			require.Len(t, files, 1)

			reader, err := file.OpenParquetFile(filepath.Join(testDir, files[0].Name()), false)
			require.NoError(t, err)
			defer reader.Close()

			metadata := reader.MetaData()
			require.Equal(t, tc.numRows, int(metadata.NumRows))
			require.Equal(t, tc.numColumns, metadata.Schema.NumColumns())
		})
	}
}
// TestRotation verifies that time-based rotation eventually produces a
// second file: repeated writes with a 1s rotation interval should rotate the
// original file once its modification time is old enough.
//
// Bug fix versus the original: the require.* helpers were called inside the
// require.Eventually condition. Eventually runs the condition in a separate
// goroutine, and require's FailNow must only be called from the test
// goroutine (this is explicitly warned against in the testify docs). The
// condition now returns false on error and lets Eventually time out instead.
func TestRotation(t *testing.T) {
	metrics := []telegraf.Metric{
		testutil.MustMetric(
			"test",
			map[string]string{},
			map[string]interface{}{
				"value": 1.0,
			},
			time.Now(),
		),
	}

	testDir := t.TempDir()
	plugin := &Parquet{
		Directory:          testDir,
		RotationInterval:   config.Duration(1 * time.Second),
		TimestampFieldName: defaultTimestampFieldName,
	}
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	require.Eventually(t, func() bool {
		if err := plugin.Write(metrics); err != nil {
			return false
		}
		files, err := os.ReadDir(testDir)
		if err != nil {
			return false
		}
		return len(files) == 2
	}, 5*time.Second, time.Second)
	require.NoError(t, plugin.Close())
}
// TestOmitTimestamp checks that leaving TimestampFieldName empty omits the
// timestamp column entirely: one field in, exactly one column out.
func TestOmitTimestamp(t *testing.T) {
	input := []telegraf.Metric{
		testutil.MustMetric(
			"test",
			map[string]string{},
			map[string]interface{}{
				"value": 1.0,
			},
			time.Now(),
		),
	}

	outDir := t.TempDir()
	plugin := &Parquet{
		Directory: outDir,
	}

	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	require.NoError(t, plugin.Write(input))
	require.NoError(t, plugin.Close())

	entries, err := os.ReadDir(outDir)
	require.NoError(t, err)
	require.Len(t, entries, 1)

	reader, err := file.OpenParquetFile(filepath.Join(outDir, entries[0].Name()), false)
	require.NoError(t, err)
	defer reader.Close()

	meta := reader.MetaData()
	require.Equal(t, 1, int(meta.NumRows))
	require.Equal(t, 1, meta.Schema.NumColumns())
}
// TestTimestampDifferentName checks that a custom TimestampFieldName still
// yields a timestamp column: one field plus the renamed timestamp = 2 columns.
func TestTimestampDifferentName(t *testing.T) {
	input := []telegraf.Metric{
		testutil.MustMetric(
			"test",
			map[string]string{},
			map[string]interface{}{
				"value": 1.0,
			},
			time.Now(),
		),
	}

	outDir := t.TempDir()
	plugin := &Parquet{
		Directory:          outDir,
		TimestampFieldName: "time",
	}

	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	require.NoError(t, plugin.Write(input))
	require.NoError(t, plugin.Close())

	entries, err := os.ReadDir(outDir)
	require.NoError(t, err)
	require.Len(t, entries, 1)

	reader, err := file.OpenParquetFile(filepath.Join(outDir, entries[0].Name()), false)
	require.NoError(t, err)
	defer reader.Close()

	meta := reader.MetaData()
	require.Equal(t, 1, int(meta.NumRows))
	require.Equal(t, 2, meta.Schema.NumColumns())
}

View File

@ -0,0 +1,14 @@
# A plugin that writes metrics to parquet files
[[outputs.parquet]]
## Directory to write parquet files in. If a file already exists the output
## will attempt to continue using the existing file.
# directory = "."
## Files are rotated after the time interval specified. When set to 0 no time
## based rotation is performed.
# rotation_interval = "0h"
## Timestamp field name
## Field name to use to store the timestamp. If set to an empty string, then
## the timestamp is omitted.
# timestamp_field_name = "timestamp"

View File

@ -3,8 +3,8 @@ package parquet
import (
"reflect"
"github.com/apache/arrow/go/v16/parquet"
"github.com/apache/arrow/go/v16/parquet/file"
"github.com/apache/arrow/go/v18/parquet"
"github.com/apache/arrow/go/v18/parquet/file"
)
func newColumnParser(reader file.ColumnChunkReader) *columnParser {

View File

@ -7,7 +7,7 @@ import (
"slices"
"time"
"github.com/apache/arrow/go/v16/parquet/file"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"