fix: add additional logstash output plugin stats (#9707)

This commit is contained in:
John Seekins 2021-09-16 15:19:51 -06:00 committed by GitHub
parent b806ad8848
commit f5a3df429a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 5 deletions

View File

@ -42,6 +42,8 @@ Logstash 5 and later is supported.
### Metrics
Additional plugin stats may be collected (because logstash doesn't consistently expose all stats)
- logstash_jvm
- tags:
- node_id
@ -125,6 +127,10 @@ Logstash 5 and later is supported.
- duration_in_millis
- in
- out
- bulk_requests_failures (for Logstash 7+)
- bulk_requests_with_errors (for Logstash 7+)
- documents_successes (for logstash 7+)
- documents_retryable_failures (for logstash 7+)
- logstash_queue
- tags:

View File

@ -126,9 +126,11 @@ type Pipeline struct {
}
type Plugin struct {
ID string `json:"id"`
Events interface{} `json:"events"`
Name string `json:"name"`
ID string `json:"id"`
Events interface{} `json:"events"`
Name string `json:"name"`
BulkRequests map[string]interface{} `json:"bulk_requests"`
Documents map[string]interface{} `json:"documents"`
}
type PipelinePlugins struct {
@ -290,6 +292,63 @@ func (logstash *Logstash) gatherPluginsStats(
return err
}
accumulator.AddFields("logstash_plugins", flattener.Fields, pluginTags)
/*
The elasticsearch output produces additional stats around
bulk requests and document writes (that are elasticsearch specific).
Collect those here
*/
if pluginType == "output" && plugin.Name == "elasticsearch" {
/*
The "bulk_requests" section has details about batch writes
into Elasticsearch
"bulk_requests" : {
"successes" : 2870,
"responses" : {
"200" : 2870
},
"failures": 262,
"with_errors": 9089
},
*/
flattener := jsonParser.JSONFlattener{}
err := flattener.FlattenJSON("", plugin.BulkRequests)
if err != nil {
return err
}
for k, v := range flattener.Fields {
if strings.HasPrefix(k, "bulk_requests") {
continue
}
newKey := fmt.Sprintf("bulk_requests_%s", k)
flattener.Fields[newKey] = v
delete(flattener.Fields, k)
}
accumulator.AddFields("logstash_plugins", flattener.Fields, pluginTags)
/*
The "documents" section has counts of individual documents
written/retried/etc.
"documents" : {
"successes" : 2665549,
"retryable_failures": 13733
}
*/
flattener = jsonParser.JSONFlattener{}
err = flattener.FlattenJSON("", plugin.Documents)
if err != nil {
return err
}
for k, v := range flattener.Fields {
if strings.HasPrefix(k, "documents") {
continue
}
newKey := fmt.Sprintf("documents_%s", k)
flattener.Fields[newKey] = v
delete(flattener.Fields, k)
}
accumulator.AddFields("logstash_plugins", flattener.Fields, pluginTags)
}
}
return nil

View File

@ -708,6 +708,64 @@ func Test_Logstash7GatherPipelinesQueueStats(test *testing.T) {
},
)
logstash7accPipelinesStats.AssertContainsTaggedFields(
test,
"logstash_plugins",
map[string]interface{}{
"duration_in_millis": float64(2802177.0),
"in": float64(2665549.0),
"out": float64(2665549.0),
},
map[string]string{
"node_id": string("28580380-ad2c-4032-934b-76359125edca"),
"node_name": string("HOST01.local"),
"source": string("HOST01.local"),
"node_version": string("7.4.2"),
"pipeline": string("infra"),
"plugin_name": string("elasticsearch"),
"plugin_id": string("38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3"),
"plugin_type": string("output"),
},
)
logstash7accPipelinesStats.AssertContainsTaggedFields(
test,
"logstash_plugins",
map[string]interface{}{
"bulk_requests_successes": float64(2870),
"bulk_requests_responses_200": float64(2870),
"bulk_requests_failures": float64(262),
"bulk_requests_with_errors": float64(9089),
},
map[string]string{
"node_id": string("28580380-ad2c-4032-934b-76359125edca"),
"node_name": string("HOST01.local"),
"source": string("HOST01.local"),
"node_version": string("7.4.2"),
"pipeline": string("infra"),
"plugin_name": string("elasticsearch"),
"plugin_id": string("38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3"),
"plugin_type": string("output"),
},
)
logstash7accPipelinesStats.AssertContainsTaggedFields(
test,
"logstash_plugins",
map[string]interface{}{
"documents_successes": float64(2665549),
"documents_retryable_failures": float64(13733),
},
map[string]string{
"node_id": string("28580380-ad2c-4032-934b-76359125edca"),
"node_name": string("HOST01.local"),
"source": string("HOST01.local"),
"node_version": string("7.4.2"),
"pipeline": string("infra"),
"plugin_name": string("elasticsearch"),
"plugin_id": string("38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3"),
"plugin_type": string("output"),
},
)
logstash7accPipelinesStats.AssertContainsTaggedFields(
test,
"logstash_queue",

View File

@ -110,10 +110,13 @@ const logstash7PipelinesJSON = `
"successes" : 2870,
"responses" : {
"200" : 2870
}
},
"failures": 262,
"with_errors": 9089
},
"documents" : {
"successes" : 2665549
"successes" : 2665549,
"retryable_failures": 13733
}
} ]
},