Adding RFC3164 support to inputs.syslog (#8454)

This commit is contained in:
Mat Wood 2021-07-08 13:39:46 -07:00 committed by GitHub
parent 1b20680e37
commit f69b37b759
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 239 additions and 77 deletions

2
go.mod
View File

@ -75,7 +75,7 @@ require (
github.com/harlow/kinesis-consumer v0.3.1-0.20181230152818-2f58b136fee0
github.com/hashicorp/consul/api v1.8.1
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/influxdata/go-syslog/v2 v2.0.1
github.com/influxdata/go-syslog/v3 v3.0.0
github.com/influxdata/influxdb-observability/common v0.0.0-20210429174543-86ae73cafd31
github.com/influxdata/influxdb-observability/otel2influx v0.0.0-20210429174543-86ae73cafd31
github.com/influxdata/influxdb-observability/otlp v0.0.0-20210429174543-86ae73cafd31

4
go.sum
View File

@ -860,8 +860,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/influxdata/apcupsd v0.0.0-20210427145308-694d5caead0e h1:3J1OB4RDKwXs5l8uEV6BP/tucOJOPDQysiT7/9cuXzA=
github.com/influxdata/apcupsd v0.0.0-20210427145308-694d5caead0e/go.mod h1:WYK/Z/aXq9cbMFIL5ihcA4sX/r/3/WCas/Qvs/2fXcA=
github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY=
github.com/influxdata/go-syslog/v2 v2.0.1 h1:l44S4l4Q8MhGQcoOxJpbo+QQYxJqp0vdgIVHh4+DO0s=
github.com/influxdata/go-syslog/v2 v2.0.1/go.mod h1:hjvie1UTaD5E1fTnDmxaCw8RRDrT4Ve+XHr5O2dKSCo=
github.com/influxdata/go-syslog/v3 v3.0.0 h1:jichmjSZlYK0VMmlz+k4WeOQd7z745YLsvGMqwtYt4I=
github.com/influxdata/go-syslog/v3 v3.0.0/go.mod h1:tulsOp+CecTAYC27u9miMgq21GqXRW6VdKbOG+QSP4Q=
github.com/influxdata/influxdb v1.8.2/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ=
github.com/influxdata/influxdb-observability/common v0.0.0-20210428231528-a010f53e3e02/go.mod h1:PMngVYsW4uwtzIVmj0ZfLL9UIOwo7Vs+09QHkoYMZv8=
github.com/influxdata/influxdb-observability/common v0.0.0-20210429174543-86ae73cafd31 h1:pfWcpiOrWLJvicIpCiFR8vqrkVbAuKUttWvQDmSlfUM=

View File

@ -55,6 +55,11 @@ Syslog messages should be formatted according to
## By default best effort parsing is off.
# best_effort = false
## The RFC standard to use for message parsing
## By default RFC5424 is used. RFC3164 only supports UDP transport (no streaming support)
## Must be one of "RFC5424", or "RFC3164".
# syslog_standard = "RFC5424"
## Character to prepend to SD-PARAMs (default = "_").
## A syslog message can contain multiple parameters and multiple identifiers within structured data section.
## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"]
@ -155,9 +160,12 @@ echo "<13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc -u 127.0.0.
#### RFC3164
RFC3164 encoded messages are not currently supported. You may see the following error if a message encoded in this format:
```
E! Error in plugin [inputs.syslog]: expecting a version value in the range 1-999 [col 5]
```
RFC3164 encoded messages are supported for UDP only, but not all vendors output valid RFC3164 messages by default
You can use rsyslog to translate RFC3164 syslog messages into RFC5424 format.
- E.g. Cisco IOS
If you see the following error, it is due to a message encoded in this format:
```
E! Error in plugin [inputs.syslog]: expecting a version value in the range 1-999 [col 5]
```
You can use rsyslog to translate RFC3164 syslog messages into RFC5424 format.

View File

@ -29,14 +29,15 @@ type testCaseStream struct {
werr int // how many errors we expect in the strict mode?
}
func newUDPSyslogReceiver(address string, bestEffort bool) *Syslog {
func newUDPSyslogReceiver(address string, bestEffort bool, rfc syslogRFC) *Syslog {
return &Syslog{
Address: address,
now: func() time.Time {
return defaultTime
},
BestEffort: bestEffort,
Separator: "_",
BestEffort: bestEffort,
SyslogStandard: rfc,
Separator: "_",
}
}
@ -47,10 +48,11 @@ func newTCPSyslogReceiver(address string, keepAlive *config.Duration, maxConn in
now: func() time.Time {
return defaultTime
},
Framing: f,
ReadTimeout: &d,
BestEffort: bestEffort,
Separator: "_",
Framing: f,
ReadTimeout: &d,
BestEffort: bestEffort,
SyslogStandard: syslogRFC5424,
Separator: "_",
}
if keepAlive != nil {
s.KeepAlivePeriod = keepAlive

View File

@ -0,0 +1,123 @@
package syslog
import (
"fmt"
"net"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func timeMustParse(value string) time.Time {
format := "Jan 2 15:04:05 2006"
t, err := time.Parse(format, value)
if err != nil {
panic(fmt.Sprintf("couldn't parse time: %v", value))
}
return t
}
func getTestCasesForRFC3164() []testCasePacket {
currentYear := time.Now().Year()
ts := timeMustParse(fmt.Sprintf("Dec 2 16:31:03 %d", currentYear)).UnixNano()
testCases := []testCasePacket{
{
name: "complete",
data: []byte("<13>Dec 2 16:31:03 host app: Test"),
wantBestEffort: testutil.MustMetric(
"syslog",
map[string]string{
"appname": "app",
"severity": "notice",
"hostname": "host",
"facility": "user",
},
map[string]interface{}{
"timestamp": ts,
"message": "Test",
"facility_code": 1,
"severity_code": 5,
},
defaultTime,
),
wantStrict: testutil.MustMetric(
"syslog",
map[string]string{
"appname": "app",
"severity": "notice",
"hostname": "host",
"facility": "user",
},
map[string]interface{}{
"timestamp": ts,
"message": "Test",
"facility_code": 1,
"severity_code": 5,
},
defaultTime,
),
},
}
return testCases
}
func testRFC3164(t *testing.T, protocol string, address string, bestEffort bool) {
for _, tc := range getTestCasesForRFC3164() {
t.Run(tc.name, func(t *testing.T) {
// Create receiver
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC3164)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
// Connect
conn, err := net.Dial(protocol, address)
require.NotNil(t, conn)
require.NoError(t, err)
// Write
_, err = conn.Write(tc.data)
conn.Close()
if err != nil {
if err, ok := err.(*net.OpError); ok {
if err.Err.Error() == "write: message too long" {
return
}
}
}
// Waiting ...
if tc.wantStrict == nil && tc.werr || bestEffort && tc.werr {
acc.WaitError(1)
}
if tc.wantBestEffort != nil && bestEffort || tc.wantStrict != nil && !bestEffort {
acc.Wait(1) // RFC3164 mandates a syslog message per UDP packet
}
// Compare
var got telegraf.Metric
var want telegraf.Metric
if len(acc.Metrics) > 0 {
got = acc.GetTelegrafMetrics()[0]
}
if bestEffort {
want = tc.wantBestEffort
} else {
want = tc.wantStrict
}
testutil.RequireMetricEqual(t, want, got)
})
}
}
func TestRFC3164BestEffort_udp(t *testing.T) {
testRFC3164(t, "udp", address, true)
}
func TestRFC3164Strict_udp(t *testing.T) {
testRFC3164(t, "udp", address, false)
}

View File

@ -232,7 +232,7 @@ func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool)
for _, tc := range getTestCasesForRFC5426() {
t.Run(tc.name, func(t *testing.T) {
// Create receiver
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort)
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC5424)
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))
defer receiver.Stop()
@ -325,10 +325,11 @@ func TestTimeIncrement_udp(t *testing.T) {
// Create receiver
receiver := &Syslog{
Address: "udp://" + address,
now: getNow,
BestEffort: false,
Separator: "_",
Address: "udp://" + address,
now: getNow,
BestEffort: false,
SyslogStandard: syslogRFC5424,
Separator: "_",
}
acc := &testutil.Accumulator{}
require.NoError(t, receiver.Start(acc))

View File

@ -13,11 +13,11 @@ import (
"time"
"unicode"
"github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/nontransparent"
"github.com/influxdata/go-syslog/v2/octetcounting"
"github.com/influxdata/go-syslog/v2/rfc5424"
syslog "github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/nontransparent"
"github.com/influxdata/go-syslog/v3/octetcounting"
"github.com/influxdata/go-syslog/v3/rfc3164"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
framing "github.com/influxdata/telegraf/internal/syslog"
@ -25,8 +25,12 @@ import (
"github.com/influxdata/telegraf/plugins/inputs"
)
type syslogRFC string
const defaultReadTimeout = time.Second * 5
const ipMaxPacketSize = 64 * 1024
const syslogRFC3164 = "RFC3164"
const syslogRFC5424 = "RFC5424"
// Syslog is a syslog plugin
type Syslog struct {
@ -36,6 +40,7 @@ type Syslog struct {
MaxConnections int
ReadTimeout *config.Duration
Framing framing.Framing
SyslogStandard syslogRFC
Trailer nontransparent.TrailerType
BestEffort bool
Separator string `toml:"sdparam_separator"`
@ -97,6 +102,11 @@ var sampleConfig = `
## By default best effort parsing is off.
# best_effort = false
## The RFC standard to use for message parsing
## By default RFC5424 is used. RFC3164 only supports UDP transport (no streaming support)
## Must be one of "RFC5424", or "RFC3164".
# syslog_standard = "RFC5424"
## Character to prepend to SD-PARAMs (default = "_").
## A syslog message can contain multiple parameters and multiple identifiers within structured data section.
## Eg., [id1 name1="val1" name2="val2"][id2 name1="val1" nameA="valA"]
@ -228,10 +238,15 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
defer s.wg.Done()
b := make([]byte, ipMaxPacketSize)
var p syslog.Machine
if s.BestEffort {
p = rfc5424.NewParser(rfc5424.WithBestEffort())
} else {
switch {
case !s.BestEffort && s.SyslogStandard == syslogRFC5424:
p = rfc5424.NewParser()
case s.BestEffort && s.SyslogStandard == syslogRFC5424:
p = rfc5424.NewParser(rfc5424.WithBestEffort())
case !s.BestEffort && s.SyslogStandard == syslogRFC3164:
p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}))
case s.BestEffort && s.SyslogStandard == syslogRFC3164:
p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}), rfc3164.WithBestEffort())
}
for {
n, _, err := s.udpListener.ReadFrom(b)
@ -379,60 +394,72 @@ func tags(msg syslog.Message) map[string]string {
ts["severity"] = *msg.SeverityShortLevel()
ts["facility"] = *msg.FacilityLevel()
if msg.Hostname() != nil {
ts["hostname"] = *msg.Hostname()
switch m := msg.(type) {
case *rfc5424.SyslogMessage:
populateCommonTags(&m.Base, ts)
case *rfc3164.SyslogMessage:
populateCommonTags(&m.Base, ts)
}
if msg.Appname() != nil {
ts["appname"] = *msg.Appname()
}
return ts
}
func fields(msg syslog.Message, s *Syslog) map[string]interface{} {
// Not checking assuming a minimally valid message
flds := map[string]interface{}{
"version": msg.Version(),
}
flds["severity_code"] = int(*msg.Severity())
flds["facility_code"] = int(*msg.Facility())
flds := map[string]interface{}{}
if msg.Timestamp() != nil {
flds["timestamp"] = (*msg.Timestamp()).UnixNano()
}
switch m := msg.(type) {
case *rfc5424.SyslogMessage:
populateCommonFields(&m.Base, flds)
// Not checking assuming a minimally valid message
flds["version"] = m.Version
if msg.ProcID() != nil {
flds["procid"] = *msg.ProcID()
}
if msg.MsgID() != nil {
flds["msgid"] = *msg.MsgID()
}
if msg.Message() != nil {
flds["message"] = strings.TrimRightFunc(*msg.Message(), func(r rune) bool {
return unicode.IsSpace(r)
})
}
if msg.StructuredData() != nil {
for sdid, sdparams := range *msg.StructuredData() {
if len(sdparams) == 0 {
// When SD-ID does not have params we indicate its presence with a bool
flds[sdid] = true
continue
}
for name, value := range sdparams {
// Using whitespace as separator since it is not allowed by the grammar within SDID
flds[sdid+s.Separator+name] = value
if m.StructuredData != nil {
for sdid, sdparams := range *m.StructuredData {
if len(sdparams) == 0 {
// When SD-ID does not have params we indicate its presence with a bool
flds[sdid] = true
continue
}
for name, value := range sdparams {
// Using whitespace as separator since it is not allowed by the grammar within SDID
flds[sdid+s.Separator+name] = value
}
}
}
case *rfc3164.SyslogMessage:
populateCommonFields(&m.Base, flds)
}
return flds
}
func populateCommonFields(msg *syslog.Base, flds map[string]interface{}) {
flds["facility_code"] = int(*msg.Facility)
flds["severity_code"] = int(*msg.Severity)
if msg.Timestamp != nil {
flds["timestamp"] = (*msg.Timestamp).UnixNano()
}
if msg.ProcID != nil {
flds["procid"] = *msg.ProcID
}
if msg.MsgID != nil {
flds["msgid"] = *msg.MsgID
}
if msg.Message != nil {
flds["message"] = strings.TrimRightFunc(*msg.Message, func(r rune) bool {
return unicode.IsSpace(r)
})
}
}
func populateCommonTags(msg *syslog.Base, ts map[string]string) {
if msg.Hostname != nil {
ts["hostname"] = *msg.Hostname
}
if msg.Appname != nil {
ts["appname"] = *msg.Appname
}
}
type unixCloser struct {
path string
closer io.Closer
@ -463,12 +490,13 @@ func init() {
defaultTimeout := config.Duration(defaultReadTimeout)
inputs.Add("syslog", func() telegraf.Input {
return &Syslog{
Address: ":6514",
now: getNanoNow,
ReadTimeout: &defaultTimeout,
Framing: framing.OctetCounting,
Trailer: nontransparent.LF,
Separator: "_",
Address: ":6514",
now: getNanoNow,
ReadTimeout: &defaultTimeout,
Framing: framing.OctetCounting,
SyslogStandard: syslogRFC5424,
Trailer: nontransparent.LF,
Separator: "_",
}
})
}

View File

@ -9,8 +9,8 @@ import (
"strings"
"time"
"github.com/influxdata/go-syslog/v2/nontransparent"
"github.com/influxdata/go-syslog/v2/rfc5424"
"github.com/influxdata/go-syslog/v3/nontransparent"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
framing "github.com/influxdata/telegraf/internal/syslog"

View File

@ -8,7 +8,7 @@ import (
"strings"
"time"
"github.com/influxdata/go-syslog/v2/rfc5424"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/influxdata/telegraf"
)