diff --git a/plugins/inputs/clickhouse/README.md b/plugins/inputs/clickhouse/README.md index 5c1d233e6..9b9e6caa9 100644 --- a/plugins/inputs/clickhouse/README.md +++ b/plugins/inputs/clickhouse/README.md @@ -7,7 +7,7 @@ This plugin gathers the statistic data from [ClickHouse](https://github.com/Clic # Read metrics from one or many ClickHouse servers [[inputs.clickhouse]] ## Username for authorization on ClickHouse server - ## example: user = "default" + ## example: username = "default" username = "default" ## Password for authorization on ClickHouse server @@ -109,6 +109,80 @@ This plugin gathers the statistic data from [ClickHouse](https://github.com/Clic - parts - rows +- clickhouse_zookeeper + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - root_nodes (count of node from [system.zookeeper][] where path=/) + ++ clickhouse_replication_queue + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - too_many_tries_replicas (count of replicas which have num_tries > 1 in `system.replication_queue`) + +- clickhouse_detached_parts + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - detached_parts (total detached parts for all tables and databases from [system.detached_parts][]) + ++ clickhouse_dictionaries + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - dict_origin (xml Filename when dictionary created from *_dictionary.xml, database.table when dictionary created from DDL) + - fields: + - is_loaded (0 - when dictionary data not successful load, 1 - when dictionary data loading fail, see [system.dictionaries][] for details) + - bytes_allocated (how many bytes allocated in RAM after a dictionary loaded) + +- clickhouse_mutations + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - running - gauge which show how much mutation doesn't complete now, see [system.mutations][] for details + - failed - counter which show total failed mutations from first clickhouse-server run + - completed - counter which show total successful finished mutations from first clickhouse-server run + ++ clickhouse_disks + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - name (disk name in storage configuration) + - path (path to disk) + - fields: + - free_space_percent - 0-100, gauge which show current percent of free disk space bytes relative to total disk space bytes + - keep_free_space_percent - 0-100, gauge which show current percent of required keep free disk bytes relative to total disk space bytes + +- clickhouse_processes + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - percentile_50 - float gauge which show 50% percentile (quantile 0.5) for `elapsed` field of running processes, see [system.processes][] for details + - percentile_90 - float gauge which show 90% percentile (quantile 0.9) for `elapsed` field of running processes, see [system.processes][] for details + - longest_running - float gauge which show maximum value for `elapsed` field of running processes, see [system.processes][] for details + +- clickhouse_text_log + - tags: + - source (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - level (message level, only message with level less or equal Notice is collects), see details on [system.text_log][] + - fields: + - messages_last_10_min - gauge which show how many messages collected + ### Example Output ``` @@ -119,6 +193,13 @@ clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=system,host clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=default,host=kshvakov,source=localhost,shard_num=1,table=example bytes=326i,parts=2i,rows=2i 1569421000000000000 ``` -[system.events]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-events -[system.metrics]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-metrics -[system.asynchronous_metrics]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-asynchronous_metrics +[system.events]: https://clickhouse.tech/docs/en/operations/system-tables/events/ +[system.metrics]: https://clickhouse.tech/docs/en/operations/system-tables/metrics/ +[system.asynchronous_metrics]: https://clickhouse.tech/docs/en/operations/system-tables/asynchronous_metrics/ +[system.zookeeper]: https://clickhouse.tech/docs/en/operations/system-tables/zookeeper/ +[system.detached_parts]: https://clickhouse.tech/docs/en/operations/system-tables/detached_parts/ +[system.dictionaries]: https://clickhouse.tech/docs/en/operations/system-tables/dictionaries/ +[system.mutations]: https://clickhouse.tech/docs/en/operations/system-tables/mutations/ +[system.disks]: https://clickhouse.tech/docs/en/operations/system-tables/disks/ +[system.processes]: https://clickhouse.tech/docs/en/operations/system-tables/processes/ +[system.text_log]: https://clickhouse.tech/docs/en/operations/system-tables/text_log/ diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go index 927e41263..187ead5cf 100644 --- a/plugins/inputs/clickhouse/clickhouse.go +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -23,7 +23,7 @@ var defaultTimeout = 5 * time.Second var sampleConfig = ` ## Username for authorization on ClickHouse server - ## example: user = "default"" + ## example: username = "default"" username = "default" ## Password for authorization on ClickHouse server @@ -115,7 +115,7 @@ type ClickHouse struct { ClusterInclude []string `toml:"cluster_include"` ClusterExclude []string `toml:"cluster_exclude"` Timeout internal.Duration `toml:"timeout"` - client http.Client + HTTPClient http.Client tls.ClientConfig } @@ -140,11 +140,12 @@ func (ch *ClickHouse) Start(telegraf.Accumulator) error { return err } - ch.client = http.Client{ + ch.HTTPClient = http.Client{ Timeout: timeout, Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + MaxIdleConnsPerHost: 1, }, } return nil @@ -187,15 +188,33 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { } default: connects = append(connects, connect{ - url: u, + Hostname: u.Hostname(), + url: u, }) } } for _, conn := range connects { - if err := ch.tables(acc, &conn); err != nil { - acc.AddError(err) + + metricsFuncs := []func(acc telegraf.Accumulator, conn *connect) error{ + ch.tables, + ch.zookeeper, + ch.replicationQueue, + ch.detachedParts, + ch.dictionaries, + ch.mutations, + ch.disks, + ch.processes, + ch.textLog, } + + for _, metricFunc := range metricsFuncs { + if err := metricFunc(acc, &conn); err != nil { + acc.AddError(err) + } + + } + for metric := range commonMetrics { if err := ch.commonMetrics(acc, &conn, metric); err != nil { acc.AddError(err) @@ -206,7 +225,7 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { } func (ch *ClickHouse) Stop() { - ch.client.CloseIdleConnections() + ch.HTTPClient.CloseIdleConnections() } func (ch *ClickHouse) clusterIncludeExcludeFilter() string { @@ -239,10 +258,7 @@ func (ch *ClickHouse) clusterIncludeExcludeFilter() string { if includeFilter == "" && excludeFilter != "" { return "WHERE " + excludeFilter } - if includeFilter != "" && excludeFilter == "" { - return "WHERE " + includeFilter - } - return "" + return "WHERE " + includeFilter } func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error { @@ -254,15 +270,7 @@ func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, met return err } - tags := map[string]string{ - "source": conn.Hostname, - } - if len(conn.Cluster) != 0 { - tags["cluster"] = conn.Cluster - } - if conn.ShardNum != 0 { - tags["shard_num"] = strconv.Itoa(conn.ShardNum) - } + tags := ch.makeDefaultTags(conn) fields := make(map[string]interface{}) for _, r := range result { @@ -274,6 +282,241 @@ func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, met return nil } +func (ch *ClickHouse) zookeeper(acc telegraf.Accumulator, conn *connect) error { + var zkExists []struct { + ZkExists chUInt64 `json:"zk_exists"` + } + + if err := ch.execQuery(conn.url, systemZookeeperExistsSQL, &zkExists); err != nil { + return err + } + tags := ch.makeDefaultTags(conn) + + if len(zkExists) > 0 && zkExists[0].ZkExists > 0 { + var zkRootNodes []struct { + ZkRootNodes chUInt64 `json:"zk_root_nodes"` + } + if err := ch.execQuery(conn.url, systemZookeeperRootNodesSQL, &zkRootNodes); err != nil { + return err + } + + acc.AddFields("clickhouse_zookeeper", + map[string]interface{}{ + "root_nodes": uint64(zkRootNodes[0].ZkRootNodes), + }, + tags, + ) + } + return nil +} + +func (ch *ClickHouse) replicationQueue(acc telegraf.Accumulator, conn *connect) error { + var replicationQueueExists []struct { + ReplicationQueueExists chUInt64 `json:"replication_queue_exists"` + } + + if err := ch.execQuery(conn.url, systemReplicationExistsSQL, &replicationQueueExists); err != nil { + return err + } + + tags := ch.makeDefaultTags(conn) + + if len(replicationQueueExists) > 0 && replicationQueueExists[0].ReplicationQueueExists > 0 { + var replicationTooManyTries []struct { + NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` + TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` + } + if err := ch.execQuery(conn.url, systemReplicationNumTriesSQL, &replicationTooManyTries); err != nil { + return err + } + + acc.AddFields("clickhouse_replication_queue", + map[string]interface{}{ + "too_many_tries_replicas": uint64(replicationTooManyTries[0].TooManyTriesReplicas), + "num_tries_replicas": uint64(replicationTooManyTries[0].NumTriesReplicas), + }, + tags, + ) + } + return nil +} + +func (ch *ClickHouse) detachedParts(acc telegraf.Accumulator, conn *connect) error { + + var detachedParts []struct { + DetachedParts chUInt64 `json:"detached_parts"` + } + if err := ch.execQuery(conn.url, systemDetachedPartsSQL, &detachedParts); err != nil { + return err + } + + if len(detachedParts) > 0 { + tags := ch.makeDefaultTags(conn) + acc.AddFields("clickhouse_detached_parts", + map[string]interface{}{ + "detached_parts": uint64(detachedParts[0].DetachedParts), + }, + tags, + ) + } + return nil +} + +func (ch *ClickHouse) dictionaries(acc telegraf.Accumulator, conn *connect) error { + + var brokenDictionaries []struct { + Origin string `json:"origin"` + BytesAllocated chUInt64 `json:"bytes_allocated"` + Status string `json:"status"` + } + if err := ch.execQuery(conn.url, systemDictionariesSQL, &brokenDictionaries); err != nil { + return err + } + + for _, dict := range brokenDictionaries { + tags := ch.makeDefaultTags(conn) + + isLoaded := uint64(1) + if dict.Status != "LOADED" { + isLoaded = 0 + } + + if dict.Origin != "" { + tags["dict_origin"] = dict.Origin + acc.AddFields("clickhouse_dictionaries", + map[string]interface{}{ + "is_loaded": isLoaded, + "bytes_allocated": uint64(dict.BytesAllocated), + }, + tags, + ) + } + } + + return nil +} + +func (ch *ClickHouse) mutations(acc telegraf.Accumulator, conn *connect) error { + + var mutationsStatus []struct { + Failed chUInt64 `json:"failed"` + Running chUInt64 `json:"running"` + Completed chUInt64 `json:"completed"` + } + if err := ch.execQuery(conn.url, systemMutationSQL, &mutationsStatus); err != nil { + return err + } + + if len(mutationsStatus) > 0 { + tags := ch.makeDefaultTags(conn) + + acc.AddFields("clickhouse_mutations", + map[string]interface{}{ + "failed": uint64(mutationsStatus[0].Failed), + "running": uint64(mutationsStatus[0].Running), + "completed": uint64(mutationsStatus[0].Completed), + }, + tags, + ) + } + + return nil +} + +func (ch *ClickHouse) disks(acc telegraf.Accumulator, conn *connect) error { + + var disksStatus []struct { + Name string `json:"name"` + Path string `json:"path"` + FreePercent chUInt64 `json:"free_space_percent"` + KeepFreePercent chUInt64 `json:"keep_free_space_percent"` + } + + if err := ch.execQuery(conn.url, systemDisksSQL, &disksStatus); err != nil { + return err + } + + for _, disk := range disksStatus { + tags := ch.makeDefaultTags(conn) + tags["name"] = disk.Name + tags["path"] = disk.Path + + acc.AddFields("clickhouse_disks", + map[string]interface{}{ + "free_space_percent": uint64(disk.FreePercent), + "keep_free_space_percent": uint64(disk.KeepFreePercent), + }, + tags, + ) + + } + + return nil +} + +func (ch *ClickHouse) processes(acc telegraf.Accumulator, conn *connect) error { + + var processesStats []struct { + QueryType string `json:"query_type"` + Percentile50 float64 `json:"p50"` + Percentile90 float64 `json:"p90"` + LongestRunning float64 `json:"longest_running"` + } + + if err := ch.execQuery(conn.url, systemProcessesSQL, &processesStats); err != nil { + return err + } + + for _, process := range processesStats { + tags := ch.makeDefaultTags(conn) + tags["query_type"] = process.QueryType + + acc.AddFields("clickhouse_processes", + map[string]interface{}{ + "percentile_50": process.Percentile50, + "percentile_90": process.Percentile90, + "longest_running": process.LongestRunning, + }, + tags, + ) + + } + + return nil +} + +func (ch *ClickHouse) textLog(acc telegraf.Accumulator, conn *connect) error { + var textLogExists []struct { + TextLogExists chUInt64 `json:"text_log_exists"` + } + + if err := ch.execQuery(conn.url, systemTextLogExistsSQL, &textLogExists); err != nil { + return err + } + + if len(textLogExists) > 0 && textLogExists[0].TextLogExists > 0 { + var textLogLast10MinMessages []struct { + Level string `json:"level"` + MessagesLast10Min chUInt64 `json:"messages_last_10_min"` + } + if err := ch.execQuery(conn.url, systemTextLogSQL, &textLogLast10MinMessages); err != nil { + return err + } + + for _, textLogItem := range textLogLast10MinMessages { + tags := ch.makeDefaultTags(conn) + tags["level"] = textLogItem.Level + acc.AddFields("clickhouse_text_log", + map[string]interface{}{ + "messages_last_10_min": uint64(textLogItem.MessagesLast10Min), + }, + tags, + ) + } + } + return nil +} + func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error { var parts []struct { Database string `json:"database"` @@ -283,18 +526,11 @@ func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error { Rows chUInt64 `json:"rows"` } - if err := ch.execQuery(conn.url, systemParts, &parts); err != nil { + if err := ch.execQuery(conn.url, systemPartsSQL, &parts); err != nil { return err } - tags := map[string]string{ - "source": conn.Hostname, - } - if len(conn.Cluster) != 0 { - tags["cluster"] = conn.Cluster - } - if conn.ShardNum != 0 { - tags["shard_num"] = strconv.Itoa(conn.ShardNum) - } + tags := ch.makeDefaultTags(conn) + for _, part := range parts { tags["table"] = part.Table tags["database"] = part.Database @@ -310,6 +546,19 @@ func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error { return nil } +func (ch *ClickHouse) makeDefaultTags(conn *connect) map[string]string { + tags := map[string]string{ + "source": conn.Hostname, + } + if len(conn.Cluster) != 0 { + tags["cluster"] = conn.Cluster + } + if conn.ShardNum != 0 { + tags["shard_num"] = strconv.Itoa(conn.ShardNum) + } + return tags +} + type clickhouseError struct { StatusCode int body []byte @@ -330,7 +579,7 @@ func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error if ch.Password != "" { req.Header.Add("X-ClickHouse-Key", ch.Password) } - resp, err := ch.client.Do(req) + resp, err := ch.HTTPClient.Do(req) if err != nil { return err } @@ -348,7 +597,14 @@ func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return err } - return json.Unmarshal(response.Data, i) + if err := json.Unmarshal(response.Data, i); err != nil { + return err + } + + if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + return err + } + return nil } // see https://clickhouse.yandex/docs/en/operations/settings/settings/#session_settings-output_format_json_quote_64bit_integers @@ -369,7 +625,7 @@ const ( systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events" systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics" systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics" - systemParts = ` + systemPartsSQL = ` SELECT database, table, @@ -383,6 +639,22 @@ const ( ORDER BY database, table ` + systemZookeeperExistsSQL = "SELECT count() AS zk_exists FROM system.tables WHERE database='system' AND name='zookeeper'" + systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'" + + systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'" + systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue" + + systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts" + + systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries" + + systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations" + systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks" + systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type" + + systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'" + systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level" ) var commonMetrics = map[string]string{ diff --git a/plugins/inputs/clickhouse/clickhouse_test.go b/plugins/inputs/clickhouse/clickhouse_test.go index 382d2148a..68a443844 100644 --- a/plugins/inputs/clickhouse/clickhouse_test.go +++ b/plugins/inputs/clickhouse/clickhouse_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" @@ -121,6 +122,168 @@ func TestGather(t *testing.T) { }, }, }) + case strings.Contains(query, "zk_exists"): + enc.Encode(result{ + Data: []struct { + ZkExists chUInt64 `json:"zk_exists"` + }{ + { + ZkExists: 1, + }, + }, + }) + case strings.Contains(query, "zk_root_nodes"): + enc.Encode(result{ + Data: []struct { + ZkRootNodes chUInt64 `json:"zk_root_nodes"` + }{ + { + ZkRootNodes: 2, + }, + }, + }) + case strings.Contains(query, "replication_queue_exists"): + enc.Encode(result{ + Data: []struct { + ReplicationQueueExists chUInt64 `json:"replication_queue_exists"` + }{ + { + ReplicationQueueExists: 1, + }, + }, + }) + case strings.Contains(query, "replication_too_many_tries_replicas"): + enc.Encode(result{ + Data: []struct { + TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"` + NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"` + }{ + { + TooManyTriesReplicas: 10, + NumTriesReplicas: 100, + }, + }, + }) + case strings.Contains(query, "system.detached_parts"): + enc.Encode(result{ + Data: []struct { + DetachedParts chUInt64 `json:"detached_parts"` + }{ + { + DetachedParts: 10, + }, + }, + }) + case strings.Contains(query, "system.dictionaries"): + enc.Encode(result{ + Data: []struct { + Origin string `json:"origin"` + Status string `json:"status"` + BytesAllocated chUInt64 `json:"bytes_allocated"` + }{ + { + Origin: "default.test_dict", + Status: "NOT_LOADED", + BytesAllocated: 100, + }, + }, + }) + case strings.Contains(query, "system.mutations"): + enc.Encode(result{ + Data: []struct { + Failed chUInt64 `json:"failed"` + Completed chUInt64 `json:"completed"` + Running chUInt64 `json:"running"` + }{ + { + Failed: 10, + Running: 1, + Completed: 100, + }, + }, + }) + case strings.Contains(query, "system.disks"): + enc.Encode(result{ + Data: []struct { + Name string `json:"name"` + Path string `json:"path"` + FreePercent chUInt64 `json:"free_space_percent"` + KeepFreePercent chUInt64 `json:"keep_free_space_percent"` + }{ + { + Name: "default", + Path: "/var/lib/clickhouse", + FreePercent: 1, + KeepFreePercent: 10, + }, + }, + }) + case strings.Contains(query, "system.processes"): + enc.Encode(result{ + Data: []struct { + QueryType string `json:"query_type"` + Percentile50 float64 `json:"p50"` + Percentile90 float64 `json:"p90"` + LongestRunning float64 `json:"longest_running"` + }{ + { + QueryType: "select", + Percentile50: 0.1, + Percentile90: 0.5, + LongestRunning: 10, + }, + { + QueryType: "insert", + Percentile50: 0.2, + Percentile90: 1.5, + LongestRunning: 100, + }, + { + QueryType: "other", + Percentile50: 0.4, + Percentile90: 4.5, + LongestRunning: 1000, + }, + }, + }) + case strings.Contains(query, "text_log_exists"): + enc.Encode(result{ + Data: []struct { + TextLogExists chUInt64 `json:"text_log_exists"` + }{ + { + TextLogExists: 1, + }, + }, + }) + case strings.Contains(query, "system.text_log"): + enc.Encode(result{ + Data: []struct { + Level string `json:"level"` + LastMessagesLast10Min chUInt64 `json:"messages_last_10_min"` + }{ + { + Level: "Fatal", + LastMessagesLast10Min: 0, + }, + { + Level: "Critical", + LastMessagesLast10Min: 10, + }, + { + Level: "Error", + LastMessagesLast10Min: 20, + }, + { + Level: "Warning", + LastMessagesLast10Min: 30, + }, + { + Level: "Notice", + LastMessagesLast10Min: 40, + }, + }, + }) } })) ch = &ClickHouse{ @@ -133,12 +296,17 @@ func TestGather(t *testing.T) { defer ts.Close() ch.Gather(acc) - acc.AssertContainsFields(t, "clickhouse_tables", + acc.AssertContainsTaggedFields(t, "clickhouse_tables", map[string]interface{}{ "bytes": uint64(1), "parts": uint64(10), "rows": uint64(100), }, + map[string]string{ + "source": "127.0.0.1", + "table": "test_table", + "database": "test_database", + }, ) acc.AssertContainsFields(t, "clickhouse_events", map[string]interface{}{ @@ -158,4 +326,262 @@ func TestGather(t *testing.T) { "test_system_asynchronous_metric2": uint64(2000), }, ) + acc.AssertContainsFields(t, "clickhouse_zookeeper", + map[string]interface{}{ + "root_nodes": uint64(2), + }, + ) + acc.AssertContainsFields(t, "clickhouse_replication_queue", + map[string]interface{}{ + "too_many_tries_replicas": uint64(10), + "num_tries_replicas": uint64(100), + }, + ) + acc.AssertContainsFields(t, "clickhouse_detached_parts", + map[string]interface{}{ + "detached_parts": uint64(10), + }, + ) + acc.AssertContainsTaggedFields(t, "clickhouse_dictionaries", + map[string]interface{}{ + "is_loaded": uint64(0), + "bytes_allocated": uint64(100), + }, + map[string]string{ + "source": "127.0.0.1", + "dict_origin": "default.test_dict", + }, + ) + acc.AssertContainsFields(t, "clickhouse_mutations", + map[string]interface{}{ + "running": uint64(1), + "failed": uint64(10), + "completed": uint64(100), + }, + ) + acc.AssertContainsTaggedFields(t, "clickhouse_disks", + map[string]interface{}{ + "free_space_percent": uint64(1), + "keep_free_space_percent": uint64(10), + }, + map[string]string{ + "source": "127.0.0.1", + "name": "default", + "path": "/var/lib/clickhouse", + }, + ) + acc.AssertContainsTaggedFields(t, "clickhouse_processes", + map[string]interface{}{ + "percentile_50": 0.1, + "percentile_90": 0.5, + "longest_running": float64(10), + }, + map[string]string{ + "source": "127.0.0.1", + "query_type": "select", + }, + ) + + acc.AssertContainsTaggedFields(t, "clickhouse_processes", + map[string]interface{}{ + "percentile_50": 0.2, + "percentile_90": 1.5, + "longest_running": float64(100), + }, + map[string]string{ + "source": "127.0.0.1", + "query_type": "insert", + }, + ) + acc.AssertContainsTaggedFields(t, "clickhouse_processes", + map[string]interface{}{ + "percentile_50": 0.4, + "percentile_90": 4.5, + "longest_running": float64(1000), + }, + map[string]string{ + "source": "127.0.0.1", + "query_type": "other", + }, + ) + + for i, level := range []string{"Fatal", "Critical", "Error", "Warning", "Notice"} { + acc.AssertContainsTaggedFields(t, "clickhouse_text_log", + map[string]interface{}{ + "messages_last_10_min": uint64(i * 10), + }, + map[string]string{ + "source": "127.0.0.1", + "level": level, + }, + ) + } +} + +func TestGatherWithSomeTablesNotExists(t *testing.T) { + var ( + ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + type result struct { + Data interface{} `json:"data"` + } + enc := json.NewEncoder(w) + switch query := r.URL.Query().Get("query"); { + case strings.Contains(query, "zk_exists"): + enc.Encode(result{ + Data: []struct { + ZkExists chUInt64 `json:"zk_exists"` + }{ + { + ZkExists: 0, + }, + }, + }) + case strings.Contains(query, "replication_queue_exists"): + enc.Encode(result{ + Data: []struct { + ReplicationQueueExists chUInt64 `json:"replication_queue_exists"` + }{ + { + ReplicationQueueExists: 0, + }, + }, + }) + case strings.Contains(query, "text_log_exists"): + enc.Encode(result{ + Data: []struct { + TextLogExists chUInt64 `json:"text_log_exists"` + }{ + { + TextLogExists: 0, + }, + }, + }) + } + })) + ch = &ClickHouse{ + Servers: []string{ + ts.URL, + }, + Username: "default", + } + acc = &testutil.Accumulator{} + ) + defer ts.Close() + ch.Gather(acc) + + acc.AssertDoesNotContainMeasurement(t, "clickhouse_zookeeper") + acc.AssertDoesNotContainMeasurement(t, "clickhouse_replication_queue") + acc.AssertDoesNotContainMeasurement(t, "clickhouse_text_log") +} + +func TestWrongJSONMarshalling(t *testing.T) { + var ( + ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + type result struct { + Data interface{} `json:"data"` + } + enc := json.NewEncoder(w) + //wrong data section json + enc.Encode(result{ + Data: []struct{}{}, + }) + })) + ch = &ClickHouse{ + Servers: []string{ + ts.URL, + }, + Username: "default", + } + acc = &testutil.Accumulator{} + ) + defer ts.Close() + ch.Gather(acc) + + assert.Equal(t, 0, len(acc.Metrics)) + allMeasurements := []string{ + "clickhouse_events", + "clickhouse_metrics", + "clickhouse_asynchronous_metrics", + "clickhouse_tables", + "clickhouse_zookeeper", + "clickhouse_replication_queue", + "clickhouse_detached_parts", + "clickhouse_dictionaries", + "clickhouse_mutations", + "clickhouse_disks", + "clickhouse_processes", + "clickhouse_text_log", + } + assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) +} + +func TestOfflineServer(t *testing.T) { + var ( + acc = &testutil.Accumulator{} + ch = &ClickHouse{ + Servers: []string{ + "http://wrong-domain.local:8123", + }, + Username: "default", + HTTPClient: http.Client{ + Timeout: 1 * time.Millisecond, + }, + } + ) + ch.Gather(acc) + + assert.Equal(t, 0, len(acc.Metrics)) + allMeasurements := []string{ + "clickhouse_events", + "clickhouse_metrics", + "clickhouse_asynchronous_metrics", + "clickhouse_tables", + "clickhouse_zookeeper", + "clickhouse_replication_queue", + "clickhouse_detached_parts", + "clickhouse_dictionaries", + "clickhouse_mutations", + "clickhouse_disks", + "clickhouse_processes", + "clickhouse_text_log", + } + assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors)) +} + +func TestAutoDiscovery(t *testing.T) { + var ( + ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + type result struct { + Data interface{} `json:"data"` + } + enc := json.NewEncoder(w) + switch query := r.URL.Query().Get("query"); { + case strings.Contains(query, "system.clusters"): + enc.Encode(result{ + Data: []struct { + Cluster string `json:"test"` + Hostname string `json:"localhost"` + ShardNum chUInt64 `json:"shard_num"` + }{ + { + Cluster: "test_database", + Hostname: "test_table", + ShardNum: 1, + }, + }, + }) + } + })) + ch = &ClickHouse{ + Servers: []string{ + ts.URL, + }, + Username: "default", + AutoDiscovery: true, + } + acc = &testutil.Accumulator{} + ) + defer ts.Close() + ch.Gather(acc) + } diff --git a/plugins/inputs/clickhouse/dev/docker-compose.yml b/plugins/inputs/clickhouse/dev/docker-compose.yml index 4dd4d1846..c34ee9320 100644 --- a/plugins/inputs/clickhouse/dev/docker-compose.yml +++ b/plugins/inputs/clickhouse/dev/docker-compose.yml @@ -2,15 +2,26 @@ version: '3' services: clickhouse: - image: yandex/clickhouse-server:latest +# choose `:latest` after resolve https://github.com/ClickHouse/ClickHouse/issues/13057 + image: docker.io/yandex/clickhouse-server:${CLICKHOUSE_VERSION:-latest} volumes: + - ./test_dictionary.xml:/etc/clickhouse-server/01-test_dictionary.xml + - ./zookeeper.xml:/etc/clickhouse-server/config.d/00-zookeeper.xml + - ./tls_settings.xml:/etc/clickhouse-server/config.d/01-tls_settings.xml + # please comment text_log.xml when CLICKHOUSE_VERSION = 19.16 + - ./text_log.xml:/etc/clickhouse-server/config.d/02-text_log.xml + - ./part_log.xml:/etc/clickhouse-server/config.d/03-part_log.xml - ./dhparam.pem:/etc/clickhouse-server/dhparam.pem - - ./tls_settings.xml:/etc/clickhouse-server/config.d/00-tls_settings.xml - ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key - ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt - restart: always ports: - 8123:8123 - 8443:8443 - 9000:9000 - 9009:9009 + zookeeper: + image: docker.io/zookeeper:3.5.6 + volumes: + - /var/lib/zookeeper + ports: + - 2181:2181 diff --git a/plugins/inputs/clickhouse/dev/part_log.xml b/plugins/inputs/clickhouse/dev/part_log.xml new file mode 100644 index 000000000..e16a23894 --- /dev/null +++ b/plugins/inputs/clickhouse/dev/part_log.xml @@ -0,0 +1,12 @@ + + + system + part_log
+ 7500 + + event_date + + +
+ +
diff --git a/plugins/inputs/clickhouse/dev/telegraf.conf b/plugins/inputs/clickhouse/dev/telegraf.conf index 883baf845..b488ef611 100644 --- a/plugins/inputs/clickhouse/dev/telegraf.conf +++ b/plugins/inputs/clickhouse/dev/telegraf.conf @@ -2,7 +2,7 @@ [[inputs.clickhouse]] timeout = 2 - user = "default" + username = "default" servers = ["http://127.0.0.1:8123"] auto_discovery = true cluster_include = [] diff --git a/plugins/inputs/clickhouse/dev/telegraf_ssl.conf b/plugins/inputs/clickhouse/dev/telegraf_ssl.conf index 21288d84f..62b1cce9c 100644 --- a/plugins/inputs/clickhouse/dev/telegraf_ssl.conf +++ b/plugins/inputs/clickhouse/dev/telegraf_ssl.conf @@ -2,7 +2,7 @@ [[inputs.clickhouse]] timeout = 2 - user = "default" + username = "default" servers = ["https://127.0.0.1:8443"] auto_discovery = true cluster_include = [] diff --git a/plugins/inputs/clickhouse/dev/test_dictionary.xml b/plugins/inputs/clickhouse/dev/test_dictionary.xml new file mode 100644 index 000000000..2f8f1ae5e --- /dev/null +++ b/plugins/inputs/clickhouse/dev/test_dictionary.xml @@ -0,0 +1,63 @@ + + + + default.test_dict + + + + + Nom + + + + + Nom + String + + + Code + String + + + Cur + String + + + + + + + + 3306 + wrong + wrong + + 127.0.0.1 + 1 + + default + test
+
+ + + + + + + + + + +
+
diff --git a/plugins/inputs/clickhouse/dev/text_log.xml b/plugins/inputs/clickhouse/dev/text_log.xml new file mode 100644 index 000000000..bcccea8b7 --- /dev/null +++ b/plugins/inputs/clickhouse/dev/text_log.xml @@ -0,0 +1,12 @@ + + + notice + system + text_log
+ 7500 + + event_date + + +
+
diff --git a/plugins/inputs/clickhouse/dev/zookeeper.xml b/plugins/inputs/clickhouse/dev/zookeeper.xml new file mode 100644 index 000000000..ffd374000 --- /dev/null +++ b/plugins/inputs/clickhouse/dev/zookeeper.xml @@ -0,0 +1,19 @@ + + + + zookeeper + 2181 + + + + + + 1 + + localhost + 9000 + + + + +