From ba9cbeebb5698548140ec2758721a0cd766f79b9 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Wed, 10 Apr 2024 07:54:16 -0600 Subject: [PATCH] feat(parsers.parquet): Add Apache Parquet Parser (#15008) --- docs/DATA_FORMATS_INPUT.md | 1 + docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 17 +- go.sum | 37 +++-- plugins/parsers/all/parquet.go | 5 + plugins/parsers/parquet/README.md | 51 ++++++ plugins/parsers/parquet/columns.go | 126 +++++++++++++++ plugins/parsers/parquet/parser.go | 149 ++++++++++++++++++ plugins/parsers/parquet/parser_test.go | 74 +++++++++ .../parquet/testcases/benchmark/expected.out | 1 + .../parquet/testcases/benchmark/generate.py | 11 ++ .../parquet/testcases/benchmark/input.parquet | Bin 0 -> 2366 bytes .../parquet/testcases/benchmark/telegraf.conf | 6 + .../parquet/testcases/datatypes/expected.out | 2 + .../parquet/testcases/datatypes/generate.py | 33 ++++ .../parquet/testcases/datatypes/input.parquet | Bin 0 -> 6560 bytes .../parquet/testcases/datatypes/telegraf.conf | 9 ++ .../parquet/testcases/dense/expected.out | 7 + .../parquet/testcases/dense/generate.py | 16 ++ .../parquet/testcases/dense/input.parquet | Bin 0 -> 3540 bytes .../parquet/testcases/dense/telegraf.conf | 8 + .../parquet/testcases/empty/expected.out | 0 .../parquet/testcases/empty/generate.py | 6 + .../parquet/testcases/empty/input.parquet | Bin 0 -> 976 bytes .../parquet/testcases/empty/telegraf.conf | 3 + .../parquet/testcases/multitable/expected.out | 21 +++ .../parquet/testcases/multitable/generate.py | 39 +++++ .../testcases/multitable/input.parquet | Bin 0 -> 4711 bytes .../testcases/multitable/telegraf.conf | 8 + .../parquet/testcases/sparse/expected.out | 6 + .../parquet/testcases/sparse/generate.py | 20 +++ .../parquet/testcases/sparse/input.parquet | Bin 0 -> 5499 bytes .../parquet/testcases/sparse/telegraf.conf | 7 + .../parquet/testcases/timestamp/expected.out | 3 + .../parquet/testcases/timestamp/generate.py | 14 ++ .../parquet/testcases/timestamp/input.parquet | Bin 0 -> 2444 bytes .../parquet/testcases/timestamp/telegraf.conf | 7 + 37 files changed, 668 insertions(+), 20 deletions(-) create mode 100644 plugins/parsers/all/parquet.go create mode 100644 plugins/parsers/parquet/README.md create mode 100644 plugins/parsers/parquet/columns.go create mode 100644 plugins/parsers/parquet/parser.go create mode 100644 plugins/parsers/parquet/parser_test.go create mode 100644 plugins/parsers/parquet/testcases/benchmark/expected.out create mode 100644 plugins/parsers/parquet/testcases/benchmark/generate.py create mode 100644 plugins/parsers/parquet/testcases/benchmark/input.parquet create mode 100644 plugins/parsers/parquet/testcases/benchmark/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/datatypes/expected.out create mode 100644 plugins/parsers/parquet/testcases/datatypes/generate.py create mode 100644 plugins/parsers/parquet/testcases/datatypes/input.parquet create mode 100644 plugins/parsers/parquet/testcases/datatypes/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/dense/expected.out create mode 100644 plugins/parsers/parquet/testcases/dense/generate.py create mode 100644 plugins/parsers/parquet/testcases/dense/input.parquet create mode 100644 plugins/parsers/parquet/testcases/dense/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/empty/expected.out create mode 100644 plugins/parsers/parquet/testcases/empty/generate.py create mode 100644 plugins/parsers/parquet/testcases/empty/input.parquet create mode 100644 plugins/parsers/parquet/testcases/empty/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/multitable/expected.out create mode 100644 plugins/parsers/parquet/testcases/multitable/generate.py create mode 100644 plugins/parsers/parquet/testcases/multitable/input.parquet create mode 100644 plugins/parsers/parquet/testcases/multitable/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/sparse/expected.out create mode 100644 plugins/parsers/parquet/testcases/sparse/generate.py create mode 100644 plugins/parsers/parquet/testcases/sparse/input.parquet create mode 100644 plugins/parsers/parquet/testcases/sparse/telegraf.conf create mode 100644 plugins/parsers/parquet/testcases/timestamp/expected.out create mode 100644 plugins/parsers/parquet/testcases/timestamp/generate.py create mode 100644 plugins/parsers/parquet/testcases/timestamp/input.parquet create mode 100644 plugins/parsers/parquet/testcases/timestamp/telegraf.conf diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index c86b72637..98db6e37f 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -17,6 +17,7 @@ Protocol, JSON format, or Apache Avro format. - [JSON v2](/plugins/parsers/json_v2) - [Logfmt](/plugins/parsers/logfmt) - [Nagios](/plugins/parsers/nagios) +- [Parquet](/plugins/parsers/parquet) - [Prometheus](/plugins/parsers/prometheus) - [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite) - [Value](/plugins/parsers/value), ie: 45 or "booyah" diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index edffeaa60..0d140ee4c 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -43,6 +43,7 @@ following works: - github.com/alitto/pond [MIT License](https://github.com/alitto/pond/blob/master/LICENSE) - github.com/aliyun/alibaba-cloud-sdk-go [Apache License 2.0](https://github.com/aliyun/alibaba-cloud-sdk-go/blob/master/LICENSE) - github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE) +- github.com/andybalholm/brotli [MIT License](https://github.com/andybalholm/brotli/blob/master/LICENSE) - github.com/antchfx/jsonquery [MIT License](https://github.com/antchfx/jsonquery/blob/master/LICENSE) - github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE) - github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 383887b01..efcf32395 100644 --- a/go.mod +++ b/go.mod @@ -35,8 +35,9 @@ require ( github.com/antchfx/xmlquery v1.3.18 github.com/antchfx/xpath v1.2.5 github.com/apache/arrow/go/v13 v13.0.0 + github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 github.com/apache/iotdb-client-go v1.2.0-tsbs - github.com/apache/thrift v0.18.1 + github.com/apache/thrift v0.19.0 github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/awnumar/memguard v0.22.4 @@ -220,7 +221,7 @@ require ( k8s.io/apimachinery v0.29.3 k8s.io/client-go v0.29.3 layeh.com/radius v0.0.0-20221205141417-e7fbddd11d68 - modernc.org/sqlite v1.29.2 + modernc.org/sqlite v1.29.5 ) require ( @@ -257,6 +258,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/alecthomas/participle v0.4.1 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect @@ -334,7 +336,7 @@ require ( github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/flatbuffers v23.5.26+incompatible // indirect + github.com/google/flatbuffers v24.3.7+incompatible // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -375,7 +377,8 @@ require ( github.com/josharian/native v1.1.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect @@ -388,6 +391,8 @@ require ( github.com/mdlayher/genetlink v1.2.0 // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.4.1 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -473,9 +478,9 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/tools v0.19.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 // indirect diff --git a/go.sum b/go.sum index 00672a3a8..9f0c71b1d 100644 --- a/go.sum +++ b/go.sum @@ -781,6 +781,8 @@ github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/antchfx/jsonquery v1.3.3 h1:zjZpbnZhYng3uOAbIfdNq81A9mMEeuDJeYIpeKpZ4es= github.com/antchfx/jsonquery v1.3.3/go.mod h1:1JG4DqRlRCHgVYDPY1ioYFAGSXGfWHzNgrbiGQHsWck= github.com/antchfx/xmlquery v1.3.18 h1:FSQ3wMuphnPPGJOFhvc+cRQ2CT/rUj4cyQXkJcjOwz0= @@ -798,12 +800,14 @@ github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUu github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= +github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 h1:XbC214lVvnAnDzowGV7dYiv4f4Aa6jhtIby08OgbcUg= +github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0/go.mod h1:VVbdJivCXZAJ6IhOSCSzk/RVQ/PlcitjskAWEST3Sc0= github.com/apache/iotdb-client-go v1.2.0-tsbs h1:hezGUydAkDSceCvsetYorI87S2e8HZ4hTQHmGZgOGDY= github.com/apache/iotdb-client-go v1.2.0-tsbs/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY= github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= -github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg= -github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I= +github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk= +github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I= github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY= github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo= github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE= @@ -1312,8 +1316,8 @@ github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs= github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4= github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= -github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v24.3.7+incompatible h1:BxGUkIQnOciBu33bd5BdvqY8Qvo0O/GR4SPhh7x9Ed0= +github.com/google/flatbuffers v24.3.7+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/gnxi v0.0.0-20231026134436-d82d9936af15 h1:EETGSLGKBReUUYZdztSp45EzTE6CHw2qMKIfyPrgp6c= @@ -1676,6 +1680,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= @@ -1684,8 +1689,8 @@ github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLA github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= -github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= +github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU= @@ -1770,8 +1775,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= -github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a h1:JOlLsLUQnokTyWWwEvOVoKH3XUl6oDMP8jisO54l6J8= github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a/go.mod h1:960H6oqSawdujauTeLX9BOx+ZdYX0TdG9xE9br5bino= @@ -1817,7 +1822,9 @@ github.com/miekg/dns v1.1.58 h1:ca2Hdkz+cDg/7eNF6V56jjzuZ4aCAE+DbVkILdQWG/4= github.com/miekg/dns v1.1.58/go.mod h1:Ypv+3b/KadlvW9vJfXOTf300O4UqaHFzFCuHz+rPkBY= github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws= github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721/go.mod h1:Ickgr2WtCLZ2MDGd4Gr0geeCH5HybhRJbonOgQpvSxc= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= @@ -2425,8 +2432,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= -golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE= -golang.org/x/exp v0.0.0-20231219180239-dc181d75b848/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -2873,8 +2880,8 @@ golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= -golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= -golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= +golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -3267,6 +3274,8 @@ modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWs modernc.org/ccgo/v3 v3.16.8/go.mod h1:zNjwkizS+fIFDrDjIAgBSCLkWbJuHF+ar3QRn+Z9aws= modernc.org/ccgo/v3 v3.16.9/go.mod h1:zNMzC9A9xeNUepy6KuZBbugn3c0Mc9TeiJO4lgvkJDo= modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= @@ -3292,8 +3301,8 @@ modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4= -modernc.org/sqlite v1.29.2 h1:xgBSyA3gemwgP31PWFfFjtBorQNYpeypGdoSDjXhrgI= -modernc.org/sqlite v1.29.2/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0= +modernc.org/sqlite v1.29.5 h1:8l/SQKAjDtZFo9lkJLdk8g9JEOeYRG4/ghStDCCTiTE= +modernc.org/sqlite v1.29.5/go.mod h1:S02dvcmm7TnTRvGhv8IGYyLnIt7AS2KPaB1F/71p75U= modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= diff --git a/plugins/parsers/all/parquet.go b/plugins/parsers/all/parquet.go new file mode 100644 index 000000000..dc5492924 --- /dev/null +++ b/plugins/parsers/all/parquet.go @@ -0,0 +1,5 @@ +//go:build !custom || parsers || parsers.parquet + +package all + +import _ "github.com/influxdata/telegraf/plugins/parsers/parquet" // register plugin diff --git a/plugins/parsers/parquet/README.md b/plugins/parsers/parquet/README.md new file mode 100644 index 000000000..f54e9df70 --- /dev/null +++ b/plugins/parsers/parquet/README.md @@ -0,0 +1,51 @@ +# Parquet Parser Plugin + +The Parquet parser allows for the parsing of Parquet files that were read in. + +## Configuration + +```toml +[[inputs.file]] + files = ["example"] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "parquet" + + ## Tag column is an array of columns that should be added as tags. + # tag_columns = [] + + ## Name column is the column to use as the measurement name. + # measurement_column = "" + + ## Timestamp column is the column containing the time that should be used to + ## create the metric. If not set, then the time of parsing is used. + # timestamp_column = "" + + ## Timestamp format is the time layout that should be used to interpret the + ## timestamp_column. The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`, + ## or a time in the "reference time". To define a different format, arrange + ## the values from the "reference time" in the example to match the format + ## you will be using. For more information on the "reference time", visit + ## https://golang.org/pkg/time/#Time.Format + ## ex: timestamp_format = "Mon Jan 2 15:04:05 -0700 MST 2006" + ## timestamp_format = "2006-01-02T15:04:05Z07:00" + ## timestamp_format = "01/02/2006 15:04:05" + ## timestamp_format = "unix" + ## timestamp_format = "unix_ms" + # timestamp_format = "" + + ## Timezone allows you to provide an override for timestamps that + ## do not already include an offset + ## e.g. 04/06/2016 12:41:45 + ## + ## Default: "" which renders UTC + ## Options are as follows: + ## 1. Local -- interpret based on machine localtime + ## 2. "America/New_York" -- Unix TZ values like those found in + ## https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + ## 3. UTC -- or blank/unspecified, will return timestamp in UTC + # timestamp_timezone = "" +``` diff --git a/plugins/parsers/parquet/columns.go b/plugins/parsers/parquet/columns.go new file mode 100644 index 000000000..08e57e02f --- /dev/null +++ b/plugins/parsers/parquet/columns.go @@ -0,0 +1,126 @@ +package parquet + +import ( + "reflect" + + "github.com/apache/arrow/go/v16/parquet" + "github.com/apache/arrow/go/v16/parquet/file" +) + +func newColumnParser(reader file.ColumnChunkReader) *columnParser { + batchSize := 128 + + var valueBuffer interface{} + switch reader.(type) { + case *file.BooleanColumnChunkReader: + valueBuffer = make([]bool, batchSize) + case *file.Int32ColumnChunkReader: + valueBuffer = make([]int32, batchSize) + case *file.Int64ColumnChunkReader: + valueBuffer = make([]int64, batchSize) + case *file.Float32ColumnChunkReader: + valueBuffer = make([]float32, batchSize) + case *file.Float64ColumnChunkReader: + valueBuffer = make([]float64, batchSize) + case *file.ByteArrayColumnChunkReader: + valueBuffer = make([]parquet.ByteArray, batchSize) + case *file.FixedLenByteArrayColumnChunkReader: + valueBuffer = make([]parquet.FixedLenByteArray, batchSize) + } + + return &columnParser{ + name: reader.Descriptor().Name(), + reader: reader, + batchSize: int64(batchSize), + defLevels: make([]int16, batchSize), + repLevels: make([]int16, batchSize), + valueBuffer: valueBuffer, + } +} + +type columnParser struct { + name string + reader file.ColumnChunkReader + batchSize int64 + valueOffset int + valuesBuffered int + + levelOffset int64 + levelsBuffered int64 + defLevels []int16 + repLevels []int16 + + valueBuffer interface{} +} + +func (c *columnParser) readNextBatch() error { + var err error + + switch reader := c.reader.(type) { + case *file.BooleanColumnChunkReader: + values := c.valueBuffer.([]bool) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.Int32ColumnChunkReader: + values := c.valueBuffer.([]int32) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.Int64ColumnChunkReader: + values := c.valueBuffer.([]int64) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.Float32ColumnChunkReader: + values := c.valueBuffer.([]float32) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.Float64ColumnChunkReader: + values := c.valueBuffer.([]float64) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.ByteArrayColumnChunkReader: + values := c.valueBuffer.([]parquet.ByteArray) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + case *file.FixedLenByteArrayColumnChunkReader: + values := c.valueBuffer.([]parquet.FixedLenByteArray) + c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels) + } + + c.valueOffset = 0 + c.levelOffset = 0 + + return err +} + +func (c *columnParser) HasNext() bool { + return c.levelOffset < c.levelsBuffered || c.reader.HasNext() +} + +func (c *columnParser) Next() (interface{}, bool) { + if c.levelOffset == c.levelsBuffered { + if !c.HasNext() { + return nil, false + } + if err := c.readNextBatch(); err != nil { + return nil, false + } + if c.levelsBuffered == 0 { + return nil, false + } + } + + defLevel := c.defLevels[int(c.levelOffset)] + c.levelOffset++ + + if defLevel < c.reader.Descriptor().MaxDefinitionLevel() { + return nil, true + } + + vb := reflect.ValueOf(c.valueBuffer) + val := vb.Index(c.valueOffset).Interface() + c.valueOffset++ + + // Convert byte arrays to strings + switch v := val.(type) { + case parquet.ByteArray: + val = string(v) + case parquet.FixedLenByteArray: + val = string(v) + } + + return val, true +} diff --git a/plugins/parsers/parquet/parser.go b/plugins/parsers/parquet/parser.go new file mode 100644 index 000000000..4ee048e1e --- /dev/null +++ b/plugins/parsers/parquet/parser.go @@ -0,0 +1,149 @@ +package parquet + +import ( + "bytes" + "errors" + "fmt" + "slices" + "time" + + "github.com/apache/arrow/go/v16/parquet/file" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type Parser struct { + MeasurementColumn string `toml:"measurement_column"` + TagColumns []string `toml:"tag_columns"` + TimestampColumn string `toml:"timestamp_column"` + TimestampFormat string `toml:"timestamp_format"` + TimestampTimezone string `toml:"timestamp_timezone"` + + defaultTags map[string]string + location *time.Location + metricName string +} + +func (p *Parser) Init() error { + if p.TimestampFormat == "" { + p.TimestampFormat = "unix" + } + if p.TimestampTimezone == "" { + p.location = time.UTC + } else { + loc, err := time.LoadLocation(p.TimestampTimezone) + if err != nil { + return fmt.Errorf("invalid location %s: %w", p.TimestampTimezone, err) + } + p.location = loc + } + + return nil +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + reader := bytes.NewReader(buf) + parquetReader, err := file.NewParquetReader(reader) + if err != nil { + return nil, fmt.Errorf("unable to create parquet reader: %w", err) + } + metadata := parquetReader.MetaData() + + now := time.Now() + metrics := make([]telegraf.Metric, 0, metadata.NumRows) + for i := 0; i < parquetReader.NumRowGroups(); i++ { + rowGroup := parquetReader.RowGroup(i) + scanners := make([]*columnParser, metadata.Schema.NumColumns()) + for colIndex := range metadata.Schema.NumColumns() { + col, err := rowGroup.Column(colIndex) + if err != nil { + return nil, fmt.Errorf("unable to fetch column %q: %w", colIndex, err) + } + + scanners[colIndex] = newColumnParser(col) + } + + rowIndex := 0 + rowGroupMetrics := make([]telegraf.Metric, rowGroup.NumRows()) + for _, s := range scanners { + for s.HasNext() { + if rowIndex%int(rowGroup.NumRows()) == 0 { + rowIndex = 0 + } + + val, ok := s.Next() + if !ok || val == nil { + rowIndex++ + continue + } + + if rowGroupMetrics[rowIndex] == nil { + rowGroupMetrics[rowIndex] = metric.New(p.metricName, p.defaultTags, nil, now) + } + + if p.MeasurementColumn != "" && s.name == p.MeasurementColumn { + valStr, err := internal.ToString(val) + if err != nil { + return nil, fmt.Errorf("could not convert value to string: %w", err) + } + rowGroupMetrics[rowIndex].SetName(valStr) + } else if p.TagColumns != nil && slices.Contains(p.TagColumns, s.name) { + valStr, err := internal.ToString(val) + if err != nil { + return nil, fmt.Errorf("could not convert value to string: %w", err) + } + rowGroupMetrics[rowIndex].AddTag(s.name, valStr) + } else if p.TimestampColumn != "" && s.name == p.TimestampColumn { + valStr, err := internal.ToString(val) + if err != nil { + return nil, fmt.Errorf("could not convert value to string: %w", err) + } + timestamp, err := internal.ParseTimestamp(p.TimestampFormat, valStr, p.location) + if err != nil { + return nil, fmt.Errorf("could not parse '%s' to '%s'", valStr, p.TimestampFormat) + } + rowGroupMetrics[rowIndex].SetTime(timestamp) + } else { + rowGroupMetrics[rowIndex].AddField(s.name, val) + } + + rowIndex++ + } + } + + metrics = append(metrics, rowGroupMetrics...) + } + + return metrics, nil +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, nil + } + if len(metrics) > 1 { + return nil, errors.New("line contains multiple metrics") + } + + return metrics[0], nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.defaultTags = tags +} + +func init() { + parsers.Add("parquet", + func(defaultMetricName string) telegraf.Parser { + return &Parser{metricName: defaultMetricName} + }, + ) +} diff --git a/plugins/parsers/parquet/parser_test.go b/plugins/parsers/parquet/parser_test.go new file mode 100644 index 000000000..75c6fc574 --- /dev/null +++ b/plugins/parsers/parquet/parser_test.go @@ -0,0 +1,74 @@ +package parquet + +import ( + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + test "github.com/influxdata/telegraf/testutil/plugin_input" + "github.com/stretchr/testify/require" +) + +func TestCases(t *testing.T) { + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + require.NotEmpty(t, folders) + + for _, f := range folders { + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + t.Run(f.Name(), func(t *testing.T) { + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.NoError(t, err) + require.Len(t, cfg.Inputs, 1) + + // Tune the test-plugin + plugin := cfg.Inputs[0].Input.(*test.Plugin) + plugin.Path = testcasePath + require.NoError(t, plugin.Init()) + + // Gather the metrics and check for potential errors + var acc testutil.Accumulator + err := plugin.Gather(&acc) + switch len(plugin.ExpectedErrors) { + case 0: + require.NoError(t, err) + case 1: + require.ErrorContains(t, err, plugin.ExpectedErrors[0]) + default: + require.Contains(t, plugin.ExpectedErrors, err.Error()) + } + + // Determine checking options + options := []cmp.Option{ + cmpopts.EquateApprox(0, 1e-6), + testutil.SortMetrics(), + } + if plugin.ShouldIgnoreTimestamp { + options = append(options, testutil.IgnoreTime()) + } + + // Process expected metrics and compare with resulting metrics + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, plugin.Expected, actual, options...) + }) + } +} + +func BenchmarkParsing(b *testing.B) { + plugin := &Parser{} + + benchmarkData, err := os.ReadFile("testcases/benchmark/input.parquet") + require.NoError(b, err) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + _, _ = plugin.Parse(benchmarkData) + } +} diff --git a/plugins/parsers/parquet/testcases/benchmark/expected.out b/plugins/parsers/parquet/testcases/benchmark/expected.out new file mode 100644 index 000000000..369970dfa --- /dev/null +++ b/plugins/parsers/parquet/testcases/benchmark/expected.out @@ -0,0 +1 @@ +test value=42i 1710683608143228692 diff --git a/plugins/parsers/parquet/testcases/benchmark/generate.py b/plugins/parsers/parquet/testcases/benchmark/generate.py new file mode 100644 index 000000000..ccd42f34e --- /dev/null +++ b/plugins/parsers/parquet/testcases/benchmark/generate.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df = pandas.DataFrame({ + 'value': [42], + 'timestamp': ["1710683608143228692"] +}) + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/benchmark/input.parquet b/plugins/parsers/parquet/testcases/benchmark/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..bb0d2c40ab6e760864e44ba0722770930129bedc GIT binary patch literal 2366 zcmcguPjBKz6gMPIvr-Px?wW$c0a;qKsz_=(z=l;X0|~~Iu#2<7##I&hU%UX0nZ;+Yy9JCs?_TjYuvKyl#lB9Yco&pB=tNCZP%KHn(LFA{zzs1Ocj6MD1Nt* ze^Q2Zb>+USP}!}iF|u5Z-h@J5rIY?q^bW}PxnEJ8Jb=ukbNO89Kkj$&|0zH7{YEBT z+|K;8@sgr)rR5{hL%4%OtN^)JKf+>r zVYN(pv4Ui&egFiw>9z!TlH!=77ES`v5y#d8Em+`&_zNuv zCNm7-$_l3T#KT&yvd`{w48UcQ><^6mK96lwb_Yq)JNa$Qp6rj?U*IuEM+l)F{P6e+ zp)#ruQ4^sdY7B|4USF}#u=03mZ_XO=8fFrO*hPe92=>8~1c4{n1oHrX_1QqKTn>~I zgc?2AUw4^4hD7uxAE}vna z$du>PP6OG-fiyC_9_Y|4Szj3qu__)NPsK+2+7$XT9Op#gOl%+UwW%p{k;2EhRb`no z8pxc3eRZ!t)f+gU>Z|hnQsLL+6`Fv(d955s6+^(7kZsBpAL3uCry|Y`7`d6%ud$X= zCvio$W@~PLY&M8JR269iG1$Q=;B8H9Rj!P6Uh;LDnhS0w| zT#(ysLS1Jzu2FRzM;D|E-ED?|r3zU1fjs9DT**0fFU*!ROYFYx_4sa#YYlEEGLHsq zdy9-rrvigCD z?+^5D?hoK@8Qvx7wb61wZeN)TayGoGw68U}7LhwRKIS@O6rG&5YvhhCajK5tb$y*L zlNcvJICOu>PB(PiN-5>kO3ct-;E zP`!sBsDO|`2#m=F#(4BZ;zUTdq&6hO+Lk&3emU3?_5%Tjz(&l|WSrdz2vOOBuLa*m iR=8*Q{vLVp6p|N9A%5lLO7M}z-`O8NrKtDt&*^W?32(0e literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/benchmark/telegraf.conf b/plugins/parsers/parquet/testcases/benchmark/telegraf.conf new file mode 100644 index 000000000..4cd219fca --- /dev/null +++ b/plugins/parsers/parquet/testcases/benchmark/telegraf.conf @@ -0,0 +1,6 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" + + timestamp_column = "timestamp" + timestamp_format = "unix_ns" diff --git a/plugins/parsers/parquet/testcases/datatypes/expected.out b/plugins/parsers/parquet/testcases/datatypes/expected.out new file mode 100644 index 000000000..218d342a5 --- /dev/null +++ b/plugins/parsers/parquet/testcases/datatypes/expected.out @@ -0,0 +1,2 @@ +row1,byteArray=Short,fixedLengthByteArray=STRING boolean=true,float32=1,float64=64,int32=-2147483648i,int64=-9223372036854775808i 1710697199000000000 +row2,byteArray=Much\ longer\ string\ here...,fixedLengthByteArray=FOOBAR boolean=false,float32=1.1234568357467651,float64=65.1234567891212,int32=2147483647i,int64=9223372036854775807i 551812924000000000 diff --git a/plugins/parsers/parquet/testcases/datatypes/generate.py b/plugins/parsers/parquet/testcases/datatypes/generate.py new file mode 100644 index 000000000..a20f07382 --- /dev/null +++ b/plugins/parsers/parquet/testcases/datatypes/generate.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df = pandas.DataFrame({ + 'boolean': [True, False], + 'string': ["row1", "row2"], + 'int32': [-2147483648, 2147483647], + 'int64': [-9223372036854775808, 9223372036854775807], + 'float32': [1.000000001, 1.123456789], + 'float64': [64.00000000000000001, 65.12345678912121212], + 'byteArray': ["Short", "Much longer string here..."], + 'fixedLengthByteArray': ["STRING", "FOOBAR"], + 'timestamp': [ + "Sun, 17 Mar 2024 10:39:59 MST", + "Sat, 27 Jun 1987 10:22:04 MST", + ] +}) + +schema = pyarrow.schema([ + pyarrow.field('boolean', pyarrow.bool_()), + pyarrow.field('string', pyarrow.string()), + pyarrow.field('int32', pyarrow.int32()), + pyarrow.field('int64', pyarrow.int64()), + pyarrow.field('float32', pyarrow.float32()), + pyarrow.field('float64', pyarrow.float64()), + pyarrow.field('byteArray', pyarrow.binary()), + pyarrow.field('fixedLengthByteArray', pyarrow.binary(6)), + pyarrow.field('timestamp', pyarrow.binary()) +]) + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df, schema), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/datatypes/input.parquet b/plugins/parsers/parquet/testcases/datatypes/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cf40623dfe5f974289143934825c26cd5fec9f66 GIT binary patch literal 6560 zcmc&(O>Eo96{fU`;{@KVn}mY2Pyj(_*Z}L;5-r)0u|Pt~mMqnhE!h?&wM9Tmv_+ew zwED3m2SI~HPmBKSp@$rD$hqjLm)?3RilS(N9(pQz=wY#^1-dBq&|CW6kfJGya+BSp zWs)K1{eJJw%zI;Jk1NG!ihhs2u|)geV~(B+1w5XBhw@W*$(y36s{t>JK97g_L5QY; zcjl;I;HuPWsj{fkUcb-3=RaWPKXm8?K^Fd?Dwqz_-`PPAHz;cHLm$vOtwGFuvmkbZ z_9KEtf?$oh>jOR;z%d5G*TN=(cf8DJ-VnVIOi*)+!N6R%*HQFi0^(DT|CxvRiI@44 zC*&{?G=lRYp2abuMf%nl(R>JeJbn4{<&T_50zRY$5a?6Lh;8OT_34!m?KM$dQS{y_ zOF%vI_+NOK7e408cS4ugas@b%GH{0vg2ZPU=)A!Iu=9o}0qjBru05R}&xdHA4ftA| z0RMx>{}&JQuQ}$|upwNmKr&pYlxrZ4Oh0|l`t#2pOlKOYnCYhvFw++^2jI`Hhv<0| z@YN%=B~Ir0t82{fu0gIZ^c~h*A-d#x@8K{1ng7k_nZ1WoQ0K>-ajy3sVm_v+4(NY- zH$-2bfIgA!KQ1tzEI_uW?aZ+58cKmKjra9;pIre2`^{FT_da~&`}Jm6ZRy8yCu|&? za8vHc%gf8o2-m2)SBP5R{n<*6;C}J#i*tER8uetZ(-B8xGkxLlf8}9*w#dBr79P!4 zFzp&yfDVl#TUgpZC}j(o^zJUtm0aM@8T`PUA>qLK#mx}?p1BaWj+CL?*pc<)UX!1I zZB~?<%wKOHY^Oo{dE*z*^Ou%1zdu=eW}I3?fuK_Ia2X)z_x5XMU%TdQEQWB`@7fw zxtIC9pLtGW{NK#0TegC~N>EV4>rGh`=^Id;?|7}oLeUE_GYkA3Y6nX+?FEuiU7@F# z-)nJ-o<~Kq7EttcYlb{DbFm+>*DcWAfa|K8!d4b2zb_aHKL5IH@6EI-=1sR2=53dJ z=FOLA=IxhG<}DX6<}DQ+=Kpts{r_I@`U8zyHIfaKxBCB0QN;A4hailYy`IRTU(T6j^ONnxq+4AN59U6pZwBrQT}D z5s=pVT6<(MTGEMJ?;%~2dtyWEi6H(sBI_Uyw?x!EJV@WWA9?cR0?Z_eh+_X5F8T%f@iHvvDRJ62^az&L%b)pN-+>(k(`t%4G?)ZdB88POuzq zk=I3)$v9n)%G7>wyBSTppC$DskytHx+WiFUoBAYmp(BgER)^FMRgpSkXN0ArJrXoc53i|wdR#W@6OgUa*3un@HxhLs)Wmn;)%KqRutC=)4*_O1DDrd^)*^}0>u-a85 zE#0f_XVVViy<<6>FTdl4Z4Zcv0R)tctk=a6gfViEOl@S+kn>X0%E@-Lo87kMFqJ!RZ08`y^;vi(`1MR0)+~KqSuG{%kiT7p zQyM~|jroAw;ac)c_CJh*qsN942>CnAb#85)7M=+i3ng zI}1}=(u?PLh1fIqRl&Y~cD@T9wcpsRMqeXGXpb!z_^0+7?qkcI+kUh2HQ2KiK4H%v z##%UW-D`k-*gb;tFu8|nU5QO!mLE;ZB%6)R{Al(bnzZ3^222~Tl_!Tztz8p_rxo^M zU2EF#tXi>h>9%$b8`Jr6*m$M=RMpjJ?Jzy6Yw3is?$g+&WL(!WmuO%u!n5hJmJ*x$ zs3sI%XK!g@rK#4ohN`4BqT&{@&)4-_Q#Q_%%YLi1p&p(dWXk>8YN=Ip#`eF#9TlP0 z)>lhSslv@Vr&BU+^@HTSlAl%2E~~}H`6xq8OXW`LS{Z6|a)h-G&P!Jma-(V`mmoD- z=M(Tfl2(m6*U-wNI;);Z3iOdwc39Yt4|Yzr6g+FTp25~CxpPrS_KA&?Y(KBWhgm&} ztu2Sc8MeVJwmp6ed*K$&3)ufsE#%8MY?3BP`lY7POGMU2020 z`5bC@ua4}@`3Ea2ADph7t*pR|-y9;5>*4^8$Il^tKtGAsgi^i+y{VCDrsPUVEkfMY z$|-QGT#B@0?8jUNhs3Mk$=LQJ;wzj0p8G|-O2CZg3_QfB4G#&;I;-5~zA2yLHqYVn zKB#VOlB57AoREc=ZA6WjIgyi&(cNEU7t?;!u#>;UeQ>kNz zKCx`WZ|OrHLQ^mdFn!-K)3T_sGRDY+ek*#+5Pn9DbuU>n|0$ LNl|<7e>499!f)td literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/datatypes/telegraf.conf b/plugins/parsers/parquet/testcases/datatypes/telegraf.conf new file mode 100644 index 000000000..1ec7b0d5b --- /dev/null +++ b/plugins/parsers/parquet/testcases/datatypes/telegraf.conf @@ -0,0 +1,9 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" + + measurement_column = "string" + tag_columns = ["byteArray", "fixedLengthByteArray"] + timestamp_column = "timestamp" + timestamp_format = "Mon, 02 Jan 2006 15:04:05 MST" + timestamp_timezone = "MST" diff --git a/plugins/parsers/parquet/testcases/dense/expected.out b/plugins/parsers/parquet/testcases/dense/expected.out new file mode 100644 index 000000000..771163474 --- /dev/null +++ b/plugins/parsers/parquet/testcases/dense/expected.out @@ -0,0 +1,7 @@ +a,tag=row1 float_field=64 1710683695000000000 +b,tag=row1 float_field=65 1710683695000000000 +c,tag=row1 float_field=66 1710683695000000000 +d,tag=row1 float_field=67 1710683695000000000 +e,tag=row1 float_field=68 1710683695000000000 +f,tag=row1 float_field=69 1710683695000000000 +g,tag=row1 float_field=70 1710683695000000000 diff --git a/plugins/parsers/parquet/testcases/dense/generate.py b/plugins/parsers/parquet/testcases/dense/generate.py new file mode 100644 index 000000000..32878650c --- /dev/null +++ b/plugins/parsers/parquet/testcases/dense/generate.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df = pandas.DataFrame({ + 'tag': ["row1", "row1", "row1", "row1", "row1", "row1", "row1"], + 'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0], + 'str_field': ["a", "b", "c", "d", "e", "f", "g"], + 'timestamp': [ + 1710683695, 1710683695, 1710683695, 1710683695, 1710683695, + 1710683695, 1710683695, + ] +}) + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/dense/input.parquet b/plugins/parsers/parquet/testcases/dense/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..302cf94ed41e48fb1ead59bb2be30d964f2e767b GIT binary patch literal 3540 zcmcgv&2Jh<6rUxS;CxtCltmD6K(s+QI0=i56QUkG5Wq_@F*sqDZ7XZPfW^DZ1{Pxy zMT&aLp{m-P`!`fQ_RvGrL;rxDd*~lgE$E0v;1S%uk|_ujnso8No$);YvA zj`g#f?DaD1W6?E)f>A$0sMj5GEMl**p)Ga;)B!dS-9@+jL>{Ii7_G;AFm5o+21pF&37RW{aB!kIpAn9irLcLnKz?73vi>o{>#kwxeOI;|%J0ui=na$j zHJErE1b&x2y~F>iLqxtzqi~?FwslKpgNVIy(?`qkCMrkCX)eKfHou<5Ib{<&KiHk4 zM4^pHG*bTHBL7HbmiX{;NPLWRg}x4iHWI36atytWaEgb~*z%*e6_0)4=nWXcs1JiqSs*@@A;wxBq z{*I}0ee8DL3%&Kn^D+S#0BL-WUAfWp*fgi zU`8=R!BXgHhguFir&8Xmq=*iMui#qkk?_E4ZQ(yJ-z859 z=P|}r_>t#xjCXLhg$o$BaIQsV+3cKtNQAIwtk1U|dTSV|7pmtm9%9%BZ|VfzbQ6pX z_+^I;A$`&i4`7;y{n>lGWe%ibeMd}HPL=(-tJnv|BO|Nmqf$=>TfWn6@ph@LSenTj z!xnF)+lugn*bFq`tY6MyL*1^mRl5o@JRGl2)f~UeKUDjCuJlYR)`!H;fyik@-$-Tp zn!pXjlowkPr!fZ&fisD%V%Pghj>MB3NjN(ZQ!C;Y3xK`wOx&)e)gr-!4NXWp;QvHA z;z?}4$PM*+CaH^A>X)}NhHTY4T8`>nNvyTON3wAQc=!8;B&0h^s^%z`O8wRAwPN{k z%V;iOS4Uu9aDl%GE{I($*d9lVm}hi$7PthyxEV}#eIN^!Qwh#M_<^&#>b}|3_RS%j zV;9bG+OQn6v0e4_LOt0KYHh8UpXQ-?G;Bf6ru+eKeI->$iYX7vyKqU?*=a+pH6_b5 zm7?cgUBRnoS4~ws-d6DH#jAAQ1EXC@s%FNLg?eA|{Ce|3=KLJXHsczu%<{u2mPzt9 zsZmZ5v+3EoP*SC*l2A!{Ja8uD3hqIzJ*~CH zn^oMwJxWe_S#>;Tl;T-NgxvC0Wh4oOq_WMEoI~B6ISsg%s(nK16zT=$ zrBiK0>pCTgrDw8`8PNON$#dln9vmE%GW32<^DCLEU2&Au4)t+h?2O8tywRlngbbv` zV62fLPlcps;l?5KFVb$RJQbFRaN5c+&JpQ8+##V3cfa7ORNo@rpdYC?N7rP4b_b6= z_4kDO+ow`Gr8CdpOsx=U=`rb|JQ-k>#b?y?gpAPtCQq467HXV`ilje6zlAq_a%2;r z0elVUk{B8!?VA%2*=&v4ktRCppAtmwJvl$Pp!}e4%#nLf2fBM(b)4Ju!x0;Q7-HTh TBSQYBo&H1rUP9;){y+C0nwzha literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/dense/telegraf.conf b/plugins/parsers/parquet/testcases/dense/telegraf.conf new file mode 100644 index 000000000..a0d764aba --- /dev/null +++ b/plugins/parsers/parquet/testcases/dense/telegraf.conf @@ -0,0 +1,8 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" + + measurement_column = "str_field" + tag_columns = ["tag"] + timestamp_column = "timestamp" + timestamp_format = "unix" diff --git a/plugins/parsers/parquet/testcases/empty/expected.out b/plugins/parsers/parquet/testcases/empty/expected.out new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/parsers/parquet/testcases/empty/generate.py b/plugins/parsers/parquet/testcases/empty/generate.py new file mode 100644 index 000000000..8570fd125 --- /dev/null +++ b/plugins/parsers/parquet/testcases/empty/generate.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(pandas.DataFrame()), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/empty/input.parquet b/plugins/parsers/parquet/testcases/empty/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..94603b70e317c65ab935c3576bd1f51d826c24f7 GIT binary patch literal 976 zcmZuw%Wm306t&tYt1O!cq%4pZT_q_tI2Dn)7#uKmfdo+TqpHyO5o53i#}ABy@<-kD zANpavGf+g-jx?S-=iWKz_QgdB7Dwq7hLtSdUgTpN0=Wtu6k8D zjWKa?TKV;E%Ll~uHZx}x<`GHxyW00HzXS{-KEcHI_^TREaO43%@-T#5n&BjaZJoud z_z-$D6kdM-zT=xQa&J%jmgOn%LU;BS+u53BWlZJ3T6WuH`gs(W|H6@%VHal@YN@p& zVW!A@Bp%LI2~=$PFt8Jxl$1FxagwYG%DwiIG*}Ue3erveM&Mro-8(r-iZ>$7ag{SP z9zW=Jy9QT&o_wN1^4kcZ5llM1BGg1}A9WG(k?b?94FyN}f`XkxUSa294}(>)yoykP z;2f-s1Xgwko+iw8Vd~P-GzJLWcc85OS&hQnQl?Eq9Iox&G_%PdcnI3IS854{T6MAV zRie%9$PLw?@YP6~+xj(s3#PF=lXCl z3aAcnWKJ)9a(iK6O~8+=#Wop4@-MZaS z>AOSUHn9nPambSCf)2#m+ASHkXc^iI)>}F2-=ZV*7bqW0v=-~n!M7DUWEk6GlW`1! uX4zkKf~UV<9icB=r55AlIrp+_CyuY#C9Lf)S#5Xys%PBq_Z;^K|MDNQKQNF0 literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/empty/telegraf.conf b/plugins/parsers/parquet/testcases/empty/telegraf.conf new file mode 100644 index 000000000..01ad04007 --- /dev/null +++ b/plugins/parsers/parquet/testcases/empty/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" diff --git a/plugins/parsers/parquet/testcases/multitable/expected.out b/plugins/parsers/parquet/testcases/multitable/expected.out new file mode 100644 index 000000000..d0d3c46f7 --- /dev/null +++ b/plugins/parsers/parquet/testcases/multitable/expected.out @@ -0,0 +1,21 @@ +test,tag=row1 float_field=64 1710683608143228692 +test,tag=row1 float_field=65 1710683608143228692 +test,tag=row1 float_field=66 1710683608143228692 +test,tag=row1 float_field=67 1710683608143228692 +test,tag=row1 float_field=68 1710683608143228692 +test,tag=row1 float_field=69 1710683608143228692 +test,tag=row1 float_field=70 1710683608143228692 +test,tag=row1 float_field=64 1710683608143228693 +test,tag=row1 float_field=65 1710683608143228693 +test,tag=row1 float_field=66 1710683608143228693 +test,tag=row1 float_field=67 1710683608143228693 +test,tag=row1 float_field=68 1710683608143228693 +test,tag=row1 float_field=69 1710683608143228693 +test,tag=row1 float_field=70 1710683608143228693 +test,tag=row1 float_field=64 1710683608143228694 +test,tag=row1 float_field=65 1710683608143228694 +test,tag=row1 float_field=66 1710683608143228694 +test,tag=row1 float_field=67 1710683608143228694 +test,tag=row1 float_field=68 1710683608143228694 +test,tag=row1 float_field=69 1710683608143228694 +test,tag=row1 float_field=70 1710683608143228694 diff --git a/plugins/parsers/parquet/testcases/multitable/generate.py b/plugins/parsers/parquet/testcases/multitable/generate.py new file mode 100644 index 000000000..00214806e --- /dev/null +++ b/plugins/parsers/parquet/testcases/multitable/generate.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df1 = pandas.DataFrame({ + 'tag': ["row1", "row1", "row1", "row1", "row1", "row1", "row1"], + 'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0], + 'timestamp': [ + "1710683608143228692", "1710683608143228692", "1710683608143228692", + "1710683608143228692", "1710683608143228692", "1710683608143228692", + "1710683608143228692", + ] +}) + +df2 = pandas.DataFrame({ + 'tag': ["row1", "row1", "row1", "row1", "row1", "row1", "row1"], + 'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0], + 'timestamp': [ + "1710683608143228693", "1710683608143228693", "1710683608143228693", + "1710683608143228693", "1710683608143228693", "1710683608143228693", + "1710683608143228693", + ] +}) + +df3 = pandas.DataFrame({ + 'tag': ["row1", "row1", "row1", "row1", "row1", "row1", "row1"], + 'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0], + 'timestamp': [ + "1710683608143228694", "1710683608143228694", "1710683608143228694", + "1710683608143228694", "1710683608143228694", "1710683608143228694", + "1710683608143228694", + ] +}) + +with pyarrow.parquet.ParquetWriter('input.parquet', pyarrow.Table.from_pandas(df1).schema) as writer: + writer.write_table(pyarrow.Table.from_pandas(df1)) + writer.write_table(pyarrow.Table.from_pandas(df2)) + writer.write_table(pyarrow.Table.from_pandas(df3)) diff --git a/plugins/parsers/parquet/testcases/multitable/input.parquet b/plugins/parsers/parquet/testcases/multitable/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..2a4d385ed38df0564f1ed4545f80b20d4b46a830 GIT binary patch literal 4711 zcmd5=-EZ1f6u(fCCe6p9w5Cwy0io5ZrZmF_@=>)?3@yZr&_EIlOjBeVhd8l~3kE`~ zNY$RIshYN_+QU@!VUK&<<1|fsnEJRsVA|6(?P<@`H0@#MUK1e1QCmerwaWFq=NzB& zyXXAwvC|DM$50+>hPslb+!UHcXl%}d5UN#M427s!$~#X@gL;%2om)gVJXr1{BN&Yb z+%QhNT+@jD%r!?%`lD!c#_zjei51H2o%TLcF4a)vBVkv#TmvnQY^^+xxGJ{>M#!;huYg7>=7< zpxEO~UoBh)9Ly3ET3ikKB*yHy8cQDE;dQE{~Hl=LnAbIzNM+UFty{V9@vD^waUToW6-@ZftK79s<&WP|oap?Kw9@GI2{WMEIn||Z9Ywy;y1b|8A~lMr$?I#ab?m(F`EUDgpZldYq}K{PoG$PE%j2D<$Gm@dt{?>2Ie){Z zhto3Y;dC%NKTpSTg4{Sk=kG@JaQXo~oa1RH=-DhzkO%qaeWRvml*PL5#mJ+eYLu13 z9l5GCbfX@;75MN`@Bm~m3)V!Vq6FsyK||CP5EuO8bcswAJY|kP zf`#pGhd5U{Zcln4pLshcWj3rGt8L~EmxD)rU{0b4!nZPZa014$@LV`>mX78DPG@hyzR%jz5eJk5k%@LO zLTYd#1AU6V6r2j({qj5=_=IX8?3*Q?L?uP9DWX-aky>b~R1<4$Tne)-*5L0K456vi z>T1=%QYN|(T3~~asbD91pORf*7ntA?@=bEN+|L3-5LHnQYu%!$`akJUuq?a{bZB=+Zx+e+S z*rrhy4(sVSr^?~no@``6#Scjx6j(cU zsnCttC<=^*Z6zaLm*O~H$t($n2L;w4ZZZMb6AugFTvSeCOq^O4q9*u1C~fjMHeh60 zN`57z6k^0Ludk@0p5HIWiQXy|a(mz-RNMr-Yjw3GME4~&XG*$E{N?kxWO}Qj?has= z+hCutAb-cWAa*ig*f|=++@~{QkW0uH(}&5(H$)-REWsXdeqisGyr%7z*R&SwV-fbz zEb69K3}NP=U3voQ12TA2y4H+-^zNG%0EO zmmTtI+hy5|oi~TP+VL{`Z-CK&JPUc?N5TTW;Cktzkgt~#E4_W3w&`#<)1Mzk(luPS zV~#RXAr|fDg~X;Fl!Q#k&I6-COgIO*y)M_*I@}DWG>|uC)#ihg)Kpndv=zG!Ek#JQ zWql<~{1_!oHZrEfMhI8dx3(MW@kpCDLU=DdoO-@3vszQaKI42lz0Wt+H&ZKcAaM_{ zPwHKAh~a#5oUsA@0Uzh=HW992VYgLqTpUZ+`f`WWsV&@9cs#%=#vNiIcYp`z2gg?$ zPX=m^MM;bT`VZX66~_-jGlKq6gs#Cs!ytWx;DKo@mLqn!sr36N`@!b`;{_LFKg5!| qa|ihXrdWH>P^_D>Y2G9aHPC680lSs@Z^M_i{SSRUfzSs07t??1p&OV0 literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/multitable/telegraf.conf b/plugins/parsers/parquet/testcases/multitable/telegraf.conf new file mode 100644 index 000000000..3261e20dc --- /dev/null +++ b/plugins/parsers/parquet/testcases/multitable/telegraf.conf @@ -0,0 +1,8 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" + + measurement_column = "str_field" + tag_columns = ["tag"] + timestamp_column = "timestamp" + timestamp_format = "unix_ns" diff --git a/plugins/parsers/parquet/testcases/sparse/expected.out b/plugins/parsers/parquet/testcases/sparse/expected.out new file mode 100644 index 000000000..68021451d --- /dev/null +++ b/plugins/parsers/parquet/testcases/sparse/expected.out @@ -0,0 +1,6 @@ +test,tag=row1 float_field=64 1709313032000000000 +test,tag=row2 float_field=65 1709399432000000000 +test,tag=row3 int_field=65 1709485832000000000 +test,tag=row4 uint_field=5 1709572232000000000 +test,tag=row5 bool_field=true 1709658632000000000 +test,str_field=blargh,tag=multi_field bool_field=false 1709831432000000000 diff --git a/plugins/parsers/parquet/testcases/sparse/generate.py b/plugins/parsers/parquet/testcases/sparse/generate.py new file mode 100644 index 000000000..0242260bd --- /dev/null +++ b/plugins/parsers/parquet/testcases/sparse/generate.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df = pandas.DataFrame({ + 'tag': ["row1", "row2", "row3", "row4", "row5", "row6", "multi_field"], + 'float_field': [64.0, 65.0, None, None, None, None, None], + 'int_field': [None, None, 65, None, None, None, None], + 'uint_field': [None, None, None, 5, None, None, None], + 'bool_field': [None, None, None, None, True, None, False], + 'str_field': [None, None, None, None, None, "blargh", "blargh"], + 'timestamp': [ + "2024-03-01T17:10:32", "2024-03-02T17:10:32", "2024-03-03T17:10:32", + "2024-03-04T17:10:32", "2024-03-05T17:10:32", "2024-03-06T17:10:32", + "2024-03-07T17:10:32", + ] +}) + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/sparse/input.parquet b/plugins/parsers/parquet/testcases/sparse/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..98177d15ca4907e7e937ff0118599d95d0db5b26 GIT binary patch literal 5499 zcmc&2&u`mg+fJ*{W?dl(XFyG}#Q$(moJq0PEDF`(jKP2Ac}NSvnW z7~;SIeL~;q(|3m3clgAa0|&$bap1rKapu5*zW@Ozp7*sA+i|8bts3rZzwh%r@ALbI zmp!h^P_xvBlu)J?DdGx2ypfnC2%_5_Fqh{D7VdSpH{ecRp1ZSo32;VVx3t3}P1O~O zprX{PH{j?J6{f<8P2&129Nmmv@({t1OV>m2TqI{^KKb@l`d2bREyUAAcqty6vBV}7 ziY`V!3Ppb==@03-$1sWC06hsRM8IPSUIYRA73yjkfkE%3I{<=f?=F7b-Mb5fSE#6~ zTx@QHaaKSZja;7s1-khaBw}WUoC(q2%_gXbqwl37y)9af^+@mU$mkCw{pAe(dnoY~ z5~iB7L@k%85H@EnF;e!4W`L>Dg@73VbG1E5k9DE#2hl>=ZI@Wq(9AyALi-L=0{feSdG2pgA!A$ARwc! zpv)m~TE|E-MiP)K_zfDp5X0HAN75f%PEbVrT9}B(B2v4pdop7$dO-hw;m*Y2IMBnN zlfcVVY!vax)i7|Dbg|oPO~!mPay@KgV$R!j%X{Q3LI3+jx1{EKR@bA(r)2a$B>lxA z{paPx6h2d|pQXMDQ6H0|j_^5Ie;LHFDRyHuwZ58S4w$XmOzQSJ%f15O8a&p~V*?(u z>4WM!1hEOrt6O6_FlVw&ygX*-lEqx0D+46^yT>nAJf>Qjq4q4%=-?jm+f4MU5dHm2 z^#7Jn7jfA=NyaoM#6L|Fv2agrsfI``64b)A5bjpjh%%~8Dgt|NH?SdYVcC06jzZYs zh$l;Io^H5CoROsYv1JZ4x)@KyKe(vg`J8&%bLlzHrzbp9rC(@$($A*?qW@ZmF4A++ zA0x{I^vGDaBbtiXi~TY8FsYe}dUhzc^}b>DlDAjhewaJ~7_5?A(QK;88!JguG*keX zeO-qg6lvFjZ3?g39awJRrHYo!qjzCr^E<@MRfnyg3;yUh^uuhm4y|DaiA(lPO>Qf; z27RM593$G&u_{|gGefmRMYKd9f0$HFAlJ+$I{V-tw|Xo2$fJy3izIxkAO=1)M!;v% z19|Du4N%9iE$R8p2uhqlPDsYs%?%s_p9ee$j|&2#$3rzJ*oV4cvF` zgM$6I3Bd>%^zzg^3pjZ^qk}Sis0oozm)!xWO(!dR9w$pRkYe&TxPl^b;a%w zRas{!cHut>NQG0i+tb=6f->owshdm^iXiDo_aNDuEW#171+H3si@$A8<=F2_Yv^LQ zJjZeSaHH!@j@#feO)kfAO)lHSuuR5>A0Xh)7+*$fd^=PaU*xy}2j}38nZSEQ)PPlH z(BRW2jruOf6;9xMW~*rEeIZ}lsI!$*sZg^db60yy%Sh_5)D?lYc-(Fl%~DG;6uqbo znnfeslK2ORrmyg4y>gb*4V?X{oGoq^@5#MlwsfxKYXih*x6UXC-(a_U z3eWWGtSwupk7f27JfkC8$*lFHERrX51pe%#&Q6h=&w=i_^ZI%U56;YhFTr1oM<=t^7x~Jm05%~1fbEuC(2taY zJ^(v5!H(00(a{_0`!=4hr5b#-rQ~= zkznY$l(+fIQ~YYv?XwlzZ&Ung%c~qejgER>$!!}VU+W1rUwfU~_Sg}iadNog^^YOt zw_D1+M#{-iMyhAh-n^L5-yipUnvwEn+vO~(2Twftfb5`tX00Qc|CIkqrxJVSi=7?E z>XBsREGKs~3F^(42h(!UK)ePQmw#p@CF@U!ZIEVKY#GVWQB98ePaw7uTY6vMD^Bbg z9qJHzT(vdsBZX8C)wahcDG~Y%>RVEs!!}RxTYzR-T|862aq_MXwOceQLxInwr1c`I zX~@H~P6PU`Y@Xmg1pNTkrBh{y`xYzIOXniL-N*g;c!wz;bN$`>CCFju$K(1HblI$Q zBz6Py*wr?M<>MXg2u~du&4VJEKx=47?4o&*eSnq%dfdZAXvT2qy{)x1G+y=}U|<0O z$D<2QA!q@5CSQYIpR*8Wu5$j`+S;vsH0ofN$y5=L!zh?fqpO4#5QpI?KhZwB2Zzqj z;c*LgJ9(@`W;Fj01-jaBXMDL0fE1D^qVrha-Rb%wQwzSj<+u{qh~*VI<4NZc*Sx1U zz?R!}&krZgV3b EUrMTg+W-In literal 0 HcmV?d00001 diff --git a/plugins/parsers/parquet/testcases/sparse/telegraf.conf b/plugins/parsers/parquet/testcases/sparse/telegraf.conf new file mode 100644 index 000000000..540f798b6 --- /dev/null +++ b/plugins/parsers/parquet/testcases/sparse/telegraf.conf @@ -0,0 +1,7 @@ +[[inputs.test]] + files = ["input.parquet"] + data_format = "parquet" + + tag_columns = ["tag", "str_field"] + timestamp_column = "timestamp" + timestamp_format = "2006-01-02T15:04:05" diff --git a/plugins/parsers/parquet/testcases/timestamp/expected.out b/plugins/parsers/parquet/testcases/timestamp/expected.out new file mode 100644 index 000000000..4f37063ff --- /dev/null +++ b/plugins/parsers/parquet/testcases/timestamp/expected.out @@ -0,0 +1,3 @@ +test value=1.1 1710511506000000000 +test value=2.2 1710597906000000000 +test value=3.3 1710684306000000000 diff --git a/plugins/parsers/parquet/testcases/timestamp/generate.py b/plugins/parsers/parquet/testcases/timestamp/generate.py new file mode 100644 index 000000000..1dc889140 --- /dev/null +++ b/plugins/parsers/parquet/testcases/timestamp/generate.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +import pandas +import pyarrow +import pyarrow.parquet + +df = pandas.DataFrame({ + 'value': [1.1, 2.2, 3.3], + 'timestamp': [ + "2024-03-15T14:05:06+00:00", "2024-03-16T14:05:06+00:00", + "2024-03-17T14:05:06+00:00", + ] +}) + +pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), "input.parquet") diff --git a/plugins/parsers/parquet/testcases/timestamp/input.parquet b/plugins/parsers/parquet/testcases/timestamp/input.parquet new file mode 100644 index 0000000000000000000000000000000000000000..89affed419d17fd25039e80b403d2a13777263e9 GIT binary patch literal 2444 zcmcguUvJu06u(f)(kKtnR3nh66j>>%G}77*rXyT7pKfuH`9@zXz zl&>+ZeVVkFJ@pgpL+p9l)4oAp_Plei2?3UuPTH>G>vPXJ_nhDPXL2IcSt>y>RQ`}k zQs_&B(uoi6W3j}iH;+~}V{cnzt-hrY^_WWML1!JFWH!BWD@m_>c#qeoH!1Y&9xg&? zbu$ioG8TWZiE@9&veasZN6Ga}`r%M>LW_#0lBwTg@RvKut$dE@OdhZ!Dvn@VhZjO{ z{urbD+h7zw15A$L3R}$f7R#Tpg&l_9VTxZd%nrkFPY^14PtdEEOM>Nai|AQqW@TA9 zeJ5IWupIm!53|(TH26n>?OKCCbA6J4zv8K1ka6gjc>2%OW*-T_cgC=(fJM9(_9M}Ug$V*g39#+I5LFp`9N+H z(!wNr^Bo)_dl$sGH8s0vrDa==d9lcgU|j?gAG@PN&o+8ygdlYN$xP7GyOt4PWUdux zrWR033ZJ{a%ZK2X8 zymEO#KElksnOqbq(HVAPg~%mA7zuC>-oyyJ$tBnu@RUalg}-d5M*=v3`{kFC>x4~F zE~wn`P~VpW-8-`1*=60D9P~BNmAbvQa3Wqv-kraK-s)Eaku zA9j&vb#Yuetl*l?VGc0H(P4MbZkcN$!ltN;oD}_gPVJvwJZvSW$_Q*WI z{0IVbg69v2+BumVG9+pQ22pt4Jp z%D3|xamf{-9Q8m=5U2OyO%}}J>(?l~;cNZxLo3)ae1D64a5knNl#S?9lgY!6E&jdz M`3r