feat(inputs.ceph): Use perf schema to determine metric type (#15233)

Alex 2024-05-02 16:24:08 -04:00 committed by GitHub
parent 7ed0364b34
commit 62509daa6c
2 changed files with 208 additions and 4 deletions


@@ -44,7 +44,8 @@ type Ceph struct {
GatherAdminSocketStats bool `toml:"gather_admin_socket_stats"`
GatherClusterStats bool `toml:"gather_cluster_stats"`
Log telegraf.Logger `toml:"-"`
schemaMaps map[socket]perfSchemaMap
}
func (*Ceph) SampleConfig() string {
@@ -73,7 +74,21 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
return fmt.Errorf("failed to find sockets at path %q: %w", c.SocketDir, err)
}
if c.schemaMaps == nil {
c.schemaMaps = make(map[socket]perfSchemaMap)
}
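// Fetch the perf schema for each socket once and cache it; if the schema
// cannot be dumped or parsed, metrics from that socket are still gathered,
// just without per-field type information.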
for _, s := range sockets {
if _, ok := c.schemaMaps[*s]; !ok {
rawSchema, err := perfSchema(c.CephBinary, s)
if err != nil {
c.Log.Warnf("failed to dump perf schema from socket %q: %v", s.socket, err)
} else if schema, err := parseSchema(rawSchema); err != nil {
c.Log.Warnf("failed to parse perf schema from socket %q: %v", s.socket, err)
} else {
c.schemaMaps[*s] = schema
}
}
dump, err := perfDump(c.CephBinary, s)
if err != nil {
acc.AddError(fmt.Errorf("error reading from socket %q: %w", s.socket, err))
@@ -85,9 +100,25 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
continue
}
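// Emit each field with the value type recorded in the cached perf schema
// when one is available for this socket; otherwise fall back to untyped
// fields, as before.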
for tag, metrics := range data {
acc.AddFields(measurement,
metrics,
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
if schema, ok := c.schemaMaps[*s]; ok {
for name, metric := range metrics {
valueType := schema[tag][name]
switch valueType {
case telegraf.Counter:
acc.AddCounter(measurement,
map[string]interface{}{name: metric},
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
default:
acc.AddGauge(measurement,
map[string]interface{}{name: metric},
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
}
}
} else {
acc.AddFields(measurement,
metrics,
map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag})
}
}
}
return nil
@@ -136,6 +167,37 @@ func init() {
})
}
// Run ceph perf schema on the passed socket. The output is a JSON string
// mapping collection names to a map of counter names to information.
//
// The counter information includes the type of the counter, which determines
// the names of the final series produced. For example, a real-integer-pair
// valued metric produces three series (sum, avgcount and avgtime), holding
// the sum of all values, the count of values, and their quotient (the
// running average), respectively.
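//
// As a concrete case, the OSD counter "op_latency" is declared with type
// mask 5 (time and long-running average); its "perf dump" entry carries
// avgcount, sum and avgtime sub-values, which are flattened into the fields
// "op_latency.avgcount", "op_latency.sum" and "op_latency.avgtime".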
func perfSchema(binary string, socket *socket) (string, error) {
cmdArgs := []string{"--admin-daemon", socket.socket}
switch socket.sockType {
case typeOsd, typeMds, typeRgw:
cmdArgs = append(cmdArgs, "perf", "schema")
case typeMon:
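// Monitor sockets expose the schema via the "perfcounters_schema"
// command rather than "perf schema".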
cmdArgs = append(cmdArgs, "perfcounters_schema")
default:
return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType)
}
cmd := exec.Command(binary, cmdArgs...)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
return "", fmt.Errorf("error running ceph schema: %w", err)
}
return out.String(), nil
}
var perfDump = func(binary string, socket *socket) (string, error) {
cmdArgs := []string{"--admin-daemon", socket.socket}
@@ -233,6 +295,60 @@ type metricMap map[string]interface{}
type taggedMetricMap map[string]metricMap
// Mask bits for perf counters
const (
perfCounterNone = 0
perfCounterTime = 0x1
perfCounterU64 = 0x2
perfCounterLongRunAvg = 0x4
perfCounterCounter = 0x8
perfCounterHistogram = 0x10
)
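// For example, a schema "type" of 10 (perfCounterU64|perfCounterCounter)
// marks a monotonically increasing integer counter, while 5
// (perfCounterTime|perfCounterLongRunAvg) marks a long-running time average
// that expands into sum, avgcount and avgtime components.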
type rawPerfCounter struct {
TypeMask int `json:"type"`
MetricType string `json:"metric_type"`
ValueType string `json:"value_type"`
Description string `json:"description"`
Nick string `json:"nick"`
Priority int `json:"priority"`
Units string `json:"units"`
}
type rawCollection map[string]rawPerfCounter
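// perfSchemaMap maps a collection name to its counter (field) names and the
// telegraf.ValueType each one should be reported as.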
type perfSchemaMap map[string]map[string]telegraf.ValueType
// Parses the output of ceph perf schema into a useful format, mapping metrics
// in collections to their Telegraf metric type. This is made a little more
// complicated by the need to expand averages into their component metrics.
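// For example, a plain counter such as "op" maps directly to
// telegraf.Counter, while a long-running average such as "op_latency"
// expands to "op_latency.sum" and "op_latency.avgcount" as counters, plus
// "op_latency.avgtime" as a gauge when the time bit is also set.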
func parseSchema(rawSchema string) (perfSchemaMap, error) {
rawMap := make(map[string]rawCollection)
err := json.Unmarshal([]byte(rawSchema), &rawMap)
if err != nil {
return nil, fmt.Errorf("failed to parse json: %q: %w", rawSchema, err)
}
schemaMap := make(perfSchemaMap)
for collection, counters := range rawMap {
schemaMap[collection] = make(map[string]telegraf.ValueType)
for counter, schema := range counters {
if schema.TypeMask&perfCounterLongRunAvg != 0 {
schemaMap[collection][counter+".sum"] = telegraf.Counter
schemaMap[collection][counter+".avgcount"] = telegraf.Counter
if schema.TypeMask&perfCounterTime != 0 {
schemaMap[collection][counter+".avgtime"] = telegraf.Gauge
}
} else if schema.TypeMask&perfCounterCounter != 0 {
schemaMap[collection][counter] = telegraf.Counter
} else {
schemaMap[collection][counter] = telegraf.Gauge
}
}
}
return schemaMap, nil
}
// Parses a raw JSON string into a taggedMetricMap
// Delegates the actual parsing to newTaggedMetricMap(..)
func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) {


@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
@@ -120,6 +121,25 @@ func TestGather(t *testing.T) {
require.NoError(t, c.Gather(acc))
}
func TestParseSchema(t *testing.T) {
schemaMap, err := parseSchema(osdRawSchema)
require.NoError(t, err)
// Test Counter
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op"],
"op should be a Counter")
// Test Gauge
require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_wip"],
"op_wip should be a Gauge")
// Test LongRunAvg
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.avgcount"],
"op_latency.avgcount should be a Counter")
require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.sum"],
"op_latency.sum should be a Counter")
require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_latency.avgtime"],
"op_latency.avgtime should be a Gauge")
}
func TestFindSockets(t *testing.T) {
tmpdir := t.TempDir()
c := &Ceph{
@@ -1767,6 +1787,74 @@ var rgwPerfDump = `
}
}
`
var osdRawSchema = `
{ "osd": {
"op_wip": {
"type": 2,
"metric_type": "gauge",
"value_type": "integer",
"description": "Replication operations currently being processed (primary)",
"nick": "",
"priority": 5,
"units": "none"
},
"op": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations",
"nick": "ops",
"priority": 10,
"units": "none"
},
"op_in_bytes": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations total write size",
"nick": "wr",
"priority": 8,
"units": "bytes"
},
"op_out_bytes": {
"type": 10,
"metric_type": "counter",
"value_type": "integer",
"description": "Client operations total read size",
"nick": "rd",
"priority": 8,
"units": "bytes"
},
"op_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (including queue time)",
"nick": "l",
"priority": 9,
"units": "none"
},
"op_process_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (excluding queue time)",
"nick": "",
"priority": 5,
"units": "none"
},
"op_prepare_latency": {
"type": 5,
"metric_type": "gauge",
"value_type": "real-integer-pair",
"description": "Latency of client operations (excluding queue time and wait for finished)",
"nick": "",
"priority": 5,
"units": "none"
}
}
}
`
var clusterStatusDump = `
{