chore(inputs.tcp_listener)!: Remove deprecated plugin (#14865)

This commit is contained in:
Sven Rebhan 2024-02-26 18:29:38 +01:00 committed by GitHub
parent 1490207a0f
commit c7c8020dd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 0 additions and 687 deletions

View File

@ -1,5 +0,0 @@
//go:build !custom || inputs || inputs.tcp_listener
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener" // register plugin

View File

@ -1,37 +0,0 @@
# TCP Listener Input Plugin
**DEPRECATED: As of version 1.3 the TCP listener plugin has been deprecated in
favor of the [socket_listener plugin](../socket_listener/README.md)**
## Service Input <!-- @/docs/includes/service_input.md -->
This plugin is a service input. Normal plugins gather metrics determined by the
interval setting. Service plugins start a service to listens and waits for
metrics or events to occur. Service plugins have two key differences from
normal plugins:
1. The global or plugin specific `interval` setting may not apply
2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce
output for this plugin
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
# Generic TCP listener
[[inputs.tcp_listener]]
# socket_listener plugin
# see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener
```
## Metrics
## Example Output

View File

@ -1,4 +0,0 @@
# Generic TCP listener
[[inputs.tcp_listener]]
# socket_listener plugin
# see https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener

View File

@ -1,304 +0,0 @@
//go:generate ../../../tools/readme_config_includer/generator
package tcp_listener
import (
"bufio"
_ "embed"
"fmt"
"net"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)
//go:embed sample.conf
var sampleConfig string
type TCPListener struct {
ServiceAddress string
AllowedPendingMessages int
MaxTCPConnections int `toml:"max_tcp_connections"`
sync.Mutex
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup
in chan []byte
done chan struct{}
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int
// track the listener here so we can close it in Stop()
listener *net.TCPListener
// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
parser telegraf.Parser
acc telegraf.Accumulator
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
Log telegraf.Logger
}
var dropwarn = "tcp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config"
var malformedwarn = "tcp_listener has received %d malformed packets" +
" thus far."
func (*TCPListener) SampleConfig() string {
return sampleConfig
}
// All the work is done in the Start() function, so this is just a dummy
// function.
func (t *TCPListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (t *TCPListener) SetParser(parser telegraf.Parser) {
t.parser = parser
}
// Start starts the tcp listener service.
func (t *TCPListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()
t.Log.Warn("DEPRECATED: the TCP listener plugin has been deprecated " +
"in favor of the socket_listener plugin " +
"(https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener)")
tags := map[string]string{
"address": t.ServiceAddress,
}
t.MaxConnections = selfstat.Register("tcp_listener", "max_connections", tags)
t.MaxConnections.Set(int64(t.MaxTCPConnections))
t.CurrentConnections = selfstat.Register("tcp_listener", "current_connections", tags)
t.TotalConnections = selfstat.Register("tcp_listener", "total_connections", tags)
t.PacketsRecv = selfstat.Register("tcp_listener", "packets_received", tags)
t.BytesRecv = selfstat.Register("tcp_listener", "bytes_received", tags)
t.acc = acc
t.in = make(chan []byte, t.AllowedPendingMessages)
t.done = make(chan struct{})
t.accept = make(chan bool, t.MaxTCPConnections)
t.conns = make(map[string]*net.TCPConn)
for i := 0; i < t.MaxTCPConnections; i++ {
t.accept <- true
}
// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress)
t.listener, err = net.ListenTCP("tcp", address)
if err != nil {
t.Log.Errorf("Failed to listen: %s", err.Error())
return err
}
t.wg.Add(2)
go t.tcpListen()
go t.tcpParser()
t.Log.Infof("Started TCP listener service on %q", t.ServiceAddress)
return nil
}
// Stop cleans up all resources
func (t *TCPListener) Stop() {
t.Lock()
defer t.Unlock()
close(t.done)
t.listener.Close()
// Close all open TCP connections
// - get all conns from the t.conns map and put into slice
// - this is so the forget() function doesn't conflict with looping
// over the t.conns map
t.cleanup.Lock()
conns := make([]*net.TCPConn, 0, len(t.conns))
for _, conn := range t.conns {
conns = append(conns, conn)
}
t.cleanup.Unlock()
for _, conn := range conns {
conn.Close()
}
t.wg.Wait()
close(t.in)
t.Log.Infof("Stopped TCP listener service on %q", t.ServiceAddress)
}
// tcpListen listens for incoming TCP connections.
func (t *TCPListener) tcpListen() {
defer t.wg.Done()
for {
select {
case <-t.done:
return
default:
// Accept connection:
conn, err := t.listener.AcceptTCP()
if err != nil {
t.Log.Errorf("accepting TCP connection failed: %v", err)
return
}
select {
case <-t.accept:
// generate a random id for this TCPConn
id, err := internal.RandomString(6)
if err != nil {
t.Log.Errorf("generating a random id for TCP connection failed: %v", err)
return
}
// not over connection limit, handle the connection properly.
t.wg.Add(1)
t.remember(id, conn)
go t.handler(conn, id)
default:
// We are over the connection limit, refuse & close.
t.refuser(conn)
}
}
}
}
// refuser refuses a TCP connection
func (t *TCPListener) refuser(conn *net.TCPConn) {
// Tell the connection why we are closing.
fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+
" reached, closing.\nYou may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n", t.MaxTCPConnections)
conn.Close()
t.Log.Infof("Refused TCP Connection from %s", conn.RemoteAddr())
t.Log.Warn("Maximum TCP Connections reached, you may want to adjust max_tcp_connections")
}
// handler handles a single TCP Connection
func (t *TCPListener) handler(conn *net.TCPConn, id string) {
t.CurrentConnections.Incr(1)
t.TotalConnections.Incr(1)
// connection cleanup function
defer func() {
t.wg.Done()
if err := conn.Close(); err != nil {
t.acc.AddError(err)
}
// Add one connection potential back to channel when this one closes
t.accept <- true
t.forget(id)
t.CurrentConnections.Incr(-1)
}()
var n int
scanner := bufio.NewScanner(conn)
for {
select {
case <-t.done:
return
default:
if !scanner.Scan() {
return
}
n = len(scanner.Bytes())
if n == 0 {
continue
}
t.BytesRecv.Incr(int64(n))
t.PacketsRecv.Incr(1)
bufCopy := make([]byte, n+1)
copy(bufCopy, scanner.Bytes())
bufCopy[n] = '\n'
select {
case t.in <- bufCopy:
default:
t.drops++
if t.drops == 1 || t.drops%t.AllowedPendingMessages == 0 {
t.Log.Errorf(dropwarn, t.drops)
}
}
}
}
}
// tcpParser parses the incoming tcp byte packets
func (t *TCPListener) tcpParser() {
defer t.wg.Done()
var packet []byte
var metrics []telegraf.Metric
var err error
for {
select {
case <-t.done:
// drain input packets before finishing:
if len(t.in) == 0 {
return
}
case packet = <-t.in:
if len(packet) == 0 {
continue
}
metrics, err = t.parser.Parse(packet)
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else {
t.malformed++
if t.malformed == 1 || t.malformed%1000 == 0 {
t.Log.Errorf(malformedwarn, t.malformed)
}
}
}
}
}
// forget a TCP connection
func (t *TCPListener) forget(id string) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
delete(t.conns, id)
}
// remember a TCP connection
func (t *TCPListener) remember(id string, conn *net.TCPConn) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
t.conns[id] = conn
}
func init() {
inputs.Add("tcp_listener", func() telegraf.Input {
return &TCPListener{
ServiceAddress: ":8094",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
})
}

View File

@ -1,337 +0,0 @@
package tcp_listener
import (
"fmt"
"io"
"net"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
const (
testMsg = "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
testMsgs = `
cpu_load_short,host=server02 value=12.0 1422568543702900257
cpu_load_short,host=server03 value=12.0 1422568543702900257
cpu_load_short,host=server04 value=12.0 1422568543702900257
cpu_load_short,host=server05 value=12.0 1422568543702900257
cpu_load_short,host=server06 value=12.0 1422568543702900257
`
)
func newTestTCPListener() (*TCPListener, chan []byte) {
in := make(chan []byte, 1500)
listener := &TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8194",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
in: in,
done: make(chan struct{}),
}
return listener, in
}
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkTCP(b *testing.B) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8198",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
parser := &influx.Parser{}
require.NoError(b, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{Discard: true}
// send multiple messages to socket
for n := 0; n < b.N; n++ {
require.NoError(b, listener.Start(acc))
conn, err := net.Dial("tcp", "127.0.0.1:8198")
require.NoError(b, err)
for i := 0; i < 100000; i++ {
_, err := fmt.Fprint(conn, testMsg)
require.NoError(b, err)
}
require.NoError(b, conn.(*net.TCPConn).CloseWrite())
// wait for all 100,000 metrics to be processed
buf := []byte{0}
// will EOF when completed
_, err = conn.Read(buf)
require.NoError(b, err)
listener.Stop()
}
}
func TestHighTrafficTCP(t *testing.T) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8199",
AllowedPendingMessages: 100000,
MaxTCPConnections: 250,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
// send multiple messages to socket
require.NoError(t, listener.Start(acc))
conn, err := net.Dial("tcp", "127.0.0.1:8199")
require.NoError(t, err)
for i := 0; i < 100000; i++ {
_, err := fmt.Fprint(conn, testMsg)
require.NoError(t, err)
}
require.NoError(t, conn.(*net.TCPConn).CloseWrite())
buf := []byte{0}
_, err = conn.Read(buf)
require.Equal(t, err, io.EOF)
listener.Stop()
require.Equal(t, 100000, int(acc.NMetrics()))
}
func TestConnectTCP(t *testing.T) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8194",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
conn, err := net.Dial("tcp", "127.0.0.1:8194")
require.NoError(t, err)
// send single message to socket
_, err = fmt.Fprint(conn, testMsg)
require.NoError(t, err)
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
// send multiple messages to socket
_, err = fmt.Fprint(conn, testMsgs)
require.NoError(t, err)
acc.Wait(6)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
)
}
}
// Test that MaxTCPConnections is respected
func TestConcurrentConns(t *testing.T) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8195",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err := net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
buf := make([]byte, 1500)
n, err := conn.Read(buf)
require.NoError(t, err)
require.Equal(t,
"Telegraf maximum concurrent TCP connections (2) reached, closing.\n"+
"You may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n",
string(buf[:n]))
_, err = conn.Read(buf)
require.Equal(t, io.EOF, err)
}
// Test that MaxTCPConnections is respected when max==1
func TestConcurrentConns1(t *testing.T) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8196",
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()
_, err := net.Dial("tcp", "127.0.0.1:8196")
require.NoError(t, err)
// Connection over the limit:
conn, err := net.Dial("tcp", "127.0.0.1:8196")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8196")
require.NoError(t, err)
buf := make([]byte, 1500)
n, err := conn.Read(buf)
require.NoError(t, err)
require.Equal(t,
"Telegraf maximum concurrent TCP connections (1) reached, closing.\n"+
"You may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n",
string(buf[:n]))
_, err = conn.Read(buf)
require.Equal(t, io.EOF, err)
}
// Test that MaxTCPConnections is respected
func TestCloseConcurrentConns(t *testing.T) {
listener := TCPListener{
Log: testutil.Logger{},
ServiceAddress: "localhost:8195",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
_, err := net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
_, err = net.Dial("tcp", "127.0.0.1:8195")
require.NoError(t, err)
listener.Stop()
}
func TestRunParser(t *testing.T) {
var testmsg = []byte(testMsg)
listener, in := newTestTCPListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
require.NoError(t, listener.Gather(&acc))
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}
func TestRunParserInvalidMsg(t *testing.T) {
var testmsg = []byte("cpu_load_short")
logger := &testutil.CaptureLogger{}
listener, in := newTestTCPListener()
listener.Log = logger
listener.acc = &testutil.Accumulator{}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
listener.Stop()
require.Contains(t, logger.LastError(), "tcp_listener has received 1 malformed packets thus far.")
}
func TestRunParserGraphiteMsg(t *testing.T) {
var testmsg = []byte("cpu.load.graphite 12 1454780029")
listener, in := newTestTCPListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
p := graphite.Parser{Separator: "_", Templates: []string{}}
require.NoError(t, p.Init())
listener.parser = &p
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
require.NoError(t, listener.Gather(&acc))
acc.Wait(1)
acc.AssertContainsFields(t, "cpu_load_graphite",
map[string]interface{}{"value": float64(12)})
}
func TestRunParserJSONMsg(t *testing.T) {
var testmsg = []byte("{\"a\": 5, \"b\": {\"c\": 6}}\n")
listener, in := newTestTCPListener()
acc := testutil.Accumulator{}
listener.acc = &acc
defer close(listener.done)
parser := &json.Parser{MetricName: "udp_json_test"}
require.NoError(t, parser.Init())
listener.parser = parser
listener.wg.Add(1)
go listener.tcpParser()
in <- testmsg
require.NoError(t, listener.Gather(&acc))
acc.Wait(1)
acc.AssertContainsFields(t, "udp_json_test",
map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
})
}