chore: Fix linter findings for `revive:exported` in `plugins/inputs/p*` (#16307)

This commit is contained in:
Paweł Żak 2024-12-17 18:10:18 +01:00 committed by GitHub
parent 3b87986f42
commit e2b5a9910b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 778 additions and 853 deletions

View File

@ -43,7 +43,7 @@ func createEntityCounterEntry(
}
}
func NewTestP4RuntimeClient(
func newTestP4RuntimeClient(
p4RuntimeClient *fakeP4RuntimeClient,
addr string,
t *testing.T,
@ -102,7 +102,7 @@ func TestErrorGetP4Info(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.Error(t, plugin.Gather(&acc))
@ -245,7 +245,7 @@ func TestOneCounterRead(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
@ -333,7 +333,7 @@ func TestMultipleEntitiesSingleCounterRead(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
@ -425,7 +425,7 @@ func TestSingleEntitiesMultipleCounterRead(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
@ -457,7 +457,7 @@ func TestNoCountersAvailable(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
@ -484,7 +484,7 @@ func TestFilterCounters(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin.CounterNamesInclude = []string{"oof"}
@ -534,7 +534,7 @@ func TestFailReadCounterEntryFromEntry(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
@ -577,7 +577,7 @@ func TestFailReadAllEntries(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
plugin := newTestP4RuntimeClient(p4RtClient, listener.Addr().String(), t)
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))

View File

@ -19,22 +19,8 @@ import (
//go:embed sample.conf
var sampleConfig string
type passenger struct {
Command string
}
func (p *passenger) parseCommand() (string, []string) {
var arguments []string
if !strings.Contains(p.Command, " ") {
return p.Command, arguments
}
arguments = strings.Split(p.Command, " ")
if len(arguments) == 1 {
return arguments[0], arguments[1:]
}
return arguments[0], arguments[1:]
type Passenger struct {
Command string `toml:"command"`
}
type info struct {
@ -91,6 +77,39 @@ type process struct {
ProcessGroupID string `xml:"process_group_id"`
}
func (*Passenger) SampleConfig() string {
return sampleConfig
}
func (p *Passenger) Gather(acc telegraf.Accumulator) error {
if p.Command == "" {
p.Command = "passenger-status -v --show=xml"
}
cmd, args := p.parseCommand()
out, err := exec.Command(cmd, args...).Output()
if err != nil {
return err
}
return importMetric(out, acc)
}
func (p *Passenger) parseCommand() (string, []string) {
var arguments []string
if !strings.Contains(p.Command, " ") {
return p.Command, arguments
}
arguments = strings.Split(p.Command, " ")
if len(arguments) == 1 {
return arguments[0], arguments[1:]
}
return arguments[0], arguments[1:]
}
func (p *process) getUptime() int64 {
if p.Uptime == "" {
return 0
@ -131,25 +150,6 @@ func (p *process) getUptime() int64 {
return uptime
}
func (*passenger) SampleConfig() string {
return sampleConfig
}
func (p *passenger) Gather(acc telegraf.Accumulator) error {
if p.Command == "" {
p.Command = "passenger-status -v --show=xml"
}
cmd, args := p.parseCommand()
out, err := exec.Command(cmd, args...).Output()
if err != nil {
return err
}
return importMetric(out, acc)
}
func importMetric(stat []byte, acc telegraf.Accumulator) error {
var p info
@ -231,6 +231,6 @@ func importMetric(stat []byte, acc telegraf.Accumulator) error {
func init() {
inputs.Add("passenger", func() telegraf.Input {
return &passenger{}
return &Passenger{}
})
}

View File

@ -39,7 +39,7 @@ func teardown(tempFilePath string) {
}
func Test_Invalid_Passenger_Status_Cli(t *testing.T) {
r := &passenger{
r := &Passenger{
Command: "an-invalid-command passenger-status",
}
@ -55,7 +55,7 @@ func Test_Invalid_Xml(t *testing.T) {
require.NoError(t, err)
defer teardown(tempFilePath)
r := &passenger{
r := &Passenger{
Command: tempFilePath,
}
@ -72,7 +72,7 @@ func Test_Default_Config_Load_Default_Command(t *testing.T) {
require.NoError(t, err)
defer teardown(tempFilePath)
r := &passenger{}
r := &Passenger{}
var acc testutil.Accumulator
@ -87,7 +87,7 @@ func TestPassengerGenerateMetric(t *testing.T) {
defer teardown(tempFilePath)
// Now we tested again above server, with our authentication data
r := &passenger{
r := &Passenger{
Command: tempFilePath,
}

View File

@ -18,26 +18,81 @@ import (
//go:embed sample.conf
var sampleConfig string
const measurement = "pf"
const pfctlCommand = "pfctl"
var (
errParseHeader = fmt.Errorf("cannot find header in %s output", pfctlCommand)
anyTableHeaderRE = regexp.MustCompile("^[A-Z]")
stateTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`)
counterTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`)
execLookPath = exec.LookPath
execCommand = exec.Command
pfctlOutputStanzas = []*pfctlOutputStanza{
{
headerRE: regexp.MustCompile("^State Table"),
parseFunc: parseStateTable,
},
{
headerRE: regexp.MustCompile("^Counters"),
parseFunc: parseCounterTable,
},
}
stateTable = []*entry{
{"entries", "current entries", -1},
{"searches", "searches", -1},
{"inserts", "inserts", -1},
{"removals", "removals", -1},
}
counterTable = []*entry{
{"match", "match", -1},
{"bad-offset", "bad-offset", -1},
{"fragment", "fragment", -1},
{"short", "short", -1},
{"normalize", "normalize", -1},
{"memory", "memory", -1},
{"bad-timestamp", "bad-timestamp", -1},
{"congestion", "congestion", -1},
{"ip-option", "ip-option", -1},
{"proto-cksum", "proto-cksum", -1},
{"state-mismatch", "state-mismatch", -1},
{"state-insert", "state-insert", -1},
{"state-limit", "state-limit", -1},
{"src-limit", "src-limit", -1},
{"synproxy", "synproxy", -1},
}
)
const (
measurement = "pf"
pfctlCommand = "pfctl"
)
type PF struct {
PfctlCommand string
PfctlArgs []string
UseSudo bool
StateTable []*Entry
UseSudo bool `toml:"use_sudo"`
pfctlCommand string
pfctlArgs []string
infoFunc func() (string, error)
}
type pfctlOutputStanza struct {
headerRE *regexp.Regexp
parseFunc func([]string, map[string]interface{}) error
found bool
}
type entry struct {
field string
pfctlTitle string
value int64
}
func (*PF) SampleConfig() string {
return sampleConfig
}
// Gather is the entrypoint for the plugin.
func (pf *PF) Gather(acc telegraf.Accumulator) error {
if pf.PfctlCommand == "" {
if pf.pfctlCommand == "" {
var err error
if pf.PfctlCommand, pf.PfctlArgs, err = pf.buildPfctlCmd(); err != nil {
if pf.pfctlCommand, pf.pfctlArgs, err = pf.buildPfctlCmd(); err != nil {
acc.AddError(fmt.Errorf("can't construct pfctl commandline: %w", err))
return nil
}
@ -55,38 +110,17 @@ func (pf *PF) Gather(acc telegraf.Accumulator) error {
return nil
}
var errParseHeader = fmt.Errorf("cannot find header in %s output", pfctlCommand)
func errMissingData(tag string) error {
return fmt.Errorf("struct data for tag %q not found in %s output", tag, pfctlCommand)
}
type pfctlOutputStanza struct {
HeaderRE *regexp.Regexp
ParseFunc func([]string, map[string]interface{}) error
Found bool
}
var pfctlOutputStanzas = []*pfctlOutputStanza{
{
HeaderRE: regexp.MustCompile("^State Table"),
ParseFunc: parseStateTable,
},
{
HeaderRE: regexp.MustCompile("^Counters"),
ParseFunc: parseCounterTable,
},
}
var anyTableHeaderRE = regexp.MustCompile("^[A-Z]")
func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error {
fields := make(map[string]interface{})
scanner := bufio.NewScanner(strings.NewReader(pfoutput))
for scanner.Scan() {
line := scanner.Text()
for _, s := range pfctlOutputStanzas {
if s.HeaderRE.MatchString(line) {
if s.headerRE.MatchString(line) {
var stanzaLines []string
scanner.Scan()
line = scanner.Text()
@ -98,15 +132,15 @@ func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error
}
line = scanner.Text()
}
if perr := s.ParseFunc(stanzaLines, fields); perr != nil {
if perr := s.parseFunc(stanzaLines, fields); perr != nil {
return perr
}
s.Found = true
s.found = true
}
}
}
for _, s := range pfctlOutputStanzas {
if !s.Found {
if !s.found {
return errParseHeader
}
}
@ -115,57 +149,22 @@ func (pf *PF) parsePfctlOutput(pfoutput string, acc telegraf.Accumulator) error
return nil
}
type Entry struct {
Field string
PfctlTitle string
Value int64
}
var StateTable = []*Entry{
{"entries", "current entries", -1},
{"searches", "searches", -1},
{"inserts", "inserts", -1},
{"removals", "removals", -1},
}
var stateTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`)
func parseStateTable(lines []string, fields map[string]interface{}) error {
return storeFieldValues(lines, stateTableRE, fields, StateTable)
return storeFieldValues(lines, stateTableRE, fields, stateTable)
}
var CounterTable = []*Entry{
{"match", "match", -1},
{"bad-offset", "bad-offset", -1},
{"fragment", "fragment", -1},
{"short", "short", -1},
{"normalize", "normalize", -1},
{"memory", "memory", -1},
{"bad-timestamp", "bad-timestamp", -1},
{"congestion", "congestion", -1},
{"ip-option", "ip-option", -1},
{"proto-cksum", "proto-cksum", -1},
{"state-mismatch", "state-mismatch", -1},
{"state-insert", "state-insert", -1},
{"state-limit", "state-limit", -1},
{"src-limit", "src-limit", -1},
{"synproxy", "synproxy", -1},
}
var counterTableRE = regexp.MustCompile(`^ (.*?)\s+(\d+)`)
func parseCounterTable(lines []string, fields map[string]interface{}) error {
return storeFieldValues(lines, counterTableRE, fields, CounterTable)
return storeFieldValues(lines, counterTableRE, fields, counterTable)
}
func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]interface{}, entryTable []*Entry) error {
func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]interface{}, entryTable []*entry) error {
for _, v := range lines {
entries := regex.FindStringSubmatch(v)
if entries != nil {
for _, f := range entryTable {
if f.PfctlTitle == entries[1] {
if f.pfctlTitle == entries[1] {
var err error
if f.Value, err = strconv.ParseInt(entries[2], 10, 64); err != nil {
if f.value, err = strconv.ParseInt(entries[2], 10, 64); err != nil {
return err
}
}
@ -174,17 +173,17 @@ func storeFieldValues(lines []string, regex *regexp.Regexp, fields map[string]in
}
for _, v := range entryTable {
if v.Value == -1 {
return errMissingData(v.PfctlTitle)
if v.value == -1 {
return errMissingData(v.pfctlTitle)
}
fields[v.Field] = v.Value
fields[v.field] = v.value
}
return nil
}
func (pf *PF) callPfctl() (string, error) {
cmd := execCommand(pf.PfctlCommand, pf.PfctlArgs...)
cmd := execCommand(pf.pfctlCommand, pf.pfctlArgs...)
out, oerr := cmd.Output()
if oerr != nil {
var ee *exec.ExitError
@ -196,9 +195,6 @@ func (pf *PF) callPfctl() (string, error) {
return string(out), oerr
}
var execLookPath = exec.LookPath
var execCommand = exec.Command
func (pf *PF) buildPfctlCmd() (string, []string, error) {
cmd, err := execLookPath(pfctlCommand)
if err != nil {

View File

@ -16,6 +16,11 @@ import (
//go:embed sample.conf
var sampleConfig string
var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true,
"avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true,
"force_user": true, "host": true, "port": true, "name": true,
}
type PgBouncer struct {
ShowCommands []string `toml:"show_commands"`
postgresql.Config
@ -23,11 +28,6 @@ type PgBouncer struct {
service *postgresql.Service
}
var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true,
"avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true,
"force_user": true, "host": true, "port": true, "name": true,
}
func (*PgBouncer) SampleConfig() string {
return sampleConfig
}
@ -58,10 +58,6 @@ func (p *PgBouncer) Start(_ telegraf.Accumulator) error {
return p.service.Start()
}
func (p *PgBouncer) Stop() {
p.service.Stop()
}
func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
for _, cmd := range p.ShowCommands {
switch cmd {
@ -87,6 +83,10 @@ func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *PgBouncer) Stop() {
p.service.Stop()
}
func (p *PgBouncer) accRow(row *sql.Rows, columns []string) (map[string]string, map[string]*interface{}, error) {
var dbname bytes.Buffer

View File

@ -10,10 +10,8 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/cgi"
"os"
"strings"
"sync"
"time"
@ -164,13 +162,13 @@ var errCloseConn = errors.New("fcgi: connection should be closed")
var emptyBody = io.NopCloser(strings.NewReader(""))
// ErrRequestAborted is returned by Read when a handler attempts to read the
// errRequestAborted is returned by Read when a handler attempts to read the
// body of a request that has been aborted by the web server.
var ErrRequestAborted = errors.New("fcgi: request aborted by web server")
var errRequestAborted = errors.New("fcgi: request aborted by web server")
// ErrConnClosed is returned by Read when a handler attempts to read the body of
// errConnClosed is returned by Read when a handler attempts to read the body of
// a request after the connection to the web server has been closed.
var ErrConnClosed = errors.New("fcgi: connection to web server closed")
var errConnClosed = errors.New("fcgi: connection to web server closed")
func (c *child) handleRecord(rec *record) error {
c.mu.Lock()
@ -249,7 +247,7 @@ func (c *child) handleRecord(rec *record) error {
return err
}
if req.pw != nil {
req.pw.CloseWithError(ErrRequestAborted)
req.pw.CloseWithError(errRequestAborted)
}
if !req.keepConn {
// connection will close upon return
@ -306,34 +304,7 @@ func (c *child) cleanUp() {
if req.pw != nil {
// race with call to Close in c.serveRequest doesn't matter because
// Pipe(Reader|Writer).Close are idempotent
req.pw.CloseWithError(ErrConnClosed)
req.pw.CloseWithError(errConnClosed)
}
}
}
// Serve accepts incoming FastCGI connections on the listener l, creating a new
// goroutine for each. The goroutine reads requests and then calls handler
// to reply to them.
// If l is nil, Serve accepts connections from os.Stdin.
// If handler is nil, http.DefaultServeMux is used.
func Serve(l net.Listener, handler http.Handler) error {
if l == nil {
var err error
l, err = net.FileListener(os.Stdin)
if err != nil {
return err
}
defer l.Close()
}
if handler == nil {
handler = http.DefaultServeMux
}
for {
rw, err := l.Accept()
if err != nil {
return err
}
c := newChild(rw, handler)
go c.serve()
}
}

View File

@ -44,7 +44,7 @@ func newFcgiClient(timeout time.Duration, h string, args ...interface{}) (*conn,
return &conn{rwc: con}, nil
}
func (c *conn) Request(env map[string]string, requestData string) (retout, reterr []byte, err error) {
func (c *conn) request(env map[string]string, requestData string) (retout, reterr []byte, err error) {
defer c.rwc.Close()
var reqID uint16 = 1

View File

@ -206,7 +206,7 @@ var cleanUpTests = []struct {
makeRecord(typeAbortRequest, nil),
},
nil),
ErrRequestAborted,
errRequestAborted,
},
// confirm that child.serve closes all pipes after error reading record
{
@ -215,7 +215,7 @@ var cleanUpTests = []struct {
nil,
},
nil),
ErrConnClosed,
errConnClosed,
},
}

View File

@ -26,22 +26,31 @@ import (
var sampleConfig string
const (
PfPool = "pool"
PfProcessManager = "process manager"
PfStartSince = "start since"
PfAcceptedConn = "accepted conn"
PfListenQueue = "listen queue"
PfMaxListenQueue = "max listen queue"
PfListenQueueLen = "listen queue len"
PfIdleProcesses = "idle processes"
PfActiveProcesses = "active processes"
PfTotalProcesses = "total processes"
PfMaxActiveProcesses = "max active processes"
PfMaxChildrenReached = "max children reached"
PfSlowRequests = "slow requests"
pfPool = "pool"
pfStartSince = "start since"
pfAcceptedConn = "accepted conn"
pfListenQueue = "listen queue"
pfMaxListenQueue = "max listen queue"
pfListenQueueLen = "listen queue len"
pfIdleProcesses = "idle processes"
pfActiveProcesses = "active processes"
pfTotalProcesses = "total processes"
pfMaxActiveProcesses = "max active processes"
pfMaxChildrenReached = "max children reached"
pfSlowRequests = "slow requests"
)
type JSONMetrics struct {
type Phpfpm struct {
Format string `toml:"format"`
Timeout config.Duration `toml:"timeout"`
Urls []string `toml:"urls"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
client *http.Client
}
type jsonMetrics struct {
Pool string `json:"pool"`
ProcessManager string `json:"process manager"`
StartTime int `json:"start time"`
@ -76,21 +85,11 @@ type JSONMetrics struct {
type metricStat map[string]int64
type poolStat map[string]metricStat
type phpfpm struct {
Format string `toml:"format"`
Timeout config.Duration `toml:"timeout"`
Urls []string `toml:"urls"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
client *http.Client
}
func (*phpfpm) SampleConfig() string {
func (*Phpfpm) SampleConfig() string {
return sampleConfig
}
func (p *phpfpm) Init() error {
func (p *Phpfpm) Init() error {
if len(p.Urls) == 0 {
p.Urls = []string{"http://127.0.0.1/status"}
}
@ -118,9 +117,7 @@ func (p *phpfpm) Init() error {
return nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
func (p *Phpfpm) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, serv := range expandUrls(acc, p.Urls) {
wg.Add(1)
@ -136,7 +133,7 @@ func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
}
// Request status page to get stat raw data and import it
func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
func (p *Phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
return p.gatherHTTP(addr, acc)
}
@ -187,8 +184,8 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
}
// Gather stat using fcgi protocol
func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error {
fpmOutput, fpmErr, err := fcgi.Request(map[string]string{
func (p *Phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumulator, addr string) error {
fpmOutput, fpmErr, err := fcgi.request(map[string]string{
"SCRIPT_NAME": "/" + statusPath,
"SCRIPT_FILENAME": statusPath,
"REQUEST_METHOD": "GET",
@ -206,7 +203,7 @@ func (p *phpfpm) gatherFcgi(fcgi *conn, statusPath string, acc telegraf.Accumula
}
// Gather stat using http protocol
func (p *phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error {
func (p *Phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error {
u, err := url.Parse(addr)
if err != nil {
return fmt.Errorf("unable parse server address %q: %w", addr, err)
@ -232,7 +229,7 @@ func (p *phpfpm) gatherHTTP(addr string, acc telegraf.Accumulator) error {
}
// Import stat data into Telegraf system
func (p *phpfpm) importMetric(r io.Reader, acc telegraf.Accumulator, addr string) {
func (p *Phpfpm) importMetric(r io.Reader, acc telegraf.Accumulator, addr string) {
if p.Format == "json" {
p.parseJSON(r, acc, addr)
} else {
@ -254,7 +251,7 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
}
fieldName := strings.Trim(keyvalue[0], " ")
// We start to gather data for a new pool here
if fieldName == PfPool {
if fieldName == pfPool {
currentPool = strings.Trim(keyvalue[1], " ")
stats[currentPool] = make(metricStat)
continue
@ -262,17 +259,17 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
// Start to parse metric for current pool
switch fieldName {
case PfStartSince,
PfAcceptedConn,
PfListenQueue,
PfMaxListenQueue,
PfListenQueueLen,
PfIdleProcesses,
PfActiveProcesses,
PfTotalProcesses,
PfMaxActiveProcesses,
PfMaxChildrenReached,
PfSlowRequests:
case pfStartSince,
pfAcceptedConn,
pfListenQueue,
pfMaxListenQueue,
pfListenQueueLen,
pfIdleProcesses,
pfActiveProcesses,
pfTotalProcesses,
pfMaxActiveProcesses,
pfMaxChildrenReached,
pfSlowRequests:
fieldValue, err := strconv.ParseInt(strings.Trim(keyvalue[1], " "), 10, 64)
if err == nil {
stats[currentPool][fieldName] = fieldValue
@ -294,8 +291,8 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
}
}
func (p *phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) {
var metrics JSONMetrics
func (p *Phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) {
var metrics jsonMetrics
if err := json.NewDecoder(r).Decode(&metrics); err != nil {
p.Log.Errorf("Unable to decode JSON response: %s", err)
return
@ -402,6 +399,6 @@ func isNetworkURL(addr string) bool {
func init() {
inputs.Add("phpfpm", func() telegraf.Input {
return &phpfpm{}
return &Phpfpm{}
})
}

View File

@ -56,7 +56,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
defer ts.Close()
url := ts.URL + "?test=ok"
r := &phpfpm{
r := &Phpfpm{
Urls: []string{url},
Log: &testutil.Logger{},
}
@ -106,7 +106,7 @@ func TestPhpFpmGeneratesJSONMetrics_From_Http(t *testing.T) {
expected, err := testutil.ParseMetricsFromFile("testdata/expected.out", parser)
require.NoError(t, err)
input := &phpfpm{
input := &Phpfpm{
Urls: []string{server.URL + "?full&json"},
Format: "json",
Log: &testutil.Logger{},
@ -128,7 +128,7 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
// Now we tested again above server
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
Log: &testutil.Logger{},
}
@ -179,7 +179,7 @@ func TestPhpFpmTimeout_From_Fcgi(t *testing.T) {
}()
// Now we tested again above server
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
Timeout: config.Duration(timeout),
Log: &testutil.Logger{},
@ -211,7 +211,7 @@ func TestPhpFpmCrashWithTimeout_From_Fcgi(t *testing.T) {
const timeout = 200 * time.Millisecond
// Now we tested again above server
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"fcgi://" + tcpAddress + "/status"},
Timeout: config.Duration(timeout),
Log: &testutil.Logger{},
@ -237,7 +237,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
s := statServer{}
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
r := &phpfpm{
r := &Phpfpm{
Urls: []string{tcp.Addr().String()},
Log: &testutil.Logger{},
}
@ -289,7 +289,7 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
go fcgi.Serve(tcp1, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
go fcgi.Serve(tcp2, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
Log: &testutil.Logger{},
}
@ -340,7 +340,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
s := statServer{}
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
r := &phpfpm{
r := &Phpfpm{
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
Log: &testutil.Logger{},
}
@ -374,7 +374,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// When not passing server config, we default to localhost
// We just want to make sure we did request stat from localhost
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"http://bad.localhost:62001/status"},
Log: &testutil.Logger{},
}
@ -389,7 +389,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t
t.Skip("Skipping long test in short mode")
}
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"http://aninvalidone"},
Log: &testutil.Logger{},
}
@ -402,7 +402,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t
}
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"/tmp/invalid.sock"},
Log: &testutil.Logger{},
}
@ -435,7 +435,7 @@ var outputSampleJSON []byte
func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *testing.T) {
// Capture the logging output for checking
logger := &testutil.CaptureLogger{Name: "inputs.phpfpm"}
plugin := &phpfpm{Log: logger}
plugin := &Phpfpm{Log: logger}
require.NoError(t, plugin.Init())
// parse valid JSON without panic and without log output
@ -459,7 +459,7 @@ func TestGatherDespiteUnavailable(t *testing.T) {
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
// Now we tested again above server
r := &phpfpm{
r := &Phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status", "/lala"},
Log: &testutil.Logger{},
}

View File

@ -28,73 +28,71 @@ const (
defaultPingDataBytesSize = 56
)
// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(binary string, timeout float64, args ...string) (string, error)
type Ping struct {
// wg is used to wait for ping with multiple URLs
wg sync.WaitGroup
// Pre-calculated interval and timeout
calcInterval time.Duration
calcTimeout time.Duration
sourceAddress string
Urls []string `toml:"urls"` // URLs to ping
Method string `toml:"method"` // Method defines how to ping (native or exec)
Count int `toml:"count"` // Number of pings to send (ping -c <COUNT>)
PingInterval float64 `toml:"ping_interval"` // Interval at which to ping (ping -i <INTERVAL>)
Timeout float64 `toml:"timeout"` // Per-ping timeout, in seconds. 0 means no timeout (ping -W <TIMEOUT>)
Deadline int `toml:"deadline"` // Ping deadline, in seconds. 0 means no deadline. (ping -w <DEADLINE>)
Interface string `toml:"interface"` // Interface or source address to send ping from (ping -I/-S <INTERFACE/SRC_ADDR>)
Percentiles []int `toml:"percentiles"` // Calculate the given percentiles when using native method
Binary string `toml:"binary"` // Ping executable binary
// Arguments for ping command. When arguments are not empty, system binary will be used and other options (ping_interval, timeout, etc.) will be ignored
Arguments []string `toml:"arguments"`
IPv4 bool `toml:"ipv4"` // Whether to resolve addresses using ipv4 or not.
IPv6 bool `toml:"ipv6"` // Whether to resolve addresses using ipv6 or not.
Size *int `toml:"size"` // Packet size
Log telegraf.Logger `toml:"-"`
// Interval at which to ping (ping -i <INTERVAL>)
PingInterval float64 `toml:"ping_interval"`
wg sync.WaitGroup // wg is used to wait for ping with multiple URLs
calcInterval time.Duration // Pre-calculated interval and timeout
calcTimeout time.Duration
sourceAddress string
pingHost hostPingerFunc // host ping function
nativePingFunc nativePingFunc
}
// Number of pings to send (ping -c <COUNT>)
Count int
// hostPingerFunc is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type hostPingerFunc func(binary string, timeout float64, args ...string) (string, error)
// Per-ping timeout, in seconds. 0 means no timeout (ping -W <TIMEOUT>)
Timeout float64
type nativePingFunc func(destination string) (*pingStats, error)
// Ping deadline, in seconds. 0 means no deadline. (ping -w <DEADLINE>)
Deadline int
type durationSlice []time.Duration
// Interface or source address to send ping from (ping -I/-S <INTERFACE/SRC_ADDR>)
Interface string
// URLs to ping
Urls []string
// Method defines how to ping (native or exec)
Method string
// Ping executable binary
Binary string
// Arguments for ping command. When arguments is not empty, system binary will be used and
// other options (ping_interval, timeout, etc.) will be ignored
Arguments []string
// Whether to resolve addresses using ipv4 or not.
IPv4 bool
// Whether to resolve addresses using ipv6 or not.
IPv6 bool
// host ping function
pingHost HostPinger
nativePingFunc NativePingFunc
// Calculate the given percentiles when using native method
Percentiles []int
// Packet size
Size *int
type pingStats struct {
ping.Statistics
ttl int
}
func (*Ping) SampleConfig() string {
return sampleConfig
}
func (p *Ping) Init() error {
if p.Count < 1 {
return errors.New("bad number of packets to transmit")
}
// The interval cannot be below 0.2 seconds, matching ping implementation: https://linux.die.net/man/8/ping
if p.PingInterval < 0.2 {
p.calcInterval = time.Duration(.2 * float64(time.Second))
} else {
p.calcInterval = time.Duration(p.PingInterval * float64(time.Second))
}
// If no timeout is given default to 5 seconds, matching original implementation
if p.Timeout == 0 {
p.calcTimeout = time.Duration(5) * time.Second
} else {
p.calcTimeout = time.Duration(p.Timeout) * time.Second
}
return nil
}
func (p *Ping) Gather(acc telegraf.Accumulator) error {
for _, host := range p.Urls {
p.wg.Add(1)
@ -115,13 +113,6 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
return nil
}
type pingStats struct {
ping.Statistics
ttl int
}
type NativePingFunc func(destination string) (*pingStats, error)
func (p *Ping) nativePing(destination string) (*pingStats, error) {
ps := &pingStats{}
@ -259,10 +250,10 @@ func (p *Ping) pingToURLNative(destination string, acc telegraf.Accumulator) {
acc.AddFields("ping", fields, tags)
}
type durationSlice []time.Duration
func (p durationSlice) Len() int { return len(p) }
func (p durationSlice) Less(i, j int) bool { return p[i] < p[j] }
func (p durationSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// R7 from Hyndman and Fan (1996), which matches Excel
@ -292,29 +283,6 @@ func percentile(values durationSlice, perc int) time.Duration {
return lower + time.Duration(rankFraction*float64(upper-lower))
}
// Init ensures the plugin is configured correctly.
func (p *Ping) Init() error {
if p.Count < 1 {
return errors.New("bad number of packets to transmit")
}
// The interval cannot be below 0.2 seconds, matching ping implementation: https://linux.die.net/man/8/ping
if p.PingInterval < 0.2 {
p.calcInterval = time.Duration(.2 * float64(time.Second))
} else {
p.calcInterval = time.Duration(p.PingInterval * float64(time.Second))
}
// If no timeout is given default to 5 seconds, matching original implementation
if p.Timeout == 0 {
p.calcTimeout = time.Duration(5) * time.Second
} else {
p.calcTimeout = time.Duration(p.Timeout) * time.Second
}
return nil
}
func hostPinger(binary string, timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath(binary)
if err != nil {

View File

@ -261,7 +261,7 @@ func TestFatalPingGather(t *testing.T) {
"Fatal ping should not have packet measurements")
}
var UnreachablePingOutput = `
var unreachablePingOutput = `
Pinging www.google.pl [8.8.8.8] with 32 bytes of data:
Request timed out.
Request timed out.
@ -273,7 +273,7 @@ Ping statistics for 8.8.8.8:
`
func mockUnreachableHostPinger(string, float64, ...string) (string, error) {
return UnreachablePingOutput, errors.New("so very bad")
return unreachablePingOutput, errors.New("so very bad")
}
// Reply from 185.28.251.217: TTL expired in transit.
@ -312,7 +312,7 @@ func TestUnreachablePingGather(t *testing.T) {
"Fatal ping should not have packet measurements")
}
var TTLExpiredPingOutput = `
var ttlExpiredPingOutput = `
Pinging www.google.pl [8.8.8.8] with 32 bytes of data:
Request timed out.
Request timed out.
@ -324,7 +324,7 @@ Ping statistics for 8.8.8.8:
`
func mockTTLExpiredPinger(string, float64, ...string) (string, error) {
return TTLExpiredPingOutput, errors.New("so very bad")
return ttlExpiredPingOutput, errors.New("so very bad")
}
// in case 'Destination net unreachable' ping app return receive packet which is not what we need

View File

@ -21,6 +21,36 @@ import (
//go:embed sample.conf
var sampleConfig string
type Postfix struct {
QueueDirectory string `toml:"queue_directory"`
}
func (*Postfix) SampleConfig() string {
return sampleConfig
}
func (p *Postfix) Gather(acc telegraf.Accumulator) error {
if p.QueueDirectory == "" {
var err error
p.QueueDirectory, err = getQueueDirectory()
if err != nil {
return fmt.Errorf("unable to determine queue directory: %w", err)
}
}
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
fields, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %q: %w", q, err))
continue
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
}
return nil
}
func getQueueDirectory() (string, error) {
qd, err := exec.Command("postconf", "-h", "queue_directory").Output()
if err != nil {
@ -75,36 +105,6 @@ func qScan(path string, acc telegraf.Accumulator) (map[string]interface{}, error
return fields, nil
}
type Postfix struct {
QueueDirectory string
}
func (*Postfix) SampleConfig() string {
return sampleConfig
}
func (p *Postfix) Gather(acc telegraf.Accumulator) error {
if p.QueueDirectory == "" {
var err error
p.QueueDirectory, err = getQueueDirectory()
if err != nil {
return fmt.Errorf("unable to determine queue directory: %w", err)
}
}
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
fields, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
if err != nil {
acc.AddError(fmt.Errorf("error scanning queue %q: %w", q, err))
continue
}
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
}
return nil
}
func init() {
inputs.Add("postfix", func() telegraf.Input {
return &Postfix{

View File

@ -16,11 +16,13 @@ type Postfix struct {
Log telegraf.Logger `toml:"-"`
}
func (*Postfix) SampleConfig() string { return sampleConfig }
func (p *Postfix) Init() error {
p.Log.Warn("current platform is not supported")
p.Log.Warn("Current platform is not supported")
return nil
}
func (*Postfix) SampleConfig() string { return sampleConfig }
func (*Postfix) Gather(_ telegraf.Accumulator) error { return nil }
func init() {

View File

@ -16,6 +16,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var ignoredColumns = map[string]bool{"stats_reset": true}
type Postgresql struct {
Databases []string `toml:"databases"`
IgnoredDatabases []string `toml:"ignored_databases"`
@ -25,8 +27,6 @@ type Postgresql struct {
service *postgresql.Service
}
var ignoredColumns = map[string]bool{"stats_reset": true}
func (*Postgresql) SampleConfig() string {
return sampleConfig
}
@ -47,10 +47,6 @@ func (p *Postgresql) Start(_ telegraf.Accumulator) error {
return p.service.Start()
}
func (p *Postgresql) Stop() {
p.service.Stop()
}
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
var query string
if len(p.Databases) == 0 && len(p.IgnoredDatabases) == 0 {
@ -106,6 +102,10 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
return bgWriterRow.Err()
}
func (p *Postgresql) Stop() {
p.service.Stop()
}
func (p *Postgresql) accRow(row *sql.Rows, acc telegraf.Accumulator, columns []string) error {
var dbname bytes.Buffer

View File

@ -21,6 +21,8 @@ import (
//go:embed sample.conf
var sampleConfig string
var ignoredColumns = map[string]bool{"stats_reset": true}
type Postgresql struct {
Databases []string `deprecated:"1.22.4;use the sqlquery option to specify database to use"`
Query []query `toml:"query"`
@ -45,7 +47,9 @@ type query struct {
additionalTags map[string]bool
}
var ignoredColumns = map[string]bool{"stats_reset": true}
type scanner interface {
Scan(dest ...interface{}) error
}
func (*Postgresql) SampleConfig() string {
return sampleConfig
@ -102,10 +106,6 @@ func (p *Postgresql) Start(_ telegraf.Accumulator) error {
return p.service.Start()
}
func (p *Postgresql) Stop() {
p.service.Stop()
}
func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
// Retrieving the database version
query := `SELECT setting::integer / 100 AS version FROM pg_settings WHERE name = 'server_version_num'`
@ -128,6 +128,10 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *Postgresql) Stop() {
p.service.Stop()
}
func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, timestamp time.Time) error {
rows, err := p.service.DB.Query(q.Sqlquery)
if err != nil {
@ -150,10 +154,6 @@ func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, t
return nil
}
type scanner interface {
Scan(dest ...interface{}) error
}
func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query, timestamp time.Time) error {
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})

View File

@ -19,14 +19,13 @@ import (
//go:embed sample.conf
var sampleConfig string
type Powerdns struct {
UnixSockets []string
const defaultTimeout = 5 * time.Second
type Powerdns struct {
UnixSockets []string `toml:"unix_sockets"`
Log telegraf.Logger `toml:"-"`
}
var defaultTimeout = 5 * time.Second
func (*Powerdns) SampleConfig() string {
return sampleConfig
}

View File

@ -14,6 +14,8 @@ import (
//go:embed sample.conf
var sampleConfig string
const defaultTimeout = 5 * time.Second
type PowerdnsRecursor struct {
UnixSockets []string `toml:"unix_sockets"`
SocketDir string `toml:"socket_dir"`
@ -26,8 +28,6 @@ type PowerdnsRecursor struct {
gatherFromServer func(address string, acc telegraf.Accumulator) error
}
var defaultTimeout = 5 * time.Second
func (*PowerdnsRecursor) SampleConfig() string {
return sampleConfig
}

View File

@ -20,12 +20,10 @@ import (
type Processes struct {
UseSudo bool `toml:"use_sudo"`
Log telegraf.Logger `toml:"-"`
execPS func(UseSudo bool) ([]byte, error)
readProcFile func(filename string) ([]byte, error)
Log telegraf.Logger
forcePS bool
forceProc bool
}

View File

@ -7,13 +7,13 @@ import (
"strconv"
"strings"
"github.com/shirou/gopsutil/v4/process"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
telegraf_filter "github.com/influxdata/telegraf/filter"
)
type Filter struct {
type filter struct {
Name string `toml:"name"`
PidFiles []string `toml:"pid_files"`
SystemdUnits []string `toml:"systemd_units"`
@ -29,13 +29,13 @@ type Filter struct {
filterSupervisorUnit string
filterCmds []*regexp.Regexp
filterUser filter.Filter
filterExecutable filter.Filter
filterProcessName filter.Filter
filterUser telegraf_filter.Filter
filterExecutable telegraf_filter.Filter
filterProcessName telegraf_filter.Filter
finder *processFinder
}
func (f *Filter) Init() error {
func (f *filter) init() error {
if f.Name == "" {
return errors.New("filter must be named")
}
@ -74,13 +74,13 @@ func (f *Filter) Init() error {
f.filterSupervisorUnit = strings.TrimSpace(strings.Join(f.SupervisorUnits, " "))
var err error
if f.filterUser, err = filter.Compile(f.Users); err != nil {
if f.filterUser, err = telegraf_filter.Compile(f.Users); err != nil {
return fmt.Errorf("compiling users filter for %q failed: %w", f.Name, err)
}
if f.filterExecutable, err = filter.Compile(f.Executables); err != nil {
if f.filterExecutable, err = telegraf_filter.Compile(f.Executables); err != nil {
return fmt.Errorf("compiling executables filter for %q failed: %w", f.Name, err)
}
if f.filterProcessName, err = filter.Compile(f.ProcessNames); err != nil {
if f.filterProcessName, err = telegraf_filter.Compile(f.ProcessNames); err != nil {
return fmt.Errorf("compiling process-names filter for %q failed: %w", f.Name, err)
}
@ -89,7 +89,7 @@ func (f *Filter) Init() error {
return nil
}
func (f *Filter) ApplyFilter() ([]processGroup, error) {
func (f *filter) applyFilter() ([]processGroup, error) {
// Determine processes on service level. if there is no constraint on the
// services, use all processes for matching.
var groups []processGroup
@ -125,7 +125,7 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) {
}
groups = append(groups, g...)
default:
procs, err := process.Processes()
procs, err := gopsprocess.Processes()
if err != nil {
return nil, err
}
@ -135,7 +135,7 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) {
// Filter by additional properties such as users, patterns etc
result := make([]processGroup, 0, len(groups))
for _, g := range groups {
var matched []*process.Process
var matched []*gopsprocess.Process
for _, p := range g.processes {
// Users
if f.filterUser != nil {
@ -218,13 +218,13 @@ func (f *Filter) ApplyFilter() ([]processGroup, error) {
return result, nil
}
func getChildren(p *process.Process) ([]*process.Process, error) {
func getChildren(p *gopsprocess.Process) ([]*gopsprocess.Process, error) {
children, err := p.Children()
// Check for cases that do not really mean error but rather means that there
// is no match.
switch {
case err == nil,
errors.Is(err, process.ErrorNoChildren),
errors.Is(err, gopsprocess.ErrorNoChildren),
strings.Contains(err.Error(), "exit status 1"):
return children, nil
}

View File

@ -7,16 +7,16 @@ import (
"strconv"
"strings"
"github.com/shirou/gopsutil/v4/process"
gopsprocess "github.com/shirou/gopsutil/v4/process"
)
// NativeFinder uses gopsutil to find processes
type NativeFinder struct{}
// Uid will return all pids for the given user
func (pg *NativeFinder) UID(user string) ([]PID, error) {
var dst []PID
procs, err := process.Processes()
func (pg *NativeFinder) uid(user string) ([]pid, error) {
var dst []pid
procs, err := gopsprocess.Processes()
if err != nil {
return dst, err
}
@ -27,35 +27,35 @@ func (pg *NativeFinder) UID(user string) ([]PID, error) {
continue
}
if username == user {
dst = append(dst, PID(p.Pid))
dst = append(dst, pid(p.Pid))
}
}
return dst, nil
}
// PidFile returns the pid from the pid file given.
func (pg *NativeFinder) PidFile(path string) ([]PID, error) {
var pids []PID
func (pg *NativeFinder) pidFile(path string) ([]pid, error) {
var pids []pid
pidString, err := os.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("failed to read pidfile %q: %w", path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
processID, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
return pids, nil
}
// FullPattern matches on the command line when the process was executed
func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
var pids []PID
func (pg *NativeFinder) fullPattern(pattern string) ([]pid, error) {
var pids []pid
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
procs, err := pg.fastProcessList()
if err != nil {
return pids, err
}
@ -66,18 +66,18 @@ func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
continue
}
if regxPattern.MatchString(cmd) {
pids = append(pids, PID(p.Pid))
pids = append(pids, pid(p.Pid))
}
}
return pids, err
}
// Children matches children pids on the command line when the process was executed
func (pg *NativeFinder) Children(pid PID) ([]PID, error) {
func (pg *NativeFinder) children(processID pid) ([]pid, error) {
// Get all running processes
p, err := process.NewProcess(int32(pid))
p, err := gopsprocess.NewProcess(int32(processID))
if err != nil {
return nil, fmt.Errorf("getting process %d failed: %w", pid, err)
return nil, fmt.Errorf("getting process %d failed: %w", processID, err)
}
// Get all children of the current process
@ -85,35 +85,35 @@ func (pg *NativeFinder) Children(pid PID) ([]PID, error) {
if err != nil {
return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}
pids := make([]PID, 0, len(children))
pids := make([]pid, 0, len(children))
for _, child := range children {
pids = append(pids, PID(child.Pid))
pids = append(pids, pid(child.Pid))
}
return pids, err
}
func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) {
pids, err := process.Pids()
func (pg *NativeFinder) fastProcessList() ([]*gopsprocess.Process, error) {
pids, err := gopsprocess.Pids()
if err != nil {
return nil, err
}
result := make([]*process.Process, 0, len(pids))
result := make([]*gopsprocess.Process, 0, len(pids))
for _, pid := range pids {
result = append(result, &process.Process{Pid: pid})
result = append(result, &gopsprocess.Process{Pid: pid})
}
return result, nil
}
// Pattern matches on the process name
func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
var pids []PID
func (pg *NativeFinder) pattern(pattern string) ([]pid, error) {
var pids []pid
regxPattern, err := regexp.Compile(pattern)
if err != nil {
return pids, err
}
procs, err := pg.FastProcessList()
procs, err := pg.fastProcessList()
if err != nil {
return pids, err
}
@ -124,7 +124,7 @@ func (pg *NativeFinder) Pattern(pattern string) ([]PID, error) {
continue
}
if regxPattern.MatchString(name) {
pids = append(pids, PID(p.Pid))
pids = append(pids, pid(p.Pid))
}
}
return pids, err

View File

@ -14,7 +14,7 @@ import (
func BenchmarkPattern(b *testing.B) {
finder := &NativeFinder{}
for n := 0; n < b.N; n++ {
_, err := finder.Pattern(".*")
_, err := finder.pattern(".*")
require.NoError(b, err)
}
}
@ -22,7 +22,7 @@ func BenchmarkPattern(b *testing.B) {
func BenchmarkFullPattern(b *testing.B) {
finder := &NativeFinder{}
for n := 0; n < b.N; n++ {
_, err := finder.FullPattern(".*")
_, err := finder.fullPattern(".*")
require.NoError(b, err)
}
}
@ -37,26 +37,26 @@ func TestChildPattern(t *testing.T) {
require.NoError(t, err)
// Spawn two child processes and get their PIDs
expected := make([]PID, 0, 2)
expected := make([]pid, 0, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// First process
cmd1 := exec.CommandContext(ctx, "/bin/sh")
require.NoError(t, cmd1.Start(), "starting first command failed")
expected = append(expected, PID(cmd1.Process.Pid))
expected = append(expected, pid(cmd1.Process.Pid))
// Second process
cmd2 := exec.CommandContext(ctx, "/bin/sh")
require.NoError(t, cmd2.Start(), "starting first command failed")
expected = append(expected, PID(cmd2.Process.Pid))
expected = append(expected, pid(cmd2.Process.Pid))
// Use the plugin to find the children
finder := &NativeFinder{}
parent, err := finder.Pattern(parentName)
parent, err := finder.pattern(parentName)
require.NoError(t, err)
require.Len(t, parent, 1)
children, err := finder.Children(parent[0])
children, err := finder.children(parent[0])
require.NoError(t, err)
require.ElementsMatch(t, expected, children)
}
@ -66,7 +66,7 @@ func TestGather_RealPatternIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
pg := &NativeFinder{}
pids, err := pg.Pattern(`procstat`)
pids, err := pg.pattern(`procstat`)
require.NoError(t, err)
require.NotEmpty(t, pids)
}
@ -79,7 +79,7 @@ func TestGather_RealFullPatternIntegration(t *testing.T) {
t.Skip("Skipping integration test on Non-Windows OS")
}
pg := &NativeFinder{}
pids, err := pg.FullPattern(`%procstat%`)
pids, err := pg.fullPattern(`%procstat%`)
require.NoError(t, err)
require.NotEmpty(t, pids)
}
@ -92,7 +92,7 @@ func TestGather_RealUserIntegration(t *testing.T) {
require.NoError(t, err)
pg := &NativeFinder{}
pids, err := pg.UID(currentUser.Username)
pids, err := pg.uid(currentUser.Username)
require.NoError(t, err)
require.NotEmpty(t, pids)
}

View File

@ -13,15 +13,15 @@ import (
"github.com/coreos/go-systemd/v22/dbus"
"github.com/prometheus/procfs"
"github.com/shirou/gopsutil/v4/net"
"github.com/shirou/gopsutil/v4/process"
gopsnet "github.com/shirou/gopsutil/v4/net"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"github.com/influxdata/telegraf/internal"
)
func processName(p *process.Process) (string, error) {
func processName(p *gopsprocess.Process) (string, error) {
return p.Exe()
}
@ -29,7 +29,7 @@ func queryPidWithWinServiceName(_ string) (uint32, error) {
return 0, errors.New("os not supporting win_service option")
}
func collectMemmap(proc Process, prefix string, fields map[string]any) {
func collectMemmap(proc process, prefix string, fields map[string]any) {
memMapStats, err := proc.MemoryMaps(true)
if err == nil && len(*memMapStats) == 1 {
memMap := (*memMapStats)[0]
@ -70,12 +70,12 @@ func findBySystemdUnits(units []string) ([]processGroup, error) {
if !ok {
return nil, fmt.Errorf("failed to parse PID %v of unit %q: invalid type %T", raw, u, raw)
}
p, err := process.NewProcess(int32(pid))
p, err := gopsprocess.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", pid, u, err)
}
groups = append(groups, processGroup{
processes: []*process.Process{p},
processes: []*gopsprocess.Process{p},
tags: map[string]string{"systemd_unit": u.Name},
})
}
@ -87,14 +87,14 @@ func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}
func collectTotalReadWrite(proc Process) (r, w uint64, err error) {
func collectTotalReadWrite(proc process) (r, w uint64, err error) {
path := internal.GetProcPath()
fs, err := procfs.NewFS(path)
if err != nil {
return 0, 0, err
}
p, err := fs.Proc(int(proc.PID()))
p, err := fs.Proc(int(proc.pid()))
if err != nil {
return 0, 0, err
}
@ -177,7 +177,7 @@ func mapFdToInode(pid int32, fd uint32) (uint32, error) {
return uint32(inode), nil
}
func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
func statsTCP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -185,7 +185,7 @@ func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{
// For TCP we need the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {
@ -240,7 +240,7 @@ func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
func statsUDP(conns []gopsnet.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -248,7 +248,7 @@ func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{
// For UDP we need the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {
@ -299,7 +299,7 @@ func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{
return fieldslist, nil
}
func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) {
func statsUnix(conns []gopsnet.ConnectionStat) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -307,7 +307,7 @@ func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) {
// We need to read the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
inodes := make(map[uint32]gopsnet.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {

View File

@ -6,11 +6,11 @@ import (
"errors"
"syscall"
"github.com/shirou/gopsutil/v4/net"
"github.com/shirou/gopsutil/v4/process"
gopsnet "github.com/shirou/gopsutil/v4/net"
gopsprocess "github.com/shirou/gopsutil/v4/process"
)
func processName(p *process.Process) (string, error) {
func processName(p *gopsprocess.Process) (string, error) {
return p.Exe()
}
@ -18,7 +18,7 @@ func queryPidWithWinServiceName(string) (uint32, error) {
return 0, errors.New("os not supporting win_service option")
}
func collectMemmap(Process, string, map[string]any) {}
func collectMemmap(process, string, map[string]any) {}
func findBySystemdUnits([]string) ([]processGroup, error) {
return nil, nil
@ -28,11 +28,11 @@ func findByWindowsServices([]string) ([]processGroup, error) {
return nil, nil
}
func collectTotalReadWrite(Process) (r, w uint64, err error) {
func collectTotalReadWrite(process) (r, w uint64, err error) {
return 0, 0, errors.ErrUnsupported
}
func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
func statsTCP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -65,7 +65,7 @@ func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
func statsUDP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -98,6 +98,6 @@ func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er
return fieldslist, nil
}
func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) {
func statsUnix([]gopsnet.ConnectionStat) ([]map[string]interface{}, error) {
return nil, errors.ErrUnsupported
}

View File

@ -8,13 +8,13 @@ import (
"syscall"
"unsafe"
"github.com/shirou/gopsutil/v4/net"
"github.com/shirou/gopsutil/v4/process"
gopsnet "github.com/shirou/gopsutil/v4/net"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc/mgr"
)
func processName(p *process.Process) (string, error) {
func processName(p *gopsprocess.Process) (string, error) {
return p.Name()
}
@ -57,7 +57,7 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
return p.ProcessId, nil
}
func collectMemmap(Process, string, map[string]any) {}
func collectMemmap(process, string, map[string]any) {}
func findBySystemdUnits([]string) ([]processGroup, error) {
return nil, nil
@ -71,13 +71,13 @@ func findByWindowsServices(services []string) ([]processGroup, error) {
return nil, fmt.Errorf("failed to query PID of service %q: %w", service, err)
}
p, err := process.NewProcess(int32(pid))
p, err := gopsprocess.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of service %q: %w", pid, service, err)
}
groups = append(groups, processGroup{
processes: []*process.Process{p},
processes: []*gopsprocess.Process{p},
tags: map[string]string{"win_service": service},
})
}
@ -85,11 +85,11 @@ func findByWindowsServices(services []string) ([]processGroup, error) {
return groups, nil
}
func collectTotalReadWrite(Process) (r, w uint64, err error) {
func collectTotalReadWrite(process) (r, w uint64, err error) {
return 0, 0, errors.ErrUnsupported
}
func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
func statsTCP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -122,7 +122,7 @@ func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
func statsUDP(conns []gopsnet.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
@ -155,6 +155,6 @@ func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, er
return fieldslist, nil
}
func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) {
func statsUnix([]gopsnet.ConnectionStat) ([]map[string]interface{}, error) {
return nil, nil
}

View File

@ -11,54 +11,54 @@ import (
)
// Implementation of PIDGatherer that execs pgrep to find processes
type Pgrep struct {
type pgrep struct {
path string
}
func newPgrepFinder() (PIDFinder, error) {
func newPgrepFinder() (pidFinder, error) {
path, err := exec.LookPath("pgrep")
if err != nil {
return nil, fmt.Errorf("could not find pgrep binary: %w", err)
}
return &Pgrep{path}, nil
return &pgrep{path}, nil
}
func (pg *Pgrep) PidFile(path string) ([]PID, error) {
var pids []PID
func (pg *pgrep) pidFile(path string) ([]pid, error) {
var pids []pid
pidString, err := os.ReadFile(path)
if err != nil {
return pids, fmt.Errorf("failed to read pidfile %q: %w",
path, err)
}
pid, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
processID, err := strconv.ParseInt(strings.TrimSpace(string(pidString)), 10, 32)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
return pids, nil
}
func (pg *Pgrep) Pattern(pattern string) ([]PID, error) {
func (pg *pgrep) pattern(pattern string) ([]pid, error) {
args := []string{pattern}
return pg.find(args)
}
func (pg *Pgrep) UID(user string) ([]PID, error) {
func (pg *pgrep) uid(user string) ([]pid, error) {
args := []string{"-u", user}
return pg.find(args)
}
func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) {
func (pg *pgrep) fullPattern(pattern string) ([]pid, error) {
args := []string{"-f", pattern}
return pg.find(args)
}
func (pg *Pgrep) Children(pid PID) ([]PID, error) {
func (pg *pgrep) children(pid pid) ([]pid, error) {
args := []string{"-P", strconv.FormatInt(int64(pid), 10)}
return pg.find(args)
}
func (pg *Pgrep) find(args []string) ([]PID, error) {
func (pg *pgrep) find(args []string) ([]pid, error) {
// Execute pgrep with the given arguments
buf, err := exec.Command(pg.path, args...).Output()
if err != nil {
@ -73,13 +73,13 @@ func (pg *Pgrep) find(args []string) ([]PID, error) {
// Parse the command output to extract the PIDs
fields := strings.Fields(out)
pids := make([]PID, 0, len(fields))
pids := make([]pid, 0, len(fields))
for _, field := range fields {
pid, err := strconv.ParseInt(field, 10, 32)
processID, err := strconv.ParseInt(field, 10, 32)
if err != nil {
return nil, err
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
}
return pids, nil
}

View File

@ -9,41 +9,41 @@ import (
"time"
gopsnet "github.com/shirou/gopsutil/v4/net"
"github.com/shirou/gopsutil/v4/process"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
type Process interface {
PID() PID
type process interface {
Name() (string, error)
SetTag(string, string)
MemoryMaps(bool) (*[]process.MemoryMapsStat, error)
Metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error)
MemoryMaps(bool) (*[]gopsprocess.MemoryMapsStat, error)
pid() pid
setTag(string, string)
metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error)
}
type PIDFinder interface {
PidFile(path string) ([]PID, error)
Pattern(pattern string) ([]PID, error)
UID(user string) ([]PID, error)
FullPattern(path string) ([]PID, error)
Children(pid PID) ([]PID, error)
type pidFinder interface {
pidFile(path string) ([]pid, error)
pattern(pattern string) ([]pid, error)
uid(user string) ([]pid, error)
fullPattern(path string) ([]pid, error)
children(pid pid) ([]pid, error)
}
type Proc struct {
type proc struct {
hasCPUTimes bool
tags map[string]string
*process.Process
*gopsprocess.Process
}
func newProc(pid PID) (Process, error) {
p, err := process.NewProcess(int32(pid))
func newProc(pid pid) (process, error) {
p, err := gopsprocess.NewProcess(int32(pid))
if err != nil {
return nil, err
}
proc := &Proc{
proc := &proc{
Process: p,
hasCPUTimes: false,
tags: make(map[string]string),
@ -51,15 +51,15 @@ func newProc(pid PID) (Process, error) {
return proc, nil
}
func (p *Proc) PID() PID {
return PID(p.Process.Pid)
func (p *proc) pid() pid {
return pid(p.Process.Pid)
}
func (p *Proc) SetTag(k, v string) {
func (p *proc) setTag(k, v string) {
p.tags[k] = v
}
func (p *Proc) percent(_ time.Duration) (float64, error) {
func (p *proc) percent(_ time.Duration) (float64, error) {
cpuPerc, err := p.Process.Percent(time.Duration(0))
if !p.hasCPUTimes && err == nil {
p.hasCPUTimes = true
@ -68,8 +68,8 @@ func (p *Proc) percent(_ time.Duration) (float64, error) {
return cpuPerc, err
}
// Add metrics a single Process
func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
// Add metrics a single process
func (p *proc) metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
if prefix != "" {
prefix += "_"
}
@ -163,27 +163,27 @@ func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]tel
for _, rlim := range rlims {
var name string
switch rlim.Resource {
case process.RLIMIT_CPU:
case gopsprocess.RLIMIT_CPU:
name = "cpu_time"
case process.RLIMIT_DATA:
case gopsprocess.RLIMIT_DATA:
name = "memory_data"
case process.RLIMIT_STACK:
case gopsprocess.RLIMIT_STACK:
name = "memory_stack"
case process.RLIMIT_RSS:
case gopsprocess.RLIMIT_RSS:
name = "memory_rss"
case process.RLIMIT_NOFILE:
case gopsprocess.RLIMIT_NOFILE:
name = "num_fds"
case process.RLIMIT_MEMLOCK:
case gopsprocess.RLIMIT_MEMLOCK:
name = "memory_locked"
case process.RLIMIT_AS:
case gopsprocess.RLIMIT_AS:
name = "memory_vms"
case process.RLIMIT_LOCKS:
case gopsprocess.RLIMIT_LOCKS:
name = "file_locks"
case process.RLIMIT_SIGPENDING:
case gopsprocess.RLIMIT_SIGPENDING:
name = "signals_pending"
case process.RLIMIT_NICE:
case gopsprocess.RLIMIT_NICE:
name = "nice_priority"
case process.RLIMIT_RTPRIO:
case gopsprocess.RLIMIT_RTPRIO:
name = "realtime_priority"
default:
continue

View File

@ -15,7 +15,7 @@ import (
"strings"
"time"
"github.com/shirou/gopsutil/v4/process"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
@ -28,14 +28,7 @@ var sampleConfig string
// execCommand is so tests can mock out exec.Command usage.
var execCommand = exec.Command
type PID int32
type collectionConfig struct {
solarisMode bool
tagging map[string]bool
features map[string]bool
socketProtos []string
}
type pid int32
type Procstat struct {
PidFinder string `toml:"pid_finder"`
@ -57,24 +50,31 @@ type Procstat struct {
Properties []string `toml:"properties"`
SocketProtocols []string `toml:"socket_protocols"`
TagWith []string `toml:"tag_with"`
Filter []Filter `toml:"filter"`
Filter []filter `toml:"filter"`
Log telegraf.Logger `toml:"-"`
finder PIDFinder
processes map[PID]Process
finder pidFinder
processes map[pid]process
cfg collectionConfig
oldMode bool
createProcess func(PID) (Process, error)
createProcess func(pid) (process, error)
}
type PidsTags struct {
PIDs []PID
type collectionConfig struct {
solarisMode bool
tagging map[string]bool
features map[string]bool
socketProtos []string
}
type pidsTags struct {
PIDs []pid
Tags map[string]string
}
type processGroup struct {
processes []*process.Process
processes []*gopsprocess.Process
tags map[string]string
}
@ -196,14 +196,14 @@ func (p *Procstat) Init() error {
// New-style operations
for i := range p.Filter {
p.Filter[i].Log = p.Log
if err := p.Filter[i].Init(); err != nil {
if err := p.Filter[i].init(); err != nil {
return fmt.Errorf("initializing filter %d failed: %w", i, err)
}
}
}
// Initialize the running process cache
p.processes = make(map[PID]Process)
p.processes = make(map[pid]process)
return nil
}
@ -240,7 +240,7 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
}
var count int
running := make(map[PID]bool)
running := make(map[pid]bool)
for _, r := range results {
if len(r.PIDs) < 1 && len(p.SupervisorUnits) > 0 {
continue
@ -271,16 +271,16 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
// Add initial tags
for k, v := range r.Tags {
proc.SetTag(k, v)
proc.setTag(k, v)
}
if p.ProcessName != "" {
proc.SetTag("process_name", p.ProcessName)
proc.setTag("process_name", p.ProcessName)
}
p.processes[pid] = proc
}
running[pid] = true
metrics, err := proc.Metrics(p.Prefix, &p.cfg, now)
metrics, err := proc.metrics(p.Prefix, &p.cfg, now)
if err != nil {
// Continue after logging an error as there might still be
// metrics available
@ -324,9 +324,9 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
now := time.Now()
running := make(map[PID]bool)
running := make(map[pid]bool)
for _, f := range p.Filter {
groups, err := f.ApplyFilter()
groups, err := f.applyFilter()
if err != nil {
// Add lookup error-metric
acc.AddFields(
@ -357,8 +357,8 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
// Use the cached processes as we need the existing instances
// to compute delta-metrics (e.g. cpu-usage).
pid := PID(gp.Pid)
proc, found := p.processes[pid]
pid := pid(gp.Pid)
process, found := p.processes[pid]
if !found {
//nolint:errcheck // Assumption: if a process has no name, it probably does not exist
if name, _ := gp.Name(); name == "" {
@ -372,19 +372,19 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
tags[k] = v
}
if p.ProcessName != "" {
proc.SetTag("process_name", p.ProcessName)
process.setTag("process_name", p.ProcessName)
}
tags["filter"] = f.Name
proc = &Proc{
process = &proc{
Process: gp,
hasCPUTimes: false,
tags: tags,
}
p.processes[pid] = proc
p.processes[pid] = process
}
running[pid] = true
metrics, err := proc.Metrics(p.Prefix, &p.cfg, now)
metrics, err := process.metrics(p.Prefix, &p.cfg, now)
if err != nil {
// Continue after logging an error as there might still be
// metrics available
@ -422,7 +422,7 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
}
// Get matching PIDs and their initial tags
func (p *Procstat) findPids() ([]PidsTags, error) {
func (p *Procstat) findPids() ([]pidsTags, error) {
switch {
case len(p.SupervisorUnits) > 0:
return p.findSupervisorUnits()
@ -434,65 +434,65 @@ func (p *Procstat) findPids() ([]PidsTags, error) {
return nil, err
}
tags := map[string]string{"win_service": p.WinService}
return []PidsTags{{pids, tags}}, nil
return []pidsTags{{pids, tags}}, nil
case p.CGroup != "":
return p.cgroupPIDs()
case p.PidFile != "":
pids, err := p.finder.PidFile(p.PidFile)
pids, err := p.finder.pidFile(p.PidFile)
if err != nil {
return nil, err
}
tags := map[string]string{"pidfile": p.PidFile}
return []PidsTags{{pids, tags}}, nil
return []pidsTags{{pids, tags}}, nil
case p.Exe != "":
pids, err := p.finder.Pattern(p.Exe)
pids, err := p.finder.pattern(p.Exe)
if err != nil {
return nil, err
}
tags := map[string]string{"exe": p.Exe}
return []PidsTags{{pids, tags}}, nil
return []pidsTags{{pids, tags}}, nil
case p.Pattern != "":
pids, err := p.finder.FullPattern(p.Pattern)
pids, err := p.finder.fullPattern(p.Pattern)
if err != nil {
return nil, err
}
tags := map[string]string{"pattern": p.Pattern}
return []PidsTags{{pids, tags}}, nil
return []pidsTags{{pids, tags}}, nil
case p.User != "":
pids, err := p.finder.UID(p.User)
pids, err := p.finder.uid(p.User)
if err != nil {
return nil, err
}
tags := map[string]string{"user": p.User}
return []PidsTags{{pids, tags}}, nil
return []pidsTags{{pids, tags}}, nil
}
return nil, errors.New("no filter option set")
}
func (p *Procstat) findSupervisorUnits() ([]PidsTags, error) {
func (p *Procstat) findSupervisorUnits() ([]pidsTags, error) {
groups, groupsTags, err := p.supervisorPIDs()
if err != nil {
return nil, fmt.Errorf("getting supervisor PIDs failed: %w", err)
}
// According to the PID, find the system process number and get the child processes
pidTags := make([]PidsTags, 0, len(groups))
pidTags := make([]pidsTags, 0, len(groups))
for _, group := range groups {
grppid := groupsTags[group]["pid"]
if grppid == "" {
pidTags = append(pidTags, PidsTags{nil, groupsTags[group]})
pidTags = append(pidTags, pidsTags{nil, groupsTags[group]})
continue
}
pid, err := strconv.ParseInt(grppid, 10, 32)
processID, err := strconv.ParseInt(grppid, 10, 32)
if err != nil {
return nil, fmt.Errorf("converting PID %q failed: %w", grppid, err)
}
// Get all children of the supervisor unit
pids, err := p.finder.Children(PID(pid))
pids, err := p.finder.children(pid(processID))
if err != nil {
return nil, fmt.Errorf("getting children for %d failed: %w", pid, err)
return nil, fmt.Errorf("getting children for %d failed: %w", processID, err)
}
tags := map[string]string{"pattern": p.Pattern, "parent_pid": p.Pattern}
@ -510,7 +510,7 @@ func (p *Procstat) findSupervisorUnits() ([]PidsTags, error) {
}
// Remove duplicate pid tags
delete(tags, "pid")
pidTags = append(pidTags, PidsTags{pids, tags})
pidTags = append(pidTags, pidsTags{pids, tags})
}
return pidTags, nil
}
@ -559,30 +559,30 @@ func (p *Procstat) supervisorPIDs() ([]string, map[string]map[string]string, err
return p.SupervisorUnits, mainPids, nil
}
func (p *Procstat) systemdUnitPIDs() ([]PidsTags, error) {
func (p *Procstat) systemdUnitPIDs() ([]pidsTags, error) {
if p.IncludeSystemdChildren {
p.CGroup = "systemd/system.slice/" + p.SystemdUnit
return p.cgroupPIDs()
}
var pidTags []PidsTags
var pidTags []pidsTags
pids, err := p.simpleSystemdUnitPIDs()
if err != nil {
return nil, err
}
tags := map[string]string{"systemd_unit": p.SystemdUnit}
pidTags = append(pidTags, PidsTags{pids, tags})
pidTags = append(pidTags, pidsTags{pids, tags})
return pidTags, nil
}
func (p *Procstat) simpleSystemdUnitPIDs() ([]PID, error) {
func (p *Procstat) simpleSystemdUnitPIDs() ([]pid, error) {
out, err := execCommand("systemctl", "show", p.SystemdUnit).Output()
if err != nil {
return nil, err
}
lines := bytes.Split(out, []byte{'\n'})
pids := make([]PID, 0, len(lines))
pids := make([]pid, 0, len(lines))
for _, line := range lines {
kv := bytes.SplitN(line, []byte{'='}, 2)
if len(kv) != 2 {
@ -594,17 +594,17 @@ func (p *Procstat) simpleSystemdUnitPIDs() ([]PID, error) {
if len(kv[1]) == 0 || bytes.Equal(kv[1], []byte("0")) {
return nil, nil
}
pid, err := strconv.ParseInt(string(kv[1]), 10, 32)
processID, err := strconv.ParseInt(string(kv[1]), 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid pid %q", kv[1])
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
}
return pids, nil
}
func (p *Procstat) cgroupPIDs() ([]PidsTags, error) {
func (p *Procstat) cgroupPIDs() ([]pidsTags, error) {
procsPath := p.CGroup
if procsPath[0] != '/' {
procsPath = "/sys/fs/cgroup/" + procsPath
@ -615,20 +615,20 @@ func (p *Procstat) cgroupPIDs() ([]PidsTags, error) {
return nil, fmt.Errorf("glob failed: %w", err)
}
pidTags := make([]PidsTags, 0, len(items))
pidTags := make([]pidsTags, 0, len(items))
for _, item := range items {
pids, err := p.singleCgroupPIDs(item)
if err != nil {
return nil, err
}
tags := map[string]string{"cgroup": p.CGroup, "cgroup_full": item}
pidTags = append(pidTags, PidsTags{pids, tags})
pidTags = append(pidTags, pidsTags{pids, tags})
}
return pidTags, nil
}
func (p *Procstat) singleCgroupPIDs(path string) ([]PID, error) {
func (p *Procstat) singleCgroupPIDs(path string) ([]pid, error) {
ok, err := isDir(path)
if err != nil {
return nil, err
@ -643,16 +643,16 @@ func (p *Procstat) singleCgroupPIDs(path string) ([]PID, error) {
}
lines := bytes.Split(out, []byte{'\n'})
pids := make([]PID, 0, len(lines))
pids := make([]pid, 0, len(lines))
for _, pidBS := range lines {
if len(pidBS) == 0 {
continue
}
pid, err := strconv.ParseInt(string(pidBS), 10, 32)
processID, err := strconv.ParseInt(string(pidBS), 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid pid %q", pidBS)
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
}
return pids, nil
@ -666,15 +666,15 @@ func isDir(path string) (bool, error) {
return result.IsDir(), nil
}
func (p *Procstat) winServicePIDs() ([]PID, error) {
var pids []PID
func (p *Procstat) winServicePIDs() ([]pid, error) {
var pids []pid
pid, err := queryPidWithWinServiceName(p.WinService)
processID, err := queryPidWithWinServiceName(p.WinService)
if err != nil {
return pids, err
}
pids = append(pids, PID(pid))
pids = append(pids, pid(processID))
return pids, nil
}

View File

@ -12,7 +12,7 @@ import (
"testing"
"time"
"github.com/shirou/gopsutil/v4/process"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
@ -77,73 +77,69 @@ TestGather_STARTINGsupervisorUnitPIDs STARTING`)
}
type testPgrep struct {
pids []PID
pids []pid
err error
}
func newTestFinder(pids []PID) PIDFinder {
func newTestFinder(pids []pid) pidFinder {
return &testPgrep{
pids: pids,
err: nil,
}
}
func (pg *testPgrep) PidFile(_ string) ([]PID, error) {
func (pg *testPgrep) pidFile(_ string) ([]pid, error) {
return pg.pids, pg.err
}
func (p *testProc) Cmdline() (string, error) {
return "test_proc", nil
}
func (pg *testPgrep) Pattern(_ string) ([]PID, error) {
func (pg *testPgrep) pattern(_ string) ([]pid, error) {
return pg.pids, pg.err
}
func (pg *testPgrep) UID(_ string) ([]PID, error) {
func (pg *testPgrep) uid(_ string) ([]pid, error) {
return pg.pids, pg.err
}
func (pg *testPgrep) FullPattern(_ string) ([]PID, error) {
func (pg *testPgrep) fullPattern(_ string) ([]pid, error) {
return pg.pids, pg.err
}
func (pg *testPgrep) Children(_ PID) ([]PID, error) {
pids := []PID{7311, 8111, 8112}
func (pg *testPgrep) children(_ pid) ([]pid, error) {
pids := []pid{7311, 8111, 8112}
return pids, pg.err
}
type testProc struct {
pid PID
procID pid
tags map[string]string
}
func newTestProc(pid PID) (Process, error) {
func newTestProc(pid pid) (process, error) {
proc := &testProc{
pid: pid,
procID: pid,
tags: make(map[string]string),
}
return proc, nil
}
func (p *testProc) PID() PID {
return p.pid
func (p *testProc) pid() pid {
return p.procID
}
func (p *testProc) Name() (string, error) {
return "test_proc", nil
}
func (p *testProc) SetTag(k, v string) {
func (p *testProc) setTag(k, v string) {
p.tags[k] = v
}
func (p *testProc) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) {
stats := make([]process.MemoryMapsStat, 0)
func (p *testProc) MemoryMaps(bool) (*[]gopsprocess.MemoryMapsStat, error) {
stats := make([]gopsprocess.MemoryMapsStat, 0)
return &stats, nil
}
func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
func (p *testProc) metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
if prefix != "" {
prefix += "_"
}
@ -190,9 +186,9 @@ func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([
}
if cfg.tagging["pid"] {
tags["pid"] = strconv.Itoa(int(p.pid))
tags["pid"] = strconv.Itoa(int(p.procID))
} else {
fields["pid"] = int32(p.pid)
fields["pid"] = int32(p.procID)
}
if cfg.tagging["ppid"] {
@ -216,7 +212,7 @@ func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([
return []telegraf.Metric{metric.New("procstat", tags, fields, t)}, nil
}
var pid = PID(42)
var processID = pid(42)
var exe = "foo"
func TestInitInvalidFinder(t *testing.T) {
@ -277,8 +273,8 @@ func TestGather_CreateProcessErrorOk(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
createProcess: func(PID) (Process, error) {
finder: newTestFinder([]pid{processID}),
createProcess: func(pid) (process, error) {
return nil, errors.New("createProcess error")
},
}
@ -350,7 +346,7 @@ func TestGather_ProcessName(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -362,14 +358,14 @@ func TestGather_ProcessName(t *testing.T) {
}
func TestGather_NoProcessNameUsesReal(t *testing.T) {
pid := PID(os.Getpid())
processID := pid(os.Getpid())
p := Procstat{
Exe: exe,
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -386,7 +382,7 @@ func TestGather_NoPidTag(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -405,7 +401,7 @@ func TestGather_PidTag(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -424,7 +420,7 @@ func TestGather_Prefix(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -441,7 +437,7 @@ func TestGather_Exe(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -460,7 +456,7 @@ func TestGather_User(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -479,7 +475,7 @@ func TestGather_Pattern(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -498,7 +494,7 @@ func TestGather_PidFile(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -510,7 +506,7 @@ func TestGather_PidFile(t *testing.T) {
}
func TestGather_PercentFirstPass(t *testing.T) {
pid := PID(os.Getpid())
processID := pid(os.Getpid())
p := Procstat{
Pattern: "foo",
@ -518,7 +514,7 @@ func TestGather_PercentFirstPass(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newProc,
}
require.NoError(t, p.Init())
@ -531,7 +527,7 @@ func TestGather_PercentFirstPass(t *testing.T) {
}
func TestGather_PercentSecondPass(t *testing.T) {
pid := PID(os.Getpid())
processID := pid(os.Getpid())
p := Procstat{
Pattern: "foo",
@ -539,7 +535,7 @@ func TestGather_PercentSecondPass(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newProc,
}
require.NoError(t, p.Init())
@ -558,7 +554,7 @@ func TestGather_systemdUnitPIDs(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
}
require.NoError(t, p.Init())
@ -566,7 +562,7 @@ func TestGather_systemdUnitPIDs(t *testing.T) {
require.NoError(t, err)
for _, pidsTag := range pidsTags {
require.Equal(t, []PID{11408}, pidsTag.PIDs)
require.Equal(t, []pid{11408}, pidsTag.PIDs)
require.Equal(t, "TestGather_systemdUnitPIDs", pidsTag.Tags["systemd_unit"])
}
}
@ -585,14 +581,14 @@ func TestGather_cgroupPIDs(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
}
require.NoError(t, p.Init())
pidsTags, err := p.findPids()
require.NoError(t, err)
for _, pidsTag := range pidsTags {
require.Equal(t, []PID{1234, 5678}, pidsTag.PIDs)
require.Equal(t, []pid{1234, 5678}, pidsTag.PIDs)
require.Equal(t, td, pidsTag.Tags["cgroup"])
}
}
@ -603,7 +599,7 @@ func TestProcstatLookupMetric(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{543}),
finder: newTestFinder([]pid{543}),
createProcess: newProc,
}
require.NoError(t, p.Init())
@ -621,7 +617,7 @@ func TestGather_SameTimestamps(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
createProcess: newTestProc,
}
require.NoError(t, p.Init())
@ -641,14 +637,14 @@ func TestGather_supervisorUnitPIDs(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
}
require.NoError(t, p.Init())
pidsTags, err := p.findPids()
require.NoError(t, err)
for _, pidsTag := range pidsTags {
require.Equal(t, []PID{7311, 8111, 8112}, pidsTag.PIDs)
require.Equal(t, []pid{7311, 8111, 8112}, pidsTag.PIDs)
require.Equal(t, "TestGather_supervisorUnitPIDs", pidsTag.Tags["supervisor_unit"])
}
}
@ -659,7 +655,7 @@ func TestGather_MoresupervisorUnitPIDs(t *testing.T) {
PidFinder: "test",
Properties: []string{"cpu", "memory", "mmap"},
Log: testutil.Logger{},
finder: newTestFinder([]PID{pid}),
finder: newTestFinder([]pid{processID}),
}
require.NoError(t, p.Init())

View File

@ -8,8 +8,9 @@ import (
"strconv"
"strings"
gopsprocess "github.com/shirou/gopsutil/v4/process"
"github.com/influxdata/telegraf"
"github.com/shirou/gopsutil/v4/process"
)
type processFinder struct {
@ -36,13 +37,13 @@ func (f *processFinder) findByPidFiles(paths []string) ([]processGroup, error) {
return nil, fmt.Errorf("failed to parse PID in file %q: %w", path, err)
}
p, err := process.NewProcess(int32(pid))
p, err := gopsprocess.NewProcess(int32(pid))
if err != nil && !f.errPidFiles[path] {
f.log.Errorf("failed to find process for PID %d of file %q: %v", pid, path, err)
f.errPidFiles[path] = true
}
groups = append(groups, processGroup{
processes: []*process.Process{p},
processes: []*gopsprocess.Process{p},
tags: map[string]string{"pidfile": path},
})
}
@ -76,7 +77,7 @@ func findByCgroups(cgroups []string) ([]processGroup, error) {
return nil, err
}
lines := bytes.Split(buf, []byte{'\n'})
procs := make([]*process.Process, 0, len(lines))
procs := make([]*gopsprocess.Process, 0, len(lines))
for _, l := range lines {
l := strings.TrimSpace(string(l))
if len(l) == 0 {
@ -86,7 +87,7 @@ func findByCgroups(cgroups []string) ([]processGroup, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse PID %q in file %q", l, fpath)
}
p, err := process.NewProcess(int32(pid))
p, err := gopsprocess.NewProcess(int32(pid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of %q: %w", pid, fpath, err)
}
@ -130,7 +131,7 @@ func findBySupervisorUnits(units string) ([]processGroup, error) {
"status": status,
}
var procs []*process.Process
var procs []*gopsprocess.Process
switch status {
case "FATAL", "EXITED", "BACKOFF", "STOPPING":
tags["error"] = strings.Join(kv[2:], " ")
@ -141,7 +142,7 @@ func findBySupervisorUnits(units string) ([]processGroup, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse group PID %q: %w", rawpid, err)
}
p, err := process.NewProcess(int32(grouppid))
p, err := gopsprocess.NewProcess(int32(grouppid))
if err != nil {
return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", grouppid, name, err)
}

View File

@ -14,17 +14,17 @@ import (
"github.com/influxdata/telegraf/config"
)
type ConsulConfig struct {
type consulConfig struct {
// Address of the Consul agent. The address must contain a hostname or an IP address
// and optionally a port (format: "host:port").
Enabled bool `toml:"enabled"`
Agent string `toml:"agent"`
QueryInterval config.Duration `toml:"query_interval"`
Queries []*ConsulQuery `toml:"query"`
Queries []*consulQuery `toml:"query"`
}
// One Consul service discovery query
type ConsulQuery struct {
type consulQuery struct {
// A name of the searched services (not ID)
ServiceName string `toml:"name"`
@ -128,7 +128,7 @@ func (p *Prometheus) startConsul(ctx context.Context) error {
}
func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
consulServiceURLs := make(map[string]URLAndAddress)
consulServiceURLs := make(map[string]urlAndAddress)
p.Log.Debugf("Refreshing Consul services")
@ -165,8 +165,8 @@ func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
p.Log.Infof("Created scrape URLs from Consul for Service (%s, %s)", q.ServiceName, q.ServiceTag)
}
q.lastQueryFailed = false
p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.URL.String())
consulServiceURLs[uaa.URL.String()] = *uaa
p.Log.Debugf("Adding scrape URL from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, uaa.url.String())
consulServiceURLs[uaa.url.String()] = *uaa
}
}
@ -177,7 +177,7 @@ func (p *Prometheus) refreshConsulServices(c *api.Catalog) error {
return nil
}
func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService) (*URLAndAddress, error) {
func (p *Prometheus) getConsulServiceURL(q *consulQuery, s *api.CatalogService) (*urlAndAddress, error) {
var buffer bytes.Buffer
buffer.Reset()
err := q.serviceURLTemplate.Execute(&buffer, s)
@ -201,9 +201,9 @@ func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService)
p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String())
return &URLAndAddress{
URL: serviceURL,
OriginalURL: serviceURL,
Tags: extraTags,
return &urlAndAddress{
url: serviceURL,
originalURL: serviceURL,
tags: extraTags,
}, nil
}

View File

@ -124,11 +124,11 @@ func shouldScrapePod(pod *corev1.Pod, p *Prometheus) bool {
var shouldScrape bool
switch p.MonitorKubernetesPodsMethod {
case MonitorMethodAnnotations: // must have 'true' annotation to be scraped
case monitorMethodAnnotations: // must have 'true' annotation to be scraped
shouldScrape = pod.Annotations != nil && pod.Annotations["prometheus.io/scrape"] == "true"
case MonitorMethodSettings: // will be scraped regardless of annotation
case monitorMethodSettings: // will be scraped regardless of annotation
shouldScrape = true
case MonitorMethodSettingsAndAnnotations: // will be scraped unless opts out with 'false' annotation
case monitorMethodSettingsAndAnnotations: // will be scraped unless opts out with 'false' annotation
shouldScrape = pod.Annotations == nil || pod.Annotations["prometheus.io/scrape"] != "false"
}
@ -194,7 +194,7 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients
if err != nil {
p.Log.Errorf("getting key from cache %s", err.Error())
}
podID := PodID(key)
podID := podID(key)
if shouldScrapePod(newPod, p) {
// When Informers re-Lists, pod might already be registered,
// do nothing if it is, register otherwise
@ -209,7 +209,7 @@ func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clients
DeleteFunc: func(oldObj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(oldObj)
if err == nil {
unregisterPod(PodID(key), p)
unregisterPod(podID(key), p)
}
},
})
@ -280,7 +280,7 @@ func updateCadvisorPodList(p *Prometheus, req *http.Request) error {
// Updating pod list to be latest cadvisor response
p.lock.Lock()
p.kubernetesPods = make(map[PodID]URLAndAddress)
p.kubernetesPods = make(map[podID]urlAndAddress)
// Register pod only if it has an annotation to scrape, if it is ready,
// and if namespace and selectors are specified and match
@ -419,7 +419,7 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
tags[k] = v
}
}
podURL := p.AddressToURL(targetURL, targetURL.Hostname())
podURL := p.addressToURL(targetURL, targetURL.Hostname())
// Locks earlier if using cAdvisor calls - makes a new list each time
// rather than updating and removing from the same list
@ -427,12 +427,12 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
p.lock.Lock()
defer p.lock.Unlock()
}
p.kubernetesPods[PodID(pod.GetNamespace()+"/"+pod.GetName())] = URLAndAddress{
URL: podURL,
Address: targetURL.Hostname(),
OriginalURL: targetURL,
Tags: tags,
Namespace: pod.GetNamespace(),
p.kubernetesPods[podID(pod.GetNamespace()+"/"+pod.GetName())] = urlAndAddress{
url: podURL,
address: targetURL.Hostname(),
originalURL: targetURL,
tags: tags,
namespace: pod.GetNamespace(),
}
}
@ -446,15 +446,15 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) {
var scheme, pathAndQuery, port string
if p.MonitorKubernetesPodsMethod == MonitorMethodSettings ||
p.MonitorKubernetesPodsMethod == MonitorMethodSettingsAndAnnotations {
if p.MonitorKubernetesPodsMethod == monitorMethodSettings ||
p.MonitorKubernetesPodsMethod == monitorMethodSettingsAndAnnotations {
scheme = p.MonitorKubernetesPodsScheme
pathAndQuery = p.MonitorKubernetesPodsPath
port = strconv.Itoa(p.MonitorKubernetesPodsPort)
}
if p.MonitorKubernetesPodsMethod == MonitorMethodAnnotations ||
p.MonitorKubernetesPodsMethod == MonitorMethodSettingsAndAnnotations {
if p.MonitorKubernetesPodsMethod == monitorMethodAnnotations ||
p.MonitorKubernetesPodsMethod == monitorMethodSettingsAndAnnotations {
if ann := pod.Annotations["prometheus.io/scheme"]; ann != "" {
scheme = ann
}
@ -489,12 +489,12 @@ func getScrapeURL(pod *corev1.Pod, p *Prometheus) (*url.URL, error) {
return base, nil
}
func unregisterPod(podID PodID, p *Prometheus) {
func unregisterPod(podID podID, p *Prometheus) {
p.lock.Lock()
defer p.lock.Unlock()
if v, ok := p.kubernetesPods[podID]; ok {
p.Log.Debugf("registered a delete request for %s", podID)
delete(p.kubernetesPods, podID)
p.Log.Debugf("will stop scraping for %q", v.URL.String())
p.Log.Debugf("will stop scraping for %q", v.url.String())
}
}

View File

@ -18,8 +18,8 @@ func initPrometheus() *Prometheus {
prom.MonitorKubernetesPodsScheme = "http"
prom.MonitorKubernetesPodsPort = 9102
prom.MonitorKubernetesPodsPath = "/metrics"
prom.MonitorKubernetesPodsMethod = MonitorMethodAnnotations
prom.kubernetesPods = map[PodID]URLAndAddress{}
prom.MonitorKubernetesPodsMethod = monitorMethodAnnotations
prom.kubernetesPods = map[podID]urlAndAddress{}
return prom
}
@ -34,7 +34,7 @@ func TestScrapeURLNoAnnotations(t *testing.T) {
func TestScrapeURLNoAnnotationsScrapeConfig(t *testing.T) {
prom := initPrometheus()
prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations
p := pod()
p.Annotations = map[string]string{}
@ -45,7 +45,7 @@ func TestScrapeURLNoAnnotationsScrapeConfig(t *testing.T) {
func TestScrapeURLScrapeConfigCustom(t *testing.T) {
prom := initPrometheus()
prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsScheme = "https"
prom.MonitorKubernetesPodsPort = 9999
@ -66,7 +66,7 @@ func TestScrapeURLAnnotations(t *testing.T) {
func TestScrapeURLAnnotationsScrapeConfig(t *testing.T) {
prom := initPrometheus()
prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations
p := pod()
url, err := getScrapeURL(p, prom)
require.NoError(t, err)
@ -84,7 +84,7 @@ func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
func TestScrapeURLAnnotationsCustomPortScrapeConfig(t *testing.T) {
prom := initPrometheus()
prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations
p := pod()
p.Annotations = map[string]string{"prometheus.io/port": "9000"}
url, err := getScrapeURL(p, prom)
@ -129,7 +129,7 @@ func TestScrapeURLAnnotationsCustomPathWithFragment(t *testing.T) {
}
func TestAddPod(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -139,7 +139,7 @@ func TestAddPod(t *testing.T) {
func TestAddPodScrapeConfig(t *testing.T) {
prom := initPrometheus()
prom.MonitorKubernetesPodsMethod = MonitorMethodSettingsAndAnnotations
prom.MonitorKubernetesPodsMethod = monitorMethodSettingsAndAnnotations
p := pod()
p.Annotations = map[string]string{}
@ -148,7 +148,7 @@ func TestAddPodScrapeConfig(t *testing.T) {
}
func TestAddMultipleDuplicatePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -156,13 +156,13 @@ func TestAddMultipleDuplicatePods(t *testing.T) {
p.Name = "Pod2"
registerPod(p, prom)
urls, err := prom.GetAllURLs()
urls, err := prom.getAllURLs()
require.NoError(t, err)
require.Len(t, urls, 1)
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
@ -174,41 +174,41 @@ func TestAddMultiplePods(t *testing.T) {
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
podID, err := cache.MetaNamespaceKeyFunc(p)
id, err := cache.MetaNamespaceKeyFunc(p)
require.NoError(t, err)
unregisterPod(PodID(podID), prom)
unregisterPod(podID(id), prom)
require.Empty(t, prom.kubernetesPods)
}
func TestKeepDefaultNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
podID, err := cache.MetaNamespaceKeyFunc(p)
id, err := cache.MetaNamespaceKeyFunc(p)
require.NoError(t, err)
tags := prom.kubernetesPods[PodID(podID)].Tags
tags := prom.kubernetesPods[podID(id)].tags
require.Equal(t, "default", tags["namespace"])
}
func TestChangeNamespaceLabelName(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, PodNamespaceLabelName: "pod_namespace", kubernetesPods: map[podID]urlAndAddress{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
podID, err := cache.MetaNamespaceKeyFunc(p)
id, err := cache.MetaNamespaceKeyFunc(p)
require.NoError(t, err)
tags := prom.kubernetesPods[PodID(podID)].Tags
tags := prom.kubernetesPods[podID(id)].tags
require.Equal(t, "default", tags["pod_namespace"])
require.Equal(t, "", tags["namespace"])
}
@ -300,14 +300,14 @@ func TestAnnotationFilters(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
prom.PodAnnotationInclude = tc.include
prom.PodAnnotationExclude = tc.exclude
require.NoError(t, prom.initFilters())
registerPod(p, prom)
for _, pd := range prom.kubernetesPods {
for _, tagKey := range tc.expectedTags {
require.Contains(t, pd.Tags, tagKey)
require.Contains(t, pd.tags, tagKey)
}
}
})
@ -345,14 +345,14 @@ func TestLabelFilters(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[PodID]URLAndAddress{}}
prom := &Prometheus{Log: testutil.Logger{}, kubernetesPods: map[podID]urlAndAddress{}}
prom.PodLabelInclude = tc.include
prom.PodLabelExclude = tc.exclude
require.NoError(t, prom.initFilters())
registerPod(p, prom)
for _, pd := range prom.kubernetesPods {
for _, tagKey := range tc.expectedTags {
require.Contains(t, pd.Tags, tagKey)
require.Contains(t, pd.tags, tagKey)
}
}
})

View File

@ -34,18 +34,14 @@ import (
//go:embed sample.conf
var sampleConfig string
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
type MonitorMethod string
const (
MonitorMethodNone MonitorMethod = ""
MonitorMethodAnnotations MonitorMethod = "annotations"
MonitorMethodSettings MonitorMethod = "settings"
MonitorMethodSettingsAndAnnotations MonitorMethod = "settings+annotations"
)
acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
type PodID string
monitorMethodNone monitorMethod = ""
monitorMethodAnnotations monitorMethod = "annotations"
monitorMethodSettings monitorMethod = "settings"
monitorMethodSettingsAndAnnotations monitorMethod = "settings+annotations"
)
type Prometheus struct {
URLs []string `toml:"urls"`
@ -72,7 +68,7 @@ type Prometheus struct {
KubeConfig string `toml:"kube_config"`
KubernetesLabelSelector string `toml:"kubernetes_label_selector"`
KubernetesFieldSelector string `toml:"kubernetes_field_selector"`
MonitorKubernetesPodsMethod MonitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsMethod monitorMethod `toml:"monitor_kubernetes_pods_method"`
MonitorKubernetesPodsScheme string `toml:"monitor_kubernetes_pods_scheme"`
MonitorKubernetesPodsPath string `toml:"monitor_kubernetes_pods_path"`
MonitorKubernetesPodsPort int `toml:"monitor_kubernetes_pods_port"`
@ -85,7 +81,7 @@ type Prometheus struct {
CacheRefreshInterval int `toml:"cache_refresh_interval"`
// Consul discovery
ConsulConfig ConsulConfig `toml:"consul"`
ConsulConfig consulConfig `toml:"consul"`
Log telegraf.Logger `toml:"-"`
common_http.HTTPClientConfig
@ -100,7 +96,7 @@ type Prometheus struct {
// Should we scrape Kubernetes services for prometheus annotations
lock sync.Mutex
kubernetesPods map[PodID]URLAndAddress
kubernetesPods map[podID]urlAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
@ -114,9 +110,21 @@ type Prometheus struct {
podLabelExcludeFilter filter.Filter
// List of consul services to scrape
consulServices map[string]URLAndAddress
consulServices map[string]urlAndAddress
}
type urlAndAddress struct {
originalURL *url.URL
url *url.URL
address string
tags map[string]string
namespace string
}
type monitorMethod string
type podID string
func (*Prometheus) SampleConfig() string {
return sampleConfig
}
@ -164,8 +172,8 @@ func (p *Prometheus) Init() error {
p.Log.Infof("Using pod scrape scope at node level to get pod list using cAdvisor.")
}
if p.MonitorKubernetesPodsMethod == MonitorMethodNone {
p.MonitorKubernetesPodsMethod = MonitorMethodAnnotations
if p.MonitorKubernetesPodsMethod == monitorMethodNone {
p.MonitorKubernetesPodsMethod = monitorMethodAnnotations
}
// Parse label and field selectors - will be used to filter pods after cAdvisor call
@ -239,11 +247,65 @@ func (p *Prometheus) Init() error {
"Accept": acceptHeader,
}
p.kubernetesPods = make(map[PodID]URLAndAddress)
p.kubernetesPods = make(map[podID]urlAndAddress)
return nil
}
// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration
func (p *Prometheus) Start(_ telegraf.Accumulator) error {
var ctx context.Context
p.wg = sync.WaitGroup{}
ctx, p.cancel = context.WithCancel(context.Background())
if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 {
if err := p.startConsul(ctx); err != nil {
return err
}
}
if p.MonitorPods {
if err := p.startK8s(ctx); err != nil {
return err
}
}
return nil
}
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
allURLs, err := p.getAllURLs()
if err != nil {
return err
}
for _, URL := range allURLs {
wg.Add(1)
go func(serviceURL urlAndAddress) {
defer wg.Done()
requestFields, tags, err := p.gatherURL(serviceURL, acc)
acc.AddError(err)
// Add metrics
if p.EnableRequestMetrics {
acc.AddFields("prometheus_request", requestFields, tags)
}
}(URL)
}
wg.Wait()
return nil
}
func (p *Prometheus) Stop() {
p.cancel()
p.wg.Wait()
if p.client != nil {
p.client.CloseIdleConnections()
}
}
func (p *Prometheus) initFilters() error {
if p.PodAnnotationExclude != nil {
podAnnotationExclude, err := filter.Compile(p.PodAnnotationExclude)
@ -276,7 +338,7 @@ func (p *Prometheus) initFilters() error {
return nil
}
func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
func (p *Prometheus) addressToURL(u *url.URL, address string) *url.URL {
host := address
if u.Port() != "" {
host = address + ":" + u.Port()
@ -295,23 +357,15 @@ func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
return reconstructedURL
}
type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
Tags map[string]string
Namespace string
}
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make(map[string]URLAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods))
func (p *Prometheus) getAllURLs() (map[string]urlAndAddress, error) {
allURLs := make(map[string]urlAndAddress, len(p.URLs)+len(p.consulServices)+len(p.kubernetesPods))
for _, u := range p.URLs {
address, err := url.Parse(u)
if err != nil {
p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error())
continue
}
allURLs[address.String()] = URLAndAddress{URL: address, OriginalURL: address}
allURLs[address.String()] = urlAndAddress{url: address, originalURL: address}
}
p.lock.Lock()
@ -322,8 +376,8 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
}
// loop through all pods scraped via the prometheus annotation on the pods
for _, v := range p.kubernetesPods {
if namespaceAnnotationMatch(v.Namespace, p) {
allURLs[v.URL.String()] = v
if namespaceAnnotationMatch(v.namespace, p) {
allURLs[v.url.String()] = v
}
}
@ -339,62 +393,34 @@ func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
continue
}
for _, resolved := range resolvedAddresses {
serviceURL := p.AddressToURL(address, resolved)
allURLs[serviceURL.String()] = URLAndAddress{
URL: serviceURL,
Address: resolved,
OriginalURL: address,
serviceURL := p.addressToURL(address, resolved)
allURLs[serviceURL.String()] = urlAndAddress{
url: serviceURL,
address: resolved,
originalURL: address,
}
}
}
return allURLs, nil
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
allURLs, err := p.GetAllURLs()
if err != nil {
return err
}
for _, URL := range allURLs {
wg.Add(1)
go func(serviceURL URLAndAddress) {
defer wg.Done()
requestFields, tags, err := p.gatherURL(serviceURL, acc)
acc.AddError(err)
// Add metrics
if p.EnableRequestMetrics {
acc.AddFields("prometheus_request", requestFields, tags)
}
}(URL)
}
wg.Wait()
return nil
}
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) {
func (p *Prometheus) gatherURL(u urlAndAddress, acc telegraf.Accumulator) (map[string]interface{}, map[string]string, error) {
var req *http.Request
var uClient *http.Client
requestFields := make(map[string]interface{})
tags := make(map[string]string, len(u.Tags)+2)
tags := make(map[string]string, len(u.tags)+2)
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
tags[p.URLTag] = u.originalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
if u.address != "" {
tags["address"] = u.address
}
for k, v := range u.Tags {
for k, v := range u.tags {
tags[k] = v
}
if u.URL.Scheme == "unix" {
path := u.URL.Query().Get("path")
if u.url.Scheme == "unix" {
path := u.url.Query().Get("path")
if path == "" {
path = "/metrics"
}
@ -413,19 +439,19 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
TLSClientConfig: tlsCfg,
DisableKeepAlives: true,
Dial: func(string, string) (net.Conn, error) {
c, err := net.Dial("unix", u.URL.Path)
c, err := net.Dial("unix", u.url.Path)
return c, err
},
},
}
} else {
if u.URL.Path == "" {
u.URL.Path = "/metrics"
if u.url.Path == "" {
u.url.Path = "/metrics"
}
var err error
req, err = http.NewRequest("GET", u.URL.String(), nil)
req, err = http.NewRequest("GET", u.url.String(), nil)
if err != nil {
return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.URL.String(), err)
return nil, nil, fmt.Errorf("unable to create new request %q: %w", u.url.String(), err)
}
}
@ -469,7 +495,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
var err error
var resp *http.Response
var start time.Time
if u.URL.Scheme != "unix" {
if u.url.Scheme != "unix" {
start = time.Now()
//nolint:bodyclose // False positive (because of if-else) - body will be closed in `defer`
resp, err = p.client.Do(req)
@ -480,14 +506,14 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
}
end := time.Since(start).Seconds()
if err != nil {
return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.URL, err)
return requestFields, tags, fmt.Errorf("error making HTTP request to %q: %w", u.url, err)
}
requestFields["response_time"] = end
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.URL, resp.Status)
return requestFields, tags, fmt.Errorf("%q returned HTTP status %q", u.url, resp.Status)
}
var body []byte
@ -504,7 +530,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
return requestFields, tags, fmt.Errorf("error reading body: %w", err)
}
if int64(len(body)) > limit {
p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.URL, limit)
p.Log.Infof("skipping %s: content length exceeded maximum body size (%d)", u.url, limit)
return requestFields, tags, nil
}
} else {
@ -539,20 +565,20 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) (map[s
}
metrics, err := metricParser.Parse(body)
if err != nil {
return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.URL, err)
return requestFields, tags, fmt.Errorf("error reading metrics for %q: %w", u.url, err)
}
for _, metric := range metrics {
tags := metric.Tags()
// strip user and password from URL
u.OriginalURL.User = nil
u.originalURL.User = nil
if p.URLTag != "" {
tags[p.URLTag] = u.OriginalURL.String()
tags[p.URLTag] = u.originalURL.String()
}
if u.Address != "" {
tags["address"] = u.Address
if u.address != "" {
tags["address"] = u.address
}
for k, v := range u.Tags {
for k, v := range u.tags {
tags[k] = v
}
@ -603,39 +629,11 @@ func fieldSelectorIsSupported(fieldSelector fields.Selector) (bool, string) {
return true, ""
}
// Start will start the Kubernetes and/or Consul scraping if enabled in the configuration
func (p *Prometheus) Start(_ telegraf.Accumulator) error {
var ctx context.Context
p.wg = sync.WaitGroup{}
ctx, p.cancel = context.WithCancel(context.Background())
if p.ConsulConfig.Enabled && len(p.ConsulConfig.Queries) > 0 {
if err := p.startConsul(ctx); err != nil {
return err
}
}
if p.MonitorPods {
if err := p.startK8s(ctx); err != nil {
return err
}
}
return nil
}
func (p *Prometheus) Stop() {
p.cancel()
p.wg.Wait()
if p.client != nil {
p.client.CloseIdleConnections()
}
}
func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{
kubernetesPods: make(map[PodID]URLAndAddress),
consulServices: make(map[string]URLAndAddress),
kubernetesPods: make(map[podID]urlAndAddress),
consulServices: make(map[string]urlAndAddress),
URLTag: "url",
}
})

View File

@ -630,7 +630,7 @@ func TestInitConfigSelectors(t *testing.T) {
URLs: nil,
URLTag: "url",
MonitorPods: true,
MonitorKubernetesPodsMethod: MonitorMethodSettings,
MonitorKubernetesPodsMethod: monitorMethodSettings,
PodScrapeInterval: 60,
KubernetesLabelSelector: "app=test",
KubernetesFieldSelector: "spec.nodeName=node-0",

View File

@ -23,18 +23,6 @@ func (*Proxmox) SampleConfig() string {
return sampleConfig
}
func (px *Proxmox) Gather(acc telegraf.Accumulator) error {
err := getNodeSearchDomain(px)
if err != nil {
return err
}
gatherLxcData(px, acc)
gatherQemuData(px, acc)
return nil
}
func (px *Proxmox) Init() error {
// Set hostname as default node name for backwards compatibility
if px.NodeName == "" {
@ -57,12 +45,16 @@ func (px *Proxmox) Init() error {
return nil
}
func init() {
inputs.Add("proxmox", func() telegraf.Input {
return &Proxmox{
requestFunction: performRequest,
func (px *Proxmox) Gather(acc telegraf.Accumulator) error {
err := getNodeSearchDomain(px)
if err != nil {
return err
}
})
gatherLxcData(px, acc)
gatherQemuData(px, acc)
return nil
}
func getNodeSearchDomain(px *Proxmox) error {
@ -274,3 +266,11 @@ func getTags(px *Proxmox, name string, vmConfig vmConfig, rt resourceType) map[s
"vm_type": string(rt),
}
}
func init() {
inputs.Add("proxmox", func() telegraf.Input {
return &Proxmox{
requestFunction: performRequest,
}
})
}

View File

@ -10,28 +10,28 @@ import (
"github.com/influxdata/telegraf/plugins/common/tls"
)
var (
qemu resourceType = "qemu"
lxc resourceType = "lxc"
)
type Proxmox struct {
BaseURL string `toml:"base_url"`
APIToken string `toml:"api_token"`
ResponseTimeout config.Duration `toml:"response_timeout"`
NodeName string `toml:"node_name"`
tls.ClientConfig
httpClient *http.Client
nodeSearchDomain string
requestFunction func(px *Proxmox, apiUrl string, method string, data url.Values) ([]byte, error)
Log telegraf.Logger `toml:"-"`
httpClient *http.Client
nodeSearchDomain string
requestFunction func(px *Proxmox, apiUrl string, method string, data url.Values) ([]byte, error)
}
type resourceType string
var (
qemu resourceType = "qemu"
lxc resourceType = "lxc"
)
type vmStats struct {
Data []vmStat `json:"data"`
}

View File

@ -17,12 +17,11 @@ import (
//go:embed sample.conf
var sampleConfig string
// PuppetAgent is a PuppetAgent plugin
type PuppetAgent struct {
Location string
Location string `toml:"location"`
}
type State struct {
type state struct {
Events event
Resources resource
Changes change
@ -101,7 +100,7 @@ func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error {
return err
}
var puppetState State
var puppetState state
err = yaml.Unmarshal(fh, &puppetState)
if err != nil {
@ -114,7 +113,7 @@ func (pa *PuppetAgent) Gather(acc telegraf.Accumulator) error {
return nil
}
func structPrinter(s *State, acc telegraf.Accumulator, tags map[string]string) {
func structPrinter(s *state, acc telegraf.Accumulator, tags map[string]string) {
e := reflect.ValueOf(s).Elem()
fields := make(map[string]interface{})