fix: improve Clickhouse corner cases for empty recordset in aggregation queries, fix dictionaries behavior (#9401)
Signed-off-by: Eugene Klimov <eklimov@altinity.com>
This commit is contained in:
parent
7def7e7328
commit
b1930526f8
|
|
@ -261,21 +261,34 @@ func (ch *ClickHouse) clusterIncludeExcludeFilter() string {
|
|||
}
|
||||
|
||||
func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error {
|
||||
var result []struct {
|
||||
var intResult []struct {
|
||||
Metric string `json:"metric"`
|
||||
Value chUInt64 `json:"value"`
|
||||
}
|
||||
if err := ch.execQuery(conn.url, commonMetrics[metric], &result); err != nil {
|
||||
return err
|
||||
|
||||
var floatResult []struct {
|
||||
Metric string `json:"metric"`
|
||||
Value float64 `json:"value"`
|
||||
}
|
||||
|
||||
tags := ch.makeDefaultTags(conn)
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
for _, r := range result {
|
||||
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
|
||||
}
|
||||
|
||||
if commonMetricsIsFloat[metric] {
|
||||
if err := ch.execQuery(conn.url, commonMetrics[metric], &floatResult); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, r := range floatResult {
|
||||
fields[internal.SnakeCase(r.Metric)] = r.Value
|
||||
}
|
||||
} else {
|
||||
if err := ch.execQuery(conn.url, commonMetrics[metric], &intResult); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, r := range intResult {
|
||||
fields[internal.SnakeCase(r.Metric)] = uint64(r.Value)
|
||||
}
|
||||
}
|
||||
acc.AddFields("clickhouse_"+metric, fields, tags)
|
||||
|
||||
return nil
|
||||
|
|
@ -575,7 +588,7 @@ func (ch *ClickHouse) execQuery(address *url.URL, query string, i interface{}) e
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode >= 300 {
|
||||
body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200))
|
||||
return &clickhouseError{
|
||||
|
|
@ -614,9 +627,9 @@ func (i *chUInt64) UnmarshalJSON(b []byte) error {
|
|||
}
|
||||
|
||||
const (
|
||||
systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events"
|
||||
systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics"
|
||||
systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics"
|
||||
systemEventsSQL = "SELECT event AS metric, toUInt64(value) AS value FROM system.events"
|
||||
systemMetricsSQL = "SELECT metric, toUInt64(value) AS value FROM system.metrics"
|
||||
systemAsyncMetricsSQL = "SELECT metric, toFloat64(value) AS value FROM system.asynchronous_metrics"
|
||||
systemPartsSQL = `
|
||||
SELECT
|
||||
database,
|
||||
|
|
@ -635,18 +648,18 @@ const (
|
|||
systemZookeeperRootNodesSQL = "SELECT count() AS zk_root_nodes FROM system.zookeeper WHERE path='/'"
|
||||
|
||||
systemReplicationExistsSQL = "SELECT count() AS replication_queue_exists FROM system.tables WHERE database='system' AND name='replication_queue'"
|
||||
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue"
|
||||
systemReplicationNumTriesSQL = "SELECT countIf(num_tries>1) AS replication_num_tries_replicas, countIf(num_tries>100) AS replication_too_many_tries_replicas FROM system.replication_queue SETTINGS empty_result_for_aggregation_by_empty_set=0"
|
||||
|
||||
systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts"
|
||||
systemDetachedPartsSQL = "SELECT count() AS detached_parts FROM system.detached_parts SETTINGS empty_result_for_aggregation_by_empty_set=0"
|
||||
|
||||
systemDictionariesSQL = "SELECT origin, status, bytes_allocated FROM system.dictionaries"
|
||||
|
||||
systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations"
|
||||
systemMutationSQL = "SELECT countIf(latest_fail_time>toDateTime('0000-00-00 00:00:00') AND is_done=0) AS failed, countIf(latest_fail_time=toDateTime('0000-00-00 00:00:00') AND is_done=0) AS running, countIf(is_done=1) AS completed FROM system.mutations SETTINGS empty_result_for_aggregation_by_empty_set=0"
|
||||
systemDisksSQL = "SELECT name, path, toUInt64(100*free_space / total_space) AS free_space_percent, toUInt64( 100 * keep_free_space / total_space) AS keep_free_space_percent FROM system.disks"
|
||||
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type"
|
||||
systemProcessesSQL = "SELECT multiIf(positionCaseInsensitive(query,'select')=1,'select',positionCaseInsensitive(query,'insert')=1,'insert','other') AS query_type, quantile\n(0.5)(elapsed) AS p50, quantile(0.9)(elapsed) AS p90, max(elapsed) AS longest_running FROM system.processes GROUP BY query_type SETTINGS empty_result_for_aggregation_by_empty_set=0"
|
||||
|
||||
systemTextLogExistsSQL = "SELECT count() AS text_log_exists FROM system.tables WHERE database='system' AND name='text_log'"
|
||||
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level"
|
||||
systemTextLogSQL = "SELECT count() AS messages_last_10_min, level FROM system.text_log WHERE level <= 'Notice' AND event_time >= now() - INTERVAL 600 SECOND GROUP BY level SETTINGS empty_result_for_aggregation_by_empty_set=0"
|
||||
)
|
||||
|
||||
var commonMetrics = map[string]string{
|
||||
|
|
@ -655,4 +668,10 @@ var commonMetrics = map[string]string{
|
|||
"asynchronous_metrics": systemAsyncMetricsSQL,
|
||||
}
|
||||
|
||||
var commonMetricsIsFloat = map[string]bool{
|
||||
"events": false,
|
||||
"metrics": false,
|
||||
"asynchronous_metrics": true,
|
||||
}
|
||||
|
||||
var _ telegraf.ServiceInput = &ClickHouse{}
|
||||
|
|
|
|||
|
|
@ -337,8 +337,8 @@ func TestGather(t *testing.T) {
|
|||
)
|
||||
acc.AssertContainsFields(t, "clickhouse_asynchronous_metrics",
|
||||
map[string]interface{}{
|
||||
"test_system_asynchronous_metric": uint64(1000),
|
||||
"test_system_asynchronous_metric2": uint64(2000),
|
||||
"test_system_asynchronous_metric": float64(1000),
|
||||
"test_system_asynchronous_metric2": float64(2000),
|
||||
},
|
||||
)
|
||||
acc.AssertContainsFields(t, "clickhouse_zookeeper",
|
||||
|
|
|
|||
|
|
@ -5,16 +5,19 @@ services:
|
|||
# choose `:latest` after resolve https://github.com/ClickHouse/ClickHouse/issues/13057
|
||||
image: docker.io/yandex/clickhouse-server:${CLICKHOUSE_VERSION:-latest}
|
||||
volumes:
|
||||
- ./init_schema.sql:/docker-entrypoint-initdb.d/init_schema.sql
|
||||
- ./test_dictionary.xml:/etc/clickhouse-server/01-test_dictionary.xml
|
||||
- ./zookeeper.xml:/etc/clickhouse-server/config.d/00-zookeeper.xml
|
||||
- ./tls_settings.xml:/etc/clickhouse-server/config.d/01-tls_settings.xml
|
||||
# please comment text_log.xml when CLICKHOUSE_VERSION = 19.16
|
||||
- ./text_log.xml:/etc/clickhouse-server/config.d/02-text_log.xml
|
||||
- ./part_log.xml:/etc/clickhouse-server/config.d/03-part_log.xml
|
||||
- ./mysql_port.xml:/etc/clickhouse-server/config.d/04-mysql_port.xml
|
||||
- ./dhparam.pem:/etc/clickhouse-server/dhparam.pem
|
||||
- ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key
|
||||
- ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt
|
||||
ports:
|
||||
- 3306:3306
|
||||
- 8123:8123
|
||||
- 8443:8443
|
||||
- 9000:9000
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
DROP TABLE IF EXISTS default.test;
|
||||
CREATE TABLE default.test(
|
||||
Nom String,
|
||||
Code Nullable(String) DEFAULT Null,
|
||||
Cur Nullable(String) DEFAULT Null
|
||||
) ENGINE=MergeTree() ORDER BY tuple();
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
<yandex>
|
||||
<mysql_port>3306</mysql_port>
|
||||
</yandex>
|
||||
|
|
@ -1,11 +1,11 @@
|
|||
<!--
|
||||
CREATE DICTIONARY IF NOT EXISTS default.test_dict1(
|
||||
nom String,
|
||||
code String DEFAULT Null,
|
||||
cur String DEFAULT Null
|
||||
Nom String,
|
||||
Code Nullable(String) DEFAULT Null,
|
||||
Cur Nullable(String) DEFAULT Null
|
||||
) PRIMARY KEY nom
|
||||
SOURCE(
|
||||
MYSQL(port 9000 host '127.0.0.1' user 'wrong' password 'wrong' db 'default' table 'test')
|
||||
MYSQL(port 3306 host '127.0.0.1' user 'default' password '' db 'default' table 'test')
|
||||
)
|
||||
LAYOUT(COMPLEX_KEY_HASHED())
|
||||
LIFETIME(MIN 300 MAX 600);
|
||||
|
|
@ -16,23 +16,23 @@ LIFETIME(MIN 300 MAX 600);
|
|||
|
||||
<structure>
|
||||
<!-- Complex key configuration -->
|
||||
<id>
|
||||
<name>Nom</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<!-- Attribute parameters -->
|
||||
<key>
|
||||
<attribute>
|
||||
<name>Nom</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>Code</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>Cur</name>
|
||||
<type>String</type>
|
||||
</attribute>
|
||||
</key>
|
||||
<!-- Attribute parameters -->
|
||||
<attribute>
|
||||
<name>Code</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>Cur</name>
|
||||
<type>String</type>
|
||||
<null_value></null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
|
||||
|
|
@ -40,8 +40,8 @@ LIFETIME(MIN 300 MAX 600);
|
|||
<!-- Source configuration -->
|
||||
<mysql>
|
||||
<port>3306</port>
|
||||
<user>wrong</user>
|
||||
<password>wrong</password>
|
||||
<user>default</user>
|
||||
<password/>
|
||||
<replica>
|
||||
<host>127.0.0.1</host>
|
||||
<priority>1</priority>
|
||||
|
|
@ -56,8 +56,7 @@ LIFETIME(MIN 300 MAX 600);
|
|||
<complex_key_hashed />
|
||||
</layout>
|
||||
|
||||
<lifetime>
|
||||
<!-- Lifetime of dictionary in memory -->
|
||||
</lifetime>
|
||||
<!-- Lifetime of dictionary in memory -->
|
||||
<lifetime>300</lifetime>
|
||||
</dictionary>
|
||||
</yandex>
|
||||
|
|
|
|||
Loading…
Reference in New Issue