feat(inputs.syslog): log remote host as source tag (#9440)

This commit is contained in:
Mat Wood 2022-07-20 12:14:07 -07:00 committed by GitHub
parent 678e6e7a8c
commit 08c895d99f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 18 deletions

View File

@ -138,6 +138,7 @@ To complete TLS setup please refer to [rsyslog docs][4].
- facility (string) - facility (string)
- hostname (string) - hostname (string)
- appname (string) - appname (string)
- source (string)
- fields - fields
- version (integer) - version (integer)
- severity_code (integer) - severity_code (integer)
@ -164,8 +165,6 @@ syslog,appname=evntslog,facility=local4,hostname=mymachine.example.com,severity=
## Troubleshooting ## Troubleshooting
You can send debugging messages directly to the input plugin using netcat:
```sh ```sh
# TCP with octet framing # TCP with octet framing
echo "57 <13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc 127.0.0.1 6514 echo "57 <13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc 127.0.0.1 6514
@ -174,6 +173,14 @@ echo "57 <13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc 127.0.0.
echo "<13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc -u 127.0.0.1 6514 echo "<13>1 2018-10-01T12:00:00.0Z example.org root - - - test" | nc -u 127.0.0.1 6514
``` ```
### Resolving Source IPs
The `source` tag stores the remote IP address of the syslog sender.
To resolve these IPs to DNS names, use the
[`reverse_dns` processor](../../../plugins/processors/reverse_dns).
You can send debugging messages directly to the input plugin using netcat:
### RFC3164 ### RFC3164
RFC3164 encoded messages are supported for UDP only, but not all vendors output RFC3164 encoded messages are supported for UDP only, but not all vendors output
@ -201,3 +208,19 @@ $UDPServerRun 514
Make adjustments to the target address as needed and sent your RFC3164 messages Make adjustments to the target address as needed and sent your RFC3164 messages
to port 514. to port 514.
## Example Output
Here is example output of this plugin:
```shell
syslog,appname=docker-compose,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643706396113000i,version=1i 1624643706400667198
syslog,appname=tailscaled,facility=daemon,host=bb8,hostname=dev,location=home,severity=info,source=10.0.0.15 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643706403394000i,version=1i 1624643706407850408
syslog,appname=docker-compose,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643706675853000i,version=1i 1624643706679251683
syslog,appname=telegraf,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643710005006000i,version=1i 1624643710008285426
syslog,appname=telegraf,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643710005696000i,version=1i 1624643710010754050
syslog,appname=docker-compose,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643715777813000i,version=1i 1624643715782158154
syslog,appname=docker-compose,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643716396547000i,version=1i 1624643716400395788
syslog,appname=tailscaled,facility=daemon,host=bb8,hostname=dev,location=home,severity=info,source=10.0.0.15 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643716404931000i,version=1i 1624643716416947058
syslog,appname=docker-compose,facility=daemon,host=bb8,hostname=droplet,location=home,severity=info,source=10.0.0.12 facility_code=3i,message="<redacted>",severity_code=6i,timestamp=1624643716676633000i,version=1i 1624643716680157558
```

View File

@ -14,7 +14,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func getTestCasesForNonTransparent() []testCaseStream { func getTestCasesForNonTransparent(hasRemoteAddr bool) []testCaseStream {
testCases := []testCaseStream{ testCases := []testCaseStream{
{ {
name: "1st/avg/ok", name: "1st/avg/ok",
@ -130,11 +130,23 @@ func getTestCasesForNonTransparent() []testCaseStream {
}, },
}, },
} }
if hasRemoteAddr {
for _, tc := range testCases {
for _, m := range tc.wantStrict {
m.AddTag("source", "127.0.0.1")
}
for _, m := range tc.wantBestEffort {
m.AddTag("source", "127.0.0.1")
}
}
}
return testCases return testCases
} }
func testStrictNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *config.Duration) { func testStrictNonTransparent(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *config.Duration) {
for _, tc := range getTestCasesForNonTransparent() { for _, tc := range getTestCasesForNonTransparent(protocol != "unix") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Creation of a strict mode receiver // Creation of a strict mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 10, false, framing.NonTransparent) receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 10, false, framing.NonTransparent)
@ -193,7 +205,7 @@ func testStrictNonTransparent(t *testing.T, protocol string, address string, wan
func testBestEffortNonTransparent(t *testing.T, protocol string, address string, wantTLS bool) { func testBestEffortNonTransparent(t *testing.T, protocol string, address string, wantTLS bool) {
keepAlive := (*config.Duration)(nil) keepAlive := (*config.Duration)(nil)
for _, tc := range getTestCasesForNonTransparent() { for _, tc := range getTestCasesForNonTransparent(protocol != "unix") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Creation of a best effort mode receiver // Creation of a best effort mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 10, true, framing.NonTransparent) receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 10, true, framing.NonTransparent)

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func getTestCasesForOctetCounting() []testCaseStream { func getTestCasesForOctetCounting(hasRemoteAddr bool) []testCaseStream {
testCases := []testCaseStream{ testCases := []testCaseStream{
{ {
name: "1st/avg/ok", name: "1st/avg/ok",
@ -330,11 +330,22 @@ func getTestCasesForOctetCounting() []testCaseStream {
}, },
} }
if hasRemoteAddr {
for _, tc := range testCases {
for _, m := range tc.wantStrict {
m.AddTag("source", "127.0.0.1")
}
for _, m := range tc.wantBestEffort {
m.AddTag("source", "127.0.0.1")
}
}
}
return testCases return testCases
} }
func testStrictOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *config.Duration) { func testStrictOctetCounting(t *testing.T, protocol string, address string, wantTLS bool, keepAlive *config.Duration) {
for _, tc := range getTestCasesForOctetCounting() { for _, tc := range getTestCasesForOctetCounting(protocol != "unix") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Creation of a strict mode receiver // Creation of a strict mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.OctetCounting) receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, false, framing.OctetCounting)
@ -393,7 +404,7 @@ func testStrictOctetCounting(t *testing.T, protocol string, address string, want
func testBestEffortOctetCounting(t *testing.T, protocol string, address string, wantTLS bool) { func testBestEffortOctetCounting(t *testing.T, protocol string, address string, wantTLS bool) {
keepAlive := (*config.Duration)(nil) keepAlive := (*config.Duration)(nil)
for _, tc := range getTestCasesForOctetCounting() { for _, tc := range getTestCasesForOctetCounting(protocol != "unix") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Creation of a best effort mode receiver // Creation of a best effort mode receiver
receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.OctetCounting) receiver := newTCPSyslogReceiver(protocol+"://"+address, keepAlive, 0, true, framing.OctetCounting)

View File

@ -20,7 +20,7 @@ func timeMustParse(value string) time.Time {
return t return t
} }
func getTestCasesForRFC3164() []testCasePacket { func getTestCasesForRFC3164(hasRemoteAddr bool) []testCasePacket {
currentYear := time.Now().Year() currentYear := time.Now().Year()
ts := timeMustParse(fmt.Sprintf("Dec 2 16:31:03 %d", currentYear)).UnixNano() ts := timeMustParse(fmt.Sprintf("Dec 2 16:31:03 %d", currentYear)).UnixNano()
testCases := []testCasePacket{ testCases := []testCasePacket{
@ -62,11 +62,22 @@ func getTestCasesForRFC3164() []testCasePacket {
}, },
} }
if hasRemoteAddr {
for _, tc := range testCases {
if tc.wantStrict != nil {
tc.wantStrict.AddTag("source", "127.0.0.1")
}
if tc.wantBestEffort != nil {
tc.wantBestEffort.AddTag("source", "127.0.0.1")
}
}
}
return testCases return testCases
} }
func testRFC3164(t *testing.T, protocol string, address string, bestEffort bool) { func testRFC3164(t *testing.T, protocol string, address string, bestEffort bool) {
for _, tc := range getTestCasesForRFC3164() { for _, tc := range getTestCasesForRFC3164(protocol != "unix") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Create receiver // Create receiver
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC3164) receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC3164)

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func getTestCasesForRFC5426() []testCasePacket { func getTestCasesForRFC5426(hasRemoteAddr bool) []testCasePacket {
testCases := []testCasePacket{ testCases := []testCasePacket{
{ {
name: "complete", name: "complete",
@ -223,11 +223,22 @@ func getTestCasesForRFC5426() []testCasePacket {
}, },
} }
if hasRemoteAddr {
for _, tc := range testCases {
if tc.wantStrict != nil {
tc.wantStrict.AddTag("source", "127.0.0.1")
}
if tc.wantBestEffort != nil {
tc.wantBestEffort.AddTag("source", "127.0.0.1")
}
}
}
return testCases return testCases
} }
func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) { func testRFC5426(t *testing.T, protocol string, address string, bestEffort bool) {
for _, tc := range getTestCasesForRFC5426() { for _, tc := range getTestCasesForRFC5426(protocol != "unixgram") {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
// Create receiver // Create receiver
receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC5424) receiver := newUDPSyslogReceiver(protocol+"://"+address, bestEffort, syslogRFC5424)
@ -350,6 +361,7 @@ func TestTimeIncrement_udp(t *testing.T) {
map[string]string{ map[string]string{
"severity": "alert", "severity": "alert",
"facility": "kern", "facility": "kern",
"source": "127.0.0.1",
}, },
map[string]interface{}{ map[string]interface{}{
"version": uint16(1), "version": uint16(1),
@ -380,6 +392,7 @@ func TestTimeIncrement_udp(t *testing.T) {
map[string]string{ map[string]string{
"severity": "alert", "severity": "alert",
"facility": "kern", "facility": "kern",
"source": "127.0.0.1",
}, },
map[string]interface{}{ map[string]interface{}{
"version": uint16(1), "version": uint16(1),
@ -409,6 +422,7 @@ func TestTimeIncrement_udp(t *testing.T) {
map[string]string{ map[string]string{
"severity": "alert", "severity": "alert",
"facility": "kern", "facility": "kern",
"source": "127.0.0.1",
}, },
map[string]interface{}{ map[string]interface{}{
"version": uint16(1), "version": uint16(1),

View File

@ -195,7 +195,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}), rfc3164.WithBestEffort()) p = rfc3164.NewParser(rfc3164.WithYear(rfc3164.CurrentYear{}), rfc3164.WithBestEffort())
} }
for { for {
n, _, err := s.udpListener.ReadFrom(b) n, sourceAddr, err := s.udpListener.ReadFrom(b)
if err != nil { if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") { if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
acc.AddError(err) acc.AddError(err)
@ -205,7 +205,7 @@ func (s *Syslog) listenPacket(acc telegraf.Accumulator) {
message, err := p.Parse(b[:n]) message, err := p.Parse(b[:n])
if message != nil { if message != nil {
acc.AddFields("syslog", fields(message, s), tags(message), s.currentTime()) acc.AddFields("syslog", fields(message, s), tags(message, sourceAddr), s.currentTime())
} }
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
@ -275,7 +275,7 @@ func (s *Syslog) handle(conn net.Conn, acc telegraf.Accumulator) {
var p syslog.Parser var p syslog.Parser
emit := func(r *syslog.Result) { emit := func(r *syslog.Result) {
s.store(*r, acc) s.store(*r, conn.RemoteAddr(), acc)
if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 { if s.ReadTimeout != nil && time.Duration(*s.ReadTimeout) > 0 {
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil { if err := conn.SetReadDeadline(time.Now().Add(time.Duration(*s.ReadTimeout))); err != nil {
acc.AddError(fmt.Errorf("setting read deadline failed: %v", err)) acc.AddError(fmt.Errorf("setting read deadline failed: %v", err))
@ -324,16 +324,16 @@ func (s *Syslog) setKeepAlive(c *net.TCPConn) error {
return c.SetKeepAlivePeriod(time.Duration(*s.KeepAlivePeriod)) return c.SetKeepAlivePeriod(time.Duration(*s.KeepAlivePeriod))
} }
func (s *Syslog) store(res syslog.Result, acc telegraf.Accumulator) { func (s *Syslog) store(res syslog.Result, remoteAddr net.Addr, acc telegraf.Accumulator) {
if res.Error != nil { if res.Error != nil {
acc.AddError(res.Error) acc.AddError(res.Error)
} }
if res.Message != nil { if res.Message != nil {
acc.AddFields("syslog", fields(res.Message, s), tags(res.Message), s.currentTime()) acc.AddFields("syslog", fields(res.Message, s), tags(res.Message, remoteAddr), s.currentTime())
} }
} }
func tags(msg syslog.Message) map[string]string { func tags(msg syslog.Message, sourceAddr net.Addr) map[string]string {
ts := map[string]string{} ts := map[string]string{}
// Not checking assuming a minimally valid message // Not checking assuming a minimally valid message
@ -346,6 +346,13 @@ func tags(msg syslog.Message) map[string]string {
case *rfc3164.SyslogMessage: case *rfc3164.SyslogMessage:
populateCommonTags(&m.Base, ts) populateCommonTags(&m.Base, ts)
} }
if sourceAddr != nil {
if source, _, err := net.SplitHostPort(sourceAddr.String()); err == nil {
ts["source"] = source
}
}
return ts return ts
} }