feat: allow other fluentd metrics apart from retry_count, buffer_queu… (#11056)
commit d3abbc0897 (parent 442728b03e)
@@ -40,9 +40,20 @@ example configuration with `@id` parameter for http plugin:

Fields may vary depending on the plugin type

- fluentd
  - retry_count (float, unit)
  - buffer_queue_length (float, unit)
  - buffer_total_queued_size (float, unit)
+  - rollback_count (float, unit)
+  - flush_time_count (float, unit)
+  - slow_flush_count (float, unit)
+  - emit_count (float, unit)
+  - emit_records (float, unit)
+  - emit_size (float, unit)
+  - write_count (float, unit)
+  - buffer_stage_length (float, unit)
+  - buffer_queue_byte_size (float, unit)
+  - buffer_stage_byte_size (float, unit)
+  - buffer_available_buffer_space_ratios (float, unit)

## Tags
@@ -61,5 +72,5 @@ $ telegraf --config fluentd.conf --input-filter fluentd --test

> fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000
> fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
> fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
+> fluentd,plugin_id=output_td,plugin_category=output,plugin_type=tdlog,host=T440s buffer_available_buffer_space_ratios=100,buffer_queue_byte_size=0,buffer_queue_length=0,buffer_stage_byte_size=0,buffer_stage_length=0,buffer_total_queued_size=0,emit_count=0,emit_records=0,flush_time_count=0,retry_count=0,rollback_count=0,slow_flush_count=0,write_count=0 1651474085000000000
```
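The extra fields in the new example line come straight from fluentd's monitor_agent REST API, which this plugin polls. As a rough illustration of where those numbers originate (this is not the plugin's own code), the sketch below fetches the JSON directly and prints a few of the optional counters; the port and path assume monitor_agent's defaults, and the trimmed-down struct is purely for the example.

```go
// Rough sketch: poll fluentd's monitor_agent API directly and print a few of
// the optional counters. The URL assumes monitor_agent's default port 24220;
// the telegraf plugin reads the same JSON from whatever `endpoint` is set.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type plugin struct {
	PluginID   string   `json:"plugin_id"`
	PluginType string   `json:"type"`
	RetryCount *float64 `json:"retry_count"`
	EmitCount  *float64 `json:"emit_count"`
	WriteCount *float64 `json:"write_count"`
}

// val renders an optional metric: nil means fluentd did not report it.
func val(p *float64) string {
	if p == nil {
		return "n/a"
	}
	return fmt.Sprintf("%g", *p)
}

func main() {
	resp, err := http.Get("http://localhost:24220/api/plugins.json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var body struct {
		Plugins []plugin `json:"plugins"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}
	for _, p := range body.Plugins {
		fmt.Printf("%s (%s): retry=%s emit=%s write=%s\n",
			p.PluginID, p.PluginType, val(p.RetryCount), val(p.EmitCount), val(p.WriteCount))
	}
}
```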
@@ -26,12 +26,23 @@ type endpointInfo struct {
}

type pluginData struct {
	PluginID               string   `json:"plugin_id"`
	PluginType             string   `json:"type"`
	PluginCategory         string   `json:"plugin_category"`
	RetryCount             *float64 `json:"retry_count"`
	BufferQueueLength      *float64 `json:"buffer_queue_length"`
	BufferTotalQueuedSize  *float64 `json:"buffer_total_queued_size"`
+	RollbackCount          *float64 `json:"rollback_count"`
+	EmitRecords            *float64 `json:"emit_records"`
+	EmitSize               *float64 `json:"emit_size"`
+	EmitCount              *float64 `json:"emit_count"`
+	WriteCount             *float64 `json:"write_count"`
+	SlowFlushCount         *float64 `json:"slow_flush_count"`
+	FlushTimeCount         *float64 `json:"flush_time_count"`
+	BufferStageLength      *float64 `json:"buffer_stage_length"`
+	BufferStageByteSize    *float64 `json:"buffer_stage_byte_size"`
+	BufferQueueByteSize    *float64 `json:"buffer_queue_byte_size"`
+	AvailBufferSpaceRatios *float64 `json:"buffer_available_buffer_space_ratios"`
}

// parse JSON from fluentd Endpoint
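Every metric in pluginData is a *float64 rather than a plain float64 so that a counter fluentd does not report stays nil and can be skipped, while a counter reported as 0 is still emitted. A minimal, self-contained sketch of that json.Unmarshal behaviour (the type name here is illustrative, not the plugin's):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// counters mirrors the idea behind pluginData: pointer fields let absent
// metrics be told apart from metrics reported as zero.
type counters struct {
	RetryCount *float64 `json:"retry_count"`
	EmitCount  *float64 `json:"emit_count"`
}

func main() {
	var c counters
	// retry_count is present (with value 0); emit_count is missing entirely.
	if err := json.Unmarshal([]byte(`{"retry_count": 0}`), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.RetryCount != nil) // true  -> the field is emitted with value 0
	fmt.Println(c.EmitCount != nil)  // false -> the field is skipped
}
```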
@@ -121,6 +132,7 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
		if p.BufferQueueLength != nil {
			tmpFields["buffer_queue_length"] = *p.BufferQueueLength
		}

		if p.RetryCount != nil {
			tmpFields["retry_count"] = *p.RetryCount
		}
@@ -129,7 +141,64 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
			tmpFields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
		}

-		if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {
+		if p.RollbackCount != nil {
+			tmpFields["rollback_count"] = *p.RollbackCount
+		}
+
+		if p.EmitRecords != nil {
+			tmpFields["emit_records"] = *p.EmitRecords
+		}
+
+		if p.EmitCount != nil {
+			tmpFields["emit_count"] = *p.EmitCount
+		}
+
+		if p.EmitSize != nil {
+			tmpFields["emit_size"] = *p.EmitSize
+		}
+
+		if p.WriteCount != nil {
+			tmpFields["write_count"] = *p.WriteCount
+		}
+
+		if p.SlowFlushCount != nil {
+			tmpFields["slow_flush_count"] = *p.SlowFlushCount
+		}
+
+		if p.FlushTimeCount != nil {
+			tmpFields["flush_time_count"] = *p.FlushTimeCount
+		}
+
+		if p.BufferStageLength != nil {
+			tmpFields["buffer_stage_length"] = *p.BufferStageLength
+		}
+
+		if p.BufferStageByteSize != nil {
+			tmpFields["buffer_stage_byte_size"] = *p.BufferStageByteSize
+		}
+
+		if p.BufferQueueByteSize != nil {
+			tmpFields["buffer_queue_byte_size"] = *p.BufferQueueByteSize
+		}
+
+		if p.AvailBufferSpaceRatios != nil {
+			tmpFields["buffer_available_buffer_space_ratios"] = *p.AvailBufferSpaceRatios
+		}
+
+		if !((p.BufferQueueLength == nil) &&
+			(p.RetryCount == nil) &&
+			(p.BufferTotalQueuedSize == nil) &&
+			(p.EmitCount == nil) &&
+			(p.EmitRecords == nil) &&
+			(p.EmitSize == nil) &&
+			(p.WriteCount == nil) &&
+			(p.FlushTimeCount == nil) &&
+			(p.SlowFlushCount == nil) &&
+			(p.RollbackCount == nil) &&
+			(p.BufferStageLength == nil) &&
+			(p.BufferStageByteSize == nil) &&
+			(p.BufferQueueByteSize == nil) &&
+			(p.AvailBufferSpaceRatios == nil)) {
			acc.AddFields(measurement, tmpFields, tmpTags)
		}
	}
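The closing guard re-lists every field just to decide whether anything was collected. Assuming, as in the excerpt above, that tmpFields only ever receives the nil-checked metrics, an equivalent and shorter guard is `len(tmpFields) > 0`. The sketch below shows that shape with a small helper; `collect` and the anonymous struct are made up for the example, this is not the committed code.

```go
// Sketch only: gather optional *float64 metrics into a field map and emit the
// metric when at least one value was present.
package main

import "fmt"

func collect(fields map[string]interface{}, name string, v *float64) {
	if v != nil {
		fields[name] = *v
	}
}

func main() {
	zero := 0.0
	p := struct {
		RetryCount, EmitCount, WriteCount *float64
	}{RetryCount: &zero, EmitCount: &zero}

	fields := map[string]interface{}{}
	collect(fields, "retry_count", p.RetryCount)
	collect(fields, "emit_count", p.EmitCount)
	collect(fields, "write_count", p.WriteCount) // nil: skipped

	if len(fields) > 0 { // stands in for the long "all pointers nil" guard
		fmt.Println("would call acc.AddFields with", fields)
	}
}
```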
@@ -88,8 +88,53 @@ const sampleJSON = `
      },
      "output_plugin": true,
      "buffer_queue_length": 0,
      "retry_count": 0,
      "buffer_total_queued_size": 0
+    },
+    {
+      "plugin_id": "object:output_td_1",
+      "plugin_category": "output",
+      "type": "tdlog",
+      "config": {
+        "@type": "tdlog",
+        "@id": "output_td",
+        "apikey": "xxxxxx",
+        "auto_create_table": ""
+      },
+      "output_plugin": true,
+      "buffer_queue_length": 0,
+      "buffer_total_queued_size": 0,
+      "retry_count": 0,
+      "emit_records": 0,
+      "emit_size": 0,
+      "emit_count": 0,
+      "write_count": 0,
+      "rollback_count": 0,
+      "slow_flush_count": 0,
+      "flush_time_count": 0,
+      "buffer_stage_length": 0,
+      "buffer_stage_byte_size": 0,
+      "buffer_queue_byte_size": 0,
+      "buffer_available_buffer_space_ratios": 0
+    },
+    {
+      "plugin_id": "object:output_td_2",
+      "plugin_category": "output",
+      "type": "tdlog",
+      "config": {
+        "@type": "tdlog",
+        "@id": "output_td",
+        "apikey": "xxxxxx",
+        "auto_create_table": ""
+      },
+      "output_plugin": true,
+      "buffer_queue_length": 0,
+      "buffer_total_queued_size": 0,
+      "retry_count": 0,
+      "rollback_count": 0,
+      "emit_records": 0,
+      "slow_flush_count": 0,
+      "buffer_available_buffer_space_ratios": 0
+    }
  ]
}
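Fixtures like sampleJSON are easy to break syntactically when new entries are pasted in: a missing comma fails json.Unmarshal outright, while a silently duplicated key is simply overwritten by the later value. A tiny standalone check along these lines, using a trimmed stand-in payload rather than the real sampleJSON, can catch the former before running the full test:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for the test fixture; in the real test this would be the
// full sampleJSON string shown above.
const fixture = `{"plugins": [{"plugin_id": "object:output_td_1", "type": "tdlog", "retry_count": 0}]}`

func main() {
	var payload struct {
		Plugins []map[string]interface{} `json:"plugins"`
	}
	if err := json.Unmarshal([]byte(fixture), &payload); err != nil {
		// A missing comma or unbalanced brace in the fixture surfaces here.
		panic(err)
	}
	fmt.Printf("fixture contains %d plugin entries\n", len(payload.Plugins))
}
```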
@@ -101,8 +146,10 @@ var (
		// {"object:f48698", "dummy", "input", nil, nil, nil},
		// {"object:e27138", "dummy", "input", nil, nil, nil},
		// {"object:d74060", "monitor_agent", "input", nil, nil, nil},
-		{"object:11a5e2c", "stdout", "output", &zero, nil, nil},
-		{"object:11237ec", "s3", "output", &zero, &zero, &zero},
+		{"object:11a5e2c", "stdout", "output", &zero, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil},
+		{"object:11237ec", "s3", "output", &zero, &zero, &zero, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil},
+		{"object:output_td_1", "tdlog", "output", &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero, &zero},
+		{"object:output_td_2", "tdlog", "output", &zero, &zero, &zero, &zero, &zero, nil, nil, nil, &zero, nil, nil, nil, nil, &zero},
	}
	fluentdTest = &Fluentd{
		Endpoint: "http://localhost:8081",
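The expectedOutput entries are positional struct literals, so every new pluginData field forces another column of nil placeholders into each row, and the meaning of an `&zero` in, say, position 12 is easy to misread. As a sketch only (the committed test keeps the positional form), one entry written with field names inside fluentd_test.go could read:

```go
// Sketch: the output_td_2 expectation with named fields. Unlisted metric
// pointers default to nil, i.e. "not reported by fluentd".
{
	PluginID:               "object:output_td_2",
	PluginType:             "tdlog",
	PluginCategory:         "output",
	RetryCount:             &zero,
	BufferQueueLength:      &zero,
	BufferTotalQueuedSize:  &zero,
	RollbackCount:          &zero,
	EmitRecords:            &zero,
	SlowFlushCount:         &zero,
	AvailBufferSpaceRatios: &zero,
},
```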
@@ -111,6 +158,7 @@ var (

func Test_parse(t *testing.T) {
	t.Log("Testing parser function")
	t.Logf("JSON (%s) ", sampleJSON)
	_, err := parse([]byte(sampleJSON))

	if err != nil {
@@ -160,4 +208,33 @@ func Test_Gather(t *testing.T) {
	require.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
	require.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
	require.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
+
+	require.Equal(t, expectedOutput[2].PluginID, acc.Metrics[2].Tags["plugin_id"])
+	require.Equal(t, expectedOutput[2].PluginType, acc.Metrics[2].Tags["plugin_type"])
+	require.Equal(t, expectedOutput[2].PluginCategory, acc.Metrics[2].Tags["plugin_category"])
+	require.Equal(t, *expectedOutput[2].RetryCount, acc.Metrics[2].Fields["retry_count"])
+	require.Equal(t, *expectedOutput[2].BufferQueueLength, acc.Metrics[2].Fields["buffer_queue_length"])
+	require.Equal(t, *expectedOutput[2].BufferTotalQueuedSize, acc.Metrics[2].Fields["buffer_total_queued_size"])
+	require.Equal(t, *expectedOutput[2].EmitRecords, acc.Metrics[2].Fields["emit_records"])
+	require.Equal(t, *expectedOutput[2].EmitSize, acc.Metrics[2].Fields["emit_size"])
+	require.Equal(t, *expectedOutput[2].EmitCount, acc.Metrics[2].Fields["emit_count"])
+	require.Equal(t, *expectedOutput[2].RollbackCount, acc.Metrics[2].Fields["rollback_count"])
+	require.Equal(t, *expectedOutput[2].SlowFlushCount, acc.Metrics[2].Fields["slow_flush_count"])
+	require.Equal(t, *expectedOutput[2].WriteCount, acc.Metrics[2].Fields["write_count"])
+	require.Equal(t, *expectedOutput[2].FlushTimeCount, acc.Metrics[2].Fields["flush_time_count"])
+	require.Equal(t, *expectedOutput[2].BufferStageLength, acc.Metrics[2].Fields["buffer_stage_length"])
+	require.Equal(t, *expectedOutput[2].BufferStageByteSize, acc.Metrics[2].Fields["buffer_stage_byte_size"])
+	require.Equal(t, *expectedOutput[2].BufferQueueByteSize, acc.Metrics[2].Fields["buffer_queue_byte_size"])
+	require.Equal(t, *expectedOutput[2].AvailBufferSpaceRatios, acc.Metrics[2].Fields["buffer_available_buffer_space_ratios"])
+
+	require.Equal(t, expectedOutput[3].PluginID, acc.Metrics[3].Tags["plugin_id"])
+	require.Equal(t, expectedOutput[3].PluginType, acc.Metrics[3].Tags["plugin_type"])
+	require.Equal(t, expectedOutput[3].PluginCategory, acc.Metrics[3].Tags["plugin_category"])
+	require.Equal(t, *expectedOutput[3].RetryCount, acc.Metrics[3].Fields["retry_count"])
+	require.Equal(t, *expectedOutput[3].BufferQueueLength, acc.Metrics[3].Fields["buffer_queue_length"])
+	require.Equal(t, *expectedOutput[3].BufferTotalQueuedSize, acc.Metrics[3].Fields["buffer_total_queued_size"])
+	require.Equal(t, *expectedOutput[3].EmitRecords, acc.Metrics[3].Fields["emit_records"])
+	require.Equal(t, *expectedOutput[3].RollbackCount, acc.Metrics[3].Fields["rollback_count"])
+	require.Equal(t, *expectedOutput[3].SlowFlushCount, acc.Metrics[3].Fields["slow_flush_count"])
+	require.Equal(t, *expectedOutput[3].AvailBufferSpaceRatios, acc.Metrics[3].Fields["buffer_available_buffer_space_ratios"])
}
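With seventeen possible metrics, the per-field require.Equal lines grow quickly. A name-to-pointer table covering one plugin keeps the assertions in a single loop; the sketch below reuses the names already defined in the test (expectedOutput, acc, require, t) but is illustrative, not the committed test code.

```go
// Sketch: assert the reported metrics for the output_td_2 plugin in one loop.
want := map[string]*float64{
	"retry_count":                          expectedOutput[3].RetryCount,
	"buffer_queue_length":                  expectedOutput[3].BufferQueueLength,
	"buffer_total_queued_size":             expectedOutput[3].BufferTotalQueuedSize,
	"rollback_count":                       expectedOutput[3].RollbackCount,
	"emit_records":                         expectedOutput[3].EmitRecords,
	"slow_flush_count":                     expectedOutput[3].SlowFlushCount,
	"buffer_available_buffer_space_ratios": expectedOutput[3].AvailBufferSpaceRatios,
}
for name, expected := range want {
	if expected == nil {
		continue // metric not reported by this plugin, nothing to compare
	}
	require.Equal(t, *expected, acc.Metrics[3].Fields[name])
}
```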