feat(inputs.p4runtime): Implementation of P4Runtime input plugin (#12473)

Co-Authored-By: Jakub Sikorski <jakub.sikorski@intel.com>
This commit is contained in:
jokuniew 2023-02-07 16:45:27 +01:00 committed by GitHub
parent f12870693e
commit 0f2db7abb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1146 additions and 6 deletions

View File

@ -259,6 +259,7 @@ following works:
- github.com/opencontainers/runc [Apache License 2.0](https://github.com/opencontainers/runc/blob/main/LICENSE)
- github.com/opensearch-project/opensearch-go [Apache License 2.0](https://github.com/opensearch-project/opensearch-go/blob/main/LICENSE.txt)
- github.com/opentracing/opentracing-go [Apache License 2.0](https://github.com/opentracing/opentracing-go/blob/master/LICENSE)
- github.com/p4lang/p4runtime [Apache License 2.0](https://github.com/p4lang/p4runtime/blob/main/LICENSE)
- github.com/pborman/ansi [BSD 3-Clause "New" or "Revised" License](https://github.com/pborman/ansi/blob/master/LICENSE)
- github.com/philhofer/fwd [MIT License](https://github.com/philhofer/fwd/blob/master/LICENSE.md)
- github.com/pierrec/lz4 [BSD 3-Clause "New" or "Revised" License](https://github.com/pierrec/lz4/blob/master/LICENSE)

7
go.mod
View File

@ -132,6 +132,7 @@ require (
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5
github.com/openzipkin/zipkin-go v0.2.5
github.com/p4lang/p4runtime v1.3.0
github.com/pborman/ansi v1.0.0
github.com/pion/dtls/v2 v2.1.5
github.com/pkg/errors v0.9.1
@ -172,7 +173,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0
go.opentelemetry.io/otel/sdk/metric v0.34.0
go.starlark.net v0.0.0-20220328144851-d1966c6b9fcd
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
golang.org/x/mod v0.6.0
golang.org/x/net v0.5.0
golang.org/x/oauth2 v0.3.0
golang.org/x/sync v0.1.0
@ -423,9 +424,9 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect
golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b
golang.org/x/time v0.1.0 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect
google.golang.org/appengine v1.6.7 // indirect

13
go.sum
View File

@ -2132,6 +2132,8 @@ github.com/openzipkin/zipkin-go v0.2.5 h1:UwtQQx2pyPIgWYHRg+epgdx1/HnBQTgN3/oIYE
github.com/openzipkin/zipkin-go v0.2.5/go.mod h1:KpXfKdgRDnnhsxw4pNIH9Md5lyFqKUa4YDFlwRYAMyE=
github.com/ory/go-acc v0.2.6/go.mod h1:4Kb/UnPcT8qRAk3IAxta+hvVapdxTLWtrr7bFLlEgpw=
github.com/ory/viper v1.7.5/go.mod h1:ypOuyJmEUb3oENywQZRgeAMwqgOyDqwboO1tj3DjTaM=
github.com/p4lang/p4runtime v1.3.0 h1:3fUhHj0JtsGcL2Bh0uxpACdBJBDqpZyLgj93tqKzoJY=
github.com/p4lang/p4runtime v1.3.0/go.mod h1:voPsRsgz/TDEhcaFvBxfMbI++hSKR/QGJusJveEs9Jg=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
@ -2883,8 +2885,9 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc=
golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b h1:EqBVA+nNsObCwQoBEHy4wLU0pi7i8a4AL3pbItPdPkE=
golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@ -2918,8 +2921,9 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/net v0.0.0-20150829230318-ea47fc708ee3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -3396,8 +3400,9 @@ golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyj
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -3510,6 +3515,7 @@ google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200413115906-b5235f65be36/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
@ -3597,6 +3603,7 @@ google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=

View File

@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.p4runtime
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/p4runtime" // register plugin

View File

@ -0,0 +1,95 @@
# P4 Runtime Input Plugin
P4 is a language for programming the data plane of network devices,
such as Programmable Switches or Programmable Network Interface Cards.
The P4Runtime API is a control plane specification to manage
the data plane elements of those devices dynamically by a P4 program.
The `p4runtime` plugin gathers metrics about `Counter` values
present in P4 Program loaded onto networking device.
Metrics are collected through gRPC connection with
[P4Runtime](https://github.com/p4lang/p4runtime) server.
P4Runtime Plugin uses `PkgInfo.Name` field.
If user wants to gather information about program name, please follow
[6.2.1.Annotating P4 code with PkgInfo] instruction and apply changes
to your P4 program.
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
# P4Runtime telemetry input plugin
[[inputs.p4runtime]]
## Define the endpoint of P4Runtime gRPC server to collect metrics.
# endpoint = "127.0.0.1:9559"
## Set DeviceID required for Client Arbitration.
## https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-client-arbitration-and-controller-replication
# device_id = 1
## Filter counters by their names that should be observed.
## Example: counter_names_include=["ingressCounter", "egressCounter"]
# counter_names_include = []
## Optional TLS Config.
## Enable client-side TLS and define CA to authenticate the device.
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.crt"
## Set minimal TLS version to accept by the client.
# tls_min_version = "TLS12"
## Use TLS but skip chain & host verification.
# insecure_skip_verify = true
## Define client-side TLS certificate & key to authenticate to the device.
# tls_cert = "/etc/telegraf/client.crt"
# tls_key = "/etc/telegraf/client.key"
```
## Metrics
P4Runtime gRPC server communicates using [p4runtime.proto] Protocol Buffer.
Static information about P4 program loaded into programmable switch
are collected by `GetForwardingPipelineConfigRequest` message.
Plugin gathers dynamic metrics with `Read` method.
`Readrequest` is defined with single `Entity` of type `CounterEntry`.
Since P4 Counter is array, plugin collects values of every cell of array
by [wildcard query].
Counters defined in P4 Program have unique ID and name.
Counters are arrays, thus `counter_index` informs
which cell value of array is described in metric.
Tags are constructed in given manner:
- `p4program_name`: P4 program name provided by user.
If user wants to gather information about program name, please follow
[6.2.1.Annotating P4 code with PkgInfo] instruction and apply changes
to your P4 program.
- `counter_name`: Name of given counter in P4 program.
- `counter_type`: Type of counter (BYTES, PACKETS, BOTH).
Fields are constructed in given manner:
- `bytes`: Number of bytes gathered in counter.
- `packets` Number of packets gathered in counter.
- `counter_index`: Index at which metrics are collected in P4 counter.
## Example Output
Expected output for p4runtime plugin instance
running on host named `p4runtime-host`:
```shell
p4_runtime,counter_name=MyIngress.egressTunnelCounter,counter_type=BOTH,host=p4 bytes=408i,packets=4i,counter_index=200i 1675175030000000000
```
[6.2.1.Annotating P4 code with PkgInfo]: https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-annotating-p4-code-with-pkginfo
[p4runtime.proto]: https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto
[wildcard query]: https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto#L379

View File

@ -0,0 +1,230 @@
package p4runtime
import (
"context"
"crypto/tls"
_ "embed"
"fmt"
"io"
"sync"
p4ConfigV1 "github.com/p4lang/p4runtime/go/p4/config/v1"
p4v1 "github.com/p4lang/p4runtime/go/p4/v1"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"github.com/influxdata/telegraf"
internaltls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
const (
defaultDeviceID = 1
defaultEndpoint = "127.0.0.1:9559"
)
type P4runtime struct {
Endpoint string `toml:"endpoint"`
DeviceID uint64 `toml:"device_id"`
CounterNamesInclude []string `toml:"counter_names_include"`
Log telegraf.Logger `toml:"-"`
EnableTLS bool `toml:"enable_tls"`
internaltls.ClientConfig
conn *grpc.ClientConn
client p4v1.P4RuntimeClient
wg sync.WaitGroup
}
func (*P4runtime) SampleConfig() string {
return sampleConfig
}
func (p *P4runtime) Init() error {
if p.Endpoint == "" {
p.Log.Debugf("Using default Endpoint: %v", defaultEndpoint)
p.Endpoint = defaultEndpoint
}
return p.newP4RuntimeClient()
}
func (p *P4runtime) Gather(acc telegraf.Accumulator) error {
p4Info, err := p.getP4Info()
if err != nil {
return err
}
if len(p4Info.Counters) == 0 {
p.Log.Warn("No counters available in P4 Program!")
return nil
}
filteredCounters := filterCounters(p4Info.Counters, p.CounterNamesInclude)
if len(filteredCounters) == 0 {
p.Log.Warn("No filtered counters available in P4 Program!")
return nil
}
for _, counter := range filteredCounters {
p.wg.Add(1)
go func(counter *p4ConfigV1.Counter) {
defer p.wg.Done()
entries, err := p.readAllEntries(counter.Preamble.Id)
if err != nil {
acc.AddError(
fmt.Errorf(
"reading counter entries with ID=%v failed with error: %w",
counter.Preamble.Id,
err,
),
)
return
}
for _, entry := range entries {
ce := entry.GetCounterEntry()
if ce == nil {
acc.AddError(fmt.Errorf("reading counter entry from entry %v failed", entry))
continue
}
if ce.Data.ByteCount == 0 && ce.Data.PacketCount == 0 {
continue
}
fields := map[string]interface{}{
"bytes": ce.Data.ByteCount,
"packets": ce.Data.PacketCount,
"counter_index": ce.Index.Index,
}
tags := map[string]string{
"p4program_name": p4Info.PkgInfo.Name,
"counter_name": counter.Preamble.Name,
"counter_type": counter.Spec.Unit.String(),
}
acc.AddFields("p4_runtime", fields, tags)
}
}(counter)
}
p.wg.Wait()
return nil
}
func (p *P4runtime) Stop() {
p.conn.Close()
p.wg.Wait()
}
func initConnection(endpoint string, tlscfg *tls.Config) (*grpc.ClientConn, error) {
var creds credentials.TransportCredentials
if tlscfg != nil {
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
return grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
}
func (p *P4runtime) getP4Info() (*p4ConfigV1.P4Info, error) {
req := &p4v1.GetForwardingPipelineConfigRequest{
DeviceId: p.DeviceID,
ResponseType: p4v1.GetForwardingPipelineConfigRequest_ALL,
}
resp, err := p.client.GetForwardingPipelineConfig(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("error when retrieving forwarding pipeline config: %w", err)
}
config := resp.GetConfig()
if config == nil {
return nil, fmt.Errorf(
"error when retrieving config from forwarding pipeline - pipeline doesn't have a config yet: %w",
err,
)
}
p4info := config.GetP4Info()
if p4info == nil {
return nil, fmt.Errorf(
"error when retrieving P4Info from config - config doesn't have a P4Info: %w",
err,
)
}
return p4info, nil
}
func filterCounters(counters []*p4ConfigV1.Counter, counterNamesInclude []string) []*p4ConfigV1.Counter {
if len(counterNamesInclude) == 0 {
return counters
}
var filteredCounters []*p4ConfigV1.Counter
for _, counter := range counters {
if counter == nil {
continue
}
if slices.Contains(counterNamesInclude, counter.Preamble.Name) {
filteredCounters = append(filteredCounters, counter)
}
}
return filteredCounters
}
func (p *P4runtime) newP4RuntimeClient() error {
var tlscfg *tls.Config
var err error
if p.EnableTLS {
if tlscfg, err = p.ClientConfig.TLSConfig(); err != nil {
return err
}
}
conn, err := initConnection(p.Endpoint, tlscfg)
if err != nil {
return fmt.Errorf("cannot connect to the server: %w", err)
}
p.conn = conn
p.client = p4v1.NewP4RuntimeClient(conn)
return nil
}
func (p *P4runtime) readAllEntries(counterID uint32) ([]*p4v1.Entity, error) {
readRequest := &p4v1.ReadRequest{
DeviceId: p.DeviceID,
Entities: []*p4v1.Entity{{
Entity: &p4v1.Entity_CounterEntry{
CounterEntry: &p4v1.CounterEntry{
CounterId: counterID}}}}}
stream, err := p.client.Read(context.Background(), readRequest)
if err != nil {
return nil, err
}
rep, err := stream.Recv()
if err != nil && err != io.EOF {
return nil, err
}
return rep.Entities, nil
}
func init() {
inputs.Add("p4runtime", func() telegraf.Input {
p4runtime := &P4runtime{
DeviceID: defaultDeviceID,
}
return p4runtime
})
}

View File

@ -0,0 +1,128 @@
package p4runtime
import (
"context"
p4v1 "github.com/p4lang/p4runtime/go/p4/v1"
"google.golang.org/grpc"
)
type fakeP4RuntimeClient struct {
writeFn func(
ctx context.Context,
in *p4v1.WriteRequest,
opts ...grpc.CallOption,
) (*p4v1.WriteResponse, error)
readFn func(
ctx context.Context,
in *p4v1.ReadRequest,
opts ...grpc.CallOption,
) (p4v1.P4Runtime_ReadClient, error)
setForwardingPipelineConfigFn func(
ctx context.Context,
in *p4v1.SetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.SetForwardingPipelineConfigResponse, error)
getForwardingPipelineConfigFn func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error)
streamChannelFn func(
ctx context.Context,
opts ...grpc.CallOption,
) (p4v1.P4Runtime_StreamChannelClient, error)
capabilitiesFn func(
ctx context.Context,
in *p4v1.CapabilitiesRequest,
opts ...grpc.CallOption,
) (*p4v1.CapabilitiesResponse, error)
}
// fakeP4RuntimeClient implements the p4v1.P4RuntimeClient interface
var _ p4v1.P4RuntimeClient = &fakeP4RuntimeClient{}
func (c *fakeP4RuntimeClient) Write(
ctx context.Context,
in *p4v1.WriteRequest,
opts ...grpc.CallOption,
) (*p4v1.WriteResponse, error) {
if c.writeFn == nil {
panic("No mock defined for Write RPC")
}
return c.writeFn(ctx, in, opts...)
}
func (c *fakeP4RuntimeClient) Read(
ctx context.Context,
in *p4v1.ReadRequest,
opts ...grpc.CallOption,
) (p4v1.P4Runtime_ReadClient, error) {
if c.readFn == nil {
panic("No mock defined for Read RPC")
}
return c.readFn(ctx, in, opts...)
}
func (c *fakeP4RuntimeClient) SetForwardingPipelineConfig(
ctx context.Context,
in *p4v1.SetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.SetForwardingPipelineConfigResponse, error) {
if c.setForwardingPipelineConfigFn == nil {
panic("No mock defined for SetForwardingPipelineConfig RPC")
}
return c.setForwardingPipelineConfigFn(ctx, in, opts...)
}
func (c *fakeP4RuntimeClient) GetForwardingPipelineConfig(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
if c.getForwardingPipelineConfigFn == nil {
panic("No mock defined for GetForwardingPipelineConfig RPC")
}
return c.getForwardingPipelineConfigFn(ctx, in, opts...)
}
func (c *fakeP4RuntimeClient) StreamChannel(
ctx context.Context,
opts ...grpc.CallOption,
) (p4v1.P4Runtime_StreamChannelClient, error) {
if c.streamChannelFn == nil {
panic("No mock defined for StreamChannel")
}
return c.streamChannelFn(ctx, opts...)
}
func (c *fakeP4RuntimeClient) Capabilities(
ctx context.Context,
in *p4v1.CapabilitiesRequest,
opts ...grpc.CallOption,
) (*p4v1.CapabilitiesResponse, error) {
if c.capabilitiesFn == nil {
panic("No mock defined for Capabilities RPC")
}
return c.capabilitiesFn(ctx, in, opts...)
}
type fakeP4RuntimeReadClient struct {
grpc.ClientStream
recvFn func() (*p4v1.ReadResponse, error)
}
// fakeP4RuntimeReadClient implements the p4v1.P4Runtime_ReadClient interface
var _ p4v1.P4Runtime_ReadClient = &fakeP4RuntimeReadClient{}
func (c *fakeP4RuntimeReadClient) Recv() (*p4v1.ReadResponse, error) {
if c.recvFn == nil {
panic("No mock provided for Recv function")
}
return c.recvFn()
}

View File

@ -0,0 +1,650 @@
package p4runtime
import (
"context"
"errors"
"fmt"
"net"
"testing"
"time"
p4ConfigV1 "github.com/p4lang/p4runtime/go/p4/config/v1"
p4v1 "github.com/p4lang/p4runtime/go/p4/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
// CounterSpec available here https://github.com/p4lang/p4runtime/blob/main/proto/p4/config/v1/p4info.proto#L289
func createCounter(
name string,
id uint32,
unit p4ConfigV1.CounterSpec_Unit,
) *p4ConfigV1.Counter {
return &p4ConfigV1.Counter{
Preamble: &p4ConfigV1.Preamble{Name: name, Id: id},
Spec: &p4ConfigV1.CounterSpec{Unit: unit},
}
}
func createEntityCounterEntry(
counterID uint32,
index int64,
data *p4v1.CounterData,
) *p4v1.Entity_CounterEntry {
return &p4v1.Entity_CounterEntry{
CounterEntry: &p4v1.CounterEntry{
CounterId: counterID,
Index: &p4v1.Index{Index: index},
Data: data,
},
}
}
func NewTestP4RuntimeClient(
p4RuntimeClient *fakeP4RuntimeClient,
addr string,
) *P4runtime {
conn, _ := grpc.Dial(
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
return &P4runtime{
Endpoint: addr,
DeviceID: uint64(1),
Log: testutil.Logger{},
conn: conn,
client: p4RuntimeClient,
}
}
func TestInitDefault(t *testing.T) {
plugin := &P4runtime{Log: testutil.Logger{}}
require.NoError(t, plugin.Init())
require.Equal(t, "127.0.0.1:9559", plugin.Endpoint)
require.Equal(t, uint64(0), plugin.DeviceID)
require.Empty(t, plugin.CounterNamesInclude)
require.False(t, plugin.EnableTLS)
}
func TestErrorGetP4Info(t *testing.T) {
responses := []struct {
getForwardingPipelineConfigResponse *p4v1.GetForwardingPipelineConfigResponse
getForwardingPipelineConfigResponseError error
}{
{
getForwardingPipelineConfigResponse: nil,
getForwardingPipelineConfigResponseError: fmt.Errorf(
"error when retrieving forwarding pipeline config",
),
}, {
getForwardingPipelineConfigResponse: &p4v1.GetForwardingPipelineConfigResponse{
Config: nil,
},
getForwardingPipelineConfigResponseError: nil,
}, {
getForwardingPipelineConfigResponse: &p4v1.GetForwardingPipelineConfigResponse{
Config: &p4v1.ForwardingPipelineConfig{P4Info: nil},
},
getForwardingPipelineConfigResponseError: nil,
},
}
for _, response := range responses {
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return response.getForwardingPipelineConfigResponse, response.getForwardingPipelineConfigResponseError
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.Error(t, plugin.Gather(&acc))
}
}
func TestOneCounterRead(t *testing.T) {
tests := []struct {
forwardingPipelineConfig *p4v1.ForwardingPipelineConfig
EntityCounterEntry *p4v1.Entity_CounterEntry
expected []telegraf.Metric
}{
{
forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter("foo", 1111, p4ConfigV1.CounterSpec_BOTH),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
1111,
5,
&p4v1.CounterData{ByteCount: 5, PacketCount: 1},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(5),
"packets": int64(1),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter(
"foo",
2222,
p4ConfigV1.CounterSpec_BYTES,
),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
2222,
5,
&p4v1.CounterData{ByteCount: 5},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BYTES",
},
map[string]interface{}{
"bytes": int64(5),
"packets": int64(0),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter(
"foo",
3333,
p4ConfigV1.CounterSpec_PACKETS,
),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
3333,
5,
&p4v1.CounterData{PacketCount: 1},
),
expected: []telegraf.Metric{testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "PACKETS",
},
map[string]interface{}{
"bytes": int64(0),
"packets": int64(1),
"counter_index": 5},
time.Unix(0, 0)),
},
}, {
forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter("foo", 4444, p4ConfigV1.CounterSpec_BOTH),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
EntityCounterEntry: createEntityCounterEntry(
4444,
5,
&p4v1.CounterData{},
),
expected: nil,
},
}
for _, tt := range tests {
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4v1.ReadResponse, error) {
return &p4v1.ReadResponse{
Entities: []*p4v1.Entity{{Entity: tt.EntityCounterEntry}},
}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: tt.forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
testutil.RequireMetricsEqual(
t,
tt.expected,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestMultipleEntitiesSingleCounterRead(t *testing.T) {
totalNumOfEntriesArr := [3]int{2, 10, 100}
for _, totalNumOfEntries := range totalNumOfEntriesArr {
var expected []telegraf.Metric
fmt.Println(
"Running TestMultipleEntitiesSingleCounterRead with ",
totalNumOfEntries,
"totalNumOfCounters",
)
entities := make([]*p4v1.Entity, 0, totalNumOfEntries)
p4InfoCounters := make([]*p4ConfigV1.Counter, 0, totalNumOfEntries)
p4InfoCounters = append(
p4InfoCounters,
createCounter("foo", 0, p4ConfigV1.CounterSpec_BOTH),
)
for i := 0; i < totalNumOfEntries; i++ {
counterEntry := &p4v1.Entity{
Entity: createEntityCounterEntry(
0,
int64(i),
&p4v1.CounterData{
ByteCount: int64(10),
PacketCount: int64(10),
},
),
}
entities = append(entities, counterEntry)
expected = append(expected, testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": "foo",
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(10),
"packets": int64(10),
"counter_index": i,
},
time.Unix(0, 0),
))
}
forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: p4InfoCounters,
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
}
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4v1.ReadResponse, error) {
return &p4v1.ReadResponse{Entities: entities}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.Wait(totalNumOfEntries)
testutil.RequireMetricsEqual(
t,
expected,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestSingleEntitiesMultipleCounterRead(t *testing.T) {
totalNumOfCountersArr := [3]int{2, 10, 100}
for _, totalNumOfCounters := range totalNumOfCountersArr {
var expected []telegraf.Metric
fmt.Println(
"Running TestSingleEntitiesMultipleCounterRead with ",
totalNumOfCounters,
"totalNumOfCounters",
)
p4InfoCounters := make([]*p4ConfigV1.Counter, 0, totalNumOfCounters)
for i := 1; i <= totalNumOfCounters; i++ {
counterName := fmt.Sprintf("foo%v", i)
p4InfoCounters = append(
p4InfoCounters,
createCounter(
counterName,
uint32(i),
p4ConfigV1.CounterSpec_BOTH,
),
)
expected = append(expected, testutil.MustMetric(
"p4_runtime",
map[string]string{
"p4program_name": "P4Program",
"counter_name": counterName,
"counter_type": "BOTH",
},
map[string]interface{}{
"bytes": int64(10),
"packets": int64(10),
"counter_index": 1,
},
time.Unix(0, 0),
))
}
forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: p4InfoCounters,
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) {
counterID := in.Entities[0].GetCounterEntry().CounterId
return &fakeP4RuntimeReadClient{
recvFn: func() (*p4v1.ReadResponse, error) {
return &p4v1.ReadResponse{
Entities: []*p4v1.Entity{{
Entity: createEntityCounterEntry(
counterID,
1,
&p4v1.CounterData{
ByteCount: 10,
PacketCount: 10,
},
),
}},
}, nil
},
}, nil
},
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
acc.Wait(totalNumOfCounters)
testutil.RequireMetricsEqual(
t,
expected,
acc.GetTelegrafMetrics(),
testutil.SortMetrics(),
testutil.IgnoreTime(),
)
}
}
func TestNoCountersAvailable(t *testing.T) {
forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{Counters: []*p4ConfigV1.Counter{}},
}
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
}
func TestFilterCounters(t *testing.T) {
forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter("foo", 1, p4ConfigV1.CounterSpec_BOTH),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
}
p4RtClient := &fakeP4RuntimeClient{
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: forwardingPipelineConfig,
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
plugin.CounterNamesInclude = []string{"oof"}
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFailReadCounterEntryFromEntry(t *testing.T) {
p4RtReadClient := &fakeP4RuntimeReadClient{
recvFn: func() (*p4v1.ReadResponse, error) {
return &p4v1.ReadResponse{
Entities: []*p4v1.Entity{{
Entity: &p4v1.Entity_TableEntry{
TableEntry: &p4v1.TableEntry{},
}}}}, nil
},
}
p4RtClient := &fakeP4RuntimeClient{
readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) {
return p4RtReadClient, nil
},
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter(
"foo",
1111,
p4ConfigV1.CounterSpec_BOTH,
),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
assert.Equal(
t,
acc.Errors[0],
errors.New("reading counter entry from entry table_entry:<> failed"),
)
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFailReadAllEntries(t *testing.T) {
p4RtClient := &fakeP4RuntimeClient{
readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) {
return nil, errors.New("Connection error")
},
getForwardingPipelineConfigFn: func(
ctx context.Context,
in *p4v1.GetForwardingPipelineConfigRequest,
opts ...grpc.CallOption,
) (*p4v1.GetForwardingPipelineConfigResponse, error) {
return &p4v1.GetForwardingPipelineConfigResponse{
Config: &p4v1.ForwardingPipelineConfig{
P4Info: &p4ConfigV1.P4Info{
Counters: []*p4ConfigV1.Counter{
createCounter(
"foo",
1111,
p4ConfigV1.CounterSpec_BOTH,
),
},
PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"},
},
},
}, nil
},
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String())
var acc testutil.Accumulator
require.NoError(t, plugin.Gather(&acc))
assert.Equal(
t,
acc.Errors[0],
fmt.Errorf("reading counter entries with ID=1111 failed with error: %w",
errors.New("Connection error")),
)
testutil.RequireMetricsEqual(
t,
nil,
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
)
}
func TestFilterCounterNamesInclude(t *testing.T) {
counters := []*p4ConfigV1.Counter{
createCounter("foo", 1, p4ConfigV1.CounterSpec_BOTH),
createCounter("bar", 2, p4ConfigV1.CounterSpec_BOTH),
nil,
createCounter("", 3, p4ConfigV1.CounterSpec_BOTH),
}
counterNamesInclude := []string{"bar"}
filteredCounters := filterCounters(counters, counterNamesInclude)
assert.Equal(
t,
filteredCounters,
[]*p4ConfigV1.Counter{
createCounter("bar", 2, p4ConfigV1.CounterSpec_BOTH),
},
)
}

View File

@ -0,0 +1,23 @@
# P4Runtime telemetry input plugin
[[inputs.p4runtime]]
## Define the endpoint of P4Runtime gRPC server to collect metrics.
# endpoint = "127.0.0.1:9559"
## Set DeviceID required for Client Arbitration.
## https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-client-arbitration-and-controller-replication
# device_id = 1
## Filter counters by their names that should be observed.
## Example: counter_names_include=["ingressCounter", "egressCounter"]
# counter_names_include = []
## Optional TLS Config.
## Enable client-side TLS and define CA to authenticate the device.
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.crt"
## Set minimal TLS version to accept by the client.
# tls_min_version = "TLS12"
## Use TLS but skip chain & host verification.
# insecure_skip_verify = true
## Define client-side TLS certificate & key to authenticate to the device.
# tls_cert = "/etc/telegraf/client.crt"
# tls_key = "/etc/telegraf/client.key"