From f3a208ee2877608aab670becab2ab9b840fcb87f Mon Sep 17 00:00:00 2001 From: Helen Weller <38860767+helenosheaa@users.noreply.github.com> Date: Thu, 11 Feb 2021 11:45:13 -0500 Subject: [PATCH] Fix reconnection issues mqtt (#8821) --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 2 +- go.sum | 7 +++++-- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 4 +--- plugins/inputs/mqtt_consumer/mqtt_consumer_test.go | 5 +++++ 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 154f13a88..543d59c3e 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -80,6 +80,7 @@ following works: - github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE) - github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE) - github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE) +- github.com/gorilla/websocket [BSD 2-Clause "Simplified" License](https://github.com/gorilla/websocket/blob/master/LICENSE) - github.com/gosnmp/gosnmp [BSD 2-Clause "Simplified" License](https://github.com/gosnmp/gosnmp/blob/master/LICENSE) - github.com/grpc-ecosystem/grpc-gateway [BSD 3-Clause "New" or "Revised" License](https://github.com/grpc-ecosystem/grpc-gateway/blob/master/LICENSE.txt) - github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE) diff --git a/go.mod b/go.mod index f65401c5a..15a296424 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/docker/go-connections v0.3.0 // indirect github.com/docker/go-units v0.3.3 // indirect github.com/docker/libnetwork v0.8.0-dev.2.0.20181012153825-d7b61745d166 - github.com/eclipse/paho.mqtt.golang v1.2.0 + github.com/eclipse/paho.mqtt.golang v1.3.0 github.com/ericchiang/k8s v1.2.0 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/go-logfmt/logfmt v0.4.0 diff --git a/go.sum b/go.sum index 220a706d0..611ab4e49 100644 --- a/go.sum +++ b/go.sum @@ -204,8 +204,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= -github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I= +github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -336,6 +336,8 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gosnmp/gosnmp v1.29.0 h1:fEkud7oiYVzR64L+/BQA7uvp+7COI9+XkrUQi8JunYM= github.com/gosnmp/gosnmp v1.29.0/go.mod h1:Ux0YzU4nV5yDET7dNIijd0VST0BCy8ijBf+gTVFQeaM= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= @@ -729,6 +731,7 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 73d41a32f..006aaac25 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -184,6 +184,7 @@ func (m *MQTTConsumer) Init() error { } m.opts = opts + m.messages = map[telegraf.TrackingID]bool{} return nil } @@ -221,9 +222,6 @@ func (m *MQTTConsumer) connect() error { m.Log.Infof("Connected %v", m.Servers) m.state = Connected - m.messagesMutex.Lock() - m.messages = make(map[telegraf.TrackingID]bool) - m.messagesMutex.Unlock() // Persistent sessions should skip subscription if a session is present, as // the subscriptions are stored by the server. diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 2d9db2c23..efa921cb1 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -63,6 +63,7 @@ func (p *FakeParser) SetDefaultTags(tags map[string]string) { type FakeToken struct { sessionPresent bool + complete chan struct{} } // FakeToken satisfies mqtt.Token @@ -84,6 +85,10 @@ func (t *FakeToken) SessionPresent() bool { return t.sessionPresent } +func (t *FakeToken) Done() <-chan struct{} { + return t.complete +} + // Test the basic lifecycle transitions of the plugin. func TestLifecycleSanity(t *testing.T) { var acc testutil.Accumulator