Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.apache.storm.daemon.supervisor.timer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
Expand Down Expand Up @@ -41,11 +42,13 @@ public class ReportWorkerHeartbeats implements Runnable {
private Supervisor supervisor;
private Map<String, Object> conf;
private final int workerTimeoutSecs;
private final int workerMaxTimeoutSecs;

public ReportWorkerHeartbeats(Map<String, Object> conf, Supervisor supervisor) {
this.conf = conf;
this.supervisor = supervisor;
this.workerTimeoutSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
this.workerMaxTimeoutSecs = ObjectReader.getInt(conf.get(Config.WORKER_MAX_TIMEOUT_SECS));
}

@Override
Expand All @@ -71,26 +74,37 @@ SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LS

List<SupervisorWorkerHeartbeat> heartbeatList = new ArrayList<>();

// Cache the effective timeout per topology so multiple workers of the same topology on this
// supervisor do not each re-read the topology conf within a single reporting round.
Map<String, Integer> effectiveTimeoutCache = new HashMap<>();

for (LSWorkerHeartbeat lsWorkerHeartbeat : localHeartbeats.values()) {
// local worker heartbeat can be null cause some error/exception
if (null == lsWorkerHeartbeat) {
continue;
}

String topologyId = lsWorkerHeartbeat.get_topology_id();

// Skip stale heartbeats left by worker directories that were never cleaned up
// (e.g. a worker that died before the supervisor finished cleanup). Such a worker
// has not heartbeat within the timeout, so it is already considered dead; forwarding
// it would make Nimbus repeatedly read the (often deleted) topology conf and log noise.
// A live worker always refreshes its heartbeat well within the timeout.
// A live worker always refreshes its heartbeat well within the timeout. The timeout must
// match Slot.getHbTimeoutMs (the actual kill authority), which honors a per-topology
// topology.worker.timeout.secs override; using only the global timeout would drop a
// slow-but-alive worker of a longer-timeout topology one round before Slot kills it.
int effectiveTimeoutSecs =
effectiveTimeoutCache.computeIfAbsent(topologyId, this::effectiveWorkerTimeoutSecs);
long hbAgeSecs = Time.deltaSecsLong(lsWorkerHeartbeat.get_time_secs());
if (hbAgeSecs > workerTimeoutSecs) {
LOG.debug("Skipping stale heartbeat for topology {}: age {}s > worker timeout {}s",
lsWorkerHeartbeat.get_topology_id(), hbAgeSecs, workerTimeoutSecs);
if (hbAgeSecs > effectiveTimeoutSecs) {
LOG.debug("Skipping stale heartbeat for topology {}: age {}s > effective worker timeout {}s",
topologyId, hbAgeSecs, effectiveTimeoutSecs);
continue;
}

SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat();
supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id());
supervisorWorkerHeartbeat.set_storm_id(topologyId);
supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors());
supervisorWorkerHeartbeat.set_time_secs(lsWorkerHeartbeat.get_time_secs());

Expand All @@ -101,6 +115,40 @@ SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LS
return supervisorWorkerHeartbeats;
}

/**
 * Resolves the heartbeat timeout, in seconds, that applies to workers of the given topology.
 *
 * <p>The result is meant to agree with {@code Slot.getHbTimeoutMs}, the component that actually decides
 * when a worker is killed. The baseline is the global {@link Config#SUPERVISOR_WORKER_TIMEOUT_SECS}; a
 * topology may raise it through {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS}, but an override smaller
 * than the global value has no effect.
 *
 * <p>Nimbus clamps {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} to {@link Config#WORKER_MAX_TIMEOUT_SECS}
 * when the topology is submitted and persists the clamped value, so the conf read here (and by Slot) is
 * normally already bounded and both computations give the same answer. Unlike Slot, this method applies
 * the cap again as a defensive measure against an un-clamped conf. Only the override is capped, not the
 * final result, so the global timeout is never reduced below what Slot would use.
 *
 * <p>Orphaned worker directories often outlive their topology conf. When the conf cannot be read the
 * global timeout is returned, which is the desired behavior for a worker that is already dead.
 *
 * @param topologyId id of the topology whose conf is consulted
 * @return the effective worker heartbeat timeout in seconds
 */
@VisibleForTesting
int effectiveWorkerTimeoutSecs(String topologyId) {
    Map<String, Object> topologyConf;
    try {
        topologyConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
    } catch (Exception readFailure) {
        // Best effort: any failure to load the conf simply means "no override available".
        LOG.debug("Cannot read topology conf for {}; using supervisor worker timeout {}s. msg: {}",
            topologyId, workerTimeoutSecs, readFailure.getMessage());
        return workerTimeoutSecs;
    }

    Object override = topologyConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS);
    if (override != null) {
        // Cap the override first, then let the global timeout act as a floor.
        int boundedOverrideSecs = Math.min(ObjectReader.getInt(override), workerMaxTimeoutSecs);
        return Math.max(workerTimeoutSecs, boundedOverrideSecs);
    }
    return workerTimeoutSecs;
}

private void reportWorkerHeartbeats(SupervisorWorkerHeartbeats supervisorWorkerHeartbeats) {
if (supervisorWorkerHeartbeats == null) {
// error/exception thrown, just skip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
package org.apache.storm.daemon.supervisor.timer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -26,24 +31,51 @@
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
import org.apache.storm.generated.SupervisorWorkerHeartbeats;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ReportWorkerHeartbeatsTest {

private static final int WORKER_TIMEOUT_SECS = 30;
private static final int WORKER_MAX_TIMEOUT_SECS = 600;
private static final String SUPERVISOR_ID = "test-supervisor";

private ConfigUtils previousConfigUtils;
private ConfigUtils mockConfigUtils;

/**
 * Installs a Mockito mock as the {@link ConfigUtils} instance before every test.
 *
 * <p>The value returned by {@code ConfigUtils.setInstance} is kept in {@code previousConfigUtils} so
 * that {@code tearDown} can hand it back afterwards.
 */
@BeforeEach
public void setUp() throws Exception {
    ConfigUtils stubbedConfigUtils = mock(ConfigUtils.class);
    mockConfigUtils = stubbedConfigUtils;
    previousConfigUtils = ConfigUtils.setInstance(stubbedConfigUtils);

    // Baseline stub: every topology conf is empty, i.e. no per-topology override, so the reporter
    // falls back to the global worker timeout unless a test stubs something more specific.
    when(stubbedConfigUtils.readSupervisorStormConfImpl(any(), any())).thenReturn(new HashMap<>());
}

/**
 * Puts back the {@link ConfigUtils} instance that {@code setUp} replaced, so the mock does not leak
 * into other tests.
 */
@AfterEach
public void tearDown() {
    ConfigUtils instanceToRestore = previousConfigUtils;
    ConfigUtils.setInstance(instanceToRestore);
}

/**
 * Builds a local worker heartbeat carrying only the two fields these tests rely on.
 *
 * @param topologyId topology id to record on the heartbeat
 * @param timeSecs heartbeat timestamp, in seconds
 * @return a new heartbeat with the topology id and time set
 */
private static LSWorkerHeartbeat mkHeartbeat(String topologyId, long timeSecs) {
    LSWorkerHeartbeat heartbeat = new LSWorkerHeartbeat();
    heartbeat.set_time_secs(timeSecs);
    heartbeat.set_topology_id(topologyId);
    return heartbeat;
}

/**
 * Builds a mutable single-entry map standing in for a topology conf.
 *
 * @param key configuration key to set
 * @param value value stored under {@code key}
 * @return a new {@link HashMap} containing exactly that one entry
 */
private static Map<String, Object> topoConf(String key, Object value) {
    Map<String, Object> singleEntryConf = new HashMap<>();
    singleEntryConf.put(key, value);
    return singleEntryConf;
}

private ReportWorkerHeartbeats mkReporter() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.SUPERVISOR_WORKER_TIMEOUT_SECS, WORKER_TIMEOUT_SECS);
conf.put(Config.WORKER_MAX_TIMEOUT_SECS, WORKER_MAX_TIMEOUT_SECS);
Supervisor supervisor = mock(Supervisor.class);
when(supervisor.getId()).thenReturn(SUPERVISOR_ID);
return new ReportWorkerHeartbeats(conf, supervisor);
Expand Down Expand Up @@ -93,4 +125,117 @@ public void nullLocalHeartbeatsAreSkipped() {
assertEquals(Set.of("topo-fresh"), reportedTopologies(result));
}
}

/**
 * A topology that raises its own worker timeout above the global one keeps reporting heartbeats that
 * the global timeout alone would already treat as stale.
 */
@Test
public void perTopologyTimeoutOverrideExtendsTheStaleThreshold() throws Exception {
    // Only topo-long overrides the timeout (300s versus the global 30s), mirroring Slot.getHbTimeoutMs.
    Map<String, Object> longTimeoutConf = topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 300);
    when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-long")))
        .thenReturn(longTimeoutConf);

    try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
        Time.advanceTimeSecs(100_000);
        long now = Time.currentTimeSecsLong();
        long hundredSecondsAgo = now - 100;

        Map<String, LSWorkerHeartbeat> localHeartbeats = new LinkedHashMap<>();
        // 100s old: past the global 30s timeout, but within the 300s override.
        localHeartbeats.put("w-long", mkHeartbeat("topo-long", hundredSecondsAgo));
        // Same age without an override: dead under the global timeout, so it must be dropped.
        localHeartbeats.put("w-default", mkHeartbeat("topo-default", hundredSecondsAgo));

        ReportWorkerHeartbeats reporter = mkReporter();
        SupervisorWorkerHeartbeats reported = reporter.getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);

        assertEquals(Set.of("topo-long"), reportedTopologies(reported));
    }
}

/**
 * An override smaller than the global timeout is ignored: the global value stays the floor.
 */
@Test
public void perTopologyTimeoutBelowGlobalKeepsTheGlobalThreshold() throws Exception {
    // effectiveWorkerTimeoutSecs takes max(global, override), matching Slot.getHbTimeoutMs, so a 10s
    // override must not pull the effective timeout under the global 30s.
    Map<String, Object> shortTimeoutConf = topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 10);
    when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-small")))
        .thenReturn(shortTimeoutConf);

    try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
        Time.advanceTimeSecs(100_000);
        long now = Time.currentTimeSecsLong();

        Map<String, LSWorkerHeartbeat> localHeartbeats = new LinkedHashMap<>();
        // 20s old: would be stale under the 10s override, yet alive under the global 30s floor.
        localHeartbeats.put("w-small", mkHeartbeat("topo-small", now - 20));

        ReportWorkerHeartbeats reporter = mkReporter();
        SupervisorWorkerHeartbeats reported = reporter.getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);

        assertEquals(Set.of("topo-small"), reportedTopologies(reported));
    }
}

/**
 * Several workers of one topology on the same supervisor share a single topology-conf read within a
 * reporting round.
 */
@Test
public void topologyConfIsReadOncePerRoundForMultipleWorkers() throws Exception {
    Map<String, Object> sharedConf = topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 300);
    when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq("topo-shared")))
        .thenReturn(sharedConf);

    try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
        Time.advanceTimeSecs(100_000);
        long now = Time.currentTimeSecsLong();

        // Three fresh heartbeats, all belonging to the same topology.
        Map<String, LSWorkerHeartbeat> localHeartbeats = new LinkedHashMap<>();
        localHeartbeats.put("w1", mkHeartbeat("topo-shared", now));
        localHeartbeats.put("w2", mkHeartbeat("topo-shared", now));
        localHeartbeats.put("w3", mkHeartbeat("topo-shared", now));

        ReportWorkerHeartbeats reporter = mkReporter();
        reporter.getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);

        // Exactly one conf lookup for the topology, regardless of the number of workers.
        verify(mockConfigUtils, times(1)).readSupervisorStormConfImpl(any(), eq("topo-shared"));
    }
}

/**
 * An override above {@code WORKER_MAX_TIMEOUT_SECS} is clamped to that cap, as Nimbus does at
 * submission time.
 */
@Test
public void perTopologyTimeoutIsCappedByWorkerMaxTimeout() throws Exception {
    // Both topologies ask for a timeout a hundred times larger than the cap.
    int hugeOverrideSecs = WORKER_MAX_TIMEOUT_SECS * 100;
    for (String topologyId : new String[] {"topo-huge-stale", "topo-huge-fresh"}) {
        when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq(topologyId)))
            .thenReturn(topoConf(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, hugeOverrideSecs));
    }

    try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
        Time.advanceTimeSecs(1_000_000);
        long now = Time.currentTimeSecsLong();

        Map<String, LSWorkerHeartbeat> localHeartbeats = new LinkedHashMap<>();
        // One second beyond the cap: stale even though the override is huge.
        localHeartbeats.put("w-stale", mkHeartbeat("topo-huge-stale", now - WORKER_MAX_TIMEOUT_SECS - 1));
        // One second inside the cap: still considered alive.
        localHeartbeats.put("w-fresh", mkHeartbeat("topo-huge-fresh", now - WORKER_MAX_TIMEOUT_SECS + 1));

        ReportWorkerHeartbeats reporter = mkReporter();
        SupervisorWorkerHeartbeats reported = reporter.getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);

        assertEquals(Set.of("topo-huge-fresh"), reportedTopologies(reported));
    }
}

/**
 * When the topology conf cannot be read, the global worker timeout decides which heartbeats are stale.
 */
@Test
public void unreadableTopologyConfFallsBackToGlobalTimeout() throws Exception {
    // Orphaned worker dirs often outlive their topology conf. A failed read must fall back to the
    // global timeout instead of reporting a dead worker indefinitely.
    for (String topologyId : new String[] {"topo-orphan-fresh", "topo-orphan-stale"}) {
        when(mockConfigUtils.readSupervisorStormConfImpl(any(), eq(topologyId)))
            .thenThrow(new IOException("topology conf gone"));
    }

    try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
        Time.advanceTimeSecs(100_000);
        long now = Time.currentTimeSecsLong();

        Map<String, LSWorkerHeartbeat> localHeartbeats = new LinkedHashMap<>();
        // One second inside the global timeout: reported.
        localHeartbeats.put("w-fresh", mkHeartbeat("topo-orphan-fresh", now - WORKER_TIMEOUT_SECS + 1));
        // One second past the global timeout: filtered out.
        localHeartbeats.put("w-stale", mkHeartbeat("topo-orphan-stale", now - WORKER_TIMEOUT_SECS - 1));

        ReportWorkerHeartbeats reporter = mkReporter();
        SupervisorWorkerHeartbeats reported = reporter.getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);

        assertEquals(Set.of("topo-orphan-fresh"), reportedTopologies(reported));
    }
}
}