Fix reconnection issues mqtt (#8821)
This commit is contained in:
parent
c25ae5295b
commit
f3a208ee28
|
|
@ -80,6 +80,7 @@ following works:
|
|||
- github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE)
|
||||
- github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE)
|
||||
- github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE)
|
||||
- github.com/gorilla/websocket [BSD 2-Clause "Simplified" License](https://github.com/gorilla/websocket/blob/master/LICENSE)
|
||||
- github.com/gosnmp/gosnmp [BSD 2-Clause "Simplified" License](https://github.com/gosnmp/gosnmp/blob/master/LICENSE)
|
||||
- github.com/grpc-ecosystem/grpc-gateway [BSD 3-Clause "New" or "Revised" License](https://github.com/grpc-ecosystem/grpc-gateway/blob/master/LICENSE.txt)
|
||||
- github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE)
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -51,7 +51,7 @@ require (
|
|||
github.com/docker/go-connections v0.3.0 // indirect
|
||||
github.com/docker/go-units v0.3.3 // indirect
|
||||
github.com/docker/libnetwork v0.8.0-dev.2.0.20181012153825-d7b61745d166
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.0
|
||||
github.com/ericchiang/k8s v1.2.0
|
||||
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
|
||||
github.com/go-logfmt/logfmt v0.4.0
|
||||
|
|
|
|||
7
go.sum
7
go.sum
|
|
@ -204,8 +204,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
|
|||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
|
||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I=
|
||||
github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
|
||||
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
|
|
@ -336,6 +336,8 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8
|
|||
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
|
||||
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
|
||||
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gosnmp/gosnmp v1.29.0 h1:fEkud7oiYVzR64L+/BQA7uvp+7COI9+XkrUQi8JunYM=
|
||||
github.com/gosnmp/gosnmp v1.29.0/go.mod h1:Ux0YzU4nV5yDET7dNIijd0VST0BCy8ijBf+gTVFQeaM=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
|
||||
|
|
@ -729,6 +731,7 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL
|
|||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
|
||||
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
|
|
|
|||
|
|
@ -184,6 +184,7 @@ func (m *MQTTConsumer) Init() error {
|
|||
}
|
||||
|
||||
m.opts = opts
|
||||
m.messages = map[telegraf.TrackingID]bool{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -221,9 +222,6 @@ func (m *MQTTConsumer) connect() error {
|
|||
|
||||
m.Log.Infof("Connected %v", m.Servers)
|
||||
m.state = Connected
|
||||
m.messagesMutex.Lock()
|
||||
m.messages = make(map[telegraf.TrackingID]bool)
|
||||
m.messagesMutex.Unlock()
|
||||
|
||||
// Persistent sessions should skip subscription if a session is present, as
|
||||
// the subscriptions are stored by the server.
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ func (p *FakeParser) SetDefaultTags(tags map[string]string) {
|
|||
|
||||
type FakeToken struct {
|
||||
sessionPresent bool
|
||||
complete chan struct{}
|
||||
}
|
||||
|
||||
// FakeToken satisfies mqtt.Token
|
||||
|
|
@ -84,6 +85,10 @@ func (t *FakeToken) SessionPresent() bool {
|
|||
return t.sessionPresent
|
||||
}
|
||||
|
||||
func (t *FakeToken) Done() <-chan struct{} {
|
||||
return t.complete
|
||||
}
|
||||
|
||||
// Test the basic lifecycle transitions of the plugin.
|
||||
func TestLifecycleSanity(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
|
|
|
|||
Loading…
Reference in New Issue