feat(inputs.suricata): Add ability to parse drop or rejected (#13048)

This commit is contained in:
Joshua Powers 2023-05-02 11:07:34 -06:00 committed by GitHub
parent 200c9e5684
commit 9284bdabf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 494 additions and 31 deletions

View File

@ -31,17 +31,30 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Suricata stats and alerts plugin
[[inputs.suricata]]
## Data sink for Suricata stats log.
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"
## Source
## Data sink for Suricata stats log. This is expected to be a filename of a
## unix socket to be created for listening.
# source = "/var/run/suricata-stats.sock"
# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"
## Delimiter
## Used for flattening field keys, e.g. subitem "alert" of "detect" becomes
## "detect_alert" when delimiter is "_".
# delimiter = "_"
# Detect alert logs
alerts = false
## Metric version
## Version 1 only collects stats and optionally will look for alerts if
## the configuration setting alerts is set to true.
## Version 2 parses any event type message by default and produced metrics
## under a single metric name using a tag to differentiate between event
## types. The timestamp for the message is applied to the generated metric.
## Additional tags and fields are included as well.
# version = "1"
## Alerts
## In metric version 1, only status is captured by default, alerts must be
## turned on with this configuration option. This option does not apply for
## metric version 2.
# alerts = false
```
## Metrics

View File

@ -1,13 +1,26 @@
# Suricata stats and alerts plugin
[[inputs.suricata]]
## Data sink for Suricata stats log.
# This is expected to be a filename of a
# unix socket to be created for listening.
source = "/var/run/suricata-stats.sock"
## Source
## Data sink for Suricata stats log. This is expected to be a filename of a
## unix socket to be created for listening.
# source = "/var/run/suricata-stats.sock"
# Delimiter for flattening field keys, e.g. subitem "alert" of "detect"
# becomes "detect_alert" when delimiter is "_".
delimiter = "_"
## Delimiter
## Used for flattening field keys, e.g. subitem "alert" of "detect" becomes
## "detect_alert" when delimiter is "_".
# delimiter = "_"
# Detect alert logs
alerts = false
## Metric version
## Version 1 only collects stats and optionally will look for alerts if
## the configuration setting alerts is set to true.
## Version 2 parses any event type message by default and produced metrics
## under a single metric name using a tag to differentiate between event
## types. The timestamp for the message is applied to the generated metric.
## Additional tags and fields are included as well.
# version = "1"
## Alerts
## In metric version 1, only status is captured by default, alerts must be
## turned on with this configuration option. This option does not apply for
## metric version 2.
# alerts = false

View File

@ -12,8 +12,10 @@ import (
"net"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -32,6 +34,7 @@ type Suricata struct {
Source string `toml:"source"`
Delimiter string `toml:"delimiter"`
Alerts bool `toml:"alerts"`
Version string `toml:"version"`
inputListener *net.UnixListener
cancel context.CancelFunc
@ -45,6 +48,26 @@ func (*Suricata) SampleConfig() string {
return sampleConfig
}
func (s *Suricata) Init() error {
if s.Source == "" {
s.Source = "/var/run/suricata-stats.sock"
}
if s.Delimiter == "" {
s.Delimiter = "_"
}
switch s.Version {
case "":
s.Version = "1"
case "1", "2":
default:
return fmt.Errorf("invalid version %q, use either 1 or 2", s.Version)
}
return nil
}
// Start initiates background collection of JSON data from the socket
// provided to Suricata.
func (s *Suricata) Start(acc telegraf.Accumulator) error {
@ -223,6 +246,75 @@ func (s *Suricata) parseStats(acc telegraf.Accumulator, result map[string]interf
}
}
func (s *Suricata) parseGeneric(acc telegraf.Accumulator, result map[string]interface{}) error {
eventType := ""
if _, ok := result["event_type"]; !ok {
return fmt.Errorf("unable to determine event type of message: %s", result)
}
value, err := internal.ToString(result["event_type"])
if err != nil {
return fmt.Errorf("unable to convert event type %q to string: %w", result["event_type"], err)
}
eventType = value
timestamp := time.Now()
if val, ok := result["timestamp"]; ok {
value, err := internal.ToString(val)
if err != nil {
return fmt.Errorf("unable to convert timestamp %q to string: %w", val, err)
}
timestamp, err = time.Parse("2006-01-02T15:04:05.999999-0700", value)
if err != nil {
return fmt.Errorf("unable to parse timestamp %q: %w", val, err)
}
}
// Make sure the event key exists first
if _, ok := result[eventType].(map[string]interface{}); !ok {
return fmt.Errorf("unable to find key %q in %s", eventType, result)
}
fields := make(map[string]interface{})
for k, v := range result[eventType].(map[string]interface{}) {
err := flexFlatten(fields, k, v, s.Delimiter)
if err != nil {
s.Log.Debugf("Flattening %q failed: %v", eventType, err)
continue
}
}
tags := map[string]string{
"event_type": eventType,
}
// best effort to gather these tags and fields, if errors are encountered
// we ignore and move on
for _, key := range []string{"proto", "out_iface", "in_iface"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToString(val); err == nil {
tags[key] = convertedVal
}
}
}
for _, key := range []string{"src_ip", "dest_ip"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToString(val); err == nil {
fields[key] = convertedVal
}
}
}
for _, key := range []string{"src_port", "dest_port"} {
if val, ok := result[key]; ok {
if convertedVal, err := internal.ToInt64(val); err == nil {
fields[key] = convertedVal
}
}
}
acc.AddFields("suricata", fields, tags, timestamp)
return nil
}
func (s *Suricata) parse(acc telegraf.Accumulator, sjson []byte) error {
// initial parsing
var result map[string]interface{}
@ -230,18 +322,23 @@ func (s *Suricata) parse(acc telegraf.Accumulator, sjson []byte) error {
if err != nil {
return err
}
// check for presence of relevant stats or alert
_, ok := result["stats"]
_, ok2 := result["alert"]
if !ok && !ok2 {
if s.Version == "2" {
return s.parseGeneric(acc, result)
}
// Version 1 parsing of stats and optionally alerts
if _, ok := result["stats"]; ok {
s.parseStats(acc, result)
} else if _, ok := result["alert"]; ok {
if s.Alerts {
s.parseAlert(acc, result)
}
} else {
s.Log.Debugf("Invalid input without 'stats' or 'alert' object: %v", result)
return fmt.Errorf("input does not contain 'stats' or 'alert' object")
}
if ok {
s.parseStats(acc, result)
} else if ok2 && s.Alerts {
s.parseAlert(acc, result)
}
return nil
}
@ -253,9 +350,6 @@ func (s *Suricata) Gather(_ telegraf.Accumulator) error {
func init() {
inputs.Add("suricata", func() telegraf.Input {
return &Suricata{
Source: "/var/run/suricata-stats.sock",
Delimiter: "_",
}
return &Suricata{}
})
}

View File

@ -380,3 +380,191 @@ func TestSuricataParse(t *testing.T) {
testutil.RequireMetricsEqual(t, tc.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
}
func TestSuricataParseVersion2(t *testing.T) {
tests := []struct {
filename string
expected []telegraf.Metric
}{
{
filename: "v2/alert.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "alert",
"in_iface": "s1-suricata",
"proto": "TCP",
},
map[string]interface{}{
"action": "allowed",
"category": "Misc activity",
"dest_ip": "179.60.192.3",
"dest_port": int64(80),
"gid": float64(1),
"rev": float64(0),
"severity": float64(3),
"signature": "Corrupted HTTP body",
"signature_id": float64(6),
"sourceip": "10.0.0.5",
"sourceport": float64(18715),
"src_ip": "10.0.0.5",
"src_port": int64(18715),
"targetip": "179.60.192.3",
"targetport": float64(80),
},
time.Unix(0, 0),
),
},
},
{
filename: "v2/dns.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "dns",
"in_iface": "eth1",
"proto": "UDP",
},
map[string]interface{}{
"dest_ip": "192.168.0.1",
"dest_port": int64(53),
"id": float64(7145),
"rrname": "reddit.com",
"rrtype": "A",
"src_ip": "192.168.0.100",
"type": "query",
"src_port": int64(39262),
"tx_id": float64(10),
},
time.Unix(0, 0),
),
},
},
{
filename: "v2/drop.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "drop",
"in_iface": "eth1",
"proto": "TCP",
},
map[string]interface{}{
"dest_ip": "54.192.18.125",
"dest_port": int64(443),
"ipid": float64(62316),
"len": float64(76),
"reason": "stream error",
"src_ip": "192.168.0.110",
"src_port": int64(46016),
"tcpack": float64(2339873683),
"tcpres": float64(0),
"tcpseq": float64(3900248957),
"tcpurgp": float64(0),
"tcpwin": float64(501),
"tos": float64(0),
"ttl": float64(64),
},
time.Unix(0, 0),
),
},
},
{
filename: "v2/flow.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "flow",
"in_iface": "eth1",
"proto": "TCP",
},
map[string]interface{}{
"age": float64(0),
"dest_ip": "142.251.130.3",
"dest_port": int64(443),
"src_ip": "192.168.0.121",
"src_port": int64(50212),
"state": "new",
},
time.Unix(0, 0),
),
},
},
{
filename: "v2/http.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "http",
"in_iface": "eth2",
"proto": "TCP",
},
map[string]interface{}{
"dest_ip": "203.205.239.179",
"dest_port": int64(80),
"hostname": "hkminorshort.weixin.qq.com",
"http_content_type": "application/octet-stream",
"http_method": "POST",
"http_user_agent": "MicroMessenger Client",
"length": float64(245),
"protocol": "HTTP/1.1",
"src_ip": "192.168.0.120",
"src_port": int64(33950),
"status": float64(200),
"url": "/mmtls/2d6d45f1",
},
time.Unix(0, 0),
),
},
},
{
filename: "v2/status.json",
expected: []telegraf.Metric{
testutil.MustMetric(
"suricata",
map[string]string{
"event_type": "stats",
},
map[string]interface{}{
"captureerrors": float64(0),
"capturekernel_drops": float64(0),
"capturekernel_packets": float64(522),
"flowemerg_mode_entered": float64(0),
"flowemerg_mode_over": float64(0),
"flowmemcap": float64(0),
"flowmemuse": float64(9965056),
"flowmgrclosed_pruned": float64(0),
"flowmgrfull_hash_pass": float64(1),
"flowmgrnew_pruned": float64(0),
"flowspare": float64(10100),
"flowtcp": float64(15),
"flowudp": float64(13),
"flowwrkspare_sync": float64(11),
"flowwrkspare_sync_avg": float64(100),
"uptime": float64(160),
},
time.Unix(0, 0),
),
},
},
}
for _, tc := range tests {
data, err := os.ReadFile("testdata/" + tc.filename)
require.NoError(t, err)
s := Suricata{
Version: "2",
Log: testutil.Logger{},
}
acc := testutil.Accumulator{}
require.NoError(t, s.parse(&acc, data))
testutil.RequireMetricsEqual(t, tc.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
}

View File

@ -0,0 +1,35 @@
{
"timestamp": "2021-05-30T20:07:13.208777+0200",
"flow_id": 1696236471136137,
"in_iface": "s1-suricata",
"event_type": "alert",
"src_ip": "10.0.0.5",
"src_port": 18715,
"dest_ip": "179.60.192.3",
"dest_port": 80,
"proto": "TCP",
"alert": {
"action": "allowed",
"gid": 1,
"source": {
"ip": "10.0.0.5",
"port": 18715
},
"target": {
"ip": "179.60.192.3",
"port": 80
},
"signature_id": 6,
"rev": 0,
"signature": "Corrupted HTTP body",
"severity": 3,
"category": "Misc activity"
},
"flow": {
"pkts_toserver": 1,
"pkts_toclient": 0,
"bytes_toserver": 174,
"bytes_toclient": 0,
"start": "2021-05-30T20:07:13.208777+0200"
}
}

View File

@ -0,0 +1,18 @@
{
"timestamp": "2023-04-07T00:20:57.995497+0800",
"flow_id": 2150129093506313,
"in_iface": "eth1",
"event_type": "dns",
"src_ip": "192.168.0.100",
"src_port": 39262,
"dest_ip": "192.168.0.1",
"dest_port": 53,
"proto": "UDP",
"dns": {
"type": "query",
"id": 7145,
"rrname": "reddit.com",
"rrtype": "A",
"tx_id": 10
}
}

View File

@ -0,0 +1,29 @@
{
"timestamp": "2023-04-07T00:21:01.318245+0800",
"flow_id": 180225164834117,
"in_iface": "eth1",
"event_type": "drop",
"src_ip": "192.168.0.110",
"src_port": 46016,
"dest_ip": "54.192.18.125",
"dest_port": 443,
"proto": "TCP",
"drop": {
"len": 76,
"tos": 0,
"ttl": 64,
"ipid": 62316,
"tcpseq": 3900248957,
"tcpack": 2339873683,
"tcpwin": 501,
"syn": false,
"ack": true,
"psh": true,
"rst": false,
"urg": false,
"fin": true,
"tcpres": 0,
"tcpurgp": 0,
"reason": "stream error"
}
}

View File

@ -0,0 +1,21 @@
{
"timestamp": "2023-04-07T00:28:22.136079+0800",
"flow_id": 911610881873910,
"in_iface": "eth1",
"event_type": "flow",
"src_ip": "192.168.0.121",
"src_port": 50212,
"dest_ip": "142.251.130.3",
"dest_port": 443,
"proto": "TCP",
"flow": {
"age": 0,
"state": "new",
"alerted": false
},
"tcp": {
"tcp_flags": "00",
"tcp_flags_ts": "00",
"tcp_flags_tc": "00"
}
}

View File

@ -0,0 +1,22 @@
{
"timestamp": "2023-04-07T00:27:50.220224+0800",
"flow_id": 1124332026121723,
"in_iface": "eth2",
"event_type": "http",
"src_ip": "192.168.0.120",
"src_port": 33950,
"dest_ip": "203.205.239.179",
"dest_port": 80,
"proto": "TCP",
"tx_id": 0,
"http": {
"hostname": "hkminorshort.weixin.qq.com",
"url": "/mmtls/2d6d45f1",
"http_user_agent": "MicroMessenger Client",
"http_content_type": "application/octet-stream",
"http_method": "POST",
"protocol": "HTTP/1.1",
"status": 200,
"length": 245
}
}

View File

@ -0,0 +1,30 @@
{
"timestamp": "2023-04-07T00:21:07.805683+0800",
"event_type": "stats",
"stats": {
"uptime": 160,
"capture": {
"kernel_packets": 522,
"kernel_drops": 0,
"errors": 0
},
"flow": {
"memcap": 0,
"tcp": 15,
"udp": 13,
"wrk": {
"spare_sync_avg": 100,
"spare_sync": 11
},
"mgr": {
"full_hash_pass": 1,
"closed_pruned": 0,
"new_pruned": 0
},
"spare": 10100,
"emerg_mode_entered": 0,
"emerg_mode_over": 0,
"memuse": 9965056
}
}
}