From 2fa9ccd74ab4ba6590ed85d1125f9f5badde3069 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 27 May 2026 11:11:24 -0700 Subject: [PATCH 01/17] add BDN to evaluate overhead of ConsistentRead --- .../ConsistentRead/ConsistentReadContext.cs | 168 ++++++++++++++++++ .../ConsistentReadOperations.cs | 72 ++++++++ .../ConsistentRead/ConsistentReadParams.cs | 47 +++++ 3 files changed, 287 insertions(+) create mode 100644 benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs create mode 100644 benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs create mode 100644 benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs new file mode 100644 index 00000000000..ed80a0385c5 --- /dev/null +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System.Net; +using System.Runtime.InteropServices; +using Embedded.server; +using Garnet.cluster; +using Garnet.common; +using Garnet.server; + +namespace BDN.benchmark.Cluster.ConsistentRead +{ + /// + /// Context for consistent read benchmarks — sets up an embedded Garnet server + /// with cluster + AOF + optional multilog, and optionally forces replica mode. + /// + unsafe class ConsistentReadContext + { + EmbeddedRespServer server; + RespServerSession session; + readonly BenchUtils benchUtils = new(); + readonly int port = 7100; + + public static ReadOnlySpan keyTag => "{0}"u8; + public Request getRequest; + public Request mgetRequest; + + public void Dispose() + { + session.Dispose(); + server.Dispose(); + } + + public void SetupInstance(ConsistentReadParams parameters) + { + var opt = new GarnetServerOptions + { + QuietMode = true, + EnableCluster = true, + EndPoints = [new IPEndPoint(IPAddress.Loopback, port)], + CleanClusterConfig = true, + ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port), + EnableAOF = true, + UseAofNullDevice = true, + FastAofTruncate = true, + CommitFrequencyMs = -1, + AofPageSize = "128m", + AofMemorySize = "256m", + }; + + if (parameters.multiLogEnabled) + { + opt.AofPhysicalSublogCount = 1; + opt.AofReplayTaskCount = 4; + opt.EnableFastCommit = true; + } + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) + opt.CheckpointDir = "/tmp"; + + server = new EmbeddedRespServer(opt, null, new GarnetServerEmbedded()); + session = server.GetRespSession(); + + // Assign all slots + AddSlotRange([(0, 16383)]); + + // Populate keys while still primary + PopulateKeys(); + + // If replica mode, force replica role and pre-seed sequence numbers + if (parameters.replicaMode && parameters.multiLogEnabled) + { + ForceReplicaRole(); + PreSeedSequenceNumbers(); + } + } + + void AddSlotRange(List<(int, int)> slotRanges) + { + foreach (var slotRange in slotRanges) + { + var cmd = $"*4\r\n$7\r\nCLUSTER\r\n$13\r\nADDSLOTSRANGE\r\n" + + $"${NumUtils.CountDigits(slotRange.Item1)}\r\n{slotRange.Item1}\r\n" + + $"${NumUtils.CountDigits(slotRange.Item2)}\r\n{slotRange.Item2}\r\n"; + var bytes = System.Text.Encoding.ASCII.GetBytes(cmd); + fixed (byte* req = bytes) + _ = session.TryConsumeMessages(req, bytes.Length); + } + } + + void PopulateKeys() + { + const int batchSize = 100; + const int keySize = 8; + const int valueSize = 32; + + var pairs = new (byte[], byte[])[batchSize]; + for (var i = 0; i < batchSize; i++) + { + pairs[i] = (new byte[keySize], new byte[valueSize]); + keyTag.CopyTo(pairs[i].Item1.AsSpan()); + benchUtils.RandomBytes(ref pairs[i].Item1, startOffset: keyTag.Length); + benchUtils.RandomBytes(ref pairs[i].Item2); + } + + // SET all keys + var setByteCount = batchSize * ("*3\r\n$3\r\nSET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2 + 1 + NumUtils.CountDigits(valueSize) + 2 + valueSize + 2); + var setReq = new Request(setByteCount); + var curr = setReq.ptr; + var end = curr + setReq.buffer.Length; + for (var i = 0; i < batchSize; i++) + { + _ = RespWriteUtils.TryWriteArrayLength(3, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString("SET"u8, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString(pairs[i].Item1, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString(pairs[i].Item2, ref curr, end); + } + _ = session.TryConsumeMessages(setReq.ptr, setReq.buffer.Length); + + // Build GET request buffer + var getByteCount = batchSize * ("*2\r\n$3\r\nGET\r\n"u8.Length + 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2); + getRequest = new Request(getByteCount); + curr = getRequest.ptr; + end = curr + getRequest.buffer.Length; + for (var i = 0; i < batchSize; i++) + { + _ = RespWriteUtils.TryWriteArrayLength(2, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString("GET"u8, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString(pairs[i].Item1, ref curr, end); + } + + // Build MGET request buffer + var mGetHeaderSize = 1 + NumUtils.CountDigits(1 + batchSize) + 2 + "$4\r\nMGET\r\n"u8.Length; + var getRespSize = 1 + NumUtils.CountDigits(keySize) + 2 + keySize + 2; + var mGetByteCount = mGetHeaderSize + (batchSize * getRespSize); + mgetRequest = new Request(mGetByteCount); + curr = mgetRequest.ptr; + end = curr + mgetRequest.buffer.Length; + _ = RespWriteUtils.TryWriteArrayLength(1 + batchSize, ref curr, end); + _ = RespWriteUtils.TryWriteBulkString("MGET"u8, ref curr, end); + for (var i = 0; i < batchSize; i++) + _ = RespWriteUtils.TryWriteBulkString(pairs[i].Item1, ref curr, end); + } + + void ForceReplicaRole() + { + var clusterProvider = (ClusterProvider)server.StoreWrapper.clusterProvider; + clusterProvider.clusterManager.TrySetLocalNodeRole(NodeRole.REPLICA); + } + + void PreSeedSequenceNumbers() + { + // Advance all virtual sublog frontier sequence numbers to long.MaxValue + // so that consistent reads never block waiting for replay to catch up. + var appendOnlyFile = server.StoreWrapper.appendOnlyFile; + var readConsistencyManager = appendOnlyFile.readConsistencyManager; + if (readConsistencyManager == null) + return; + + var physicalSublogCount = appendOnlyFile.serverOptions.AofPhysicalSublogCount; + for (var i = 0; i < physicalSublogCount; i++) + readConsistencyManager.UpdatePhysicalSublogMaxSequenceNumber(i, long.MaxValue); + } + + public void Consume(byte* ptr, int length) + => session.TryConsumeMessages(ptr, length); + } +} diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs new file mode 100644 index 00000000000..9e5c6e82d29 --- /dev/null +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using BenchmarkDotNet.Attributes; + +namespace BDN.benchmark.Cluster.ConsistentRead +{ + /// + /// Benchmarks for GET operations exercising the consistent read path + /// in cluster mode with multilog enabled. + /// + [MemoryDiagnoser] + public unsafe class ConsistentReadOperations + { + /// + /// Benchmark parameters + /// + [ParamsSource(nameof(ConsistentReadParamsProvider))] + public ConsistentReadParams Params { get; set; } + + /// + /// Parameters provider + /// + public IEnumerable ConsistentReadParamsProvider() + { + yield return new(false, false); // SingleLog (baseline) + yield return new(true, false); // MultiLog+Primary + yield return new(true, true); // MultiLog+Replica (exercises consistent read) + } + + ConsistentReadContext cc; + + /// + /// Global setup + /// + [GlobalSetup] + public void GlobalSetup() + { + cc = new ConsistentReadContext(); + cc.SetupInstance(Params); + } + + /// + /// Global cleanup + /// + [GlobalCleanup] + public void GlobalCleanup() + { + cc.Dispose(); + } + + /// + /// GET benchmark — 100 sequential GETs per invocation. + /// Exercises: sublog idx calc, sketch lookup, inProgress lock, ResetTimeoutCts, SwitchActiveDatabaseSession. + /// + [Benchmark] + public void Get() + { + cc.Consume(cc.getRequest.ptr, cc.getRequest.buffer.Length); + } + + /// + /// MGET benchmark — single MGET with 100 keys per invocation. + /// Exercises: batch consistent read path (BeforeConsistentReadKeyBatch). + /// + [Benchmark] + public void MGet() + { + cc.Consume(cc.mgetRequest.ptr, cc.mgetRequest.buffer.Length); + } + } +} diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs new file mode 100644 index 00000000000..2a822b1f259 --- /dev/null +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +namespace BDN.benchmark.Cluster.ConsistentRead +{ + /// + /// Parameters for consistent read benchmarks + /// + public struct ConsistentReadParams + { + /// + /// Whether multi-log (single physical log + multi-replay) is enabled + /// + public bool multiLogEnabled; + + /// + /// Whether the node operates in replica mode (exercises consistent read path) + /// + public bool replicaMode; + + /// + /// Constructor + /// + public ConsistentReadParams(bool multiLogEnabled, bool replicaMode) + { + this.multiLogEnabled = multiLogEnabled; + this.replicaMode = replicaMode; + } + + /// + /// String representation for BDN output + /// + public override string ToString() + { + if (!multiLogEnabled && !replicaMode) + return "SingleLog"; + + if (multiLogEnabled && !replicaMode) + return "MultiLog+Primary"; + + if (multiLogEnabled && replicaMode) + return "MultiLog+Replica"; + + return "Invalid"; + } + } +} From fe65700c24c3ecdb269a77e8a8c0b0a24477b439 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 27 May 2026 11:48:34 -0700 Subject: [PATCH 02/17] cache consistentRead switch --- libs/server/Resp/RespServerSession.cs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index 0efdfd44955..6bf1d2719ca 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -465,6 +465,7 @@ internal bool CanRunModule() } bool txnSkip = false; + bool consistentReadActive = false; public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) { @@ -484,25 +485,23 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) if (storeWrapper.EnforceConsistentRead()) { - try + txnSkip = false; + Debug.Assert(consistentReadDBSession != null); + if (!consistentReadActive) { // We actively switch session because we aim to avoid performing any additional checks or switches on the normal processing path // This requires us to cache txnSkip result since the txnManager instance will change when the following finally executes // Switching is required because we cannot guarantee the role of the node outside the epoch protection - txnSkip = false; - Debug.Assert(consistentReadDBSession != null); SwitchActiveDatabaseSession(consistentReadDBSession); - ProcessMessages(ref consistentReadGarnetApi, ref txnConsistentReadApi); - txnSkip = txnManager.IsSkippingOperations(); - } - finally - { - // Switch back to normal session in the event a failover results in this node to become a primary - SwitchActiveDatabaseSession(databaseSessions.Map[0]); + consistentReadActive = true; } + ProcessMessages(ref consistentReadGarnetApi, ref txnConsistentReadApi); + txnSkip = txnManager.IsSkippingOperations(); } else { + if (consistentReadActive) + SwitchActiveDatabaseSession(databaseSessions.Map[0]); txnSkip = false; ProcessMessages(ref basicGarnetApi, ref transactionalGarnetApi); txnSkip = txnManager.IsSkippingOperations(); From 5c80ee25258ed66a3078963f12c6745e4cf30238 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 27 May 2026 13:11:30 -0700 Subject: [PATCH 03/17] reduce collision by using the high order bits when accessing the sketch --- .../Cluster/ConsistentRead/ConsistentReadContext.cs | 2 +- .../Cluster/ConsistentRead/ConsistentReadOperations.cs | 2 +- .../Cluster/ConsistentRead/ConsistentReadParams.cs | 2 +- libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs index ed80a0385c5..d463c4f4217 100644 --- a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs @@ -165,4 +165,4 @@ void PreSeedSequenceNumbers() public void Consume(byte* ptr, int length) => session.TryConsumeMessages(ptr, length); } -} +} \ No newline at end of file diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs index 9e5c6e82d29..b5f5606605e 100644 --- a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadOperations.cs @@ -69,4 +69,4 @@ public void MGet() cc.Consume(cc.mgetRequest.ptr, cc.mgetRequest.buffer.Length); } } -} +} \ No newline at end of file diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs index 2a822b1f259..81ff441bcff 100644 --- a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadParams.cs @@ -44,4 +44,4 @@ public override string ToString() return "Invalid"; } } -} +} \ No newline at end of file diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 7baa2b148fd..c8f23e210d4 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -35,7 +35,7 @@ public VirtualSublogReplayState() /// The hash value for which to retrieve the frontier sequence number. /// The frontier sequence number corresponding to the specified hash value. public readonly long GetFrontierSequenceNumber(long hash) - => Math.Max(sketch[hash & SketchSlotMask], sketchMaxValue); + => Math.Max(sketch[(hash >>> 32) & SketchSlotMask], sketchMaxValue); /// /// Gets the sequence number associated with the specified hash key. @@ -43,7 +43,7 @@ public readonly long GetFrontierSequenceNumber(long hash) /// The hash value for which to retrieve the sequence number. /// The sequence number corresponding to the given hash key. public readonly long GetKeySequenceNumber(long hash) - => sketch[hash & SketchSlotMask]; + => sketch[(hash >>> 32) & SketchSlotMask]; /// /// Updates the maximum observed sequence number. @@ -65,7 +65,7 @@ public void UpdateMaxSequenceNumber(long sequenceNumber) /// current value to have an effect. public void UpdateKeySequenceNumber(long hash, long sequenceNumber) { - _ = Utility.MonotonicUpdate(ref sketch[hash & SketchSlotMask], sequenceNumber, out _); + _ = Utility.MonotonicUpdate(ref sketch[(hash >>> 32) & SketchSlotMask], sequenceNumber, out _); _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); SignalAdvanceTime(); } From 21a32ac566698731c74f5eb96bbfab7576427d82 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 27 May 2026 14:52:46 -0700 Subject: [PATCH 04/17] eliminate TimeQueue contention by using inline timeout evaluation through passing the timeout directly through the method signature --- .../ReadConsistency/ReadConsistencyManager.cs | 14 +++++--- .../ReplicaReadSessionContext.cs | 34 ++++--------------- .../VirtualSublogReplayState.cs | 8 +++-- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs index 420bb5036a0..01aaa9a0925 100644 --- a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs +++ b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs @@ -132,9 +132,10 @@ public void CheckConsistencyManagerVersion(ref ReplicaReadSessionContext replica /// /// /// + /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadSessionContext, CancellationToken ct) + void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadSessionContext, TimeSpan timeout, CancellationToken ct) { var virtualSublogIdx = appendOnlyFile.Log.GetVirtualSublogIdx(keyHash); @@ -148,6 +149,7 @@ void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadS vsrs[virtualSublogIdx].WaitForSequenceNumber( keyHash, replicaReadSessionContext.maximumSessionSequenceNumber, + timeout, ct); } } @@ -166,14 +168,15 @@ void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadS /// /// /// + /// /// - public void BeforeConsistentReadKey(long hash, ref ReplicaReadSessionContext replicaReadSessionContext, CancellationToken ct) + public void BeforeConsistentReadKey(long hash, ref ReplicaReadSessionContext replicaReadSessionContext, TimeSpan timeout, CancellationToken ct) { // Check version CheckConsistencyManagerVersion(ref replicaReadSessionContext); // Verify key freshness - VerifyKeyFreshness(hash, ref replicaReadSessionContext, ct); + VerifyKeyFreshness(hash, ref replicaReadSessionContext, timeout, ct); } /// @@ -195,13 +198,14 @@ public void AfterConsistentReadKey(ref ReplicaReadSessionContext replicaReadSess /// /// /// + /// /// /// - public void BeforeConsistentReadKeyBatch(ReadOnlySpan key, ref ReplicaReadSessionContext batchReadContext, CancellationToken ct, out long hash) + public void BeforeConsistentReadKeyBatch(ReadOnlySpan key, ref ReplicaReadSessionContext batchReadContext, TimeSpan timeout, CancellationToken ct, out long hash) { // Verify key freshness hash = GarnetLog.HASH(key); - VerifyKeyFreshness(hash, ref batchReadContext, ct); + VerifyKeyFreshness(hash, ref batchReadContext, timeout, ct); // Keep track of max sequence number to check for updates after batch read. batchReadContext.maximumSessionSequenceNumber = Math.Max( diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs index 01543b41bca..15a6acd27dd 100644 --- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs +++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs @@ -62,14 +62,14 @@ public class ReadSessionState : IDisposable ReplicaReadSessionContext batchReadContext; /// - /// A cancellation token source used to signal cancellation for consistent read operations. + /// A cancellation token source used to signal cancellation for consistent read operations (e.g., on dispose). /// readonly CancellationTokenSource consistentReadCts; /// - /// Timeout cancellation token source. + /// Timeout duration for consistent read wait operations. /// - CancellationTokenSource timeoutCts; + readonly TimeSpan readTimeout; /// /// Consistent read in progress lock @@ -108,8 +108,7 @@ public ReadSessionState(GarnetAppendOnlyFile appendOnlyFile, GarnetServerOptions this.serverOptions = serverOptions; replicaReadContext = new() { sessionVersion = -1, maximumSessionSequenceNumber = 0, lastVirtualSublogIdx = -1 }; consistentReadCts = new(); - timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(consistentReadCts.Token); - timeoutCts.CancelAfter(serverOptions.ReplicaSyncTimeout); + readTimeout = serverOptions.ReplicaSyncTimeout; } /// @@ -118,25 +117,8 @@ public ReadSessionState(GarnetAppendOnlyFile appendOnlyFile, GarnetServerOptions public void Dispose() { consistentReadCts.Cancel(); - timeoutCts.Cancel(); inProgress.WriteLock(); consistentReadCts.Dispose(); - timeoutCts.Dispose(); - } - - void ResetTimeoutCts() - { - if (timeoutCts.TryReset()) - { - timeoutCts.CancelAfter(serverOptions.ReplicaSyncTimeout); - } - else - { - // TryReset failed (too many resets), recreate the CTS - timeoutCts?.Dispose(); - timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(consistentReadCts.Token); - timeoutCts.CancelAfter(serverOptions.ReplicaSyncTimeout); - } } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -146,8 +128,7 @@ public void BeforeConsistentReadKeyCallback(long hash) throw new GarnetException($"Failed to acquire inProgress lock at {nameof(BeforeConsistentReadKeyCallback)}"); try { - ResetTimeoutCts(); - appendOnlyFile.readConsistencyManager.BeforeConsistentReadKey(hash & long.MaxValue, ref replicaReadContext, timeoutCts.Token); + appendOnlyFile.readConsistencyManager.BeforeConsistentReadKey(hash & long.MaxValue, ref replicaReadContext, readTimeout, consistentReadCts.Token); } finally { @@ -171,7 +152,7 @@ public void BeforeConsistentReadKeyBatch(ReadOnlySpan parameters { var keyCount = parameters.Length; var consistencyManager = appendOnlyFile.readConsistencyManager; - // First check if version of consistency mananger has changed + // First check if version of consistency manager has changed appendOnlyFile.readConsistencyManager.CheckConsistencyManagerVersion(ref replicaReadContext); // Allocate array to cache key hashes for batch read @@ -186,8 +167,7 @@ public void BeforeConsistentReadKeyBatch(ReadOnlySpan parameters for (var i = 0; i < parameters.Length; i++) { var key = parameters[i]; - ResetTimeoutCts(); - consistencyManager.BeforeConsistentReadKeyBatch(key.ReadOnlySpan, ref batchReadContext, timeoutCts.Token, out var hash); + consistencyManager.BeforeConsistentReadKeyBatch(key.ReadOnlySpan, ref batchReadContext, readTimeout, consistentReadCts.Token, out var hash); keyHashCache[i] = hash; } } diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index c8f23e210d4..1db6977783a 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -94,8 +94,9 @@ void SignalAdvanceTime() /// /// The hash value identifying the session whose sequence number is being monitored. /// The target sequence number to wait for. - /// A cancellation token that can be used to cancel the wait operation. - public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, CancellationToken ct) + /// The maximum duration to wait before timing out. + /// A cancellation token that can be used to cancel the wait operation (e.g., on dispose). + public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, TimeSpan timeout, CancellationToken ct) { while (true) { @@ -109,7 +110,8 @@ public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, try { - updateSignal.Wait(ct); + if (!updateSignal.Wait(timeout, ct)) + throw new TimeoutException("Consistent read timed out waiting for replay to catch up."); } finally { From 6c55ad74761046cfa9cbe9a7be82c94ce1b02a8d Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 27 May 2026 17:06:41 -0700 Subject: [PATCH 05/17] implement a list of waiters and allow for a fixed number of spin waits --- .../VirtualSublogReplayState.cs | 167 +++++++++++++----- 1 file changed, 124 insertions(+), 43 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 1db6977783a..66ee4e0fb2d 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. using System; +using System.Runtime.CompilerServices; using System.Threading; +using Garnet.common; using Tsavorite.core; namespace Garnet.server @@ -12,11 +14,23 @@ internal struct VirtualSublogReplayState const int SketchSlotSize = 1 << 15; const int SketchSlotMask = SketchSlotSize - 1; + /// + /// Maximum number of spin iterations before falling back to the waiter queue. + /// + const int MaxSpinCount = 64; + readonly long[] sketch = new long[SketchSlotSize]; long sketchMaxValue; + + /// + /// Lock protecting the intrusive waiter list. + /// readonly object @lock = new(); - readonly SemaphoreSlim updateSignal = new(0); - int waiterCount; + + /// + /// Head of the intrusive sorted linked list of waiters (ascending by target sequence number). + /// + WaiterNode waiterHead; public readonly long Max => sketchMaxValue; @@ -27,100 +41,167 @@ public VirtualSublogReplayState() throw new InvalidOperationException($"Size ({SketchSlotSize}) must be a power of 2"); Array.Clear(sketch); sketchMaxValue = 0; + waiterHead = null; } /// /// Gets the current frontier sequence number associated with the specified hash value. /// - /// The hash value for which to retrieve the frontier sequence number. - /// The frontier sequence number corresponding to the specified hash value. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public readonly long GetFrontierSequenceNumber(long hash) - => Math.Max(sketch[(hash >>> 32) & SketchSlotMask], sketchMaxValue); + => Math.Max(Volatile.Read(ref Unsafe.AsRef(in sketch[(hash >>> 32) & SketchSlotMask])), + Volatile.Read(ref Unsafe.AsRef(in sketchMaxValue))); /// /// Gets the sequence number associated with the specified hash key. /// - /// The hash value for which to retrieve the sequence number. - /// The sequence number corresponding to the given hash key. + [MethodImpl(MethodImplOptions.AggressiveInlining)] public readonly long GetKeySequenceNumber(long hash) - => sketch[(hash >>> 32) & SketchSlotMask]; + => Volatile.Read(ref Unsafe.AsRef(in sketch[(hash >>> 32) & SketchSlotMask])); /// /// Updates the maximum observed sequence number. /// /// Updates are thread-safe and guaranteed to be monotonically increasing. - /// The sequence number to compare against the current maximum. public void UpdateMaxSequenceNumber(long sequenceNumber) { _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); - SignalAdvanceTime(); + SignalWaiters(); } /// /// Updates the sequence number associated with the specified key hash. /// /// Updates are thread-safe and guaranteed to be monotonically increasing. - /// The hash value identifying the key whose sequence number is to be updated. - /// The new sequence number to associate with the specified key hash. Must be greater than or equal to the - /// current value to have an effect. public void UpdateKeySequenceNumber(long hash, long sequenceNumber) { _ = Utility.MonotonicUpdate(ref sketch[(hash >>> 32) & SketchSlotMask], sequenceNumber, out _); _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); - SignalAdvanceTime(); + SignalWaiters(); } /// - /// Signals that time should advance, allowing any awaiting operations to proceed. + /// Signals waiters whose target sequence numbers have been reached. + /// Walks from the head (lowest target) and signals all satisfied waiters via O(1) unlink. /// - void SignalAdvanceTime() + private void SignalWaiters() { - var releaseCount = 0; - if (Volatile.Read(ref waiterCount) == 0) + if (waiterHead == null) return; lock (@lock) { - releaseCount = waiterCount; + var currentMax = Volatile.Read(ref sketchMaxValue); + while (waiterHead != null && waiterHead.TargetSequenceNumber < currentMax) + { + var node = waiterHead; + waiterHead = node.Next; + if (waiterHead != null) + waiterHead.Prev = null; + node.Next = null; + node.Signal.Set(); + } } - - if (releaseCount > 0) - updateSignal.Release(releaseCount); } /// /// Waits until the session's frontier sequence number for the specified hash reaches or exceeds /// the given maximum sequence number. /// - /// The hash value identifying the session whose sequence number is being monitored. - /// The target sequence number to wait for. - /// The maximum duration to wait before timing out. - /// A cancellation token that can be used to cancel the wait operation (e.g., on dispose). public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, TimeSpan timeout, CancellationToken ct) { - while (true) + // Phase 1: SpinWait — fast path when replay is keeping up + var spinner = new SpinWait(); + for (var i = 0; i < MaxSpinCount; i++) { - lock (@lock) - { - if (maximumSessionSequenceNumber < GetFrontierSequenceNumber(hash)) - return; + if (maximumSessionSequenceNumber < GetFrontierSequenceNumber(hash)) + return; + spinner.SpinOnce(sleep1Threshold: -1); + } - waiterCount++; - } + // Phase 2: Register in waiter list and block + using var signal = new ManualResetEventSlim(false); + var node = new WaiterNode(maximumSessionSequenceNumber, signal); - try - { - if (!updateSignal.Wait(timeout, ct)) - throw new TimeoutException("Consistent read timed out waiting for replay to catch up."); - } - finally + lock (@lock) + { + // Double-check after acquiring lock + if (maximumSessionSequenceNumber < GetFrontierSequenceNumber(hash)) + return; + InsertWaiter(node); + } + + try + { + if (!signal.Wait(timeout, ct)) { - lock (@lock) - { - waiterCount--; - } + RemoveWaiter(node); + ExceptionUtils.ThrowException(new TimeoutException("Consistent read timed out waiting for replay to catch up.")); } } + catch (OperationCanceledException) + { + RemoveWaiter(node); + throw; + } + } + + /// + /// Inserts a waiter node into the sorted linked list (ascending by target sequence number). + /// Must be called under lock. + /// + private void InsertWaiter(WaiterNode node) + { + if (waiterHead == null || node.TargetSequenceNumber <= waiterHead.TargetSequenceNumber) + { + // Insert at head + node.Next = waiterHead; + _ = (waiterHead?.Prev = node); + waiterHead = node; + return; + } + + // Walk to find insertion point + var current = waiterHead; + while (current.Next != null && current.Next.TargetSequenceNumber <= node.TargetSequenceNumber) + current = current.Next; + + // Insert after current + node.Next = current.Next; + node.Prev = current; + _ = (current.Next?.Prev = node); + current.Next = node; + } + + /// + /// Removes a waiter node from the linked list in O(1). Used on timeout/cancellation. + /// + private void RemoveWaiter(WaiterNode node) + { + lock (@lock) + { + if (node.Prev != null) + node.Prev.Next = node.Next; + else if (waiterHead == node) + waiterHead = node.Next; + + _ = (node.Next?.Prev = node.Prev); + + node.Prev = null; + node.Next = null; + } + } + + /// + /// Intrusive linked list node for a waiter. Holds prev/next pointers, the target + /// sequence number, and a signal to wake the waiting thread. + /// + sealed class WaiterNode(long targetSequenceNumber, ManualResetEventSlim signal) + { + public readonly long TargetSequenceNumber = targetSequenceNumber; + public readonly ManualResetEventSlim Signal = signal; + public WaiterNode Prev; + public WaiterNode Next; } } } \ No newline at end of file From b02a7f9b3fb07e44f99bfd18c17db38e07c43ad0 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 1 Jun 2026 11:11:43 -0700 Subject: [PATCH 06/17] renaming and cleanup --- libs/server/AOF/GarnetLog.cs | 2 +- .../ReadConsistency/ReadConsistencyManager.cs | 8 ++++---- .../ReplicaReadSessionContext.cs | 20 +++++++++---------- .../VirtualSublogReplayState.cs | 9 ++++++--- .../MainStore/MainSessionFunctions.cs | 16 +++++++-------- .../ObjectStore/ObjectSessionFunctions.cs | 16 +++++++-------- .../UnifiedStore/UnifiedSessionFunctions.cs | 16 +++++++-------- .../VectorStore/VectorSessionFunctions.cs | 16 +++++++-------- .../Common/ArrayKeyIterationFunctions.cs | 4 ++-- .../ClientSession/ConsistentReadContext.cs | 20 +++++++++---------- .../ClientSession/NoOpSessionFunctions.cs | 8 ++++---- .../TransactionalConsistentReadContext.cs | 12 +++++------ .../Index/Interfaces/ISessionFunctions.cs | 8 ++++---- .../Index/Interfaces/SessionFunctionsBase.cs | 8 ++++---- 14 files changed, 83 insertions(+), 80 deletions(-) diff --git a/libs/server/AOF/GarnetLog.cs b/libs/server/AOF/GarnetLog.cs index cd700ee16c5..ef7aaa9082f 100644 --- a/libs/server/AOF/GarnetLog.cs +++ b/libs/server/AOF/GarnetLog.cs @@ -92,7 +92,7 @@ public static long HASH(ReadOnlySpan key) [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetReplayTaskIdx(long hash) => (int)(((ulong)hash / (uint)physicalSublogCount) % (uint)replayTaskCount); [MethodImpl(MethodImplOptions.AggressiveInlining)] - public int GetVirtualSublogIdx(long hash) => GetPhysicalSublogIdx(hash) * replayTaskCount + GetReplayTaskIdx(hash); + public int GetVirtualSublogIdx(long hash) => (GetPhysicalSublogIdx(hash) * replayTaskCount) + GetReplayTaskIdx(hash); [MethodImpl(MethodImplOptions.AggressiveInlining)] public int GetPhysicalSublogIdx(ReadOnlySpan key) => GetPhysicalSublogIdx(HASH(key)); diff --git a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs index 01aaa9a0925..41445a23469 100644 --- a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs +++ b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs @@ -170,7 +170,7 @@ void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadS /// /// /// - public void BeforeConsistentReadKey(long hash, ref ReplicaReadSessionContext replicaReadSessionContext, TimeSpan timeout, CancellationToken ct) + public void PreSingleKeyConsistentRead(long hash, ref ReplicaReadSessionContext replicaReadSessionContext, TimeSpan timeout, CancellationToken ct) { // Check version CheckConsistencyManagerVersion(ref replicaReadSessionContext); @@ -187,7 +187,7 @@ public void BeforeConsistentReadKey(long hash, ref ReplicaReadSessionContext rep /// we cannot be certain at prepare phase what is the actual sequence number. /// /// - public void AfterConsistentReadKey(ref ReplicaReadSessionContext replicaReadSessionContext) + public void PostSingleKeyConsistentRead(ref ReplicaReadSessionContext replicaReadSessionContext) { replicaReadSessionContext.maximumSessionSequenceNumber = Math.Max( replicaReadSessionContext.maximumSessionSequenceNumber, GetKeySequenceNumber(replicaReadSessionContext.lastHash)); @@ -201,7 +201,7 @@ public void AfterConsistentReadKey(ref ReplicaReadSessionContext replicaReadSess /// /// /// - public void BeforeConsistentReadKeyBatch(ReadOnlySpan key, ref ReplicaReadSessionContext batchReadContext, TimeSpan timeout, CancellationToken ct, out long hash) + public void PreBatchKeyConsistentRead(ReadOnlySpan key, ref ReplicaReadSessionContext batchReadContext, TimeSpan timeout, CancellationToken ct, out long hash) { // Verify key freshness hash = GarnetLog.HASH(key); @@ -218,7 +218,7 @@ public void BeforeConsistentReadKeyBatch(ReadOnlySpan key, ref ReplicaRead /// /// /// - public bool AfterConsistentReadKeyBatch(long hash, ref ReplicaReadSessionContext batchReadContext) + public bool PreBatchKeyConsistentRead(long hash, ref ReplicaReadSessionContext batchReadContext) { var keySequenceNumber = GetKeySequenceNumber(hash); var mSSN = batchReadContext.maximumSessionSequenceNumber; diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs index 15a6acd27dd..3fbca3c5ca3 100644 --- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs +++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs @@ -122,13 +122,13 @@ public void Dispose() } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void BeforeConsistentReadKeyCallback(long hash) + public void PreSingleKeyConsistentRead(long hash) { if (!inProgress.TryReadLock()) - throw new GarnetException($"Failed to acquire inProgress lock at {nameof(BeforeConsistentReadKeyCallback)}"); + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); try { - appendOnlyFile.readConsistencyManager.BeforeConsistentReadKey(hash & long.MaxValue, ref replicaReadContext, readTimeout, consistentReadCts.Token); + appendOnlyFile.readConsistencyManager.PreSingleKeyConsistentRead(hash & long.MaxValue, ref replicaReadContext, readTimeout, consistentReadCts.Token); } finally { @@ -137,17 +137,17 @@ public void BeforeConsistentReadKeyCallback(long hash) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void AfterConsistentReadKeyCallback() - => appendOnlyFile.readConsistencyManager.AfterConsistentReadKey(ref replicaReadContext); + public void PostSingleKeyConsistentReadCallback() + => appendOnlyFile.readConsistencyManager.PostSingleKeyConsistentRead(ref replicaReadContext); /// /// Initialize context for read key batch. /// /// - public void BeforeConsistentReadKeyBatch(ReadOnlySpan parameters) + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) { if (!inProgress.TryReadLock()) - throw new GarnetException($"Failed to acquire inProgress lock at {nameof(BeforeConsistentReadKeyCallback)}"); + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); try { var keyCount = parameters.Length; @@ -167,7 +167,7 @@ public void BeforeConsistentReadKeyBatch(ReadOnlySpan parameters for (var i = 0; i < parameters.Length; i++) { var key = parameters[i]; - consistencyManager.BeforeConsistentReadKeyBatch(key.ReadOnlySpan, ref batchReadContext, readTimeout, consistentReadCts.Token, out var hash); + consistencyManager.PreBatchKeyConsistentRead(key.ReadOnlySpan, ref batchReadContext, readTimeout, consistentReadCts.Token, out var hash); keyHashCache[i] = hash; } } @@ -182,13 +182,13 @@ public void BeforeConsistentReadKeyBatch(ReadOnlySpan parameters /// /// /// - public bool AfterConsistentReadKeyBatch(int keyCount) + public bool PostBatchKeyConsistentReadCallback(int keyCount) { var consistencyManager = appendOnlyFile.readConsistencyManager; for (var i = 0; i < keyCount; i++) { var hash = keyHashCache[i]; - if (!consistencyManager.AfterConsistentReadKeyBatch(hash, ref batchReadContext)) + if (!consistencyManager.PreBatchKeyConsistentRead(hash, ref batchReadContext)) return false; } diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 66ee4e0fb2d..921864b3781 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -44,12 +44,15 @@ public VirtualSublogReplayState() waiterHead = null; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + static long GetSketchSlot(long hash) => (hash >> 32) & SketchSlotMask; + /// /// Gets the current frontier sequence number associated with the specified hash value. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public readonly long GetFrontierSequenceNumber(long hash) - => Math.Max(Volatile.Read(ref Unsafe.AsRef(in sketch[(hash >>> 32) & SketchSlotMask])), + => Math.Max(Volatile.Read(ref Unsafe.AsRef(in sketch[GetSketchSlot(hash)])), Volatile.Read(ref Unsafe.AsRef(in sketchMaxValue))); /// @@ -57,7 +60,7 @@ public readonly long GetFrontierSequenceNumber(long hash) /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public readonly long GetKeySequenceNumber(long hash) - => Volatile.Read(ref Unsafe.AsRef(in sketch[(hash >>> 32) & SketchSlotMask])); + => Volatile.Read(ref Unsafe.AsRef(in sketch[GetSketchSlot(hash)])); /// /// Updates the maximum observed sequence number. @@ -75,7 +78,7 @@ public void UpdateMaxSequenceNumber(long sequenceNumber) /// Updates are thread-safe and guaranteed to be monotonically increasing. public void UpdateKeySequenceNumber(long hash, long sequenceNumber) { - _ = Utility.MonotonicUpdate(ref sketch[(hash >>> 32) & SketchSlotMask], sequenceNumber, out _); + _ = Utility.MonotonicUpdate(ref sketch[GetSketchSlot(hash)], sequenceNumber, out _); _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); SignalWaiters(); } diff --git a/libs/server/Storage/Functions/MainStore/MainSessionFunctions.cs b/libs/server/Storage/Functions/MainStore/MainSessionFunctions.cs index 442721280ae..b3718c6660d 100644 --- a/libs/server/Storage/Functions/MainStore/MainSessionFunctions.cs +++ b/libs/server/Storage/Functions/MainStore/MainSessionFunctions.cs @@ -34,19 +34,19 @@ public void ConvertOutputToHeap(ref StringInput input, ref StringOutput output) } /// - public void BeforeConsistentReadCallback(long hash) - => readSessionState?.BeforeConsistentReadKeyCallback(hash); + public void PreSingleKeyConsistentRead(long hash) + => readSessionState?.PreSingleKeyConsistentRead(hash); /// - public void AfterConsistentReadKeyCallback() - => readSessionState?.AfterConsistentReadKeyCallback(); + public void PostSingleKeyConsistentReadCallback() + => readSessionState?.PostSingleKeyConsistentReadCallback(); /// - public void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) - => readSessionState?.BeforeConsistentReadKeyBatch(parameters); + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) + => readSessionState?.PreBatchKeyConsistentReadCallback(parameters); /// - public bool AfterConsistentReadKeyBatchCallback(int keyCount) - => readSessionState != null && readSessionState.AfterConsistentReadKeyBatch(keyCount); + public bool PostBatchKeyConsistentReadCallback(int keyCount) + => readSessionState != null && readSessionState.PostBatchKeyConsistentReadCallback(keyCount); } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/ObjectStore/ObjectSessionFunctions.cs b/libs/server/Storage/Functions/ObjectStore/ObjectSessionFunctions.cs index 89850eb372e..5352edb660c 100644 --- a/libs/server/Storage/Functions/ObjectStore/ObjectSessionFunctions.cs +++ b/libs/server/Storage/Functions/ObjectStore/ObjectSessionFunctions.cs @@ -34,19 +34,19 @@ public void ConvertOutputToHeap(ref ObjectInput input, ref ObjectOutput output) } /// - public void BeforeConsistentReadCallback(long hash) - => readSessionState?.BeforeConsistentReadKeyCallback(hash); + public void PreSingleKeyConsistentRead(long hash) + => readSessionState?.PreSingleKeyConsistentRead(hash); /// - public void AfterConsistentReadKeyCallback() - => readSessionState?.AfterConsistentReadKeyCallback(); + public void PostSingleKeyConsistentReadCallback() + => readSessionState?.PostSingleKeyConsistentReadCallback(); /// - public void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) - => readSessionState?.BeforeConsistentReadKeyBatch(parameters); + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) + => readSessionState?.PreBatchKeyConsistentReadCallback(parameters); /// - public bool AfterConsistentReadKeyBatchCallback(int keyCount) - => readSessionState != null && readSessionState.AfterConsistentReadKeyBatch(keyCount); + public bool PostBatchKeyConsistentReadCallback(int keyCount) + => readSessionState != null && readSessionState.PostBatchKeyConsistentReadCallback(keyCount); } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/UnifiedStore/UnifiedSessionFunctions.cs b/libs/server/Storage/Functions/UnifiedStore/UnifiedSessionFunctions.cs index 7f33c36a8d3..91737c6f62d 100644 --- a/libs/server/Storage/Functions/UnifiedStore/UnifiedSessionFunctions.cs +++ b/libs/server/Storage/Functions/UnifiedStore/UnifiedSessionFunctions.cs @@ -30,19 +30,19 @@ public void ConvertOutputToHeap(ref UnifiedInput input, ref UnifiedOutput output } /// - public void BeforeConsistentReadCallback(long hash) - => readSessionState?.BeforeConsistentReadKeyCallback(hash); + public void PreSingleKeyConsistentRead(long hash) + => readSessionState?.PreSingleKeyConsistentRead(hash); /// - public void AfterConsistentReadKeyCallback() - => readSessionState?.AfterConsistentReadKeyCallback(); + public void PostSingleKeyConsistentReadCallback() + => readSessionState?.PostSingleKeyConsistentReadCallback(); /// - public void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) - => readSessionState?.BeforeConsistentReadKeyBatch(parameters); + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) + => readSessionState?.PreBatchKeyConsistentReadCallback(parameters); /// - public bool AfterConsistentReadKeyBatchCallback(int keyCount) - => readSessionState != null && readSessionState.AfterConsistentReadKeyBatch(keyCount); + public bool PostBatchKeyConsistentReadCallback(int keyCount) + => readSessionState != null && readSessionState.PostBatchKeyConsistentReadCallback(keyCount); } } \ No newline at end of file diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index c67423ce2aa..ebe4fc07f24 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -704,19 +704,19 @@ public static void UpdateMigratedElementNamespaces(FrozenDictionary - public void BeforeConsistentReadCallback(long hash) - => readSessionState?.BeforeConsistentReadKeyCallback(hash); + public void PreSingleKeyConsistentRead(long hash) + => readSessionState?.PreSingleKeyConsistentRead(hash); /// - public void AfterConsistentReadKeyCallback() - => readSessionState?.AfterConsistentReadKeyCallback(); + public void PostSingleKeyConsistentReadCallback() + => readSessionState?.PostSingleKeyConsistentReadCallback(); /// - public void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) - => readSessionState?.BeforeConsistentReadKeyBatch(parameters); + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) + => readSessionState?.PreBatchKeyConsistentReadCallback(parameters); /// - public bool AfterConsistentReadKeyBatchCallback(int keyCount) - => readSessionState != null && readSessionState.AfterConsistentReadKeyBatch(keyCount); + public bool PostBatchKeyConsistentReadCallback(int keyCount) + => readSessionState != null && readSessionState.PostBatchKeyConsistentReadCallback(keyCount); } } \ No newline at end of file diff --git a/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs b/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs index bd065678f03..845feb86bbd 100644 --- a/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs +++ b/libs/server/Storage/Session/Common/ArrayKeyIterationFunctions.cs @@ -251,9 +251,9 @@ internal ConsistentUnifiedStoreGetDBKeys(ReadSessionState readSessionState) : ba public override bool Reader(in TSourceLogRecord logRecord, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult) { - readSessionState.BeforeConsistentReadKeyCallback(GarnetLog.HASH(logRecord.Key)); + readSessionState.PreSingleKeyConsistentRead(GarnetLog.HASH(logRecord.Key)); var status = base.Reader(in logRecord, recordMetadata, numberOfRecords, out cursorRecordResult); - readSessionState.AfterConsistentReadKeyCallback(); + readSessionState.PostSingleKeyConsistentReadCallback(); return status; } } diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs index d3a62bf0ef3..433ebb61450 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs @@ -49,9 +49,9 @@ internal ConsistentReadContext(ClientSession(ref TBatch batch, TContext userContext = de do { Thread.Yield(); - Session.functions.BeforeConsistentReadKeyBatchCallback(batch.Parameters); + Session.functions.PreBatchKeyConsistentReadCallback(batch.Parameters); BasicContext.ReadWithPrefetch(ref batch, userContext); - } while (!Session.functions.AfterConsistentReadKeyBatchCallback(batch.Count)); + } while (!Session.functions.PostBatchKeyConsistentReadCallback(batch.Count)); } #endregion @@ -139,7 +139,7 @@ public void ReadWithPrefetch(ref TBatch batch, TContext userContext = de public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) { var status = BasicContext.CompletePending(wait, spinWaitForCommit); - Session.functions.AfterConsistentReadKeyCallback(); + Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -147,7 +147,7 @@ public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) { var status = BasicContext.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); - Session.functions.AfterConsistentReadKeyCallback(); + Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -155,14 +155,14 @@ public bool CompletePendingWithOutputs(out CompletedOutputIterator public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) { var status = BasicContext.CompletePendingWithOutputsAsync(waitForCommit, token); - Session.functions.AfterConsistentReadKeyCallback(); + Session.functions.PostSingleKeyConsistentReadCallback(); return await status.ConfigureAwait(false); } diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/NoOpSessionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/NoOpSessionFunctions.cs index d412e2eb59c..86cc7ea3e28 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/NoOpSessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/NoOpSessionFunctions.cs @@ -162,13 +162,13 @@ public readonly void PostDeleteOperation(TKey key, ref Del public readonly void ConvertOutputToHeap(ref TInput input, ref TOutput output) { } - public readonly void BeforeConsistentReadCallback(long hash) { } + public readonly void PreSingleKeyConsistentRead(long hash) { } - public readonly void AfterConsistentReadKeyCallback() { } + public readonly void PostSingleKeyConsistentReadCallback() { } /// - public readonly void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) { } + public readonly void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) { } - public readonly bool AfterConsistentReadKeyBatchCallback(int keyCount) => true; + public readonly bool PostBatchKeyConsistentReadCallback(int keyCount) => true; } } \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs index 1899cb26b8d..3c5731b38a5 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs @@ -120,9 +120,9 @@ public void Unlock(ReadOnlySpan keys) wher public Status Read(TKey key, ref TInput input, ref TOutput output, TContext userContext = default) { var hash = GetKeyHash(key); - Session.functions.BeforeConsistentReadCallback(hash); + Session.functions.PreSingleKeyConsistentRead(hash); var status = TransactionalContext.Read(key, ref input, ref output, userContext); - Session.functions.AfterConsistentReadKeyCallback(); + Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -170,9 +170,9 @@ public Status Read(TKey key, ref TOutput output, ref ReadOptions readOptions, TC public Status Read(TKey key, ref TInput input, ref TOutput output, ref ReadOptions readOptions, out RecordMetadata recordMetadata, TContext userContext = default) { var hash = GetKeyHash(key); - Session.functions.BeforeConsistentReadCallback(hash); + Session.functions.PreSingleKeyConsistentRead(hash); var status = TransactionalContext.Read(key, ref input, ref output, ref readOptions, out recordMetadata, userContext); - Session.functions.AfterConsistentReadKeyCallback(); + Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -197,9 +197,9 @@ public void ReadWithPrefetch(ref TBatch batch, TContext userContext = de do { Thread.Yield(); - Session.functions.BeforeConsistentReadKeyBatchCallback(batch.Parameters); + Session.functions.PreBatchKeyConsistentReadCallback(batch.Parameters); TransactionalContext.ReadWithPrefetch(ref batch, userContext); - } while (!Session.functions.AfterConsistentReadKeyBatchCallback(batch.Count)); + } while (!Session.functions.PostBatchKeyConsistentReadCallback(batch.Count)); } #endregion Read Methods (To be overridden with custom logic) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs index c7d11b41e74..ab4c8266e0d 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/ISessionFunctions.cs @@ -352,23 +352,23 @@ void PostDeleteOperation(TKey key, ref DeleteInfo deleteIn /// Called before reading a single key to verify key freshness and enforce prefix consistency. /// /// The key hash about to be read - void BeforeConsistentReadCallback(long hash); + void PreSingleKeyConsistentRead(long hash); /// /// Called after a single key read to update the session timestamp. /// - void AfterConsistentReadKeyCallback(); + void PostSingleKeyConsistentReadCallback(); /// /// Called before reading a batch of keys to verify their freshness and enforce prefix consistency. /// /// - void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters); + void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters); /// /// Called after reading a batch of keys to update the session timestamp. /// - bool AfterConsistentReadKeyBatchCallback(int keyCount); + bool PostBatchKeyConsistentReadCallback(int keyCount); #endregion Utilities } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs index ce68dbd1b02..65c5f369b87 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Interfaces/SessionFunctionsBase.cs @@ -191,12 +191,12 @@ public virtual RecordFieldInfo GetUpsertFieldInfo(TKey k /// public virtual void ConvertOutputToHeap(ref TInput input, ref TOutput output) { } - public virtual void BeforeConsistentReadCallback(long hash) { } + public virtual void PreSingleKeyConsistentRead(long hash) { } - public virtual void AfterConsistentReadKeyCallback() { } + public virtual void PostSingleKeyConsistentReadCallback() { } - public virtual void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) { } + public virtual void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) { } - public virtual bool AfterConsistentReadKeyBatchCallback(int keyCount) => true; + public virtual bool PostBatchKeyConsistentReadCallback(int keyCount) => true; } } \ No newline at end of file From 71b1986ea989c21dd0633b38a75eaf4dc1a5b4a5 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 1 Jun 2026 12:41:42 -0700 Subject: [PATCH 07/17] propagate hash down to read --- .../cs/src/core/ClientSession/ConsistentReadContext.cs | 5 +++-- .../core/ClientSession/TransactionalConsistentReadContext.cs | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs index 433ebb61450..59311074c32 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs @@ -50,7 +50,8 @@ public Status Read(TKey key, ref TInput input, ref TOutput output, TContext user { var hash = GetKeyHash(key); Session.functions.PreSingleKeyConsistentRead(hash); - var status = BasicContext.Read(key, ref input, ref output, userContext); + var readOptions = new ReadOptions() { KeyHash = hash }; + var status = BasicContext.Read(key, ref input, ref output, ref readOptions, userContext); Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -98,7 +99,7 @@ public Status Read(TKey key, ref TOutput output, ref ReadOptions readOptions, TC [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(TKey key, ref TInput input, ref TOutput output, ref ReadOptions readOptions, out RecordMetadata recordMetadata, TContext userContext = default) { - var hash = GetKeyHash(key); + var hash = readOptions.KeyHash ?? GetKeyHash(key); Session.functions.PreSingleKeyConsistentRead(hash); var status = BasicContext.Read(key, ref input, ref output, ref readOptions, out recordMetadata, userContext); Session.functions.PostSingleKeyConsistentReadCallback(); diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs index 3c5731b38a5..07e611cee2c 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs @@ -121,7 +121,8 @@ public Status Read(TKey key, ref TInput input, ref TOutput output, TContext user { var hash = GetKeyHash(key); Session.functions.PreSingleKeyConsistentRead(hash); - var status = TransactionalContext.Read(key, ref input, ref output, userContext); + var readOptions = new ReadOptions() { KeyHash = hash }; + var status = TransactionalContext.Read(key, ref input, ref output, ref readOptions, userContext); Session.functions.PostSingleKeyConsistentReadCallback(); return status; } @@ -169,7 +170,7 @@ public Status Read(TKey key, ref TOutput output, ref ReadOptions readOptions, TC [MethodImpl(MethodImplOptions.AggressiveInlining)] public Status Read(TKey key, ref TInput input, ref TOutput output, ref ReadOptions readOptions, out RecordMetadata recordMetadata, TContext userContext = default) { - var hash = GetKeyHash(key); + var hash = readOptions.KeyHash ?? GetKeyHash(key); Session.functions.PreSingleKeyConsistentRead(hash); var status = TransactionalContext.Read(key, ref input, ref output, ref readOptions, out recordMetadata, userContext); Session.functions.PostSingleKeyConsistentReadCallback(); From 8be3c1d9861266aa30166d50e9bcdc56cbcbf5fb Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 2 Jun 2026 11:18:40 -0700 Subject: [PATCH 08/17] fix tsavorite formatting --- .../cs/benchmark/YCSB.benchmark/SessionFixedLenFunctions.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/SessionFixedLenFunctions.cs b/libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/SessionFixedLenFunctions.cs index e4d904bae63..52a5bbd3023 100644 --- a/libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/SessionFixedLenFunctions.cs +++ b/libs/storage/Tsavorite/cs/benchmark/YCSB.benchmark/SessionFixedLenFunctions.cs @@ -200,6 +200,10 @@ public void AfterConsistentReadKeyCallback() { } public void BeforeConsistentReadKeyBatchCallback(ReadOnlySpan parameters) { } public bool AfterConsistentReadKeyBatchCallback(int keyCount) => true; + public void PreSingleKeyConsistentRead(long hash) => throw new NotImplementedException(); + public void PostSingleKeyConsistentReadCallback() => throw new NotImplementedException(); + public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) => throw new NotImplementedException(); + public bool PostBatchKeyConsistentReadCallback(int keyCount) => throw new NotImplementedException(); } static class StaticUtilities From 02c5c82db81576fd9edad0c03cfcf8f73bff9e1a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 2 Jun 2026 11:26:28 -0700 Subject: [PATCH 09/17] fix build for BDN --- .../Cluster/ConsistentRead/ConsistentReadContext.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs index d463c4f4217..498adec1e62 100644 --- a/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs +++ b/benchmark/BDN.benchmark/Cluster/ConsistentRead/ConsistentReadContext.cs @@ -52,7 +52,6 @@ public void SetupInstance(ConsistentReadParams parameters) { opt.AofPhysicalSublogCount = 1; opt.AofReplayTaskCount = 4; - opt.EnableFastCommit = true; } if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) From 448ea969d7d5ba8dfd8bc5c7a024ff1363d8514a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Fri, 5 Jun 2026 17:19:15 -0700 Subject: [PATCH 10/17] avoid false sharing at sketchMaxValue; cache sketMaxValue at read side --- .../ReadConsistency/ReadConsistencyManager.cs | 19 +++++++---- .../ReplicaReadSessionContext.cs | 23 +++++++++++-- .../VirtualSublogReplayState.cs | 33 ++++++++++++------- 3 files changed, 54 insertions(+), 21 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs index 41445a23469..fff555edf9d 100644 --- a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs +++ b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs @@ -124,6 +124,7 @@ public void CheckConsistencyManagerVersion(ref ReplicaReadSessionContext replica replicaReadSessionContext.sessionVersion = CurrentVersion; replicaReadSessionContext.lastVirtualSublogIdx = -1; replicaReadSessionContext.maximumSessionSequenceNumber = 0; + replicaReadSessionContext.ResetCachedSublogMax(); } } @@ -138,19 +139,23 @@ public void CheckConsistencyManagerVersion(ref ReplicaReadSessionContext replica void VerifyKeyFreshness(long keyHash, ref ReplicaReadSessionContext replicaReadSessionContext, TimeSpan timeout, CancellationToken ct) { var virtualSublogIdx = appendOnlyFile.Log.GetVirtualSublogIdx(keyHash); + var initOrSameSublog = replicaReadSessionContext.lastVirtualSublogIdx == -1 || replicaReadSessionContext.lastVirtualSublogIdx == virtualSublogIdx; + var mssn = replicaReadSessionContext.maximumSessionSequenceNumber; // Here we have to wait for replay to catch up // Don't have to wait if reading from same sublog or maximumSessionTimestamp is behind the sublog frontier timestamp - if (replicaReadSessionContext.lastVirtualSublogIdx != -1 && replicaReadSessionContext.lastVirtualSublogIdx != virtualSublogIdx) + if (!initOrSameSublog && mssn >= replicaReadSessionContext.cachedSublogMax[virtualSublogIdx]) { + // Refresh cached view + var sketchMaxValue = vsrs[virtualSublogIdx].Max; + replicaReadSessionContext.cachedSublogMax[virtualSublogIdx] = sketchMaxValue; + // Optimistic check without lock - while (replicaReadSessionContext.maximumSessionSequenceNumber >= GetSublogFrontierSequenceNumber(keyHash)) + if (mssn >= sketchMaxValue) { - vsrs[virtualSublogIdx].WaitForSequenceNumber( - keyHash, - replicaReadSessionContext.maximumSessionSequenceNumber, - timeout, - ct); + vsrs[virtualSublogIdx].WaitForSequenceNumber(mssn, timeout, ct); + // Refresh after wait + replicaReadSessionContext.cachedSublogMax[virtualSublogIdx] = vsrs[virtualSublogIdx].Max; } } diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs index 3fbca3c5ca3..fa50023ec5d 100644 --- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs +++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs @@ -11,7 +11,7 @@ namespace Garnet.server { - [StructLayout(LayoutKind.Explicit, Size = 26)] + [StructLayout(LayoutKind.Explicit)] public struct ReplicaReadSessionContext { /// @@ -37,6 +37,24 @@ public struct ReplicaReadSessionContext /// [FieldOffset(24)] public short lastVirtualSublogIdx; + + /// + /// Per-sublog cached maximum sequence numbers. Avoids repeated volatile reads of the + /// shared sketchMax value when the cached value already satisfies the read requirement. + /// Shared across batch/session context copies (same backing array). + /// + [FieldOffset(32)] + public long[] cachedSublogMax; + + /// + /// Resets the cached per-sublog max values (e.g., on version change). + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public readonly void ResetCachedSublogMax() + { + if (cachedSublogMax != null) + Array.Clear(cachedSublogMax); + } } public class ReadSessionState : IDisposable @@ -106,7 +124,8 @@ public ReadSessionState(GarnetAppendOnlyFile appendOnlyFile, GarnetServerOptions { this.appendOnlyFile = appendOnlyFile; this.serverOptions = serverOptions; - replicaReadContext = new() { sessionVersion = -1, maximumSessionSequenceNumber = 0, lastVirtualSublogIdx = -1 }; + var sublogMax = new long[serverOptions.AofVirtualSublogCount]; + replicaReadContext = new() { sessionVersion = -1, maximumSessionSequenceNumber = 0, lastVirtualSublogIdx = -1, cachedSublogMax = sublogMax }; consistentReadCts = new(); readTimeout = serverOptions.ReplicaSyncTimeout; } diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 921864b3781..31cb5fc1d68 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -3,6 +3,7 @@ using System; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading; using Garnet.common; using Tsavorite.core; @@ -20,7 +21,16 @@ internal struct VirtualSublogReplayState const int MaxSpinCount = 64; readonly long[] sketch = new long[SketchSlotSize]; - long sketchMaxValue; + + /// + /// Explicit definition to minimize cache invalidation + /// + [StructLayout(LayoutKind.Explicit, Size = 128)] + sealed class SublogReplayStateMax + { + [FieldOffset(64)] public long Value; + } + readonly SublogReplayStateMax sketchMax = new(); /// /// Lock protecting the intrusive waiter list. @@ -32,7 +42,7 @@ internal struct VirtualSublogReplayState /// WaiterNode waiterHead; - public readonly long Max => sketchMaxValue; + public readonly long Max => sketchMax.Value; public VirtualSublogReplayState() { @@ -40,7 +50,7 @@ public VirtualSublogReplayState() if ((size & (size - 1)) != 0) throw new InvalidOperationException($"Size ({SketchSlotSize}) must be a power of 2"); Array.Clear(sketch); - sketchMaxValue = 0; + sketchMax.Value = 0; waiterHead = null; } @@ -53,7 +63,7 @@ public VirtualSublogReplayState() [MethodImpl(MethodImplOptions.AggressiveInlining)] public readonly long GetFrontierSequenceNumber(long hash) => Math.Max(Volatile.Read(ref Unsafe.AsRef(in sketch[GetSketchSlot(hash)])), - Volatile.Read(ref Unsafe.AsRef(in sketchMaxValue))); + Volatile.Read(ref Unsafe.AsRef(in sketchMax.Value))); /// /// Gets the sequence number associated with the specified hash key. @@ -68,7 +78,7 @@ public readonly long GetKeySequenceNumber(long hash) /// Updates are thread-safe and guaranteed to be monotonically increasing. public void UpdateMaxSequenceNumber(long sequenceNumber) { - _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); + _ = Utility.MonotonicUpdate(ref sketchMax.Value, sequenceNumber, out _); SignalWaiters(); } @@ -79,7 +89,7 @@ public void UpdateMaxSequenceNumber(long sequenceNumber) public void UpdateKeySequenceNumber(long hash, long sequenceNumber) { _ = Utility.MonotonicUpdate(ref sketch[GetSketchSlot(hash)], sequenceNumber, out _); - _ = Utility.MonotonicUpdate(ref sketchMaxValue, sequenceNumber, out _); + _ = Utility.MonotonicUpdate(ref sketchMax.Value, sequenceNumber, out _); SignalWaiters(); } @@ -94,7 +104,7 @@ private void SignalWaiters() lock (@lock) { - var currentMax = Volatile.Read(ref sketchMaxValue); + var currentMax = Volatile.Read(ref sketchMax.Value); while (waiterHead != null && waiterHead.TargetSequenceNumber < currentMax) { var node = waiterHead; @@ -108,16 +118,15 @@ private void SignalWaiters() } /// - /// Waits until the session's frontier sequence number for the specified hash reaches or exceeds - /// the given maximum sequence number. + /// Waits until the sublog's maximum sequence number exceeds the given session maximum. /// - public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, TimeSpan timeout, CancellationToken ct) + public void WaitForSequenceNumber(long maximumSessionSequenceNumber, TimeSpan timeout, CancellationToken ct) { // Phase 1: SpinWait — fast path when replay is keeping up var spinner = new SpinWait(); for (var i = 0; i < MaxSpinCount; i++) { - if (maximumSessionSequenceNumber < GetFrontierSequenceNumber(hash)) + if (maximumSessionSequenceNumber < Volatile.Read(ref sketchMax.Value)) return; spinner.SpinOnce(sleep1Threshold: -1); } @@ -129,7 +138,7 @@ public void WaitForSequenceNumber(long hash, long maximumSessionSequenceNumber, lock (@lock) { // Double-check after acquiring lock - if (maximumSessionSequenceNumber < GetFrontierSequenceNumber(hash)) + if (maximumSessionSequenceNumber < Volatile.Read(ref sketchMax.Value)) return; InsertWaiter(node); } From f107f239cc50dc553d2b3de10d917151262f77b6 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 10:09:08 -0700 Subject: [PATCH 11/17] track ongoing consistent read --- .../ReadConsistency/ReadConsistencyManager.cs | 2 +- .../ReplicaReadSessionContext.cs | 55 ++++++++++++++----- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs index fff555edf9d..8e4cd5d0a69 100644 --- a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs +++ b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs @@ -223,7 +223,7 @@ public void PreBatchKeyConsistentRead(ReadOnlySpan key, ref ReplicaReadSes /// /// /// - public bool PreBatchKeyConsistentRead(long hash, ref ReplicaReadSessionContext batchReadContext) + public bool PostBatchKeyConsistentReadValidate(long hash, ref ReplicaReadSessionContext batchReadContext) { var keySequenceNumber = GetKeySequenceNumber(hash); var mSSN = batchReadContext.maximumSessionSequenceNumber; diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs index fa50023ec5d..201f68fb49c 100644 --- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs +++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs @@ -140,6 +140,11 @@ public void Dispose() consistentReadCts.Dispose(); } + /// + /// Freshness check before actual read for a single key + /// + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PreSingleKeyConsistentRead(long hash) { @@ -155,9 +160,24 @@ public void PreSingleKeyConsistentRead(long hash) } } + /// + /// Post read update maximum sequence number for a single key. + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public void PostSingleKeyConsistentReadCallback() - => appendOnlyFile.readConsistencyManager.PostSingleKeyConsistentRead(ref replicaReadContext); + { + if (!inProgress.TryReadLock()) + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); + try + { + appendOnlyFile.readConsistencyManager.PostSingleKeyConsistentRead(ref replicaReadContext); + } + finally + { + inProgress.ReadUnlock(); + } + } /// /// Initialize context for read key batch. @@ -203,21 +223,30 @@ public void PreBatchKeyConsistentReadCallback(ReadOnlySpan param /// public bool PostBatchKeyConsistentReadCallback(int keyCount) { - var consistencyManager = appendOnlyFile.readConsistencyManager; - for (var i = 0; i < keyCount; i++) + if (!inProgress.TryReadLock()) + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); + try { - var hash = keyHashCache[i]; - if (!consistencyManager.PreBatchKeyConsistentRead(hash, ref batchReadContext)) - return false; - } + var consistencyManager = appendOnlyFile.readConsistencyManager; + for (var i = 0; i < keyCount; i++) + { + var hash = keyHashCache[i]; + if (!consistencyManager.PostBatchKeyConsistentReadValidate(hash, ref batchReadContext)) + return false; + } - // Propagate batch context back to session context to maintain prefix consistency - // for subsequent single-key reads across different sublogs. - replicaReadContext.maximumSessionSequenceNumber = batchReadContext.maximumSessionSequenceNumber; - replicaReadContext.lastVirtualSublogIdx = batchReadContext.lastVirtualSublogIdx; - replicaReadContext.lastHash = batchReadContext.lastHash; + // Propagate batch context back to session context to maintain prefix consistency + // for subsequent single-key reads across different sublogs. + replicaReadContext.maximumSessionSequenceNumber = batchReadContext.maximumSessionSequenceNumber; + replicaReadContext.lastVirtualSublogIdx = batchReadContext.lastVirtualSublogIdx; + replicaReadContext.lastHash = batchReadContext.lastHash; - return true; + return true; + } + finally + { + inProgress.ReadUnlock(); + } } } } \ No newline at end of file From 423d76ca946a071de8cb25049130a2b42628791a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 12:37:49 -0700 Subject: [PATCH 12/17] implement replay side throttling --- .../AOFReplay/ReplicaReplayDriver.cs | 49 ++++++++++++++++++- libs/host/Configuration/Options.cs | 4 ++ libs/host/defaults.conf | 3 ++ .../ReadConsistency/ReadConsistencyManager.cs | 17 +++++++ .../VirtualSublogReplayState.cs | 5 ++ libs/server/Servers/GarnetServerOptions.cs | 7 +++ 6 files changed, 84 insertions(+), 1 deletion(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs index 3f04bf1409f..27a32d5bcb6 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayDriver.cs @@ -34,6 +34,8 @@ internal sealed class ReplicaReplayDriver : IBulkLogEntryConsumer, IDisposable readonly TsavoriteLog physicalSublog; readonly bool useChannels = false; + int throttleCounter; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ResumeReplay() => activeWorkerMonitor.TryEnter(); @@ -270,7 +272,52 @@ private unsafe void ConsumeDirect(byte* record, int recordLength, long currentAd } } - public void Throttle() { } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Throttle() + { + if (serverOptions.AofPhysicalSublogCount <= 1) + return; + + var w = serverOptions.AofReplayMaxDrift; + if (w <= 0) + return; + + if (++throttleCounter < w) + return; + throttleCounter = 0; + ThrottleSlow(w); + + void ThrottleSlow(long w) + { + var rcm = appendOnlyFile.readConsistencyManager; + if (rcm == null) + return; + + var myMax = rcm.GetPhysicalSublogMax(physicalSublogIdx); + + // If we're too far ahead, wait until all peers reach our current max + if (!AllPeersReached(rcm, Math.Max(myMax - w, w))) + { + do + { + cts.Token.ThrowIfCancellationRequested(); + Thread.Yield(); + } while (!AllPeersReached(rcm, myMax)); + } + + bool AllPeersReached(ReadConsistencyManager rcm, long target) + { + for (var i = 0; i < serverOptions.AofPhysicalSublogCount; i++) + { + if (i == physicalSublogIdx) + continue; + if (rcm.GetPhysicalSublogMax(i) < target) + return false; + } + return true; + } + } + } #endregion [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs index 58f640068fa..bad0baddfcd 100644 --- a/libs/host/Configuration/Options.cs +++ b/libs/host/Configuration/Options.cs @@ -212,6 +212,9 @@ internal sealed class Options : ICloneable [Option("aof-replay-task-count", Required = false, HelpText = "Number of replay tasks per physical sublog at the replica.")] public int AofReplayTaskCount { get; set; } + [Option("aof-replay-max-drift", Required = false, HelpText = "Maximum allowed drift in key sequence numbers between physical sublog replay drivers. When a driver is ahead of the slowest peer by more than this value, it yields. Only effective when aof-physical-sublog-count > 1. -1 = disabled.")] + public long AofReplayMaxDrift { get; set; } + [IntRangeValidation(0, int.MaxValue)] [Option("aof-tail-witness-freq", Required = false, HelpText = "Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1).")] public int AofTailWitnessFreqMs { get; set; } @@ -859,6 +862,7 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno AofSegmentSize = AofSegmentSize, AofPhysicalSublogCount = AofPhysicalSublogCount, AofReplayTaskCount = AofReplayTaskCount, + AofReplayMaxDrift = AofReplayMaxDrift, AofTailWitnessFreqMs = AofTailWitnessFreqMs, CommitFrequencyMs = CommitFrequencyMs, WaitForCommit = WaitForCommit.GetValueOrDefault(), diff --git a/libs/host/defaults.conf b/libs/host/defaults.conf index 36db0bb7cca..35e4480b4ff 100644 --- a/libs/host/defaults.conf +++ b/libs/host/defaults.conf @@ -145,6 +145,9 @@ /* Number of replay tasks per physical sublog at the replica. */ "AofReplayTaskCount": 1, + /* Maximum allowed drift in key sequence numbers between physical sublog replay drivers. -1 = disabled. */ + "AofReplayMaxDrift": -1, + /* Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1). */ "AofTailWitnessFreqMs": 10, diff --git a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs index 8e4cd5d0a69..4864991e59b 100644 --- a/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs +++ b/libs/server/AOF/ReadConsistency/ReadConsistencyManager.cs @@ -54,6 +54,23 @@ public AofAddress GetPhysicalSublogMaxReplayedSequenceNumber() return maxKeySeqNumVector; } + /// + /// Gets the maximum replayed sequence number for a single physical sublog + /// by reading the max across all its virtual sublogs. + /// + /// Physical sublog index. + /// The maximum sequence number observed across all virtual sublogs for this physical sublog. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public long GetPhysicalSublogMax(int physicalSublogIdx) + { + var replayTaskCount = serverOptions.AofReplayTaskCount; + var startIdx = appendOnlyFile.GetVirtualSublogIdx(physicalSublogIdx, 0); + long max = 0; + for (var rt = 0; rt < replayTaskCount; rt++) + max = Math.Max(max, Volatile.Read(ref vsrs[startIdx + rt].MaxRef)); + return max; + } + /// /// Get frontier sequence number for provided hash /// NOTE: Frontier sequence number is maximum sequence number between key specific sequence number and maximum observed sublog sequence number diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 31cb5fc1d68..5b4533c6fc3 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -44,6 +44,11 @@ sealed class SublogReplayStateMax public readonly long Max => sketchMax.Value; + /// + /// Reference to the max value for Volatile.Read access from external callers. + /// + public ref long MaxRef => ref sketchMax.Value; + public VirtualSublogReplayState() { var size = SketchSlotSize; diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index 80a8232a4e1..f47582e8181 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -87,6 +87,13 @@ public class GarnetServerOptions : ServerOptions /// public int AofReplayTaskCount = 1; + /// + /// Maximum allowed drift (in key sequence numbers) between the fastest and slowest physical sublog replay drivers. + /// When a driver is ahead of the slowest peer by more than this value, it yields until the gap closes. + /// Only effective when AofPhysicalSublogCount > 1. -1 = disabled (no throttling). + /// + public long AofReplayMaxDrift = -1; + /// /// Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1). /// From 69c4e4df1d21426dd013f9701d8aeafad86329b9 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 16:51:12 -0700 Subject: [PATCH 13/17] fix log messages --- .../server/AOF/ReadConsistency/ReplicaReadSessionContext.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs index 201f68fb49c..25802c6a072 100644 --- a/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs +++ b/libs/server/AOF/ReadConsistency/ReplicaReadSessionContext.cs @@ -168,7 +168,7 @@ public void PreSingleKeyConsistentRead(long hash) public void PostSingleKeyConsistentReadCallback() { if (!inProgress.TryReadLock()) - throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PostSingleKeyConsistentReadCallback)}"); try { appendOnlyFile.readConsistencyManager.PostSingleKeyConsistentRead(ref replicaReadContext); @@ -186,7 +186,7 @@ public void PostSingleKeyConsistentReadCallback() public void PreBatchKeyConsistentReadCallback(ReadOnlySpan parameters) { if (!inProgress.TryReadLock()) - throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreBatchKeyConsistentReadCallback)}"); try { var keyCount = parameters.Length; @@ -224,7 +224,7 @@ public void PreBatchKeyConsistentReadCallback(ReadOnlySpan param public bool PostBatchKeyConsistentReadCallback(int keyCount) { if (!inProgress.TryReadLock()) - throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PreSingleKeyConsistentRead)}"); + throw new GarnetException($"Failed to acquire inProgress lock at {nameof(PostBatchKeyConsistentReadCallback)}"); try { var consistencyManager = appendOnlyFile.readConsistencyManager; From f38173f5d72025be9b2e2e9cd423a3dc1092845e Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 16:53:15 -0700 Subject: [PATCH 14/17] reset consistentReadActive = false --- libs/server/Resp/RespServerSession.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index 6bf1d2719ca..9cb2d1fe4ae 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -501,7 +501,10 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) else { if (consistentReadActive) + { SwitchActiveDatabaseSession(databaseSessions.Map[0]); + consistentReadActive = false; + } txnSkip = false; ProcessMessages(ref basicGarnetApi, ref transactionalGarnetApi); txnSkip = txnManager.IsSkippingOperations(); From 8ccf9aae3266fa932eebcb395a549161c445ec14 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 17:21:57 -0700 Subject: [PATCH 15/17] fix possible race --- .../ReadConsistency/VirtualSublogReplayState.cs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs index 5b4533c6fc3..259db2bff0b 100644 --- a/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs +++ b/libs/server/AOF/ReadConsistency/VirtualSublogReplayState.cs @@ -114,8 +114,7 @@ private void SignalWaiters() { var node = waiterHead; waiterHead = node.Next; - if (waiterHead != null) - waiterHead.Prev = null; + _ = (waiterHead?.Prev = null); node.Next = null; node.Signal.Set(); } @@ -142,10 +141,20 @@ public void WaitForSequenceNumber(long maximumSessionSequenceNumber, TimeSpan ti lock (@lock) { - // Double-check after acquiring lock + // Insert first, then re-check: if an updater raced with us + // (SignalWaiters saw waiterHead == null before we inserted), + // we catch it here and unlink before blocking. + InsertWaiter(node); if (maximumSessionSequenceNumber < Volatile.Read(ref sketchMax.Value)) + { + // Unlink directly — we already hold the lock + if (node.Prev != null) + node.Prev.Next = node.Next; + else if (waiterHead == node) + waiterHead = node.Next; + _ = (node.Next?.Prev = node.Prev); return; - InsertWaiter(node); + } } try From e70f61cb9fc85dd8f1c8e5906f4d9b9023848185 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Mon, 8 Jun 2026 21:07:06 -0700 Subject: [PATCH 16/17] ensure complete pending is awaited before post single key callback executes --- .../ClientSession/ConsistentReadContext.cs | 4 +-- .../TransactionalConsistentReadContext.cs | 27 ++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs index 59311074c32..fbcb1272f18 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/ConsistentReadContext.cs @@ -162,9 +162,9 @@ public async ValueTask CompletePendingAsync(bool waitForCommit = false, Cancella /// public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) { - var status = BasicContext.CompletePendingWithOutputsAsync(waitForCommit, token); + var status = await BasicContext.CompletePendingWithOutputsAsync(waitForCommit, token).ConfigureAwait(false); Session.functions.PostSingleKeyConsistentReadCallback(); - return await status.ConfigureAwait(false); + return status; } /// diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs index 07e611cee2c..2c1a303d122 100644 --- a/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs +++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/TransactionalConsistentReadContext.cs @@ -215,19 +215,34 @@ public void ReadWithPrefetch(ref TBatch batch, TContext userContext = de /// public bool CompletePending(bool wait = false, bool spinWaitForCommit = false) - => TransactionalContext.CompletePending(wait, spinWaitForCommit); + { + var status = TransactionalContext.CompletePending(wait, spinWaitForCommit); + Session.functions.PostSingleKeyConsistentReadCallback(); + return status; + } /// public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) - => TransactionalContext.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + { + var status = TransactionalContext.CompletePendingWithOutputs(out completedOutputs, wait, spinWaitForCommit); + Session.functions.PostSingleKeyConsistentReadCallback(); + return status; + } /// - public ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) - => TransactionalContext.CompletePendingAsync(waitForCommit, token); + public async ValueTask CompletePendingAsync(bool waitForCommit = false, CancellationToken token = default) + { + await TransactionalContext.CompletePendingAsync(waitForCommit, token).ConfigureAwait(false); + Session.functions.PostSingleKeyConsistentReadCallback(); + } /// - public ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) - => TransactionalContext.CompletePendingWithOutputsAsync(waitForCommit, token); + public async ValueTask> CompletePendingWithOutputsAsync(bool waitForCommit = false, CancellationToken token = default) + { + var status = await TransactionalContext.CompletePendingWithOutputsAsync(waitForCommit, token).ConfigureAwait(false); + Session.functions.PostSingleKeyConsistentReadCallback(); + return status; + } /// [MethodImpl(MethodImplOptions.AggressiveInlining)] From 74ee331ea6765f9ee169a5219affbab229ddbe1a Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 9 Jun 2026 11:33:39 -0700 Subject: [PATCH 17/17] better logging --- .../ReplicaOps/AOFReplay/ReplicaReplayTask.cs | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs index 7822952c3b2..87c1da05eaf 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/AOFReplay/ReplicaReplayTask.cs @@ -63,7 +63,7 @@ internal async Task FullPageBasedBackgroundReplayAsync() { await replayBatchContext.LeaderFollowerBarrier.WaitReadyWorkAsync(cancellationToken: cts.Token).ConfigureAwait(false); } - catch (TaskCanceledException) when (cts.Token.IsCancellationRequested) + catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { // Suppress the exception if the task was cancelled because of store wrapper disposal } @@ -131,7 +131,7 @@ internal async Task FullPageBasedBackgroundReplayAsync() appendOnlyFile.readConsistencyManager.UpdateVirtualSublogMaxSequenceNumber(virtualSublogIdx, nextAddress); } } - catch (TaskCanceledException) when (cts.Token.IsCancellationRequested) + catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { // Suppress the exception if the task was cancelled because of store wrapper disposal } @@ -143,8 +143,21 @@ internal async Task FullPageBasedBackgroundReplayAsync() } finally { - // Signal work completion after processing - replayBatchContext.LeaderFollowerBarrier.SignalCompleted(); + // Signal work completion after processing. + // Pass cancellation token so participants are unblocked if the leader + // times out in WaitCompleted and never calls Release(). + try + { + replayBatchContext.LeaderFollowerBarrier.SignalCompleted(cts.Token); + } + catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) + { + // Suppress: leader timed out or store is disposing; exit cleanly. + } + catch (Exception ex) + { + logger?.LogError(ex, "{method} failed at SignalCompleted", nameof(FullPageBasedBackgroundReplayAsync)); + } } } } @@ -178,7 +191,20 @@ internal async Task ChannelBasedBackgroundReplayAsync() } // Signal work completion after processing - replayBatchContext.LeaderFollowerBarrier.SignalCompleted(); + try + { + replayBatchContext.LeaderFollowerBarrier.SignalCompleted(cts.Token); + } + catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) + { + // Suppress: leader timed out or store is disposing; exit cleanly. + break; + } + catch (Exception ex) + { + logger?.LogError(ex, "{method} failed at SignalCompleted", nameof(ChannelBasedBackgroundReplayAsync)); + break; + } } } }