diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 8fcefe55f..ca8c0611c 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -105,6 +105,9 @@ following works: - github.com/hashicorp/golang-lru [Mozilla Public License 2.0](https://github.com/hashicorp/golang-lru/blob/master/LICENSE) - github.com/hashicorp/serf [Mozilla Public License 2.0](https://github.com/hashicorp/serf/blob/master/LICENSE) - github.com/influxdata/go-syslog [MIT License](https://github.com/influxdata/go-syslog/blob/develop/LICENSE) +- github.com/influxdata/influxdb-observability/common [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) +- github.com/influxdata/influxdb-observability/otel2influx [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) +- github.com/influxdata/influxdb-observability/otlp [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) - github.com/influxdata/tail [MIT License](https://github.com/influxdata/tail/blob/master/LICENSE.txt) - github.com/influxdata/toml [MIT License](https://github.com/influxdata/toml/blob/master/LICENSE) - github.com/influxdata/wlog [MIT License](https://github.com/influxdata/wlog/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 95d64c570..fc0ec4dea 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,9 @@ require ( github.com/hashicorp/consul/api v1.6.0 github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/influxdata/go-syslog/v2 v2.0.1 + github.com/influxdata/influxdb-observability/common v0.0.0-20210429174543-86ae73cafd31 + github.com/influxdata/influxdb-observability/otel2influx v0.0.0-20210429174543-86ae73cafd31 + github.com/influxdata/influxdb-observability/otlp v0.0.0-20210429174543-86ae73cafd31 github.com/influxdata/tail v1.0.1-0.20200707181643-03a791b270e4 github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 @@ -137,7 +140,7 @@ require ( golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4 google.golang.org/api v0.29.0 google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70 - google.golang.org/grpc v1.33.1 + google.golang.org/grpc v1.37.0 gopkg.in/djherbis/times.v1 v1.2.0 gopkg.in/fatih/pool.v2 v2.0.0 // indirect gopkg.in/gorethink/gorethink.v3 v3.0.5 diff --git a/go.sum b/go.sum index 6470784f9..6ce13c192 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,7 @@ github.com/cisco-ie/nx-telemetry-proto v0.0.0-20190531143454-82441e232cf6/go.mod github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= @@ -315,6 +316,7 @@ github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4s github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b/go.mod h1:NAJj0yf/KaRKURN6nyi7A9IZydMivZEm9oQLWNjfKDc= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -526,6 +528,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1 h1:jAbXjIeW2ZSW2AwFxlGTDoc2CjI2XujLkV3ArsZFCvc= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= @@ -667,6 +670,14 @@ github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmc github.com/influxdata/go-syslog/v2 v2.0.1 h1:l44S4l4Q8MhGQcoOxJpbo+QQYxJqp0vdgIVHh4+DO0s= github.com/influxdata/go-syslog/v2 v2.0.1/go.mod h1:hjvie1UTaD5E1fTnDmxaCw8RRDrT4Ve+XHr5O2dKSCo= github.com/influxdata/influxdb v1.8.2/go.mod h1:SIzcnsjaHRFpmlxpJ4S3NT64qtEKYweNTUMb/vh0OMQ= +github.com/influxdata/influxdb-observability/common v0.0.0-20210428231528-a010f53e3e02/go.mod h1:PMngVYsW4uwtzIVmj0ZfLL9UIOwo7Vs+09QHkoYMZv8= +github.com/influxdata/influxdb-observability/common v0.0.0-20210429174543-86ae73cafd31 h1:pfWcpiOrWLJvicIpCiFR8vqrkVbAuKUttWvQDmSlfUM= +github.com/influxdata/influxdb-observability/common v0.0.0-20210429174543-86ae73cafd31/go.mod h1:PMngVYsW4uwtzIVmj0ZfLL9UIOwo7Vs+09QHkoYMZv8= +github.com/influxdata/influxdb-observability/otel2influx v0.0.0-20210429174543-86ae73cafd31 h1:uiRNaaczvfx837c6OSH9Q6H4td1cWnR9X0pveHTHeYs= +github.com/influxdata/influxdb-observability/otel2influx v0.0.0-20210429174543-86ae73cafd31/go.mod h1:43guzIbK1oO/UMBuMCqG++LHZqLhMbWxqU4H1Lgpf28= +github.com/influxdata/influxdb-observability/otlp v0.0.0-20210428231528-a010f53e3e02/go.mod h1:J2N8KOAXSXgDhLjYWvjbxPhrgq3nVQ/npzW8l8T77Qo= +github.com/influxdata/influxdb-observability/otlp v0.0.0-20210429174543-86ae73cafd31 h1:Cf6WCNdgyxWv3x3uMehlexHAkWO3AZTAv5Q2yo0WQ0s= +github.com/influxdata/influxdb-observability/otlp v0.0.0-20210429174543-86ae73cafd31/go.mod h1:23SLY21Ag84PC0TbvVhdKoOVvrQF6nq5j5sFOW09ZBU= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxql v1.1.0/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= @@ -1452,6 +1463,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200822203824-307de81be3f4/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1518,6 +1530,7 @@ google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70 h1:wboULUXGF3c5qdUnKp+6gLAccE6PRpa/czkYvQ4UXv8= google.golang.org/genproto v0.0.0-20200815001618-f69a88009b70/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -1535,8 +1548,9 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.33.1 h1:DGeFlSan2f+WEtCERJ4J9GJWk15TxUi8QGagfI87Xyc= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c= +google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1547,6 +1561,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 3beb30cb4..aa273a4aa 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -132,6 +132,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/openldap" _ "github.com/influxdata/telegraf/plugins/inputs/openntpd" _ "github.com/influxdata/telegraf/plugins/inputs/opensmtpd" + _ "github.com/influxdata/telegraf/plugins/inputs/opentelemetry" _ "github.com/influxdata/telegraf/plugins/inputs/openweathermap" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" _ "github.com/influxdata/telegraf/plugins/inputs/pf" diff --git a/plugins/inputs/opentelemetry/README.md b/plugins/inputs/opentelemetry/README.md new file mode 100644 index 000000000..9cb8f96eb --- /dev/null +++ b/plugins/inputs/opentelemetry/README.md @@ -0,0 +1,78 @@ +# OpenTelemetry Input Plugin + +This plugin receives traces, metrics and logs from [OpenTelemetry](https://opentelemetry.io) clients and agents via gRPC. + +### Configuration + +```toml +[[inputs.opentelemetry]] + ## Override the OpenTelemetry gRPC service address:port + # service_address = "0.0.0.0:4317" + + ## Override the default request timeout + # timeout = "5s" + + ## Select a schema for metrics: "prometheus-v1" or "prometheus-v2" + ## For more information about the alternatives, read the Prometheus input + ## plugin notes. + # metrics_schema = "prometheus-v1" +``` + +#### Schema + +The OpenTelemetry->InfluxDB conversion [schema](https://github.com/influxdata/influxdb-observability/blob/main/docs/index.md) +and [implementation](https://github.com/influxdata/influxdb-observability/tree/main/otel2influx) +are hosted at https://github.com/influxdata/influxdb-observability . + +Spans are stored in measurement `spans`. +Logs are stored in measurement `logs`. + +For metrics, two output schemata exist. +Metrics received with `metrics_schema=prometheus-v1` are assigned measurement from the OTel field `Metric.name`. +Metrics received with `metrics_schema=prometheus-v2` are stored in measurement `prometheus`. + +### Example Output + +#### Tracing Spans +``` +spans end_time_unix_nano="2021-02-19 20:50:25.6893952 +0000 UTC",instrumentation_library_name="tracegen",kind="SPAN_KIND_INTERNAL",name="okey-dokey",net.peer.ip="1.2.3.4",parent_span_id="d5270e78d85f570f",peer.service="tracegen-client",service.name="tracegen",span.kind="server",span_id="4c28227be6a010e1",status_code="STATUS_CODE_OK",trace_id="7d4854815225332c9834e6dbf85b9380" 1613767825689169000 +spans end_time_unix_nano="2021-02-19 20:50:25.6893952 +0000 UTC",instrumentation_library_name="tracegen",kind="SPAN_KIND_INTERNAL",name="lets-go",net.peer.ip="1.2.3.4",peer.service="tracegen-server",service.name="tracegen",span.kind="client",span_id="d5270e78d85f570f",status_code="STATUS_CODE_OK",trace_id="7d4854815225332c9834e6dbf85b9380" 1613767825689135000 +spans end_time_unix_nano="2021-02-19 20:50:25.6895667 +0000 UTC",instrumentation_library_name="tracegen",kind="SPAN_KIND_INTERNAL",name="okey-dokey",net.peer.ip="1.2.3.4",parent_span_id="b57e98af78c3399b",peer.service="tracegen-client",service.name="tracegen",span.kind="server",span_id="a0643a156d7f9f7f",status_code="STATUS_CODE_OK",trace_id="fd6b8bb5965e726c94978c644962cdc8" 1613767825689388000 +spans end_time_unix_nano="2021-02-19 20:50:25.6895667 +0000 UTC",instrumentation_library_name="tracegen",kind="SPAN_KIND_INTERNAL",name="lets-go",net.peer.ip="1.2.3.4",peer.service="tracegen-server",service.name="tracegen",span.kind="client",span_id="b57e98af78c3399b",status_code="STATUS_CODE_OK",trace_id="fd6b8bb5965e726c94978c644962cdc8" 1613767825689303300 +spans end_time_unix_nano="2021-02-19 20:50:25.6896741 +0000 UTC",instrumentation_library_name="tracegen",kind="SPAN_KIND_INTERNAL",name="okey-dokey",net.peer.ip="1.2.3.4",parent_span_id="6a8e6a0edcc1c966",peer.service="tracegen-client",service.name="tracegen",span.kind="server",span_id="d68f7f3b41eb8075",status_code="STATUS_CODE_OK",trace_id="651dadde186b7834c52b13a28fc27bea" 1613767825689480300 +``` + +### Metrics - `prometheus-v1` +``` +cpu_temp,foo=bar gauge=87.332 +http_requests_total,method=post,code=200 counter=1027 +http_requests_total,method=post,code=400 counter=3 +http_request_duration_seconds 0.05=24054,0.1=33444,0.2=100392,0.5=129389,1=133988,sum=53423,count=144320 +rpc_duration_seconds 0.01=3102,0.05=3272,0.5=4773,0.9=9001,0.99=76656,sum=1.7560473e+07,count=2693 +``` + +### Metrics - `prometheus-v2` +``` +prometheus,foo=bar cpu_temp=87.332 +prometheus,method=post,code=200 http_requests_total=1027 +prometheus,method=post,code=400 http_requests_total=3 +prometheus,le=0.05 http_request_duration_seconds_bucket=24054 +prometheus,le=0.1 http_request_duration_seconds_bucket=33444 +prometheus,le=0.2 http_request_duration_seconds_bucket=100392 +prometheus,le=0.5 http_request_duration_seconds_bucket=129389 +prometheus,le=1 http_request_duration_seconds_bucket=133988 +prometheus http_request_duration_seconds_count=144320,http_request_duration_seconds_sum=53423 +prometheus,quantile=0.01 rpc_duration_seconds=3102 +prometheus,quantile=0.05 rpc_duration_seconds=3272 +prometheus,quantile=0.5 rpc_duration_seconds=4773 +prometheus,quantile=0.9 rpc_duration_seconds=9001 +prometheus,quantile=0.99 rpc_duration_seconds=76656 +prometheus rpc_duration_seconds_count=1.7560473e+07,rpc_duration_seconds_sum=2693 +``` + +### Logs +``` +logs fluent.tag="fluent.info",pid=18i,ppid=9i,worker=0i 1613769568895331700 +logs fluent.tag="fluent.debug",instance=1720i,queue_size=0i,stage_size=0i 1613769568895697200 +logs fluent.tag="fluent.info",worker=0i 1613769568896515100 +``` diff --git a/plugins/inputs/opentelemetry/grpc_services.go b/plugins/inputs/opentelemetry/grpc_services.go new file mode 100644 index 000000000..4045f8b60 --- /dev/null +++ b/plugins/inputs/opentelemetry/grpc_services.go @@ -0,0 +1,94 @@ +package opentelemetry + +import ( + "context" + "fmt" + + "github.com/influxdata/influxdb-observability/common" + "github.com/influxdata/influxdb-observability/otel2influx" + otlpcollectorlogs "github.com/influxdata/influxdb-observability/otlp/collector/logs/v1" + otlpcollectormetrics "github.com/influxdata/influxdb-observability/otlp/collector/metrics/v1" + otlpcollectortrace "github.com/influxdata/influxdb-observability/otlp/collector/trace/v1" +) + +type traceService struct { + otlpcollectortrace.UnimplementedTraceServiceServer + + converter *otel2influx.OtelTracesToLineProtocol + writer *writeToAccumulator +} + +func newTraceService(logger common.Logger, writer *writeToAccumulator) *traceService { + converter := otel2influx.NewOtelTracesToLineProtocol(logger) + return &traceService{ + converter: converter, + writer: writer, + } +} + +func (s *traceService) Export(ctx context.Context, req *otlpcollectortrace.ExportTraceServiceRequest) (*otlpcollectortrace.ExportTraceServiceResponse, error) { + err := s.converter.WriteTraces(ctx, req.ResourceSpans, s.writer) + if err != nil { + return nil, err + } + return &otlpcollectortrace.ExportTraceServiceResponse{}, nil +} + +type metricsService struct { + otlpcollectormetrics.UnimplementedMetricsServiceServer + + converter *otel2influx.OtelMetricsToLineProtocol + writer *writeToAccumulator +} + +var metricsSchemata = map[string]otel2influx.MetricsSchema{ + "prometheus-v1": otel2influx.MetricsSchemaTelegrafPrometheusV1, + "prometheus-v2": otel2influx.MetricsSchemaTelegrafPrometheusV2, +} + +func newMetricsService(logger common.Logger, writer *writeToAccumulator, schema string) (*metricsService, error) { + ms, found := metricsSchemata[schema] + if !found { + return nil, fmt.Errorf("schema '%s' not recognized", schema) + } + + converter, err := otel2influx.NewOtelMetricsToLineProtocol(logger, ms) + if err != nil { + return nil, err + } + return &metricsService{ + converter: converter, + writer: writer, + }, nil +} + +func (s *metricsService) Export(ctx context.Context, req *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) { + err := s.converter.WriteMetrics(ctx, req.ResourceMetrics, s.writer) + if err != nil { + return nil, err + } + return &otlpcollectormetrics.ExportMetricsServiceResponse{}, nil +} + +type logsService struct { + otlpcollectorlogs.UnimplementedLogsServiceServer + + converter *otel2influx.OtelLogsToLineProtocol + writer *writeToAccumulator +} + +func newLogsService(logger common.Logger, writer *writeToAccumulator) *logsService { + converter := otel2influx.NewOtelLogsToLineProtocol(logger) + return &logsService{ + converter: converter, + writer: writer, + } +} + +func (s *logsService) Export(ctx context.Context, req *otlpcollectorlogs.ExportLogsServiceRequest) (*otlpcollectorlogs.ExportLogsServiceResponse, error) { + err := s.converter.WriteLogs(ctx, req.ResourceLogs, s.writer) + if err != nil { + return nil, err + } + return &otlpcollectorlogs.ExportLogsServiceResponse{}, nil +} diff --git a/plugins/inputs/opentelemetry/logger.go b/plugins/inputs/opentelemetry/logger.go new file mode 100644 index 000000000..3db3621bc --- /dev/null +++ b/plugins/inputs/opentelemetry/logger.go @@ -0,0 +1,16 @@ +package opentelemetry + +import ( + "strings" + + "github.com/influxdata/telegraf" +) + +type otelLogger struct { + telegraf.Logger +} + +func (l otelLogger) Debug(msg string, kv ...interface{}) { + format := msg + strings.Repeat(" %s=%q", len(kv)/2) + l.Logger.Debugf(format, kv...) +} diff --git a/plugins/inputs/opentelemetry/opentelemetry.go b/plugins/inputs/opentelemetry/opentelemetry.go new file mode 100644 index 000000000..cf2f6de08 --- /dev/null +++ b/plugins/inputs/opentelemetry/opentelemetry.go @@ -0,0 +1,101 @@ +package opentelemetry + +import ( + "fmt" + "net" + "sync" + "time" + + otlpcollectorlogs "github.com/influxdata/influxdb-observability/otlp/collector/logs/v1" + otlpcollectormetrics "github.com/influxdata/influxdb-observability/otlp/collector/metrics/v1" + otlpcollectortrace "github.com/influxdata/influxdb-observability/otlp/collector/trace/v1" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "google.golang.org/grpc" +) + +type OpenTelemetry struct { + ServiceAddress string `toml:"service_address"` + Timeout config.Duration `toml:"timeout"` + + MetricsSchema string `toml:"metrics_schema"` + + Log telegraf.Logger `toml:"-"` + + grpcServer *grpc.Server + + wg sync.WaitGroup +} + +const sampleConfig = ` + ## Override the OpenTelemetry gRPC service address:port + # service_address = "0.0.0.0:4317" + + ## Override the default request timeout + # timeout = "5s" + + ## Select a schema for metrics: prometheus-v1 or prometheus-v2 + ## For more information about the alternatives, read the Prometheus input + ## plugin notes. + # metrics_schema = "prometheus-v1" +` + +func (o *OpenTelemetry) SampleConfig() string { + return sampleConfig +} + +func (o *OpenTelemetry) Description() string { + return "Receive OpenTelemetry traces, metrics, and logs over gRPC" +} + +func (o *OpenTelemetry) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (o *OpenTelemetry) Start(accumulator telegraf.Accumulator) error { + listener, err := net.Listen("tcp", o.ServiceAddress) + if err != nil { + return err + } + + logger := &otelLogger{o.Log} + influxWriter := &writeToAccumulator{accumulator} + o.grpcServer = grpc.NewServer() + + otlpcollectortrace.RegisterTraceServiceServer(o.grpcServer, newTraceService(logger, influxWriter)) + ms, err := newMetricsService(logger, influxWriter, o.MetricsSchema) + if err != nil { + return err + } + otlpcollectormetrics.RegisterMetricsServiceServer(o.grpcServer, ms) + otlpcollectorlogs.RegisterLogsServiceServer(o.grpcServer, newLogsService(logger, influxWriter)) + + o.wg.Add(1) + go func() { + if err := o.grpcServer.Serve(listener); err != nil { + accumulator.AddError(fmt.Errorf("failed to stop OpenTelemetry gRPC service: %w", err)) + } + o.wg.Done() + }() + + return nil +} + +func (o *OpenTelemetry) Stop() { + if o.grpcServer != nil { + o.grpcServer.Stop() + } + + o.wg.Wait() +} + +func init() { + inputs.Add("opentelemetry", func() telegraf.Input { + return &OpenTelemetry{ + ServiceAddress: "0.0.0.0:4317", + Timeout: config.Duration(5 * time.Second), + MetricsSchema: "prometheus-v1", + } + }) +} diff --git a/plugins/inputs/opentelemetry/writer.go b/plugins/inputs/opentelemetry/writer.go new file mode 100644 index 000000000..69b627e38 --- /dev/null +++ b/plugins/inputs/opentelemetry/writer.go @@ -0,0 +1,32 @@ +package opentelemetry + +import ( + "context" + "fmt" + "time" + + "github.com/influxdata/influxdb-observability/otel2influx" + "github.com/influxdata/telegraf" +) + +type writeToAccumulator struct { + accumulator telegraf.Accumulator +} + +func (w *writeToAccumulator) WritePoint(_ context.Context, measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time, vType otel2influx.InfluxWriterValueType) error { + switch vType { + case otel2influx.InfluxWriterValueTypeUntyped: + w.accumulator.AddFields(measurement, fields, tags, ts) + case otel2influx.InfluxWriterValueTypeGauge: + w.accumulator.AddGauge(measurement, fields, tags, ts) + case otel2influx.InfluxWriterValueTypeSum: + w.accumulator.AddCounter(measurement, fields, tags, ts) + case otel2influx.InfluxWriterValueTypeHistogram: + w.accumulator.AddHistogram(measurement, fields, tags, ts) + case otel2influx.InfluxWriterValueTypeSummary: + w.accumulator.AddSummary(measurement, fields, tags, ts) + default: + return fmt.Errorf("unrecognized InfluxWriterValueType %q", vType) + } + return nil +}