Add new output plugin for Azure Data Explorer(ADX) (#9426)
This commit is contained in:
parent
2267733a04
commit
f57ffa2a9b
|
|
@ -8,6 +8,7 @@ following works:
|
|||
- collectd.org [MIT License](https://git.octo.it/?p=collectd.git;a=blob;f=COPYING;hb=HEAD)
|
||||
- github.com/Azure/azure-amqp-common-go [MIT License](https://github.com/Azure/azure-amqp-common-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-event-hubs-go [MIT License](https://github.com/Azure/azure-event-hubs-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-kusto-go [MIT License](https://github.com/Azure/azure-kusto-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-pipeline-go [MIT License](https://github.com/Azure/azure-pipeline-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-sdk-for-go [Apache License 2.0](https://github.com/Azure/azure-sdk-for-go/blob/master/LICENSE)
|
||||
- github.com/Azure/azure-storage-blob-go [MIT License](https://github.com/Azure/azure-storage-blob-go/blob/master/LICENSE)
|
||||
|
|
@ -141,6 +142,7 @@ following works:
|
|||
- github.com/karrick/godirwalk [BSD 2-Clause "Simplified" License](https://github.com/karrick/godirwalk/blob/master/LICENSE)
|
||||
- github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE)
|
||||
- github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE)
|
||||
- github.com/kylelemons/godebug [Apache License](https://github.com/kylelemons/godebug/blob/master/LICENSE)
|
||||
- github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE)
|
||||
- github.com/mailru/easyjson [MIT License](https://github.com/mailru/easyjson/blob/master/LICENSE)
|
||||
- github.com/mattn/go-colorable [MIT License](https://github.com/mattn/go-colorable/blob/master/LICENSE)
|
||||
|
|
|
|||
3
go.mod
3
go.mod
|
|
@ -9,7 +9,8 @@ require (
|
|||
code.cloudfoundry.org/clock v1.0.0 // indirect
|
||||
collectd.org v0.5.0
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687
|
||||
github.com/Azure/azure-kusto-go v0.3.2
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
|
||||
github.com/Azure/go-autorest/autorest v0.11.17
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.10
|
||||
github.com/Azure/go-autorest/autorest/azure/auth v0.5.6
|
||||
|
|
|
|||
16
go.sum
16
go.sum
|
|
@ -41,19 +41,24 @@ github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQY
|
|||
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0 h1:CQlxKH5a4NX1ZmbdqXUPRwuNGh2XvtgmhkZvkEuWzhs=
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0/go.mod h1:BPIIJNH/l/fVHYq3Rm6eg4clbrULrQ3q7+icmqHyyLc=
|
||||
github.com/Azure/azure-kusto-go v0.3.2 h1:XpS9co6GvEDl2oICF9HsjEsQVwEpRK6wbNWb9Z+uqsY=
|
||||
github.com/Azure/azure-kusto-go v0.3.2/go.mod h1:wd50n4qlsSxh+G4f80t+Fnl2ShK9AcXD+lMOstiKuYo=
|
||||
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
|
||||
github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v44.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v45.1.0+incompatible h1:kxtaPD8n2z5Za+9e3sKsYG2IX6PG2R6VXtgS7gAbh3A=
|
||||
github.com/Azure/azure-sdk-for-go v45.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
|
||||
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
|
||||
github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc=
|
||||
github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687 h1:7MiZ6Th+YTmwUdrKmFg5OMsGYz7IdQwjqL0RPxkhhOQ=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
|
||||
github.com/Azure/go-amqp v0.12.6 h1:34yItuwhA/nusvq2sPSNPQxZLCf/CtaogYH8n578mnY=
|
||||
github.com/Azure/go-amqp v0.12.6/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
|
||||
|
|
@ -63,6 +68,7 @@ github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK
|
|||
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
|
||||
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
|
||||
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
|
||||
github.com/Azure/go-autorest/autorest v0.10.0/go.mod h1:/FALq9T/kS7b5J5qsQ+RSTUdAmGFqi0vUdVNNx8q630=
|
||||
github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
|
||||
github.com/Azure/go-autorest/autorest v0.11.4/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
|
||||
github.com/Azure/go-autorest/autorest v0.11.17 h1:2zCdHwNgRH+St1J+ZMf66xI8aLr/5KMy+wWLH97zwYM=
|
||||
|
|
@ -70,6 +76,7 @@ github.com/Azure/go-autorest/autorest v0.11.17/go.mod h1:eipySxLmqSyC5s5k1CLupqe
|
|||
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.8.2/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE=
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
|
||||
|
|
@ -396,6 +403,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
|
|||
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||
github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
|
||||
|
|
@ -439,6 +447,7 @@ github.com/digitalocean/godo v1.42.1/go.mod h1:p7dOjjtSBqCTUksqtA5Fd3uaKs9kyTq2x
|
|||
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
|
||||
github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U=
|
||||
github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE=
|
||||
github.com/dnaeon/go-vcr v1.0.1 h1:r8L/HqC0Hje5AXMu1ooW8oyQyOFv4GxqpL0nRP7SLLY=
|
||||
github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E=
|
||||
github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY=
|
||||
github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
|
||||
|
|
@ -654,6 +663,7 @@ github.com/godbus/dbus v0.0.0-20151105175453-c7fdd8b5cd55/go.mod h1:/YcGZj5zSblf
|
|||
github.com/godbus/dbus v0.0.0-20180201030542-885f9cc04c9c/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
|
||||
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
|
||||
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84=
|
||||
github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
|
|
@ -1040,6 +1050,7 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
|
|||
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE=
|
||||
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
|
||||
github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
|
||||
github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
|
||||
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
|
|
@ -1056,6 +1067,7 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
|
|||
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
|
||||
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
|
||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/application_insights"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/azure_data_explorer"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/azure_monitor"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/cloud_pubsub"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,175 @@
|
|||
# Azure Data Explorer output plugin
|
||||
|
||||
This plugin writes metrics collected by any of the input plugins of Telegraf to [Azure Data Explorer](https://azure.microsoft.com/en-au/services/data-explorer/).
|
||||
|
||||
## Pre-requisites:
|
||||
- [Create Azure Data Explorer cluster and database](https://docs.microsoft.com/en-us/azure/data-explorer/create-cluster-database-portal)
|
||||
- VM/compute or container to host Telegraf - it could be hosted locally where an app/services to be monitored are deployed or remotely on a dedicated monitoring compute/container.
|
||||
|
||||
|
||||
## Configuration:
|
||||
|
||||
```toml
|
||||
[[outputs.azure_data_explorer]]
|
||||
## The URI property of the Azure Data Explorer resource on Azure
|
||||
## ex: https://myadxresource.australiasoutheast.kusto.windows.net
|
||||
# endpoint_url = ""
|
||||
|
||||
## The Azure Data Explorer database that the metrics will be ingested into.
|
||||
## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion.
|
||||
## ex: "exampledatabase"
|
||||
# database = ""
|
||||
|
||||
## Timeout for Azure Data Explorer operations
|
||||
# timeout = "20s"
|
||||
|
||||
## Type of metrics grouping used when pushing to Azure Data Explorer.
|
||||
## Default is "TablePerMetric" for one table per different metric.
|
||||
## For more information, please check the plugin README.
|
||||
# metrics_grouping_type = "TablePerMetric"
|
||||
|
||||
## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
|
||||
# table_name = ""
|
||||
|
||||
# timeout = "20s"
|
||||
|
||||
```
|
||||
|
||||
## Metrics Grouping
|
||||
|
||||
Metrics can be grouped in two ways to be sent to Azure Data Explorer. To specify which metric grouping type the plugin should use, the respective value should be given to the `metrics_grouping_type` in the config file. If no value is given to `metrics_grouping_type`, by default, the metrics will be grouped using `TablePerMetric`.
|
||||
|
||||
### TablePerMetric
|
||||
|
||||
The plugin will group the metrics by the metric name, and will send each group of metrics to an Azure Data Explorer table. If the table doesn't exist the plugin will create the table, if the table exists then the plugin will try to merge the Telegraf metric schema to the existing table. For more information about the merge process check the [`.create-merge` documentation](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/create-merge-table-command).
|
||||
|
||||
The table name will match the `name` property of the metric, this means that the name of the metric should comply with the Azure Data Explorer table naming constraints in case you plan to add a prefix to the metric name.
|
||||
|
||||
|
||||
### SingleTable
|
||||
|
||||
The plugin will send all the metrics received to a single Azure Data Explorer table. The name of the table must be supplied via `table_name` in the config file. If the table doesn't exist the plugin will create the table, if the table exists then the plugin will try to merge the Telegraf metric schema to the existing table. For more information about the merge process check the [`.create-merge` documentation](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/create-merge-table-command).
|
||||
|
||||
|
||||
## Tables Schema
|
||||
|
||||
The schema of the Azure Data Explorer table will match the structure of the Telegraf `Metric` object. The corresponding Azure Data Explorer command would be like the following:
|
||||
```
|
||||
.create-merge table ['table-name'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime)
|
||||
```
|
||||
|
||||
The corresponding table mapping would be like the following:
|
||||
```
|
||||
.create-or-alter table ['table-name'] ingestion json mapping 'table-name_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'
|
||||
```
|
||||
|
||||
**Note**: This plugin will automatically create Azure Data Explorer tables and corresponding table mapping as per the above mentioned commands. Since the `Metric` object is a complex type, the only output format supported is JSON.
|
||||
|
||||
## Authentication
|
||||
|
||||
### Supported Authentication Methods
|
||||
This plugin provides several types of authentication. The plugin will check the existence of several specific environment variables, and consequently will choose the right method.
|
||||
|
||||
These methods are:
|
||||
|
||||
|
||||
1. AAD Application Tokens (Service Principals with secrets or certificates).
|
||||
|
||||
For guidance on how to create and register an App in Azure Active Directory check [this article](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app#register-an-application), and for more information on the Service Principals check [this article](https://docs.microsoft.com/en-us/azure/active-directory/develop/app-objects-and-service-principals).
|
||||
|
||||
|
||||
2. AAD User Tokens
|
||||
- Allows Telegraf to authenticate like a user. This method is mainly used
|
||||
for development purposes only.
|
||||
|
||||
3. Managed Service Identity (MSI) token
|
||||
- If you are running Telegraf from an Azure VM or other Azure infrastructure, then this is the preferred authentication method.
|
||||
|
||||
[principal]: https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-application-objects
|
||||
|
||||
Whichever method, the designated Principal needs to be assigned the `Database User` role on the Database level in the Azure Data Explorer. This role will allow the plugin to create the required tables and ingest data into it.
|
||||
|
||||
### Configurations of the chosen Authentication Method
|
||||
|
||||
The plugin will authenticate using the first available of the
|
||||
following configurations, **it's important to understand that the assessment, and consequently choosing the authentication method, will happen in order as below**:
|
||||
|
||||
1. **Client Credentials**: Azure AD Application ID and Secret.
|
||||
|
||||
Set the following environment variables:
|
||||
|
||||
- `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate.
|
||||
- `AZURE_CLIENT_ID`: Specifies the app client ID to use.
|
||||
- `AZURE_CLIENT_SECRET`: Specifies the app secret to use.
|
||||
|
||||
2. **Client Certificate**: Azure AD Application ID and X.509 Certificate.
|
||||
|
||||
- `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate.
|
||||
- `AZURE_CLIENT_ID`: Specifies the app client ID to use.
|
||||
- `AZURE_CERTIFICATE_PATH`: Specifies the certificate Path to use.
|
||||
- `AZURE_CERTIFICATE_PASSWORD`: Specifies the certificate password to use.
|
||||
|
||||
3. **Resource Owner Password**: Azure AD User and Password. This grant type is
|
||||
*not recommended*, use device login instead if you need interactive login.
|
||||
|
||||
- `AZURE_TENANT_ID`: Specifies the Tenant to which to authenticate.
|
||||
- `AZURE_CLIENT_ID`: Specifies the app client ID to use.
|
||||
- `AZURE_USERNAME`: Specifies the username to use.
|
||||
- `AZURE_PASSWORD`: Specifies the password to use.
|
||||
|
||||
4. **Azure Managed Service Identity**: Delegate credential management to the
|
||||
platform. Requires that code is running in Azure, e.g. on a VM. All
|
||||
configuration is handled by Azure. See [Azure Managed Service Identity][msi]
|
||||
for more details. Only available when using the [Azure Resource Manager][arm].
|
||||
|
||||
[msi]: https://docs.microsoft.com/en-us/azure/active-directory/msi-overview
|
||||
[arm]: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-overview
|
||||
|
||||
|
||||
## Querying collected metrics data in Azure Data Explorer
|
||||
With all the above configurations, data will be stored in the following standard format for each metric type, as an Azure Data Explorer table -
|
||||
ColumnName | ColumnType
|
||||
---------- | ----------
|
||||
fields | dynamic
|
||||
name | string
|
||||
tags | dynamic
|
||||
timestamp | datetime
|
||||
|
||||
As "fields" and "tags" are of dynamic data type so following multiple ways to query this data -
|
||||
1. **Query JSON attributes directly**: This is one of the most powerful features of Azure Data Explorer, letting you run queries like this -
|
||||
```
|
||||
Tablename
|
||||
| where fields.size_kb == 9120
|
||||
```
|
||||
2. **Use [Update policy](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/updatepolicy)**: to transform data, in this case, to flatten dynamic data type columns. This is the recommended performant way for querying over large data volumes compared to querying directly over JSON attributes.
|
||||
```
|
||||
// Function to transform data
|
||||
.create-or-alter function Transform_TargetTableName() {
|
||||
SourceTableName
|
||||
| extend clerk_type = tags.clerk_type
|
||||
| extend host = tags.host
|
||||
}
|
||||
|
||||
// Create the destination table (if it doesn't exist already)
|
||||
.set-or-append TargetTableName <| Transform_TargetTableName() | limit 0
|
||||
|
||||
// Apply update policy on destination table
|
||||
.alter table TargetTableName policy update
|
||||
@'[{"IsEnabled": true, "Source": "SourceTableName", "Query": "Transform_TargetTableName()", "IsTransactional": false, "PropagateIngestionProperties": false}]'
|
||||
|
||||
```
|
||||
There are two ways to flatten dynamic columns as explained below. You can use either of these ways in above mentioned update policy function - 'Transform_TargetTableName()'
|
||||
- Use [bag_unpack plugin](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/bag-unpackplugin) to unpack the dynamic columns as shown below. This method will unpack all columns, it could lead to issues in case source schema changes.
|
||||
```
|
||||
Tablename
|
||||
| evaluate bag_unpack(tags)
|
||||
| evaluate bag_unpack(fields)
|
||||
```
|
||||
|
||||
- Use [extend](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/extendoperator) operator as shown below. This is the best way provided you know what columns are needed in the final destination table. Another benefit of this method is even if schema changes, it will not break your queries or dashboards.
|
||||
```
|
||||
Tablename
|
||||
| extend clerk_type = tags.clerk_type
|
||||
| extend host = tags.host
|
||||
```
|
||||
|
||||
|
|
@ -0,0 +1,255 @@
|
|||
package azure_data_explorer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-kusto-go/kusto"
|
||||
"github.com/Azure/azure-kusto-go/kusto/ingest"
|
||||
"github.com/Azure/azure-kusto-go/kusto/unsafe"
|
||||
"github.com/Azure/go-autorest/autorest/azure/auth"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
"github.com/influxdata/telegraf/plugins/serializers/json"
|
||||
)
|
||||
|
||||
// AzureDataExplorer holds the configuration and runtime state of the Telegraf
// output plugin that ingests metrics into Azure Data Explorer (Kusto).
type AzureDataExplorer struct {
	Endpoint        string          `toml:"endpoint_url"`          // ADX cluster URI, e.g. https://cluster.region.kusto.windows.net
	Database        string          `toml:"database"`              // target database; must already exist (plugin does not create it)
	Log             telegraf.Logger `toml:"-"`
	Timeout         config.Duration `toml:"timeout"`               // deadline applied to table creation + ingestion per write
	MetricsGrouping string          `toml:"metrics_grouping_type"` // "TablePerMetric" (default) or "SingleTable"; lower-cased in Init
	TableName       string          `toml:"table_name"`            // destination table, only used when grouping is SingleTable
	client          localClient                                    // Kusto management-command client, set in Connect
	ingesters       map[string]localIngestor                       // per-table ingestor cache, populated lazily
	serializer      serializers.Serializer                         // JSON serializer, set in Init
	createIngestor  ingestorFactory                                // factory hook; replaced by tests with a fake
}

// Grouping-mode values as compared after Init lower-cases the user's setting.
const (
	tablePerMetric = "tablepermetric"
	singleTable    = "singletable"
)

// localIngestor abstracts the single ingest-client method the plugin uses,
// so tests can substitute a fake ingestor.
type localIngestor interface {
	FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error)
}

// localClient abstracts the single Kusto-client method the plugin uses for
// management commands (table and mapping creation), so tests can fake it.
type localClient interface {
	Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}

// ingestorFactory builds a localIngestor for (client, database, table);
// swapped out in tests.
type ingestorFactory func(localClient, string, string) (localIngestor, error)

// createTableCommand creates the destination table, or merges the Telegraf
// metric schema into an existing one (.create-merge is idempotent).
const createTableCommand = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`

// createTableMappingCommand creates/updates the JSON ingestion mapping named
// "<table>_mapping" that routes each JSON attribute to its column.
const createTableMappingCommand = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
|
||||
func (adx *AzureDataExplorer) Description() string {
|
||||
return "Sends metrics to Azure Data Explorer"
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) SampleConfig() string {
|
||||
return `
|
||||
## Azure Data Exlorer cluster endpoint
|
||||
## ex: endpoint_url = "https://clustername.australiasoutheast.kusto.windows.net"
|
||||
endpoint_url = ""
|
||||
|
||||
## The Azure Data Explorer database that the metrics will be ingested into.
|
||||
## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion.
|
||||
## ex: "exampledatabase"
|
||||
database = ""
|
||||
|
||||
## Timeout for Azure Data Explorer operations
|
||||
# timeout = "20s"
|
||||
|
||||
## Type of metrics grouping used when pushing to Azure Data Explorer.
|
||||
## Default is "TablePerMetric" for one table per different metric.
|
||||
## For more information, please check the plugin README.
|
||||
# metrics_grouping_type = "TablePerMetric"
|
||||
|
||||
## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
|
||||
# table_name = ""
|
||||
|
||||
`
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) Connect() error {
|
||||
authorizer, err := auth.NewAuthorizerFromEnvironmentWithResource(adx.Endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
authorization := kusto.Authorization{
|
||||
Authorizer: authorizer,
|
||||
}
|
||||
client, err := kusto.New(adx.Endpoint, authorization)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
adx.client = client
|
||||
adx.ingesters = make(map[string]localIngestor)
|
||||
adx.createIngestor = createRealIngestor
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) Close() error {
|
||||
adx.client = nil
|
||||
adx.ingesters = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error {
|
||||
if adx.MetricsGrouping == tablePerMetric {
|
||||
return adx.writeTablePerMetric(metrics)
|
||||
}
|
||||
return adx.writeSingleTable(metrics)
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) error {
|
||||
tableMetricGroups := make(map[string][]byte)
|
||||
// Group metrics by name and serialize them
|
||||
for _, m := range metrics {
|
||||
tableName := m.Name()
|
||||
metricInBytes, err := adx.serializer.Serialize(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existingBytes, ok := tableMetricGroups[tableName]; ok {
|
||||
tableMetricGroups[tableName] = append(existingBytes, metricInBytes...)
|
||||
} else {
|
||||
tableMetricGroups[tableName] = metricInBytes
|
||||
}
|
||||
}
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
|
||||
defer cancel()
|
||||
|
||||
// Push the metrics for each table
|
||||
format := ingest.FileFormat(ingest.JSON)
|
||||
for tableName, tableMetrics := range tableMetricGroups {
|
||||
if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error {
|
||||
//serialise each metric in metrics - store in byte[]
|
||||
metricsArray := make([]byte, 0)
|
||||
for _, m := range metrics {
|
||||
metricsInBytes, err := adx.serializer.Serialize(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricsArray = append(metricsArray, metricsInBytes...)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
|
||||
defer cancel()
|
||||
|
||||
//push metrics to a single table
|
||||
format := ingest.FileFormat(ingest.JSON)
|
||||
err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray)
|
||||
return err
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
|
||||
ingestor, err := adx.getIngestor(ctx, tableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reader := bytes.NewReader(metricsArray)
|
||||
mapping := ingest.IngestionMappingRef(fmt.Sprintf("%s_mapping", tableName), ingest.JSON)
|
||||
if _, err := ingestor.FromReader(ctx, reader, format, mapping); err != nil {
|
||||
adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) getIngestor(ctx context.Context, tableName string) (localIngestor, error) {
|
||||
ingestor := adx.ingesters[tableName]
|
||||
|
||||
if ingestor == nil {
|
||||
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
||||
return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err)
|
||||
}
|
||||
//create a new ingestor client for the table
|
||||
tempIngestor, err := adx.createIngestor(adx.client, adx.Database, tableName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err)
|
||||
}
|
||||
adx.ingesters[tableName] = tempIngestor
|
||||
ingestor = tempIngestor
|
||||
}
|
||||
return ingestor, nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
|
||||
createStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableCommand, tableName))
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createStmt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createTableMappingstmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fmt.Sprintf(createTableMappingCommand, tableName, tableName))
|
||||
if _, err := adx.client.Mgmt(ctx, adx.Database, createTableMappingstmt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adx *AzureDataExplorer) Init() error {
|
||||
if adx.Endpoint == "" {
|
||||
return errors.New("Endpoint configuration cannot be empty")
|
||||
}
|
||||
if adx.Database == "" {
|
||||
return errors.New("Database configuration cannot be empty")
|
||||
}
|
||||
|
||||
adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping)
|
||||
if adx.MetricsGrouping == singleTable && adx.TableName == "" {
|
||||
return errors.New("Table name cannot be empty for SingleTable metrics grouping type")
|
||||
}
|
||||
if adx.MetricsGrouping == "" {
|
||||
adx.MetricsGrouping = tablePerMetric
|
||||
}
|
||||
if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) {
|
||||
return errors.New("Metrics grouping type is not valid")
|
||||
}
|
||||
|
||||
serializer, err := json.NewSerializer(time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
adx.serializer = serializer
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("azure_data_explorer", func() telegraf.Output {
|
||||
return &AzureDataExplorer{
|
||||
Timeout: config.Duration(20 * time.Second),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func createRealIngestor(client localClient, database string, tableName string) (localIngestor, error) {
|
||||
ingestor, err := ingest.New(client.(*kusto.Client), database, tableName)
|
||||
if ingestor != nil {
|
||||
return ingestor, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -0,0 +1,200 @@
|
|||
package azure_data_explorer
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-kusto-go/kusto"
|
||||
"github.com/Azure/azure-kusto-go/kusto/ingest"
|
||||
"github.com/influxdata/telegraf"
|
||||
telegrafJson "github.com/influxdata/telegraf/plugins/serializers/json"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Expected management-command texts, mirroring the plugin's
// createTableCommand / createTableMappingCommand templates so the tests
// can verify the exact queries sent through the fake client.
const createTableCommandExpected = `.create-merge table ['%s'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`
const createTableMappingCommandExpected = `.create-or-alter table ['%s'] ingestion json mapping '%s_mapping' '[{"column":"fields", "Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", "Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'`
|
||||
|
||||
// TestWrite drives AzureDataExplorer.Write through a table of scenarios
// using fake client/ingestor doubles: it checks the JSON payload handed to
// the ingestor and the table/mapping management queries issued, plus error
// propagation when a management command fails.
func TestWrite(t *testing.T) {
	testCases := []struct {
		name               string
		inputMetric        []telegraf.Metric // NOTE(review): never read — Write below is always fed testutil.MockMetrics()
		client             *fakeClient       // fake localClient recording Mgmt queries
		createIngestor     ingestorFactory
		metricsGrouping    string
		tableName          string                 // only meaningful for singleTable grouping
		expected           map[string]interface{} // expected serialized metric parts, keyed by metricName/fields/tags/timestamp
		expectedWriteError string                 // non-empty => Write must fail with exactly this message
	}{
		{
			name:        "Valid metric",
			inputMetric: testutil.MockMetrics(),
			client: &fakeClient{
				queries: make([]string, 0),
				internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
					// Record the command and pretend it succeeded.
					f.queries = append(f.queries, query.String())
					return &kusto.RowIterator{}, nil
				},
			},
			createIngestor:  createFakeIngestor,
			metricsGrouping: tablePerMetric,
			expected: map[string]interface{}{
				"metricName": "test1",
				"fields": map[string]interface{}{
					"value": 1.0,
				},
				"tags": map[string]interface{}{
					"tag1": "value1",
				},
				// testutil.MockMetrics uses this fixed timestamp; the serializer
				// emits it with second precision, hence the UnixNano division.
				"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
			},
		},
		{
			name:        "Error in Mgmt",
			inputMetric: testutil.MockMetrics(),
			client: &fakeClient{
				queries: make([]string, 0),
				internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
					// Simulate a failing table-creation command.
					return nil, errors.New("Something went wrong")
				},
			},
			createIngestor:  createFakeIngestor,
			metricsGrouping: tablePerMetric,
			expected: map[string]interface{}{
				"metricName": "test1",
				"fields": map[string]interface{}{
					"value": 1.0,
				},
				"tags": map[string]interface{}{
					"tag1": "value1",
				},
				"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
			},
			// Write is expected to wrap the Mgmt error with the table name.
			expectedWriteError: "creating table for \"test1\" failed: Something went wrong",
		},
		{
			name:        "SingleTable metric grouping type",
			inputMetric: testutil.MockMetrics(),
			client: &fakeClient{
				queries: make([]string, 0),
				internalMgmt: func(f *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
					f.queries = append(f.queries, query.String())
					return &kusto.RowIterator{}, nil
				},
			},
			createIngestor:  createFakeIngestor,
			metricsGrouping: singleTable,
			// NOTE(review): tableName is left empty here, so the single table is
			// effectively named "" — presumably a TableName was intended; confirm.
			expected: map[string]interface{}{
				"metricName": "test1",
				"fields": map[string]interface{}{
					"value": 1.0,
				},
				"tags": map[string]interface{}{
					"tag1": "value1",
				},
				"timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)),
			},
		},
	}

	for _, tC := range testCases {
		t.Run(tC.name, func(t *testing.T) {
			// Real serializer, fake client/ingestor: only ADX I/O is doubled.
			serializer, err := telegrafJson.NewSerializer(time.Second)
			require.NoError(t, err)

			// Built directly (Init is bypassed) so unexported fields can be injected.
			plugin := AzureDataExplorer{
				Endpoint:        "someendpoint",
				Database:        "databasename",
				Log:             testutil.Logger{},
				MetricsGrouping: tC.metricsGrouping,
				TableName:       tC.tableName,
				client:          tC.client,
				ingesters:       map[string]localIngestor{},
				createIngestor:  tC.createIngestor,
				serializer:      serializer,
			}

			errorInWrite := plugin.Write(testutil.MockMetrics())

			if tC.expectedWriteError != "" {
				require.EqualError(t, errorInWrite, tC.expectedWriteError)
			} else {
				require.NoError(t, errorInWrite)

				// Per-metric grouping keys the ingestor by metric name;
				// single-table grouping keys it by the configured table name.
				expectedNameOfMetric := tC.expected["metricName"].(string)
				expectedNameOfTable := expectedNameOfMetric
				createdIngestor := plugin.ingesters[expectedNameOfMetric]

				if tC.metricsGrouping == singleTable {
					expectedNameOfTable = tC.tableName
					createdIngestor = plugin.ingesters[expectedNameOfTable]
				}

				require.NotNil(t, createdIngestor)
				createdFakeIngestor := createdIngestor.(*fakeIngestor)
				// The fake ingestor captured the first serialized JSON line;
				// verify each part of the payload.
				require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"])

				expectedFields := tC.expected["fields"].(map[string]interface{})
				require.Equal(t, expectedFields, createdFakeIngestor.actualOutputMetric["fields"])

				expectedTags := tC.expected["tags"].(map[string]interface{})
				require.Equal(t, expectedTags, createdFakeIngestor.actualOutputMetric["tags"])

				expectedTime := tC.expected["timestamp"].(float64)
				require.Equal(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"])

				// Management commands must have run in order: table first, mapping second.
				createTableString := fmt.Sprintf(createTableCommandExpected, expectedNameOfTable)
				require.Equal(t, createTableString, tC.client.queries[0])

				createTableMappingString := fmt.Sprintf(createTableMappingCommandExpected, expectedNameOfTable, expectedNameOfTable)
				require.Equal(t, createTableMappingString, tC.client.queries[1])
			}
		})
	}
}
|
||||
|
||||
func TestInitBlankEndpoint(t *testing.T) {
|
||||
plugin := AzureDataExplorer{
|
||||
Log: testutil.Logger{},
|
||||
client: &fakeClient{},
|
||||
ingesters: map[string]localIngestor{},
|
||||
createIngestor: createFakeIngestor,
|
||||
}
|
||||
|
||||
errorInit := plugin.Init()
|
||||
require.Error(t, errorInit)
|
||||
require.Equal(t, "Endpoint configuration cannot be empty", errorInit.Error())
|
||||
}
|
||||
|
||||
// fakeClient is a test double for the plugin's localClient interface.
// It records every management command it receives and delegates the
// actual response to the injectable internalMgmt function, letting each
// test case choose success or failure behavior.
type fakeClient struct {
	// queries accumulates the text of each Mgmt statement, in call order.
	queries []string
	// internalMgmt handles each Mgmt call; receives the fakeClient itself
	// so it can append to queries.
	internalMgmt func(client *fakeClient, ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}
|
||||
|
||||
// Mgmt satisfies localClient by forwarding the call (plus the receiver)
// to the test-supplied internalMgmt hook.
func (f *fakeClient) Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
	return f.internalMgmt(f, ctx, db, query, options...)
}
|
||||
|
||||
// fakeIngestor is a test double for localIngestor that, instead of
// ingesting, captures the first serialized metric it is handed so tests
// can inspect the JSON payload produced by Write.
type fakeIngestor struct {
	// actualOutputMetric holds the decoded first JSON line from the most
	// recent FromReader call.
	actualOutputMetric map[string]interface{}
}
|
||||
|
||||
func createFakeIngestor(client localClient, database string, tableName string) (localIngestor, error) {
|
||||
return &fakeIngestor{}, nil
|
||||
}
|
||||
func (f *fakeIngestor) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
scanner.Scan()
|
||||
firstLine := scanner.Text()
|
||||
err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ingest.Result{}, nil
|
||||
}
|
||||
Loading…
Reference in New Issue