chore: Fix linter findings for revive:max-public-structs in plugins/inputs/[n-s]* (#15878)
This commit is contained in:
parent 65999bdab6
commit 714989aba2
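All of the hunks below apply one mechanical pattern, sketched here for orientation (the serverStats type and fetchServerStats helper are hypothetical, not taken from the commit): structs used only inside their own plugin package are unexported to satisfy revive's max-public-structs rule, and every reference (embedded fields, signatures, composite literals, colliding local names) is updated to match.

package example

import "encoding/json"

// Before: `type ServerStats struct { ... }` was exported even though no other
// package used it; revive's max-public-structs rule flags such types.
// After: the type is unexported and all references use the lowercase name.
type serverStats struct {
	Requests int64 `json:"requests"`
}

// fetchServerStats decodes into the unexported type. The local variable is
// named stats (not serverStats) so it does not shadow the type name later
// in the function body.
func fetchServerStats(body []byte) (serverStats, error) {
	var stats serverStats
	if err := json.Unmarshal(body, &stats); err != nil {
		return serverStats{}, err
	}
	return stats, nil
}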
@@ -123,7 +123,7 @@ func getTags(addr *url.URL) map[string]string {
 	return map[string]string{"server": host, "port": port}
 }
 
-type ResponseStats struct {
+type responseStats struct {
 	Responses1xx int64 `json:"1xx"`
 	Responses2xx int64 `json:"2xx"`
 	Responses3xx int64 `json:"3xx"`
@@ -132,25 +132,25 @@ type ResponseStats struct {
 	Total int64 `json:"total"`
 }
 
-type BasicHitStats struct {
+type basicHitStats struct {
 	Responses int64 `json:"responses"`
 	Bytes int64 `json:"bytes"`
 }
 
-type ExtendedHitStats struct {
-	BasicHitStats
+type extendedHitStats struct {
+	basicHitStats
 	ResponsesWritten int64 `json:"responses_written"`
 	BytesWritten int64 `json:"bytes_written"`
 }
 
-type HealthCheckStats struct {
+type healthCheckStats struct {
 	Checks int64 `json:"checks"`
 	Fails int64 `json:"fails"`
 	Unhealthy int64 `json:"unhealthy"`
 	LastPassed *bool `json:"last_passed"`
 }
 
-type Status struct {
+type status struct {
 	Version int `json:"version"`
 	NginxVersion string `json:"nginx_version"`
 	Address string `json:"address"`
@@ -184,7 +184,7 @@ type Status struct {
 	ServerZones map[string]struct { // added in version 2
 		Processing int `json:"processing"`
 		Requests int64 `json:"requests"`
-		Responses ResponseStats `json:"responses"`
+		Responses responseStats `json:"responses"`
 		Discarded *int64 `json:"discarded"` // added in version 6
 		Received int64 `json:"received"`
 		Sent int64 `json:"sent"`
@@ -201,12 +201,12 @@ type Status struct {
 			Keepalive *int `json:"keepalive"` // removed in version 5
 			MaxConns *int `json:"max_conns"` // added in version 3
 			Requests int64 `json:"requests"`
-			Responses ResponseStats `json:"responses"`
+			Responses responseStats `json:"responses"`
 			Sent int64 `json:"sent"`
 			Received int64 `json:"received"`
 			Fails int64 `json:"fails"`
 			Unavail int64 `json:"unavail"`
-			HealthChecks HealthCheckStats `json:"health_checks"`
+			HealthChecks healthCheckStats `json:"health_checks"`
 			Downtime int64 `json:"downtime"`
 			Downstart int64 `json:"downstart"`
 			Selected *int64 `json:"selected"` // added in version 4
@@ -226,20 +226,20 @@ type Status struct {
 		Size int64 `json:"size"`
 		MaxSize int64 `json:"max_size"`
 		Cold bool `json:"cold"`
-		Hit BasicHitStats `json:"hit"`
-		Stale BasicHitStats `json:"stale"`
-		Updating BasicHitStats `json:"updating"`
-		Revalidated *BasicHitStats `json:"revalidated"` // added in version 3
-		Miss ExtendedHitStats `json:"miss"`
-		Expired ExtendedHitStats `json:"expired"`
-		Bypass ExtendedHitStats `json:"bypass"`
+		Hit basicHitStats `json:"hit"`
+		Stale basicHitStats `json:"stale"`
+		Updating basicHitStats `json:"updating"`
+		Revalidated *basicHitStats `json:"revalidated"` // added in version 3
+		Miss extendedHitStats `json:"miss"`
+		Expired extendedHitStats `json:"expired"`
+		Bypass extendedHitStats `json:"bypass"`
 	} `json:"caches"`
 
 	Stream struct {
 		ServerZones map[string]struct {
 			Processing int `json:"processing"`
 			Connections int `json:"connections"`
-			Sessions *ResponseStats `json:"sessions"`
+			Sessions *responseStats `json:"sessions"`
 			Discarded *int64 `json:"discarded"` // added in version 7
 			Received int64 `json:"received"`
 			Sent int64 `json:"sent"`
@@ -260,7 +260,7 @@ type Status struct {
 			Received int64 `json:"received"`
 			Fails int64 `json:"fails"`
 			Unavail int64 `json:"unavail"`
-			HealthChecks HealthCheckStats `json:"health_checks"`
+			HealthChecks healthCheckStats `json:"health_checks"`
 			Downtime int64 `json:"downtime"`
 			Downstart int64 `json:"downstart"`
 			Selected int64 `json:"selected"`
@@ -272,7 +272,7 @@ type Status struct {
 
 func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
 	dec := json.NewDecoder(r)
-	status := &Status{}
+	status := &status{}
 	if err := dec.Decode(status); err != nil {
 		return errors.New("error while decoding JSON response")
 	}
@@ -280,7 +280,7 @@ func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
 	return nil
 }
 
-func (s *Status) Gather(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) Gather(tags map[string]string, acc telegraf.Accumulator) {
 	s.gatherProcessesMetrics(tags, acc)
 	s.gatherConnectionsMetrics(tags, acc)
 	s.gatherSslMetrics(tags, acc)
@@ -291,7 +291,7 @@ func (s *Status) Gather(tags map[string]string, acc telegraf.Accumulator) {
 	s.gatherStreamMetrics(tags, acc)
 }
 
-func (s *Status) gatherProcessesMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherProcessesMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	var respawned int
 
 	if s.Processes.Respawned != nil {
@@ -307,7 +307,7 @@ func (s *Status) gatherProcessesMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	)
 }
 
-func (s *Status) gatherConnectionsMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherConnectionsMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	acc.AddFields(
 		"nginx_plus_connections",
 		map[string]interface{}{
@@ -320,7 +320,7 @@ func (s *Status) gatherConnectionsMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	)
 }
 
-func (s *Status) gatherSslMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherSslMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	acc.AddFields(
 		"nginx_plus_ssl",
 		map[string]interface{}{
@@ -332,7 +332,7 @@ func (s *Status) gatherSslMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	)
 }
 
-func (s *Status) gatherRequestMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherRequestMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	acc.AddFields(
 		"nginx_plus_requests",
 		map[string]interface{}{
@@ -343,7 +343,7 @@ func (s *Status) gatherRequestMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	)
 }
 
-func (s *Status) gatherZoneMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherZoneMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	for zoneName, zone := range s.ServerZones {
 		zoneTags := map[string]string{}
 		for k, v := range tags {
@@ -375,7 +375,7 @@ func (s *Status) gatherZoneMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	}
 }
 
-func (s *Status) gatherUpstreamMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherUpstreamMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	for upstreamName, upstream := range s.Upstreams {
 		upstreamTags := map[string]string{}
 		for k, v := range tags {
@@ -451,7 +451,7 @@ func (s *Status) gatherUpstreamMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	}
 }
 
-func (s *Status) gatherCacheMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherCacheMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	for cacheName, cache := range s.Caches {
 		cacheTags := map[string]string{}
 		for k, v := range tags {
@@ -490,7 +490,7 @@ func (s *Status) gatherCacheMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	}
 }
 
-func (s *Status) gatherStreamMetrics(tags map[string]string, acc telegraf.Accumulator) {
+func (s *status) gatherStreamMetrics(tags map[string]string, acc telegraf.Accumulator) {
 	for zoneName, zone := range s.Stream.ServerZones {
 		zoneTags := map[string]string{}
 		for k, v := range tags {

@@ -91,7 +91,7 @@ func (n *NginxPlusAPI) gatherProcessesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var processes = &Processes{}
+	var processes = &processes{}
 
 	if err := json.Unmarshal(body, processes); err != nil {
 		return err
@@ -114,7 +114,7 @@ func (n *NginxPlusAPI) gatherConnectionsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var connections = &Connections{}
+	var connections = &connections{}
 
 	if err := json.Unmarshal(body, connections); err != nil {
 		return err
@@ -140,7 +140,7 @@ func (n *NginxPlusAPI) gatherSlabsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var slabs Slabs
+	var slabs slabs
 
 	if err := json.Unmarshal(body, &slabs); err != nil {
 		return err
@@ -193,7 +193,7 @@ func (n *NginxPlusAPI) gatherSslMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var ssl = &Ssl{}
+	var ssl = &ssl{}
 
 	if err := json.Unmarshal(body, ssl); err != nil {
 		return err
@@ -218,7 +218,7 @@ func (n *NginxPlusAPI) gatherHTTPRequestsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpRequests = &HTTPRequests{}
+	var httpRequests = &httpRequests{}
 
 	if err := json.Unmarshal(body, httpRequests); err != nil {
 		return err
@@ -242,7 +242,7 @@ func (n *NginxPlusAPI) gatherHTTPServerZonesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpServerZones HTTPServerZones
+	var httpServerZones httpServerZones
 
 	if err := json.Unmarshal(body, &httpServerZones); err != nil {
 		return err
@@ -290,7 +290,7 @@ func (n *NginxPlusAPI) gatherHTTPLocationZonesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpLocationZones HTTPLocationZones
+	var httpLocationZones httpLocationZones
 
 	if err := json.Unmarshal(body, &httpLocationZones); err != nil {
 		return err
@@ -336,7 +336,7 @@ func (n *NginxPlusAPI) gatherHTTPUpstreamsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpUpstreams HTTPUpstreams
+	var httpUpstreams httpUpstreams
 
 	if err := json.Unmarshal(body, &httpUpstreams); err != nil {
 		return err
@@ -420,7 +420,7 @@ func (n *NginxPlusAPI) gatherHTTPCachesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpCaches HTTPCaches
+	var httpCaches httpCaches
 
 	if err := json.Unmarshal(body, &httpCaches); err != nil {
 		return err
@@ -474,7 +474,7 @@ func (n *NginxPlusAPI) gatherStreamServerZonesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var streamServerZones StreamServerZones
+	var streamServerZones streamServerZones
 
 	if err := json.Unmarshal(body, &streamServerZones); err != nil {
 		return err
@@ -510,7 +510,7 @@ func (n *NginxPlusAPI) gatherResolverZonesMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var resolverZones ResolverZones
+	var resolverZones resolverZones
 
 	if err := json.Unmarshal(body, &resolverZones); err != nil {
 		return err
@@ -553,7 +553,7 @@ func (n *NginxPlusAPI) gatherStreamUpstreamsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var streamUpstreams StreamUpstreams
+	var streamUpstreams streamUpstreams
 
 	if err := json.Unmarshal(body, &streamUpstreams); err != nil {
 		return err
@@ -623,7 +623,7 @@ func (n *NginxPlusAPI) gatherHTTPLimitReqsMetrics(addr *url.URL, acc telegraf.Accumulator) error {
 		return err
 	}
 
-	var httpLimitReqs HTTPLimitReqs
+	var httpLimitReqs httpLimitReqs
 
 	if err := json.Unmarshal(body, &httpLimitReqs); err != nil {
 		return err

@@ -1,17 +1,17 @@
 package nginx_plus_api
 
-type Processes struct {
+type processes struct {
 	Respawned int `json:"respawned"`
 }
 
-type Connections struct {
+type connections struct {
 	Accepted int64 `json:"accepted"`
 	Dropped int64 `json:"dropped"`
 	Active int64 `json:"active"`
 	Idle int64 `json:"idle"`
 }
 
-type Slabs map[string]struct {
+type slabs map[string]struct {
 	Pages struct {
 		Used int64 `json:"used"`
 		Free int64 `json:"free"`
@@ -24,13 +24,13 @@ type Slabs map[string]struct {
 	} `json:"slots"`
 }
 
-type Ssl struct { // added in version 6
+type ssl struct { // added in version 6
 	Handshakes int64 `json:"handshakes"`
 	HandshakesFailed int64 `json:"handshakes_failed"`
 	SessionReuses int64 `json:"session_reuses"`
 }
 
-type ResolverZones map[string]struct {
+type resolverZones map[string]struct {
 	Requests struct {
 		Name int64 `json:"name"`
 		Srv int64 `json:"srv"`
@@ -48,12 +48,12 @@ type ResolverZones map[string]struct {
 	} `json:"responses"`
 }
 
-type HTTPRequests struct {
+type httpRequests struct {
 	Total int64 `json:"total"`
 	Current int64 `json:"current"`
 }
 
-type ResponseStats struct {
+type responseStats struct {
 	Responses1xx int64 `json:"1xx"`
 	Responses2xx int64 `json:"2xx"`
 	Responses3xx int64 `json:"3xx"`
@@ -62,31 +62,31 @@ type ResponseStats struct {
 	Total int64 `json:"total"`
 }
 
-type HTTPServerZones map[string]struct {
+type httpServerZones map[string]struct {
 	Processing int `json:"processing"`
 	Requests int64 `json:"requests"`
-	Responses ResponseStats `json:"responses"`
+	Responses responseStats `json:"responses"`
 	Discarded *int64 `json:"discarded"` // added in version 6
 	Received int64 `json:"received"`
 	Sent int64 `json:"sent"`
 }
 
-type HTTPLocationZones map[string]struct {
+type httpLocationZones map[string]struct {
 	Requests int64 `json:"requests"`
-	Responses ResponseStats `json:"responses"`
+	Responses responseStats `json:"responses"`
 	Discarded *int64 `json:"discarded"` // added in version 6
 	Received int64 `json:"received"`
 	Sent int64 `json:"sent"`
 }
 
-type HealthCheckStats struct {
+type healthCheckStats struct {
 	Checks int64 `json:"checks"`
 	Fails int64 `json:"fails"`
 	Unhealthy int64 `json:"unhealthy"`
 	LastPassed *bool `json:"last_passed"`
 }
 
-type HTTPUpstreams map[string]struct {
+type httpUpstreams map[string]struct {
 	Peers []struct {
 		ID *int `json:"id"` // added in version 3
 		Server string `json:"server"`
@@ -97,12 +97,12 @@ type HTTPUpstreams map[string]struct {
 		Keepalive *int `json:"keepalive"` // removed in version 5
 		MaxConns *int `json:"max_conns"` // added in version 3
 		Requests int64 `json:"requests"`
-		Responses ResponseStats `json:"responses"`
+		Responses responseStats `json:"responses"`
 		Sent int64 `json:"sent"`
 		Received int64 `json:"received"`
 		Fails int64 `json:"fails"`
 		Unavail int64 `json:"unavail"`
-		HealthChecks HealthCheckStats `json:"health_checks"`
+		HealthChecks healthCheckStats `json:"health_checks"`
 		Downtime int64 `json:"downtime"`
 		HeaderTime *int64 `json:"header_time"` // added in version 5
 		ResponseTime *int64 `json:"response_time"` // added in version 5
@@ -116,16 +116,16 @@ type HTTPUpstreams map[string]struct {
 	} `json:"queue"`
 }
 
-type StreamServerZones map[string]struct {
+type streamServerZones map[string]struct {
 	Processing int `json:"processing"`
 	Connections int `json:"connections"`
-	Sessions *ResponseStats `json:"sessions"`
+	Sessions *responseStats `json:"sessions"`
 	Discarded *int64 `json:"discarded"` // added in version 7
 	Received int64 `json:"received"`
 	Sent int64 `json:"sent"`
 }
 
-type StreamUpstreams map[string]struct {
+type streamUpstreams map[string]struct {
 	Peers []struct {
 		ID int `json:"id"`
 		Server string `json:"server"`
@@ -141,37 +141,37 @@ type StreamUpstreams map[string]struct {
 		Received int64 `json:"received"`
 		Fails int64 `json:"fails"`
 		Unavail int64 `json:"unavail"`
-		HealthChecks HealthCheckStats `json:"health_checks"`
+		HealthChecks healthCheckStats `json:"health_checks"`
 		Downtime int64 `json:"downtime"`
 	} `json:"peers"`
 	Zombies int `json:"zombies"`
 }
 
-type BasicHitStats struct {
+type basicHitStats struct {
 	Responses int64 `json:"responses"`
 	Bytes int64 `json:"bytes"`
 }
 
-type ExtendedHitStats struct {
-	BasicHitStats
+type extendedHitStats struct {
+	basicHitStats
 	ResponsesWritten int64 `json:"responses_written"`
 	BytesWritten int64 `json:"bytes_written"`
 }
 
-type HTTPCaches map[string]struct { // added in version 2
+type httpCaches map[string]struct { // added in version 2
 	Size int64 `json:"size"`
 	MaxSize int64 `json:"max_size"`
 	Cold bool `json:"cold"`
-	Hit BasicHitStats `json:"hit"`
-	Stale BasicHitStats `json:"stale"`
-	Updating BasicHitStats `json:"updating"`
-	Revalidated *BasicHitStats `json:"revalidated"` // added in version 3
-	Miss ExtendedHitStats `json:"miss"`
-	Expired ExtendedHitStats `json:"expired"`
-	Bypass ExtendedHitStats `json:"bypass"`
+	Hit basicHitStats `json:"hit"`
+	Stale basicHitStats `json:"stale"`
+	Updating basicHitStats `json:"updating"`
+	Revalidated *basicHitStats `json:"revalidated"` // added in version 3
+	Miss extendedHitStats `json:"miss"`
+	Expired extendedHitStats `json:"expired"`
+	Bypass extendedHitStats `json:"bypass"`
 }
 
-type HTTPLimitReqs map[string]struct {
+type httpLimitReqs map[string]struct {
 	Passed int64 `json:"passed"`
 	Delayed int64 `json:"delayed"`
 	Rejected int64 `json:"rejected"`

@@ -62,9 +62,8 @@ func (n *Nomad) Init() error {
 	return nil
 }
 
-// Gather, collects metrics from Nomad endpoint
 func (n *Nomad) Gather(acc telegraf.Accumulator) error {
-	summaryMetrics := &MetricsSummary{}
+	summaryMetrics := &metricsSummary{}
 	err := n.loadJSON(n.URL+"/v1/metrics", summaryMetrics)
 	if err != nil {
 		return err
@@ -103,7 +102,7 @@ func (n *Nomad) loadJSON(url string, v interface{}) error {
 }
 
 // buildNomadMetrics, it builds all the metrics and adds them to the accumulator)
-func buildNomadMetrics(acc telegraf.Accumulator, summaryMetrics *MetricsSummary) error {
+func buildNomadMetrics(acc telegraf.Accumulator, summaryMetrics *metricsSummary) error {
 	t, err := internal.ParseTimestamp(timeLayout, summaryMetrics.Timestamp, nil)
 	if err != nil {
 		return fmt.Errorf("error parsing time: %w", err)

@@ -4,36 +4,36 @@ import (
 	"time"
 )
 
-type MetricsSummary struct {
+type metricsSummary struct {
 	Timestamp string `json:"timestamp"`
-	Gauges []GaugeValue `json:"gauges"`
-	Points []PointValue `json:"points"`
-	Counters []SampledValue `json:"counters"`
-	Samples []SampledValue `json:"samples"`
+	Gauges []gaugeValue `json:"gauges"`
+	Points []pointValue `json:"points"`
+	Counters []sampledValue `json:"counters"`
+	Samples []sampledValue `json:"samples"`
 }
 
-type GaugeValue struct {
+type gaugeValue struct {
 	Name string `json:"name"`
 	Hash string `json:"-"`
 	Value float32 `json:"value"`
 
-	Labels []Label `json:"-"`
+	Labels []label `json:"-"`
 	DisplayLabels map[string]string `json:"Labels"`
 }
 
-type PointValue struct {
+type pointValue struct {
 	Name string `json:"name"`
 	Points []float32 `json:"points"`
 }
 
-type SampledValue struct {
+type sampledValue struct {
 	Name string `json:"name"`
 	Hash string `json:"-"`
 	*AggregateSample
 	Mean float64 `json:"mean"`
 	Stddev float64 `json:"stddev"`
 
-	Labels []Label `json:"-"`
+	Labels []label `json:"-"`
 	DisplayLabels map[string]string `json:"Labels"`
 }
 
@@ -47,7 +47,7 @@ type AggregateSample struct {
 	LastUpdated time.Time `json:"-"`
 }
 
-type Label struct {
+type label struct {
 	Name string `json:"name"`
 	Value string `json:"value"`
 }

@@ -125,14 +125,14 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
 		return fmt.Errorf(`error reading body: %w`, err)
 	}
 
-	data := &NSQStatsData{}
+	data := &nsqStatsData{}
 	err = json.Unmarshal(body, data)
 	if err != nil {
 		return fmt.Errorf(`error parsing response: %w`, err)
 	}
 	// Data was not parsed correctly attempt to use old format.
 	if len(data.Version) < 1 {
-		wrapper := &NSQStats{}
+		wrapper := &nsqStats{}
 		err = json.Unmarshal(body, wrapper)
 		if err != nil {
 			return fmt.Errorf(`error parsing response: %w`, err)
@@ -155,7 +155,7 @@ func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
 
 	acc.AddFields("nsq_server", fields, tags)
 	for _, t := range data.Topics {
-		topicStats(t, acc, u.Host, data.Version)
+		gatherTopicStats(t, acc, u.Host, data.Version)
 	}
 
 	return nil
@@ -170,7 +170,7 @@ func buildURL(e string) (*url.URL, error) {
 	return addr, nil
 }
 
-func topicStats(t TopicStats, acc telegraf.Accumulator, host, version string) {
+func gatherTopicStats(t topicStats, acc telegraf.Accumulator, host, version string) {
 	// per topic overall (tag: name, paused, channel count)
 	tags := map[string]string{
 		"server_host": host,
@@ -187,11 +187,11 @@ func topicStats(t TopicStats, acc telegraf.Accumulator, host, version string) {
 	acc.AddFields("nsq_topic", fields, tags)
 
 	for _, c := range t.Channels {
-		channelStats(c, acc, host, version, t.Name)
+		gatherChannelStats(c, acc, host, version, t.Name)
 	}
 }
 
-func channelStats(c ChannelStats, acc telegraf.Accumulator, host, version, topic string) {
+func gatherChannelStats(c channelStats, acc telegraf.Accumulator, host, version, topic string) {
 	tags := map[string]string{
 		"server_host": host,
 		"server_version": version,
@@ -212,11 +212,11 @@ func channelStats(c ChannelStats, acc telegraf.Accumulator, host, version, topic string) {
 
 	acc.AddFields("nsq_channel", fields, tags)
 	for _, cl := range c.Clients {
-		clientStats(cl, acc, host, version, topic, c.Name)
+		gatherClientStats(cl, acc, host, version, topic, c.Name)
 	}
 }
 
-func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, channel string) {
+func gatherClientStats(c clientStats, acc telegraf.Accumulator, host, version, topic, channel string) {
 	tags := map[string]string{
 		"server_host": host,
 		"server_version": version,
@@ -245,31 +245,31 @@ func clientStats(c ClientStats, acc telegraf.Accumulator, host, version, topic, channel string) {
 	acc.AddFields("nsq_client", fields, tags)
 }
 
-type NSQStats struct {
+type nsqStats struct {
 	Code int64 `json:"status_code"`
 	Txt string `json:"status_txt"`
-	Data NSQStatsData `json:"data"`
+	Data nsqStatsData `json:"data"`
 }
 
-type NSQStatsData struct {
+type nsqStatsData struct {
 	Version string `json:"version"`
 	Health string `json:"health"`
 	StartTime int64 `json:"start_time"`
-	Topics []TopicStats `json:"topics"`
+	Topics []topicStats `json:"topics"`
 }
 
 // e2e_processing_latency is not modeled
-type TopicStats struct {
+type topicStats struct {
 	Name string `json:"topic_name"`
 	Depth int64 `json:"depth"`
 	BackendDepth int64 `json:"backend_depth"`
 	MessageCount int64 `json:"message_count"`
 	Paused bool `json:"paused"`
-	Channels []ChannelStats `json:"channels"`
+	Channels []channelStats `json:"channels"`
 }
 
 // e2e_processing_latency is not modeled
-type ChannelStats struct {
+type channelStats struct {
 	Name string `json:"channel_name"`
 	Depth int64 `json:"depth"`
 	BackendDepth int64 `json:"backend_depth"`
@@ -279,10 +279,10 @@ type ChannelStats struct {
 	RequeueCount int64 `json:"requeue_count"`
 	TimeoutCount int64 `json:"timeout_count"`
 	Paused bool `json:"paused"`
-	Clients []ClientStats `json:"clients"`
+	Clients []clientStats `json:"clients"`
 }
 
-type ClientStats struct {
+type clientStats struct {
 	Name string `json:"name"` // DEPRECATED 1.x+, still here as the structs are currently being shared for parsing v3.x and 1.x
 	ID string `json:"client_id"`
 	Hostname string `json:"hostname"`

@@ -1,52 +1,52 @@
 package schema_v11
 
-// SMI defines the structure for the output of _nvidia-smi -q -x_.
+// smi defines the structure for the output of _nvidia-smi -q -x_.
 type smi struct {
-	GPU []GPU `xml:"gpu"`
+	GPU []gpu `xml:"gpu"`
 	DriverVersion string `xml:"driver_version"`
 	CUDAVersion string `xml:"cuda_version"`
 }
 
-// GPU defines the structure of the GPU portion of the smi output.
-type GPU struct {
-	Clocks ClockStats `xml:"clocks"`
+// gpu defines the structure of the GPU portion of the smi output.
+type gpu struct {
+	Clocks clockStats `xml:"clocks"`
 	ComputeMode string `xml:"compute_mode"`
 	DisplayActive string `xml:"display_active"`
 	DisplayMode string `xml:"display_mode"`
-	EccMode ECCMode `xml:"ecc_mode"`
-	Encoder EncoderStats `xml:"encoder_stats"`
+	EccMode eccMode `xml:"ecc_mode"`
+	Encoder encoderStats `xml:"encoder_stats"`
 	FanSpeed string `xml:"fan_speed"` // int
-	FBC FBCStats `xml:"fbc_stats"`
-	Memory MemoryStats `xml:"fb_memory_usage"`
-	PCI PCI `xml:"pci"`
-	Power PowerReadings `xml:"power_readings"`
+	FBC fbcStats `xml:"fbc_stats"`
+	Memory memoryStats `xml:"fb_memory_usage"`
+	PCI pic `xml:"pci"`
+	Power powerReadings `xml:"power_readings"`
 	ProdName string `xml:"product_name"`
 	PState string `xml:"performance_state"`
-	RemappedRows MemoryRemappedRows `xml:"remapped_rows"`
-	RetiredPages MemoryRetiredPages `xml:"retired_pages"`
+	RemappedRows memoryRemappedRows `xml:"remapped_rows"`
+	RetiredPages memoryRetiredPages `xml:"retired_pages"`
 	Serial string `xml:"serial"`
-	Temp TempStats `xml:"temperature"`
-	Utilization UtilizationStats `xml:"utilization"`
+	Temp tempStats `xml:"temperature"`
+	Utilization utilizationStats `xml:"utilization"`
 	UUID string `xml:"uuid"`
 	VbiosVersion string `xml:"vbios_version"`
 }
 
-// ECCMode defines the structure of the ecc portions in the smi output.
-type ECCMode struct {
+// eccMode defines the structure of the ecc portions in the smi output.
+type eccMode struct {
 	CurrentEcc string `xml:"current_ecc"` // Enabled, Disabled, N/A
 	PendingEcc string `xml:"pending_ecc"` // Enabled, Disabled, N/A
 }
 
-// MemoryStats defines the structure of the memory portions in the smi output.
-type MemoryStats struct {
+// memoryStats defines the structure of the memory portions in the smi output.
+type memoryStats struct {
 	Total string `xml:"total"` // int
 	Used string `xml:"used"` // int
 	Free string `xml:"free"` // int
 	Reserved string `xml:"reserved"` // int
 }
 
-// MemoryRetiredPages defines the structure of the retired pages portions in the smi output.
-type MemoryRetiredPages struct {
+// memoryRetiredPages defines the structure of the retired pages portions in the smi output.
+type memoryRetiredPages struct {
 	MultipleSingleBit struct {
 		Count string `xml:"retired_count"` // int
 	} `xml:"multiple_single_bit_retirement"`
@@ -57,35 +57,35 @@ type MemoryRetiredPages struct {
 	PendingRetirement string `xml:"pending_retirement"` // Yes/No
 }
 
-// MemoryRemappedRows defines the structure of the remapped rows portions in the smi output.
-type MemoryRemappedRows struct {
+// memoryRemappedRows defines the structure of the remapped rows portions in the smi output.
+type memoryRemappedRows struct {
 	Correctable string `xml:"remapped_row_corr"` // int
 	Uncorrectable string `xml:"remapped_row_unc"` // int
 	Pending string `xml:"remapped_row_pending"` // Yes/No
 	Failure string `xml:"remapped_row_failure"` // Yes/No
 }
 
-// TempStats defines the structure of the temperature portion of the smi output.
-type TempStats struct {
+// tempStats defines the structure of the temperature portion of the smi output.
+type tempStats struct {
 	GPUTemp string `xml:"gpu_temp"` // int
 }
 
-// UtilizationStats defines the structure of the utilization portion of the smi output.
-type UtilizationStats struct {
+// utilizationStats defines the structure of the utilization portion of the smi output.
+type utilizationStats struct {
 	GPU string `xml:"gpu_util"` // int
 	Memory string `xml:"memory_util"` // int
 	Encoder string `xml:"encoder_util"` // int
 	Decoder string `xml:"decoder_util"` // int
 }
 
-// PowerReadings defines the structure of the power_readings portion of the smi output.
-type PowerReadings struct {
+// powerReadings defines the structure of the power_readings portion of the smi output.
+type powerReadings struct {
 	PowerDraw string `xml:"power_draw"` // float
 	PowerLimit string `xml:"power_limit"` // float
 }
 
-// PCI defines the structure of the pci portion of the smi output.
-type PCI struct {
+// pic defines the structure of the pci portion of the smi output.
+type pic struct {
 	LinkInfo struct {
 		PCIEGen struct {
 			CurrentLinkGen string `xml:"current_link_gen"` // int
@@ -96,22 +96,22 @@ type PCI struct {
 	} `xml:"pci_gpu_link_info"`
 }
 
-// EncoderStats defines the structure of the encoder_stats portion of the smi output.
-type EncoderStats struct {
+// encoderStats defines the structure of the encoder_stats portion of the smi output.
+type encoderStats struct {
 	SessionCount string `xml:"session_count"` // int
 	AverageFPS string `xml:"average_fps"` // int
 	AverageLatency string `xml:"average_latency"` // int
 }
 
-// FBCStats defines the structure of the fbc_stats portion of the smi output.
-type FBCStats struct {
+// fbcStats defines the structure of the fbc_stats portion of the smi output.
+type fbcStats struct {
 	SessionCount string `xml:"session_count"` // int
 	AverageFPS string `xml:"average_fps"` // int
 	AverageLatency string `xml:"average_latency"` // int
 }
 
-// ClockStats defines the structure of the clocks portion of the smi output.
-type ClockStats struct {
+// clockStats defines the structure of the clocks portion of the smi output.
+type clockStats struct {
 	Graphics string `xml:"graphics_clock"` // int
 	SM string `xml:"sm_clock"` // int
 	Memory string `xml:"mem_clock"` // int

@@ -6,37 +6,37 @@ import (
 	"github.com/influxdata/telegraf"
 )
 
-type AggregationResponse struct {
-	Hits *SearchHits `json:"hits"`
-	Aggregations *Aggregation `json:"aggregations"`
+type aggregationResponse struct {
+	Hits *searchHits `json:"hits"`
+	Aggregations *aggregation `json:"aggregations"`
 }
 
-type SearchHits struct {
-	TotalHits *TotalHits `json:"total,omitempty"`
+type searchHits struct {
+	TotalHits *totalHits `json:"total,omitempty"`
 }
 
-type TotalHits struct {
+type totalHits struct {
 	Relation string `json:"relation"`
 	Value int64 `json:"value"`
 }
 
-type MetricAggregation map[string]interface{}
+type metricAggregation map[string]interface{}
 
-type AggregateValue struct {
-	metrics MetricAggregation
-	buckets []BucketData
+type aggregateValue struct {
+	metrics metricAggregation
+	buckets []bucketData
 }
 
-type Aggregation map[string]AggregateValue
+type aggregation map[string]aggregateValue
 
-type BucketData struct {
+type bucketData struct {
 	DocumentCount int64 `json:"doc_count"`
 	Key string `json:"key"`
 
-	subaggregation Aggregation
+	subaggregation aggregation
 }
 
-func (a *AggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error {
+func (a *aggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error {
 	// Simple case (no aggregations)
 	if a.Aggregations == nil {
 		tags := make(map[string]string)
@@ -50,7 +50,7 @@ func (a *AggregationResponse) GetMetrics(acc telegraf.Accumulator, measurement string) error {
 	return a.Aggregations.GetMetrics(acc, measurement, a.Hits.TotalHits.Value, map[string]string{})
 }
 
-func (a *Aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error {
+func (a *aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error {
 	var err error
 	fields := make(map[string]interface{})
 	for name, agg := range *a {
@@ -85,7 +85,7 @@ func (a *Aggregation) GetMetrics(acc telegraf.Accumulator, measurement string, docCount int64, tags map[string]string) error {
 	return nil
 }
 
-func (a *AggregateValue) UnmarshalJSON(bytes []byte) error {
+func (a *aggregateValue) UnmarshalJSON(bytes []byte) error {
 	var partial map[string]json.RawMessage
 	err := json.Unmarshal(bytes, &partial)
 	if err != nil {
@@ -101,11 +101,11 @@ func (a *AggregateValue) UnmarshalJSON(bytes []byte) error {
 	return json.Unmarshal(bytes, &a.metrics)
 }
 
-func (a *AggregateValue) IsAggregation() bool {
+func (a *aggregateValue) IsAggregation() bool {
 	return !(a.buckets == nil)
 }
 
-func (b *BucketData) UnmarshalJSON(bytes []byte) error {
+func (b *bucketData) UnmarshalJSON(bytes []byte) error {
 	var partial map[string]json.RawMessage
 	var err error
 
@@ -127,11 +127,11 @@ func (b *BucketData) UnmarshalJSON(bytes []byte) error {
 	delete(partial, "key")
 
 	if b.subaggregation == nil {
-		b.subaggregation = make(Aggregation)
+		b.subaggregation = make(aggregation)
 	}
 
 	for name, message := range partial {
-		var subaggregation AggregateValue
+		var subaggregation aggregateValue
 		err = json.Unmarshal(message, &subaggregation)
 		if err != nil {
 			return err

@@ -176,7 +176,7 @@ func init() {
 	})
 }
 
-func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*AggregationResponse, error) {
+func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*aggregationResponse, error) {
 	now := time.Now().UTC()
 	from := now.Add(time.Duration(-aggregation.QueryPeriod))
 	filterQuery := aggregation.FilterQuery
@@ -219,7 +219,7 @@ func (o *OpensearchQuery) runAggregationQuery(ctx context.Context, aggregation osAggregation) (*AggregationResponse, error) {
 	}
 	defer resp.Body.Close()
 
-	var searchResult AggregationResponse
+	var searchResult aggregationResponse
 
 	decoder := json.NewDecoder(resp.Body)
 	err = decoder.Decode(&searchResult)

@@ -72,7 +72,7 @@ func getNodeSearchDomain(px *Proxmox) error {
 		return err
 	}
 
-	var nodeDNS NodeDNS
+	var nodeDNS nodeDNS
 	err = json.Unmarshal(jsonData, &nodeDNS)
 	if err != nil {
 		return err
@@ -108,14 +108,14 @@ func performRequest(px *Proxmox, apiURL, method string, data url.Values) ([]byte, error) {
 }
 
 func gatherLxcData(px *Proxmox, acc telegraf.Accumulator) {
-	gatherVMData(px, acc, LXC)
+	gatherVMData(px, acc, lxc)
 }
 
 func gatherQemuData(px *Proxmox, acc telegraf.Accumulator) {
-	gatherVMData(px, acc, QEMU)
+	gatherVMData(px, acc, qemu)
 }
 
-func gatherVMData(px *Proxmox, acc telegraf.Accumulator, rt ResourceType) {
+func gatherVMData(px *Proxmox, acc telegraf.Accumulator, rt resourceType) {
 	vmStats, err := getVMStats(px, rt)
 	if err != nil {
 		px.Log.Errorf("Error getting VM stats: %v", err)
@@ -147,56 +147,56 @@ func gatherVMData(px *Proxmox, acc telegraf.Accumulator, rt ResourceType) {
 	}
 }
 
-func getCurrentVMStatus(px *Proxmox, rt ResourceType, id json.Number) (VMStat, error) {
+func getCurrentVMStatus(px *Proxmox, rt resourceType, id json.Number) (vmStat, error) {
 	apiURL := "/nodes/" + px.NodeName + "/" + string(rt) + "/" + string(id) + "/status/current"
 
 	jsonData, err := px.requestFunction(px, apiURL, http.MethodGet, nil)
 	if err != nil {
-		return VMStat{}, err
+		return vmStat{}, err
 	}
 
-	var currentVMStatus VMCurrentStats
+	var currentVMStatus vmCurrentStats
 	err = json.Unmarshal(jsonData, &currentVMStatus)
 	if err != nil {
-		return VMStat{}, err
+		return vmStat{}, err
 	}
 
 	return currentVMStatus.Data, nil
 }
 
-func getVMStats(px *Proxmox, rt ResourceType) (VMStats, error) {
+func getVMStats(px *Proxmox, rt resourceType) (vmStats, error) {
 	apiURL := "/nodes/" + px.NodeName + "/" + string(rt)
 	jsonData, err := px.requestFunction(px, apiURL, http.MethodGet, nil)
 	if err != nil {
-		return VMStats{}, err
+		return vmStats{}, err
 	}
 
-	var vmStats VMStats
-	err = json.Unmarshal(jsonData, &vmStats)
+	var vmStatistics vmStats
+	err = json.Unmarshal(jsonData, &vmStatistics)
 	if err != nil {
-		return VMStats{}, err
+		return vmStats{}, err
 	}
 
-	return vmStats, nil
+	return vmStatistics, nil
 }
 
-func getVMConfig(px *Proxmox, vmID json.Number, rt ResourceType) (VMConfig, error) {
+func getVMConfig(px *Proxmox, vmID json.Number, rt resourceType) (vmConfig, error) {
 	apiURL := "/nodes/" + px.NodeName + "/" + string(rt) + "/" + string(vmID) + "/config"
 	jsonData, err := px.requestFunction(px, apiURL, http.MethodGet, nil)
 	if err != nil {
-		return VMConfig{}, err
+		return vmConfig{}, err
 	}
 
-	var vmConfig VMConfig
-	err = json.Unmarshal(jsonData, &vmConfig)
+	var vmCfg vmConfig
+	err = json.Unmarshal(jsonData, &vmCfg)
 	if err != nil {
-		return VMConfig{}, err
+		return vmConfig{}, err
 	}
 
-	return vmConfig, nil
+	return vmCfg, nil
 }
 
-func getFields(vmStat VMStat) map[string]interface{} {
+func getFields(vmStat vmStat) map[string]interface{} {
 	memMetrics := getByteMetrics(vmStat.TotalMem, vmStat.UsedMem)
 	swapMetrics := getByteMetrics(vmStat.TotalSwap, vmStat.UsedSwap)
 	diskMetrics := getByteMetrics(vmStat.TotalDisk, vmStat.UsedDisk)
@@ -255,7 +255,7 @@ func jsonNumberToFloat64(value json.Number) float64 {
 	return float64Value
 }
 
-func getTags(px *Proxmox, name string, vmConfig VMConfig, rt ResourceType) map[string]string {
+func getTags(px *Proxmox, name string, vmConfig vmConfig, rt resourceType) map[string]string {
 	domain := vmConfig.Data.Searchdomain
 	if len(domain) == 0 {
 		domain = px.nodeSearchDomain

@@ -25,22 +25,22 @@ type Proxmox struct {
 	Log telegraf.Logger `toml:"-"`
 }
 
-type ResourceType string
+type resourceType string
 
 var (
-	QEMU ResourceType = "qemu"
-	LXC ResourceType = "lxc"
+	qemu resourceType = "qemu"
+	lxc resourceType = "lxc"
 )
 
-type VMStats struct {
-	Data []VMStat `json:"data"`
+type vmStats struct {
+	Data []vmStat `json:"data"`
 }
 
-type VMCurrentStats struct {
-	Data VMStat `json:"data"`
+type vmCurrentStats struct {
+	Data vmStat `json:"data"`
 }
 
-type VMStat struct {
+type vmStat struct {
 	ID json.Number `json:"vmid"`
 	Name string `json:"name"`
 	Status string `json:"status"`
@@ -54,7 +54,7 @@ type VMStat struct {
 	CPULoad json.Number `json:"cpu"`
 }
 
-type VMConfig struct {
+type vmConfig struct {
 	Data struct {
 		Searchdomain string `json:"searchdomain"`
 		Hostname string `json:"hostname"`
@@ -62,7 +62,7 @@ type VMConfig struct {
 	} `json:"data"`
 }
 
-type NodeDNS struct {
+type nodeDNS struct {
 	Data struct {
 		Searchdomain string `json:"search"`
 	} `json:"data"`

@@ -22,21 +22,13 @@ import (
 //go:embed sample.conf
 var sampleConfig string
 
-// DefaultUsername will set a default value that corresponds to the default
-// value used by Rabbitmq
-const DefaultUsername = "guest"
-
-// DefaultPassword will set a default value that corresponds to the default
-// value used by Rabbitmq
-const DefaultPassword = "guest"
-
-// DefaultURL will set a default value that corresponds to the default value
-// used by Rabbitmq
-const DefaultURL = "http://localhost:15672"
-
-// Default http timeouts
-const DefaultResponseHeaderTimeout = 3
-const DefaultClientTimeout = 4
+const (
+	defaultUsername = "guest"
+	defaultPassword = "guest"
+	defaultURL = "http://localhost:15672"
+	defaultResponseHeaderTimeout = 3
+	defaultClientTimeout = 4
+)
 
 // RabbitMQ defines the configuration necessary for gathering metrics,
 // see the sample config for further details
@@ -70,41 +62,41 @@ type RabbitMQ struct {
 	upstreamFilter filter.Filter
 }
 
-type OverviewResponse struct {
-	MessageStats *MessageStats `json:"message_stats"`
-	ObjectTotals *ObjectTotals `json:"object_totals"`
-	QueueTotals *QueueTotals `json:"queue_totals"`
-	Listeners []Listeners `json:"listeners"`
+type overviewResponse struct {
+	MessageStats *messageStats `json:"message_stats"`
+	ObjectTotals *objectTotals `json:"object_totals"`
+	QueueTotals *queueTotals `json:"queue_totals"`
+	Listeners []listeners `json:"listeners"`
 }
 
-type Listeners struct {
+type listeners struct {
 	Protocol string `json:"protocol"`
 }
 
-type Details struct {
+type details struct {
 	Rate float64 `json:"rate"`
 }
 
-type MessageStats struct {
+type messageStats struct {
 	Ack int64
-	AckDetails Details `json:"ack_details"`
+	AckDetails details `json:"ack_details"`
 	Deliver int64
-	DeliverDetails Details `json:"deliver_details"`
+	DeliverDetails details `json:"deliver_details"`
 	DeliverGet int64 `json:"deliver_get"`
-	DeliverGetDetails Details `json:"deliver_get_details"`
+	DeliverGetDetails details `json:"deliver_get_details"`
 	Publish int64
-	PublishDetails Details `json:"publish_details"`
+	PublishDetails details `json:"publish_details"`
 	Redeliver int64
-	RedeliverDetails Details `json:"redeliver_details"`
+	RedeliverDetails details `json:"redeliver_details"`
 	PublishIn int64 `json:"publish_in"`
-	PublishInDetails Details `json:"publish_in_details"`
+	PublishInDetails details `json:"publish_in_details"`
 	PublishOut int64 `json:"publish_out"`
-	PublishOutDetails Details `json:"publish_out_details"`
+	PublishOutDetails details `json:"publish_out_details"`
 	ReturnUnroutable int64 `json:"return_unroutable"`
-	ReturnUnroutableDetails Details `json:"return_unroutable_details"`
+	ReturnUnroutableDetails details `json:"return_unroutable_details"`
 }
 
-type ObjectTotals struct {
+type objectTotals struct {
 	Channels int64
 	Connections int64
 	Consumers int64
@@ -112,7 +104,7 @@ type ObjectTotals struct {
 	Queues int64
 }
 
-type QueueTotals struct {
+type queueTotals struct {
 	Messages int64
 	MessagesReady int64 `json:"messages_ready"`
 	MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
@@ -123,9 +115,9 @@ type QueueTotals struct {
 	MessagePersistent int64 `json:"message_bytes_persistent"`
 }
 
-type Queue struct {
-	QueueTotals // just to not repeat the same code
-	MessageStats `json:"message_stats"`
+type queue struct {
+	queueTotals // just to not repeat the same code
+	messageStats `json:"message_stats"`
 	Memory int64
 	Consumers int64
 	ConsumerUtilisation float64 `json:"consumer_utilisation"`
@@ -140,7 +132,7 @@ type Queue struct {
 	HeadMessageTimestamp *int64 `json:"head_message_timestamp"`
 }
 
-type Node struct {
+type node struct {
 	Name string
 
 	DiskFree int64 `json:"disk_free"`
@@ -159,26 +151,26 @@ type Node struct {
 	Running bool `json:"running"`
 	Uptime int64 `json:"uptime"`
 	MnesiaDiskTxCount int64 `json:"mnesia_disk_tx_count"`
-	MnesiaDiskTxCountDetails Details `json:"mnesia_disk_tx_count_details"`
+	MnesiaDiskTxCountDetails details `json:"mnesia_disk_tx_count_details"`
 	MnesiaRAMTxCount int64 `json:"mnesia_ram_tx_count"`
-	MnesiaRAMTxCountDetails Details `json:"mnesia_ram_tx_count_details"`
+	MnesiaRAMTxCountDetails details `json:"mnesia_ram_tx_count_details"`
 	GcNum int64 `json:"gc_num"`
-	GcNumDetails Details `json:"gc_num_details"`
+	GcNumDetails details `json:"gc_num_details"`
 	GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
-	GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
+	GcBytesReclaimedDetails details `json:"gc_bytes_reclaimed_details"`
 	IoReadAvgTime float64 `json:"io_read_avg_time"`
-	IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
+	IoReadAvgTimeDetails details `json:"io_read_avg_time_details"`
 	IoReadBytes int64 `json:"io_read_bytes"`
-	IoReadBytesDetails Details `json:"io_read_bytes_details"`
+	IoReadBytesDetails details `json:"io_read_bytes_details"`
 	IoWriteAvgTime float64 `json:"io_write_avg_time"`
-	IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
+	IoWriteAvgTimeDetails details `json:"io_write_avg_time_details"`
 	IoWriteBytes int64 `json:"io_write_bytes"`
-	IoWriteBytesDetails Details `json:"io_write_bytes_details"`
+	IoWriteBytesDetails details `json:"io_write_bytes_details"`
 }
 
-type Exchange struct {
+type exchange struct {
 	Name string
-	MessageStats `json:"message_stats"`
+	messageStats `json:"message_stats"`
 	Type string
 	Internal bool
 	Vhost string
@@ -186,25 +178,25 @@ type Exchange struct {
 	AutoDelete bool `json:"auto_delete"`
 }
 
-type FederationLinkChannelMessageStats struct {
+type federationLinkChannelMessageStats struct {
 	Confirm int64 `json:"confirm"`
-	ConfirmDetails Details `json:"confirm_details"`
+	ConfirmDetails details `json:"confirm_details"`
 	Publish int64 `json:"publish"`
-	PublishDetails Details `json:"publish_details"`
+	PublishDetails details `json:"publish_details"`
 	ReturnUnroutable int64 `json:"return_unroutable"`
-	ReturnUnroutableDetails Details `json:"return_unroutable_details"`
+	ReturnUnroutableDetails details `json:"return_unroutable_details"`
 }
 
-type FederationLinkChannel struct {
+type federationLinkChannel struct {
 	AcksUncommitted int64 `json:"acks_uncommitted"`
 	ConsumerCount int64 `json:"consumer_count"`
 	MessagesUnacknowledged int64 `json:"messages_unacknowledged"`
 	MessagesUncommitted int64 `json:"messages_uncommitted"`
 	MessagesUnconfirmed int64 `json:"messages_unconfirmed"`
-	MessageStats FederationLinkChannelMessageStats `json:"message_stats"`
+	MessageStats federationLinkChannelMessageStats `json:"message_stats"`
 }
 
-type FederationLink struct {
+type federationLink struct {
 	Type string `json:"type"`
 	Queue string `json:"queue"`
 	UpstreamQueue string `json:"upstream_queue"`
@@ -212,19 +204,15 @@ type FederationLink struct {
 	UpstreamExchange string `json:"upstream_exchange"`
 	Vhost string `json:"vhost"`
 	Upstream string `json:"upstream"`
-	LocalChannel FederationLinkChannel `json:"local_channel"`
+	LocalChannel federationLinkChannel `json:"local_channel"`
 }
 
-type HealthCheck struct {
-	Status string `json:"status"`
+type memoryResponse struct {
+	Memory *memory `json:"memory"`
 }
 
-type MemoryResponse struct {
-	Memory *Memory `json:"memory"`
-}
-
-// Memory details
-type Memory struct {
+// memory details
+type memory struct {
 	ConnectionReaders int64 `json:"connection_readers"`
 	ConnectionWriters int64 `json:"connection_writers"`
 	ConnectionChannels int64 `json:"connection_channels"`
@@ -247,8 +235,7 @@ type Memory struct {
 	Total interface{} `json:"total"`
 }
 
-// Error response
-type ErrorResponse struct {
+type errorResponse struct {
 	Error string `json:"error"`
 	Reason string `json:"reason"`
 }
@@ -326,7 +313,7 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
 
 func (r *RabbitMQ) requestEndpoint(u string) ([]byte, error) {
 	if r.URL == "" {
-		r.URL = DefaultURL
+		r.URL = defaultURL
 	}
 	endpoint := r.URL + u
 	r.Log.Debugf("Requesting %q...", endpoint)
@@ -336,7 +323,7 @@ func (r *RabbitMQ) requestEndpoint(u string) ([]byte, error) {
 		return nil, err
 	}
 
-	username := DefaultUsername
+	username := defaultUsername
 	if !r.Username.Empty() {
 		usernameSecret, err := r.Username.Get()
 		if err != nil {
@@ -346,7 +333,7 @@ func (r *RabbitMQ) requestEndpoint(u string) ([]byte, error) {
 		username = usernameSecret.String()
 	}
 
-	password := DefaultPassword
+	password := defaultPassword
 	if !r.Password.Empty() {
 		passwordSecret, err := r.Password.Get()
 		if err != nil {
@@ -381,7 +368,7 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
 	var jsonErr *json.UnmarshalTypeError
 	if errors.As(err, &jsonErr) {
 		// Try to get the error reason from the response
-		var errResponse ErrorResponse
+		var errResponse errorResponse
 		if json.Unmarshal(buf, &errResponse) == nil && errResponse.Error != "" {
 			// Return the error reason in the response
 			return fmt.Errorf("error response trying to get %q: %q (reason: %q)", u, errResponse.Error, errResponse.Reason)
@@ -395,7 +382,7 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
 }
 
 func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
-	overview := &OverviewResponse{}
+	overview := &overviewResponse{}
 
 	err := r.requestJSON("/api/overview", &overview)
 	if err != nil {
@@ -443,7 +430,7 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
 }
 
 func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
-	allNodes := make([]*Node, 0)
+	allNodes := make([]*node, 0)
 
 	err := r.requestJSON("/api/nodes", &allNodes)
 	if err != nil {
@@ -452,57 +439,57 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
 	}
 
 	nodes := allNodes[:0]
-	for _, node := range allNodes {
-		if r.shouldGatherNode(node) {
-			nodes = append(nodes, node)
+	for _, singleNode := range allNodes {
+		if r.shouldGatherNode(singleNode) {
+			nodes = append(nodes, singleNode)
 		}
 	}
 
 	var wg sync.WaitGroup
-	for _, node := range nodes {
+	for _, singleNode := range nodes {
 		wg.Add(1)
-		go func(node *Node) {
+		go func(singleNode *node) {
 			defer wg.Done()
 
 			tags := map[string]string{"url": r.URL}
-			tags["node"] = node.Name
+			tags["node"] = singleNode.Name
 
 			fields := map[string]interface{}{
-				"disk_free": node.DiskFree,
-				"disk_free_limit": node.DiskFreeLimit,
-				"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
-				"fd_total": node.FdTotal,
-				"fd_used": node.FdUsed,
-				"mem_limit": node.MemLimit,
-				"mem_used": node.MemUsed,
-				"mem_alarm": boolToInt(node.MemAlarm),
-				"proc_total": node.ProcTotal,
-				"proc_used": node.ProcUsed,
-				"run_queue": node.RunQueue,
-				"sockets_total": node.SocketsTotal,
-				"sockets_used": node.SocketsUsed,
-				"uptime": node.Uptime,
-				"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
-				"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
-				"mnesia_ram_tx_count": node.MnesiaRAMTxCount,
-				"mnesia_ram_tx_count_rate": node.MnesiaRAMTxCountDetails.Rate,
-				"gc_num": node.GcNum,
-				"gc_num_rate": node.GcNumDetails.Rate,
-				"gc_bytes_reclaimed": node.GcBytesReclaimed,
-				"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
-				"io_read_avg_time": node.IoReadAvgTime,
-				"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
-				"io_read_bytes": node.IoReadBytes,
-				"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
-				"io_write_avg_time": node.IoWriteAvgTime,
-				"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
-				"io_write_bytes": node.IoWriteBytes,
-				"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
-				"running": boolToInt(node.Running),
+				"disk_free": singleNode.DiskFree,
+				"disk_free_limit": singleNode.DiskFreeLimit,
+				"disk_free_alarm": boolToInt(singleNode.DiskFreeAlarm),
+				"fd_total": singleNode.FdTotal,
+				"fd_used": singleNode.FdUsed,
+				"mem_limit": singleNode.MemLimit,
+				"mem_used": singleNode.MemUsed,
+				"mem_alarm": boolToInt(singleNode.MemAlarm),
+				"proc_total": singleNode.ProcTotal,
+				"proc_used": singleNode.ProcUsed,
+				"run_queue": singleNode.RunQueue,
+				"sockets_total": singleNode.SocketsTotal,
+				"sockets_used": singleNode.SocketsUsed,
+				"uptime": singleNode.Uptime,
+				"mnesia_disk_tx_count": singleNode.MnesiaDiskTxCount,
+				"mnesia_disk_tx_count_rate": singleNode.MnesiaDiskTxCountDetails.Rate,
+				"mnesia_ram_tx_count": singleNode.MnesiaRAMTxCount,
+				"mnesia_ram_tx_count_rate": singleNode.MnesiaRAMTxCountDetails.Rate,
+				"gc_num": singleNode.GcNum,
+				"gc_num_rate": singleNode.GcNumDetails.Rate,
+				"gc_bytes_reclaimed": singleNode.GcBytesReclaimed,
+				"gc_bytes_reclaimed_rate": singleNode.GcBytesReclaimedDetails.Rate,
+				"io_read_avg_time": singleNode.IoReadAvgTime,
+				"io_read_avg_time_rate": singleNode.IoReadAvgTimeDetails.Rate,
+				"io_read_bytes": singleNode.IoReadBytes,
+				"io_read_bytes_rate": singleNode.IoReadBytesDetails.Rate,
+				"io_write_avg_time": singleNode.IoWriteAvgTime,
+				"io_write_avg_time_rate": singleNode.IoWriteAvgTimeDetails.Rate,
+				"io_write_bytes": singleNode.IoWriteBytes,
+				"io_write_bytes_rate": singleNode.IoWriteBytesDetails.Rate,
+				"running": boolToInt(singleNode.Running),
 			}
 
-			var memory MemoryResponse
-			err = r.requestJSON("/api/nodes/"+node.Name+"/memory", &memory)
+			var memory memoryResponse
+			err = r.requestJSON("/api/nodes/"+singleNode.Name+"/memory", &memory)
 			if err != nil {
 				acc.AddError(err)
 				return
@@ -552,7 +539,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
 			}
 
 			acc.AddFields("rabbitmq_node", fields, tags)
-		}(node)
+		}(singleNode)
 	}
 
 	wg.Wait()
@@ -563,7 +550,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
 		return
 	}
 	// Gather information about queues
-	queues := make([]Queue, 0)
+	queues := make([]queue, 0)
 	err := r.requestJSON("/api/queues", &queues)
 	if err != nil {
 		acc.AddError(err)
@@ -600,16 +587,16 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
 			"messages": queue.Messages,
 			"messages_ready": queue.MessagesReady,
 			"messages_unack": queue.MessagesUnacknowledged,
-			"messages_ack": queue.MessageStats.Ack,
-			"messages_ack_rate": queue.MessageStats.AckDetails.Rate,
-			"messages_deliver": queue.MessageStats.Deliver,
-			"messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate,
-			"messages_deliver_get": queue.MessageStats.DeliverGet,
-			"messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate,
-			"messages_publish": queue.MessageStats.Publish,
-			"messages_publish_rate": queue.MessageStats.PublishDetails.Rate,
-			"messages_redeliver": queue.MessageStats.Redeliver,
-			"messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate,
+			"messages_ack": queue.messageStats.Ack,
+			"messages_ack_rate": queue.messageStats.AckDetails.Rate,
+			"messages_deliver": queue.messageStats.Deliver,
+			"messages_deliver_rate": queue.messageStats.DeliverDetails.Rate,
+			"messages_deliver_get": queue.messageStats.DeliverGet,
+			"messages_deliver_get_rate": queue.messageStats.DeliverGetDetails.Rate,
+			"messages_publish": queue.messageStats.Publish,
+			"messages_publish_rate": queue.messageStats.PublishDetails.Rate,
+			"messages_redeliver": queue.messageStats.Redeliver,
+			"messages_redeliver_rate": queue.messageStats.RedeliverDetails.Rate,
 		}
 
 		if queue.HeadMessageTimestamp != nil {
@@ -626,7 +613,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
|
||||
func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||
// Gather information about exchanges
|
||||
exchanges := make([]Exchange, 0)
|
||||
exchanges := make([]exchange, 0)
|
||||
err := r.requestJSON("/api/exchanges", &exchanges)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
|
|
@ -650,10 +637,10 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
acc.AddFields(
|
||||
"rabbitmq_exchange",
|
||||
map[string]interface{}{
|
||||
"messages_publish_in": exchange.MessageStats.PublishIn,
|
||||
"messages_publish_in_rate": exchange.MessageStats.PublishInDetails.Rate,
|
||||
"messages_publish_out": exchange.MessageStats.PublishOut,
|
||||
"messages_publish_out_rate": exchange.MessageStats.PublishOutDetails.Rate,
|
||||
"messages_publish_in": exchange.messageStats.PublishIn,
|
||||
"messages_publish_in_rate": exchange.messageStats.PublishInDetails.Rate,
|
||||
"messages_publish_out": exchange.messageStats.PublishOut,
|
||||
"messages_publish_out_rate": exchange.messageStats.PublishOutDetails.Rate,
|
||||
},
|
||||
tags,
|
||||
)
|
||||
|
|
@ -662,7 +649,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
|
||||
func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||
// Gather information about federation links
|
||||
federationLinks := make([]FederationLink, 0)
|
||||
federationLinks := make([]federationLink, 0)
|
||||
err := r.requestJSON("/api/federation-links", &federationLinks)
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
|
|
@ -706,7 +693,7 @@ func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherNode(node *Node) bool {
|
||||
func (r *RabbitMQ) shouldGatherNode(node *node) bool {
|
||||
if len(r.Nodes) == 0 {
|
||||
return true
|
||||
}
|
||||
|
|
@ -765,7 +752,7 @@ func (r *RabbitMQ) shouldGatherExchange(exchangeName string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) shouldGatherFederationLink(link FederationLink) bool {
|
||||
func (r *RabbitMQ) shouldGatherFederationLink(link federationLink) bool {
|
||||
if !r.upstreamFilter.Match(link.Upstream) {
|
||||
return false
|
||||
}
|
||||
|
|
@ -783,8 +770,8 @@ func (r *RabbitMQ) shouldGatherFederationLink(link FederationLink) bool {
|
|||
func init() {
|
||||
inputs.Add("rabbitmq", func() telegraf.Input {
|
||||
return &RabbitMQ{
|
||||
ResponseHeaderTimeout: config.Duration(DefaultResponseHeaderTimeout * time.Second),
|
||||
ClientTimeout: config.Duration(DefaultClientTimeout * time.Second),
|
||||
ResponseHeaderTimeout: config.Duration(defaultResponseHeaderTimeout * time.Second),
|
||||
ClientTimeout: config.Duration(defaultClientTimeout * time.Second),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ const (
|
|||
tagSetChassis = "chassis"
|
||||
)
|
||||
|
||||
type System struct {
|
||||
type system struct {
|
||||
Hostname string `json:"hostname"`
|
||||
Links struct {
|
||||
Chassis []struct {
|
||||
|
|
@ -55,9 +55,9 @@ type System struct {
|
|||
}
|
||||
}
|
||||
|
||||
type Chassis struct {
|
||||
type chassis struct {
|
||||
ChassisType string
|
||||
Location *Location
|
||||
Location *location
|
||||
Manufacturer string
|
||||
Model string
|
||||
PartNumber string
|
||||
|
|
@ -67,13 +67,13 @@ type Chassis struct {
|
|||
PowerState string
|
||||
SKU string
|
||||
SerialNumber string
|
||||
Status Status
|
||||
Status status
|
||||
Thermal struct {
|
||||
Ref string `json:"@odata.id"`
|
||||
}
|
||||
}
|
||||
|
||||
type Power struct {
|
||||
type power struct {
|
||||
PowerControl []struct {
|
||||
Name string
|
||||
MemberID string
|
||||
|
|
@ -96,7 +96,7 @@ type Power struct {
|
|||
PowerCapacityWatts *float64
|
||||
PowerOutputWatts *float64
|
||||
LastPowerOutputWatts *float64
|
||||
Status Status
|
||||
Status status
|
||||
LineInputVoltage *float64
|
||||
}
|
||||
Voltages []struct {
|
||||
|
|
@ -107,11 +107,11 @@ type Power struct {
|
|||
UpperThresholdFatal *float64
|
||||
LowerThresholdCritical *float64
|
||||
LowerThresholdFatal *float64
|
||||
Status Status
|
||||
Status status
|
||||
}
|
||||
}
|
||||
|
||||
type Thermal struct {
|
||||
type thermal struct {
|
||||
Fans []struct {
|
||||
Name string
|
||||
MemberID string
|
||||
|
|
@ -123,7 +123,7 @@ type Thermal struct {
|
|||
UpperThresholdFatal *int64
|
||||
LowerThresholdCritical *int64
|
||||
LowerThresholdFatal *int64
|
||||
Status Status
|
||||
Status status
|
||||
}
|
||||
Temperatures []struct {
|
||||
Name string
|
||||
|
|
@ -133,11 +133,11 @@ type Thermal struct {
|
|||
UpperThresholdFatal *float64
|
||||
LowerThresholdCritical *float64
|
||||
LowerThresholdFatal *float64
|
||||
Status Status
|
||||
Status status
|
||||
}
|
||||
}
|
||||
|
||||
type Location struct {
|
||||
type location struct {
|
||||
PostalAddress struct {
|
||||
DataCenter string
|
||||
Room string
|
||||
|
|
@ -148,7 +148,7 @@ type Location struct {
|
|||
}
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
type status struct {
|
||||
State string
|
||||
Health string
|
||||
}
|
||||
|
|
@ -271,9 +271,9 @@ func (r *Redfish) getData(address string, payload interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Redfish) getComputerSystem(id string) (*System, error) {
|
||||
func (r *Redfish) getComputerSystem(id string) (*system, error) {
|
||||
loc := r.baseURL.ResolveReference(&url.URL{Path: path.Join("/redfish/v1/Systems/", id)})
|
||||
system := &System{}
|
||||
system := &system{}
|
||||
err := r.getData(loc.String(), system)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -281,9 +281,9 @@ func (r *Redfish) getComputerSystem(id string) (*System, error) {
|
|||
return system, nil
|
||||
}
|
||||
|
||||
func (r *Redfish) getChassis(ref string) (*Chassis, error) {
|
||||
func (r *Redfish) getChassis(ref string) (*chassis, error) {
|
||||
loc := r.baseURL.ResolveReference(&url.URL{Path: ref})
|
||||
chassis := &Chassis{}
|
||||
chassis := &chassis{}
|
||||
err := r.getData(loc.String(), chassis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -291,9 +291,9 @@ func (r *Redfish) getChassis(ref string) (*Chassis, error) {
|
|||
return chassis, nil
|
||||
}
|
||||
|
||||
func (r *Redfish) getPower(ref string) (*Power, error) {
|
||||
func (r *Redfish) getPower(ref string) (*power, error) {
|
||||
loc := r.baseURL.ResolveReference(&url.URL{Path: ref})
|
||||
power := &Power{}
|
||||
power := &power{}
|
||||
err := r.getData(loc.String(), power)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -301,9 +301,9 @@ func (r *Redfish) getPower(ref string) (*Power, error) {
|
|||
return power, nil
|
||||
}
|
||||
|
||||
func (r *Redfish) getThermal(ref string) (*Thermal, error) {
|
||||
func (r *Redfish) getThermal(ref string) (*thermal, error) {
|
||||
loc := r.baseURL.ResolveReference(&url.URL{Path: ref})
|
||||
thermal := &Thermal{}
|
||||
thermal := &thermal{}
|
||||
err := r.getData(loc.String(), thermal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -311,7 +311,7 @@ func (r *Redfish) getThermal(ref string) (*Thermal, error) {
|
|||
return thermal, nil
|
||||
}
|
||||
|
||||
func setChassisTags(chassis *Chassis, tags map[string]string) {
|
||||
func setChassisTags(chassis *chassis, tags map[string]string) {
|
||||
tags["chassis_chassistype"] = chassis.ChassisType
|
||||
tags["chassis_manufacturer"] = chassis.Manufacturer
|
||||
tags["chassis_model"] = chassis.Model
|
||||
|
|
@ -358,7 +358,7 @@ func (r *Redfish) Gather(acc telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system *System, chassis *Chassis) error {
|
||||
func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system *system, chassis *chassis) error {
|
||||
thermal, err := r.getThermal(chassis.Thermal.Ref)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -430,7 +430,7 @@ func (r *Redfish) gatherThermal(acc telegraf.Accumulator, address string, system
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Redfish) gatherPower(acc telegraf.Accumulator, address string, system *System, chassis *Chassis) error {
|
||||
func (r *Redfish) gatherPower(acc telegraf.Accumulator, address string, system *system, chassis *chassis) error {
|
||||
power, err := r.getPower(chassis.Power.Ref)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
type serverStatus struct {
|
||||
ID string `gorethink:"id"`
|
||||
Network struct {
|
||||
Addresses []Address `gorethink:"canonical_addresses"`
|
||||
Addresses []address `gorethink:"canonical_addresses"`
|
||||
Hostname string `gorethink:"hostname"`
|
||||
DriverPort int `gorethink:"reql_port"`
|
||||
} `gorethink:"network"`
|
||||
|
|
@ -20,16 +20,16 @@ type serverStatus struct {
|
|||
} `gorethink:"process"`
|
||||
}
|
||||
|
||||
type Address struct {
|
||||
type address struct {
|
||||
Host string `gorethink:"host"`
|
||||
Port int `gorethink:"port"`
|
||||
}
|
||||
|
||||
type stats struct {
|
||||
Engine Engine `gorethink:"query_engine"`
|
||||
Engine engine `gorethink:"query_engine"`
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
type engine struct {
|
||||
ClientConns int64 `gorethink:"client_connections,omitempty"`
|
||||
ClientActive int64 `gorethink:"clients_active,omitempty"`
|
||||
QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"`
|
||||
|
|
@ -47,28 +47,28 @@ type tableStatus struct {
|
|||
}
|
||||
|
||||
type tableStats struct {
|
||||
Engine Engine `gorethink:"query_engine"`
|
||||
Storage Storage `gorethink:"storage_engine"`
|
||||
Engine engine `gorethink:"query_engine"`
|
||||
Storage storage `gorethink:"storage_engine"`
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
Cache Cache `gorethink:"cache"`
|
||||
Disk Disk `gorethink:"disk"`
|
||||
type storage struct {
|
||||
Cache cache `gorethink:"cache"`
|
||||
Disk disk `gorethink:"disk"`
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
type cache struct {
|
||||
BytesInUse int64 `gorethink:"in_use_bytes"`
|
||||
}
|
||||
|
||||
type Disk struct {
|
||||
type disk struct {
|
||||
ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"`
|
||||
ReadBytesTotal int64 `gorethink:"read_bytes_total"`
|
||||
WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"`
|
||||
WriteBytesTotal int64 `gorethink:"written_bytes_total"`
|
||||
SpaceUsage SpaceUsage `gorethink:"space_usage"`
|
||||
SpaceUsage spaceUsage `gorethink:"space_usage"`
|
||||
}
|
||||
|
||||
type SpaceUsage struct {
|
||||
type spaceUsage struct {
|
||||
Data int64 `gorethink:"data_bytes"`
|
||||
Garbage int64 `gorethink:"garbage_bytes"`
|
||||
Metadata int64 `gorethink:"metadata_bytes"`
|
||||
|
|
@ -86,7 +86,7 @@ var engineStats = map[string]string{
|
|||
"total_writes": "TotalWrites",
|
||||
}
|
||||
|
||||
func (e *Engine) AddEngineStats(
|
||||
func (e *engine) AddEngineStats(
|
||||
keys []string,
|
||||
acc telegraf.Accumulator,
|
||||
tags map[string]string,
|
||||
|
|
@ -99,7 +99,7 @@ func (e *Engine) AddEngineStats(
|
|||
acc.AddFields("rethinkdb_engine", fields, tags)
|
||||
}
|
||||
|
||||
func (s *Storage) AddStats(acc telegraf.Accumulator, tags map[string]string) {
|
||||
func (s *storage) AddStats(acc telegraf.Accumulator, tags map[string]string) {
|
||||
fields := map[string]interface{}{
|
||||
"cache_bytes_in_use": s.Cache.BytesInUse,
|
||||
"disk_read_bytes_per_sec": s.Disk.ReadBytesPerSec,
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
var tags = make(map[string]string)
|
||||
|
||||
func TestAddEngineStats(t *testing.T) {
|
||||
engine := &Engine{
|
||||
engine := &engine{
|
||||
ClientConns: 0,
|
||||
ClientActive: 0,
|
||||
QueriesPerSec: 0,
|
||||
|
|
@ -42,7 +42,7 @@ func TestAddEngineStats(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAddEngineStatsPartial(t *testing.T) {
|
||||
engine := &Engine{
|
||||
engine := &engine{
|
||||
ClientConns: 0,
|
||||
ClientActive: 0,
|
||||
QueriesPerSec: 0,
|
||||
|
|
@ -73,16 +73,16 @@ func TestAddEngineStatsPartial(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAddStorageStats(t *testing.T) {
|
||||
storage := &Storage{
|
||||
Cache: Cache{
|
||||
storage := &storage{
|
||||
Cache: cache{
|
||||
BytesInUse: 0,
|
||||
},
|
||||
Disk: Disk{
|
||||
Disk: disk{
|
||||
ReadBytesPerSec: 0,
|
||||
ReadBytesTotal: 0,
|
||||
WriteBytesPerSec: 0,
|
||||
WriteBytesTotal: 0,
|
||||
SpaceUsage: SpaceUsage{
|
||||
SpaceUsage: spaceUsage{
|
||||
Data: 0,
|
||||
Garbage: 0,
|
||||
Metadata: 0,
|
||||
|
|
|
|||
|
|
@ -65,8 +65,8 @@ func TestIPv4SW(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
actual := []telegraf.Metric{}
|
||||
dc := NewDecoder()
|
||||
dc.OnPacket(func(p *V5Format) {
|
||||
dc := newDecoder()
|
||||
dc.OnPacket(func(p *v5Format) {
|
||||
metrics := makeMetrics(p)
|
||||
actual = append(actual, metrics...)
|
||||
})
|
||||
|
|
@ -160,7 +160,7 @@ func BenchmarkDecodeIPv4SW(b *testing.B) {
|
|||
)
|
||||
require.NoError(b, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
require.NoError(b, err)
|
||||
|
||||
b.ResetTimer()
|
||||
|
|
@ -188,7 +188,7 @@ func TestExpandFlow(t *testing.T) {
|
|||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
|
||||
require.NoError(t, err)
|
||||
actual := makeMetrics(p)
|
||||
|
|
@ -329,7 +329,7 @@ func TestIPv4SWRT(t *testing.T) {
|
|||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
|
||||
require.NoError(t, err)
|
||||
actual := makeMetrics(p)
|
||||
|
|
@ -556,7 +556,7 @@ func TestIPv6SW(t *testing.T) {
|
|||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
|
||||
require.NoError(t, err)
|
||||
actual := makeMetrics(p)
|
||||
|
|
@ -627,7 +627,7 @@ func TestExpandFlowCounter(t *testing.T) {
|
|||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
|
||||
require.NoError(t, err)
|
||||
actual := makeMetrics(p)
|
||||
|
|
@ -829,7 +829,7 @@ func TestFlowExpandCounter(t *testing.T) {
|
|||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
|
||||
require.NoError(t, err)
|
||||
actual := makeMetrics(p)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import (
|
|||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
func makeMetrics(p *V5Format) []telegraf.Metric {
|
||||
func makeMetrics(p *v5Format) []telegraf.Metric {
|
||||
now := time.Now()
|
||||
metrics := []telegraf.Metric{}
|
||||
tags := map[string]string{
|
||||
|
|
@ -26,8 +26,8 @@ func makeMetrics(p *V5Format) []telegraf.Metric {
|
|||
|
||||
for _, flowRecord := range sample.SampleData.FlowRecords {
|
||||
if flowRecord.FlowData != nil {
|
||||
tags2 := flowRecord.FlowData.GetTags()
|
||||
fields2 := flowRecord.FlowData.GetFields()
|
||||
tags2 := flowRecord.FlowData.getTags()
|
||||
fields2 := flowRecord.FlowData.getFields()
|
||||
for k, v := range tags {
|
||||
tags2[k] = v
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,28 +10,28 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/inputs/sflow/binaryio"
|
||||
)
|
||||
|
||||
type PacketDecoder struct {
|
||||
onPacket func(p *V5Format)
|
||||
type packetDecoder struct {
|
||||
onPacket func(p *v5Format)
|
||||
Log telegraf.Logger
|
||||
}
|
||||
|
||||
func NewDecoder() *PacketDecoder {
|
||||
return &PacketDecoder{}
|
||||
func newDecoder() *packetDecoder {
|
||||
return &packetDecoder{}
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) debug(args ...interface{}) {
|
||||
func (d *packetDecoder) debug(args ...interface{}) {
|
||||
if d.Log != nil {
|
||||
d.Log.Debug(args...)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) OnPacket(f func(p *V5Format)) {
|
||||
func (d *packetDecoder) OnPacket(f func(p *v5Format)) {
|
||||
d.onPacket = f
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) Decode(r io.Reader) error {
|
||||
func (d *packetDecoder) Decode(r io.Reader) error {
|
||||
var err error
|
||||
var packet *V5Format
|
||||
var packet *v5Format
|
||||
for err == nil {
|
||||
packet, err = d.DecodeOnePacket(r)
|
||||
if err != nil {
|
||||
|
|
@ -53,8 +53,8 @@ const (
|
|||
AddressTypeIPV6 AddressType = 2
|
||||
)
|
||||
|
||||
func (d *PacketDecoder) DecodeOnePacket(r io.Reader) (*V5Format, error) {
|
||||
p := &V5Format{}
|
||||
func (d *packetDecoder) DecodeOnePacket(r io.Reader) (*v5Format, error) {
|
||||
p := &v5Format{}
|
||||
err := read(r, &p.Version, "version")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -93,8 +93,8 @@ func (d *PacketDecoder) DecodeOnePacket(r io.Reader) (*V5Format, error) {
|
|||
return p, err
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeSamples(r io.Reader) ([]Sample, error) {
|
||||
result := []Sample{}
|
||||
func (d *packetDecoder) decodeSamples(r io.Reader) ([]sample, error) {
|
||||
result := []sample{}
|
||||
// # of samples
|
||||
var numOfSamples uint32
|
||||
if err := read(r, &numOfSamples, "sample count"); err != nil {
|
||||
|
|
@ -112,10 +112,10 @@ func (d *PacketDecoder) decodeSamples(r io.Reader) ([]Sample, error) {
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeSample(r io.Reader) (Sample, error) {
|
||||
func (d *packetDecoder) decodeSample(r io.Reader) (sample, error) {
|
||||
var err error
|
||||
sam := Sample{}
|
||||
if err := read(r, &sam.SampleType, "SampleType"); err != nil {
|
||||
sam := sample{}
|
||||
if err := read(r, &sam.SampleType, "sampleType"); err != nil {
|
||||
return sam, err
|
||||
}
|
||||
sampleDataLen := uint32(0)
|
||||
|
|
@ -126,9 +126,9 @@ func (d *PacketDecoder) decodeSample(r io.Reader) (Sample, error) {
|
|||
defer mr.Close()
|
||||
|
||||
switch sam.SampleType {
|
||||
case SampleTypeFlowSample:
|
||||
case sampleTypeFlowSample:
|
||||
sam.SampleData, err = d.decodeFlowSample(mr)
|
||||
case SampleTypeFlowSampleExpanded:
|
||||
case sampleTypeFlowSampleExpanded:
|
||||
sam.SampleData, err = d.decodeFlowSampleExpanded(mr)
|
||||
default:
|
||||
d.debug("Unknown sample type: ", sam.SampleType)
|
||||
|
|
@ -142,7 +142,7 @@ const (
|
|||
InterfaceFormatTypePacketDiscarded InterfaceFormatType = 1
|
||||
)
|
||||
|
||||
func (d *PacketDecoder) decodeFlowSample(r io.Reader) (t SampleDataFlowSampleExpanded, err error) {
|
||||
func (d *packetDecoder) decodeFlowSample(r io.Reader) (t sampleDataFlowSampleExpanded, err error) {
|
||||
if err := read(r, &t.SequenceNumber, "SequenceNumber"); err != nil {
|
||||
return t, err
|
||||
}
|
||||
|
|
@ -185,7 +185,7 @@ func (d *PacketDecoder) decodeFlowSample(r io.Reader) (t SampleDataFlowSampleExp
|
|||
return t, err
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeFlowSampleExpanded(r io.Reader) (t SampleDataFlowSampleExpanded, err error) {
|
||||
func (d *packetDecoder) decodeFlowSampleExpanded(r io.Reader) (t sampleDataFlowSampleExpanded, err error) {
|
||||
if err := read(r, &t.SequenceNumber, "SequenceNumber"); err != nil { // sflow_version_5.txt line 1701
|
||||
return t, err
|
||||
}
|
||||
|
|
@ -228,14 +228,14 @@ func (d *PacketDecoder) decodeFlowSampleExpanded(r io.Reader) (t SampleDataFlowS
|
|||
return t, err
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeFlowRecords(r io.Reader, samplingRate uint32) (recs []FlowRecord, err error) {
|
||||
func (d *packetDecoder) decodeFlowRecords(r io.Reader, samplingRate uint32) (recs []flowRecord, err error) {
|
||||
var flowDataLen uint32
|
||||
var count uint32
|
||||
if err := read(r, &count, "FlowRecord count"); err != nil {
|
||||
return recs, err
|
||||
}
|
||||
for i := uint32(0); i < count; i++ {
|
||||
fr := FlowRecord{}
|
||||
fr := flowRecord{}
|
||||
if err := read(r, &fr.FlowFormat, "FlowFormat"); err != nil { // sflow_version_5.txt line 1597
|
||||
return recs, err
|
||||
}
|
||||
|
|
@ -246,7 +246,7 @@ func (d *PacketDecoder) decodeFlowRecords(r io.Reader, samplingRate uint32) (rec
|
|||
mr := binaryio.MinReader(r, int64(flowDataLen))
|
||||
|
||||
switch fr.FlowFormat {
|
||||
case FlowFormatTypeRawPacketHeader: // sflow_version_5.txt line 1938
|
||||
case flowFormatTypeRawPacketHeader: // sflow_version_5.txt line 1938
|
||||
fr.FlowData, err = d.decodeRawPacketHeaderFlowData(mr, samplingRate)
|
||||
default:
|
||||
d.debug("Unknown flow format: ", fr.FlowFormat)
|
||||
|
|
@ -263,7 +263,7 @@ func (d *PacketDecoder) decodeFlowRecords(r io.Reader, samplingRate uint32) (rec
|
|||
return recs, err
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeRawPacketHeaderFlowData(r io.Reader, samplingRate uint32) (h RawPacketHeaderFlowData, err error) {
|
||||
func (d *packetDecoder) decodeRawPacketHeaderFlowData(r io.Reader, samplingRate uint32) (h rawPacketHeaderFlowData, err error) {
|
||||
if err := read(r, &h.HeaderProtocol, "HeaderProtocol"); err != nil { // sflow_version_5.txt line 1940
|
||||
return h, err
|
||||
}
|
||||
|
|
@ -283,7 +283,7 @@ func (d *PacketDecoder) decodeRawPacketHeaderFlowData(r io.Reader, samplingRate
|
|||
defer mr.Close()
|
||||
|
||||
switch h.HeaderProtocol {
|
||||
case HeaderProtocolTypeEthernetISO88023:
|
||||
case headerProtocolTypeEthernetISO88023:
|
||||
h.Header, err = d.decodeEthHeader(mr)
|
||||
default:
|
||||
d.debug("Unknown header protocol type: ", h.HeaderProtocol)
|
||||
|
|
@ -294,7 +294,7 @@ func (d *PacketDecoder) decodeRawPacketHeaderFlowData(r io.Reader, samplingRate
|
|||
|
||||
// ethHeader answers a decode Directive that will decode an ethernet frame header
|
||||
// according to https://en.wikipedia.org/wiki/Ethernet_frame
|
||||
func (d *PacketDecoder) decodeEthHeader(r io.Reader) (h EthHeader, err error) {
|
||||
func (d *packetDecoder) decodeEthHeader(r io.Reader) (h ethHeader, err error) {
|
||||
// we may have to read out StrippedOctets bytes and throw them away first?
|
||||
if err := read(r, &h.DestinationMAC, "DestinationMAC"); err != nil {
|
||||
return h, err
|
||||
|
|
@ -318,7 +318,7 @@ func (d *PacketDecoder) decodeEthHeader(r io.Reader) (h EthHeader, err error) {
|
|||
default:
|
||||
h.EtherTypeCode = tagOrEType
|
||||
}
|
||||
h.EtherType = ETypeMap[h.EtherTypeCode]
|
||||
h.EtherType = eTypeMap[h.EtherTypeCode]
|
||||
switch h.EtherType {
|
||||
case "IPv4":
|
||||
h.IPHeader, err = d.decodeIPv4Header(r)
|
||||
|
|
@ -333,7 +333,7 @@ func (d *PacketDecoder) decodeEthHeader(r io.Reader) (h EthHeader, err error) {
|
|||
}
|
||||
|
||||
// https://en.wikipedia.org/wiki/IPv4#Header
|
||||
func (d *PacketDecoder) decodeIPv4Header(r io.Reader) (h IPV4Header, err error) {
|
||||
func (d *packetDecoder) decodeIPv4Header(r io.Reader) (h ipV4Header, err error) {
|
||||
if err := read(r, &h.Version, "Version"); err != nil {
|
||||
return h, err
|
||||
}
|
||||
|
|
@ -371,9 +371,9 @@ func (d *PacketDecoder) decodeIPv4Header(r io.Reader) (h IPV4Header, err error)
|
|||
return h, err
|
||||
}
|
||||
switch h.Protocol {
|
||||
case IPProtocolTCP:
|
||||
case ipProtocolTCP:
|
||||
h.ProtocolHeader, err = d.decodeTCPHeader(r)
|
||||
case IPProtocolUDP:
|
||||
case ipProtocolUDP:
|
||||
h.ProtocolHeader, err = d.decodeUDPHeader(r)
|
||||
default:
|
||||
d.debug("Unknown IP protocol: ", h.Protocol)
|
||||
|
|
@ -382,7 +382,7 @@ func (d *PacketDecoder) decodeIPv4Header(r io.Reader) (h IPV4Header, err error)
|
|||
}
|
||||
|
||||
// https://en.wikipedia.org/wiki/IPv6_packet
|
||||
func (d *PacketDecoder) decodeIPv6Header(r io.Reader) (h IPV6Header, err error) {
|
||||
func (d *packetDecoder) decodeIPv6Header(r io.Reader) (h ipV6Header, err error) {
|
||||
var fourByteBlock uint32
|
||||
if err := read(r, &fourByteBlock, "IPv6 header octet 0"); err != nil {
|
||||
return h, err
|
||||
|
|
@ -411,9 +411,9 @@ func (d *PacketDecoder) decodeIPv6Header(r io.Reader) (h IPV6Header, err error)
|
|||
return h, err
|
||||
}
|
||||
switch h.NextHeaderProto {
|
||||
case IPProtocolTCP:
|
||||
case ipProtocolTCP:
|
||||
h.ProtocolHeader, err = d.decodeTCPHeader(r)
|
||||
case IPProtocolUDP:
|
||||
case ipProtocolUDP:
|
||||
h.ProtocolHeader, err = d.decodeUDPHeader(r)
|
||||
default:
|
||||
// not handled
|
||||
|
|
@ -423,7 +423,7 @@ func (d *PacketDecoder) decodeIPv6Header(r io.Reader) (h IPV6Header, err error)
|
|||
}
|
||||
|
||||
// https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure
|
||||
func (d *PacketDecoder) decodeTCPHeader(r io.Reader) (h TCPHeader, err error) {
|
||||
func (d *packetDecoder) decodeTCPHeader(r io.Reader) (h tcpHeader, err error) {
|
||||
if err := read(r, &h.SourcePort, "SourcePort"); err != nil {
|
||||
return h, err
|
||||
}
|
||||
|
|
@ -461,7 +461,7 @@ func (d *PacketDecoder) decodeTCPHeader(r io.Reader) (h TCPHeader, err error) {
|
|||
return h, err
|
||||
}
|
||||
|
||||
func (d *PacketDecoder) decodeUDPHeader(r io.Reader) (h UDPHeader, err error) {
|
||||
func (d *packetDecoder) decodeUDPHeader(r io.Reader) (h udpHeader, err error) {
|
||||
if err := read(r, &h.SourcePort, "SourcePort"); err != nil {
|
||||
return h, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,11 +15,11 @@ func TestUDPHeader(t *testing.T) {
|
|||
0x00, 0x00, // checksum
|
||||
})
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
actual, err := dc.decodeUDPHeader(octets)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := UDPHeader{
|
||||
expected := udpHeader{
|
||||
SourcePort: 1,
|
||||
DestinationPort: 2,
|
||||
UDPLength: 3,
|
||||
|
|
@ -36,7 +36,7 @@ func BenchmarkUDPHeader(b *testing.B) {
|
|||
0x00, 0x00, // checksum
|
||||
})
|
||||
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
|
|
@ -64,11 +64,11 @@ func TestIPv4Header(t *testing.T) {
|
|||
0x00, 0x00, // checksum
|
||||
},
|
||||
)
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
actual, err := dc.decodeIPv4Header(octets)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := IPV4Header{
|
||||
expected := ipV4Header{
|
||||
Version: 0x40,
|
||||
InternetHeaderLength: 0x05,
|
||||
DSCP: 0,
|
||||
|
|
@ -82,7 +82,7 @@ func TestIPv4Header(t *testing.T) {
|
|||
HeaderChecksum: 0,
|
||||
SourceIP: [4]byte{127, 0, 0, 1},
|
||||
DestIP: [4]byte{127, 0, 0, 2},
|
||||
ProtocolHeader: UDPHeader{
|
||||
ProtocolHeader: udpHeader{
|
||||
SourcePort: 1,
|
||||
DestinationPort: 2,
|
||||
UDPLength: 3,
|
||||
|
|
@ -114,7 +114,7 @@ func TestIPv4HeaderSwitch(t *testing.T) {
|
|||
0x00, 0x00, // checksum
|
||||
},
|
||||
)
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
_, err := dc.decodeIPv4Header(octets)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
@ -140,17 +140,17 @@ func TestIPv4HeaderSwitch(t *testing.T) {
|
|||
0x00, 0x00, // tcp_urgent_pointer
|
||||
},
|
||||
)
|
||||
dc = NewDecoder()
|
||||
dc = newDecoder()
|
||||
actual, err := dc.decodeIPv4Header(octets)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := IPV4Header{
|
||||
expected := ipV4Header{
|
||||
Version: 64,
|
||||
InternetHeaderLength: 5,
|
||||
Protocol: 6,
|
||||
SourceIP: [4]byte{127, 0, 0, 1},
|
||||
DestIP: [4]byte{127, 0, 0, 2},
|
||||
ProtocolHeader: TCPHeader{
|
||||
ProtocolHeader: tcpHeader{
|
||||
SourcePort: 1,
|
||||
DestinationPort: 2,
|
||||
},
|
||||
|
|
@ -192,11 +192,11 @@ func TestUnknownProtocol(t *testing.T) {
|
|||
0x00,
|
||||
},
|
||||
)
|
||||
dc := NewDecoder()
|
||||
dc := newDecoder()
|
||||
actual, err := dc.decodeIPv4Header(octets)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := IPV4Header{
|
||||
expected := ipV4Header{
|
||||
Version: 64,
|
||||
InternetHeaderLength: 5,
|
||||
Protocol: 153,
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ type SFlow struct {
|
|||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
addr net.Addr
|
||||
decoder *PacketDecoder
|
||||
decoder *packetDecoder
|
||||
closer io.Closer
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
|
@ -40,14 +40,14 @@ func (*SFlow) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (s *SFlow) Init() error {
|
||||
s.decoder = NewDecoder()
|
||||
s.decoder = newDecoder()
|
||||
s.decoder.Log = s.Log
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start starts this sFlow listener listening on the configured network for sFlow packets
|
||||
func (s *SFlow) Start(acc telegraf.Accumulator) error {
|
||||
s.decoder.OnPacket(func(p *V5Format) {
|
||||
s.decoder.OnPacket(func(p *v5Format) {
|
||||
metrics := makeMetrics(p)
|
||||
for _, m := range metrics {
|
||||
acc.AddMetric(m)
|
||||
|
|
|
|||
|
|
@ -6,46 +6,44 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
IPProtocolTCP uint8 = 6
|
||||
IPProtocolUDP uint8 = 17
|
||||
ipProtocolTCP uint8 = 6
|
||||
ipProtocolUDP uint8 = 17
|
||||
)
|
||||
|
||||
var ETypeMap = map[uint16]string{
|
||||
var eTypeMap = map[uint16]string{
|
||||
0x0800: "IPv4",
|
||||
0x86DD: "IPv6",
|
||||
}
|
||||
|
||||
type ContainsMetricData interface {
|
||||
GetTags() map[string]string
|
||||
GetFields() map[string]interface{}
|
||||
type containsMetricData interface {
|
||||
getTags() map[string]string
|
||||
getFields() map[string]interface{}
|
||||
}
|
||||
|
||||
// V5Format answers and decoder.Directive capable of decoding sFlow v5 packets in accordance
|
||||
// v5Format answers and decoder.Directive capable of decoding sFlow v5 packets in accordance
|
||||
// with SFlow v5 specification at https://sflow.org/sflow_version_5.txt
|
||||
type V5Format struct {
|
||||
type v5Format struct {
|
||||
Version uint32
|
||||
AgentAddress net.IPAddr
|
||||
SubAgentID uint32
|
||||
SequenceNumber uint32
|
||||
Uptime uint32
|
||||
Samples []Sample
|
||||
Samples []sample
|
||||
}
|
||||
|
||||
type SampleType uint32
|
||||
type sampleType uint32
|
||||
|
||||
const (
|
||||
SampleTypeFlowSample SampleType = 1 // sflow_version_5.txt line: 1614
|
||||
SampleTypeFlowSampleExpanded SampleType = 3 // sflow_version_5.txt line: 1698
|
||||
sampleTypeFlowSample sampleType = 1 // sflow_version_5.txt line: 1614
|
||||
sampleTypeFlowSampleExpanded sampleType = 3 // sflow_version_5.txt line: 1698
|
||||
)
|
||||
|
||||
type SampleData interface{}
|
||||
|
||||
type Sample struct {
|
||||
SampleType SampleType
|
||||
SampleData SampleDataFlowSampleExpanded
|
||||
type sample struct {
|
||||
SampleType sampleType
|
||||
SampleData sampleDataFlowSampleExpanded
|
||||
}
|
||||
|
||||
type SampleDataFlowSampleExpanded struct {
|
||||
type sampleDataFlowSampleExpanded struct {
|
||||
SequenceNumber uint32
|
||||
SourceIDType uint32
|
||||
SourceIDIndex uint32
|
||||
|
|
@ -57,70 +55,70 @@ type SampleDataFlowSampleExpanded struct {
|
|||
InputIfIndex uint32
|
||||
OutputIfFormat uint32
|
||||
OutputIfIndex uint32
|
||||
FlowRecords []FlowRecord
|
||||
FlowRecords []flowRecord
|
||||
}
|
||||
|
||||
type FlowFormatType uint32
|
||||
type flowFormatType uint32
|
||||
|
||||
const (
|
||||
FlowFormatTypeRawPacketHeader FlowFormatType = 1 // sflow_version_5.txt line: 1938
|
||||
flowFormatTypeRawPacketHeader flowFormatType = 1 // sflow_version_5.txt line: 1938
|
||||
)
|
||||
|
||||
type FlowData ContainsMetricData
|
||||
type flowData containsMetricData
|
||||
|
||||
type FlowRecord struct {
|
||||
FlowFormat FlowFormatType
|
||||
FlowData FlowData
|
||||
type flowRecord struct {
|
||||
FlowFormat flowFormatType
|
||||
FlowData flowData
|
||||
}
|
||||
|
||||
type HeaderProtocolType uint32
|
||||
type headerProtocolType uint32
|
||||
|
||||
const (
|
||||
HeaderProtocolTypeEthernetISO88023 HeaderProtocolType = 1
|
||||
HeaderProtocolTypeISO88024TokenBus HeaderProtocolType = 2
|
||||
HeaderProtocolTypeISO88025TokenRing HeaderProtocolType = 3
|
||||
HeaderProtocolTypeFDDI HeaderProtocolType = 4
|
||||
HeaderProtocolTypeFrameRelay HeaderProtocolType = 5
|
||||
HeaderProtocolTypeX25 HeaderProtocolType = 6
|
||||
HeaderProtocolTypePPP HeaderProtocolType = 7
|
||||
HeaderProtocolTypeSMDS HeaderProtocolType = 8
|
||||
HeaderProtocolTypeAAL5 HeaderProtocolType = 9
|
||||
HeaderProtocolTypeAAL5IP HeaderProtocolType = 10 /* e.g. Cisco AAL5 mux */
|
||||
HeaderProtocolTypeIPv4 HeaderProtocolType = 11
|
||||
HeaderProtocolTypeIPv6 HeaderProtocolType = 12
|
||||
HeaderProtocolTypeMPLS HeaderProtocolType = 13
|
||||
HeaderProtocolTypePOS HeaderProtocolType = 14 /* RFC 1662, 2615 */
|
||||
headerProtocolTypeEthernetISO88023 headerProtocolType = 1
|
||||
headerProtocolTypeISO88024TokenBus headerProtocolType = 2
|
||||
headerProtocolTypeISO88025TokenRing headerProtocolType = 3
|
||||
headerProtocolTypeFDDI headerProtocolType = 4
|
||||
headerProtocolTypeFrameRelay headerProtocolType = 5
|
||||
headerProtocolTypeX25 headerProtocolType = 6
|
||||
headerProtocolTypePPP headerProtocolType = 7
|
||||
headerProtocolTypeSMDS headerProtocolType = 8
|
||||
headerProtocolTypeAAL5 headerProtocolType = 9
|
||||
headerProtocolTypeAAL5IP headerProtocolType = 10 /* e.g. Cisco AAL5 mux */
|
||||
headerProtocolTypeIPv4 headerProtocolType = 11
|
||||
headerProtocolTypeIPv6 headerProtocolType = 12
|
||||
headerProtocolTypeMPLS headerProtocolType = 13
|
||||
headerProtocolTypePOS headerProtocolType = 14 /* RFC 1662, 2615 */
|
||||
)
|
||||
|
||||
var HeaderProtocolMap = map[HeaderProtocolType]string{
|
||||
HeaderProtocolTypeEthernetISO88023: "ETHERNET-ISO88023", // sflow_version_5.txt line: 1920
|
||||
var headerProtocolMap = map[headerProtocolType]string{
|
||||
headerProtocolTypeEthernetISO88023: "ETHERNET-ISO88023", // sflow_version_5.txt line: 1920
|
||||
}
|
||||
|
||||
type Header ContainsMetricData
|
||||
type header containsMetricData
|
||||
|
||||
type RawPacketHeaderFlowData struct {
|
||||
HeaderProtocol HeaderProtocolType
|
||||
type rawPacketHeaderFlowData struct {
|
||||
HeaderProtocol headerProtocolType
|
||||
FrameLength uint32
|
||||
Bytes uint32
|
||||
StrippedOctets uint32
|
||||
HeaderLength uint32
|
||||
Header Header
|
||||
Header header
|
||||
}
|
||||
|
||||
func (h RawPacketHeaderFlowData) GetTags() map[string]string {
|
||||
func (h rawPacketHeaderFlowData) getTags() map[string]string {
|
||||
var t map[string]string
|
||||
if h.Header != nil {
|
||||
t = h.Header.GetTags()
|
||||
t = h.Header.getTags()
|
||||
} else {
|
||||
t = map[string]string{}
|
||||
}
|
||||
t["header_protocol"] = HeaderProtocolMap[h.HeaderProtocol]
|
||||
t["header_protocol"] = headerProtocolMap[h.HeaderProtocol]
|
||||
return t
|
||||
}
|
||||
func (h RawPacketHeaderFlowData) GetFields() map[string]interface{} {
|
||||
func (h rawPacketHeaderFlowData) getFields() map[string]interface{} {
|
||||
var f map[string]interface{}
|
||||
if h.Header != nil {
|
||||
f = h.Header.GetFields()
|
||||
f = h.Header.getFields()
|
||||
} else {
|
||||
f = map[string]interface{}{}
|
||||
}
|
||||
|
|
@ -130,22 +128,22 @@ func (h RawPacketHeaderFlowData) GetFields() map[string]interface{} {
|
|||
return f
|
||||
}
|
||||
|
||||
type IPHeader ContainsMetricData
|
||||
type ipHeader containsMetricData
|
||||
|
||||
type EthHeader struct {
|
||||
type ethHeader struct {
|
||||
DestinationMAC [6]byte
|
||||
SourceMAC [6]byte
|
||||
TagProtocolIdentifier uint16
|
||||
TagControlInformation uint16
|
||||
EtherTypeCode uint16
|
||||
EtherType string
|
||||
IPHeader IPHeader
|
||||
IPHeader ipHeader
|
||||
}
|
||||
|
||||
func (h EthHeader) GetTags() map[string]string {
|
||||
func (h ethHeader) getTags() map[string]string {
|
||||
var t map[string]string
|
||||
if h.IPHeader != nil {
|
||||
t = h.IPHeader.GetTags()
|
||||
t = h.IPHeader.getTags()
|
||||
} else {
|
||||
t = map[string]string{}
|
||||
}
|
||||
|
|
@ -154,17 +152,17 @@ func (h EthHeader) GetTags() map[string]string {
|
|||
t["ether_type"] = h.EtherType
|
||||
return t
|
||||
}
|
||||
func (h EthHeader) GetFields() map[string]interface{} {
|
||||
func (h ethHeader) getFields() map[string]interface{} {
|
||||
if h.IPHeader != nil {
|
||||
return h.IPHeader.GetFields()
|
||||
return h.IPHeader.getFields()
|
||||
}
|
||||
return map[string]interface{}{}
|
||||
}
|
||||
|
||||
type ProtocolHeader ContainsMetricData
|
||||
type protocolHeader containsMetricData
|
||||
|
||||
// https://en.wikipedia.org/wiki/IPv4#Header
|
||||
type IPV4Header struct {
|
||||
type ipV4Header struct {
|
||||
Version uint8 // 4 bit
|
||||
InternetHeaderLength uint8 // 4 bit
|
||||
DSCP uint8
|
||||
|
|
@ -178,13 +176,13 @@ type IPV4Header struct {
|
|||
HeaderChecksum uint16
|
||||
SourceIP [4]byte
|
||||
DestIP [4]byte
|
||||
ProtocolHeader ProtocolHeader
|
||||
ProtocolHeader protocolHeader
|
||||
}
|
||||
|
||||
func (h IPV4Header) GetTags() map[string]string {
|
||||
func (h ipV4Header) getTags() map[string]string {
|
||||
var t map[string]string
|
||||
if h.ProtocolHeader != nil {
|
||||
t = h.ProtocolHeader.GetTags()
|
||||
t = h.ProtocolHeader.getTags()
|
||||
} else {
|
||||
t = map[string]string{}
|
||||
}
|
||||
|
|
@ -192,10 +190,10 @@ func (h IPV4Header) GetTags() map[string]string {
|
|||
t["dst_ip"] = net.IP(h.DestIP[:]).String()
|
||||
return t
|
||||
}
|
||||
func (h IPV4Header) GetFields() map[string]interface{} {
|
||||
func (h ipV4Header) getFields() map[string]interface{} {
|
||||
var f map[string]interface{}
|
||||
if h.ProtocolHeader != nil {
|
||||
f = h.ProtocolHeader.GetFields()
|
||||
f = h.ProtocolHeader.getFields()
|
||||
} else {
|
||||
f = map[string]interface{}{}
|
||||
}
|
||||
|
|
@ -209,7 +207,7 @@ func (h IPV4Header) GetFields() map[string]interface{} {
|
|||
}
|
||||
|
||||
// https://en.wikipedia.org/wiki/IPv6_packet
|
||||
type IPV6Header struct {
|
||||
type ipV6Header struct {
|
||||
DSCP uint8
|
||||
ECN uint8
|
||||
PayloadLength uint16
|
||||
|
|
@ -217,13 +215,13 @@ type IPV6Header struct {
|
|||
HopLimit uint8
|
||||
SourceIP [16]byte
|
||||
DestIP [16]byte
|
||||
ProtocolHeader ProtocolHeader
|
||||
ProtocolHeader protocolHeader
|
||||
}
|
||||
|
||||
func (h IPV6Header) GetTags() map[string]string {
|
||||
func (h ipV6Header) getTags() map[string]string {
|
||||
var t map[string]string
|
||||
if h.ProtocolHeader != nil {
|
||||
t = h.ProtocolHeader.GetTags()
|
||||
t = h.ProtocolHeader.getTags()
|
||||
} else {
|
||||
t = map[string]string{}
|
||||
}
|
||||
|
|
@ -231,10 +229,10 @@ func (h IPV6Header) GetTags() map[string]string {
|
|||
t["dst_ip"] = net.IP(h.DestIP[:]).String()
|
||||
return t
|
||||
}
|
||||
func (h IPV6Header) GetFields() map[string]interface{} {
|
||||
func (h ipV6Header) getFields() map[string]interface{} {
|
||||
var f map[string]interface{}
|
||||
if h.ProtocolHeader != nil {
|
||||
f = h.ProtocolHeader.GetFields()
|
||||
f = h.ProtocolHeader.getFields()
|
||||
} else {
|
||||
f = map[string]interface{}{}
|
||||
}
|
||||
|
|
@ -245,7 +243,7 @@ func (h IPV6Header) GetFields() map[string]interface{} {
|
|||
}
|
||||
|
||||
// https://en.wikipedia.org/wiki/Transmission_Control_Protocol
|
||||
type TCPHeader struct {
|
||||
type tcpHeader struct {
|
||||
SourcePort uint16
|
||||
DestinationPort uint16
|
||||
Sequence uint32
|
||||
|
|
@ -257,14 +255,14 @@ type TCPHeader struct {
|
|||
TCPUrgentPointer uint16
|
||||
}
|
||||
|
||||
func (h TCPHeader) GetTags() map[string]string {
|
||||
func (h tcpHeader) getTags() map[string]string {
|
||||
t := map[string]string{
|
||||
"dst_port": strconv.FormatUint(uint64(h.DestinationPort), 10),
|
||||
"src_port": strconv.FormatUint(uint64(h.SourcePort), 10),
|
||||
}
|
||||
return t
|
||||
}
|
||||
func (h TCPHeader) GetFields() map[string]interface{} {
|
||||
func (h tcpHeader) getFields() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"tcp_header_length": h.TCPHeaderLength,
|
||||
"tcp_urgent_pointer": h.TCPUrgentPointer,
|
||||
|
|
@ -272,21 +270,21 @@ func (h TCPHeader) GetFields() map[string]interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
type UDPHeader struct {
|
||||
type udpHeader struct {
|
||||
SourcePort uint16
|
||||
DestinationPort uint16
|
||||
UDPLength uint16
|
||||
Checksum uint16
|
||||
}
|
||||
|
||||
func (h UDPHeader) GetTags() map[string]string {
|
||||
func (h udpHeader) getTags() map[string]string {
|
||||
t := map[string]string{
|
||||
"dst_port": strconv.FormatUint(uint64(h.DestinationPort), 10),
|
||||
"src_port": strconv.FormatUint(uint64(h.SourcePort), 10),
|
||||
}
|
||||
return t
|
||||
}
|
||||
func (h UDPHeader) GetFields() map[string]interface{} {
|
||||
func (h udpHeader) getFields() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"udp_length": h.UDPLength,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,16 +7,16 @@ import (
|
|||
)
|
||||
|
||||
func TestRawPacketHeaderFlowData(t *testing.T) {
|
||||
h := RawPacketHeaderFlowData{
|
||||
HeaderProtocol: HeaderProtocolTypeEthernetISO88023,
|
||||
h := rawPacketHeaderFlowData{
|
||||
HeaderProtocol: headerProtocolTypeEthernetISO88023,
|
||||
FrameLength: 64,
|
||||
Bytes: 64,
|
||||
StrippedOctets: 0,
|
||||
HeaderLength: 0,
|
||||
Header: nil,
|
||||
}
|
||||
tags := h.GetTags()
|
||||
fields := h.GetFields()
|
||||
tags := h.getTags()
|
||||
fields := h.getFields()
|
||||
|
||||
require.NotNil(t, fields)
|
||||
require.NotNil(t, tags)
|
||||
|
|
@ -26,7 +26,7 @@ func TestRawPacketHeaderFlowData(t *testing.T) {
|
|||
|
||||
// process a raw ethernet packet without any encapsulated protocol
|
||||
func TestEthHeader(t *testing.T) {
|
||||
h := EthHeader{
|
||||
h := ethHeader{
|
||||
DestinationMAC: [6]byte{0xca, 0xff, 0xee, 0xff, 0xe, 0x0},
|
||||
SourceMAC: [6]byte{0xde, 0xad, 0xbe, 0xef, 0x0, 0x0},
|
||||
TagProtocolIdentifier: 0x88B5, // IEEE Std 802 - Local Experimental Ethertype
|
||||
|
|
@ -35,8 +35,8 @@ func TestEthHeader(t *testing.T) {
|
|||
EtherType: "",
|
||||
IPHeader: nil,
|
||||
}
|
||||
tags := h.GetTags()
|
||||
fields := h.GetFields()
|
||||
tags := h.getTags()
|
||||
fields := h.getFields()
|
||||
|
||||
require.NotNil(t, fields)
|
||||
require.NotNil(t, tags)
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func (cfg *apiConfig) mbeansEndpoint(server, core string) string {
|
|||
return strings.TrimSuffix(server, "/") + "/solr/" + strings.Trim(core, "/") + cfg.endpointMBeans
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) parseCore(acc telegraf.Accumulator, core string, data *MBeansData, ts time.Time) {
|
||||
func (cfg *apiConfig) parseCore(acc telegraf.Accumulator, coreStr string, data *mBeansData, ts time.Time) {
|
||||
// Determine the core information element
|
||||
var coreData json.RawMessage
|
||||
for i := 0; i < len(data.SolrMbeans); i += 2 {
|
||||
|
|
@ -64,9 +64,9 @@ func (cfg *apiConfig) parseCore(acc telegraf.Accumulator, core string, data *MBe
|
|||
return
|
||||
}
|
||||
|
||||
var coreMetrics map[string]Core
|
||||
var coreMetrics map[string]core
|
||||
if err := json.Unmarshal(coreData, &coreMetrics); err != nil {
|
||||
acc.AddError(fmt.Errorf("unmarshalling core metrics for %q failed: %w", core, err))
|
||||
acc.AddError(fmt.Errorf("unmarshalling core metrics for %q failed: %w", coreStr, err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ func (cfg *apiConfig) parseCore(acc telegraf.Accumulator, core string, data *MBe
|
|||
"num_docs": m.Stats.NumDocs,
|
||||
}
|
||||
tags := map[string]string{
|
||||
"core": core,
|
||||
"core": coreStr,
|
||||
"handler": name,
|
||||
}
|
||||
|
||||
|
|
@ -88,7 +88,7 @@ func (cfg *apiConfig) parseCore(acc telegraf.Accumulator, core string, data *MBe
|
|||
}
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) parseCache(acc telegraf.Accumulator, core string, data *MBeansData, ts time.Time) {
|
||||
func (cfg *apiConfig) parseCache(acc telegraf.Accumulator, core string, data *mBeansData, ts time.Time) {
|
||||
// Determine the cache information element
|
||||
var cacheData json.RawMessage
|
||||
for i := 0; i < len(data.SolrMbeans); i += 2 {
|
||||
|
|
@ -102,7 +102,7 @@ func (cfg *apiConfig) parseCache(acc telegraf.Accumulator, core string, data *MB
|
|||
return
|
||||
}
|
||||
|
||||
var cacheMetrics map[string]Cache
|
||||
var cacheMetrics map[string]cache
|
||||
if err := json.Unmarshal(cacheData, &cacheMetrics); err != nil {
|
||||
acc.AddError(fmt.Errorf("unmarshalling update handler for %q failed: %w", core, err))
|
||||
return
|
||||
|
|
@ -135,7 +135,7 @@ func (cfg *apiConfig) parseCache(acc telegraf.Accumulator, core string, data *MB
|
|||
}
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) parseQueryHandler(acc telegraf.Accumulator, core string, data *MBeansData, ts time.Time) {
|
||||
func (cfg *apiConfig) parseQueryHandler(acc telegraf.Accumulator, core string, data *mBeansData, ts time.Time) {
|
||||
// Determine the query-handler information element
|
||||
var queryData json.RawMessage
|
||||
for i := 0; i < len(data.SolrMbeans); i += 2 {
|
||||
|
|
@ -149,7 +149,7 @@ func (cfg *apiConfig) parseQueryHandler(acc telegraf.Accumulator, core string, d
|
|||
return
|
||||
}
|
||||
|
||||
var queryMetrics map[string]QueryHandler
|
||||
var queryMetrics map[string]queryHandler
|
||||
if err := json.Unmarshal(queryData, &queryMetrics); err != nil {
|
||||
acc.AddError(fmt.Errorf("unmarshalling query handler for %q failed: %w", core, err))
|
||||
return
|
||||
|
|
@ -202,7 +202,7 @@ func (cfg *apiConfig) parseQueryHandler(acc telegraf.Accumulator, core string, d
|
|||
}
|
||||
}
|
||||
|
||||
func (cfg *apiConfig) parseUpdateHandler(acc telegraf.Accumulator, core string, data *MBeansData, ts time.Time) {
|
||||
func (cfg *apiConfig) parseUpdateHandler(acc telegraf.Accumulator, core string, data *mBeansData, ts time.Time) {
|
||||
// Determine the update-handler information element
|
||||
var updateData json.RawMessage
|
||||
for i := 0; i < len(data.SolrMbeans); i += 2 {
|
||||
|
|
@ -216,7 +216,7 @@ func (cfg *apiConfig) parseUpdateHandler(acc telegraf.Accumulator, core string,
|
|||
return
|
||||
}
|
||||
|
||||
var updateMetrics map[string]UpdateHandler
|
||||
var updateMetrics map[string]updateHandler
|
||||
if err := json.Unmarshal(updateData, &updateMetrics); err != nil {
|
||||
acc.AddError(fmt.Errorf("unmarshalling update handler for %q failed: %w", core, err))
|
||||
return
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ func (s *Solr) getAPIConfig(server string) *apiConfig {
|
|||
func (s *Solr) collect(acc telegraf.Accumulator, cfg *apiConfig, server string) {
|
||||
now := time.Now()
|
||||
|
||||
var coreStatus AdminCoresStatus
|
||||
var coreStatus adminCoresStatus
|
||||
if err := s.query(cfg.adminEndpoint(server), &coreStatus); err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
|
|
@ -145,7 +145,7 @@ func (s *Solr) collect(acc telegraf.Accumulator, cfg *apiConfig, server string)
|
|||
go func(server string, core string) {
|
||||
defer wg.Done()
|
||||
|
||||
var data MBeansData
|
||||
var data mBeansData
|
||||
if err := s.query(cfg.mbeansEndpoint(server, core), &data); err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -2,9 +2,8 @@ package solr
|
|||
|
||||
import "encoding/json"
|
||||
|
||||
// AdminCoresStatus is an exported type that
|
||||
// contains a response with information about Solr cores.
|
||||
type AdminCoresStatus struct {
|
||||
// adminCoresStatus is an exported type that contains a response with information about Solr cores.
|
||||
type adminCoresStatus struct {
|
||||
Status map[string]struct {
|
||||
Index struct {
|
||||
SizeInBytes int64 `json:"sizeInBytes"`
|
||||
|
|
@ -15,23 +14,20 @@ type AdminCoresStatus struct {
|
|||
} `json:"status"`
|
||||
}
|
||||
|
||||
// MBeansData is an exported type that
|
||||
// contains a response from Solr with metrics
|
||||
type MBeansData struct {
|
||||
Headers ResponseHeader `json:"responseHeader"`
|
||||
// mBeansData is an exported type that contains a response from Solr with metrics
|
||||
type mBeansData struct {
|
||||
Headers responseHeader `json:"responseHeader"`
|
||||
SolrMbeans []json.RawMessage `json:"solr-mbeans"`
|
||||
}
|
||||
|
||||
// ResponseHeader is an exported type that
|
||||
// contains a response metrics: QTime and Status
|
||||
type ResponseHeader struct {
|
||||
// responseHeader is an exported type that contains a response metrics: QTime and Status
|
||||
type responseHeader struct {
|
||||
QTime int64 `json:"QTime"`
|
||||
Status int64 `json:"status"`
|
||||
}
|
||||
|
||||
// Core is an exported type that
|
||||
// contains Core metrics
|
||||
type Core struct {
|
||||
// core is an exported type that contains Core metrics
|
||||
type core struct {
|
||||
Stats struct {
|
||||
DeletedDocs int64 `json:"deletedDocs"`
|
||||
MaxDoc int64 `json:"maxDoc"`
|
||||
|
|
@ -39,15 +35,13 @@ type Core struct {
|
|||
} `json:"stats"`
|
||||
}
|
||||
|
||||
// QueryHandler is an exported type that
|
||||
// contains query handler metrics
|
||||
type QueryHandler struct {
|
||||
// queryHandler is an exported type that contains query handler metrics
|
||||
type queryHandler struct {
|
||||
Stats interface{} `json:"stats"`
|
||||
}
|
||||
|
||||
// UpdateHandler is an exported type that
|
||||
// contains update handler metrics
|
||||
type UpdateHandler struct {
|
||||
// updateHandler is an exported type that contains update handler metrics
|
||||
type updateHandler struct {
|
||||
Stats struct {
|
||||
Adds int64 `json:"adds"`
|
||||
AutocommitMaxDocs int64 `json:"autocommit maxDocs"`
|
||||
|
|
@ -69,8 +63,7 @@ type UpdateHandler struct {
|
|||
} `json:"stats"`
|
||||
}
|
||||
|
||||
// Cache is an exported type that
|
||||
// contains cache metrics
|
||||
type Cache struct {
|
||||
// cache is an exported type that contains cache metrics
|
||||
type cache struct {
|
||||
Stats map[string]interface{} `json:"stats"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ var (
|
|||
)
|
||||
|
||||
type (
|
||||
// Stackdriver is the Google Stackdriver config info.
|
||||
Stackdriver struct {
|
||||
// stackdriver is the Google Stackdriver config info.
|
||||
stackdriver struct {
|
||||
Project string `toml:"project"`
|
||||
RateLimit int `toml:"rate_limit"`
|
||||
Window config.Duration `toml:"window"`
|
||||
|
|
@ -53,7 +53,7 @@ type (
|
|||
MetricTypePrefixExclude []string `toml:"metric_type_prefix_exclude"`
|
||||
GatherRawDistributionBuckets bool `toml:"gather_raw_distribution_buckets"`
|
||||
DistributionAggregationAligners []string `toml:"distribution_aggregation_aligners"`
|
||||
Filter *ListTimeSeriesFilter `toml:"filter"`
|
||||
Filter *listTimeSeriesFilter `toml:"filter"`
|
||||
|
||||
Log telegraf.Logger
|
||||
|
||||
|
|
@ -62,29 +62,28 @@ type (
|
|||
prevEnd time.Time
|
||||
}
|
||||
|
||||
// ListTimeSeriesFilter contains resource labels and metric labels
|
||||
ListTimeSeriesFilter struct {
|
||||
ResourceLabels []*Label `json:"resource_labels"`
|
||||
MetricLabels []*Label `json:"metric_labels"`
|
||||
UserLabels []*Label `json:"user_labels"`
|
||||
SystemLabels []*Label `json:"system_labels"`
|
||||
// listTimeSeriesFilter contains resource labels and metric labels
|
||||
listTimeSeriesFilter struct {
|
||||
ResourceLabels []*label `json:"resource_labels"`
|
||||
MetricLabels []*label `json:"metric_labels"`
|
||||
UserLabels []*label `json:"user_labels"`
|
||||
SystemLabels []*label `json:"system_labels"`
|
||||
}
|
||||
|
||||
// Label contains key and value
|
||||
Label struct {
|
||||
// label contains key and value
|
||||
label struct {
|
||||
Key string `toml:"key"`
|
||||
Value string `toml:"value"`
|
||||
}
|
||||
|
||||
// TimeSeriesConfCache caches generated timeseries configurations
|
||||
// timeSeriesConfCache caches generated timeseries configurations
|
||||
timeSeriesConfCache struct {
|
||||
TTL time.Duration
|
||||
Generated time.Time
|
||||
TimeSeriesConfs []*timeSeriesConf
|
||||
}
|
||||
|
||||
// Internal structure which holds our configuration for a particular GCP time
|
||||
// series.
|
||||
// Internal structure which holds our configuration for a particular GCP time series.
|
||||
timeSeriesConf struct {
|
||||
// The influx measurement name this time series maps to
|
||||
measurement string
|
||||
|
|
@ -193,12 +192,12 @@ func (smc *stackdriverMetricClient) Close() error {
|
|||
return smc.conn.Close()
|
||||
}
|
||||
|
||||
func (*Stackdriver) SampleConfig() string {
|
||||
func (*stackdriver) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Gather implements telegraf.Input interface
|
||||
func (s *Stackdriver) Gather(acc telegraf.Accumulator) error {
|
||||
func (s *stackdriver) Gather(acc telegraf.Accumulator) error {
|
||||
ctx := context.Background()
|
||||
|
||||
if s.RateLimit == 0 {
|
||||
|
|
@ -244,7 +243,7 @@ func (s *Stackdriver) Gather(acc telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
// Returns the start and end time for the next collection.
|
||||
func (s *Stackdriver) updateWindow(prevEnd time.Time) (time.Time, time.Time) {
|
||||
func (s *stackdriver) updateWindow(prevEnd time.Time) (time.Time, time.Time) {
|
||||
var start time.Time
|
||||
if time.Duration(s.Window) != 0 {
|
||||
start = time.Now().Add(-time.Duration(s.Delay)).Add(-time.Duration(s.Window))
|
||||
|
|
@@ -258,7 +257,7 @@ func (s *Stackdriver) updateWindow(prevEnd time.Time) (time.Time, time.Time) {
 }

 // Generate filter string for ListTimeSeriesRequest
-func (s *Stackdriver) newListTimeSeriesFilter(metricType string) string {
+func (s *stackdriver) newListTimeSeriesFilter(metricType string) string {
 	functions := []string{
 		"starts_with",
 		"ends_with",
@@ -348,7 +347,7 @@ func (s *Stackdriver) newListTimeSeriesFilter(metricType string) string {

 // Create and initialize a timeSeriesConf for a given GCP metric type with
 // defaults taken from the gcp_stackdriver plugin configuration.
-func (s *Stackdriver) newTimeSeriesConf(
+func (s *stackdriver) newTimeSeriesConf(
 	metricType string, startTime, endTime time.Time,
 ) *timeSeriesConf {
 	filter := s.newListTimeSeriesFilter(metricType)
@@ -404,7 +403,7 @@ func (c *timeSeriesConfCache) IsValid() bool {
 	return c.TimeSeriesConfs != nil && time.Since(c.Generated) < c.TTL
 }

-func (s *Stackdriver) initializeStackdriverClient(ctx context.Context) error {
+func (s *stackdriver) initializeStackdriverClient(ctx context.Context) error {
 	if s.client == nil {
 		client, err := monitoring.NewMetricClient(ctx)
 		if err != nil {
@@ -453,7 +452,7 @@ func includeExcludeHelper(key string, includes, excludes []string) bool {
 // Test whether a particular GCP metric type should be scraped by this plugin
 // by checking the plugin name against the configuration's
 // "includeMetricTypePrefixes" and "excludeMetricTypePrefixes"
-func (s *Stackdriver) includeMetricType(metricType string) bool {
+func (s *stackdriver) includeMetricType(metricType string) bool {
 	k := metricType
 	inc := s.MetricTypePrefixInclude
 	exc := s.MetricTypePrefixExclude
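The hunk above only renames the receiver, but since include/exclude semantics are easy to misread, here is a hedged sketch of the behavior the comment describes (the exact precedence inside the plugin's includeExcludeHelper may differ):

package main

import (
	"fmt"
	"strings"
)

// includeByPrefix reports whether metricType survives prefix filtering:
// an exclude match always drops it, and an empty include list means
// "include everything".
func includeByPrefix(metricType string, includes, excludes []string) bool {
	for _, p := range excludes {
		if strings.HasPrefix(metricType, p) {
			return false
		}
	}
	if len(includes) == 0 {
		return true
	}
	for _, p := range includes {
		if strings.HasPrefix(metricType, p) {
			return true
		}
	}
	return false
}

func main() {
	inc := []string{"compute.googleapis.com/"}
	exc := []string{"compute.googleapis.com/instance/disk/"}
	fmt.Println(includeByPrefix("compute.googleapis.com/instance/cpu/usage_time", inc, exc))  // true
	fmt.Println(includeByPrefix("compute.googleapis.com/instance/disk/read_bytes", inc, exc)) // false
}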
@@ -462,7 +461,7 @@ func (s *Stackdriver) includeMetricType(metricType string) bool {
 }

 // Generates filter for list metric descriptors request
-func (s *Stackdriver) newListMetricDescriptorsFilters() []string {
+func (s *stackdriver) newListMetricDescriptorsFilters() []string {
 	if len(s.MetricTypePrefixInclude) == 0 {
 		return nil
 	}
@@ -476,7 +475,7 @@ func (s *Stackdriver) newListMetricDescriptorsFilters() []string {

 // Generate a list of timeSeriesConfig structs by making a ListMetricDescriptors
 // API request and filtering the result against our configuration.
-func (s *Stackdriver) generatetimeSeriesConfs(
+func (s *stackdriver) generatetimeSeriesConfs(
 	ctx context.Context, startTime, endTime time.Time,
 ) ([]*timeSeriesConf, error) {
 	if s.timeSeriesConfCache != nil && s.timeSeriesConfCache.IsValid() {
@@ -547,7 +546,7 @@ func (s *Stackdriver) generatetimeSeriesConfs(

 // Do the work to gather an individual time series. Runs inside a
 // timeseries-specific goroutine.
-func (s *Stackdriver) gatherTimeSeries(
+func (s *stackdriver) gatherTimeSeries(
 	ctx context.Context, grouper *lockedSeriesGrouper, tsConf *timeSeriesConf,
 ) error {
 	tsReq := tsConf.listTimeSeriesRequest
@@ -600,7 +599,7 @@ func (s *Stackdriver) gatherTimeSeries(
 	return nil
 }

-type Buckets interface {
+type buckets interface {
 	Amount() int32
 	UpperBound(i int32) float64
 }
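For reviewers unfamiliar with the distribution handling: anything satisfying the two-method buckets interface above can drive the histogram flattening. A toy linear implementation follows; the bucket formula is illustrative only, since the plugin's real LinearBuckets wrapper around distributionpb is not shown in this hunk and may differ:

package main

import "fmt"

// buckets mirrors the (now unexported) interface from the hunk above.
type buckets interface {
	Amount() int32
	UpperBound(i int32) float64
}

// linearDemo models GCP-style linear bucketing: numFinite buckets of equal
// width starting at offset, plus implicit underflow/overflow buckets.
type linearDemo struct {
	numFinite     int32
	offset, width float64
}

func (l linearDemo) Amount() int32 { return l.numFinite + 2 }
func (l linearDemo) UpperBound(i int32) float64 {
	return l.offset + l.width*float64(i)
}

func main() {
	var b buckets = linearDemo{numFinite: 3, offset: 0, width: 10}
	for i := int32(0); i < b.Amount(); i++ {
		fmt.Printf("bucket %d upper bound: %.0f\n", i, b.UpperBound(i))
	}
}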
@@ -642,7 +641,7 @@ func (e *ExplicitBuckets) UpperBound(i int32) float64 {
 	return e.Bounds[i]
 }

-func NewBucket(dist *distributionpb.Distribution) (Buckets, error) {
+func NewBucket(dist *distributionpb.Distribution) (buckets, error) {
 	linearBuckets := dist.BucketOptions.GetLinearBuckets()
 	if linearBuckets != nil {
 		var l LinearBuckets
@@ -668,7 +667,7 @@ func NewBucket(dist *distributionpb.Distribution) (Buckets, error) {
 }

 // AddDistribution adds metrics from a distribution value type.
-func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags map[string]string, ts time.Time,
+func (s *stackdriver) addDistribution(dist *distributionpb.Distribution, tags map[string]string, ts time.Time,
 	grouper *lockedSeriesGrouper, tsConf *timeSeriesConf,
 ) error {
 	field := tsConf.fieldKey
@@ -714,7 +713,7 @@ func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags ma

 func init() {
 	inputs.Add("stackdriver", func() telegraf.Input {
-		return &Stackdriver{
+		return &stackdriver{
 			CacheTTL:  defaultCacheTTL,
 			RateLimit: defaultRateLimit,
 			Delay:     defaultDelay,
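The hunk above is also why the rename is safe at all: the concrete type leaves the package only through the factory handed to inputs.Add, typed as telegraf.Input. A stripped-down sketch of that registration pattern (hypothetical names, no Telegraf imports):

package main

import "fmt"

// input stands in for the telegraf.Input interface; callers only ever see
// this interface, never the concrete plugin type.
type input interface{ Gather() error }

var registry = map[string]func() input{}

// add mimics inputs.Add: it registers a factory, not a type.
func add(name string, factory func() input) { registry[name] = factory }

// stackdriverDemo is unexported, like the renamed plugin struct; that is
// fine because instances escape the package only as the input interface.
type stackdriverDemo struct{ RateLimit int }

func (*stackdriverDemo) Gather() error { return nil }

func init() {
	add("stackdriver", func() input { return &stackdriverDemo{RateLimit: 14} })
}

func main() {
	plugin := registry["stackdriver"]()
	fmt.Printf("%T gathers: %v\n", plugin, plugin.Gather()) // *main.stackdriverDemo gathers: <nil>
}

The remaining hunks apply the same rename throughout the plugin's tests.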
@@ -63,7 +63,7 @@ func (m *MockStackdriverClient) Close() error {
 }

 func TestInitAndRegister(t *testing.T) {
-	expected := &Stackdriver{
+	expected := &stackdriver{
 		CacheTTL:  defaultCacheTTL,
 		RateLimit: defaultRateLimit,
 		Delay:     defaultDelay,
@@ -732,7 +732,7 @@ func TestGather(t *testing.T) {
 		return ch, nil
 	}

-	s := &Stackdriver{
+	s := &stackdriver{
 		Log:       testutil.Logger{},
 		Project:   "test",
 		RateLimit: 10,
@@ -858,7 +858,7 @@ func TestGatherAlign(t *testing.T) {
 		},
 	}

-	s := &Stackdriver{
+	s := &stackdriver{
 		Log:       testutil.Logger{},
 		Project:   "test",
 		RateLimit: 10,
@@ -892,13 +892,13 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 	now := time.Now().Round(time.Second)
 	tests := []struct {
 		name        string
-		stackdriver *Stackdriver
+		stackdriver *stackdriver
 		descriptor  *metricpb.MetricDescriptor
 		calls       []call
 	}{
 		{
 			name: "simple",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
 				RateLimit:               1,
@@ -919,11 +919,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "single resource labels string",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					ResourceLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					ResourceLabels: []*label{
 						{
 							Key:   "instance_name",
 							Value: `localhost`,
@@ -948,11 +948,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "single resource labels function",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					ResourceLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					ResourceLabels: []*label{
 						{
 							Key:   "instance_name",
 							Value: `starts_with("localhost")`,
@@ -977,11 +977,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "multiple resource labels",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					ResourceLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					ResourceLabels: []*label{
 						{
 							Key:   "instance_name",
 							Value: `localhost`,
@@ -1010,11 +1010,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "single metric label string",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					MetricLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					MetricLabels: []*label{
 						{
 							Key:   "resource_type",
 							Value: `instance`,
@@ -1039,11 +1039,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "single metric label function",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					MetricLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					MetricLabels: []*label{
 						{
 							Key:   "resource_id",
 							Value: `starts_with("abc-")`,
@@ -1068,11 +1068,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "multiple metric labels",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					MetricLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					MetricLabels: []*label{
 						{
 							Key:   "resource_type",
 							Value: "instance",
@@ -1102,11 +1102,11 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 		},
 		{
 			name: "all labels filters",
-			stackdriver: &Stackdriver{
+			stackdriver: &stackdriver{
 				Project:                 "test",
 				MetricTypePrefixInclude: []string{"telegraf/cpu/usage"},
-				Filter: &ListTimeSeriesFilter{
-					ResourceLabels: []*Label{
+				Filter: &listTimeSeriesFilter{
+					ResourceLabels: []*label{
 						{
 							Key:   "instance_name",
 							Value: `localhost`,
@@ -1116,7 +1116,7 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 							Value: `starts_with("us-")`,
 						},
 					},
-					MetricLabels: []*Label{
+					MetricLabels: []*label{
 						{
 							Key:   "resource_type",
 							Value: "instance",
@@ -1126,7 +1126,7 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 							Value: `starts_with("abc-")`,
 						},
 					},
-					UserLabels: []*Label{
+					UserLabels: []*label{
 						{
 							Key:   "team",
 							Value: "badgers",
@@ -1136,7 +1136,7 @@ func TestListMetricDescriptorFilter(t *testing.T) {
 							Value: `starts_with("prod-")`,
 						},
 					},
-					SystemLabels: []*Label{
+					SystemLabels: []*label{
 						{
 							Key:   "machine_type",
 							Value: "e2",