feat(parsers.avro): Allow connection to https schema registry (#13903)
This commit is contained in:
parent
a4631a2cfb
commit
9db814d1c5
|
|
@ -31,10 +31,15 @@ The message is supposed to be encoded as follows:
|
||||||
## Supported values are "binary" (default) and "json"
|
## Supported values are "binary" (default) and "json"
|
||||||
# avro_format = "binary"
|
# avro_format = "binary"
|
||||||
|
|
||||||
## Url of the schema registry; exactly one of schema registry and
|
## URL of the schema registry which may contain username and password in the
|
||||||
## schema must be set
|
## form http[s]://[username[:password]@]<host>[:port]
|
||||||
|
## NOTE: Exactly one of schema registry and schema must be set
|
||||||
avro_schema_registry = "http://localhost:8081"
|
avro_schema_registry = "http://localhost:8081"
|
||||||
|
|
||||||
|
## Path to the schema registry certificate. Should be specified only if
|
||||||
|
## required for connection to the schema registry.
|
||||||
|
# avro_schema_registry_cert = "/etc/telegraf/ca_cert.crt"
|
||||||
|
|
||||||
## Schema string; exactly one of schema registry and schema must be set
|
## Schema string; exactly one of schema registry and schema must be set
|
||||||
#avro_schema = '''
|
#avro_schema = '''
|
||||||
# {
|
# {
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
type Parser struct {
|
type Parser struct {
|
||||||
MetricName string `toml:"metric_name"`
|
MetricName string `toml:"metric_name"`
|
||||||
SchemaRegistry string `toml:"avro_schema_registry"`
|
SchemaRegistry string `toml:"avro_schema_registry"`
|
||||||
|
CaCertPath string `toml:"avro_schema_registry_cert"`
|
||||||
Schema string `toml:"avro_schema"`
|
Schema string `toml:"avro_schema"`
|
||||||
Format string `toml:"avro_format"`
|
Format string `toml:"avro_format"`
|
||||||
Measurement string `toml:"avro_measurement"`
|
Measurement string `toml:"avro_measurement"`
|
||||||
|
|
@ -62,7 +63,11 @@ func (p *Parser) Init() error {
|
||||||
return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat)
|
return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat)
|
||||||
}
|
}
|
||||||
if p.SchemaRegistry != "" {
|
if p.SchemaRegistry != "" {
|
||||||
p.registryObj = newSchemaRegistry(p.SchemaRegistry)
|
registry, err := newSchemaRegistry(p.SchemaRegistry, p.CaCertPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error connecting to the schema registry %q: %w", p.SchemaRegistry, err)
|
||||||
|
}
|
||||||
|
p.registryObj = registry
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,14 @@
|
||||||
package avro
|
package avro
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/linkedin/goavro/v2"
|
"github.com/linkedin/goavro/v2"
|
||||||
)
|
)
|
||||||
|
|
@ -14,21 +19,75 @@ type schemaAndCodec struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type schemaRegistry struct {
|
type schemaRegistry struct {
|
||||||
url string
|
url string
|
||||||
cache map[int]*schemaAndCodec
|
username string
|
||||||
|
password string
|
||||||
|
cache map[int]*schemaAndCodec
|
||||||
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
const schemaByID = "%s/schemas/ids/%d"
|
const schemaByID = "%s/schemas/ids/%d"
|
||||||
|
|
||||||
func newSchemaRegistry(url string) *schemaRegistry {
|
func newSchemaRegistry(addr string, caCertPath string) (*schemaRegistry, error) {
|
||||||
return &schemaRegistry{url: url, cache: make(map[int]*schemaAndCodec)}
|
caCert, err := os.ReadFile(caCertPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var client *http.Client
|
||||||
|
var tlsCfg *tls.Config
|
||||||
|
if caCertPath != "" {
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
caCertPool.AppendCertsFromPEM(caCert)
|
||||||
|
tlsCfg = &tls.Config{
|
||||||
|
RootCAs: caCertPool,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client = &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
MaxIdleConns: 10,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parsing registry URL failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var username, password string
|
||||||
|
if u.User != nil {
|
||||||
|
username = u.User.Username()
|
||||||
|
password, _ = u.User.Password()
|
||||||
|
}
|
||||||
|
|
||||||
|
registry := &schemaRegistry{
|
||||||
|
url: u.String(),
|
||||||
|
username: username,
|
||||||
|
password: password,
|
||||||
|
cache: make(map[int]*schemaAndCodec),
|
||||||
|
client: client,
|
||||||
|
}
|
||||||
|
|
||||||
|
return registry, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
||||||
if v, ok := sr.cache[id]; ok {
|
if v, ok := sr.cache[id]; ok {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
resp, err := http.Get(fmt.Sprintf(schemaByID, sr.url, id))
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if sr.username != "" {
|
||||||
|
req.SetBasicAuth(sr.username, sr.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := sr.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue