add additional metrics to clickhouse input plugin (#7904)

add metrics and tests for the following ClickHouse tables:
- system.zookeeper
- system.detached_parts
- system.dictionaries
- system.replication_queue
- system.mutations
- system.disks
- system.processes
- system.text_log
- system.part_log
Eugene Klimov 2020-08-12 20:48:07 +05:00 committed by GitHub
parent 6c4636b860
commit 78811f7b1c
10 changed files with 941 additions and 45 deletions


@ -7,7 +7,7 @@ This plugin gathers the statistic data from [ClickHouse](https://github.com/Clic
# Read metrics from one or many ClickHouse servers
[[inputs.clickhouse]]
## Username for authorization on ClickHouse server
## example: user = "default"
## example: username = "default"
username = "default"
## Password for authorization on ClickHouse server
@ -109,6 +109,80 @@ This plugin gathers the statistic data from [ClickHouse](https://github.com/Clic
- parts
- rows
- clickhouse_zookeeper
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- root_nodes (count of nodes from [system.zookeeper][] where path = '/')
- clickhouse_replication_queue
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- too_many_tries_replicas (count of replicas with num_tries > 100 in `system.replication_queue`)
- num_tries_replicas (count of replicas with num_tries > 1 in `system.replication_queue`)
- clickhouse_detached_parts
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- detached_parts (total detached parts for all tables and databases from [system.detached_parts][])
- clickhouse_dictionaries
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- dict_origin (XML file name when the dictionary is created from *_dictionary.xml, or database.table when the dictionary is created via DDL)
- fields:
- is_loaded (1 when the dictionary loaded successfully, 0 when it failed to load or is not loaded yet, see [system.dictionaries][] for details)
- bytes_allocated (amount of RAM allocated for the dictionary after it is loaded)
- clickhouse_mutations
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- running - gauge showing how many mutations are currently incomplete, see [system.mutations][] for details
- failed - counter showing the total number of failed mutations since the first clickhouse-server run
- completed - counter showing the total number of successfully finished mutations since the first clickhouse-server run
- clickhouse_disks
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- name (disk name in storage configuration)
- path (path to disk)
- fields:
- free_space_percent - gauge from 0 to 100 showing free disk space as a percentage of total disk space
- keep_free_space_percent - gauge from 0 to 100 showing the reserved (keep free) disk space as a percentage of total disk space
- clickhouse_processes
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- query_type (type of the query: `select`, `insert` or `other`)
- fields:
- percentile_50 - float gauge showing the 50th percentile (quantile 0.5) of the `elapsed` field of running processes, see [system.processes][] for details
- percentile_90 - float gauge showing the 90th percentile (quantile 0.9) of the `elapsed` field of running processes, see [system.processes][] for details
- longest_running - float gauge showing the maximum value of the `elapsed` field of running processes, see [system.processes][] for details
- clickhouse_text_log
- tags:
- source (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- level (message level; only messages with a level less than or equal to Notice are collected), see [system.text_log][] for details
- fields:
- messages_last_10_min - gauge showing how many messages were collected during the last 10 minutes
### Example Output
```
@ -119,6 +193,13 @@ clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=system,host
clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=default,host=kshvakov,source=localhost,shard_num=1,table=example bytes=326i,parts=2i,rows=2i 1569421000000000000
```
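The new measurements share the same default tags (`source`, plus optional `cluster` and `shard_num`) together with the per-metric tags listed above. A purely illustrative sample (the field values and timestamps below are made up, not taken from a real server) could look like:
```
clickhouse_zookeeper,cluster=test_cluster_two_shards_localhost,host=kshvakov,source=localhost,shard_num=1 root_nodes=2i 1569421000000000000
clickhouse_replication_queue,cluster=test_cluster_two_shards_localhost,host=kshvakov,source=localhost,shard_num=1 too_many_tries_replicas=0i,num_tries_replicas=1i 1569421000000000000
clickhouse_detached_parts,cluster=test_cluster_two_shards_localhost,host=kshvakov,source=localhost,shard_num=1 detached_parts=0i 1569421000000000000
clickhouse_dictionaries,cluster=test_cluster_two_shards_localhost,dict_origin=default.test_dict,host=kshvakov,source=localhost,shard_num=1 is_loaded=1i,bytes_allocated=4096i 1569421000000000000
clickhouse_mutations,cluster=test_cluster_two_shards_localhost,host=kshvakov,source=localhost,shard_num=1 running=0i,failed=0i,completed=5i 1569421000000000000
clickhouse_disks,cluster=test_cluster_two_shards_localhost,host=kshvakov,name=default,path=/var/lib/clickhouse,source=localhost,shard_num=1 free_space_percent=56i,keep_free_space_percent=10i 1569421000000000000
clickhouse_processes,cluster=test_cluster_two_shards_localhost,host=kshvakov,query_type=select,source=localhost,shard_num=1 percentile_50=0.1,percentile_90=0.5,longest_running=10 1569421000000000000
clickhouse_text_log,cluster=test_cluster_two_shards_localhost,host=kshvakov,level=Error,source=localhost,shard_num=1 messages_last_10_min=5i 1569421000000000000
```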
[system.events]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-events
[system.metrics]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-metrics
[system.asynchronous_metrics]: https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-asynchronous_metrics
[system.events]: https://clickhouse.tech/docs/en/operations/system-tables/events/
[system.metrics]: https://clickhouse.tech/docs/en/operations/system-tables/metrics/
[system.asynchronous_metrics]: https://clickhouse.tech/docs/en/operations/system-tables/asynchronous_metrics/
[system.zookeeper]: https://clickhouse.tech/docs/en/operations/system-tables/zookeeper/
[system.detached_parts]: https://clickhouse.tech/docs/en/operations/system-tables/detached_parts/
[system.dictionaries]: https://clickhouse.tech/docs/en/operations/system-tables/dictionaries/
[system.mutations]: https://clickhouse.tech/docs/en/operations/system-tables/mutations/
[system.disks]: https://clickhouse.tech/docs/en/operations/system-tables/disks/
[system.processes]: https://clickhouse.tech/docs/en/operations/system-tables/processes/
[system.text_log]: https://clickhouse.tech/docs/en/operations/system-tables/text_log/


@ -23,7 +23,7 @@ var defaultTimeout = 5 * time.Second
var sampleConfig = `
## Username for authorization on ClickHouse server
## example: user = "default"
## example: username = "default"
username = "default"
## Password for authorization on ClickHouse server
@ -115,7 +115,7 @@ type ClickHouse struct {
ClusterInclude []string `toml:"cluster_include"`
ClusterExclude []string `toml:"cluster_exclude"`
Timeout internal.Duration `toml:"timeout"`
client http.Client
HTTPClient http.Client
tls.ClientConfig
}
@ -140,11 +140,12 @@ func (ch *ClickHouse) Start(telegraf.Accumulator) error {
return err
}
ch.client = http.Client{
ch.HTTPClient = http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
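// keep at most one idle keep-alive connection per configured ClickHouse server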
MaxIdleConnsPerHost: 1,
},
}
return nil
@ -187,15 +188,33 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
}
default:
connects = append(connects, connect{
url: u,
Hostname: u.Hostname(),
url: u,
})
}
}
for _, conn := range connects {
if err := ch.tables(acc, &conn); err != nil {
acc.AddError(err)
metricsFuncs := []func(acc telegraf.Accumulator, conn *connect) error{
ch.tables,
ch.zookeeper,
ch.replicationQueue,
ch.detachedParts,
ch.dictionaries,
ch.mutations,
ch.disks,
ch.processes,
ch.textLog,
}
for _, metricFunc := range metricsFuncs {
if err := metricFunc(acc, &conn); err != nil {
acc.AddError(err)
}
}
for metric := range commonMetrics {
if err := ch.commonMetrics(acc, &conn, metric); err != nil {
acc.AddError(err)
@ -206,7 +225,7 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
}
func (ch *ClickHouse) Stop() {
ch.client.CloseIdleConnections()
ch.HTTPClient.CloseIdleConnections()
}
func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
@ -239,10 +258,7 @@ func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
if includeFilter == "" && excludeFilter != "" {
return "WHERE " + excludeFilter
}
if includeFilter != "" && excludeFilter == "" {
return "WHERE " + includeFilter
}
return ""
return "WHERE " + includeFilter
}
func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error {
@ -254,15 +270,7 @@ func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, met
return err
}
tags := map[string]string{
"source": conn.Hostname,
}
if len(conn.Cluster) != 0 {
tags["cluster"] = conn.Cluster
}
if conn.ShardNum != 0 {
tags["shard_num"] = strconv.Itoa(conn.ShardNum)
}
tags := ch.makeDefaultTags(conn)
fields := make(map[string]interface{})
for _, r := range result {
@ -274,6 +282,241 @@ func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, met
return nil
}
func (ch *ClickHouse) zookeeper(acc telegraf.Accumulator, conn *connect) error {
var zkExists []struct {
ZkExists chUInt64 `json:"zk_exists"`
}
if err := ch.execQuery(conn.url, systemZookeeperExistsSQL, &zkExists); err != nil {
return err
}
tags := ch.makeDefaultTags(conn)
if len(zkExists) > 0 && zkExists[0].ZkExists > 0 {
var zkRootNodes []struct {
ZkRootNodes chUInt64 `json:"zk_root_nodes"`
}
if err := ch.execQuery(conn.url, systemZookeeperRootNodesSQL, &zkRootNodes); err != nil {
return err
}
acc.AddFields("clickhouse_zookeeper",
map[string]interface{}{
"root_nodes": uint64(zkRootNodes[0].ZkRootNodes),
},
tags,
)
}
return nil
}
func (ch *ClickHouse) replicationQueue(acc telegraf.Accumulator, conn *connect) error {
var replicationQueueExists []struct {
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
}
if err := ch.execQuery(conn.url, systemReplicationExistsSQL, &replicationQueueExists); err != nil {
return err
}
tags := ch.makeDefaultTags(conn)
if len(replicationQueueExists) > 0 && replicationQueueExists[0].ReplicationQueueExists > 0 {
var replicationTooManyTries []struct {
NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"`
TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"`
}
if err := ch.execQuery(conn.url, systemReplicationNumTriesSQL, &replicationTooManyTries); err != nil {
return err
}
acc.AddFields("clickhouse_replication_queue",
map[string]interface{}{
"too_many_tries_replicas": uint64(replicationTooManyTries[0].TooManyTriesReplicas),
"num_tries_replicas": uint64(replicationTooManyTries[0].NumTriesReplicas),
},
tags,
)
}
return nil
}
func (ch *ClickHouse) detachedParts(acc telegraf.Accumulator, conn *connect) error {
var detachedParts []struct {
DetachedParts chUInt64 `json:"detached_parts"`
}
if err := ch.execQuery(conn.url, systemDetachedPartsSQL, &detachedParts); err != nil {
return err
}
if len(detachedParts) > 0 {
tags := ch.makeDefaultTags(conn)
acc.AddFields("clickhouse_detached_parts",
map[string]interface{}{
"detached_parts": uint64(detachedParts[0].DetachedParts),
},
tags,
)
}
return nil
}
func (ch *ClickHouse) dictionaries(acc telegraf.Accumulator, conn *connect) error {
var brokenDictionaries []struct {
Origin string `json:"origin"`
BytesAllocated chUInt64 `json:"bytes_allocated"`
Status string `json:"status"`
}
if err := ch.execQuery(conn.url, systemDictionariesSQL, &brokenDictionaries); err != nil {
return err
}
for _, dict := range brokenDictionaries {
tags := ch.makeDefaultTags(conn)
isLoaded := uint64(1)
if dict.Status != "LOADED" {
isLoaded = 0
}
if dict.Origin != "" {
tags["dict_origin"] = dict.Origin
acc.AddFields("clickhouse_dictionaries",
map[string]interface{}{
"is_loaded": isLoaded,
"bytes_allocated": uint64(dict.BytesAllocated),
},
tags,
)
}
}
return nil
}
func (ch *ClickHouse) mutations(acc telegraf.Accumulator, conn *connect) error {
var mutationsStatus []struct {
Failed chUInt64 `json:"failed"`
Running chUInt64 `json:"running"`
Completed chUInt64 `json:"completed"`
}
if err := ch.execQuery(conn.url, systemMutationSQL, &mutationsStatus); err != nil {
return err
}
if len(mutationsStatus) > 0 {
tags := ch.makeDefaultTags(conn)
acc.AddFields("clickhouse_mutations",
map[string]interface{}{
"failed": uint64(mutationsStatus[0].Failed),
"running": uint64(mutationsStatus[0].Running),
"completed": uint64(mutationsStatus[0].Completed),
},
tags,
)
}
return nil
}
func (ch *ClickHouse) disks(acc telegraf.Accumulator, conn *connect) error {
var disksStatus []struct {
Name string `json:"name"`
Path string `json:"path"`
FreePercent chUInt64 `json:"free_space_percent"`
KeepFreePercent chUInt64 `json:"keep_free_space_percent"`
}
if err := ch.execQuery(conn.url, systemDisksSQL, &disksStatus); err != nil {
return err
}
for _, disk := range disksStatus {
tags := ch.makeDefaultTags(conn)
tags["name"] = disk.Name
tags["path"] = disk.Path
acc.AddFields("clickhouse_disks",
map[string]interface{}{
"free_space_percent": uint64(disk.FreePercent),
"keep_free_space_percent": uint64(disk.KeepFreePercent),
},
tags,
)
}
return nil
}
func (ch *ClickHouse) processes(acc telegraf.Accumulator, conn *connect) error {
var processesStats []struct {
QueryType string `json:"query_type"`
Percentile50 float64 `json:"p50"`
Percentile90 float64 `json:"p90"`
LongestRunning float64 `json:"longest_running"`
}
if err := ch.execQuery(conn.url, systemProcessesSQL, &processesStats); err != nil {
return err
}
for _, process := range processesStats {
tags := ch.makeDefaultTags(conn)
tags["query_type"] = process.QueryType
acc.AddFields("clickhouse_processes",
map[string]interface{}{
"percentile_50": process.Percentile50,
"percentile_90": process.Percentile90,
"longest_running": process.LongestRunning,
},
tags,
)
}
return nil
}
func (ch *ClickHouse) textLog(acc telegraf.Accumulator, conn *connect) error {
var textLogExists []struct {
TextLogExists chUInt64 `json:"text_log_exists"`
}
if err := ch.execQuery(conn.url, systemTextLogExistsSQL, &textLogExists); err != nil {
return err
}
if len(textLogExists) > 0 && textLogExists[0].TextLogExists > 0 {
var textLogLast10MinMessages []struct {
Level string `json:"level"`
MessagesLast10Min chUInt64 `json:"messages_last_10_min"`
}
if err := ch.execQuery(conn.url, systemTextLogSQL, &textLogLast10MinMessages); err != nil {
return err
}
for _, textLogItem := range textLogLast10MinMessages {
tags := ch.makeDefaultTags(conn)
tags["level"] = textLogItem.Level
acc.AddFields("clickhouse_text_log",
map[string]interface{}{
"messages_last_10_min": uint64(textLogItem.MessagesLast10Min),
},
tags,
)
}
}
return nil
}
func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error {
var parts []struct {
Database string `json:"database"`
@ -283,18 +526,11 @@ func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error {
Rows chUInt64 `json:"rows"`
}
if err := ch.execQuery(conn.url, systemParts, &parts); err != nil {
if err := ch.execQuery(conn.url, systemPartsSQL, &parts); err != nil {
return err
}
tags := map[string]string{
"source": conn.Hostname,
}
if len(conn.Cluster) != 0 {
tags["cluster"] = conn.Cluster
}
if conn.ShardNum != 0 {
tags["shard_num"] = strconv.Itoa(conn.ShardNum)
}
tags := ch.makeDefaultTags(conn)
for _, part := range parts {
tags["table"] = part.Table
tags["database"] = part.Database
@ -310,6 +546,19 @@ func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error {
return nil
}
func (ch *ClickHouse) makeDefaultTags(conn *connect) map[string]string {
tags := map[string]string{
"source": conn.Hostname,
}
if len(conn.Cluster) != 0 {
tags["cluster"] = conn.Cluster
}
if conn.ShardNum != 0 {
tags["shard_num"] = strconv.Itoa(conn.ShardNum)
}
return tags
}
type clickhouseError struct {
StatusCode int
body []byte
@ -330,7 +579,7 @@ func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error
if ch.Password != "" {
req.Header.Add("X-ClickHouse-Key", ch.Password)
}
resp, err := ch.client.Do(req)
resp, err := ch.HTTPClient.Do(req)
if err != nil {
return err
}
@ -348,7 +597,14 @@ func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return err
}
return json.Unmarshal(response.Data, i)
if err := json.Unmarshal(response.Data, i); err != nil {
return err
}
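// drain any remaining response body so the underlying keep-alive connection can be reused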
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
return err
}
return nil
}
// see https://clickhouse.yandex/docs/en/operations/settings/settings/#session_settings-output_format_json_quote_64bit_integers
@ -369,7 +625,7 @@ const (
systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events"
systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics"
systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics"
systemParts = `
systemPartsSQL = `
SELECT
database,
table,
@ -383,6 +639,22 @@ const (
ORDER BY
database, table
`
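// optional system tables (zookeeper, replication_queue, text_log) are probed via system.tables first, so they are only queried when they actually exist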
systemZookeeperExistsSQL = "SELECT count() AS zk_exists FROM system.tables WHERE database='system' AND name='zookeeper'"
systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'"
systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'"
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue"
systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts"
systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries"
systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations"
systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks"
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type"
systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'"
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level"
)
var commonMetrics = map[string]string{


@ -6,6 +6,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
@ -121,6 +122,168 @@ func TestGather(t *testing.T) {
},
},
})
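// each case below returns a canned JSON payload matching the system table query issued by the plugin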
case strings.Contains(query, "zk_exists"):
enc.Encode(result{
Data: []struct {
ZkExists chUInt64 `json:"zk_exists"`
}{
{
ZkExists: 1,
},
},
})
case strings.Contains(query, "zk_root_nodes"):
enc.Encode(result{
Data: []struct {
ZkRootNodes chUInt64 `json:"zk_root_nodes"`
}{
{
ZkRootNodes: 2,
},
},
})
case strings.Contains(query, "replication_queue_exists"):
enc.Encode(result{
Data: []struct {
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
}{
{
ReplicationQueueExists: 1,
},
},
})
case strings.Contains(query, "replication_too_many_tries_replicas"):
enc.Encode(result{
Data: []struct {
TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"`
NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"`
}{
{
TooManyTriesReplicas: 10,
NumTriesReplicas: 100,
},
},
})
case strings.Contains(query, "system.detached_parts"):
enc.Encode(result{
Data: []struct {
DetachedParts chUInt64 `json:"detached_parts"`
}{
{
DetachedParts: 10,
},
},
})
case strings.Contains(query, "system.dictionaries"):
enc.Encode(result{
Data: []struct {
Origin string `json:"origin"`
Status string `json:"status"`
BytesAllocated chUInt64 `json:"bytes_allocated"`
}{
{
Origin: "default.test_dict",
Status: "NOT_LOADED",
BytesAllocated: 100,
},
},
})
case strings.Contains(query, "system.mutations"):
enc.Encode(result{
Data: []struct {
Failed chUInt64 `json:"failed"`
Completed chUInt64 `json:"completed"`
Running chUInt64 `json:"running"`
}{
{
Failed: 10,
Running: 1,
Completed: 100,
},
},
})
case strings.Contains(query, "system.disks"):
enc.Encode(result{
Data: []struct {
Name string `json:"name"`
Path string `json:"path"`
FreePercent chUInt64 `json:"free_space_percent"`
KeepFreePercent chUInt64 `json:"keep_free_space_percent"`
}{
{
Name: "default",
Path: "/var/lib/clickhouse",
FreePercent: 1,
KeepFreePercent: 10,
},
},
})
case strings.Contains(query, "system.processes"):
enc.Encode(result{
Data: []struct {
QueryType string `json:"query_type"`
Percentile50 float64 `json:"p50"`
Percentile90 float64 `json:"p90"`
LongestRunning float64 `json:"longest_running"`
}{
{
QueryType: "select",
Percentile50: 0.1,
Percentile90: 0.5,
LongestRunning: 10,
},
{
QueryType: "insert",
Percentile50: 0.2,
Percentile90: 1.5,
LongestRunning: 100,
},
{
QueryType: "other",
Percentile50: 0.4,
Percentile90: 4.5,
LongestRunning: 1000,
},
},
})
case strings.Contains(query, "text_log_exists"):
enc.Encode(result{
Data: []struct {
TextLogExists chUInt64 `json:"text_log_exists"`
}{
{
TextLogExists: 1,
},
},
})
case strings.Contains(query, "system.text_log"):
enc.Encode(result{
Data: []struct {
Level string `json:"level"`
LastMessagesLast10Min chUInt64 `json:"messages_last_10_min"`
}{
{
Level: "Fatal",
LastMessagesLast10Min: 0,
},
{
Level: "Critical",
LastMessagesLast10Min: 10,
},
{
Level: "Error",
LastMessagesLast10Min: 20,
},
{
Level: "Warning",
LastMessagesLast10Min: 30,
},
{
Level: "Notice",
LastMessagesLast10Min: 40,
},
},
})
}
}))
ch = &ClickHouse{
@ -133,12 +296,17 @@ func TestGather(t *testing.T) {
defer ts.Close()
ch.Gather(acc)
acc.AssertContainsFields(t, "clickhouse_tables",
acc.AssertContainsTaggedFields(t, "clickhouse_tables",
map[string]interface{}{
"bytes": uint64(1),
"parts": uint64(10),
"rows": uint64(100),
},
map[string]string{
"source": "127.0.0.1",
"table": "test_table",
"database": "test_database",
},
)
acc.AssertContainsFields(t, "clickhouse_events",
map[string]interface{}{
@ -158,4 +326,262 @@ func TestGather(t *testing.T) {
"test_system_asynchronous_metric2": uint64(2000),
},
)
acc.AssertContainsFields(t, "clickhouse_zookeeper",
map[string]interface{}{
"root_nodes": uint64(2),
},
)
acc.AssertContainsFields(t, "clickhouse_replication_queue",
map[string]interface{}{
"too_many_tries_replicas": uint64(10),
"num_tries_replicas": uint64(100),
},
)
acc.AssertContainsFields(t, "clickhouse_detached_parts",
map[string]interface{}{
"detached_parts": uint64(10),
},
)
acc.AssertContainsTaggedFields(t, "clickhouse_dictionaries",
map[string]interface{}{
"is_loaded": uint64(0),
"bytes_allocated": uint64(100),
},
map[string]string{
"source": "127.0.0.1",
"dict_origin": "default.test_dict",
},
)
acc.AssertContainsFields(t, "clickhouse_mutations",
map[string]interface{}{
"running": uint64(1),
"failed": uint64(10),
"completed": uint64(100),
},
)
acc.AssertContainsTaggedFields(t, "clickhouse_disks",
map[string]interface{}{
"free_space_percent": uint64(1),
"keep_free_space_percent": uint64(10),
},
map[string]string{
"source": "127.0.0.1",
"name": "default",
"path": "/var/lib/clickhouse",
},
)
acc.AssertContainsTaggedFields(t, "clickhouse_processes",
map[string]interface{}{
"percentile_50": 0.1,
"percentile_90": 0.5,
"longest_running": float64(10),
},
map[string]string{
"source": "127.0.0.1",
"query_type": "select",
},
)
acc.AssertContainsTaggedFields(t, "clickhouse_processes",
map[string]interface{}{
"percentile_50": 0.2,
"percentile_90": 1.5,
"longest_running": float64(100),
},
map[string]string{
"source": "127.0.0.1",
"query_type": "insert",
},
)
acc.AssertContainsTaggedFields(t, "clickhouse_processes",
map[string]interface{}{
"percentile_50": 0.4,
"percentile_90": 4.5,
"longest_running": float64(1000),
},
map[string]string{
"source": "127.0.0.1",
"query_type": "other",
},
)
for i, level := range []string{"Fatal", "Critical", "Error", "Warning", "Notice"} {
acc.AssertContainsTaggedFields(t, "clickhouse_text_log",
map[string]interface{}{
"messages_last_10_min": uint64(i * 10),
},
map[string]string{
"source": "127.0.0.1",
"level": level,
},
)
}
}
func TestGatherWithSomeTablesNotExists(t *testing.T) {
var (
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
type result struct {
Data interface{} `json:"data"`
}
enc := json.NewEncoder(w)
switch query := r.URL.Query().Get("query"); {
case strings.Contains(query, "zk_exists"):
enc.Encode(result{
Data: []struct {
ZkExists chUInt64 `json:"zk_exists"`
}{
{
ZkExists: 0,
},
},
})
case strings.Contains(query, "replication_queue_exists"):
enc.Encode(result{
Data: []struct {
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
}{
{
ReplicationQueueExists: 0,
},
},
})
case strings.Contains(query, "text_log_exists"):
enc.Encode(result{
Data: []struct {
TextLogExists chUInt64 `json:"text_log_exists"`
}{
{
TextLogExists: 0,
},
},
})
}
}))
ch = &ClickHouse{
Servers: []string{
ts.URL,
},
Username: "default",
}
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
acc.AssertDoesNotContainMeasurement(t, "clickhouse_zookeeper")
acc.AssertDoesNotContainMeasurement(t, "clickhouse_replication_queue")
acc.AssertDoesNotContainMeasurement(t, "clickhouse_text_log")
}
func TestWrongJSONMarshalling(t *testing.T) {
var (
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
type result struct {
Data interface{} `json:"data"`
}
enc := json.NewEncoder(w)
// wrong data section JSON
enc.Encode(result{
Data: []struct{}{},
})
}))
ch = &ClickHouse{
Servers: []string{
ts.URL,
},
Username: "default",
}
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
assert.Equal(t, 0, len(acc.Metrics))
allMeasurements := []string{
"clickhouse_events",
"clickhouse_metrics",
"clickhouse_asynchronous_metrics",
"clickhouse_tables",
"clickhouse_zookeeper",
"clickhouse_replication_queue",
"clickhouse_detached_parts",
"clickhouse_dictionaries",
"clickhouse_mutations",
"clickhouse_disks",
"clickhouse_processes",
"clickhouse_text_log",
}
assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
}
func TestOfflineServer(t *testing.T) {
var (
acc = &testutil.Accumulator{}
ch = &ClickHouse{
Servers: []string{
"http://wrong-domain.local:8123",
},
Username: "default",
HTTPClient: http.Client{
Timeout: 1 * time.Millisecond,
},
}
)
ch.Gather(acc)
assert.Equal(t, 0, len(acc.Metrics))
allMeasurements := []string{
"clickhouse_events",
"clickhouse_metrics",
"clickhouse_asynchronous_metrics",
"clickhouse_tables",
"clickhouse_zookeeper",
"clickhouse_replication_queue",
"clickhouse_detached_parts",
"clickhouse_dictionaries",
"clickhouse_mutations",
"clickhouse_disks",
"clickhouse_processes",
"clickhouse_text_log",
}
assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
}
func TestAutoDiscovery(t *testing.T) {
var (
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
type result struct {
Data interface{} `json:"data"`
}
enc := json.NewEncoder(w)
switch query := r.URL.Query().Get("query"); {
case strings.Contains(query, "system.clusters"):
enc.Encode(result{
Data: []struct {
Cluster string `json:"test"`
Hostname string `json:"localhost"`
ShardNum chUInt64 `json:"shard_num"`
}{
{
Cluster: "test_database",
Hostname: "test_table",
ShardNum: 1,
},
},
})
}
}))
ch = &ClickHouse{
Servers: []string{
ts.URL,
},
Username: "default",
AutoDiscovery: true,
}
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
}


@ -2,15 +2,26 @@ version: '3'
services:
clickhouse:
image: yandex/clickhouse-server:latest
# switch back to `:latest` once https://github.com/ClickHouse/ClickHouse/issues/13057 is resolved
image: docker.io/yandex/clickhouse-server:${CLICKHOUSE_VERSION:-latest}
volumes:
- ./test_dictionary.xml:/etc/clickhouse-server/01-test_dictionary.xml
- ./zookeeper.xml:/etc/clickhouse-server/config.d/00-zookeeper.xml
- ./tls_settings.xml:/etc/clickhouse-server/config.d/01-tls_settings.xml
# comment out the text_log.xml mount when CLICKHOUSE_VERSION is 19.16
- ./text_log.xml:/etc/clickhouse-server/config.d/02-text_log.xml
- ./part_log.xml:/etc/clickhouse-server/config.d/03-part_log.xml
- ./dhparam.pem:/etc/clickhouse-server/dhparam.pem
- ./tls_settings.xml:/etc/clickhouse-server/config.d/00-tls_settings.xml
- ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key
- ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt
restart: always
ports:
- 8123:8123
- 8443:8443
- 9000:9000
- 9009:9009
zookeeper:
image: docker.io/zookeeper:3.5.6
volumes:
- /var/lib/zookeeper
ports:
- 2181:2181


@ -0,0 +1,12 @@
<yandex>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<!-- 19.16 -->
<partition_by>event_date</partition_by>
<!-- 20.5 -->
<!-- <engine>Engine = MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + INTERVAL 30 day</engine> -->
</part_log>
</yandex>


@ -2,7 +2,7 @@
[[inputs.clickhouse]]
timeout = 2
user = "default"
username = "default"
servers = ["http://127.0.0.1:8123"]
auto_discovery = true
cluster_include = []


@ -2,7 +2,7 @@
[[inputs.clickhouse]]
timeout = 2
user = "default"
username = "default"
servers = ["https://127.0.0.1:8443"]
auto_discovery = true
cluster_include = []


@ -0,0 +1,63 @@
<!--
CREATE DICTIONARY IF NOT EXISTS default.test_dict1(
nom String,
code String DEFAULT Null,
cur String DEFAULT Null
) PRIMARY KEY nom
SOURCE(
MYSQL(port 9000 host '127.0.0.1' user 'wrong' password 'wrong' db 'default' table 'test')
)
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 600);
-->
<yandex>
<dictionary>
<name>default.test_dict</name>
<structure>
<!-- Complex key configuration -->
<id>
<name>Nom</name>
</id>
<attribute>
<!-- Attribute parameters -->
<attribute>
<name>Nom</name>
<type>String</type>
</attribute>
<attribute>
<name>Code</name>
<type>String</type>
</attribute>
<attribute>
<name>Cur</name>
<type>String</type>
</attribute>
</attribute>
</structure>
<source>
<!-- Source configuration -->
<mysql>
<port>3306</port>
<user>wrong</user>
<password>wrong</password>
<replica>
<host>127.0.0.1</host>
<priority>1</priority>
</replica>
<db>default</db>
<table>test</table>
</mysql>
</source>
<layout>
<!-- Memory layout configuration -->
<complex_key_hashed />
</layout>
<lifetime>
<!-- Lifetime of dictionary in memory -->
</lifetime>
</dictionary>
</yandex>


@ -0,0 +1,12 @@
<yandex>
<text_log>
<level>notice</level>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<!-- 19.17 -->
<partition_by>event_date</partition_by>
<!-- 20.5 -->
<!-- <engine>Engine = MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + INTERVAL 30 day</engine> -->
</text_log>
</yandex>


@ -0,0 +1,19 @@
<yandex>
<zookeeper>
<node>
<host>zookeeper</host>
<port>2181</port>
</node>
</zookeeper>
<remote_servers replace="1">
<test>
<shard>
<internal_replication>1</internal_replication>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test>
</remote_servers>
</yandex>