-
Notifications
You must be signed in to change notification settings - Fork 4k
Add jitter control loop and JitterAwareStreamGrouping
#8593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 27 commits
aae2554
2aad8a2
5cb1936
5aa8264
946715d
ac4e3b1
23247e3
ce8e2b1
f040770
b29b8e3
cb69a04
f5e28f4
451eafa
e7d94f8
35abc68
215e3e0
150e210
99f53e4
7d34c90
a76ee8d
a95f5f1
c938196
d8e7cda
2b4d8ad
babd3d9
7edf2d5
d3f4e08
0e0e18c
f983050
2d3f5fd
2643296
935b324
3df0555
c2c7ac2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,296 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License | ||
| */ | ||
|
|
||
| package org.apache.storm.perf; | ||
|
|
||
| import java.io.FileInputStream; | ||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
| import java.util.concurrent.locks.LockSupport; | ||
| import org.apache.storm.Config; | ||
| import org.apache.storm.generated.StormTopology; | ||
| import org.apache.storm.grouping.JitterAwareStreamGrouping; | ||
| import org.apache.storm.perf.spout.FileReadSpout; | ||
| import org.apache.storm.perf.utils.Helper; | ||
| import org.apache.storm.spout.SpoutOutputCollector; | ||
| import org.apache.storm.task.OutputCollector; | ||
| import org.apache.storm.task.TopologyContext; | ||
| import org.apache.storm.topology.OutputFieldsDeclarer; | ||
| import org.apache.storm.topology.TopologyBuilder; | ||
| import org.apache.storm.topology.base.BaseRichBolt; | ||
| import org.apache.storm.topology.base.BaseRichSpout; | ||
| import org.apache.storm.tuple.Fields; | ||
| import org.apache.storm.tuple.Tuple; | ||
| import org.apache.storm.tuple.Values; | ||
| import org.apache.storm.utils.Utils; | ||
|
|
||
| /** | ||
| * Benchmark for {@link JitterAwareStreamGrouping} in a word-count pipeline where worker tasks have | ||
| * artificially skewed processing latencies. | ||
| * | ||
| * <p>Pipeline: {@code GenSpout -> SplitterBolt -> JitteryWorkerBolt -> SinkBolt} | ||
| * | ||
| * <p>{@code JitteryWorkerBolt} tasks have task-index-dependent processing delays. Task 0 is fast; | ||
| * each subsequent task is progressively slower. This mimics real-world conditions (GC pressure, | ||
| * I/O, resource contention) where downstream tasks diverge in responsiveness. With upstream | ||
| * feedback enabled, {@link JitterAwareStreamGrouping} routes more tuples to the fastest tasks, | ||
| * improving throughput and reducing complete latency by ≥10% compared to plain round-robin. | ||
| * | ||
| * <p>Run the baseline and the jitter-aware run back-to-back to compare: | ||
| * <pre> | ||
| * # Baseline: JitterAwareStreamGrouping falls back to round-robin (no feedback stats). | ||
| * storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \ | ||
| * -c topology.upstream.feedback.enable=false | ||
| * | ||
| * # Jitter-aware: grouping steers tuples to lowest-jitter workers. | ||
| * storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \ | ||
| * -c topology.upstream.feedback.enable=true | ||
| * </pre> | ||
| * | ||
| * <p>Tuning knobs (pass with {@code -c key=value}): | ||
| * <ul> | ||
| * <li>{@code spout.count} — number of spout tasks (default 1)</li> | ||
| * <li>{@code splitter.count} — number of splitter tasks (default 2)</li> | ||
| * <li>{@code worker.count} — number of jittery worker tasks (default 4)</li> | ||
| * <li>{@code sink.count} — number of sink tasks (default 1)</li> | ||
| * <li>{@code input.file} — path to a text file whose lines are treated as sentences | ||
| * (e.g. {@code src/main/sampledata/randomwords.txt})</li> | ||
| * <li>{@code worker.base.delay.us} — per-index processing delay step in µs (default 2000). | ||
| * Task {@code i} parks for {@code i * base} µs plus up to {@code base} µs of random noise, | ||
| * so a 4-worker setup has delays of ~0-2, ~2-4, ~4-6, ~6-8 ms. Must be ≥1000 µs so | ||
| * the EWMA latency gauge (millisecond resolution) sees distinct values per task.</li> | ||
| * </ul> | ||
| */ | ||
| public class JitterAwareGroupingTopology { | ||
|
|
||
| public static final String TOPOLOGY_NAME = "JitterAwareGroupingTopology"; | ||
|
|
||
| static final String SPOUT_ID = "gen"; | ||
| static final String SPLITTER_ID = "splitter"; | ||
| static final String WORKER_ID = "worker"; | ||
| static final String SINK_ID = "sink"; | ||
|
|
||
| static final String SPOUT_NUM = "spout.count"; | ||
| static final String SPLITTER_NUM = "splitter.count"; | ||
| static final String WORKER_NUM = "worker.count"; | ||
| static final String SINK_NUM = "sink.count"; | ||
| static final String INPUT_FILE = "input.file"; | ||
| static final String WORKER_BASE_DELAY_US = "worker.base.delay.us"; | ||
|
|
||
| private static final String FIELD_SENTENCE = "sentence"; | ||
| private static final String FIELD_WORD = "word"; | ||
| private static final String FIELD_COUNT = "count"; | ||
|
|
||
| static StormTopology getTopology(Map<String, Object> conf) { | ||
| int spouts = Helper.getInt(conf, SPOUT_NUM, 1); | ||
| int splitters = Helper.getInt(conf, SPLITTER_NUM, 2); | ||
| int workers = Helper.getInt(conf, WORKER_NUM, 4); | ||
| int sinks = Helper.getInt(conf, SINK_NUM, 1); | ||
| long baseDelayUs = Helper.getInt(conf, WORKER_BASE_DELAY_US, 2000); | ||
| String inputFile = Helper.getStr(conf, INPUT_FILE); | ||
|
|
||
| TopologyBuilder builder = new TopologyBuilder(); | ||
| builder.setSpout(SPOUT_ID, new GenSpout(inputFile), spouts); | ||
| builder.setBolt(SPLITTER_ID, new SplitterBolt(), splitters) | ||
| .localOrShuffleGrouping(SPOUT_ID); | ||
| builder.setBolt(WORKER_ID, new JitteryWorkerBolt(baseDelayUs), workers) | ||
| .customGrouping(SPLITTER_ID, new JitterAwareStreamGrouping()); | ||
| builder.setBolt(SINK_ID, new SinkBolt(), sinks) | ||
| .localOrShuffleGrouping(WORKER_ID); | ||
|
|
||
| return builder.createTopology(); | ||
| } | ||
|
|
||
| public static void main(String[] args) throws Exception { | ||
| int runTime = -1; | ||
| Config topoConf = new Config(); | ||
| if (args.length > 0) { | ||
| runTime = Integer.parseInt(args[0]); | ||
| } | ||
| if (args.length > 1) { | ||
| topoConf.putAll(Utils.findAndReadConfigFile(args[1])); | ||
| } | ||
| if (args.length > 2) { | ||
| System.err.println("args: [runDurationSec] [optionalConfFile]"); | ||
| return; | ||
| } | ||
|
|
||
| topoConf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, true); | ||
| topoConf.putIfAbsent(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4000); | ||
| topoConf.putIfAbsent(Config.TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS, 10); | ||
| topoConf.putAll(Utils.readCommandLineOpts()); | ||
|
|
||
| Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf)); | ||
| } | ||
|
|
||
| /** | ||
| * Emits anchored sentences loaded from {@code input.file} at maximum rate. The file is read | ||
| * once into memory during {@link #open} and replayed in a round-robin loop. Anchoring (with a | ||
| * msgId) ensures Storm tracks each tuple tree to completion, so spout complete-latency is a | ||
| * reliable end-to-end signal. | ||
| */ | ||
| private static class GenSpout extends BaseRichSpout { | ||
| private final String filePath; | ||
| private SpoutOutputCollector collector; | ||
| private List<String> lines; | ||
| private int lineIdx; | ||
| private long msgId; | ||
|
|
||
| GenSpout(String filePath) { | ||
| this.filePath = filePath; | ||
| } | ||
|
|
||
| @Override | ||
| public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { | ||
| this.collector = collector; | ||
| try { | ||
| this.lines = FileReadSpout.readLines(new FileInputStream(filePath)); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Cannot open input file: " + filePath, e); | ||
| } | ||
| if (lines.isEmpty()) { | ||
| throw new RuntimeException("Input file is empty: " + filePath); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void nextTuple() { | ||
| String sentence = lines.get(lineIdx++ % lines.size()); | ||
| collector.emit(new Values(sentence), ++msgId); | ||
| } | ||
|
|
||
| @Override | ||
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | ||
| declarer.declare(new Fields(FIELD_SENTENCE)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Splits each incoming sentence into words and emits one tuple per word, anchored so the ack | ||
| * tree extends to the downstream worker. | ||
| */ | ||
| private static class SplitterBolt extends BaseRichBolt { | ||
| private OutputCollector collector; | ||
|
|
||
| @Override | ||
| public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { | ||
| this.collector = collector; | ||
| } | ||
|
|
||
| @Override | ||
| public void execute(Tuple tuple) { | ||
| String sentence = tuple.getString(0); | ||
| for (String word : sentence.split("\\s+")) { | ||
| collector.emit(tuple, new Values(word)); | ||
| } | ||
| collector.ack(tuple); | ||
| } | ||
|
|
||
| @Override | ||
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | ||
| declarer.declare(new Fields(FIELD_WORD)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Counts words and adds a task-index-proportional processing delay to produce deliberately | ||
| * skewed jitter profiles across tasks. | ||
| * | ||
| * <p>Task {@code i} parks for {@code i * baseDelayUs} µs plus up to {@code baseDelayUs} µs of | ||
| * random noise per tuple. For a 4-task setup with the default {@code baseDelayUs = 2000}: | ||
| * <ul> | ||
| * <li>Task 0: ~0–2 ms (fast)</li> | ||
| * <li>Task 1: ~2–4 ms</li> | ||
| * <li>Task 2: ~4–6 ms</li> | ||
| * <li>Task 3: ~6–8 ms (slow)</li> | ||
| * </ul> | ||
| * Delays are millisecond-scale so the EWMA execute-latency gauge (which stores values in ms) | ||
| * records distinct values per task. {@link JitterAwareStreamGrouping} then steers tuples toward | ||
| * the task with the lowest EWMA execute-latency when feedback is enabled. | ||
| * | ||
| * <p>{@link LockSupport#parkNanos} is used instead of a spin loop so slow tasks yield the CPU | ||
| * and do not starve the fast task's executor thread. | ||
| */ | ||
| private static class JitteryWorkerBolt extends BaseRichBolt { | ||
| private final long baseDelayUs; | ||
| private OutputCollector collector; | ||
| private long taskBaselineNs; | ||
| private long baseDelayNs; | ||
| private final Map<String, Integer> counts = new HashMap<>(); | ||
|
|
||
| JitteryWorkerBolt(long baseDelayUs) { | ||
| this.baseDelayUs = baseDelayUs; | ||
| } | ||
|
|
||
| @Override | ||
| public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { | ||
| this.collector = collector; | ||
| this.baseDelayNs = baseDelayUs * 1_000L; | ||
| // Higher task indices get proportionally larger baseline delays. | ||
| this.taskBaselineNs = baseDelayNs * context.getThisTaskIndex(); | ||
| } | ||
|
|
||
| @Override | ||
| public void execute(Tuple tuple) { | ||
| String word = tuple.getString(0); | ||
| counts.merge(word, 1, Integer::sum); | ||
| int count = counts.get(word); | ||
|
|
||
| long noiseNs = (long) (ThreadLocalRandom.current().nextDouble() * baseDelayNs); | ||
| long sleepNs = taskBaselineNs + noiseNs; | ||
| if (sleepNs > 0) { | ||
| LockSupport.parkNanos(sleepNs); | ||
| } | ||
|
|
||
| // Emit anchored so the ack chain continues to SinkBolt, and so execute/process jitter | ||
| // is measured and reported back to SplitterBolt via upstream feedback. | ||
| collector.emit(tuple, new Values(word, count)); | ||
| collector.ack(tuple); | ||
| } | ||
|
|
||
| @Override | ||
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | ||
| declarer.declare(new Fields(FIELD_WORD, FIELD_COUNT)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Terminal bolt: acks each tuple to complete the tuple tree and drive spout complete-latency. | ||
| */ | ||
| private static class SinkBolt extends BaseRichBolt { | ||
| private OutputCollector collector; | ||
|
|
||
| @Override | ||
| public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { | ||
| this.collector = collector; | ||
| } | ||
|
|
||
| @Override | ||
| public void execute(Tuple tuple) { | ||
| collector.ack(tuple); | ||
| } | ||
|
|
||
| @Override | ||
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | ||
| // terminal — no output | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -608,8 +608,45 @@ public class Config extends HashMap<String, Object> { | |
| * | ||
| * @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 §A.8</a> | ||
| */ | ||
| @CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class) | ||
| @CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class) | ||
| public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing.factor"; | ||
| /** | ||
| * Flag to enable or disable the feedback channel for upstream communication. | ||
| * When true, components can send unanchored tuples back to their source tasks. | ||
| */ | ||
| @IsBoolean | ||
| public static final String TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE = "topology.upstream.feedback.enable"; | ||
| /** | ||
| * The specific stream ID used for upstream feedback communication. | ||
| * Defaults to "__feedback" if not explicitly configured. | ||
| */ | ||
| @IsString | ||
| public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. User stream ids can't start with
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you, I completely agree. I am moving it in Constants. |
||
| /** | ||
| * The period, in seconds, between upstream feedback messages within the topology. | ||
| * | ||
| * <p>A dedicated internal feedback tick fires on this interval; on each tick a task emits | ||
| * a feedback tuple (containing metrics such as EWMA jitter stats) back to its parent tasks. | ||
| * This mechanism allows parent tasks to receive performance signals from downstream | ||
| * components to facilitate adaptive flow control or load balancing. Unlike a probabilistic | ||
| * trigger, the period yields a deterministic, data-volume-independent feedback cadence.</p> | ||
| * | ||
| * <p><b>Validation:</b> Must be a positive integer (seconds).</p> | ||
| * | ||
| * <p><b>Impact:</b> | ||
| * <ul> | ||
| * <li>Lower values provide more precise, real-time performance data but increase | ||
| * network overhead and CPU usage on the control plane.</li> | ||
| * <li>Higher values minimize the "observer effect" on the topology's throughput | ||
| * while still providing periodic statistical snapshots of health.</li> | ||
| * </ul> | ||
| * </p> | ||
| * | ||
| * Defaults to 10 if not explicitly configured. | ||
| */ | ||
| @IsInteger | ||
| @IsPositiveNumber | ||
| public static final String TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS = "topology.upstream.feedback.freq.secs"; | ||
| /** | ||
| * The time period that builtin metrics data in bucketed into. | ||
| */ | ||
|
|
@@ -1901,7 +1938,6 @@ public class Config extends HashMap<String, Object> { | |
| public static final String STORM_MESSAGING_NETTY_TLS_SSL_PROTOCOLS = "storm.messaging.netty.tls.ssl.protocols"; | ||
|
|
||
| /** | ||
| * /** | ||
| * Netty based messaging: The number of milliseconds that a Netty client will retry flushing messages that are already | ||
| * buffered to be sent. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| import java.util.Set; | ||
| import org.apache.storm.Config; | ||
| import org.apache.storm.Thrift; | ||
| import org.apache.storm.executor.ChildEwmaStats; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file only adds an unused import, leftover from an earlier revision? Please drop. |
||
| import org.apache.storm.generated.GlobalStreamId; | ||
| import org.apache.storm.generated.Grouping; | ||
| import org.apache.storm.grouping.CustomStreamGrouping; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The javadoc claims a 10%+ improvement over round-robin. Let's keep performance claims out of the code until the benchmark questions are settled. Also, a missing
input.filecurrently fails with a bare NPE fromnew FileInputStream(null), a short usage message would help.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I am fixing.