chore: Fix linter findings for errorlint (part3) (#12704)
Co-authored-by: Pawel Zak <Pawel Zak>
This commit is contained in:
parent
f7949ca68a
commit
39d6b1d5cb
|
|
@ -83,17 +83,17 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
|
||||||
copy(ts.Series, tempSeries[0:])
|
copy(ts.Series, tempSeries[0:])
|
||||||
tsBytes, err := json.Marshal(ts)
|
tsBytes, err := json.Marshal(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error())
|
return fmt.Errorf("unable to marshal TimeSeries: %w", err)
|
||||||
}
|
}
|
||||||
req, err := http.NewRequest("POST", a.authenticatedURL(), bytes.NewBuffer(tsBytes))
|
req, err := http.NewRequest("POST", a.authenticatedURL(), bytes.NewBuffer(tsBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
return fmt.Errorf("unable to create http.Request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Add("Content-Type", "application/json")
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
resp, err := a.client.Do(req)
|
resp, err := a.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
return fmt.Errorf("error POSTing metrics: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
|
@ -113,7 +113,7 @@ func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
|
||||||
for k, v := range m.Fields() {
|
for k, v := range m.Fields() {
|
||||||
var p Point
|
var p Point
|
||||||
if err := p.setValue(v); err != nil {
|
if err := p.setValue(v); err != nil {
|
||||||
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
return ms, fmt.Errorf("unable to extract value from Fields: %w", err)
|
||||||
}
|
}
|
||||||
p[0] = float64(m.Time().Unix())
|
p[0] = float64(m.Time().Unix())
|
||||||
ms[k] = p
|
ms[k] = p
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package amqp
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -161,7 +162,8 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
|
||||||
// If this is the first attempt to publish and the connection is
|
// If this is the first attempt to publish and the connection is
|
||||||
// closed, try to reconnect and retry once.
|
// closed, try to reconnect and retry once.
|
||||||
|
|
||||||
if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed {
|
var aerr *amqp.Error
|
||||||
|
if first && errors.As(err, &aerr) && errors.Is(aerr, amqp.ErrClosed) {
|
||||||
q.client = nil
|
q.client = nil
|
||||||
err := q.publish(key, body)
|
err := q.publish(key, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ func newClient(config *ClientConfig) (*client, error) {
|
||||||
|
|
||||||
channel, err := client.conn.Channel()
|
channel, err := client.conn.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error opening channel: %v", err)
|
return nil, fmt.Errorf("error opening channel: %w", err)
|
||||||
}
|
}
|
||||||
client.channel = channel
|
client.channel = channel
|
||||||
|
|
||||||
|
|
@ -110,7 +110,7 @@ func (c *client) DeclareExchange() error {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error declaring exchange: %v", err)
|
return fmt.Errorf("error declaring exchange: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -139,7 +139,7 @@ func (c *client) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.conn.Close()
|
err := c.conn.Close()
|
||||||
if err != nil && err != amqp.ErrClosed {
|
if err != nil && !errors.Is(err, amqp.ErrClosed) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -190,12 +190,12 @@ func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName s
|
||||||
|
|
||||||
if ingestor == nil {
|
if ingestor == nil {
|
||||||
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
|
||||||
return nil, fmt.Errorf("creating table for %q failed: %v", tableName, err)
|
return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
|
||||||
}
|
}
|
||||||
//create a new ingestor client for the table
|
//create a new ingestor client for the table
|
||||||
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
|
tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("creating ingestor for %q failed: %v", tableName, err)
|
return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
|
||||||
}
|
}
|
||||||
adx.metricIngestors[tableName] = tempIngestor
|
adx.metricIngestors[tableName] = tempIngestor
|
||||||
adx.Log.Debugf("Ingestor for table %s created", tableName)
|
adx.Log.Debugf("Ingestor for table %s created", tableName)
|
||||||
|
|
@ -225,21 +225,21 @@ func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context,
|
||||||
|
|
||||||
func (adx *AzureDataExplorer) Init() error {
|
func (adx *AzureDataExplorer) Init() error {
|
||||||
if adx.Endpoint == "" {
|
if adx.Endpoint == "" {
|
||||||
return errors.New("Endpoint configuration cannot be empty")
|
return errors.New("endpoint configuration cannot be empty")
|
||||||
}
|
}
|
||||||
if adx.Database == "" {
|
if adx.Database == "" {
|
||||||
return errors.New("Database configuration cannot be empty")
|
return errors.New("database configuration cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping)
|
adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping)
|
||||||
if adx.MetricsGrouping == singleTable && adx.TableName == "" {
|
if adx.MetricsGrouping == singleTable && adx.TableName == "" {
|
||||||
return errors.New("Table name cannot be empty for SingleTable metrics grouping type")
|
return errors.New("table name cannot be empty for SingleTable metrics grouping type")
|
||||||
}
|
}
|
||||||
if adx.MetricsGrouping == "" {
|
if adx.MetricsGrouping == "" {
|
||||||
adx.MetricsGrouping = tablePerMetric
|
adx.MetricsGrouping = tablePerMetric
|
||||||
}
|
}
|
||||||
if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) {
|
if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) {
|
||||||
return errors.New("Metrics grouping type is not valid")
|
return errors.New("metrics grouping type is not valid")
|
||||||
}
|
}
|
||||||
|
|
||||||
if adx.IngestionType == "" {
|
if adx.IngestionType == "" {
|
||||||
|
|
|
||||||
|
|
@ -302,7 +302,7 @@ func TestInitBlankEndpointData(t *testing.T) {
|
||||||
|
|
||||||
errorInit := plugin.Init()
|
errorInit := plugin.Init()
|
||||||
require.Error(t, errorInit)
|
require.Error(t, errorInit)
|
||||||
require.Equal(t, "Endpoint configuration cannot be empty", errorInit.Error())
|
require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeIngestor struct {
|
type fakeIngestor struct {
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,11 @@ import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -187,7 +187,7 @@ func (a *AzureMonitor) initHTTPClient() {
|
||||||
func vmInstanceMetadata(c *http.Client) (region string, resourceID string, err error) {
|
func vmInstanceMetadata(c *http.Client) (region string, resourceID string, err error) {
|
||||||
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
|
req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("error creating request: %v", err)
|
return "", "", fmt.Errorf("error creating request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Set("Metadata", "true")
|
req.Header.Set("Metadata", "true")
|
||||||
|
|
||||||
|
|
@ -323,12 +323,12 @@ func (a *AzureMonitor) send(body []byte) error {
|
||||||
// refresh the token if needed.
|
// refresh the token if needed.
|
||||||
req, err = autorest.CreatePreparer(a.auth.WithAuthorization()).Prepare(req)
|
req, err = autorest.CreatePreparer(a.auth.WithAuthorization()).Prepare(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to fetch authentication credentials: %v", err)
|
return fmt.Errorf("unable to fetch authentication credentials: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := a.client.Do(req)
|
resp, err := a.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.(*url.Error).Unwrap() == context.DeadlineExceeded {
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
a.initHTTPClient()
|
a.initHTTPClient()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ func (ps *PubSub) initPubSubClient() error {
|
||||||
option.WithUserAgent(internal.ProductToken()),
|
option.WithUserAgent(internal.ProductToken()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to generate PubSub client: %v", err)
|
return fmt.Errorf("unable to generate PubSub client: %w", err)
|
||||||
}
|
}
|
||||||
ps.c = client
|
ps.c = client
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -131,7 +131,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
|
||||||
copy(ts.Series, tempSeries[0:])
|
copy(ts.Series, tempSeries[0:])
|
||||||
tsBytes, err := json.Marshal(ts)
|
tsBytes, err := json.Marshal(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error())
|
return fmt.Errorf("unable to marshal TimeSeries: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var req *http.Request
|
var req *http.Request
|
||||||
|
|
@ -192,7 +192,7 @@ func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
|
||||||
}
|
}
|
||||||
var p Point
|
var p Point
|
||||||
if err := p.setValue(field.Value); err != nil {
|
if err := p.setValue(field.Value); err != nil {
|
||||||
return ms, fmt.Errorf("unable to extract value from Fields %v error %v", field.Key, err.Error())
|
return ms, fmt.Errorf("unable to extract value from Field %v: %w", field.Key, err)
|
||||||
}
|
}
|
||||||
p[0] = float64(m.Time().Unix())
|
p[0] = float64(m.Time().Unix())
|
||||||
ms[field.Key] = p
|
ms[field.Key] = p
|
||||||
|
|
|
||||||
|
|
@ -134,7 +134,7 @@ func (d *Dynatrace) Write(metrics []telegraf.Metric) error {
|
||||||
output := strings.Join(batch, "\n")
|
output := strings.Join(batch, "\n")
|
||||||
if output != "" {
|
if output != "" {
|
||||||
if err := d.send(output); err != nil {
|
if err := d.send(output); err != nil {
|
||||||
return fmt.Errorf("error processing data:, %s", err.Error())
|
return fmt.Errorf("error processing data: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -147,7 +147,7 @@ func (d *Dynatrace) send(msg string) error {
|
||||||
req, err := http.NewRequest("POST", d.URL, bytes.NewBufferString(msg))
|
req, err := http.NewRequest("POST", d.URL, bytes.NewBufferString(msg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.Log.Errorf("Dynatrace error: %s", err.Error())
|
d.Log.Errorf("Dynatrace error: %s", err.Error())
|
||||||
return fmt.Errorf("error while creating HTTP request:, %s", err.Error())
|
return fmt.Errorf("error while creating HTTP request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Add("Content-Type", "text/plain; charset=UTF-8")
|
req.Header.Add("Content-Type", "text/plain; charset=UTF-8")
|
||||||
|
|
||||||
|
|
@ -165,12 +165,12 @@ func (d *Dynatrace) send(msg string) error {
|
||||||
resp, err := d.client.Do(req)
|
resp, err := d.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.Log.Errorf("Dynatrace error: %s", err.Error())
|
d.Log.Errorf("Dynatrace error: %s", err.Error())
|
||||||
return fmt.Errorf("error while sending HTTP request:, %s", err.Error())
|
return fmt.Errorf("error while sending HTTP request: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusBadRequest {
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusBadRequest {
|
||||||
return fmt.Errorf("request failed with response code:, %d", resp.StatusCode)
|
return fmt.Errorf("request failed with response code: %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// print metric line results as info log
|
// print metric line results as info log
|
||||||
|
|
|
||||||
|
|
@ -169,7 +169,7 @@ func (a *Elasticsearch) Connect() error {
|
||||||
|
|
||||||
elasticURL, err := url.Parse(a.URLs[0])
|
elasticURL, err := url.Parse(a.URLs[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("parsing URL failed: %v", err)
|
return fmt.Errorf("parsing URL failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
clientOptions = append(clientOptions,
|
clientOptions = append(clientOptions,
|
||||||
|
|
@ -205,7 +205,7 @@ func (a *Elasticsearch) Connect() error {
|
||||||
esVersion, err := client.ElasticsearchVersion(a.URLs[0])
|
esVersion, err := client.ElasticsearchVersion(a.URLs[0])
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("elasticsearch version check failed: %s", err)
|
return fmt.Errorf("elasticsearch version check failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// quit if ES version is not supported
|
// quit if ES version is not supported
|
||||||
|
|
@ -310,7 +310,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||||
res, err := bulkRequest.Do(ctx)
|
res, err := bulkRequest.Do(ctx)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error sending bulk request to Elasticsearch: %s", err)
|
return fmt.Errorf("error sending bulk request to Elasticsearch: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Errors {
|
if res.Errors {
|
||||||
|
|
@ -338,7 +338,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)
|
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)
|
||||||
|
|
||||||
if errExists != nil {
|
if errExists != nil {
|
||||||
return fmt.Errorf("elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
|
return fmt.Errorf("elasticsearch template check failed, template name: %s, error: %w", a.TemplateName, errExists)
|
||||||
}
|
}
|
||||||
|
|
||||||
templatePattern := a.IndexName
|
templatePattern := a.IndexName
|
||||||
|
|
@ -370,7 +370,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
|
||||||
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)
|
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)
|
||||||
|
|
||||||
if errCreateTemplate != nil {
|
if errCreateTemplate != nil {
|
||||||
return fmt.Errorf("elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
|
return fmt.Errorf("elasticsearch failed to create index template %s: %w", a.TemplateName, errCreateTemplate)
|
||||||
}
|
}
|
||||||
|
|
||||||
a.Log.Debugf("Template %s created or updated\n", a.TemplateName)
|
a.Log.Debugf("Template %s created or updated\n", a.TemplateName)
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package exec
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -100,7 +101,7 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, environment
|
||||||
s := stderr
|
s := stderr
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == internal.ErrTimeout {
|
if errors.Is(err, internal.ErrTimeout) {
|
||||||
return fmt.Errorf("%q timed out and was killed", command)
|
return fmt.Errorf("%q timed out and was killed", command)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -114,10 +115,10 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, environment
|
||||||
}
|
}
|
||||||
|
|
||||||
if status, ok := internal.ExitStatus(err); ok {
|
if status, ok := internal.ExitStatus(err); ok {
|
||||||
return fmt.Errorf("%q exited %d with %s", command, status, err.Error())
|
return fmt.Errorf("%q exited %d with %w", command, status, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("%q failed with %s", command, err.Error())
|
return fmt.Errorf("%q failed with %w", command, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cmd = cmd
|
c.cmd = cmd
|
||||||
|
|
@ -164,7 +165,7 @@ func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
|
||||||
if len(byt) > 0 {
|
if len(byt) > 0 {
|
||||||
_, _ = buf.Write(byt)
|
_, _ = buf.Write(byt)
|
||||||
}
|
}
|
||||||
if err == io.EOF {
|
if errors.Is(err, io.EOF) {
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,12 +84,12 @@ func (e *Execd) Write(metrics []telegraf.Metric) error {
|
||||||
if !e.IgnoreSerializationError {
|
if !e.IgnoreSerializationError {
|
||||||
return fmt.Errorf("error serializing metrics: %w", err)
|
return fmt.Errorf("error serializing metrics: %w", err)
|
||||||
}
|
}
|
||||||
e.Log.Error("Skipping metric due to a serialization error: %w", err)
|
e.Log.Errorf("Skipping metric due to a serialization error: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = e.process.Stdin.Write(b); err != nil {
|
if _, err = e.process.Stdin.Write(b); err != nil {
|
||||||
return fmt.Errorf("error writing metrics %s", err)
|
return fmt.Errorf("error writing metrics: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package execd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
@ -162,10 +163,11 @@ func runOutputConsumerProgram() {
|
||||||
for {
|
for {
|
||||||
m, err := parser.Next()
|
m, err := parser.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == influx.EOF {
|
if errors.Is(err, influx.EOF) {
|
||||||
return // stream ended
|
return // stream ended
|
||||||
}
|
}
|
||||||
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
|
var parseErr *influx.ParseError
|
||||||
|
if errors.As(err, &parseErr) {
|
||||||
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
|
||||||
//nolint:revive // error code is important for this "test"
|
//nolint:revive // error code is important for this "test"
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|
|
||||||
|
|
@ -97,7 +97,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
_, err = f.writer.Write(b)
|
_, err = f.writer.Write(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeErr = fmt.Errorf("failed to write message: %v", err)
|
writeErr = fmt.Errorf("failed to write message: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -138,7 +138,7 @@ func (g *Graphite) checkEOF(conn net.Conn) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
num, err := conn.Read(b)
|
num, err := conn.Read(b)
|
||||||
if err == io.EOF {
|
if errors.Is(err, io.EOF) {
|
||||||
g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
|
g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
|
||||||
err = conn.Close()
|
err = conn.Close()
|
||||||
g.Log.Debugf("Failed to close the connection: %v", err)
|
g.Log.Debugf("Failed to close the connection: %v", err)
|
||||||
|
|
@ -149,7 +149,8 @@ func (g *Graphite) checkEOF(conn net.Conn) error {
|
||||||
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
|
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
|
||||||
}
|
}
|
||||||
// Log non-timeout errors and close.
|
// Log non-timeout errors and close.
|
||||||
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
|
var netErr net.Error
|
||||||
|
if !(errors.As(err, &netErr) && netErr.Timeout()) {
|
||||||
g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
|
g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
|
||||||
err = conn.Close()
|
err = conn.Close()
|
||||||
g.Log.Debugf("Failed to close the connection: %v", err)
|
g.Log.Debugf("Failed to close the connection: %v", err)
|
||||||
|
|
@ -184,7 +185,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||||
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
|
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
|
||||||
err = g.Connect()
|
err = g.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to reconnect: %v", err)
|
return fmt.Errorf("failed to reconnect: %w", err)
|
||||||
}
|
}
|
||||||
err = g.send(batch)
|
err = g.send(batch)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -457,9 +457,9 @@ func (g *Graylog) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, value := range values {
|
for _, value := range values {
|
||||||
_, err := writer.Write([]byte(value))
|
_, err = writer.Write([]byte(value))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error writing message: %q, %v", value, err)
|
return fmt.Errorf("error writing message: %q: %w", value, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ func (g *Groundwork) Init() error {
|
||||||
func (g *Groundwork) Connect() error {
|
func (g *Groundwork) Connect() error {
|
||||||
err := g.client.Connect()
|
err := g.client.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not log in: %v", err)
|
return fmt.Errorf("could not log in: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -123,7 +123,7 @@ func (g *Groundwork) Connect() error {
|
||||||
func (g *Groundwork) Close() error {
|
func (g *Groundwork) Close() error {
|
||||||
err := g.client.Disconnect()
|
err := g.client.Disconnect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not log out: %v", err)
|
return fmt.Errorf("could not log out: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,8 @@ import (
|
||||||
|
|
||||||
awsV2 "github.com/aws/aws-sdk-go-v2/aws"
|
awsV2 "github.com/aws/aws-sdk-go-v2/aws"
|
||||||
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||||
|
"golang.org/x/oauth2"
|
||||||
|
"google.golang.org/api/idtoken"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
|
@ -23,8 +25,6 @@ import (
|
||||||
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"golang.org/x/oauth2"
|
|
||||||
"google.golang.org/api/idtoken"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
|
|
@ -239,7 +239,7 @@ func (h *HTTP) writeMetric(reqBody []byte) error {
|
||||||
|
|
||||||
_, err = io.ReadAll(resp.Body)
|
_, err = io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("when writing to [%s] received error: %v", h.URL, err)
|
return fmt.Errorf("when writing to [%s] received error: %w", h.URL, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -262,12 +262,12 @@ func (h *HTTP) getAccessToken(ctx context.Context, audience string) (*oauth2.Tok
|
||||||
|
|
||||||
ts, err := idtoken.NewTokenSource(ctx, audience, idtoken.WithCredentialsFile(h.CredentialsFile))
|
ts, err := idtoken.NewTokenSource(ctx, audience, idtoken.WithCredentialsFile(h.CredentialsFile))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating oauth2 token source: %s", err)
|
return nil, fmt.Errorf("error creating oauth2 token source: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
token, err := ts.Token()
|
token, err := ts.Token()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error fetching oauth2 token: %s", err)
|
return nil, fmt.Errorf("error fetching oauth2 token: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h.oauth2Token = token
|
h.oauth2Token = token
|
||||||
|
|
|
||||||
|
|
@ -330,7 +330,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
|
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
|
||||||
loc, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
|
loc, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed making write url: %s", err.Error())
|
return fmt.Errorf("failed making write url: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
reader := c.requestBodyReader(metrics)
|
reader := c.requestBodyReader(metrics)
|
||||||
|
|
@ -338,13 +338,13 @@ func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []te
|
||||||
|
|
||||||
req, err := c.makeWriteRequest(loc, reader)
|
req, err := c.makeWriteRequest(loc, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed making write req: %s", err.Error())
|
return fmt.Errorf("failed making write req: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.client.Do(req.WithContext(ctx))
|
resp, err := c.client.Do(req.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.OnClientError(c.client, err)
|
internal.OnClientError(c.client, err)
|
||||||
return fmt.Errorf("failed doing req: %s", err.Error())
|
return fmt.Errorf("failed doing req: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
|
@ -461,7 +461,7 @@ func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Req
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", address, body)
|
req, err := http.NewRequest("POST", address, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed creating new request: %s", err.Error())
|
return nil, fmt.Errorf("failed creating new request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||||
|
|
|
||||||
|
|
@ -87,14 +87,14 @@ func (i *InfluxDB) Connect() error {
|
||||||
for _, u := range urls {
|
for _, u := range urls {
|
||||||
parts, err := url.Parse(u)
|
parts, err := url.Parse(u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing url [%q]: %v", u, err)
|
return fmt.Errorf("error parsing url [%q]: %w", u, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var proxy *url.URL
|
var proxy *url.URL
|
||||||
if len(i.HTTPProxy) > 0 {
|
if len(i.HTTPProxy) > 0 {
|
||||||
proxy, err = url.Parse(i.HTTPProxy)
|
proxy, err = url.Parse(i.HTTPProxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing proxy_url [%s]: %v", i.HTTPProxy, err)
|
return fmt.Errorf("error parsing proxy_url [%s]: %w", i.HTTPProxy, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -145,8 +145,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
i.Log.Errorf("When writing to [%s]: %v", client.URL(), err)
|
i.Log.Errorf("When writing to [%s]: %v", client.URL(), err)
|
||||||
|
|
||||||
switch apiError := err.(type) {
|
var apiError *DatabaseNotFoundError
|
||||||
case *DatabaseNotFoundError:
|
if errors.As(err, &apiError) {
|
||||||
if i.SkipDatabaseCreation {
|
if i.SkipDatabaseCreation {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -155,9 +155,8 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
||||||
if err := client.CreateDatabase(ctx, apiError.Database); err == nil {
|
if err := client.CreateDatabase(ctx, apiError.Database); err == nil {
|
||||||
return errors.New("database created; retry write")
|
return errors.New("database created; retry write")
|
||||||
}
|
}
|
||||||
i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate",
|
i.Log.Errorf("When writing to [%s]: database %q not found and failed to recreate", client.URL(), apiError.Database)
|
||||||
client.URL(), apiError.Database)
|
} else {
|
||||||
default:
|
|
||||||
allErrorsAreDatabaseNotFoundErrors = false
|
allErrorsAreDatabaseNotFoundErrors = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -179,7 +178,7 @@ func (i *InfluxDB) udpClient(address *url.URL) (Client, error) {
|
||||||
|
|
||||||
c, err := i.CreateUDPClientF(udpConfig)
|
c, err := i.CreateUDPClientF(udpConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating UDP client [%s]: %v", address, err)
|
return nil, fmt.Errorf("error creating UDP client [%s]: %w", address, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
|
|
@ -215,7 +214,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, address *url.URL, proxy *url.
|
||||||
|
|
||||||
c, err := i.CreateHTTPClientF(httpConfig)
|
c, err := i.CreateHTTPClientF(httpConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating HTTP client [%s]: %v", address, err)
|
return nil, fmt.Errorf("error creating HTTP client [%s]: %w", address, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !i.SkipDatabaseCreation {
|
if !i.SkipDatabaseCreation {
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
if c.conn == nil {
|
if c.conn == nil {
|
||||||
conn, err := c.dialer.DialContext(ctx, c.url.Scheme, c.url.Host)
|
conn, err := c.dialer.DialContext(ctx, c.url.Scheme, c.url.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error dialing address [%s]: %s", c.url, err)
|
return fmt.Errorf("error dialing address [%s]: %w", c.url, err)
|
||||||
}
|
}
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -187,8 +187,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
if c.BucketTag == "" {
|
if c.BucketTag == "" {
|
||||||
err := c.writeBatch(ctx, c.Bucket, metrics)
|
err := c.writeBatch(ctx, c.Bucket, metrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err, ok := err.(*APIError); ok {
|
var apiErr *APIError
|
||||||
if err.StatusCode == http.StatusRequestEntityTooLarge {
|
if errors.As(err, &apiErr) {
|
||||||
|
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
|
||||||
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
|
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -219,8 +220,9 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
|
||||||
for bucket, batch := range batches {
|
for bucket, batch := range batches {
|
||||||
err := c.writeBatch(ctx, bucket, batch)
|
err := c.writeBatch(ctx, bucket, batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err, ok := err.(*APIError); ok {
|
var apiErr *APIError
|
||||||
if err.StatusCode == http.StatusRequestEntityTooLarge {
|
if errors.As(err, &apiErr) {
|
||||||
|
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
|
||||||
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
|
return c.splitAndWriteBatch(ctx, c.Bucket, metrics)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -65,14 +65,14 @@ func (i *InfluxDB) Connect() error {
|
||||||
for _, u := range i.URLs {
|
for _, u := range i.URLs {
|
||||||
parts, err := url.Parse(u)
|
parts, err := url.Parse(u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing url [%q]: %v", u, err)
|
return fmt.Errorf("error parsing url [%q]: %w", u, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var proxy *url.URL
|
var proxy *url.URL
|
||||||
if len(i.HTTPProxy) > 0 {
|
if len(i.HTTPProxy) > 0 {
|
||||||
proxy, err = url.Parse(i.HTTPProxy)
|
proxy, err = url.Parse(i.HTTPProxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing proxy_url [%s]: %v", i.HTTPProxy, err)
|
return fmt.Errorf("error parsing proxy_url [%s]: %w", i.HTTPProxy, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -144,7 +144,7 @@ func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, erro
|
||||||
|
|
||||||
c, err := NewHTTPClient(httpConfig)
|
c, err := NewHTTPClient(httpConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating HTTP client [%s]: %v", address, err)
|
return nil, fmt.Errorf("error creating HTTP client [%s]: %w", address, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package instrumental
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
|
@ -79,7 +80,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
||||||
if i.conn == nil {
|
if i.conn == nil {
|
||||||
err := i.Connect()
|
err := i.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to (re)connect to Instrumental. Error: %s", err)
|
return fmt.Errorf("failed to (re)connect to Instrumental. Error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -148,7 +149,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
|
||||||
_, err = fmt.Fprint(i.conn, allPoints)
|
_, err = fmt.Fprint(i.conn, allPoints)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if errors.Is(err, io.EOF) {
|
||||||
_ = i.Close()
|
_ = i.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package kafka
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -146,7 +147,7 @@ func (k *Kafka) Init() error {
|
||||||
|
|
||||||
dialer, err := k.Socks5ProxyConfig.GetDialer()
|
dialer, err := k.Socks5ProxyConfig.GetDialer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("connecting to proxy server failed: %s", err)
|
return fmt.Errorf("connecting to proxy server failed: %w", err)
|
||||||
}
|
}
|
||||||
config.Net.Proxy.Dialer = dialer
|
config.Net.Proxy.Dialer = dialer
|
||||||
}
|
}
|
||||||
|
|
@ -209,7 +210,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
key, err := k.routingKey(metric)
|
key, err := k.routingKey(metric)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not generate routing key: %v", err)
|
return fmt.Errorf("could not generate routing key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if key != "" {
|
if key != "" {
|
||||||
|
|
@ -221,13 +222,14 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
|
||||||
err := k.producer.SendMessages(msgs)
|
err := k.producer.SendMessages(msgs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// We could have many errors, return only the first encountered.
|
// We could have many errors, return only the first encountered.
|
||||||
if errs, ok := err.(sarama.ProducerErrors); ok {
|
var errs sarama.ProducerErrors
|
||||||
|
if errors.As(err, &errs) {
|
||||||
for _, prodErr := range errs {
|
for _, prodErr := range errs {
|
||||||
if prodErr.Err == sarama.ErrMessageSizeTooLarge {
|
if errors.Is(prodErr.Err, sarama.ErrMessageSizeTooLarge) {
|
||||||
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
k.Log.Error("Message too large, consider increasing `max_message_bytes`; dropping batch")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if prodErr.Err == sarama.ErrInvalidTimestamp {
|
if errors.Is(prodErr.Err, sarama.ErrInvalidTimestamp) {
|
||||||
k.Log.Error(
|
k.Log.Error(
|
||||||
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
"The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; " +
|
||||||
"dropping batch",
|
"dropping batch",
|
||||||
|
|
|
||||||
|
|
@ -129,7 +129,7 @@ func (l *Librato) writeBatch(start int, sizeBatch int, metricCounter int, tempGa
|
||||||
copy(lmetrics.Gauges, tempGauges[start:end])
|
copy(lmetrics.Gauges, tempGauges[start:end])
|
||||||
metricsBytes, err := json.Marshal(lmetrics)
|
metricsBytes, err := json.Marshal(lmetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to marshal Metrics, %s", err.Error())
|
return fmt.Errorf("unable to marshal Metrics: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Log.Debugf("Librato request: %v", string(metricsBytes))
|
l.Log.Debugf("Librato request: %v", string(metricsBytes))
|
||||||
|
|
@ -139,7 +139,7 @@ func (l *Librato) writeBatch(start int, sizeBatch int, metricCounter int, tempGa
|
||||||
l.APIUrl,
|
l.APIUrl,
|
||||||
bytes.NewBuffer(metricsBytes))
|
bytes.NewBuffer(metricsBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
return fmt.Errorf("unable to create http.Request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Add("Content-Type", "application/json")
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
|
||||||
|
|
@ -159,7 +159,7 @@ func (l *Librato) writeBatch(start int, sizeBatch int, metricCounter int, tempGa
|
||||||
resp, err := l.client.Do(req)
|
resp, err := l.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Log.Debugf("Error POSTing metrics: %v", err.Error())
|
l.Log.Debugf("Error POSTing metrics: %v", err.Error())
|
||||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
return fmt.Errorf("error POSTing metrics: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
|
@ -206,7 +206,7 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := gauge.setValue(value); err != nil {
|
if err := gauge.setValue(value); err != nil {
|
||||||
return gauges, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
return gauges, fmt.Errorf("unable to extract value from Fields: %w", err)
|
||||||
}
|
}
|
||||||
gauges = append(gauges, gauge)
|
gauges = append(gauges, gauge)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -97,18 +97,18 @@ func (l *Logzio) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
serialized, err := json.Marshal(m)
|
serialized, err := json.Marshal(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to marshal metric, %s", err.Error())
|
return fmt.Errorf("unable to marshal metric: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = gz.Write(append(serialized, '\n'))
|
_, err = gz.Write(append(serialized, '\n'))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to write gzip meric, %s", err.Error())
|
return fmt.Errorf("unable to write gzip meric: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := gz.Close()
|
err := gz.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to close gzip, %s", err.Error())
|
return fmt.Errorf("unable to close gzip: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.send(buff.Bytes())
|
return l.send(buff.Bytes())
|
||||||
|
|
@ -122,14 +122,14 @@ func (l *Logzio) send(metrics []byte) error {
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(metrics))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(metrics))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create http.Request, %s", err.Error())
|
return fmt.Errorf("unable to create http.Request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Add("Content-Type", "application/json")
|
req.Header.Add("Content-Type", "application/json")
|
||||||
req.Header.Set("Content-Encoding", "gzip")
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
|
||||||
resp, err := l.client.Do(req)
|
resp, err := l.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error POSTing metrics, %s", err.Error())
|
return fmt.Errorf("error POSTing metrics: %w", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,12 +28,12 @@ func (s *MongoDB) getCollections(ctx context.Context) error {
|
||||||
s.collections = map[string]bson.M{}
|
s.collections = map[string]bson.M{}
|
||||||
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
|
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to execute ListCollections: %v", err)
|
return fmt.Errorf("unable to execute ListCollections: %w", err)
|
||||||
}
|
}
|
||||||
for collections.Next(ctx) {
|
for collections.Next(ctx) {
|
||||||
var collection bson.M
|
var collection bson.M
|
||||||
if err := collections.Decode(&collection); err != nil {
|
if err = collections.Decode(&collection); err != nil {
|
||||||
return fmt.Errorf("unable to decode ListCollections: %v", err)
|
return fmt.Errorf("unable to decode ListCollections: %w", err)
|
||||||
}
|
}
|
||||||
name, ok := collection["name"].(string)
|
name, ok := collection["name"].(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -169,7 +169,7 @@ func (s *MongoDB) createTimeSeriesCollection(databaseCollection string) error {
|
||||||
cco.SetTimeSeriesOptions(tso)
|
cco.SetTimeSeriesOptions(tso)
|
||||||
err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, databaseCollection, cco)
|
err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, databaseCollection, cco)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create time series collection: %v", err)
|
return fmt.Errorf("unable to create time series collection: %w", err)
|
||||||
}
|
}
|
||||||
s.collections[databaseCollection] = bson.M{}
|
s.collections[databaseCollection] = bson.M{}
|
||||||
}
|
}
|
||||||
|
|
@ -180,11 +180,11 @@ func (s *MongoDB) Connect() error {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
client, err := mongo.Connect(ctx, s.clientOptions)
|
client, err := mongo.Connect(ctx, s.clientOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to connect: %v", err)
|
return fmt.Errorf("unable to connect: %w", err)
|
||||||
}
|
}
|
||||||
s.client = client
|
s.client = client
|
||||||
if err := s.getCollections(ctx); err != nil {
|
if err = s.getCollections(ctx); err != nil {
|
||||||
return fmt.Errorf("unable to get collections from specified metric database: %v", err)
|
return fmt.Errorf("unable to get collections from specified metric database: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
err = m.client.Publish(topic, buf)
|
err = m.client.Publish(topic, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not write to MQTT server, %s", err)
|
return fmt.Errorf("could not write to MQTT server: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -119,7 +119,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
err = m.client.Publish(key, buf)
|
err = m.client.Publish(key, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not write to MQTT server, %s", err)
|
return fmt.Errorf("could not write to MQTT server: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
err = n.conn.Publish(n.Subject, buf)
|
err = n.conn.Publish(n.Subject, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("FAILED to send NATS message: %s", err)
|
return fmt.Errorf("FAILED to send NATS message: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ func (nr *NewRelic) Connect() error {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to connect to newrelic %v", err)
|
return fmt.Errorf("unable to connect to newrelic: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nr.dc = cumulative.NewDeltaCalculator()
|
nr.dc = cumulative.NewDeltaCalculator()
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
err = n.producer.Publish(n.Topic, buf)
|
err = n.producer.Publish(n.Topic, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to send NSQD message: %s", err)
|
return fmt.Errorf("failed to send NSQD message: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -67,17 +67,17 @@ func (o *OpenTSDB) Connect() error {
|
||||||
// Test Connection to OpenTSDB Server
|
// Test Connection to OpenTSDB Server
|
||||||
u, err := url.Parse(o.Host)
|
u, err := url.Parse(o.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in parsing host url: %s", err.Error())
|
return fmt.Errorf("error in parsing host url: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
||||||
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB TCP address cannot be resolved: %s", err)
|
return fmt.Errorf("OpenTSDB TCP address cannot be resolved: %w", err)
|
||||||
}
|
}
|
||||||
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB Telnet connect fail: %s", err)
|
return fmt.Errorf("OpenTSDB Telnet connect fail: %w", err)
|
||||||
}
|
}
|
||||||
defer connection.Close()
|
defer connection.Close()
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -90,7 +90,7 @@ func (o *OpenTSDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
u, err := url.Parse(o.Host)
|
u, err := url.Parse(o.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in parsing host url: %s", err.Error())
|
return fmt.Errorf("error in parsing host url: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if u.Scheme == "" || u.Scheme == "tcp" {
|
if u.Scheme == "" || u.Scheme == "tcp" {
|
||||||
|
|
@ -187,9 +187,9 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
|
||||||
sanitize(fmt.Sprintf("%s%s%s%s", o.Prefix, m.Name(), o.Separator, fieldName)),
|
sanitize(fmt.Sprintf("%s%s%s%s", o.Prefix, m.Name(), o.Separator, fieldName)),
|
||||||
now, metricValue, tags)
|
now, metricValue, tags)
|
||||||
|
|
||||||
_, err := connection.Write([]byte(messageLine))
|
_, err = connection.Write([]byte(messageLine))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
|
return fmt.Errorf("telnet writing error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -354,8 +354,9 @@ func isTempError(err error) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if err, ok := err.(interface{ Temporary() bool }); ok {
|
var tempErr interface{ Temporary() bool }
|
||||||
return err.Temporary()
|
if errors.As(err, &tempErr) {
|
||||||
|
return tempErr.Temporary()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assume that any other error is permanent.
|
// Assume that any other error is permanent.
|
||||||
|
|
@ -398,13 +399,13 @@ func (p *Postgresql) writeMetricsFromMeasure(ctx context.Context, db dbh, tableS
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.TagsAsForeignKeys {
|
if p.TagsAsForeignKeys {
|
||||||
if err := p.writeTagTable(ctx, db, tableSource); err != nil {
|
if err = p.writeTagTable(ctx, db, tableSource); err != nil {
|
||||||
if p.ForeignTagConstraint {
|
if p.ForeignTagConstraint {
|
||||||
return fmt.Errorf("writing to tag table '%s': %s", tableSource.Name()+p.TagTableSuffix, err)
|
return fmt.Errorf("writing to tag table '%s': %w", tableSource.Name()+p.TagTableSuffix, err)
|
||||||
}
|
}
|
||||||
// log and continue. As the admin can correct the issue, and tags don't change over time, they can be
|
// log and continue. As the admin can correct the issue, and tags don't change over time, they can be
|
||||||
// added from future metrics after issue is corrected.
|
// added from future metrics after issue is corrected.
|
||||||
p.Logger.Errorf("writing to tag table '%s': %s", tableSource.Name()+p.TagTableSuffix, err)
|
p.Logger.Errorf("writing to tag table %q: %s", tableSource.Name()+p.TagTableSuffix, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -385,7 +385,7 @@ func (tm *TableManager) update(ctx context.Context,
|
||||||
stmt := fmt.Sprintf("COMMENT ON COLUMN %s.%s IS 'tag'",
|
stmt := fmt.Sprintf("COMMENT ON COLUMN %s.%s IS 'tag'",
|
||||||
tmplTable.String(), sqltemplate.QuoteIdentifier(col.Name))
|
tmplTable.String(), sqltemplate.QuoteIdentifier(col.Name))
|
||||||
if _, err := tx.Exec(ctx, stmt); err != nil {
|
if _, err := tx.Exec(ctx, stmt); err != nil {
|
||||||
return fmt.Errorf("setting column role comment: %s", err)
|
return fmt.Errorf("setting column role comment: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -113,7 +113,7 @@ func (p *PrometheusClient) Init() error {
|
||||||
for _, cidr := range p.IPRange {
|
for _, cidr := range p.IPRange {
|
||||||
_, ipNet, err := net.ParseCIDR(cidr)
|
_, ipNet, err := net.ParseCIDR(cidr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error parsing ip_range: %v", err)
|
return fmt.Errorf("error parsing ip_range: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ipRange = append(ipRange, ipNet)
|
ipRange = append(ipRange, ipNet)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
|
@ -80,7 +81,7 @@ func (r *RedisTimeSeries) Write(metrics []telegraf.Metric) error {
|
||||||
addSlice = append(addSlice, tags...)
|
addSlice = append(addSlice, tags...)
|
||||||
|
|
||||||
if err := r.client.Do(addSlice...).Err(); err != nil {
|
if err := r.client.Do(addSlice...).Err(); err != nil {
|
||||||
return fmt.Errorf("adding sample failed: %v", err)
|
return fmt.Errorf("adding sample failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if r.client == nil {
|
if r.client == nil {
|
||||||
if err := r.Connect(); err != nil {
|
if err := r.Connect(); err != nil {
|
||||||
return fmt.Errorf("failed to (re)connect to Riemann: %s", err.Error())
|
return fmt.Errorf("failed to (re)connect to Riemann: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -83,7 +83,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if err := r.client.SendMulti(events); err != nil {
|
if err := r.client.SendMulti(events); err != nil {
|
||||||
r.Close() //nolint:revive // There is another error which will be returned here
|
r.Close() //nolint:revive // There is another error which will be returned here
|
||||||
return fmt.Errorf("failed to send riemann message: %s", err)
|
return fmt.Errorf("failed to send riemann message: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||||
if r.client == nil {
|
if r.client == nil {
|
||||||
err := r.Connect()
|
err := r.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to (re)connect to Riemann, error: %s", err)
|
return fmt.Errorf("failed to (re)connect to Riemann: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,7 +77,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
|
||||||
var senderr = r.client.SendMulti(events)
|
var senderr = r.client.SendMulti(events)
|
||||||
if senderr != nil {
|
if senderr != nil {
|
||||||
r.Close() //nolint:revive // There is another error which will be returned here
|
r.Close() //nolint:revive // There is another error which will be returned here
|
||||||
return fmt.Errorf("failed to send riemann message (will try to reconnect), error: %s", senderr)
|
return fmt.Errorf("failed to send riemann message (will try to reconnect): %w", senderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -351,7 +351,7 @@ func (s *Sensu) setEntity() error {
|
||||||
} else {
|
} else {
|
||||||
defaultHostname, err := os.Hostname()
|
defaultHostname, err := os.Hostname()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("resolving hostname failed: %v", err)
|
return fmt.Errorf("resolving hostname failed: %w", err)
|
||||||
}
|
}
|
||||||
entityName = defaultHostname
|
entityName = defaultHostname
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package socket_writer
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
@ -119,11 +120,12 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if _, err := sw.Conn.Write(bs); err != nil {
|
if _, err := sw.Conn.Write(bs); err != nil {
|
||||||
//TODO log & keep going with remaining strings
|
//TODO log & keep going with remaining strings
|
||||||
if err, ok := err.(net.Error); ok {
|
var netErr net.Error
|
||||||
|
if errors.As(err, &netErr) {
|
||||||
// permanent error. close the connection
|
// permanent error. close the connection
|
||||||
sw.Close() //nolint:revive // There is another error which will be returned here
|
sw.Close() //nolint:revive // There is another error which will be returned here
|
||||||
sw.Conn = nil
|
sw.Conn = nil
|
||||||
return fmt.Errorf("closing connection: %v", err)
|
return fmt.Errorf("closing connection: %w", netErr)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -242,26 +242,26 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
|
||||||
// ClickHouse needs to batch inserts with prepared statements
|
// ClickHouse needs to batch inserts with prepared statements
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("begin failed: %v", err)
|
return fmt.Errorf("begin failed: %w", err)
|
||||||
}
|
}
|
||||||
stmt, err := tx.Prepare(sql)
|
stmt, err := tx.Prepare(sql)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("prepare failed: %v", err)
|
return fmt.Errorf("prepare failed: %w", err)
|
||||||
}
|
}
|
||||||
defer stmt.Close() //nolint:revive // We cannot do anything about a failing close.
|
defer stmt.Close() //nolint:revive // We cannot do anything about a failing close.
|
||||||
|
|
||||||
_, err = stmt.Exec(values...)
|
_, err = stmt.Exec(values...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("execution failed: %v", err)
|
return fmt.Errorf("execution failed: %w", err)
|
||||||
}
|
}
|
||||||
err = tx.Commit()
|
err = tx.Commit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("commit failed: %v", err)
|
return fmt.Errorf("commit failed: %w", err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
_, err = p.db.Exec(sql, values...)
|
_, err = p.db.Exec(sql, values...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("execution failed: %v", err)
|
return fmt.Errorf("execution failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,7 +84,7 @@ func (q *STOMP) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
err = q.stomp.Send(q.QueueName, "text/plain", values, nil)
|
err = q.stomp.Send(q.QueueName, "text/plain", values, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sending metric failed: %s", err)
|
return fmt.Errorf("sending metric failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ package syslog
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
@ -120,10 +121,11 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, err = s.Conn.Write(msgBytesWithFraming); err != nil {
|
if _, err = s.Conn.Write(msgBytesWithFraming); err != nil {
|
||||||
if netErr, ok := err.(net.Error); ok {
|
var netErr net.Error
|
||||||
|
if errors.As(err, &netErr) {
|
||||||
s.Close() //nolint:revive // There is another error which will be returned here
|
s.Close() //nolint:revive // There is another error which will be returned here
|
||||||
s.Conn = nil
|
s.Conn = nil
|
||||||
return fmt.Errorf("closing connection: %v", netErr)
|
return fmt.Errorf("closing connection: %w", netErr)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -286,8 +286,7 @@ func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteR
|
||||||
t.logWriteToTimestreamError(notFound, writeRecordsInput.TableName)
|
t.logWriteToTimestreamError(notFound, writeRecordsInput.TableName)
|
||||||
// log error and return error to telegraf to retry in next flush interval
|
// log error and return error to telegraf to retry in next flush interval
|
||||||
// We need this is to avoid data drop when there are no tables present in the database
|
// We need this is to avoid data drop when there are no tables present in the database
|
||||||
return fmt.Errorf("failed to write to Timestream database '%s' table '%s', Error: '%s'",
|
return fmt.Errorf("failed to write to Timestream database %q table %q: %w", t.DatabaseName, *writeRecordsInput.TableName, err)
|
||||||
t.DatabaseName, *writeRecordsInput.TableName, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var rejected *types.RejectedRecordsException
|
var rejected *types.RejectedRecordsException
|
||||||
|
|
@ -301,20 +300,20 @@ func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteR
|
||||||
|
|
||||||
var throttling *types.ThrottlingException
|
var throttling *types.ThrottlingException
|
||||||
if errors.As(err, &throttling) {
|
if errors.As(err, &throttling) {
|
||||||
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %w",
|
||||||
t.DatabaseName, *writeRecordsInput.TableName, throttling)
|
t.DatabaseName, *writeRecordsInput.TableName, throttling)
|
||||||
}
|
}
|
||||||
|
|
||||||
var internal *types.InternalServerException
|
var internal *types.InternalServerException
|
||||||
if errors.As(err, &internal) {
|
if errors.As(err, &internal) {
|
||||||
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %w",
|
||||||
t.DatabaseName, *writeRecordsInput.TableName, internal)
|
t.DatabaseName, *writeRecordsInput.TableName, internal)
|
||||||
}
|
}
|
||||||
|
|
||||||
var operation *smithy.OperationError
|
var operation *smithy.OperationError
|
||||||
if !errors.As(err, &operation) {
|
if !errors.As(err, &operation) {
|
||||||
// Retry other, non-aws errors.
|
// Retry other, non-aws errors.
|
||||||
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %w",
|
||||||
t.DatabaseName, *writeRecordsInput.TableName, err)
|
t.DatabaseName, *writeRecordsInput.TableName, err)
|
||||||
}
|
}
|
||||||
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
|
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
|
||||||
|
|
@ -368,7 +367,8 @@ func (t *Timestream) createTable(tableName *string) error {
|
||||||
|
|
||||||
_, err := t.svc.CreateTable(context.Background(), createTableInput)
|
_, err := t.svc.CreateTable(context.Background(), createTableInput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(*types.ConflictException); ok {
|
var e *types.ConflictException
|
||||||
|
if errors.As(err, &e) {
|
||||||
// if the table was created in the meantime, it's ok.
|
// if the table was created in the meantime, it's ok.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ func (w *Warp10) Write(metrics []telegraf.Metric) error {
|
||||||
addr := w.WarpURL + "/api/v0/update"
|
addr := w.WarpURL + "/api/v0/update"
|
||||||
req, err := http.NewRequest("POST", addr, bytes.NewBufferString(payload))
|
req, err := http.NewRequest("POST", addr, bytes.NewBufferString(payload))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to create new request '%s': %s", addr, err)
|
return fmt.Errorf("unable to create new request %q: %w", addr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "text/plain")
|
req.Header.Set("Content-Type", "text/plain")
|
||||||
|
|
|
||||||
|
|
@ -126,9 +126,9 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
||||||
if flushErr := w.sender.Flush(); flushErr != nil {
|
if flushErr := w.sender.Flush(); flushErr != nil {
|
||||||
w.Log.Errorf("wavefront flushing error: %v", flushErr)
|
w.Log.Errorf("wavefront flushing error: %v", flushErr)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("wavefront sending error: %v", err)
|
return fmt.Errorf("wavefront sending error: %w", err)
|
||||||
}
|
}
|
||||||
w.Log.Errorf("non-retryable error during Wavefront.Write: %v", err)
|
w.Log.Errorf("non-retryable error during Wavefront.Write: %w", err)
|
||||||
w.Log.Debugf(
|
w.Log.Debugf(
|
||||||
"Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ",
|
"Non-retryable metric data: Name: %v, Value: %v, Timestamp: %v, Source: %v, PointTags: %v ",
|
||||||
point.Metric,
|
point.Metric,
|
||||||
|
|
|
||||||
|
|
@ -68,12 +68,12 @@ func (w *WebSocket) Init() error {
|
||||||
func (w *WebSocket) Connect() error {
|
func (w *WebSocket) Connect() error {
|
||||||
tlsCfg, err := w.ClientConfig.TLSConfig()
|
tlsCfg, err := w.ClientConfig.TLSConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating TLS config: %v", err)
|
return fmt.Errorf("error creating TLS config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dialProxy, err := w.HTTPProxy.Proxy()
|
dialProxy, err := w.HTTPProxy.Proxy()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating proxy: %v", err)
|
return fmt.Errorf("error creating proxy: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dialer := &ws.Dialer{
|
dialer := &ws.Dialer{
|
||||||
|
|
@ -85,7 +85,7 @@ func (w *WebSocket) Connect() error {
|
||||||
if w.Socks5ProxyEnabled {
|
if w.Socks5ProxyEnabled {
|
||||||
netDialer, err := w.Socks5ProxyConfig.GetDialer()
|
netDialer, err := w.Socks5ProxyConfig.GetDialer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error connecting to socks5 proxy: %v", err)
|
return fmt.Errorf("error connecting to socks5 proxy: %w", err)
|
||||||
}
|
}
|
||||||
dialer.NetDial = netDialer.Dial
|
dialer.NetDial = netDialer.Dial
|
||||||
}
|
}
|
||||||
|
|
@ -97,7 +97,7 @@ func (w *WebSocket) Connect() error {
|
||||||
|
|
||||||
conn, resp, err := dialer.Dial(w.URL, headers)
|
conn, resp, err := dialer.Dial(w.URL, headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error dial: %v", err)
|
return fmt.Errorf("error dial: %w", err)
|
||||||
}
|
}
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
if resp.StatusCode != http.StatusSwitchingProtocols {
|
if resp.StatusCode != http.StatusSwitchingProtocols {
|
||||||
|
|
@ -162,7 +162,7 @@ func (w *WebSocket) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
if w.WriteTimeout > 0 {
|
if w.WriteTimeout > 0 {
|
||||||
if err := w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.WriteTimeout))); err != nil {
|
if err := w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.WriteTimeout))); err != nil {
|
||||||
return fmt.Errorf("error setting write deadline: %v", err)
|
return fmt.Errorf("error setting write deadline: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
messageType := ws.BinaryMessage
|
messageType := ws.BinaryMessage
|
||||||
|
|
@ -173,7 +173,7 @@ func (w *WebSocket) Write(metrics []telegraf.Metric) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = w.conn.Close()
|
_ = w.conn.Close()
|
||||||
w.conn = nil
|
w.conn = nil
|
||||||
return fmt.Errorf("error writing to connection: %v", err)
|
return fmt.Errorf("error writing to connection: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,7 @@ func (a *YandexCloudMonitoring) Write(metrics []telegraf.Metric) error {
|
||||||
func getResponseFromMetadata(c *http.Client, metadataURL string) ([]byte, error) {
|
func getResponseFromMetadata(c *http.Client, metadataURL string) ([]byte, error) {
|
||||||
req, err := http.NewRequest("GET", metadataURL, nil)
|
req, err := http.NewRequest("GET", metadataURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating request: %v", err)
|
return nil, fmt.Errorf("error creating request: %w", err)
|
||||||
}
|
}
|
||||||
req.Header.Set("Metadata-Flavor", "Google")
|
req.Header.Set("Metadata-Flavor", "Google")
|
||||||
resp, err := c.Do(req)
|
resp, err := c.Do(req)
|
||||||
|
|
@ -186,7 +186,7 @@ func (a *YandexCloudMonitoring) getFolderIDFromMetadata() (string, error) {
|
||||||
}
|
}
|
||||||
folderID := string(body)
|
folderID := string(body)
|
||||||
if folderID == "" {
|
if folderID == "" {
|
||||||
return "", fmt.Errorf("unable to fetch folder id from URL %s: %v", a.MetadataFolderURL, err)
|
return "", fmt.Errorf("unable to fetch folder id from URL %s: %w", a.MetadataFolderURL, err)
|
||||||
}
|
}
|
||||||
return folderID, nil
|
return folderID, nil
|
||||||
}
|
}
|
||||||
|
|
@ -202,7 +202,7 @@ func (a *YandexCloudMonitoring) getIAMTokenFromMetadata() (string, int, error) {
|
||||||
return "", 0, err
|
return "", 0, err
|
||||||
}
|
}
|
||||||
if metadata.AccessToken == "" || metadata.ExpiresIn == 0 {
|
if metadata.AccessToken == "" || metadata.ExpiresIn == 0 {
|
||||||
return "", 0, fmt.Errorf("unable to fetch authentication credentials %s: %v", a.MetadataTokenURL, err)
|
return "", 0, fmt.Errorf("unable to fetch authentication credentials %s: %w", a.MetadataTokenURL, err)
|
||||||
}
|
}
|
||||||
return metadata.AccessToken, int(metadata.ExpiresIn), nil
|
return metadata.AccessToken, int(metadata.ExpiresIn), nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue