fix(outputs.syslog): Trim field-names belonging to explicit SDIDs correctly (#16014)
This commit is contained in:
parent
c66c2c7c6c
commit
0b1581c0da
|
|
@ -121,13 +121,14 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
var msg *rfc5424.SyslogMessage
|
msg, err := s.mapper.MapMetricToSyslogMessage(metric)
|
||||||
if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil {
|
if err != nil {
|
||||||
s.Log.Errorf("Failed to create syslog message: %v", err)
|
s.Log.Errorf("Failed to create syslog message: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var msgBytesWithFraming []byte
|
|
||||||
if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil {
|
msgBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(msg)
|
||||||
|
if err != nil {
|
||||||
s.Log.Errorf("Failed to convert syslog message with framing: %v", err)
|
s.Log.Errorf("Failed to convert syslog message with framing: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,22 +53,25 @@ func (sm *SyslogMapper) mapStructuredData(metric telegraf.Metric, msg *rfc5424.S
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *SyslogMapper) mapStructuredDataItem(key, value string, msg *rfc5424.SyslogMessage) {
|
func (sm *SyslogMapper) mapStructuredDataItem(key, value string, msg *rfc5424.SyslogMessage) {
|
||||||
|
// Do not add already reserved keys
|
||||||
if sm.reservedKeys[key] {
|
if sm.reservedKeys[key] {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
isExplicitSdid := false
|
|
||||||
|
// Add keys matching one of the sd-IDs
|
||||||
for _, sdid := range sm.Sdids {
|
for _, sdid := range sm.Sdids {
|
||||||
k := strings.TrimLeft(key, sdid+sm.Separator)
|
if k := strings.TrimPrefix(key, sdid+sm.Separator); key != k {
|
||||||
if len(key) > len(k) {
|
|
||||||
isExplicitSdid = true
|
|
||||||
msg.SetParameter(sdid, k, value)
|
msg.SetParameter(sdid, k, value)
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !isExplicitSdid && len(sm.DefaultSdid) > 0 {
|
|
||||||
k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator)
|
// Add remaining keys with the default sd-ID if configured
|
||||||
msg.SetParameter(sm.DefaultSdid, k, value)
|
if sm.DefaultSdid == "" {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
k := strings.TrimPrefix(key, sm.DefaultSdid+sm.Separator)
|
||||||
|
msg.SetParameter(sm.DefaultSdid, k, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
func (sm *SyslogMapper) mapAppname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,10 @@
|
||||||
package syslog
|
package syslog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -9,9 +12,12 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/models"
|
"github.com/influxdata/telegraf/models"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/leodido/go-syslog/v4/nontransparent"
|
"github.com/leodido/go-syslog/v4/nontransparent"
|
||||||
)
|
)
|
||||||
|
|
@ -428,3 +434,153 @@ func TestStartupErrorBehaviorRetry(t *testing.T) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
require.NotEmpty(t, string(buf))
|
require.NotEmpty(t, string(buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCases(t *testing.T) {
|
||||||
|
// Get all testcase directories
|
||||||
|
folders, err := os.ReadDir("testcases")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Register the plugin
|
||||||
|
outputs.Add("syslog", func() telegraf.Output { return newSyslog() })
|
||||||
|
|
||||||
|
for _, f := range folders {
|
||||||
|
// Only handle folders
|
||||||
|
if !f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run(f.Name(), func(t *testing.T) {
|
||||||
|
testcasePath := filepath.Join("testcases", f.Name())
|
||||||
|
configFilename := filepath.Join(testcasePath, "telegraf.conf")
|
||||||
|
inputFilename := filepath.Join(testcasePath, "input.influx")
|
||||||
|
expectedFilename := filepath.Join(testcasePath, "expected.out")
|
||||||
|
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
|
||||||
|
|
||||||
|
// Get parser to parse input and expected output
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
// Load the input data
|
||||||
|
input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Read the expected output if any
|
||||||
|
var expected []byte
|
||||||
|
if _, err := os.Stat(expectedFilename); err == nil {
|
||||||
|
expected, err = os.ReadFile(expectedFilename)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the expected output if any
|
||||||
|
var expectedError string
|
||||||
|
if _, err := os.Stat(expectedErrorFilename); err == nil {
|
||||||
|
expectedErrors, err := testutil.ParseLinesFromFile(expectedErrorFilename)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, expectedErrors, 1)
|
||||||
|
expectedError = expectedErrors[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure the plugin
|
||||||
|
cfg := config.NewConfig()
|
||||||
|
require.NoError(t, cfg.LoadConfig(configFilename))
|
||||||
|
require.Len(t, cfg.Outputs, 1)
|
||||||
|
|
||||||
|
// Create a mock-server to receive the data
|
||||||
|
server, err := newMockServer()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
server.listen()
|
||||||
|
}()
|
||||||
|
defer server.close()
|
||||||
|
|
||||||
|
// Setup the plugin
|
||||||
|
plugin := cfg.Outputs[0].Output.(*Syslog)
|
||||||
|
plugin.Address = "udp://" + server.address()
|
||||||
|
plugin.Log = testutil.Logger{}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
require.NoError(t, plugin.Connect())
|
||||||
|
defer plugin.Close()
|
||||||
|
|
||||||
|
// Write the data and wait for it to arrive
|
||||||
|
err = plugin.Write(input)
|
||||||
|
if expectedError != "" {
|
||||||
|
require.ErrorContains(t, err, expectedError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, plugin.Close())
|
||||||
|
|
||||||
|
require.Eventuallyf(t, func() bool {
|
||||||
|
return server.len() >= len(expected)
|
||||||
|
}, 3*time.Second, 100*time.Millisecond, "received %q", server.message())
|
||||||
|
|
||||||
|
// Check the received data
|
||||||
|
require.Equal(t, string(expected), server.message())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockServer struct {
|
||||||
|
conn *net.UDPConn
|
||||||
|
|
||||||
|
data bytes.Buffer
|
||||||
|
err error
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockServer() (*mockServer, error) {
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.ListenUDP("udp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &mockServer{conn: conn}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) address() string {
|
||||||
|
return s.conn.LocalAddr().String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) listen() {
|
||||||
|
buf := make([]byte, 2048)
|
||||||
|
for {
|
||||||
|
n, err := s.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
s.err = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.Lock()
|
||||||
|
_, _ = s.data.Write(buf[:n])
|
||||||
|
s.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) close() error {
|
||||||
|
if s.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) message() string {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
return s.data.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *mockServer) len() int {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
return s.data.Len()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
342 <13>1 2024-10-11T21:30:04Z draco Telegraf - scc-change-logs [additional entityUid="6b580296-7199-47b5-9736-9b91329c284e" lastEventDate="2024-10-09T19:26:13Z" status="COMPLETED" uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3"][events action="UPDATE" date="2024-10-09T19:26:08Z" description="Changed ASA Config" diff="" username="user@mydomain.com"]
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
scc-change-logs,host=draco events_description="Changed ASA Config",events_diff="@@ -5,1 +5,1 @@\\n-: Written by lockhart at 18:53:02.210 UTC Tue Oct 8 2024\\n+: Written by lockhart at 19:24:54.048 UTC Wed Oct 9 2024\\n@@ -135,2 +135,0 @@\\n-object network 1.1.1.1\\n-host 1.1.1.1\\n@@ -239,0 +237,2 @@\\n+object network 1.1.1.1\\n+host 1.1.1.1\\n@@ -1108,1 +1108,1 @@\\n-Cryptochecksum:b06f479add1a10f8388a2958d0ee0018\\n+Cryptochecksum:b858dfb10323f3dbc9694a49b8c94168",events_username="user@mydomain.com",events_date="2024-10-09T19:26:08Z",events_action="UPDATE",uid="544ee602-1f4c-4f5f-bbd2-365d865d78b3",status="COMPLETED",lastEventDate="2024-10-09T19:26:13Z",entityUid="6b580296-7199-47b5-9736-9b91329c284e" 1728682204000000000
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
[[outputs.syslog]]
|
||||||
|
address = "udp://127.0.0.1:0"
|
||||||
|
default_sdid = "additional"
|
||||||
|
sdids = ["events"]
|
||||||
Loading…
Reference in New Issue