diff --git a/plugins/inputs/all/udp_listener.go b/plugins/inputs/all/udp_listener.go deleted file mode 100644 index 07150570c..000000000 --- a/plugins/inputs/all/udp_listener.go +++ /dev/null @@ -1,5 +0,0 @@ -//go:build !custom || inputs || inputs.udp_listener - -package all - -import _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" // register plugin diff --git a/plugins/inputs/udp_listener/README.md b/plugins/inputs/udp_listener/README.md deleted file mode 100644 index 19de2d8cc..000000000 --- a/plugins/inputs/udp_listener/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# UDP Listener Input Plugin - -**DEPRECATED: As of version 1.3 the UDP listener plugin has been deprecated in -favor of the [socket_listener plugin](../socket_listener/README.md)** - -## Service Input - -This plugin is a service input. Normal plugins gather metrics determined by the -interval setting. Service plugins start a service to listens and waits for -metrics or events to occur. Service plugins have two key differences from -normal plugins: - -1. The global or plugin specific `interval` setting may not apply -2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce - output for this plugin - -## Global configuration options - -In addition to the plugin-specific configuration settings, plugins support -additional global and plugin configuration settings. These settings are used to -modify metrics, tags, and field or create aliases and configure ordering, etc. -See the [CONFIGURATION.md][CONFIGURATION.md] for more details. - -[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins - -## Configuration - -```toml @sample.conf -# Generic UDP listener -[[inputs.udp_listener]] - # see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener -``` - -## Metrics - -## Example Output diff --git a/plugins/inputs/udp_listener/sample.conf b/plugins/inputs/udp_listener/sample.conf deleted file mode 100644 index 4f1f3b263..000000000 --- a/plugins/inputs/udp_listener/sample.conf +++ /dev/null @@ -1,3 +0,0 @@ -# Generic UDP listener -[[inputs.udp_listener]] - # see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go deleted file mode 100644 index 968808b1a..000000000 --- a/plugins/inputs/udp_listener/udp_listener.go +++ /dev/null @@ -1,225 +0,0 @@ -//go:generate ../../../tools/readme_config_includer/generator -package udp_listener - -import ( - _ "embed" - "errors" - "fmt" - "net" - "sync" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/selfstat" -) - -//go:embed sample.conf -var sampleConfig string - -// UDPListener main struct for the collector -type UDPListener struct { - ServiceAddress string - - // UDPBufferSize should only be set if you want/need the telegraf UDP socket to - // differ from the system setting. In cases where you set the rmem_default to a lower - // value at the host level, but need a larger buffer for UDP bursty traffic, this - // setting enables you to configure that value ONLY for telegraf UDP sockets on this listener - // Set this to 0 (or comment out) to take system default - // - // NOTE: You should ensure that your rmem_max is >= to this setting to work properly! - // (e.g. sysctl -w net.core.rmem_max=N) - UDPBufferSize int `toml:"udp_buffer_size"` - AllowedPendingMessages int - - // UDPPacketSize is deprecated, it's only here for legacy support - // we now always create 1 max size buffer and then copy only what we need - // into the in channel - // see https://github.com/influxdata/telegraf/pull/992 - UDPPacketSize int `toml:"udp_packet_size"` - - sync.Mutex - wg sync.WaitGroup - - in chan []byte - done chan struct{} - // drops tracks the number of dropped metrics. - drops int - // malformed tracks the number of malformed packets - malformed int - - parser telegraf.Parser - - // Keep the accumulator in this struct - acc telegraf.Accumulator - - listener *net.UDPConn - - PacketsRecv selfstat.Stat - BytesRecv selfstat.Stat - - Log telegraf.Logger -} - -// UDPMaxPacketSize is packet limit, see -// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure -const UDPMaxPacketSize int = 64 * 1024 - -var dropwarn = "udp_listener message queue full. " + - "We have dropped %d messages so far. " + - "You may want to increase allowed_pending_messages in the config" - -var malformedwarn = "udp_listener has received %d malformed packets" + - " thus far." - -func (*UDPListener) SampleConfig() string { - return sampleConfig -} - -// All the work is done in the Start() function, so this is just a dummy -// function. -func (u *UDPListener) Gather(_ telegraf.Accumulator) error { - return nil -} - -func (u *UDPListener) SetParser(parser telegraf.Parser) { - u.parser = parser -} - -func (u *UDPListener) Start(acc telegraf.Accumulator) error { - u.Lock() - defer u.Unlock() - - u.Log.Warn("DEPRECATED: the UDP listener plugin has been deprecated " + - "in favor of the socket_listener plugin " + - "(https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener)") - - tags := map[string]string{ - "address": u.ServiceAddress, - } - u.PacketsRecv = selfstat.Register("udp_listener", "packets_received", tags) - u.BytesRecv = selfstat.Register("udp_listener", "bytes_received", tags) - - u.acc = acc - u.in = make(chan []byte, u.AllowedPendingMessages) - u.done = make(chan struct{}) - - if err := u.udpListen(); err != nil { - return err - } - - u.wg.Add(1) - go u.udpParser() - - u.Log.Infof("Started service on %q (ReadBuffer: %d)", u.ServiceAddress, u.UDPBufferSize) - return nil -} - -func (u *UDPListener) Stop() { - u.Lock() - defer u.Unlock() - close(u.done) - u.wg.Wait() - u.listener.Close() - close(u.in) - u.Log.Infof("Stopped service on %q", u.ServiceAddress) -} - -func (u *UDPListener) udpListen() error { - var err error - - address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress) - u.listener, err = net.ListenUDP("udp", address) - - if err != nil { - return err - } - - u.Log.Infof("Server listening on %q", u.listener.LocalAddr().String()) - - if u.UDPBufferSize > 0 { - err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default - if err != nil { - return fmt.Errorf("failed to set UDP read buffer to %d: %w", u.UDPBufferSize, err) - } - } - - u.wg.Add(1) - go u.udpListenLoop() - return nil -} - -func (u *UDPListener) udpListenLoop() { - defer u.wg.Done() - - buf := make([]byte, UDPMaxPacketSize) - for { - select { - case <-u.done: - return - default: - if err := u.listener.SetReadDeadline(time.Now().Add(time.Second)); err != nil { - u.Log.Error("setting read-deadline failed: " + err.Error()) - } - - n, _, err := u.listener.ReadFromUDP(buf) - if err != nil { - var netErr net.Error - if !errors.As(err, &netErr) || !netErr.Timeout() { - u.Log.Error(err.Error()) - } - continue - } - u.BytesRecv.Incr(int64(n)) - u.PacketsRecv.Incr(1) - bufCopy := make([]byte, n) - copy(bufCopy, buf[:n]) - - select { - case u.in <- bufCopy: - default: - u.drops++ - if u.drops == 1 || u.drops%u.AllowedPendingMessages == 0 { - u.Log.Errorf(dropwarn, u.drops) - } - } - } - } -} - -func (u *UDPListener) udpParser() { - defer u.wg.Done() - - var packet []byte - var metrics []telegraf.Metric - var err error - for { - select { - case <-u.done: - if len(u.in) == 0 { - return - } - case packet = <-u.in: - metrics, err = u.parser.Parse(packet) - if err == nil { - for _, m := range metrics { - u.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) - } - } else { - u.malformed++ - if u.malformed == 1 || u.malformed%1000 == 0 { - u.Log.Errorf(malformedwarn, u.malformed) - } - } - } - } -} - -func init() { - inputs.Add("udp_listener", func() telegraf.Input { - return &UDPListener{ - ServiceAddress: ":8092", - AllowedPendingMessages: 10000, - } - }) -} diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go deleted file mode 100644 index a37d76da9..000000000 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ /dev/null @@ -1,201 +0,0 @@ -package udp_listener - -// This plugin will become officially deprecated in 2.0 -// These tests have been randomly failing the nightly tests, can't remove plugin until breaking changes are allowed to be merged -// See this issue for more information: https://github.com/influxdata/telegraf/issues/9478 - -// const ( -// testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n" - -// testMsgs = ` -// cpu_load_short,host=server02 value=12.0 1422568543702900257 -// cpu_load_short,host=server03 value=12.0 1422568543702900257 -// cpu_load_short,host=server04 value=12.0 1422568543702900257 -// cpu_load_short,host=server05 value=12.0 1422568543702900257 -// cpu_load_short,host=server06 value=12.0 1422568543702900257 -// ` -// ) - -// func newTestUDPListener() (*UDPListener, chan []byte) { -// in := make(chan []byte, 1500) -// listener := &UDPListener{ -// Log: testutil.Logger{}, -// ServiceAddress: ":8125", -// AllowedPendingMessages: 10000, -// in: in, -// done: make(chan struct{}), -// } -// return listener, in -// } - -// // func TestHighTrafficUDP(t *testing.T) { -// // listener := UDPListener{ -// // ServiceAddress: ":8126", -// // AllowedPendingMessages: 100000, -// // } -// // var err error -// // listener.parser, err = parsers.NewInfluxParser() -// // require.NoError(t, err) -// // acc := &testutil.Accumulator{} - -// // // send multiple messages to socket -// // err = listener.Start(acc) -// // require.NoError(t, err) - -// // conn, err := net.Dial("udp", "127.0.0.1:8126") -// // require.NoError(t, err) -// // mlen := int64(len(testMsgs)) -// // var sent int64 -// // for i := 0; i < 20000; i++ { -// // for sent > listener.BytesRecv.Get()+32000 { -// // // more than 32kb sitting in OS buffer, let it drain -// // runtime.Gosched() -// // } -// // conn.Write([]byte(testMsgs)) -// // sent += mlen -// // } -// // for sent > listener.BytesRecv.Get() { -// // runtime.Gosched() -// // } -// // for len(listener.in) > 0 { -// // runtime.Gosched() -// // } -// // listener.Stop() - -// // require.Equal(t, uint64(100000), acc.NMetrics()) -// // } - -// func TestConnectUDP(t *testing.T) { -// listener := UDPListener{ -// Log: testutil.Logger{}, -// ServiceAddress: ":8127", -// AllowedPendingMessages: 10000, -// } -// listener.parser, _ = parsers.NewInfluxParser() - -// acc := &testutil.Accumulator{} -// require.NoError(t, listener.Start(acc)) -// defer listener.Stop() - -// conn, err := net.Dial("udp", "127.0.0.1:8127") -// require.NoError(t, err) - -// // send single message to socket -// _, err = fmt.Fprint(conn, testMsg) -// require.NoError(t, err) -// acc.Wait(1) -// acc.AssertContainsTaggedFields(t, "cpu_load_short", -// map[string]interface{}{"value": float64(12)}, -// map[string]string{"host": "server01"}, -// ) - -// // send multiple messages to socket -// _, err = fmt.Fprint(conn, testMsgs) -// require.NoError(t, err) -// acc.Wait(6) -// hostTags := []string{"server02", "server03", -// "server04", "server05", "server06"} -// for _, hostTag := range hostTags { -// acc.AssertContainsTaggedFields(t, "cpu_load_short", -// map[string]interface{}{"value": float64(12)}, -// map[string]string{"host": hostTag}, -// ) -// } -// } - -// func TestRunParser(t *testing.T) { -// log.SetOutput(io.Discard) -// var testmsg = []byte("cpu_load_short,host=server01 value=12.0 1422568543702900257\n") - -// listener, in := newTestUDPListener() -// acc := testutil.Accumulator{} -// listener.acc = &acc -// defer close(listener.done) - -// listener.parser, _ = parsers.NewInfluxParser() -// listener.wg.Add(1) -// go listener.udpParser() - -// in <- testmsg -// require.NoError(t, listener.Gather(&acc)) - -// acc.Wait(1) -// acc.AssertContainsTaggedFields(t, "cpu_load_short", -// map[string]interface{}{"value": float64(12)}, -// map[string]string{"host": "server01"}, -// ) -// } - -// func TestRunParserInvalidMsg(_ *testing.T) { -// log.SetOutput(io.Discard) -// var testmsg = []byte("cpu_load_short") - -// listener, in := newTestUDPListener() -// acc := testutil.Accumulator{} -// listener.acc = &acc -// defer close(listener.done) - -// listener.parser, _ = parsers.NewInfluxParser() -// listener.wg.Add(1) -// go listener.udpParser() - -// buf := bytes.NewBuffer(nil) -// log.SetOutput(buf) -// defer log.SetOutput(os.Stderr) -// in <- testmsg - -// scnr := bufio.NewScanner(buf) -// for scnr.Scan() { -// if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) { -// break -// } -// } -// } - -// func TestRunParserGraphiteMsg(t *testing.T) { -// log.SetOutput(io.Discard) -// var testmsg = []byte("cpu.load.graphite 12 1454780029") - -// listener, in := newTestUDPListener() -// acc := testutil.Accumulator{} -// listener.acc = &acc -// defer close(listener.done) - -// listener.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) -// listener.wg.Add(1) -// go listener.udpParser() - -// in <- testmsg -// require.NoError(t, listener.Gather(&acc)) - -// acc.Wait(1) -// acc.AssertContainsFields(t, "cpu_load_graphite", -// map[string]interface{}{"value": float64(12)}) -// } - -// func TestRunParserJSONMsg(t *testing.T) { -// log.SetOutput(io.Discard) -// var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n") - -// listener, in := newTestUDPListener() -// acc := testutil.Accumulator{} -// listener.acc = &acc -// defer close(listener.done) - -// listener.parser, _ = parsers.NewParser(&parsers.Config{ -// DataFormat: "json", -// MetricName: "udp_json_test", -// }) -// listener.wg.Add(1) -// go listener.udpParser() - -// in <- testmsg -// require.NoError(t, listener.Gather(&acc)) - -// acc.Wait(1) -// acc.AssertContainsFields(t, "udp_json_test", -// map[string]interface{}{ -// "a": float64(5), -// "b_c": float64(6), -// }) -// }