2022-05-25 22:48:59 +08:00
//go:generate ../../../tools/readme_config_includer/generator
2016-01-08 08:26:33 +08:00
package graphite
import (
2017-06-14 04:42:11 +08:00
"crypto/tls"
2022-05-25 22:48:59 +08:00
_ "embed"
2016-01-08 08:26:33 +08:00
"errors"
2022-07-26 03:06:30 +08:00
"fmt"
2017-01-25 04:50:29 +08:00
"io"
2016-01-08 08:26:33 +08:00
"math/rand"
"net"
2022-07-26 03:06:30 +08:00
"strings"
2016-01-08 08:26:33 +08:00
"time"
2016-02-11 06:50:07 +08:00
"github.com/influxdata/telegraf"
2020-06-26 02:44:22 +08:00
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
2016-02-11 06:50:07 +08:00
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
2016-01-08 08:26:33 +08:00
)
2022-05-25 22:48:59 +08:00
// sampleConfig holds the contents of sample.conf, embedded at build time; it
// is the value returned by SampleConfig.
//
//go:embed sample.conf
var sampleConfig string
2016-01-08 08:26:33 +08:00
type Graphite struct {
2021-05-19 00:29:30 +08:00
GraphiteTagSupport bool ` toml:"graphite_tag_support" `
GraphiteTagSanitizeMode string ` toml:"graphite_tag_sanitize_mode" `
GraphiteSeparator string ` toml:"graphite_separator" `
2017-11-01 08:00:06 +08:00
// URL is only for backwards compatibility
2021-02-09 00:18:40 +08:00
Servers [ ] string ` toml:"servers" `
Prefix string ` toml:"prefix" `
Template string ` toml:"template" `
Templates [ ] string ` toml:"templates" `
Timeout int ` toml:"timeout" `
Log telegraf . Logger ` toml:"-" `
conns [ ] net . Conn
2018-05-05 07:33:23 +08:00
tlsint . ClientConfig
2022-07-26 03:06:30 +08:00
failedServers [ ] string
2016-01-08 08:26:33 +08:00
}
2022-05-25 22:48:59 +08:00
// SampleConfig returns the embedded sample configuration for this plugin.
func (*Graphite) SampleConfig() string {
	return sampleConfig
}
2016-01-08 08:26:33 +08:00
func ( g * Graphite ) Connect ( ) error {
// Set default values
if g . Timeout <= 0 {
g . Timeout = 2
}
if len ( g . Servers ) == 0 {
g . Servers = append ( g . Servers , "localhost:2003" )
}
2017-06-14 04:42:11 +08:00
// Set tls config
2018-05-05 07:33:23 +08:00
tlsConfig , err := g . ClientConfig . TLSConfig ( )
2017-06-14 04:42:11 +08:00
if err != nil {
return err
}
2022-07-26 03:06:30 +08:00
// Only retry the failed servers
servers := g . Servers
if len ( g . failedServers ) > 0 {
servers = g . failedServers
// Remove failed server from exisiting connections
var workingConns [ ] net . Conn
for _ , conn := range g . conns {
var found bool
for _ , server := range servers {
if conn . RemoteAddr ( ) . String ( ) == server {
found = true
break
}
}
if ! found {
workingConns = append ( workingConns , conn )
}
}
g . conns = workingConns
}
2016-01-08 08:26:33 +08:00
// Get Connections
var conns [ ] net . Conn
2022-07-26 03:06:30 +08:00
var failedServers [ ] string
for _ , server := range servers {
2017-06-14 04:42:11 +08:00
// Dialer with timeout
d := net . Dialer { Timeout : time . Duration ( g . Timeout ) * time . Second }
// Get secure connection if tls config is set
var conn net . Conn
2018-05-05 07:33:23 +08:00
if tlsConfig != nil {
conn , err = tls . DialWithDialer ( & d , "tcp" , server , tlsConfig )
2017-06-14 04:42:11 +08:00
} else {
conn , err = d . Dial ( "tcp" , server )
}
2016-01-08 08:26:33 +08:00
if err == nil {
conns = append ( conns , conn )
2022-07-26 03:06:30 +08:00
} else {
g . Log . Debugf ( "Failed to establish connection: %v" , err )
failedServers = append ( failedServers , server )
2016-01-08 08:26:33 +08:00
}
}
2022-07-26 03:06:30 +08:00
if len ( g . failedServers ) > 0 {
g . conns = append ( g . conns , conns ... )
g . failedServers = failedServers
} else {
g . conns = conns
}
2016-01-08 08:26:33 +08:00
return nil
}
func ( g * Graphite ) Close ( ) error {
// Closing all connections
for _ , conn := range g . conns {
2021-11-25 03:40:25 +08:00
_ = conn . Close ( )
2016-01-08 08:26:33 +08:00
}
return nil
}
2017-01-25 04:50:29 +08:00
// We need check eof as we can write to nothing without noticing anything is wrong
// the connection stays in a close_wait
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng` for this trick.
2022-07-26 03:06:30 +08:00
func ( g * Graphite ) checkEOF ( conn net . Conn ) error {
2017-01-25 04:50:29 +08:00
b := make ( [ ] byte , 1024 )
2021-11-25 03:40:25 +08:00
if err := conn . SetReadDeadline ( time . Now ( ) . Add ( 10 * time . Millisecond ) ) ; err != nil {
2022-07-26 03:06:30 +08:00
g . Log . Debugf ( "Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly" , err , conn . RemoteAddr ( ) . String ( ) )
err = conn . Close ( )
g . Log . Debugf ( "Failed to close the connection: %v" , err )
return err
2021-11-25 03:40:25 +08:00
}
2017-01-25 04:50:29 +08:00
num , err := conn . Read ( b )
if err == io . EOF {
2022-07-26 03:06:30 +08:00
g . Log . Debugf ( "Conn %s is closed. closing conn explicitly" , conn . RemoteAddr ( ) . String ( ) )
err = conn . Close ( )
g . Log . Debugf ( "Failed to close the connection: %v" , err )
return err
2017-01-25 04:50:29 +08:00
}
// just in case i misunderstand something or the remote behaves badly
if num != 0 {
2021-02-09 00:18:40 +08:00
g . Log . Infof ( "conn %s .conn.Read data? did not expect that. data: %s" , conn , b [ : num ] )
2017-01-25 04:50:29 +08:00
}
2022-07-26 03:06:30 +08:00
// Log non-timeout errors and close.
2017-01-25 04:50:29 +08:00
if e , ok := err . ( net . Error ) ; ! ( ok && e . Timeout ( ) ) {
2022-07-26 03:06:30 +08:00
g . Log . Debugf ( "conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s" , conn , err )
err = conn . Close ( )
g . Log . Debugf ( "Failed to close the connection: %v" , err )
return err
2017-01-25 04:50:29 +08:00
}
2022-07-26 03:06:30 +08:00
return nil
2017-01-25 04:50:29 +08:00
}
2016-01-08 08:26:33 +08:00
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error.
2016-01-28 07:15:14 +08:00
func ( g * Graphite ) Write ( metrics [ ] telegraf . Metric ) error {
2016-01-08 08:26:33 +08:00
// Prepare data
2016-11-22 20:51:57 +08:00
var batch [ ] byte
2021-05-19 00:29:30 +08:00
s , err := serializers . NewGraphiteSerializer ( g . Prefix , g . Template , g . GraphiteTagSupport , g . GraphiteTagSanitizeMode , g . GraphiteSeparator , g . Templates )
2016-02-11 06:50:07 +08:00
if err != nil {
return err
}
2016-01-08 08:26:33 +08:00
2016-02-11 06:50:07 +08:00
for _ , metric := range metrics {
2016-11-22 20:51:57 +08:00
buf , err := s . Serialize ( metric )
2016-02-11 06:50:07 +08:00
if err != nil {
2021-02-09 00:18:40 +08:00
g . Log . Errorf ( "Error serializing some metrics to graphite: %s" , err . Error ( ) )
2016-01-08 08:26:33 +08:00
}
2016-11-22 20:51:57 +08:00
batch = append ( batch , buf ... )
2016-01-08 08:26:33 +08:00
}
2018-01-18 07:27:24 +08:00
err = g . send ( batch )
2022-07-26 03:06:30 +08:00
// If a send failed for a server, try to reconnect to that server
if len ( g . failedServers ) > 0 {
g . Log . Debugf ( "Reconnecting and retrying for the following servers: %s" , strings . Join ( g . failedServers , "," ) )
err = g . Connect ( )
if err != nil {
return fmt . Errorf ( "Failed to reconnect: %v" , err )
}
2018-01-18 07:27:24 +08:00
err = g . send ( batch )
}
return err
}
func ( g * Graphite ) send ( batch [ ] byte ) error {
2016-01-08 08:26:33 +08:00
// This will get set to nil if a successful write occurs
2022-07-26 03:06:30 +08:00
globalErr := errors . New ( "could not write to any Graphite server in cluster" )
2018-01-18 07:27:24 +08:00
2016-01-08 08:26:33 +08:00
// Send data to a random server
p := rand . Perm ( len ( g . conns ) )
for _ , n := range p {
2016-07-13 04:44:11 +08:00
if g . Timeout > 0 {
2022-07-26 03:06:30 +08:00
err := g . conns [ n ] . SetWriteDeadline ( time . Now ( ) . Add ( time . Duration ( g . Timeout ) * time . Second ) )
if err != nil {
g . Log . Errorf ( "failed to set write deadline for %s: %v" , g . conns [ n ] . RemoteAddr ( ) . String ( ) , err )
// Mark server as failed so a new connection will be made
g . failedServers = append ( g . failedServers , g . conns [ n ] . RemoteAddr ( ) . String ( ) )
}
}
err := g . checkEOF ( g . conns [ n ] )
if err != nil {
// Mark server as failed so a new connection will be made
g . failedServers = append ( g . failedServers , g . conns [ n ] . RemoteAddr ( ) . String ( ) )
break
2016-07-13 04:44:11 +08:00
}
2016-11-22 20:51:57 +08:00
if _ , e := g . conns [ n ] . Write ( batch ) ; e != nil {
2016-01-08 08:26:33 +08:00
// Error
2022-07-26 03:06:30 +08:00
g . Log . Debugf ( "Graphite Error: " + e . Error ( ) )
2021-03-13 04:21:51 +08:00
// Close explicitly and let's try the next one
2022-07-26 03:06:30 +08:00
err := g . conns [ n ] . Close ( )
g . Log . Debugf ( "Failed to close the connection: %v" , err )
// Mark server as failed so a new connection will be made
g . failedServers = append ( g . failedServers , g . conns [ n ] . RemoteAddr ( ) . String ( ) )
2016-01-08 08:26:33 +08:00
} else {
2022-07-26 03:06:30 +08:00
globalErr = nil
2016-01-08 08:26:33 +08:00
break
}
}
2018-01-18 07:27:24 +08:00
2022-07-26 03:06:30 +08:00
return globalErr
2016-01-08 08:26:33 +08:00
}
func init ( ) {
2016-01-28 05:21:36 +08:00
outputs . Add ( "graphite" , func ( ) telegraf . Output {
2016-01-08 08:26:33 +08:00
return & Graphite { }
} )
}