From c029aeed7dbba0327e538da7372b50af8930e326 Mon Sep 17 00:00:00 2001 From: Andrey Yarovoy Date: Thu, 18 Jun 2026 13:53:41 -0400 Subject: [PATCH 1/3] Added running CRC calculation into BlockOutputStream for reducing CRC overhead in high load write scenaious --- .../hdds/scm/storage/BlockOutputStream.java | 60 +- .../storage/BenchmarkMockXceiverClient.java | 161 +++++ .../BlockOutputStreamWriteBenchmark.java | 622 ++++++++++++++++++ .../TestBlockOutputStreamCorrectness.java | 234 +++++++ .../apache/hadoop/ozone/common/Checksum.java | 13 + 5 files changed, 1088 insertions(+), 2 deletions(-) create mode 100644 hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java create mode 100644 hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index d7076df3ba01..ec665d89a49d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChecksumByteBuffer; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -137,6 +138,11 @@ public class BlockOutputStream extends OutputStream { private final List failedServers; private final Checksum checksum; + // Running checksum updated alongside write() to avoid a second data read in writeChunkToContainer(). + // Non-null only for CRC32/CRC32C; other types fall back to checksum.computeChecksum(). + private final ChecksumByteBuffer runningCrc; + private int runningCrcBytesInSegment; + private final List runningCrcChecksums; //number of buffers used before doing a flush/putBlock. private int flushPeriod; @@ -233,6 +239,9 @@ public BlockOutputStream( failedServers = new CopyOnWriteArrayList<>(); ioException = new AtomicReference<>(null); this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum(), true); + this.runningCrc = this.checksum.newChecksumByteBuffer(); + this.runningCrcBytesInSegment = 0; + this.runningCrcChecksums = new ArrayList<>(); this.clientMetrics = clientMetrics; this.streamBufferArgs = streamBufferArgs; this.allowPutBlockPiggybacking = canEnablePutblockPiggybacking(); @@ -351,6 +360,14 @@ public void write(int b) throws IOException { allocateNewBufferIfNeeded(); currentBuffer.put((byte) b); currentBufferRemaining--; + if (runningCrc != null) { + runningCrc.update(b); + if (++runningCrcBytesInSegment == config.getBytesPerChecksum()) { + runningCrcChecksums.add(Checksum.int2ByteString((int) runningCrc.getValue())); + runningCrc.reset(); + runningCrcBytesInSegment = 0; + } + } updateWrittenDataLength(1); writeChunkIfNeeded(); doFlushOrWatchIfNeeded(); @@ -385,6 +402,9 @@ public void write(byte[] b, int off, int len) throws IOException { allocateNewBufferIfNeeded(); final int writeLen = Math.min(currentBufferRemaining, len); currentBuffer.put(b, off, writeLen); + if (runningCrc != null) { + accumulateRunningCrc(b, off, writeLen); + } currentBufferRemaining -= writeLen; updateWrittenDataLength(writeLen); writeChunkIfNeeded(); @@ -414,6 +434,38 @@ private void doFlushOrWatchIfNeeded() throws IOException { } } + private void accumulateRunningCrc(byte[] b, int off, int len) { + while (len > 0) { + final int space = config.getBytesPerChecksum() - runningCrcBytesInSegment; + final int toUpdate = Math.min(space, len); + runningCrc.update(b, off, toUpdate); + runningCrcBytesInSegment += toUpdate; + if (runningCrcBytesInSegment == config.getBytesPerChecksum()) { + runningCrcChecksums.add(Checksum.int2ByteString((int) runningCrc.getValue())); + runningCrc.reset(); + runningCrcBytesInSegment = 0; + } + off += toUpdate; + len -= toUpdate; + } + } + + // Returns a ChecksumData built from the running CRC accumulated during write(), then clears + // the running state. Returns null if no running CRC is available (retry path, non-CRC types). + private ChecksumData consumeRunningCrc() { + if (runningCrc == null || (runningCrcChecksums.isEmpty() && runningCrcBytesInSegment == 0)) { + return null; + } + final List checksumList = new ArrayList<>(runningCrcChecksums); + if (runningCrcBytesInSegment > 0) { + checksumList.add(Checksum.int2ByteString((int) runningCrc.getValue())); + runningCrc.reset(); + runningCrcBytesInSegment = 0; + } + runningCrcChecksums.clear(); + return new ChecksumData(config.getChecksumType(), config.getBytesPerChecksum(), checksumList); + } + private void recordWatchForCommitAsync(CompletableFuture putBlockResultFuture) { final CompletableFuture flushFuture = putBlockResultFuture.thenCompose(x -> watchForCommit(x.commitIndex)); @@ -903,8 +955,12 @@ private CompletableFuture writeChunkToContainer( final long offset = chunkOffset.getAndAdd(effectiveChunkSize); final ByteString data = chunk.toByteString( bufferPool.byteStringConversion()); - // chunk is incremental, don't cache its checksum - ChecksumData checksumData = checksum.computeChecksum(chunk, false); + // Use running CRC accumulated during write() when available (avoids a second full read over the chunk). + // Falls back to computeChecksum for the retry path (fresh stream, runningCrc empty) and non-CRC32 types. + ChecksumData checksumData = consumeRunningCrc(); + if (checksumData == null) { + checksumData = checksum.computeChecksum(chunk, false); + } // side note: checksum object is shared with PutBlock's (blockData) checksum calc, // current impl does not support caching both ChunkInfo chunkInfo = ChunkInfo.newBuilder() diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java new file mode 100644 index 000000000000..adfd9f5960b2 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.XceiverClientReply; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufOutputStream; +import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator; + +/** + * Minimal xceiver client for {@link BlockOutputStreamWriteBenchmark}. + * + *

Replicates the production serialization path without real network I/O: + * the proto is serialized via {@code request.writeTo(OutputStream)} — which + * causes protobuf to use {@code OutputStreamEncoder}, the same encoder gRPC's + * {@code MessageFramer} uses — into a pooled direct {@code ByteBuf}, then the + * buffer is released immediately. + * + *

This exercises the full cross-domain copy chain: + *

    + *
  • direct chunk (pre-patch): {@code NioByteString.writeTo} allocates + * a temporary {@code byte[]}, copies from off-heap via {@code copyMemory} + * (copy 1), then writes heap→direct ByteBuf (copy 2).
  • + *
  • heap chunk (this patch): {@code BoundedByteString.writeTo} writes + * directly heap→direct ByteBuf (copy 1 only).
  • + *
+ */ +final class BenchmarkMockXceiverClient extends XceiverClientSpi { + + private final Pipeline pipeline; + private final AtomicLong logIndex = new AtomicLong(); + // Simulated Raft commit latency. The calling thread parks for this duration in + // watchForCommit(), mimicking the real-cluster scenario where the writer blocks + // waiting for consensus before allocating the next buffer. + private final long commitLatencyNs; + + BenchmarkMockXceiverClient(Pipeline pipeline, long commitLatencyNs) { + this.pipeline = pipeline; + this.commitLatencyNs = commitLatencyNs; + } + + @Override + public void connect() { + } + + @Override + public void close() { + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) { + // Replicate: gRPC MessageFramer allocates a pooled direct ByteBuf via + // NettyWritableBufferAllocator, then serializes the proto into it through + // OutputStreamEncoder → WritableBufferOutputStream → ByteBuf.writeBytes(). + // For NioByteString (direct chunk data): copyMemory(direct→heap tmp) + write(heap→direct). + // For BoundedByteString (heap chunk data): write(heap→direct) only. + // The buffer is released immediately rather than being enqueued to a socket. + final int size = request.getSerializedSize(); + final ByteBuf frame = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + try { + final ByteBufOutputStream out = new ByteBufOutputStream(frame); + try { + request.writeTo(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } finally { + frame.release(); + } + + final ContainerCommandResponseProto.Builder builder = + ContainerCommandResponseProto.newBuilder() + .setResult(Result.SUCCESS) + .setCmdType(request.getCmdType()); + if (request.getCmdType() == Type.PutBlock) { + builder.setPutBlock(PutBlockResponseProto.newBuilder() + .setCommittedBlockLength( + GetCommittedBlockLengthResponseProto.newBuilder() + .setBlockID(request.getPutBlock().getBlockData().getBlockID()) + .setBlockLength(request.getPutBlock().getBlockData().getSize()) + .build()) + .build()); + } + final XceiverClientReply reply = + new XceiverClientReply(CompletableFuture.completedFuture(builder.build())); + reply.setLogIndex(logIndex.incrementAndGet()); + return reply; + } + + @Override + public ReplicationType getPipelineType() { + return pipeline.getType(); + } + + @Override + public CompletableFuture watchForCommit(long index) { + // Block the calling thread for commitLatencyNs to mimic the Raft leader + // requiring consensus before acknowledging a putBlock. This causes back-pressure: + // buffers stay in-flight, pool fills up, and concurrent threads thrash L3 cache + // while each writer waits — reproducing the cold-staging-buffer scenario captured + // by the real-cluster async-profiler (12% CPU in computeChecksum). + if (commitLatencyNs > 0) { + LockSupport.parkNanos(commitLatencyNs); + } + final ContainerCommandResponseProto response = + ContainerCommandResponseProto.newBuilder() + .setCmdType(Type.WriteChunk) + .setResult(Result.SUCCESS) + .build(); + final XceiverClientReply reply = + new XceiverClientReply(CompletableFuture.completedFuture(response)); + reply.setLogIndex(index); + return CompletableFuture.completedFuture(reply); + } + + @Override + public long getReplicatedMinCommitIndex() { + return logIndex.get(); + } + + @Override + public Map sendCommandOnAllNodes( + ContainerCommandRequestProto request) { + return null; + } +} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java new file mode 100644 index 000000000000..28c504e00f99 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.hadoop.hdds.scm.ContainerClientMetrics; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; +import org.apache.hadoop.hdds.scm.StreamBufferArgs; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.common.ChunkBuffer; +import org.apache.ozone.test.GenericTestUtils; +import org.slf4j.event.Level; + +/** + * Client-side {@link RatisBlockOutputStream} write benchmark using mocked + * container RPCs ({@link BenchmarkMockXceiverClient}). + * + *

Mimics the write path exercised by {@code ozone freon dsfg} (DataStream + * File Generator) — each worker thread repeatedly fills a 4 MB block via + * sequential writes and closes the stream, driving the chunk serialisation and + * gRPC framing code at full speed without real network or disk I/O. + * + *

Supports two comparison modes: + *

    + *
  • heap vs direct (default) — compares {@code ByteBuffer.allocate()} chunks + * (heap, {@code BoundedByteString}, single arraycopy to gRPC wire buffer) against + * {@code ByteBuffer.allocateDirect()} chunks (direct, {@code NioByteString}, two copies).
  • + *
  • checksum overhead ({@code benchmark.checksumComparison=true}) — runs + * {@code NONE} then {@code CRC32} back-to-back with heap buffers and prints the + * per-write checksum overhead and throughput delta. Use this to quantify the cost of + * {@code Checksum.computeChecksum} (before the incremental-CRC change) and verify + * its elimination (after).
  • + *
+ * + *

On-demand benchmark (not part of {@code mvn test}). Run from the repo root: + *

+ *   mvn -pl hadoop-hdds/client -q test-compile exec:java \
+ *     -Dexec.mainClass=org.apache.hadoop.hdds.scm.storage.BlockOutputStreamWriteBenchmark \
+ *     -Dexec.classpathScope=test
+ * 
+ * + *

Optional system properties: + *

    + *
  • {@code benchmark.label} – profile label (default {@code workspace})
  • + *
  • {@code benchmark.writeSize} – single write size in bytes (default: run + * 1 MB, 2 MB, 3 MB and 4 MB)
  • + *
  • {@code benchmark.heapBuffer} – {@code true} or {@code false} + * (default: run both)
  • + *
  • {@code benchmark.checksumType} – {@code NONE} or {@code CRC32} + * (default {@code CRC32})
  • + *
  • {@code benchmark.threads} – worker count (default {@code CPUs * 2})
  • + *
  • {@code benchmark.scaling=true} – run multiple thread counts for both + * allocation modes and print a scaling summary
  • + *
  • {@code benchmark.scaling.threads} – comma-separated thread counts for + * scaling (default {@code 1,2,7,14,28,42,56})
  • + *
  • {@code benchmark.scaling.writeSizes} – comma-separated write sizes in + * bytes (overrides default 1/2/3/4 MB list when set)
  • + *
  • {@code benchmark.streamBufferSize} – chunk buffer size in bytes (default 4 MB). + * Use a small value (e.g. {@code 65536}) to call {@link BufferPool#allocateBuffer} + * 64× more often, making per-call locking overhead visible in a profiler.
  • + *
  • {@code benchmark.fileSize} – bytes per stream open/close cycle + * (default 2 × poolCapacity × streamBufferSize = 64 MB with defaults).
  • + *
  • {@code benchmark.dnLatencyMs} – simulated Raft commit latency in milliseconds + * (default 0 = instant). The writer thread parks in {@code watchForCommit} for this + * duration, filling the {@link BufferPool} and creating concurrent L3 cache pressure + * across all writer threads. Use {@code 2} to match typical three-way Raft latency. + * Combined with {@code benchmark.threads ≥ 14}, this reproduces the cold-staging-buffer + * scenario captured by the real-cluster async-profiler.
  • + *
  • {@code benchmark.checksumComparison} – {@code true} to run {@code NONE} then + * {@code CRC32} back-to-back with heap buffers and print the per-write checksum + * overhead and throughput delta.
  • + *
+ * + *

Or use + * {@code hadoop-hdds/client/dev-support/run-block-output-stream-write-benchmark.sh}. + */ +public final class BlockOutputStreamWriteBenchmark { + + private static final int WARMUP_SECONDS = Integer.getInteger("benchmark.warmupSeconds", 10); + private static final int BENCHMARK_SECONDS = Integer.getInteger("benchmark.benchmarkSeconds", 20); + // Configurable via -Dbenchmark.streamBufferSize= (default 4 MB). + // Use a small value (e.g. 65536) to call allocateBuffer more frequently and + // expose per-call BufferPool locking overhead in a profiler. + private static final int STREAM_BUFFER_SIZE = + Integer.getInteger("benchmark.streamBufferSize", 4 * 1024 * 1024); + private static final int POOL_CAPACITY = 8; + // Total bytes written per stream open/close cycle. Default = 2× pool capacity. + private static final int FILE_SIZE = + Integer.getInteger("benchmark.fileSize", 2 * POOL_CAPACITY * STREAM_BUFFER_SIZE); + // Simulated Raft commit latency. Causes the calling thread to park in watchForCommit(), + // filling the BufferPool and creating concurrent L3 cache pressure across all writer threads. + // Use 1-5ms to match real three-way Raft commit latency. + private static final long DN_LATENCY_MS = Long.getLong("benchmark.dnLatencyMs", 0); + // Source byte array — kept at STREAM_BUFFER_SIZE for memory efficiency; reused in a loop. + private static final int SOURCE_BUFFER_SIZE = STREAM_BUFFER_SIZE; + + private static final int[] DEFAULT_WRITE_SIZES = { + STREAM_BUFFER_SIZE / 4, + STREAM_BUFFER_SIZE / 2, + STREAM_BUFFER_SIZE * 3 / 4, + STREAM_BUFFER_SIZE + }; + + private static final DecimalFormat MBPS = new DecimalFormat("#,##0.0"); + private static final DecimalFormat NS = new DecimalFormat("#,##0"); + private static final DecimalFormat COUNT = new DecimalFormat("#,##0"); + + private BlockOutputStreamWriteBenchmark() { + } + + public static void main(String[] args) throws Exception { + GenericTestUtils.setLogLevel(BufferPool.class, Level.INFO); + GenericTestUtils.setLogLevel(BlockOutputStream.class, Level.INFO); + + final String label = System.getProperty("benchmark.label", "workspace"); + final ChecksumType checksumType = ChecksumType.valueOf( + System.getProperty("benchmark.checksumType", ChecksumType.CRC32.name())); + final int cpus = Runtime.getRuntime().availableProcessors(); + + System.out.println("BlockOutputStream client write benchmark (mocked container RPCs)"); + System.out.println("Profile: " + label); + System.out.println("Comparing: heap ByteBuffer (this patch) vs direct ByteBuffer (pre-patch)"); + System.out.println("JVM: " + System.getProperty("java.version") + + " on " + System.getProperty("os.arch")); + System.out.printf("CPUs=%d fileSize=%dKB streamBuffer=%dKB poolCapacity=%d checksum=%s dnLatency=%dms%n", + cpus, FILE_SIZE / 1024, STREAM_BUFFER_SIZE / 1024, POOL_CAPACITY, checksumType, DN_LATENCY_MS); + System.out.println(); + + final byte[] sourceBuffer = new byte[SOURCE_BUFFER_SIZE]; + ThreadLocalRandom.current().nextBytes(sourceBuffer); + + if (Boolean.parseBoolean(System.getProperty("benchmark.scaling", "false"))) { + runScalingStudy(sourceBuffer, checksumType, cpus); + } else if (Boolean.parseBoolean(System.getProperty("benchmark.crcMatrix", "false"))) { + runChecksumMatrix(sourceBuffer, resolveScalingThreadCounts(cpus), resolveWriteSizes()); + } else if (Boolean.parseBoolean(System.getProperty("benchmark.checksumComparison", "false"))) { + final int threadCount = resolveThreadCount(cpus); + System.out.printf("threads=%d%n%n", threadCount); + runChecksumComparison(sourceBuffer, threadCount); + } else { + final int threadCount = resolveThreadCount(cpus); + System.out.printf("threads=%d%n%n", threadCount); + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); + try { + for (int writeSize : resolveWriteSizes()) { + for (boolean heapBuffer : resolveHeapBufferModes()) { + runProfile(sourceBuffer, writeSize, heapBuffer, checksumType, threadCount, client); + System.out.println(); + } + } + } finally { + client.close(); + } + } + } + + /** + * Runs NONE then CRC32 back-to-back with heap buffers and prints the per-write checksum + * overhead. Use this to quantify computeChecksum cost before and after the incremental-CRC change. + */ + private static void runChecksumComparison(byte[] sourceBuffer, int threadCount) throws Exception { + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); + try { + System.out.println("=== Checksum overhead comparison (heapBuffer=true) ==="); + System.out.println(); + for (int writeSize : resolveWriteSizes()) { + final Result none = runProfile(sourceBuffer, writeSize, true, ChecksumType.NONE, threadCount, client); + System.out.println(); + final Result crc32 = runProfile(sourceBuffer, writeSize, true, ChecksumType.CRC32, threadCount, client); + System.out.println(); + final double overheadNs = crc32.nsPerWrite - none.nsPerWrite; + final double overheadPct = none.mbPerSec == 0 ? 0 : (none.mbPerSec - crc32.mbPerSec) / none.mbPerSec * 100; + System.out.printf(">>> writeSize=%dKB NONE=%s MB/s CRC32=%s MB/s overhead=%.0fns/write (%.1f%%)%n%n", + writeSize / 1024, MBPS.format(none.mbPerSec), MBPS.format(crc32.mbPerSec), + overheadNs, overheadPct); + } + } finally { + client.close(); + } + } + + /** + * Runs a thread-count × write-size matrix comparing NONE vs CRC32 with heap buffers. + * Emits one MATRIX line per cell for easy grep/diff between before and after runs. + * Use {@code benchmark.warmupSeconds} / {@code benchmark.benchmarkSeconds} to tune + * duration (e.g. 3/7 for fast matrix sweeps). + */ + private static void runChecksumMatrix(byte[] sourceBuffer, int[] threadCounts, + int[] writeSizes) throws Exception { + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); + try { + System.out.printf("crcMatrix: %d thread counts × %d write sizes × {NONE,CRC32}" + + " dnLatency=%dms warmup=%ds benchmark=%ds%n%n", + threadCounts.length, writeSizes.length, DN_LATENCY_MS, WARMUP_SECONDS, BENCHMARK_SECONDS); + System.out.printf("%-10s %-10s %14s %14s %16s %8s%n", + "threads", "writeKB", "NONE MB/s", "CRC32 MB/s", "overhead ns/w", "overhead%"); + System.out.println("----------------------------------------------------------------------------"); + for (int threadCount : threadCounts) { + for (int writeSize : writeSizes) { + final Result none = runProfile(sourceBuffer, writeSize, true, ChecksumType.NONE, threadCount, client); + final Result crc32 = runProfile(sourceBuffer, writeSize, true, ChecksumType.CRC32, threadCount, client); + final double overheadNs = crc32.nsPerWrite - none.nsPerWrite; + final double overheadPct = none.mbPerSec == 0 ? 0 : (none.mbPerSec - crc32.mbPerSec) / none.mbPerSec * 100; + System.out.printf("MATRIX %-8d %-8d %14s %14s %16.0f %7.1f%%%n", + threadCount, writeSize / 1024, + MBPS.format(none.mbPerSec), MBPS.format(crc32.mbPerSec), + overheadNs, overheadPct); + } + System.out.println(); + } + } finally { + client.close(); + } + } + + private static void runScalingStudy(byte[] sourceBuffer, ChecksumType checksumType, + int cpus) throws Exception { + final int[] threadCounts = resolveScalingThreadCounts(cpus); + System.out.printf("scaling thread counts: %s%n%n", formatThreadCounts(threadCounts)); + final List rows = new ArrayList<>(); + + // One shared gRPC client for the entire scaling study. Reusing the same + // event loop thread (and therefore the same PooledByteBufAllocator arena) + // across all phases lets the arena recycle pool chunks between phases. + // Creating a new client per phase assigns each new event loop to a + // different arena (round-robin), and 0%-usage chunks in old arenas cannot + // be reused by threads on new arenas, so direct memory grows unboundedly. + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final BenchmarkMockXceiverClient sharedClient = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); + try { + for (int writeSize : resolveWriteSizes()) { + System.out.printf("===== writeSize=%dKB scaling study =====%n%n", writeSize / 1024); + for (int threadCount : threadCounts) { + for (boolean heapBuffer : new boolean[] {false, true}) { + final Result result = runProfile(sourceBuffer, writeSize, heapBuffer, + checksumType, threadCount, sharedClient); + rows.add(new ScalingRow(writeSize, threadCount, heapBuffer, result.mbPerSec)); + System.out.println(); + } + } + printScalingSummary(writeSize, threadCounts, rows); + System.out.println(); + } + } finally { + sharedClient.close(); + } + } + + private static void printScalingSummary(int writeSize, int[] threadCounts, + List allRows) { + final List rows = new ArrayList<>(); + for (ScalingRow row : allRows) { + if (row.writeSize == writeSize) { + rows.add(row); + } + } + + final double directBaseline = findMbPerSec(rows, 1, false); + final double heapBaseline = findMbPerSec(rows, 1, true); + + System.out.printf("--- scaling summary writeSize=%dKB ---%n", writeSize / 1024); + System.out.printf("%8s | %12s | %12s | %10s | %10s | %10s%n", + "threads", "direct MB/s", "heap MB/s", "heap/direct", "direct eff.", "heap eff."); + System.out.println("---------|--------------|--------------|------------|------------|------------"); + + for (int threadCount : threadCounts) { + final double direct = findMbPerSec(rows, threadCount, false); + final double heap = findMbPerSec(rows, threadCount, true); + final double heapVsDirect = direct == 0 ? 0 : heap / direct; + final double directEff = directBaseline == 0 ? 0 : direct / directBaseline / threadCount; + final double heapEff = heapBaseline == 0 ? 0 : heap / heapBaseline / threadCount; + System.out.printf("%8d | %12s | %12s | %10.2fx | %10.2f | %10.2f%n", + threadCount, MBPS.format(direct), MBPS.format(heap), heapVsDirect, + directEff, heapEff); + } + System.out.println(); + System.out.println("eff. = throughput vs 1-thread baseline / thread count (1.0 = linear scaling)"); + } + + private static double findMbPerSec(List rows, int threadCount, + boolean heapBuffer) { + for (ScalingRow row : rows) { + if (row.threadCount == threadCount && row.heapBuffer == heapBuffer) { + return row.mbPerSec; + } + } + return 0; + } + + private static final class ScalingRow { + private final int writeSize; + private final int threadCount; + private final boolean heapBuffer; + private final double mbPerSec; + + private ScalingRow(int writeSize, int threadCount, boolean heapBuffer, double mbPerSec) { + this.writeSize = writeSize; + this.threadCount = threadCount; + this.heapBuffer = heapBuffer; + this.mbPerSec = mbPerSec; + } + } + + private static int resolveThreadCount(int cpus) { + final String property = System.getProperty("benchmark.threads"); + if (property != null && !property.isEmpty()) { + return Integer.parseInt(property); + } + return cpus * 2; + } + + private static int[] resolveScalingThreadCounts(int cpus) { + final String property = System.getProperty("benchmark.scaling.threads"); + if (property == null || property.isEmpty()) { + return new int[] {1, 2, 7, 14, 28, 42, 56}; + } + return parseCommaSeparatedInts(property); + } + + private static String formatThreadCounts(int[] threadCounts) { + final StringBuilder builder = new StringBuilder(); + for (int i = 0; i < threadCounts.length; i++) { + if (i > 0) { + builder.append(", "); + } + builder.append(threadCounts[i]); + } + return builder.toString(); + } + + private static Result runProfile(byte[] sourceBuffer, int writeSize, + boolean heapBuffer, ChecksumType checksumType, int threadCount, + BenchmarkMockXceiverClient client) + throws Exception { + System.out.printf("--- writeSize=%dKB heapBuffer=%s threads=%d writes/stream=%d ---%n", + writeSize / 1024, heapBuffer, threadCount, FILE_SIZE / writeSize); + + final Result result = runProfileMeasured(sourceBuffer, writeSize, heapBuffer, + checksumType, threadCount, client); + + System.out.printf("elapsed: %.2fs%n", result.elapsedSeconds); + System.out.printf("bytes written: %s%n", COUNT.format(result.bytesWritten)); + System.out.printf("write ops (%dKB): %s%n", + writeSize / 1024, COUNT.format(result.writeOps)); + System.out.printf("blocks written: %s%n", COUNT.format(result.blocksWritten)); + System.out.printf("aggregate throughput: %s MB/s%n", MBPS.format(result.mbPerSec)); + System.out.printf("ns/write (aggregate): %s%n", NS.format(result.nsPerWrite)); + System.out.printf("writeChunks metric: %s%n", + COUNT.format(result.writeChunksDuringWrite)); + System.out.printf("flushes metric: %s%n", + COUNT.format(result.flushesDuringWrite)); + return result; + } + + private static Result runProfileMeasured(byte[] sourceBuffer, int writeSize, + boolean heapBuffer, ChecksumType checksumType, int threadCount, + BenchmarkMockXceiverClient client) + throws Exception { + runTimedWrite(sourceBuffer, writeSize, heapBuffer, checksumType, + threadCount, WARMUP_SECONDS, "warmup", client); + return runTimedWrite(sourceBuffer, writeSize, heapBuffer, checksumType, + threadCount, BENCHMARK_SECONDS, "benchmark", client); + } + + private static int[] resolveWriteSizes() { + final String scalingSizes = System.getProperty("benchmark.scaling.writeSizes"); + if (scalingSizes != null && !scalingSizes.isEmpty()) { + return parseCommaSeparatedInts(scalingSizes); + } + final String property = System.getProperty("benchmark.writeSize"); + if (property == null || property.isEmpty()) { + return DEFAULT_WRITE_SIZES; + } + return new int[] {Integer.parseInt(property)}; + } + + private static int[] parseCommaSeparatedInts(String property) { + final String[] parts = property.split(","); + final int[] values = new int[parts.length]; + for (int i = 0; i < parts.length; i++) { + values[i] = Integer.parseInt(parts[i].trim()); + } + return values; + } + + private static boolean[] resolveHeapBufferModes() { + final String property = System.getProperty("benchmark.heapBuffer"); + if (property == null || property.isEmpty()) { + return new boolean[] {false, true}; + } + return new boolean[] {Boolean.parseBoolean(property)}; + } + + private static Result runTimedWrite(byte[] sourceBuffer, int writeSize, + boolean heapBuffer, ChecksumType checksumType, int threadCount, + int seconds, String phase, BenchmarkMockXceiverClient client) throws Exception { + final long writeChunksBefore = + ContainerClientMetrics.acquire().getWriteChunksDuringWrite().value(); + final long flushesBefore = + ContainerClientMetrics.acquire().getFlushesDuringWrite().value(); + + final long start = System.nanoTime(); + final long deadline = start + seconds * 1_000_000_000L; + + final ExecutorService workers = Executors.newFixedThreadPool(threadCount); + try { + final List> futures = new ArrayList<>(threadCount); + for (int i = 0; i < threadCount; i++) { + futures.add(workers.submit(() -> + runWorker(sourceBuffer, writeSize, heapBuffer, checksumType, deadline, client))); + } + + long bytesWritten = 0; + long writeOps = 0; + long blocksWritten = 0; + for (Future future : futures) { + final WorkerResult workerResult = future.get(); + bytesWritten += workerResult.bytesWritten; + writeOps += workerResult.writeOps; + blocksWritten += workerResult.blocksWritten; + } + + final long elapsedNanos = System.nanoTime() - start; + final double elapsedSeconds = elapsedNanos / 1_000_000_000.0; + System.out.printf("%s complete (%.2fs)%n", phase, elapsedSeconds); + + return new Result( + bytesWritten, + writeOps, + blocksWritten, + elapsedSeconds, + bytesWritten / elapsedSeconds / (1024.0 * 1024.0), + writeOps == 0 ? 0 : (double) elapsedNanos / writeOps, + ContainerClientMetrics.acquire().getWriteChunksDuringWrite().value() + - writeChunksBefore, + ContainerClientMetrics.acquire().getFlushesDuringWrite().value() + - flushesBefore); + } finally { + workers.shutdownNow(); + try { + workers.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static WorkerResult runWorker(byte[] sourceBuffer, int writeSize, + boolean heapBuffer, ChecksumType checksumType, long deadline, + BenchmarkMockXceiverClient mockClient) + throws Exception { + long bytesWritten = 0; + long writeOps = 0; + long blocksWritten = 0; + + try (BenchmarkSession session = BenchmarkSession.open(heapBuffer, checksumType, mockClient)) { + while (System.nanoTime() < deadline) { + try (BlockOutputStream stream = session.newStream()) { + int remaining = FILE_SIZE; + while (remaining > 0 && System.nanoTime() < deadline) { + int chunk = Math.min(writeSize, remaining); + stream.write(sourceBuffer, 0, chunk); + bytesWritten += chunk; + writeOps++; + remaining -= chunk; + } + } + blocksWritten++; + } + } + return new WorkerResult(bytesWritten, writeOps, blocksWritten); + } + + private static final class WorkerResult { + private final long bytesWritten; + private final long writeOps; + private final long blocksWritten; + + private WorkerResult(long bytesWritten, long writeOps, long blocksWritten) { + this.bytesWritten = bytesWritten; + this.writeOps = writeOps; + this.blocksWritten = blocksWritten; + } + } + + private static final class BenchmarkSession implements AutoCloseable { + private final Pipeline pipeline; + private final ContainerClientMetrics metrics; + private final BufferPool bufferPool; + private final OzoneClientConfig config; + private final StreamBufferArgs streamBufferArgs; + private final XceiverClientManager xceiverClientManager; + private final ExecutorService responseExecutor; + + private BenchmarkSession(Pipeline pipeline, ContainerClientMetrics metrics, + BufferPool bufferPool, OzoneClientConfig config, StreamBufferArgs streamBufferArgs, + XceiverClientManager xceiverClientManager, ExecutorService responseExecutor) { + this.pipeline = pipeline; + this.metrics = metrics; + this.bufferPool = bufferPool; + this.config = config; + this.streamBufferArgs = streamBufferArgs; + this.xceiverClientManager = xceiverClientManager; + this.responseExecutor = responseExecutor; + } + + static BenchmarkSession open(boolean heapBuffer, + ChecksumType checksumType, BenchmarkMockXceiverClient mockClient) throws IOException { + final Pipeline pipeline = mockClient.getPipeline(); + final XceiverClientManager xcm = mock(XceiverClientManager.class); + when(xcm.acquireClient(any())).thenReturn(mockClient); + + final OzoneClientConfig config = new OzoneClientConfig(); + config.setStreamBufferSize(STREAM_BUFFER_SIZE); + config.setStreamBufferMaxSize(POOL_CAPACITY * STREAM_BUFFER_SIZE); + config.setStreamBufferFlushDelay(false); + config.setStreamBufferFlushSize(POOL_CAPACITY * STREAM_BUFFER_SIZE / 2); + config.setChecksumType(checksumType); + config.setBytesPerChecksum(Math.min(64 * 1024, STREAM_BUFFER_SIZE)); + config.validate(); + + final StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config); + + // Both modes use the same BufferPool; ChunkBuffer.ALLOCATE_DIRECT controls + // which allocator is called inside ChunkBuffer.allocate(), giving a true + // apples-to-apples comparison of heap vs direct buffer serialisation overhead. + ChunkBuffer.ALLOCATE_DIRECT.set(!heapBuffer); + + return new BenchmarkSession( + pipeline, + ContainerClientMetrics.acquire(), + new BufferPool(STREAM_BUFFER_SIZE, POOL_CAPACITY, + ByteStringConversion.createByteBufferConversion(true)), + config, + streamBufferArgs, + xcm, + newSingleThreadExecutor()); + } + + BlockOutputStream newStream() throws IOException { + return new RatisBlockOutputStream( + new BlockID(1L, 1L), + -1, + xceiverClientManager, + pipeline, + bufferPool, + config, + null, + metrics, + streamBufferArgs, + () -> responseExecutor); + } + + @Override + public void close() { + bufferPool.clearBufferPool(); + responseExecutor.shutdownNow(); + // mockClient is shared across all sessions in a phase; runTimedWrite owns it. + } + } + + private static final class Result { + private final long bytesWritten; + private final long writeOps; + private final long blocksWritten; + private final double elapsedSeconds; + private final double mbPerSec; + private final double nsPerWrite; + private final long writeChunksDuringWrite; + private final long flushesDuringWrite; + + private Result(long bytesWritten, long writeOps, long blocksWritten, double elapsedSeconds, + double mbPerSec, double nsPerWrite, long writeChunksDuringWrite, + long flushesDuringWrite) { + this.bytesWritten = bytesWritten; + this.writeOps = writeOps; + this.blocksWritten = blocksWritten; + this.elapsedSeconds = elapsedSeconds; + this.mbPerSec = mbPerSec; + this.nsPerWrite = nsPerWrite; + this.writeChunksDuringWrite = writeChunksDuringWrite; + this.flushesDuringWrite = flushesDuringWrite; + } + } +} diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 440b5b3d4d52..f9d8f409833c 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -19,6 +19,7 @@ import static java.util.concurrent.Executors.newFixedThreadPool; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -26,6 +27,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -34,6 +37,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; @@ -52,12 +56,16 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.Assertions; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; /** @@ -196,6 +204,136 @@ private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientCo clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2)); } + /** + * Verifies that SHA-256 chunks written by BlockOutputStream carry checksums that match an independent + * computation. With SHA-256, {@code runningCrc} is null, so {@code consumeRunningCrc()} returns null and + * {@code writeChunkToContainer()} falls back to {@code computeChecksum()}. This test confirms the fallback + * path produces correct checksums for every segment in every chunk, including a partial final segment. + */ + @ParameterizedTest + @ValueSource(ints = {256 * 1024, 1024 * 1024, 4 * 1024 * 1024 + 1}) + void testSha256ChecksumFallback(final int writeSize) throws IOException { + final int bytesPerChecksum = 256 * 1024; + final int totalBytes = 4 * 1024 * 1024 + 1; // non-power-of-2 to exercise a partial last segment + + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final List capturedChunkData = new ArrayList<>(); + final List capturedChecksums = new ArrayList<>(); + + final XceiverClientManager xcm = mock(XceiverClientManager.class); + when(xcm.acquireClient(any())) + .thenReturn(new ChecksumCapturingMockClient(pipeline, capturedChunkData, capturedChecksums)); + + OzoneClientConfig config = new OzoneClientConfig(); + config.setStreamBufferSize(4 * 1024 * 1024); + config.setStreamBufferMaxSize(32 * 1024 * 1024); + config.setStreamBufferFlushDelay(true); + config.setStreamBufferFlushSize(16 * 1024 * 1024); + config.setChecksumType(ChecksumType.SHA256); + config.setBytesPerChecksum(bytesPerChecksum); + StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config); + + final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4); + try (BlockOutputStream stream = new RatisBlockOutputStream( + new BlockID(1L, 1L), -1, xcm, pipeline, bufferPool, config, null, + ContainerClientMetrics.acquire(), streamBufferArgs, () -> newFixedThreadPool(10))) { + for (int off = 0; off < totalBytes; off += writeSize) { + final int len = Math.min(writeSize, totalBytes - off); + if (len == 1) { + stream.write(DATA[off]); + } else { + stream.write(DATA, off, len); + } + } + } + + assertFalse(capturedChunkData.isEmpty()); + final Checksum checker = new Checksum(ChecksumType.SHA256, bytesPerChecksum); + for (int i = 0; i < capturedChunkData.size(); i++) { + final ContainerProtos.ChecksumData expected = + checker.computeChecksum(capturedChunkData.get(i)).getProtoBufMessage(); + assertEquals(expected, capturedChecksums.get(i), + "SHA-256 checksum mismatch for chunk " + i); + } + } + + /** + * Parameter sets for {@link #testRunningCrcChecksumCorrectness}. + * Each row covers a distinct branch in the running-CRC path: + *

    + *
  • CRC32 aligned — consumeRunningCrc() no-partial branch (runningCrcBytesInSegment == 0)
  • + *
  • CRC32 unaligned — consumeRunningCrc() partial-segment branch (runningCrcBytesInSegment > 0) + * and accumulateRunningCrc() segment-incomplete branch
  • + *
  • CRC32 single-byte writes — write(int b) runningCrc != null, both sub-branches + * (segment-complete and segment-incomplete)
  • + *
  • CRC32C unaligned — covers ChecksumByteBufferFactory.crc32CImpl() path
  • + *
+ */ + static Stream runningCrcParams() { + return Stream.of( + // CRC32 aligned: 2 full 256KB segments, no partial tail + Arguments.of(ChecksumType.CRC32, 256 * 1024, 2 * 256 * 1024, false), + // CRC32 unaligned: 2 full segments + 100-byte partial tail + Arguments.of(ChecksumType.CRC32, 256 * 1024, 2 * 256 * 1024 + 100, false), + // CRC32 single-byte: bytesPerChecksum=64, write 74 bytes → 1 full segment + 10-byte partial + Arguments.of(ChecksumType.CRC32, 64, 74, true), + // CRC32C unaligned: exercises crc32CImpl() path with a partial tail + Arguments.of(ChecksumType.CRC32C, 256 * 1024, 2 * 256 * 1024 + 100, false) + ); + } + + /** + * Verifies that CRC32/CRC32C running-checksum values emitted by BlockOutputStream match + * independently computed checksums for the same bytes. Covers all branches in + * {@code accumulateRunningCrc()} and {@code consumeRunningCrc()} for both CRC types, using + * aligned, unaligned, and single-byte write patterns. + */ + @ParameterizedTest + @MethodSource("runningCrcParams") + void testRunningCrcChecksumCorrectness(ChecksumType type, int bpc, int totalBytes, + boolean singleByteWrites) throws IOException { + final Pipeline pipeline = MockPipeline.createRatisPipeline(); + final List capturedChunkData = new ArrayList<>(); + final List capturedChecksums = new ArrayList<>(); + + final XceiverClientManager xcm = mock(XceiverClientManager.class); + when(xcm.acquireClient(any())) + .thenReturn(new ChecksumCapturingMockClient(pipeline, capturedChunkData, capturedChecksums)); + + OzoneClientConfig config = new OzoneClientConfig(); + config.setStreamBufferSize(4 * 1024 * 1024); + config.setStreamBufferMaxSize(32 * 1024 * 1024); + config.setStreamBufferFlushDelay(true); + config.setStreamBufferFlushSize(16 * 1024 * 1024); + config.setChecksumType(type); + config.setBytesPerChecksum(bpc); + StreamBufferArgs streamBufferArgs = + StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config); + + final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4); + try (BlockOutputStream stream = new RatisBlockOutputStream( + new BlockID(1L, 1L), -1, xcm, pipeline, bufferPool, config, null, + ContainerClientMetrics.acquire(), streamBufferArgs, () -> newFixedThreadPool(10))) { + if (singleByteWrites) { + for (int i = 0; i < totalBytes; i++) { + stream.write(DATA[i]); + } + } else { + stream.write(DATA, 0, totalBytes); + } + } + + assertFalse(capturedChunkData.isEmpty(), type + ": expected at least one WriteChunk"); + final Checksum checker = new Checksum(type, bpc); + for (int i = 0; i < capturedChunkData.size(); i++) { + final ContainerProtos.ChecksumData expected = + checker.computeChecksum(capturedChunkData.get(i)).getProtoBufMessage(); + assertEquals(expected, capturedChecksums.get(i), + type + " checksum mismatch for chunk " + i); + } + } + /** * XCeiverClient which simulates responses. */ @@ -297,4 +435,100 @@ public long getReplicatedMinCommitIndex() { } } + /** + * XceiverClient that captures each WriteChunk's raw data bytes and the embedded ChecksumData proto, + * so callers can independently verify SHA-256 (or any non-CRC) checksum correctness. + */ + private static class ChecksumCapturingMockClient extends XceiverClientSpi { + + private final Pipeline pipeline; + private final List capturedChunkData; + private final List capturedChecksums; + private final AtomicInteger counter = new AtomicInteger(); + + ChecksumCapturingMockClient(Pipeline pipeline, + List capturedChunkData, + List capturedChecksums) { + super(); + this.pipeline = pipeline; + this.capturedChunkData = capturedChunkData; + this.capturedChecksums = capturedChecksums; + } + + @Override + public void connect() { + } + + @Override + public void close() { + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) { + if (!request.hasVersion()) { + request = ContainerCommandRequestProto.newBuilder(request) + .setVersion(ClientVersion.CURRENT.toProtoValue()).build(); + } + final ContainerCommandResponseProto.Builder builder = + ContainerCommandResponseProto.newBuilder() + .setResult(Result.SUCCESS) + .setCmdType(request.getCmdType()); + + switch (request.getCmdType()) { + case PutBlock: + builder.setPutBlock(PutBlockResponseProto.newBuilder() + .setCommittedBlockLength( + GetCommittedBlockLengthResponseProto.newBuilder() + .setBlockID(request.getPutBlock().getBlockData().getBlockID()) + .setBlockLength(request.getPutBlock().getBlockData().getSize()) + .build()) + .build()); + break; + case WriteChunk: + capturedChunkData.add(request.getWriteChunk().getData().toByteArray()); + capturedChecksums.add(request.getWriteChunk().getChunkData().getChecksumData()); + break; + default: + //no-op + } + final XceiverClientReply result = new XceiverClientReply( + CompletableFuture.completedFuture(builder.build())); + result.setLogIndex(counter.incrementAndGet()); + return result; + } + + @Override + public ReplicationType getPipelineType() { + return null; + } + + @Override + public CompletableFuture watchForCommit(long index) { + final ContainerCommandResponseProto.Builder builder = + ContainerCommandResponseProto.newBuilder() + .setCmdType(Type.WriteChunk) + .setResult(Result.SUCCESS); + final XceiverClientReply reply = new XceiverClientReply( + CompletableFuture.completedFuture(builder.build())); + reply.setLogIndex(index); + return CompletableFuture.completedFuture(reply); + } + + @Override + public long getReplicatedMinCommitIndex() { + return 0; + } + + @Override + public Map sendCommandOnAllNodes( + ContainerCommandRequestProto request) { + return null; + } + } + } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index ffdab4cde160..493162af3d2b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -148,6 +148,19 @@ public Checksum(ChecksumType type, int bytesPerChecksum, boolean allowChecksumCa } } + /** + * Creates a new {@link ChecksumByteBuffer} for the configured checksum type, suitable for incremental CRC + * accumulation alongside a write path. Returns {@code null} for types that do not support incremental + * updates via {@link ChecksumByteBuffer} (NONE, SHA-256, MD5). + */ + public ChecksumByteBuffer newChecksumByteBuffer() { + switch (checksumType) { + case CRC32: return ChecksumByteBufferFactory.crc32Impl(); + case CRC32C: return ChecksumByteBufferFactory.crc32CImpl(); + default: return null; + } + } + /** * Computes checksum for give data. * @param data input data. From e0f90eb7ed336146f3b22be9f813f13c45dd28ab Mon Sep 17 00:00:00 2001 From: Andrey Yarovoy Date: Thu, 18 Jun 2026 18:17:57 -0400 Subject: [PATCH 2/3] fixed checkstyle --- .../hdds/scm/storage/TestBlockOutputStreamCorrectness.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index f9d8f409833c..77df8e6dec5f 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; @@ -61,7 +62,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.Assertions; -import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; From 50ddc995709b58d0c9da81fe883a0929bfaafe23 Mon Sep 17 00:00:00 2001 From: Andrey Yarovoy Date: Wed, 24 Jun 2026 23:03:31 -0400 Subject: [PATCH 3/3] tweaks based on async profiler results --- .../hdds/scm/storage/BlockOutputStream.java | 30 +- .../storage/BenchmarkMockXceiverClient.java | 47 +-- .../BlockOutputStreamWriteBenchmark.java | 276 +++++------------- .../common/ChunkBufferImplWithByteBuffer.java | 6 + 4 files changed, 124 insertions(+), 235 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index ec665d89a49d..ed5a78c743be 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -401,9 +401,10 @@ public void write(byte[] b, int off, int len) throws IOException { while (len > 0) { allocateNewBufferIfNeeded(); final int writeLen = Math.min(currentBufferRemaining, len); + final int writeStart = currentBuffer.position(); currentBuffer.put(b, off, writeLen); if (runningCrc != null) { - accumulateRunningCrc(b, off, writeLen); + accumulateRunningCrc(currentBuffer, writeStart, writeLen); } currentBufferRemaining -= writeLen; updateWrittenDataLength(writeLen); @@ -434,19 +435,22 @@ private void doFlushOrWatchIfNeeded() throws IOException { } } - private void accumulateRunningCrc(byte[] b, int off, int len) { - while (len > 0) { - final int space = config.getBytesPerChecksum() - runningCrcBytesInSegment; - final int toUpdate = Math.min(space, len); - runningCrc.update(b, off, toUpdate); - runningCrcBytesInSegment += toUpdate; - if (runningCrcBytesInSegment == config.getBytesPerChecksum()) { - runningCrcChecksums.add(Checksum.int2ByteString((int) runningCrc.getValue())); - runningCrc.reset(); - runningCrcBytesInSegment = 0; + private void accumulateRunningCrc(ChunkBuffer src, int startPos, int writeLen) { + for (ByteBuffer bb : src.duplicate(startPos, startPos + writeLen).iterate(writeLen)) { + while (bb.hasRemaining()) { + final int space = config.getBytesPerChecksum() - runningCrcBytesInSegment; + final int toUpdate = Math.min(space, bb.remaining()); + final int savedLimit = bb.limit(); + bb.limit(bb.position() + toUpdate); + runningCrc.update(bb); + bb.limit(savedLimit); + runningCrcBytesInSegment += toUpdate; + if (runningCrcBytesInSegment == config.getBytesPerChecksum()) { + runningCrcChecksums.add(Checksum.int2ByteString((int) runningCrc.getValue())); + runningCrc.reset(); + runningCrcBytesInSegment = 0; + } } - off += toUpdate; - len -= toUpdate; } } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java index adfd9f5960b2..656dbb0deffd 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BenchmarkMockXceiverClient.java @@ -36,6 +36,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufOutputStream; import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator; +import org.apache.ratis.thirdparty.io.netty.util.ResourceLeakDetector; /** * Minimal xceiver client for {@link BlockOutputStreamWriteBenchmark}. @@ -57,13 +58,28 @@ */ final class BenchmarkMockXceiverClient extends XceiverClientSpi { + static { + // The thread-local ByteBufs are never released (they live for the worker + // thread's lifetime), which triggers ResourceLeakDetector warnings when + // benchmark thread pools are torn down between phases. Disable detection + // for this benchmark-only class to keep output clean. + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + } + + // One reusable direct buffer per worker thread — eliminates the per-call + // PoolArena.allocateHuge + ByteBuffer.allocateDirect + OS zero-fill cycle + // that dominates CPU at large (≥4 MB) write sizes. + private static final ThreadLocal FRAME_BUF = ThreadLocal.withInitial( + () -> PooledByteBufAllocator.DEFAULT.directBuffer(4 * 1024 * 1024 + 1024)); + private final Pipeline pipeline; private final AtomicLong logIndex = new AtomicLong(); - // Simulated Raft commit latency. The calling thread parks for this duration in - // watchForCommit(), mimicking the real-cluster scenario where the writer blocks - // waiting for consensus before allocating the next buffer. private final long commitLatencyNs; + BenchmarkMockXceiverClient(Pipeline pipeline) { + this(pipeline, 0); + } + BenchmarkMockXceiverClient(Pipeline pipeline, long commitLatencyNs) { this.pipeline = pipeline; this.commitLatencyNs = commitLatencyNs; @@ -89,18 +105,16 @@ public XceiverClientReply sendCommandAsync(ContainerCommandRequestProto request) // OutputStreamEncoder → WritableBufferOutputStream → ByteBuf.writeBytes(). // For NioByteString (direct chunk data): copyMemory(direct→heap tmp) + write(heap→direct). // For BoundedByteString (heap chunk data): write(heap→direct) only. - // The buffer is released immediately rather than being enqueued to a socket. - final int size = request.getSerializedSize(); - final ByteBuf frame = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + // Reuse a thread-local direct buffer rather than allocating per call: + // ≥4 MB requests exceed PoolArena's chunk size and fall into allocateHuge, + // which calls ByteBuffer.allocateDirect + OS zero-fill on every invocation. + final ByteBuf frame = FRAME_BUF.get(); + frame.clear(); + frame.ensureWritable(request.getSerializedSize()); try { - final ByteBufOutputStream out = new ByteBufOutputStream(frame); - try { - request.writeTo(out); - } catch (IOException e) { - throw new RuntimeException(e); - } - } finally { - frame.release(); + request.writeTo(new ByteBufOutputStream(frame)); + } catch (IOException e) { + throw new RuntimeException(e); } final ContainerCommandResponseProto.Builder builder = @@ -129,11 +143,6 @@ public ReplicationType getPipelineType() { @Override public CompletableFuture watchForCommit(long index) { - // Block the calling thread for commitLatencyNs to mimic the Raft leader - // requiring consensus before acknowledging a putBlock. This causes back-pressure: - // buffers stay in-flight, pool fills up, and concurrent threads thrash L3 cache - // while each writer waits — reproducing the cold-staging-buffer scenario captured - // by the real-cluster async-profiler (12% CPU in computeChecksum). if (commitLatencyNs > 0) { LockSupport.parkNanos(commitLatencyNs); } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java index 28c504e00f99..8f72bbc9a3fe 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStreamWriteBenchmark.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.ozone.test.GenericTestUtils; import org.slf4j.event.Level; @@ -53,17 +52,9 @@ * sequential writes and closes the stream, driving the chunk serialisation and * gRPC framing code at full speed without real network or disk I/O. * - *

Supports two comparison modes: - *

    - *
  • heap vs direct (default) — compares {@code ByteBuffer.allocate()} chunks - * (heap, {@code BoundedByteString}, single arraycopy to gRPC wire buffer) against - * {@code ByteBuffer.allocateDirect()} chunks (direct, {@code NioByteString}, two copies).
  • - *
  • checksum overhead ({@code benchmark.checksumComparison=true}) — runs - * {@code NONE} then {@code CRC32} back-to-back with heap buffers and prints the - * per-write checksum overhead and throughput delta. Use this to quantify the cost of - * {@code Checksum.computeChecksum} (before the incremental-CRC change) and verify - * its elimination (after).
  • - *
+ *

Uses heap {@code ByteBuffer} backed chunks. {@code UnsafeByteOperations.unsafeWrap()} + * returns a {@code BoundedByteString} with a backing array, so gRPC serialises via + * a single {@code System.arraycopy} into the Netty wire buffer. * *

On-demand benchmark (not part of {@code mvn test}). Run from the repo root: *

@@ -76,62 +67,39 @@
  * 
    *
  • {@code benchmark.label} – profile label (default {@code workspace})
  • *
  • {@code benchmark.writeSize} – single write size in bytes (default: run - * 1 MB, 2 MB, 3 MB and 4 MB)
  • - *
  • {@code benchmark.heapBuffer} – {@code true} or {@code false} - * (default: run both)
  • + * 256 KB, 512 KB, 1 MB, 2 MB and 4 MB) *
  • {@code benchmark.checksumType} – {@code NONE} or {@code CRC32} * (default {@code CRC32})
  • *
  • {@code benchmark.threads} – worker count (default {@code CPUs * 2})
  • - *
  • {@code benchmark.scaling=true} – run multiple thread counts for both - * allocation modes and print a scaling summary
  • - *
  • {@code benchmark.scaling.threads} – comma-separated thread counts for - * scaling (default {@code 1,2,7,14,28,42,56})
  • - *
  • {@code benchmark.scaling.writeSizes} – comma-separated write sizes in - * bytes (overrides default 1/2/3/4 MB list when set)
  • - *
  • {@code benchmark.streamBufferSize} – chunk buffer size in bytes (default 4 MB). - * Use a small value (e.g. {@code 65536}) to call {@link BufferPool#allocateBuffer} - * 64× more often, making per-call locking overhead visible in a profiler.
  • - *
  • {@code benchmark.fileSize} – bytes per stream open/close cycle - * (default 2 × poolCapacity × streamBufferSize = 64 MB with defaults).
  • *
  • {@code benchmark.dnLatencyMs} – simulated Raft commit latency in milliseconds * (default 0 = instant). The writer thread parks in {@code watchForCommit} for this - * duration, filling the {@link BufferPool} and creating concurrent L3 cache pressure - * across all writer threads. Use {@code 2} to match typical three-way Raft latency. - * Combined with {@code benchmark.threads ≥ 14}, this reproduces the cold-staging-buffer - * scenario captured by the real-cluster async-profiler.
  • - *
  • {@code benchmark.checksumComparison} – {@code true} to run {@code NONE} then - * {@code CRC32} back-to-back with heap buffers and print the per-write checksum - * overhead and throughput delta.
  • + * duration, filling the {@link BufferPool} and creating back-pressure that reproduces + * the cold-staging-buffer scenario from the real-cluster async-profiler. + *
  • {@code benchmark.scaling=true} – run multiple thread counts and print + * a scaling summary
  • + *
  • {@code benchmark.scaling.threads} – comma-separated thread counts for + * scaling (default {@code 1,2,4,7,14,28,42})
  • + *
  • {@code benchmark.scaling.writeSizes} – comma-separated write sizes in + * bytes (overrides default list when set)
  • *
- * - *

Or use - * {@code hadoop-hdds/client/dev-support/run-block-output-stream-write-benchmark.sh}. */ public final class BlockOutputStreamWriteBenchmark { - private static final int WARMUP_SECONDS = Integer.getInteger("benchmark.warmupSeconds", 10); - private static final int BENCHMARK_SECONDS = Integer.getInteger("benchmark.benchmarkSeconds", 20); - // Configurable via -Dbenchmark.streamBufferSize= (default 4 MB). - // Use a small value (e.g. 65536) to call allocateBuffer more frequently and - // expose per-call BufferPool locking overhead in a profiler. - private static final int STREAM_BUFFER_SIZE = - Integer.getInteger("benchmark.streamBufferSize", 4 * 1024 * 1024); + private static final int WARMUP_SECONDS = 10; + private static final int BENCHMARK_SECONDS = 20; + private static final int STREAM_BUFFER_SIZE = 4 * 1024 * 1024; + private static final int SOURCE_BUFFER_SIZE = STREAM_BUFFER_SIZE; private static final int POOL_CAPACITY = 8; - // Total bytes written per stream open/close cycle. Default = 2× pool capacity. - private static final int FILE_SIZE = - Integer.getInteger("benchmark.fileSize", 2 * POOL_CAPACITY * STREAM_BUFFER_SIZE); - // Simulated Raft commit latency. Causes the calling thread to park in watchForCommit(), - // filling the BufferPool and creating concurrent L3 cache pressure across all writer threads. - // Use 1-5ms to match real three-way Raft commit latency. private static final long DN_LATENCY_MS = Long.getLong("benchmark.dnLatencyMs", 0); - // Source byte array — kept at STREAM_BUFFER_SIZE for memory efficiency; reused in a loop. - private static final int SOURCE_BUFFER_SIZE = STREAM_BUFFER_SIZE; + private static final boolean UNSAFE_BYTE_OPS = + Boolean.parseBoolean(System.getProperty("benchmark.unsafeByteOps", "true")); private static final int[] DEFAULT_WRITE_SIZES = { - STREAM_BUFFER_SIZE / 4, - STREAM_BUFFER_SIZE / 2, - STREAM_BUFFER_SIZE * 3 / 4, - STREAM_BUFFER_SIZE + 256 * 1024, + 512 * 1024, + 1024 * 1024, + 2 * 1024 * 1024, + 4 * 1024 * 1024 }; private static final DecimalFormat MBPS = new DecimalFormat("#,##0.0"); @@ -152,11 +120,13 @@ public static void main(String[] args) throws Exception { System.out.println("BlockOutputStream client write benchmark (mocked container RPCs)"); System.out.println("Profile: " + label); - System.out.println("Comparing: heap ByteBuffer (this patch) vs direct ByteBuffer (pre-patch)"); System.out.println("JVM: " + System.getProperty("java.version") + " on " + System.getProperty("os.arch")); - System.out.printf("CPUs=%d fileSize=%dKB streamBuffer=%dKB poolCapacity=%d checksum=%s dnLatency=%dms%n", - cpus, FILE_SIZE / 1024, STREAM_BUFFER_SIZE / 1024, POOL_CAPACITY, checksumType, DN_LATENCY_MS); + System.out.printf( + "CPUs=%d sourceBuffer=%dMB streamBuffer=%dMB poolCapacity=%d checksum=%s dnLatency=%dms byteOps=%s%n", + cpus, SOURCE_BUFFER_SIZE / (1024 * 1024), + STREAM_BUFFER_SIZE / (1024 * 1024), POOL_CAPACITY, checksumType, DN_LATENCY_MS, + UNSAFE_BYTE_OPS ? "unsafe" : "safe"); System.out.println(); final byte[] sourceBuffer = new byte[SOURCE_BUFFER_SIZE]; @@ -164,12 +134,6 @@ public static void main(String[] args) throws Exception { if (Boolean.parseBoolean(System.getProperty("benchmark.scaling", "false"))) { runScalingStudy(sourceBuffer, checksumType, cpus); - } else if (Boolean.parseBoolean(System.getProperty("benchmark.crcMatrix", "false"))) { - runChecksumMatrix(sourceBuffer, resolveScalingThreadCounts(cpus), resolveWriteSizes()); - } else if (Boolean.parseBoolean(System.getProperty("benchmark.checksumComparison", "false"))) { - final int threadCount = resolveThreadCount(cpus); - System.out.printf("threads=%d%n%n", threadCount); - runChecksumComparison(sourceBuffer, threadCount); } else { final int threadCount = resolveThreadCount(cpus); System.out.printf("threads=%d%n%n", threadCount); @@ -177,10 +141,8 @@ public static void main(String[] args) throws Exception { final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); try { for (int writeSize : resolveWriteSizes()) { - for (boolean heapBuffer : resolveHeapBufferModes()) { - runProfile(sourceBuffer, writeSize, heapBuffer, checksumType, threadCount, client); - System.out.println(); - } + runProfile(sourceBuffer, writeSize, checksumType, threadCount, client); + System.out.println(); } } finally { client.close(); @@ -188,67 +150,6 @@ public static void main(String[] args) throws Exception { } } - /** - * Runs NONE then CRC32 back-to-back with heap buffers and prints the per-write checksum - * overhead. Use this to quantify computeChecksum cost before and after the incremental-CRC change. - */ - private static void runChecksumComparison(byte[] sourceBuffer, int threadCount) throws Exception { - final Pipeline pipeline = MockPipeline.createRatisPipeline(); - final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); - try { - System.out.println("=== Checksum overhead comparison (heapBuffer=true) ==="); - System.out.println(); - for (int writeSize : resolveWriteSizes()) { - final Result none = runProfile(sourceBuffer, writeSize, true, ChecksumType.NONE, threadCount, client); - System.out.println(); - final Result crc32 = runProfile(sourceBuffer, writeSize, true, ChecksumType.CRC32, threadCount, client); - System.out.println(); - final double overheadNs = crc32.nsPerWrite - none.nsPerWrite; - final double overheadPct = none.mbPerSec == 0 ? 0 : (none.mbPerSec - crc32.mbPerSec) / none.mbPerSec * 100; - System.out.printf(">>> writeSize=%dKB NONE=%s MB/s CRC32=%s MB/s overhead=%.0fns/write (%.1f%%)%n%n", - writeSize / 1024, MBPS.format(none.mbPerSec), MBPS.format(crc32.mbPerSec), - overheadNs, overheadPct); - } - } finally { - client.close(); - } - } - - /** - * Runs a thread-count × write-size matrix comparing NONE vs CRC32 with heap buffers. - * Emits one MATRIX line per cell for easy grep/diff between before and after runs. - * Use {@code benchmark.warmupSeconds} / {@code benchmark.benchmarkSeconds} to tune - * duration (e.g. 3/7 for fast matrix sweeps). - */ - private static void runChecksumMatrix(byte[] sourceBuffer, int[] threadCounts, - int[] writeSizes) throws Exception { - final Pipeline pipeline = MockPipeline.createRatisPipeline(); - final BenchmarkMockXceiverClient client = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); - try { - System.out.printf("crcMatrix: %d thread counts × %d write sizes × {NONE,CRC32}" - + " dnLatency=%dms warmup=%ds benchmark=%ds%n%n", - threadCounts.length, writeSizes.length, DN_LATENCY_MS, WARMUP_SECONDS, BENCHMARK_SECONDS); - System.out.printf("%-10s %-10s %14s %14s %16s %8s%n", - "threads", "writeKB", "NONE MB/s", "CRC32 MB/s", "overhead ns/w", "overhead%"); - System.out.println("----------------------------------------------------------------------------"); - for (int threadCount : threadCounts) { - for (int writeSize : writeSizes) { - final Result none = runProfile(sourceBuffer, writeSize, true, ChecksumType.NONE, threadCount, client); - final Result crc32 = runProfile(sourceBuffer, writeSize, true, ChecksumType.CRC32, threadCount, client); - final double overheadNs = crc32.nsPerWrite - none.nsPerWrite; - final double overheadPct = none.mbPerSec == 0 ? 0 : (none.mbPerSec - crc32.mbPerSec) / none.mbPerSec * 100; - System.out.printf("MATRIX %-8d %-8d %14s %14s %16.0f %7.1f%%%n", - threadCount, writeSize / 1024, - MBPS.format(none.mbPerSec), MBPS.format(crc32.mbPerSec), - overheadNs, overheadPct); - } - System.out.println(); - } - } finally { - client.close(); - } - } - private static void runScalingStudy(byte[] sourceBuffer, ChecksumType checksumType, int cpus) throws Exception { final int[] threadCounts = resolveScalingThreadCounts(cpus); @@ -262,17 +163,16 @@ private static void runScalingStudy(byte[] sourceBuffer, ChecksumType checksumTy // different arena (round-robin), and 0%-usage chunks in old arenas cannot // be reused by threads on new arenas, so direct memory grows unboundedly. final Pipeline pipeline = MockPipeline.createRatisPipeline(); - final BenchmarkMockXceiverClient sharedClient = new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); + final BenchmarkMockXceiverClient sharedClient = + new BenchmarkMockXceiverClient(pipeline, DN_LATENCY_MS * 1_000_000L); try { for (int writeSize : resolveWriteSizes()) { System.out.printf("===== writeSize=%dKB scaling study =====%n%n", writeSize / 1024); for (int threadCount : threadCounts) { - for (boolean heapBuffer : new boolean[] {false, true}) { - final Result result = runProfile(sourceBuffer, writeSize, heapBuffer, - checksumType, threadCount, sharedClient); - rows.add(new ScalingRow(writeSize, threadCount, heapBuffer, result.mbPerSec)); - System.out.println(); - } + final Result result = runProfile(sourceBuffer, writeSize, checksumType, + threadCount, sharedClient); + rows.add(new ScalingRow(writeSize, threadCount, result.mbPerSec)); + System.out.println(); } printScalingSummary(writeSize, threadCounts, rows); System.out.println(); @@ -291,32 +191,24 @@ private static void printScalingSummary(int writeSize, int[] threadCounts, } } - final double directBaseline = findMbPerSec(rows, 1, false); - final double heapBaseline = findMbPerSec(rows, 1, true); + final double baseline = findMbPerSec(rows, 1); System.out.printf("--- scaling summary writeSize=%dKB ---%n", writeSize / 1024); - System.out.printf("%8s | %12s | %12s | %10s | %10s | %10s%n", - "threads", "direct MB/s", "heap MB/s", "heap/direct", "direct eff.", "heap eff."); - System.out.println("---------|--------------|--------------|------------|------------|------------"); + System.out.printf("%8s | %12s | %10s%n", "threads", "MB/s", "efficiency"); + System.out.println("---------|--------------|------------"); for (int threadCount : threadCounts) { - final double direct = findMbPerSec(rows, threadCount, false); - final double heap = findMbPerSec(rows, threadCount, true); - final double heapVsDirect = direct == 0 ? 0 : heap / direct; - final double directEff = directBaseline == 0 ? 0 : direct / directBaseline / threadCount; - final double heapEff = heapBaseline == 0 ? 0 : heap / heapBaseline / threadCount; - System.out.printf("%8d | %12s | %12s | %10.2fx | %10.2f | %10.2f%n", - threadCount, MBPS.format(direct), MBPS.format(heap), heapVsDirect, - directEff, heapEff); + final double mbPerSec = findMbPerSec(rows, threadCount); + final double eff = baseline == 0 ? 0 : mbPerSec / baseline / threadCount; + System.out.printf("%8d | %12s | %10.2f%n", threadCount, MBPS.format(mbPerSec), eff); } System.out.println(); - System.out.println("eff. = throughput vs 1-thread baseline / thread count (1.0 = linear scaling)"); + System.out.println("efficiency = throughput vs 1-thread baseline / thread count (1.0 = linear scaling)"); } - private static double findMbPerSec(List rows, int threadCount, - boolean heapBuffer) { + private static double findMbPerSec(List rows, int threadCount) { for (ScalingRow row : rows) { - if (row.threadCount == threadCount && row.heapBuffer == heapBuffer) { + if (row.threadCount == threadCount) { return row.mbPerSec; } } @@ -326,13 +218,11 @@ private static double findMbPerSec(List rows, int threadCount, private static final class ScalingRow { private final int writeSize; private final int threadCount; - private final boolean heapBuffer; private final double mbPerSec; - private ScalingRow(int writeSize, int threadCount, boolean heapBuffer, double mbPerSec) { + private ScalingRow(int writeSize, int threadCount, double mbPerSec) { this.writeSize = writeSize; this.threadCount = threadCount; - this.heapBuffer = heapBuffer; this.mbPerSec = mbPerSec; } } @@ -348,7 +238,7 @@ private static int resolveThreadCount(int cpus) { private static int[] resolveScalingThreadCounts(int cpus) { final String property = System.getProperty("benchmark.scaling.threads"); if (property == null || property.isEmpty()) { - return new int[] {1, 2, 7, 14, 28, 42, 56}; + return new int[] {1, 2, 4, 7, 14, 28, 42}; } return parseCommaSeparatedInts(property); } @@ -365,13 +255,13 @@ private static String formatThreadCounts(int[] threadCounts) { } private static Result runProfile(byte[] sourceBuffer, int writeSize, - boolean heapBuffer, ChecksumType checksumType, int threadCount, + ChecksumType checksumType, int threadCount, BenchmarkMockXceiverClient client) throws Exception { - System.out.printf("--- writeSize=%dKB heapBuffer=%s threads=%d writes/stream=%d ---%n", - writeSize / 1024, heapBuffer, threadCount, FILE_SIZE / writeSize); + System.out.printf("--- writeSize=%dKB threads=%d writes/stream=%d ---%n", + writeSize / 1024, threadCount, SOURCE_BUFFER_SIZE / writeSize); - final Result result = runProfileMeasured(sourceBuffer, writeSize, heapBuffer, + final Result result = runProfileMeasured(sourceBuffer, writeSize, checksumType, threadCount, client); System.out.printf("elapsed: %.2fs%n", result.elapsedSeconds); @@ -389,13 +279,11 @@ private static Result runProfile(byte[] sourceBuffer, int writeSize, } private static Result runProfileMeasured(byte[] sourceBuffer, int writeSize, - boolean heapBuffer, ChecksumType checksumType, int threadCount, + ChecksumType checksumType, int threadCount, BenchmarkMockXceiverClient client) throws Exception { - runTimedWrite(sourceBuffer, writeSize, heapBuffer, checksumType, - threadCount, WARMUP_SECONDS, "warmup", client); - return runTimedWrite(sourceBuffer, writeSize, heapBuffer, checksumType, - threadCount, BENCHMARK_SECONDS, "benchmark", client); + runTimedWrite(sourceBuffer, writeSize, checksumType, threadCount, WARMUP_SECONDS, client); + return runTimedWrite(sourceBuffer, writeSize, checksumType, threadCount, BENCHMARK_SECONDS, client); } private static int[] resolveWriteSizes() { @@ -419,17 +307,9 @@ private static int[] parseCommaSeparatedInts(String property) { return values; } - private static boolean[] resolveHeapBufferModes() { - final String property = System.getProperty("benchmark.heapBuffer"); - if (property == null || property.isEmpty()) { - return new boolean[] {false, true}; - } - return new boolean[] {Boolean.parseBoolean(property)}; - } - private static Result runTimedWrite(byte[] sourceBuffer, int writeSize, - boolean heapBuffer, ChecksumType checksumType, int threadCount, - int seconds, String phase, BenchmarkMockXceiverClient client) throws Exception { + ChecksumType checksumType, int threadCount, + int seconds, BenchmarkMockXceiverClient client) throws Exception { final long writeChunksBefore = ContainerClientMetrics.acquire().getWriteChunksDuringWrite().value(); final long flushesBefore = @@ -443,7 +323,7 @@ private static Result runTimedWrite(byte[] sourceBuffer, int writeSize, final List> futures = new ArrayList<>(threadCount); for (int i = 0; i < threadCount; i++) { futures.add(workers.submit(() -> - runWorker(sourceBuffer, writeSize, heapBuffer, checksumType, deadline, client))); + runWorker(sourceBuffer, writeSize, checksumType, deadline, client))); } long bytesWritten = 0; @@ -458,15 +338,13 @@ private static Result runTimedWrite(byte[] sourceBuffer, int writeSize, final long elapsedNanos = System.nanoTime() - start; final double elapsedSeconds = elapsedNanos / 1_000_000_000.0; - System.out.printf("%s complete (%.2fs)%n", phase, elapsedSeconds); + System.out.printf("complete (%.2fs)%n", elapsedSeconds); return new Result( bytesWritten, writeOps, blocksWritten, elapsedSeconds, - bytesWritten / elapsedSeconds / (1024.0 * 1024.0), - writeOps == 0 ? 0 : (double) elapsedNanos / writeOps, ContainerClientMetrics.acquire().getWriteChunksDuringWrite().value() - writeChunksBefore, ContainerClientMetrics.acquire().getFlushesDuringWrite().value() @@ -482,23 +360,21 @@ private static Result runTimedWrite(byte[] sourceBuffer, int writeSize, } private static WorkerResult runWorker(byte[] sourceBuffer, int writeSize, - boolean heapBuffer, ChecksumType checksumType, long deadline, + ChecksumType checksumType, long deadline, BenchmarkMockXceiverClient mockClient) throws Exception { long bytesWritten = 0; long writeOps = 0; long blocksWritten = 0; - try (BenchmarkSession session = BenchmarkSession.open(heapBuffer, checksumType, mockClient)) { + try (BenchmarkSession session = BenchmarkSession.open(checksumType, mockClient)) { while (System.nanoTime() < deadline) { try (BlockOutputStream stream = session.newStream()) { - int remaining = FILE_SIZE; - while (remaining > 0 && System.nanoTime() < deadline) { - int chunk = Math.min(writeSize, remaining); - stream.write(sourceBuffer, 0, chunk); - bytesWritten += chunk; + for (int offset = 0; offset + writeSize <= sourceBuffer.length + && System.nanoTime() < deadline; offset += writeSize) { + stream.write(sourceBuffer, offset, writeSize); + bytesWritten += writeSize; writeOps++; - remaining -= chunk; } } blocksWritten++; @@ -540,34 +416,29 @@ private BenchmarkSession(Pipeline pipeline, ContainerClientMetrics metrics, this.responseExecutor = responseExecutor; } - static BenchmarkSession open(boolean heapBuffer, - ChecksumType checksumType, BenchmarkMockXceiverClient mockClient) throws IOException { + static BenchmarkSession open(ChecksumType checksumType, + BenchmarkMockXceiverClient mockClient) throws IOException { final Pipeline pipeline = mockClient.getPipeline(); final XceiverClientManager xcm = mock(XceiverClientManager.class); when(xcm.acquireClient(any())).thenReturn(mockClient); final OzoneClientConfig config = new OzoneClientConfig(); config.setStreamBufferSize(STREAM_BUFFER_SIZE); - config.setStreamBufferMaxSize(POOL_CAPACITY * STREAM_BUFFER_SIZE); + config.setStreamBufferMaxSize(32 * 1024 * 1024); config.setStreamBufferFlushDelay(false); - config.setStreamBufferFlushSize(POOL_CAPACITY * STREAM_BUFFER_SIZE / 2); + config.setStreamBufferFlushSize(16 * 1024 * 1024); config.setChecksumType(checksumType); - config.setBytesPerChecksum(Math.min(64 * 1024, STREAM_BUFFER_SIZE)); + config.setBytesPerChecksum(64 * 1024); config.validate(); final StreamBufferArgs streamBufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config); - // Both modes use the same BufferPool; ChunkBuffer.ALLOCATE_DIRECT controls - // which allocator is called inside ChunkBuffer.allocate(), giving a true - // apples-to-apples comparison of heap vs direct buffer serialisation overhead. - ChunkBuffer.ALLOCATE_DIRECT.set(!heapBuffer); - return new BenchmarkSession( pipeline, ContainerClientMetrics.acquire(), new BufferPool(STREAM_BUFFER_SIZE, POOL_CAPACITY, - ByteStringConversion.createByteBufferConversion(true)), + ByteStringConversion.createByteBufferConversion(UNSAFE_BYTE_OPS)), config, streamBufferArgs, xcm, @@ -607,14 +478,13 @@ private static final class Result { private final long flushesDuringWrite; private Result(long bytesWritten, long writeOps, long blocksWritten, double elapsedSeconds, - double mbPerSec, double nsPerWrite, long writeChunksDuringWrite, - long flushesDuringWrite) { + long writeChunksDuringWrite, long flushesDuringWrite) { this.bytesWritten = bytesWritten; this.writeOps = writeOps; this.blocksWritten = blocksWritten; this.elapsedSeconds = elapsedSeconds; - this.mbPerSec = mbPerSec; - this.nsPerWrite = nsPerWrite; + this.mbPerSec = bytesWritten / elapsedSeconds / (1024.0 * 1024.0); + this.nsPerWrite = writeOps == 0 ? 0 : elapsedSeconds * 1e9 / writeOps; this.writeChunksDuringWrite = writeChunksDuringWrite; this.flushesDuringWrite = flushesDuringWrite; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java index d37b095621cf..db0c7aba6b61 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java @@ -118,6 +118,12 @@ public ChunkBuffer put(ByteBuffer b) { return this; } + @Override + public ChunkBuffer put(byte[] b, int offset, int length) { + buffer.put(b, offset, length); + return this; + } + @Override public ChunkBuffer put(byte b) { buffer.put(b);