fix(inputs.cloud_pubsub): Fix gzip decompression (#13238)
This commit is contained in:
parent
1b8339ace8
commit
43048aad8c
|
|
@ -62,9 +62,10 @@ type PubSub struct {
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
acc telegraf.TrackingAccumulator
|
acc telegraf.TrackingAccumulator
|
||||||
|
|
||||||
undelivered map[telegraf.TrackingID]message
|
undelivered map[telegraf.TrackingID]message
|
||||||
sem semaphore
|
sem semaphore
|
||||||
decoder internal.ContentDecoder
|
decoder internal.ContentDecoder
|
||||||
|
decoderMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*PubSub) SampleConfig() string {
|
func (*PubSub) SampleConfig() string {
|
||||||
|
|
@ -216,10 +217,13 @@ func (ps *PubSub) decompressData(data []byte) ([]byte, error) {
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ps.decoderMutex.Lock()
|
||||||
data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize))
|
data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
ps.decoderMutex.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ps.decoderMutex.Unlock()
|
||||||
|
|
||||||
decompressedData := make([]byte, len(data))
|
decompressedData := make([]byte, len(data))
|
||||||
copy(decompressedData, data)
|
copy(decompressedData, data)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue