chore(linters): Configure `revive:var-naming` and fix its findings (#16817)
This commit is contained in:
parent
9788d53549
commit
dba5597530
|
|
@ -367,6 +367,9 @@ linters:
|
||||||
- name: unused-receiver
|
- name: unused-receiver
|
||||||
- name: var-declaration
|
- name: var-declaration
|
||||||
- name: var-naming
|
- name: var-naming
|
||||||
|
arguments:
|
||||||
|
- [ ] # AllowList
|
||||||
|
- [ "ID", "DB", "TS" ] # DenyList
|
||||||
- name: waitgroup-by-value
|
- name: waitgroup-by-value
|
||||||
|
|
||||||
staticcheck:
|
staticcheck:
|
||||||
|
|
|
||||||
|
|
@ -119,7 +119,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
|
||||||
for alias, encodingPath := range c.Aliases {
|
for alias, encodingPath := range c.Aliases {
|
||||||
c.internalAliases[encodingPath] = alias
|
c.internalAliases[encodingPath] = alias
|
||||||
}
|
}
|
||||||
c.initDb()
|
c.initDB()
|
||||||
|
|
||||||
c.dmesFuncs = make(map[string]string, len(c.Dmes))
|
c.dmesFuncs = make(map[string]string, len(c.Dmes))
|
||||||
for dme, dmeKey := range c.Dmes {
|
for dme, dmeKey := range c.Dmes {
|
||||||
|
|
|
||||||
|
|
@ -838,7 +838,7 @@ func (c *CiscoTelemetryMDT) initLldp() {
|
||||||
c.nxpathMap[key]["id"] = "string"
|
c.nxpathMap[key]["id"] = "string"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CiscoTelemetryMDT) initDb() {
|
func (c *CiscoTelemetryMDT) initDB() {
|
||||||
c.nxpathMap = make(map[string]map[string]string, 200)
|
c.nxpathMap = make(map[string]map[string]string, 200)
|
||||||
|
|
||||||
c.initPower()
|
c.initPower()
|
||||||
|
|
|
||||||
|
|
@ -369,7 +369,7 @@ func tailStream(
|
||||||
|
|
||||||
r := bufio.NewReaderSize(reader, 64*1024)
|
r := bufio.NewReaderSize(reader, 64*1024)
|
||||||
|
|
||||||
var lastTs time.Time
|
var lastTS time.Time
|
||||||
for {
|
for {
|
||||||
line, err := r.ReadBytes('\n')
|
line, err := r.ReadBytes('\n')
|
||||||
|
|
||||||
|
|
@ -385,14 +385,14 @@ func tailStream(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the last processed timestamp
|
// Store the last processed timestamp
|
||||||
if ts.After(lastTs) {
|
if ts.After(lastTS) {
|
||||||
lastTs = ts
|
lastTS = ts
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return lastTs, nil
|
return lastTS, nil
|
||||||
}
|
}
|
||||||
return time.Time{}, err
|
return time.Time{}, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,8 +38,8 @@ type EventHub struct {
|
||||||
UserAgent string `toml:"user_agent"`
|
UserAgent string `toml:"user_agent"`
|
||||||
PartitionIDs []string `toml:"partition_ids"`
|
PartitionIDs []string `toml:"partition_ids"`
|
||||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
EnqueuedTimeAsTs bool `toml:"enqueued_time_as_ts"`
|
EnqueuedTimeAsTS bool `toml:"enqueued_time_as_ts"`
|
||||||
IotHubEnqueuedTimeAsTs bool `toml:"iot_hub_enqueued_time_as_ts"`
|
IotHubEnqueuedTimeAsTS bool `toml:"iot_hub_enqueued_time_as_ts"`
|
||||||
|
|
||||||
// Metadata
|
// Metadata
|
||||||
ApplicationPropertyFields []string `toml:"application_property_fields"`
|
ApplicationPropertyFields []string `toml:"application_property_fields"`
|
||||||
|
|
@ -299,7 +299,7 @@ func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, erro
|
||||||
metrics[i].AddField(e.SequenceNumberField, *event.SystemProperties.SequenceNumber)
|
metrics[i].AddField(e.SequenceNumberField, *event.SystemProperties.SequenceNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.EnqueuedTimeAsTs {
|
if e.EnqueuedTimeAsTS {
|
||||||
metrics[i].SetTime(*event.SystemProperties.EnqueuedTime)
|
metrics[i].SetTime(*event.SystemProperties.EnqueuedTime)
|
||||||
} else if e.EnqueuedTimeField != "" {
|
} else if e.EnqueuedTimeField != "" {
|
||||||
metrics[i].AddField(e.EnqueuedTimeField, (*event.SystemProperties.EnqueuedTime).UnixNano()/int64(time.Millisecond))
|
metrics[i].AddField(e.EnqueuedTimeField, (*event.SystemProperties.EnqueuedTime).UnixNano()/int64(time.Millisecond))
|
||||||
|
|
@ -328,7 +328,7 @@ func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, erro
|
||||||
metrics[i].AddTag(e.IoTHubConnectionModuleIDTag, *event.SystemProperties.IoTHubConnectionModuleID)
|
metrics[i].AddTag(e.IoTHubConnectionModuleIDTag, *event.SystemProperties.IoTHubConnectionModuleID)
|
||||||
}
|
}
|
||||||
if event.SystemProperties.IoTHubEnqueuedTime != nil {
|
if event.SystemProperties.IoTHubEnqueuedTime != nil {
|
||||||
if e.IotHubEnqueuedTimeAsTs {
|
if e.IotHubEnqueuedTimeAsTS {
|
||||||
metrics[i].SetTime(*event.SystemProperties.IoTHubEnqueuedTime)
|
metrics[i].SetTime(*event.SystemProperties.IoTHubEnqueuedTime)
|
||||||
} else if e.IoTHubEnqueuedTimeField != "" {
|
} else if e.IoTHubEnqueuedTimeField != "" {
|
||||||
metrics[i].AddField(e.IoTHubEnqueuedTimeField, (*event.SystemProperties.IoTHubEnqueuedTime).UnixNano()/int64(time.Millisecond))
|
metrics[i].AddField(e.IoTHubEnqueuedTimeField, (*event.SystemProperties.IoTHubEnqueuedTime).UnixNano()/int64(time.Millisecond))
|
||||||
|
|
|
||||||
|
|
@ -40,8 +40,8 @@ func (ki *KubernetesInventory) gatherDaemonSet(d *apps.DaemonSet, acc telegraf.A
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
creationTs := d.GetCreationTimestamp()
|
creationTS := d.GetCreationTimestamp()
|
||||||
if !creationTs.IsZero() {
|
if !creationTS.IsZero() {
|
||||||
fields["created"] = d.GetCreationTimestamp().UnixNano()
|
fields["created"] = d.GetCreationTimestamp().UnixNano()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,8 @@ func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *Kuberne
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherEndpoint(e corev1.Endpoints, acc telegraf.Accumulator) {
|
func gatherEndpoint(e corev1.Endpoints, acc telegraf.Accumulator) {
|
||||||
creationTs := e.GetCreationTimestamp()
|
creationTS := e.GetCreationTimestamp()
|
||||||
if creationTs.IsZero() {
|
if creationTS.IsZero() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,8 @@ func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *Kubernete
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherIngress(i netv1.Ingress, acc telegraf.Accumulator) {
|
func gatherIngress(i netv1.Ingress, acc telegraf.Accumulator) {
|
||||||
creationTs := i.GetCreationTimestamp()
|
creationTS := i.GetCreationTimestamp()
|
||||||
if creationTs.IsZero() {
|
if creationTS.IsZero() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,8 +30,8 @@ func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesIn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ki *KubernetesInventory) gatherPod(p *corev1.Pod, acc telegraf.Accumulator) {
|
func (ki *KubernetesInventory) gatherPod(p *corev1.Pod, acc telegraf.Accumulator) {
|
||||||
creationTs := p.GetCreationTimestamp()
|
creationTS := p.GetCreationTimestamp()
|
||||||
if creationTs.IsZero() {
|
if creationTS.IsZero() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,8 @@ func collectServices(ctx context.Context, acc telegraf.Accumulator, ki *Kubernet
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ki *KubernetesInventory) gatherService(s *corev1.Service, acc telegraf.Accumulator) {
|
func (ki *KubernetesInventory) gatherService(s *corev1.Service, acc telegraf.Accumulator) {
|
||||||
creationTs := s.GetCreationTimestamp()
|
creationTS := s.GetCreationTimestamp()
|
||||||
if creationTs.IsZero() {
|
if creationTS.IsZero() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,11 +31,11 @@ var disconnectedServersBehaviors = []string{"error", "skip"}
|
||||||
type MongoDB struct {
|
type MongoDB struct {
|
||||||
Servers []string `toml:"servers"`
|
Servers []string `toml:"servers"`
|
||||||
GatherClusterStatus bool `toml:"gather_cluster_status"`
|
GatherClusterStatus bool `toml:"gather_cluster_status"`
|
||||||
GatherPerdbStats bool `toml:"gather_perdb_stats"`
|
GatherPerDBStats bool `toml:"gather_perdb_stats"`
|
||||||
GatherColStats bool `toml:"gather_col_stats"`
|
GatherColStats bool `toml:"gather_col_stats"`
|
||||||
GatherTopStat bool `toml:"gather_top_stat"`
|
GatherTopStat bool `toml:"gather_top_stat"`
|
||||||
DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"`
|
DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"`
|
||||||
ColStatsDbs []string `toml:"col_stats_dbs"`
|
ColStatsDBs []string `toml:"col_stats_dbs"`
|
||||||
common_tls.ClientConfig
|
common_tls.ClientConfig
|
||||||
Ssl ssl
|
Ssl ssl
|
||||||
|
|
||||||
|
|
@ -118,7 +118,7 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
|
err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerDBStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDBs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.Log.Errorf("Failed to gather data: %s", err)
|
m.Log.Errorf("Failed to gather data: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -191,10 +191,10 @@ func init() {
|
||||||
inputs.Add("mongodb", func() telegraf.Input {
|
inputs.Add("mongodb", func() telegraf.Input {
|
||||||
return &MongoDB{
|
return &MongoDB{
|
||||||
GatherClusterStatus: true,
|
GatherClusterStatus: true,
|
||||||
GatherPerdbStats: false,
|
GatherPerDBStats: false,
|
||||||
GatherColStats: false,
|
GatherColStats: false,
|
||||||
GatherTopStat: false,
|
GatherTopStat: false,
|
||||||
ColStatsDbs: []string{"local"},
|
ColStatsDBs: []string{"local"},
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,11 @@ import (
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mongodbData struct {
|
type mongoDBData struct {
|
||||||
StatLine *statLine
|
StatLine *statLine
|
||||||
Fields map[string]interface{}
|
Fields map[string]interface{}
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
DbData []bbData
|
DBData []bbData
|
||||||
ColData []colData
|
ColData []colData
|
||||||
ShardHostData []bbData
|
ShardHostData []bbData
|
||||||
TopStatsData []bbData
|
TopStatsData []bbData
|
||||||
|
|
@ -25,12 +25,12 @@ type bbData struct {
|
||||||
|
|
||||||
type colData struct {
|
type colData struct {
|
||||||
Name string
|
Name string
|
||||||
DbName string
|
DBName string
|
||||||
Fields map[string]interface{}
|
Fields map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMongodbData(statLine *statLine, tags map[string]string) *mongodbData {
|
func newMongodbData(statLine *statLine, tags map[string]string) *mongoDBData {
|
||||||
return &mongodbData{
|
return &mongoDBData{
|
||||||
StatLine: statLine,
|
StatLine: statLine,
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
Fields: make(map[string]interface{}),
|
Fields: make(map[string]interface{}),
|
||||||
|
|
@ -297,30 +297,30 @@ var topDataStats = map[string]string{
|
||||||
"commands_count": "CommandsCount",
|
"commands_count": "CommandsCount",
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addDbStats() {
|
func (d *mongoDBData) addDBStats() {
|
||||||
for i := range d.StatLine.DbStatsLines {
|
for i := range d.StatLine.DBStatsLines {
|
||||||
dbstat := d.StatLine.DbStatsLines[i]
|
dbStat := d.StatLine.DBStatsLines[i]
|
||||||
dbStatLine := reflect.ValueOf(&dbstat).Elem()
|
dbStatLine := reflect.ValueOf(&dbStat).Elem()
|
||||||
newDbData := &bbData{
|
newDBData := &bbData{
|
||||||
Name: dbstat.Name,
|
Name: dbStat.Name,
|
||||||
Fields: make(map[string]interface{}),
|
Fields: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
newDbData.Fields["type"] = "db_stat"
|
newDBData.Fields["type"] = "db_stat"
|
||||||
for key, value := range dbDataStats {
|
for key, value := range dbDataStats {
|
||||||
val := dbStatLine.FieldByName(value).Interface()
|
val := dbStatLine.FieldByName(value).Interface()
|
||||||
newDbData.Fields[key] = val
|
newDBData.Fields[key] = val
|
||||||
}
|
}
|
||||||
d.DbData = append(d.DbData, *newDbData)
|
d.DBData = append(d.DBData, *newDBData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addColStats() {
|
func (d *mongoDBData) addColStats() {
|
||||||
for i := range d.StatLine.ColStatsLines {
|
for i := range d.StatLine.ColStatsLines {
|
||||||
colstat := d.StatLine.ColStatsLines[i]
|
colstat := d.StatLine.ColStatsLines[i]
|
||||||
colStatLine := reflect.ValueOf(&colstat).Elem()
|
colStatLine := reflect.ValueOf(&colstat).Elem()
|
||||||
newColData := &colData{
|
newColData := &colData{
|
||||||
Name: colstat.Name,
|
Name: colstat.Name,
|
||||||
DbName: colstat.DbName,
|
DBName: colstat.DBName,
|
||||||
Fields: make(map[string]interface{}),
|
Fields: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
newColData.Fields["type"] = "col_stat"
|
newColData.Fields["type"] = "col_stat"
|
||||||
|
|
@ -332,24 +332,24 @@ func (d *mongodbData) addColStats() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addShardHostStats() {
|
func (d *mongoDBData) addShardHostStats() {
|
||||||
for host := range d.StatLine.ShardHostStatsLines {
|
for host := range d.StatLine.ShardHostStatsLines {
|
||||||
hostStat := d.StatLine.ShardHostStatsLines[host]
|
hostStat := d.StatLine.ShardHostStatsLines[host]
|
||||||
hostStatLine := reflect.ValueOf(&hostStat).Elem()
|
hostStatLine := reflect.ValueOf(&hostStat).Elem()
|
||||||
newDbData := &bbData{
|
newDBData := &bbData{
|
||||||
Name: host,
|
Name: host,
|
||||||
Fields: make(map[string]interface{}),
|
Fields: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
newDbData.Fields["type"] = "shard_host_stat"
|
newDBData.Fields["type"] = "shard_host_stat"
|
||||||
for k, v := range shardHostStats {
|
for k, v := range shardHostStats {
|
||||||
val := hostStatLine.FieldByName(v).Interface()
|
val := hostStatLine.FieldByName(v).Interface()
|
||||||
newDbData.Fields[k] = val
|
newDBData.Fields[k] = val
|
||||||
}
|
}
|
||||||
d.ShardHostData = append(d.ShardHostData, *newDbData)
|
d.ShardHostData = append(d.ShardHostData, *newDBData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addTopStats() {
|
func (d *mongoDBData) addTopStats() {
|
||||||
for i := range d.StatLine.TopStatLines {
|
for i := range d.StatLine.TopStatLines {
|
||||||
topStat := d.StatLine.TopStatLines[i]
|
topStat := d.StatLine.TopStatLines[i]
|
||||||
topStatLine := reflect.ValueOf(&topStat).Elem()
|
topStatLine := reflect.ValueOf(&topStat).Elem()
|
||||||
|
|
@ -366,7 +366,7 @@ func (d *mongodbData) addTopStats() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addDefaultStats() {
|
func (d *mongoDBData) addDefaultStats() {
|
||||||
statLine := reflect.ValueOf(d.StatLine).Elem()
|
statLine := reflect.ValueOf(d.StatLine).Elem()
|
||||||
d.addStat(statLine, defaultStats)
|
d.addStat(statLine, defaultStats)
|
||||||
if d.StatLine.NodeType != "" {
|
if d.StatLine.NodeType != "" {
|
||||||
|
|
@ -414,18 +414,18 @@ func (d *mongodbData) addDefaultStats() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) addStat(statLine reflect.Value, stats map[string]string) {
|
func (d *mongoDBData) addStat(statLine reflect.Value, stats map[string]string) {
|
||||||
for key, value := range stats {
|
for key, value := range stats {
|
||||||
val := statLine.FieldByName(value).Interface()
|
val := statLine.FieldByName(value).Interface()
|
||||||
d.add(key, val)
|
d.add(key, val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) add(key string, val interface{}) {
|
func (d *mongoDBData) add(key string, val interface{}) {
|
||||||
d.Fields[key] = val
|
d.Fields[key] = val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *mongodbData) flush(acc telegraf.Accumulator) {
|
func (d *mongoDBData) flush(acc telegraf.Accumulator) {
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"mongodb",
|
"mongodb",
|
||||||
d.Fields,
|
d.Fields,
|
||||||
|
|
@ -434,7 +434,7 @@ func (d *mongodbData) flush(acc telegraf.Accumulator) {
|
||||||
)
|
)
|
||||||
d.Fields = make(map[string]interface{})
|
d.Fields = make(map[string]interface{})
|
||||||
|
|
||||||
for _, db := range d.DbData {
|
for _, db := range d.DBData {
|
||||||
d.Tags["db_name"] = db.Name
|
d.Tags["db_name"] = db.Name
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"mongodb_db_stats",
|
"mongodb_db_stats",
|
||||||
|
|
@ -446,7 +446,7 @@ func (d *mongodbData) flush(acc telegraf.Accumulator) {
|
||||||
}
|
}
|
||||||
for _, col := range d.ColData {
|
for _, col := range d.ColData {
|
||||||
d.Tags["collection"] = col.Name
|
d.Tags["collection"] = col.Name
|
||||||
d.Tags["db_name"] = col.DbName
|
d.Tags["db_name"] = col.DBName
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"mongodb_col_stats",
|
"mongodb_col_stats",
|
||||||
col.Fields,
|
col.Fields,
|
||||||
|
|
|
||||||
|
|
@ -182,7 +182,7 @@ func (s *server) gatherDBStats(name string) (*db, error) {
|
||||||
|
|
||||||
return &db{
|
return &db{
|
||||||
Name: name,
|
Name: name,
|
||||||
DbStatsData: stats,
|
DBStatsData: stats,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -229,7 +229,7 @@ func (s *server) gatherOplogStats() (*oplogStats, error) {
|
||||||
return s.getOplogReplLag("oplog.$main")
|
return s.getOplogReplLag("oplog.$main")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) {
|
func (s *server) gatherCollectionStats(colStatsDBs []string) (*colStats, error) {
|
||||||
names, err := s.client.ListDatabaseNames(context.Background(), bson.D{})
|
names, err := s.client.ListDatabaseNames(context.Background(), bson.D{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -237,7 +237,7 @@ func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error)
|
||||||
|
|
||||||
results := &colStats{}
|
results := &colStats{}
|
||||||
for _, dbName := range names {
|
for _, dbName := range names {
|
||||||
if slices.Contains(colStatsDbs, dbName) || len(colStatsDbs) == 0 {
|
if slices.Contains(colStatsDBs, dbName) || len(colStatsDBs) == 0 {
|
||||||
// skip views as they fail on collStats below
|
// skip views as they fail on collStats below
|
||||||
filter := bson.M{"type": bson.M{"$in": bson.A{"collection", "timeseries"}}}
|
filter := bson.M{"type": bson.M{"$in": bson.A{"collection", "timeseries"}}}
|
||||||
|
|
||||||
|
|
@ -261,7 +261,7 @@ func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error)
|
||||||
}
|
}
|
||||||
collection := &collection{
|
collection := &collection{
|
||||||
Name: colName,
|
Name: colName,
|
||||||
DbName: dbName,
|
DBName: dbName,
|
||||||
ColStatsData: colStatLine,
|
ColStatsData: colStatLine,
|
||||||
}
|
}
|
||||||
results.Collections = append(results.Collections, *collection)
|
results.Collections = append(results.Collections, *collection)
|
||||||
|
|
@ -271,7 +271,7 @@ func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error)
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDbStats, gatherColStats, gatherTopStat bool, colStatsDbs []string) error {
|
func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDBStats, gatherColStats, gatherTopStat bool, colStatsDBs []string) error {
|
||||||
serverStatus, err := s.gatherServerStatus()
|
serverStatus, err := s.gatherServerStatus()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -310,7 +310,7 @@ func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
|
||||||
|
|
||||||
var collectionStats *colStats
|
var collectionStats *colStats
|
||||||
if gatherColStats {
|
if gatherColStats {
|
||||||
stats, err := s.gatherCollectionStats(colStatsDbs)
|
stats, err := s.gatherCollectionStats(colStatsDBs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -318,7 +318,7 @@ func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
|
||||||
}
|
}
|
||||||
|
|
||||||
dbStats := &dbStats{}
|
dbStats := &dbStats{}
|
||||||
if gatherDbStats {
|
if gatherDBStats {
|
||||||
names, err := s.client.ListDatabaseNames(context.Background(), bson.D{})
|
names, err := s.client.ListDatabaseNames(context.Background(), bson.D{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -330,7 +330,7 @@ func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
|
||||||
s.log.Errorf("Error getting db stats from %q: %v", name, err)
|
s.log.Errorf("Error getting db stats from %q: %v", name, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dbStats.Dbs = append(dbStats.Dbs, *db)
|
dbStats.DBs = append(dbStats.DBs, *db)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -348,7 +348,7 @@ func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
|
||||||
ServerStatus: serverStatus,
|
ServerStatus: serverStatus,
|
||||||
ReplSetStatus: replSetStatus,
|
ReplSetStatus: replSetStatus,
|
||||||
ClusterStatus: clusterStatus,
|
ClusterStatus: clusterStatus,
|
||||||
DbStats: dbStats,
|
DBStats: dbStats,
|
||||||
ColStats: collectionStats,
|
ColStats: collectionStats,
|
||||||
ShardStats: shardStats,
|
ShardStats: shardStats,
|
||||||
OplogStats: oplogStats,
|
OplogStats: oplogStats,
|
||||||
|
|
@ -367,7 +367,7 @@ func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe
|
||||||
s.getDefaultTags(),
|
s.getDefaultTags(),
|
||||||
)
|
)
|
||||||
data.addDefaultStats()
|
data.addDefaultStats()
|
||||||
data.addDbStats()
|
data.addDBStats()
|
||||||
data.addColStats()
|
data.addColStats()
|
||||||
data.addShardHostStats()
|
data.addShardHostStats()
|
||||||
data.addTopStats()
|
data.addTopStats()
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ type mongoStatus struct {
|
||||||
ServerStatus *serverStatus
|
ServerStatus *serverStatus
|
||||||
ReplSetStatus *replSetStatus
|
ReplSetStatus *replSetStatus
|
||||||
ClusterStatus *clusterStatus
|
ClusterStatus *clusterStatus
|
||||||
DbStats *dbStats
|
DBStats *dbStats
|
||||||
ColStats *colStats
|
ColStats *colStats
|
||||||
ShardStats *shardStats
|
ShardStats *shardStats
|
||||||
OplogStats *oplogStats
|
OplogStats *oplogStats
|
||||||
|
|
@ -61,18 +61,18 @@ type serverStatus struct {
|
||||||
|
|
||||||
// dbStats stores stats from all dbs
|
// dbStats stores stats from all dbs
|
||||||
type dbStats struct {
|
type dbStats struct {
|
||||||
Dbs []db
|
DBs []db
|
||||||
}
|
}
|
||||||
|
|
||||||
// db represent a single DB
|
// db represent a single DB
|
||||||
type db struct {
|
type db struct {
|
||||||
Name string
|
Name string
|
||||||
DbStatsData *dbStatsData
|
DBStatsData *dbStatsData
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbStatsData stores stats from a db
|
// dbStatsData stores stats from a db
|
||||||
type dbStatsData struct {
|
type dbStatsData struct {
|
||||||
Db string `bson:"db"`
|
DB string `bson:"db"`
|
||||||
Collections int64 `bson:"collections"`
|
Collections int64 `bson:"collections"`
|
||||||
Objects int64 `bson:"objects"`
|
Objects int64 `bson:"objects"`
|
||||||
AvgObjSize float64 `bson:"avgObjSize"`
|
AvgObjSize float64 `bson:"avgObjSize"`
|
||||||
|
|
@ -93,7 +93,7 @@ type colStats struct {
|
||||||
|
|
||||||
type collection struct {
|
type collection struct {
|
||||||
Name string
|
Name string
|
||||||
DbName string
|
DBName string
|
||||||
ColStatsData *colStatsData
|
ColStatsData *colStatsData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -751,7 +751,7 @@ type statLine struct {
|
||||||
JumboChunksCount int64
|
JumboChunksCount int64
|
||||||
|
|
||||||
// DB stats field
|
// DB stats field
|
||||||
DbStatsLines []dbStatLine
|
DBStatsLines []dbStatLine
|
||||||
|
|
||||||
// Col Stats field
|
// Col Stats field
|
||||||
ColStatsLines []colStatLine
|
ColStatsLines []colStatLine
|
||||||
|
|
@ -807,7 +807,7 @@ type dbStatLine struct {
|
||||||
}
|
}
|
||||||
type colStatLine struct {
|
type colStatLine struct {
|
||||||
Name string
|
Name string
|
||||||
DbName string
|
DBName string
|
||||||
Count int64
|
Count int64
|
||||||
Size int64
|
Size int64
|
||||||
AvgObjSize float64
|
AvgObjSize float64
|
||||||
|
|
@ -1347,16 +1347,16 @@ func newStatLine(oldMongo, newMongo mongoStatus, key string, sampleSecs int64) *
|
||||||
returnVal.OplogStats = newMongo.OplogStats
|
returnVal.OplogStats = newMongo.OplogStats
|
||||||
}
|
}
|
||||||
|
|
||||||
if newMongo.DbStats != nil {
|
if newMongo.DBStats != nil {
|
||||||
newDbStats := *newMongo.DbStats
|
newDBStats := *newMongo.DBStats
|
||||||
for _, db := range newDbStats.Dbs {
|
for _, db := range newDBStats.DBs {
|
||||||
dbStatsData := db.DbStatsData
|
dbStatsData := db.DBStatsData
|
||||||
// mongos doesn't have the db key, so setting the db name
|
// mongos doesn't have the db key, so setting the db name
|
||||||
if dbStatsData.Db == "" {
|
if dbStatsData.DB == "" {
|
||||||
dbStatsData.Db = db.Name
|
dbStatsData.DB = db.Name
|
||||||
}
|
}
|
||||||
dbStatLine := &dbStatLine{
|
dbStatLine := &dbStatLine{
|
||||||
Name: dbStatsData.Db,
|
Name: dbStatsData.DB,
|
||||||
Collections: dbStatsData.Collections,
|
Collections: dbStatsData.Collections,
|
||||||
Objects: dbStatsData.Objects,
|
Objects: dbStatsData.Objects,
|
||||||
AvgObjSize: dbStatsData.AvgObjSize,
|
AvgObjSize: dbStatsData.AvgObjSize,
|
||||||
|
|
@ -1369,7 +1369,7 @@ func newStatLine(oldMongo, newMongo mongoStatus, key string, sampleSecs int64) *
|
||||||
FsTotalSize: dbStatsData.FsTotalSize,
|
FsTotalSize: dbStatsData.FsTotalSize,
|
||||||
FsUsedSize: dbStatsData.FsUsedSize,
|
FsUsedSize: dbStatsData.FsUsedSize,
|
||||||
}
|
}
|
||||||
returnVal.DbStatsLines = append(returnVal.DbStatsLines, *dbStatLine)
|
returnVal.DBStatsLines = append(returnVal.DBStatsLines, *dbStatLine)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1382,7 +1382,7 @@ func newStatLine(oldMongo, newMongo mongoStatus, key string, sampleSecs int64) *
|
||||||
}
|
}
|
||||||
colStatLine := &colStatLine{
|
colStatLine := &colStatLine{
|
||||||
Name: colStatsData.Collection,
|
Name: colStatsData.Collection,
|
||||||
DbName: col.DbName,
|
DBName: col.DBName,
|
||||||
Count: colStatsData.Count,
|
Count: colStatsData.Count,
|
||||||
Size: colStatsData.Size,
|
Size: colStatsData.Size,
|
||||||
AvgObjSize: colStatsData.AvgObjSize,
|
AvgObjSize: colStatsData.AvgObjSize,
|
||||||
|
|
|
||||||
|
|
@ -222,7 +222,7 @@ type memory struct {
|
||||||
Plugins int64 `json:"plugins"`
|
Plugins int64 `json:"plugins"`
|
||||||
OtherProc int64 `json:"other_proc"`
|
OtherProc int64 `json:"other_proc"`
|
||||||
Metrics int64 `json:"metrics"`
|
Metrics int64 `json:"metrics"`
|
||||||
MgmtDb int64 `json:"mgmt_db"`
|
MgmtDB int64 `json:"mgmt_db"`
|
||||||
Mnesia int64 `json:"mnesia"`
|
Mnesia int64 `json:"mnesia"`
|
||||||
OtherEts int64 `json:"other_ets"`
|
OtherEts int64 `json:"other_ets"`
|
||||||
Binary int64 `json:"binary"`
|
Binary int64 `json:"binary"`
|
||||||
|
|
@ -505,7 +505,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
|
||||||
fields["mem_plugins"] = memory.Memory.Plugins
|
fields["mem_plugins"] = memory.Memory.Plugins
|
||||||
fields["mem_other_proc"] = memory.Memory.OtherProc
|
fields["mem_other_proc"] = memory.Memory.OtherProc
|
||||||
fields["mem_metrics"] = memory.Memory.Metrics
|
fields["mem_metrics"] = memory.Memory.Metrics
|
||||||
fields["mem_mgmt_db"] = memory.Memory.MgmtDb
|
fields["mem_mgmt_db"] = memory.Memory.MgmtDB
|
||||||
fields["mem_mnesia"] = memory.Memory.Mnesia
|
fields["mem_mnesia"] = memory.Memory.Mnesia
|
||||||
fields["mem_other_ets"] = memory.Memory.OtherEts
|
fields["mem_other_ets"] = memory.Memory.OtherEts
|
||||||
fields["mem_binary"] = memory.Memory.Binary
|
fields["mem_binary"] = memory.Memory.Binary
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ const (
|
||||||
FROM mce_record
|
FROM mce_record
|
||||||
WHERE timestamp > ?
|
WHERE timestamp > ?
|
||||||
`
|
`
|
||||||
defaultDbPath = "/var/lib/rasdaemon/ras-mc_event.db"
|
defaultDBPath = "/var/lib/rasdaemon/ras-mc_event.db"
|
||||||
dateLayout = "2006-01-02 15:04:05 -0700"
|
dateLayout = "2006-01-02 15:04:05 -0700"
|
||||||
memoryReadCorrected = "memory_read_corrected_errors"
|
memoryReadCorrected = "memory_read_corrected_errors"
|
||||||
memoryReadUncorrected = "memory_read_uncorrectable_errors"
|
memoryReadUncorrected = "memory_read_uncorrectable_errors"
|
||||||
|
|
@ -76,7 +76,7 @@ func (*Ras) SampleConfig() string {
|
||||||
|
|
||||||
// Start initializes connection to DB, metrics are gathered in Gather
|
// Start initializes connection to DB, metrics are gathered in Gather
|
||||||
func (r *Ras) Start(telegraf.Accumulator) error {
|
func (r *Ras) Start(telegraf.Accumulator) error {
|
||||||
err := validateDbPath(r.DBPath)
|
err := validateDBPath(r.DBPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -177,7 +177,7 @@ func (r *Ras) updateServerCounters(mcError *machineCheckError) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateDbPath(dbPath string) error {
|
func validateDBPath(dbPath string) error {
|
||||||
pathInfo, err := os.Stat(dbPath)
|
pathInfo, err := os.Stat(dbPath)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return fmt.Errorf("provided db_path does not exist: [%s]", dbPath)
|
return fmt.Errorf("provided db_path does not exist: [%s]", dbPath)
|
||||||
|
|
@ -321,7 +321,7 @@ func init() {
|
||||||
//nolint:errcheck // known timestamp
|
//nolint:errcheck // known timestamp
|
||||||
defaultTimestamp, _ := parseDate("1970-01-01 00:00:01 -0700")
|
defaultTimestamp, _ := parseDate("1970-01-01 00:00:01 -0700")
|
||||||
return &Ras{
|
return &Ras{
|
||||||
DBPath: defaultDbPath,
|
DBPath: defaultDBPath,
|
||||||
latestTimestamp: defaultTimestamp,
|
latestTimestamp: defaultTimestamp,
|
||||||
cpuSocketCounters: map[int]metricCounters{
|
cpuSocketCounters: map[int]metricCounters{
|
||||||
0: *newMetricCounters(),
|
0: *newMetricCounters(),
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,7 @@ func newRas() *Ras {
|
||||||
//nolint:errcheck // known timestamp
|
//nolint:errcheck // known timestamp
|
||||||
defaultTimestamp, _ := parseDate("1970-01-01 00:00:01 -0700")
|
defaultTimestamp, _ := parseDate("1970-01-01 00:00:01 -0700")
|
||||||
return &Ras{
|
return &Ras{
|
||||||
DBPath: defaultDbPath,
|
DBPath: defaultDBPath,
|
||||||
latestTimestamp: defaultTimestamp,
|
latestTimestamp: defaultTimestamp,
|
||||||
cpuSocketCounters: map[int]metricCounters{
|
cpuSocketCounters: map[int]metricCounters{
|
||||||
0: *newMetricCounters(),
|
0: *newMetricCounters(),
|
||||||
|
|
|
||||||
|
|
@ -33,9 +33,9 @@ type RavenDB struct {
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
|
|
||||||
StatsInclude []string `toml:"stats_include"`
|
StatsInclude []string `toml:"stats_include"`
|
||||||
DbStatsDbs []string `toml:"db_stats_dbs"`
|
DBStatsDBs []string `toml:"db_stats_dbs"`
|
||||||
IndexStatsDbs []string `toml:"index_stats_dbs"`
|
IndexStatsDBs []string `toml:"index_stats_dbs"`
|
||||||
CollectionStatsDbs []string `toml:"collection_stats_dbs"`
|
CollectionStatsDBs []string `toml:"collection_stats_dbs"`
|
||||||
|
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
|
|
@ -58,9 +58,9 @@ func (r *RavenDB) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
r.requestURLServer = r.URL + "/admin/monitoring/v1/server"
|
r.requestURLServer = r.URL + "/admin/monitoring/v1/server"
|
||||||
r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DbStatsDbs)
|
r.requestURLDatabases = r.URL + "/admin/monitoring/v1/databases" + prepareDBNamesURLPart(r.DBStatsDBs)
|
||||||
r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDbs)
|
r.requestURLIndexes = r.URL + "/admin/monitoring/v1/indexes" + prepareDBNamesURLPart(r.IndexStatsDBs)
|
||||||
r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDbs)
|
r.requestURLCollection = r.URL + "/admin/monitoring/v1/collections" + prepareDBNamesURLPart(r.IndexStatsDBs)
|
||||||
|
|
||||||
err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"})
|
err := choice.CheckSlice(r.StatsInclude, []string{"server", "databases", "indexes", "collections"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -305,10 +305,10 @@ func (r *RavenDB) gatherIndexes(acc telegraf.Accumulator) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, perDbIndexResponse := range indexesResponse.Results {
|
for _, perDBIndexResponse := range indexesResponse.Results {
|
||||||
for _, indexResponse := range perDbIndexResponse.Indexes {
|
for _, indexResponse := range perDBIndexResponse.Indexes {
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"database_name": perDbIndexResponse.DatabaseName,
|
"database_name": perDBIndexResponse.DatabaseName,
|
||||||
"index_name": indexResponse.IndexName,
|
"index_name": indexResponse.IndexName,
|
||||||
"node_tag": indexesResponse.NodeTag,
|
"node_tag": indexesResponse.NodeTag,
|
||||||
"url": r.URL,
|
"url": r.URL,
|
||||||
|
|
@ -346,11 +346,11 @@ func (r *RavenDB) gatherCollections(acc telegraf.Accumulator) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, perDbCollectionMetrics := range collectionsResponse.Results {
|
for _, perDBCollectionMetrics := range collectionsResponse.Results {
|
||||||
for _, collectionMetrics := range perDbCollectionMetrics.Collections {
|
for _, collectionMetrics := range perDBCollectionMetrics.Collections {
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"collection_name": collectionMetrics.CollectionName,
|
"collection_name": collectionMetrics.CollectionName,
|
||||||
"database_name": perDbCollectionMetrics.DatabaseName,
|
"database_name": perDBCollectionMetrics.DatabaseName,
|
||||||
"node_tag": collectionsResponse.NodeTag,
|
"node_tag": collectionsResponse.NodeTag,
|
||||||
"url": r.URL,
|
"url": r.URL,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1194,22 +1194,22 @@ func (e *endpoint) alignSamples(info []types.PerfSampleInfo, values []int64, int
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ts := info[idx].Timestamp
|
ts := info[idx].Timestamp
|
||||||
roundedTs := ts.Truncate(interval)
|
roundedTS := ts.Truncate(interval)
|
||||||
|
|
||||||
// Are we still working on the same bucket?
|
// Are we still working on the same bucket?
|
||||||
if roundedTs == lastBucket {
|
if roundedTS == lastBucket {
|
||||||
bi++
|
bi++
|
||||||
p := len(rValues) - 1
|
p := len(rValues) - 1
|
||||||
rValues[p] = ((bi-1)/bi)*rValues[p] + v/bi
|
rValues[p] = ((bi-1)/bi)*rValues[p] + v/bi
|
||||||
} else {
|
} else {
|
||||||
rValues = append(rValues, v)
|
rValues = append(rValues, v)
|
||||||
roundedInfo := types.PerfSampleInfo{
|
roundedInfo := types.PerfSampleInfo{
|
||||||
Timestamp: roundedTs,
|
Timestamp: roundedTS,
|
||||||
Interval: info[idx].Interval,
|
Interval: info[idx].Interval,
|
||||||
}
|
}
|
||||||
rInfo = append(rInfo, roundedInfo)
|
rInfo = append(rInfo, roundedInfo)
|
||||||
bi = 1.0
|
bi = 1.0
|
||||||
lastBucket = roundedTs
|
lastBucket = roundedTS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rInfo, rValues
|
return rInfo, rValues
|
||||||
|
|
@ -1318,8 +1318,8 @@ func (e *endpoint) collectChunk(
|
||||||
count++
|
count++
|
||||||
|
|
||||||
// Update hiwater marks
|
// Update hiwater marks
|
||||||
adjTs := ts.Add(interval).Truncate(interval).Add(-time.Second)
|
adjTS := ts.Add(interval).Truncate(interval).Add(-time.Second)
|
||||||
e.hwMarks.put(moid, name, adjTs)
|
e.hwMarks.put(moid, name, adjTS)
|
||||||
}
|
}
|
||||||
if nValues == 0 {
|
if nValues == 0 {
|
||||||
e.log.Debugf("Missing value for: %s, %s", name, objectRef.name)
|
e.log.Debugf("Missing value for: %s, %s", name, objectRef.name)
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tsDbName = "testDb"
|
const tsDBName = "testDb"
|
||||||
|
|
||||||
const testSingleTableName = "SingleTableName"
|
const testSingleTableName = "SingleTableName"
|
||||||
const testSingleTableDim = "namespace"
|
const testSingleTableDim = "namespace"
|
||||||
|
|
@ -77,13 +77,13 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
require.ErrorContains(t, noDatabaseName.Connect(), "'database_name' key is required")
|
require.ErrorContains(t, noDatabaseName.Connect(), "'database_name' key is required")
|
||||||
|
|
||||||
noMappingMode := Timestream{
|
noMappingMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.ErrorContains(t, noMappingMode.Connect(), "'mapping_mode' key is required")
|
require.ErrorContains(t, noMappingMode.Connect(), "'mapping_mode' key is required")
|
||||||
|
|
||||||
incorrectMappingMode := Timestream{
|
incorrectMappingMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: "foo",
|
MappingMode: "foo",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
@ -91,7 +91,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// multi-measure config validation multi table mode
|
// multi-measure config validation multi table mode
|
||||||
validConfigMultiMeasureMultiTableMode := Timestream{
|
validConfigMultiMeasureMultiTableMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
UseMultiMeasureRecords: true,
|
UseMultiMeasureRecords: true,
|
||||||
MeasureNameForMultiMeasureRecords: "multi-measure-name",
|
MeasureNameForMultiMeasureRecords: "multi-measure-name",
|
||||||
|
|
@ -100,7 +100,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
require.NoError(t, validConfigMultiMeasureMultiTableMode.Connect())
|
require.NoError(t, validConfigMultiMeasureMultiTableMode.Connect())
|
||||||
|
|
||||||
invalidConfigMultiMeasureMultiTableMode := Timestream{
|
invalidConfigMultiMeasureMultiTableMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
UseMultiMeasureRecords: true,
|
UseMultiMeasureRecords: true,
|
||||||
// without MeasureNameForMultiMeasureRecords set we expect validation failure
|
// without MeasureNameForMultiMeasureRecords set we expect validation failure
|
||||||
|
|
@ -110,7 +110,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// multi-measure config validation single table mode
|
// multi-measure config validation single table mode
|
||||||
validConfigMultiMeasureSingleTableMode := Timestream{
|
validConfigMultiMeasureSingleTableMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as
|
UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as
|
||||||
|
|
@ -120,7 +120,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
require.NoError(t, validConfigMultiMeasureSingleTableMode.Connect())
|
require.NoError(t, validConfigMultiMeasureSingleTableMode.Connect())
|
||||||
|
|
||||||
invalidConfigMultiMeasureSingleTableMode := Timestream{
|
invalidConfigMultiMeasureSingleTableMode := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
UseMultiMeasureRecords: true,
|
UseMultiMeasureRecords: true,
|
||||||
|
|
@ -134,14 +134,14 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// multi-table arguments
|
// multi-table arguments
|
||||||
validMappingModeMultiTable := Timestream{
|
validMappingModeMultiTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.NoError(t, validMappingModeMultiTable.Connect())
|
require.NoError(t, validMappingModeMultiTable.Connect())
|
||||||
|
|
||||||
singleTableNameWithMultiTable := Timestream{
|
singleTableNameWithMultiTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -149,7 +149,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
require.Contains(t, singleTableNameWithMultiTable.Connect().Error(), "SingleTableName")
|
require.Contains(t, singleTableNameWithMultiTable.Connect().Error(), "SingleTableName")
|
||||||
|
|
||||||
singleTableDimensionWithMultiTable := Timestream{
|
singleTableDimensionWithMultiTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -159,14 +159,14 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// single-table arguments
|
// single-table arguments
|
||||||
noTableNameMappingModeSingleTable := Timestream{
|
noTableNameMappingModeSingleTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.Contains(t, noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName")
|
require.Contains(t, noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName")
|
||||||
|
|
||||||
noDimensionNameMappingModeSingleTable := Timestream{
|
noDimensionNameMappingModeSingleTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -175,7 +175,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
"SingleTableDimensionNameForTelegrafMeasurementName")
|
"SingleTableDimensionNameForTelegrafMeasurementName")
|
||||||
|
|
||||||
validConfigurationMappingModeSingleTable := Timestream{
|
validConfigurationMappingModeSingleTable := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
||||||
|
|
@ -185,7 +185,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// create table arguments
|
// create table arguments
|
||||||
createTableNoMagneticRetention := Timestream{
|
createTableNoMagneticRetention := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
CreateTableIfNotExists: true,
|
CreateTableIfNotExists: true,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -194,7 +194,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
"CreateTableMagneticStoreRetentionPeriodInDays")
|
"CreateTableMagneticStoreRetentionPeriodInDays")
|
||||||
|
|
||||||
createTableNoMemoryRetention := Timestream{
|
createTableNoMemoryRetention := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
CreateTableIfNotExists: true,
|
CreateTableIfNotExists: true,
|
||||||
CreateTableMagneticStoreRetentionPeriodInDays: 3,
|
CreateTableMagneticStoreRetentionPeriodInDays: 3,
|
||||||
|
|
@ -204,7 +204,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
"CreateTableMemoryStoreRetentionPeriodInHours")
|
"CreateTableMemoryStoreRetentionPeriodInHours")
|
||||||
|
|
||||||
createTableValid := Timestream{
|
createTableValid := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
CreateTableIfNotExists: true,
|
CreateTableIfNotExists: true,
|
||||||
CreateTableMagneticStoreRetentionPeriodInDays: 3,
|
CreateTableMagneticStoreRetentionPeriodInDays: 3,
|
||||||
|
|
@ -215,7 +215,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
|
||||||
|
|
||||||
// describe table on start arguments
|
// describe table on start arguments
|
||||||
describeTableInvoked := Timestream{
|
describeTableInvoked := Timestream{
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DescribeDatabaseOnStart: true,
|
DescribeDatabaseOnStart: true,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -254,7 +254,7 @@ func TestWriteMultiMeasuresSingleTableMode(t *testing.T) {
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: "test-multi-single-table-mode",
|
SingleTableName: "test-multi-single-table-mode",
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
UseMultiMeasureRecords: true, // use multi
|
UseMultiMeasureRecords: true, // use multi
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
@ -311,7 +311,7 @@ func TestWriteMultiMeasuresMultiTableMode(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
UseMultiMeasureRecords: true, // use multi
|
UseMultiMeasureRecords: true, // use multi
|
||||||
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
|
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -407,7 +407,7 @@ func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
UseMultiMeasureRecords: true, // use multi
|
UseMultiMeasureRecords: true, // use multi
|
||||||
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
|
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -431,7 +431,7 @@ func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
|
||||||
plugin = Timestream{
|
plugin = Timestream{
|
||||||
MappingMode: MappingModeSingleTable,
|
MappingMode: MappingModeSingleTable,
|
||||||
SingleTableName: "singleTableName",
|
SingleTableName: "singleTableName",
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
UseMultiMeasureRecords: true, // use multi
|
UseMultiMeasureRecords: true, // use multi
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
@ -518,7 +518,7 @@ func buildExpectedMultiRecords(multiMeasureName, tableName string) *timestreamwr
|
||||||
recordsMultiTableMode = append(recordsMultiTableMode, recordUint64...)
|
recordsMultiTableMode = append(recordsMultiTableMode, recordUint64...)
|
||||||
|
|
||||||
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(tableName),
|
TableName: aws.String(tableName),
|
||||||
Records: recordsMultiTableMode,
|
Records: recordsMultiTableMode,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -563,7 +563,7 @@ func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Connect())
|
require.NoError(t, plugin.Connect())
|
||||||
|
|
@ -589,7 +589,7 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.NoError(t, plugin.Connect())
|
require.NoError(t, plugin.Connect())
|
||||||
|
|
@ -618,7 +618,7 @@ func TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
// Spawn only one go routine to serve all 5 write requests
|
// Spawn only one go routine to serve all 5 write requests
|
||||||
MaxWriteGoRoutinesCount: 2,
|
MaxWriteGoRoutinesCount: 2,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -657,7 +657,7 @@ func TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount(t *testing.T) {
|
||||||
|
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
// Spawn 5 parallel go routines to serve 2 write requests
|
// Spawn 5 parallel go routines to serve 2 write requests
|
||||||
// In this case only 2 of the 5 go routines will process the write requests
|
// In this case only 2 of the 5 go routines will process the write requests
|
||||||
MaxWriteGoRoutinesCount: 5,
|
MaxWriteGoRoutinesCount: 5,
|
||||||
|
|
@ -724,7 +724,7 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: records,
|
Records: records,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -750,7 +750,7 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(metricName1),
|
TableName: aws.String(metricName1),
|
||||||
Records: recordsMulti,
|
Records: recordsMulti,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -854,7 +854,7 @@ func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
expectedResult1SingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResult1SingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsFirstReq,
|
Records: recordsFirstReq,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -872,7 +872,7 @@ func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
|
||||||
})...)
|
})...)
|
||||||
|
|
||||||
expectedResult2SingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResult2SingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsSecondReq,
|
Records: recordsSecondReq,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -918,7 +918,7 @@ func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsSingle,
|
Records: recordsSingle,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -981,7 +981,7 @@ func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparat
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsSingle,
|
Records: recordsSingle,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -1043,7 +1043,7 @@ func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t *
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsSingle,
|
Records: recordsSingle,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -1069,7 +1069,7 @@ func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t *
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(metricName1),
|
TableName: aws.String(metricName1),
|
||||||
Records: recordsMultiTable,
|
Records: recordsMultiTable,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -1155,7 +1155,7 @@ func TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTable
|
||||||
})
|
})
|
||||||
|
|
||||||
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
expectedResultSingleTable := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(testSingleTableName),
|
TableName: aws.String(testSingleTableName),
|
||||||
Records: recordsSingle,
|
Records: recordsSingle,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
@ -1219,7 +1219,7 @@ func TestCustomEndpoint(t *testing.T) {
|
||||||
customEndpoint := "http://test.custom.endpoint.com"
|
customEndpoint := "http://test.custom.endpoint.com"
|
||||||
plugin := Timestream{
|
plugin := Timestream{
|
||||||
MappingMode: MappingModeMultiTable,
|
MappingMode: MappingModeMultiTable,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
CredentialConfig: common_aws.CredentialConfig{EndpointURL: customEndpoint},
|
CredentialConfig: common_aws.CredentialConfig{EndpointURL: customEndpoint},
|
||||||
}
|
}
|
||||||
|
|
@ -1241,7 +1241,7 @@ func comparisonTest(t *testing.T,
|
||||||
case MappingModeSingleTable:
|
case MappingModeSingleTable:
|
||||||
plugin = Timestream{
|
plugin = Timestream{
|
||||||
MappingMode: mappingMode,
|
MappingMode: mappingMode,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
|
|
||||||
SingleTableName: testSingleTableName,
|
SingleTableName: testSingleTableName,
|
||||||
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
|
||||||
|
|
@ -1250,7 +1250,7 @@ func comparisonTest(t *testing.T,
|
||||||
case MappingModeMultiTable:
|
case MappingModeMultiTable:
|
||||||
plugin = Timestream{
|
plugin = Timestream{
|
||||||
MappingMode: mappingMode,
|
MappingMode: mappingMode,
|
||||||
DatabaseName: tsDbName,
|
DatabaseName: tsDBName,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1337,7 +1337,7 @@ func buildExpectedInput(i SimpleInput) *timestreamwrite.WriteRecordsInput {
|
||||||
}
|
}
|
||||||
|
|
||||||
result := ×treamwrite.WriteRecordsInput{
|
result := ×treamwrite.WriteRecordsInput{
|
||||||
DatabaseName: aws.String(tsDbName),
|
DatabaseName: aws.String(tsDBName),
|
||||||
TableName: aws.String(i.tableName),
|
TableName: aws.String(i.tableName),
|
||||||
Records: tsRecords,
|
Records: tsRecords,
|
||||||
CommonAttributes: &types.Record{},
|
CommonAttributes: &types.Record{},
|
||||||
|
|
|
||||||
|
|
@ -119,12 +119,12 @@ type Parser struct {
|
||||||
// "RESPONSE_CODE": "%{NUMBER:rc:tag}"
|
// "RESPONSE_CODE": "%{NUMBER:rc:tag}"
|
||||||
// }
|
// }
|
||||||
patternsMap map[string]string
|
patternsMap map[string]string
|
||||||
// foundTsLayouts is a slice of timestamp patterns that have been found
|
// foundTSLayouts is a slice of timestamp patterns that have been found
|
||||||
// in the log lines. This slice gets updated if the user uses the generic
|
// in the log lines. This slice gets updated if the user uses the generic
|
||||||
// 'ts' modifier for timestamps. This slice is checked first for matches,
|
// 'ts' modifier for timestamps. This slice is checked first for matches,
|
||||||
// so that previously-matched layouts get priority over all other timestamp
|
// so that previously-matched layouts get priority over all other timestamp
|
||||||
// layouts.
|
// layouts.
|
||||||
foundTsLayouts []string
|
foundTSLayouts []string
|
||||||
|
|
||||||
timeFunc func() time.Time
|
timeFunc func() time.Time
|
||||||
g *grok.Grok
|
g *grok.Grok
|
||||||
|
|
@ -329,32 +329,32 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
p.Log.Errorf("Error parsing %s to time layout [%s]: %s", v, t, err)
|
p.Log.Errorf("Error parsing %s to time layout [%s]: %s", v, t, err)
|
||||||
}
|
}
|
||||||
case GenericTimestamp:
|
case GenericTimestamp:
|
||||||
var foundTs bool
|
var foundTS bool
|
||||||
// first try timestamp layouts that we've already found
|
// first try timestamp layouts that we've already found
|
||||||
for _, layout := range p.foundTsLayouts {
|
for _, layout := range p.foundTSLayouts {
|
||||||
ts, err := internal.ParseTimestamp(layout, v, p.loc)
|
ts, err := internal.ParseTimestamp(layout, v, p.loc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
timestamp = ts
|
timestamp = ts
|
||||||
foundTs = true
|
foundTS = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if we haven't found a timestamp layout yet, try all timestamp
|
// if we haven't found a timestamp layout yet, try all timestamp
|
||||||
// layouts.
|
// layouts.
|
||||||
if !foundTs {
|
if !foundTS {
|
||||||
for _, layout := range timeLayouts {
|
for _, layout := range timeLayouts {
|
||||||
ts, err := internal.ParseTimestamp(layout, v, p.loc)
|
ts, err := internal.ParseTimestamp(layout, v, p.loc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
timestamp = ts
|
timestamp = ts
|
||||||
foundTs = true
|
foundTS = true
|
||||||
p.foundTsLayouts = append(p.foundTsLayouts, layout)
|
p.foundTSLayouts = append(p.foundTSLayouts, layout)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if we still haven't found a timestamp layout, log it and we will
|
// if we still haven't found a timestamp layout, log it and we will
|
||||||
// just use time.Now()
|
// just use time.Now()
|
||||||
if !foundTs {
|
if !foundTS {
|
||||||
p.Log.Errorf("Error parsing timestamp [%s], could not find any "+
|
p.Log.Errorf("Error parsing timestamp [%s], could not find any "+
|
||||||
"suitable time layouts.", v)
|
"suitable time layouts.", v)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,19 +25,19 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
var metricsFromTs []telegraf.Metric
|
var metricsFromTS []telegraf.Metric
|
||||||
switch p.MetricVersion {
|
switch p.MetricVersion {
|
||||||
case 0, 2:
|
case 0, 2:
|
||||||
metricsFromTs, err = p.extractMetricsV2(&ts)
|
metricsFromTS, err = p.extractMetricsV2(&ts)
|
||||||
case 1:
|
case 1:
|
||||||
metricsFromTs, err = p.extractMetricsV1(&ts)
|
metricsFromTS, err = p.extractMetricsV1(&ts)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
|
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
metrics = append(metrics, metricsFromTs...)
|
metrics = append(metrics, metricsFromTS...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return metrics, err
|
return metrics, err
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue