Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public static Set<WorkerSlot> slotsCanReassign(Cluster cluster, Set<WorkerSlot>
}

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
// Single full-set round-robin redistribute for the whole round. The per-topology scheduleTopologiesEvenly call
// below passes redistributeOntoIdle=false so the max.free.per.topology cap is not applied a second time on a
// supervisor left idle by this pass (apache/storm#8778 follow-up).
EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
// needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
Expand Down Expand Up @@ -103,7 +106,7 @@ public static void defaultSchedule(Topologies topologies, Cluster cluster) {
cluster.freeSlots(badSlots);
}

EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ public static Map<WorkerSlot, List<ExecutorDetails>> getAliveAssignedWorkerSlotE
* <p>Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the
* method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost.
*
* <p>This is the package-private entry point shared by {@link #scheduleTopologiesEvenly(Topologies, Cluster)} and
* {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}; its visibility is dictated by those callers, not by
* tests (which also reach it from the same package).
* <p>This is the package-private entry point reached by the {@code scheduleTopologiesEvenly} overloads (when
* {@code redistributeOntoIdle} is true) and called directly by {@link DefaultScheduler#defaultSchedule(Topologies,
* Cluster)}; its visibility is dictated by those callers, not by tests (which also reach it from the same package).
*/
static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) {
if (!ObjectReader.getBoolean(
Expand Down Expand Up @@ -333,12 +333,31 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) {
}

public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
redistributeOntoIdleSupervisors(topologies, cluster);
scheduleTopologiesEvenly(topologies, cluster, true);
}

/**
* Even-schedules each topology in {@code topologies}, optionally running the idle-supervisor redistribute pass first.
*
* <p>{@code redistributeOntoIdle} exists to keep the per-topology {@code max.free.per.topology} cap applied once per
* scheduling round. The {@link EvenScheduler#schedule(Topologies, Cluster)} entry point passes {@code true}: it runs
* here once over the full topology set, so the redistribute is the single full-set round-robin pass.
* {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)} instead calls
* {@link #redistributeOntoIdleSupervisors(Topologies, Cluster)} itself, once, over the full set, and then delegates
* here once per leftover topology with a single-topology {@link Topologies}; it passes {@code false} so the cap is not
* re-applied per topology. Without that guard a returning supervisor left idle by the full-set pass would be filled a
* second time in the same round, letting an under-assigned topology move up to twice the cap (apache/storm#8778
* follow-up).
*/
static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster, boolean redistributeOntoIdle) {
if (redistributeOntoIdle) {
redistributeOntoIdleSupervisors(topologies, cluster);
}
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
// needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
// topologies passed in: EvenScheduler.schedule passes the full set (so the guard is a no-op), while
// DefaultScheduler.defaultSchedule calls us once per leftover topology with a single-topology Topologies.
// redistributeOntoIdleSupervisors above acted only on that passed-in set too. Skip topologies outside it so
// The redistribute pass (when run) acted only on that passed-in set too. Skip topologies outside it so
// the leftover path never schedules one the caller excluded -- e.g. a down isolated topology on a reserved host.
if (topologies.getById(topology.getId()) == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,61 @@ public void blacklistedIdleSupervisorIsNotReusableTarget() {
assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));
}

/**
* Regression for the apache/storm#8778 follow-up: in the {@link DefaultScheduler} path the per-topology
* {@code max.free.per.topology} cap must bind once per scheduling round, not once per redistribute call.
*
* <p>{@link DefaultScheduler#defaultSchedule(Topologies, Cluster)} runs the idle-supervisor redistribute once over
* the full topology set, then delegates to {@link EvenScheduler#scheduleTopologiesEvenly(Topologies, Cluster, boolean)}
* once per under-assigned topology (now passing {@code redistributeOntoIdle=false}) -- the 2-arg overload it used to
* call ran the redistribute a second time. With two idle supervisors the
* full-set pass fills one of them (consuming the cap), leaving the second idle for the per-topology pass to fill
* again, so an under-assigned topology relocated up to {@code 2 * maxFree} workers in a single round.
*
* <p>The fixture makes the topology under-assigned by declaring more workers than it has slots (8 declared, 4
* executors on 4 slots) so {@code needsScheduling} stays true and the delegated call fires, while leaving no
* executor unassigned -- the ordinary even-scheduling pass is then a no-op and only the redistribute relocations are
* observable. sup-0 and sup-1 are donors with two workers each; sup-2 and sup-3 start idle.
*/
@Test
public void defaultSchedulerAppliesMaxFreeCapOncePerRound() {
String topoId = "topo-double-cap";
Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(4, 4);
// 8 declared workers but only 4 executors: needsScheduling stays true (8 > 4 assigned) with nothing to reassign.
TopologyDetails topology = makeTopologyDetails(topoId, 8, 2);

Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
assignments.put(topoId, buildAssignment(topology, new WorkerSlot[]{
new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
}));

Map<String, TopologyDetails> topoMap = new HashMap<>();
topoMap.put(topoId, topology);
Topologies topologies = new Topologies(topoMap);

Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 1));

// sup-2 and sup-3 both start idle; the topology has two workers on each of sup-0 and sup-1.
assertEquals(0, usedSlotCount(cluster, "sup-2"));
assertEquals(0, usedSlotCount(cluster, "sup-3"));

DefaultScheduler.defaultSchedule(new Topologies(topology), cluster);

// maxFree=1 caps relocation at one existing worker per round. The old double redistribute moved two -- one onto
// each idle supervisor; with the cap applied once per round only one idle supervisor is filled and the other
// stays untouched.
int relocatedOntoIdle = supervisorWorkerCount(cluster, topoId, "sup-2")
+ supervisorWorkerCount(cluster, topoId, "sup-3");
assertEquals(1, relocatedOntoIdle,
"max.free.per.topology must cap idle-supervisor relocation at 1 per round, not 2 (apache/storm#8778 follow-up)");
assertEquals(0, supervisorWorkerCount(cluster, topoId, "sup-3"),
"the second idle supervisor stays idle once the per-round cap is reached");
// The relocation moves existing workers only -- the assigned worker count is unchanged and no executor is lost.
assertEquals(4, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(topoId)),
"relocation must preserve the topology's four assigned workers");
}

@Test
public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() {
Map<String, SupervisorDetails> supMap = genSupervisorsWithUptime(3, 4, 100);
Expand Down