feat(inputs.lustre2): Add support for bulk read/write stats (#14813)

This commit is contained in:
Dane Strandboge 2024-03-18 02:37:53 -05:00 committed by GitHub
parent 19ac5e8081
commit 0f91ca6f67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 290 additions and 20 deletions

View File

@ -22,18 +22,22 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# This plugin ONLY supports Linux
[[inputs.lustre2]]
## An array of /proc globs to search for Lustre stats
## If not specified, the default will work on Lustre 2.5.x
## If not specified, the default will work on Lustre 2.12.x
##
# ost_procfiles = [
# "/proc/fs/lustre/obdfilter/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/stats",
# "/proc/fs/lustre/obdfilter/*/job_stats",
# "/proc/fs/lustre/obdfilter/*/exports/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
# "/proc/fs/lustre/osd-zfs/*/brw_stats",
# ]
# mds_procfiles = [
# "/proc/fs/lustre/mdt/*/md_stats",
# "/proc/fs/lustre/mdt/*/job_stats",
# "/proc/fs/lustre/mdt/*/exports/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
# "/proc/fs/lustre/osd-zfs/*/brw_stats",
# ]
```

View File

@ -9,6 +9,7 @@ package lustre2
import (
_ "embed"
"fmt"
"os"
"path/filepath"
"regexp"
@ -23,7 +24,7 @@ import (
var sampleConfig string
type tags struct {
name, job, client string
name, brwSection, bucket, job, client string
}
// Lustre proc files can change between versions, so we want to future-proof
@ -32,7 +33,7 @@ type Lustre2 struct {
OstProcfiles []string `toml:"ost_procfiles"`
MdsProcfiles []string `toml:"mds_procfiles"`
// allFields maps and OST name to the metric fields associated with that OST
// allFields maps an OST name to the metric fields associated with that OST
allFields map[tags]map[string]interface{}
}
@ -49,6 +50,29 @@ type mapping struct {
reportAs string // What measurement name to use
}
var wantedBrwstatsFields = []*mapping{
{
inProc: "pages per bulk r/w",
reportAs: "pages_per_bulk_rw",
},
{
inProc: "discontiguous pages",
reportAs: "discontiguous_pages",
},
{
inProc: "disk I/Os in flight",
reportAs: "disk_ios_in_flight",
},
{
inProc: "I/O time (1/1000s)",
reportAs: "io_time",
},
{
inProc: "disk I/O size",
reportAs: "disk_io_size",
},
}
var wantedOstFields = []*mapping{
{
inProc: "write_bytes",
@ -408,10 +432,10 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping) e
}
var fields map[string]interface{}
fields, ok := l.allFields[tags{name, jobid, client}]
fields, ok := l.allFields[tags{name, "", "", jobid, client}]
if !ok {
fields = make(map[string]interface{})
l.allFields[tags{name, jobid, client}] = fields
l.allFields[tags{name, "", "", jobid, client}] = fields
}
for _, wanted := range wantedFields {
@ -440,6 +464,98 @@ func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping) e
return nil
}
func (l *Lustre2) getLustreProcBrwStats(fileglob string, wantedFields []*mapping) error {
files, err := filepath.Glob(fileglob)
if err != nil {
return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err)
}
for _, file := range files {
// Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar into just the object store target name
// This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12
path := strings.Split(file, "/")
if len(path) < 2 {
continue
}
name := path[len(path)-2]
wholeFile, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("failed to read file %s: %w", file, err)
}
lines := strings.Split(string(wholeFile), "\n")
var headerName string
for _, line := range lines {
// There are four types of lines in a brw_stats file:
// 1. Header lines - contain the category of metric (e.g. disk I/Os in flight, disk I/O time)
// 2. Bucket lines - follow headers, contain the bucket value (e.g. 4K, 1M) and metric values
// 3. Empty lines - these will simply be filtered out
// 4. snapshot_time line - this will be filtered out, as it "looks" like a bucket line
if len(line) < 1 {
continue
}
parts := strings.Fields(line)
// This is a header line
// Set report name for use by the buckets that follow
if !strings.Contains(parts[0], ":") {
nameParts := strings.Split(line, " ")
headerName = nameParts[0]
continue
}
// snapshot_time should be discarded
if strings.Contains(parts[0], "snapshot_time") {
continue
}
// This is a bucket for a given header
for _, wanted := range wantedFields {
if headerName != wanted.inProc {
continue
}
bucket := strings.TrimSuffix(parts[0], ":")
// brw_stats columns are static and don't need configurable fields
readIos, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_ios: %w", err)
}
readPercent, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_percent: %w", err)
}
writeIos, err := strconv.ParseUint(parts[5], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_ios: %w", err)
}
writePercent, err := strconv.ParseUint(parts[6], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_percent: %w", err)
}
reportName := headerName
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
tag := tags{name, reportName, bucket, "", ""}
fields, ok := l.allFields[tag]
if !ok {
fields = make(map[string]interface{})
l.allFields[tag] = fields
}
fields["read_ios"] = readIos
fields["read_percent"] = readPercent
fields["write_ios"] = writeIos
fields["write_percent"] = writePercent
}
}
}
return nil
}
// Gather reads stats from all lustre targets
func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
l.allFields = make(map[tags]map[string]interface{})
@ -460,6 +576,16 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
if err != nil {
return err
}
// bulk read/wrote statistics for ldiskfs
err = l.getLustreProcBrwStats("/proc/fs/lustre/osd-ldiskfs/*/brw_stats", wantedBrwstatsFields)
if err != nil {
return err
}
// bulk read/write statistics for zfs
err = l.getLustreProcBrwStats("/proc/fs/lustre/osd-zfs/*/brw_stats", wantedBrwstatsFields)
if err != nil {
return err
}
}
if len(l.MdsProcfiles) == 0 {
@ -477,23 +603,39 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
}
for _, procfile := range l.OstProcfiles {
ostFields := wantedOstFields
if strings.HasSuffix(procfile, "job_stats") {
ostFields = wantedOstJobstatsFields
}
err := l.GetLustreProcStats(procfile, ostFields)
if err != nil {
return err
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.GetLustreProcStats(procfile, wantedOstJobstatsFields)
if err != nil {
return err
}
} else {
err := l.GetLustreProcStats(procfile, wantedOstFields)
if err != nil {
return err
}
}
}
for _, procfile := range l.MdsProcfiles {
mdtFields := wantedMdsFields
if strings.HasSuffix(procfile, "job_stats") {
mdtFields = wantedMdtJobstatsFields
}
err := l.GetLustreProcStats(procfile, mdtFields)
if err != nil {
return err
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.GetLustreProcStats(procfile, wantedMdtJobstatsFields)
if err != nil {
return err
}
} else {
err := l.GetLustreProcStats(procfile, wantedMdsFields)
if err != nil {
return err
}
}
}
@ -501,6 +643,12 @@ func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
tags := map[string]string{
"name": tgs.name,
}
if len(tgs.brwSection) > 0 {
tags["brw_section"] = tgs.brwSection
}
if len(tgs.bucket) > 0 {
tags["bucket"] = tgs.bucket
}
if len(tgs.job) > 0 {
tags["jobid"] = tgs.job
}

View File

@ -131,6 +131,44 @@ const mdtJobStatsContents = `job_stats:
crossdir_rename: { samples: 201, unit: reqs }
`
// Subset of a brw_stats file. Contains all headers, with representative buckets.
const brwstatsProcContents = `snapshot_time: 1589909588.327213269 (secs.nsecs)
read | write
pages per bulk r/w rpcs % cum % | rpcs % cum %
1: 5271 0 0 | 337023 22 22
2: 3030 0 0 | 5672 0 23
4: 4449 0 0 | 255983 17 40
8: 2780 0 0 | 33612 2 42
read | write
discontiguous pages rpcs % cum % | rpcs % cum %
0: 43942683 100 100 | 337023 22 22
1: 0 0 100 | 5672 0 23
2: 0 0 100 | 28016 1 24
3: 0 0 100 | 227967 15 40
4: 0 0 100 | 12869 0 41
read | write
disk I/Os in flight ios % cum % | ios % cum %
1: 2892221 6 6 | 1437946 96 96
2: 2763141 6 12 | 44373 2 99
3: 3014304 6 19 | 2677 0 99
4: 3212360 7 27 | 183 0 99
read | write
I/O time (1/1000s) ios % cum % | ios % cum %
1: 521780 1 1 | 0 0 0
16: 6035560 16 22 | 0 0 0
128: 5044958 14 98 | 0 0 0
1K: 651 0 99 | 0 0 0
read | write
disk I/O size ios % cum % | ios % cum %
1: 0 0 0 | 327301 22 22
16: 0 0 0 | 0 0 22
128: 35 0 0 | 209 0 22
1K: 0 0 0 | 1703 0 22
16K: 4449 0 0 | 255983 17 40
128K: 855 0 0 | 23 0 42
1M: 43866371 99 100 | 850248 57 100
`
func TestLustre2GeneratesMetrics(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "telegraf-lustre")
require.NoError(t, err)
@ -430,3 +468,79 @@ func TestLustre2CanParseConfiguration(t *testing.T) {
},
}, plugin)
}
func TestLustre2GeneratesBrwstatsMetrics(t *testing.T) {
tmpdir, err := os.MkdirTemp("", "telegraf-lustre-brwstats")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
tempdir := tmpdir + "/telegraf/proc/fs/lustre/"
ostname := "OST0001"
osddir := tempdir + "/osd-ldiskfs/"
err = os.MkdirAll(osddir+"/"+ostname, 0750)
require.NoError(t, err)
err = os.WriteFile(osddir+"/"+ostname+"/brw_stats", []byte(brwstatsProcContents), 0640)
require.NoError(t, err)
m := &Lustre2{
OstProcfiles: []string{osddir + "/*/brw_stats"},
}
var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
expectedData := map[string]map[string][]uint64{
"pages_per_bulk_rw": {
"1": {5271, 0, 337023, 22},
"2": {3030, 0, 5672, 0},
"4": {4449, 0, 255983, 17},
"8": {2780, 0, 33612, 2}},
"discontiguous_pages": {
"0": {43942683, 100, 337023, 22},
"1": {0, 0, 5672, 0},
"2": {0, 0, 28016, 1},
"3": {0, 0, 227967, 15},
"4": {0, 0, 12869, 0}},
"disk_ios_in_flight": {
"1": {2892221, 6, 1437946, 96},
"2": {2763141, 6, 44373, 2},
"3": {3014304, 6, 2677, 0},
"4": {3212360, 7, 183, 0}},
"io_time": {
"1": {521780, 1, 0, 0},
"16": {6035560, 16, 0, 0},
"128": {5044958, 14, 0, 0},
"1K": {651, 0, 0, 0}},
"disk_io_size": {
"1": {0, 0, 327301, 22},
"16": {0, 0, 0, 0},
"128": {35, 0, 209, 0},
"1K": {0, 0, 1703, 0},
"16K": {4449, 0, 255983, 17},
"128K": {855, 0, 23, 0},
"1M": {43866371, 99, 850248, 57}},
}
for brwSection, buckets := range expectedData {
for bucket, values := range buckets {
tags := map[string]string{
"name": ostname,
"brw_section": brwSection,
"bucket": bucket,
}
fields := map[string]interface{}{
"read_ios": values[0],
"read_percent": values[1],
"write_ios": values[2],
"write_percent": values[3],
}
t.Log("\n", tags)
t.Log("\n", fields)
acc.AssertContainsTaggedFields(t, "lustre2", fields, tags)
}
}
}

View File

@ -2,16 +2,20 @@
# This plugin ONLY supports Linux
[[inputs.lustre2]]
## An array of /proc globs to search for Lustre stats
## If not specified, the default will work on Lustre 2.5.x
## If not specified, the default will work on Lustre 2.12.x
##
# ost_procfiles = [
# "/proc/fs/lustre/obdfilter/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/stats",
# "/proc/fs/lustre/obdfilter/*/job_stats",
# "/proc/fs/lustre/obdfilter/*/exports/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
# "/proc/fs/lustre/osd-zfs/*/brw_stats",
# ]
# mds_procfiles = [
# "/proc/fs/lustre/mdt/*/md_stats",
# "/proc/fs/lustre/mdt/*/job_stats",
# "/proc/fs/lustre/mdt/*/exports/*/stats",
# "/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
# "/proc/fs/lustre/osd-zfs/*/brw_stats",
# ]