feat(inputs.powerdns_recursor): Support for new PowerDNS recursor control protocol (#9633)

This commit is contained in:
Orfeas Zafeiris 2022-12-12 19:01:49 +02:00 committed by GitHub
parent e264721cb9
commit 459a658224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 582 additions and 116 deletions

View File

@ -25,8 +25,26 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# socket_dir = "/var/run/"
## Socket permissions for the receive socket.
# socket_mode = "0666"
## The version of the PowerDNS control protocol to use. You will have to
## change this based on your PowerDNS Recursor version, see below:
## Version 1: PowerDNS <4.5.0
## Version 2: PowerDNS 4.5.0 - 4.5.11
## Version 3: PowerDNS >=4.6.0
## By default this is set to 1.
# control_protocol_version = 1
```
### Newer PowerDNS Recursor versions
By default, this plugin is compatible with PowerDNS Recursor versions older
than `4.5.0`. If you are using a newer version then you'll need to adjust the
`control_protocol_version` configuration option based on your version. For
versions between `4.5.0` and `4.5.11` set it to `2` and for versions `4.6.0`
and newer set it to `3`. If you don't, you will get an `i/o timeout` or a
`protocol wrong type for socket` error.
### Permissions
Telegraf will need read/write access to the control socket and to the

View File

@ -2,16 +2,10 @@
package powerdns_recursor
import (
"bufio"
_ "embed"
"errors"
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
@ -22,13 +16,15 @@ import (
var sampleConfig string
type PowerdnsRecursor struct {
UnixSockets []string `toml:"unix_sockets"`
SocketDir string `toml:"socket_dir"`
SocketMode string `toml:"socket_mode"`
UnixSockets []string `toml:"unix_sockets"`
SocketDir string `toml:"socket_dir"`
SocketMode string `toml:"socket_mode"`
ControlProtocolVersion int `toml:"control_protocol_version"`
Log telegraf.Logger `toml:"-"`
mode uint32
mode uint32
gatherFromServer func(address string, acc telegraf.Accumulator) error
}
var defaultTimeout = 5 * time.Second
@ -46,16 +42,33 @@ func (p *PowerdnsRecursor) Init() error {
p.mode = uint32(mode)
}
if p.SocketDir == "" {
p.SocketDir = filepath.Join("/", "var", "run")
}
switch p.ControlProtocolVersion {
// We treat 0 the same as 1 since it's the default value if a user doesn't explicitly specify one.
case 0, 1:
p.gatherFromServer = p.gatherFromV1Server
case 2:
p.gatherFromServer = p.gatherFromV2Server
case 3:
p.gatherFromServer = p.gatherFromV3Server
default:
return fmt.Errorf("unknown control protocol version '%d', allowed values are 1, 2, 3", p.ControlProtocolVersion)
}
if len(p.UnixSockets) == 0 {
p.UnixSockets = []string{"/var/run/pdns_recursor.controlsocket"}
}
return nil
}
func (p *PowerdnsRecursor) Gather(acc telegraf.Accumulator) error {
if len(p.UnixSockets) == 0 {
return p.gatherServer("/var/run/pdns_recursor.controlsocket", acc)
}
for _, serverSocket := range p.UnixSockets {
if err := p.gatherServer(serverSocket, acc); err != nil {
if err := p.gatherFromServer(serverSocket, acc); err != nil {
acc.AddError(err)
}
}
@ -63,91 +76,6 @@ func (p *PowerdnsRecursor) Gather(acc telegraf.Accumulator) error {
return nil
}
func (p *PowerdnsRecursor) gatherServer(address string, acc telegraf.Accumulator) error {
randomNumber := rand.Int63()
recvSocket := filepath.Join("/", "var", "run", fmt.Sprintf("pdns_recursor_telegraf%d", randomNumber))
if p.SocketDir != "" {
recvSocket = filepath.Join(p.SocketDir, fmt.Sprintf("pdns_recursor_telegraf%d", randomNumber))
}
laddr, err := net.ResolveUnixAddr("unixgram", recvSocket)
if err != nil {
return err
}
defer os.Remove(recvSocket)
raddr, err := net.ResolveUnixAddr("unixgram", address)
if err != nil {
return err
}
conn, err := net.DialUnix("unixgram", laddr, raddr)
if err != nil {
return err
}
if err := os.Chmod(recvSocket, os.FileMode(p.mode)); err != nil {
return err
}
defer conn.Close()
if err := conn.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return err
}
// Read and write buffer
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
// Send command
if _, err := fmt.Fprint(rw, "get-all\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
// Read data
buf := make([]byte, 16384)
n, err := rw.Read(buf)
if err != nil {
return err
}
if n == 0 {
return errors.New("no data received")
}
metrics := string(buf)
// Process data
fields := p.parseResponse(metrics)
// Add server socket as a tag
tags := map[string]string{"server": address}
acc.AddFields("powerdns_recursor", fields, tags)
return conn.Close()
}
func (p *PowerdnsRecursor) parseResponse(metrics string) map[string]interface{} {
values := make(map[string]interface{})
s := strings.Split(metrics, "\n")
for _, metric := range s[:len(s)-1] {
m := strings.Split(metric, "\t")
if len(m) < 2 {
continue
}
i, err := strconv.ParseInt(m[1], 10, 64)
if err != nil {
p.Log.Errorf("error parsing integer for metric %q: %s", metric, err.Error())
continue
}
values[m[0]] = i
}
return values
}
func init() {
inputs.Add("powerdns_recursor", func() telegraf.Input {
return &PowerdnsRecursor{

View File

@ -96,12 +96,13 @@ var intOverflowMetrics = "all-outqueries\t18446744073709550195\nanswers-slow\t36
"x-our-latency\t19\nx-ourtime-slow\t632\nx-ourtime0-1\t3060079\nx-ourtime1-2\t3351\nx-ourtime16-32\t197\n" +
"x-ourtime2-4\t302\nx-ourtime4-8\t194\nx-ourtime8-16\t24\n"
func TestPowerdnsRecursorGeneratesMetrics(t *testing.T) {
func TestV1PowerdnsRecursorGeneratesMetrics(t *testing.T) {
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
t.Skip("Skipping on windows and darwin, as unixgram sockets are not supported")
}
// We create a fake server to return test data
controlSocket := "/tmp/pdns5724354148158589552.controlsocket"
defer os.Remove(controlSocket)
addr, err := net.ResolveUnixAddr("unixgram", controlSocket)
require.NoError(t, err, "Cannot parse unix socket")
socket, err := net.ListenUnixgram("unixgram", addr)
@ -158,6 +159,174 @@ func TestPowerdnsRecursorGeneratesMetrics(t *testing.T) {
wg.Wait()
testReturnedMetrics(t, &acc)
}
func TestV2PowerdnsRecursorGeneratesMetrics(t *testing.T) {
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
t.Skip("Skipping on windows and darwin, as unixgram sockets are not supported")
}
// We create a fake server to return test data
controlSocket := "/tmp/pdns-v2-5724354148158589552.controlsocket"
defer os.Remove(controlSocket)
addr, err := net.ResolveUnixAddr("unixgram", controlSocket)
require.NoError(t, err, "Cannot parse unix socket")
socket, err := net.ListenUnixgram("unixgram", addr)
require.NoError(t, err, "Cannot initialize server on port")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
// Ignore the returned error as we need to remove the socket file anyway
//nolint:errcheck,revive
socket.Close()
// Ignore the returned error as we want to remove the file and ignore
// no-such-file errors
//nolint:errcheck,revive
os.Remove(controlSocket)
wg.Done()
}()
for {
status := make([]byte, 4)
n, _, err := socket.ReadFromUnix(status)
if err != nil || n != 4 {
// Ignore the returned error as we cannot do anything about it anyway
//nolint:errcheck,revive
socket.Close()
return
}
buf := make([]byte, 1024)
n, remote, err := socket.ReadFromUnix(buf)
if err != nil {
// Ignore the returned error as we cannot do anything about it anyway
//nolint:errcheck,revive
socket.Close()
return
}
data := buf[:n]
if string(data) == "get-all" {
// Ignore the returned error as we need to close the socket anyway
//nolint:errcheck,revive
socket.WriteToUnix([]byte{0, 0, 0, 0}, remote)
//nolint:errcheck,revive
socket.WriteToUnix([]byte(metrics), remote)
// Ignore the returned error as we cannot do anything about it anyway
//nolint:errcheck,revive
socket.Close()
}
time.Sleep(100 * time.Millisecond)
}
}()
p := &PowerdnsRecursor{
UnixSockets: []string{controlSocket},
SocketDir: "/tmp",
SocketMode: "0666",
ControlProtocolVersion: 2,
}
require.NoError(t, p.Init())
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(p.Gather))
wg.Wait()
testReturnedMetrics(t, &acc)
}
func TestV3PowerdnsRecursorGeneratesMetrics(t *testing.T) {
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" {
t.Skip("Skipping on windows and darwin, as unixgram sockets are not supported")
}
// We create a fake server to return test data
controlSocket := "/tmp/pdns-v3-5724354148158589552.controlsocket"
defer os.Remove(controlSocket)
socket, err := net.Listen("unix", controlSocket)
require.NoError(t, err, "Cannot initialize server on port")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
// Ignore the returned error as we need to remove the socket file anyway
//nolint:errcheck,revive
socket.Close()
// Ignore the returned error as we want to remove the file and ignore
// no-such-file errors
//nolint:errcheck,revive
os.Remove(controlSocket)
wg.Done()
}()
for {
conn, err := socket.Accept()
if err != nil {
return
}
status := make([]byte, 4)
n, err := conn.Read(status)
if err != nil || n != 4 {
return
}
dataLen, err := readNativeUIntFromConn(conn)
if err != nil || dataLen == 0 || dataLen >= 16384 {
return
}
buf := make([]byte, dataLen)
n, err = conn.Read(buf)
if err != nil || uint(n) != dataLen {
return
}
if string(buf) == "get-all" {
// Ignore the returned error as we need to close the socket anyway
//nolint:errcheck,revive
conn.Write([]byte{0, 0, 0, 0})
metrics := []byte(metrics)
//nolint:errcheck,revive
writeNativeUIntToConn(conn, uint(len(metrics)))
//nolint:errcheck,revive
conn.Write(metrics)
// Ignore the returned error as we cannot do anything about it anyway
//nolint:errcheck,revive
socket.Close()
}
time.Sleep(100 * time.Millisecond)
}
}()
p := &PowerdnsRecursor{
UnixSockets: []string{controlSocket},
SocketDir: "/tmp",
SocketMode: "0666",
ControlProtocolVersion: 3,
}
require.NoError(t, p.Init())
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(p.Gather))
wg.Wait()
testReturnedMetrics(t, &acc)
}
func testReturnedMetrics(t *testing.T, acc *testutil.Accumulator) {
intMetrics := []string{"all-outqueries", "answers-slow", "answers0-1", "answers1-10",
"answers10-100", "answers100-1000", "auth-zone-queries", "auth4-answers-slow",
"auth4-answers0-1", "auth4-answers1-10", "auth4-answers10-100", "auth4-answers100-1000",
@ -188,11 +357,7 @@ func TestPowerdnsRecursorGeneratesMetrics(t *testing.T) {
}
func TestPowerdnsRecursorParseMetrics(t *testing.T) {
p := &PowerdnsRecursor{
Log: testutil.Logger{},
}
values := p.parseResponse(metrics)
values := parseResponse(metrics)
tests := []struct {
key string
@ -312,11 +477,7 @@ func TestPowerdnsRecursorParseMetrics(t *testing.T) {
}
func TestPowerdnsRecursorParseCorruptMetrics(t *testing.T) {
p := &PowerdnsRecursor{
Log: testutil.Logger{},
}
values := p.parseResponse(corruptMetrics)
values := parseResponse(corruptMetrics)
tests := []struct {
key string
@ -435,11 +596,7 @@ func TestPowerdnsRecursorParseCorruptMetrics(t *testing.T) {
}
func TestPowerdnsRecursorParseIntOverflowMetrics(t *testing.T) {
p := &PowerdnsRecursor{
Log: testutil.Logger{},
}
values := p.parseResponse(intOverflowMetrics)
values := parseResponse(intOverflowMetrics)
tests := []struct {
key string

View File

@ -0,0 +1,88 @@
package powerdns_recursor
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/influxdata/telegraf/internal"
)
func parseResponse(metrics string) map[string]interface{} {
values := make(map[string]interface{})
s := strings.Split(metrics, "\n")
if len(s) < 1 {
return values
}
for _, metric := range s[:len(s)-1] {
m := strings.Split(metric, "\t")
if len(m) < 2 {
continue
}
i, err := strconv.ParseInt(m[1], 10, 64)
if err != nil {
continue
}
values[m[0]] = i
}
return values
}
// This below is generally unsafe but necessary in this case
// since the powerdns protocol encoding is host dependent.
// The C implementation uses size_t as the size type for the
// command length. The size and endianness of size_t change
// depending on the platform the program is being run on.
// Using the target architecture endianness and the known
// integer size, we can "recreate" the corresponding C
// behavior in an effort to maintain compatibility. Of course
// in cases where one program is compiled for i386 and the
// other for amd64 (and similar), this method will fail.
const uintSizeInBytes = strconv.IntSize / 8
func writeNativeUIntToConn(conn net.Conn, value uint) error {
intData := make([]byte, uintSizeInBytes)
switch uintSizeInBytes {
case 4:
internal.HostEndianess.PutUint32(intData, uint32(value))
case 8:
internal.HostEndianess.PutUint64(intData, uint64(value))
default:
return fmt.Errorf("unsupported system configuration")
}
_, err := conn.Write(intData)
return err
}
func readNativeUIntFromConn(conn net.Conn) (uint, error) {
intData := make([]byte, uintSizeInBytes)
n, err := conn.Read(intData)
if err != nil {
return 0, err
}
if n != uintSizeInBytes {
return 0, fmt.Errorf("did not read enough data for native uint: read '%v' bytes, expected '%v'", n, uintSizeInBytes)
}
switch uintSizeInBytes {
case 4:
return uint(internal.HostEndianess.Uint32(intData)), nil
case 8:
return uint(internal.HostEndianess.Uint64(intData)), nil
default:
return 0, fmt.Errorf("unsupported system configuration")
}
}

View File

@ -0,0 +1,81 @@
package powerdns_recursor
import (
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"time"
"github.com/influxdata/telegraf"
)
// V1 (before 4.5.0) Protocol:
// Unix datagram socket
// Synchronous request / response, individual datagrams
// Structure:
// data: byte[]
// The `data` field contains a list of commands to execute with
// the \n character after every command.
func (p *PowerdnsRecursor) gatherFromV1Server(address string, acc telegraf.Accumulator) error {
randomNumber := rand.Int63()
recvSocket := filepath.Join(p.SocketDir, fmt.Sprintf("pdns_recursor_telegraf%d", randomNumber))
laddr, err := net.ResolveUnixAddr("unixgram", recvSocket)
if err != nil {
return err
}
defer os.Remove(recvSocket)
raddr, err := net.ResolveUnixAddr("unixgram", address)
if err != nil {
return err
}
conn, err := net.DialUnix("unixgram", laddr, raddr)
if err != nil {
return err
}
defer conn.Close()
if err := os.Chmod(recvSocket, os.FileMode(p.mode)); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return err
}
// Then send the get-all command.
command := "get-all\n"
_, err = conn.Write([]byte(command))
if err != nil {
return err
}
// Read the response data.
buf := make([]byte, 16_384)
n, err := conn.Read(buf)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("no data received")
}
metrics := string(buf)
// Process data
fields := parseResponse(metrics)
// Add server socket as a tag
tags := map[string]string{"server": address}
acc.AddFields("powerdns_recursor", fields, tags)
return nil
}

View File

@ -0,0 +1,95 @@
package powerdns_recursor
import (
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"time"
"github.com/influxdata/telegraf"
)
// V2 (4.5.0 - 4.5.9) Protocol:
// Unix datagram socket
// Synchronous request / response, individual datagrams
// Datagram 1 => status: uint32
// Datagram 2 => data: byte[] (max 16_384 bytes)
func (p *PowerdnsRecursor) gatherFromV2Server(address string, acc telegraf.Accumulator) error {
randomNumber := rand.Int63()
recvSocket := filepath.Join(p.SocketDir, fmt.Sprintf("pdns_recursor_telegraf%d", randomNumber))
laddr, err := net.ResolveUnixAddr("unixgram", recvSocket)
if err != nil {
return err
}
defer os.Remove(recvSocket)
raddr, err := net.ResolveUnixAddr("unixgram", address)
if err != nil {
return err
}
conn, err := net.DialUnix("unixgram", laddr, raddr)
if err != nil {
return err
}
defer conn.Close()
if err := os.Chmod(recvSocket, os.FileMode(p.mode)); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return err
}
// First send a 0 status code.
_, err = conn.Write([]byte{0, 0, 0, 0})
if err != nil {
return err
}
// Then send the get-all command.
command := "get-all"
_, err = conn.Write([]byte(command))
if err != nil {
return err
}
// Read the response status code.
status := make([]byte, 4)
n, err := conn.Read(status)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("no status code received")
}
// Read the response data.
buf := make([]byte, 16_384)
n, err = conn.Read(buf)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("no data received")
}
metrics := string(buf)
// Process data
fields := parseResponse(metrics)
// Add server socket as a tag
tags := map[string]string{"server": address}
acc.AddFields("powerdns_recursor", fields, tags)
return nil
}

View File

@ -0,0 +1,90 @@
package powerdns_recursor
import (
"fmt"
"net"
"time"
"github.com/influxdata/telegraf"
)
// V3 (4.6.0+) Protocol:
// Standard unix stream socket
// Synchronous request / response
// Data structure:
// status: uint32
// dataLength: size_t
// data: byte[dataLength]
func (p *PowerdnsRecursor) gatherFromV3Server(address string, acc telegraf.Accumulator) error {
conn, err := net.Dial("unix", address)
if err != nil {
return err
}
defer conn.Close()
if err := conn.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return err
}
// Write 4-byte response code.
if _, err = conn.Write([]byte{0, 0, 0, 0}); err != nil {
return err
}
command := []byte("get-all")
if err = writeNativeUIntToConn(conn, uint(len(command))); err != nil {
return err
}
if _, err = conn.Write(command); err != nil {
return err
}
// Now read the response.
status := make([]byte, 4)
n, err := conn.Read(status)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("no status code received")
}
responseLength, err := readNativeUIntFromConn(conn)
if err != nil {
return err
}
if responseLength == 0 {
return fmt.Errorf("received data length was '0'")
}
// Don't allow more than 64kb of data to prevent DOS / issues
// with architecture mismatch. V2 protocol allowed for up to
// 16kb, so 64kb should give us a pretty good margin for anything
// that has been added since.
if responseLength > 64*1024 {
return fmt.Errorf("received data length was '%d', we only allow up to '%d'", responseLength, 64*1024)
}
data := make([]byte, responseLength)
n, err = conn.Read(data)
if err != nil {
return err
}
if uint(n) != responseLength {
return fmt.Errorf("no data received, expected '%v' bytes but got '%v'", responseLength, n)
}
// Process data
metrics := string(data)
fields := parseResponse(metrics)
// Add server socket as a tag
tags := map[string]string{"server": address}
acc.AddFields("powerdns_recursor", fields, tags)
return nil
}

View File

@ -8,3 +8,12 @@
# socket_dir = "/var/run/"
## Socket permissions for the receive socket.
# socket_mode = "0666"
## The version of the PowerDNS control protocol to use. You will have to
## change this based on your PowerDNS Recursor version, see below:
## Version 1: PowerDNS <4.5.0
## Version 2: PowerDNS 4.5.0 - 4.5.11
## Version 3: PowerDNS >=4.6.0
## By default this is set to 1.
# control_protocol_version = 1