chore: Fix linter findings for `revive:exported` in `plugins/inputs/l*` (#16167)

This commit is contained in:
Paweł Żak 2024-11-12 18:26:23 +01:00 committed by GitHub
parent 54dcd2d8cb
commit 942d2b3f6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1001 additions and 933 deletions

19
internal/env.go Normal file
View File

@ -0,0 +1,19 @@
package internal
import "os"
// GetProcPath returns the path stored in HOST_PROC env variable, or /proc if HOST_PROC has not been set.
func GetProcPath() string {
if hostProc := os.Getenv("HOST_PROC"); hostProc != "" {
return hostProc
}
return "/proc"
}
// GetSysPath returns the path stored in HOST_SYS env variable, or /sys if HOST_SYS has not been set.
func GetSysPath() string {
if hostSys := os.Getenv("HOST_SYS"); hostSys != "" {
return hostSys
}
return "/sys"
}

View File

@ -11,19 +11,13 @@ import (
"strings" "strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
defaultHostProc = "/proc"
defaultHostSys = "/sys"
envProc = "HOST_PROC"
envSys = "HOST_SYS"
)
type Bond struct { type Bond struct {
HostProc string `toml:"host_proc"` HostProc string `toml:"host_proc"`
HostSys string `toml:"host_sys"` HostSys string `toml:"host_sys"`
@ -286,23 +280,13 @@ func (bond *Bond) gatherSlavePart(bondName, rawFile string, acc telegraf.Accumul
// if it is empty then try read from env variable // if it is empty then try read from env variable
func (bond *Bond) loadPaths() { func (bond *Bond) loadPaths() {
if bond.HostProc == "" { if bond.HostProc == "" {
bond.HostProc = proc(envProc, defaultHostProc) bond.HostProc = internal.GetProcPath()
} }
if bond.HostSys == "" { if bond.HostSys == "" {
bond.HostSys = proc(envSys, defaultHostSys) bond.HostSys = internal.GetSysPath()
} }
} }
// proc can be used to read file paths from env
func proc(env, path string) string {
// try to read full file path
if p := os.Getenv(env); p != "" {
return p
}
// return default path
return path
}
func (bond *Bond) listInterfaces() ([]string, error) { func (bond *Bond) listInterfaces() ([]string, error) {
var interfaces []string var interfaces []string
if len(bond.BondInterfaces) > 0 { if len(bond.BondInterfaces) > 0 {

View File

@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT. // Code generated by mockery v2.46.3. DO NOT EDIT.
package mocks package mocks
@ -19,6 +19,10 @@ type Conn struct {
func (_m *Conn) Close() error { func (_m *Conn) Close() error {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func() error); ok { if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf() r0 = rf()
@ -33,6 +37,10 @@ func (_m *Conn) Close() error {
func (_m *Conn) LocalAddr() net.Addr { func (_m *Conn) LocalAddr() net.Addr {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for LocalAddr")
}
var r0 net.Addr var r0 net.Addr
if rf, ok := ret.Get(0).(func() net.Addr); ok { if rf, ok := ret.Get(0).(func() net.Addr); ok {
r0 = rf() r0 = rf()
@ -49,14 +57,21 @@ func (_m *Conn) LocalAddr() net.Addr {
func (_m *Conn) Read(b []byte) (int, error) { func (_m *Conn) Read(b []byte) (int, error) {
ret := _m.Called(b) ret := _m.Called(b)
if len(ret) == 0 {
panic("no return value specified for Read")
}
var r0 int var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(b)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok { if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(b) r0 = rf(b)
} else { } else {
r0 = ret.Get(0).(int) r0 = ret.Get(0).(int)
} }
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok { if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(b) r1 = rf(b)
} else { } else {
@ -70,6 +85,10 @@ func (_m *Conn) Read(b []byte) (int, error) {
func (_m *Conn) RemoteAddr() net.Addr { func (_m *Conn) RemoteAddr() net.Addr {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for RemoteAddr")
}
var r0 net.Addr var r0 net.Addr
if rf, ok := ret.Get(0).(func() net.Addr); ok { if rf, ok := ret.Get(0).(func() net.Addr); ok {
r0 = rf() r0 = rf()
@ -86,6 +105,10 @@ func (_m *Conn) RemoteAddr() net.Addr {
func (_m *Conn) SetDeadline(t time.Time) error { func (_m *Conn) SetDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -100,6 +123,10 @@ func (_m *Conn) SetDeadline(t time.Time) error {
func (_m *Conn) SetReadDeadline(t time.Time) error { func (_m *Conn) SetReadDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetReadDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -114,6 +141,10 @@ func (_m *Conn) SetReadDeadline(t time.Time) error {
func (_m *Conn) SetWriteDeadline(t time.Time) error { func (_m *Conn) SetWriteDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetWriteDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -128,14 +159,21 @@ func (_m *Conn) SetWriteDeadline(t time.Time) error {
func (_m *Conn) Write(b []byte) (int, error) { func (_m *Conn) Write(b []byte) (int, error) {
ret := _m.Called(b) ret := _m.Called(b)
if len(ret) == 0 {
panic("no return value specified for Write")
}
var r0 int var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(b)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok { if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(b) r0 = rf(b)
} else { } else {
r0 = ret.Get(0).(int) r0 = ret.Get(0).(int)
} }
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok { if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(b) r1 = rf(b)
} else { } else {
@ -144,3 +182,17 @@ func (_m *Conn) Write(b []byte) (int, error) {
return r0, r1 return r0, r1
} }
// NewConn creates a new instance of Conn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewConn(t interface {
mock.TestingT
Cleanup(func())
}) *Conn {
mock := &Conn{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v0.0.0-dev. DO NOT EDIT. // Code generated by mockery v2.46.3. DO NOT EDIT.
package mocks package mocks
@ -19,6 +19,10 @@ type Conn struct {
func (_m *Conn) Close() error { func (_m *Conn) Close() error {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func() error); ok { if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf() r0 = rf()
@ -33,6 +37,10 @@ func (_m *Conn) Close() error {
func (_m *Conn) LocalAddr() net.Addr { func (_m *Conn) LocalAddr() net.Addr {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for LocalAddr")
}
var r0 net.Addr var r0 net.Addr
if rf, ok := ret.Get(0).(func() net.Addr); ok { if rf, ok := ret.Get(0).(func() net.Addr); ok {
r0 = rf() r0 = rf()
@ -49,14 +57,21 @@ func (_m *Conn) LocalAddr() net.Addr {
func (_m *Conn) Read(b []byte) (int, error) { func (_m *Conn) Read(b []byte) (int, error) {
ret := _m.Called(b) ret := _m.Called(b)
if len(ret) == 0 {
panic("no return value specified for Read")
}
var r0 int var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(b)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok { if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(b) r0 = rf(b)
} else { } else {
r0 = ret.Get(0).(int) r0 = ret.Get(0).(int)
} }
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok { if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(b) r1 = rf(b)
} else { } else {
@ -70,6 +85,10 @@ func (_m *Conn) Read(b []byte) (int, error) {
func (_m *Conn) RemoteAddr() net.Addr { func (_m *Conn) RemoteAddr() net.Addr {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for RemoteAddr")
}
var r0 net.Addr var r0 net.Addr
if rf, ok := ret.Get(0).(func() net.Addr); ok { if rf, ok := ret.Get(0).(func() net.Addr); ok {
r0 = rf() r0 = rf()
@ -86,6 +105,10 @@ func (_m *Conn) RemoteAddr() net.Addr {
func (_m *Conn) SetDeadline(t time.Time) error { func (_m *Conn) SetDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -100,6 +123,10 @@ func (_m *Conn) SetDeadline(t time.Time) error {
func (_m *Conn) SetReadDeadline(t time.Time) error { func (_m *Conn) SetReadDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetReadDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -114,6 +141,10 @@ func (_m *Conn) SetReadDeadline(t time.Time) error {
func (_m *Conn) SetWriteDeadline(t time.Time) error { func (_m *Conn) SetWriteDeadline(t time.Time) error {
ret := _m.Called(t) ret := _m.Called(t)
if len(ret) == 0 {
panic("no return value specified for SetWriteDeadline")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(time.Time) error); ok { if rf, ok := ret.Get(0).(func(time.Time) error); ok {
r0 = rf(t) r0 = rf(t)
@ -128,14 +159,21 @@ func (_m *Conn) SetWriteDeadline(t time.Time) error {
func (_m *Conn) Write(b []byte) (int, error) { func (_m *Conn) Write(b []byte) (int, error) {
ret := _m.Called(b) ret := _m.Called(b)
if len(ret) == 0 {
panic("no return value specified for Write")
}
var r0 int var r0 int
var r1 error
if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
return rf(b)
}
if rf, ok := ret.Get(0).(func([]byte) int); ok { if rf, ok := ret.Get(0).(func([]byte) int); ok {
r0 = rf(b) r0 = rf(b)
} else { } else {
r0 = ret.Get(0).(int) r0 = ret.Get(0).(int)
} }
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok { if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(b) r1 = rf(b)
} else { } else {
@ -144,3 +182,17 @@ func (_m *Conn) Write(b []byte) (int, error) {
return r0, r1 return r0, r1
} }
// NewConn creates a new instance of Conn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewConn(t interface {
mock.TestingT
Cleanup(func())
}) *Conn {
mock := &Conn{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
mocks "github.com/influxdata/telegraf/plugins/inputs/intel_baseband/mock" "github.com/influxdata/telegraf/plugins/inputs/intel_baseband/mocks"
) )
func TestWriteCommandToSocket(t *testing.T) { func TestWriteCommandToSocket(t *testing.T) {

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.46.3. DO NOT EDIT.
package intel_dlb package intel_dlb
@ -13,7 +13,15 @@ type mockRasReader struct {
func (_m *mockRasReader) gatherPaths(path string) ([]string, error) { func (_m *mockRasReader) gatherPaths(path string) ([]string, error) {
ret := _m.Called(path) ret := _m.Called(path)
if len(ret) == 0 {
panic("no return value specified for gatherPaths")
}
var r0 []string var r0 []string
var r1 error
if rf, ok := ret.Get(0).(func(string) ([]string, error)); ok {
return rf(path)
}
if rf, ok := ret.Get(0).(func(string) []string); ok { if rf, ok := ret.Get(0).(func(string) []string); ok {
r0 = rf(path) r0 = rf(path)
} else { } else {
@ -22,7 +30,6 @@ func (_m *mockRasReader) gatherPaths(path string) ([]string, error) {
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok { if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(path) r1 = rf(path)
} else { } else {
@ -36,7 +43,15 @@ func (_m *mockRasReader) gatherPaths(path string) ([]string, error) {
func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) { func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) {
ret := _m.Called(filePath) ret := _m.Called(filePath)
if len(ret) == 0 {
panic("no return value specified for readFromFile")
}
var r0 []byte var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func(string) ([]byte, error)); ok {
return rf(filePath)
}
if rf, ok := ret.Get(0).(func(string) []byte); ok { if rf, ok := ret.Get(0).(func(string) []byte); ok {
r0 = rf(filePath) r0 = rf(filePath)
} else { } else {
@ -45,7 +60,6 @@ func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) {
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok { if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(filePath) r1 = rf(filePath)
} else { } else {
@ -55,13 +69,12 @@ func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) {
return r0, r1 return r0, r1
} }
type mockConstructorTestingTnewMockRasReader interface { // newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func newMockRasReader(t interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())
} }) *mockRasReader {
// newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockRasReader(t mockConstructorTestingTnewMockRasReader) *mockRasReader {
mock := &mockRasReader{} mock := &mockRasReader{}
mock.Mock.Test(t) mock.Mock.Test(t)

View File

@ -18,30 +18,16 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
func init() {
inputs.Add("lanz", func() telegraf.Input {
return NewLanz()
})
}
type Lanz struct { type Lanz struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
clients []lanz.Client clients []lanz.Client
wg sync.WaitGroup wg sync.WaitGroup
} }
func NewLanz() *Lanz {
return &Lanz{}
}
func (*Lanz) SampleConfig() string { func (*Lanz) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (l *Lanz) Gather(_ telegraf.Accumulator) error {
return nil
}
func (l *Lanz) Start(acc telegraf.Accumulator) error { func (l *Lanz) Start(acc telegraf.Accumulator) error {
if len(l.Servers) == 0 { if len(l.Servers) == 0 {
l.Servers = append(l.Servers, "tcp://127.0.0.1:50001") l.Servers = append(l.Servers, "tcp://127.0.0.1:50001")
@ -72,6 +58,10 @@ func (l *Lanz) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (l *Lanz) Gather(_ telegraf.Accumulator) error {
return nil
}
func (l *Lanz) Stop() { func (l *Lanz) Stop() {
for _, client := range l.clients { for _, client := range l.clients {
client.Stop() client.Stop()
@ -130,3 +120,9 @@ func msgToAccumulator(acc telegraf.Accumulator, msg *pb.LanzRecord, deviceURL *u
acc.AddFields("lanz_global_buffer_usage_record", vals, tags) acc.AddFields("lanz_global_buffer_usage_record", vals, tags)
} }
} }
func init() {
inputs.Add("lanz", func() telegraf.Input {
return &Lanz{}
})
}

View File

@ -52,14 +52,11 @@ var testProtoBufGlobalBufferUsageRecord = &pb.LanzRecord{
} }
func TestLanzGeneratesMetrics(t *testing.T) { func TestLanzGeneratesMetrics(t *testing.T) {
var acc testutil.Accumulator l := &Lanz{Servers: []string{
l := NewLanz()
l.Servers = append(l.Servers,
"tcp://switch01.int.example.com:50001", "tcp://switch01.int.example.com:50001",
"tcp://switch02.int.example.com:50001", "tcp://switch02.int.example.com:50001",
) }}
deviceURL1, err := url.Parse(l.Servers[0]) deviceURL1, err := url.Parse(l.Servers[0])
if err != nil { if err != nil {
t.Fail() t.Fail()
@ -69,6 +66,7 @@ func TestLanzGeneratesMetrics(t *testing.T) {
t.Fail() t.Fail()
} }
var acc testutil.Accumulator
msgToAccumulator(&acc, testProtoBufCongestionRecord1, deviceURL1) msgToAccumulator(&acc, testProtoBufCongestionRecord1, deviceURL1)
acc.Wait(1) acc.Wait(1)

View File

@ -20,26 +20,27 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const oid = ".1.3.6.1.4.1.35450" const (
oid = ".1.3.6.1.4.1.35450"
// For Manager Master
defaultEndpoint = "127.0.0.1:4020"
)
// For Manager Master type serverType int
const defaultEndpoint = "127.0.0.1:4020"
type ServerType int
const ( const (
ServerTypeManagerMaster ServerType = iota serverTypeManagerMaster serverType = iota
ServerTypeManagerSlave serverTypeManagerSlave
ServerTypeStorage serverTypeStorage
ServerTypeGateway serverTypeGateway
) )
type LeoFS struct { type LeoFS struct {
Servers []string Servers []string `toml:"servers"`
} }
var KeyMapping = map[ServerType][]string{ var keyMapping = map[serverType][]string{
ServerTypeManagerMaster: { serverTypeManagerMaster: {
"num_of_processes", "num_of_processes",
"total_memory_usage", "total_memory_usage",
"system_memory_usage", "system_memory_usage",
@ -55,7 +56,7 @@ var KeyMapping = map[ServerType][]string{
"used_allocated_memory_5min", "used_allocated_memory_5min",
"allocated_memory_5min", "allocated_memory_5min",
}, },
ServerTypeManagerSlave: { serverTypeManagerSlave: {
"num_of_processes", "num_of_processes",
"total_memory_usage", "total_memory_usage",
"system_memory_usage", "system_memory_usage",
@ -71,7 +72,7 @@ var KeyMapping = map[ServerType][]string{
"used_allocated_memory_5min", "used_allocated_memory_5min",
"allocated_memory_5min", "allocated_memory_5min",
}, },
ServerTypeStorage: { serverTypeStorage: {
"num_of_processes", "num_of_processes",
"total_memory_usage", "total_memory_usage",
"system_memory_usage", "system_memory_usage",
@ -113,7 +114,7 @@ var KeyMapping = map[ServerType][]string{
"comp_num_of_ongoing_targets", "comp_num_of_ongoing_targets",
"comp_num_of_out_of_targets", "comp_num_of_out_of_targets",
}, },
ServerTypeGateway: { serverTypeGateway: {
"num_of_processes", "num_of_processes",
"total_memory_usage", "total_memory_usage",
"system_memory_usage", "system_memory_usage",
@ -141,15 +142,15 @@ var KeyMapping = map[ServerType][]string{
}, },
} }
var serverTypeMapping = map[string]ServerType{ var serverTypeMapping = map[string]serverType{
"4020": ServerTypeManagerMaster, "4020": serverTypeManagerMaster,
"4021": ServerTypeManagerSlave, "4021": serverTypeManagerSlave,
"4010": ServerTypeStorage, "4010": serverTypeStorage,
"4011": ServerTypeStorage, "4011": serverTypeStorage,
"4012": ServerTypeStorage, "4012": serverTypeStorage,
"4013": ServerTypeStorage, "4013": serverTypeStorage,
"4000": ServerTypeGateway, "4000": serverTypeGateway,
"4001": ServerTypeGateway, "4001": serverTypeGateway,
} }
func (*LeoFS) SampleConfig() string { func (*LeoFS) SampleConfig() string {
@ -158,7 +159,7 @@ func (*LeoFS) SampleConfig() string {
func (l *LeoFS) Gather(acc telegraf.Accumulator) error { func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
if len(l.Servers) == 0 { if len(l.Servers) == 0 {
return l.gatherServer(defaultEndpoint, ServerTypeManagerMaster, acc) return l.gatherServer(defaultEndpoint, serverTypeManagerMaster, acc)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for _, endpoint := range l.Servers { for _, endpoint := range l.Servers {
@ -179,10 +180,10 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
st, ok := serverTypeMapping[port] st, ok := serverTypeMapping[port]
if !ok { if !ok {
st = ServerTypeStorage st = serverTypeStorage
} }
wg.Add(1) wg.Add(1)
go func(endpoint string, st ServerType) { go func(endpoint string, st serverType) {
defer wg.Done() defer wg.Done()
acc.AddError(l.gatherServer(endpoint, st, acc)) acc.AddError(l.gatherServer(endpoint, st, acc))
}(endpoint, st) }(endpoint, st)
@ -191,11 +192,7 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (l *LeoFS) gatherServer( func (l *LeoFS) gatherServer(endpoint string, serverType serverType, acc telegraf.Accumulator) error {
endpoint string,
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid) cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", "-On", endpoint, oid)
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
@ -221,7 +218,7 @@ func (l *LeoFS) gatherServer(
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for scanner.Scan() { for scanner.Scan() {
key := KeyMapping[serverType][i] key := keyMapping[serverType][i]
val, err := retrieveTokenAfterColon(scanner.Text()) val, err := retrieveTokenAfterColon(scanner.Text())
if err != nil { if err != nil {
return err return err

View File

@ -6,8 +6,9 @@ import (
"runtime" "runtime"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
var fakeSNMP4Manager = ` var fakeSNMP4Manager = `
@ -123,7 +124,7 @@ func main() {
} }
` `
func testMain(t *testing.T, code, endpoint string, serverType ServerType) { func testMain(t *testing.T, code, endpoint string, serverType serverType) {
executable := "snmpwalk" executable := "snmpwalk"
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
executable = "snmpwalk.exe" executable = "snmpwalk.exe"
@ -153,7 +154,7 @@ func testMain(t *testing.T, code, endpoint string, serverType ServerType) {
err = acc.GatherError(l.Gather) err = acc.GatherError(l.Gather)
require.NoError(t, err) require.NoError(t, err)
floatMetrics := KeyMapping[serverType] floatMetrics := keyMapping[serverType]
for _, metric := range floatMetrics { for _, metric := range floatMetrics {
require.True(t, acc.HasFloatField("leofs", metric), metric) require.True(t, acc.HasFloatField("leofs", metric), metric)
@ -165,7 +166,7 @@ func TestLeoFSManagerMasterMetricsIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
testMain(t, fakeSNMP4Manager, "localhost:4020", ServerTypeManagerMaster) testMain(t, fakeSNMP4Manager, "localhost:4020", serverTypeManagerMaster)
} }
func TestLeoFSManagerSlaveMetricsIntegration(t *testing.T) { func TestLeoFSManagerSlaveMetricsIntegration(t *testing.T) {
@ -173,7 +174,7 @@ func TestLeoFSManagerSlaveMetricsIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
testMain(t, fakeSNMP4Manager, "localhost:4021", ServerTypeManagerSlave) testMain(t, fakeSNMP4Manager, "localhost:4021", serverTypeManagerSlave)
} }
func TestLeoFSStorageMetricsIntegration(t *testing.T) { func TestLeoFSStorageMetricsIntegration(t *testing.T) {
@ -181,7 +182,7 @@ func TestLeoFSStorageMetricsIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
testMain(t, fakeSNMP4Storage, "localhost:4010", ServerTypeStorage) testMain(t, fakeSNMP4Storage, "localhost:4010", serverTypeStorage)
} }
func TestLeoFSGatewayMetricsIntegration(t *testing.T) { func TestLeoFSGatewayMetricsIntegration(t *testing.T) {
@ -189,5 +190,5 @@ func TestLeoFSGatewayMetricsIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
testMain(t, fakeSNMP4Gateway, "localhost:4000", ServerTypeGateway) testMain(t, fakeSNMP4Gateway, "localhost:4000", serverTypeGateway)
} }

View File

@ -90,6 +90,40 @@ func (l *Libvirt) Init() error {
return nil return nil
} }
func (l *Libvirt) Gather(acc telegraf.Accumulator) error {
var err error
if err := l.utils.ensureConnected(l.LibvirtURI); err != nil {
return err
}
// Get all available domains
gatheredDomains, err := l.utils.gatherAllDomains()
if handledErr := handleError(err, "error occurred while gathering all domains", l.utils); handledErr != nil {
return handledErr
} else if len(gatheredDomains) == 0 {
l.Log.Debug("Couldn't find any domains on system")
return nil
}
// Exclude domain.
domains := l.filterDomains(gatheredDomains)
if len(domains) == 0 {
l.Log.Debug("Configured domains are not available on system")
return nil
}
var vcpuInfos map[string][]vcpuAffinity
if l.vcpuMappingEnabled {
vcpuInfos, err = l.getVcpuMapping(domains)
if handledErr := handleError(err, "error occurred while gathering vcpu mapping", l.utils); handledErr != nil {
return handledErr
}
}
err = l.gatherMetrics(domains, vcpuInfos, acc)
return handleError(err, "error occurred while gathering metrics", l.utils)
}
func (l *Libvirt) validateLibvirtURI() error { func (l *Libvirt) validateLibvirtURI() error {
uri := libvirtutils.LibvirtUri{} uri := libvirtutils.LibvirtUri{}
err := uri.Unmarshal(l.LibvirtURI) err := uri.Unmarshal(l.LibvirtURI)
@ -150,43 +184,9 @@ func (l *Libvirt) isThereAnythingToGather() bool {
return l.metricNumber > 0 || len(l.AdditionalStatistics) > 0 return l.metricNumber > 0 || len(l.AdditionalStatistics) > 0
} }
func (l *Libvirt) Gather(acc telegraf.Accumulator) error {
var err error
if err := l.utils.EnsureConnected(l.LibvirtURI); err != nil {
return err
}
// Get all available domains
gatheredDomains, err := l.utils.GatherAllDomains()
if handledErr := handleError(err, "error occurred while gathering all domains", l.utils); handledErr != nil {
return handledErr
} else if len(gatheredDomains) == 0 {
l.Log.Debug("Couldn't find any domains on system")
return nil
}
// Exclude domain.
domains := l.filterDomains(gatheredDomains)
if len(domains) == 0 {
l.Log.Debug("Configured domains are not available on system")
return nil
}
var vcpuInfos map[string][]vcpuAffinity
if l.vcpuMappingEnabled {
vcpuInfos, err = l.getVcpuMapping(domains)
if handledErr := handleError(err, "error occurred while gathering vcpu mapping", l.utils); handledErr != nil {
return handledErr
}
}
err = l.gatherMetrics(domains, vcpuInfos, acc)
return handleError(err, "error occurred while gathering metrics", l.utils)
}
func handleError(err error, errMessage string, utils utils) error { func handleError(err error, errMessage string, utils utils) error {
if err != nil { if err != nil {
if chanErr := utils.Disconnect(); chanErr != nil { if chanErr := utils.disconnect(); chanErr != nil {
return fmt.Errorf("%s: %w; error occurred when disconnecting: %w", errMessage, err, chanErr) return fmt.Errorf("%s: %w; error occurred when disconnecting: %w", errMessage, err, chanErr)
} }
return fmt.Errorf("%s: %w", errMessage, err) return fmt.Errorf("%s: %w", errMessage, err)
@ -210,7 +210,7 @@ func (l *Libvirt) filterDomains(availableDomains []golibvirt.Domain) []golibvirt
} }
func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string][]vcpuAffinity, acc telegraf.Accumulator) error { func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string][]vcpuAffinity, acc telegraf.Accumulator) error {
stats, err := l.utils.GatherStatsForDomains(domains, l.metricNumber) stats, err := l.utils.gatherStatsForDomains(domains, l.metricNumber)
if err != nil { if err != nil {
return err return err
} }
@ -220,7 +220,7 @@ func (l *Libvirt) gatherMetrics(domains []golibvirt.Domain, vcpuInfos map[string
} }
func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuAffinity, error) { func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuAffinity, error) {
pCPUs, err := l.utils.GatherNumberOfPCPUs() pCPUs, err := l.utils.gatherNumberOfPCPUs()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -231,9 +231,9 @@ func (l *Libvirt) getVcpuMapping(domains []golibvirt.Domain) (map[string][]vcpuA
for i := range domains { for i := range domains {
domain := domains[i] domain := domains[i]
// Executing GatherVcpuMapping can take some time, it is worth to call it in parallel // Executing gatherVcpuMapping can take some time, it is worth to call it in parallel
group.Go(func() error { group.Go(func() error {
vcpuInfo, err := l.utils.GatherVcpuMapping(domain, pCPUs, l.shouldGetCurrentPCPU()) vcpuInfo, err := l.utils.gatherVcpuMapping(domain, pCPUs, l.shouldGetCurrentPCPU())
if err != nil { if err != nil {
return err return err
} }

View File

@ -35,10 +35,10 @@ func TestLibvirt_Init(t *testing.T) {
}) })
t.Run("throw error when user provided invalid uri", func(t *testing.T) { t.Run("throw error when user provided invalid uri", func(t *testing.T) {
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
LibvirtURI: "this/is/wrong/uri", LibvirtURI: "this/is/wrong/uri",
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
err := l.Init() err := l.Init()
@ -47,10 +47,10 @@ func TestLibvirt_Init(t *testing.T) {
}) })
t.Run("successfully initialize libvirt on correct user input", func(t *testing.T) { t.Run("successfully initialize libvirt on correct user input", func(t *testing.T) {
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
StatisticsGroups: []string{"state", "cpu_total", "vcpu", "interface"}, StatisticsGroups: []string{"state", "cpu_total", "vcpu", "interface"},
utils: &mockLibvirtUtils, utils: &mockUtils,
LibvirtURI: defaultLibvirtURI, LibvirtURI: defaultLibvirtURI,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
@ -62,66 +62,66 @@ func TestLibvirt_Init(t *testing.T) {
func TestLibvirt_Gather(t *testing.T) { func TestLibvirt_Gather(t *testing.T) {
t.Run("wrong uri throws error", func(t *testing.T) { t.Run("wrong uri throws error", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
LibvirtURI: "this/is/wrong/uri", LibvirtURI: "this/is/wrong/uri",
Log: testutil.Logger{}, Log: testutil.Logger{},
utils: &mockLibvirtUtils, utils: &mockUtils,
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(errors.New("failed to connect")).Once() mockUtils.On("ensureConnected", mock.Anything).Return(errors.New("failed to connect")).Once()
err := l.Gather(&acc) err := l.Gather(&acc)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "failed to connect") require.Contains(t, err.Error(), "failed to connect")
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
t.Run("error when read error happened in gathering domains", func(t *testing.T) { t.Run("error when read error happened in gathering domains", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
StatisticsGroups: []string{"state"}, StatisticsGroups: []string{"state"},
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once().
On("GatherAllDomains", mock.Anything).Return(nil, errors.New("gather domain error")).Once(). On("gatherAllDomains", mock.Anything).Return(nil, errors.New("gather domain error")).Once().
On("Disconnect").Return(nil).Once() On("disconnect").Return(nil).Once()
err := l.Gather(&acc) err := l.Gather(&acc)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "gather domain error") require.Contains(t, err.Error(), "gather domain error")
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
t.Run("no error when empty list of domains is returned", func(t *testing.T) { t.Run("no error when empty list of domains is returned", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
StatisticsGroups: []string{"state"}, StatisticsGroups: []string{"state"},
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once().
On("GatherAllDomains", mock.Anything).Return(nil, nil).Once() On("gatherAllDomains", mock.Anything).Return(nil, nil).Once()
err := l.Gather(&acc) err := l.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
t.Run("error when gathering metrics by number", func(t *testing.T) { t.Run("error when gathering metrics by number", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
StatisticsGroups: []string{"state"}, StatisticsGroups: []string{"state"},
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once().
On("GatherAllDomains", mock.Anything).Return(domains, nil).Once(). On("gatherAllDomains", mock.Anything).Return(domains, nil).Once().
On("GatherStatsForDomains", mock.Anything, mock.Anything). On("gatherStatsForDomains", mock.Anything, mock.Anything).
Return(nil, errors.New("gathering metric by number error")).Once(). Return(nil, errors.New("gathering metric by number error")).Once().
On("Disconnect").Return(nil).Once() On("disconnect").Return(nil).Once()
err := l.Init() err := l.Init()
require.NoError(t, err) require.NoError(t, err)
@ -129,7 +129,7 @@ func TestLibvirt_Gather(t *testing.T) {
err = l.Gather(&acc) err = l.Gather(&acc)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "gathering metric by number error") require.Contains(t, err.Error(), "gathering metric by number error")
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
var successfulTests = []struct { var successfulTests = []struct {
@ -153,20 +153,20 @@ func TestLibvirt_Gather(t *testing.T) {
for _, test := range successfulTests { for _, test := range successfulTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
StatisticsGroups: []string{"state"}, StatisticsGroups: []string{"state"},
Domains: test.excludeDomains, Domains: test.excludeDomains,
AdditionalStatistics: []string{"vcpu_mapping"}, AdditionalStatistics: []string{"vcpu_mapping"},
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once().
On("GatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). On("gatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once().
On("GatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Maybe(). On("gatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Maybe().
On("GatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). On("gatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once().
On("GatherNumberOfPCPUs").Return(4, nil).Once(). On("gatherNumberOfPCPUs").Return(4, nil).Once().
On("GatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() On("gatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once()
err := l.Init() err := l.Init()
require.NoError(t, err) require.NoError(t, err)
@ -177,7 +177,7 @@ func TestLibvirt_Gather(t *testing.T) {
actual := acc.GetTelegrafMetrics() actual := acc.GetTelegrafMetrics()
expected := test.expectedMetrics expected := test.expectedMetrics
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime()) testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime())
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
} }
} }
@ -205,23 +205,23 @@ func TestLibvirt_GatherMetrics(t *testing.T) {
for _, test := range successfulTests { for _, test := range successfulTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
mockLibvirtUtils := MockLibvirtUtils{} mockUtils := mockLibvirtUtils{}
l := Libvirt{ l := Libvirt{
utils: &mockLibvirtUtils, utils: &mockUtils,
Log: testutil.Logger{}, Log: testutil.Logger{},
Domains: test.excludeDomains, Domains: test.excludeDomains,
} }
mockLibvirtUtils.On("EnsureConnected", mock.Anything).Return(nil).Once(). mockUtils.On("ensureConnected", mock.Anything).Return(nil).Once().
On("GatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once(). On("gatherAllDomains", mock.Anything).Return(test.allDomains, nil).Once().
On("GatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once() On("gatherStatsForDomains", mock.Anything, mock.Anything).Return(test.statsForDomains, nil).Once()
if test.vcpuMapping != nil { if test.vcpuMapping != nil {
l.vcpuMappingEnabled = true l.vcpuMappingEnabled = true
l.metricNumber = domainStatsVCPU l.metricNumber = domainStatsVCPU
mockLibvirtUtils.On("GatherNumberOfPCPUs").Return(4, nil).Once(). mockUtils.On("gatherNumberOfPCPUs").Return(4, nil).Once().
On("GatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once(). On("gatherVcpuMapping", domains[0], mock.Anything, mock.Anything).Return(test.vcpuMapping, nil).Once().
On("GatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(nil, nil).Once() On("gatherVcpuMapping", domains[1], mock.Anything, mock.Anything).Return(nil, nil).Once()
} }
err := l.Gather(&acc) err := l.Gather(&acc)
@ -230,7 +230,7 @@ func TestLibvirt_GatherMetrics(t *testing.T) {
actual := acc.GetTelegrafMetrics() actual := acc.GetTelegrafMetrics()
expected := test.expectedMetrics expected := test.expectedMetrics
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime()) testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics(), testutil.IgnoreTime())
mockLibvirtUtils.AssertExpectations(t) mockUtils.AssertExpectations(t)
}) })
} }
} }

View File

@ -9,12 +9,12 @@ import (
) )
type utils interface { type utils interface {
GatherAllDomains() (domains []golibvirt.Domain, err error) gatherAllDomains() (domains []golibvirt.Domain, err error)
GatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) gatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error)
GatherNumberOfPCPUs() (int, error) gatherNumberOfPCPUs() (int, error)
GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) gatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error)
EnsureConnected(libvirtURI string) error ensureConnected(libvirtURI string) error
Disconnect() error disconnect() error
} }
type utilsImpl struct { type utilsImpl struct {
@ -27,8 +27,8 @@ type vcpuAffinity struct {
currentPCPUID int32 currentPCPUID int32
} }
// GatherAllDomains gathers all domains on system // gatherAllDomains gathers all domains on system
func (l *utilsImpl) GatherAllDomains() (domains []golibvirt.Domain, err error) { func (l *utilsImpl) gatherAllDomains() (domains []golibvirt.Domain, err error) {
allDomainStatesFlag := golibvirt.ConnectListDomainsRunning + golibvirt.ConnectListDomainsPaused + allDomainStatesFlag := golibvirt.ConnectListDomainsRunning + golibvirt.ConnectListDomainsPaused +
golibvirt.ConnectListDomainsShutoff + golibvirt.ConnectListDomainsOther golibvirt.ConnectListDomainsShutoff + golibvirt.ConnectListDomainsOther
@ -36,8 +36,8 @@ func (l *utilsImpl) GatherAllDomains() (domains []golibvirt.Domain, err error) {
return domains, err return domains, err
} }
// GatherStatsForDomains gathers stats for given domains based on number that was previously calculated // gatherStatsForDomains gathers stats for given domains based on number that was previously calculated
func (l *utilsImpl) GatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) { func (l *utilsImpl) gatherStatsForDomains(domains []golibvirt.Domain, metricNumber uint32) ([]golibvirt.DomainStatsRecord, error) {
if metricNumber == 0 { if metricNumber == 0 {
// do not need to do expensive call if no stats were set to gather // do not need to do expensive call if no stats were set to gather
return nil, nil return nil, nil
@ -49,7 +49,7 @@ func (l *utilsImpl) GatherStatsForDomains(domains []golibvirt.Domain, metricNumb
return l.libvirt.ConnectGetAllDomainStats(domains, metricNumber, uint32(allDomainStatesFlag)) return l.libvirt.ConnectGetAllDomainStats(domains, metricNumber, uint32(allDomainStatesFlag))
} }
func (l *utilsImpl) GatherNumberOfPCPUs() (int, error) { func (l *utilsImpl) gatherNumberOfPCPUs() (int, error) {
//nolint:dogsled //Using only needed values from library function //nolint:dogsled //Using only needed values from library function
_, _, _, _, nodes, sockets, cores, threads, err := l.libvirt.NodeGetInfo() _, _, _, _, nodes, sockets, cores, threads, err := l.libvirt.NodeGetInfo()
if err != nil { if err != nil {
@ -59,10 +59,10 @@ func (l *utilsImpl) GatherNumberOfPCPUs() (int, error) {
return int(nodes * sockets * cores * threads), nil return int(nodes * sockets * cores * threads), nil
} }
// GatherVcpuMapping is based on official go-libvirt library: // gatherVcpuMapping is based on official go-libvirt library:
// https://github.com/libvirt/libvirt-go-module/blob/268a5d02e00cc9b3d5d7fa6c08d753071e7d14b8/domain.go#L4516 // https://github.com/libvirt/libvirt-go-module/blob/268a5d02e00cc9b3d5d7fa6c08d753071e7d14b8/domain.go#L4516
// (this library cannot be used here because of C bindings) // (this library cannot be used here because of C bindings)
func (l *utilsImpl) GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { func (l *utilsImpl) gatherVcpuMapping(domain golibvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) {
//nolint:dogsled //Using only needed values from library function //nolint:dogsled //Using only needed values from library function
_, _, _, vCPUs, _, err := l.libvirt.DomainGetInfo(domain) _, _, _, vCPUs, _, err := l.libvirt.DomainGetInfo(domain)
if err != nil { if err != nil {
@ -114,7 +114,7 @@ func (l *utilsImpl) GatherVcpuMapping(domain golibvirt.Domain, pCPUs int, should
return vcpuAffinities, nil return vcpuAffinities, nil
} }
func (l *utilsImpl) EnsureConnected(libvirtURI string) error { func (l *utilsImpl) ensureConnected(libvirtURI string) error {
if isConnected(l.libvirt) { if isConnected(l.libvirt) {
return nil return nil
} }
@ -127,7 +127,7 @@ func (l *utilsImpl) EnsureConnected(libvirtURI string) error {
return nil return nil
} }
func (l *utilsImpl) Disconnect() error { func (l *utilsImpl) disconnect() error {
l.libvirt = nil l.libvirt = nil
return nil return nil
} }

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.46.3. DO NOT EDIT.
package libvirt package libvirt
@ -7,15 +7,19 @@ import (
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
) )
// MockLibvirtUtils is an autogenerated mock type for the utils type // mockLibvirtUtils is an autogenerated mock type for the utils type
type MockLibvirtUtils struct { type mockLibvirtUtils struct {
mock.Mock mock.Mock
} }
// Disconnect provides a mock function with given fields: // disconnect provides a mock function with given fields:
func (_m *MockLibvirtUtils) Disconnect() error { func (_m *mockLibvirtUtils) disconnect() error {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for disconnect")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func() error); ok { if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf() r0 = rf()
@ -26,10 +30,14 @@ func (_m *MockLibvirtUtils) Disconnect() error {
return r0 return r0
} }
// EnsureConnected provides a mock function with given fields: libvirtURI // ensureConnected provides a mock function with given fields: libvirtURI
func (_m *MockLibvirtUtils) EnsureConnected(libvirtURI string) error { func (_m *mockLibvirtUtils) ensureConnected(libvirtURI string) error {
ret := _m.Called(libvirtURI) ret := _m.Called(libvirtURI)
if len(ret) == 0 {
panic("no return value specified for ensureConnected")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok { if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(libvirtURI) r0 = rf(libvirtURI)
@ -40,11 +48,19 @@ func (_m *MockLibvirtUtils) EnsureConnected(libvirtURI string) error {
return r0 return r0
} }
// GatherAllDomains provides a mock function with given fields: // gatherAllDomains provides a mock function with given fields:
func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) { func (_m *mockLibvirtUtils) gatherAllDomains() ([]go_libvirt.Domain, error) {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for gatherAllDomains")
}
var r0 []go_libvirt.Domain var r0 []go_libvirt.Domain
var r1 error
if rf, ok := ret.Get(0).(func() ([]go_libvirt.Domain, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() []go_libvirt.Domain); ok { if rf, ok := ret.Get(0).(func() []go_libvirt.Domain); ok {
r0 = rf() r0 = rf()
} else { } else {
@ -53,7 +69,6 @@ func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) {
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func() error); ok { if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf() r1 = rf()
} else { } else {
@ -63,18 +78,25 @@ func (_m *MockLibvirtUtils) GatherAllDomains() ([]go_libvirt.Domain, error) {
return r0, r1 return r0, r1
} }
// GatherNumberOfPCPUs provides a mock function with given fields: // gatherNumberOfPCPUs provides a mock function with given fields:
func (_m *MockLibvirtUtils) GatherNumberOfPCPUs() (int, error) { func (_m *mockLibvirtUtils) gatherNumberOfPCPUs() (int, error) {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for gatherNumberOfPCPUs")
}
var r0 int var r0 int
var r1 error
if rf, ok := ret.Get(0).(func() (int, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() int); ok { if rf, ok := ret.Get(0).(func() int); ok {
r0 = rf() r0 = rf()
} else { } else {
r0 = ret.Get(0).(int) r0 = ret.Get(0).(int)
} }
var r1 error
if rf, ok := ret.Get(1).(func() error); ok { if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf() r1 = rf()
} else { } else {
@ -84,11 +106,19 @@ func (_m *MockLibvirtUtils) GatherNumberOfPCPUs() (int, error) {
return r0, r1 return r0, r1
} }
// GatherStatsForDomains provides a mock function with given fields: domains, metricNumber // gatherStatsForDomains provides a mock function with given fields: domains, metricNumber
func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, metricNumber uint32) ([]go_libvirt.DomainStatsRecord, error) { func (_m *mockLibvirtUtils) gatherStatsForDomains(domains []go_libvirt.Domain, metricNumber uint32) ([]go_libvirt.DomainStatsRecord, error) {
ret := _m.Called(domains, metricNumber) ret := _m.Called(domains, metricNumber)
if len(ret) == 0 {
panic("no return value specified for gatherStatsForDomains")
}
var r0 []go_libvirt.DomainStatsRecord var r0 []go_libvirt.DomainStatsRecord
var r1 error
if rf, ok := ret.Get(0).(func([]go_libvirt.Domain, uint32) ([]go_libvirt.DomainStatsRecord, error)); ok {
return rf(domains, metricNumber)
}
if rf, ok := ret.Get(0).(func([]go_libvirt.Domain, uint32) []go_libvirt.DomainStatsRecord); ok { if rf, ok := ret.Get(0).(func([]go_libvirt.Domain, uint32) []go_libvirt.DomainStatsRecord); ok {
r0 = rf(domains, metricNumber) r0 = rf(domains, metricNumber)
} else { } else {
@ -97,7 +127,6 @@ func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, m
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func([]go_libvirt.Domain, uint32) error); ok { if rf, ok := ret.Get(1).(func([]go_libvirt.Domain, uint32) error); ok {
r1 = rf(domains, metricNumber) r1 = rf(domains, metricNumber)
} else { } else {
@ -107,11 +136,19 @@ func (_m *MockLibvirtUtils) GatherStatsForDomains(domains []go_libvirt.Domain, m
return r0, r1 return r0, r1
} }
// GatherVcpuMapping provides a mock function with given fields: domain, pCPUs, shouldGetCurrentPCPU // gatherVcpuMapping provides a mock function with given fields: domain, pCPUs, shouldGetCurrentPCPU
func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) { func (_m *mockLibvirtUtils) gatherVcpuMapping(domain go_libvirt.Domain, pCPUs int, shouldGetCurrentPCPU bool) ([]vcpuAffinity, error) {
ret := _m.Called(domain, pCPUs, shouldGetCurrentPCPU) ret := _m.Called(domain, pCPUs, shouldGetCurrentPCPU)
if len(ret) == 0 {
panic("no return value specified for gatherVcpuMapping")
}
var r0 []vcpuAffinity var r0 []vcpuAffinity
var r1 error
if rf, ok := ret.Get(0).(func(go_libvirt.Domain, int, bool) ([]vcpuAffinity, error)); ok {
return rf(domain, pCPUs, shouldGetCurrentPCPU)
}
if rf, ok := ret.Get(0).(func(go_libvirt.Domain, int, bool) []vcpuAffinity); ok { if rf, ok := ret.Get(0).(func(go_libvirt.Domain, int, bool) []vcpuAffinity); ok {
r0 = rf(domain, pCPUs, shouldGetCurrentPCPU) r0 = rf(domain, pCPUs, shouldGetCurrentPCPU)
} else { } else {
@ -120,7 +157,6 @@ func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs in
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func(go_libvirt.Domain, int, bool) error); ok { if rf, ok := ret.Get(1).(func(go_libvirt.Domain, int, bool) error); ok {
r1 = rf(domain, pCPUs, shouldGetCurrentPCPU) r1 = rf(domain, pCPUs, shouldGetCurrentPCPU)
} else { } else {
@ -130,14 +166,13 @@ func (_m *MockLibvirtUtils) GatherVcpuMapping(domain go_libvirt.Domain, pCPUs in
return r0, r1 return r0, r1
} }
type mockConstructorTestingTNewMockLibvirtUtils interface { // newMockLibvirtUtils creates a new instance of mockLibvirtUtils. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func newMockLibvirtUtils(t interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())
} }) *mockLibvirtUtils {
mock := &mockLibvirtUtils{}
// NewMockLibvirtUtils creates a new instance of MockLibvirtUtils. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLibvirtUtils(t mockConstructorTestingTNewMockLibvirtUtils) *MockLibvirtUtils {
mock := &MockLibvirtUtils{}
mock.Mock.Test(t) mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) }) t.Cleanup(func() { mock.AssertExpectations(t) })

View File

@ -19,6 +19,9 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
//go:embed sample.conf
var sampleConfig string
const ( const (
defaultHostSys = "/sys" defaultHostSys = "/sys"
cpufreq = "cpufreq" cpufreq = "cpufreq"
@ -26,9 +29,9 @@ const (
) )
type LinuxCPU struct { type LinuxCPU struct {
Log telegraf.Logger `toml:"-"`
PathSysfs string `toml:"host_sys"` PathSysfs string `toml:"host_sys"`
Metrics []string `toml:"metrics"` Metrics []string `toml:"metrics"`
Log telegraf.Logger `toml:"-"`
cpus []cpu cpus []cpu
} }
@ -44,9 +47,6 @@ type prop struct {
optional bool optional bool
} }
//go:embed sample.conf
var sampleConfig string
func (g *LinuxCPU) SampleConfig() string { func (g *LinuxCPU) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -172,14 +172,6 @@ func (g *LinuxCPU) discoverCpus() ([]cpu, error) {
return cpus, nil return cpus, nil
} }
func init() {
inputs.Add("linux_cpu", func() telegraf.Input {
return &LinuxCPU{
Metrics: []string{"cpufreq"},
}
})
}
func validatePath(propPath string) error { func validatePath(propPath string) error {
f, err := os.Open(propPath) f, err := os.Open(propPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -211,3 +203,11 @@ func readUintFromFile(propPath string) (uint64, error) {
return strconv.ParseUint(string(buffer[:n-1]), 10, 64) return strconv.ParseUint(string(buffer[:n-1]), 10, 64)
} }
func init() {
inputs.Add("linux_cpu", func() telegraf.Input {
return &LinuxCPU{
Metrics: []string{"cpufreq"},
}
})
}

View File

@ -16,11 +16,13 @@ type LinuxCPU struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*LinuxCPU) SampleConfig() string { return sampleConfig }
func (l *LinuxCPU) Init() error { func (l *LinuxCPU) Init() error {
l.Log.Warn("current platform is not supported") l.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*LinuxCPU) SampleConfig() string { return sampleConfig }
func (*LinuxCPU) Gather(_ telegraf.Accumulator) error { return nil } func (*LinuxCPU) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -10,6 +10,7 @@ import (
"strconv" "strconv"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -21,6 +22,36 @@ type SysctlFS struct {
path string path string
} }
func (*SysctlFS) SampleConfig() string {
return sampleConfig
}
func (sfs *SysctlFS) Gather(acc telegraf.Accumulator) error {
fields := make(map[string]interface{})
for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} {
if err := sfs.gatherOne(n, fields); err != nil {
return err
}
}
err := sfs.gatherList("inode-state", fields, "inode-nr", "inode-free-nr", "inode-preshrink-nr")
if err != nil {
return err
}
err = sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages")
if err != nil {
return err
}
err = sfs.gatherList("file-nr", fields, "file-nr", "", "file-max")
if err != nil {
return err
}
acc.AddFields("linux_sysctl_fs", fields, nil)
return nil
}
func (sfs *SysctlFS) gatherList(file string, fields map[string]interface{}, fieldNames ...string) error { func (sfs *SysctlFS) gatherList(file string, fields map[string]interface{}, fieldNames ...string) error {
bs, err := os.ReadFile(sfs.path + "/" + file) bs, err := os.ReadFile(sfs.path + "/" + file)
if err != nil { if err != nil {
@ -69,48 +100,10 @@ func (sfs *SysctlFS) gatherOne(name string, fields map[string]interface{}) error
return nil return nil
} }
func (*SysctlFS) SampleConfig() string {
return sampleConfig
}
func (sfs *SysctlFS) Gather(acc telegraf.Accumulator) error {
fields := make(map[string]interface{})
for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} {
if err := sfs.gatherOne(n, fields); err != nil {
return err
}
}
err := sfs.gatherList("inode-state", fields, "inode-nr", "inode-free-nr", "inode-preshrink-nr")
if err != nil {
return err
}
err = sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages")
if err != nil {
return err
}
err = sfs.gatherList("file-nr", fields, "file-nr", "", "file-max")
if err != nil {
return err
}
acc.AddFields("linux_sysctl_fs", fields, nil)
return nil
}
func GetHostProc() string {
procPath := "/proc"
if os.Getenv("HOST_PROC") != "" {
procPath = os.Getenv("HOST_PROC")
}
return procPath
}
func init() { func init() {
inputs.Add("linux_sysctl_fs", func() telegraf.Input { inputs.Add("linux_sysctl_fs", func() telegraf.Input {
return &SysctlFS{ return &SysctlFS{
path: path.Join(GetHostProc(), "/sys/fs"), path: path.Join(internal.GetProcPath(), "/sys/fs"),
} }
}) })
} }

View File

@ -21,17 +21,35 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
defaultWatchMethod = "inotify"
)
var ( var (
offsets = make(map[string]int64) offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex) offsetsMutex = new(sync.Mutex)
) )
// LogParser in the primary interface for the plugin const (
type GrokConfig struct { defaultWatchMethod = "inotify"
)
type LogParser struct {
Files []string `toml:"files"`
FromBeginning bool `toml:"from_beginning"`
WatchMethod string `toml:"watch_method"`
GrokConfig grokConfig `toml:"grok"`
Log telegraf.Logger `toml:"-"`
tailers map[string]*tail.Tail
offsets map[string]int64
lines chan logEntry
done chan struct{}
wg sync.WaitGroup
acc telegraf.Accumulator
sync.Mutex
grokParser telegraf.Parser
}
type grokConfig struct {
MeasurementName string `toml:"measurement"` MeasurementName string `toml:"measurement"`
Patterns []string Patterns []string
NamedPatterns []string NamedPatterns []string
@ -46,63 +64,16 @@ type logEntry struct {
line string line string
} }
// LogParserPlugin is the primary struct to implement the interface for logparser plugin func (*LogParser) SampleConfig() string {
type LogParserPlugin struct {
Files []string
FromBeginning bool
WatchMethod string
Log telegraf.Logger
tailers map[string]*tail.Tail
offsets map[string]int64
lines chan logEntry
done chan struct{}
wg sync.WaitGroup
acc telegraf.Accumulator
sync.Mutex
GrokParser telegraf.Parser
GrokConfig GrokConfig `toml:"grok"`
}
func NewLogParser() *LogParserPlugin {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
for k, v := range offsets {
offsetsCopy[k] = v
}
offsetsMutex.Unlock()
return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
offsets: offsetsCopy,
}
}
func (*LogParserPlugin) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (l *LogParserPlugin) Init() error { func (l *LogParser) Init() error {
l.Log.Warnf(`The logparser plugin is deprecated; please use the 'tail' input with the 'grok' data_format`) l.Log.Warnf(`The logparser plugin is deprecated; please use the 'tail' input with the 'grok' data_format`)
return nil return nil
} }
// Gather is the primary function to collect the metrics for the plugin func (l *LogParser) Start(acc telegraf.Accumulator) error {
func (l *LogParserPlugin) Gather(_ telegraf.Accumulator) error {
l.Lock()
defer l.Unlock()
// always start from the beginning of files that appear while we're running
l.tailNewFiles(true)
return nil
}
// Start kicks off collection of stats for the plugin
func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -130,8 +101,8 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
l.GrokParser = &parser l.grokParser = &parser
models.SetLoggerOnPlugin(l.GrokParser, l.Log) models.SetLoggerOnPlugin(l.grokParser, l.Log)
l.wg.Add(1) l.wg.Add(1)
go l.parser() go l.parser()
@ -148,9 +119,54 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (l *LogParser) Gather(_ telegraf.Accumulator) error {
l.Lock()
defer l.Unlock()
// always start from the beginning of files that appear while we're running
l.tailNewFiles(true)
return nil
}
func (l *LogParser) Stop() {
l.Lock()
defer l.Unlock()
for _, t := range l.tailers {
if !l.FromBeginning {
// store offset for resume
offset, err := t.Tell()
if err == nil {
l.offsets[t.Filename] = offset
l.Log.Debugf("Recording offset %d for file: %v", offset, t.Filename)
} else {
l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
}
}
err := t.Stop()
// message for a stopped tailer
l.Log.Debugf("Tail dropped for file: %v", t.Filename)
if err != nil {
l.Log.Errorf("Error stopping tail on file %s", t.Filename)
}
}
close(l.done)
l.wg.Wait()
// persist offsets
offsetsMutex.Lock()
for k, v := range l.offsets {
offsets[k] = v
}
offsetsMutex.Unlock()
}
// check the globs against files on disk, and start tailing any new files. // check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held! // Assumes l's lock is held!
func (l *LogParserPlugin) tailNewFiles(fromBeginning bool) { func (l *LogParser) tailNewFiles(fromBeginning bool) {
var poll bool var poll bool
if l.WatchMethod == "poll" { if l.WatchMethod == "poll" {
poll = true poll = true
@ -213,7 +229,7 @@ func (l *LogParserPlugin) tailNewFiles(fromBeginning bool) {
// receiver is launched as a goroutine to continuously watch a tailed logfile // receiver is launched as a goroutine to continuously watch a tailed logfile
// for changes and send any log lines down the l.lines channel. // for changes and send any log lines down the l.lines channel.
func (l *LogParserPlugin) receiver(tailer *tail.Tail) { func (l *LogParser) receiver(tailer *tail.Tail) {
defer l.wg.Done() defer l.wg.Done()
var line *tail.Line var line *tail.Line
@ -242,7 +258,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
// parse is launched as a goroutine to watch the l.lines channel. // parse is launched as a goroutine to watch the l.lines channel.
// when a line is available, parse parses it and adds the metric(s) to the // when a line is available, parse parses it and adds the metric(s) to the
// accumulator. // accumulator.
func (l *LogParserPlugin) parser() { func (l *LogParser) parser() {
defer l.wg.Done() defer l.wg.Done()
var m telegraf.Metric var m telegraf.Metric
@ -257,7 +273,7 @@ func (l *LogParserPlugin) parser() {
continue continue
} }
} }
m, err = l.GrokParser.ParseLine(entry.line) m, err = l.grokParser.ParseLine(entry.line)
if err == nil { if err == nil {
if m != nil { if m != nil {
tags := m.Tags() tags := m.Tags()
@ -270,44 +286,22 @@ func (l *LogParserPlugin) parser() {
} }
} }
// Stop will end the metrics collection process on file tailers func newLogParser() *LogParser {
func (l *LogParserPlugin) Stop() {
l.Lock()
defer l.Unlock()
for _, t := range l.tailers {
if !l.FromBeginning {
// store offset for resume
offset, err := t.Tell()
if err == nil {
l.offsets[t.Filename] = offset
l.Log.Debugf("Recording offset %d for file: %v", offset, t.Filename)
} else {
l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
}
}
err := t.Stop()
// message for a stopped tailer
l.Log.Debugf("Tail dropped for file: %v", t.Filename)
if err != nil {
l.Log.Errorf("Error stopping tail on file %s", t.Filename)
}
}
close(l.done)
l.wg.Wait()
// persist offsets
offsetsMutex.Lock() offsetsMutex.Lock()
for k, v := range l.offsets { offsetsCopy := make(map[string]int64, len(offsets))
offsets[k] = v for k, v := range offsets {
offsetsCopy[k] = v
} }
offsetsMutex.Unlock() offsetsMutex.Unlock()
return &LogParser{
WatchMethod: defaultWatchMethod,
offsets: offsetsCopy,
}
} }
func init() { func init() {
inputs.Add("logparser", func() telegraf.Input { inputs.Add("logparser", func() telegraf.Input {
return NewLogParser() return newLogParser()
}) })
} }

View File

@ -17,7 +17,7 @@ var (
) )
func TestStartNoParsers(t *testing.T) { func TestStartNoParsers(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
FromBeginning: true, FromBeginning: true,
Files: []string{filepath.Join(testdataDir, "*.log")}, Files: []string{filepath.Join(testdataDir, "*.log")},
@ -28,11 +28,11 @@ func TestStartNoParsers(t *testing.T) {
} }
func TestGrokParseLogFilesNonExistPattern(t *testing.T) { func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
FromBeginning: true, FromBeginning: true,
Files: []string{filepath.Join(testdataDir, "*.log")}, Files: []string{filepath.Join(testdataDir, "*.log")},
GrokConfig: GrokConfig{ GrokConfig: grokConfig{
Patterns: []string{"%{FOOBAR}"}, Patterns: []string{"%{FOOBAR}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},
}, },
@ -44,9 +44,9 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
} }
func TestGrokParseLogFiles(t *testing.T) { func TestGrokParseLogFiles(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
GrokConfig: GrokConfig{ GrokConfig: grokConfig{
MeasurementName: "logparser_grok", MeasurementName: "logparser_grok",
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}", "%{TEST_LOG_C}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}", "%{TEST_LOG_C}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},
@ -122,11 +122,11 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(emptydir) defer os.RemoveAll(emptydir)
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
FromBeginning: true, FromBeginning: true,
Files: []string{filepath.Join(emptydir, "*.log")}, Files: []string{filepath.Join(emptydir, "*.log")},
GrokConfig: GrokConfig{ GrokConfig: grokConfig{
MeasurementName: "logparser_grok", MeasurementName: "logparser_grok",
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},
@ -165,11 +165,11 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
// Test that test_a.log line gets parsed even though we don't have the correct // Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log // pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) { func TestGrokParseLogFilesOneBad(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
FromBeginning: true, FromBeginning: true,
Files: []string{filepath.Join(testdataDir, "test_a.log")}, Files: []string{filepath.Join(testdataDir, "test_a.log")},
GrokConfig: GrokConfig{ GrokConfig: grokConfig{
MeasurementName: "logparser_grok", MeasurementName: "logparser_grok",
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"}, Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_BAD}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},
@ -197,9 +197,9 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
} }
func TestGrokParseLogFiles_TimestampInEpochMilli(t *testing.T) { func TestGrokParseLogFiles_TimestampInEpochMilli(t *testing.T) {
logparser := &LogParserPlugin{ logparser := &LogParser{
Log: testutil.Logger{}, Log: testutil.Logger{},
GrokConfig: GrokConfig{ GrokConfig: grokConfig{
MeasurementName: "logparser_grok", MeasurementName: "logparser_grok",
Patterns: []string{"%{TEST_LOG_C}"}, Patterns: []string{"%{TEST_LOG_C}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")},

View File

@ -122,6 +122,69 @@ func (logstash *Logstash) Init() error {
return nil return nil
} }
func (*Logstash) Start(telegraf.Accumulator) error {
return nil
}
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
if logstash.client == nil {
client, err := logstash.createHTTPClient()
if err != nil {
return err
}
logstash.client = client
}
if choice.Contains("jvm", logstash.Collect) {
jvmURL, err := url.Parse(logstash.URL + jvmStatsNode)
if err != nil {
return err
}
if err := logstash.gatherJVMStats(jvmURL.String(), accumulator); err != nil {
return err
}
}
if choice.Contains("process", logstash.Collect) {
processURL, err := url.Parse(logstash.URL + processStatsNode)
if err != nil {
return err
}
if err := logstash.gatherProcessStats(processURL.String(), accumulator); err != nil {
return err
}
}
if choice.Contains("pipelines", logstash.Collect) {
if logstash.SinglePipeline {
pipelineURL, err := url.Parse(logstash.URL + pipelineStatsNode)
if err != nil {
return err
}
if err := logstash.gatherPipelineStats(pipelineURL.String(), accumulator); err != nil {
return err
}
} else {
pipelinesURL, err := url.Parse(logstash.URL + pipelinesStatsNode)
if err != nil {
return err
}
if err := logstash.gatherPipelinesStats(pipelinesURL.String(), accumulator); err != nil {
return err
}
}
}
return nil
}
func (logstash *Logstash) Stop() {
if logstash.client != nil {
logstash.client.CloseIdleConnections()
}
}
// createHTTPClient create a clients to access API // createHTTPClient create a clients to access API
func (logstash *Logstash) createHTTPClient() (*http.Client, error) { func (logstash *Logstash) createHTTPClient() (*http.Client, error) {
ctx := context.Background() ctx := context.Background()
@ -193,7 +256,7 @@ func (logstash *Logstash) gatherJVMStats(address string, accumulator telegraf.Ac
return nil return nil
} }
// gatherJVMStats gather the Process metrics and add results to the accumulator // gatherProcessStats gather the Process metrics and add results to the accumulator
func (logstash *Logstash) gatherProcessStats(address string, accumulator telegraf.Accumulator) error { func (logstash *Logstash) gatherProcessStats(address string, accumulator telegraf.Accumulator) error {
processStats := &processStats{} processStats := &processStats{}
@ -352,7 +415,7 @@ func (logstash *Logstash) gatherQueueStats(queue pipelineQueue, tags map[string]
return nil return nil
} }
// gatherJVMStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6) // gatherPipelineStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6)
func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegraf.Accumulator) error { func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegraf.Accumulator) error {
pipelineStats := &pipelineStats{} pipelineStats := &pipelineStats{}
@ -396,7 +459,7 @@ func (logstash *Logstash) gatherPipelineStats(address string, accumulator telegr
return nil return nil
} }
// gatherJVMStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6) // gatherPipelinesStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6)
func (logstash *Logstash) gatherPipelinesStats(address string, accumulator telegraf.Accumulator) error { func (logstash *Logstash) gatherPipelinesStats(address string, accumulator telegraf.Accumulator) error {
pipelinesStats := &pipelinesStats{} pipelinesStats := &pipelinesStats{}
@ -443,78 +506,6 @@ func (logstash *Logstash) gatherPipelinesStats(address string, accumulator teleg
return nil return nil
} }
func (logstash *Logstash) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather ask this plugin to start gathering metrics
func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error {
if logstash.client == nil {
client, err := logstash.createHTTPClient()
if err != nil {
return err
}
logstash.client = client
}
if choice.Contains("jvm", logstash.Collect) {
jvmURL, err := url.Parse(logstash.URL + jvmStatsNode)
if err != nil {
return err
}
if err := logstash.gatherJVMStats(jvmURL.String(), accumulator); err != nil {
return err
}
}
if choice.Contains("process", logstash.Collect) {
processURL, err := url.Parse(logstash.URL + processStatsNode)
if err != nil {
return err
}
if err := logstash.gatherProcessStats(processURL.String(), accumulator); err != nil {
return err
}
}
if choice.Contains("pipelines", logstash.Collect) {
if logstash.SinglePipeline {
pipelineURL, err := url.Parse(logstash.URL + pipelineStatsNode)
if err != nil {
return err
}
if err := logstash.gatherPipelineStats(pipelineURL.String(), accumulator); err != nil {
return err
}
} else {
pipelinesURL, err := url.Parse(logstash.URL + pipelinesStatsNode)
if err != nil {
return err
}
if err := logstash.gatherPipelinesStats(pipelinesURL.String(), accumulator); err != nil {
return err
}
}
}
return nil
}
func (logstash *Logstash) Stop() {
if logstash.client != nil {
logstash.client.CloseIdleConnections()
}
}
// init registers this plugin instance
func init() {
inputs.Add("logstash", func() telegraf.Input {
return newLogstash()
})
}
// newLogstash create an instance of the plugin with default settings
func newLogstash() *Logstash { func newLogstash() *Logstash {
return &Logstash{ return &Logstash{
URL: "http://127.0.0.1:9600", URL: "http://127.0.0.1:9600",
@ -525,3 +516,9 @@ func newLogstash() *Logstash {
}, },
} }
} }
func init() {
inputs.Add("logstash", func() telegraf.Input {
return newLogstash()
})
}

View File

@ -24,13 +24,8 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type tags struct {
name, brwSection, bucket, job, client string
}
// Lustre proc files can change between versions, so we want to future-proof
// by letting people choose what to look at.
type Lustre2 struct { type Lustre2 struct {
// Lustre proc files can change between versions, so we want to future-proof by letting people choose what to look at.
MgsProcfiles []string `toml:"mgs_procfiles"` MgsProcfiles []string `toml:"mgs_procfiles"`
OstProcfiles []string `toml:"ost_procfiles"` OstProcfiles []string `toml:"ost_procfiles"`
MdsProcfiles []string `toml:"mds_procfiles"` MdsProcfiles []string `toml:"mds_procfiles"`
@ -43,13 +38,400 @@ type Lustre2 struct {
allFields map[tags]map[string]interface{} allFields map[tags]map[string]interface{}
} }
/* type tags struct {
The wanted fields would be a []string if not for the name, brwSection, bucket, job, client string
}
lines that start with read_bytes/write_bytes and contain func (*Lustre2) SampleConfig() string {
return sampleConfig
}
both the byte count and the function call count func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
*/ l.allFields = make(map[tags]map[string]interface{})
err := l.getLustreHealth()
if err != nil {
return err
}
if len(l.MgsProcfiles) == 0 {
l.MgsProcfiles = []string{
// eviction count
"/sys/fs/lustre/mgs/*/eviction_count",
}
}
if len(l.OstProcfiles) == 0 {
l.OstProcfiles = []string{
// read/write bytes are in obdfilter/<ost_name>/stats
"/proc/fs/lustre/obdfilter/*/stats",
// cache counters are in osd-ldiskfs/<ost_name>/stats
"/proc/fs/lustre/osd-ldiskfs/*/stats",
// per job statistics are in obdfilter/<ost_name>/job_stats
"/proc/fs/lustre/obdfilter/*/job_stats",
// bulk read/write statistics for ldiskfs
"/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
// bulk read/write statistics for zfs
"/proc/fs/lustre/osd-zfs/*/brw_stats",
// eviction count
"/sys/fs/lustre/obdfilter/*/eviction_count",
}
}
if len(l.MdsProcfiles) == 0 {
l.MdsProcfiles = []string{
// Metadata server stats
"/proc/fs/lustre/mdt/*/md_stats",
// Metadata target job stats
"/proc/fs/lustre/mdt/*/job_stats",
// eviction count
"/sys/fs/lustre/mdt/*/eviction_count",
}
}
for _, procfile := range l.MgsProcfiles {
if !strings.HasSuffix(procfile, "eviction_count") {
return fmt.Errorf("no handler found for mgs procfile pattern \"%s\"", procfile)
}
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
}
for _, procfile := range l.OstProcfiles {
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.getLustreProcStats(procfile, wantedOstJobstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "eviction_count") {
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
} else {
err := l.getLustreProcStats(procfile, wantedOstFields)
if err != nil {
return err
}
}
}
for _, procfile := range l.MdsProcfiles {
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.getLustreProcStats(procfile, wantedMdtJobstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "eviction_count") {
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
} else {
err := l.getLustreProcStats(procfile, wantedMdsFields)
if err != nil {
return err
}
}
}
for tgs, fields := range l.allFields {
tags := make(map[string]string, 5)
if len(tgs.name) > 0 {
tags["name"] = tgs.name
}
if len(tgs.brwSection) > 0 {
tags["brw_section"] = tgs.brwSection
}
if len(tgs.bucket) > 0 {
tags["bucket"] = tgs.bucket
}
if len(tgs.job) > 0 {
tags["jobid"] = tgs.job
}
if len(tgs.client) > 0 {
tags["client"] = tgs.client
}
acc.AddFields("lustre2", fields, tags)
}
return nil
}
func (l *Lustre2) getLustreHealth() error {
// the linter complains about using an element containing '/' in filepath.Join()
// so we explicitly set the rootdir default to '/' in this function rather than
// starting the second element with a '/'.
rootdir := l.rootdir
if rootdir == "" {
rootdir = "/"
}
filename := filepath.Join(rootdir, "sys", "fs", "lustre", "health_check")
if _, err := os.Stat(filename); err != nil {
// try falling back to the old procfs location
// it was moved in https://github.com/lustre/lustre-release/commit/5d368bd0b2
filename = filepath.Join(rootdir, "proc", "fs", "lustre", "health_check")
if _, err = os.Stat(filename); err != nil {
return nil //nolint:nilerr // we don't want to return an error if the file doesn't exist
}
}
contents, err := os.ReadFile(filename)
if err != nil {
return err
}
value := strings.TrimSpace(string(contents))
var health uint64
if value == "healthy" {
health = 1
}
t := tags{}
var fields map[string]interface{}
fields, ok := l.allFields[t]
if !ok {
fields = make(map[string]interface{})
l.allFields[t] = fields
}
fields["health"] = health
return nil
}
func (l *Lustre2) getLustreProcStats(fileglob string, wantedFields []*mapping) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return err
}
fieldSplitter := regexp.MustCompile(`[ :]+`)
for _, file := range files {
/* From /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
* extract the object store target name,
* and for per-client files under
* /proc/fs/lustre/obdfilter/<ost_name>/exports/<client_nid>/stats
* and similar the client NID
* Assumption: the target name is fourth to last
* for per-client files and second to last otherwise
* and the client NID is always second to last,
* which is true in Lustre 2.1->2.14
*/
path := strings.Split(file, "/")
var name, client string
if strings.Contains(file, "/exports/") {
name = path[len(path)-4]
client = path[len(path)-2]
} else {
name = path[len(path)-2]
client = ""
}
wholeFile, err := os.ReadFile(file)
if err != nil {
return err
}
jobs := strings.Split(string(wholeFile), "- ")
for _, job := range jobs {
lines := strings.Split(job, "\n")
jobid := ""
// figure out if the data should be tagged with job_id here
parts := strings.Fields(lines[0])
if strings.TrimSuffix(parts[0], ":") == "job_id" {
jobid = parts[1]
}
for _, line := range lines {
// skip any empty lines
if len(line) < 1 {
continue
}
parts := fieldSplitter.Split(line, -1)
if len(parts[0]) == 0 {
parts = parts[1:]
}
var fields map[string]interface{}
fields, ok := l.allFields[tags{name, "", "", jobid, client}]
if !ok {
fields = make(map[string]interface{})
l.allFields[tags{name, "", "", jobid, client}] = fields
}
for _, wanted := range wantedFields {
var data uint64
if parts[0] == wanted.inProc {
wantedField := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as
// that's a string
if wantedField == 0 {
wantedField = 1
}
data, err = strconv.ParseUint(strings.TrimSuffix(parts[wantedField], ","), 10, 64)
if err != nil {
return err
}
reportName := wanted.inProc
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
fields[reportName] = data
}
}
}
}
}
return nil
}
func (l *Lustre2) getLustreProcBrwStats(fileglob string, wantedFields []*mapping) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err)
}
for _, file := range files {
// Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar into just the object store target name
// This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12
path := strings.Split(file, "/")
if len(path) < 2 {
continue
}
name := path[len(path)-2]
wholeFile, err := os.ReadFile(file)
if err != nil {
if errors.Is(err, os.ErrPermission) {
l.Log.Debugf("%s", err)
continue
}
return fmt.Errorf("failed to read file %s: %w", file, err)
}
lines := strings.Split(string(wholeFile), "\n")
var headerName string
for _, line := range lines {
// There are four types of lines in a brw_stats file:
// 1. Header lines - contain the category of metric (e.g. disk I/Os in flight, disk I/O time)
// 2. Bucket lines - follow headers, contain the bucket value (e.g. 4K, 1M) and metric values
// 3. Empty lines - these will simply be filtered out
// 4. snapshot_time line - this will be filtered out, as it "looks" like a bucket line
if len(line) < 1 {
continue
}
parts := strings.Fields(line)
// This is a header line
// Set report name for use by the buckets that follow
if !strings.Contains(parts[0], ":") {
nameParts := strings.Split(line, " ")
headerName = nameParts[0]
continue
}
// snapshot_time should be discarded
if strings.Contains(parts[0], "snapshot_time") {
continue
}
// This is a bucket for a given header
for _, wanted := range wantedFields {
if headerName != wanted.inProc {
continue
}
bucket := strings.TrimSuffix(parts[0], ":")
// brw_stats columns are static and don't need configurable fields
readIos, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_ios: %w", err)
}
readPercent, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_percent: %w", err)
}
writeIos, err := strconv.ParseUint(parts[5], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_ios: %w", err)
}
writePercent, err := strconv.ParseUint(parts[6], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_percent: %w", err)
}
reportName := headerName
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
tag := tags{name, reportName, bucket, "", ""}
fields, ok := l.allFields[tag]
if !ok {
fields = make(map[string]interface{})
l.allFields[tag] = fields
}
fields["read_ios"] = readIos
fields["read_percent"] = readPercent
fields["write_ios"] = writeIos
fields["write_percent"] = writePercent
}
}
}
return nil
}
func (l *Lustre2) getLustreEvictionCount(fileglob string) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err)
}
for _, file := range files {
// Turn /sys/fs/lustre/*/<mgt/mdt/ost_name>/eviction_count into just the object store target name
// This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12
path := strings.Split(file, "/")
if len(path) < 2 {
continue
}
name := path[len(path)-2]
contents, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("failed to read file %s: %w", file, err)
}
value, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse file %s: %w", file, err)
}
tag := tags{name, "", "", "", ""}
fields, ok := l.allFields[tag]
if !ok {
fields = make(map[string]interface{})
l.allFields[tag] = fields
}
fields["evictions"] = value
}
return nil
}
// The wanted fields would be a []string, if not for the lines that start with read_bytes/write_bytes
// and contain both the byte count and the function call count
type mapping struct { type mapping struct {
inProc string // What to look for at the start of a line in /proc/fs/lustre/* inProc string // What to look for at the start of a line in /proc/fs/lustre/*
field uint32 // which field to extract from that line field uint32 // which field to extract from that line
@ -378,395 +760,6 @@ var wantedMdtJobstatsFields = []*mapping{
}, },
} }
func (*Lustre2) SampleConfig() string {
return sampleConfig
}
func (l *Lustre2) GetLustreHealth() error {
// the linter complains about using an element containing '/' in filepath.Join()
// so we explicitly set the rootdir default to '/' in this function rather than
// starting the second element with a '/'.
rootdir := l.rootdir
if rootdir == "" {
rootdir = "/"
}
filename := filepath.Join(rootdir, "sys", "fs", "lustre", "health_check")
if _, err := os.Stat(filename); err != nil {
// try falling back to the old procfs location
// it was moved in https://github.com/lustre/lustre-release/commit/5d368bd0b2
filename = filepath.Join(rootdir, "proc", "fs", "lustre", "health_check")
if _, err = os.Stat(filename); err != nil {
return nil //nolint:nilerr // we don't want to return an error if the file doesn't exist
}
}
contents, err := os.ReadFile(filename)
if err != nil {
return err
}
value := strings.TrimSpace(string(contents))
var health uint64
if value == "healthy" {
health = 1
}
t := tags{}
var fields map[string]interface{}
fields, ok := l.allFields[t]
if !ok {
fields = make(map[string]interface{})
l.allFields[t] = fields
}
fields["health"] = health
return nil
}
func (l *Lustre2) GetLustreProcStats(fileglob string, wantedFields []*mapping) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return err
}
fieldSplitter := regexp.MustCompile(`[ :]+`)
for _, file := range files {
/* From /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
* extract the object store target name,
* and for per-client files under
* /proc/fs/lustre/obdfilter/<ost_name>/exports/<client_nid>/stats
* and similar the client NID
* Assumption: the target name is fourth to last
* for per-client files and second to last otherwise
* and the client NID is always second to last,
* which is true in Lustre 2.1->2.14
*/
path := strings.Split(file, "/")
var name, client string
if strings.Contains(file, "/exports/") {
name = path[len(path)-4]
client = path[len(path)-2]
} else {
name = path[len(path)-2]
client = ""
}
wholeFile, err := os.ReadFile(file)
if err != nil {
return err
}
jobs := strings.Split(string(wholeFile), "- ")
for _, job := range jobs {
lines := strings.Split(job, "\n")
jobid := ""
// figure out if the data should be tagged with job_id here
parts := strings.Fields(lines[0])
if strings.TrimSuffix(parts[0], ":") == "job_id" {
jobid = parts[1]
}
for _, line := range lines {
// skip any empty lines
if len(line) < 1 {
continue
}
parts := fieldSplitter.Split(line, -1)
if len(parts[0]) == 0 {
parts = parts[1:]
}
var fields map[string]interface{}
fields, ok := l.allFields[tags{name, "", "", jobid, client}]
if !ok {
fields = make(map[string]interface{})
l.allFields[tags{name, "", "", jobid, client}] = fields
}
for _, wanted := range wantedFields {
var data uint64
if parts[0] == wanted.inProc {
wantedField := wanted.field
// if not set, assume field[1]. Shouldn't be field[0], as
// that's a string
if wantedField == 0 {
wantedField = 1
}
data, err = strconv.ParseUint(strings.TrimSuffix(parts[wantedField], ","), 10, 64)
if err != nil {
return err
}
reportName := wanted.inProc
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
fields[reportName] = data
}
}
}
}
}
return nil
}
func (l *Lustre2) getLustreProcBrwStats(fileglob string, wantedFields []*mapping) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err)
}
for _, file := range files {
// Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar into just the object store target name
// This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12
path := strings.Split(file, "/")
if len(path) < 2 {
continue
}
name := path[len(path)-2]
wholeFile, err := os.ReadFile(file)
if err != nil {
if errors.Is(err, os.ErrPermission) {
l.Log.Debugf("%s", err)
continue
}
return fmt.Errorf("failed to read file %s: %w", file, err)
}
lines := strings.Split(string(wholeFile), "\n")
var headerName string
for _, line := range lines {
// There are four types of lines in a brw_stats file:
// 1. Header lines - contain the category of metric (e.g. disk I/Os in flight, disk I/O time)
// 2. Bucket lines - follow headers, contain the bucket value (e.g. 4K, 1M) and metric values
// 3. Empty lines - these will simply be filtered out
// 4. snapshot_time line - this will be filtered out, as it "looks" like a bucket line
if len(line) < 1 {
continue
}
parts := strings.Fields(line)
// This is a header line
// Set report name for use by the buckets that follow
if !strings.Contains(parts[0], ":") {
nameParts := strings.Split(line, " ")
headerName = nameParts[0]
continue
}
// snapshot_time should be discarded
if strings.Contains(parts[0], "snapshot_time") {
continue
}
// This is a bucket for a given header
for _, wanted := range wantedFields {
if headerName != wanted.inProc {
continue
}
bucket := strings.TrimSuffix(parts[0], ":")
// brw_stats columns are static and don't need configurable fields
readIos, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_ios: %w", err)
}
readPercent, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse read_percent: %w", err)
}
writeIos, err := strconv.ParseUint(parts[5], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_ios: %w", err)
}
writePercent, err := strconv.ParseUint(parts[6], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse write_percent: %w", err)
}
reportName := headerName
if wanted.reportAs != "" {
reportName = wanted.reportAs
}
tag := tags{name, reportName, bucket, "", ""}
fields, ok := l.allFields[tag]
if !ok {
fields = make(map[string]interface{})
l.allFields[tag] = fields
}
fields["read_ios"] = readIos
fields["read_percent"] = readPercent
fields["write_ios"] = writeIos
fields["write_percent"] = writePercent
}
}
}
return nil
}
func (l *Lustre2) getLustreEvictionCount(fileglob string) error {
files, err := filepath.Glob(filepath.Join(l.rootdir, fileglob))
if err != nil {
return fmt.Errorf("failed to find files matching glob %s: %w", fileglob, err)
}
for _, file := range files {
// Turn /sys/fs/lustre/*/<mgt/mdt/ost_name>/eviction_count into just the object store target name
// This assumes that the target name is always second to last, which is true in Lustre 2.1->2.12
path := strings.Split(file, "/")
if len(path) < 2 {
continue
}
name := path[len(path)-2]
contents, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("failed to read file %s: %w", file, err)
}
value, err := strconv.ParseUint(strings.TrimSpace(string(contents)), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse file %s: %w", file, err)
}
tag := tags{name, "", "", "", ""}
fields, ok := l.allFields[tag]
if !ok {
fields = make(map[string]interface{})
l.allFields[tag] = fields
}
fields["evictions"] = value
}
return nil
}
// Gather reads stats from all lustre targets
func (l *Lustre2) Gather(acc telegraf.Accumulator) error {
l.allFields = make(map[tags]map[string]interface{})
err := l.GetLustreHealth()
if err != nil {
return err
}
if len(l.MgsProcfiles) == 0 {
l.MgsProcfiles = []string{
// eviction count
"/sys/fs/lustre/mgs/*/eviction_count",
}
}
if len(l.OstProcfiles) == 0 {
l.OstProcfiles = []string{
// read/write bytes are in obdfilter/<ost_name>/stats
"/proc/fs/lustre/obdfilter/*/stats",
// cache counters are in osd-ldiskfs/<ost_name>/stats
"/proc/fs/lustre/osd-ldiskfs/*/stats",
// per job statistics are in obdfilter/<ost_name>/job_stats
"/proc/fs/lustre/obdfilter/*/job_stats",
// bulk read/write statistics for ldiskfs
"/proc/fs/lustre/osd-ldiskfs/*/brw_stats",
// bulk read/write statistics for zfs
"/proc/fs/lustre/osd-zfs/*/brw_stats",
// eviction count
"/sys/fs/lustre/obdfilter/*/eviction_count",
}
}
if len(l.MdsProcfiles) == 0 {
l.MdsProcfiles = []string{
// Metadata server stats
"/proc/fs/lustre/mdt/*/md_stats",
// Metadata target job stats
"/proc/fs/lustre/mdt/*/job_stats",
// eviction count
"/sys/fs/lustre/mdt/*/eviction_count",
}
}
for _, procfile := range l.MgsProcfiles {
if !strings.HasSuffix(procfile, "eviction_count") {
return fmt.Errorf("no handler found for mgs procfile pattern \"%s\"", procfile)
}
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
}
for _, procfile := range l.OstProcfiles {
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.GetLustreProcStats(procfile, wantedOstJobstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "eviction_count") {
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
} else {
err := l.GetLustreProcStats(procfile, wantedOstFields)
if err != nil {
return err
}
}
}
for _, procfile := range l.MdsProcfiles {
if strings.HasSuffix(procfile, "brw_stats") {
err := l.getLustreProcBrwStats(procfile, wantedBrwstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "job_stats") {
err := l.GetLustreProcStats(procfile, wantedMdtJobstatsFields)
if err != nil {
return err
}
} else if strings.HasSuffix(procfile, "eviction_count") {
err := l.getLustreEvictionCount(procfile)
if err != nil {
return err
}
} else {
err := l.GetLustreProcStats(procfile, wantedMdsFields)
if err != nil {
return err
}
}
}
for tgs, fields := range l.allFields {
tags := make(map[string]string, 5)
if len(tgs.name) > 0 {
tags["name"] = tgs.name
}
if len(tgs.brwSection) > 0 {
tags["brw_section"] = tgs.brwSection
}
if len(tgs.bucket) > 0 {
tags["bucket"] = tgs.bucket
}
if len(tgs.job) > 0 {
tags["jobid"] = tgs.job
}
if len(tgs.client) > 0 {
tags["client"] = tgs.client
}
acc.AddFields("lustre2", fields, tags)
}
return nil
}
func init() { func init() {
inputs.Add("lustre2", func() telegraf.Input { inputs.Add("lustre2", func() telegraf.Input {
return &Lustre2{} return &Lustre2{}

View File

@ -17,11 +17,13 @@ type Lustre2 struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Lustre2) SampleConfig() string { return sampleConfig }
func (l *Lustre2) Init() error { func (l *Lustre2) Init() error {
l.Log.Warn("current platform is not supported") l.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*Lustre2) SampleConfig() string { return sampleConfig }
func (*Lustre2) Gather(_ telegraf.Accumulator) error { return nil } func (*Lustre2) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -6,8 +6,9 @@ import (
"os/exec" "os/exec"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
func TestGather(t *testing.T) { func TestGather(t *testing.T) {

View File

@ -28,17 +28,13 @@ import (
"strings" "strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
defaultHostProc = "/proc"
envProc = "HOST_PROC"
)
var ( var (
statusLineRE = regexp.MustCompile(`(\d+) blocks .*\[(\d+)/(\d+)\] \[([U_]+)\]`) statusLineRE = regexp.MustCompile(`(\d+) blocks .*\[(\d+)/(\d+)\] \[([U_]+)\]`)
recoveryLineBlocksRE = regexp.MustCompile(`\((\d+)/\d+\)`) recoveryLineBlocksRE = regexp.MustCompile(`\((\d+)/\d+\)`)
@ -274,7 +270,7 @@ func (k *MdstatConf) Gather(acc telegraf.Accumulator) error {
func (k *MdstatConf) getProcMdstat() ([]byte, error) { func (k *MdstatConf) getProcMdstat() ([]byte, error) {
var mdStatFile string var mdStatFile string
if k.FileName == "" { if k.FileName == "" {
mdStatFile = proc(envProc, defaultHostProc) + "/mdstat" mdStatFile = internal.GetProcPath() + "/mdstat"
} else { } else {
mdStatFile = k.FileName mdStatFile = k.FileName
} }
@ -295,13 +291,3 @@ func (k *MdstatConf) getProcMdstat() ([]byte, error) {
func init() { func init() {
inputs.Add("mdstat", func() telegraf.Input { return &MdstatConf{} }) inputs.Add("mdstat", func() telegraf.Input { return &MdstatConf{} })
} }
// proc can be used to read file paths from env
func proc(env, path string) string {
// try to read full file path
if p := os.Getenv(env); p != "" {
return p
}
// return default path
return path
}

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/system" "github.com/influxdata/telegraf/plugins/inputs/system"
) )
@ -136,10 +137,7 @@ func (n *NetIOStats) Gather(acc telegraf.Accumulator) error {
// Get the interface speed from /sys/class/net/*/speed file. returns -1 if unsupported // Get the interface speed from /sys/class/net/*/speed file. returns -1 if unsupported
func getInterfaceSpeed(ioName string) int64 { func getInterfaceSpeed(ioName string) int64 {
sysPath := os.Getenv("HOST_SYS") sysPath := internal.GetSysPath()
if sysPath == "" {
sysPath = "/sys"
}
raw, err := os.ReadFile(filepath.Join(sysPath, "class", "net", ioName, "speed")) raw, err := os.ReadFile(filepath.Join(sysPath, "class", "net", ioName, "speed"))
if err != nil { if err != nil {

View File

@ -14,8 +14,8 @@ import (
"syscall" "syscall"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs"
) )
type Processes struct { type Processes struct {
@ -130,7 +130,7 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
// get process states from /proc/(pid)/stat files // get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error { func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
filenames, err := filepath.Glob(linux_sysctl_fs.GetHostProc() + "/[0-9]*/stat") filenames, err := filepath.Glob(internal.GetProcPath() + "/[0-9]*/stat")
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,6 +17,8 @@ import (
"github.com/shirou/gopsutil/v4/process" "github.com/shirou/gopsutil/v4/process"
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"github.com/influxdata/telegraf/internal"
) )
func processName(p *process.Process) (string, error) { func processName(p *process.Process) (string, error) {
@ -86,11 +88,7 @@ func findByWindowsServices(_ []string) ([]processGroup, error) {
} }
func collectTotalReadWrite(proc Process) (r, w uint64, err error) { func collectTotalReadWrite(proc Process) (r, w uint64, err error) {
path := procfs.DefaultMountPoint path := internal.GetProcPath()
if hp := os.Getenv("HOST_PROC"); hp != "" {
path = hp
}
fs, err := procfs.NewFS(path) fs, err := procfs.NewFS(path)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
@ -163,11 +161,7 @@ func socketTypeName(t uint8) string {
} }
func mapFdToInode(pid int32, fd uint32) (uint32, error) { func mapFdToInode(pid int32, fd uint32) (uint32, error) {
root := os.Getenv("HOST_PROC") root := internal.GetProcPath()
if root == "" {
root = "/proc"
}
fn := fmt.Sprintf("%s/%d/fd/%d", root, pid, fd) fn := fmt.Sprintf("%s/%d/fd/%d", root, pid, fd)
link, err := os.Readlink(fn) link, err := os.Readlink(fn)
if err != nil { if err != nil {

View File

@ -103,14 +103,6 @@ func (ss *SlabStats) runCmd(cmd string, args []string) ([]byte, error) {
return out, nil return out, nil
} }
func getHostProc() string {
procPath := "/proc"
if os.Getenv("HOST_PROC") != "" {
procPath = os.Getenv("HOST_PROC")
}
return procPath
}
func normalizeName(name string) string { func normalizeName(name string) string {
return strings.ReplaceAll(strings.ToLower(name), "-", "_") + "_size" return strings.ReplaceAll(strings.ToLower(name), "-", "_") + "_size"
} }
@ -118,7 +110,7 @@ func normalizeName(name string) string {
func init() { func init() {
inputs.Add("slab", func() telegraf.Input { inputs.Add("slab", func() telegraf.Input {
return &SlabStats{ return &SlabStats{
statFile: path.Join(getHostProc(), "slabinfo"), statFile: path.Join(internal.GetProcPath(), "slabinfo"),
useSudo: true, useSudo: true,
} }
}) })

View File

@ -3,10 +3,10 @@ package synproxy
import ( import (
_ "embed" _ "embed"
"os"
"path" "path"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -24,18 +24,10 @@ func (*Synproxy) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func getHostProc() string {
procPath := "/proc"
if os.Getenv("HOST_PROC") != "" {
procPath = os.Getenv("HOST_PROC")
}
return procPath
}
func init() { func init() {
inputs.Add("synproxy", func() telegraf.Input { inputs.Add("synproxy", func() telegraf.Input {
return &Synproxy{ return &Synproxy{
statFile: path.Join(getHostProc(), "/net/stat/synproxy"), statFile: path.Join(internal.GetProcPath(), "/net/stat/synproxy"),
} }
}) })
} }

View File

@ -11,6 +11,7 @@ import (
"strings" "strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
) )
const scalingFactor = float64(1000.0) const scalingFactor = float64(1000.0)
@ -37,10 +38,7 @@ func (t *Temperature) Init() error {
func (t *Temperature) Gather(acc telegraf.Accumulator) error { func (t *Temperature) Gather(acc telegraf.Accumulator) error {
// Get all sensors and honor the HOST_SYS environment variable // Get all sensors and honor the HOST_SYS environment variable
path := os.Getenv("HOST_SYS") path := internal.GetSysPath()
if path == "" {
path = "/sys"
}
// Try to use the hwmon interface // Try to use the hwmon interface
temperatures, err := t.gatherHwmon(path) temperatures, err := t.gatherHwmon(path)

View File

@ -10,15 +10,10 @@ import (
"strings" "strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
// default host proc path
const defaultHostProc = "/proc"
// env host proc variable name
const envProc = "HOST_PROC"
// length of wireless interface fields // length of wireless interface fields
const interfaceFieldLength = 10 const interfaceFieldLength = 10
@ -41,7 +36,9 @@ type wirelessInterface struct {
// Gather collects the wireless information. // Gather collects the wireless information.
func (w *Wireless) Gather(acc telegraf.Accumulator) error { func (w *Wireless) Gather(acc telegraf.Accumulator) error {
// load proc path, get default value if config value and env variable are empty // load proc path, get default value if config value and env variable are empty
w.loadPath() if w.HostProc == "" {
w.HostProc = internal.GetProcPath()
}
wirelessPath := path.Join(w.HostProc, "net", "wireless") wirelessPath := path.Join(w.HostProc, "net", "wireless")
table, err := os.ReadFile(wirelessPath) table, err := os.ReadFile(wirelessPath)
@ -117,24 +114,6 @@ func (w *Wireless) loadWirelessTable(table []byte) ([]*wirelessInterface, error)
return wi, nil return wi, nil
} }
// loadPath can be used to read path firstly from config
// if it is empty then try read from env variable
func (w *Wireless) loadPath() {
if w.HostProc == "" {
w.HostProc = proc(envProc, defaultHostProc)
}
}
// proc can be used to read file paths from env
func proc(env, defaultPath string) string {
// try to read full file path
if p := os.Getenv(env); p != "" {
return p
}
// return default path
return defaultPath
}
func init() { func init() {
inputs.Add("wireless", func() telegraf.Input { inputs.Add("wireless", func() telegraf.Input {
return &Wireless{} return &Wireless{}