chore: Fix linter findings for `revive:exported` in `plugins/inputs/m*` (#16191)

This commit is contained in:
Paweł Żak 2024-11-18 12:27:17 +01:00 committed by GitHub
parent d075815f29
commit 8a7947abbb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 923 additions and 1014 deletions

View File

@ -22,7 +22,7 @@ const (
var mailchimpDatacenter = regexp.MustCompile("[a-z]+[0-9]+$")
type chimpAPI struct {
Transport http.RoundTripper
transport http.RoundTripper
debug bool
sync.Mutex
@ -32,30 +32,30 @@ type chimpAPI struct {
}
type reportsParams struct {
Count string
Offset string
SinceSendTime string
BeforeSendTime string
count string
offset string
sinceSendTime string
beforeSendTime string
}
func (p *reportsParams) String() string {
v := url.Values{}
if p.Count != "" {
v.Set("count", p.Count)
if p.count != "" {
v.Set("count", p.count)
}
if p.Offset != "" {
v.Set("offset", p.Offset)
if p.offset != "" {
v.Set("offset", p.offset)
}
if p.BeforeSendTime != "" {
v.Set("before_send_time", p.BeforeSendTime)
if p.beforeSendTime != "" {
v.Set("before_send_time", p.beforeSendTime)
}
if p.SinceSendTime != "" {
v.Set("since_send_time", p.SinceSendTime)
if p.sinceSendTime != "" {
v.Set("since_send_time", p.sinceSendTime)
}
return v.Encode()
}
func NewChimpAPI(apiKey string, log telegraf.Logger) *chimpAPI {
func newChimpAPI(apiKey string, log telegraf.Logger) *chimpAPI {
u := &url.URL{}
u.Scheme = "https"
u.Host = mailchimpDatacenter.FindString(apiKey) + ".api.mailchimp.com"
@ -86,7 +86,7 @@ func chimpErrorCheck(body []byte) error {
return nil
}
func (a *chimpAPI) GetReports(params reportsParams) (reportsResponse, error) {
func (a *chimpAPI) getReports(params reportsParams) (reportsResponse, error) {
a.Lock()
defer a.Unlock()
a.url.Path = reportsEndpoint
@ -105,7 +105,7 @@ func (a *chimpAPI) GetReports(params reportsParams) (reportsResponse, error) {
return response, nil
}
func (a *chimpAPI) GetReport(campaignID string) (report, error) {
func (a *chimpAPI) getReport(campaignID string) (report, error) {
a.Lock()
defer a.Unlock()
a.url.Path = fmt.Sprintf(reportsEndpointCampaign, campaignID)
@ -126,7 +126,7 @@ func (a *chimpAPI) GetReport(campaignID string) (report, error) {
func (a *chimpAPI) runChimp(params reportsParams) ([]byte, error) {
client := &http.Client{
Transport: a.Transport,
Transport: a.transport,
Timeout: 4 * time.Second,
}

View File

@ -14,13 +14,12 @@ import (
var sampleConfig string
type MailChimp struct {
api *chimpAPI
APIKey string `toml:"api_key"`
DaysOld int `toml:"days_old"`
CampaignID string `toml:"campaign_id"`
Log telegraf.Logger `toml:"-"`
api *chimpAPI
}
func (*MailChimp) SampleConfig() string {
@ -28,7 +27,7 @@ func (*MailChimp) SampleConfig() string {
}
func (m *MailChimp) Init() error {
m.api = NewChimpAPI(m.APIKey, m.Log)
m.api = newChimpAPI(m.APIKey, m.Log)
return nil
}
@ -45,8 +44,8 @@ func (m *MailChimp) Gather(acc telegraf.Accumulator) error {
since = now.Add(-d).Format(time.RFC3339)
}
reports, err := m.api.GetReports(reportsParams{
SinceSendTime: since,
reports, err := m.api.getReports(reportsParams{
sinceSendTime: since,
})
if err != nil {
return err
@ -57,7 +56,7 @@ func (m *MailChimp) Gather(acc telegraf.Accumulator) error {
gatherReport(acc, report, now)
}
} else {
report, err := m.api.GetReport(m.CampaignID)
report, err := m.api.getReport(m.CampaignID)
if err != nil {
return err
}

View File

@ -19,69 +19,69 @@ import (
//go:embed sample.conf
var sampleConfig string
// Marklogic configuration toml
const (
// MarkLogic v2 management api endpoints for hosts status
statsPath = "/manage/v2/hosts/"
viewFormat = "view=status&format=json"
)
type Marklogic struct {
URL string `toml:"url"`
Hosts []string `toml:"hosts"`
Username string `toml:"username"`
Password string `toml:"password"`
Sources []string
tls.ClientConfig
client *http.Client
sources []string
}
type MlPointInt struct {
type mlPointInt struct {
Value int `json:"value"`
}
type MlPointFloat struct {
type mlPointFloat struct {
Value float64 `json:"value"`
}
type MlPointBool struct {
type mlPointBool struct {
Value bool `json:"value"`
}
// MarkLogic v2 management api endpoints for hosts status
const statsPath = "/manage/v2/hosts/"
const viewFormat = "view=status&format=json"
type MlHost struct {
type mlHost struct {
HostStatus struct {
ID string `json:"id"`
Name string `json:"name"`
StatusProperties struct {
Online MlPointBool `json:"online"`
Online mlPointBool `json:"online"`
LoadProperties struct {
TotalLoad MlPointFloat `json:"total-load"`
TotalLoad mlPointFloat `json:"total-load"`
} `json:"load-properties"`
RateProperties struct {
TotalRate MlPointFloat `json:"total-rate"`
TotalRate mlPointFloat `json:"total-rate"`
} `json:"rate-properties"`
StatusDetail struct {
Cpus MlPointInt `json:"cpus"`
Cores MlPointInt `json:"cores"`
Cpus mlPointInt `json:"cpus"`
Cores mlPointInt `json:"cores"`
TotalCPUStatUser float64 `json:"total-cpu-stat-user"`
TotalCPUStatSystem float64 `json:"total-cpu-stat-system"`
TotalCPUStatIdle float64 `json:"total-cpu-stat-idle"`
TotalCPUStatIowait float64 `json:"total-cpu-stat-iowait"`
MemoryProcessSize MlPointInt `json:"memory-process-size"`
MemoryProcessRss MlPointInt `json:"memory-process-rss"`
MemorySystemTotal MlPointInt `json:"memory-system-total"`
MemorySystemFree MlPointInt `json:"memory-system-free"`
MemoryProcessSwapSize MlPointInt `json:"memory-process-swap-size"`
MemorySize MlPointInt `json:"memory-size"`
HostSize MlPointInt `json:"host-size"`
LogDeviceSpace MlPointInt `json:"log-device-space"`
DataDirSpace MlPointInt `json:"data-dir-space"`
QueryReadBytes MlPointInt `json:"query-read-bytes"`
QueryReadLoad MlPointInt `json:"query-read-load"`
MergeReadLoad MlPointInt `json:"merge-read-load"`
MergeWriteLoad MlPointInt `json:"merge-write-load"`
HTTPServerReceiveBytes MlPointInt `json:"http-server-receive-bytes"`
HTTPServerSendBytes MlPointInt `json:"http-server-send-bytes"`
MemoryProcessSize mlPointInt `json:"memory-process-size"`
MemoryProcessRss mlPointInt `json:"memory-process-rss"`
MemorySystemTotal mlPointInt `json:"memory-system-total"`
MemorySystemFree mlPointInt `json:"memory-system-free"`
MemoryProcessSwapSize mlPointInt `json:"memory-process-swap-size"`
MemorySize mlPointInt `json:"memory-size"`
HostSize mlPointInt `json:"host-size"`
LogDeviceSpace mlPointInt `json:"log-device-space"`
DataDirSpace mlPointInt `json:"data-dir-space"`
QueryReadBytes mlPointInt `json:"query-read-bytes"`
QueryReadLoad mlPointInt `json:"query-read-load"`
MergeReadLoad mlPointInt `json:"merge-read-load"`
MergeWriteLoad mlPointInt `json:"merge-write-load"`
HTTPServerReceiveBytes mlPointInt `json:"http-server-receive-bytes"`
HTTPServerSendBytes mlPointInt `json:"http-server-send-bytes"`
} `json:"status-detail"`
} `json:"status-properties"`
} `json:"host-status"`
@ -91,7 +91,6 @@ func (*Marklogic) SampleConfig() string {
return sampleConfig
}
// Init parse all source URLs and place on the Marklogic struct
func (c *Marklogic) Init() error {
if len(c.URL) == 0 {
c.URL = "http://localhost:8002/"
@ -108,12 +107,11 @@ func (c *Marklogic) Init() error {
addr.RawQuery = viewFormat
u := addr.String()
c.Sources = append(c.Sources, u)
c.sources = append(c.sources, u)
}
return nil
}
// Gather metrics from HTTP Server.
func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error {
var wg sync.WaitGroup
@ -127,7 +125,7 @@ func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error {
}
// Range over all source URL's appended to the struct
for _, serv := range c.Sources {
for _, serv := range c.sources {
wg.Add(1)
go func(serv string) {
defer wg.Done()
@ -143,7 +141,7 @@ func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error {
}
func (c *Marklogic) fetchAndInsertData(acc telegraf.Accumulator, address string) error {
ml := &MlHost{}
ml := &mlHost{}
if err := c.gatherJSONData(address, ml); err != nil {
return err
}

View File

@ -33,7 +33,7 @@ func TestMarklogic(t *testing.T) {
ml := &Marklogic{
Hosts: []string{"example1"},
URL: ts.URL,
// Sources: []string{"http://localhost:8002/manage/v2/hosts/hostname1?view=status&format=json"},
// sources: []string{"http://localhost:8002/manage/v2/hosts/hostname1?view=status&format=json"},
}
// Create a test accumulator

View File

@ -21,12 +21,6 @@ import (
//go:embed sample.conf
var sampleConfig string
// Mcrouter is a mcrouter plugin
type Mcrouter struct {
Servers []string
Timeout config.Duration
}
// enum for statType
type statType int
@ -35,15 +29,14 @@ const (
typeFloat statType = iota
)
var defaultTimeout = 5 * time.Second
var defaultServerURL = url.URL{
var (
defaultTimeout = 5 * time.Second
defaultServerURL = url.URL{
Scheme: "tcp",
Host: "localhost:11211",
}
// The list of metrics that should be sent
var sendMetrics = map[string]statType{
sendMetrics = map[string]statType{
"uptime": typeInt,
"num_servers": typeInt,
"num_servers_new": typeInt,
@ -109,12 +102,17 @@ var sendMetrics = map[string]statType{
"cmd_delete_out_all": typeInt,
"cmd_lease_set_out_all": typeInt,
}
)
type Mcrouter struct {
Servers []string `toml:"servers"`
Timeout config.Duration `toml:"timeout"`
}
func (*Mcrouter) SampleConfig() string {
return sampleConfig
}
// Gather reads stats from all configured servers accumulates stats
func (m *Mcrouter) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()
@ -136,8 +134,8 @@ func (m *Mcrouter) Gather(acc telegraf.Accumulator) error {
return nil
}
// ParseAddress parses an address string into 'host:port' and 'protocol' parts
func (m *Mcrouter) ParseAddress(address string) (parsedAddress, protocol string, err error) {
// parseAddress parses an address string into 'host:port' and 'protocol' parts
func (m *Mcrouter) parseAddress(address string) (parsedAddress, protocol string, err error) {
var host string
var port string
@ -189,7 +187,7 @@ func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegra
var protocol string
var dialer net.Dialer
address, protocol, err = m.ParseAddress(address)
address, protocol, err = m.parseAddress(address)
if err != nil {
return err
}

View File

@ -32,7 +32,7 @@ func TestAddressParsing(t *testing.T) {
}
for _, args := range acceptTests {
address, protocol, err := m.ParseAddress(args[0])
address, protocol, err := m.parseAddress(args[0])
require.NoError(t, err, args[0])
require.Equal(t, args[1], address, args[0])
@ -40,7 +40,7 @@ func TestAddressParsing(t *testing.T) {
}
for _, addr := range rejectTests {
address, protocol, err := m.ParseAddress(addr)
address, protocol, err := m.parseAddress(addr)
require.Error(t, err, addr)
require.Empty(t, address, addr)

View File

@ -44,6 +44,10 @@ var (
componentDeviceRE = regexp.MustCompile(`(.*)\[\d+\]`)
)
type Mdstat struct {
FileName string `toml:"file_name"`
}
type statusLine struct {
active int64
total int64
@ -58,10 +62,6 @@ type recoveryLine struct {
speed float64
}
type MdstatConf struct {
FileName string `toml:"file_name"`
}
func evalStatusLine(deviceLine, statusLineStr string) (statusLine, error) {
sizeFields := strings.Fields(statusLineStr)
if len(sizeFields) < 1 {
@ -173,11 +173,11 @@ func evalComponentDevices(deviceFields []string) string {
return strings.Join(mdComponentDevices, ",")
}
func (*MdstatConf) SampleConfig() string {
func (*Mdstat) SampleConfig() string {
return sampleConfig
}
func (k *MdstatConf) Gather(acc telegraf.Accumulator) error {
func (k *Mdstat) Gather(acc telegraf.Accumulator) error {
data, err := k.getProcMdstat()
if err != nil {
return err
@ -267,7 +267,7 @@ func (k *MdstatConf) Gather(acc telegraf.Accumulator) error {
return nil
}
func (k *MdstatConf) getProcMdstat() ([]byte, error) {
func (k *Mdstat) getProcMdstat() ([]byte, error) {
var mdStatFile string
if k.FileName == "" {
mdStatFile = internal.GetProcPath() + "/mdstat"
@ -289,5 +289,5 @@ func (k *MdstatConf) getProcMdstat() ([]byte, error) {
}
func init() {
inputs.Add("mdstat", func() telegraf.Input { return &MdstatConf{} })
inputs.Add("mdstat", func() telegraf.Input { return &Mdstat{} })
}

View File

@ -17,12 +17,14 @@ type Mdstat struct {
Log telegraf.Logger `toml:"-"`
}
func (*Mdstat) SampleConfig() string { return sampleConfig }
func (m *Mdstat) Init() error {
m.Log.Warn("current platform is not supported")
m.Log.Warn("Current platform is not supported")
return nil
}
func (*Mdstat) SampleConfig() string { return sampleConfig }
func (*Mdstat) Gather(_ telegraf.Accumulator) error { return nil }
func (*Mdstat) Gather(telegraf.Accumulator) error { return nil }
func init() {
inputs.Add("mdstat", func() telegraf.Input {

View File

@ -14,7 +14,7 @@ import (
func TestFullMdstatProcFile(t *testing.T) {
filename := makeFakeMDStatFile([]byte(mdStatFileFull))
defer os.Remove(filename)
k := MdstatConf{
k := Mdstat{
FileName: filename,
}
acc := testutil.Accumulator{}
@ -39,7 +39,7 @@ func TestFullMdstatProcFile(t *testing.T) {
func TestMdstatSyncStart(t *testing.T) {
filename := makeFakeMDStatFile([]byte(mdStatSyncStart))
defer os.Remove(filename)
k := MdstatConf{
k := Mdstat{
FileName: filename,
}
acc := testutil.Accumulator{}
@ -65,7 +65,7 @@ func TestFailedDiskMdStatProcFile1(t *testing.T) {
filename := makeFakeMDStatFile([]byte(mdStatFileFailedDisk))
defer os.Remove(filename)
k := MdstatConf{
k := Mdstat{
FileName: filename,
}
@ -92,7 +92,7 @@ func TestEmptyMdStatProcFile1(t *testing.T) {
filename := makeFakeMDStatFile([]byte(mdStatFileEmpty))
defer os.Remove(filename)
k := MdstatConf{
k := Mdstat{
FileName: filename,
}
@ -105,7 +105,7 @@ func TestInvalidMdStatProcFile1(t *testing.T) {
filename := makeFakeMDStatFile([]byte(mdStatFileInvalid))
defer os.Remove(filename)
k := MdstatConf{
k := Mdstat{
FileName: filename,
}

View File

@ -14,21 +14,21 @@ import (
//go:embed sample.conf
var sampleConfig string
type MemStats struct {
type Mem struct {
ps system.PS
platform string
}
func (*MemStats) SampleConfig() string {
func (*Mem) SampleConfig() string {
return sampleConfig
}
func (ms *MemStats) Init() error {
func (ms *Mem) Init() error {
ms.platform = runtime.GOOS
return nil
}
func (ms *MemStats) Gather(acc telegraf.Accumulator) error {
func (ms *Mem) Gather(acc telegraf.Accumulator) error {
vm, err := ms.ps.VMStat()
if err != nil {
return fmt.Errorf("error getting virtual memory info: %w", err)
@ -102,6 +102,6 @@ func (ms *MemStats) Gather(acc telegraf.Accumulator) error {
func init() {
ps := system.NewSystemPS()
inputs.Add("mem", func() telegraf.Input {
return &MemStats{ps: ps}
return &Mem{ps: ps}
})
}

View File

@ -4,11 +4,12 @@ import (
"testing"
"time"
"github.com/shirou/gopsutil/v4/mem"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/system"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/v4/mem"
"github.com/stretchr/testify/require"
)
func TestMemStats(t *testing.T) {
@ -55,7 +56,7 @@ func TestMemStats(t *testing.T) {
}
mps.On("VMStat").Return(vms, nil)
plugin := &MemStats{ps: &mps}
plugin := &Mem{ps: &mps}
err = plugin.Init()
require.NoError(t, err)

View File

@ -22,18 +22,11 @@ import (
//go:embed sample.conf
var sampleConfig string
// Memcached is a memcached plugin
type Memcached struct {
Servers []string `toml:"servers"`
UnixSockets []string `toml:"unix_sockets"`
EnableTLS bool `toml:"enable_tls"`
common_tls.ClientConfig
}
var defaultTimeout = 5 * time.Second
var (
defaultTimeout = 5 * time.Second
// The list of metrics that should be sent
var sendMetrics = []string{
sendMetrics = []string{
"accepting_conns",
"auth_cmds",
"auth_errors",
@ -103,12 +96,19 @@ var sendMetrics = []string{
"touch_misses",
"uptime",
}
)
type Memcached struct {
Servers []string `toml:"servers"`
UnixSockets []string `toml:"unix_sockets"`
EnableTLS bool `toml:"enable_tls"`
common_tls.ClientConfig
}
func (*Memcached) SampleConfig() string {
return sampleConfig
}
// Gather reads stats from all configured servers accumulates stats
func (m *Memcached) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 && len(m.UnixSockets) == 0 {
return m.gatherServer(":11211", false, acc)
@ -125,11 +125,7 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error {
return nil
}
func (m *Memcached) gatherServer(
address string,
unix bool,
acc telegraf.Accumulator,
) error {
func (m *Memcached) gatherServer(address string, unix bool, acc telegraf.Accumulator) error {
var conn net.Conn
var err error
var dialer proxy.Dialer

View File

@ -23,22 +23,27 @@ import (
//go:embed sample.conf
var sampleConfig string
type Role string
type role string
const (
MASTER Role = "master"
SLAVE Role = "slave"
master role = "master"
slave role = "slave"
)
var allMetrics = map[role][]string{
master: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"},
slave: {"resources", "agent", "system", "executors", "tasks", "messages"},
}
type Mesos struct {
Timeout int
Masters []string
Timeout int `toml:"timeout"`
Masters []string `toml:"masters"`
MasterCols []string `toml:"master_collections"`
Slaves []string
Slaves []string `toml:"slaves"`
SlaveCols []string `toml:"slave_collections"`
tls.ClientConfig
Log telegraf.Logger
Log telegraf.Logger `toml:"-"`
initialized bool
client *http.Client
@ -46,21 +51,52 @@ type Mesos struct {
slaveURLs []*url.URL
}
var allMetrics = map[Role][]string{
MASTER: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"},
SLAVE: {"resources", "agent", "system", "executors", "tasks", "messages"},
func (*Mesos) SampleConfig() string {
return sampleConfig
}
func (m *Mesos) parseURL(s string, role Role) (*url.URL, error) {
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}
var wg sync.WaitGroup
for _, mstr := range m.masterURLs {
wg.Add(1)
go func(mstr *url.URL) {
acc.AddError(m.gatherMainMetrics(mstr, master, acc))
wg.Done()
}(mstr)
}
for _, slv := range m.slaveURLs {
wg.Add(1)
go func(slv *url.URL) {
acc.AddError(m.gatherMainMetrics(slv, slave, acc))
wg.Done()
}(slv)
}
wg.Wait()
return nil
}
func (m *Mesos) parseURL(s string, role role) (*url.URL, error) {
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
host, port, err := net.SplitHostPort(s)
// no port specified
if err != nil {
host = s
switch role {
case MASTER:
case master:
port = "5050"
case SLAVE:
case slave:
port = "5051"
}
}
@ -74,11 +110,11 @@ func (m *Mesos) parseURL(s string, role Role) (*url.URL, error) {
func (m *Mesos) initialize() error {
if len(m.MasterCols) == 0 {
m.MasterCols = allMetrics[MASTER]
m.MasterCols = allMetrics[master]
}
if len(m.SlaveCols) == 0 {
m.SlaveCols = allMetrics[SLAVE]
m.SlaveCols = allMetrics[slave]
}
if m.Timeout == 0 {
@ -89,8 +125,8 @@ func (m *Mesos) initialize() error {
rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms"
m.masterURLs = make([]*url.URL, 0, len(m.Masters))
for _, master := range m.Masters {
u, err := m.parseURL(master, MASTER)
for _, mstr := range m.Masters {
u, err := m.parseURL(mstr, master)
if err != nil {
return err
}
@ -100,8 +136,8 @@ func (m *Mesos) initialize() error {
}
m.slaveURLs = make([]*url.URL, 0, len(m.Slaves))
for _, slave := range m.Slaves {
u, err := m.parseURL(slave, SLAVE)
for _, slv := range m.Slaves {
u, err := m.parseURL(slv, slave)
if err != nil {
return err
}
@ -119,43 +155,6 @@ func (m *Mesos) initialize() error {
return nil
}
func (*Mesos) SampleConfig() string {
return sampleConfig
}
// Gather() metrics from given list of Mesos Masters
func (m *Mesos) Gather(acc telegraf.Accumulator) error {
if !m.initialized {
err := m.initialize()
if err != nil {
return err
}
m.initialized = true
}
var wg sync.WaitGroup
for _, master := range m.masterURLs {
wg.Add(1)
go func(master *url.URL) {
acc.AddError(m.gatherMainMetrics(master, MASTER, acc))
wg.Done()
}(master)
}
for _, slave := range m.slaveURLs {
wg.Add(1)
go func(slave *url.URL) {
acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc))
wg.Done()
}(slave)
}
wg.Wait()
return nil
}
func (m *Mesos) createHTTPClient() (*http.Client, error) {
tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
@ -174,7 +173,7 @@ func (m *Mesos) createHTTPClient() (*http.Client, error) {
}
// metricsDiff() returns set names for removal
func metricsDiff(role Role, w []string) []string {
func metricsDiff(role role, w []string) []string {
b := make([]string, 0, len(allMetrics[role]))
s := make(map[string]bool)
@ -196,10 +195,10 @@ func metricsDiff(role Role, w []string) []string {
}
// masterBlocks serves as kind of metrics registry grouping them in sets
func (m *Mesos) getMetrics(role Role, group string) []string {
func (m *Mesos) getMetrics(role role, group string) []string {
metrics := make(map[string][]string)
if role == MASTER {
if role == master {
metrics["resources"] = []string{
"master/cpus_percent",
"master/cpus_used",
@ -356,7 +355,7 @@ func (m *Mesos) getMetrics(role Role, group string) []string {
"registrar/registry_size_bytes",
"registrar/state_store_ms/count",
}
} else if role == SLAVE {
} else if role == slave {
metrics["resources"] = []string{
"slave/cpus_percent",
"slave/cpus_used",
@ -430,7 +429,6 @@ func (m *Mesos) getMetrics(role Role, group string) []string {
}
ret, ok := metrics[group]
if !ok {
m.Log.Infof("Unknown role %q metrics group: %s", role, group)
return nil
@ -439,13 +437,13 @@ func (m *Mesos) getMetrics(role Role, group string) []string {
return ret
}
func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
func (m *Mesos) filterMetrics(role role, metrics *map[string]interface{}) {
var ok bool
var selectedMetrics []string
if role == MASTER {
if role == master {
selectedMetrics = m.MasterCols
} else if role == SLAVE {
} else if role == slave {
selectedMetrics = m.SlaveCols
}
@ -476,13 +474,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) {
}
}
// TaskStats struct for JSON API output /monitor/statistics
type TaskStats struct {
ExecutorID string `json:"executor_id"`
FrameworkID string `json:"framework_id"`
Statistics map[string]interface{} `json:"statistics"`
}
func withPath(u *url.URL, path string) *url.URL {
c := *u
c.Path = path
@ -498,7 +489,7 @@ func urlTag(u *url.URL) string {
}
// This should not belong to the object
func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error {
func (m *Mesos) gatherMainMetrics(u *url.URL, role role, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
tags := map[string]string{
@ -533,7 +524,7 @@ func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulato
return err
}
if role == MASTER {
if role == master {
if jf.Fields["master/elected"] != 0.0 {
tags["state"] = "leader"
} else {

View File

@ -333,11 +333,11 @@ func TestMasterFilter(t *testing.T) {
"messages", "evqueue", "tasks",
}
m.filterMetrics(MASTER, &masterMetrics)
m.filterMetrics(master, &masterMetrics)
// Assert expected metrics are present.
for _, v := range m.MasterCols {
for _, x := range m.getMetrics(MASTER, v) {
for _, x := range m.getMetrics(master, v) {
_, ok := masterMetrics[x]
require.Truef(t, ok, "Didn't find key %s, it should present.", x)
}
@ -354,7 +354,7 @@ func TestMasterFilter(t *testing.T) {
// Assert unexpected metrics are not present.
for _, v := range b {
for _, x := range m.getMetrics(MASTER, v) {
for _, x := range m.getMetrics(master, v) {
_, ok := masterMetrics[x]
require.Falsef(t, ok, "Found key %s, it should be gone.", x)
}
@ -395,16 +395,16 @@ func TestSlaveFilter(t *testing.T) {
"system", "executors", "messages",
}
m.filterMetrics(SLAVE, &slaveMetrics)
m.filterMetrics(slave, &slaveMetrics)
for _, v := range b {
for _, x := range m.getMetrics(SLAVE, v) {
for _, x := range m.getMetrics(slave, v) {
_, ok := slaveMetrics[x]
require.Falsef(t, ok, "Found key %s, it should be gone.", x)
}
}
for _, v := range m.MasterCols {
for _, x := range m.getMetrics(SLAVE, v) {
for _, x := range m.getMetrics(slave, v) {
_, ok := slaveMetrics[x]
require.Truef(t, ok, "Didn't find key %s, it should present.", x)
}

View File

@ -13,16 +13,16 @@ var (
scoreboardRegex = regexp.MustCompile(`\[(?P<name>[^\]]+)\]: (?P<value>\d+)`)
)
// Connection is an established connection to the Minecraft server.
type Connection interface {
// connection is an established connection to the Minecraft server.
type connection interface {
// Execute runs a command.
Execute(command string) (string, error)
}
// Connector is used to create connections to the Minecraft server.
type Connector interface {
// Connect establishes a connection to the server.
Connect() (Connection, error)
// conn is used to create connections to the Minecraft server.
type conn interface {
// connect establishes a connection to the server.
connect() (connection, error)
}
func newConnector(hostname, port, password string) *connector {
@ -39,7 +39,7 @@ type connector struct {
password string
}
func (c *connector) Connect() (Connection, error) {
func (c *connector) connect() (connection, error) {
client, err := rcon.Dial(c.hostname+":"+c.port, c.password)
if err != nil {
return nil, err
@ -48,17 +48,17 @@ func (c *connector) Connect() (Connection, error) {
return client, nil
}
func newClient(connector Connector) *client {
func newClient(connector conn) *client {
return &client{connector: connector}
}
type client struct {
connector Connector
conn Connection
connector conn
conn connection
}
func (c *client) Connect() error {
conn, err := c.connector.Connect()
func (c *client) connect() error {
conn, err := c.connector.connect()
if err != nil {
return err
}
@ -66,9 +66,9 @@ func (c *client) Connect() error {
return nil
}
func (c *client) Players() ([]string, error) {
func (c *client) players() ([]string, error) {
if c.conn == nil {
err := c.Connect()
err := c.connect()
if err != nil {
return nil, err
}
@ -83,9 +83,9 @@ func (c *client) Players() ([]string, error) {
return parsePlayers(resp), nil
}
func (c *client) Scores(player string) ([]Score, error) {
func (c *client) scores(player string) ([]score, error) {
if c.conn == nil {
err := c.Connect()
err := c.connect()
if err != nil {
return nil, err
}
@ -127,13 +127,13 @@ func parsePlayers(input string) []string {
return players
}
// Score is an individual tracked scoreboard stat.
type Score struct {
Name string
Value int64
// score is an individual tracked scoreboard stat.
type score struct {
name string
value int64
}
func parseScores(input string) []Score {
func parseScores(input string) []score {
if strings.Contains(input, "has no scores") {
return nil
}
@ -147,19 +147,19 @@ func parseScores(input string) []Score {
}
matches := re.FindAllStringSubmatch(input, -1)
scores := make([]Score, 0, len(matches))
scores := make([]score, 0, len(matches))
for _, match := range matches {
score := Score{}
score := score{}
for i, subexp := range re.SubexpNames() {
switch subexp {
case "name":
score.Name = match[i]
score.name = match[i]
case "value":
value, err := strconv.ParseInt(match[i], 10, 64)
if err != nil {
continue
}
score.Value = value
score.value = value
default:
continue
}

View File

@ -6,19 +6,19 @@ import (
"github.com/stretchr/testify/require"
)
type MockConnection struct {
type mockConnection struct {
commands map[string]string
}
func (c *MockConnection) Execute(command string) (string, error) {
func (c *mockConnection) Execute(command string) (string, error) {
return c.commands[command], nil
}
type MockConnector struct {
conn *MockConnection
type mockConnector struct {
conn *mockConnection
}
func (c *MockConnector) Connect() (Connection, error) {
func (c *mockConnector) connect() (connection, error) {
return c.conn, nil
}
@ -92,12 +92,12 @@ func TestClient_Player(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
connector := &MockConnector{
conn: &MockConnection{commands: tt.commands},
connector := &mockConnector{
conn: &mockConnection{commands: tt.commands},
}
client := newClient(connector)
actual, err := client.Players()
actual, err := client.players()
require.NoError(t, err)
require.Equal(t, tt.expected, actual)
@ -110,7 +110,7 @@ func TestClient_Scores(t *testing.T) {
name string
player string
commands map[string]string
expected []Score
expected []score
}{
{
name: "minecraft 1.12 player with no scores",
@ -125,8 +125,8 @@ func TestClient_Scores(t *testing.T) {
commands: map[string]string{
"scoreboard players list Etho": "Showing 1 tracked objective(s) for Etho:- jump: 2 (jump)",
},
expected: []Score{
{Name: "jump", Value: 2},
expected: []score{
{name: "jump", value: 2},
},
},
{
@ -135,10 +135,10 @@ func TestClient_Scores(t *testing.T) {
commands: map[string]string{
"scoreboard players list Etho": "Showing 3 tracked objective(s) for Etho:- hopper: 2 (hopper)- dropper: 2 (dropper)- redstone: 1 (redstone)",
},
expected: []Score{
{Name: "hopper", Value: 2},
{Name: "dropper", Value: 2},
{Name: "redstone", Value: 1},
expected: []score{
{name: "hopper", value: 2},
{name: "dropper", value: 2},
{name: "redstone", value: 1},
},
},
{
@ -154,8 +154,8 @@ func TestClient_Scores(t *testing.T) {
commands: map[string]string{
"scoreboard players list Etho": "Etho has 1 scores:[jumps]: 1",
},
expected: []Score{
{Name: "jumps", Value: 1},
expected: []score{
{name: "jumps", value: 1},
},
},
{
@ -164,21 +164,21 @@ func TestClient_Scores(t *testing.T) {
commands: map[string]string{
"scoreboard players list Etho": "Etho has 3 scores:[hopper]: 2[dropper]: 2[redstone]: 1",
},
expected: []Score{
{Name: "hopper", Value: 2},
{Name: "dropper", Value: 2},
{Name: "redstone", Value: 1},
expected: []score{
{name: "hopper", value: 2},
{name: "dropper", value: 2},
{name: "redstone", value: 1},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
connector := &MockConnector{
conn: &MockConnection{commands: tt.commands},
connector := &mockConnector{
conn: &mockConnection{commands: tt.commands},
}
client := newClient(connector)
actual, err := client.Scores(tt.player)
actual, err := client.scores(tt.player)
require.NoError(t, err)
require.Equal(t, tt.expected, actual)

View File

@ -11,25 +11,24 @@ import (
//go:embed sample.conf
var sampleConfig string
// Client is a client for the Minecraft server.
type Client interface {
// Connect establishes a connection to the server.
Connect() error
// Players returns the players on the scoreboard.
Players() ([]string, error)
// Scores return the objective scores for a player.
Scores(player string) ([]Score, error)
}
// Minecraft is the plugin type.
type Minecraft struct {
Server string `toml:"server"`
Port string `toml:"port"`
Password string `toml:"password"`
client Client
client cli
}
// cli is a client for the Minecraft server.
type cli interface {
// connect establishes a connection to the server.
connect() error
// players returns the players on the scoreboard.
players() ([]string, error)
// scores returns the objective scores for a player.
scores(player string) ([]score, error)
}
func (*Minecraft) SampleConfig() string {
@ -42,13 +41,13 @@ func (s *Minecraft) Gather(acc telegraf.Accumulator) error {
s.client = newClient(connector)
}
players, err := s.client.Players()
players, err := s.client.players()
if err != nil {
return err
}
for _, player := range players {
scores, err := s.client.Scores(player)
scores, err := s.client.scores(player)
if err != nil {
return err
}
@ -62,7 +61,7 @@ func (s *Minecraft) Gather(acc telegraf.Accumulator) error {
var fields = make(map[string]interface{}, len(scores))
for _, score := range scores {
fields[score.Name] = score.Value
fields[score.name] = score.value
}
acc.AddFields("minecraft", fields, tags)

View File

@ -9,22 +9,22 @@ import (
"github.com/stretchr/testify/require"
)
type MockClient struct {
ConnectF func() error
PlayersF func() ([]string, error)
ScoresF func(player string) ([]Score, error)
type mockClient struct {
connectF func() error
playersF func() ([]string, error)
scoresF func(player string) ([]score, error)
}
func (c *MockClient) Connect() error {
return c.ConnectF()
func (c *mockClient) connect() error {
return c.connectF()
}
func (c *MockClient) Players() ([]string, error) {
return c.PlayersF()
func (c *mockClient) players() ([]string, error) {
return c.playersF()
}
func (c *MockClient) Scores(player string) ([]Score, error) {
return c.ScoresF(player)
func (c *mockClient) scores(player string) ([]score, error) {
return c.scoresF(player)
}
func TestGather(t *testing.T) {
@ -32,31 +32,31 @@ func TestGather(t *testing.T) {
tests := []struct {
name string
client *MockClient
client *mockClient
metrics []telegraf.Metric
err error
}{
{
name: "no players",
client: &MockClient{
ConnectF: func() error {
client: &mockClient{
connectF: func() error {
return nil
},
PlayersF: func() ([]string, error) {
playersF: func() ([]string, error) {
return nil, nil
},
},
},
{
name: "one player without scores",
client: &MockClient{
ConnectF: func() error {
client: &mockClient{
connectF: func() error {
return nil
},
PlayersF: func() ([]string, error) {
playersF: func() ([]string, error) {
return []string{"Etho"}, nil
},
ScoresF: func(player string) ([]Score, error) {
scoresF: func(player string) ([]score, error) {
switch player {
case "Etho":
return nil, nil
@ -68,17 +68,17 @@ func TestGather(t *testing.T) {
},
{
name: "one player with scores",
client: &MockClient{
ConnectF: func() error {
client: &mockClient{
connectF: func() error {
return nil
},
PlayersF: func() ([]string, error) {
playersF: func() ([]string, error) {
return []string{"Etho"}, nil
},
ScoresF: func(player string) ([]Score, error) {
scoresF: func(player string) ([]score, error) {
switch player {
case "Etho":
return []Score{{Name: "jumps", Value: 42}}, nil
return []score{{name: "jumps", value: 42}}, nil
default:
panic("unknown player")
}

View File

@ -48,22 +48,22 @@ type sineWave struct {
}
type step struct {
latest float64
Name string `toml:"name"`
Start float64 `toml:"start"`
Step float64 `toml:"step"`
Min float64 `toml:"min" deprecated:"1.28.2;1.35.0;use 'start' instead"`
Max float64 `toml:"max" deprecated:"1.28.2;1.35.0;use 'step' instead"`
latest float64
}
type stock struct {
latest float64
Name string `toml:"name"`
Price float64 `toml:"price"`
Volatility float64 `toml:"volatility"`
latest float64
}
func (*Mock) SampleConfig() string {

View File

@ -9,10 +9,10 @@ const (
maxQuantityHoldingRegisters = uint16(125)
)
type Configuration interface {
Check() error
Process() (map[byte]requestSet, error)
SampleConfigPart() string
type configuration interface {
check() error
process() (map[byte]requestSet, error)
sampleConfigPart() string
}
func removeDuplicates(elements []uint16) []uint16 {

View File

@ -32,21 +32,21 @@ type metricDefinition struct {
Tags map[string]string `toml:"tags"`
}
type ConfigurationPerMetric struct {
type configurationPerMetric struct {
Optimization string `toml:"optimization"`
MaxExtraRegisters uint16 `toml:"optimization_max_register_fill"`
Metrics []metricDefinition `toml:"metric"`
workarounds ModbusWorkarounds
workarounds workarounds
excludeRegisterType bool
logger telegraf.Logger
}
func (c *ConfigurationPerMetric) SampleConfigPart() string {
func (c *configurationPerMetric) sampleConfigPart() string {
return sampleConfigPartPerMetric
}
func (c *ConfigurationPerMetric) Check() error {
func (c *configurationPerMetric) check() error {
switch c.workarounds.StringRegisterLocation {
case "", "both", "lower", "upper":
// Do nothing as those are valid
@ -178,7 +178,7 @@ func (c *ConfigurationPerMetric) Check() error {
return nil
}
func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) {
func (c *configurationPerMetric) process() (map[byte]requestSet, error) {
collection := make(map[byte]map[string][]field)
// Collect the requested registers across metrics and transform them into
@ -206,40 +206,40 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) {
result := make(map[byte]requestSet)
params := groupingParams{
Optimization: c.Optimization,
MaxExtraRegisters: c.MaxExtraRegisters,
Log: c.logger,
optimization: c.Optimization,
maxExtraRegisters: c.MaxExtraRegisters,
log: c.logger,
}
for sid, scollection := range collection {
var set requestSet
for registerType, fields := range scollection {
switch registerType {
case "coil":
params.MaxBatchSize = maxQuantityCoils
params.maxBatchSize = maxQuantityCoils
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
params.EnforceFromZero = c.workarounds.ReadCoilsStartingAtZero
params.enforceFromZero = c.workarounds.ReadCoilsStartingAtZero
requests := groupFieldsToRequests(fields, params)
set.coil = append(set.coil, requests...)
case "discrete":
params.MaxBatchSize = maxQuantityDiscreteInput
params.maxBatchSize = maxQuantityDiscreteInput
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.discrete = append(set.discrete, requests...)
case "holding":
params.MaxBatchSize = maxQuantityHoldingRegisters
params.maxBatchSize = maxQuantityHoldingRegisters
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.holding = append(set.holding, requests...)
case "input":
params.MaxBatchSize = maxQuantityInputRegisters
params.maxBatchSize = maxQuantityInputRegisters
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.input = append(set.input, requests...)
@ -247,7 +247,7 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) {
return nil, fmt.Errorf("unknown register type %q", registerType)
}
}
if !set.Empty() {
if !set.empty() {
result[sid] = set
}
}
@ -255,7 +255,7 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) {
return result, nil
}
func (c *ConfigurationPerMetric) newField(def metricFieldDefinition, mdef metricDefinition) (field, error) {
func (c *configurationPerMetric) newField(def metricFieldDefinition, mdef metricDefinition) (field, error) {
typed := def.RegisterType == "holding" || def.RegisterType == "input"
fieldLength := uint16(1)
@ -339,7 +339,7 @@ func (c *ConfigurationPerMetric) newField(def metricFieldDefinition, mdef metric
return f, nil
}
func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) uint64 {
func (c *configurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) uint64 {
var mh maphash.Hash
mh.SetSeed(seed)
@ -354,7 +354,7 @@ func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition
mh.WriteString(field.Name)
mh.WriteByte(0)
// Tags
// tags
for k, v := range def.Tags {
mh.WriteString(k)
mh.WriteByte('=')
@ -366,7 +366,7 @@ func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition
return mh.Sum64()
}
func (c *ConfigurationPerMetric) determineOutputDatatype(input string) (string, error) {
func (c *configurationPerMetric) determineOutputDatatype(input string) (string, error) {
// Handle our special types
switch input {
case "INT8L", "INT8H", "INT16", "INT32", "INT64":
@ -381,7 +381,7 @@ func (c *ConfigurationPerMetric) determineOutputDatatype(input string) (string,
return "unknown", fmt.Errorf("invalid input datatype %q for determining output", input)
}
func (c *ConfigurationPerMetric) determineFieldLength(input string, length uint16) (uint16, error) {
func (c *configurationPerMetric) determineFieldLength(input string, length uint16) (uint16, error) {
// Handle our special types
switch input {
case "BIT", "INT8L", "INT8H", "UINT8L", "UINT8H":

View File

@ -371,7 +371,7 @@ func TestMetricAddressOverflow(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "metric",
Log: logger,
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
Workarounds: workarounds{ReadCoilsStartingAtZero: true},
}
plugin.Metrics = []metricDefinition{
{

View File

@ -21,21 +21,21 @@ type fieldDefinition struct {
Bit uint8 `toml:"bit"`
}
type ConfigurationOriginal struct {
type configurationOriginal struct {
SlaveID byte `toml:"slave_id"`
DiscreteInputs []fieldDefinition `toml:"discrete_inputs"`
Coils []fieldDefinition `toml:"coils"`
HoldingRegisters []fieldDefinition `toml:"holding_registers"`
InputRegisters []fieldDefinition `toml:"input_registers"`
workarounds ModbusWorkarounds
workarounds workarounds
logger telegraf.Logger
}
func (c *ConfigurationOriginal) SampleConfigPart() string {
func (c *configurationOriginal) sampleConfigPart() string {
return sampleConfigPartPerRegister
}
func (c *ConfigurationOriginal) Check() error {
func (c *configurationOriginal) check() error {
switch c.workarounds.StringRegisterLocation {
case "", "both", "lower", "upper":
// Do nothing as those are valid
@ -58,7 +58,7 @@ func (c *ConfigurationOriginal) Check() error {
return c.validateFieldDefinitions(c.InputRegisters, cInputRegisters)
}
func (c *ConfigurationOriginal) Process() (map[byte]requestSet, error) {
func (c *configurationOriginal) process() (map[byte]requestSet, error) {
maxQuantity := uint16(1)
if !c.workarounds.OnRequestPerField {
maxQuantity = maxQuantityCoils
@ -102,22 +102,22 @@ func (c *ConfigurationOriginal) Process() (map[byte]requestSet, error) {
}, nil
}
func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQuantity uint16, typed bool) ([]request, error) {
func (c *configurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQuantity uint16, typed bool) ([]request, error) {
fields, err := c.initFields(fieldDefs, typed)
if err != nil {
return nil, err
}
params := groupingParams{
MaxBatchSize: maxQuantity,
Optimization: "none",
EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
Log: c.logger,
maxBatchSize: maxQuantity,
optimization: "none",
enforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
log: c.logger,
}
return groupFieldsToRequests(fields, params), nil
}
func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition, typed bool) ([]field, error) {
func (c *configurationOriginal) initFields(fieldDefs []fieldDefinition, typed bool) ([]field, error) {
// Construct the fields from the field definitions
fields := make([]field, 0, len(fieldDefs))
for _, def := range fieldDefs {
@ -131,7 +131,7 @@ func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition, typed bo
return fields, nil
}
func (c *ConfigurationOriginal) newFieldFromDefinition(def fieldDefinition, typed bool) (field, error) {
func (c *configurationOriginal) newFieldFromDefinition(def fieldDefinition, typed bool) (field, error) {
// Check if the addresses are consecutive
expected := def.Address[0]
for _, current := range def.Address[1:] {
@ -182,7 +182,7 @@ func (c *ConfigurationOriginal) newFieldFromDefinition(def fieldDefinition, type
return f, nil
}
func (c *ConfigurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefinition, registerType string) error {
func (c *configurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefinition, registerType string) error {
nameEncountered := make(map[string]bool, len(fieldDefs))
for _, item := range fieldDefs {
// check empty name
@ -276,7 +276,7 @@ func (c *ConfigurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefini
return nil
}
func (c *ConfigurationOriginal) normalizeInputDatatype(dataType string, words int) (string, error) {
func (c *configurationOriginal) normalizeInputDatatype(dataType string, words int) (string, error) {
if dataType == "FLOAT32" {
config.PrintOptionValueDeprecationNotice("input.modbus", "data_type", "FLOAT32", telegraf.DeprecationInfo{
Since: "1.16.0",
@ -323,7 +323,7 @@ func (c *ConfigurationOriginal) normalizeInputDatatype(dataType string, words in
return normalizeInputDatatype(dataType)
}
func (c *ConfigurationOriginal) normalizeOutputDatatype(dataType string) (string, error) {
func (c *configurationOriginal) normalizeOutputDatatype(dataType string) (string, error) {
// Handle our special types
switch dataType {
case "FIXED", "FLOAT32", "UFIXED":
@ -332,7 +332,7 @@ func (c *ConfigurationOriginal) normalizeOutputDatatype(dataType string) (string
return normalizeOutputDatatype("native")
}
func (c *ConfigurationOriginal) normalizeByteOrder(byteOrder string) (string, error) {
func (c *configurationOriginal) normalizeByteOrder(byteOrder string) (string, error) {
// Handle our special types
switch byteOrder {
case "AB", "ABCDEFGH":

View File

@ -37,19 +37,19 @@ type requestDefinition struct {
Tags map[string]string `toml:"tags"`
}
type ConfigurationPerRequest struct {
type configurationPerRequest struct {
Requests []requestDefinition `toml:"request"`
workarounds ModbusWorkarounds
workarounds workarounds
excludeRegisterType bool
logger telegraf.Logger
}
func (c *ConfigurationPerRequest) SampleConfigPart() string {
func (c *configurationPerRequest) sampleConfigPart() string {
return sampleConfigPartPerRequest
}
func (c *ConfigurationPerRequest) Check() error {
func (c *configurationPerRequest) check() error {
switch c.workarounds.StringRegisterLocation {
case "", "both", "lower", "upper":
// Do nothing as those are valid
@ -213,7 +213,7 @@ func (c *ConfigurationPerRequest) Check() error {
return nil
}
func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
func (c *configurationPerRequest) process() (map[byte]requestSet, error) {
result := make(map[byte]requestSet, len(c.Requests))
for _, def := range c.Requests {
// Set default
@ -235,45 +235,45 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
}
params := groupingParams{
MaxExtraRegisters: def.MaxExtraRegisters,
Optimization: def.Optimization,
Tags: def.Tags,
Log: c.logger,
maxExtraRegisters: def.MaxExtraRegisters,
optimization: def.Optimization,
tags: def.Tags,
log: c.logger,
}
switch def.RegisterType {
case "coil":
params.MaxBatchSize = maxQuantityCoils
params.maxBatchSize = maxQuantityCoils
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
params.EnforceFromZero = c.workarounds.ReadCoilsStartingAtZero
params.enforceFromZero = c.workarounds.ReadCoilsStartingAtZero
requests := groupFieldsToRequests(fields, params)
set.coil = append(set.coil, requests...)
case "discrete":
params.MaxBatchSize = maxQuantityDiscreteInput
params.maxBatchSize = maxQuantityDiscreteInput
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.discrete = append(set.discrete, requests...)
case "holding":
params.MaxBatchSize = maxQuantityHoldingRegisters
params.maxBatchSize = maxQuantityHoldingRegisters
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.holding = append(set.holding, requests...)
case "input":
params.MaxBatchSize = maxQuantityInputRegisters
params.maxBatchSize = maxQuantityInputRegisters
if c.workarounds.OnRequestPerField {
params.MaxBatchSize = 1
params.maxBatchSize = 1
}
requests := groupFieldsToRequests(fields, params)
set.input = append(set.input, requests...)
default:
return nil, fmt.Errorf("unknown register type %q", def.RegisterType)
}
if !set.Empty() {
if !set.empty() {
result[def.SlaveID] = set
}
}
@ -281,7 +281,7 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
return result, nil
}
func (c *ConfigurationPerRequest) initFields(fieldDefs []requestFieldDefinition, typed bool, byteOrder string) ([]field, error) {
func (c *configurationPerRequest) initFields(fieldDefs []requestFieldDefinition, typed bool, byteOrder string) ([]field, error) {
// Construct the fields from the field definitions
fields := make([]field, 0, len(fieldDefs))
for _, def := range fieldDefs {
@ -295,7 +295,7 @@ func (c *ConfigurationPerRequest) initFields(fieldDefs []requestFieldDefinition,
return fields, nil
}
func (c *ConfigurationPerRequest) newFieldFromDefinition(def requestFieldDefinition, typed bool, byteOrder string) (field, error) {
func (c *configurationPerRequest) newFieldFromDefinition(def requestFieldDefinition, typed bool, byteOrder string) (field, error) {
var err error
fieldLength := uint16(1)
@ -379,7 +379,7 @@ func (c *ConfigurationPerRequest) newFieldFromDefinition(def requestFieldDefinit
return f, nil
}
func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefinition, field requestFieldDefinition) uint64 {
func (c *configurationPerRequest) fieldID(seed maphash.Seed, def requestDefinition, field requestFieldDefinition) uint64 {
var mh maphash.Hash
mh.SetSeed(seed)
@ -394,7 +394,7 @@ func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefiniti
mh.WriteString(field.Name)
mh.WriteByte(0)
// Tags
// tags
for k, v := range def.Tags {
mh.WriteString(k)
mh.WriteByte('=')
@ -406,7 +406,7 @@ func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefiniti
return mh.Sum64()
}
func (c *ConfigurationPerRequest) determineOutputDatatype(input string) (string, error) {
func (c *configurationPerRequest) determineOutputDatatype(input string) (string, error) {
// Handle our special types
switch input {
case "INT8L", "INT8H", "INT16", "INT32", "INT64":
@ -421,7 +421,7 @@ func (c *ConfigurationPerRequest) determineOutputDatatype(input string) (string,
return "unknown", fmt.Errorf("invalid input datatype %q for determining output", input)
}
func (c *ConfigurationPerRequest) determineFieldLength(input string, length uint16) (uint16, error) {
func (c *configurationPerRequest) determineFieldLength(input string, length uint16) (uint16, error) {
// Handle our special types
switch input {
case "BIT", "INT8L", "INT8H", "UINT8L", "UINT8H":

View File

@ -3177,7 +3177,7 @@ func TestRequestWorkaroundsOneRequestPerField(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
Workarounds: workarounds{OnRequestPerField: true},
}
plugin.Requests = []requestDefinition{
{
@ -3223,7 +3223,7 @@ func TestRequestWorkaroundsReadCoilsStartingAtZeroRequest(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
Workarounds: workarounds{ReadCoilsStartingAtZero: true},
}
plugin.SlaveID = 1
plugin.Requests = []requestDefinition{
@ -3262,7 +3262,7 @@ func TestRequestOverlap(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: logger,
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
Workarounds: workarounds{ReadCoilsStartingAtZero: true},
}
plugin.Requests = []requestDefinition{
{
@ -3320,7 +3320,7 @@ func TestRequestAddressOverflow(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: logger,
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
Workarounds: workarounds{ReadCoilsStartingAtZero: true},
}
plugin.Requests = []requestDefinition{
{

View File

@ -27,7 +27,45 @@ var sampleConfigEnd string
var errAddressOverflow = errors.New("address overflow")
type ModbusWorkarounds struct {
const (
cDiscreteInputs = "discrete_input"
cCoils = "coil"
cHoldingRegisters = "holding_register"
cInputRegisters = "input_register"
)
type Modbus struct {
Name string `toml:"name"`
Controller string `toml:"controller"`
TransmissionMode string `toml:"transmission_mode"`
BaudRate int `toml:"baud_rate"`
DataBits int `toml:"data_bits"`
Parity string `toml:"parity"`
StopBits int `toml:"stop_bits"`
RS485 *rs485Config `toml:"rs485"`
Timeout config.Duration `toml:"timeout"`
Retries int `toml:"busy_retries"`
RetriesWaitTime config.Duration `toml:"busy_retries_wait"`
DebugConnection bool `toml:"debug_connection" deprecated:"1.35.0;use 'log_level' 'trace' instead"`
Workarounds workarounds `toml:"workarounds"`
ConfigurationType string `toml:"configuration_type"`
ExcludeRegisterTypeTag bool `toml:"exclude_register_type_tag"`
Log telegraf.Logger `toml:"-"`
// configuration type specific settings
configurationOriginal
configurationPerRequest
configurationPerMetric
// Connection handling
client mb.Client
handler mb.ClientHandler
isConnected bool
// Request handling
requests map[byte]requestSet
}
type workarounds struct {
AfterConnectPause config.Duration `toml:"pause_after_connect"`
PollPause config.Duration `toml:"pause_between_requests"`
CloseAfterGather bool `toml:"close_connection_after_gather"`
@ -37,7 +75,7 @@ type ModbusWorkarounds struct {
}
// According to github.com/grid-x/serial
type RS485Config struct {
type rs485Config struct {
DelayRtsBeforeSend config.Duration `toml:"delay_rts_before_send"`
DelayRtsAfterSend config.Duration `toml:"delay_rts_after_send"`
RtsHighDuringSend bool `toml:"rts_high_during_send"`
@ -45,38 +83,6 @@ type RS485Config struct {
RxDuringTx bool `toml:"rx_during_tx"`
}
// Modbus holds all data relevant to the plugin
type Modbus struct {
Name string `toml:"name"`
Controller string `toml:"controller"`
TransmissionMode string `toml:"transmission_mode"`
BaudRate int `toml:"baud_rate"`
DataBits int `toml:"data_bits"`
Parity string `toml:"parity"`
StopBits int `toml:"stop_bits"`
RS485 *RS485Config `toml:"rs485"`
Timeout config.Duration `toml:"timeout"`
Retries int `toml:"busy_retries"`
RetriesWaitTime config.Duration `toml:"busy_retries_wait"`
DebugConnection bool `toml:"debug_connection" deprecated:"1.35.0;use 'log_level' 'trace' instead"`
Workarounds ModbusWorkarounds `toml:"workarounds"`
ConfigurationType string `toml:"configuration_type"`
ExcludeRegisterTypeTag bool `toml:"exclude_register_type_tag"`
Log telegraf.Logger `toml:"-"`
// Configuration type specific settings
ConfigurationOriginal
ConfigurationPerRequest
ConfigurationPerMetric
// Connection handling
client mb.Client
handler mb.ClientHandler
isConnected bool
// Request handling
requests map[byte]requestSet
}
type fieldConverterFunc func(bytes []byte) interface{}
type requestSet struct {
@ -86,7 +92,7 @@ type requestSet struct {
input []request
}
func (r requestSet) Empty() bool {
func (r requestSet) empty() bool {
l := len(r.coil)
l += len(r.discrete)
l += len(r.holding)
@ -105,24 +111,16 @@ type field struct {
tags map[string]string
}
const (
cDiscreteInputs = "discrete_input"
cCoils = "coil"
cHoldingRegisters = "holding_register"
cInputRegisters = "input_register"
)
// SampleConfig returns a basic configuration for the plugin
func (m *Modbus) SampleConfig() string {
configs := []Configuration{
&m.ConfigurationOriginal,
&m.ConfigurationPerRequest,
&m.ConfigurationPerMetric,
configs := []configuration{
&m.configurationOriginal,
&m.configurationPerRequest,
&m.configurationPerMetric,
}
totalConfig := sampleConfigStart
for _, c := range configs {
totalConfig += c.SampleConfigPart() + "\n"
totalConfig += c.sampleConfigPart() + "\n"
}
totalConfig += "\n"
totalConfig += sampleConfigEnd
@ -140,32 +138,32 @@ func (m *Modbus) Init() error {
}
// Determine the configuration style
var cfg Configuration
var cfg configuration
switch m.ConfigurationType {
case "", "register":
m.ConfigurationOriginal.workarounds = m.Workarounds
m.ConfigurationOriginal.logger = m.Log
cfg = &m.ConfigurationOriginal
m.configurationOriginal.workarounds = m.Workarounds
m.configurationOriginal.logger = m.Log
cfg = &m.configurationOriginal
case "request":
m.ConfigurationPerRequest.workarounds = m.Workarounds
m.ConfigurationPerRequest.excludeRegisterType = m.ExcludeRegisterTypeTag
m.ConfigurationPerRequest.logger = m.Log
cfg = &m.ConfigurationPerRequest
m.configurationPerRequest.workarounds = m.Workarounds
m.configurationPerRequest.excludeRegisterType = m.ExcludeRegisterTypeTag
m.configurationPerRequest.logger = m.Log
cfg = &m.configurationPerRequest
case "metric":
m.ConfigurationPerMetric.workarounds = m.Workarounds
m.ConfigurationPerMetric.excludeRegisterType = m.ExcludeRegisterTypeTag
m.ConfigurationPerMetric.logger = m.Log
cfg = &m.ConfigurationPerMetric
m.configurationPerMetric.workarounds = m.Workarounds
m.configurationPerMetric.excludeRegisterType = m.ExcludeRegisterTypeTag
m.configurationPerMetric.logger = m.Log
cfg = &m.configurationPerMetric
default:
return fmt.Errorf("unknown configuration type %q in device %q", m.ConfigurationType, m.Name)
}
// Check and process the configuration
if err := cfg.Check(); err != nil {
if err := cfg.check(); err != nil {
return fmt.Errorf("configuration invalid for device %q: %w", m.Name, err)
}
r, err := cfg.Process()
r, err := cfg.process()
if err != nil {
return fmt.Errorf("cannot process configuration for device %q: %w", m.Name, err)
}
@ -219,7 +217,6 @@ func (m *Modbus) Init() error {
return nil
}
// Gather implements the telegraf plugin interface method for data accumulation
func (m *Modbus) Gather(acc telegraf.Accumulator) error {
if !m.isConnected {
if err := m.connect(); err != nil {
@ -558,7 +555,7 @@ func (m *Modbus) collectFields(grouper *metric.SeriesGrouper, timestamp time.Tim
}
}
// Implement the logger interface of the modbus client
// Printf implements the logger interface of the modbus client
func (m *Modbus) Printf(format string, v ...interface{}) {
m.Log.Tracef(format, v...)
}

View File

@ -491,7 +491,7 @@ func TestRegisterWorkaroundsOneRequestPerField(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "register",
Log: testutil.Logger{Quiet: true},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
Workarounds: workarounds{OnRequestPerField: true},
}
plugin.SlaveID = 1
plugin.HoldingRegisters = []fieldDefinition{
@ -541,7 +541,7 @@ func TestRequestsWorkaroundsReadCoilsStartingAtZeroRegister(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "register",
Log: testutil.Logger{Quiet: true},
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
Workarounds: workarounds{ReadCoilsStartingAtZero: true},
}
plugin.SlaveID = 1
plugin.Coils = []fieldDefinition{
@ -688,8 +688,8 @@ func TestWorkaroundsStringRegisterLocation(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{Quiet: true},
Workarounds: ModbusWorkarounds{StringRegisterLocation: tt.location},
ConfigurationPerRequest: ConfigurationPerRequest{
Workarounds: workarounds{StringRegisterLocation: tt.location},
configurationPerRequest: configurationPerRequest{
Requests: []requestDefinition{
{
SlaveID: 1,
@ -738,7 +738,7 @@ func TestWorkaroundsStringRegisterLocationInvalid(t *testing.T) {
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{Quiet: true},
Workarounds: ModbusWorkarounds{StringRegisterLocation: "foo"},
Workarounds: workarounds{StringRegisterLocation: "foo"},
}
require.ErrorContains(t, plugin.Init(), `invalid 'string_register_location'`)
}

View File

@ -138,7 +138,7 @@ func optimizeGroup(g request, maxBatchSize uint16) []request {
return requests
}
func optimitzeGroupWithinLimits(g request, params groupingParams) []request {
func optimizeGroupWithinLimits(g request, params groupingParams) []request {
if len(g.fields) == 0 {
return nil
}
@ -153,14 +153,14 @@ func optimitzeGroupWithinLimits(g request, params groupingParams) []request {
// Check if we need to interrupt the current chunk and require a new one
holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length)
if g.fields[i].address < g.fields[i-1].address+g.fields[i-1].length {
params.Log.Warnf(
params.log.Warnf(
"Request at %d with length %d overlaps with next request at %d",
g.fields[i-1].address, g.fields[i-1].length, g.fields[i].address,
)
holeSize = 0
}
needInterrupt := holeSize > params.MaxExtraRegisters // too far apart
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.MaxBatchSize // too large
needInterrupt := holeSize > params.maxExtraRegisters // too far apart
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.maxBatchSize // too large
if !needInterrupt {
// Still safe to add the field to the current request
currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address
@ -181,18 +181,16 @@ func optimitzeGroupWithinLimits(g request, params groupingParams) []request {
type groupingParams struct {
// Maximum size of a request in registers
MaxBatchSize uint16
// Optimization to use for grouping register groups to requests.
// Also put potential optimization parameters here
Optimization string
MaxExtraRegisters uint16
// Will force reads to start at zero (if possible) while respecting
// the max-batch size.
EnforceFromZero bool
// Tags to add for the requests
Tags map[string]string
// Log facility to inform the user
Log telegraf.Logger
maxBatchSize uint16
// optimization to use for grouping register groups to requests, Also put potential optimization parameters here
optimization string
maxExtraRegisters uint16
// Will force reads to start at zero (if possible) while respecting the max-batch size.
enforceFromZero bool
// tags to add for the requests
tags map[string]string
// log facility to inform the user
log telegraf.Logger
}
func groupFieldsToRequests(fields []field, params groupingParams) []request {
@ -216,9 +214,9 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
for _, f := range fields {
// Add tags from higher up
if f.tags == nil {
f.tags = make(map[string]string, len(params.Tags))
f.tags = make(map[string]string, len(params.tags))
}
for k, v := range params.Tags {
for k, v := range params.tags {
f.tags[k] = v
}
@ -253,18 +251,18 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
}
// Enforce the first read to start at zero if the option is set
if params.EnforceFromZero {
if params.enforceFromZero {
groups[0].length += groups[0].address
groups[0].address = 0
}
var requests []request
switch params.Optimization {
switch params.optimization {
case "shrink":
// Shrink request by striping leading and trailing fields with an omit flag set
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, shrinkGroup(g, params.MaxBatchSize)...)
requests = append(requests, shrinkGroup(g, params.maxBatchSize)...)
}
}
case "rearrange":
@ -272,7 +270,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
// registers while keeping the number of requests
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, optimizeGroup(g, params.MaxBatchSize)...)
requests = append(requests, optimizeGroup(g, params.maxBatchSize)...)
}
}
case "aggressive":
@ -284,7 +282,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
total.fields = append(total.fields, g.fields...)
}
}
requests = optimizeGroup(total, params.MaxBatchSize)
requests = optimizeGroup(total, params.maxBatchSize)
case "max_insert":
// Similar to aggressive but keeps the number of touched registers below a threshold
var total request
@ -293,12 +291,12 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
total.fields = append(total.fields, g.fields...)
}
}
requests = optimitzeGroupWithinLimits(total, params)
requests = optimizeGroupWithinLimits(total, params)
default:
// no optimization
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, splitMaxBatchSize(g, params.MaxBatchSize)...)
requests = append(requests, splitMaxBatchSize(g, params.maxBatchSize)...)
}
}
}

View File

@ -26,26 +26,26 @@ import (
//go:embed sample.conf
var sampleConfig string
var DisconnectedServersBehaviors = []string{"error", "skip"}
var disconnectedServersBehaviors = []string{"error", "skip"}
type MongoDB struct {
Servers []string
Ssl Ssl
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
DisconnectedServersBehavior string
ColStatsDbs []string
Servers []string `toml:"servers"`
GatherClusterStatus bool `toml:"gather_cluster_status"`
GatherPerdbStats bool `toml:"gather_perdb_stats"`
GatherColStats bool `toml:"gather_col_stats"`
GatherTopStat bool `toml:"gather_top_stat"`
DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"`
ColStatsDbs []string `toml:"col_stats_dbs"`
common_tls.ClientConfig
Ssl ssl
Log telegraf.Logger `toml:"-"`
clients []*Server
clients []*server
tlsConfig *tls.Config
}
type Ssl struct {
type ssl struct {
Enabled bool `toml:"ssl_enabled" deprecated:"1.3.0;1.35.0;use 'tls_*' options instead"`
CaCerts []string `toml:"cacerts" deprecated:"1.3.0;1.35.0;use 'tls_ca' instead"`
}
@ -59,7 +59,7 @@ func (m *MongoDB) Init() error {
m.DisconnectedServersBehavior = "error"
}
if err := choice.Check(m.DisconnectedServersBehavior, DisconnectedServersBehaviors); err != nil {
if err := choice.Check(m.DisconnectedServersBehavior, disconnectedServersBehaviors); err != nil {
return fmt.Errorf("disconnected_servers_behavior: %w", err)
}
@ -105,6 +105,41 @@ func (m *MongoDB) Start(telegraf.Accumulator) error {
return nil
}
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range m.clients {
wg.Add(1)
go func(srv *server) {
defer wg.Done()
if m.DisconnectedServersBehavior == "skip" {
if err := srv.ping(); err != nil {
m.Log.Debugf("Failed to ping server: %s", err)
return
}
}
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("Failed to gather data: %s", err)
}
}(client)
}
wg.Wait()
return nil
}
// Stop disconnects mongo connections when stop or reload
func (m *MongoDB) Stop() {
for _, server := range m.clients {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := server.client.Disconnect(ctx); err != nil {
m.Log.Errorf("Disconnecting from %q failed: %v", server.hostname, err)
}
cancel()
}
}
func (m *MongoDB) setupConnection(connURL string) error {
if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") {
// Preserve backwards compatibility for hostnames without a
@ -143,52 +178,15 @@ func (m *MongoDB) setupConnection(connURL string) error {
m.Log.Errorf("Unable to ping MongoDB: %s", err)
}
server := &Server{
server := &server{
client: client,
hostname: u.Host,
Log: m.Log,
log: m.Log,
}
m.clients = append(m.clients, server)
return nil
}
// Stop disconnect mongo connections when stop or reload
func (m *MongoDB) Stop() {
for _, server := range m.clients {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := server.client.Disconnect(ctx); err != nil {
m.Log.Errorf("Disconnecting from %q failed: %v", server.hostname, err)
}
cancel()
}
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, client := range m.clients {
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
if m.DisconnectedServersBehavior == "skip" {
if err := srv.ping(); err != nil {
m.Log.Debugf("Failed to ping server: %s", err)
return
}
}
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("Failed to gather data: %s", err)
}
}(client)
}
wg.Wait()
return nil
}
func init() {
inputs.Add("mongodb", func() telegraf.Input {
return &MongoDB{

View File

@ -8,29 +8,29 @@ import (
"github.com/influxdata/telegraf"
)
type MongodbData struct {
type mongodbData struct {
StatLine *statLine
Fields map[string]interface{}
Tags map[string]string
DbData []DbData
ColData []ColData
ShardHostData []DbData
TopStatsData []DbData
DbData []bbData
ColData []colData
ShardHostData []bbData
TopStatsData []bbData
}
type DbData struct {
type bbData struct {
Name string
Fields map[string]interface{}
}
type ColData struct {
type colData struct {
Name string
DbName string
Fields map[string]interface{}
}
func NewMongodbData(statLine *statLine, tags map[string]string) *MongodbData {
return &MongodbData{
func newMongodbData(statLine *statLine, tags map[string]string) *mongodbData {
return &mongodbData{
StatLine: statLine,
Tags: tags,
Fields: make(map[string]interface{}),
@ -297,11 +297,11 @@ var topDataStats = map[string]string{
"commands_count": "CommandsCount",
}
func (d *MongodbData) AddDbStats() {
func (d *mongodbData) addDbStats() {
for i := range d.StatLine.DbStatsLines {
dbstat := d.StatLine.DbStatsLines[i]
dbStatLine := reflect.ValueOf(&dbstat).Elem()
newDbData := &DbData{
newDbData := &bbData{
Name: dbstat.Name,
Fields: make(map[string]interface{}),
}
@ -314,11 +314,11 @@ func (d *MongodbData) AddDbStats() {
}
}
func (d *MongodbData) AddColStats() {
func (d *mongodbData) addColStats() {
for i := range d.StatLine.ColStatsLines {
colstat := d.StatLine.ColStatsLines[i]
colStatLine := reflect.ValueOf(&colstat).Elem()
newColData := &ColData{
newColData := &colData{
Name: colstat.Name,
DbName: colstat.DbName,
Fields: make(map[string]interface{}),
@ -332,11 +332,11 @@ func (d *MongodbData) AddColStats() {
}
}
func (d *MongodbData) AddShardHostStats() {
func (d *mongodbData) addShardHostStats() {
for host := range d.StatLine.ShardHostStatsLines {
hostStat := d.StatLine.ShardHostStatsLines[host]
hostStatLine := reflect.ValueOf(&hostStat).Elem()
newDbData := &DbData{
newDbData := &bbData{
Name: host,
Fields: make(map[string]interface{}),
}
@ -349,11 +349,11 @@ func (d *MongodbData) AddShardHostStats() {
}
}
func (d *MongodbData) AddTopStats() {
func (d *mongodbData) addTopStats() {
for i := range d.StatLine.TopStatLines {
topStat := d.StatLine.TopStatLines[i]
topStatLine := reflect.ValueOf(&topStat).Elem()
newTopStatData := &DbData{
newTopStatData := &bbData{
Name: topStat.CollectionName,
Fields: make(map[string]interface{}),
}
@ -366,7 +366,7 @@ func (d *MongodbData) AddTopStats() {
}
}
func (d *MongodbData) AddDefaultStats() {
func (d *mongodbData) addDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(statLine, defaultStats)
if d.StatLine.NodeType != "" {
@ -414,18 +414,18 @@ func (d *MongodbData) AddDefaultStats() {
}
}
func (d *MongodbData) addStat(statLine reflect.Value, stats map[string]string) {
func (d *mongodbData) addStat(statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(key, val)
}
}
func (d *MongodbData) add(key string, val interface{}) {
func (d *mongodbData) add(key string, val interface{}) {
d.Fields[key] = val
}
func (d *MongodbData) flush(acc telegraf.Accumulator) {
func (d *mongodbData) flush(acc telegraf.Accumulator) {
acc.AddFields(
"mongodb",
d.Fields,

View File

@ -13,7 +13,7 @@ import (
var tags = make(map[string]string)
func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
StorageEngine: "",
Time: time.Now(),
@ -62,7 +62,7 @@ func TestAddNonReplStats(t *testing.T) {
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultStats {
@ -71,7 +71,7 @@ func TestAddNonReplStats(t *testing.T) {
}
func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
StorageEngine: "mmapv1",
Mapped: 0,
@ -83,7 +83,7 @@ func TestAddReplStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range mmapStats {
@ -92,7 +92,7 @@ func TestAddReplStats(t *testing.T) {
}
func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
@ -124,7 +124,7 @@ func TestAddWiredTigerStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range wiredTigerStats {
@ -139,7 +139,7 @@ func TestAddWiredTigerStats(t *testing.T) {
}
func TestAddShardStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
TotalInUse: 0,
TotalAvailable: 0,
@ -151,7 +151,7 @@ func TestAddShardStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultShardStats {
@ -160,7 +160,7 @@ func TestAddShardStats(t *testing.T) {
}
func TestAddLatencyStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
CommandOpsCnt: 73,
CommandLatency: 364,
@ -174,7 +174,7 @@ func TestAddLatencyStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultLatencyStats {
@ -183,7 +183,7 @@ func TestAddLatencyStats(t *testing.T) {
}
func TestAddAssertsStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
Regular: 3,
Warning: 9,
@ -196,7 +196,7 @@ func TestAddAssertsStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultAssertsStats {
@ -205,7 +205,7 @@ func TestAddAssertsStats(t *testing.T) {
}
func TestAddCommandsStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
AggregateCommandTotal: 12,
AggregateCommandFailed: 2,
@ -231,7 +231,7 @@ func TestAddCommandsStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultCommandsStats {
@ -240,7 +240,7 @@ func TestAddCommandsStats(t *testing.T) {
}
func TestAddTCMallocStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
TCMallocCurrentAllocatedBytes: 5877253096,
TCMallocHeapSize: 8067108864,
@ -267,7 +267,7 @@ func TestAddTCMallocStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultTCMallocStats {
@ -276,7 +276,7 @@ func TestAddTCMallocStats(t *testing.T) {
}
func TestAddStorageStats(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
StorageFreelistSearchBucketExhausted: 0,
StorageFreelistSearchRequests: 0,
@ -287,7 +287,7 @@ func TestAddStorageStats(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
for key := range defaultStorageStats {
@ -307,7 +307,7 @@ func TestAddShardHostStats(t *testing.T) {
}
}
d := NewMongodbData(
d := newMongodbData(
&statLine{
ShardHostStatsLines: hostStatLines,
},
@ -315,7 +315,7 @@ func TestAddShardHostStats(t *testing.T) {
)
var acc testutil.Accumulator
d.AddShardHostStats()
d.addShardHostStats()
d.flush(&acc)
hostsFound := make([]string, 0, len(hostStatLines))
@ -333,7 +333,7 @@ func TestAddShardHostStats(t *testing.T) {
}
func TestStateTag(t *testing.T) {
d := NewMongodbData(
d := newMongodbData(
&statLine{
StorageEngine: "",
Time: time.Now(),
@ -353,7 +353,7 @@ func TestStateTag(t *testing.T) {
var acc testutil.Accumulator
d.AddDefaultStats()
d.addDefaultStats()
d.flush(&acc)
fields := map[string]interface{}{
"active_reads": int64(0),
@ -524,7 +524,7 @@ func TestAddTopStats(t *testing.T) {
topStatLines = append(topStatLines, topStatLine)
}
d := NewMongodbData(
d := newMongodbData(
&statLine{
TopStatLines: topStatLines,
},
@ -532,7 +532,7 @@ func TestAddTopStats(t *testing.T) {
)
var acc testutil.Accumulator
d.AddTopStats()
d.addTopStats()
d.flush(&acc)
for range topStatLines {

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"time"
@ -16,44 +17,44 @@ import (
"github.com/influxdata/telegraf"
)
type Server struct {
type server struct {
client *mongo.Client
hostname string
lastResult *mongoStatus
Log telegraf.Logger
}
func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["hostname"] = s.hostname
return tags
log telegraf.Logger
}
type oplogEntry struct {
Timestamp primitive.Timestamp `bson:"ts"`
}
func IsAuthorization(err error) bool {
func isAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}
func (s *Server) ping() error {
func (s *server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["hostname"] = s.hostname
return tags
}
func (s *server) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
return s.client.Ping(ctx, nil)
}
func (s *Server) authLog(err error) {
if IsAuthorization(err) {
s.Log.Debug(err.Error())
func (s *server) authLog(err error) {
if isAuthorization(err) {
s.log.Debug(err.Error())
} else {
s.Log.Error(err.Error())
s.log.Error(err.Error())
}
}
func (s *Server) runCommand(database string, cmd, result interface{}) error {
func (s *server) runCommand(database string, cmd, result interface{}) error {
r := s.client.Database(database).RunCommand(context.Background(), cmd)
if r.Err() != nil {
return r.Err()
@ -61,7 +62,7 @@ func (s *Server) runCommand(database string, cmd, result interface{}) error {
return r.Decode(result)
}
func (s *Server) gatherServerStatus() (*serverStatus, error) {
func (s *server) gatherServerStatus() (*serverStatus, error) {
serverStatus := &serverStatus{}
err := s.runCommand("admin", bson.D{
{
@ -79,7 +80,7 @@ func (s *Server) gatherServerStatus() (*serverStatus, error) {
return serverStatus, nil
}
func (s *Server) gatherReplSetStatus() (*replSetStatus, error) {
func (s *server) gatherReplSetStatus() (*replSetStatus, error) {
replSetStatus := &replSetStatus{}
err := s.runCommand("admin", bson.D{
{
@ -93,7 +94,7 @@ func (s *Server) gatherReplSetStatus() (*replSetStatus, error) {
return replSetStatus, nil
}
func (s *Server) gatherTopStatData() (*topStats, error) {
func (s *server) gatherTopStatData() (*topStats, error) {
var dest map[string]interface{}
err := s.runCommand("admin", bson.D{
{
@ -124,7 +125,7 @@ func (s *Server) gatherTopStatData() (*topStats, error) {
return &topStats{Totals: topInfo}, nil
}
func (s *Server) gatherClusterStatus() (*clusterStatus, error) {
func (s *server) gatherClusterStatus() (*clusterStatus, error) {
chunkCount, err := s.client.Database("config").Collection("chunks").CountDocuments(context.Background(), bson.M{"jumbo": true})
if err != nil {
return nil, err
@ -148,7 +149,7 @@ func poolStatsCommand(version string) (string, error) {
return "shardConnPoolStats", nil
}
func (s *Server) gatherShardConnPoolStats(version string) (*shardStats, error) {
func (s *server) gatherShardConnPoolStats(version string) (*shardStats, error) {
command, err := poolStatsCommand(version)
if err != nil {
return nil, err
@ -167,7 +168,7 @@ func (s *Server) gatherShardConnPoolStats(version string) (*shardStats, error) {
return shardStats, nil
}
func (s *Server) gatherDBStats(name string) (*db, error) {
func (s *server) gatherDBStats(name string) (*db, error) {
stats := &dbStatsData{}
err := s.runCommand(name, bson.D{
{
@ -185,7 +186,7 @@ func (s *Server) gatherDBStats(name string) (*db, error) {
}, nil
}
func (s *Server) getOplogReplLag(collection string) (*oplogStats, error) {
func (s *server) getOplogReplLag(collection string) (*oplogStats, error) {
query := bson.M{"ts": bson.M{"$exists": true}}
var first oplogEntry
@ -219,7 +220,7 @@ func (s *Server) getOplogReplLag(collection string) (*oplogStats, error) {
// The "oplog.$main" collection is created on the master node of a
// master-slave replicated deployment. As of MongoDB 3.2, master-slave
// replication has been deprecated.
func (s *Server) gatherOplogStats() (*oplogStats, error) {
func (s *server) gatherOplogStats() (*oplogStats, error) {
stats, err := s.getOplogReplLag("oplog.rs")
if err == nil {
return stats, nil
@ -228,7 +229,7 @@ func (s *Server) gatherOplogStats() (*oplogStats, error) {
return s.getOplogReplLag("oplog.$main")
}
func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) {
func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) {
names, err := s.client.ListDatabaseNames(context.Background(), bson.D{})
if err != nil {
return nil, err
@ -236,14 +237,14 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error)
results := &colStats{}
for _, dbName := range names {
if stringInSlice(dbName, colStatsDbs) || len(colStatsDbs) == 0 {
if slices.Contains(colStatsDbs, dbName) || len(colStatsDbs) == 0 {
// skip views as they fail on collStats below
filter := bson.M{"type": bson.M{"$in": bson.A{"collection", "timeseries"}}}
var colls []string
colls, err = s.client.Database(dbName).ListCollectionNames(context.Background(), filter)
if err != nil {
s.Log.Errorf("Error getting collection names: %s", err.Error())
s.log.Errorf("Error getting collection names: %s", err.Error())
continue
}
for _, colName := range colls {
@ -270,7 +271,7 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error)
return results, nil
}
func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDbStats, gatherColStats, gatherTopStat bool, colStatsDbs []string) error {
func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDbStats, gatherColStats, gatherTopStat bool, colStatsDbs []string) error {
serverStatus, err := s.gatherServerStatus()
if err != nil {
return err
@ -280,7 +281,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
// member of a replica set.
replSetStatus, err := s.gatherReplSetStatus()
if err != nil {
s.Log.Debugf("Unable to gather replica set status: %s", err.Error())
s.log.Debugf("Unable to gather replica set status: %s", err.Error())
}
// Gather the oplog if we are a member of a replica set. Non-replica set
@ -297,7 +298,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
if gatherClusterStatus {
status, err := s.gatherClusterStatus()
if err != nil {
s.Log.Debugf("Unable to gather cluster status: %s", err.Error())
s.log.Debugf("Unable to gather cluster status: %s", err.Error())
}
clusterStatus = status
}
@ -326,7 +327,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
for _, name := range names {
db, err := s.gatherDBStats(name)
if err != nil {
s.Log.Debugf("Error getting db stats from %q: %s", name, err.Error())
s.log.Debugf("Error getting db stats from %q: %s", name, err.Error())
}
dbStats.Dbs = append(dbStats.Dbs, *db)
}
@ -336,7 +337,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
if gatherTopStat {
topStats, err := s.gatherTopStatData()
if err != nil {
s.Log.Debugf("Unable to gather top stat data: %s", err.Error())
s.log.Debugf("Unable to gather top stat data: %s", err.Error())
return err
}
topStatData = topStats
@ -360,27 +361,18 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
if durationInSeconds == 0 {
durationInSeconds = 1
}
data := NewMongodbData(
NewStatLine(*s.lastResult, *result, s.hostname, true, durationInSeconds),
data := newMongodbData(
newStatLine(*s.lastResult, *result, s.hostname, durationInSeconds),
s.getDefaultTags(),
)
data.AddDefaultStats()
data.AddDbStats()
data.AddColStats()
data.AddShardHostStats()
data.AddTopStats()
data.addDefaultStats()
data.addDbStats()
data.addColStats()
data.addShardHostStats()
data.addTopStats()
data.flush(acc)
}
s.lastResult = result
return nil
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}

View File

@ -11,15 +11,15 @@ import (
"github.com/influxdata/telegraf/testutil"
)
var ServicePort = "27017"
var servicePort = "27017"
var unreachableMongoEndpoint = "mongodb://user:pass@127.0.0.1:27017/nop"
func createTestServer(t *testing.T) *testutil.Container {
container := testutil.Container{
Image: "mongo",
ExposedPorts: []string{ServicePort},
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.NewHTTPStrategy("/").WithPort(nat.Port(ServicePort)),
wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)),
wait.ForLog("Waiting for connections"),
),
}
@ -40,7 +40,7 @@ func TestGetDefaultTagsIntegration(t *testing.T) {
m := &MongoDB{
Log: testutil.Logger{},
Servers: []string{
fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[ServicePort]),
fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[servicePort]),
},
}
err := m.Init()
@ -76,7 +76,7 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
m := &MongoDB{
Log: testutil.Logger{},
Servers: []string{
fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[ServicePort]),
fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[servicePort]),
},
}
err := m.Init()

View File

@ -12,18 +12,7 @@ import (
)
const (
MongosProcess = "mongos"
)
// Flags to determine cases when to activate/deactivate columns for output.
const (
Always = 1 << iota // always activate the column
Discover // only active when mongostat is in discover mode
Repl // only active if one of the nodes being monitored is in a replset
Locks // only active if node is capable of calculating lock info
AllOnly // only active if mongostat was run with --all option
MMAPOnly // only active if node has mmap-specific fields
WTOnly // only active if node has wiredtiger-specific fields
mongosProcess = "mongos"
)
type mongoStatus struct {
@ -557,48 +546,6 @@ type storageStats struct {
FreelistSearchScanned int64 `bson:"freelist.search.scanned"`
}
// statHeader describes a single column for mongostat's terminal output, its formatting, and in which modes it should be displayed.
type statHeader struct {
// The text to appear in the column's header cell
HeaderText string
// Bitmask containing flags to determine if this header is active or not
ActivateFlags int
}
// StatHeaders are the complete set of data metrics supported by mongostat.
var StatHeaders = []statHeader{
{"", Always}, // placeholder for hostname column (blank header text)
{"insert", Always},
{"query", Always},
{"update", Always},
{"delete", Always},
{"getmore", Always},
{"command", Always},
{"% dirty", WTOnly},
{"% used", WTOnly},
{"flushes", Always},
{"mapped", MMAPOnly},
{"vsize", Always},
{"res", Always},
{"non-mapped", MMAPOnly | AllOnly},
{"faults", MMAPOnly},
{"lr|lw %", MMAPOnly | AllOnly},
{"lrt|lwt", MMAPOnly | AllOnly},
{" locked db", Locks},
{"qr|qw", Always},
{"ar|aw", Always},
{"netIn", Always},
{"netOut", Always},
{"conn", Always},
{"set", Repl},
{"repl", Repl},
{"time", Always},
}
// NamespacedLocks stores information on the lockStatus of namespaces.
type NamespacedLocks map[string]lockStatus
// lockUsage stores information related to a namespace's lock usage.
type lockUsage struct {
Namespace string
@ -931,8 +878,8 @@ func diff(newVal, oldVal, sampleTime int64) (avg, newValue int64) {
return d / sampleTime, newVal
}
// NewStatLine constructs a statLine object from two mongoStatus objects.
func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSecs int64) *statLine {
// newStatLine constructs a statLine object from two mongoStatus objects.
func newStatLine(oldMongo, newMongo mongoStatus, key string, sampleSecs int64) *statLine {
oldStat := *oldMongo.ServerStatus
newStat := *newMongo.ServerStatus
@ -1179,7 +1126,7 @@ func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSec
returnVal.Time = newMongo.SampleTime
returnVal.IsMongos =
newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess)
newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, mongosProcess)
// BEGIN code modification
if oldStat.Mem.Supported.(bool) {
@ -1190,7 +1137,7 @@ func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSec
returnVal.Virtual = newStat.Mem.Virtual
returnVal.Resident = newStat.Mem.Resident
if !returnVal.IsMongos && all {
if !returnVal.IsMongos {
returnVal.NonMapped = newStat.Mem.Virtual - newStat.Mem.Mapped
}
}

View File

@ -7,7 +7,7 @@ import (
)
func TestLatencyStats(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -49,7 +49,6 @@ func TestLatencyStats(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -62,7 +61,7 @@ func TestLatencyStats(t *testing.T) {
}
func TestLatencyStatsDiffZero(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -118,7 +117,6 @@ func TestLatencyStatsDiffZero(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -131,7 +129,7 @@ func TestLatencyStatsDiffZero(t *testing.T) {
}
func TestLatencyStatsDiff(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -187,7 +185,6 @@ func TestLatencyStatsDiff(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -200,7 +197,7 @@ func TestLatencyStatsDiff(t *testing.T) {
}
func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -223,7 +220,6 @@ func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -231,7 +227,7 @@ func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) {
}
func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -255,7 +251,6 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -263,7 +258,7 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) {
}
func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -289,7 +284,6 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -297,7 +291,7 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) {
}
func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -325,7 +319,6 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -333,7 +326,7 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) {
}
func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -362,7 +355,6 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -370,7 +362,7 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) {
}
func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -393,7 +385,6 @@ func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -401,7 +392,7 @@ func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) {
}
func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -425,7 +416,6 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -433,7 +423,7 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) {
}
func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -459,7 +449,6 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -467,7 +456,7 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) {
}
func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -495,7 +484,6 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -503,7 +491,7 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) {
}
func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -532,7 +520,6 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) {
},
},
"foo",
true,
60,
)
@ -540,7 +527,7 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) {
}
func TestLocksStatsPopulated(t *testing.T) {
sl := NewStatLine(
sl := newStatLine(
mongoStatus{
ServerStatus: &serverStatus{
Connections: &connectionStats{},
@ -596,7 +583,6 @@ func TestLocksStatsPopulated(t *testing.T) {
},
},
"foo",
true,
60,
)

View File

@ -19,6 +19,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var pendingActions = []string{"ignore", "alert", "restart", "stop", "exec", "unmonitor", "start", "monitor"}
const (
fileSystem = "0"
directory = "1"
@ -31,7 +33,14 @@ const (
network = "8"
)
var pendingActions = []string{"ignore", "alert", "restart", "stop", "exec", "unmonitor", "start", "monitor"}
type Monit struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
client http.Client
tls.ClientConfig
}
type status struct {
Server server `xml:"server"`
@ -179,15 +188,6 @@ type system struct {
} `xml:"swap"`
}
type Monit struct {
Address string `toml:"address"`
Username string `toml:"username"`
Password string `toml:"password"`
client http.Client
tls.ClientConfig
Timeout config.Duration `toml:"timeout"`
}
func (*Monit) SampleConfig() string {
return sampleConfig
}

View File

@ -24,32 +24,18 @@ import (
//go:embed sample.conf
var sampleConfig string
var once sync.Once
var (
once sync.Once
// 30 Seconds is the default used by paho.mqtt.golang
defaultConnectionTimeout = config.Duration(30 * time.Second)
defaultMaxUndeliveredMessages = 1000
)
type empty struct{}
type semaphore chan empty
type Client interface {
Connect() mqtt.Token
SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
IsConnected() bool
}
type ClientFactory func(o *mqtt.ClientOptions) Client
type MQTTConsumer struct {
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
TopicParserConfig []TopicParsingConfig `toml:"topic_parsing"`
TopicParserConfig []topicParsingConfig `toml:"topic_parsing"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
@ -64,15 +50,15 @@ type MQTTConsumer struct {
tls.ClientConfig
parser telegraf.Parser
clientFactory ClientFactory
client Client
clientFactory clientFactory
client client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
sem semaphore
messages map[telegraf.TrackingID]mqtt.Message
messagesMutex sync.Mutex
topicTagParse string
topicParsers []*TopicParser
topicParsers []*topicParser
ctx context.Context
cancel context.CancelFunc
payloadSize selfstat.Stat
@ -80,13 +66,22 @@ type MQTTConsumer struct {
wg sync.WaitGroup
}
type client interface {
Connect() mqtt.Token
SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
IsConnected() bool
}
type empty struct{}
type semaphore chan empty
type clientFactory func(o *mqtt.ClientOptions) client
func (*MQTTConsumer) SampleConfig() string {
return sampleConfig
}
func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Init() error {
if m.ClientTrace {
log := &mqttLogger{m.Log}
@ -116,9 +111,9 @@ func (m *MQTTConsumer) Init() error {
m.opts = opts
m.messages = make(map[telegraf.TrackingID]mqtt.Message)
m.topicParsers = make([]*TopicParser, 0, len(m.TopicParserConfig))
m.topicParsers = make([]*topicParser, 0, len(m.TopicParserConfig))
for _, cfg := range m.TopicParserConfig {
p, err := cfg.NewParser()
p, err := cfg.newParser()
if err != nil {
return fmt.Errorf("config error topic parsing: %w", err)
}
@ -129,6 +124,11 @@ func (m *MQTTConsumer) Init() error {
m.messagesRecv = selfstat.Register("mqtt_consumer", "messages_received", make(map[string]string))
return nil
}
func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
@ -149,6 +149,26 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return m.connect()
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if !m.client.IsConnected() {
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}
return nil
}
func (m *MQTTConsumer) Stop() {
if m.client.IsConnected() {
m.Log.Debugf("Disconnecting %v", m.Servers)
m.client.Disconnect(200)
m.Log.Debugf("Disconnected %v", m.Servers)
}
if m.cancel != nil {
m.cancel()
}
}
func (m *MQTTConsumer) connect() error {
m.client = m.clientFactory(m.opts)
// AddRoute sets up the function for handling messages. These need to be
@ -196,6 +216,7 @@ func (m *MQTTConsumer) connect() error {
}
return nil
}
func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
// Should already be disconnected, but make doubly sure
m.client.Disconnect(5)
@ -250,7 +271,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
metric.AddTag(m.topicTagParse, msg.Topic())
}
for _, p := range m.topicParsers {
if err := p.Parse(metric, msg.Topic()); err != nil {
if err := p.parse(metric, msg.Topic()); err != nil {
if m.PersistentSession {
msg.Ack()
}
@ -265,23 +286,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
m.messages[id] = msg
m.messagesMutex.Unlock()
}
func (m *MQTTConsumer) Stop() {
if m.client.IsConnected() {
m.Log.Debugf("Disconnecting %v", m.Servers)
m.client.Disconnect(200)
m.Log.Debugf("Disconnected %v", m.Servers)
}
if m.cancel != nil {
m.cancel()
}
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if !m.client.IsConnected() {
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}
return nil
}
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()
opts.ConnectTimeout = time.Duration(m.ConnectionTimeout)
@ -342,7 +347,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}
func New(factory ClientFactory) *MQTTConsumer {
func newMQTTConsumer(factory clientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
@ -354,7 +359,7 @@ func New(factory ClientFactory) *MQTTConsumer {
}
func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return New(func(o *mqtt.ClientOptions) Client {
return newMQTTConsumer(func(o *mqtt.ClientOptions) client {
return mqtt.NewClient(o)
})
})

View File

@ -18,11 +18,11 @@ import (
"github.com/influxdata/telegraf/testutil"
)
type FakeClient struct {
ConnectF func() mqtt.Token
SubscribeMultipleF func() mqtt.Token
AddRouteF func(callback mqtt.MessageHandler)
DisconnectF func()
type fakeClient struct {
connectF func() mqtt.Token
subscribeMultipleF func() mqtt.Token
addRouteF func(callback mqtt.MessageHandler)
disconnectF func()
connectCallCount int
subscribeCallCount int
@ -32,75 +32,75 @@ type FakeClient struct {
connected bool
}
func (c *FakeClient) Connect() mqtt.Token {
func (c *fakeClient) Connect() mqtt.Token {
c.connectCallCount++
token := c.ConnectF()
token := c.connectF()
c.connected = token.Error() == nil
return token
}
func (c *FakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token {
func (c *fakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token {
c.subscribeCallCount++
return c.SubscribeMultipleF()
return c.subscribeMultipleF()
}
func (c *FakeClient) AddRoute(_ string, callback mqtt.MessageHandler) {
func (c *fakeClient) AddRoute(_ string, callback mqtt.MessageHandler) {
c.addRouteCallCount++
c.AddRouteF(callback)
c.addRouteF(callback)
}
func (c *FakeClient) Disconnect(uint) {
func (c *fakeClient) Disconnect(uint) {
c.disconnectCallCount++
c.DisconnectF()
c.disconnectF()
c.connected = false
}
func (c *FakeClient) IsConnected() bool {
func (c *fakeClient) IsConnected() bool {
return c.connected
}
type FakeParser struct{}
type fakeParser struct{}
// FakeParser satisfies telegraf.Parser
var _ telegraf.Parser = &FakeParser{}
// fakeParser satisfies telegraf.Parser
var _ telegraf.Parser = &fakeParser{}
func (p *FakeParser) Parse(_ []byte) ([]telegraf.Metric, error) {
func (p *fakeParser) Parse(_ []byte) ([]telegraf.Metric, error) {
panic("not implemented")
}
func (p *FakeParser) ParseLine(_ string) (telegraf.Metric, error) {
func (p *fakeParser) ParseLine(_ string) (telegraf.Metric, error) {
panic("not implemented")
}
func (p *FakeParser) SetDefaultTags(_ map[string]string) {
func (p *fakeParser) SetDefaultTags(_ map[string]string) {
panic("not implemented")
}
type FakeToken struct {
type fakeToken struct {
sessionPresent bool
complete chan struct{}
}
// FakeToken satisfies mqtt.Token
var _ mqtt.Token = &FakeToken{}
// fakeToken satisfies mqtt.Token
var _ mqtt.Token = &fakeToken{}
func (t *FakeToken) Wait() bool {
func (t *fakeToken) Wait() bool {
return true
}
func (t *FakeToken) WaitTimeout(time.Duration) bool {
func (t *fakeToken) WaitTimeout(time.Duration) bool {
return true
}
func (t *FakeToken) Error() error {
func (t *fakeToken) Error() error {
return nil
}
func (t *FakeToken) SessionPresent() bool {
func (t *fakeToken) SessionPresent() bool {
return t.sessionPresent
}
func (t *FakeToken) Done() <-chan struct{} {
func (t *fakeToken) Done() <-chan struct{} {
return t.complete
}
@ -108,24 +108,24 @@ func (t *FakeToken) Done() <-chan struct{} {
func TestLifecycleSanity(t *testing.T) {
var acc testutil.Accumulator
plugin := New(func(*mqtt.ClientOptions) Client {
return &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{}
plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client {
return &fakeClient{
connectF: func() mqtt.Token {
return &fakeToken{}
},
AddRouteF: func(mqtt.MessageHandler) {
addRouteF: func(mqtt.MessageHandler) {
},
SubscribeMultipleF: func() mqtt.Token {
return &FakeToken{}
subscribeMultipleF: func() mqtt.Token {
return &fakeToken{}
},
DisconnectF: func() {
disconnectF: func() {
},
}
})
plugin.Log = testutil.Logger{}
plugin.Servers = []string{"tcp://127.0.0.1"}
parser := &FakeParser{}
parser := &fakeParser{}
plugin.SetParser(parser)
require.NoError(t, plugin.Init())
@ -138,12 +138,12 @@ func TestLifecycleSanity(t *testing.T) {
func TestRandomClientID(t *testing.T) {
var err error
m1 := New(nil)
m1 := newMQTTConsumer(nil)
m1.Log = testutil.Logger{}
err = m1.Init()
require.NoError(t, err)
m2 := New(nil)
m2 := newMQTTConsumer(nil)
m2.Log = testutil.Logger{}
err = m2.Init()
require.NoError(t, err)
@ -153,7 +153,7 @@ func TestRandomClientID(t *testing.T) {
// PersistentSession requires ClientID
func TestPersistentClientIDFail(t *testing.T) {
plugin := New(nil)
plugin := newMQTTConsumer(nil)
plugin.Log = testutil.Logger{}
plugin.PersistentSession = true
@ -161,36 +161,36 @@ func TestPersistentClientIDFail(t *testing.T) {
require.Error(t, err)
}
type Message struct {
type message struct {
topic string
qos byte
}
func (m *Message) Duplicate() bool {
func (m *message) Duplicate() bool {
panic("not implemented")
}
func (m *Message) Qos() byte {
func (m *message) Qos() byte {
return m.qos
}
func (m *Message) Retained() bool {
func (m *message) Retained() bool {
panic("not implemented")
}
func (m *Message) Topic() string {
func (m *message) Topic() string {
return m.topic
}
func (m *Message) MessageID() uint16 {
func (m *message) MessageID() uint16 {
panic("not implemented")
}
func (m *Message) Payload() []byte {
func (m *message) Payload() []byte {
return []byte("cpu time_idle=42i")
}
func (m *Message) Ack() {
func (m *message) Ack() {
panic("not implemented")
}
@ -200,7 +200,7 @@ func TestTopicTag(t *testing.T) {
topic string
topicTag func() *string
expectedError string
topicParsing []TopicParsingConfig
topicParsing []topicParsingConfig
expected []telegraf.Metric
}{
{
@ -267,7 +267,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "telegraf/123/test",
Measurement: "_/_/measurement",
@ -299,7 +299,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
@ -333,7 +333,7 @@ func TestTopicTag(t *testing.T) {
return &tag
},
expectedError: "config error topic parsing: fields length does not equal topic length",
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
@ -366,7 +366,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
@ -396,7 +396,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Tags: "testTag/_/_/_",
@ -428,7 +428,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "/telegraf/+/test/hello",
Measurement: "/_/_/measurement/_",
@ -461,7 +461,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "/telegraf/#/test/hello",
Measurement: "/#/measurement/_",
@ -495,7 +495,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
topicParsing: []topicParsingConfig{
{
Topic: "/telegraf/#",
Measurement: "/#/measurement/_",
@ -521,22 +521,22 @@ func TestTopicTag(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var handler mqtt.MessageHandler
client := &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{}
fClient := &fakeClient{
connectF: func() mqtt.Token {
return &fakeToken{}
},
AddRouteF: func(callback mqtt.MessageHandler) {
addRouteF: func(callback mqtt.MessageHandler) {
handler = callback
},
SubscribeMultipleF: func() mqtt.Token {
return &FakeToken{}
subscribeMultipleF: func() mqtt.Token {
return &fakeToken{}
},
DisconnectF: func() {
disconnectF: func() {
},
}
plugin := New(func(*mqtt.ClientOptions) Client {
return client
plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client {
return fClient
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{tt.topic}
@ -557,7 +557,7 @@ func TestTopicTag(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
var m Message
var m message
m.topic = tt.topic
handler(nil, &m)
@ -570,20 +570,20 @@ func TestTopicTag(t *testing.T) {
}
func TestAddRouteCalledForEachTopic(t *testing.T) {
client := &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{}
fClient := &fakeClient{
connectF: func() mqtt.Token {
return &fakeToken{}
},
AddRouteF: func(mqtt.MessageHandler) {
addRouteF: func(mqtt.MessageHandler) {
},
SubscribeMultipleF: func() mqtt.Token {
return &FakeToken{}
subscribeMultipleF: func() mqtt.Token {
return &fakeToken{}
},
DisconnectF: func() {
disconnectF: func() {
},
}
plugin := New(func(*mqtt.ClientOptions) Client {
return client
plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client {
return fClient
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{"a", "b"}
@ -595,24 +595,24 @@ func TestAddRouteCalledForEachTopic(t *testing.T) {
plugin.Stop()
require.Equal(t, 2, client.addRouteCallCount)
require.Equal(t, 2, fClient.addRouteCallCount)
}
func TestSubscribeCalledIfNoSession(t *testing.T) {
client := &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{}
fClient := &fakeClient{
connectF: func() mqtt.Token {
return &fakeToken{}
},
AddRouteF: func(mqtt.MessageHandler) {
addRouteF: func(mqtt.MessageHandler) {
},
SubscribeMultipleF: func() mqtt.Token {
return &FakeToken{}
subscribeMultipleF: func() mqtt.Token {
return &fakeToken{}
},
DisconnectF: func() {
disconnectF: func() {
},
}
plugin := New(func(*mqtt.ClientOptions) Client {
return client
plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client {
return fClient
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{"b"}
@ -624,24 +624,24 @@ func TestSubscribeCalledIfNoSession(t *testing.T) {
plugin.Stop()
require.Equal(t, 1, client.subscribeCallCount)
require.Equal(t, 1, fClient.subscribeCallCount)
}
func TestSubscribeNotCalledIfSession(t *testing.T) {
client := &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{sessionPresent: true}
fClient := &fakeClient{
connectF: func() mqtt.Token {
return &fakeToken{sessionPresent: true}
},
AddRouteF: func(mqtt.MessageHandler) {
addRouteF: func(mqtt.MessageHandler) {
},
SubscribeMultipleF: func() mqtt.Token {
return &FakeToken{}
subscribeMultipleF: func() mqtt.Token {
return &fakeToken{}
},
DisconnectF: func() {
disconnectF: func() {
},
}
plugin := New(func(*mqtt.ClientOptions) Client {
return client
plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client {
return fClient
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{"b"}
@ -652,7 +652,7 @@ func TestSubscribeNotCalledIfSession(t *testing.T) {
require.NoError(t, plugin.Start(&acc))
plugin.Stop()
require.Equal(t, 0, client.subscribeCallCount)
require.Equal(t, 0, fClient.subscribeCallCount)
}
func TestIntegration(t *testing.T) {
@ -679,7 +679,7 @@ func TestIntegration(t *testing.T) {
// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},
@ -768,7 +768,7 @@ func TestStartupErrorBehaviorErrorIntegration(t *testing.T) {
// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},
@ -827,7 +827,7 @@ func TestStartupErrorBehaviorIgnoreIntegration(t *testing.T) {
// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},
@ -892,7 +892,7 @@ func TestStartupErrorBehaviorRetryIntegration(t *testing.T) {
// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},
@ -997,7 +997,7 @@ func TestReconnectIntegration(t *testing.T) {
// Setup the plugin and connect to the broker
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "/telegraf/test"
factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }
plugin := &MQTTConsumer{
Servers: []string{url},
Topics: []string{topic},

View File

@ -8,9 +8,12 @@ type mqttLogger struct {
telegraf.Logger
}
// Printf implements mqtt.Logger
func (l mqttLogger) Printf(fmt string, args ...interface{}) {
l.Logger.Debugf(fmt, args...)
}
// Println implements mqtt.Logger
func (l mqttLogger) Println(args ...interface{}) {
l.Logger.Debug(args...)
}

View File

@ -9,7 +9,7 @@ import (
"github.com/influxdata/telegraf"
)
type TopicParsingConfig struct {
type topicParsingConfig struct {
Topic string `toml:"topic"`
Measurement string `toml:"measurement"`
Tags string `toml:"tags"`
@ -17,7 +17,7 @@ type TopicParsingConfig struct {
FieldTypes map[string]string `toml:"types"`
}
type TopicParser struct {
type topicParser struct {
topicIndices map[string]int
topicVarLength bool
topicMinLength int
@ -29,8 +29,8 @@ type TopicParser struct {
fieldTypes map[string]string
}
func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) {
p := &TopicParser{
func (cfg *topicParsingConfig) newParser() (*topicParser, error) {
p := &topicParser{
fieldTypes: cfg.FieldTypes,
}
@ -150,7 +150,7 @@ func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) {
return p, nil
}
func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error {
func (p *topicParser) parse(metric telegraf.Metric, topic string) error {
// Split the actual topic into its elements and check for a match
topicParts := strings.Split(topic, "/")
if p.topicVarLength && len(topicParts) < p.topicMinLength || !p.topicVarLength && len(topicParts) != p.topicMinLength {
@ -200,7 +200,7 @@ func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error {
return nil
}
func (p *TopicParser) convertToFieldType(value, key string) (interface{}, error) {
func (p *topicParser) convertToFieldType(value, key string) (interface{}, error) {
// If the user configured inputs.mqtt_consumer.topic.types, check for the desired type
desiredType, ok := p.fieldTypes[key]
if !ok {

View File

@ -19,15 +19,15 @@ import (
var sampleConfig string
type MultiFile struct {
BaseDir string
FailEarly bool
Files []File `toml:"file"`
BaseDir string `toml:"base_dir"`
FailEarly bool `toml:"fail_early"`
Files []file `toml:"file"`
}
type File struct {
type file struct {
Name string `toml:"file"`
Dest string
Conversion string
Dest string `toml:"dest"`
Conversion string `toml:"conversion"`
}
func (*MultiFile) SampleConfig() string {

View File

@ -17,7 +17,7 @@ func TestFileTypes(t *testing.T) {
m := MultiFile{
BaseDir: path.Join(wd, `testdata`),
FailEarly: true,
Files: []File{
Files: []file{
{Name: `bool.txt`, Dest: `examplebool`, Conversion: `bool`},
{Name: `float.txt`, Dest: `examplefloat`, Conversion: `float`},
{Name: `int.txt`, Dest: `examplefloatX`, Conversion: `float(3)`},
@ -43,14 +43,14 @@ func TestFileTypes(t *testing.T) {
}, acc.Metrics[0].Fields)
}
func FailEarly(failEarly bool, t *testing.T) error {
func failEarly(failEarly bool, t *testing.T) error {
wd, err := os.Getwd()
require.NoError(t, err)
m := MultiFile{
BaseDir: path.Join(wd, `testdata`),
FailEarly: failEarly,
Files: []File{
Files: []file{
{Name: `int.txt`, Dest: `exampleint`, Conversion: `int`},
{Name: `int.txt`, Dest: `exampleerror`, Conversion: `bool`},
},
@ -71,8 +71,8 @@ func FailEarly(failEarly bool, t *testing.T) error {
}
func TestFailEarly(t *testing.T) {
err := FailEarly(false, t)
err := failEarly(false, t)
require.NoError(t, err)
err = FailEarly(true, t)
err = failEarly(true, t)
require.Error(t, err)
}

View File

@ -29,6 +29,14 @@ var sampleConfig string
var tlsRe = regexp.MustCompile(`([\?&])(?:tls=custom)($|&)`)
const (
defaultPerfEventsStatementsDigestTextLimit = 120
defaultPerfEventsStatementsLimit = 250
defaultPerfEventsStatementsTimeLimit = 86400
defaultGatherGlobalVars = true
localhost = ""
)
type Mysql struct {
Servers []*config.Secret `toml:"servers"`
PerfEventsStatementsDigestTextLimit int64 `toml:"perf_events_statements_digest_text_limit"`
@ -64,15 +72,6 @@ type Mysql struct {
loggedConvertFields map[string]bool
}
const (
defaultPerfEventsStatementsDigestTextLimit = 120
defaultPerfEventsStatementsLimit = 250
defaultPerfEventsStatementsTimeLimit = 86400
defaultGatherGlobalVars = true
)
const localhost = ""
func (*Mysql) SampleConfig() string {
return sampleConfig
}

View File

@ -8,7 +8,7 @@ import (
"strconv"
)
type ConversionFunc func(value sql.RawBytes) (interface{}, error)
type conversionFunc func(value sql.RawBytes) (interface{}, error)
func ParseInt(value sql.RawBytes) (interface{}, error) {
v, err := strconv.ParseInt(string(value), 10, 64)
@ -86,7 +86,7 @@ func ParseValue(value sql.RawBytes) (interface{}, error) {
return nil, fmt.Errorf("unconvertible value: %q", string(value))
}
var GlobalStatusConversions = map[string]ConversionFunc{
var globalStatusConversions = map[string]conversionFunc{
"innodb_available_undo_logs": ParseUint,
"innodb_buffer_pool_pages_misc": ParseUint,
"innodb_data_pending_fsyncs": ParseUint,
@ -108,7 +108,7 @@ var GlobalStatusConversions = map[string]ConversionFunc{
"wsrep_local_send_queue_avg": ParseFloat,
}
var GlobalVariableConversions = map[string]ConversionFunc{
var globalVariableConversions = map[string]conversionFunc{
// see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html
"delay_key_write": ParseString, // ON, OFF, ALL
@ -140,7 +140,7 @@ func ConvertGlobalStatus(key string, value sql.RawBytes) (interface{}, error) {
return nil, nil
}
if conv, ok := GlobalStatusConversions[key]; ok {
if conv, ok := globalStatusConversions[key]; ok {
return conv(value)
}
@ -152,7 +152,7 @@ func ConvertGlobalVariables(key string, value sql.RawBytes) (interface{}, error)
return nil, nil
}
if conv, ok := GlobalVariableConversions[key]; ok {
if conv, ok := globalVariableConversions[key]; ok {
return conv(value)
}