15 changes: 9 additions & 6 deletions docs/operations/metrics.md
@@ -93,8 +93,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. Read alongside `groupBy/spilledQueries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -122,8 +123,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. Read alongside `groupBy/spilledQueries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
@@ -154,8 +156,9 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. Read alongside `groupBy/spilledQueries`. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
@@ -73,6 +73,7 @@ public static class AggregateStats
private long maxMergeBufferAcquisitionTimeNs = 0;
private long totalMergeBufferUsedBytes = 0;
private long maxMergeBufferUsedBytes = 0;
private double maxSpillProximity = 0.0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
@@ -91,6 +92,7 @@ public AggregateStats(AggregateStats aggregateStats)
aggregateStats.maxMergeBufferAcquisitionTimeNs,
aggregateStats.totalMergeBufferUsedBytes,
aggregateStats.maxMergeBufferUsedBytes,
aggregateStats.maxSpillProximity,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.maxSpilledBytes,
@@ -105,6 +107,7 @@ public AggregateStats(
long maxMergeBufferAcquisitionTimeNs,
long totalMergeBufferUsedBytes,
long maxMergeBufferUsedBytes,
double maxSpillProximity,
long spilledQueries,
long spilledBytes,
long maxSpilledBytes,
@@ -117,6 +120,7 @@ public AggregateStats(
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes;
this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes;
this.maxSpillProximity = maxSpillProximity;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
@@ -149,6 +153,11 @@ public long getMaxMergeBufferUsedBytes()
return maxMergeBufferUsedBytes;
}

public double getMaxSpillProximity()
{
return maxSpillProximity;
}

public long getSpilledQueries()
{
return spilledQueries;
@@ -174,6 +183,19 @@ public long getMaxMergeDictionarySize()
return maxMergeDictionarySize;
}

/**
* Folds a completed query's stats into the running aggregate. For merge-buffer usage:
* <ul>
* <li>{@code totalMergeBufferUsedBytes} (emitted as {@code mergeBuffer/bytesUsed}) sums each query's usage
* across all queries, where each query's usage is itself the sum across the query's slices.</li>
* <li>{@code maxMergeBufferUsedBytes} (emitted as {@code mergeBuffer/maxBytesUsed}) is the max such per-query
* summed usage across queries.</li>
* <li>{@code maxSpillProximity} (emitted as {@code mergeBuffer/maxSpillProximity}) is the max per-query spill
* proximity across queries, where each query's value is its fullest slice's fill fraction of the spill
* threshold. Unlike the byte sums above, this is a per-slice MAX so it reflects the slice that drives a
* spill; 1.0 means at least one query reached the spill point.</li>
* </ul>
*/
public void addQueryStats(PerQueryStats perQueryStats)
{
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
@@ -183,8 +205,9 @@ public void addQueryStats(PerQueryStats perQueryStats)
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes();
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes());
totalMergeBufferUsedBytes += perQueryStats.getMergeBufferUsedBytes();
maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMergeBufferUsedBytes());
maxSpillProximity = Math.max(maxSpillProximity, perQueryStats.getSpillProximity());
}

if (perQueryStats.getSpilledBytes() > 0) {
@@ -204,6 +227,7 @@ public void reset()
this.maxMergeBufferAcquisitionTimeNs = 0;
this.totalMergeBufferUsedBytes = 0;
this.maxMergeBufferUsedBytes = 0;
this.maxSpillProximity = 0.0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
@@ -215,7 +239,25 @@ public void reset()
public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0);
/**
* Sum of the peak merge-buffer usage of every grouper (slice) this query held. A
* {@code ConcurrentGrouper} slices a single merge buffer into one slice per processing thread, and each slice
* reports its own peak via
* {@link #addMergeBufferUsedBytes(long)} when closed, so the per-query value is the SUM across the query's slices.
*/
private final AtomicLong mergeBufferUsedBytes = new AtomicLong(0);
/**
* Peak used bytes of the single fullest slice this query held, tracked as a MAX across the query's slices. A query
* spills as soon as one slice fills, so spill proximity is driven by the hottest slice, NOT the sum tracked by
* {@link #mergeBufferUsedBytes}. Compared against {@link #sliceSpillThresholdBytes}.
*/
private final AtomicLong maxSliceUsedBytes = new AtomicLong(0);
/**
* Per-slice spill threshold in bytes, i.e. {@code sliceSize * maxLoadFactor}. A slice's hash table spills once the
* fraction of its buckets in use reaches the max load factor, so this (not the raw slice size) is the denominator
* that makes a usage ratio of 1.0 mean "at the spill point". Tracked as a max; all slices of a query share the same value.
*/
private final AtomicLong sliceSpillThresholdBytes = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

@@ -224,9 +266,24 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

public void maxMergeBufferUsedBytes(long bytes)
/**
* Accumulates the peak merge-buffer usage of one grouper (slice). Despite the previous "max" naming, this method
* sums across the slices a query holds; see {@link #mergeBufferUsedBytes}.
*/
public void addMergeBufferUsedBytes(long bytes)
{
mergeBufferUsedBytes.addAndGet(bytes);
}

/**
* Records one slice's peak used bytes and its spill threshold ({@code sliceSize * maxLoadFactor}). Both are tracked
* as maxima, so after all slices close the pair describes the single fullest slice — the one that drives spilling.
* Used to compute {@code mergeBuffer/maxSpillProximity}.
*/
public void sliceUsage(long usedBytes, long spillThresholdBytes)
{
maxMergeBufferUsedBytes.addAndGet(bytes);
maxSliceUsedBytes.accumulateAndGet(usedBytes, Math::max);
sliceSpillThresholdBytes.accumulateAndGet(spillThresholdBytes, Math::max);

[P2] Track spill proximity as a per-slice ratio

This stores the maximum used bytes and maximum threshold independently, which breaks when one query reports groupers with different thresholds. Nested/subtotal processing can pass the same PerQueryStats through a sliced ConcurrentGrouper and later full-buffer SpillingGroupers; if a small slice reaches its spill threshold, the larger full-buffer threshold can be retained here and getSpillProximity() will divide by that larger value, under-reporting the slice spill as roughly 1 / concurrencyHint. Please track the maximum usedBytes / spillThresholdBytes per sliceUsage call, or otherwise keep the used/threshold pair together, so mixed grouper sizes still report the true max proximity.
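
A minimal sketch of the per-call ratio approach, not the PR's code: `SpillProximitySketch` is a hypothetical stand-in for the proximity part of `PerQueryStats`, and the byte values in `main` are made up to model a 4-way sliced buffer followed by a full-buffer grouper. It assumes `java.util.concurrent.atomic.DoubleAccumulator` is acceptable for tracking the running max.

```java
import java.util.concurrent.atomic.DoubleAccumulator;

// Hypothetical stand-in for the spill-proximity tracking in PerQueryStats.
public class SpillProximitySketch
{
  // Running max of usedBytes / spillThresholdBytes, computed once per sliceUsage call,
  // so each used/threshold pair stays together.
  private final DoubleAccumulator maxSpillProximity = new DoubleAccumulator(Math::max, 0.0);

  public void sliceUsage(long usedBytes, long spillThresholdBytes)
  {
    if (spillThresholdBytes <= 0) {
      // No usable threshold (e.g. a grouper that never initialized): nothing to record.
      return;
    }
    double ratio = Math.min(1.0, (double) usedBytes / spillThresholdBytes);
    maxSpillProximity.accumulate(ratio);
  }

  public double getSpillProximity()
  {
    return maxSpillProximity.get();
  }

  public static void main(String[] args)
  {
    SpillProximitySketch stats = new SpillProximitySketch();
    // Assumed numbers: a slice with threshold 70 fills completely...
    stats.sliceUsage(70, 70);
    // ...then a full-buffer grouper with threshold 280 is only lightly used.
    stats.sliceUsage(28, 280);
    System.out.println(stats.getSpillProximity()); // prints 1.0
  }
}
```

With the same two calls, tracking the maxima independently would give 70 / 280 = 0.25, which is the 1 / concurrencyHint under-reporting described above for a 4-way slice.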

}

public void spilledBytes(long bytes)
@@ -244,9 +301,23 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

public long getMaxMergeBufferUsedBytes()
public long getMergeBufferUsedBytes()
{
return maxMergeBufferUsedBytes.get();
return mergeBufferUsedBytes.get();
}

/**
* Spill proximity for this query in [0.0, 1.0]: the fullest slice's used bytes divided by that slice's spill
* threshold. 1.0 means a slice reached the point at which it spills to disk. Returns 0.0 when no slice usage was
* recorded (e.g. a grouper that never initialized).
*/
public double getSpillProximity()
{
long threshold = sliceSpillThresholdBytes.get();
if (threshold <= 0) {
return 0.0;
}
return Math.min(1.0, (double) maxSliceUsedBytes.get() / threshold);
}

public long getSpilledBytes()
@@ -36,7 +36,8 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
{
private static final int MIN_INITIAL_BUCKETS = 4;
private static final int DEFAULT_INITIAL_BUCKETS = 1024;
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
// Package-private so SpillingGrouper can mirror the default-resolution when computing per-slice spill proximity.
static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;

private boolean initialized = false;

@@ -63,7 +64,7 @@ public BufferHashGrouper(
{
super(bufferSupplier, keySerde, aggregators, HASH_SIZE + keySerde.keySize(), bufferGrouperMaxSize);

this.maxLoadFactor = maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR;
this.maxLoadFactor = resolveMaxLoadFactor(maxLoadFactor);
this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS;

if (this.maxLoadFactor >= 1.0f) {
@@ -74,6 +75,16 @@ public BufferHashGrouper(
this.useDefaultSorting = useDefaultSorting;
}

/**
* Resolves the effective max load factor, applying {@link #DEFAULT_MAX_LOAD_FACTOR} when a non-positive value is
* configured. A hash table spills once its bucket count reaches this fraction of capacity, so this is the value to
* compare per-slice usage against when computing spill proximity (see {@code SpillingGrouper}).
*/
static float resolveMaxLoadFactor(float maxLoadFactor)
{
return maxLoadFactor > 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR;
}

@Override
public void init()
{