feat(inputs.socket_listener): Use reception time as timestamp (#15976)
This commit is contained in:
parent
c0a365686b
commit
4e6e2a297b
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/alitto/pond"
|
"github.com/alitto/pond"
|
||||||
|
|
||||||
|
|
@ -51,6 +52,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
|
||||||
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
|
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
|
||||||
for {
|
for {
|
||||||
n, src, err := l.conn.ReadFrom(buf)
|
n, src, err := l.conn.ReadFrom(buf)
|
||||||
|
receiveTime := time.Now()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
|
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
|
||||||
if onError != nil {
|
if onError != nil {
|
||||||
|
|
@ -74,7 +76,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
|
||||||
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
|
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
|
||||||
}
|
}
|
||||||
|
|
||||||
onData(src, body)
|
onData(src, body, receiveTime)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
|
|
@ -9,13 +9,14 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
|
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
)
|
)
|
||||||
|
|
||||||
type CallbackData func(net.Addr, []byte)
|
type CallbackData func(net.Addr, []byte, time.Time)
|
||||||
type CallbackConnection func(net.Addr, io.ReadCloser)
|
type CallbackConnection func(net.Addr, io.ReadCloser)
|
||||||
type CallbackError func(error)
|
type CallbackError func(error)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -153,7 +153,7 @@ func TestListenData(t *testing.T) {
|
||||||
require.NoError(t, parser.Init())
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
onData := func(remote net.Addr, data []byte) {
|
onData := func(remote net.Addr, data []byte, _ time.Time) {
|
||||||
m, err := parser.Parse(data)
|
m, err := parser.Parse(data)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
addr, _, err := net.SplitHostPort(remote.String())
|
addr, _, err := net.SplitHostPort(remote.String())
|
||||||
|
|
@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) {
|
||||||
require.NoError(t, parser.Init())
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
onData := func(_ net.Addr, data []byte) {
|
onData := func(_ net.Addr, data []byte, _ time.Time) {
|
||||||
m, err := parser.Parse(data)
|
m, err := parser.Parse(data)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
acc.AddMetrics(m)
|
acc.AddMetrics(m)
|
||||||
|
|
@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) {
|
||||||
// Create callback
|
// Create callback
|
||||||
var errs []error
|
var errs []error
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
onData := func(_ net.Addr, _ []byte) {}
|
onData := func(_ net.Addr, _ []byte, _ time.Time) {}
|
||||||
onError := func(err error) {
|
onError := func(err error) {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
|
|
|
||||||
|
|
@ -352,6 +352,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
receiveTime := time.Now()
|
||||||
src := conn.RemoteAddr()
|
src := conn.RemoteAddr()
|
||||||
if l.path != "" {
|
if l.path != "" {
|
||||||
src = &net.UnixAddr{Name: l.path, Net: "unix"}
|
src = &net.UnixAddr{Name: l.path, Net: "unix"}
|
||||||
|
|
@ -361,7 +362,7 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
|
||||||
d := make([]byte, len(data))
|
d := make([]byte, len(data))
|
||||||
copy(d, data)
|
copy(d, data)
|
||||||
l.parsePool.Submit(func() {
|
l.parsePool.Submit(func() {
|
||||||
onData(src, d)
|
onData(src, d, receiveTime)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -407,8 +408,9 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
|
||||||
return fmt.Errorf("read on %s failed: %w", src, err)
|
return fmt.Errorf("read on %s failed: %w", src, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
receiveTime := time.Now()
|
||||||
l.parsePool.Submit(func() {
|
l.parsePool.Submit(func() {
|
||||||
onData(src, buf)
|
onData(src, buf, receiveTime)
|
||||||
})
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
|
@ -20,6 +21,7 @@ var once sync.Once
|
||||||
|
|
||||||
type SocketListener struct {
|
type SocketListener struct {
|
||||||
ServiceAddress string `toml:"service_address"`
|
ServiceAddress string `toml:"service_address"`
|
||||||
|
TimeSource string `toml:"time_source"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
socket.Config
|
socket.Config
|
||||||
socket.SplitConfig
|
socket.SplitConfig
|
||||||
|
|
@ -52,18 +54,27 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) {
|
||||||
|
|
||||||
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
|
||||||
// Create the callbacks for parsing the data and recording issues
|
// Create the callbacks for parsing the data and recording issues
|
||||||
onData := func(_ net.Addr, data []byte) {
|
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
|
||||||
metrics, err := sl.parser.Parse(data)
|
metrics, err := sl.parser.Parse(data)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(err)
|
acc.AddError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(metrics) == 0 {
|
if len(metrics) == 0 {
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
sl.Log.Debug(internal.NoMetricsCreatedMsg)
|
sl.Log.Debug(internal.NoMetricsCreatedMsg)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
switch sl.TimeSource {
|
||||||
|
case "", "metric":
|
||||||
|
case "receive_time":
|
||||||
|
m.SetTime(receiveTime)
|
||||||
|
}
|
||||||
|
|
||||||
acc.AddMetric(m)
|
acc.AddMetric(m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
|
|
||||||
"github.com/leodido/go-syslog/v4"
|
"github.com/leodido/go-syslog/v4"
|
||||||
|
|
@ -214,7 +215,7 @@ func (s *Syslog) createDatagramDataHandler(acc telegraf.Accumulator) socket.Call
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the OnData function
|
// Return the OnData function
|
||||||
return func(src net.Addr, data []byte) {
|
return func(src net.Addr, data []byte, _ time.Time) {
|
||||||
message, err := parser.Parse(data)
|
message, err := parser.Parse(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(err)
|
acc.AddError(err)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue