feat(inptus.elasticsearch): Gather enrich stats (#15688)

This commit is contained in:
Joshua Powers 2024-07-31 13:53:42 -06:00 committed by GitHub
parent 2cab6ec7ce
commit a21d263c4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 155 additions and 2 deletions

View File

@ -74,6 +74,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## To work this require local = true
cluster_stats_only_from_master = true
## Gather stats from the enrich API
# enrich_stats = false
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
@ -98,7 +101,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is

View File

@ -68,6 +68,23 @@ type clusterHealth struct {
Indices map[string]indexHealth `json:"indices"`
}
type enrichStats struct {
CoordinatorStats []struct {
NodeID string `json:"node_id"`
QueueSize int `json:"queue_size"`
RemoteRequestsCurrent int `json:"remote_requests_current"`
RemoteRequestsTotal int `json:"remote_requests_total"`
ExecutedSearchesTotal int `json:"executed_searches_total"`
} `json:"coordinator_stats"`
CacheStats []struct {
NodeID string `json:"node_id"`
Count int `json:"count"`
Hits int64 `json:"hits"`
Misses int `json:"misses"`
Evictions int `json:"evictions"`
} `json:"cache_stats"`
}
type indexHealth struct {
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
@ -104,6 +121,7 @@ type Elasticsearch struct {
ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"`
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
EnrichStats bool `toml:"enrich_stats"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
NodeStats []string `toml:"node_stats"`
@ -280,6 +298,13 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}
}
if e.EnrichStats {
if err := e.gatherEnrichStats(s+"/_enrich/_stats", acc); err != nil {
acc.AddError(errors.New(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
}(serv, acc)
}
@ -440,6 +465,46 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
return nil
}
func (e *Elasticsearch) gatherEnrichStats(url string, acc telegraf.Accumulator) error {
enrichStats := &enrichStats{}
if err := e.gatherJSONData(url, enrichStats); err != nil {
return err
}
measurementTime := time.Now()
for _, coordinator := range enrichStats.CoordinatorStats {
coordinatorFields := map[string]interface{}{
"queue_size": coordinator.QueueSize,
"remote_requests_current": coordinator.RemoteRequestsCurrent,
"remote_requests_total": coordinator.RemoteRequestsTotal,
"executed_searches_total": coordinator.ExecutedSearchesTotal,
}
acc.AddFields(
"elasticsearch_enrich_stats_coordinator",
coordinatorFields,
map[string]string{"node_id": coordinator.NodeID},
measurementTime,
)
}
for _, cache := range enrichStats.CacheStats {
cacheFields := map[string]interface{}{
"count": cache.Count,
"hits": cache.Hits,
"misses": cache.Misses,
"evictions": cache.Evictions,
}
acc.AddFields(
"elasticsearch_enrich_stats_cache",
cacheFields,
map[string]string{"node_id": cache.NodeID},
measurementTime,
)
}
return nil
}
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterStats{}
if err := e.gatherJSONData(url, clusterStats); err != nil {

View File

@ -100,6 +100,22 @@ func TestGatherIndividualStats(t *testing.T) {
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
}
func TestGatherEnrichStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.EnrichStats = true
es.client.Transport = newTransportMock(enrichStatsResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(es.Gather))
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
metrics := acc.GetTelegrafMetrics()
require.Len(t, metrics, 8)
}
func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}

View File

@ -33,6 +33,9 @@
## To work this require local = true
cluster_stats_only_from_master = true
## Gather stats from the enrich API
# enrich_stats = false
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index
## names that end with a changing value, like a date.
@ -57,7 +60,7 @@
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## If 'use_system_proxy' is set to true, Telegraf will check env vars such as
## HTTP_PROXY, HTTPS_PROXY, and NO_PROXY (or their lowercase counterparts).
## If 'use_system_proxy' is set to false (default) and 'http_proxy_url' is

View File

@ -62,6 +62,72 @@ const clusterHealthResponseWithIndices = `
}
`
const enrichStatsResponse = `
{
"executing_policies": [],
"coordinator_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 101636700,
"executed_searches_total": 102230925
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 242051423,
"executed_searches_total": 242752071
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 248009084,
"executed_searches_total": 248735550
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"queue_size": 0,
"remote_requests_current": 0,
"remote_requests_total": 233693129,
"executed_searches_total": 234476004
}
],
"cache_stats": [
{
"node_id": "RWkDKDRu_aV1fISRA7PIkg",
"count": 2500,
"hits": 6044497858,
"misses": 102230925,
"evictions": 92663663
},
{
"node_id": "2BOvel8nrXRjmSMAMBSUp3",
"count": 2500,
"hits": 14640821136,
"misses": 242752071,
"evictions": 226826313
},
{
"node_id": "smkOUPQOK1pymt8MCoglZJ",
"count": 2500,
"hits": 14145580115,
"misses": 248735550,
"evictions": 233860968
},
{
"node_id": "g5EUAaS-6-z5w27OtGQeTI",
"count": 2500,
"hits": 11016000946,
"misses": 234476004,
"evictions": 217698127
}
]
}
`
var clusterHealthExpected = map[string]interface{}{
"status": "green",
"status_code": 1,