feat(parsers.parquet): Add Apache Parquet Parser (#15008)
This commit is contained in:
parent
8e505944e3
commit
ba9cbeebb5
|
|
@ -17,6 +17,7 @@ Protocol, JSON format, or Apache Avro format.
|
|||
- [JSON v2](/plugins/parsers/json_v2)
|
||||
- [Logfmt](/plugins/parsers/logfmt)
|
||||
- [Nagios](/plugins/parsers/nagios)
|
||||
- [Parquet](/plugins/parsers/parquet)
|
||||
- [Prometheus](/plugins/parsers/prometheus)
|
||||
- [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite)
|
||||
- [Value](/plugins/parsers/value), ie: 45 or "booyah"
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ following works:
|
|||
- github.com/alitto/pond [MIT License](https://github.com/alitto/pond/blob/master/LICENSE)
|
||||
- github.com/aliyun/alibaba-cloud-sdk-go [Apache License 2.0](https://github.com/aliyun/alibaba-cloud-sdk-go/blob/master/LICENSE)
|
||||
- github.com/amir/raidman [The Unlicense](https://github.com/amir/raidman/blob/master/UNLICENSE)
|
||||
- github.com/andybalholm/brotli [MIT License](https://github.com/andybalholm/brotli/blob/master/LICENSE)
|
||||
- github.com/antchfx/jsonquery [MIT License](https://github.com/antchfx/jsonquery/blob/master/LICENSE)
|
||||
- github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE)
|
||||
- github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE)
|
||||
|
|
|
|||
17
go.mod
17
go.mod
|
|
@ -35,8 +35,9 @@ require (
|
|||
github.com/antchfx/xmlquery v1.3.18
|
||||
github.com/antchfx/xpath v1.2.5
|
||||
github.com/apache/arrow/go/v13 v13.0.0
|
||||
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0
|
||||
github.com/apache/iotdb-client-go v1.2.0-tsbs
|
||||
github.com/apache/thrift v0.18.1
|
||||
github.com/apache/thrift v0.19.0
|
||||
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
|
||||
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
|
||||
github.com/awnumar/memguard v0.22.4
|
||||
|
|
@ -220,7 +221,7 @@ require (
|
|||
k8s.io/apimachinery v0.29.3
|
||||
k8s.io/client-go v0.29.3
|
||||
layeh.com/radius v0.0.0-20221205141417-e7fbddd11d68
|
||||
modernc.org/sqlite v1.29.2
|
||||
modernc.org/sqlite v1.29.5
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
@ -257,6 +258,7 @@ require (
|
|||
github.com/Microsoft/go-winio v0.6.1 // indirect
|
||||
github.com/Microsoft/hcsshim v0.11.4 // indirect
|
||||
github.com/alecthomas/participle v0.4.1 // indirect
|
||||
github.com/andybalholm/brotli v1.1.0 // indirect
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect
|
||||
github.com/apache/arrow/go/v14 v14.0.2 // indirect
|
||||
github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect
|
||||
|
|
@ -334,7 +336,7 @@ require (
|
|||
github.com/golang-sql/sqlexp v0.1.0 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/flatbuffers v23.5.26+incompatible // indirect
|
||||
github.com/google/flatbuffers v24.3.7+incompatible // indirect
|
||||
github.com/google/gnostic-models v0.6.8 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
|
|
@ -375,7 +377,8 @@ require (
|
|||
github.com/josharian/native v1.1.0 // indirect
|
||||
github.com/jpillora/backoff v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||
github.com/klauspost/asmfmt v1.3.2 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
||||
github.com/kr/fs v0.1.0 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
|
||||
|
|
@ -388,6 +391,8 @@ require (
|
|||
github.com/mdlayher/genetlink v1.2.0 // indirect
|
||||
github.com/mdlayher/netlink v1.7.2 // indirect
|
||||
github.com/mdlayher/socket v0.4.1 // indirect
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
|
||||
github.com/minio/highwayhash v1.0.2 // indirect
|
||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
|
|
@ -473,9 +478,9 @@ require (
|
|||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.24.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.17.0 // indirect
|
||||
golang.org/x/tools v0.19.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
|
||||
golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect
|
||||
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 // indirect
|
||||
|
|
|
|||
37
go.sum
37
go.sum
|
|
@ -781,6 +781,8 @@ github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl
|
|||
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
|
||||
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
|
||||
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
|
||||
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
|
||||
github.com/antchfx/jsonquery v1.3.3 h1:zjZpbnZhYng3uOAbIfdNq81A9mMEeuDJeYIpeKpZ4es=
|
||||
github.com/antchfx/jsonquery v1.3.3/go.mod h1:1JG4DqRlRCHgVYDPY1ioYFAGSXGfWHzNgrbiGQHsWck=
|
||||
github.com/antchfx/xmlquery v1.3.18 h1:FSQ3wMuphnPPGJOFhvc+cRQ2CT/rUj4cyQXkJcjOwz0=
|
||||
|
|
@ -798,12 +800,14 @@ github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUu
|
|||
github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
|
||||
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
|
||||
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
|
||||
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0 h1:XbC214lVvnAnDzowGV7dYiv4f4Aa6jhtIby08OgbcUg=
|
||||
github.com/apache/arrow/go/v16 v16.0.0-20240319161736-1ee3da0064a0/go.mod h1:VVbdJivCXZAJ6IhOSCSzk/RVQ/PlcitjskAWEST3Sc0=
|
||||
github.com/apache/iotdb-client-go v1.2.0-tsbs h1:hezGUydAkDSceCvsetYorI87S2e8HZ4hTQHmGZgOGDY=
|
||||
github.com/apache/iotdb-client-go v1.2.0-tsbs/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY=
|
||||
github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
|
||||
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
|
||||
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
|
||||
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
|
||||
github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk=
|
||||
github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I=
|
||||
github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY=
|
||||
github.com/apex/logs v1.0.0/go.mod h1:XzxuLZ5myVHDy9SAmYpamKKRNApGj54PfYLcFrXqDwo=
|
||||
github.com/aphistic/golf v0.0.0-20180712155816-02c07f170c5a/go.mod h1:3NqKYiepwy8kCu4PNA+aP7WUV72eXWJeP9/r3/K9aLE=
|
||||
|
|
@ -1312,8 +1316,8 @@ github.com/google/cel-go v0.18.1 h1:V/lAXKq4C3BYLDy/ARzMtpkEEYfHQpZzVyzy69nEUjs=
|
|||
github.com/google/cel-go v0.18.1/go.mod h1:PVAybmSnWkNMUZR/tEWFUiJ1Np4Hz0MHsZJcgC4zln4=
|
||||
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg=
|
||||
github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/flatbuffers v24.3.7+incompatible h1:BxGUkIQnOciBu33bd5BdvqY8Qvo0O/GR4SPhh7x9Ed0=
|
||||
github.com/google/flatbuffers v24.3.7+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
|
||||
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
|
||||
github.com/google/gnxi v0.0.0-20231026134436-d82d9936af15 h1:EETGSLGKBReUUYZdztSp45EzTE6CHw2qMKIfyPrgp6c=
|
||||
|
|
@ -1676,6 +1680,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
|
|||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
|
||||
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
|
||||
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
|
||||
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
|
||||
github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
|
|
@ -1684,8 +1689,8 @@ github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLA
|
|||
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
|
||||
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
|
||||
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
|
||||
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
|
||||
|
|
@ -1770,8 +1775,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
|
|||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI=
|
||||
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a h1:JOlLsLUQnokTyWWwEvOVoKH3XUl6oDMP8jisO54l6J8=
|
||||
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a/go.mod h1:960H6oqSawdujauTeLX9BOx+ZdYX0TdG9xE9br5bino=
|
||||
|
|
@ -1817,7 +1822,9 @@ github.com/miekg/dns v1.1.58 h1:ca2Hdkz+cDg/7eNF6V56jjzuZ4aCAE+DbVkILdQWG/4=
|
|||
github.com/miekg/dns v1.1.58/go.mod h1:Ypv+3b/KadlvW9vJfXOTf300O4UqaHFzFCuHz+rPkBY=
|
||||
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws=
|
||||
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721/go.mod h1:Ickgr2WtCLZ2MDGd4Gr0geeCH5HybhRJbonOgQpvSxc=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
|
||||
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
|
||||
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
|
|
@ -2425,8 +2432,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
|
|||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||
golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
|
||||
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
|
||||
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE=
|
||||
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
|
||||
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
|
|
@ -2873,8 +2880,8 @@ golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
|
|||
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
|
||||
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
|
||||
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
|
||||
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
|
||||
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
|
||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
@ -3267,6 +3274,8 @@ modernc.org/ccgo/v3 v3.16.6/go.mod h1:tGtX0gE9Jn7hdZFeU88slbTh1UtCYKusWOoCJuvkWs
|
|||
modernc.org/ccgo/v3 v3.16.8/go.mod h1:zNjwkizS+fIFDrDjIAgBSCLkWbJuHF+ar3QRn+Z9aws=
|
||||
modernc.org/ccgo/v3 v3.16.9/go.mod h1:zNMzC9A9xeNUepy6KuZBbugn3c0Mc9TeiJO4lgvkJDo=
|
||||
modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
|
||||
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
|
||||
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
|
||||
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI=
|
||||
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4=
|
||||
modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM=
|
||||
|
|
@ -3292,8 +3301,8 @@ modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
|
|||
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
|
||||
modernc.org/sqlite v1.29.2 h1:xgBSyA3gemwgP31PWFfFjtBorQNYpeypGdoSDjXhrgI=
|
||||
modernc.org/sqlite v1.29.2/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0=
|
||||
modernc.org/sqlite v1.29.5 h1:8l/SQKAjDtZFo9lkJLdk8g9JEOeYRG4/ghStDCCTiTE=
|
||||
modernc.org/sqlite v1.29.5/go.mod h1:S02dvcmm7TnTRvGhv8IGYyLnIt7AS2KPaB1F/71p75U=
|
||||
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
|
||||
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
|
||||
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
//go:build !custom || parsers || parsers.parquet
|
||||
|
||||
package all
|
||||
|
||||
import _ "github.com/influxdata/telegraf/plugins/parsers/parquet" // register plugin
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
# Parquet Parser Plugin
|
||||
|
||||
The Parquet parser reads metrics from Apache Parquet files consumed by an input plugin.
|
||||
|
||||
## Configuration
|
||||
|
||||
```toml
|
||||
[[inputs.file]]
|
||||
files = ["example"]
|
||||
|
||||
## Data format to consume.
|
||||
## Each data format has its own unique set of configuration options, read
|
||||
## more about them here:
|
||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||
data_format = "parquet"
|
||||
|
||||
## Tag column is an array of columns that should be added as tags.
|
||||
# tag_columns = []
|
||||
|
||||
## Name column is the column to use as the measurement name.
|
||||
# measurement_column = ""
|
||||
|
||||
## Timestamp column is the column containing the time that should be used to
|
||||
## create the metric. If not set, then the time of parsing is used.
|
||||
# timestamp_column = ""
|
||||
|
||||
## Timestamp format is the time layout that should be used to interpret the
|
||||
## timestamp_column. The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`,
|
||||
## or a time in the "reference time". To define a different format, arrange
|
||||
## the values from the "reference time" in the example to match the format
|
||||
## you will be using. For more information on the "reference time", visit
|
||||
## https://golang.org/pkg/time/#Time.Format
|
||||
## ex: timestamp_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
|
||||
## timestamp_format = "2006-01-02T15:04:05Z07:00"
|
||||
## timestamp_format = "01/02/2006 15:04:05"
|
||||
## timestamp_format = "unix"
|
||||
## timestamp_format = "unix_ms"
|
||||
# timestamp_format = ""
|
||||
|
||||
## Timezone allows you to provide an override for timestamps that
|
||||
## do not already include an offset
|
||||
## e.g. 04/06/2016 12:41:45
|
||||
##
|
||||
## Default: "" which renders UTC
|
||||
## Options are as follows:
|
||||
## 1. Local -- interpret based on machine localtime
|
||||
## 2. "America/New_York" -- Unix TZ values like those found in
|
||||
## https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
|
||||
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
|
||||
# timestamp_timezone = ""
|
||||
```
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
package parquet
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"github.com/apache/arrow/go/v16/parquet"
|
||||
"github.com/apache/arrow/go/v16/parquet/file"
|
||||
)
|
||||
|
||||
// newColumnParser wraps a column chunk reader and pre-allocates batch buffers
// sized for 128 values. The element type of the value buffer is chosen to
// match the concrete reader type.
func newColumnParser(reader file.ColumnChunkReader) *columnParser {
	batchSize := 128

	var valueBuffer interface{}
	switch reader.(type) {
	case *file.BooleanColumnChunkReader:
		valueBuffer = make([]bool, batchSize)
	case *file.Int32ColumnChunkReader:
		valueBuffer = make([]int32, batchSize)
	case *file.Int64ColumnChunkReader:
		valueBuffer = make([]int64, batchSize)
	case *file.Float32ColumnChunkReader:
		valueBuffer = make([]float32, batchSize)
	case *file.Float64ColumnChunkReader:
		valueBuffer = make([]float64, batchSize)
	case *file.ByteArrayColumnChunkReader:
		valueBuffer = make([]parquet.ByteArray, batchSize)
	case *file.FixedLenByteArrayColumnChunkReader:
		valueBuffer = make([]parquet.FixedLenByteArray, batchSize)
	}
	// NOTE(review): an unhandled reader type (e.g. an Int96 column) leaves
	// valueBuffer nil, which later panics in Next when the buffer is indexed
	// via reflection — confirm all column physical types are covered.

	return &columnParser{
		name:        reader.Descriptor().Name(),
		reader:      reader,
		batchSize:   int64(batchSize),
		defLevels:   make([]int16, batchSize),
		repLevels:   make([]int16, batchSize),
		valueBuffer: valueBuffer,
	}
}
|
||||
|
||||
// columnParser reads a single Parquet column chunk batch-by-batch and yields
// its values one at a time via Next.
type columnParser struct {
	name      string                 // column name from the schema descriptor
	reader    file.ColumnChunkReader // underlying typed chunk reader
	batchSize int64                  // values requested per ReadBatch call
	// valueOffset is the next index to read from valueBuffer;
	// valuesBuffered is the number of non-null values in the current batch.
	valueOffset    int
	valuesBuffered int

	// levelOffset is the next index to read from defLevels; levelsBuffered is
	// the number of levels (rows, including nulls) in the current batch.
	levelOffset    int64
	levelsBuffered int64
	defLevels      []int16 // definition levels of the current batch
	repLevels      []int16 // repetition levels of the current batch

	// valueBuffer holds a typed slice ([]bool, []int32, ...) matching the
	// concrete reader type; nil if the reader type is not handled.
	valueBuffer interface{}
}
|
||||
|
||||
func (c *columnParser) readNextBatch() error {
|
||||
var err error
|
||||
|
||||
switch reader := c.reader.(type) {
|
||||
case *file.BooleanColumnChunkReader:
|
||||
values := c.valueBuffer.([]bool)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.Int32ColumnChunkReader:
|
||||
values := c.valueBuffer.([]int32)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.Int64ColumnChunkReader:
|
||||
values := c.valueBuffer.([]int64)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.Float32ColumnChunkReader:
|
||||
values := c.valueBuffer.([]float32)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.Float64ColumnChunkReader:
|
||||
values := c.valueBuffer.([]float64)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.ByteArrayColumnChunkReader:
|
||||
values := c.valueBuffer.([]parquet.ByteArray)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
case *file.FixedLenByteArrayColumnChunkReader:
|
||||
values := c.valueBuffer.([]parquet.FixedLenByteArray)
|
||||
c.levelsBuffered, c.valuesBuffered, err = reader.ReadBatch(c.batchSize, values, c.defLevels, c.repLevels)
|
||||
}
|
||||
|
||||
c.valueOffset = 0
|
||||
c.levelOffset = 0
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *columnParser) HasNext() bool {
|
||||
return c.levelOffset < c.levelsBuffered || c.reader.HasNext()
|
||||
}
|
||||
|
||||
// Next returns the next cell of the column. The boolean is false when the
// column is exhausted or a read error occurs. A nil value with true indicates
// a null cell (its definition level is below the column's maximum).
func (c *columnParser) Next() (interface{}, bool) {
	// Refill the buffers once every buffered level has been consumed.
	if c.levelOffset == c.levelsBuffered {
		if !c.HasNext() {
			return nil, false
		}
		// NOTE(review): a batch read error is silently mapped onto "no more
		// values"; callers cannot distinguish EOF from a corrupt chunk.
		if err := c.readNextBatch(); err != nil {
			return nil, false
		}
		if c.levelsBuffered == 0 {
			return nil, false
		}
	}

	defLevel := c.defLevels[int(c.levelOffset)]
	c.levelOffset++

	// A level below the maximum definition level marks a null: it consumes a
	// level slot but no slot in the value buffer.
	if defLevel < c.reader.Descriptor().MaxDefinitionLevel() {
		return nil, true
	}

	// valueBuffer is one of several typed slices; indexing through reflection
	// avoids repeating the type switch here.
	vb := reflect.ValueOf(c.valueBuffer)
	val := vb.Index(c.valueOffset).Interface()
	c.valueOffset++

	// Convert byte arrays to strings
	switch v := val.(type) {
	case parquet.ByteArray:
		val = string(v)
	case parquet.FixedLenByteArray:
		val = string(v)
	}

	return val, true
}
|
||||
|
|
@ -0,0 +1,149 @@
|
|||
package parquet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/apache/arrow/go/v16/parquet/file"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
// Parser is a Telegraf parser plugin that converts Apache Parquet data into
// metrics, one metric per row.
type Parser struct {
	MeasurementColumn string   `toml:"measurement_column"` // column whose value becomes the metric name
	TagColumns        []string `toml:"tag_columns"`        // columns emitted as tags instead of fields
	TimestampColumn   string   `toml:"timestamp_column"`   // column providing the metric time; parse time is used if unset
	TimestampFormat   string   `toml:"timestamp_format"`   // "unix", "unix_ms", ... or a Go reference-time layout (defaults to "unix" in Init)
	TimestampTimezone string   `toml:"timestamp_timezone"` // timezone for timestamps lacking an offset; empty means UTC

	defaultTags map[string]string // tags applied to every metric via SetDefaultTags
	location    *time.Location    // resolved from TimestampTimezone in Init
	metricName  string            // default measurement name supplied at registration
}
|
||||
|
||||
func (p *Parser) Init() error {
|
||||
if p.TimestampFormat == "" {
|
||||
p.TimestampFormat = "unix"
|
||||
}
|
||||
if p.TimestampTimezone == "" {
|
||||
p.location = time.UTC
|
||||
} else {
|
||||
loc, err := time.LoadLocation(p.TimestampTimezone)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid location %s: %w", p.TimestampTimezone, err)
|
||||
}
|
||||
p.location = loc
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
reader := bytes.NewReader(buf)
|
||||
parquetReader, err := file.NewParquetReader(reader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create parquet reader: %w", err)
|
||||
}
|
||||
metadata := parquetReader.MetaData()
|
||||
|
||||
now := time.Now()
|
||||
metrics := make([]telegraf.Metric, 0, metadata.NumRows)
|
||||
for i := 0; i < parquetReader.NumRowGroups(); i++ {
|
||||
rowGroup := parquetReader.RowGroup(i)
|
||||
scanners := make([]*columnParser, metadata.Schema.NumColumns())
|
||||
for colIndex := range metadata.Schema.NumColumns() {
|
||||
col, err := rowGroup.Column(colIndex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to fetch column %q: %w", colIndex, err)
|
||||
}
|
||||
|
||||
scanners[colIndex] = newColumnParser(col)
|
||||
}
|
||||
|
||||
rowIndex := 0
|
||||
rowGroupMetrics := make([]telegraf.Metric, rowGroup.NumRows())
|
||||
for _, s := range scanners {
|
||||
for s.HasNext() {
|
||||
if rowIndex%int(rowGroup.NumRows()) == 0 {
|
||||
rowIndex = 0
|
||||
}
|
||||
|
||||
val, ok := s.Next()
|
||||
if !ok || val == nil {
|
||||
rowIndex++
|
||||
continue
|
||||
}
|
||||
|
||||
if rowGroupMetrics[rowIndex] == nil {
|
||||
rowGroupMetrics[rowIndex] = metric.New(p.metricName, p.defaultTags, nil, now)
|
||||
}
|
||||
|
||||
if p.MeasurementColumn != "" && s.name == p.MeasurementColumn {
|
||||
valStr, err := internal.ToString(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not convert value to string: %w", err)
|
||||
}
|
||||
rowGroupMetrics[rowIndex].SetName(valStr)
|
||||
} else if p.TagColumns != nil && slices.Contains(p.TagColumns, s.name) {
|
||||
valStr, err := internal.ToString(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not convert value to string: %w", err)
|
||||
}
|
||||
rowGroupMetrics[rowIndex].AddTag(s.name, valStr)
|
||||
} else if p.TimestampColumn != "" && s.name == p.TimestampColumn {
|
||||
valStr, err := internal.ToString(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not convert value to string: %w", err)
|
||||
}
|
||||
timestamp, err := internal.ParseTimestamp(p.TimestampFormat, valStr, p.location)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse '%s' to '%s'", valStr, p.TimestampFormat)
|
||||
}
|
||||
rowGroupMetrics[rowIndex].SetTime(timestamp)
|
||||
} else {
|
||||
rowGroupMetrics[rowIndex].AddField(s.name, val)
|
||||
}
|
||||
|
||||
rowIndex++
|
||||
}
|
||||
}
|
||||
|
||||
metrics = append(metrics, rowGroupMetrics...)
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(line))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return nil, nil
|
||||
}
|
||||
if len(metrics) > 1 {
|
||||
return nil, errors.New("line contains multiple metrics")
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
// SetDefaultTags stores tags that are attached to every metric produced by
// Parse.
func (p *Parser) SetDefaultTags(tags map[string]string) {
	p.defaultTags = tags
}
|
||||
|
||||
// init registers the parser under the "parquet" data format so it can be
// selected via `data_format = "parquet"` in any input plugin.
func init() {
	parsers.Add("parquet",
		func(defaultMetricName string) telegraf.Parser {
			return &Parser{metricName: defaultMetricName}
		},
	)
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
package parquet
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
test "github.com/influxdata/telegraf/testutil/plugin_input"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCases(t *testing.T) {
|
||||
folders, err := os.ReadDir("testcases")
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, folders)
|
||||
|
||||
for _, f := range folders {
|
||||
testcasePath := filepath.Join("testcases", f.Name())
|
||||
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
||||
t.Run(f.Name(), func(t *testing.T) {
|
||||
// Configure the plugin
|
||||
cfg := config.NewConfig()
|
||||
require.NoError(t, cfg.LoadConfig(configFilename))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, cfg.Inputs, 1)
|
||||
|
||||
// Tune the test-plugin
|
||||
plugin := cfg.Inputs[0].Input.(*test.Plugin)
|
||||
plugin.Path = testcasePath
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Gather the metrics and check for potential errors
|
||||
var acc testutil.Accumulator
|
||||
err := plugin.Gather(&acc)
|
||||
switch len(plugin.ExpectedErrors) {
|
||||
case 0:
|
||||
require.NoError(t, err)
|
||||
case 1:
|
||||
require.ErrorContains(t, err, plugin.ExpectedErrors[0])
|
||||
default:
|
||||
require.Contains(t, plugin.ExpectedErrors, err.Error())
|
||||
}
|
||||
|
||||
// Determine checking options
|
||||
options := []cmp.Option{
|
||||
cmpopts.EquateApprox(0, 1e-6),
|
||||
testutil.SortMetrics(),
|
||||
}
|
||||
if plugin.ShouldIgnoreTimestamp {
|
||||
options = append(options, testutil.IgnoreTime())
|
||||
}
|
||||
|
||||
// Process expected metrics and compare with resulting metrics
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, plugin.Expected, actual, options...)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkParsing measures raw Parse throughput on the benchmark testcase
// file. File loading is excluded from the timed region via ResetTimer.
func BenchmarkParsing(b *testing.B) {
	plugin := &Parser{}

	benchmarkData, err := os.ReadFile("testcases/benchmark/input.parquet")
	require.NoError(b, err)

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Result and error are deliberately discarded: only timing matters.
		_, _ = plugin.Parse(benchmarkData)
	}
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
test value=42i 1710683608143228692
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
#!/usr/bin/env python
# Generates the "benchmark" testcase input: a single-row Parquet file with an
# integer value column and a string timestamp column (unix nanoseconds).
import pandas
import pyarrow
import pyarrow.parquet

df = pandas.DataFrame({
    'value': [42],
    # Timestamp is stored as a string; the parser config uses
    # timestamp_format = "unix_ns" to interpret it.
    'timestamp': ["1710683608143228692"]
})

pyarrow.parquet.write_table(pyarrow.Table.from_pandas(df), "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,6 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "unix_ns"
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
row1,byteArray=Short,fixedLengthByteArray=STRING boolean=true,float32=1,float64=64,int32=-2147483648i,int64=-9223372036854775808i 1710697199000000000
|
||||
row2,byteArray=Much\ longer\ string\ here...,fixedLengthByteArray=FOOBAR boolean=false,float32=1.1234568357467651,float64=65.1234567891212,int32=2147483647i,int64=9223372036854775807i 551812924000000000
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# Two rows exercising every physical parquet type the parser supports,
# including the extreme int32/int64 values and a fixed-length byte array.
frame = pandas.DataFrame({
    'boolean': [True, False],
    'string': ["row1", "row2"],
    'int32': [-2147483648, 2147483647],
    'int64': [-9223372036854775808, 9223372036854775807],
    'float32': [1.000000001, 1.123456789],
    'float64': [64.00000000000000001, 65.12345678912121212],
    'byteArray': ["Short", "Much longer string here..."],
    'fixedLengthByteArray': ["STRING", "FOOBAR"],
    'timestamp': [
        "Sun, 17 Mar 2024 10:39:59 MST",
        "Sat, 27 Jun 1987 10:22:04 MST",
    ]
})

# Pin the physical column types explicitly rather than letting pandas
# infer them, so the parquet file has a deterministic schema.
schema = pyarrow.schema([
    pyarrow.field('boolean', pyarrow.bool_()),
    pyarrow.field('string', pyarrow.string()),
    pyarrow.field('int32', pyarrow.int32()),
    pyarrow.field('int64', pyarrow.int64()),
    pyarrow.field('float32', pyarrow.float32()),
    pyarrow.field('float64', pyarrow.float64()),
    pyarrow.field('byteArray', pyarrow.binary()),
    pyarrow.field('fixedLengthByteArray', pyarrow.binary(6)),
    pyarrow.field('timestamp', pyarrow.binary())
])

table = pyarrow.Table.from_pandas(frame, schema)
pyarrow.parquet.write_table(table, "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,9 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
measurement_column = "string"
|
||||
tag_columns = ["byteArray", "fixedLengthByteArray"]
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "Mon, 02 Jan 2006 15:04:05 MST"
|
||||
timestamp_timezone = "MST"
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
a,tag=row1 float_field=64 1710683695000000000
|
||||
b,tag=row1 float_field=65 1710683695000000000
|
||||
c,tag=row1 float_field=66 1710683695000000000
|
||||
d,tag=row1 float_field=67 1710683695000000000
|
||||
e,tag=row1 float_field=68 1710683695000000000
|
||||
f,tag=row1 float_field=69 1710683695000000000
|
||||
g,tag=row1 float_field=70 1710683695000000000
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# Seven rows sharing one tag value and unix-second timestamp; 'str_field'
# supplies a distinct measurement name per row (a..g).
frame = pandas.DataFrame({
    'tag': ["row1"] * 7,
    'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0],
    'str_field': ["a", "b", "c", "d", "e", "f", "g"],
    'timestamp': [1710683695] * 7
})

table = pyarrow.Table.from_pandas(frame)
pyarrow.parquet.write_table(table, "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,8 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
measurement_column = "str_field"
|
||||
tag_columns = ["tag"]
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "unix"
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# A completely empty table: the parser must cope with parquet files that
# contain no rows and no columns.
empty = pandas.DataFrame()
pyarrow.parquet.write_table(pyarrow.Table.from_pandas(empty), "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
test,tag=row1 float_field=64 1710683608143228692
|
||||
test,tag=row1 float_field=65 1710683608143228692
|
||||
test,tag=row1 float_field=66 1710683608143228692
|
||||
test,tag=row1 float_field=67 1710683608143228692
|
||||
test,tag=row1 float_field=68 1710683608143228692
|
||||
test,tag=row1 float_field=69 1710683608143228692
|
||||
test,tag=row1 float_field=70 1710683608143228692
|
||||
test,tag=row1 float_field=64 1710683608143228693
|
||||
test,tag=row1 float_field=65 1710683608143228693
|
||||
test,tag=row1 float_field=66 1710683608143228693
|
||||
test,tag=row1 float_field=67 1710683608143228693
|
||||
test,tag=row1 float_field=68 1710683608143228693
|
||||
test,tag=row1 float_field=69 1710683608143228693
|
||||
test,tag=row1 float_field=70 1710683608143228693
|
||||
test,tag=row1 float_field=64 1710683608143228694
|
||||
test,tag=row1 float_field=65 1710683608143228694
|
||||
test,tag=row1 float_field=66 1710683608143228694
|
||||
test,tag=row1 float_field=67 1710683608143228694
|
||||
test,tag=row1 float_field=68 1710683608143228694
|
||||
test,tag=row1 float_field=69 1710683608143228694
|
||||
test,tag=row1 float_field=70 1710683608143228694
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# Build three identical 7-row frames that differ only in their timestamp,
# then write each as its own row group so the parser's multi-row-group
# handling is exercised.
timestamps = (
    "1710683608143228692",
    "1710683608143228693",
    "1710683608143228694",
)

frames = [
    pandas.DataFrame({
        'tag': ["row1"] * 7,
        'float_field': [64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0],
        'timestamp': [ts] * 7,
    })
    for ts in timestamps
]

with pyarrow.parquet.ParquetWriter('input.parquet', pyarrow.Table.from_pandas(frames[0]).schema) as writer:
    for frame in frames:
        writer.write_table(pyarrow.Table.from_pandas(frame))
|
||||
Binary file not shown.
|
|
@ -0,0 +1,8 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
measurement_column = "str_field"
|
||||
tag_columns = ["tag"]
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "unix_ns"
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
test,tag=row1 float_field=64 1709313032000000000
|
||||
test,tag=row2 float_field=65 1709399432000000000
|
||||
test,tag=row3 int_field=65 1709485832000000000
|
||||
test,tag=row4 uint_field=5 1709572232000000000
|
||||
test,tag=row5 bool_field=true 1709658632000000000
|
||||
test,str_field=blargh,tag=multi_field bool_field=false 1709831432000000000
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# Rows where most field columns are null: each row populates a different
# single field type, plus one row ("multi_field") with several columns set.
frame = pandas.DataFrame({
    'tag': ["row1", "row2", "row3", "row4", "row5", "row6", "multi_field"],
    'float_field': [64.0, 65.0, None, None, None, None, None],
    'int_field': [None, None, 65, None, None, None, None],
    'uint_field': [None, None, None, 5, None, None, None],
    'bool_field': [None, None, None, None, True, None, False],
    'str_field': [None, None, None, None, None, "blargh", "blargh"],
    'timestamp': [
        "2024-03-01T17:10:32", "2024-03-02T17:10:32", "2024-03-03T17:10:32",
        "2024-03-04T17:10:32", "2024-03-05T17:10:32", "2024-03-06T17:10:32",
        "2024-03-07T17:10:32",
    ]
})

table = pyarrow.Table.from_pandas(frame)
pyarrow.parquet.write_table(table, "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,7 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
tag_columns = ["tag", "str_field"]
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "2006-01-02T15:04:05"
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
test value=1.1 1710511506000000000
|
||||
test value=2.2 1710597906000000000
|
||||
test value=3.3 1710684306000000000
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env python
import pandas
import pyarrow
import pyarrow.parquet

# Three rows with RFC3339-style timestamps carrying an explicit UTC offset,
# used to test timezone-aware timestamp parsing.
frame = pandas.DataFrame({
    'value': [1.1, 2.2, 3.3],
    'timestamp': [
        "2024-03-15T14:05:06+00:00", "2024-03-16T14:05:06+00:00",
        "2024-03-17T14:05:06+00:00",
    ]
})

table = pyarrow.Table.from_pandas(frame)
pyarrow.parquet.write_table(table, "input.parquet")
|
||||
Binary file not shown.
|
|
@ -0,0 +1,7 @@
|
|||
[[inputs.test]]
|
||||
files = ["input.parquet"]
|
||||
data_format = "parquet"
|
||||
|
||||
tag_columns = ["tag", "str_field"]
|
||||
timestamp_column = "timestamp"
|
||||
timestamp_format = "2006-01-02T15:04:05Z07:00"
|
||||
Loading…
Reference in New Issue