diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java index 2ebd4bd89c..4ec48b5540 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java @@ -303,10 +303,16 @@ private Map> hostToUsedSlots(Cluster cluster) { return hostUsedSlots; } - // returns list of list of slots, reverse sorted by number of slots + // Returns list of hosts with their assignable slots, sorted by: + // 1. (primary) total assignable slots, descending — same as before + // 2. (secondary) currently free slots, descending — prefer hosts that + // need fewer evictions + // 3. (tertiary) host name, ascending — deterministic order + // for testability private LinkedList hostAssignableSlots(Cluster cluster) { List assignableSlots = cluster.getAssignableSlots(); Map> hostAssignableSlots = new HashMap>(); + Map hostFreeSlotCounts = new HashMap(); for (WorkerSlot slot : assignableSlots) { String host = cluster.getHost(slot.getNodeId()); List slots = hostAssignableSlots.get(host); @@ -315,15 +321,31 @@ private LinkedList hostAssignableSlots(Cluster cluster) { hostAssignableSlots.put(host, slots); } slots.add(slot); + if (!cluster.isSlotOccupied(slot)) { + Integer count = hostFreeSlotCounts.get(host); + hostFreeSlotCounts.put(host, count == null ? 1 : count + 1); + } } List sortHostAssignSlots = new ArrayList(); for (Map.Entry> entry : hostAssignableSlots.entrySet()) { - sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue())); + Integer free = hostFreeSlotCounts.get(entry.getKey()); + sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue(), + free != null ? 
free.intValue() : 0)); } Collections.sort(sortHostAssignSlots, new Comparator() { @Override public int compare(HostAssignableSlots o1, HostAssignableSlots o2) { - return o2.getWorkerSlots().size() - o1.getWorkerSlots().size(); + int bySlots = o2.getWorkerSlots().size() - o1.getWorkerSlots().size(); + if (bySlots != 0) { + return bySlots; + } + + int byFree = o2.getFreeSlots() - o1.getFreeSlots(); + if (byFree != 0) { + return byFree; + } + + return o1.getHostName().compareTo(o2.getHostName()); } }); @@ -400,10 +422,12 @@ public Set getExecutors() { class HostAssignableSlots { private String hostName; private List workerSlots; + private final int freeSlots; - HostAssignableSlots(String hostName, List workerSlots) { + HostAssignableSlots(String hostName, List workerSlots, int freeSlots) { this.hostName = hostName; this.workerSlots = workerSlots; + this.freeSlots = freeSlots; } public String getHostName() { @@ -414,5 +438,9 @@ public List getWorkerSlots() { return workerSlots; } + public int getFreeSlots() { + return freeSlots; + } + } } diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/IsolationSchedulerTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/IsolationSchedulerTest.java new file mode 100644 index 0000000000..3fbc43e159 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/scheduler/IsolationSchedulerTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.scheduler; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.storm.Constants; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler; +import org.apache.storm.scheduler.resource.normalization.ResourceMetrics; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests for {@link IsolationScheduler}. 
+ */
+public class IsolationSchedulerTest {
+    // Builds a supervisor exposing ports 0..numPorts-1 with fixed CPU (400.0) and memory (4096.0) resources.
+    private static SupervisorDetails mkSupervisor(String id, String host, int numPorts) {
+        List<Integer> ports = new ArrayList<>();
+        for (int i = 0; i < numPorts; i++) {
+            ports.add(i);
+        }
+        Map<String, Double> resources = new HashMap<>();
+        resources.put(Constants.COMMON_CPU_RESOURCE_NAME, 400.0);
+        resources.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 4096.0);
+        return new SupervisorDetails(id, host, null, ports, resources);
+    }
+    // Builds a Cluster from the supervisors and topologies, passing empty maps for the other two map arguments.
+    private static Cluster mkCluster(Map<String, SupervisorDetails> supervisors, Topologies topologies) {
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        ResourceMetrics resourceMetrics = new ResourceMetrics(new StormMetricsRegistry());
+        return new Cluster(iNimbus, resourceMetrics, supervisors, new HashMap<>(),
+            topologies, new HashMap<>());
+    }
+    // Calls the private IsolationScheduler#hostAssignableSlots(Cluster) reflectively; the cast is unchecked.
+    @SuppressWarnings("unchecked")
+    private static LinkedList<IsolationScheduler.HostAssignableSlots> hostAssignableSlots(
+        IsolationScheduler scheduler, Cluster cluster) throws Exception {
+        Method method = IsolationScheduler.class.getDeclaredMethod("hostAssignableSlots", Cluster.class);
+        method.setAccessible(true);
+        return (LinkedList<IsolationScheduler.HostAssignableSlots>) method.invoke(scheduler, cluster);
+    }
+    // Collects the host names of the ranked entries, preserving ranking order.
+    private static List<String> hostOrder(LinkedList<IsolationScheduler.HostAssignableSlots> slots) {
+        List<String> hosts = new ArrayList<>();
+        for (IsolationScheduler.HostAssignableSlots slot : slots) {
+            hosts.add(slot.getHostName());
+        }
+        return hosts;
+    }
+    // Both hosts expose two slots; the host with more free slots must be ranked first.
+    @Test
+    public void hostAssignableSlots_prefersHostWithMoreFreeSlots() throws Exception {
+        Map<String, SupervisorDetails> supervisors = new HashMap<>();
+        supervisors.put("sup-busy", mkSupervisor("sup-busy", "host-busy", 2));
+        supervisors.put("sup-free", mkSupervisor("sup-free", "host-free", 2));
+        // Filler topology: one of its executors is assigned below to occupy a slot on host-busy.
+        Map<String, Object> conf = new HashMap<>();
+        TopologyDetails filler = TestUtilsForBlacklistScheduler.getTopology("filler", conf, 1, 0, 1, 0, 0, false);
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(filler.getId(), filler);
+        Topologies topologies = new Topologies(topoMap);
+
+        Cluster cluster = mkCluster(supervisors, topologies);
+        cluster.assign(new WorkerSlot("sup-busy", 0), filler.getId(),
+            Collections.singletonList(filler.getExecutors().iterator().next()));
+
+        LinkedList<IsolationScheduler.HostAssignableSlots> ranked =
+            hostAssignableSlots(new IsolationScheduler(), cluster);
+
+        assertEquals(2, ranked.size());
+        assertEquals("host-free", ranked.get(0).getHostName());
+        assertEquals(2, ranked.get(0).getFreeSlots());
+        assertEquals("host-busy", ranked.get(1).getHostName());
+        assertEquals(1, ranked.get(1).getFreeSlots());
+        assertEquals(List.of("host-free", "host-busy"), hostOrder(ranked));
+    }
+    // Equal total and free slot counts on both hosts; order must fall back to ascending host name.
+    @Test
+    public void hostAssignableSlots_breaksTiesByHostName() throws Exception {
+        Map<String, SupervisorDetails> supervisors = new HashMap<>();
+        supervisors.put("sup-a", mkSupervisor("sup-a", "host-aaa", 2));
+        supervisors.put("sup-b", mkSupervisor("sup-b", "host-bbb", 2));
+
+        Cluster cluster = mkCluster(supervisors, new Topologies());
+
+        LinkedList<IsolationScheduler.HostAssignableSlots> ranked =
+            hostAssignableSlots(new IsolationScheduler(), cluster);
+
+        assertEquals(2, ranked.size());
+        assertEquals(2, ranked.get(0).getWorkerSlots().size());
+        assertEquals(2, ranked.get(1).getWorkerSlots().size());
+        assertEquals(2, ranked.get(0).getFreeSlots());
+        assertEquals(2, ranked.get(1).getFreeSlots());
+        assertEquals(List.of("host-aaa", "host-bbb"), hostOrder(ranked));
+    }
+}
\ No newline at end of file