chore: Fix linter findings for `revive:exported` in `plugins/inputs/r*` (#16341)

This commit is contained in:
Paweł Żak 2025-01-17 08:07:09 +01:00 committed by GitHub
parent 2bde197712
commit 09e3b59eb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 548 additions and 572 deletions

View File

@ -18,6 +18,9 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
//go:embed sample.conf
var sampleConfig string
type Radius struct { type Radius struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Username config.Secret `toml:"username"` Username config.Secret `toml:"username"`
@ -29,9 +32,6 @@ type Radius struct {
client radius.Client client radius.Client
} }
//go:embed sample.conf
var sampleConfig string
func (*Radius) SampleConfig() string { func (*Radius) SampleConfig() string {
return sampleConfig return sampleConfig
} }

View File

@ -21,7 +21,7 @@ import (
var sampleConfig string var sampleConfig string
type Raindrops struct { type Raindrops struct {
Urls []string Urls []string `toml:"urls"`
httpClient *http.Client httpClient *http.Client
} }

View File

@ -22,28 +22,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Ras plugin gathers and counts errors provided by RASDaemon
type Ras struct {
DBPath string `toml:"db_path"`
Log telegraf.Logger `toml:"-"`
db *sql.DB
latestTimestamp time.Time
cpuSocketCounters map[int]metricCounters
serverCounters metricCounters
}
type machineCheckError struct {
ID int
Timestamp string
SocketID int
ErrorMsg string
MciStatusMsg string
}
type metricCounters map[string]int64
const ( const (
mceQuery = ` mceQuery = `
SELECT SELECT
@ -72,6 +50,26 @@ const (
unclassifiedMCEBase = "unclassified_mce_errors" unclassifiedMCEBase = "unclassified_mce_errors"
) )
type Ras struct {
DBPath string `toml:"db_path"`
Log telegraf.Logger `toml:"-"`
db *sql.DB
latestTimestamp time.Time
cpuSocketCounters map[int]metricCounters
serverCounters metricCounters
}
type machineCheckError struct {
id int
timestamp string
socketID int
errorMsg string
mciStatusMsg string
}
type metricCounters map[string]int64
func (*Ras) SampleConfig() string { func (*Ras) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -91,16 +89,6 @@ func (r *Ras) Start(telegraf.Accumulator) error {
return nil return nil
} }
// Stop closes any existing DB connection
func (r *Ras) Stop() {
if r.db != nil {
err := r.db.Close()
if err != nil {
r.Log.Errorf("Error appeared during closing DB (%s): %v", r.DBPath, err)
}
}
}
// Gather reads the stats provided by RASDaemon and writes it to the Accumulator. // Gather reads the stats provided by RASDaemon and writes it to the Accumulator.
func (r *Ras) Gather(acc telegraf.Accumulator) error { func (r *Ras) Gather(acc telegraf.Accumulator) error {
rows, err := r.db.Query(mceQuery, r.latestTimestamp) rows, err := r.db.Query(mceQuery, r.latestTimestamp)
@ -114,7 +102,7 @@ func (r *Ras) Gather(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return err return err
} }
tsErr := r.updateLatestTimestamp(mcError.Timestamp) tsErr := r.updateLatestTimestamp(mcError.timestamp)
if tsErr != nil { if tsErr != nil {
return err return err
} }
@ -127,6 +115,16 @@ func (r *Ras) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop closes any existing DB connection
func (r *Ras) Stop() {
if r.db != nil {
err := r.db.Close()
if err != nil {
r.Log.Errorf("Error appeared during closing DB (%s): %v", r.DBPath, err)
}
}
}
func (r *Ras) updateLatestTimestamp(timestamp string) error { func (r *Ras) updateLatestTimestamp(timestamp string) error {
ts, err := parseDate(timestamp) ts, err := parseDate(timestamp)
if err != nil { if err != nil {
@ -140,11 +138,11 @@ func (r *Ras) updateLatestTimestamp(timestamp string) error {
} }
func (r *Ras) updateCounters(mcError *machineCheckError) { func (r *Ras) updateCounters(mcError *machineCheckError) {
if strings.Contains(mcError.ErrorMsg, "No Error") { if strings.Contains(mcError.errorMsg, "No Error") {
return return
} }
r.initializeCPUMetricDataIfRequired(mcError.SocketID) r.initializeCPUMetricDataIfRequired(mcError.socketID)
r.updateSocketCounters(mcError) r.updateSocketCounters(mcError)
r.updateServerCounters(mcError) r.updateServerCounters(mcError)
} }
@ -170,11 +168,11 @@ func newMetricCounters() *metricCounters {
} }
func (r *Ras) updateServerCounters(mcError *machineCheckError) { func (r *Ras) updateServerCounters(mcError *machineCheckError) {
if strings.Contains(mcError.ErrorMsg, "CACHE Level-2") && strings.Contains(mcError.ErrorMsg, "Error") { if strings.Contains(mcError.errorMsg, "CACHE Level-2") && strings.Contains(mcError.errorMsg, "Error") {
r.serverCounters[levelTwoCache]++ r.serverCounters[levelTwoCache]++
} }
if strings.Contains(mcError.ErrorMsg, "UPI:") { if strings.Contains(mcError.errorMsg, "UPI:") {
r.serverCounters[upi]++ r.serverCounters[upi]++
} }
} }
@ -210,71 +208,71 @@ func (r *Ras) updateSocketCounters(mcError *machineCheckError) {
r.updateMemoryCounters(mcError) r.updateMemoryCounters(mcError)
r.updateProcessorBaseCounters(mcError) r.updateProcessorBaseCounters(mcError)
if strings.Contains(mcError.ErrorMsg, "Instruction TLB") && strings.Contains(mcError.ErrorMsg, "Error") { if strings.Contains(mcError.errorMsg, "Instruction TLB") && strings.Contains(mcError.errorMsg, "Error") {
r.cpuSocketCounters[mcError.SocketID][instructionTLB]++ r.cpuSocketCounters[mcError.socketID][instructionTLB]++
} }
if strings.Contains(mcError.ErrorMsg, "BUS") && strings.Contains(mcError.ErrorMsg, "Error") { if strings.Contains(mcError.errorMsg, "BUS") && strings.Contains(mcError.errorMsg, "Error") {
r.cpuSocketCounters[mcError.SocketID][processorBus]++ r.cpuSocketCounters[mcError.socketID][processorBus]++
} }
if (strings.Contains(mcError.ErrorMsg, "CACHE Level-0") || if (strings.Contains(mcError.errorMsg, "CACHE Level-0") ||
strings.Contains(mcError.ErrorMsg, "CACHE Level-1")) && strings.Contains(mcError.errorMsg, "CACHE Level-1")) &&
strings.Contains(mcError.ErrorMsg, "Error") { strings.Contains(mcError.errorMsg, "Error") {
r.cpuSocketCounters[mcError.SocketID][instructionCache]++ r.cpuSocketCounters[mcError.socketID][instructionCache]++
} }
} }
func (r *Ras) updateProcessorBaseCounters(mcError *machineCheckError) { func (r *Ras) updateProcessorBaseCounters(mcError *machineCheckError) {
if strings.Contains(mcError.ErrorMsg, "Internal Timer error") { if strings.Contains(mcError.errorMsg, "Internal Timer error") {
r.cpuSocketCounters[mcError.SocketID][internalTimer]++ r.cpuSocketCounters[mcError.socketID][internalTimer]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "SMM Handler Code Access Violation") { if strings.Contains(mcError.errorMsg, "SMM Handler Code Access Violation") {
r.cpuSocketCounters[mcError.SocketID][smmHandlerCode]++ r.cpuSocketCounters[mcError.socketID][smmHandlerCode]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "Internal parity error") { if strings.Contains(mcError.errorMsg, "Internal parity error") {
r.cpuSocketCounters[mcError.SocketID][internalParity]++ r.cpuSocketCounters[mcError.socketID][internalParity]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "FRC error") { if strings.Contains(mcError.errorMsg, "FRC error") {
r.cpuSocketCounters[mcError.SocketID][frc]++ r.cpuSocketCounters[mcError.socketID][frc]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "External error") { if strings.Contains(mcError.errorMsg, "External error") {
r.cpuSocketCounters[mcError.SocketID][externalMCEBase]++ r.cpuSocketCounters[mcError.socketID][externalMCEBase]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "Microcode ROM parity error") { if strings.Contains(mcError.errorMsg, "Microcode ROM parity error") {
r.cpuSocketCounters[mcError.SocketID][microcodeROMParity]++ r.cpuSocketCounters[mcError.socketID][microcodeROMParity]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
if strings.Contains(mcError.ErrorMsg, "Unclassified") || strings.Contains(mcError.ErrorMsg, "Internal unclassified") { if strings.Contains(mcError.errorMsg, "Unclassified") || strings.Contains(mcError.errorMsg, "Internal unclassified") {
r.cpuSocketCounters[mcError.SocketID][unclassifiedMCEBase]++ r.cpuSocketCounters[mcError.socketID][unclassifiedMCEBase]++
r.cpuSocketCounters[mcError.SocketID][processorBase]++ r.cpuSocketCounters[mcError.socketID][processorBase]++
} }
} }
func (r *Ras) updateMemoryCounters(mcError *machineCheckError) { func (r *Ras) updateMemoryCounters(mcError *machineCheckError) {
if strings.Contains(mcError.ErrorMsg, "Memory read error") { if strings.Contains(mcError.errorMsg, "Memory read error") {
if strings.Contains(mcError.MciStatusMsg, "Corrected_error") { if strings.Contains(mcError.mciStatusMsg, "Corrected_error") {
r.cpuSocketCounters[mcError.SocketID][memoryReadCorrected]++ r.cpuSocketCounters[mcError.socketID][memoryReadCorrected]++
} else { } else {
r.cpuSocketCounters[mcError.SocketID][memoryReadUncorrected]++ r.cpuSocketCounters[mcError.socketID][memoryReadUncorrected]++
} }
} }
if strings.Contains(mcError.ErrorMsg, "Memory write error") { if strings.Contains(mcError.errorMsg, "Memory write error") {
if strings.Contains(mcError.MciStatusMsg, "Corrected_error") { if strings.Contains(mcError.mciStatusMsg, "Corrected_error") {
r.cpuSocketCounters[mcError.SocketID][memoryWriteCorrected]++ r.cpuSocketCounters[mcError.socketID][memoryWriteCorrected]++
} else { } else {
r.cpuSocketCounters[mcError.SocketID][memoryWriteUncorrected]++ r.cpuSocketCounters[mcError.socketID][memoryWriteUncorrected]++
} }
} }
} }
@ -305,7 +303,7 @@ func addServerMetrics(acc telegraf.Accumulator, counters map[string]int64) {
func fetchMachineCheckError(rows *sql.Rows) (*machineCheckError, error) { func fetchMachineCheckError(rows *sql.Rows) (*machineCheckError, error) {
mcError := &machineCheckError{} mcError := &machineCheckError{}
err := rows.Scan(&mcError.ID, &mcError.Timestamp, &mcError.ErrorMsg, &mcError.MciStatusMsg, &mcError.SocketID) err := rows.Scan(&mcError.id, &mcError.timestamp, &mcError.errorMsg, &mcError.mciStatusMsg, &mcError.socketID)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -17,14 +17,14 @@ type Ras struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Ras) SampleConfig() string { return sampleConfig }
func (r *Ras) Init() error { func (r *Ras) Init() error {
r.Log.Warn("current platform is not supported") r.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*Ras) SampleConfig() string { return sampleConfig }
func (*Ras) Gather(_ telegraf.Accumulator) error { return nil } func (*Ras) Gather(telegraf.Accumulator) error { return nil }
func (*Ras) Start(_ telegraf.Accumulator) error { return nil }
func (*Ras) Stop() {}
func init() { func init() {
inputs.Add("ras", func() telegraf.Input { inputs.Add("ras", func() telegraf.Input {

View File

@ -38,26 +38,26 @@ func TestUpdateLatestTimestamp(t *testing.T) {
ts := "2020-08-01 15:13:27 +0200" ts := "2020-08-01 15:13:27 +0200"
testData = append(testData, []machineCheckError{ testData = append(testData, []machineCheckError{
{ {
Timestamp: "2019-05-20 08:25:55 +0200", timestamp: "2019-05-20 08:25:55 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "", errorMsg: "",
MciStatusMsg: "", mciStatusMsg: "",
}, },
{ {
Timestamp: "2018-02-21 12:27:22 +0200", timestamp: "2018-02-21 12:27:22 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "", errorMsg: "",
MciStatusMsg: "", mciStatusMsg: "",
}, },
{ {
Timestamp: ts, timestamp: ts,
SocketID: 0, socketID: 0,
ErrorMsg: "", errorMsg: "",
MciStatusMsg: "", mciStatusMsg: "",
}, },
}...) }...)
for _, mce := range testData { for _, mce := range testData {
err := ras.updateLatestTimestamp(mce.Timestamp) err := ras.updateLatestTimestamp(mce.timestamp)
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, ts, ras.latestTimestamp.Format(dateLayout)) require.Equal(t, ts, ras.latestTimestamp.Format(dateLayout))
@ -69,28 +69,28 @@ func TestMultipleSockets(t *testing.T) {
overflow := "Error_overflow Corrected_error" overflow := "Error_overflow Corrected_error"
testData = []machineCheckError{ testData = []machineCheckError{
{ {
Timestamp: "2019-05-20 08:25:55 +0200", timestamp: "2019-05-20 08:25:55 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: cacheL2, errorMsg: cacheL2,
MciStatusMsg: overflow, mciStatusMsg: overflow,
}, },
{ {
Timestamp: "2018-02-21 12:27:22 +0200", timestamp: "2018-02-21 12:27:22 +0200",
SocketID: 1, socketID: 1,
ErrorMsg: cacheL2, errorMsg: cacheL2,
MciStatusMsg: overflow, mciStatusMsg: overflow,
}, },
{ {
Timestamp: "2020-03-21 14:17:28 +0200", timestamp: "2020-03-21 14:17:28 +0200",
SocketID: 2, socketID: 2,
ErrorMsg: cacheL2, errorMsg: cacheL2,
MciStatusMsg: overflow, mciStatusMsg: overflow,
}, },
{ {
Timestamp: "2020-03-21 17:24:18 +0200", timestamp: "2020-03-21 17:24:18 +0200",
SocketID: 3, socketID: 3,
ErrorMsg: cacheL2, errorMsg: cacheL2,
MciStatusMsg: overflow, mciStatusMsg: overflow,
}, },
} }
for i := range testData { for i := range testData {
@ -150,105 +150,105 @@ func newRas() *Ras {
var testData = []machineCheckError{ var testData = []machineCheckError{
{ {
Timestamp: "2020-05-20 07:34:53 +0200", timestamp: "2020-05-20 07:34:53 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", errorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 07:35:11 +0200", timestamp: "2020-05-20 07:35:11 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error", errorMsg: "MEMORY CONTROLLER RD_CHANNEL0_ERR Transaction: Memory read error",
MciStatusMsg: "Uncorrected_error", mciStatusMsg: "Uncorrected_error",
}, },
{ {
Timestamp: "2020-05-20 07:37:50 +0200", timestamp: "2020-05-20 07:37:50 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "MEMORY CONTROLLER RD_CHANNEL2_ERR Transaction: Memory write error", errorMsg: "MEMORY CONTROLLER RD_CHANNEL2_ERR Transaction: Memory write error",
MciStatusMsg: "Uncorrected_error", mciStatusMsg: "Uncorrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:14:51 +0200", timestamp: "2020-05-20 08:14:51 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "MEMORY CONTROLLER WR_CHANNEL2_ERR Transaction: Memory write error", errorMsg: "MEMORY CONTROLLER WR_CHANNEL2_ERR Transaction: Memory write error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:15:31 +0200", timestamp: "2020-05-20 08:15:31 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "corrected filtering (some unreported errors in same region) Instruction CACHE Level-0 Read Error", errorMsg: "corrected filtering (some unreported errors in same region) Instruction CACHE Level-0 Read Error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:16:32 +0200", timestamp: "2020-05-20 08:16:32 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Instruction TLB Level-0 Error", errorMsg: "Instruction TLB Level-0 Error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:16:56 +0200", timestamp: "2020-05-20 08:16:56 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "No Error", errorMsg: "No Error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:17:24 +0200", timestamp: "2020-05-20 08:17:24 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Unclassified", errorMsg: "Unclassified",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:17:41 +0200", timestamp: "2020-05-20 08:17:41 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Microcode ROM parity error", errorMsg: "Microcode ROM parity error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:17:48 +0200", timestamp: "2020-05-20 08:17:48 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "FRC error", errorMsg: "FRC error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:18:18 +0200", timestamp: "2020-05-20 08:18:18 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Internal parity error", errorMsg: "Internal parity error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:18:34 +0200", timestamp: "2020-05-20 08:18:34 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "SMM Handler Code Access Violation", errorMsg: "SMM Handler Code Access Violation",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:18:54 +0200", timestamp: "2020-05-20 08:18:54 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Internal Timer error", errorMsg: "Internal Timer error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:21:23 +0200", timestamp: "2020-05-20 08:21:23 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "BUS Level-3 Generic Generic IO Request-did-not-timeout Error", errorMsg: "BUS Level-3 Generic Generic IO Request-did-not-timeout Error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:23:23 +0200", timestamp: "2020-05-20 08:23:23 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "External error", errorMsg: "External error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:25:31 +0200", timestamp: "2020-05-20 08:25:31 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "UPI: COR LL Rx detected CRC error - successful LLR without Phy Reinit", errorMsg: "UPI: COR LL Rx detected CRC error - successful LLR without Phy Reinit",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
{ {
Timestamp: "2020-05-20 08:25:55 +0200", timestamp: "2020-05-20 08:25:55 +0200",
SocketID: 0, socketID: 0,
ErrorMsg: "Instruction CACHE Level-2 Generic Error", errorMsg: "Instruction CACHE Level-2 Generic Error",
MciStatusMsg: "Error_overflow Corrected_error", mciStatusMsg: "Error_overflow Corrected_error",
}, },
} }

View File

@ -21,14 +21,11 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// defaultURL will set a default value that corresponds to the default value const (
// used by RavenDB defaultURL = "http://localhost:8080"
const defaultURL = "http://localhost:8080" defaultTimeout = 5
)
const defaultTimeout = 5
// RavenDB defines the configuration necessary for gathering metrics,
// see the sample config for further details
type RavenDB struct { type RavenDB struct {
URL string `toml:"url"` URL string `toml:"url"`
Name string `toml:"name"` Name string `toml:"name"`
@ -55,6 +52,30 @@ func (*RavenDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (r *RavenDB) Init() error {
if r.URL == "" {
r.URL = defaultURL
}
r.requestURLServer = r.URL + "/admin/monitoring/v1/server"
r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DbStatsDbs)
r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDbs)
r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDbs)
err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"})
if err != nil {
return err
}
err = r.ensureClient()
if nil != err {
r.Log.Errorf("Error with Client %s", err)
return err
}
return nil
}
func (r *RavenDB) Gather(acc telegraf.Accumulator) error { func (r *RavenDB) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -363,30 +384,6 @@ func prepareDBNamesURLPart(dbNames []string) string {
return result return result
} }
func (r *RavenDB) Init() error {
if r.URL == "" {
r.URL = defaultURL
}
r.requestURLServer = r.URL + "/admin/monitoring/v1/server"
r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DbStatsDbs)
r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDbs)
r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDbs)
err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"})
if err != nil {
return err
}
err = r.ensureClient()
if nil != err {
r.Log.Errorf("Error with Client %s", err)
return err
}
return nil
}
func init() { func init() {
inputs.Add("ravendb", func() telegraf.Input { inputs.Add("ravendb", func() telegraf.Input {
return &RavenDB{ return &RavenDB{

View File

@ -24,6 +24,12 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
// tag sets used for including redfish OData link parent data
tagSetChassisLocation = "chassis.location"
tagSetChassis = "chassis"
)
type Redfish struct { type Redfish struct {
Address string `toml:"address"` Address string `toml:"address"`
Username config.Secret `toml:"username"` Username config.Secret `toml:"username"`
@ -40,12 +46,6 @@ type Redfish struct {
baseURL *url.URL baseURL *url.URL
} }
const (
// tag sets used for including redfish OData link parent data
tagSetChassisLocation = "chassis.location"
tagSetChassis = "chassis"
)
type system struct { type system struct {
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Links struct { Links struct {
@ -215,6 +215,41 @@ func (r *Redfish) Init() error {
return nil return nil
} }
func (r *Redfish) Gather(acc telegraf.Accumulator) error {
address, _, err := net.SplitHostPort(r.baseURL.Host)
if err != nil {
address = r.baseURL.Host
}
system, err := r.getComputerSystem(r.ComputerSystemID)
if err != nil {
return err
}
for _, link := range system.Links.Chassis {
chassis, err := r.getChassis(link.Ref)
if err != nil {
return err
}
for _, metric := range r.IncludeMetrics {
var err error
switch metric {
case "thermal":
err = r.gatherThermal(acc, address, system, chassis)
case "power":
err = r.gatherPower(acc, address, system, chassis)
default:
return fmt.Errorf("unknown metric requested: %s", metric)
}
if err != nil {
return err
}
}
}
return nil
}
func (r *Redfish) getData(address string, payload interface{}) error { func (r *Redfish) getData(address string, payload interface{}) error {
req, err := http.NewRequest("GET", address, nil) req, err := http.NewRequest("GET", address, nil)
if err != nil { if err != nil {
@ -323,41 +358,6 @@ func setChassisTags(chassis *chassis, tags map[string]string) {
tags["chassis_health"] = chassis.Status.Health tags["chassis_health"] = chassis.Status.Health
} }
func (r *Redfish) Gather(acc telegraf.Accumulator) error {
address, _, err := net.SplitHostPort(r.baseURL.Host)
if err != nil {
address = r.baseURL.Host
}
system, err := r.getComputerSystem(r.ComputerSystemID)
if err != nil {
return err
}
for _, link := range system.Links.Chassis {
chassis, err := r.getChassis(link.Ref)
if err != nil {
return err
}
for _, metric := range r.IncludeMetrics {
var err error
switch metric {
case "thermal":
err = r.gatherThermal(acc, address, system, chassis)
case "power":
err = r.gatherPower(acc, address, system, chassis)
default:
return fmt.Errorf("unknown metric requested: %s", metric)
}
if err != nil {
return err
}
}
}
return nil
}
func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system *system, chassis *chassis) error { func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system *system, chassis *chassis) error {
thermal, err := r.getThermal(chassis.Thermal.Ref) thermal, err := r.getThermal(chassis.Thermal.Ref)
if err != nil { if err != nil {

View File

@ -25,14 +25,17 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type RedisCommand struct { var (
Command []interface{} `toml:"command"` replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`)
Field string `toml:"field"` tracking = map[string]string{
Type string `toml:"type"` "uptime_in_seconds": "uptime",
} "connected_clients": "clients",
"role": "replication_role",
}
)
type Redis struct { type Redis struct {
Commands []*RedisCommand `toml:"commands"` Commands []*redisCommand `toml:"commands"`
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
@ -41,24 +44,23 @@ type Redis struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
clients []Client clients []client
connected bool connected bool
} }
type Client interface { type redisCommand struct {
Do(returnType string, args ...interface{}) (interface{}, error) Command []interface{} `toml:"command"`
Info() *redis.StringCmd Field string `toml:"field"`
BaseTags() map[string]string Type string `toml:"type"`
Close() error
} }
type RedisClient struct { type redisClient struct {
client *redis.Client client *redis.Client
tags map[string]string tags map[string]string
} }
// RedisFieldTypes defines the types expected for each of the fields redis reports on // redisFieldTypes defines the types expected for each of the fields redis reports on
type RedisFieldTypes struct { type redisFieldTypes struct {
ActiveDefragHits int64 `json:"active_defrag_hits"` ActiveDefragHits int64 `json:"active_defrag_hits"`
ActiveDefragKeyHits int64 `json:"active_defrag_key_hits"` ActiveDefragKeyHits int64 `json:"active_defrag_key_hits"`
ActiveDefragKeyMisses int64 `json:"active_defrag_key_misses"` ActiveDefragKeyMisses int64 `json:"active_defrag_key_misses"`
@ -168,43 +170,11 @@ type RedisFieldTypes struct {
UsedMemoryStartup int64 `json:"used_memory_startup"` UsedMemoryStartup int64 `json:"used_memory_startup"`
} }
func (r *RedisClient) Do(returnType string, args ...interface{}) (interface{}, error) { type client interface {
rawVal := r.client.Do(context.Background(), args...) do(returnType string, args ...interface{}) (interface{}, error)
info() *redis.StringCmd
switch returnType { baseTags() map[string]string
case "integer": close() error
return rawVal.Int64()
case "string":
return rawVal.Text()
case "float":
return rawVal.Float64()
default:
return rawVal.Text()
}
}
func (r *RedisClient) Info() *redis.StringCmd {
return r.client.Info(context.Background(), "ALL")
}
func (r *RedisClient) BaseTags() map[string]string {
tags := make(map[string]string)
for k, v := range r.tags {
tags[k] = v
}
return tags
}
func (r *RedisClient) Close() error {
return r.client.Close()
}
var replicationSlaveMetricPrefix = regexp.MustCompile(`^slave\d+`)
var Tracking = map[string]string{
"uptime_in_seconds": "uptime",
"connected_clients": "clients",
"role": "replication_role",
} }
func (*Redis) SampleConfig() string { func (*Redis) SampleConfig() string {
@ -221,6 +191,43 @@ func (r *Redis) Init() error {
return nil return nil
} }
func (*Redis) Start(telegraf.Accumulator) error {
return nil
}
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.connected {
err := r.connect()
if err != nil {
return err
}
}
var wg sync.WaitGroup
for _, cl := range r.clients {
wg.Add(1)
go func(client client) {
defer wg.Done()
acc.AddError(gatherServer(client, acc))
acc.AddError(r.gatherCommandValues(client, acc))
}(cl)
}
wg.Wait()
return nil
}
// Stop close the client through ServiceInput interface Start/Stop methods impl.
func (r *Redis) Stop() {
for _, c := range r.clients {
err := c.close()
if err != nil {
r.Log.Errorf("error closing client: %v", err)
}
}
}
func (r *Redis) connect() error { func (r *Redis) connect() error {
if r.connected { if r.connected {
return nil return nil
@ -230,7 +237,7 @@ func (r *Redis) connect() error {
r.Servers = []string{"tcp://localhost:6379"} r.Servers = []string{"tcp://localhost:6379"}
} }
r.clients = make([]Client, 0, len(r.Servers)) r.clients = make([]client, 0, len(r.Servers))
for _, serv := range r.Servers { for _, serv := range r.Servers {
if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
r.Log.Warn("Server URL found without scheme; please update your configuration file") r.Log.Warn("Server URL found without scheme; please update your configuration file")
@ -289,7 +296,7 @@ func (r *Redis) connect() error {
tags["port"] = u.Port() tags["port"] = u.Port()
} }
r.clients = append(r.clients, &RedisClient{ r.clients = append(r.clients, &redisClient{
client: client, client: client,
tags: tags, tags: tags,
}) })
@ -299,35 +306,10 @@ func (r *Redis) connect() error {
return nil return nil
} }
// Reads stats from all configured servers accumulates stats. func (r *Redis) gatherCommandValues(client client, acc telegraf.Accumulator) error {
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.connected {
err := r.connect()
if err != nil {
return err
}
}
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(client Client) {
defer wg.Done()
acc.AddError(gatherServer(client, acc))
acc.AddError(r.gatherCommandValues(client, acc))
}(client)
}
wg.Wait()
return nil
}
func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) error {
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for _, command := range r.Commands { for _, command := range r.Commands {
val, err := client.Do(command.Type, command.Command...) val, err := client.do(command.Type, command.Command...)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "unexpected type=") { if strings.Contains(err.Error(), "unexpected type=") {
return fmt.Errorf("could not get command result: %w", err) return fmt.Errorf("could not get command result: %w", err)
@ -339,27 +321,53 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err
fields[command.Field] = val fields[command.Field] = val
} }
acc.AddFields("redis_commands", fields, client.BaseTags()) acc.AddFields("redis_commands", fields, client.baseTags())
return nil return nil
} }
func gatherServer(client Client, acc telegraf.Accumulator) error { func (r *redisClient) do(returnType string, args ...interface{}) (interface{}, error) {
info, err := client.Info().Result() rawVal := r.client.Do(context.Background(), args...)
switch returnType {
case "integer":
return rawVal.Int64()
case "string":
return rawVal.Text()
case "float":
return rawVal.Float64()
default:
return rawVal.Text()
}
}
func (r *redisClient) info() *redis.StringCmd {
return r.client.Info(context.Background(), "ALL")
}
func (r *redisClient) baseTags() map[string]string {
tags := make(map[string]string)
for k, v := range r.tags {
tags[k] = v
}
return tags
}
func (r *redisClient) close() error {
return r.client.Close()
}
func gatherServer(client client, acc telegraf.Accumulator) error {
info, err := client.info().Result()
if err != nil { if err != nil {
return err return err
} }
rdr := strings.NewReader(info) rdr := strings.NewReader(info)
return gatherInfoOutput(rdr, acc, client.BaseTags()) return gatherInfoOutput(rdr, acc, client.baseTags())
} }
// gatherInfoOutput gathers func gatherInfoOutput(rdr io.Reader, acc telegraf.Accumulator, tags map[string]string) error {
func gatherInfoOutput(
rdr io.Reader,
acc telegraf.Accumulator,
tags map[string]string,
) error {
var section string var section string
var keyspaceHits, keyspaceMisses int64 var keyspaceHits, keyspaceMisses int64
@ -403,7 +411,7 @@ func gatherInfoOutput(
continue continue
} }
metric, ok := Tracking[name] metric, ok := tracking[name]
if !ok { if !ok {
if section == "Keyspace" { if section == "Keyspace" {
kline := strings.TrimSpace(parts[1]) kline := strings.TrimSpace(parts[1])
@ -412,12 +420,12 @@ func gatherInfoOutput(
} }
if section == "Commandstats" { if section == "Commandstats" {
kline := strings.TrimSpace(parts[1]) kline := strings.TrimSpace(parts[1])
gatherCommandstateLine(name, kline, acc, tags) gatherCommandStateLine(name, kline, acc, tags)
continue continue
} }
if section == "Latencystats" { if section == "Latencystats" {
kline := strings.TrimSpace(parts[1]) kline := strings.TrimSpace(parts[1])
gatherLatencystatsLine(name, kline, acc, tags) gatherLatencyStatsLine(name, kline, acc, tags)
continue continue
} }
if section == "Replication" && replicationSlaveMetricPrefix.MatchString(name) { if section == "Replication" && replicationSlaveMetricPrefix.MatchString(name) {
@ -427,7 +435,7 @@ func gatherInfoOutput(
} }
if section == "Errorstats" { if section == "Errorstats" {
kline := strings.TrimSpace(parts[1]) kline := strings.TrimSpace(parts[1])
gatherErrorstatsLine(name, kline, acc, tags) gatherErrorStatsLine(name, kline, acc, tags)
continue continue
} }
@ -475,7 +483,7 @@ func gatherInfoOutput(
} }
fields["keyspace_hitrate"] = keyspaceHitrate fields["keyspace_hitrate"] = keyspaceHitrate
o := RedisFieldTypes{} o := redisFieldTypes{}
setStructFieldsFromObject(fields, &o) setStructFieldsFromObject(fields, &o)
setExistingFieldsFromStruct(fields, &o) setExistingFieldsFromStruct(fields, &o)
@ -516,7 +524,7 @@ func gatherKeyspaceLine(name, line string, acc telegraf.Accumulator, globalTags
// cmdstat_publish:calls=33791,usec=208789,usec_per_call=6.18 // cmdstat_publish:calls=33791,usec=208789,usec_per_call=6.18
// //
// Tag: command=publish; Fields: calls=33791i,usec=208789i,usec_per_call=6.18 // Tag: command=publish; Fields: calls=33791i,usec=208789i,usec_per_call=6.18
func gatherCommandstateLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { func gatherCommandStateLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) {
if !strings.HasPrefix(name, "cmdstat") { if !strings.HasPrefix(name, "cmdstat") {
return return
} }
@ -558,7 +566,7 @@ func gatherCommandstateLine(name, line string, acc telegraf.Accumulator, globalT
// latency_percentiles_usec_zadd:p50=9.023,p99=28.031,p99.9=43.007 // latency_percentiles_usec_zadd:p50=9.023,p99=28.031,p99.9=43.007
// //
// Tag: command=zadd; Fields: p50=9.023,p99=28.031,p99.9=43.007 // Tag: command=zadd; Fields: p50=9.023,p99=28.031,p99.9=43.007
func gatherLatencystatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { func gatherLatencyStatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) {
if !strings.HasPrefix(name, "latency_percentiles_usec") { if !strings.HasPrefix(name, "latency_percentiles_usec") {
return return
} }
@ -633,7 +641,7 @@ func gatherReplicationLine(name, line string, acc telegraf.Accumulator, globalTa
// //
// errorstat_ERR:count=37 // errorstat_ERR:count=37
// errorstat_MOVED:count=3626 // errorstat_MOVED:count=3626
func gatherErrorstatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) { func gatherErrorStatsLine(name, line string, acc telegraf.Accumulator, globalTags map[string]string) {
tags := make(map[string]string, len(globalTags)+1) tags := make(map[string]string, len(globalTags)+1)
for k, v := range globalTags { for k, v := range globalTags {
tags[k] = v tags[k] = v
@ -654,13 +662,7 @@ func gatherErrorstatsLine(name, line string, acc telegraf.Accumulator, globalTag
acc.AddFields("redis_errorstat", fields, tags) acc.AddFields("redis_errorstat", fields, tags)
} }
func init() { func setExistingFieldsFromStruct(fields map[string]interface{}, o *redisFieldTypes) {
inputs.Add("redis", func() telegraf.Input {
return &Redis{}
})
}
func setExistingFieldsFromStruct(fields map[string]interface{}, o *RedisFieldTypes) {
val := reflect.ValueOf(o).Elem() val := reflect.ValueOf(o).Elem()
typ := val.Type() typ := val.Type()
@ -678,7 +680,7 @@ func setExistingFieldsFromStruct(fields map[string]interface{}, o *RedisFieldTyp
} }
} }
func setStructFieldsFromObject(fields map[string]interface{}, o *RedisFieldTypes) { func setStructFieldsFromObject(fields map[string]interface{}, o *redisFieldTypes) {
val := reflect.ValueOf(o).Elem() val := reflect.ValueOf(o).Elem()
typ := val.Type() typ := val.Type()
@ -774,16 +776,8 @@ func coerceType(value interface{}, typ reflect.Type) reflect.Value {
return reflect.ValueOf(value) return reflect.ValueOf(value)
} }
func (*Redis) Start(telegraf.Accumulator) error { func init() {
return nil inputs.Add("redis", func() telegraf.Input {
} return &Redis{}
})
// Stop close the client through ServiceInput interface Start/Stop methods impl.
func (r *Redis) Stop() {
for _, c := range r.clients {
err := c.Close()
if err != nil {
r.Log.Errorf("error closing client: %v", err)
}
}
} }

View File

@ -2,6 +2,7 @@ package redis
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"strings" "strings"
"testing" "testing"
@ -17,19 +18,19 @@ import (
type testClient struct{} type testClient struct{}
func (*testClient) BaseTags() map[string]string { func (*testClient) baseTags() map[string]string {
return map[string]string{"host": "redis.net"} return map[string]string{"host": "redis.net"}
} }
func (*testClient) Info() *redis.StringCmd { func (*testClient) info() *redis.StringCmd {
return nil return nil
} }
func (*testClient) Do(string, ...interface{}) (interface{}, error) { func (*testClient) do(string, ...interface{}) (interface{}, error) {
return 2, nil return 2, nil
} }
func (*testClient) Close() error { func (*testClient) close() error {
return nil return nil
} }
@ -67,15 +68,15 @@ func TestRedis_Commands(t *testing.T) {
tc := &testClient{} tc := &testClient{}
rc := &RedisCommand{ rc := &redisCommand{
Command: []interface{}{"llen", "test-list"}, Command: []interface{}{"llen", "test-list"},
Field: redisListKey, Field: redisListKey,
Type: "integer", Type: "integer",
} }
r := &Redis{ r := &Redis{
Commands: []*RedisCommand{rc}, Commands: []*redisCommand{rc},
clients: []Client{tc}, clients: []client{tc},
} }
err := r.gatherCommandValues(tc, &acc) err := r.gatherCommandValues(tc, &acc)
@ -389,17 +390,17 @@ func TestRedis_GatherErrorstatsLine(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
globalTags := map[string]string{} globalTags := map[string]string{}
gatherErrorstatsLine("FOO", "BAR", &acc, globalTags) gatherErrorStatsLine("FOO", "BAR", &acc, globalTags)
require.Len(t, acc.Errors, 1) require.Len(t, acc.Errors, 1)
require.Equal(t, "invalid line for \"FOO\": BAR", acc.Errors[0].Error()) require.Equal(t, "invalid line for \"FOO\": BAR", acc.Errors[0].Error())
acc = testutil.Accumulator{} acc = testutil.Accumulator{}
gatherErrorstatsLine("FOO", "BAR=a", &acc, globalTags) gatherErrorStatsLine("FOO", "BAR=a", &acc, globalTags)
require.Len(t, acc.Errors, 1) require.Len(t, acc.Errors, 1)
require.Equal(t, "parsing value in line \"BAR=a\" failed: strconv.ParseInt: parsing \"a\": invalid syntax", acc.Errors[0].Error()) require.Equal(t, "parsing value in line \"BAR=a\" failed: strconv.ParseInt: parsing \"a\": invalid syntax", acc.Errors[0].Error())
acc = testutil.Accumulator{} acc = testutil.Accumulator{}
gatherErrorstatsLine("FOO", "BAR=77", &acc, globalTags) gatherErrorStatsLine("FOO", "BAR=77", &acc, globalTags)
require.Empty(t, acc.Errors) require.Empty(t, acc.Errors)
} }

View File

@ -22,29 +22,25 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
measurementMasters = "redis_sentinel_masters"
measurementSentinel = "redis_sentinel"
measurementSentinels = "redis_sentinel_sentinels"
measurementReplicas = "redis_sentinel_replicas"
)
type RedisSentinel struct { type RedisSentinel struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
tls.ClientConfig tls.ClientConfig
clients []*RedisSentinelClient clients []*redisSentinelClient
} }
type RedisSentinelClient struct { type redisSentinelClient struct {
sentinel *redis.SentinelClient sentinel *redis.SentinelClient
tags map[string]string tags map[string]string
} }
const measurementMasters = "redis_sentinel_masters"
const measurementSentinel = "redis_sentinel"
const measurementSentinels = "redis_sentinel_sentinels"
const measurementReplicas = "redis_sentinel_replicas"
func init() {
inputs.Add("redis_sentinel", func() telegraf.Input {
return &RedisSentinel{}
})
}
func (*RedisSentinel) SampleConfig() string { func (*RedisSentinel) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -59,7 +55,7 @@ func (r *RedisSentinel) Init() error {
return err return err
} }
r.clients = make([]*RedisSentinelClient, 0, len(r.Servers)) r.clients = make([]*redisSentinelClient, 0, len(r.Servers))
for _, serv := range r.Servers { for _, serv := range r.Servers {
u, err := url.Parse(serv) u, err := url.Parse(serv)
if err != nil { if err != nil {
@ -101,7 +97,7 @@ func (r *RedisSentinel) Init() error {
}, },
) )
r.clients = append(r.clients, &RedisSentinelClient{ r.clients = append(r.clients, &redisSentinelClient{
sentinel: sentinel, sentinel: sentinel,
tags: tags, tags: tags,
}) })
@ -110,6 +106,32 @@ func (r *RedisSentinel) Init() error {
return nil return nil
} }
func (r *RedisSentinel) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(acc telegraf.Accumulator, client *redisSentinelClient) {
defer wg.Done()
masters, err := client.gatherMasterStats(acc)
acc.AddError(err)
for _, master := range masters {
acc.AddError(client.gatherReplicaStats(acc, master))
acc.AddError(client.gatherSentinelStats(acc, master))
}
acc.AddError(client.gatherInfoStats(acc))
}(acc, client)
}
wg.Wait()
return nil
}
// Redis list format has string key/values adjacent, so convert to a map for easier use // Redis list format has string key/values adjacent, so convert to a map for easier use
func toMap(vals []interface{}) map[string]string { func toMap(vals []interface{}) map[string]string {
m := make(map[string]string) m := make(map[string]string)
@ -170,35 +192,7 @@ func prepareFieldValues(fields map[string]string, typeMap map[string]configField
return preparedFields, nil return preparedFields, nil
} }
// Reads stats from all configured servers accumulates stats. func (client *redisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) error {
// Returns one of the errors encountered while gather stats (if any).
func (r *RedisSentinel) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range r.clients {
wg.Add(1)
go func(acc telegraf.Accumulator, client *RedisSentinelClient) {
defer wg.Done()
masters, err := client.gatherMasterStats(acc)
acc.AddError(err)
for _, master := range masters {
acc.AddError(client.gatherReplicaStats(acc, master))
acc.AddError(client.gatherSentinelStats(acc, master))
}
acc.AddError(client.gatherInfoStats(acc))
}(acc, client)
}
wg.Wait()
return nil
}
func (client *RedisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) error {
infoCmd := redis.NewStringCmd("info", "all") infoCmd := redis.NewStringCmd("info", "all")
if err := client.sentinel.Process(infoCmd); err != nil { if err := client.sentinel.Process(infoCmd); err != nil {
return err return err
@ -220,7 +214,7 @@ func (client *RedisSentinelClient) gatherInfoStats(acc telegraf.Accumulator) err
return nil return nil
} }
func (client *RedisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) ([]string, error) { func (client *redisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) ([]string, error) {
mastersCmd := redis.NewSliceCmd("sentinel", "masters") mastersCmd := redis.NewSliceCmd("sentinel", "masters")
if err := client.sentinel.Process(mastersCmd); err != nil { if err := client.sentinel.Process(mastersCmd); err != nil {
return nil, err return nil, err
@ -262,7 +256,7 @@ func (client *RedisSentinelClient) gatherMasterStats(acc telegraf.Accumulator) (
return masterNames, nil return masterNames, nil
} }
func (client *RedisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator, masterName string) error { func (client *redisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator, masterName string) error {
replicasCmd := redis.NewSliceCmd("sentinel", "replicas", masterName) replicasCmd := redis.NewSliceCmd("sentinel", "replicas", masterName)
if err := client.sentinel.Process(replicasCmd); err != nil { if err := client.sentinel.Process(replicasCmd); err != nil {
return err return err
@ -294,7 +288,7 @@ func (client *RedisSentinelClient) gatherReplicaStats(acc telegraf.Accumulator,
return nil return nil
} }
func (client *RedisSentinelClient) gatherSentinelStats(acc telegraf.Accumulator, masterName string) error { func (client *redisSentinelClient) gatherSentinelStats(acc telegraf.Accumulator, masterName string) error {
sentinelsCmd := redis.NewSliceCmd("sentinel", "sentinels", masterName) sentinelsCmd := redis.NewSliceCmd("sentinel", "sentinels", masterName)
if err := client.sentinel.Process(sentinelsCmd); err != nil { if err := client.sentinel.Process(sentinelsCmd); err != nil {
return err return err
@ -384,10 +378,7 @@ func convertSentinelReplicaOutput(
// convertSentinelInfoOutput parses `INFO` command output // convertSentinelInfoOutput parses `INFO` command output
// Largely copied from the Redis input plugin's gatherInfoOutput() // Largely copied from the Redis input plugin's gatherInfoOutput()
func convertSentinelInfoOutput( func convertSentinelInfoOutput(globalTags map[string]string, rdr io.Reader) (map[string]string, map[string]interface{}, error) {
globalTags map[string]string,
rdr io.Reader,
) (map[string]string, map[string]interface{}, error) {
scanner := bufio.NewScanner(rdr) scanner := bufio.NewScanner(rdr)
rawFields := make(map[string]string) rawFields := make(map[string]string)
@ -437,3 +428,9 @@ func convertSentinelInfoOutput(
return tags, fields, nil return tags, fields, nil
} }
func init() {
inputs.Add("redis_sentinel", func() telegraf.Input {
return &RedisSentinel{}
})
}

View File

@ -16,18 +16,16 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type RethinkDB struct { var localhost = &server{url: &url.URL{Host: "127.0.0.1:28015"}}
Servers []string
}
var localhost = &Server{URL: &url.URL{Host: "127.0.0.1:28015"}} type RethinkDB struct {
Servers []string `toml:"servers"`
}
func (*RethinkDB) SampleConfig() string { func (*RethinkDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
if len(r.Servers) == 0 { if len(r.Servers) == 0 {
return gatherServer(localhost, acc) return gatherServer(localhost, acc)
@ -47,7 +45,7 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
acc.AddError(gatherServer(&Server{URL: u}, acc)) acc.AddError(gatherServer(&server{url: u}, acc))
}() }()
} }
@ -56,23 +54,23 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func gatherServer(server *Server, acc telegraf.Accumulator) error { func gatherServer(server *server, acc telegraf.Accumulator) error {
var err error var err error
connectOpts := gorethink.ConnectOpts{ connectOpts := gorethink.ConnectOpts{
Address: server.URL.Host, Address: server.url.Host,
DiscoverHosts: false, DiscoverHosts: false,
} }
if server.URL.User != nil { if server.url.User != nil {
pwd, set := server.URL.User.Password() pwd, set := server.url.User.Password()
if set && pwd != "" { if set && pwd != "" {
connectOpts.AuthKey = pwd connectOpts.AuthKey = pwd
connectOpts.HandshakeVersion = gorethink.HandshakeV0_4 connectOpts.HandshakeVersion = gorethink.HandshakeV0_4
} }
} }
if server.URL.Scheme == "rethinkdb2" && server.URL.User != nil { if server.url.Scheme == "rethinkdb2" && server.url.User != nil {
pwd, set := server.URL.User.Password() pwd, set := server.url.User.Password()
if set && pwd != "" { if set && pwd != "" {
connectOpts.Username = server.URL.User.Username() connectOpts.Username = server.url.User.Username()
connectOpts.Password = pwd connectOpts.Password = pwd
connectOpts.HandshakeVersion = gorethink.HandshakeV1_0 connectOpts.HandshakeVersion = gorethink.HandshakeV1_0
} }

View File

@ -86,11 +86,7 @@ var engineStats = map[string]string{
"total_writes": "TotalWrites", "total_writes": "TotalWrites",
} }
func (e *engine) AddEngineStats( func (e *engine) addEngineStats(keys []string, acc telegraf.Accumulator, tags map[string]string) {
keys []string,
acc telegraf.Accumulator,
tags map[string]string,
) {
engine := reflect.ValueOf(e).Elem() engine := reflect.ValueOf(e).Elem()
fields := make(map[string]interface{}) fields := make(map[string]interface{})
for _, key := range keys { for _, key := range keys {
@ -99,7 +95,7 @@ func (e *engine) AddEngineStats(
acc.AddFields("rethinkdb_engine", fields, tags) acc.AddFields("rethinkdb_engine", fields, tags)
} }
func (s *storage) AddStats(acc telegraf.Accumulator, tags map[string]string) { func (s *storage) addStats(acc telegraf.Accumulator, tags map[string]string) {
fields := map[string]interface{}{ fields := map[string]interface{}{
"cache_bytes_in_use": s.Cache.BytesInUse, "cache_bytes_in_use": s.Cache.BytesInUse,
"disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec, "disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec,

View File

@ -34,7 +34,7 @@ func TestAddEngineStats(t *testing.T) {
"written_docs_per_sec", "written_docs_per_sec",
"total_writes", "total_writes",
} }
engine.AddEngineStats(keys, &acc, tags) engine.addEngineStats(keys, &acc, tags)
for _, metric := range keys { for _, metric := range keys {
require.True(t, acc.HasInt64Field("rethinkdb_engine", metric)) require.True(t, acc.HasInt64Field("rethinkdb_engine", metric))
@ -65,7 +65,7 @@ func TestAddEngineStatsPartial(t *testing.T) {
"total_reads", "total_reads",
"total_writes", "total_writes",
} }
engine.AddEngineStats(keys, &acc, tags) engine.addEngineStats(keys, &acc, tags)
for _, metric := range missingKeys { for _, metric := range missingKeys {
require.False(t, acc.HasInt64Field("rethinkdb", metric)) require.False(t, acc.HasInt64Field("rethinkdb", metric))
@ -105,7 +105,7 @@ func TestAddStorageStats(t *testing.T) {
"disk_usage_preallocated_bytes", "disk_usage_preallocated_bytes",
} }
storage.AddStats(&acc, tags) storage.addStats(&acc, tags)
for _, metric := range keys { for _, metric := range keys {
require.True(t, acc.HasInt64Field("rethinkdb", metric)) require.True(t, acc.HasInt64Field("rethinkdb", metric))

View File

@ -14,13 +14,13 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type Server struct { type server struct {
URL *url.URL url *url.URL
session *gorethink.Session session *gorethink.Session
serverStatus serverStatus serverStatus serverStatus
} }
func (s *Server) gatherData(acc telegraf.Accumulator) error { func (s *server) gatherData(acc telegraf.Accumulator) error {
if err := s.getServerStatus(); err != nil { if err := s.getServerStatus(); err != nil {
return fmt.Errorf("failed to get server_status: %w", err) return fmt.Errorf("failed to get server_status: %w", err)
} }
@ -44,7 +44,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
return nil return nil
} }
func (s *Server) validateVersion() error { func (s *server) validateVersion() error {
if s.serverStatus.Process.Version == "" { if s.serverStatus.Process.Version == "" {
return errors.New("could not determine the RethinkDB server version: process.version key missing") return errors.New("could not determine the RethinkDB server version: process.version key missing")
} }
@ -62,7 +62,7 @@ func (s *Server) validateVersion() error {
return nil return nil
} }
func (s *Server) getServerStatus() error { func (s *server) getServerStatus() error {
cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session) cursor, err := gorethink.DB("rethinkdb").Table("server_status").Run(s.session)
if err != nil { if err != nil {
return err return err
@ -77,9 +77,9 @@ func (s *Server) getServerStatus() error {
if err != nil { if err != nil {
return errors.New("could not parse server_status results") return errors.New("could not parse server_status results")
} }
host, port, err := net.SplitHostPort(s.URL.Host) host, port, err := net.SplitHostPort(s.url.Host)
if err != nil { if err != nil {
return fmt.Errorf("unable to determine provided hostname from %s", s.URL.Host) return fmt.Errorf("unable to determine provided hostname from %s", s.url.Host)
} }
driverPort, err := strconv.Atoi(port) driverPort, err := strconv.Atoi(port)
if err != nil { if err != nil {
@ -94,17 +94,17 @@ func (s *Server) getServerStatus() error {
} }
} }
return fmt.Errorf("unable to determine host id from server_status with %s", s.URL.Host) return fmt.Errorf("unable to determine host id from server_status with %s", s.url.Host)
} }
func (s *Server) getDefaultTags() map[string]string { func (s *server) getDefaultTags() map[string]string {
tags := make(map[string]string) tags := make(map[string]string)
tags["rethinkdb_host"] = s.URL.Host tags["rethinkdb_host"] = s.url.Host
tags["rethinkdb_hostname"] = s.serverStatus.Network.Hostname tags["rethinkdb_hostname"] = s.serverStatus.Network.Hostname
return tags return tags
} }
var ClusterTracking = []string{ var clusterTracking = []string{
"active_clients", "active_clients",
"clients", "clients",
"queries_per_sec", "queries_per_sec",
@ -112,7 +112,7 @@ var ClusterTracking = []string{
"written_docs_per_sec", "written_docs_per_sec",
} }
func (s *Server) addClusterStats(acc telegraf.Accumulator) error { func (s *server) addClusterStats(acc telegraf.Accumulator) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session) cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"cluster"}).Run(s.session)
if err != nil { if err != nil {
return fmt.Errorf("cluster stats query error: %w", err) return fmt.Errorf("cluster stats query error: %w", err)
@ -125,11 +125,11 @@ func (s *Server) addClusterStats(acc telegraf.Accumulator) error {
tags := s.getDefaultTags() tags := s.getDefaultTags()
tags["type"] = "cluster" tags["type"] = "cluster"
clusterStats.Engine.AddEngineStats(ClusterTracking, acc, tags) clusterStats.Engine.addEngineStats(clusterTracking, acc, tags)
return nil return nil
} }
var MemberTracking = []string{ var memberTracking = []string{
"active_clients", "active_clients",
"clients", "clients",
"queries_per_sec", "queries_per_sec",
@ -140,7 +140,7 @@ var MemberTracking = []string{
"total_writes", "total_writes",
} }
func (s *Server) addMemberStats(acc telegraf.Accumulator) error { func (s *server) addMemberStats(acc telegraf.Accumulator) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.ID}).Run(s.session) cursor, err := gorethink.DB("rethinkdb").Table("stats").Get([]string{"server", s.serverStatus.ID}).Run(s.session)
if err != nil { if err != nil {
return fmt.Errorf("member stats query error: %w", err) return fmt.Errorf("member stats query error: %w", err)
@ -153,18 +153,18 @@ func (s *Server) addMemberStats(acc telegraf.Accumulator) error {
tags := s.getDefaultTags() tags := s.getDefaultTags()
tags["type"] = "member" tags["type"] = "member"
memberStats.Engine.AddEngineStats(MemberTracking, acc, tags) memberStats.Engine.addEngineStats(memberTracking, acc, tags)
return nil return nil
} }
var TableTracking = []string{ var tableTracking = []string{
"read_docs_per_sec", "read_docs_per_sec",
"total_reads", "total_reads",
"written_docs_per_sec", "written_docs_per_sec",
"total_writes", "total_writes",
} }
func (s *Server) addTablesStats(acc telegraf.Accumulator) error { func (s *server) addTablesStats(acc telegraf.Accumulator) error {
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
if err != nil { if err != nil {
return fmt.Errorf("table stats query error: %w", err) return fmt.Errorf("table stats query error: %w", err)
@ -185,7 +185,7 @@ func (s *Server) addTablesStats(acc telegraf.Accumulator) error {
return nil return nil
} }
func (s *Server) addTableStats(acc telegraf.Accumulator, table tableStatus) error { func (s *server) addTableStats(acc telegraf.Accumulator, table tableStatus) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats"). cursor, err := gorethink.DB("rethinkdb").Table("stats").
Get([]string{"table_server", table.ID, s.serverStatus.ID}). Get([]string{"table_server", table.ID, s.serverStatus.ID}).
Run(s.session) Run(s.session)
@ -202,8 +202,8 @@ func (s *Server) addTableStats(acc telegraf.Accumulator, table tableStatus) erro
tags := s.getDefaultTags() tags := s.getDefaultTags()
tags["type"] = "data" tags["type"] = "data"
tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name) tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
ts.Engine.AddEngineStats(TableTracking, acc, tags) ts.Engine.addEngineStats(tableTracking, acc, tags)
ts.Storage.AddStats(acc, tags) ts.Storage.addStats(acc, tags)
return nil return nil
} }

View File

@ -37,7 +37,7 @@ func TestAddClusterStats(t *testing.T) {
err := server.addClusterStats(&acc) err := server.addClusterStats(&acc)
require.NoError(t, err) require.NoError(t, err)
for _, metric := range ClusterTracking { for _, metric := range clusterTracking {
require.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
} }
@ -48,7 +48,7 @@ func TestAddMemberStats(t *testing.T) {
err := server.addMemberStats(&acc) err := server.addMemberStats(&acc)
require.NoError(t, err) require.NoError(t, err)
for _, metric := range MemberTracking { for _, metric := range memberTracking {
require.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
} }
@ -59,7 +59,7 @@ func TestAddTableStats(t *testing.T) {
err := server.addTableStats(&acc) err := server.addTableStats(&acc)
require.NoError(t, err) require.NoError(t, err)
for _, metric := range TableTracking { for _, metric := range tableTracking {
require.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }

View File

@ -14,7 +14,7 @@ import (
) )
var connect_url, authKey, username, password string var connect_url, authKey, username, password string
var server *Server var server *server
func init() { func init() {
connect_url = os.Getenv("RETHINKDB_URL") connect_url = os.Getenv("RETHINKDB_URL")
@ -28,18 +28,18 @@ func init() {
func testSetup(m *testing.M) { func testSetup(m *testing.M) {
var err error var err error
server = &Server{URL: &url.URL{Host: connect_url}} server = &server{url: &url.URL{Host: connect_url}}
if authKey { if authKey {
server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ server.session, _ = gorethink.Connect(gorethink.ConnectOpts{
Address: server.URL.Host, Address: server.url.Host,
AuthKey: authKey, AuthKey: authKey,
HandshakeVersion: gorethink.HandshakeV0_4, HandshakeVersion: gorethink.HandshakeV0_4,
DiscoverHosts: false, DiscoverHosts: false,
}) })
} else { } else {
server.session, _ = gorethink.Connect(gorethink.ConnectOpts{ server.session, _ = gorethink.Connect(gorethink.ConnectOpts{
Address: server.URL.Host, Address: server.url.Host,
Username: username, Username: username,
Password: password, Password: password,
HandshakeVersion: gorethink.HandshakeV1_0, HandshakeVersion: gorethink.HandshakeV1_0,

View File

@ -16,25 +16,14 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Type Riak gathers statistics from one or more Riak instances
type Riak struct { type Riak struct {
// Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098) // Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098)
Servers []string Servers []string `toml:"servers"`
client *http.Client client *http.Client
} }
// NewRiak return a new instance of Riak with a default http client // Type riakStats represents the data received from Riak
func NewRiak() *Riak {
tr := &http.Transport{ResponseHeaderTimeout: 3 * time.Second}
client := &http.Client{
Transport: tr,
Timeout: 4 * time.Second,
}
return &Riak{client: client}
}
// Type riakStats represents the data that is received from Riak
type riakStats struct { type riakStats struct {
CPUAvg1 int64 `json:"cpu_avg1"` CPUAvg1 int64 `json:"cpu_avg1"`
CPUAvg15 int64 `json:"cpu_avg15"` CPUAvg15 int64 `json:"cpu_avg15"`
@ -88,7 +77,6 @@ func (*Riak) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Reads stats from all configured servers.
func (r *Riak) Gather(acc telegraf.Accumulator) error { func (r *Riak) Gather(acc telegraf.Accumulator) error {
// Default to a single server at localhost (default port) if none specified // Default to a single server at localhost (default port) if none specified
if len(r.Servers) == 0 { if len(r.Servers) == 0 {
@ -103,7 +91,6 @@ func (r *Riak) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Gathers stats from a single server, adding them to the accumulator
func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error { func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
// Parse the given URL to extract the server tag // Parse the given URL to extract the server tag
u, err := url.Parse(s) u, err := url.Parse(s)
@ -190,8 +177,18 @@ func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
return nil return nil
} }
// newRiak return a new instance of Riak with a default http client
func newRiak() *Riak {
tr := &http.Transport{ResponseHeaderTimeout: 3 * time.Second}
client := &http.Client{
Transport: tr,
Timeout: 4 * time.Second,
}
return &Riak{client: client}
}
func init() { func init() {
inputs.Add("riak", func() telegraf.Input { inputs.Add("riak", func() telegraf.Input {
return NewRiak() return newRiak()
}) })
} }

View File

@ -29,7 +29,7 @@ func TestRiak(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Create a new Riak instance with our given test server // Create a new Riak instance with our given test server
riak := NewRiak() riak := newRiak()
riak.Servers = []string{ts.URL} riak.Servers = []string{ts.URL}
// Create a test accumulator // Create a test accumulator

View File

@ -39,12 +39,12 @@ type RiemannSocketListener struct {
SocketMode string `toml:"socket_mode"` SocketMode string `toml:"socket_mode"`
common_tls.ServerConfig common_tls.ServerConfig
wg sync.WaitGroup
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
wg sync.WaitGroup
telegraf.Accumulator telegraf.Accumulator
} }
type setReadBufferer interface { type setReadBufferer interface {
SetReadBuffer(sizeInBytes int) error SetReadBuffer(sizeInBytes int) error
} }
@ -59,6 +59,73 @@ type riemannListener struct {
connectionsMtx sync.Mutex connectionsMtx sync.Mutex
} }
func (*RiemannSocketListener) SampleConfig() string {
return sampleConfig
}
func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error {
ctx, cancelFunc := context.WithCancel(context.Background())
go rsl.processOsSignals(cancelFunc)
rsl.Accumulator = acc
if rsl.ServiceAddress == "" {
rsl.Log.Warnf("Using default service_address tcp://:5555")
rsl.ServiceAddress = "tcp://:5555"
}
spl := strings.SplitN(rsl.ServiceAddress, "://", 2)
if len(spl) != 2 {
return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress)
}
protocol := spl[0]
addr := spl[1]
switch protocol {
case "tcp", "tcp4", "tcp6":
tlsCfg, err := rsl.ServerConfig.TLSConfig()
if err != nil {
return err
}
var l net.Listener
if tlsCfg == nil {
l, err = net.Listen(protocol, addr)
} else {
l, err = tls.Listen(protocol, addr, tlsCfg)
}
if err != nil {
return err
}
rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr())
rsl := &riemannListener{
Listener: l,
RiemannSocketListener: rsl,
sockType: spl[0],
}
rsl.wg = sync.WaitGroup{}
rsl.wg.Add(1)
go func() {
defer rsl.wg.Done()
rsl.listen(ctx)
}()
default:
return fmt.Errorf("unknown protocol %q in %q", protocol, rsl.ServiceAddress)
}
return nil
}
func (*RiemannSocketListener) Gather(telegraf.Accumulator) error {
return nil
}
func (rsl *RiemannSocketListener) Stop() {
rsl.wg.Done()
rsl.wg.Wait()
}
func (rsl *riemannListener) listen(ctx context.Context) { func (rsl *riemannListener) listen(ctx context.Context) {
rsl.connections = make(map[string]net.Conn) rsl.connections = make(map[string]net.Conn)
@ -148,8 +215,6 @@ func (rsl *riemannListener) removeConnection(c net.Conn) {
rsl.connectionsMtx.Unlock() rsl.connectionsMtx.Unlock()
} }
// Utilities
/* /*
readMessages will read Riemann messages in binary format readMessages will read Riemann messages in binary format
from the TCP connection. byte Array p size will depend on the size from the TCP connection. byte Array p size will depend on the size
@ -271,68 +336,6 @@ func (rsl *riemannListener) riemannReturnErrorResponse(conn net.Conn, errorMessa
} }
} }
func (*RiemannSocketListener) SampleConfig() string {
return sampleConfig
}
func (*RiemannSocketListener) Gather(telegraf.Accumulator) error {
return nil
}
func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error {
ctx, cancelFunc := context.WithCancel(context.Background())
go rsl.processOsSignals(cancelFunc)
rsl.Accumulator = acc
if rsl.ServiceAddress == "" {
rsl.Log.Warnf("Using default service_address tcp://:5555")
rsl.ServiceAddress = "tcp://:5555"
}
spl := strings.SplitN(rsl.ServiceAddress, "://", 2)
if len(spl) != 2 {
return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress)
}
protocol := spl[0]
addr := spl[1]
switch protocol {
case "tcp", "tcp4", "tcp6":
tlsCfg, err := rsl.ServerConfig.TLSConfig()
if err != nil {
return err
}
var l net.Listener
if tlsCfg == nil {
l, err = net.Listen(protocol, addr)
} else {
l, err = tls.Listen(protocol, addr, tlsCfg)
}
if err != nil {
return err
}
rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr())
rsl := &riemannListener{
Listener: l,
RiemannSocketListener: rsl,
sockType: spl[0],
}
rsl.wg = sync.WaitGroup{}
rsl.wg.Add(1)
go func() {
defer rsl.wg.Done()
rsl.listen(ctx)
}()
default:
return fmt.Errorf("unknown protocol %q in %q", protocol, rsl.ServiceAddress)
}
return nil
}
// Handle cancellations from the process // Handle cancellations from the process
func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc) { func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc) {
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
@ -347,11 +350,6 @@ func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc
} }
} }
func (rsl *RiemannSocketListener) Stop() {
rsl.wg.Done()
rsl.wg.Wait()
}
func newRiemannSocketListener() *RiemannSocketListener { func newRiemannSocketListener() *RiemannSocketListener {
return &RiemannSocketListener{} return &RiemannSocketListener{}
} }