diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 7f6cc9cdfe2..b8a287ec704 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -72,6 +72,9 @@ public static Set slotsCanReassign(Cluster cluster, Set } public static void defaultSchedule(Topologies topologies, Cluster cluster) { + // Single full-set round-robin redistribute for the whole round. The per-topology scheduleTopologiesEvenly call + // below passes redistributeOntoIdle=false so the max.free.per.topology cap is not applied a second time on a + // supervisor left idle by this pass (apache/storm#8778 follow-up). EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the @@ -103,7 +106,7 @@ public static void defaultSchedule(Topologies topologies, Cluster cluster) { cluster.freeSlots(badSlots); } - EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster); + EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster, false); } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index 69d62d6248d..dfd0ec23b55 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -129,9 +129,9 @@ public static Map> getAliveAssignedWorkerSlotE *

<p>Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the * method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost. * - *

<p>This is the package-private entry point shared by {@link #scheduleTopologiesEvenly(Topologies, Cluster)} and - * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}; its visibility is dictated by those callers, not by - * tests (which also reach it from the same package). + *

<p>This is the package-private entry point reached by the {@code scheduleTopologiesEvenly} overloads (when + * {@code redistributeOntoIdle} is true) and called directly by {@link DefaultScheduler#defaultSchedule(Topologies, + * Cluster)}; its visibility is dictated by those callers, not by tests (which also reach it from the same package). */ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) { if (!ObjectReader.getBoolean( @@ -333,12 +333,31 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) { } public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) { - redistributeOntoIdleSupervisors(topologies, cluster); + scheduleTopologiesEvenly(topologies, cluster, true); + } + + /** + * Even-schedules each topology in {@code topologies}, optionally running the idle-supervisor redistribute pass first. + * + *

<p>{@code redistributeOntoIdle} exists to keep the per-topology {@code max.free.per.topology} cap applied once per + * scheduling round. The {@link EvenScheduler#schedule(Topologies, Cluster)} entry point passes {@code true}: it runs + * here once over the full topology set, so the redistribute is the single full-set round-robin pass. + * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)} instead calls + * {@link #redistributeOntoIdleSupervisors(Topologies, Cluster)} itself, once, over the full set, and then delegates + * here once per leftover topology with a single-topology {@link Topologies}; it passes {@code false} so the cap is not + * re-applied per topology. Without that guard a returning supervisor left idle by the full-set pass would be filled a + * second time in the same round, letting an under-assigned topology move up to twice the cap (apache/storm#8778 + * follow-up). + */ + static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster, boolean redistributeOntoIdle) { + if (redistributeOntoIdle) { + redistributeOntoIdleSupervisors(topologies, cluster); + } for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the // topologies passed in: EvenScheduler.schedule passes the full set (so the guard is a no-op), while // DefaultScheduler.defaultSchedule calls us once per leftover topology with a single-topology Topologies. - // redistributeOntoIdleSupervisors above acted only on that passed-in set too. Skip topologies outside it so + // The redistribute pass (when run) acted only on that passed-in set too. Skip topologies outside it so // the leftover path never schedules one the caller excluded -- e.g. a down isolated topology on a reserved host. 
if (topologies.getById(topology.getId()) == null) { continue; diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java index c5849c683b9..32756a4ec7c 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java @@ -547,6 +547,61 @@ public void blacklistedIdleSupervisorIsNotReusableTarget() { assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); } + /** + * Regression for the apache/storm#8778 follow-up: in the {@link DefaultScheduler} path the per-topology + * {@code max.free.per.topology} cap must bind once per scheduling round, not once per redistribute call. + * + *

<p>{@link DefaultScheduler#defaultSchedule(Topologies, Cluster)} runs the idle-supervisor redistribute once over + * the full topology set, then delegates to {@link EvenScheduler#scheduleTopologiesEvenly(Topologies, Cluster, boolean)} + * once per under-assigned topology (now passing {@code redistributeOntoIdle=false}) -- the 2-arg overload it used to + * call ran the redistribute a second time. With two idle supervisors the + * full-set pass fills one of them (consuming the cap), leaving the second idle for the per-topology pass to fill + * again, so an under-assigned topology relocated up to {@code 2 * maxFree} workers in a single round. + * + *

<p>The fixture makes the topology under-assigned by declaring more workers than it has slots (8 declared, 4 + * executors on 4 slots) so {@code needsScheduling} stays true and the delegated call fires, while leaving no + * executor unassigned -- the ordinary even-scheduling pass is then a no-op and only the redistribute relocations are + * observable. sup-0 and sup-1 are donors with two workers each; sup-2 and sup-3 start idle. + */ + @Test + public void defaultSchedulerAppliesMaxFreeCapOncePerRound() { + String topoId = "topo-double-cap"; + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(4, 4); + // 8 declared workers but only 4 executors: needsScheduling stays true (8 > 4 assigned) with nothing to reassign. + TopologyDetails topology = makeTopologyDetails(topoId, 8, 2); + + Map assignments = new HashMap<>(); + assignments.put(topoId, buildAssignment(topology, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(topoId, topology); + Topologies topologies = new Topologies(topoMap); + + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 1)); + + // sup-2 and sup-3 both start idle; the topology has two workers on each of sup-0 and sup-1. + assertEquals(0, usedSlotCount(cluster, "sup-2")); + assertEquals(0, usedSlotCount(cluster, "sup-3")); + + DefaultScheduler.defaultSchedule(new Topologies(topology), cluster); + + // maxFree=1 caps relocation at one existing worker per round. The old double redistribute moved two -- one onto + // each idle supervisor; with the cap applied once per round only one idle supervisor is filled and the other + // stays untouched. 
+ int relocatedOntoIdle = supervisorWorkerCount(cluster, topoId, "sup-2") + + supervisorWorkerCount(cluster, topoId, "sup-3"); + assertEquals(1, relocatedOntoIdle, + "max.free.per.topology must cap idle-supervisor relocation at 1 per round, not 2 (apache/storm#8778 follow-up)"); + assertEquals(0, supervisorWorkerCount(cluster, topoId, "sup-3"), + "the second idle supervisor stays idle once the per-round cap is reached"); + // The relocation moves existing workers only -- the assigned worker count is unchanged and no executor is lost. + assertEquals(4, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(topoId)), + "relocation must preserve the topology's four assigned workers"); + } + @Test public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() { Map supMap = genSupervisorsWithUptime(3, 4, 100);