Add configurable number of 'most recent' date-stamped indices to gather in Elasticsearch input (#8543)

Add a configurable number of 'most recent' date-stamped indices to gather in the Elasticsearch input plugin, and allow wildcards to account for date-suffixed index names. Setting num_most_recent_indices to '3' gathers only the 3 latest indices in each matching group, based on the date or number their names end with. Detecting that date or number requires the targeted indices to be configured with a wildcard at the end of their 'base' names.
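For illustration (a standalone sketch, not code from this commit; the helper and hard-coded names are hypothetical, though the index names mirror the new test below), the selection works by grouping indices that share a trailing-wildcard pattern, sorting them, and keeping only the lexically greatest num_most_recent_indices entries:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mostRecent mimics the plugin's selection: collect names sharing a
// prefix (the part before the wildcard), sort them, and keep the last
// n, which are the most recent for date-stamped suffixes.
func mostRecent(names []string, prefix string, n int) []string {
	var matched []string
	for _, name := range names {
		if strings.HasPrefix(name, prefix) {
			matched = append(matched, name)
		}
	}
	sort.Strings(matched)
	if n > 0 && n < len(matched) {
		matched = matched[len(matched)-n:]
	}
	return matched
}

func main() {
	names := []string{"twitter_2020_07_31", "twitter_2020_08_01", "twitter_2020_08_02"}
	// "twitter_*" with num_most_recent_indices = 2 keeps the two latest.
	fmt.Println(mostRecent(names, "twitter_", 2))
	// Output: [twitter_2020_08_01 twitter_2020_08_02]
}
```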
David Bennett 2020-12-21 11:45:19 -05:00 committed by GitHub
parent a7dff56dde
commit c47fcf6626
4 changed files with 2206 additions and 47 deletions

plugins/inputs/elasticsearch/README.md

@@ -53,6 +53,7 @@ Note that specific statistics information can change between Elasticsearch versi
cluster_stats_only_from_master = true
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]
## One of "shards", "cluster", "indices"
@@ -74,6 +75,10 @@ Note that specific statistics information can change between Elasticsearch versi
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the number of most recent indices to return for indices that are configured with a date-stamped suffix.
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and
## sort them by the date or number after the wildcard. Metrics are then gathered for only the 'num_most_recent_indices' most recent indices.
# num_most_recent_indices = 0
```
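The option can also be exercised programmatically, much as the plugin's tests do. A minimal sketch, assuming the exported constructor and fields shown in this diff (the server URL and patterns are placeholders):

```go
package main

import (
	"log"

	"github.com/influxdata/telegraf/plugins/inputs/elasticsearch"
)

func main() {
	es := elasticsearch.NewElasticsearch()
	es.Servers = []string{"http://localhost:9200"}
	es.IndicesInclude = []string{"twitter*", "influx*"}
	es.NumMostRecentIndices = 2 // gather only the two latest indices per pattern

	// Init compiles the glob matchers and fails on an invalid pattern.
	if err := es.Init(); err != nil {
		log.Fatal(err)
	}
}
```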
### Metrics

plugins/inputs/elasticsearch/elasticsearch.go

@@ -12,6 +12,7 @@ import (
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
@@ -115,6 +116,7 @@ const sampleConfig = `
cluster_stats_only_from_master = true
## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]
## One of "shards", "cluster", "indices"
@@ -135,6 +137,11 @@ const sampleConfig = `
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Sets the number of most recent indices to return for indices that are configured with a date-stamped suffix.
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and sort them
## by the date or number after the wildcard. Metrics are then gathered for only the 'num_most_recent_indices' most recent indices.
# num_most_recent_indices = 0
`
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
@@ -152,11 +159,14 @@ type Elasticsearch struct {
NodeStats []string `toml:"node_stats"`
Username string `toml:"username"`
Password string `toml:"password"`
NumMostRecentIndices int `toml:"num_most_recent_indices"`
tls.ClientConfig
client *http.Client
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
indexMatchers map[string]filter.Filter
}
type serverInfo struct {
nodeID string
@@ -214,6 +224,19 @@ func (e *Elasticsearch) Description() string {
return "Read stats from one or more Elasticsearch servers or clusters"
}
// Init the plugin.
func (e *Elasticsearch) Init() error {
// Compile the configured index patterns into matchers used for bucketing and sorting.
indexMatchers, err := e.compileIndexMatchers()
if err != nil {
return err
}
e.indexMatchers = indexMatchers
return nil
}
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
@@ -527,66 +550,135 @@ func (e *Elasticsearch) gatherIndicesStats(url string, acc telegraf.Accumulator)
acc.AddFields("elasticsearch_indices_stats_"+m, jsonParser.Fields, map[string]string{"index_name": "_all"}, now)
}
// Gather stats for each index.
err := e.gatherIndividualIndicesStats(indicesStats.Indices, now, acc)
return err
}
// gatherIndividualIndicesStats gathers stats for each individual index, limiting gathering to the configured number of most recent indices per bucket.
func (e *Elasticsearch) gatherIndividualIndicesStats(indices map[string]indexStat, now time.Time, acc telegraf.Accumulator) error {
// Sort indices into buckets based on their configured prefix, if any matches.
categorizedIndexNames, err := e.categorizeIndices(indices)
if err != nil {
return err
}
for _, matchingIndices := range categorizedIndexNames {
// Determine how many indices from each bucket to gather; the user can limit this to the latest 'X' indices.
indicesCount := len(matchingIndices)
indicesToTrackCount := indicesCount
// Sort the indices if configured to do so.
if e.NumMostRecentIndices > 0 {
if e.NumMostRecentIndices < indicesToTrackCount {
indicesToTrackCount = e.NumMostRecentIndices
}
sort.Strings(matchingIndices)
}
// Gather only the configured number of indices, iterating in descending order (most recent first, if date-stamped).
for i := indicesCount - 1; i >= indicesCount-indicesToTrackCount; i-- {
indexName := matchingIndices[i]
err := e.gatherSingleIndexStats(indexName, indices[indexName], now, acc)
if err != nil {
return err
}
}
}
return nil
}
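Since the ordering relies on sort.Strings, 'most recent' means lexically greatest. A quick standalone illustration (hypothetical index names) of why zero-padded date suffixes sort correctly while unpadded numeric suffixes may not:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Zero-padded dates sort lexically in chronological order.
	padded := []string{"logs-2020.12.31", "logs-2021.01.02", "logs-2021.01.01"}
	sort.Strings(padded)
	fmt.Println(padded[len(padded)-1]) // logs-2021.01.02: the true most recent

	// Unpadded numeric suffixes sort lexically, not numerically.
	unpadded := []string{"logs-9", "logs-10", "logs-2"}
	sort.Strings(unpadded)
	fmt.Println(unpadded[len(unpadded)-1]) // logs-9, even though 10 is higher
}
```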
func (e *Elasticsearch) categorizeIndices(indices map[string]indexStat) (map[string][]string, error) {
categorizedIndexNames := map[string][]string{}
// If all indices are configured to be gathered, bucket them all together.
if len(e.IndicesInclude) == 0 || e.IndicesInclude[0] == "_all" {
for indexName := range indices {
categorizedIndexNames["_all"] = append(categorizedIndexNames["_all"], indexName)
}
return categorizedIndexNames, nil
}
// Bucket each returned index with its associated configured index (if any match).
for indexName := range indices {
match := indexName
for name, matcher := range e.indexMatchers {
// If a configured index pattern matches this returned index, bucket it under that pattern.
if matcher.Match(match) {
match = name
break
}
}
// Bucket all matching indices together for sorting.
categorizedIndexNames[match] = append(categorizedIndexNames[match], indexName)
}
return categorizedIndexNames, nil
}
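To make the bucketing concrete, here is a hypothetical result of categorizeIndices for the patterns used in the new test below ('twitter*', 'influx*', and the exact name 'penguins'), shown as a standalone sketch:

```go
package main

import "fmt"

func main() {
	// Hypothetical buckets: each returned index is grouped under the
	// first configured pattern that matches it; 'penguins' matches its
	// own exact-name pattern.
	buckets := map[string][]string{
		"twitter*": {"twitter_2020_07_31", "twitter_2020_08_01", "twitter_2020_08_02"},
		"influx*":  {"influx2020.12.31", "influx2021.01.01", "influx2021.01.02"},
		"penguins": {"penguins"},
	}
	for pattern, names := range buckets {
		fmt.Println(pattern, "->", names)
	}
}
```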
func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now time.Time, acc telegraf.Accumulator) error {
indexTag := map[string]string{"index_name": name}
stats := map[string]interface{}{
"primaries": index.Primaries,
"total": index.Total,
}
for m, s := range stats {
f := jsonparser.JSONFlattener{}
// Parse JSON, getting strings and bools
err := f.FullFlattenJSON("", s, true, true)
if err != nil {
return err
}
acc.AddFields("elasticsearch_indices_stats_"+m, f.Fields, indexTag, now)
}
if e.IndicesLevel == "shards" {
for shardNumber, shards := range index.Shards {
for _, shard := range shards {
// Get Shard Stats
flattened := jsonparser.JSONFlattener{}
err := flattened.FullFlattenJSON("", shard, true, true)
if err != nil {
return err
}
// determine shard tag and primary/replica designation
shardType := "replica"
if flattened.Fields["routing_primary"] == true {
shardType = "primary"
}
delete(flattened.Fields, "routing_primary")
routingState, ok := flattened.Fields["routing_state"].(string)
if ok {
flattened.Fields["routing_state"] = mapShardStatusToCode(routingState)
}
routingNode, _ := flattened.Fields["routing_node"].(string)
shardTags := map[string]string{
"index_name": name,
"node_id": routingNode,
"shard_name": string(shardNumber),
"type": shardType,
}
for key, field := range flattened.Fields {
switch field.(type) {
case string, bool:
delete(flattened.Fields, key)
}
}
acc.AddFields("elasticsearch_indices_stats_shards",
flattened.Fields,
shardTags,
now)
}
}
}
@@ -656,6 +748,23 @@ func (e *Elasticsearch) gatherJSONData(url string, v interface{}) error {
return nil
}
func (e *Elasticsearch) compileIndexMatchers() (map[string]filter.Filter, error) {
indexMatchers := map[string]filter.Filter{}
var err error
// Compile each configured index into a glob matcher.
for _, configuredIndex := range e.IndicesInclude {
if _, exists := indexMatchers[configuredIndex]; !exists {
indexMatchers[configuredIndex], err = filter.Compile([]string{configuredIndex})
if err != nil {
return nil, err
}
}
}
return indexMatchers, nil
}
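The matchers built here use Telegraf's filter package, imported above. A minimal usage sketch of that API (the pattern and index names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/filter"
)

func main() {
	// Compile a glob pattern, as compileIndexMatchers does per entry.
	f, err := filter.Compile([]string{"twitter*"})
	if err != nil {
		panic(err)
	}
	fmt.Println(f.Match("twitter_2020_08_02")) // true
	fmt.Println(f.Match("penguins"))           // false
}
```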
func init() {
inputs.Add("elasticsearch", func() telegraf.Input {
return NewElasticsearch()

plugins/inputs/elasticsearch/elasticsearch_test.go

@@ -310,6 +310,49 @@ func TestGatherClusterIndicesStats(t *testing.T) {
map[string]string{"index_name": "twitter"})
}
func TestGatherDateStampedIndicesStats(t *testing.T) {
es := newElasticsearchWithClient()
es.IndicesInclude = []string{"twitter*", "influx*", "penguins"}
es.NumMostRecentIndices = 2
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, dateStampedIndicesResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
if err := es.Init(); err != nil {
t.Fatal(err)
}
var acc testutil.Accumulator
if err := es.gatherIndicesStats(es.Servers[0]+"/"+strings.Join(es.IndicesInclude, ",")+"/_stats", &acc); err != nil {
t.Fatal(err)
}
// includes 2 most recent indices for "twitter", only expect the most recent two.
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "twitter_2020_08_02"})
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "twitter_2020_08_01"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "twitter_2020_07_31"})
// includes 2 most recent indices for "influx", only expect the most recent two.
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "influx2021.01.02"})
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "influx2021.01.01"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "influx2020.12.31"})
// The 'penguins' index is not configured for sorting, but ensure it is still included.
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
map[string]string{"index_name": "penguins"})
}
func TestGatherClusterIndiceShardsStats(t *testing.T) {
es := newElasticsearchWithClient()
es.IndicesLevel = "shards"

plugins/inputs/elasticsearch/testdata_test.go: file diff suppressed because it is too large.