Prydin issue 8169 (#8357)
This commit is contained in:
parent
1939e58b68
commit
89919631c5
|
|
@ -37,6 +37,8 @@ const maxSampleConst = 10 // Absolute maximum number of samples regardless of pe
|
||||||
|
|
||||||
const maxMetadataSamples = 100 // Number of resources to sample for metric metadata
|
const maxMetadataSamples = 100 // Number of resources to sample for metric metadata
|
||||||
|
|
||||||
|
const maxRealtimeMetrics = 50000 // Absolute maximum metrics per realtime query
|
||||||
|
|
||||||
const hwMarkTTL = time.Duration(4 * time.Hour)
|
const hwMarkTTL = time.Duration(4 * time.Hour)
|
||||||
|
|
||||||
type queryChunk []types.PerfQuerySpec
|
type queryChunk []types.PerfQuerySpec
|
||||||
|
|
@ -322,13 +324,13 @@ func (e *Endpoint) reloadMetricNameMap(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mn, err := client.CounterInfoByName(ctx)
|
mn, err := client.CounterInfoByKey(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
e.metricNameLookup = make(map[int32]string)
|
e.metricNameLookup = make(map[int32]string)
|
||||||
for name, m := range mn {
|
for key, m := range mn {
|
||||||
e.metricNameLookup[m.Key] = name
|
e.metricNameLookup[key] = m.Name()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -889,6 +891,7 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
|
||||||
}
|
}
|
||||||
|
|
||||||
pqs := make(queryChunk, 0, e.Parent.MaxQueryObjects)
|
pqs := make(queryChunk, 0, e.Parent.MaxQueryObjects)
|
||||||
|
numQs := 0
|
||||||
|
|
||||||
for _, object := range res.objects {
|
for _, object := range res.objects {
|
||||||
timeBuckets := make(map[int64]*types.PerfQuerySpec, 0)
|
timeBuckets := make(map[int64]*types.PerfQuerySpec, 0)
|
||||||
|
|
@ -924,9 +927,9 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
|
||||||
// Add this metric to the bucket
|
// Add this metric to the bucket
|
||||||
bucket.MetricId = append(bucket.MetricId, metric)
|
bucket.MetricId = append(bucket.MetricId, metric)
|
||||||
|
|
||||||
// Bucket filled to capacity? (Only applies to non real time)
|
// Bucket filled to capacity?
|
||||||
// OR if we're past the absolute maximum limit
|
// OR if we're past the absolute maximum limit
|
||||||
if (!res.realTime && len(bucket.MetricId) >= maxMetrics) || len(bucket.MetricId) > 100000 {
|
if (!res.realTime && len(bucket.MetricId) >= maxMetrics) || len(bucket.MetricId) > maxRealtimeMetrics {
|
||||||
e.log.Debugf("Submitting partial query: %d metrics (%d remaining) of type %s for %s. Total objects %d",
|
e.log.Debugf("Submitting partial query: %d metrics (%d remaining) of type %s for %s. Total objects %d",
|
||||||
len(bucket.MetricId), len(res.metrics)-metricIdx, res.name, e.URL.Host, len(res.objects))
|
len(bucket.MetricId), len(res.metrics)-metricIdx, res.name, e.URL.Host, len(res.objects))
|
||||||
|
|
||||||
|
|
@ -943,16 +946,18 @@ func (e *Endpoint) chunkify(ctx context.Context, res *resourceKind, now time.Tim
|
||||||
// Handle data in time bucket and submit job if we've reached the maximum number of object.
|
// Handle data in time bucket and submit job if we've reached the maximum number of object.
|
||||||
for _, bucket := range timeBuckets {
|
for _, bucket := range timeBuckets {
|
||||||
pqs = append(pqs, *bucket)
|
pqs = append(pqs, *bucket)
|
||||||
if (!res.realTime && len(pqs) > e.Parent.MaxQueryObjects) || len(pqs) > 100000 {
|
numQs += len(bucket.MetricId)
|
||||||
e.log.Debugf("Submitting final bucket job for %s: %d metrics", res.name, len(bucket.MetricId))
|
if (!res.realTime && numQs > e.Parent.MaxQueryObjects) || numQs > maxRealtimeMetrics {
|
||||||
|
e.log.Debugf("Submitting final bucket job for %s: %d metrics", res.name, numQs)
|
||||||
submitChunkJob(ctx, te, job, pqs)
|
submitChunkJob(ctx, te, job, pqs)
|
||||||
pqs = make(queryChunk, 0, e.Parent.MaxQueryObjects)
|
pqs = make(queryChunk, 0, e.Parent.MaxQueryObjects)
|
||||||
|
numQs = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Submit any jobs left in the queue
|
// Submit any jobs left in the queue
|
||||||
if len(pqs) > 0 {
|
if len(pqs) > 0 {
|
||||||
e.log.Debugf("Submitting job for %s: %d objects", res.name, len(pqs))
|
e.log.Debugf("Submitting job for %s: %d objects, %d metrics", res.name, len(pqs), numQs)
|
||||||
submitChunkJob(ctx, te, job, pqs)
|
submitChunkJob(ctx, te, job, pqs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue