From e65bb4c39825ae4d3401d6b4540ac1542efdfe37 Mon Sep 17 00:00:00 2001 From: Minwoo Kang Date: Mon, 22 Jun 2026 08:21:08 +0900 Subject: [PATCH] Filter worker heartbeats by per-topology timeout ReportWorkerHeartbeats dropped stale local heartbeats using only the global supervisor.worker.timeout.secs. The worker-kill authority, Slot.getHbTimeoutMs(), instead uses max(topology.worker.timeout.secs, supervisor.worker.timeout.secs). A slow-but-alive worker of a topology that raised its own timeout was therefore excluded from the report one round before Slot would treat it as dead. Compute the effective timeout per topology to mirror Slot: read the localized topology conf and use max(supervisor.worker.timeout.secs, min(topology.worker.timeout.secs, worker.max.timeout.secs)). The topology override is bounded by worker.max.timeout.secs: Nimbus already clamps it at submission, and the cap is re-applied here defensively. The cap is applied to the override component only, so the global floor is never reduced. When the topology conf cannot be read (the orphaned worker dirs this filter targets often outlive their conf), fall back to the global timeout. The per-topology result is cached within a reporting round. Add regression tests for a longer per-topology override, an override below the global timeout, the single conf read per reporting round, the worker.max.timeout.secs cap, and the conf-read-failure fallback. 
--- .../timer/ReportWorkerHeartbeats.java | 58 ++++++- .../timer/ReportWorkerHeartbeatsTest.java | 145 ++++++++++++++++++ 2 files changed, 198 insertions(+), 5 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java index 4dedf2e63e..c9582c20c1 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java @@ -13,6 +13,7 @@ package org.apache.storm.daemon.supervisor.timer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.storm.Config; @@ -41,11 +42,13 @@ public class ReportWorkerHeartbeats implements Runnable { private Supervisor supervisor; private Map conf; private final int workerTimeoutSecs; + private final int workerMaxTimeoutSecs; public ReportWorkerHeartbeats(Map conf, Supervisor supervisor) { this.conf = conf; this.supervisor = supervisor; this.workerTimeoutSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + this.workerMaxTimeoutSecs = ObjectReader.getInt(conf.get(Config.WORKER_MAX_TIMEOUT_SECS)); } @Override @@ -71,26 +74,37 @@ SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map heartbeatList = new ArrayList<>(); + // Cache the effective timeout per topology so multiple workers of the same topology on this + // supervisor do not each re-read the topology conf within a single reporting round. 
+ Map effectiveTimeoutCache = new HashMap<>(); + for (LSWorkerHeartbeat lsWorkerHeartbeat : localHeartbeats.values()) { // local worker heartbeat can be null cause some error/exception if (null == lsWorkerHeartbeat) { continue; } + String topologyId = lsWorkerHeartbeat.get_topology_id(); + // Skip stale heartbeats left by worker directories that were never cleaned up // (e.g. a worker that died before the supervisor finished cleanup). Such a worker // has not heartbeat within the timeout, so it is already considered dead; forwarding // it would make Nimbus repeatedly read the (often deleted) topology conf and log noise. - // A live worker always refreshes its heartbeat well within the timeout. + // A live worker always refreshes its heartbeat well within the timeout. The timeout must + // match Slot.getHbTimeoutMs (the actual kill authority), which honors a per-topology + // topology.worker.timeout.secs override; using only the global timeout would drop a + // slow-but-alive worker of a longer-timeout topology one round before Slot kills it. 
+ int effectiveTimeoutSecs = + effectiveTimeoutCache.computeIfAbsent(topologyId, this::effectiveWorkerTimeoutSecs); long hbAgeSecs = Time.deltaSecsLong(lsWorkerHeartbeat.get_time_secs()); - if (hbAgeSecs > workerTimeoutSecs) { - LOG.debug("Skipping stale heartbeat for topology {}: age {}s > worker timeout {}s", - lsWorkerHeartbeat.get_topology_id(), hbAgeSecs, workerTimeoutSecs); + if (hbAgeSecs > effectiveTimeoutSecs) { + LOG.debug("Skipping stale heartbeat for topology {}: age {}s > effective worker timeout {}s", + topologyId, hbAgeSecs, effectiveTimeoutSecs); continue; } SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat(); - supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id()); + supervisorWorkerHeartbeat.set_storm_id(topologyId); supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors()); supervisorWorkerHeartbeat.set_time_secs(lsWorkerHeartbeat.get_time_secs()); @@ -101,6 +115,40 @@ SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(MapNimbus clamps {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} to {@link Config#WORKER_MAX_TIMEOUT_SECS} + * at submission time and persists the clamped value, so the on-disk conf both this method and Slot read is + * already bounded; for that value the two computations match. Unlike Slot, this method re-applies the cap + * to the topology override defensively, guarding against an un-clamped conf. The cap is applied to the + * override component only (not the final result), so the global timeout is never shrunk below the value + * Slot would use. + * + *

<p>Orphaned worker directories often outlive their topology conf; when the conf cannot be read we fall + * back to the global timeout, which is exactly the behavior wanted for an already-dead worker. + */ + @VisibleForTesting + int effectiveWorkerTimeoutSecs(String topologyId) { + Map topoConf; + try { + topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId); + } catch (Exception e) { + LOG.debug("Cannot read topology conf for {}; using supervisor worker timeout {}s. msg: {}", + topologyId, workerTimeoutSecs, e.getMessage()); + return workerTimeoutSecs; + } + Object topoTimeout = topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS); + if (topoTimeout == null) { + return workerTimeoutSecs; + } + int cappedTopoTimeoutSecs = Math.min(ObjectReader.getInt(topoTimeout), workerMaxTimeoutSecs); + return Math.max(workerTimeoutSecs, cappedTopoTimeoutSecs); + } + private void reportWorkerHeartbeats(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) { if (supervisorWorkerHeartbeats == null) { // error/exception thrown, just skip diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeatsTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeatsTest.java index f3ff48d610..c9c85cde12 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeatsTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeatsTest.java @@ -13,9 +13,14 @@ package org.apache.storm.daemon.supervisor.timer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashMap; import 
java.util.Map; @@ -26,14 +31,34 @@ import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.SupervisorWorkerHeartbeat; import org.apache.storm.generated.SupervisorWorkerHeartbeats; +import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Time; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ReportWorkerHeartbeatsTest { private static final int WORKER_TIMEOUT_SECS = 30; + private static final int WORKER_MAX_TIMEOUT_SECS = 600; private static final String SUPERVISOR_ID = "test-supervisor"; + private ConfigUtils previousConfigUtils; + private ConfigUtils mockConfigUtils; + + @BeforeEach + public void setUp() throws Exception { + mockConfigUtils = mock(ConfigUtils.class); + previousConfigUtils = ConfigUtils.setInstance(mockConfigUtils); + // Default: no per-topology conf override, so the reporter uses the global worker timeout. + when(mockConfigUtils.readSupervisorStormConfImpl(any(), any())).thenReturn(new HashMap<>()); + } + + @AfterEach + public void tearDown() { + ConfigUtils.setInstance(previousConfigUtils); + } + private static LSWorkerHeartbeat mkHeartbeat(String topologyId, long timeSecs) { LSWorkerHeartbeat hb = new LSWorkerHeartbeat(); hb.set_topology_id(topologyId); @@ -41,9 +66,16 @@ private static LSWorkerHeartbeat mkHeartbeat(String topologyId, long timeSecs) { return hb; } + private static Map topoConf(String key, Object value) { + Map conf = new HashMap<>(); + conf.put(key, value); + return conf; + } + private ReportWorkerHeartbeats mkReporter() { Map conf = new HashMap<>(); conf.put(Config.SUPERVISOR_WORKER_TIMEOUT_SECS, WORKER_TIMEOUT_SECS); + conf.put(Config.WORKER_MAX_TIMEOUT_SECS, WORKER_MAX_TIMEOUT_SECS); Supervisor supervisor = mock(Supervisor.class); when(supervisor.getId()).thenReturn(SUPERVISOR_ID); return new ReportWorkerHeartbeats(conf, supervisor); @@ -93,4 +125,117 @@ public void nullLocalHeartbeatsAreSkipped() 
{ assertEquals(Set.of("topo-fresh"), reportedTopologies(result)); } } + + @Test + public void perTopologyTimeoutOverrideExtendsTheStaleThreshold() throws Exception { + // topo-long raises its own worker timeout well above the global one, mirroring Slot.getHbTimeoutMs. + when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-long"))) + .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 300)); + + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTimeSecs(100_000); + long now = Time.currentTimeSecsLong(); + + Map local = new LinkedHashMap<>(); + // stale under the global 30s timeout, but still alive under the 300s override + local.put("w-long", mkHeartbeat("topo-long", now - 100)); + // same age, no override: already dead under the global timeout -> filtered out + local.put("w-default", mkHeartbeat("topo-default", now - 100)); + + SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local); + + assertEquals(Set.of("topo-long"), reportedTopologies(result)); + } + } + + @Test + public void perTopologyTimeoutBelowGlobalKeepsTheGlobalThreshold() throws Exception { + // An override smaller than the global timeout must not shrink the effective timeout below the + // global one: effectiveWorkerTimeoutSecs takes max(global, override), matching Slot.getHbTimeoutMs. 
+ when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-small"))) + .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 10)); + + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTimeSecs(100_000); + long now = Time.currentTimeSecsLong(); + + Map local = new LinkedHashMap<>(); + // age 20s: stale under the 10s override, but still alive under the global 30s floor + local.put("w-small", mkHeartbeat("topo-small", now - 20)); + + SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local); + + assertEquals(Set.of("topo-small"), reportedTopologies(result)); + } + } + + @Test + public void topologyConfIsReadOncePerRoundForMultipleWorkers() throws Exception { + // Multiple workers of the same topology on this supervisor must share a single conf read per round. + when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-shared"))) + .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 300)); + + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTimeSecs(100_000); + long now = Time.currentTimeSecsLong(); + + Map local = new LinkedHashMap<>(); + local.put("w1", mkHeartbeat("topo-shared", now)); + local.put("w2", mkHeartbeat("topo-shared", now)); + local.put("w3", mkHeartbeat("topo-shared", now)); + + mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local); + + verify(mockConfigUtils, times(1)).readSupervisorStormConfImpl(any(), eq("topo-shared")); + } + } + + @Test + public void perTopologyTimeoutIsCappedByWorkerMaxTimeout() throws Exception { + // An override beyond the cap must clamp to WORKER_MAX_TIMEOUT_SECS, just like Nimbus does at submission. 
+ when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-huge-stale"))) + .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, WORKER_MAX_TIMEOUT_SECS * 100)); + when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-huge-fresh"))) + .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, WORKER_MAX_TIMEOUT_SECS * 100)); + + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTimeSecs(1_000_000); + long now = Time.currentTimeSecsLong(); + + Map local = new LinkedHashMap<>(); + // just past the cap: stale despite the huge override + local.put("w-stale", mkHeartbeat("topo-huge-stale", now - WORKER_MAX_TIMEOUT_SECS - 1)); + // just within the cap: still alive + local.put("w-fresh", mkHeartbeat("topo-huge-fresh", now - WORKER_MAX_TIMEOUT_SECS + 1)); + + SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local); + + assertEquals(Set.of("topo-huge-fresh"), reportedTopologies(result)); + } + } + + @Test + public void unreadableTopologyConfFallsBackToGlobalTimeout() throws Exception { + // Orphaned worker dirs often outlive their topology conf; a read failure must fall back to the + // global timeout rather than reporting a dead worker indefinitely. 
+ when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-orphan-fresh"))) + .thenThrow(new IOException("topology conf gone")); + when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-orphan-stale"))) + .thenThrow(new IOException("topology conf gone")); + + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTimeSecs(100_000); + long now = Time.currentTimeSecsLong(); + + Map local = new LinkedHashMap<>(); + // alive under the fallback global timeout + local.put("w-fresh", mkHeartbeat("topo-orphan-fresh", now - WORKER_TIMEOUT_SECS + 1)); + // stale under the fallback global timeout -> filtered out + local.put("w-stale", mkHeartbeat("topo-orphan-stale", now - WORKER_TIMEOUT_SECS - 1)); + + SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local); + + assertEquals(Set.of("topo-orphan-fresh"), reportedTopologies(result)); + } + } }