chore: Fix linter findings for `revive:exported` in `plugins/inputs/c*` (#16006)

This commit is contained in:
Paweł Żak 2024-10-15 13:06:55 +02:00 committed by GitHub
parent f8af593d33
commit a5c8a89b54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 586 additions and 609 deletions

View File

@ -149,24 +149,6 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error {
return nil return nil
} }
// init registers the ceph input plugin factory with Telegraf. The defaults
// point at the stock ceph binary, socket directory and client config; only
// admin-socket stats are gathered out of the box, cluster-wide stats are opt-in.
func init() {
inputs.Add(measurement, func() telegraf.Input {
return &Ceph{
// Default paths for a standard ceph installation.
CephBinary: "/usr/bin/ceph",
OsdPrefix: osdPrefix,
MonPrefix: monPrefix,
MdsPrefix: mdsPrefix,
RgwPrefix: rgwPrefix,
SocketDir: "/var/run/ceph",
SocketSuffix: sockSuffix,
CephUser: "client.admin",
CephConfig: "/etc/ceph/ceph.conf",
// Per-daemon admin-socket stats on by default; cluster stats
// (ceph -s / df / pool stats) must be enabled explicitly.
GatherAdminSocketStats: true,
GatherClusterStats: false,
}
})
}
// Run ceph perf schema on the passed socket. The output is a JSON string // Run ceph perf schema on the passed socket. The output is a JSON string
// mapping collection names to a map of counter names to information. // mapping collection names to a map of counter names to information.
// //
@ -428,8 +410,8 @@ func (c *Ceph) execute(command string) (string, error) {
return output, nil return output, nil
} }
// CephStatus is used to unmarshal "ceph -s" output // status is used to unmarshal "ceph -s" output
type CephStatus struct { type status struct {
FSMap struct { FSMap struct {
NumIn float64 `json:"in"` NumIn float64 `json:"in"`
NumMax float64 `json:"max"` NumMax float64 `json:"max"`
@ -492,12 +474,12 @@ type CephStatus struct {
// decodeStatus decodes the output of 'ceph -s' // decodeStatus decodes the output of 'ceph -s'
func decodeStatus(acc telegraf.Accumulator, input string) error { func decodeStatus(acc telegraf.Accumulator, input string) error {
data := &CephStatus{} data := &status{}
if err := json.Unmarshal([]byte(input), data); err != nil { if err := json.Unmarshal([]byte(input), data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err) return fmt.Errorf("failed to parse json: %q: %w", input, err)
} }
decoders := []func(telegraf.Accumulator, *CephStatus) error{ decoders := []func(telegraf.Accumulator, *status) error{
decodeStatusFsmap, decodeStatusFsmap,
decodeStatusHealth, decodeStatusHealth,
decodeStatusMonmap, decodeStatusMonmap,
@ -516,7 +498,7 @@ func decodeStatus(acc telegraf.Accumulator, input string) error {
} }
// decodeStatusFsmap decodes the FS map portion of the output of 'ceph -s' // decodeStatusFsmap decodes the FS map portion of the output of 'ceph -s'
func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusFsmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{ fields := map[string]interface{}{
"in": data.FSMap.NumIn, "in": data.FSMap.NumIn,
"max": data.FSMap.NumMax, "max": data.FSMap.NumMax,
@ -528,7 +510,7 @@ func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error {
} }
// decodeStatusHealth decodes the health portion of the output of 'ceph status' // decodeStatusHealth decodes the health portion of the output of 'ceph status'
func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusHealth(acc telegraf.Accumulator, data *status) error {
statusCodes := map[string]float64{ statusCodes := map[string]float64{
"HEALTH_ERR": 0, "HEALTH_ERR": 0,
"HEALTH_WARN": 1, "HEALTH_WARN": 1,
@ -544,7 +526,7 @@ func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error {
} }
// decodeStatusMonmap decodes the Mon map portion of the output of 'ceph -s' // decodeStatusMonmap decodes the Mon map portion of the output of 'ceph -s'
func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusMonmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{ fields := map[string]interface{}{
"num_mons": data.MonMap.NumMons, "num_mons": data.MonMap.NumMons,
} }
@ -553,7 +535,7 @@ func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error {
} }
// decodeStatusOsdmap decodes the OSD map portion of the output of 'ceph -s' // decodeStatusOsdmap decodes the OSD map portion of the output of 'ceph -s'
func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusOsdmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{ fields := map[string]interface{}{
"epoch": data.OSDMap.Epoch, "epoch": data.OSDMap.Epoch,
"num_in_osds": data.OSDMap.NumInOSDs, "num_in_osds": data.OSDMap.NumInOSDs,
@ -578,7 +560,7 @@ func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error {
} }
// decodeStatusPgmap decodes the PG map portion of the output of 'ceph -s' // decodeStatusPgmap decodes the PG map portion of the output of 'ceph -s'
func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusPgmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{ fields := map[string]interface{}{
"bytes_avail": data.PGMap.BytesAvail, "bytes_avail": data.PGMap.BytesAvail,
"bytes_total": data.PGMap.BytesTotal, "bytes_total": data.PGMap.BytesTotal,
@ -609,7 +591,7 @@ func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error {
} }
// decodeStatusPgmapState decodes the PG map state portion of the output of 'ceph -s' // decodeStatusPgmapState decodes the PG map state portion of the output of 'ceph -s'
func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error { func decodeStatusPgmapState(acc telegraf.Accumulator, data *status) error {
for _, pgState := range data.PGMap.PGsByState { for _, pgState := range data.PGMap.PGsByState {
tags := map[string]string{ tags := map[string]string{
"state": pgState.StateName, "state": pgState.StateName,
@ -622,8 +604,8 @@ func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error {
return nil return nil
} }
// CephDF is used to unmarshal 'ceph df' output // df is used to unmarshal 'ceph df' output
type CephDf struct { type df struct {
Stats struct { Stats struct {
NumOSDs float64 `json:"num_osds"` NumOSDs float64 `json:"num_osds"`
NumPerPoolOmapOSDs float64 `json:"num_per_pool_omap_osds"` NumPerPoolOmapOSDs float64 `json:"num_per_pool_omap_osds"`
@ -653,7 +635,7 @@ type CephDf struct {
// decodeDf decodes the output of 'ceph df' // decodeDf decodes the output of 'ceph df'
func decodeDf(acc telegraf.Accumulator, input string) error { func decodeDf(acc telegraf.Accumulator, input string) error {
data := &CephDf{} data := &df{}
if err := json.Unmarshal([]byte(input), data); err != nil { if err := json.Unmarshal([]byte(input), data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err) return fmt.Errorf("failed to parse json: %q: %w", input, err)
} }
@ -705,8 +687,8 @@ func decodeDf(acc telegraf.Accumulator, input string) error {
return nil return nil
} }
// CephOSDPoolStats is used to unmarshal 'ceph osd pool stats' output // osdPoolStats is used to unmarshal 'ceph osd pool stats' output
type CephOSDPoolStats []struct { type osdPoolStats []struct {
PoolName string `json:"pool_name"` PoolName string `json:"pool_name"`
ClientIORate struct { ClientIORate struct {
OpPerSec float64 `json:"op_per_sec"` // This field is no longer reported in ceph 10 and later OpPerSec float64 `json:"op_per_sec"` // This field is no longer reported in ceph 10 and later
@ -732,7 +714,7 @@ type CephOSDPoolStats []struct {
// decodeOsdPoolStats decodes the output of 'ceph osd pool stats' // decodeOsdPoolStats decodes the output of 'ceph osd pool stats'
func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error {
data := CephOSDPoolStats{} data := osdPoolStats{}
if err := json.Unmarshal([]byte(input), &data); err != nil { if err := json.Unmarshal([]byte(input), &data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err) return fmt.Errorf("failed to parse json: %q: %w", input, err)
} }
@ -763,3 +745,21 @@ func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error {
return nil return nil
} }
// init registers the ceph input plugin factory with Telegraf. The defaults
// point at the stock ceph binary, socket directory and client config; only
// admin-socket stats are gathered out of the box, cluster-wide stats are opt-in.
func init() {
inputs.Add(measurement, func() telegraf.Input {
return &Ceph{
// Default paths for a standard ceph installation.
CephBinary: "/usr/bin/ceph",
OsdPrefix: osdPrefix,
MonPrefix: monPrefix,
MdsPrefix: mdsPrefix,
RgwPrefix: rgwPrefix,
SocketDir: "/var/run/ceph",
SocketSuffix: sockSuffix,
CephUser: "client.admin",
CephConfig: "/etc/ceph/ceph.conf",
// Per-daemon admin-socket stats on by default; cluster stats
// (ceph -s / df / pool stats) must be enabled explicitly.
GatherAdminSocketStats: true,
GatherClusterStats: false,
}
})
}

View File

@ -46,43 +46,6 @@ func (*Chrony) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// dialUnix opens an unixgram connection with chrony
// The local (client-side) socket is created in the same directory as the
// chrony socket with a UUID-unique name so concurrent instances do not collide.
// On success c.local holds the path of the temporary socket; the caller is
// responsible for removing it (see Stop).
func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) {
dir := path.Dir(address)
c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String()))
conn, err := net.DialUnix("unixgram",
&net.UnixAddr{Name: c.local, Net: "unixgram"},
&net.UnixAddr{Name: address, Net: "unixgram"},
)
if err != nil {
return nil, err
}
// Apply the configured permissions (octal string) to the local socket,
// presumably so chronyd is able to write replies to it — TODO confirm.
filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32)
if err != nil {
return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err)
}
if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil {
return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err)
}
// Hand the socket to the configured group while keeping the current user
// as the owner.
group, err := user.LookupGroup(c.SocketGroup)
if err != nil {
return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err)
}
gid, err := strconv.Atoi(group.Gid)
if err != nil {
return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err)
}
if err := os.Chown(c.local, os.Getuid(), gid); err != nil {
return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err)
}
return conn, nil
}
func (c *Chrony) Init() error { func (c *Chrony) Init() error {
// Use the configured server, if none set, we try to guess it in Start() // Use the configured server, if none set, we try to guess it in Start()
if c.Server != "" { if c.Server != "" {
@ -182,19 +145,6 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
return nil return nil
} }
// Stop closes the connection to the chrony server and removes the temporary
// local socket created by dialUnix. Shutdown is best-effort: errors are
// logged rather than returned, and an already-closed connection
// (net.ErrClosed, EPIPE) is not reported.
func (c *Chrony) Stop() {
if c.conn != nil {
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err)
}
}
if c.local != "" {
if err := os.Remove(c.local); err != nil {
c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err)
}
}
}
func (c *Chrony) Gather(acc telegraf.Accumulator) error { func (c *Chrony) Gather(acc telegraf.Accumulator) error {
for _, m := range c.Metrics { for _, m := range c.Metrics {
switch m { switch m {
@ -216,6 +166,56 @@ func (c *Chrony) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop closes the connection to the chrony server and removes the temporary
// local socket created by dialUnix. Shutdown is best-effort: errors are
// logged rather than returned, and an already-closed connection
// (net.ErrClosed, EPIPE) is not reported.
func (c *Chrony) Stop() {
if c.conn != nil {
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err)
}
}
if c.local != "" {
if err := os.Remove(c.local); err != nil {
c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err)
}
}
}
// dialUnix opens an unixgram connection with chrony
// The local (client-side) socket is created in the same directory as the
// chrony socket with a UUID-unique name so concurrent instances do not collide.
// On success c.local holds the path of the temporary socket; the caller is
// responsible for removing it (see Stop).
func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) {
dir := path.Dir(address)
c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String()))
conn, err := net.DialUnix("unixgram",
&net.UnixAddr{Name: c.local, Net: "unixgram"},
&net.UnixAddr{Name: address, Net: "unixgram"},
)
if err != nil {
return nil, err
}
// Apply the configured permissions (octal string) to the local socket,
// presumably so chronyd is able to write replies to it — TODO confirm.
filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32)
if err != nil {
return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err)
}
if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil {
return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err)
}
// Hand the socket to the configured group while keeping the current user
// as the owner.
group, err := user.LookupGroup(c.SocketGroup)
if err != nil {
return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err)
}
gid, err := strconv.Atoi(group.Gid)
if err != nil {
return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err)
}
if err := os.Chown(c.local, os.Getuid(), gid); err != nil {
return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err)
}
return conn, nil
}
func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error { func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error {
req := fbchrony.NewActivityPacket() req := fbchrony.NewActivityPacket()
r, err := c.client.Communicate(req) r, err := c.client.Communicate(req)

View File

@ -38,17 +38,12 @@ var sampleConfig string
const ( const (
// Maximum telemetry payload size (in bytes) to accept for GRPC dialout transport // Maximum telemetry payload size (in bytes) to accept for GRPC dialout transport
tcpMaxMsgLen uint32 = 1024 * 1024 tcpMaxMsgLen uint32 = 1024 * 1024
// default minimum time between successive pings
// this value is specified in the GRPC docs via GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
defaultKeepaliveMinTime = config.Duration(time.Second * 300)
) )
// default minimum time between successive pings
// this value is specified in the GRPC docs via GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS
const defaultKeepaliveMinTime = config.Duration(time.Second * 300)
// GRPCEnforcementPolicy holds the TOML-configurable knobs that are mapped
// onto the gRPC server's keepalive enforcement policy
// (grpc_enforcement_policy in the plugin configuration).
type GRPCEnforcementPolicy struct {
// PermitKeepaliveWithoutCalls allows client keepalive pings when no RPC
// is in flight.
PermitKeepaliveWithoutCalls bool `toml:"permit_keepalive_without_calls"`
// KeepaliveMinTime is the minimum interval the server tolerates between
// successive client pings.
KeepaliveMinTime config.Duration `toml:"keepalive_minimum_time"`
}
// CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms // CiscoTelemetryMDT plugin for IOS XR, IOS XE and NXOS platforms
type CiscoTelemetryMDT struct { type CiscoTelemetryMDT struct {
// Common configuration // Common configuration
@ -58,7 +53,7 @@ type CiscoTelemetryMDT struct {
Aliases map[string]string `toml:"aliases"` Aliases map[string]string `toml:"aliases"`
Dmes map[string]string `toml:"dmes"` Dmes map[string]string `toml:"dmes"`
EmbeddedTags []string `toml:"embedded_tags"` EmbeddedTags []string `toml:"embedded_tags"`
EnforcementPolicy GRPCEnforcementPolicy `toml:"grpc_enforcement_policy"` EnforcementPolicy grpcEnforcementPolicy `toml:"grpc_enforcement_policy"`
IncludeDeleteField bool `toml:"include_delete_field"` IncludeDeleteField bool `toml:"include_delete_field"`
SourceFieldName string `toml:"source_field_name"` SourceFieldName string `toml:"source_field_name"`
@ -86,7 +81,12 @@ type CiscoTelemetryMDT struct {
mdtdialout.UnimplementedGRPCMdtDialoutServer mdtdialout.UnimplementedGRPCMdtDialoutServer
} }
type NxPayloadXfromStructure struct { type grpcEnforcementPolicy struct {
PermitKeepaliveWithoutCalls bool `toml:"permit_keepalive_without_calls"`
KeepaliveMinTime config.Duration `toml:"keepalive_minimum_time"`
}
type nxPayloadXfromStructure struct {
Name string `json:"Name"` Name string `json:"Name"`
Prop []struct { Prop []struct {
Key string `json:"Key"` Key string `json:"Key"`
@ -142,7 +142,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
continue continue
} }
var jsStruct NxPayloadXfromStructure var jsStruct nxPayloadXfromStructure
err := json.Unmarshal([]byte(dmeKey), &jsStruct) err := json.Unmarshal([]byte(dmeKey), &jsStruct)
if err != nil { if err != nil {
continue continue
@ -218,7 +218,67 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
// AcceptTCPDialoutClients defines the TCP dialout server main routine func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error {
return nil
}
// Stop listener and cleanup
// Stopping the gRPC server terminates all running dialout routines; the TCP
// listener (if any) is closed and we wait for worker goroutines to drain
// before returning.
func (c *CiscoTelemetryMDT) Stop() {
if c.grpcServer != nil {
// Stop server and terminate all running dialout routines
c.grpcServer.Stop()
}
if c.listener != nil {
c.listener.Close()
}
c.wg.Wait()
}
// MdtDialout RPC server method for grpc-dialout transport
// It receives telemetry packets from the device until EOF or error,
// reassembling NX-OS chunked payloads (TotalSize > 0) before handing the
// data to handleTelemetry. Errors are accumulated, not returned, so the
// stream handler itself always reports success to gRPC.
func (c *CiscoTelemetryMDT) MdtDialout(stream mdtdialout.GRPCMdtDialout_MdtDialoutServer) error {
peerInCtx, peerOK := peer.FromContext(stream.Context())
if peerOK {
c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
}
var chunkBuffer bytes.Buffer
for {
packet, err := stream.Recv()
if err != nil {
// EOF is the normal end-of-stream signal; anything else is reported.
if !errors.Is(err, io.EOF) {
c.acc.AddError(fmt.Errorf("receive error during GRPC dialout: %w", err))
}
break
}
// A packet with no data but populated errors is a device-side error report.
if len(packet.Data) == 0 && len(packet.Errors) != 0 {
c.acc.AddError(fmt.Errorf("error during GRPC dialout: %s", packet.Errors))
break
}
// Reassemble chunked telemetry data received from NX-OS
if packet.TotalSize == 0 {
// Unchunked payload: process immediately.
c.handleTelemetry(packet.Data)
} else if int(packet.TotalSize) <= c.MaxMsgSize {
// Chunked payload: accumulate until the announced total size arrives.
chunkBuffer.Write(packet.Data)
if chunkBuffer.Len() >= int(packet.TotalSize) {
c.handleTelemetry(chunkBuffer.Bytes())
chunkBuffer.Reset()
}
} else {
c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize))
}
}
if peerOK {
c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
}
return nil
}
// acceptTCPClients defines the TCP dialout server main routine
func (c *CiscoTelemetryMDT) acceptTCPClients() { func (c *CiscoTelemetryMDT) acceptTCPClients() {
// Keep track of all active connections, so we can close them if necessary // Keep track of all active connections, so we can close them if necessary
var mutex sync.Mutex var mutex sync.Mutex
@ -311,50 +371,6 @@ func (c *CiscoTelemetryMDT) handleTCPClient(conn net.Conn) error {
} }
} }
// MdtDialout RPC server method for grpc-dialout transport
// It receives telemetry packets from the device until EOF or error,
// reassembling NX-OS chunked payloads (TotalSize > 0) before handing the
// data to handleTelemetry. Errors are accumulated, not returned, so the
// stream handler itself always reports success to gRPC.
func (c *CiscoTelemetryMDT) MdtDialout(stream mdtdialout.GRPCMdtDialout_MdtDialoutServer) error {
peerInCtx, peerOK := peer.FromContext(stream.Context())
if peerOK {
c.Log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
}
var chunkBuffer bytes.Buffer
for {
packet, err := stream.Recv()
if err != nil {
// EOF is the normal end-of-stream signal; anything else is reported.
if !errors.Is(err, io.EOF) {
c.acc.AddError(fmt.Errorf("receive error during GRPC dialout: %w", err))
}
break
}
// A packet with no data but populated errors is a device-side error report.
if len(packet.Data) == 0 && len(packet.Errors) != 0 {
c.acc.AddError(fmt.Errorf("error during GRPC dialout: %s", packet.Errors))
break
}
// Reassemble chunked telemetry data received from NX-OS
if packet.TotalSize == 0 {
// Unchunked payload: process immediately.
c.handleTelemetry(packet.Data)
} else if int(packet.TotalSize) <= c.MaxMsgSize {
// Chunked payload: accumulate until the announced total size arrives.
chunkBuffer.Write(packet.Data)
if chunkBuffer.Len() >= int(packet.TotalSize) {
c.handleTelemetry(chunkBuffer.Bytes())
chunkBuffer.Reset()
}
} else {
c.acc.AddError(fmt.Errorf("dropped too large packet: %dB > %dB", packet.TotalSize, c.MaxMsgSize))
}
}
if peerOK {
c.Log.Debugf("Closed Cisco MDT GRPC dialout connection from %s", peerInCtx.Addr)
}
return nil
}
// Handle telemetry packet from any transport, decode and add as measurement // Handle telemetry packet from any transport, decode and add as measurement
func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) { func (c *CiscoTelemetryMDT) handleTelemetry(data []byte) {
msg := &telemetry.Telemetry{} msg := &telemetry.Telemetry{}
@ -782,27 +798,6 @@ func (c *CiscoTelemetryMDT) parseContentField(
delete(tags, prefix) delete(tags, prefix)
} }
// Address returns the local address the plugin's listener is bound to,
// e.g. to discover the actual port when the service address was configured
// with port 0 (as the tests do).
func (c *CiscoTelemetryMDT) Address() net.Addr {
return c.listener.Addr()
}
// Stop listener and cleanup
// Stopping the gRPC server terminates all running dialout routines; the TCP
// listener (if any) is closed and we wait for worker goroutines to drain
// before returning.
func (c *CiscoTelemetryMDT) Stop() {
if c.grpcServer != nil {
// Stop server and terminate all running dialout routines
c.grpcServer.Stop()
}
if c.listener != nil {
c.listener.Close()
}
c.wg.Wait()
}
// Gather plugin measurements (unused)
// This is a service (push-based) input: data arrives via the dialout
// transports, so the periodic Gather call intentionally does nothing.
func (c *CiscoTelemetryMDT) Gather(_ telegraf.Accumulator) error {
return nil
}
func init() { func init() {
inputs.Add("cisco_telemetry_mdt", func() telegraf.Input { inputs.Add("cisco_telemetry_mdt", func() telegraf.Input {
return &CiscoTelemetryMDT{ return &CiscoTelemetryMDT{

View File

@ -979,7 +979,7 @@ func TestTCPDialoutOverflow(t *testing.T) {
MsgLen uint32 MsgLen uint32
}{MsgLen: uint32(1000000000)} }{MsgLen: uint32(1000000000)}
addr := c.Address() addr := c.listener.Addr()
conn, err := net.Dial(addr.Network(), addr.String()) conn, err := net.Dial(addr.Network(), addr.String())
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, binary.Write(conn, binary.BigEndian, hdr)) require.NoError(t, binary.Write(conn, binary.BigEndian, hdr))
@ -1104,7 +1104,7 @@ func TestTCPDialoutMultiple(t *testing.T) {
MsgLen uint32 MsgLen uint32
}{} }{}
addr := c.Address() addr := c.listener.Addr()
conn, err := net.Dial(addr.Network(), addr.String()) conn, err := net.Dial(addr.Network(), addr.String())
require.NoError(t, err) require.NoError(t, err)
@ -1186,7 +1186,7 @@ func TestGRPCDialoutError(t *testing.T) {
err := c.Start(acc) err := c.Start(acc)
require.NoError(t, err) require.NoError(t, err)
addr := c.Address() addr := c.listener.Addr()
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err) require.NoError(t, err)
client := mdtdialout.NewGRPCMdtDialoutClient(conn) client := mdtdialout.NewGRPCMdtDialoutClient(conn)
@ -1220,7 +1220,7 @@ func TestGRPCDialoutMultiple(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
tel := mockTelemetryMessage() tel := mockTelemetryMessage()
addr := c.Address() addr := c.listener.Addr()
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err) require.NoError(t, err)
require.True(t, conn.WaitForStateChange(context.Background(), connectivity.Connecting)) require.True(t, conn.WaitForStateChange(context.Background(), connectivity.Connecting))
@ -1297,7 +1297,7 @@ func TestGRPCDialoutKeepalive(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
Transport: "grpc", Transport: "grpc",
ServiceAddress: "127.0.0.1:0", ServiceAddress: "127.0.0.1:0",
EnforcementPolicy: GRPCEnforcementPolicy{ EnforcementPolicy: grpcEnforcementPolicy{
PermitKeepaliveWithoutCalls: true, PermitKeepaliveWithoutCalls: true,
KeepaliveMinTime: 0, KeepaliveMinTime: 0,
}, },
@ -1306,7 +1306,7 @@ func TestGRPCDialoutKeepalive(t *testing.T) {
err := c.Start(acc) err := c.Start(acc)
require.NoError(t, err) require.NoError(t, err)
addr := c.Address() addr := c.listener.Addr()
conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err) require.NoError(t, err)
client := mdtdialout.NewGRPCMdtDialoutClient(conn) client := mdtdialout.NewGRPCMdtDialoutClient(conn)

View File

@ -26,26 +26,6 @@ var sampleConfig string
var defaultTimeout = 5 * time.Second var defaultTimeout = 5 * time.Second
// connect describes a single ClickHouse server endpoint: its cluster name,
// shard number and host name (presumably unmarshaled from a system.clusters
// query — verify against the caller), plus the resolved URL used for
// subsequent HTTP requests.
type connect struct {
Cluster string `json:"cluster"`
ShardNum int `json:"shard_num"`
Hostname string `json:"host_name"`
url *url.URL
}
// init registers the clickhouse input plugin factory. Cluster auto-discovery
// is enabled by default, TLS certificate verification is on
// (InsecureSkipVerify=false), and requests use the package default timeout.
func init() {
inputs.Add("clickhouse", func() telegraf.Input {
return &ClickHouse{
AutoDiscovery: true,
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: false,
},
Timeout: config.Duration(defaultTimeout),
}
})
}
// ClickHouse Telegraf Input Plugin
type ClickHouse struct { type ClickHouse struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
@ -60,6 +40,13 @@ type ClickHouse struct {
tls.ClientConfig tls.ClientConfig
} }
// connect describes a single ClickHouse server endpoint: its cluster name,
// shard number and host name (presumably unmarshaled from a system.clusters
// query — verify against the caller), plus the resolved URL used for
// subsequent HTTP requests.
type connect struct {
Cluster string `json:"cluster"`
ShardNum int `json:"shard_num"`
Hostname string `json:"host_name"`
url *url.URL
}
func (*ClickHouse) SampleConfig() string { func (*ClickHouse) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -638,3 +625,15 @@ var commonMetricsIsFloat = map[string]bool{
} }
var _ telegraf.ServiceInput = &ClickHouse{} var _ telegraf.ServiceInput = &ClickHouse{}
// init registers the clickhouse input plugin factory. Cluster auto-discovery
// is enabled by default, TLS certificate verification is on
// (InsecureSkipVerify=false), and requests use the package default timeout.
func init() {
inputs.Add("clickhouse", func() telegraf.Input {
return &ClickHouse{
AutoDiscovery: true,
ClientConfig: tls.ClientConfig{
InsecureSkipVerify: false,
},
Timeout: config.Duration(defaultTimeout),
}
})
}

View File

@ -25,11 +25,10 @@ var sampleConfig string
var once sync.Once var once sync.Once
type empty struct{} const (
type semaphore chan empty defaultMaxUndeliveredMessages = 1000
defaultRetryDelaySeconds = 5
const defaultMaxUndeliveredMessages = 1000 )
const defaultRetryDelaySeconds = 5
type PubSub struct { type PubSub struct {
sync.Mutex sync.Mutex
@ -70,12 +69,41 @@ type PubSub struct {
decoderMutex sync.Mutex decoderMutex sync.Mutex
} }
type (
// empty is a zero-size token type used purely for signaling.
empty struct{}
// semaphore is a channel of empty tokens used to bound concurrency
// (see its usage in the receiver code — capacity sets the limit).
semaphore chan empty
)
func (*PubSub) SampleConfig() string { func (*PubSub) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather does nothing for this service input. func (ps *PubSub) Init() error {
func (ps *PubSub) Gather(_ telegraf.Accumulator) error { if ps.Subscription == "" {
return errors.New(`"subscription" is required`)
}
if ps.Project == "" {
return errors.New(`"project" is required`)
}
switch ps.ContentEncoding {
case "", "identity":
ps.ContentEncoding = "identity"
case "gzip":
var err error
var options []internal.DecodingOption
if ps.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize)))
}
ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
}
return nil return nil
} }
@ -123,6 +151,11 @@ func (ps *PubSub) Start(ac telegraf.Accumulator) error {
return nil return nil
} }
// Gather does nothing for this service input.
// Metrics are delivered asynchronously by the subscription receiver that is
// started in Start().
func (ps *PubSub) Gather(_ telegraf.Accumulator) error {
return nil
}
// Stop ensures the PubSub subscriptions receivers are stopped by // Stop ensures the PubSub subscriptions receivers are stopped by
// canceling the context and waits for goroutines to finish. // canceling the context and waits for goroutines to finish.
func (ps *PubSub) Stop() { func (ps *PubSub) Stop() {
@ -315,35 +348,6 @@ func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) {
return &gcpSubscription{s}, nil return &gcpSubscription{s}, nil
} }
// Init validates the plugin configuration: "subscription" and "project" are
// mandatory, and content_encoding must be either "identity" (the default)
// or "gzip". For gzip a content decoder is prepared up front, optionally
// capped by max_decompression_size.
func (ps *PubSub) Init() error {
if ps.Subscription == "" {
return errors.New(`"subscription" is required`)
}
if ps.Project == "" {
return errors.New(`"project" is required`)
}
switch ps.ContentEncoding {
case "", "identity":
// Normalize the empty default to the explicit "identity" value.
ps.ContentEncoding = "identity"
case "gzip":
var err error
var options []internal.DecodingOption
if ps.MaxDecompressionSize > 0 {
// Guard against decompression bombs by limiting output size.
options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize)))
}
ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
}
return nil
}
func init() { func init() {
inputs.Add("cloud_pubsub", func() telegraf.Input { inputs.Add("cloud_pubsub", func() telegraf.Input {
ps := &PubSub{ ps := &PubSub{

View File

@ -200,7 +200,7 @@ func TestRunInvalidMessages(t *testing.T) {
acc.WaitError(1) acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again. // Make sure we acknowledged message so we don't receive it again.
testTracker.WaitForAck(1) testTracker.waitForAck(1)
require.Equal(t, 0, acc.NFields()) require.Equal(t, 0, acc.NFields())
} }
@ -249,7 +249,7 @@ func TestRunOverlongMessages(t *testing.T) {
acc.WaitError(1) acc.WaitError(1)
// Make sure we acknowledged message so we don't receive it again. // Make sure we acknowledged message so we don't receive it again.
testTracker.WaitForAck(1) testTracker.waitForAck(1)
require.Equal(t, 0, acc.NFields()) require.Equal(t, 0, acc.NFields())
} }

View File

@ -51,11 +51,11 @@ type testMsg struct {
} }
func (tm *testMsg) Ack() { func (tm *testMsg) Ack() {
tm.tracker.Ack() tm.tracker.ack()
} }
func (tm *testMsg) Nack() { func (tm *testMsg) Nack() {
tm.tracker.Nack() tm.tracker.nack()
} }
func (tm *testMsg) ID() string { func (tm *testMsg) ID() string {
@ -82,7 +82,7 @@ type testTracker struct {
numNacks int numNacks int
} }
func (t *testTracker) WaitForAck(num int) { func (t *testTracker) waitForAck(num int) {
t.Lock() t.Lock()
if t.Cond == nil { if t.Cond == nil {
t.Cond = sync.NewCond(&t.Mutex) t.Cond = sync.NewCond(&t.Mutex)
@ -93,25 +93,14 @@ func (t *testTracker) WaitForAck(num int) {
t.Unlock() t.Unlock()
} }
func (t *testTracker) WaitForNack(num int) { func (t *testTracker) ack() {
t.Lock()
if t.Cond == nil {
t.Cond = sync.NewCond(&t.Mutex)
}
for t.numNacks < num {
t.Wait()
}
t.Unlock()
}
func (t *testTracker) Ack() {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
t.numAcks++ t.numAcks++
} }
func (t *testTracker) Nack() { func (t *testTracker) nack() {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()

View File

@ -26,9 +26,11 @@ var once sync.Once
// defaultMaxBodySize is the default maximum request body size, in bytes. // defaultMaxBodySize is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error. // if the request body is over this size, we will return an HTTP 413 error.
// 500 MB const (
const defaultMaxBodySize = 500 * 1024 * 1024 // 500 MB
const defaultMaxUndeliveredMessages = 1000 defaultMaxBodySize = 500 * 1024 * 1024
defaultMaxUndeliveredMessages = 1000
)
type PubSubPush struct { type PubSubPush struct {
ServiceAddress string ServiceAddress string
@ -56,15 +58,15 @@ type PubSubPush struct {
sem chan struct{} sem chan struct{}
} }
// Message defines the structure of a Google Pub/Sub message. // message defines the structure of a Google Pub/Sub message.
type Message struct { type message struct {
Atts map[string]string `json:"attributes"` Atts map[string]string `json:"attributes"`
Data string `json:"data"` // Data is base64 encoded data Data string `json:"data"` // Data is base64 encoded data
} }
// Payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push) // payload is the received Google Pub/Sub data. (https://cloud.google.com/pubsub/docs/push)
type Payload struct { type payload struct {
Msg Message `json:"message"` Msg message `json:"message"`
Subscription string `json:"subscription"` Subscription string `json:"subscription"`
} }
@ -72,10 +74,6 @@ func (*PubSubPush) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather does nothing for this service input; messages are pushed to the
// plugin over HTTP (see ServeHTTP).
func (p *PubSubPush) Gather(_ telegraf.Accumulator) error {
return nil
}
func (p *PubSubPush) SetParser(parser telegraf.Parser) { func (p *PubSubPush) SetParser(parser telegraf.Parser) {
p.Parser = parser p.Parser = parser
} }
@ -135,6 +133,10 @@ func (p *PubSubPush) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
// Gather does nothing for this service input; messages are pushed to the
// plugin over HTTP (see ServeHTTP).
func (p *PubSubPush) Gather(_ telegraf.Accumulator) error {
return nil
}
// Stop cleans up all resources // Stop cleans up all resources
func (p *PubSubPush) Stop() { func (p *PubSubPush) Stop() {
p.cancel() p.cancel()
@ -144,9 +146,9 @@ func (p *PubSubPush) Stop() {
func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (p *PubSubPush) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if req.URL.Path == p.Path { if req.URL.Path == p.Path {
p.AuthenticateIfSet(p.serveWrite, res, req) p.authenticateIfSet(p.serveWrite, res, req)
} else { } else {
p.AuthenticateIfSet(http.NotFound, res, req) p.authenticateIfSet(http.NotFound, res, req)
} }
} }
@ -180,7 +182,7 @@ func (p *PubSubPush) serveWrite(res http.ResponseWriter, req *http.Request) {
return return
} }
var payload Payload var payload payload
if err = json.Unmarshal(bytes, &payload); err != nil { if err = json.Unmarshal(bytes, &payload); err != nil {
p.Log.Errorf("Error decoding payload %s", err.Error()) p.Log.Errorf("Error decoding payload %s", err.Error())
res.WriteHeader(http.StatusBadRequest) res.WriteHeader(http.StatusBadRequest)
@ -262,7 +264,7 @@ func (p *PubSubPush) receiveDelivered() {
} }
} }
func (p *PubSubPush) AuthenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) { func (p *PubSubPush) authenticateIfSet(handler http.HandlerFunc, res http.ResponseWriter, req *http.Request) {
if p.Token != "" { if p.Token != "" {
if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 { if subtle.ConstantTimeCompare([]byte(req.FormValue("token")), []byte(p.Token)) != 1 {
http.Error(res, "Unauthorized.", http.StatusUnauthorized) http.Error(res, "Unauthorized.", http.StatusUnauthorized)

View File

@ -39,18 +39,18 @@ type CloudWatch struct {
proxy.HTTPProxy proxy.HTTPProxy
Period config.Duration `toml:"period"` Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"` Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace" deprecated:"1.25.0;1.35.0;use 'namespaces' instead"` Namespace string `toml:"namespace" deprecated:"1.25.0;1.35.0;use 'namespaces' instead"`
Namespaces []string `toml:"namespaces"` Namespaces []string `toml:"namespaces"`
Metrics []*Metric `toml:"metrics"` Metrics []*cloudwatchMetric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"` CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"` RateLimit int `toml:"ratelimit"`
RecentlyActive string `toml:"recently_active"` RecentlyActive string `toml:"recently_active"`
BatchSize int `toml:"batch_size"` BatchSize int `toml:"batch_size"`
IncludeLinkedAccounts bool `toml:"include_linked_accounts"` IncludeLinkedAccounts bool `toml:"include_linked_accounts"`
MetricFormat string `toml:"metric_format"` MetricFormat string `toml:"metric_format"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
client cloudwatchClient client cloudwatchClient
statFilter filter.Filter statFilter filter.Filter
@ -62,16 +62,16 @@ type CloudWatch struct {
common_aws.CredentialConfig common_aws.CredentialConfig
} }
// Metric defines a simplified Cloudwatch metric. // cloudwatchMetric defines a simplified Cloudwatch metric.
type Metric struct { type cloudwatchMetric struct {
StatisticExclude *[]string `toml:"statistic_exclude"` StatisticExclude *[]string `toml:"statistic_exclude"`
StatisticInclude *[]string `toml:"statistic_include"` StatisticInclude *[]string `toml:"statistic_include"`
MetricNames []string `toml:"names"` MetricNames []string `toml:"names"`
Dimensions []*Dimension `toml:"dimensions"` Dimensions []*dimension `toml:"dimensions"`
} }
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering). // dimension defines a simplified Cloudwatch dimension (provides metric filtering).
type Dimension struct { type dimension struct {
Name string `toml:"name"` Name string `toml:"name"`
Value string `toml:"value"` Value string `toml:"value"`
valueMatcher filter.Filter valueMatcher filter.Filter
@ -121,8 +121,6 @@ func (c *CloudWatch) Init() error {
return nil return nil
} }
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval".
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
filteredMetrics, err := getFilteredMetrics(c) filteredMetrics, err := getFilteredMetrics(c)
if err != nil { if err != nil {
@ -212,7 +210,7 @@ func (c *CloudWatch) initializeCloudWatch() error {
} }
}) })
// Initialize regex matchers for each Dimension value. // Initialize regex matchers for each dimension value.
for _, m := range c.Metrics { for _, m := range c.Metrics {
for _, dimension := range m.Dimensions { for _, dimension := range m.Dimensions {
matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil) matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil)
@ -494,22 +492,6 @@ func (c *CloudWatch) aggregateMetrics(acc telegraf.Accumulator, metricDataResult
} }
} }
func init() {
inputs.Add("cloudwatch", func() telegraf.Input {
return New()
})
}
// New instance of the cloudwatch plugin
func New() *CloudWatch {
return &CloudWatch{
CacheTTL: config.Duration(time.Hour),
RateLimit: 25,
Timeout: config.Duration(time.Second * 5),
BatchSize: 500,
}
}
func sanitizeMeasurement(namespace string) string { func sanitizeMeasurement(namespace string) string {
namespace = strings.ReplaceAll(namespace, "/", "_") namespace = strings.ReplaceAll(namespace, "/", "_")
namespace = snakeCase(namespace) namespace = snakeCase(namespace)
@ -545,7 +527,7 @@ func (f *metricCache) isValid() bool {
return f.metrics != nil && time.Since(f.built) < f.ttl return f.metrics != nil && time.Since(f.built) < f.ttl
} }
func hasWildcard(dimensions []*Dimension) bool { func hasWildcard(dimensions []*dimension) bool {
for _, d := range dimensions { for _, d := range dimensions {
if d.Value == "" || strings.ContainsAny(d.Value, "*?[") { if d.Value == "" || strings.ContainsAny(d.Value, "*?[") {
return true return true
@ -554,7 +536,7 @@ func hasWildcard(dimensions []*Dimension) bool {
return false return false
} }
func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*Dimension) bool { func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*dimension) bool {
if name != *cloudwatchMetric.MetricName { if name != *cloudwatchMetric.MetricName {
return false return false
} }
@ -576,3 +558,18 @@ func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*Dimens
} }
return true return true
} }
func newCloudWatch() *CloudWatch {
return &CloudWatch{
CacheTTL: config.Duration(time.Hour),
RateLimit: 25,
Timeout: config.Duration(time.Second * 5),
BatchSize: 500,
}
}
func init() {
inputs.Add("cloudwatch", func() telegraf.Input {
return newCloudWatch()
})
}

View File

@ -379,10 +379,10 @@ func TestSelectMetrics(t *testing.T) {
Period: internalDuration, Period: internalDuration,
RateLimit: 200, RateLimit: 200,
BatchSize: 500, BatchSize: 500,
Metrics: []*Metric{ Metrics: []*cloudwatchMetric{
{ {
MetricNames: []string{"Latency", "RequestCount"}, MetricNames: []string{"Latency", "RequestCount"},
Dimensions: []*Dimension{ Dimensions: []*dimension{
{ {
Name: "LoadBalancerName", Name: "LoadBalancerName",
Value: "lb*", Value: "lb*",

View File

@ -55,7 +55,7 @@ type CloudWatchMetricStreams struct {
acc telegraf.Accumulator acc telegraf.Accumulator
} }
type Request struct { type request struct {
RequestID string `json:"requestId"` RequestID string `json:"requestId"`
Timestamp int64 `json:"timestamp"` Timestamp int64 `json:"timestamp"`
Records []struct { Records []struct {
@ -63,7 +63,7 @@ type Request struct {
} `json:"records"` } `json:"records"`
} }
type Data struct { type data struct {
MetricStreamName string `json:"metric_stream_name"` MetricStreamName string `json:"metric_stream_name"`
AccountID string `json:"account_id"` AccountID string `json:"account_id"`
Region string `json:"region"` Region string `json:"region"`
@ -75,7 +75,7 @@ type Data struct {
Unit string `json:"unit"` Unit string `json:"unit"`
} }
type Response struct { type response struct {
RequestID string `json:"requestId"` RequestID string `json:"requestId"`
Timestamp int64 `json:"timestamp"` Timestamp int64 `json:"timestamp"`
} }
@ -89,29 +89,28 @@ func (*CloudWatchMetricStreams) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (a *age) Record(t time.Duration) { func (cms *CloudWatchMetricStreams) Init() error {
if t > a.max { tags := map[string]string{
a.max = t "address": cms.ServiceAddress,
}
cms.requestsReceived = selfstat.Register("cloudwatch_metric_streams", "requests_received", tags)
cms.writesServed = selfstat.Register("cloudwatch_metric_streams", "writes_served", tags)
cms.requestTime = selfstat.Register("cloudwatch_metric_streams", "request_time", tags)
cms.ageMax = selfstat.Register("cloudwatch_metric_streams", "age_max", tags)
cms.ageMin = selfstat.Register("cloudwatch_metric_streams", "age_min", tags)
if cms.MaxBodySize == 0 {
cms.MaxBodySize = config.Size(defaultMaxBodySize)
} }
if t < a.min { if cms.ReadTimeout < config.Duration(time.Second) {
a.min = t cms.ReadTimeout = config.Duration(time.Second * 10)
} }
}
func (a *age) SubmitMax(stat selfstat.Stat) { if cms.WriteTimeout < config.Duration(time.Second) {
stat.Incr(a.max.Nanoseconds()) cms.WriteTimeout = config.Duration(time.Second * 10)
} }
func (a *age) SubmitMin(stat selfstat.Stat) {
stat.Incr(a.min.Nanoseconds())
}
func (cms *CloudWatchMetricStreams) Description() string {
return "HTTP listener & parser for AWS Metric Streams"
}
func (cms *CloudWatchMetricStreams) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
@ -150,13 +149,15 @@ func (cms *CloudWatchMetricStreams) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (cms *CloudWatchMetricStreams) createHTTPServer() *http.Server { func (cms *CloudWatchMetricStreams) Gather(_ telegraf.Accumulator) error {
return &http.Server{ return nil
Addr: cms.ServiceAddress, }
Handler: cms,
ReadTimeout: time.Duration(cms.ReadTimeout), func (cms *CloudWatchMetricStreams) Stop() {
WriteTimeout: time.Duration(cms.WriteTimeout), if cms.listener != nil {
cms.listener.Close()
} }
cms.wg.Wait()
} }
func (cms *CloudWatchMetricStreams) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (cms *CloudWatchMetricStreams) ServeHTTP(res http.ResponseWriter, req *http.Request) {
@ -173,6 +174,33 @@ func (cms *CloudWatchMetricStreams) ServeHTTP(res http.ResponseWriter, req *http
cms.authenticateIfSet(handler, res, req) cms.authenticateIfSet(handler, res, req)
} }
func (a *age) record(t time.Duration) {
if t > a.max {
a.max = t
}
if t < a.min {
a.min = t
}
}
func (a *age) submitMax(stat selfstat.Stat) {
stat.Incr(a.max.Nanoseconds())
}
func (a *age) submitMin(stat selfstat.Stat) {
stat.Incr(a.min.Nanoseconds())
}
func (cms *CloudWatchMetricStreams) createHTTPServer() *http.Server {
return &http.Server{
Addr: cms.ServiceAddress,
Handler: cms,
ReadTimeout: time.Duration(cms.ReadTimeout),
WriteTimeout: time.Duration(cms.WriteTimeout),
}
}
func (cms *CloudWatchMetricStreams) recordRequestTime(start time.Time) { func (cms *CloudWatchMetricStreams) recordRequestTime(start time.Time) {
elapsed := time.Since(start) elapsed := time.Since(start)
cms.requestTime.Incr(elapsed.Nanoseconds()) cms.requestTime.Incr(elapsed.Nanoseconds())
@ -224,7 +252,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt
} }
// Decode the request // Decode the request
var r Request var r request
err := json.NewDecoder(body).Decode(&r) err := json.NewDecoder(body).Decode(&r)
if err != nil { if err != nil {
cms.Log.Errorf("unable to decode metric-streams request: %v", err) cms.Log.Errorf("unable to decode metric-streams request: %v", err)
@ -235,10 +263,10 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt
} }
agesInRequest := &age{max: 0, min: math.MaxInt32} agesInRequest := &age{max: 0, min: math.MaxInt32}
defer agesInRequest.SubmitMax(cms.ageMax) defer agesInRequest.submitMax(cms.ageMax)
defer agesInRequest.SubmitMin(cms.ageMin) defer agesInRequest.submitMin(cms.ageMin)
// For each record, decode the base64 data and store it in a Data struct // For each record, decode the base64 data and store it in a data struct
// Metrics from Metric Streams are Base64 encoded JSON // Metrics from Metric Streams are Base64 encoded JSON
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
for _, record := range r.Records { for _, record := range r.Records {
@ -261,7 +289,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt
} }
for _, js := range list { for _, js := range list {
var d Data var d data
err = json.Unmarshal([]byte(js), &d) err = json.Unmarshal([]byte(js), &d)
if err != nil { if err != nil {
cms.Log.Errorf("unable to unmarshal metric-streams data: %v", err) cms.Log.Errorf("unable to unmarshal metric-streams data: %v", err)
@ -271,13 +299,13 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt
return return
} }
cms.composeMetrics(d) cms.composeMetrics(d)
agesInRequest.Record(time.Since(time.Unix(d.Timestamp/1000, 0))) agesInRequest.record(time.Since(time.Unix(d.Timestamp/1000, 0)))
} }
} }
// Compose the response to AWS using the request's requestId // Compose the response to AWS using the request's requestId
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat
response := Response{ response := response{
RequestID: r.RequestID, RequestID: r.RequestID,
Timestamp: time.Now().UnixNano() / 1000000, Timestamp: time.Now().UnixNano() / 1000000,
} }
@ -300,7 +328,7 @@ func (cms *CloudWatchMetricStreams) serveWrite(res http.ResponseWriter, req *htt
} }
} }
func (cms *CloudWatchMetricStreams) composeMetrics(data Data) { func (cms *CloudWatchMetricStreams) composeMetrics(data data) {
fields := make(map[string]interface{}) fields := make(map[string]interface{})
tags := make(map[string]string) tags := make(map[string]string)
timestamp := time.Unix(data.Timestamp/1000, 0) timestamp := time.Unix(data.Timestamp/1000, 0)
@ -386,39 +414,6 @@ func (cms *CloudWatchMetricStreams) authenticateIfSet(handler http.HandlerFunc,
} }
} }
// Stop cleans up all resources
func (cms *CloudWatchMetricStreams) Stop() {
if cms.listener != nil {
cms.listener.Close()
}
cms.wg.Wait()
}
func (cms *CloudWatchMetricStreams) Init() error {
tags := map[string]string{
"address": cms.ServiceAddress,
}
cms.requestsReceived = selfstat.Register("cloudwatch_metric_streams", "requests_received", tags)
cms.writesServed = selfstat.Register("cloudwatch_metric_streams", "writes_served", tags)
cms.requestTime = selfstat.Register("cloudwatch_metric_streams", "request_time", tags)
cms.ageMax = selfstat.Register("cloudwatch_metric_streams", "age_max", tags)
cms.ageMin = selfstat.Register("cloudwatch_metric_streams", "age_min", tags)
if cms.MaxBodySize == 0 {
cms.MaxBodySize = config.Size(defaultMaxBodySize)
}
if cms.ReadTimeout < config.Duration(time.Second) {
cms.ReadTimeout = config.Duration(time.Second * 10)
}
if cms.WriteTimeout < config.Duration(time.Second) {
cms.WriteTimeout = config.Duration(time.Second * 10)
}
return nil
}
func init() { func init() {
inputs.Add("cloudwatch_metric_streams", func() telegraf.Input { inputs.Add("cloudwatch_metric_streams", func() telegraf.Input {
return &CloudWatchMetricStreams{ return &CloudWatchMetricStreams{

View File

@ -304,8 +304,8 @@ func TestComposeMetrics(t *testing.T) {
require.NoError(t, metricStream.Start(acc)) require.NoError(t, metricStream.Start(acc))
defer metricStream.Stop() defer metricStream.Stop()
// compose a Data object for writing // compose a data object for writing
data := Data{ data := data{
MetricStreamName: "cloudwatch-metric-stream", MetricStreamName: "cloudwatch-metric-stream",
AccountID: "546734499701", AccountID: "546734499701",
Region: "us-west-2", Region: "us-west-2",
@ -335,8 +335,8 @@ func TestComposeAPICompatibleMetrics(t *testing.T) {
require.NoError(t, metricStream.Start(acc)) require.NoError(t, metricStream.Start(acc))
defer metricStream.Stop() defer metricStream.Stop()
// compose a Data object for writing // compose a data object for writing
data := Data{ data := data{
MetricStreamName: "cloudwatch-metric-stream", MetricStreamName: "cloudwatch-metric-stream",
AccountID: "546734499701", AccountID: "546734499701",
Region: "us-west-2", Region: "us-west-2",

View File

@ -21,38 +21,29 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type Conntrack struct { var (
ps system.PS dfltDirs = []string{
Path string "/proc/sys/net/ipv4/netfilter",
Dirs []string "/proc/sys/net/netfilter",
Files []string }
Collect []string
} dfltFiles = []string{
"ip_conntrack_count",
"ip_conntrack_max",
"nf_conntrack_count",
"nf_conntrack_max",
}
)
const ( const (
inputName = "conntrack" inputName = "conntrack"
) )
var dfltDirs = []string{ type Conntrack struct {
"/proc/sys/net/ipv4/netfilter", Collect []string `toml:"collect"`
"/proc/sys/net/netfilter", Dirs []string `toml:"dirs"`
} Files []string `toml:"files"`
ps system.PS
var dfltFiles = []string{
"ip_conntrack_count",
"ip_conntrack_max",
"nf_conntrack_count",
"nf_conntrack_max",
}
func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
}
if len(c.Files) == 0 {
c.Files = dfltFiles
}
} }
func (*Conntrack) SampleConfig() string { func (*Conntrack) SampleConfig() string {
@ -154,6 +145,16 @@ func (c *Conntrack) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (c *Conntrack) setDefaults() {
if len(c.Dirs) == 0 {
c.Dirs = dfltDirs
}
if len(c.Files) == 0 {
c.Files = dfltFiles
}
}
func init() { func init() {
inputs.Add(inputName, func() telegraf.Input { inputs.Add(inputName, func() telegraf.Input {
return &Conntrack{ return &Conntrack{

View File

@ -17,11 +17,13 @@ type Conntrack struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Conntrack) SampleConfig() string { return sampleConfig }
func (c *Conntrack) Init() error { func (c *Conntrack) Init() error {
c.Log.Warn("current platform is not supported") c.Log.Warn("current platform is not supported")
return nil return nil
} }
func (*Conntrack) SampleConfig() string { return sampleConfig }
func (*Conntrack) Gather(_ telegraf.Accumulator) error { return nil } func (*Conntrack) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -18,19 +18,19 @@ import (
var sampleConfig string var sampleConfig string
type Consul struct { type Consul struct {
Address string Address string `toml:"address"`
Scheme string Scheme string `toml:"scheme"`
Token string Token string `toml:"token"`
Username string Username string `toml:"username"`
Password string Password string `toml:"password"`
Datacentre string `toml:"datacentre" deprecated:"1.10.0;1.35.0;use 'datacenter' instead"` Datacentre string `toml:"datacentre" deprecated:"1.10.0;1.35.0;use 'datacenter' instead"`
Datacenter string Datacenter string `toml:"datacenter"`
tls.ClientConfig TagDelimiter string `toml:"tag_delimiter"`
TagDelimiter string MetricVersion int `toml:"metric_version"`
MetricVersion int
Log telegraf.Logger Log telegraf.Logger
tls.ClientConfig
// client used to connect to Consul agnet // client used to connect to Consul agent
client *api.Client client *api.Client
} }
@ -91,7 +91,19 @@ func (c *Consul) Init() error {
return err return err
} }
func (c *Consul) GatherHealthCheck(acc telegraf.Accumulator, checks []*api.HealthCheck) { func (c *Consul) Gather(acc telegraf.Accumulator) error {
checks, _, err := c.client.Health().State("any", nil)
if err != nil {
return err
}
c.gatherHealthCheck(acc, checks)
return nil
}
func (c *Consul) gatherHealthCheck(acc telegraf.Accumulator, checks []*api.HealthCheck) {
for _, check := range checks { for _, check := range checks {
record := make(map[string]interface{}) record := make(map[string]interface{})
tags := make(map[string]string) tags := make(map[string]string)
@ -132,18 +144,6 @@ func (c *Consul) GatherHealthCheck(acc telegraf.Accumulator, checks []*api.Healt
} }
} }
func (c *Consul) Gather(acc telegraf.Accumulator) error {
checks, _, err := c.client.Health().State("any", nil)
if err != nil {
return err
}
c.GatherHealthCheck(acc, checks)
return nil
}
func init() { func init() {
inputs.Add("consul", func() telegraf.Input { inputs.Add("consul", func() telegraf.Input {
return &Consul{} return &Consul{}

View File

@ -43,7 +43,7 @@ func TestGatherHealthCheck(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
consul := &Consul{} consul := &Consul{}
consul.GatherHealthCheck(&acc, sampleChecks) consul.gatherHealthCheck(&acc, sampleChecks)
acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags)
} }
@ -72,7 +72,7 @@ func TestGatherHealthCheckWithDelimitedTags(t *testing.T) {
consul := &Consul{ consul := &Consul{
TagDelimiter: ":", TagDelimiter: ":",
} }
consul.GatherHealthCheck(&acc, sampleChecks) consul.gatherHealthCheck(&acc, sampleChecks)
acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags)
} }
@ -101,7 +101,7 @@ func TestGatherHealthCheckV2(t *testing.T) {
consul := &Consul{ consul := &Consul{
MetricVersion: 2, MetricVersion: 2,
} }
consul.GatherHealthCheck(&acc, sampleChecks) consul.gatherHealthCheck(&acc, sampleChecks)
acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags)
} }
@ -131,7 +131,7 @@ func TestGatherHealthCheckWithDelimitedTagsV2(t *testing.T) {
MetricVersion: 2, MetricVersion: 2,
TagDelimiter: ":", TagDelimiter: ":",
} }
consul.GatherHealthCheck(&acc, sampleChecks) consul.gatherHealthCheck(&acc, sampleChecks)
acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "consul_health_checks", expectedFields, expectedTags)
} }

View File

@ -21,7 +21,8 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// consul_agent configuration object const timeLayout = "2006-01-02 15:04:05 -0700 MST"
type ConsulAgent struct { type ConsulAgent struct {
URL string `toml:"url"` URL string `toml:"url"`
@ -35,16 +36,6 @@ type ConsulAgent struct {
roundTripper http.RoundTripper roundTripper http.RoundTripper
} }
const timeLayout = "2006-01-02 15:04:05 -0700 MST"
func init() {
inputs.Add("consul_agent", func() telegraf.Input {
return &ConsulAgent{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}
func (*ConsulAgent) SampleConfig() string { func (*ConsulAgent) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -80,7 +71,6 @@ func (n *ConsulAgent) Init() error {
return nil return nil
} }
// Gather, collects metrics from Consul endpoint
func (n *ConsulAgent) Gather(acc telegraf.Accumulator) error { func (n *ConsulAgent) Gather(acc telegraf.Accumulator) error {
summaryMetrics, err := n.loadJSON(n.URL + "/v1/agent/metrics") summaryMetrics, err := n.loadJSON(n.URL + "/v1/agent/metrics")
if err != nil { if err != nil {
@ -90,7 +80,7 @@ func (n *ConsulAgent) Gather(acc telegraf.Accumulator) error {
return buildConsulAgent(acc, summaryMetrics) return buildConsulAgent(acc, summaryMetrics)
} }
func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) { func (n *ConsulAgent) loadJSON(url string) (*agentInfo, error) {
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -109,7 +99,7 @@ func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) {
return nil, fmt.Errorf("%s returned HTTP status %s", url, resp.Status) return nil, fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
} }
var metrics AgentInfo var metrics agentInfo
err = json.NewDecoder(resp.Body).Decode(&metrics) err = json.NewDecoder(resp.Body).Decode(&metrics)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing json response: %w", err) return nil, fmt.Errorf("error parsing json response: %w", err)
@ -119,7 +109,7 @@ func (n *ConsulAgent) loadJSON(url string) (*AgentInfo, error) {
} }
// buildConsulAgent, it builds all the metrics and adds them to the accumulator) // buildConsulAgent, it builds all the metrics and adds them to the accumulator)
func buildConsulAgent(acc telegraf.Accumulator, agentInfo *AgentInfo) error { func buildConsulAgent(acc telegraf.Accumulator, agentInfo *agentInfo) error {
t, err := internal.ParseTimestamp(timeLayout, agentInfo.Timestamp, nil) t, err := internal.ParseTimestamp(timeLayout, agentInfo.Timestamp, nil)
if err != nil { if err != nil {
return fmt.Errorf("error parsing time: %w", err) return fmt.Errorf("error parsing time: %w", err)
@ -175,3 +165,11 @@ func buildConsulAgent(acc telegraf.Accumulator, agentInfo *AgentInfo) error {
return nil return nil
} }
func init() {
inputs.Add("consul_agent", func() telegraf.Input {
return &ConsulAgent{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}

View File

@ -1,25 +1,25 @@
package consul_agent package consul_agent
type AgentInfo struct { type agentInfo struct {
Timestamp string Timestamp string
Gauges []GaugeValue Gauges []gaugeValue
Points []PointValue Points []pointValue
Counters []SampledValue Counters []sampledValue
Samples []SampledValue Samples []sampledValue
} }
type GaugeValue struct { type gaugeValue struct {
Name string Name string
Value float32 Value float32
Labels map[string]string Labels map[string]string
} }
type PointValue struct { type pointValue struct {
Name string Name string
Points []float32 Points []float32
} }
type SampledValue struct { type sampledValue struct {
Name string Name string
Count int Count int
Sum float64 Sum float64

View File

@ -22,6 +22,8 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var regexpURI = regexp.MustCompile(`(\S+://)?(\S+\:\S+@)`)
type Couchbase struct { type Couchbase struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
BucketStatsIncluded []string `toml:"bucket_stats_included"` BucketStatsIncluded []string `toml:"bucket_stats_included"`
@ -42,13 +44,40 @@ type autoFailover struct {
Timeout int `json:"timeout"` Timeout int `json:"timeout"`
} }
var regexpURI = regexp.MustCompile(`(\S+://)?(\S+\:\S+@)`)
func (*Couchbase) SampleConfig() string { func (*Couchbase) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Reads stats from all configured clusters. Accumulates stats. func (cb *Couchbase) Init() error {
f, err := filter.NewIncludeExcludeFilter(cb.BucketStatsIncluded, []string{})
if err != nil {
return err
}
cb.bucketInclude = f
tlsConfig, err := cb.TLSConfig()
if err != nil {
return err
}
cb.client = &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
MaxIdleConnsPerHost: couchbase.MaxIdleConnsPerHost,
TLSClientConfig: tlsConfig,
},
}
couchbase.SetSkipVerify(cb.ClientConfig.InsecureSkipVerify)
couchbase.SetCertFile(cb.ClientConfig.TLSCert)
couchbase.SetKeyFile(cb.ClientConfig.TLSKey)
couchbase.SetRootFile(cb.ClientConfig.TLSCA)
return nil
}
// Gather reads stats from all configured clusters. Accumulates stats.
// Returns one of the errors encountered while gathering stats (if any). // Returns one of the errors encountered while gathering stats (if any).
func (cb *Couchbase) Gather(acc telegraf.Accumulator) error { func (cb *Couchbase) Gather(acc telegraf.Accumulator) error {
if len(cb.Servers) == 0 { if len(cb.Servers) == 0 {
@ -181,7 +210,7 @@ func (cb *Couchbase) basicBucketStats(basicStats map[string]interface{}) map[str
} }
func (cb *Couchbase) gatherDetailedBucketStats(server, bucket, nodeHostname string, fields map[string]interface{}) error { func (cb *Couchbase) gatherDetailedBucketStats(server, bucket, nodeHostname string, fields map[string]interface{}) error {
extendedBucketStats := &BucketStats{} extendedBucketStats := &bucketStats{}
err := cb.queryDetailedBucketStats(server, bucket, nodeHostname, extendedBucketStats) err := cb.queryDetailedBucketStats(server, bucket, nodeHostname, extendedBucketStats)
if err != nil { if err != nil {
return err return err
@ -421,7 +450,7 @@ func (cb *Couchbase) addBucketFieldChecked(fields map[string]interface{}, fieldK
cb.addBucketField(fields, fieldKey, values[len(values)-1]) cb.addBucketField(fields, fieldKey, values[len(values)-1])
} }
func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname string, bucketStats *BucketStats) error { func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname string, bucketStats *bucketStats) error {
url := server + "/pools/default/buckets/" + bucket url := server + "/pools/default/buckets/" + bucket
if nodeHostname != "" { if nodeHostname != "" {
url += "/nodes/" + nodeHostname url += "/nodes/" + nodeHostname
@ -444,35 +473,6 @@ func (cb *Couchbase) queryDetailedBucketStats(server, bucket, nodeHostname strin
return json.NewDecoder(r.Body).Decode(bucketStats) return json.NewDecoder(r.Body).Decode(bucketStats)
} }
func (cb *Couchbase) Init() error {
f, err := filter.NewIncludeExcludeFilter(cb.BucketStatsIncluded, []string{})
if err != nil {
return err
}
cb.bucketInclude = f
tlsConfig, err := cb.TLSConfig()
if err != nil {
return err
}
cb.client = &http.Client{
Timeout: 10 * time.Second,
Transport: &http.Transport{
MaxIdleConnsPerHost: couchbase.MaxIdleConnsPerHost,
TLSClientConfig: tlsConfig,
},
}
couchbase.SetSkipVerify(cb.ClientConfig.InsecureSkipVerify)
couchbase.SetCertFile(cb.ClientConfig.TLSCert)
couchbase.SetKeyFile(cb.ClientConfig.TLSKey)
couchbase.SetRootFile(cb.ClientConfig.TLSCA)
return nil
}
func init() { func init() {
inputs.Add("couchbase", func() telegraf.Input { inputs.Add("couchbase", func() telegraf.Input {
return &Couchbase{ return &Couchbase{

View File

@ -1,6 +1,6 @@
package couchbase package couchbase
type BucketStats struct { type bucketStats struct {
Op struct { Op struct {
Samples struct { Samples struct {
CouchTotalDiskSize []float64 `json:"couch_total_disk_size"` CouchTotalDiskSize []float64 `json:"couch_total_disk_size"`

View File

@ -132,7 +132,7 @@ func TestGatherDetailedBucketMetrics(t *testing.T) {
err = cb.Init() err = cb.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
bucketStats := &BucketStats{} bucketStats := &bucketStats{}
if err := json.Unmarshal(test.response, bucketStats); err != nil { if err := json.Unmarshal(test.response, bucketStats); err != nil {
t.Fatal("parse bucketResponse", err) t.Fatal("parse bucketResponse", err)
} }

View File

@ -16,6 +16,14 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type CouchDB struct {
Hosts []string `toml:"hosts"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
client *http.Client
}
type ( type (
metaData struct { metaData struct {
Current *float64 `json:"current"` Current *float64 `json:"current"`
@ -77,20 +85,12 @@ type (
ClientsRequestingChanges metaData `json:"clients_requesting_changes"` ClientsRequestingChanges metaData `json:"clients_requesting_changes"`
} }
Stats struct { stats struct {
Couchdb couchdb `json:"couchdb"` Couchdb couchdb `json:"couchdb"`
HttpdRequestMethods httpdRequestMethods `json:"httpd_request_methods"` HttpdRequestMethods httpdRequestMethods `json:"httpd_request_methods"`
HttpdStatusCodes httpdStatusCodes `json:"httpd_status_codes"` HttpdStatusCodes httpdStatusCodes `json:"httpd_status_codes"`
Httpd httpd `json:"httpd"` Httpd httpd `json:"httpd"`
} }
CouchDB struct {
Hosts []string `toml:"hosts"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
client *http.Client
}
) )
func (*CouchDB) SampleConfig() string { func (*CouchDB) SampleConfig() string {
@ -143,7 +143,7 @@ func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host stri
return fmt.Errorf("failed to get stats from couchdb: HTTP responded %d", response.StatusCode) return fmt.Errorf("failed to get stats from couchdb: HTTP responded %d", response.StatusCode)
} }
stats := Stats{} stats := stats{}
decoder := json.NewDecoder(response.Body) decoder := json.NewDecoder(response.Body)
if err := decoder.Decode(&stats); err != nil { if err := decoder.Decode(&stats); err != nil {
return fmt.Errorf("failed to decode stats from couchdb: HTTP body %q", response.Body) return fmt.Errorf("failed to decode stats from couchdb: HTTP body %q", response.Body)

View File

@ -37,6 +37,25 @@ func (*CPUStats) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init caches per-CPU topology information when core_tags is enabled so
// Gather can attach core_id and physical_id tags to per-core measurements.
// Failure to query CPU info is logged as a warning but is not fatal, so the
// plugin still collects untagged metrics.
func (c *CPUStats) Init() error {
	if c.CoreTags {
		cpuInfo, err := cpu.Info()
		if err != nil {
			c.Log.Warnf("Failed to gather info about CPUs: %s", err)
			return nil
		}
		// Guard against a successful call returning no entries; indexing
		// cpuInfo[0] unconditionally would panic in that case.
		if len(cpuInfo) == 0 {
			c.Log.Warn("No CPU information available, not adding core tags")
			return nil
		}
		c.coreID = cpuInfo[0].CoreID != ""
		c.physicalID = cpuInfo[0].PhysicalID != ""

		// Index the info by the "cpuN" name used in the gathered time-stats.
		c.cpuInfo = make(map[string]cpu.InfoStat, len(cpuInfo))
		for _, ci := range cpuInfo {
			c.cpuInfo[fmt.Sprintf("cpu%d", ci.CPU)] = ci
		}
	}
	return nil
}
func (c *CPUStats) Gather(acc telegraf.Accumulator) error { func (c *CPUStats) Gather(acc telegraf.Accumulator) error {
times, err := c.ps.CPUTimes(c.PerCPU, c.TotalCPU) times, err := c.ps.CPUTimes(c.PerCPU, c.TotalCPU)
if err != nil { if err != nil {
@ -127,25 +146,6 @@ func (c *CPUStats) Gather(acc telegraf.Accumulator) error {
return err return err
} }
// Init caches per-CPU topology information when core_tags is enabled so
// Gather can attach core_id and physical_id tags. A failing cpu.Info() call
// is logged but deliberately non-fatal.
func (c *CPUStats) Init() error {
	if c.CoreTags {
		cpuInfo, err := cpu.Info()
		if err == nil {
			// NOTE(review): cpuInfo[0] panics if the call succeeds with an
			// empty slice — a len(cpuInfo) > 0 guard would be safer; confirm
			// whether gopsutil can return (nil, nil) on this platform.
			c.coreID = cpuInfo[0].CoreID != ""
			c.physicalID = cpuInfo[0].PhysicalID != ""
			// Key entries by the "cpuN" name used in per-core measurements.
			c.cpuInfo = make(map[string]cpu.InfoStat)
			for _, ci := range cpuInfo {
				c.cpuInfo[fmt.Sprintf("cpu%d", ci.CPU)] = ci
			}
		} else {
			c.Log.Warnf("Failed to gather info about CPUs: %s", err)
		}
	}
	return nil
}
func totalCPUTime(t cpu.TimesStat) float64 { func totalCPUTime(t cpu.TimesStat) float64 {
total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle total := t.User + t.System + t.Nice + t.Iowait + t.Irq + t.Softirq + t.Steal + t.Idle
return total return total

View File

@ -11,7 +11,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func NewCPUStats(ps system.PS) *CPUStats { func newCPUStats(ps system.PS) *CPUStats {
return &CPUStats{ return &CPUStats{
ps: ps, ps: ps,
CollectCPUTime: true, CollectCPUTime: true,
@ -54,7 +54,7 @@ func TestCPUStats(t *testing.T) {
mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil)
cs := NewCPUStats(&mps) cs := newCPUStats(&mps)
err := cs.Gather(&acc) err := cs.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@ -159,7 +159,7 @@ func TestCPUCountIncrease(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
var err error var err error
cs := NewCPUStats(&mps) cs := newCPUStats(&mps)
mps.On("CPUTimes").Return( mps.On("CPUTimes").Return(
[]cpu.TimesStat{ []cpu.TimesStat{
@ -216,7 +216,7 @@ func TestCPUTimesDecrease(t *testing.T) {
mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil) mps.On("CPUTimes").Return([]cpu.TimesStat{cts}, nil)
cs := NewCPUStats(&mps) cs := newCPUStats(&mps)
err := cs.Gather(&acc) err := cs.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)

View File

@ -58,6 +58,92 @@ type CtrlXDataLayer struct {
common_http.HTTPClientConfig common_http.HTTPClientConfig
} }
// SampleConfig returns the embedded sample configuration for the plugin.
func (*CtrlXDataLayer) SampleConfig() string {
	return sampleConfig
}
// Init validates all configured subscriptions and derives the HTTPS base
// URL from the configured server address. It returns an error on the first
// invalid subscription setting.
func (c *CtrlXDataLayer) Init() error {
	// Check all configured subscriptions for valid settings
	for i := range c.Subscription {
		sub := &c.Subscription[i]
		sub.applyDefaultSettings()

		if !choice.Contains(sub.QueueBehaviour, queueBehaviours) {
			c.Log.Infof("The right queue behaviour values are %v", queueBehaviours)
			return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour)
		}
		if !choice.Contains(sub.ValueChange, valueChanges) {
			c.Log.Infof("The right value change values are %v", valueChanges)
			return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange)
		}
		if len(sub.Nodes) == 0 {
			c.Log.Warn("A configured subscription has no nodes configured")
		}
		// Remember the subscription's position so it can be correlated later.
		sub.index = i
	}

	// Generate valid communication url based on configured server address
	u := url.URL{
		Scheme: "https",
		Host:   c.Server,
	}
	c.url = u.String()
	// NOTE(review): parsing a URL just produced by url.URL.String rarely
	// fails, so this check may not catch malformed server addresses — confirm
	// the intended validation.
	if _, err := url.Parse(c.url); err != nil {
		return errors.New("invalid server address")
	}

	return nil
}
// Start creates the HTTP client and the token manager from the configured
// secrets, retains the accumulator and kicks off the gather loop. The
// context created here is cancelled by Stop.
func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error {
	var ctx context.Context
	ctx, c.cancel = context.WithCancel(context.Background())

	var err error
	c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log)
	if err != nil {
		return fmt.Errorf("failed to create http client: %w", err)
	}

	username, err := c.Username.Get()
	if err != nil {
		return fmt.Errorf("getting username failed: %w", err)
	}
	password, err := c.Password.Get()
	if err != nil {
		// Release the already-acquired username secret before bailing out.
		username.Destroy()
		return fmt.Errorf("getting password failed: %w", err)
	}
	c.tokenManager = token.TokenManager{
		Url:        c.url,
		Username:   username.String(),
		Password:   password.String(),
		Connection: c.connection,
	}
	// The credentials were copied into the token manager; wipe the secrets.
	username.Destroy()
	password.Destroy()

	c.acc = acc
	// NOTE(review): gatherLoop is called synchronously here — presumably it
	// spawns workers tracked by c.wg and returns promptly; confirm.
	c.gatherLoop(ctx)
	return nil
}
// Gather is a no-op for this service input.
func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error {
	// Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here.
	return nil
}
// Stop cancels the gather-loop context, waits for all workers to finish and
// releases idle HTTP connections.
func (c *CtrlXDataLayer) Stop() {
	c.cancel()
	c.wg.Wait()
	if c.connection != nil {
		c.connection.CloseIdleConnections()
	}
}
// convertTimestamp2UnixTime converts the given Data Layer timestamp of the payload to UnixTime. // convertTimestamp2UnixTime converts the given Data Layer timestamp of the payload to UnixTime.
func convertTimestamp2UnixTime(t int64) time.Time { func convertTimestamp2UnixTime(t int64) time.Time {
// 1 sec=1000 millisec=1000000 microsec=1000000000 nanosec. // 1 sec=1000 millisec=1000000 microsec=1000000000 nanosec.
@ -238,77 +324,6 @@ func (c *CtrlXDataLayer) createMetric(em *sseEventData, sub *subscription) (tele
return nil, fmt.Errorf("unsupported value type: %s", em.Type) return nil, fmt.Errorf("unsupported value type: %s", em.Type)
} }
// Init is for setup, and validating config: it checks every configured
// subscription and derives the HTTPS base URL from the server address.
func (c *CtrlXDataLayer) Init() error {
	// Check all configured subscriptions for valid settings
	for i := range c.Subscription {
		sub := &c.Subscription[i]
		sub.applyDefaultSettings()

		if !choice.Contains(sub.QueueBehaviour, queueBehaviours) {
			c.Log.Infof("The right queue behaviour values are %v", queueBehaviours)
			return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour)
		}
		if !choice.Contains(sub.ValueChange, valueChanges) {
			c.Log.Infof("The right value change values are %v", valueChanges)
			return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange)
		}
		if len(sub.Nodes) == 0 {
			c.Log.Warn("A configured subscription has no nodes configured")
		}
		// Remember the subscription's position so it can be correlated later.
		sub.index = i
	}

	// Generate valid communication url based on configured server address
	u := url.URL{
		Scheme: "https",
		Host:   c.Server,
	}
	c.url = u.String()
	if _, err := url.Parse(c.url); err != nil {
		return errors.New("invalid server address")
	}

	return nil
}
// Start input as service, retain the accumulator, establish the connection.
// The context created here drives the gather loop and is cancelled by Stop.
func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error {
	var ctx context.Context
	ctx, c.cancel = context.WithCancel(context.Background())

	var err error
	c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log)
	if err != nil {
		return fmt.Errorf("failed to create http client: %w", err)
	}

	username, err := c.Username.Get()
	if err != nil {
		return fmt.Errorf("getting username failed: %w", err)
	}
	password, err := c.Password.Get()
	if err != nil {
		// Release the already-acquired username secret before bailing out.
		username.Destroy()
		return fmt.Errorf("getting password failed: %w", err)
	}
	c.tokenManager = token.TokenManager{
		Url:        c.url,
		Username:   username.String(),
		Password:   password.String(),
		Connection: c.connection,
	}
	// The credentials were copied into the token manager; wipe the secrets.
	username.Destroy()
	password.Destroy()

	c.acc = acc
	c.gatherLoop(ctx)
	return nil
}
// gatherLoop creates sse subscriptions on the Data Layer and requests the sse data // gatherLoop creates sse subscriptions on the Data Layer and requests the sse data
// the connection will be restablished if the sse subscription is broken. // the connection will be restablished if the sse subscription is broken.
func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) { func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) {
@ -349,26 +364,6 @@ func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) {
} }
} }
// Stop input as service: cancels the gather-loop context, waits for all
// workers to finish and releases idle HTTP connections.
func (c *CtrlXDataLayer) Stop() {
	c.cancel()
	c.wg.Wait()
	if c.connection != nil {
		c.connection.CloseIdleConnections()
	}
}
// Gather is called by telegraf to collect the metrics; a no-op here.
func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error {
	// Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here.
	return nil
}
// SampleConfig returns the auto-inserted sample configuration to the telegraf.
func (*CtrlXDataLayer) SampleConfig() string {
	return sampleConfig
}
// init registers the plugin in telegraf. // init registers the plugin in telegraf.
func init() { func init() {
inputs.Add("ctrlx_datalayer", func() telegraf.Input { inputs.Add("ctrlx_datalayer", func() telegraf.Input {