Add support for Logstash 7 'queue' stats from the Pipelines API (#9080)
* LAdd support for logstash 7 'queue' stats for its pipelines stats API * appease the linter * Update samples_logstash7.go
This commit is contained in:
parent
5524acfb78
commit
868befcb5f
|
|
@ -138,10 +138,13 @@ type PipelinePlugins struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type PipelineQueue struct {
|
type PipelineQueue struct {
|
||||||
Events float64 `json:"events"`
|
Events float64 `json:"events"`
|
||||||
Type string `json:"type"`
|
EventsCount *float64 `json:"events_count"`
|
||||||
Capacity interface{} `json:"capacity"`
|
Type string `json:"type"`
|
||||||
Data interface{} `json:"data"`
|
Capacity interface{} `json:"capacity"`
|
||||||
|
Data interface{} `json:"data"`
|
||||||
|
QueueSizeInBytes *float64 `json:"queue_size_in_bytes"`
|
||||||
|
MaxQueueSizeInBytes *float64 `json:"max_queue_size_in_bytes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
const jvmStats = "/_node/stats/jvm"
|
const jvmStats = "/_node/stats/jvm"
|
||||||
|
|
@ -304,8 +307,13 @@ func (logstash *Logstash) gatherQueueStats(
|
||||||
queueTags[tag] = value
|
queueTags[tag] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
events := queue.Events
|
||||||
|
if queue.EventsCount != nil {
|
||||||
|
events = *queue.EventsCount
|
||||||
|
}
|
||||||
|
|
||||||
queueFields := map[string]interface{}{
|
queueFields := map[string]interface{}{
|
||||||
"events": queue.Events,
|
"events": events,
|
||||||
}
|
}
|
||||||
|
|
||||||
if queue.Type != "memory" {
|
if queue.Type != "memory" {
|
||||||
|
|
@ -321,6 +329,14 @@ func (logstash *Logstash) gatherQueueStats(
|
||||||
for field, value := range flattener.Fields {
|
for field, value := range flattener.Fields {
|
||||||
queueFields[field] = value
|
queueFields[field] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if queue.MaxQueueSizeInBytes != nil {
|
||||||
|
queueFields["max_queue_size_in_bytes"] = *queue.MaxQueueSizeInBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
if queue.QueueSizeInBytes != nil {
|
||||||
|
queueFields["queue_size_in_bytes"] = *queue.QueueSizeInBytes
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
accumulator.AddFields("logstash_queue", queueFields, queueTags)
|
accumulator.AddFields("logstash_queue", queueFields, queueTags)
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ var logstashTest = NewLogstash()
|
||||||
var (
|
var (
|
||||||
logstash5accPipelineStats testutil.Accumulator
|
logstash5accPipelineStats testutil.Accumulator
|
||||||
logstash6accPipelinesStats testutil.Accumulator
|
logstash6accPipelinesStats testutil.Accumulator
|
||||||
|
logstash7accPipelinesStats testutil.Accumulator
|
||||||
logstash5accProcessStats testutil.Accumulator
|
logstash5accProcessStats testutil.Accumulator
|
||||||
logstash6accProcessStats testutil.Accumulator
|
logstash6accProcessStats testutil.Accumulator
|
||||||
logstash5accJVMStats testutil.Accumulator
|
logstash5accJVMStats testutil.Accumulator
|
||||||
|
|
@ -686,3 +687,71 @@ func Test_Logstash6GatherJVMStats(test *testing.T) {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_Logstash7GatherPipelinesQueueStats(test *testing.T) {
|
||||||
|
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||||
|
writer.Header().Set("Content-Type", "application/json")
|
||||||
|
_, err := fmt.Fprintf(writer, "%s", string(logstash7PipelinesJSON))
|
||||||
|
if err != nil {
|
||||||
|
test.Logf("Can't print test json")
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
requestURL, err := url.Parse(logstashTest.URL)
|
||||||
|
if err != nil {
|
||||||
|
test.Logf("Can't connect to: %s", logstashTest.URL)
|
||||||
|
}
|
||||||
|
fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
|
||||||
|
fakeServer.Start()
|
||||||
|
defer fakeServer.Close()
|
||||||
|
|
||||||
|
if logstashTest.client == nil {
|
||||||
|
client, err := logstashTest.createHTTPClient()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
test.Logf("Can't createHTTPClient")
|
||||||
|
}
|
||||||
|
logstashTest.client = client
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := logstashTest.gatherPipelinesStats(logstashTest.URL+pipelineStats, &logstash7accPipelinesStats); err != nil {
|
||||||
|
test.Logf("Can't gather Pipeline stats")
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
fields["duration_in_millis"] = float64(3032875.0)
|
||||||
|
fields["queue_push_duration_in_millis"] = float64(13300.0)
|
||||||
|
fields["in"] = float64(2665549.0)
|
||||||
|
fields["filtered"] = float64(2665549.0)
|
||||||
|
fields["out"] = float64(2665549.0)
|
||||||
|
|
||||||
|
logstash7accPipelinesStats.AssertContainsTaggedFields(
|
||||||
|
test,
|
||||||
|
"logstash_events",
|
||||||
|
fields,
|
||||||
|
map[string]string{
|
||||||
|
"node_id": string("28580380-ad2c-4032-934b-76359125edca"),
|
||||||
|
"node_name": string("HOST01.local"),
|
||||||
|
"source": string("HOST01.local"),
|
||||||
|
"node_version": string("7.4.2"),
|
||||||
|
"pipeline": string("infra"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
logstash7accPipelinesStats.AssertContainsTaggedFields(
|
||||||
|
test,
|
||||||
|
"logstash_queue",
|
||||||
|
map[string]interface{}{
|
||||||
|
"events": float64(0),
|
||||||
|
"max_queue_size_in_bytes": float64(4294967296),
|
||||||
|
"queue_size_in_bytes": float64(32028566),
|
||||||
|
},
|
||||||
|
map[string]string{
|
||||||
|
"node_id": string("28580380-ad2c-4032-934b-76359125edca"),
|
||||||
|
"node_name": string("HOST01.local"),
|
||||||
|
"source": string("HOST01.local"),
|
||||||
|
"node_version": string("7.4.2"),
|
||||||
|
"pipeline": string("infra"),
|
||||||
|
"queue_type": string("persisted"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,137 @@
|
||||||
|
package logstash
|
||||||
|
|
||||||
|
const logstash7PipelinesJSON = `
|
||||||
|
{
|
||||||
|
"host" : "HOST01.local",
|
||||||
|
"version" : "7.4.2",
|
||||||
|
"http_address" : "127.0.0.1:9600",
|
||||||
|
"id" : "28580380-ad2c-4032-934b-76359125edca",
|
||||||
|
"name" : "HOST01.local",
|
||||||
|
"ephemeral_id" : "bd95ff6b-3fa8-42ae-be32-098a4e4ea1ec",
|
||||||
|
"status" : "green",
|
||||||
|
"snapshot" : true,
|
||||||
|
"pipeline" : {
|
||||||
|
"workers" : 8,
|
||||||
|
"batch_size" : 125,
|
||||||
|
"batch_delay" : 50
|
||||||
|
},
|
||||||
|
"pipelines" : {
|
||||||
|
"infra" : {
|
||||||
|
"events" : {
|
||||||
|
"in" : 2665549,
|
||||||
|
"out" : 2665549,
|
||||||
|
"duration_in_millis" : 3032875,
|
||||||
|
"filtered" : 2665549,
|
||||||
|
"queue_push_duration_in_millis" : 13300
|
||||||
|
},
|
||||||
|
"plugins" : {
|
||||||
|
"inputs" : [ {
|
||||||
|
"id" : "8526dc80bc2257ab08f96018f96b0c68dd03abc5695bb22fb9e96339a8dfb4f86",
|
||||||
|
"events" : {
|
||||||
|
"out" : 2665549,
|
||||||
|
"queue_push_duration_in_millis" : 13300
|
||||||
|
},
|
||||||
|
"peak_connections" : 1,
|
||||||
|
"name" : "beats",
|
||||||
|
"current_connections" : 1
|
||||||
|
} ],
|
||||||
|
"codecs" : [ {
|
||||||
|
"id" : "plain_7312c097-1e7f-41db-983b-4f5a87a9eba2",
|
||||||
|
"encode" : {
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
},
|
||||||
|
"name" : "plain",
|
||||||
|
"decode" : {
|
||||||
|
"out" : 0,
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
"id" : "rubydebug_e958e3dc-10f6-4dd6-b7c5-ae3de2892afb",
|
||||||
|
"encode" : {
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
},
|
||||||
|
"name" : "rubydebug",
|
||||||
|
"decode" : {
|
||||||
|
"out" : 0,
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
"id" : "plain_addb97be-fb77-4cbc-b45c-0424cd5d0ac7",
|
||||||
|
"encode" : {
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
},
|
||||||
|
"name" : "plain",
|
||||||
|
"decode" : {
|
||||||
|
"out" : 0,
|
||||||
|
"duration_in_millis" : 0,
|
||||||
|
"writes_in" : 0
|
||||||
|
}
|
||||||
|
} ],
|
||||||
|
"filters" : [ {
|
||||||
|
"id" : "9e8297a6ee7b61864f77853317dccde83d29952ef869010c385dcfc9064ab8b8",
|
||||||
|
"events" : {
|
||||||
|
"in" : 2665549,
|
||||||
|
"out" : 2665549,
|
||||||
|
"duration_in_millis" : 8648
|
||||||
|
},
|
||||||
|
"name" : "date",
|
||||||
|
"matches" : 2665549
|
||||||
|
}, {
|
||||||
|
"id" : "bec0c77b3f53a78c7878449c72ec59f97be31c1f12f9621f61ed2d4563bad869",
|
||||||
|
"events" : {
|
||||||
|
"in" : 2665549,
|
||||||
|
"out" : 2665549,
|
||||||
|
"duration_in_millis" : 195138
|
||||||
|
},
|
||||||
|
"name" : "fingerprint"
|
||||||
|
} ],
|
||||||
|
"outputs" : [ {
|
||||||
|
"id" : "df59066a933f038354c1845ba44de692f70dbd0d2009ab07a12b98b776be7e3f",
|
||||||
|
"events" : {
|
||||||
|
"in" : 0,
|
||||||
|
"out" : 0,
|
||||||
|
"duration_in_millis" : 25
|
||||||
|
},
|
||||||
|
"name" : "stdout"
|
||||||
|
}, {
|
||||||
|
"id" : "38967f09bbd2647a95aa00702b6b557bdbbab31da6a04f991d38abe5629779e3",
|
||||||
|
"events" : {
|
||||||
|
"in" : 2665549,
|
||||||
|
"out" : 2665549,
|
||||||
|
"duration_in_millis" : 2802177
|
||||||
|
},
|
||||||
|
"name" : "elasticsearch",
|
||||||
|
"bulk_requests" : {
|
||||||
|
"successes" : 2870,
|
||||||
|
"responses" : {
|
||||||
|
"200" : 2870
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"documents" : {
|
||||||
|
"successes" : 2665549
|
||||||
|
}
|
||||||
|
} ]
|
||||||
|
},
|
||||||
|
"reloads" : {
|
||||||
|
"successes" : 4,
|
||||||
|
"last_error" : null,
|
||||||
|
"failures" : 0,
|
||||||
|
"last_success_timestamp" : "2020-06-05T08:06:12.538Z",
|
||||||
|
"last_failure_timestamp" : null
|
||||||
|
},
|
||||||
|
"queue" : {
|
||||||
|
"type" : "persisted",
|
||||||
|
"events_count" : 0,
|
||||||
|
"queue_size_in_bytes" : 32028566,
|
||||||
|
"max_queue_size_in_bytes" : 4294967296
|
||||||
|
},
|
||||||
|
"hash" : "5bc589ae4b02cb3e436626429b50928b9d99360639c84dc7fc69268ac01a9fd0",
|
||||||
|
"ephemeral_id" : "4bcacefa-6cbf-461e-b14e-184edd9ebdf3"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`
|
||||||
Loading…
Reference in New Issue