diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 8d71c6044f85..812e5d4d043c 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -93,8 +93,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| -|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| -|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. 
To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. 
Read alongside `groupBy/spilledQueries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -122,8 +123,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| -|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| -|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. 
Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. 
Read alongside `groupBy/spilledQueries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -154,8 +156,9 @@ to represent the task ID are deprecated and will be removed in a future release. |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| -|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| -|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. 
This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/bytesUsed`|Total merge buffer bytes used to process groupBy queries, summed across queries. Each query's value is itself the sum across the slices it held: a merge buffer is divided among `druid.processing.numThreads` concurrent query slices, so a query can spill once a single slice fills even though this summed usage looks well below `druid.processing.buffer.sizeBytes`. To gauge spill pressure, use `mergeBuffer/maxSpillProximity` rather than comparing this to `sizeBytes`. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/maxBytesUsed`|Maximum merge buffer bytes used by any single groupBy query within the emission period, where each query's usage is the sum across the slices it held. Because the buffer is sliced among `druid.processing.numThreads` query slices, this value is not directly comparable to `druid.processing.buffer.sizeBytes`; use `mergeBuffer/maxSpillProximity` to gauge spill pressure. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/maxSpillProximity`|How close any single groupBy query came to spilling within the emission period, as a fraction in [0.0, 1.0]. A merge buffer is divided into `druid.processing.numThreads` slices and a query spills as soon as its fullest single slice reaches the hash-table load factor; this metric is that fullest slice's fill fraction of its spill threshold (max across slices, then across queries). 1.0 means at least one query reached the spill point — raise `druid.processing.buffer.sizeBytes` or lower `druid.processing.numThreads` to widen each slice. Read alongside `groupBy/spilledQueries`. 
This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index f6b92a7b62c1..ad68c649c409 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -73,6 +73,7 @@ public static class AggregateStats private long maxMergeBufferAcquisitionTimeNs = 0; private long totalMergeBufferUsedBytes = 0; private long maxMergeBufferUsedBytes = 0; + private double maxSpillProximity = 0.0; private long spilledQueries = 0; private long spilledBytes = 0; private long maxSpilledBytes = 0; @@ -91,6 +92,7 @@ public AggregateStats(AggregateStats aggregateStats) aggregateStats.maxMergeBufferAcquisitionTimeNs, aggregateStats.totalMergeBufferUsedBytes, aggregateStats.maxMergeBufferUsedBytes, + aggregateStats.maxSpillProximity, aggregateStats.spilledQueries, aggregateStats.spilledBytes, aggregateStats.maxSpilledBytes, @@ -105,6 +107,7 @@ public AggregateStats( long maxMergeBufferAcquisitionTimeNs, long totalMergeBufferUsedBytes, long maxMergeBufferUsedBytes, + double maxSpillProximity, long spilledQueries, long spilledBytes, 
long maxSpilledBytes, @@ -117,6 +120,7 @@ public AggregateStats( this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs; this.totalMergeBufferUsedBytes = totalMergeBufferUsedBytes; this.maxMergeBufferUsedBytes = maxMergeBufferUsedBytes; + this.maxSpillProximity = maxSpillProximity; this.spilledQueries = spilledQueries; this.spilledBytes = spilledBytes; this.maxSpilledBytes = maxSpilledBytes; @@ -149,6 +153,11 @@ public long getMaxMergeBufferUsedBytes() return maxMergeBufferUsedBytes; } + public double getMaxSpillProximity() + { + return maxSpillProximity; + } + public long getSpilledQueries() { return spilledQueries; @@ -174,6 +183,19 @@ public long getMaxMergeDictionarySize() return maxMergeDictionarySize; } + /** + * Folds a completed query's stats into the running aggregate. For merge-buffer usage, which is folded in only when + * the query recorded a positive acquisition time: totalMergeBufferUsedBytes adds the query's used bytes, + * maxMergeBufferUsedBytes keeps the largest per-query used bytes, and maxSpillProximity keeps the largest per-query + * spill proximity. + */ public void addQueryStats(PerQueryStats perQueryStats) { if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { @@ -183,8 +205,9 @@ public void addQueryStats(PerQueryStats perQueryStats) maxMergeBufferAcquisitionTimeNs, perQueryStats.getMergeBufferAcquisitionTimeNs() ); - totalMergeBufferUsedBytes += perQueryStats.getMaxMergeBufferUsedBytes(); - maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMaxMergeBufferUsedBytes()); + totalMergeBufferUsedBytes += perQueryStats.getMergeBufferUsedBytes(); + maxMergeBufferUsedBytes = Math.max(maxMergeBufferUsedBytes, perQueryStats.getMergeBufferUsedBytes()); + maxSpillProximity = Math.max(maxSpillProximity, perQueryStats.getSpillProximity()); } if (perQueryStats.getSpilledBytes() > 0) { @@ -204,6 +227,7 @@ public void reset() this.maxMergeBufferAcquisitionTimeNs = 0; this.totalMergeBufferUsedBytes = 0; this.maxMergeBufferUsedBytes = 0; + this.maxSpillProximity = 0.0; this.spilledQueries = 0; this.spilledBytes = 0; this.maxSpilledBytes = 0; @@ -215,7 +239,25 @@ public void reset() public static class PerQueryStats { private final AtomicLong mergeBufferAcquisitionTimeNs = new 
AtomicLong(0); - private final AtomicLong maxMergeBufferUsedBytes = new AtomicLong(0); + /** + * Sum of the peak merge-buffer usage of every grouper (slice) this query held. A + * {@code ConcurrentGrouper} slices a single merge buffer into one slice per processing thread, and each slice + * reports its own peak via + * {@link #addMergeBufferUsedBytes(long)} when closed, so the per-query value is the SUM across the query's slices. + */ + private final AtomicLong mergeBufferUsedBytes = new AtomicLong(0); + /** + * Peak used bytes of the single fullest slice this query held, tracked as a MAX across the query's slices. A query + * spills as soon as one slice fills, so spill proximity is driven by the hottest slice, NOT the sum tracked by + * {@link #mergeBufferUsedBytes}. Compared against {@link #sliceSpillThresholdBytes}. + */ + private final AtomicLong maxSliceUsedBytes = new AtomicLong(0); + /** + * Per-slice spill threshold in bytes, i.e. {@code sliceSize * maxLoadFactor}. A slice's hash table spills once its + * bucket count reaches the max load factor, so this (not the raw slice size) is the denominator that makes a + * usage ratio of 1.0 mean "at the spill point". Tracked as a max; all slices of a query share the same value. + */ + private final AtomicLong sliceSpillThresholdBytes = new AtomicLong(0); private final AtomicLong spilledBytes = new AtomicLong(0); private final AtomicLong mergeDictionarySize = new AtomicLong(0); @@ -224,9 +266,24 @@ public void mergeBufferAcquisitionTime(long delay) mergeBufferAcquisitionTimeNs.addAndGet(delay); } - public void maxMergeBufferUsedBytes(long bytes) + /** + * Accumulates the peak merge-buffer usage of one grouper (slice). Despite the previous "max" naming, this method + * sums across the slices a query holds; see {@link #mergeBufferUsedBytes}. 
+ */ + public void addMergeBufferUsedBytes(long bytes) + { + mergeBufferUsedBytes.addAndGet(bytes); + } + + /** + * Records one slice's peak used bytes and its spill threshold ({@code sliceSize * maxLoadFactor}). Both are tracked + * as maxima, so after all slices close the pair describes the single fullest slice — the one that drives spilling. + * Used to compute {@code mergeBuffer/maxSpillProximity}. + */ + public void sliceUsage(long usedBytes, long spillThresholdBytes) { - maxMergeBufferUsedBytes.addAndGet(bytes); + maxSliceUsedBytes.accumulateAndGet(usedBytes, Math::max); + sliceSpillThresholdBytes.accumulateAndGet(spillThresholdBytes, Math::max); } public void spilledBytes(long bytes) @@ -244,9 +301,23 @@ public long getMergeBufferAcquisitionTimeNs() return mergeBufferAcquisitionTimeNs.get(); } - public long getMaxMergeBufferUsedBytes() + public long getMergeBufferUsedBytes() { - return maxMergeBufferUsedBytes.get(); + return mergeBufferUsedBytes.get(); + } + + /** + * Spill proximity for this query in [0.0, 1.0]: the fullest slice's used bytes divided by that slice's spill + * threshold. 1.0 means a slice reached the point at which it spills to disk. Returns 0.0 when no slice usage was + * recorded (e.g. a grouper that never initialized). 
+ */ + public double getSpillProximity() + { + long threshold = sliceSpillThresholdBytes.get(); + if (threshold <= 0) { + return 0.0; + } + return Math.min(1.0, (double) maxSliceUsedBytes.get() / threshold); } public long getSpilledBytes() diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 670a03cb2dee..e5ab63a83d82 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -36,7 +36,8 @@ public class BufferHashGrouper extends AbstractBufferHashGrouper 0 ? maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR; + this.maxLoadFactor = resolveMaxLoadFactor(maxLoadFactor); this.initialBuckets = initialBuckets > 0 ? Math.max(MIN_INITIAL_BUCKETS, initialBuckets) : DEFAULT_INITIAL_BUCKETS; if (this.maxLoadFactor >= 1.0f) { @@ -74,6 +75,16 @@ public BufferHashGrouper( this.useDefaultSorting = useDefaultSorting; } + /** + * Resolves the effective max load factor, applying {@link #DEFAULT_MAX_LOAD_FACTOR} when a non-positive value is + * configured. A hash table spills once its bucket count reaches this fraction of capacity, so this is the value to + * compare per-slice usage against when computing spill proximity (see {@code SpillingGrouper}). + */ + static float resolveMaxLoadFactor(float maxLoadFactor) + { + return maxLoadFactor > 0 ? 
maxLoadFactor : DEFAULT_MAX_LOAD_FACTOR; + } + @Override public void init() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index 96e55907b21f..df43c5e934e8 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -84,6 +84,11 @@ public class SpillingGrouper implements Grouper private final Comparator> defaultOrderKeyObjComparator; private final GroupByStatsProvider.PerQueryStats perQueryStats; private final long minSpillFileSize; + // Per-slice spill threshold in bytes: the slice's capacity scaled by the resolved max load factor. A slice's hash + // table spills once its bucket count reaches the load factor, so this (not the raw slice size) is what the slice's + // peak usage is compared against to produce mergeBuffer/maxSpillProximity. For a ConcurrentGrouper slice the slice + // size is the per-thread fraction of the merge buffer, not the full configured buffer. 
+ private final long spillThresholdBytes; private final List files = new ArrayList<>(); private final List dictionaryFiles = new ArrayList<>(); @@ -178,6 +183,8 @@ public SpillingGrouper( this.sortHasNonGroupingFields = sortHasNonGroupingFields; this.minSpillFileSize = minSpillFileSize; this.perQueryStats = perQueryStats; + final float resolvedLoadFactor = BufferHashGrouper.resolveMaxLoadFactor(bufferGrouperMaxLoadFactor); + this.spillThresholdBytes = (long) (mergeBufferSize * resolvedLoadFactor); } @Override @@ -249,7 +256,13 @@ public void reset() public void close() { perQueryStats.dictionarySize(getDictionarySizeEstimate()); - perQueryStats.maxMergeBufferUsedBytes(getMaxMergeBufferUsedBytes()); + final long sliceUsedBytes = getMaxMergeBufferUsedBytes(); + perQueryStats.addMergeBufferUsedBytes(sliceUsedBytes); + if (grouper.isInitialized()) { + // Report this slice's peak usage against its spill threshold so the provider can compute spill proximity. Only + // recorded when the grouper was initialized, so a grouper that never touched the merge buffer is not counted. + perQueryStats.sliceUsage(sliceUsedBytes, spillThresholdBytes); + } // Record spilled bytes before deleteFiles() decrements bytesUsed in temporaryStorage. 
long spilledBytes = 0; for (final File file : files) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java index dafd381668d8..4153c2faf3e5 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -25,6 +25,8 @@ public class GroupByStatsProviderTest { + private static final double DELTA = 1e-9; + @Test public void testMetricCollection() { @@ -35,7 +37,12 @@ public void testMetricCollection() stats1.mergeBufferAcquisitionTime(300); stats1.mergeBufferAcquisitionTime(400); - stats1.maxMergeBufferUsedBytes(50); + // Two slices of the same query: usage SUMS to 80, while spill proximity is the per-slice MAX. Slice 1 is at + // 50/1000 of its threshold, slice 2 at 30/1000; the fullest slice (0.05) drives proximity. + stats1.addMergeBufferUsedBytes(50); + stats1.sliceUsage(50, 1000); + stats1.addMergeBufferUsedBytes(30); + stats1.sliceUsage(30, 1000); stats1.spilledBytes(200); stats1.spilledBytes(400); stats1.dictionarySize(100); @@ -46,7 +53,9 @@ public void testMetricCollection() stats2.mergeBufferAcquisitionTime(500); stats2.mergeBufferAcquisitionTime(600); - stats2.maxMergeBufferUsedBytes(100); + // Single slice at 100/2000 = 0.05. 
+ stats2.addMergeBufferUsedBytes(100); + stats2.sliceUsage(100, 2000); stats2.spilledBytes(400); stats2.spilledBytes(600); stats2.dictionarySize(300); @@ -58,6 +67,7 @@ public void testMetricCollection() Assertions.assertEquals(0L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); Assertions.assertEquals(0L, aggregateStats.getTotalMergeBufferUsedBytes()); Assertions.assertEquals(0L, aggregateStats.getMaxMergeBufferUsedBytes()); + Assertions.assertEquals(0.0, aggregateStats.getMaxSpillProximity(), DELTA); Assertions.assertEquals(0L, aggregateStats.getSpilledQueries()); Assertions.assertEquals(0L, aggregateStats.getSpilledBytes()); Assertions.assertEquals(0L, aggregateStats.getMaxSpilledBytes()); @@ -71,8 +81,13 @@ public void testMetricCollection() Assertions.assertEquals(2, aggregateStats.getMergeBufferQueries()); Assertions.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs()); Assertions.assertEquals(1100L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); - Assertions.assertEquals(150L, aggregateStats.getTotalMergeBufferUsedBytes()); + // bytesUsed sums across queries AND across each query's slices: (50 + 30) + 100 = 180. + Assertions.assertEquals(180L, aggregateStats.getTotalMergeBufferUsedBytes()); + // maxBytesUsed is the max per-query summed usage: max(80, 100) = 100. Assertions.assertEquals(100L, aggregateStats.getMaxMergeBufferUsedBytes()); + // maxSpillProximity is the max per-query proximity, where each query's proximity is its fullest slice's + // fill fraction: q1 -> max(50/1000, 30/1000) = 0.05, q2 -> 100/2000 = 0.05, so max = 0.05. 
+ Assertions.assertEquals(0.05, aggregateStats.getMaxSpillProximity(), DELTA); Assertions.assertEquals(2L, aggregateStats.getSpilledQueries()); Assertions.assertEquals(1600L, aggregateStats.getSpilledBytes()); Assertions.assertEquals(1000L, aggregateStats.getMaxSpilledBytes()); @@ -88,28 +103,32 @@ public void testMetricsWithMultipleQueries() QueryResourceId r1 = new QueryResourceId("r1"); GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); stats1.mergeBufferAcquisitionTime(2000); - stats1.maxMergeBufferUsedBytes(50); + stats1.addMergeBufferUsedBytes(50); + stats1.sliceUsage(50, 1000); // 0.05 stats1.spilledBytes(100); stats1.dictionarySize(200); QueryResourceId r2 = new QueryResourceId("r2"); GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); stats2.mergeBufferAcquisitionTime(100); - stats2.maxMergeBufferUsedBytes(500); + stats2.addMergeBufferUsedBytes(500); + stats2.sliceUsage(500, 1000); // 0.5 stats2.spilledBytes(150); stats2.dictionarySize(250); QueryResourceId r3 = new QueryResourceId("r3"); GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); stats3.mergeBufferAcquisitionTime(200); - stats3.maxMergeBufferUsedBytes(100); + stats3.addMergeBufferUsedBytes(100); + stats3.sliceUsage(100, 1000); // 0.1 stats3.spilledBytes(3000); stats3.dictionarySize(300); QueryResourceId r4 = new QueryResourceId("r4"); GroupByStatsProvider.PerQueryStats stats4 = statsProvider.getPerQueryStatsContainer(r4); stats4.mergeBufferAcquisitionTime(300); - stats4.maxMergeBufferUsedBytes(75); + stats4.addMergeBufferUsedBytes(75); + stats4.sliceUsage(75, 1000); // 0.075 stats4.spilledBytes(200); stats4.dictionarySize(1500); @@ -122,6 +141,8 @@ public void testMetricsWithMultipleQueries() Assertions.assertEquals(2000L, aggregateStats.getMaxMergeBufferAcquisitionTimeNs()); Assertions.assertEquals(500L, aggregateStats.getMaxMergeBufferUsedBytes()); + // Max per-query proximity across 
the four queries: max(0.05, 0.5, 0.1, 0.075) = 0.5. + Assertions.assertEquals(0.5, aggregateStats.getMaxSpillProximity(), DELTA); Assertions.assertEquals(3000L, aggregateStats.getMaxSpilledBytes()); Assertions.assertEquals(1500L, aggregateStats.getMaxMergeDictionarySize()); @@ -132,4 +153,193 @@ public void testMetricsWithMultipleQueries() Assertions.assertEquals(3450L, aggregateStats.getSpilledBytes()); Assertions.assertEquals(2250L, aggregateStats.getMergeDictionarySize()); } + + @Test + public void testPerQueryUsedBytesSumsWhileSpillProximityTakesSliceMax() + { + GroupByStatsProvider.PerQueryStats stats = new GroupByStatsProvider.PerQueryStats(); + + // Simulate a ConcurrentGrouper closing four equally-sized slices. Each slice reports its own peak usage and the + // shared per-slice spill threshold. Used bytes accumulate (sum); spill proximity is driven by the fullest slice. + stats.addMergeBufferUsedBytes(10); + stats.sliceUsage(10, 1000); + stats.addMergeBufferUsedBytes(20); + stats.sliceUsage(20, 1000); + stats.addMergeBufferUsedBytes(30); + stats.sliceUsage(30, 1000); + stats.addMergeBufferUsedBytes(40); + stats.sliceUsage(40, 1000); + + // Used bytes are summed across slices. + Assertions.assertEquals(100L, stats.getMergeBufferUsedBytes()); + // Proximity is the fullest slice's fill fraction (40/1000), NOT the summed fraction (100/1000 = 0.1). + Assertions.assertEquals(0.04, stats.getSpillProximity(), DELTA); + } + + @Test + public void testSpillProximityClampsToOneAtSpillPoint() + { + GroupByStatsProvider.PerQueryStats stats = new GroupByStatsProvider.PerQueryStats(); + // A slice that reached its threshold exactly. + stats.sliceUsage(700, 700); + Assertions.assertEquals(1.0, stats.getSpillProximity(), DELTA); + + // The offset-list term can push raw used bytes slightly above the threshold; proximity must clamp at 1.0. 
+ GroupByStatsProvider.PerQueryStats over = new GroupByStatsProvider.PerQueryStats(); + over.sliceUsage(750, 700); + Assertions.assertEquals(1.0, over.getSpillProximity(), DELTA); + } + + @Test + public void testSpillProximityZeroWhenNoSliceUsageRecorded() + { + GroupByStatsProvider.PerQueryStats stats = new GroupByStatsProvider.PerQueryStats(); + // No sliceUsage() call: threshold is 0, so proximity is 0.0 rather than dividing by zero. + stats.addMergeBufferUsedBytes(500); + Assertions.assertEquals(0.0, stats.getSpillProximity(), DELTA); + } + + @Test + public void testSpillProximityPicksFullestSliceWhenSlicesDiffer() + { + GroupByStatsProvider.PerQueryStats stats = new GroupByStatsProvider.PerQueryStats(); + // Slices with differing usage but a shared threshold. maxSliceUsedBytes and sliceSpillThresholdBytes are each tracked + // as independent maxima; in practice all slices of a query share the same threshold, so this exercises the + // max-of-each behaviour for the dominating slice. + stats.sliceUsage(200, 1000); + stats.sliceUsage(900, 1000); + stats.sliceUsage(100, 1000); + // Fullest slice is 900/1000 = 0.9. 
+ Assertions.assertEquals(0.9, stats.getSpillProximity(), DELTA); + } + + @Test + public void testAggregateStatsResetZeroesSpillProximity() + { + GroupByStatsProvider.AggregateStats aggregateStats = new GroupByStatsProvider.AggregateStats( + 1L, + 100L, + 100L, + 200L, + 200L, + 0.75, + 2L, + 200L, + 200L, + 300L, + 300L + ); + Assertions.assertEquals(0.75, aggregateStats.getMaxSpillProximity(), DELTA); + + aggregateStats.reset(); + Assertions.assertEquals(0.0, aggregateStats.getMaxSpillProximity(), DELTA); + } + + @Test + public void testAggregateStatsCopyConstructorRoundTripsSpillProximity() + { + GroupByStatsProvider.AggregateStats original = new GroupByStatsProvider.AggregateStats( + 1L, + 100L, + 100L, + 200L, + 200L, + 0.42, + 2L, + 200L, + 200L, + 300L, + 300L + ); + GroupByStatsProvider.AggregateStats copy = new GroupByStatsProvider.AggregateStats(original); + + Assertions.assertEquals(0.42, copy.getMaxSpillProximity(), DELTA); + // Spill proximity is the 6th ctor arg, sitting between maxBytesUsed and spilledQueries; verify neighbours + // did not shift position. 
+ Assertions.assertEquals(200L, copy.getMaxMergeBufferUsedBytes()); + Assertions.assertEquals(2L, copy.getSpilledQueries()); + } + + @Test + public void testAggregateStatsTakesMaxSpillProximityAcrossQueries() + { + GroupByStatsProvider.AggregateStats agg = new GroupByStatsProvider.AggregateStats(); + + GroupByStatsProvider.PerQueryStats low = new GroupByStatsProvider.PerQueryStats(); + low.mergeBufferAcquisitionTime(10); + low.sliceUsage(300, 1000); // 0.3 + agg.addQueryStats(low); + + GroupByStatsProvider.PerQueryStats high = new GroupByStatsProvider.PerQueryStats(); + high.mergeBufferAcquisitionTime(10); + high.sliceUsage(800, 1000); // 0.8 + agg.addQueryStats(high); + + GroupByStatsProvider.PerQueryStats mid = new GroupByStatsProvider.PerQueryStats(); + mid.mergeBufferAcquisitionTime(10); + mid.sliceUsage(500, 1000); // 0.5 + agg.addQueryStats(mid); + + Assertions.assertEquals(0.8, agg.getMaxSpillProximity(), DELTA); + } + + @Test + public void testSpillProximityDroppedWhenNoAcquisitionTimeRecorded() + { + // A PerQueryStats with slice usage but no acquisition time is not folded into the mergeBuffer block, mirroring + // the monitor guard. In practice this never happens: acquisition time is recorded in + // GroupByResourcesReservationPool.reserve() before any grouper initializes. + GroupByStatsProvider.AggregateStats agg = new GroupByStatsProvider.AggregateStats(); + GroupByStatsProvider.PerQueryStats stats = new GroupByStatsProvider.PerQueryStats(); + stats.sliceUsage(900, 1000); + agg.addQueryStats(stats); + + Assertions.assertEquals(0L, agg.getMergeBufferQueries()); + Assertions.assertEquals(0.0, agg.getMaxSpillProximity(), DELTA); + } + + /** + * End-to-end through {@link GroupByStatsProvider} reproducing the user's scenario: a 125MiB merge buffer divided + * into 240 per-thread slices (sliceSize ~= 533KiB). Each slice fills well below the configured buffer size, yet the + * fullest slice reaches its spill threshold (sliceSize * 0.7) and the query spills. 
The summed {@code bytesUsed} is + * far below {@code sizeBytes}, which is exactly why comparing it to {@code sizeBytes} was misleading; {@code + * maxSpillProximity} instead reports ~1.0, correctly indicating the query was at the spill point. + */ + @Test + public void testEndToEndSlicedBufferSpillScenario() + { + final long sizeBytes = 125L * 1024 * 1024; // druid.processing.buffer.sizeBytes (125MiB) + final int numThreads = 240; // concurrencyHint / numThreads + final long sliceSize = sizeBytes / numThreads; // per-slice capacity (~533KiB) + final float loadFactor = 0.7f; // BufferHashGrouper.DEFAULT_MAX_LOAD_FACTOR + final long sliceThreshold = (long) (sliceSize * loadFactor); + + GroupByStatsProvider statsProvider = new GroupByStatsProvider(); + QueryResourceId id = new QueryResourceId("spilly"); + GroupByStatsProvider.PerQueryStats stats = statsProvider.getPerQueryStatsContainer(id); + + stats.mergeBufferAcquisitionTime(42); + long expectedUsed = 0; + for (int i = 0; i < numThreads; i++) { + // Most slices stay light; one slice (i == 0) reaches its threshold and triggers the spill. + final long sliceUsed = (i == 0) ? sliceThreshold : sliceThreshold / 10; + stats.addMergeBufferUsedBytes(sliceUsed); + stats.sliceUsage(sliceUsed, sliceThreshold); + expectedUsed += sliceUsed; + } + stats.spilledBytes(1_000_000L); + + statsProvider.closeQuery(id); + GroupByStatsProvider.AggregateStats aggregateStats = statsProvider.getStatsSince(); + + // The fullest slice is at its threshold, so proximity is 1.0: the query spilled. + Assertions.assertEquals(1.0, aggregateStats.getMaxSpillProximity(), DELTA); + // ...even though the summed usage across slices is a tiny fraction of the configured buffer size. 
+ Assertions.assertEquals(expectedUsed, aggregateStats.getTotalMergeBufferUsedBytes()); + Assertions.assertTrue( + aggregateStats.getTotalMergeBufferUsedBytes() < sizeBytes / 2, + "summed bytesUsed should look small next to sizeBytes, despite the spill" + ); + Assertions.assertEquals(1L, aggregateStats.getSpilledQueries()); + } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index e5f46020fe09..9c69f182f8f5 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -68,6 +68,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.setMetric("mergeBuffer/maxAcquisitionTimeNs", statsContainer.getMaxMergeBufferAcquisitionTimeNs())); emitter.emit(builder.setMetric("mergeBuffer/bytesUsed", statsContainer.getTotalMergeBufferUsedBytes())); emitter.emit(builder.setMetric("mergeBuffer/maxBytesUsed", statsContainer.getMaxMergeBufferUsedBytes())); + emitter.emit(builder.setMetric("mergeBuffer/maxSpillProximity", statsContainer.getMaxSpillProximity())); } if (statsContainer.getSpilledQueries() > 0) { diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index eaca043e02e7..8d02e64e3b49 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -63,6 +63,7 @@ public synchronized AggregateStats getStatsSince() 100L, 200L, 200L, + 0.85, 2L, 200L, 200L, @@ -93,7 +94,7 @@ public void testMonitor() // Trigger metric emission monitor.doMonitor(emitter); - Assert.assertEquals(12, emitter.getNumEmittedEvents()); + Assert.assertEquals(13, emitter.getNumEmittedEvents()); 
emitter.verifyValue("mergeBuffer/pendingRequests", 0L); emitter.verifyValue("mergeBuffer/used", 0L); emitter.verifyValue("mergeBuffer/queries", 1L); @@ -101,6 +102,7 @@ public void testMonitor() emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 100L); emitter.verifyValue("mergeBuffer/bytesUsed", 200L); emitter.verifyValue("mergeBuffer/maxBytesUsed", 200L); + emitter.verifyValue("mergeBuffer/maxSpillProximity", 0.85); emitter.verifyValue("groupBy/spilledQueries", 2L); emitter.verifyValue("groupBy/spilledBytes", 200L); emitter.verifyValue("groupBy/maxSpilledBytes", 200L); @@ -137,6 +139,7 @@ public void testMonitorWithServiceDimensions() verifyMetricValue(emitter, "mergeBuffer/maxAcquisitionTimeNs", dimFilters, 100L); verifyMetricValue(emitter, "mergeBuffer/bytesUsed", dimFilters, 200L); verifyMetricValue(emitter, "mergeBuffer/maxBytesUsed", dimFilters, 200L); + verifyMetricValue(emitter, "mergeBuffer/maxSpillProximity", dimFilters, 0.85); verifyMetricValue(emitter, "groupBy/spilledQueries", dimFilters, 2L); verifyMetricValue(emitter, "groupBy/spilledBytes", dimFilters, 200L); verifyMetricValue(emitter, "groupBy/maxSpilledBytes", dimFilters, 200L); @@ -202,21 +205,24 @@ public void testMonitoringWithMultipleResources() QueryResourceId r1 = new QueryResourceId("r1"); GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); stats1.mergeBufferAcquisitionTime(100); - stats1.maxMergeBufferUsedBytes(50); + stats1.addMergeBufferUsedBytes(50); + stats1.sliceUsage(50, 1000); // 0.05 stats1.spilledBytes(200); stats1.dictionarySize(100); QueryResourceId r2 = new QueryResourceId("r2"); GroupByStatsProvider.PerQueryStats stats2 = statsProvider.getPerQueryStatsContainer(r2); stats2.mergeBufferAcquisitionTime(500); - stats2.maxMergeBufferUsedBytes(30); + stats2.addMergeBufferUsedBytes(30); + stats2.sliceUsage(30, 2000); // 0.015 stats2.spilledBytes(100); stats2.dictionarySize(300); QueryResourceId r3 = new QueryResourceId("r3"); 
GroupByStatsProvider.PerQueryStats stats3 = statsProvider.getPerQueryStatsContainer(r3); stats3.mergeBufferAcquisitionTime(200); - stats3.maxMergeBufferUsedBytes(150); + stats3.addMergeBufferUsedBytes(150); + stats3.sliceUsage(150, 1500); // 0.1 stats3.spilledBytes(800); stats3.dictionarySize(200); @@ -239,10 +245,34 @@ public void testMonitoringWithMultipleResources() emitter.verifyValue("mergeBuffer/maxAcquisitionTimeNs", 500L); emitter.verifyValue("mergeBuffer/maxBytesUsed", 150L); + // Spill proximity is the MAX per-query fullest-slice fill fraction: max(0.05, 0.015, 0.1) = 0.1. + emitter.verifyValue("mergeBuffer/maxSpillProximity", 0.1); emitter.verifyValue("groupBy/maxSpilledBytes", 800L); emitter.verifyValue("groupBy/maxMergeDictionarySize", 300L); } + @Test + public void testMaxSpillProximityNotEmittedWhenNoMergeBufferQueries() + { + // No query records any merge-buffer acquisition time, so the entire mergeBuffer/* block is skipped. + GroupByStatsProvider statsProvider = new GroupByStatsProvider(); + + QueryResourceId r1 = new QueryResourceId("r1"); + GroupByStatsProvider.PerQueryStats stats1 = statsProvider.getPerQueryStatsContainer(r1); + // dictionary-only activity, no merge buffer acquisition + stats1.dictionarySize(100); + statsProvider.closeQuery(r1); + + final GroupByStatsMonitor monitor = new GroupByStatsMonitor(statsProvider, mergeBufferPool); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + emitter.start(); + monitor.doMonitor(emitter); + + Assert.assertTrue(emitter.getMetricEvents("mergeBuffer/queries").isEmpty()); + Assert.assertTrue(emitter.getMetricEvents("mergeBuffer/maxBytesUsed").isEmpty()); + Assert.assertTrue(emitter.getMetricEvents("mergeBuffer/maxSpillProximity").isEmpty()); + } + private void verifyMetricValue(StubServiceEmitter emitter, String metricName, Map dimFilters, Number expectedValue) { final List observedMetricEvents = emitter.getMetricEvents(metricName);