Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,25 @@ public CostBasedAutoScalerConfig getConfig()
return config;
}

/**
 * Whether {@code metrics} report an aggregate lag at or above
 * {@link WeightedCostFunction#CRITICAL_LAG_TIER1_FRACTION} of
 * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()}.
 *
 * @param metrics last collected metrics, may be {@code null}
 * @return {@code false} when {@code metrics} or the configured threshold is {@code null};
 *         otherwise the result of the tier-1 comparison
 */
private boolean isCriticalLag(CostMetrics metrics)
{
  final Long configuredThreshold = config.getCriticalLagThreshold();
  if (metrics == null || configuredThreshold == null) {
    return false;
  }

  final double tier1Lag = configuredThreshold * WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION;
  return metrics.getAggregateLag() >= tier1Lag;
}

/**
 * Tier-2 check of the critical-lag fast path: reports whether the aggregate lag in {@code metrics}
 * has reached {@link WeightedCostFunction#CRITICAL_LAG_TIER2_FRACTION} of
 * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()}. Callers use a {@code true} result to
 * skip the argmin search and go for the maximum task count instead.
 *
 * @param metrics last collected metrics, may be {@code null}
 * @return {@code false} when {@code metrics} or the configured threshold is {@code null};
 *         otherwise the result of the tier-2 comparison
 */
private boolean isEmergencyLag(CostMetrics metrics)
{
  final Long configuredThreshold = config.getCriticalLagThreshold();
  if (metrics == null || configuredThreshold == null) {
    return false;
  }

  final double tier2Lag = configuredThreshold * WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION;
  return metrics.getAggregateLag() >= tier2Lag;
}

/**
* Returns the lowest-cost task count given {@code metrics}, or {@link #CANNOT_COMPUTE} when
* metrics are unusable. Returning the current task count means the current count is already
Expand Down Expand Up @@ -244,6 +263,24 @@ int computeOptimalTaskCount(CostMetrics metrics)
return currentTaskCount;
}

final boolean criticalLag = isCriticalLag(metrics);
final boolean emergencyLag = isEmergencyLag(metrics);
if (emergencyLag) {
log.info(
"Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: skipping the argmin"
+ " search and jumping straight to the maximum task count.",
supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION * 100,
config.getCriticalLagThreshold()
);
} else if (criticalLag) {
log.info(
"Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: widening scale-up"
+ " candidates and maxing out the lag-amplification multiplier.",
supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION * 100,
config.getCriticalLagThreshold()
);
}

// Start with the current task count as optimal
int optimalTaskCount = currentTaskCount;
CostResult optimalCost = costFunction.computeCost(metrics, currentTaskCount, config);
Expand All @@ -270,7 +307,7 @@ int computeOptimalTaskCount(CostMetrics metrics)
int startIndex = 0;
int endIndex = validTaskCounts.length - 1;

if (config.isUseTaskCountBoundariesOnScaleUp()) {
if (config.isUseTaskCountBoundariesOnScaleUp() && !criticalLag) {
int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
endIndex = currentTaskCountIndex >= 0
? Math.min(currentTaskCountIndex + BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
Expand All @@ -284,6 +321,12 @@ int computeOptimalTaskCount(CostMetrics metrics)
: startIndex;
}

// Emergency (tier 2) lag skips the argmin search entirely: evaluate only the maximum valid task count.
if (emergencyLag) {
startIndex = validTaskCounts.length - 1;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Make emergency lag choose the max unconditionally

The emergency path narrows the loop to only the maximum valid task count, but optimalTaskCount and optimalCost are still initialized from the current task count before this block. If the configured weights make the current-count cost lower than the max-count cost, for example lagWeight=0 and idleWeight=1, the loop evaluates the max candidate but never updates optimalTaskCount, so tier-2 emergency lag returns the current count instead of jumping to the max. Set the emergency result to the max candidate unconditionally, or initialize the optimal candidate from startIndex after the emergency bounds are applied.

endIndex = validTaskCounts.length - 1;
}

for (int i = startIndex; i <= endIndex; ++i) {
final int taskCount = validTaskCounts[i];
CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private final Duration minScaleDownDelay;
private final boolean scaleDownDuringTaskRolloverOnly;
private final boolean usePollIdleRatio;
private final Long criticalLagThreshold;

/**
* Creates a new CostBasedAutoScalerConfig instance.
Expand All @@ -93,7 +94,8 @@ public CostBasedAutoScalerConfig(
@Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
@Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly,
@Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio
@Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio,
@Nullable @JsonProperty("criticalLagThreshold") Long criticalLagThreshold
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
Expand Down Expand Up @@ -123,6 +125,12 @@ public CostBasedAutoScalerConfig(
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY);
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
this.usePollIdleRatio = Configs.valueOrDefault(usePollIdleRatio, true);
this.criticalLagThreshold = criticalLagThreshold;

Preconditions.checkArgument(
criticalLagThreshold == null || criticalLagThreshold > 0,
"criticalLagThreshold must be > 0"
);

if (this.enableTaskAutoScaler) {
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true");
Expand Down Expand Up @@ -305,6 +313,24 @@ public boolean isUsePollIdleRatio()
return usePollIdleRatio;
}

/**
 * Threshold on aggregate (sum-across-partitions) lag, compared against
 * {@link CostMetrics#getAggregateLag()}, that controls a two-tier SLA-critical fast path:
 * <ul>
 * <li>Tier 1, at 75% of this value: the lag-amplification multiplier becomes 6.0 (the default
 * is 0.3), and scale-up candidates are no longer limited by
 * {@link #isUseTaskCountBoundariesOnScaleUp()}.</li>
 * <li>Tier 2, at 95% of this value: the auto-scaler stops searching across candidate task counts
 * and targets the maximum task count.</li>
 * </ul>
 *
 * @return the configured threshold, which is strictly positive when set, or {@code null} when the
 *         feature is disabled
 */
@Nullable
@JsonProperty
public Long getCriticalLagThreshold()
{
  return this.criticalLagThreshold;
}

@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
Expand Down Expand Up @@ -338,7 +364,8 @@ public boolean equals(Object o)
&& scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly
&& usePollIdleRatio == that.usePollIdleRatio
&& Objects.equals(taskCountStart, that.taskCountStart)
&& Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
&& Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
&& Objects.equals(criticalLagThreshold, that.criticalLagThreshold);
}

@Override
Expand All @@ -360,7 +387,8 @@ public int hashCode()
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly,
usePollIdleRatio
usePollIdleRatio,
criticalLagThreshold
);
}

Expand All @@ -384,6 +412,7 @@ public String toString()
", minScaleDownDelay=" + minScaleDownDelay +
", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly +
", usePollIdleRatio=" + usePollIdleRatio +
", criticalLagThreshold=" + criticalLagThreshold +
'}';
}

Expand All @@ -409,6 +438,7 @@ public static class Builder
private Duration minScaleDownDelay;
private Boolean scaleDownDuringTaskRolloverOnly;
private Boolean usePollIdleRatio;
private Long criticalLagThreshold;

private Builder()
{
Expand Down Expand Up @@ -498,6 +528,12 @@ public Builder usePollIdleRatio(boolean usePollIdleRatio)
return this;
}

/**
 * Sets the aggregate-lag threshold for the critical-lag fast path.
 *
 * @param criticalLagThreshold threshold to pass to the config constructor; {@code null} leaves the
 *                             feature disabled. A non-null value must be {@code > 0}, which is
 *                             checked when {@link #build()} constructs the config, not here.
 * @return this builder, for chaining
 */
public Builder criticalLagThreshold(Long criticalLagThreshold)
{
this.criticalLagThreshold = criticalLagThreshold;
return this;
}

public Builder useTaskCountBoundariesOnScaleUp(boolean useTaskCountBoundariesOnScaleUp)
{
this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp;
Expand Down Expand Up @@ -528,7 +564,8 @@ public CostBasedAutoScalerConfig build()
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly,
usePollIdleRatio
usePollIdleRatio,
criticalLagThreshold
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ public class WeightedCostFunction
*/
static final double LAG_AMPLIFICATION_MULTIPLIER = 0.3;

/**
 * Multiplier that {@code computeCost} substitutes for {@link #LAG_AMPLIFICATION_MULTIPLIER} in the
 * lag amplification term {@code max(1.0, 1.0 + multiplier * ln(lagPerPartition))} once aggregate lag
 * reaches {@link #CRITICAL_LAG_TIER1_FRACTION} of
 * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} (tier 1 of the critical-lag fast path).
 */
static final double CRITICAL_LAG_AMPLIFICATION_MULTIPLIER = 6.0;

/**
 * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 1 of the
 * critical-lag fast path engages: the amplification multiplier switches to
 * {@link #CRITICAL_LAG_AMPLIFICATION_MULTIPLIER} and the scale-up candidate boundary is bypassed.
 * The comparison is inclusive (aggregate lag {@code >=} threshold times this fraction).
 */
static final double CRITICAL_LAG_TIER1_FRACTION = 0.75;

/**
 * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 2 of the
 * critical-lag fast path engages: the cost-minimization search across candidate task counts is
 * skipped and the auto-scaler goes for the maximum task count. Being larger than
 * {@link #CRITICAL_LAG_TIER1_FRACTION}, tier 2 implies that tier 1 is engaged as well.
 */
static final double CRITICAL_LAG_TIER2_FRACTION = 0.95;

/**
* Exponent (< 1) for sublinear busy redistribution in the idle projection:
* busy grows as {@code (currentTaskCount / proposedTaskCount)^EXPONENT}, not linearly.
Expand Down Expand Up @@ -108,12 +128,19 @@ public CostResult computeCost(
// Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
// In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
// Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
// Once aggregate lag crosses CRITICAL_LAG_TIER1_FRACTION of criticalLagThreshold, the multiplier is
// maxed out at CRITICAL_LAG_AMPLIFICATION_MULTIPLIER (vs the default 0.3).
final double lagRecoveryTime;
if (metrics.getAggregateLag() <= 0) {
lagRecoveryTime = 0;
} else {
final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
final Long criticalLagThreshold = config.getCriticalLagThreshold();
final boolean criticalLag = criticalLagThreshold != null
&& metrics.getAggregateLag() >= criticalLagThreshold * CRITICAL_LAG_TIER1_FRACTION;

final double amplificationMultiplier = criticalLag ? CRITICAL_LAG_AMPLIFICATION_MULTIPLIER : LAG_AMPLIFICATION_MULTIPLIER;
final double amplification = Math.max(1.0, 1.0 + amplificationMultiplier * Math.log(lagPerPartition));
final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void testSerdeWithAllProperties() throws Exception
+ " \"minScaleUpDelay\": \"PT5M\",\n"
+ " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true,\n"
+ " \"usePollIdleRatio\": false\n"
+ " \"usePollIdleRatio\": false,\n"
+ " \"criticalLagThreshold\": 500000\n"
+ "}";

final CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class);
Expand All @@ -74,6 +75,7 @@ public void testSerdeWithAllProperties() throws Exception
Assert.assertFalse(config.isUsePollIdleRatio());
Assert.assertFalse(config.isUseTaskCountBoundariesOnScaleUp());
Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown());
Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold());

// Test serialization back to JSON
final String serialized = mapper.writeValueAsString(config);
Expand Down Expand Up @@ -112,6 +114,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown());
Assert.assertNull(config.getTaskCountStart());
Assert.assertNull(config.getStopTaskCountRatio());
Assert.assertNull(config.getCriticalLagThreshold());
}

@Test
Expand Down Expand Up @@ -221,6 +224,7 @@ public void testBuilder()
.minScaleDownDelay(Duration.standardMinutes(10))
.scaleDownDuringTaskRolloverOnly(true)
.usePollIdleRatio(false)
.criticalLagThreshold(500000L)
.build();

Assert.assertTrue(config.getEnableTaskAutoScaler());
Expand All @@ -238,6 +242,18 @@ public void testBuilder()
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertFalse(config.isUsePollIdleRatio());
Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold());
}

/**
 * A {@code criticalLagThreshold} of zero must be rejected when the config is built.
 */
@Test(expected = IllegalArgumentException.class)
public void testValidation_ZeroCriticalLagThreshold()
{
  final CostBasedAutoScalerConfig.Builder zeroThresholdBuilder = CostBasedAutoScalerConfig
      .builder()
      .taskCountMax(100)
      .taskCountMin(5)
      .criticalLagThreshold(0L)
      .enableTaskAutoScaler(true);

  // Validation lives in the config constructor, so the failure is expected from build().
  zeroThresholdBuilder.build();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,62 @@ public void testComputeOptimalTaskCountLimitsTaskCountJumps()
);
}

/**
 * Tier 1 only: the scale-up boundary is ignored once lag is critical, and honored otherwise.
 */
@Test
public void testCriticalLagThresholdBypassesScaleUpBoundary()
{
  // Threshold 12,000,000 puts tier 1 at 9,000,000 and tier 2 at 11,400,000. The first metrics
  // below are meant to carry an aggregate lag of 100_000 * 100 = 10,000,000, which sits between
  // the two tiers: the boundary bypass is exercised without the emergency jump-to-max.
  final CostBasedAutoScalerConfig tier1Config = CostBasedAutoScalerConfig
      .builder()
      .taskCountMax(100)
      .taskCountMin(1)
      .enableTaskAutoScaler(true)
      .lagWeight(1.0)
      .idleWeight(0.0)
      .useTaskCountBoundariesOnScaleUp(true)
      .criticalLagThreshold(12_000_000L)
      .build();
  final CostBasedAutoScaler autoScaler = createAutoScaler(tier1Config);

  final int taskCountUnderCriticalLag =
      autoScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25));
  Assert.assertEquals(
      "Critical lag should bypass the scale-up boundary and jump straight to the argmin",
      100,
      taskCountUnderCriticalLag
  );

  // With lag far below the threshold the usual boundary keeps limiting the candidates.
  final int taskCountUnderNormalLag =
      autoScaler.computeOptimalTaskCount(createMetrics(10.0, 10, 100, 0.25));
  Assert.assertEquals(
      "Below criticalLagThreshold, the scale-up boundary still limits candidates",
      13,
      taskCountUnderNormalLag
  );
}

/**
 * Tier 2: once lag is far past the emergency fraction, the maximum task count is chosen even with
 * weights that favor idle cost.
 */
@Test
public void testEmergencyLagJumpsStraightToMaxTaskCount()
{
  // Threshold 10,000,000 puts tier 2 at 9,500,000. The metrics below are meant to carry an
  // aggregate lag of 100_000 * 500 = 50,000,000, comfortably past it, so the argmin search is
  // skipped in favor of the maximum task count.
  final CostBasedAutoScalerConfig idleHeavyConfig = CostBasedAutoScalerConfig
      .builder()
      .taskCountMax(500)
      .taskCountMin(1)
      .enableTaskAutoScaler(true)
      .lagWeight(0.1)
      .idleWeight(0.9)
      .criticalLagThreshold(10_000_000L)
      .build();
  final CostBasedAutoScaler autoScaler = createAutoScaler(idleHeavyConfig);

  // Idle-heavy weights would normally push towards fewer tasks; emergency lag must win.
  final int chosenTaskCount =
      autoScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 500, 0.9));
  Assert.assertEquals(
      "Emergency lag should jump straight to the maximum task count regardless of idle-favoring weights",
      500,
      chosenTaskCount
  );
}

@Test
public void testExtractPollIdleRatio()
{
Expand Down
Loading
Loading