Support Riemann-Protobuff Listener (#8163)

This commit is contained in:
Vipin Menon 2020-11-27 22:00:45 +05:30 committed by GitHub
parent 0ce55bbd4a
commit d536f610cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 507 additions and 1 deletions

View File

@ -123,6 +123,7 @@ following works:
- github.com/prometheus/common [Apache License 2.0](https://github.com/prometheus/common/blob/master/LICENSE)
- github.com/prometheus/procfs [Apache License 2.0](https://github.com/prometheus/procfs/blob/master/LICENSE)
- github.com/rcrowley/go-metrics [MIT License](https://github.com/rcrowley/go-metrics/blob/master/LICENSE)
- github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE)
- github.com/safchain/ethtool [Apache License 2.0](https://github.com/safchain/ethtool/blob/master/LICENSE)
- github.com/samuel/go-zookeeper [BSD 3-Clause Clear License](https://github.com/samuel/go-zookeeper/blob/master/LICENSE)
- github.com/shirou/gopsutil [BSD 3-Clause Clear License](https://github.com/shirou/gopsutil/blob/master/LICENSE)
@ -172,6 +173,7 @@ following works:
- gopkg.in/mgo.v2 [BSD 2-Clause "Simplified" License](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- gopkg.in/olivere/elastic.v5 [MIT License](https://github.com/olivere/elastic/blob/v5.0.76/LICENSE)
- gopkg.in/tomb.v1 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v1/LICENSE)
- gopkg.in/tomb.v2 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v2/LICENSE)
- gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE)
- gopkg.in/yaml.v3 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v3/LICENSE)
- modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE)

3
go.mod
View File

@ -108,6 +108,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/prometheus/procfs v0.0.8
github.com/riemann/riemann-go-client v0.5.0
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
@ -149,7 +150,7 @@ require (
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce
gopkg.in/olivere/elastic.v5 v5.0.70
gopkg.in/yaml.v2 v2.2.8
gotest.tools v2.2.0+incompatible // indirect
gotest.tools v2.2.0+incompatible
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.17.1 // indirect
modernc.org/sqlite v1.7.4

6
go.sum
View File

@ -257,6 +257,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v0.0.0-20170307001533-c9c7427a2a70/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
@ -539,6 +540,8 @@ github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/riemann/riemann-go-client v0.5.0 h1:yPP7tz1vSYJkSZvZFCsMiDsHHXX57x8/fEX3qyEXuAA=
github.com/riemann/riemann-go-client v0.5.0/go.mod h1:FMiaOL8dgBnRfgwENzV0xlYJ2eCbV1o7yqVwOBLbShQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664 h1:gvolwzuDhul9qK6/oHqxCHD5TEYfsWNBGidOeG6kvpk=
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
@ -674,6 +677,7 @@ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@ -906,6 +910,8 @@ gopkg.in/olivere/elastic.v5 v5.0.70/go.mod h1:FylZT6jQWtfHsicejzOm3jIMVPOAksa80i
gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=

View File

@ -146,6 +146,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/riemann_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/salesforce"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/sflow"

View File

@ -0,0 +1,42 @@
# Riemann Listener Input Plugin
The Riemann Listener is a simple input plugin that listens for messages from
client that use riemann clients using riemann-protobuff format.
### Configuration:
This is a sample configuration for the plugin.
```toml
[[inputs.rimann_listener]]
## URL to listen on
## Default is "tcp://:5555"
# service_address = "tcp://:8094"
# service_address = "tcp://127.0.0.1:http"
# service_address = "tcp4://:8094"
# service_address = "tcp6://:8094"
# service_address = "tcp6://[2001:db8::1]:8094"
## Maximum number of concurrent connections.
## 0 (default) is unlimited.
# max_connections = 1024
## Read timeout.
## 0 (default) is unlimited.
# read_timeout = "30s"
## Optional TLS configuration.
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Maximum socket buffer size (in bytes when no unit specified).
# read_buffer_size = "64KiB"
## Period between keep alive probes.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
```
Just like Riemann the default port is 5555. This can be configured, refer configuration above.
Riemann `Service` is mapped as `measurement`. `metric` and `TTL` are converted into field values.
As Riemann tags as simply an array, they are converted into the `influx_line` format key-value, where both key and value are the tags.

View File

@ -0,0 +1,399 @@
package riemann_listener
import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf/metric"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
riemanngo "github.com/riemann/riemann-go-client"
riemangoProto "github.com/riemann/riemann-go-client/proto"
)
type RiemannSocketListener struct {
ServiceAddress string `toml:"service_address"`
MaxConnections int `toml:"max_connections"`
ReadBufferSize internal.Size `toml:"read_buffer_size"`
ReadTimeout *internal.Duration `toml:"read_timeout"`
KeepAlivePeriod *internal.Duration `toml:"keep_alive_period"`
SocketMode string `toml:"socket_mode"`
tlsint.ServerConfig
wg sync.WaitGroup
Log telegraf.Logger
telegraf.Accumulator
}
type setReadBufferer interface {
SetReadBuffer(bytes int) error
}
type riemannListener struct {
net.Listener
*RiemannSocketListener
sockType string
connections map[string]net.Conn
connectionsMtx sync.Mutex
}
func (rsl *riemannListener) listen(ctx context.Context) {
rsl.connections = map[string]net.Conn{}
wg := sync.WaitGroup{}
select {
case <-ctx.Done():
rsl.closeAllConnections()
wg.Wait()
return
default:
for {
c, err := rsl.Accept()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
rsl.Log.Error(err.Error())
}
break
}
if rsl.ReadBufferSize.Size > 0 {
if srb, ok := c.(setReadBufferer); ok {
srb.SetReadBuffer(int(rsl.ReadBufferSize.Size))
} else {
rsl.Log.Warnf("Unable to set read buffer on a %s socket", rsl.sockType)
}
}
rsl.connectionsMtx.Lock()
if rsl.MaxConnections > 0 && len(rsl.connections) >= rsl.MaxConnections {
rsl.connectionsMtx.Unlock()
c.Close()
continue
}
rsl.connections[c.RemoteAddr().String()] = c
rsl.connectionsMtx.Unlock()
if err := rsl.setKeepAlive(c); err != nil {
rsl.Log.Errorf("Unable to configure keep alive %q: %s", rsl.ServiceAddress, err.Error())
}
wg.Add(1)
go func() {
defer wg.Done()
rsl.read(c)
}()
}
rsl.closeAllConnections()
wg.Wait()
}
}
func (rsl *riemannListener) closeAllConnections() {
rsl.connectionsMtx.Lock()
for _, c := range rsl.connections {
c.Close()
}
rsl.connectionsMtx.Unlock()
}
func (rsl *riemannListener) setKeepAlive(c net.Conn) error {
if rsl.KeepAlivePeriod == nil {
return nil
}
tcpc, ok := c.(*net.TCPConn)
if !ok {
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(rsl.ServiceAddress, "://", 2)[0])
}
if rsl.KeepAlivePeriod.Duration == 0 {
return tcpc.SetKeepAlive(false)
}
if err := tcpc.SetKeepAlive(true); err != nil {
return err
}
return tcpc.SetKeepAlivePeriod(rsl.KeepAlivePeriod.Duration)
}
func (rsl *riemannListener) removeConnection(c net.Conn) {
rsl.connectionsMtx.Lock()
delete(rsl.connections, c.RemoteAddr().String())
rsl.connectionsMtx.Unlock()
}
//Utilities
/*
readMessages will read Riemann messages in binary format
from the TCP connection. byte Array p size will depend on the size
of the riemann message as sent by the cleint
*/
func readMessages(r io.Reader, p []byte) error {
for len(p) > 0 {
n, err := r.Read(p)
p = p[n:]
if err != nil {
return err
}
}
return nil
}
func checkError(err error) {
log.Println("The error is")
if err != nil {
log.Println(err.Error())
}
}
func (rsl *riemannListener) read(conn net.Conn) {
defer rsl.removeConnection(conn)
defer conn.Close()
var err error
for {
if rsl.ReadTimeout != nil && rsl.ReadTimeout.Duration > 0 {
err = conn.SetDeadline(time.Now().Add(rsl.ReadTimeout.Duration))
}
messagePb := &riemangoProto.Msg{}
var header uint32
// First obtain the size of the riemann event from client and acknowledge
if err = binary.Read(conn, binary.BigEndian, &header); err != nil {
if err.Error() != "EOF" {
rsl.Log.Debugf("Failed to read header")
riemannReturnErrorResponse(conn, err.Error())
return
}
return
}
data := make([]byte, header)
if err = readMessages(conn, data); err != nil {
rsl.Log.Debugf("Failed to read body: %s", err.Error())
riemannReturnErrorResponse(conn, "Failed to read body")
return
}
if err = proto.Unmarshal(data, messagePb); err != nil {
rsl.Log.Debugf("Failed to unmarshal: %s", err.Error())
riemannReturnErrorResponse(conn, "Failed to unmarshal")
return
}
riemannEvents := riemanngo.ProtocolBuffersToEvents(messagePb.Events)
for _, m := range riemannEvents {
if m.Service == "" {
riemannReturnErrorResponse(conn, "No Service Name")
return
}
tags := make(map[string]string)
fieldValues := map[string]interface{}{}
for _, tag := range m.Tags {
tags[strings.ReplaceAll(tag, " ", "_")] = tag
}
tags["Host"] = m.Host
tags["Description"] = m.Description
tags["State"] = m.State
fieldValues["Metric"] = m.Metric
fieldValues["TTL"] = m.TTL.Seconds()
singleMetric, err := metric.New(m.Service, tags, fieldValues, m.Time, telegraf.Untyped)
if err != nil {
rsl.Log.Debugf("Could not create metric for service %s at %s", m.Service, m.Time.String())
riemannReturnErrorResponse(conn, "Could not create metric")
return
}
rsl.AddMetric(singleMetric)
}
riemannReturnResponse(conn)
}
}
func riemannReturnResponse(conn net.Conn) {
t := true
message := new(riemangoProto.Msg)
message.Ok = &t
returnData, err := proto.Marshal(message)
if err != nil {
checkError(err)
return
}
b := new(bytes.Buffer)
if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil {
checkError(err)
}
// send the msg length
if _, err = conn.Write(b.Bytes()); err != nil {
checkError(err)
}
if _, err = conn.Write(returnData); err != nil {
checkError(err)
}
}
func riemannReturnErrorResponse(conn net.Conn, errorMessage string) {
t := false
message := new(riemangoProto.Msg)
message.Ok = &t
message.Error = &errorMessage
returnData, err := proto.Marshal(message)
if err != nil {
checkError(err)
return
}
b := new(bytes.Buffer)
if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil {
checkError(err)
}
// send the msg length
if _, err = conn.Write(b.Bytes()); err != nil {
checkError(err)
}
if _, err = conn.Write(returnData); err != nil {
log.Println("Somethign")
checkError(err)
}
}
func (rsl *RiemannSocketListener) Description() string {
return "Riemann protobuff listener."
}
func (rsl *RiemannSocketListener) SampleConfig() string {
return `
## URL to listen on.
## Default is "tcp://:5555"
# service_address = "tcp://:8094"
# service_address = "tcp://127.0.0.1:http"
# service_address = "tcp4://:8094"
# service_address = "tcp6://:8094"
# service_address = "tcp6://[2001:db8::1]:8094"
## Maximum number of concurrent connections.
## 0 (default) is unlimited.
# max_connections = 1024
## Read timeout.
## 0 (default) is unlimited.
# read_timeout = "30s"
## Optional TLS configuration.
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Maximum socket buffer size (in bytes when no unit specified).
# read_buffer_size = "64KiB"
## Period between keep alive probes.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
`
}
func (rsl *RiemannSocketListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error {
ctx, cancelFunc := context.WithCancel(context.Background())
go processOsSignals(cancelFunc)
rsl.Accumulator = acc
if rsl.ServiceAddress == "" {
rsl.Log.Warnf("Using default service_address tcp://:5555")
rsl.ServiceAddress = "tcp://:5555"
}
spl := strings.SplitN(rsl.ServiceAddress, "://", 2)
if len(spl) != 2 {
return fmt.Errorf("invalid service address: %s", rsl.ServiceAddress)
}
protocol := spl[0]
addr := spl[1]
switch protocol {
case "tcp", "tcp4", "tcp6":
tlsCfg, err := rsl.ServerConfig.TLSConfig()
if err != nil {
return err
}
var l net.Listener
if tlsCfg == nil {
l, err = net.Listen(protocol, addr)
} else {
l, err = tls.Listen(protocol, addr, tlsCfg)
}
if err != nil {
return err
}
rsl.Log.Infof("Listening on %s://%s", protocol, l.Addr())
rsl := &riemannListener{
Listener: l,
RiemannSocketListener: rsl,
sockType: spl[0],
}
rsl.wg = sync.WaitGroup{}
rsl.wg.Add(1)
go func() {
defer rsl.wg.Done()
rsl.listen(ctx)
}()
default:
return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, rsl.ServiceAddress)
}
return nil
}
// Handle cancellations from the process
func processOsSignals(cancelFunc context.CancelFunc) {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt)
for {
sig := <-signalChan
switch sig {
case os.Interrupt:
log.Println("Signal SIGINT is received, probably due to `Ctrl-C`, exiting ...")
cancelFunc()
return
}
}
}
func (rsl *RiemannSocketListener) Stop() {
rsl.wg.Done()
rsl.wg.Wait()
os.Exit(0)
}
func newRiemannSocketListener() *RiemannSocketListener {
return &RiemannSocketListener{}
}
func init() {
inputs.Add("riemann_listener", func() telegraf.Input { return newRiemannSocketListener() })
}

View File

@ -0,0 +1,55 @@
package riemann_listener
import (
"log"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
riemanngo "github.com/riemann/riemann-go-client"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)
func TestSocketListener_tcp(t *testing.T) {
log.Println("Entering")
sl := newRiemannSocketListener()
sl.Log = testutil.Logger{}
sl.ServiceAddress = "tcp://127.0.0.1:5555"
sl.ReadBufferSize = internal.Size{Size: 1024}
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
testStats(t, sl)
testMissingService(t, sl)
}
func testStats(t *testing.T, sl *RiemannSocketListener) {
c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second)
err := c.Connect()
if err != nil {
log.Println("Error")
panic(err)
}
defer c.Close()
result, err := riemanngo.SendEvent(c, &riemanngo.Event{
Service: "hello",
})
assert.Equal(t, result.GetOk(), true)
}
func testMissingService(t *testing.T, sl *RiemannSocketListener) {
c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second)
err := c.Connect()
if err != nil {
panic(err)
}
defer c.Close()
result, err := riemanngo.SendEvent(c, &riemanngo.Event{})
assert.Equal(t, result.GetOk(), false)
}