diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index 4e609aa32..e87c77717 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -44,7 +44,8 @@ type Ceph struct { GatherAdminSocketStats bool `toml:"gather_admin_socket_stats"` GatherClusterStats bool `toml:"gather_cluster_stats"` - Log telegraf.Logger `toml:"-"` + Log telegraf.Logger `toml:"-"` + schemaMaps map[socket]perfSchemaMap } func (*Ceph) SampleConfig() string { @@ -73,7 +74,21 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { return fmt.Errorf("failed to find sockets at path %q: %w", c.SocketDir, err) } + if c.schemaMaps == nil { + c.schemaMaps = make(map[socket]perfSchemaMap) + } + for _, s := range sockets { + if _, ok := c.schemaMaps[*s]; !ok { + rawSchema, err := perfSchema(c.CephBinary, s) + if err != nil { + c.Log.Warnf("failed to dump perf schema from socket %q: %v", s.socket, err) + } else if schema, err := parseSchema(rawSchema); err != nil { + c.Log.Warnf("failed to parse perf schema from socket %q: %v", s.socket, err) + } else { + c.schemaMaps[*s] = schema + } + } dump, err := perfDump(c.CephBinary, s) if err != nil { acc.AddError(fmt.Errorf("error reading from socket %q: %w", s.socket, err)) @@ -85,9 +100,25 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { continue } for tag, metrics := range data { - acc.AddFields(measurement, - metrics, - map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag}) + if schema, ok := c.schemaMaps[*s]; ok { + for name, metric := range metrics { + valueType := schema[tag][name] + switch valueType { + case telegraf.Counter: + acc.AddCounter(measurement, + map[string]interface{}{name: metric}, + map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag}) + default: + acc.AddGauge(measurement, + map[string]interface{}{name: metric}, + map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag}) + } + } + } else { + acc.AddFields(measurement, + metrics, + map[string]string{"type": s.sockType, "id": s.sockID, "collection": tag}) + } } } return nil @@ -136,6 +167,37 @@ func init() { }) } +// Run ceph perf schema on the passed socket. The output is a JSON string +// mapping collection names to a map of counter names to information. +// +// The counter information includes the type of the counter, which determines +// the names of the final series produced. For example, a real-integer pair +// valued metric produces three series: sum, avgcount and avgtime; which hold +// the sum of all values, the count of all values and the division of these +// values. +func perfSchema(binary string, socket *socket) (string, error) { + cmdArgs := []string{"--admin-daemon", socket.socket} + + switch socket.sockType { + case typeOsd, typeMds, typeRgw: + cmdArgs = append(cmdArgs, "perf", "schema") + case typeMon: + cmdArgs = append(cmdArgs, "perfcounters_schema") + default: + return "", fmt.Errorf("ignoring unknown socket type: %s", socket.sockType) + } + + cmd := exec.Command(binary, cmdArgs...) + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + return "", fmt.Errorf("error running ceph schema: %w", err) + } + + return out.String(), nil +} + var perfDump = func(binary string, socket *socket) (string, error) { cmdArgs := []string{"--admin-daemon", socket.socket} @@ -233,6 +295,60 @@ type metricMap map[string]interface{} type taggedMetricMap map[string]metricMap +// Mask bits for perf counters +const ( + perfCounterNone = 0 + perfCounterTime = 0x1 + perfCounterU64 = 0x2 + perfCounterLongRunAvg = 0x4 + perfCounterCounter = 0x8 + perfCounterHistogram = 0x10 +) + +type rawPerfCounter struct { + TypeMask int `json:"type"` + MetricType string `json:"metric_type"` + ValueType string `json:"value_type"` + Description string `json:"description"` + Nick string `json:"nick"` + Priority int `json:"priority"` + Units string `json:"units"` +} + +type rawCollection map[string]rawPerfCounter + +type perfSchemaMap map[string]map[string]telegraf.ValueType + +// Parses the output of ceph perf schema into a useful format, mapping metrics +// in collections to their Telegraf metric type. This is made a little more +// complicated by the need to expand averages into their component metrics. +func parseSchema(rawSchema string) (perfSchemaMap, error) { + rawMap := make(map[string]rawCollection) + err := json.Unmarshal([]byte(rawSchema), &rawMap) + if err != nil { + return nil, fmt.Errorf("failed to parse json: %q: %w", rawSchema, err) + } + + schemaMap := make(perfSchemaMap) + for collection, counters := range rawMap { + schemaMap[collection] = make(map[string]telegraf.ValueType) + for counter, schema := range counters { + if schema.TypeMask&perfCounterLongRunAvg != 0 { + schemaMap[collection][counter+".sum"] = telegraf.Counter + schemaMap[collection][counter+".avgcount"] = telegraf.Counter + if schema.TypeMask&perfCounterTime != 0 { + schemaMap[collection][counter+".avgtime"] = telegraf.Gauge + } + } else if schema.TypeMask&perfCounterCounter != 0 { + schemaMap[collection][counter] = telegraf.Counter + } else { + schemaMap[collection][counter] = telegraf.Gauge + } + } + } + return schemaMap, nil +} + // Parses a raw JSON string into a taggedMetricMap // Delegates the actual parsing to newTaggedMetricMap(..) func (c *Ceph) parseDump(dump string) (taggedMetricMap, error) { diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go index 24577e5ec..f269196f1 100644 --- a/plugins/inputs/ceph/ceph_test.go +++ b/plugins/inputs/ceph/ceph_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" ) @@ -120,6 +121,25 @@ func TestGather(t *testing.T) { require.NoError(t, c.Gather(acc)) } +func TestParseSchema(t *testing.T) { + schemaMap, err := parseSchema(osdRawSchema) + + require.NoError(t, err) + // Test Gauge + require.Equal(t, telegraf.Counter, schemaMap["osd"]["op"], + "op should be a Counter") + // Test Counter + require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_wip"], + "op_wip should be a Gauge") + // Test LongRunAvg + require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.avgcount"], + "op_latency.avgcount should be a Counter") + require.Equal(t, telegraf.Counter, schemaMap["osd"]["op_latency.sum"], + "op_latency.sum should be a Counter") + require.Equal(t, telegraf.Gauge, schemaMap["osd"]["op_latency.avgtime"], + "op_latency.avgtime should be a Gauge") +} + func TestFindSockets(t *testing.T) { tmpdir := t.TempDir() c := &Ceph{ @@ -1767,6 +1787,74 @@ var rgwPerfDump = ` } } ` +var osdRawSchema = ` +{ "osd": { + "op_wip": { + "type": 2, + "metric_type": "gauge", + "value_type": "integer", + "description": "Replication operations currently being processed (primary)", + "nick": "", + "priority": 5, + "units": "none" + }, + "op": { + "type": 10, + "metric_type": "counter", + "value_type": "integer", + "description": "Client operations", + "nick": "ops", + "priority": 10, + "units": "none" + }, + "op_in_bytes": { + "type": 10, + "metric_type": "counter", + "value_type": "integer", + "description": "Client operations total write size", + "nick": "wr", + "priority": 8, + "units": "bytes" + }, + "op_out_bytes": { + "type": 10, + "metric_type": "counter", + "value_type": "integer", + "description": "Client operations total read size", + "nick": "rd", + "priority": 8, + "units": "bytes" + }, + "op_latency": { + "type": 5, + "metric_type": "gauge", + "value_type": "real-integer-pair", + "description": "Latency of client operations (including queue time)", + "nick": "l", + "priority": 9, + "units": "none" + }, + "op_process_latency": { + "type": 5, + "metric_type": "gauge", + "value_type": "real-integer-pair", + "description": "Latency of client operations (excluding queue time)", + "nick": "", + "priority": 5, + "units": "none" + }, + "op_prepare_latency": { + "type": 5, + "metric_type": "gauge", + "value_type": "real-integer-pair", + "description": "Latency of client operations (excluding queue time and wait for finished)", + "nick": "", + "priority": 5, + "units": "none" + } +} +} +` var clusterStatusDump = ` {