chore: Fix linter findings for `revive:exported` in `plugins/inputs/n*` (#16205)

This commit is contained in:
Paweł Żak 2024-11-25 11:23:17 +01:00 committed by GitHub
parent f80910be16
commit 3dea61cb5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 726 additions and 745 deletions

View File

@ -23,8 +23,8 @@ import (
var sampleConfig string var sampleConfig string
type Nats struct { type Nats struct {
Server string Server string `toml:"server"`
ResponseTimeout config.Duration ResponseTimeout config.Duration `toml:"response_timeout"`
client *http.Client client *http.Client
} }

View File

@ -19,27 +19,12 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var once sync.Once
var ( var (
once sync.Once
defaultMaxUndeliveredMessages = 1000 defaultMaxUndeliveredMessages = 1000
) )
type empty struct{} type NatsConsumer struct {
type semaphore chan empty
type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}
func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}
type natsConsumer struct {
QueueGroup string `toml:"queue_group"` QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"` Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
@ -70,24 +55,32 @@ type natsConsumer struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
func (*natsConsumer) SampleConfig() string { type (
empty struct{}
semaphore chan empty
)
type natsError struct {
conn *nats.Conn
sub *nats.Subscription
err error
}
func (e natsError) Error() string {
return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
}
func (*NatsConsumer) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (n *natsConsumer) SetParser(parser telegraf.Parser) { func (n *NatsConsumer) SetParser(parser telegraf.Parser) {
n.parser = parser n.parser = parser
} }
func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) { // Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up.
select { func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
}
// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
n.acc = acc.WithTracking(n.MaxUndeliveredMessages) n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
options := []nats.Option{ options := []nats.Option{
@ -193,9 +186,27 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *NatsConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (n *NatsConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.clean()
}
func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
select {
case n.errs <- natsError{conn: c, sub: s, err: e}:
default:
return
}
}
// receiver() reads all incoming messages from NATS, and parses them into // receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics. // telegraf metrics.
func (n *natsConsumer) receiver(ctx context.Context) { func (n *NatsConsumer) receiver(ctx context.Context) {
sem := make(semaphore, n.MaxUndeliveredMessages) sem := make(semaphore, n.MaxUndeliveredMessages)
for { for {
@ -237,7 +248,7 @@ func (n *natsConsumer) receiver(ctx context.Context) {
} }
} }
func (n *natsConsumer) clean() { func (n *NatsConsumer) clean() {
for _, sub := range n.subs { for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil { if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
@ -257,19 +268,9 @@ func (n *natsConsumer) clean() {
} }
} }
func (n *natsConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.clean()
}
func (n *natsConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func init() { func init() {
inputs.Add("nats_consumer", func() telegraf.Input { inputs.Add("nats_consumer", func() telegraf.Input {
return &natsConsumer{ return &NatsConsumer{
Servers: []string{"nats://localhost:4222"}, Servers: []string{"nats://localhost:4222"},
Subjects: []string{"telegraf"}, Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers", QueueGroup: "telegraf_consumers",

View File

@ -28,7 +28,7 @@ func TestStartStop(t *testing.T) {
require.NoError(t, container.Start(), "failed to start container") require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate() defer container.Terminate()
plugin := &natsConsumer{ plugin := &NatsConsumer{
Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])}, Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])},
Subjects: []string{"telegraf"}, Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers", QueueGroup: "telegraf_consumers",
@ -140,7 +140,7 @@ func TestSendReceive(t *testing.T) {
} }
// Setup the plugin // Setup the plugin
plugin := &natsConsumer{ plugin := &NatsConsumer{
Servers: []string{addr}, Servers: []string{addr},
Subjects: subjects, Subjects: subjects,
QueueGroup: "telegraf_consumers", QueueGroup: "telegraf_consumers",
@ -161,15 +161,15 @@ func TestSendReceive(t *testing.T) {
defer plugin.Stop() defer plugin.Stop()
// Send all messages to the topics (random order due to Golang map) // Send all messages to the topics (random order due to Golang map)
publisher := &sender{Addr: addr} publisher := &sender{addr: addr}
require.NoError(t, publisher.Connect()) require.NoError(t, publisher.connect())
defer publisher.Disconnect() defer publisher.disconnect()
for topic, msgs := range tt.msgs { for topic, msgs := range tt.msgs {
for _, msg := range msgs { for _, msg := range msgs {
require.NoError(t, publisher.Send(topic, msg)) require.NoError(t, publisher.send(topic, msg))
} }
} }
publisher.Disconnect() publisher.disconnect()
// Wait for the metrics to be collected // Wait for the metrics to be collected
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
@ -185,16 +185,12 @@ func TestSendReceive(t *testing.T) {
} }
type sender struct { type sender struct {
Addr string addr string
Username string
Password string
conn *nats.Conn conn *nats.Conn
} }
func (s *sender) Connect() error { func (s *sender) connect() error {
conn, err := nats.Connect(s.Addr) conn, err := nats.Connect(s.addr)
if err != nil { if err != nil {
return err return err
} }
@ -203,7 +199,7 @@ func (s *sender) Connect() error {
return nil return nil
} }
func (s *sender) Disconnect() { func (s *sender) disconnect() {
if s.conn != nil && !s.conn.IsClosed() { if s.conn != nil && !s.conn.IsClosed() {
_ = s.conn.Flush() _ = s.conn.Flush()
s.conn.Close() s.conn.Close()
@ -211,6 +207,6 @@ func (s *sender) Disconnect() {
s.conn = nil s.conn = nil
} }
func (s *sender) Send(topic, msg string) error { func (s *sender) send(topic, msg string) error {
return s.conn.Publish(topic, []byte(msg)) return s.conn.Publish(topic, []byte(msg))
} }

View File

@ -68,12 +68,6 @@ func (n *NeoomBeaam) Start(telegraf.Accumulator) error {
return n.updateConfiguration() return n.updateConfiguration()
} }
func (n *NeoomBeaam) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}
func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error { func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error {
// Refresh the config if requested // Refresh the config if requested
if n.RefreshConfig { if n.RefreshConfig {
@ -97,6 +91,12 @@ func (n *NeoomBeaam) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *NeoomBeaam) Stop() {
if n.client != nil {
n.client.CloseIdleConnections()
}
}
func (n *NeoomBeaam) updateConfiguration() error { func (n *NeoomBeaam) updateConfiguration() error {
endpoint := n.Address + "/api/v1/site/configuration" endpoint := n.Address + "/api/v1/site/configuration"
request, err := http.NewRequest("GET", endpoint, nil) request, err := http.NewRequest("GET", endpoint, nil)

View File

@ -27,6 +27,12 @@ var sampleConfig string
// Measurement is constant across all metrics. // Measurement is constant across all metrics.
const Measurement = "neptune_apex" const Measurement = "neptune_apex"
type NeptuneApex struct {
Servers []string `toml:"servers"`
ResponseTimeout config.Duration `toml:"response_timeout"`
httpClient *http.Client
}
type xmlReply struct { type xmlReply struct {
SoftwareVersion string `xml:"software,attr"` SoftwareVersion string `xml:"software,attr"`
HardwareVersion string `xml:"hardware,attr"` HardwareVersion string `xml:"hardware,attr"`
@ -54,18 +60,10 @@ type outlet struct {
Xstatus *string `xml:"xstatus"` Xstatus *string `xml:"xstatus"`
} }
// NeptuneApex implements telegraf.Input.
type NeptuneApex struct {
Servers []string
ResponseTimeout config.Duration
httpClient *http.Client
}
func (*NeptuneApex) SampleConfig() string { func (*NeptuneApex) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather implements telegraf.Input.Gather
func (n *NeptuneApex) Gather(acc telegraf.Accumulator) error { func (n *NeptuneApex) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, server := range n.Servers { for _, server := range n.Servers {

View File

@ -69,7 +69,7 @@ func TestParseXML(t *testing.T) {
}{ }{
{ {
name: "Good test", name: "Good test",
xmlResponse: []byte(APEX2016), xmlResponse: []byte(apex2016),
wantMetrics: []telegraf.Metric{ wantMetrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
Measurement, Measurement,
@ -532,7 +532,7 @@ func fakeHTTPClient(h http.Handler) (*http.Client, func()) {
} }
// Sample configuration from a 2016 version Neptune Apex. // Sample configuration from a 2016 version Neptune Apex.
const APEX2016 = `<?xml version="1.0"?> const apex2016 = `<?xml version="1.0"?>
<status software="5.04_7A18" hardware="1.0"> <status software="5.04_7A18" hardware="1.0">
<hostname>apex</hostname> <hostname>apex</hostname>
<serial>AC5:12345</serial> <serial>AC5:12345</serial>

View File

@ -21,20 +21,20 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type NetIOStats struct { type Net struct {
filter filter.Filter Interfaces []string `toml:"interfaces"`
ps system.PS IgnoreProtocolStats bool `toml:"ignore_protocol_stats"`
skipChecks bool filter filter.Filter
IgnoreProtocolStats bool ps system.PS
Interfaces []string skipChecks bool
} }
func (*NetIOStats) SampleConfig() string { func (*Net) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (n *NetIOStats) Init() error { func (n *Net) Init() error {
if !n.IgnoreProtocolStats { if !n.IgnoreProtocolStats {
config.PrintOptionValueDeprecationNotice("inputs.net", "ignore_protocol_stats", "false", config.PrintOptionValueDeprecationNotice("inputs.net", "ignore_protocol_stats", "false",
telegraf.DeprecationInfo{ telegraf.DeprecationInfo{
@ -48,7 +48,7 @@ func (n *NetIOStats) Init() error {
return nil return nil
} }
func (n *NetIOStats) Gather(acc telegraf.Accumulator) error { func (n *Net) Gather(acc telegraf.Accumulator) error {
netio, err := n.ps.NetIO() netio, err := n.ps.NetIO()
if err != nil { if err != nil {
return fmt.Errorf("error getting net io info: %w", err) return fmt.Errorf("error getting net io info: %w", err)
@ -153,6 +153,6 @@ func getInterfaceSpeed(ioName string) int64 {
func init() { func init() {
inputs.Add("net", func() telegraf.Input { inputs.Add("net", func() telegraf.Input {
return &NetIOStats{ps: system.NewSystemPS()} return &Net{ps: system.NewSystemPS()}
}) })
} }

View File

@ -44,7 +44,7 @@ func TestNetIOStats(t *testing.T) {
t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys")) t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))
plugin := &NetIOStats{ps: &mps, skipChecks: true} plugin := &Net{ps: &mps, skipChecks: true}
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -111,7 +111,7 @@ func TestNetIOStatsSpeedUnsupported(t *testing.T) {
t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys")) t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))
plugin := &NetIOStats{ps: &mps, skipChecks: true} plugin := &Net{ps: &mps, skipChecks: true}
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
@ -178,7 +178,7 @@ func TestNetIOStatsNoSpeedFile(t *testing.T) {
t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys")) t.Setenv("HOST_SYS", filepath.Join("testdata", "general", "sys"))
plugin := &NetIOStats{ps: &mps, skipChecks: true} plugin := &Net{ps: &mps, skipChecks: true}
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))

View File

@ -20,155 +20,29 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type ResultType uint64 type resultType uint64
const ( const (
Success ResultType = 0 success resultType = 0
Timeout ResultType = 1 timeout resultType = 1
ConnectionFailed ResultType = 2 connectionFailed resultType = 2
ReadFailed ResultType = 3 readFailed resultType = 3
StringMismatch ResultType = 4 stringMismatch resultType = 4
) )
// NetResponse struct
type NetResponse struct { type NetResponse struct {
Address string Address string `toml:"address"`
Timeout config.Duration Timeout config.Duration `toml:"timeout"`
ReadTimeout config.Duration ReadTimeout config.Duration `toml:"read_timeout"`
Send string Send string `toml:"send"`
Expect string Expect string `toml:"expect"`
Protocol string Protocol string `toml:"protocol"`
} }
func (*NetResponse) SampleConfig() string { func (*NetResponse) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// TCPGather will execute if there are TCP tests defined in the configuration.
// It will return a map[string]interface{} for fields and a map[string]string for tags
func (n *NetResponse) TCPGather() (map[string]string, map[string]interface{}, error) {
// Prepare returns
tags := make(map[string]string)
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Connecting
conn, err := net.DialTimeout("tcp", n.Address, time.Duration(n.Timeout))
// Stop timer
responseTime := time.Since(start).Seconds()
// Handle error
if err != nil {
var e net.Error
if errors.As(err, &e) && e.Timeout() {
setResult(Timeout, fields, tags, n.Expect)
} else {
setResult(ConnectionFailed, fields, tags, n.Expect)
}
return tags, fields, nil
}
defer conn.Close()
// Send string if needed
if n.Send != "" {
msg := []byte(n.Send)
if _, gerr := conn.Write(msg); gerr != nil {
return nil, nil, gerr
}
// Stop timer
responseTime = time.Since(start).Seconds()
}
// Read string if needed
if n.Expect != "" {
// Set read timeout
if gerr := conn.SetReadDeadline(time.Now().Add(time.Duration(n.ReadTimeout))); gerr != nil {
return nil, nil, gerr
}
// Prepare reader
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
// Read
data, err := tp.ReadLine()
// Stop timer
responseTime = time.Since(start).Seconds()
// Handle error
if err != nil {
setResult(ReadFailed, fields, tags, n.Expect)
} else {
// Looking for string in answer
regEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := regEx.FindString(data)
if find != "" {
setResult(Success, fields, tags, n.Expect)
} else {
setResult(StringMismatch, fields, tags, n.Expect)
}
}
} else {
setResult(Success, fields, tags, n.Expect)
}
fields["response_time"] = responseTime
return tags, fields, nil
}
// UDPGather will execute if there are UDP tests defined in the configuration.
// It will return a map[string]interface{} for fields and a map[string]string for tags
func (n *NetResponse) UDPGather() (map[string]string, map[string]interface{}, error) {
// Prepare returns
tags := make(map[string]string)
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Resolving
udpAddr, err := net.ResolveUDPAddr("udp", n.Address)
// Handle error
if err != nil {
setResult(ConnectionFailed, fields, tags, n.Expect)
return tags, fields, nil
}
// Connecting
conn, err := net.DialUDP("udp", nil, udpAddr)
// Handle error
if err != nil {
setResult(ConnectionFailed, fields, tags, n.Expect)
return tags, fields, nil
}
defer conn.Close()
// Send string
msg := []byte(n.Send)
if _, gerr := conn.Write(msg); gerr != nil {
return nil, nil, gerr
}
// Read string
// Set read timeout
if gerr := conn.SetReadDeadline(time.Now().Add(time.Duration(n.ReadTimeout))); gerr != nil {
return nil, nil, gerr
}
// Read
buf := make([]byte, 1024)
_, _, err = conn.ReadFromUDP(buf)
// Stop timer
responseTime := time.Since(start).Seconds()
// Handle error
if err != nil {
setResult(ReadFailed, fields, tags, n.Expect)
return tags, fields, nil
}
// Looking for string in answer
regEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := regEx.FindString(string(buf))
if find != "" {
setResult(Success, fields, tags, n.Expect)
} else {
setResult(StringMismatch, fields, tags, n.Expect)
}
fields["response_time"] = responseTime
return tags, fields, nil
}
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
func (n *NetResponse) Init() error { func (n *NetResponse) Init() error {
// Set default values // Set default values
if n.Timeout == 0 { if n.Timeout == 0 {
@ -203,9 +77,6 @@ func (n *NetResponse) Init() error {
return nil return nil
} }
// Gather is called by telegraf when the plugin is executed on its interval.
// It will call either UDPGather or TCPGather based on the configuration and
// also fill an Accumulator that is supplied.
func (n *NetResponse) Gather(acc telegraf.Accumulator) error { func (n *NetResponse) Gather(acc telegraf.Accumulator) error {
// Prepare host and port // Prepare host and port
host, port, err := net.SplitHostPort(n.Address) host, port, err := net.SplitHostPort(n.Address)
@ -221,13 +92,13 @@ func (n *NetResponse) Gather(acc telegraf.Accumulator) error {
// Gather data // Gather data
switch n.Protocol { switch n.Protocol {
case "tcp": case "tcp":
returnTags, fields, err = n.TCPGather() returnTags, fields, err = n.tcpGather()
if err != nil { if err != nil {
return err return err
} }
tags["protocol"] = "tcp" tags["protocol"] = "tcp"
case "udp": case "udp":
returnTags, fields, err = n.UDPGather() returnTags, fields, err = n.udpGather()
if err != nil { if err != nil {
return err return err
} }
@ -243,18 +114,137 @@ func (n *NetResponse) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func setResult(result ResultType, fields map[string]interface{}, tags map[string]string, expect string) { func (n *NetResponse) tcpGather() (map[string]string, map[string]interface{}, error) {
// Prepare returns
tags := make(map[string]string)
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Connecting
conn, err := net.DialTimeout("tcp", n.Address, time.Duration(n.Timeout))
// Stop timer
responseTime := time.Since(start).Seconds()
// Handle error
if err != nil {
var e net.Error
if errors.As(err, &e) && e.Timeout() {
setResult(timeout, fields, tags, n.Expect)
} else {
setResult(connectionFailed, fields, tags, n.Expect)
}
return tags, fields, nil
}
defer conn.Close()
// Send string if needed
if n.Send != "" {
msg := []byte(n.Send)
if _, gerr := conn.Write(msg); gerr != nil {
return nil, nil, gerr
}
// Stop timer
responseTime = time.Since(start).Seconds()
}
// Read string if needed
if n.Expect != "" {
// Set read timeout
if gerr := conn.SetReadDeadline(time.Now().Add(time.Duration(n.ReadTimeout))); gerr != nil {
return nil, nil, gerr
}
// Prepare reader
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
// Read
data, err := tp.ReadLine()
// Stop timer
responseTime = time.Since(start).Seconds()
// Handle error
if err != nil {
setResult(readFailed, fields, tags, n.Expect)
} else {
// Looking for string in answer
regEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := regEx.FindString(data)
if find != "" {
setResult(success, fields, tags, n.Expect)
} else {
setResult(stringMismatch, fields, tags, n.Expect)
}
}
} else {
setResult(success, fields, tags, n.Expect)
}
fields["response_time"] = responseTime
return tags, fields, nil
}
func (n *NetResponse) udpGather() (map[string]string, map[string]interface{}, error) {
// Prepare returns
tags := make(map[string]string)
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Resolving
udpAddr, err := net.ResolveUDPAddr("udp", n.Address)
// Handle error
if err != nil {
setResult(connectionFailed, fields, tags, n.Expect)
return tags, fields, nil
}
// Connecting
conn, err := net.DialUDP("udp", nil, udpAddr)
// Handle error
if err != nil {
setResult(connectionFailed, fields, tags, n.Expect)
return tags, fields, nil
}
defer conn.Close()
// Send string
msg := []byte(n.Send)
if _, gerr := conn.Write(msg); gerr != nil {
return nil, nil, gerr
}
// Read string
// Set read timeout
if gerr := conn.SetReadDeadline(time.Now().Add(time.Duration(n.ReadTimeout))); gerr != nil {
return nil, nil, gerr
}
// Read
buf := make([]byte, 1024)
_, _, err = conn.ReadFromUDP(buf)
// Stop timer
responseTime := time.Since(start).Seconds()
// Handle error
if err != nil {
setResult(readFailed, fields, tags, n.Expect)
return tags, fields, nil
}
// Looking for string in answer
regEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := regEx.FindString(string(buf))
if find != "" {
setResult(success, fields, tags, n.Expect)
} else {
setResult(stringMismatch, fields, tags, n.Expect)
}
fields["response_time"] = responseTime
return tags, fields, nil
}
func setResult(result resultType, fields map[string]interface{}, tags map[string]string, expect string) {
var tag string var tag string
switch result { switch result {
case Success: case success:
tag = "success" tag = "success"
case Timeout: case timeout:
tag = "timeout" tag = "timeout"
case ConnectionFailed: case connectionFailed:
tag = "connection_failed" tag = "connection_failed"
case ReadFailed: case readFailed:
tag = "read_failed" tag = "read_failed"
case StringMismatch: case stringMismatch:
tag = "string_mismatch" tag = "string_mismatch"
} }
@ -266,7 +256,7 @@ func setResult(result ResultType, fields map[string]interface{}, tags map[string
// deprecated in 1.4; use result tag // deprecated in 1.4; use result tag
if expect != "" { if expect != "" {
fields["string_found"] = result == Success fields["string_found"] = result == success
} }
} }

View File

@ -106,7 +106,7 @@ func TestTCPOK1(t *testing.T) {
require.NoError(t, c.Init()) require.NoError(t, c.Init())
// Start TCP server // Start TCP server
wg.Add(1) wg.Add(1)
go TCPServer(t, &wg) go tcpServer(t, &wg)
wg.Wait() // Wait for the server to spin up wg.Wait() // Wait for the server to spin up
wg.Add(1) wg.Add(1)
// Connect // Connect
@ -151,7 +151,7 @@ func TestTCPOK2(t *testing.T) {
require.NoError(t, c.Init()) require.NoError(t, c.Init())
// Start TCP server // Start TCP server
wg.Add(1) wg.Add(1)
go TCPServer(t, &wg) go tcpServer(t, &wg)
wg.Wait() wg.Wait()
wg.Add(1) wg.Add(1)
@ -233,7 +233,7 @@ func TestUDPOK1(t *testing.T) {
require.NoError(t, c.Init()) require.NoError(t, c.Init())
// Start UDP server // Start UDP server
wg.Add(1) wg.Add(1)
go UDPServer(t, &wg) go udpServer(t, &wg)
wg.Wait() wg.Wait()
wg.Add(1) wg.Add(1)
@ -264,7 +264,7 @@ func TestUDPOK1(t *testing.T) {
wg.Wait() wg.Wait()
} }
func UDPServer(t *testing.T, wg *sync.WaitGroup) { func udpServer(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:2004") udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:2004")
if err != nil { if err != nil {
@ -297,7 +297,7 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup) {
} }
} }
func TCPServer(t *testing.T, wg *sync.WaitGroup) { func tcpServer(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:2004") tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:2004")
if err != nil { if err != nil {

View File

@ -19,11 +19,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type protocolDecoder interface {
Init() error
Decode(net.IP, []byte) ([]telegraf.Metric, error)
}
type NetFlow struct { type NetFlow struct {
ServiceAddress string `toml:"service_address"` ServiceAddress string `toml:"service_address"`
ReadBufferSize config.Size `toml:"read_buffer_size"` ReadBufferSize config.Size `toml:"read_buffer_size"`
@ -37,6 +32,11 @@ type NetFlow struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
type protocolDecoder interface {
init() error
decode(net.IP, []byte) ([]telegraf.Metric, error)
}
func (*NetFlow) SampleConfig() string { func (*NetFlow) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -61,12 +61,12 @@ func (n *NetFlow) Init() error {
n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v9'") n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v9'")
} }
n.decoder = &netflowDecoder{ n.decoder = &netflowDecoder{
Log: n.Log, log: n.Log,
} }
case "", "ipfix": case "", "ipfix":
n.decoder = &netflowDecoder{ n.decoder = &netflowDecoder{
PENFiles: n.PENFiles, penFiles: n.PENFiles,
Log: n.Log, log: n.Log,
} }
case "netflow v5": case "netflow v5":
if len(n.PENFiles) != 0 { if len(n.PENFiles) != 0 {
@ -74,12 +74,12 @@ func (n *NetFlow) Init() error {
} }
n.decoder = &netflowv5Decoder{} n.decoder = &netflowv5Decoder{}
case "sflow", "sflow v5": case "sflow", "sflow v5":
n.decoder = &sflowv5Decoder{Log: n.Log} n.decoder = &sflowv5Decoder{log: n.Log}
default: default:
return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol) return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol)
} }
return n.decoder.Init() return n.decoder.init()
} }
func (n *NetFlow) Start(acc telegraf.Accumulator) error { func (n *NetFlow) Start(acc telegraf.Accumulator) error {
@ -114,6 +114,10 @@ func (n *NetFlow) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (n *NetFlow) Gather(_ telegraf.Accumulator) error {
return nil
}
func (n *NetFlow) Stop() { func (n *NetFlow) Stop() {
if n.conn != nil { if n.conn != nil {
_ = n.conn.Close() _ = n.conn.Close()
@ -138,7 +142,7 @@ func (n *NetFlow) read(acc telegraf.Accumulator) {
if n.Log.Level().Includes(telegraf.Trace) || n.DumpPackets { // for backward compatibility if n.Log.Level().Includes(telegraf.Trace) || n.DumpPackets { // for backward compatibility
n.Log.Tracef("raw data: %s", hex.EncodeToString(buf[:count])) n.Log.Tracef("raw data: %s", hex.EncodeToString(buf[:count]))
} }
metrics, err := n.decoder.Decode(src.IP, buf[:count]) metrics, err := n.decoder.decode(src.IP, buf[:count])
if err != nil { if err != nil {
errWithData := fmt.Errorf("%w; raw data: %s", err, hex.EncodeToString(buf[:count])) errWithData := fmt.Errorf("%w; raw data: %s", err, hex.EncodeToString(buf[:count]))
acc.AddError(errWithData) acc.AddError(errWithData)
@ -150,10 +154,6 @@ func (n *NetFlow) read(acc telegraf.Accumulator) {
} }
} }
func (n *NetFlow) Gather(_ telegraf.Accumulator) error {
return nil
}
// Register the plugin // Register the plugin
func init() { func init() {
inputs.Add("netflow", func() telegraf.Input { inputs.Add("netflow", func() telegraf.Input {

View File

@ -530,8 +530,8 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{
// Decoder structure // Decoder structure
type netflowDecoder struct { type netflowDecoder struct {
PENFiles []string penFiles []string
Log telegraf.Logger log telegraf.Logger
templates map[string]netflow.NetFlowTemplateSystem templates map[string]netflow.NetFlowTemplateSystem
mappingsV9 map[uint16]fieldMapping mappingsV9 map[uint16]fieldMapping
@ -542,7 +542,7 @@ type netflowDecoder struct {
sync.Mutex sync.Mutex
} }
func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { func (d *netflowDecoder) decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric var metrics []telegraf.Metric
t := time.Now() t := time.Now()
@ -563,7 +563,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
if err := netflow.DecodeMessageVersion(buf, templates, &msg9, &msg10); err != nil { if err := netflow.DecodeMessageVersion(buf, templates, &msg9, &msg10); err != nil {
if errors.Is(err, netflow.ErrorTemplateNotFound) { if errors.Is(err, netflow.ErrorTemplateNotFound) {
msg := "Skipping packet until the device resends the required template..." msg := "Skipping packet until the device resends the required template..."
d.Log.Warnf("%v. %s", err, msg) d.log.Warnf("%v. %s", err, msg)
return nil, nil return nil, nil
} }
return nil, fmt.Errorf("decoding message failed: %w", err) return nil, fmt.Errorf("decoding message failed: %w", err)
@ -587,7 +587,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.ScopesValues { for _, value := range record.ScopesValues {
decodedFields, err := d.decodeValueV9(value) decodedFields, err := d.decodeValueV9(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding option record %+v failed: %v", record, err) d.log.Errorf("decoding option record %+v failed: %v", record, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -597,7 +597,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.OptionsValues { for _, value := range record.OptionsValues {
decodedFields, err := d.decodeValueV9(value) decodedFields, err := d.decodeValueV9(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding option record %+v failed: %v", record, err) d.log.Errorf("decoding option record %+v failed: %v", record, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -616,7 +616,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.Values { for _, value := range record.Values {
decodedFields, err := d.decodeValueV9(value) decodedFields, err := d.decodeValueV9(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding record %+v failed: %v", record, err) d.log.Errorf("decoding record %+v failed: %v", record, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -643,7 +643,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.ScopesValues { for _, value := range record.ScopesValues {
decodedFields, err := d.decodeValueIPFIX(value) decodedFields, err := d.decodeValueIPFIX(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding option record %+v failed: %v", record, err) d.log.Errorf("decoding option record %+v failed: %v", record, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -653,7 +653,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.OptionsValues { for _, value := range record.OptionsValues {
decodedFields, err := d.decodeValueIPFIX(value) decodedFields, err := d.decodeValueIPFIX(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding option record %+v failed: %v", record, err) d.log.Errorf("decoding option record %+v failed: %v", record, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -673,7 +673,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
for _, value := range record.Values { for _, value := range record.Values {
decodedFields, err := d.decodeValueIPFIX(value) decodedFields, err := d.decodeValueIPFIX(value)
if err != nil { if err != nil {
d.Log.Errorf("decoding value %+v failed: %v", value, err) d.log.Errorf("decoding value %+v failed: %v", value, err)
continue continue
} }
for _, field := range decodedFields { for _, field := range decodedFields {
@ -691,7 +691,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
return metrics, nil return metrics, nil
} }
func (d *netflowDecoder) Init() error { func (d *netflowDecoder) init() error {
if err := initL4ProtoMapping(); err != nil { if err := initL4ProtoMapping(); err != nil {
return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err)
} }
@ -703,8 +703,8 @@ func (d *netflowDecoder) Init() error {
d.mappingsV9 = make(map[uint16]fieldMapping) d.mappingsV9 = make(map[uint16]fieldMapping)
d.mappingsIPFIX = make(map[uint16]fieldMapping) d.mappingsIPFIX = make(map[uint16]fieldMapping)
d.mappingsPEN = make(map[string]fieldMapping) d.mappingsPEN = make(map[string]fieldMapping)
for _, fn := range d.PENFiles { for _, fn := range d.penFiles {
d.Log.Debugf("Loading PEN mapping file %q...", fn) d.log.Debugf("Loading PEN mapping file %q...", fn)
mappings, err := loadMapping(fn) mappings, err := loadMapping(fn)
if err != nil { if err != nil {
return err return err
@ -719,7 +719,7 @@ func (d *netflowDecoder) Init() error {
d.mappingsPEN[k] = v d.mappingsPEN[k] = v
} }
} }
d.Log.Infof("Loaded %d PEN mappings...", len(d.mappingsPEN)) d.log.Infof("Loaded %d PEN mappings...", len(d.mappingsPEN))
d.logged = make(map[string]bool) d.logged = make(map[string]bool)
@ -783,7 +783,7 @@ func (d *netflowDecoder) decodeValueV9(field netflow.DataField) ([]telegraf.Fiel
// Return the raw data if no mapping was found // Return the raw data if no mapping was found
key := fmt.Sprintf("type_%d", elementID) key := fmt.Sprintf("type_%d", elementID)
if !d.logged[key] { if !d.logged[key] {
d.Log.Debugf("unknown Netflow v9 data field %v", field) d.log.Debugf("unknown Netflow v9 data field %v", field)
d.logged[key] = true d.logged[key] = true
} }
v, err := decodeHex(raw) v, err := decodeHex(raw)
@ -817,7 +817,7 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) ([]telegraf.F
return []telegraf.Field{{Key: name, Value: v}}, nil return []telegraf.Field{{Key: name, Value: v}}, nil
} }
if !d.logged[key] { if !d.logged[key] {
d.Log.Debugf("unknown IPFIX PEN data field %v", field) d.log.Debugf("unknown IPFIX PEN data field %v", field)
d.logged[key] = true d.logged[key] = true
} }
name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID) name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID)
@ -866,7 +866,7 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) ([]telegraf.F
// Return the raw data if no mapping was found // Return the raw data if no mapping was found
key := fmt.Sprintf("type_%d", elementID) key := fmt.Sprintf("type_%d", elementID)
if !d.logged[key] { if !d.logged[key] {
d.Log.Debugf("unknown IPFIX data field %v", field) d.log.Debugf("unknown IPFIX data field %v", field)
d.logged[key] = true d.logged[key] = true
} }
v, err := decodeHex(raw) v, err := decodeHex(raw)

View File

@ -15,14 +15,14 @@ import (
// Decoder structure // Decoder structure
type netflowv5Decoder struct{} type netflowv5Decoder struct{}
func (d *netflowv5Decoder) Init() error { func (d *netflowv5Decoder) init() error {
if err := initL4ProtoMapping(); err != nil { if err := initL4ProtoMapping(); err != nil {
return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err)
} }
return nil return nil
} }
func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { func (d *netflowv5Decoder) decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) {
src := srcIP.String() src := srcIP.String()
// Decode the message // Decode the message

View File

@ -19,13 +19,13 @@ import (
// Decoder structure // Decoder structure
type sflowv5Decoder struct { type sflowv5Decoder struct {
Log telegraf.Logger log telegraf.Logger
warnedCounterRaw map[uint32]bool warnedCounterRaw map[uint32]bool
warnedFlowRaw map[int64]bool warnedFlowRaw map[int64]bool
} }
func (d *sflowv5Decoder) Init() error { func (d *sflowv5Decoder) init() error {
if err := initL4ProtoMapping(); err != nil { if err := initL4ProtoMapping(); err != nil {
return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err)
} }
@ -35,7 +35,7 @@ func (d *sflowv5Decoder) Init() error {
return nil return nil
} }
func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { func (d *sflowv5Decoder) decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) {
t := time.Now() t := time.Now()
src := srcIP.String() src := srcIP.String()
@ -448,11 +448,11 @@ func (d *sflowv5Decoder) decodeRawHeaderSample(record *sflow.SampledHeader) (map
if !d.warnedFlowRaw[ltype] { if !d.warnedFlowRaw[ltype] {
contents := hex.EncodeToString(pkt.LayerContents()) contents := hex.EncodeToString(pkt.LayerContents())
payload := hex.EncodeToString(pkt.LayerPayload()) payload := hex.EncodeToString(pkt.LayerPayload())
d.Log.Warnf("Unknown flow raw flow message %s (%d):", pkt.LayerType().String(), pkt.LayerType()) d.log.Warnf("Unknown flow raw flow message %s (%d):", pkt.LayerType().String(), pkt.LayerType())
d.Log.Warnf(" contents: %s", contents) d.log.Warnf(" contents: %s", contents)
d.Log.Warnf(" payload: %s", payload) d.log.Warnf(" payload: %s", payload)
d.Log.Warn("This message is only printed once.") d.log.Warn("This message is only printed once.")
} }
d.warnedFlowRaw[ltype] = true d.warnedFlowRaw[ltype] = true
} }
@ -524,8 +524,8 @@ func (d *sflowv5Decoder) decodeCounterRecords(records []sflow.CounterRecord) (ma
default: default:
if !d.warnedCounterRaw[r.Header.DataFormat] { if !d.warnedCounterRaw[r.Header.DataFormat] {
data := hex.EncodeToString(record.Data) data := hex.EncodeToString(record.Data)
d.Log.Warnf("Unknown counter raw flow message %d: %s", r.Header.DataFormat, data) d.log.Warnf("Unknown counter raw flow message %d: %s", r.Header.DataFormat, data)
d.Log.Warn("This message is only printed once.") d.log.Warn("This message is only printed once.")
} }
d.warnedCounterRaw[r.Header.DataFormat] = true d.warnedCounterRaw[r.Header.DataFormat] = true
} }

View File

@ -14,16 +14,16 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type NetStats struct { type NetStat struct {
PS system.PS ps system.PS
} }
func (*NetStats) SampleConfig() string { func (*NetStat) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (ns *NetStats) Gather(acc telegraf.Accumulator) error { func (ns *NetStat) Gather(acc telegraf.Accumulator) error {
netconns, err := ns.PS.NetConnections() netconns, err := ns.ps.NetConnections()
if err != nil { if err != nil {
return fmt.Errorf("error getting net connections info: %w", err) return fmt.Errorf("error getting net connections info: %w", err)
} }
@ -66,6 +66,6 @@ func (ns *NetStats) Gather(acc telegraf.Accumulator) error {
func init() { func init() {
inputs.Add("netstat", func() telegraf.Input { inputs.Add("netstat", func() telegraf.Input {
return &NetStats{PS: system.NewSystemPS()} return &NetStat{ps: system.NewSystemPS()}
}) })
} }

View File

@ -32,7 +32,7 @@ func TestNetStats(t *testing.T) {
}, nil) }, nil)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, (&NetStats{PS: &mps}).Gather(&acc)) require.NoError(t, (&NetStat{ps: &mps}).Gather(&acc))
expected := []telegraf.Metric{ expected := []telegraf.Metric{
metric.New( metric.New(

View File

@ -31,36 +31,193 @@ type NFSClient struct {
mountstatsPath string mountstatsPath string
} }
func convertToUint64(line []string) ([]uint64, error) { func (*NFSClient) SampleConfig() string {
/* A "line" of input data (a pre-split array of strings) is return sampleConfig
processed one field at a time. Each field is converted to }
an uint64 value, and appended to an array of return values.
On an error, check for ErrRange, and returns an error
if found. This situation indicates a pretty major issue in
the /proc/self/mountstats file, and returning faulty data
is worse than no data. Other errors are ignored, and append
whatever we got in the first place (probably 0).
Yes, this is ugly. */
if len(line) < 2 { func (n *NFSClient) Init() error {
return nil, nil var nfs3Fields = []string{
"NULL",
"GETATTR",
"SETATTR",
"LOOKUP",
"ACCESS",
"READLINK",
"READ",
"WRITE",
"CREATE",
"MKDIR",
"SYMLINK",
"MKNOD",
"REMOVE",
"RMDIR",
"RENAME",
"LINK",
"READDIR",
"READDIRPLUS",
"FSSTAT",
"FSINFO",
"PATHCONF",
"COMMIT",
} }
nline := make([]uint64, 0, len(line[1:])) var nfs4Fields = []string{
// Skip the first field; it's handled specially as the "first" variable "NULL",
for _, l := range line[1:] { "READ",
val, err := strconv.ParseUint(l, 10, 64) "WRITE",
if err != nil { "COMMIT",
var numError *strconv.NumError "OPEN",
if errors.As(err, &numError) { "OPEN_CONFIRM",
if errors.Is(numError.Err, strconv.ErrRange) { "OPEN_NOATTR",
return nil, fmt.Errorf("errrange: line:[%v] raw:[%v] -> parsed:[%v]", line, l, val) "OPEN_DOWNGRADE",
} "CLOSE",
"SETATTR",
"FSINFO",
"RENEW",
"SETCLIENTID",
"SETCLIENTID_CONFIRM",
"LOCK",
"LOCKT",
"LOCKU",
"ACCESS",
"GETATTR",
"LOOKUP",
"LOOKUP_ROOT",
"REMOVE",
"RENAME",
"LINK",
"SYMLINK",
"CREATE",
"PATHCONF",
"STATFS",
"READLINK",
"READDIR",
"SERVER_CAPS",
"DELEGRETURN",
"GETACL",
"SETACL",
"FS_LOCATIONS",
"RELEASE_LOCKOWNER",
"SECINFO",
"FSID_PRESENT",
"EXCHANGE_ID",
"CREATE_SESSION",
"DESTROY_SESSION",
"SEQUENCE",
"GET_LEASE_TIME",
"RECLAIM_COMPLETE",
"LAYOUTGET",
"GETDEVICEINFO",
"LAYOUTCOMMIT",
"LAYOUTRETURN",
"SECINFO_NO_NAME",
"TEST_STATEID",
"FREE_STATEID",
"GETDEVICELIST",
"BIND_CONN_TO_SESSION",
"DESTROY_CLIENTID",
"SEEK",
"ALLOCATE",
"DEALLOCATE",
"LAYOUTSTATS",
"CLONE",
"COPY",
"OFFLOAD_CANCEL",
"LOOKUPP",
"LAYOUTERROR",
"COPY_NOTIFY",
"GETXATTR",
"SETXATTR",
"LISTXATTRS",
"REMOVEXATTR",
}
nfs3Ops := make(map[string]bool)
nfs4Ops := make(map[string]bool)
n.mountstatsPath = n.getMountStatsPath()
if len(n.IncludeOperations) == 0 {
for _, Op := range nfs3Fields {
nfs3Ops[Op] = true
}
for _, Op := range nfs4Fields {
nfs4Ops[Op] = true
}
} else {
for _, Op := range n.IncludeOperations {
nfs3Ops[Op] = true
}
for _, Op := range n.IncludeOperations {
nfs4Ops[Op] = true
}
}
if len(n.ExcludeOperations) > 0 {
for _, Op := range n.ExcludeOperations {
if nfs3Ops[Op] {
delete(nfs3Ops, Op)
}
if nfs4Ops[Op] {
delete(nfs4Ops, Op)
} }
} }
nline = append(nline, val)
} }
return nline, nil
n.nfs3Ops = nfs3Ops
n.nfs4Ops = nfs4Ops
if len(n.IncludeMounts) > 0 {
n.Log.Debugf("Including these mount patterns: %v", n.IncludeMounts)
} else {
n.Log.Debugf("Including all mounts.")
}
if len(n.ExcludeMounts) > 0 {
n.Log.Debugf("Excluding these mount patterns: %v", n.ExcludeMounts)
} else {
n.Log.Debugf("Not excluding any mounts.")
}
if len(n.IncludeOperations) > 0 {
n.Log.Debugf("Including these operations: %v", n.IncludeOperations)
} else {
n.Log.Debugf("Including all operations.")
}
if len(n.ExcludeOperations) > 0 {
n.Log.Debugf("Excluding these mount patterns: %v", n.ExcludeOperations)
} else {
n.Log.Debugf("Not excluding any operations.")
}
return nil
}
func (n *NFSClient) Gather(acc telegraf.Accumulator) error {
if _, err := os.Stat(n.mountstatsPath); os.IsNotExist(err) {
return err
}
// Attempt to read the file to see if we have permissions before opening
// which can lead to a panic
if _, err := os.ReadFile(n.mountstatsPath); err != nil {
return err
}
file, err := os.Open(n.mountstatsPath)
if err != nil {
n.Log.Errorf("Failed opening the %q file: %v ", file.Name(), err)
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
if err := n.processText(scanner, acc); err != nil {
return err
}
return scanner.Err()
} }
func (n *NFSClient) parseStat(mountpoint, export, version string, line []string, acc telegraf.Accumulator) error { func (n *NFSClient) parseStat(mountpoint, export, version string, line []string, acc telegraf.Accumulator) error {
@ -291,193 +448,36 @@ func (n *NFSClient) getMountStatsPath() string {
return path return path
} }
func (*NFSClient) SampleConfig() string { func convertToUint64(line []string) ([]uint64, error) {
return sampleConfig /* A "line" of input data (a pre-split array of strings) is
} processed one field at a time. Each field is converted to
an uint64 value, and appended to an array of return values.
On an error, check for ErrRange, and returns an error
if found. This situation indicates a pretty major issue in
the /proc/self/mountstats file, and returning faulty data
is worse than no data. Other errors are ignored, and append
whatever we got in the first place (probably 0).
Yes, this is ugly. */
func (n *NFSClient) Gather(acc telegraf.Accumulator) error { if len(line) < 2 {
if _, err := os.Stat(n.mountstatsPath); os.IsNotExist(err) { return nil, nil
return err
} }
// Attempt to read the file to see if we have permissions before opening nline := make([]uint64, 0, len(line[1:]))
// which can lead to a panic // Skip the first field; it's handled specially as the "first" variable
if _, err := os.ReadFile(n.mountstatsPath); err != nil { for _, l := range line[1:] {
return err val, err := strconv.ParseUint(l, 10, 64)
} if err != nil {
var numError *strconv.NumError
file, err := os.Open(n.mountstatsPath) if errors.As(err, &numError) {
if err != nil { if errors.Is(numError.Err, strconv.ErrRange) {
n.Log.Errorf("Failed opening the %q file: %v ", file.Name(), err) return nil, fmt.Errorf("errrange: line:[%v] raw:[%v] -> parsed:[%v]", line, l, val)
return err }
}
defer file.Close()
scanner := bufio.NewScanner(file)
if err := n.processText(scanner, acc); err != nil {
return err
}
return scanner.Err()
}
func (n *NFSClient) Init() error {
var nfs3Fields = []string{
"NULL",
"GETATTR",
"SETATTR",
"LOOKUP",
"ACCESS",
"READLINK",
"READ",
"WRITE",
"CREATE",
"MKDIR",
"SYMLINK",
"MKNOD",
"REMOVE",
"RMDIR",
"RENAME",
"LINK",
"READDIR",
"READDIRPLUS",
"FSSTAT",
"FSINFO",
"PATHCONF",
"COMMIT",
}
var nfs4Fields = []string{
"NULL",
"READ",
"WRITE",
"COMMIT",
"OPEN",
"OPEN_CONFIRM",
"OPEN_NOATTR",
"OPEN_DOWNGRADE",
"CLOSE",
"SETATTR",
"FSINFO",
"RENEW",
"SETCLIENTID",
"SETCLIENTID_CONFIRM",
"LOCK",
"LOCKT",
"LOCKU",
"ACCESS",
"GETATTR",
"LOOKUP",
"LOOKUP_ROOT",
"REMOVE",
"RENAME",
"LINK",
"SYMLINK",
"CREATE",
"PATHCONF",
"STATFS",
"READLINK",
"READDIR",
"SERVER_CAPS",
"DELEGRETURN",
"GETACL",
"SETACL",
"FS_LOCATIONS",
"RELEASE_LOCKOWNER",
"SECINFO",
"FSID_PRESENT",
"EXCHANGE_ID",
"CREATE_SESSION",
"DESTROY_SESSION",
"SEQUENCE",
"GET_LEASE_TIME",
"RECLAIM_COMPLETE",
"LAYOUTGET",
"GETDEVICEINFO",
"LAYOUTCOMMIT",
"LAYOUTRETURN",
"SECINFO_NO_NAME",
"TEST_STATEID",
"FREE_STATEID",
"GETDEVICELIST",
"BIND_CONN_TO_SESSION",
"DESTROY_CLIENTID",
"SEEK",
"ALLOCATE",
"DEALLOCATE",
"LAYOUTSTATS",
"CLONE",
"COPY",
"OFFLOAD_CANCEL",
"LOOKUPP",
"LAYOUTERROR",
"COPY_NOTIFY",
"GETXATTR",
"SETXATTR",
"LISTXATTRS",
"REMOVEXATTR",
}
nfs3Ops := make(map[string]bool)
nfs4Ops := make(map[string]bool)
n.mountstatsPath = n.getMountStatsPath()
if len(n.IncludeOperations) == 0 {
for _, Op := range nfs3Fields {
nfs3Ops[Op] = true
}
for _, Op := range nfs4Fields {
nfs4Ops[Op] = true
}
} else {
for _, Op := range n.IncludeOperations {
nfs3Ops[Op] = true
}
for _, Op := range n.IncludeOperations {
nfs4Ops[Op] = true
}
}
if len(n.ExcludeOperations) > 0 {
for _, Op := range n.ExcludeOperations {
if nfs3Ops[Op] {
delete(nfs3Ops, Op)
}
if nfs4Ops[Op] {
delete(nfs4Ops, Op)
} }
} }
nline = append(nline, val)
} }
return nline, nil
n.nfs3Ops = nfs3Ops
n.nfs4Ops = nfs4Ops
if len(n.IncludeMounts) > 0 {
n.Log.Debugf("Including these mount patterns: %v", n.IncludeMounts)
} else {
n.Log.Debugf("Including all mounts.")
}
if len(n.ExcludeMounts) > 0 {
n.Log.Debugf("Excluding these mount patterns: %v", n.ExcludeMounts)
} else {
n.Log.Debugf("Not excluding any mounts.")
}
if len(n.IncludeOperations) > 0 {
n.Log.Debugf("Including these operations: %v", n.IncludeOperations)
} else {
n.Log.Debugf("Including all operations.")
}
if len(n.ExcludeOperations) > 0 {
n.Log.Debugf("Excluding these mount patterns: %v", n.ExcludeOperations)
} else {
n.Log.Debugf("Not excluding any operations.")
}
return nil
} }
func init() { func init() {

View File

@ -23,8 +23,8 @@ import (
var sampleConfig string var sampleConfig string
type Nginx struct { type Nginx struct {
Urls []string Urls []string `toml:"urls"`
ResponseTimeout config.Duration ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig tls.ClientConfig
// HTTP client // HTTP client

View File

@ -276,11 +276,11 @@ func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accum
if err := dec.Decode(status); err != nil { if err := dec.Decode(status); err != nil {
return errors.New("error while decoding JSON response") return errors.New("error while decoding JSON response")
} }
status.Gather(tags, acc) status.gather(tags, acc)
return nil return nil
} }
func (s *status) Gather(tags map[string]string, acc telegraf.Accumulator) { func (s *status) gather(tags map[string]string, acc telegraf.Accumulator) {
s.gatherProcessesMetrics(tags, acc) s.gatherProcessesMetrics(tags, acc)
s.gatherConnectionsMetrics(tags, acc) s.gatherConnectionsMetrics(tags, acc)
s.gatherSslMetrics(tags, acc) s.gatherSslMetrics(tags, acc)

View File

@ -18,15 +18,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type NginxPlusAPI struct {
Urls []string `toml:"urls"`
APIVersion int64 `toml:"api_version"`
ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig
client *http.Client
}
const ( const (
// Default settings // Default settings
defaultAPIVersion = 3 defaultAPIVersion = 3
@ -49,6 +40,15 @@ const (
streamUpstreamsPath = "stream/upstreams" streamUpstreamsPath = "stream/upstreams"
) )
type NginxPlusAPI struct {
Urls []string `toml:"urls"`
APIVersion int64 `toml:"api_version"`
ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig
client *http.Client
}
func (*NginxPlusAPI) SampleConfig() string { func (*NginxPlusAPI) SampleConfig() string {
return sampleConfig return sampleConfig
} }

View File

@ -106,7 +106,7 @@ func (n *NginxSTS) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
} }
} }
type NginxSTSResponse struct { type nginxSTSResponse struct {
Connections struct { Connections struct {
Active uint64 `json:"active"` Active uint64 `json:"active"`
Reading uint64 `json:"reading"` Reading uint64 `json:"reading"`
@ -117,12 +117,12 @@ type NginxSTSResponse struct {
Requests uint64 `json:"requests"` Requests uint64 `json:"requests"`
} `json:"connections"` } `json:"connections"`
Hostname string `json:"hostName"` Hostname string `json:"hostName"`
StreamFilterZones map[string]map[string]Server `json:"streamFilterZones"` StreamFilterZones map[string]map[string]server `json:"streamFilterZones"`
StreamServerZones map[string]Server `json:"streamServerZones"` StreamServerZones map[string]server `json:"streamServerZones"`
StreamUpstreamZones map[string][]Upstream `json:"streamUpstreamZones"` StreamUpstreamZones map[string][]upstream `json:"streamUpstreamZones"`
} }
type Server struct { type server struct {
ConnectCounter uint64 `json:"connectCounter"` ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"` InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"` OutBytes uint64 `json:"outBytes"`
@ -137,7 +137,7 @@ type Server struct {
} `json:"responses"` } `json:"responses"`
} }
type Upstream struct { type upstream struct {
Server string `json:"server"` Server string `json:"server"`
ConnectCounter uint64 `json:"connectCounter"` ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"` InBytes uint64 `json:"inBytes"`
@ -166,7 +166,7 @@ type Upstream struct {
func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error { func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
dec := json.NewDecoder(r) dec := json.NewDecoder(r)
status := &NginxSTSResponse{} status := &nginxSTSResponse{}
if err := dec.Decode(status); err != nil { if err := dec.Decode(status); err != nil {
return errors.New("error while decoding JSON response") return errors.New("error while decoding JSON response")
} }

View File

@ -34,31 +34,15 @@ type NginxUpstreamCheck struct {
client *http.Client client *http.Client
} }
func NewNginxUpstreamCheck() *NginxUpstreamCheck { type nginxUpstreamCheckData struct {
return &NginxUpstreamCheck{
URL: "http://127.0.0.1/status?format=json",
Method: "GET",
Headers: make(map[string]string),
HostHeader: "",
Timeout: config.Duration(time.Second * 5),
}
}
func init() {
inputs.Add("nginx_upstream_check", func() telegraf.Input {
return NewNginxUpstreamCheck()
})
}
type NginxUpstreamCheckData struct {
Servers struct { Servers struct {
Total uint64 `json:"total"` Total uint64 `json:"total"`
Generation uint64 `json:"generation"` Generation uint64 `json:"generation"`
Server []NginxUpstreamCheckServer `json:"server"` Server []nginxUpstreamCheckServer `json:"server"`
} `json:"servers"` } `json:"servers"`
} }
type NginxUpstreamCheckServer struct { type nginxUpstreamCheckServer struct {
Index uint64 `json:"index"` Index uint64 `json:"index"`
Upstream string `json:"upstream"` Upstream string `json:"upstream"`
Name string `json:"name"` Name string `json:"name"`
@ -69,6 +53,33 @@ type NginxUpstreamCheckServer struct {
Port uint16 `json:"port"` Port uint16 `json:"port"`
} }
func (*NginxUpstreamCheck) SampleConfig() string {
return sampleConfig
}
func (check *NginxUpstreamCheck) Gather(accumulator telegraf.Accumulator) error {
if check.client == nil {
client, err := check.createHTTPClient()
if err != nil {
return err
}
check.client = client
}
statusURL, err := url.Parse(check.URL)
if err != nil {
return err
}
err = check.gatherStatusData(statusURL.String(), accumulator)
if err != nil {
return err
}
return nil
}
// createHTTPClient create a clients to access API // createHTTPClient create a clients to access API
func (check *NginxUpstreamCheck) createHTTPClient() (*http.Client, error) { func (check *NginxUpstreamCheck) createHTTPClient() (*http.Client, error) {
tlsConfig, err := check.ClientConfig.TLSConfig() tlsConfig, err := check.ClientConfig.TLSConfig()
@ -130,35 +141,8 @@ func (check *NginxUpstreamCheck) gatherJSONData(address string, value interface{
return nil return nil
} }
func (*NginxUpstreamCheck) SampleConfig() string {
return sampleConfig
}
func (check *NginxUpstreamCheck) Gather(accumulator telegraf.Accumulator) error {
if check.client == nil {
client, err := check.createHTTPClient()
if err != nil {
return err
}
check.client = client
}
statusURL, err := url.Parse(check.URL)
if err != nil {
return err
}
err = check.gatherStatusData(statusURL.String(), accumulator)
if err != nil {
return err
}
return nil
}
func (check *NginxUpstreamCheck) gatherStatusData(address string, accumulator telegraf.Accumulator) error { func (check *NginxUpstreamCheck) gatherStatusData(address string, accumulator telegraf.Accumulator) error {
checkData := &NginxUpstreamCheckData{} checkData := &nginxUpstreamCheckData{}
err := check.gatherJSONData(address, checkData) err := check.gatherJSONData(address, checkData)
if err != nil { if err != nil {
@ -197,3 +181,19 @@ func (check *NginxUpstreamCheck) getStatusCode(status string) uint8 {
return 0 return 0
} }
} }
func newNginxUpstreamCheck() *NginxUpstreamCheck {
return &NginxUpstreamCheck{
URL: "http://127.0.0.1/status?format=json",
Method: "GET",
Headers: make(map[string]string),
HostHeader: "",
Timeout: config.Duration(time.Second * 5),
}
}
func init() {
inputs.Add("nginx_upstream_check", func() telegraf.Input {
return newNginxUpstreamCheck()
})
}

View File

@ -58,7 +58,7 @@ func TestNginxUpstreamCheckData(test *testing.T) {
})) }))
defer testServer.Close() defer testServer.Close()
check := NewNginxUpstreamCheck() check := newNginxUpstreamCheck()
check.URL = testServer.URL + "/status" check.URL = testServer.URL + "/status"
var accumulator testutil.Accumulator var accumulator testutil.Accumulator
@ -139,7 +139,7 @@ func TestNginxUpstreamCheckRequest(test *testing.T) {
})) }))
defer testServer.Close() defer testServer.Close()
check := NewNginxUpstreamCheck() check := newNginxUpstreamCheck()
check.URL = testServer.URL + "/status" check.URL = testServer.URL + "/status"
check.Headers["X-test"] = "test-value" check.Headers["X-test"] = "test-value"
check.HostHeader = "status.local" check.HostHeader = "status.local"

View File

@ -106,7 +106,7 @@ func (n *NginxVTS) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
} }
} }
type NginxVTSResponse struct { type nginxVTSResponse struct {
Connections struct { Connections struct {
Active uint64 `json:"active"` Active uint64 `json:"active"`
Reading uint64 `json:"reading"` Reading uint64 `json:"reading"`
@ -116,13 +116,13 @@ type NginxVTSResponse struct {
Handled uint64 `json:"handled"` Handled uint64 `json:"handled"`
Requests uint64 `json:"requests"` Requests uint64 `json:"requests"`
} `json:"connections"` } `json:"connections"`
ServerZones map[string]Server `json:"serverZones"` ServerZones map[string]server `json:"serverZones"`
FilterZones map[string]map[string]Server `json:"filterZones"` FilterZones map[string]map[string]server `json:"filterZones"`
UpstreamZones map[string][]Upstream `json:"upstreamZones"` UpstreamZones map[string][]upstream `json:"upstreamZones"`
CacheZones map[string]Cache `json:"cacheZones"` CacheZones map[string]cache `json:"cacheZones"`
} }
type Server struct { type server struct {
RequestCounter uint64 `json:"requestCounter"` RequestCounter uint64 `json:"requestCounter"`
InBytes uint64 `json:"inBytes"` InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"` OutBytes uint64 `json:"outBytes"`
@ -144,7 +144,7 @@ type Server struct {
} `json:"responses"` } `json:"responses"`
} }
type Upstream struct { type upstream struct {
Server string `json:"server"` Server string `json:"server"`
RequestCounter uint64 `json:"requestCounter"` RequestCounter uint64 `json:"requestCounter"`
InBytes uint64 `json:"inBytes"` InBytes uint64 `json:"inBytes"`
@ -165,7 +165,7 @@ type Upstream struct {
Down bool `json:"down"` Down bool `json:"down"`
} }
type Cache struct { type cache struct {
MaxSize uint64 `json:"maxSize"` MaxSize uint64 `json:"maxSize"`
UsedSize uint64 `json:"usedSize"` UsedSize uint64 `json:"usedSize"`
InBytes uint64 `json:"inBytes"` InBytes uint64 `json:"inBytes"`
@ -184,7 +184,7 @@ type Cache struct {
func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error { func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
dec := json.NewDecoder(r) dec := json.NewDecoder(r)
status := &NginxVTSResponse{} status := &nginxVTSResponse{}
if err := dec.Decode(status); err != nil { if err := dec.Decode(status); err != nil {
return errors.New("error while decoding JSON response") return errors.New("error while decoding JSON response")
} }

View File

@ -18,27 +18,16 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Nomad configuration object const timeLayout = "2006-01-02 15:04:05 -0700 MST"
type Nomad struct { type Nomad struct {
URL string `toml:"url"` URL string `toml:"url"`
ResponseTimeout config.Duration `toml:"response_timeout"` ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig tls.ClientConfig
roundTripper http.RoundTripper roundTripper http.RoundTripper
} }
const timeLayout = "2006-01-02 15:04:05 -0700 MST"
func init() {
inputs.Add("nomad", func() telegraf.Input {
return &Nomad{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}
func (*Nomad) SampleConfig() string { func (*Nomad) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -161,3 +150,11 @@ func buildNomadMetrics(acc telegraf.Accumulator, summaryMetrics *metricsSummary)
return nil return nil
} }
func init() {
inputs.Add("nomad", func() telegraf.Input {
return &Nomad{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}

View File

@ -37,6 +37,7 @@ type sampledValue struct {
DisplayLabels map[string]string `json:"Labels"` DisplayLabels map[string]string `json:"Labels"`
} }
// AggregateSample needs to be exported, because JSON decode cannot set embedded pointer to unexported struct
type AggregateSample struct { type AggregateSample struct {
Count int `json:"count"` Count int `json:"count"`
Rate float64 `json:"rate"` Rate float64 `json:"rate"`

View File

@ -21,61 +21,27 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type runner func(cmdName string, timeout config.Duration, useSudo bool, Server string, ConfigFile string) (*bytes.Buffer, error) var (
defaultBinary = "/usr/sbin/nsd-control"
defaultTimeout = config.Duration(time.Second)
)
// NSD is used to store configuration values
type NSD struct { type NSD struct {
Binary string Binary string `toml:"binary"`
Timeout config.Duration Timeout config.Duration `toml:"timeout"`
UseSudo bool UseSudo bool `toml:"use_sudo"`
Server string Server string `toml:"server"`
ConfigFile string ConfigFile string `toml:"config_file"`
run runner run runner
} }
var defaultBinary = "/usr/sbin/nsd-control" type runner func(cmdName string, timeout config.Duration, useSudo bool, Server string, ConfigFile string) (*bytes.Buffer, error)
var defaultTimeout = config.Duration(time.Second)
// Shell out to nsd_stat and return the output
func nsdRunner(cmdName string, timeout config.Duration, useSudo bool, server, configFile string) (*bytes.Buffer, error) {
cmdArgs := []string{"stats_noreset"}
if server != "" {
host, port, err := net.SplitHostPort(server)
if err == nil {
server = host + "@" + port
}
cmdArgs = append([]string{"-s", server}, cmdArgs...)
}
if configFile != "" {
cmdArgs = append([]string{"-c", configFile}, cmdArgs...)
}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running nsd-control: %w (%s %v)", err, cmdName, cmdArgs)
}
return &out, nil
}
func (*NSD) SampleConfig() string { func (*NSD) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather collects stats from nsd-control and adds them to the Accumulator
func (s *NSD) Gather(acc telegraf.Accumulator) error { func (s *NSD) Gather(acc telegraf.Accumulator) error {
out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server, s.ConfigFile) out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server, s.ConfigFile)
if err != nil { if err != nil {
@ -133,6 +99,40 @@ func (s *NSD) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Shell out to nsd_stat and return the output
func nsdRunner(cmdName string, timeout config.Duration, useSudo bool, server, configFile string) (*bytes.Buffer, error) {
cmdArgs := []string{"stats_noreset"}
if server != "" {
host, port, err := net.SplitHostPort(server)
if err == nil {
server = host + "@" + port
}
cmdArgs = append([]string{"-s", server}, cmdArgs...)
}
if configFile != "" {
cmdArgs = append([]string{"-c", configFile}, cmdArgs...)
}
cmd := exec.Command(cmdName, cmdArgs...)
if useSudo {
cmdArgs = append([]string{cmdName}, cmdArgs...)
cmd = exec.Command("sudo", cmdArgs...)
}
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running nsd-control: %w (%s %v)", err, cmdName, cmdArgs)
}
return &out, nil
}
func init() { func init() {
inputs.Add("nsd", func() telegraf.Input { inputs.Add("nsd", func() telegraf.Input {
return &NSD{ return &NSD{

View File

@ -10,7 +10,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func NSDControl(output string) func(string, config.Duration, bool, string, string) (*bytes.Buffer, error) { func nsdControl(output string) func(string, config.Duration, bool, string, string) (*bytes.Buffer, error) {
return func(string, config.Duration, bool, string, string) (*bytes.Buffer, error) { return func(string, config.Duration, bool, string, string) (*bytes.Buffer, error) {
return bytes.NewBufferString(output), nil return bytes.NewBufferString(output), nil
} }
@ -19,7 +19,7 @@ func NSDControl(output string) func(string, config.Duration, bool, string, strin
func TestParseFullOutput(t *testing.T) { func TestParseFullOutput(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
v := &NSD{ v := &NSD{
run: NSDControl(fullOutput), run: nsdControl(fullOutput),
} }
err := v.Gather(acc) err := v.Gather(acc)

View File

@ -42,25 +42,15 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Might add Lookupd endpoints for cluster discovery
type NSQ struct {
Endpoints []string
tls.ClientConfig
httpClient *http.Client
}
const ( const (
requestPattern = `%s/stats?format=json` requestPattern = `%s/stats?format=json`
) )
func init() { type NSQ struct {
inputs.Add("nsq", func() telegraf.Input { Endpoints []string `toml:"endpoints"`
return New()
})
}
func New() *NSQ { tls.ClientConfig
return &NSQ{} httpClient *http.Client
} }
func (*NSQ) SampleConfig() string { func (*NSQ) SampleConfig() string {
@ -305,3 +295,13 @@ type clientStats struct {
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
} }
func newNSQ() *NSQ {
return &NSQ{}
}
func init() {
inputs.Add("nsq", func() telegraf.Input {
return newNSQ()
})
}

View File

@ -23,7 +23,7 @@ func TestNSQStatsV1(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
n := New() n := newNSQ()
n.Endpoints = []string{ts.URL} n.Endpoints = []string{ts.URL}
var acc testutil.Accumulator var acc testutil.Accumulator
@ -283,7 +283,7 @@ func TestNSQStatsPreV1(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
n := New() n := newNSQ()
n.Endpoints = []string{ts.URL} n.Endpoints = []string{ts.URL}
var acc testutil.Accumulator var acc testutil.Accumulator

View File

@ -20,8 +20,29 @@ const (
defaultMaxUndeliveredMessages = 1000 defaultMaxUndeliveredMessages = 1000
) )
type empty struct{} type NSQConsumer struct {
type semaphore chan empty Server string `toml:"server" deprecated:"1.5.0;1.35.0;use 'nsqd' instead"`
Nsqd []string `toml:"nsqd"`
Nsqlookupd []string `toml:"nsqlookupd"`
Topic string `toml:"topic"`
Channel string `toml:"channel"`
MaxInFlight int `toml:"max_in_flight"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
consumer *nsq.Consumer
mu sync.Mutex
messages map[telegraf.TrackingID]*nsq.Message
wg sync.WaitGroup
cancel context.CancelFunc
}
type (
empty struct{}
semaphore chan empty
)
type logger struct { type logger struct {
log telegraf.Logger log telegraf.Logger
@ -32,28 +53,6 @@ func (l *logger) Output(_ int, s string) error {
return nil return nil
} }
// NSQConsumer represents the configuration of the plugin
type NSQConsumer struct {
Server string `toml:"server" deprecated:"1.5.0;1.35.0;use 'nsqd' instead"`
Nsqd []string `toml:"nsqd"`
Nsqlookupd []string `toml:"nsqlookupd"`
Topic string `toml:"topic"`
Channel string `toml:"channel"`
MaxInFlight int `toml:"max_in_flight"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser telegraf.Parser
consumer *nsq.Consumer
Log telegraf.Logger
mu sync.Mutex
messages map[telegraf.TrackingID]*nsq.Message
wg sync.WaitGroup
cancel context.CancelFunc
}
func (*NSQConsumer) SampleConfig() string { func (*NSQConsumer) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -77,7 +76,6 @@ func (n *NSQConsumer) SetParser(parser telegraf.Parser) {
n.parser = parser n.parser = parser
} }
// Start pulls data from nsq
func (n *NSQConsumer) Start(ac telegraf.Accumulator) error { func (n *NSQConsumer) Start(ac telegraf.Accumulator) error {
acc := ac.WithTracking(n.MaxUndeliveredMessages) acc := ac.WithTracking(n.MaxUndeliveredMessages)
sem := make(semaphore, n.MaxUndeliveredMessages) sem := make(semaphore, n.MaxUndeliveredMessages)
@ -140,6 +138,17 @@ func (n *NSQConsumer) Start(ac telegraf.Accumulator) error {
return nil return nil
} }
func (n *NSQConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (n *NSQConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.consumer.Stop()
<-n.consumer.StopChan
}
func (n *NSQConsumer) onDelivery(ctx context.Context, acc telegraf.TrackingAccumulator, sem semaphore) { func (n *NSQConsumer) onDelivery(ctx context.Context, acc telegraf.TrackingAccumulator, sem semaphore) {
for { for {
select { select {
@ -165,19 +174,6 @@ func (n *NSQConsumer) onDelivery(ctx context.Context, acc telegraf.TrackingAccum
} }
} }
// Stop processing messages
func (n *NSQConsumer) Stop() {
n.cancel()
n.wg.Wait()
n.consumer.Stop()
<-n.consumer.StopChan
}
// Gather is a noop
func (n *NSQConsumer) Gather(_ telegraf.Accumulator) error {
return nil
}
func (n *NSQConsumer) connect() error { func (n *NSQConsumer) connect() error {
if n.consumer == nil { if n.consumer == nil {
config := nsq.NewConfig() config := nsq.NewConfig()

View File

@ -20,20 +20,18 @@ var (
colonByte = []byte(":") colonByte = []byte(":")
) )
// default file paths
const ( const (
NetNetstat = "/net/netstat" // default file paths
NetSnmp = "/net/snmp" netNetstat = "/net/netstat"
NetSnmp6 = "/net/snmp6" netSnmp = "/net/snmp"
NetProc = "/proc" netSnmp6 = "/net/snmp6"
) netProc = "/proc"
// env variable names // env variable names
const ( envNetstat = "PROC_NET_NETSTAT"
EnvNetstat = "PROC_NET_NETSTAT" envSnmp = "PROC_NET_SNMP"
EnvSnmp = "PROC_NET_SNMP" envSnmp6 = "PROC_NET_SNMP6"
EnvSnmp6 = "PROC_NET_SNMP6" envRoot = "PROC_ROOT"
EnvRoot = "PROC_ROOT"
) )
type Nstat struct { type Nstat struct {
@ -104,13 +102,13 @@ func (ns *Nstat) gatherSNMP6(data []byte, acc telegraf.Accumulator) {
// if it is empty then try read from env variables // if it is empty then try read from env variables
func (ns *Nstat) loadPaths() { func (ns *Nstat) loadPaths() {
if ns.ProcNetNetstat == "" { if ns.ProcNetNetstat == "" {
ns.ProcNetNetstat = proc(EnvNetstat, NetNetstat) ns.ProcNetNetstat = proc(envNetstat, netNetstat)
} }
if ns.ProcNetSNMP == "" { if ns.ProcNetSNMP == "" {
ns.ProcNetSNMP = proc(EnvSnmp, NetSnmp) ns.ProcNetSNMP = proc(envSnmp, netSnmp)
} }
if ns.ProcNetSNMP6 == "" { if ns.ProcNetSNMP6 == "" {
ns.ProcNetSNMP6 = proc(EnvSnmp6, NetSnmp6) ns.ProcNetSNMP6 = proc(envSnmp6, netSnmp6)
} }
} }
@ -188,9 +186,9 @@ func proc(env, path string) string {
return p return p
} }
// try to read root path, or use default root path // try to read root path, or use default root path
root := os.Getenv(EnvRoot) root := os.Getenv(envRoot)
if root == "" { if root == "" {
root = NetProc root = netProc
} }
return root + path return root + path
} }

View File

@ -30,16 +30,25 @@ var reBrackets = regexp.MustCompile(`\s+\([\S]*`)
type elementType int64 type elementType int64
const ( const (
None elementType = iota none elementType = iota
Tag tag
FieldFloat fieldFloat
FieldDuration fieldDuration
FieldIntDecimal fieldIntDecimal
FieldIntOctal fieldIntOctal
FieldIntRatio8 fieldIntRatio8
FieldIntBits fieldIntBits
) )
type NTPQ struct {
DNSLookup bool `toml:"dns_lookup" deprecated:"1.24.0;1.35.0;add '-n' to 'options' instead to skip DNS lookup"`
Options string `toml:"options"`
Servers []string `toml:"servers"`
ReachFormat string `toml:"reach_format"`
runQ func(string) ([]byte, error)
}
type column struct { type column struct {
name string name string
etype elementType etype elementType
@ -55,21 +64,12 @@ var tagHeaders = map[string]string{
// Mapping of fields // Mapping of fields
var fieldElements = map[string]elementType{ var fieldElements = map[string]elementType{
"delay": FieldFloat, "delay": fieldFloat,
"jitter": FieldFloat, "jitter": fieldFloat,
"offset": FieldFloat, "offset": fieldFloat,
"reach": FieldIntDecimal, "reach": fieldIntDecimal,
"poll": FieldDuration, "poll": fieldDuration,
"when": FieldDuration, "when": fieldDuration,
}
type NTPQ struct {
DNSLookup bool `toml:"dns_lookup" deprecated:"1.24.0;1.35.0;add '-n' to 'options' instead to skip DNS lookup"`
Options string `toml:"options"`
Servers []string `toml:"servers"`
ReachFormat string `toml:"reach_format"`
runQ func(string) ([]byte, error)
} }
func (*NTPQ) SampleConfig() string { func (*NTPQ) SampleConfig() string {
@ -117,19 +117,19 @@ func (n *NTPQ) Init() error {
n.ReachFormat = "octal" n.ReachFormat = "octal"
// Interpret the field as decimal integer returning // Interpret the field as decimal integer returning
// the raw (octal) representation // the raw (octal) representation
fieldElements["reach"] = FieldIntDecimal fieldElements["reach"] = fieldIntDecimal
case "decimal": case "decimal":
// Interpret the field as octal integer returning // Interpret the field as octal integer returning
// decimal number representation // decimal number representation
fieldElements["reach"] = FieldIntOctal fieldElements["reach"] = fieldIntOctal
case "count": case "count":
// Interpret the field as bits set returning // Interpret the field as bits set returning
// the number of bits set // the number of bits set
fieldElements["reach"] = FieldIntBits fieldElements["reach"] = fieldIntBits
case "ratio": case "ratio":
// Interpret the field as ratio between the number of // Interpret the field as ratio between the number of
// bits set and the maximum available bits set (8). // bits set and the maximum available bits set (8).
fieldElements["reach"] = FieldIntRatio8 fieldElements["reach"] = fieldIntRatio8
default: default:
return fmt.Errorf("unknown 'reach_format' %q", n.ReachFormat) return fmt.Errorf("unknown 'reach_format' %q", n.ReachFormat)
} }
@ -176,7 +176,7 @@ func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
if name, isTag := tagHeaders[el]; isTag { if name, isTag := tagHeaders[el]; isTag {
columns = append(columns, column{ columns = append(columns, column{
name: name, name: name,
etype: Tag, etype: tag,
}) })
continue continue
} }
@ -191,7 +191,7 @@ func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
} }
// Skip the column if not found // Skip the column if not found
columns = append(columns, column{etype: None}) columns = append(columns, column{etype: none})
} }
break break
} }
@ -221,11 +221,11 @@ func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
col := columns[i] col := columns[i]
switch col.etype { switch col.etype {
case None: case none:
continue continue
case Tag: case tag:
tags[col.name] = raw tags[col.name] = raw
case FieldFloat: case fieldFloat:
value, err := strconv.ParseFloat(raw, 64) value, err := strconv.ParseFloat(raw, 64)
if err != nil { if err != nil {
msg := fmt.Sprintf("%sparsing %q (%v) as float failed", msgPrefix, col.name, raw) msg := fmt.Sprintf("%sparsing %q (%v) as float failed", msgPrefix, col.name, raw)
@ -233,7 +233,7 @@ func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
continue continue
} }
fields[col.name] = value fields[col.name] = value
case FieldDuration: case fieldDuration:
// Ignore fields only containing a minus // Ignore fields only containing a minus
if raw == "-" { if raw == "-" {
continue continue
@ -257,28 +257,28 @@ func (n *NTPQ) gatherServer(acc telegraf.Accumulator, server string) {
continue continue
} }
fields[col.name] = value * factor fields[col.name] = value * factor
case FieldIntDecimal: case fieldIntDecimal:
value, err := strconv.ParseInt(raw, 10, 64) value, err := strconv.ParseInt(raw, 10, 64)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err)) acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err))
continue continue
} }
fields[col.name] = value fields[col.name] = value
case FieldIntOctal: case fieldIntOctal:
value, err := strconv.ParseInt(raw, 8, 64) value, err := strconv.ParseInt(raw, 8, 64)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err)) acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err))
continue continue
} }
fields[col.name] = value fields[col.name] = value
case FieldIntBits: case fieldIntBits:
value, err := strconv.ParseUint(raw, 8, 64) value, err := strconv.ParseUint(raw, 8, 64)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err)) acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err))
continue continue
} }
fields[col.name] = bits.OnesCount64(value) fields[col.name] = bits.OnesCount64(value)
case FieldIntRatio8: case fieldIntRatio8:
value, err := strconv.ParseUint(raw, 8, 64) value, err := strconv.ParseUint(raw, 8, 64)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err)) acc.AddError(fmt.Errorf("parsing %q (%v) as int failed: %w", col.name, raw, err))

View File

@ -5,12 +5,14 @@ import (
"strings" "strings"
) )
// SetTagIfUsed sets those tags whose value is different from empty string.
func SetTagIfUsed(m map[string]string, k, v string) { func SetTagIfUsed(m map[string]string, k, v string) {
if v != "" { if v != "" {
m[k] = v m[k] = v
} }
} }
// SetIfUsed sets those fields whose value is different from empty string.
func SetIfUsed(t string, m map[string]interface{}, k, v string) { func SetIfUsed(t string, m map[string]interface{}, k, v string) {
vals := strings.Fields(v) vals := strings.Fields(v)
if len(vals) < 1 { if len(vals) < 1 {

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/nvidia_smi/common" "github.com/influxdata/telegraf/plugins/inputs/nvidia_smi/common"
) )
// Parse parses the XML-encoded data from nvidia-smi and adds measurements.
func Parse(acc telegraf.Accumulator, buf []byte) error { func Parse(acc telegraf.Accumulator, buf []byte) error {
var s smi var s smi
if err := xml.Unmarshal(buf, &s); err != nil { if err := xml.Unmarshal(buf, &s); err != nil {

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf/plugins/inputs/nvidia_smi/common" "github.com/influxdata/telegraf/plugins/inputs/nvidia_smi/common"
) )
// Parse parses the XML-encoded data from nvidia-smi and adds measurements.
func Parse(acc telegraf.Accumulator, buf []byte) error { func Parse(acc telegraf.Accumulator, buf []byte) error {
var s smi var s smi
if err := xml.Unmarshal(buf, &s); err != nil { if err := xml.Unmarshal(buf, &s); err != nil {