This commit is contained in:
Steven Soroka 2020-06-24 13:29:44 -04:00
parent 31754635a3
commit 88b09cf18c
1 changed files with 11 additions and 1 deletions

View File

@ -55,6 +55,8 @@ func (a *Accumulator) NMetrics() uint64 {
} }
func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric { func (a *Accumulator) GetTelegrafMetrics() []telegraf.Metric {
a.Lock()
defer a.Unlock()
metrics := []telegraf.Metric{} metrics := []telegraf.Metric{}
for _, m := range a.Metrics { for _, m := range a.Metrics {
metrics = append(metrics, FromTestMetric(m)) metrics = append(metrics, FromTestMetric(m))
@ -251,6 +253,8 @@ func (a *Accumulator) SetDebug(debug bool) {
// Get gets the specified measurement point from the accumulator // Get gets the specified measurement point from the accumulator
func (a *Accumulator) Get(measurement string) (*Metric, bool) { func (a *Accumulator) Get(measurement string) (*Metric, bool) {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics { for _, p := range a.Metrics {
if p.Measurement == measurement { if p.Measurement == measurement {
return p, true return p, true
@ -261,6 +265,8 @@ func (a *Accumulator) Get(measurement string) (*Metric, bool) {
} }
func (a *Accumulator) HasTag(measurement string, key string) bool { func (a *Accumulator) HasTag(measurement string, key string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics { for _, p := range a.Metrics {
if p.Measurement == measurement { if p.Measurement == measurement {
_, ok := p.Tags[key] _, ok := p.Tags[key]
@ -271,6 +277,8 @@ func (a *Accumulator) HasTag(measurement string, key string) bool {
} }
func (a *Accumulator) TagSetValue(measurement string, key string) string { func (a *Accumulator) TagSetValue(measurement string, key string) string {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics { for _, p := range a.Metrics {
if p.Measurement == measurement { if p.Measurement == measurement {
v, ok := p.Tags[key] v, ok := p.Tags[key]
@ -283,6 +291,8 @@ func (a *Accumulator) TagSetValue(measurement string, key string) string {
} }
func (a *Accumulator) TagValue(measurement string, key string) string { func (a *Accumulator) TagValue(measurement string, key string) string {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics { for _, p := range a.Metrics {
if p.Measurement == measurement { if p.Measurement == measurement {
v, ok := p.Tags[key] v, ok := p.Tags[key]
@ -323,13 +333,13 @@ func (a *Accumulator) NFields() int {
// Wait waits for the given number of metrics to be added to the accumulator. // Wait waits for the given number of metrics to be added to the accumulator.
func (a *Accumulator) Wait(n int) { func (a *Accumulator) Wait(n int) {
a.Lock() a.Lock()
defer a.Unlock()
if a.Cond == nil { if a.Cond == nil {
a.Cond = sync.NewCond(&a.Mutex) a.Cond = sync.NewCond(&a.Mutex)
} }
for int(a.NMetrics()) < n { for int(a.NMetrics()) < n {
a.Cond.Wait() a.Cond.Wait()
} }
a.Unlock()
} }
// WaitError waits for the given number of errors to be added to the accumulator. // WaitError waits for the given number of errors to be added to the accumulator.