Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Net;
using System.Runtime.InteropServices;
using Embedded.server;
using Garnet.cluster;
using Garnet.common;
using Garnet.server;

namespace BDN.benchmark.Cluster.ConsistentRead
{
/// <summary>
/// Context for consistent read benchmarks — sets up an embedded Garnet server
/// with cluster + AOF + optional multilog, and optionally forces replica mode.
/// </summary>
unsafe class ConsistentReadContext
{
EmbeddedRespServer server;
RespServerSession session;
readonly BenchUtils benchUtils = new();
readonly int port = 7100;

public static ReadOnlySpan<byte> keyTag => "{0}"u8;
public Request getRequest;
public Request mgetRequest;

/// <summary>
/// Releases the embedded session and server created by <see cref="SetupInstance"/>.
/// Members that were never created (setup not run, or setup failed part-way) are skipped,
/// so cleanup does not raise a NullReferenceException on top of the original failure.
/// </summary>
public void Dispose()
{
    // Dispose the session before the server it was obtained from.
    session?.Dispose();
    server?.Dispose();
}

/// <summary>
/// Creates the embedded server (cluster and AOF enabled, null AOF device), opens a session,
/// assigns every slot to the local node and loads the benchmark keys.
/// When both multi-log and replica mode are requested, the node is then switched to the
/// replica role and its sublog sequence numbers are pre-seeded.
/// </summary>
/// <param name="parameters">Selects multi-log and replica behaviour for this run.</param>
public void SetupInstance(ConsistentReadParams parameters)
{
    var useMultiLog = parameters.multiLogEnabled;
    // Replica mode is only honoured together with multi-log.
    var actAsReplica = parameters.replicaMode && useMultiLog;

    var serverOptions = new GarnetServerOptions
    {
        QuietMode = true,
        EnableCluster = true,
        EndPoints = [new IPEndPoint(IPAddress.Loopback, port)],
        CleanClusterConfig = true,
        ClusterAnnounceEndpoint = new IPEndPoint(IPAddress.Loopback, port),
        EnableAOF = true,
        UseAofNullDevice = true,
        FastAofTruncate = true,
        CommitFrequencyMs = -1,
        AofPageSize = "128m",
        AofMemorySize = "256m",
    };

    if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
    {
        serverOptions.CheckpointDir = "/tmp";
    }

    if (useMultiLog)
    {
        serverOptions.AofPhysicalSublogCount = 1;
        serverOptions.AofReplayTaskCount = 4;
    }

    server = new EmbeddedRespServer(serverOptions, null, new GarnetServerEmbedded());
    session = server.GetRespSession();

    // Own the full slot range, then load the keys while the node is still a primary.
    AddSlotRange([(0, 16383)]);
    PopulateKeys();

    if (!actAsReplica)
        return;

    ForceReplicaRole();
    PreSeedSequenceNumbers();
}

/// <summary>
/// Sends one RESP-encoded CLUSTER ADDSLOTSRANGE command per range through the embedded session.
/// The result of consuming each command is ignored.
/// </summary>
/// <param name="slotRanges">Pairs of slot numbers, passed to the command in the order given.</param>
void AddSlotRange(List<(int, int)> slotRanges)
{
    foreach (var (firstSlot, lastSlot) in slotRanges)
    {
        // Each slot number is written as a bulk string: $<digit count>\r\n<number>\r\n
        var firstSlotArg = $"${NumUtils.CountDigits(firstSlot)}\r\n{firstSlot}\r\n";
        var lastSlotArg = $"${NumUtils.CountDigits(lastSlot)}\r\n{lastSlot}\r\n";
        var command = "*4\r\n$7\r\nCLUSTER\r\n$13\r\nADDSLOTSRANGE\r\n" + firstSlotArg + lastSlotArg;

        var payload = System.Text.Encoding.ASCII.GetBytes(command);
        fixed (byte* payloadPtr = payload)
        {
            _ = session.TryConsumeMessages(payloadPtr, payload.Length);
        }
    }
}

/// <summary>
/// Generates 100 key/value pairs whose keys start with <see cref="keyTag"/>, writes them to the
/// server with a batch of SET commands, and pre-builds the request buffers
/// (<see cref="getRequest"/> with 100 GETs, <see cref="mgetRequest"/> with one 100-key MGET).
/// </summary>
void PopulateKeys()
{
    const int keyCount = 100;
    const int keyLength = 8;
    const int valueLength = 32;

    // Wire size of one bulk string: '$' + length digits + CRLF + payload + CRLF
    var keyBulkSize = 1 + NumUtils.CountDigits(keyLength) + 2 + keyLength + 2;
    var valueBulkSize = 1 + NumUtils.CountDigits(valueLength) + 2 + valueLength + 2;

    var entries = new (byte[], byte[])[keyCount];
    for (var idx = 0; idx < entries.Length; idx++)
    {
        var key = new byte[keyLength];
        var value = new byte[valueLength];

        // The key starts with the tag; the rest of the key and the whole value are random.
        keyTag.CopyTo(key.AsSpan());
        benchUtils.RandomBytes(ref key, startOffset: keyTag.Length);
        benchUtils.RandomBytes(ref value);

        entries[idx] = (key, value);
    }

    // Store every pair with a single batch of SET commands.
    var setRequest = new Request(keyCount * ("*3\r\n$3\r\nSET\r\n"u8.Length + keyBulkSize + valueBulkSize));
    var cursor = setRequest.ptr;
    var limit = cursor + setRequest.buffer.Length;
    foreach (var (key, value) in entries)
    {
        _ = RespWriteUtils.TryWriteArrayLength(3, ref cursor, limit);
        _ = RespWriteUtils.TryWriteBulkString("SET"u8, ref cursor, limit);
        _ = RespWriteUtils.TryWriteBulkString(key, ref cursor, limit);
        _ = RespWriteUtils.TryWriteBulkString(value, ref cursor, limit);
    }
    _ = session.TryConsumeMessages(setRequest.ptr, setRequest.buffer.Length);

    // One GET command per key, back to back in a single buffer.
    getRequest = new Request(keyCount * ("*2\r\n$3\r\nGET\r\n"u8.Length + keyBulkSize));
    cursor = getRequest.ptr;
    limit = cursor + getRequest.buffer.Length;
    foreach (var (key, _) in entries)
    {
        _ = RespWriteUtils.TryWriteArrayLength(2, ref cursor, limit);
        _ = RespWriteUtils.TryWriteBulkString("GET"u8, ref cursor, limit);
        _ = RespWriteUtils.TryWriteBulkString(key, ref cursor, limit);
    }

    // A single MGET: array header, the command name, then every key.
    var mgetHeaderSize = 1 + NumUtils.CountDigits(1 + keyCount) + 2 + "$4\r\nMGET\r\n"u8.Length;
    mgetRequest = new Request(mgetHeaderSize + (keyCount * keyBulkSize));
    cursor = mgetRequest.ptr;
    limit = cursor + mgetRequest.buffer.Length;
    _ = RespWriteUtils.TryWriteArrayLength(1 + keyCount, ref cursor, limit);
    _ = RespWriteUtils.TryWriteBulkString("MGET"u8, ref cursor, limit);
    foreach (var (key, _) in entries)
    {
        _ = RespWriteUtils.TryWriteBulkString(key, ref cursor, limit);
    }
}

/// <summary>
/// Asks the cluster manager to set the local node's role to <c>NodeRole.REPLICA</c>.
/// </summary>
void ForceReplicaRole()
{
    var clusterManager = ((ClusterProvider)server.StoreWrapper.clusterProvider).clusterManager;
    clusterManager.TrySetLocalNodeRole(NodeRole.REPLICA);
}

/// <summary>
/// Raises the max sequence number of every physical sublog to <see cref="long.MaxValue"/>
/// through the read consistency manager. Does nothing when the append-only file has no
/// read consistency manager.
/// </summary>
void PreSeedSequenceNumbers()
{
    var aof = server.StoreWrapper.appendOnlyFile;
    var consistencyManager = aof.readConsistencyManager;
    if (consistencyManager != null)
    {
        // Intent: consistent reads should never block waiting for replay to catch up.
        var sublogCount = aof.serverOptions.AofPhysicalSublogCount;
        for (var sublogIdx = 0; sublogIdx < sublogCount; sublogIdx++)
        {
            consistencyManager.UpdatePhysicalSublogMaxSequenceNumber(sublogIdx, long.MaxValue);
        }
    }
}

/// <summary>
/// Feeds a raw request buffer to the embedded session; the session's result is discarded.
/// </summary>
/// <param name="ptr">Start of the request bytes.</param>
/// <param name="length">Number of bytes to consume.</param>
public void Consume(byte* ptr, int length)
{
    _ = session.TryConsumeMessages(ptr, length);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;

namespace BDN.benchmark.Cluster.ConsistentRead
{
/// <summary>
/// BenchmarkDotNet suite that replays the pre-built GET and MGET request buffers of a
/// <see cref="ConsistentReadContext"/> for each parameter combination.
/// </summary>
[MemoryDiagnoser]
public unsafe class ConsistentReadOperations
{
    // Server, session and request buffers shared by all benchmarks of one parameter set.
    ConsistentReadContext cc;

    /// <summary>
    /// Parameter combination for the current run
    /// </summary>
    [ParamsSource(nameof(ConsistentReadParamsProvider))]
    public ConsistentReadParams Params { get; set; }

    /// <summary>
    /// Supplies the parameter combinations to benchmark
    /// </summary>
    public IEnumerable<ConsistentReadParams> ConsistentReadParamsProvider()
    {
        return new ConsistentReadParams[]
        {
            new(false, false), // SingleLog (baseline)
            new(true, false),  // MultiLog+Primary
            new(true, true),   // MultiLog+Replica (exercises consistent read)
        };
    }

    /// <summary>
    /// Builds and initialises the context for the current <see cref="Params"/>
    /// </summary>
    [GlobalSetup]
    public void GlobalSetup()
    {
        cc = new ConsistentReadContext();
        cc.SetupInstance(Params);
    }

    /// <summary>
    /// Disposes the context
    /// </summary>
    [GlobalCleanup]
    public void GlobalCleanup() => cc.Dispose();

    /// <summary>
    /// GET benchmark — 100 sequential GETs per invocation.
    /// Exercises: sublog idx calc, sketch lookup, inProgress lock, ResetTimeoutCts, SwitchActiveDatabaseSession.
    /// </summary>
    [Benchmark]
    public void Get() => cc.Consume(cc.getRequest.ptr, cc.getRequest.buffer.Length);

    /// <summary>
    /// MGET benchmark — single MGET with 100 keys per invocation.
    /// Exercises: batch consistent read path (BeforeConsistentReadKeyBatch).
    /// </summary>
    [Benchmark]
    public void MGet() => cc.Consume(cc.mgetRequest.ptr, cc.mgetRequest.buffer.Length);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace BDN.benchmark.Cluster.ConsistentRead
{
/// <summary>
/// Parameter set for the consistent read benchmarks
/// </summary>
public struct ConsistentReadParams
{
    /// <summary>
    /// Whether multi-log (single physical log + multi-replay) is enabled
    /// </summary>
    public bool multiLogEnabled;

    /// <summary>
    /// Whether the node operates in replica mode (exercises consistent read path)
    /// </summary>
    public bool replicaMode;

    /// <summary>
    /// Creates a parameter set from the two flags
    /// </summary>
    public ConsistentReadParams(bool multiLogEnabled, bool replicaMode)
    {
        this.replicaMode = replicaMode;
        this.multiLogEnabled = multiLogEnabled;
    }

    /// <summary>
    /// Label shown in BDN output; replica mode without multi-log is reported as "Invalid"
    /// </summary>
    public override string ToString() => (multiLogEnabled, replicaMode) switch
    {
        (false, false) => "SingleLog",
        (true, false) => "MultiLog+Primary",
        (true, true) => "MultiLog+Replica",
        _ => "Invalid",
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ internal sealed class ReplicaReplayDriver : IBulkLogEntryConsumer, IDisposable
readonly TsavoriteLog physicalSublog;
readonly bool useChannels = false;

int throttleCounter;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool ResumeReplay() => activeWorkerMonitor.TryEnter();

Expand Down Expand Up @@ -270,7 +272,52 @@ private unsafe void ConsumeDirect(byte* record, int recordLength, long currentAd
}
}

public void Throttle() { }
/// <summary>
/// Holds this replay driver back when it has run ahead of the other physical sublog replay drivers.
/// Returns immediately when there is at most one physical sublog or when AofReplayMaxDrift is not
/// positive. The drift check itself runs only once every AofReplayMaxDrift calls (capped at
/// int.MaxValue calls); all other calls just bump a counter.
/// </summary>
/// <exception cref="OperationCanceledException">
/// Thrown from the wait loop when the driver's cancellation token has been cancelled.
/// </exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Throttle()
{
    // Drift can only exist between two or more physical sublog replay drivers.
    if (serverOptions.AofPhysicalSublogCount <= 1)
        return;

    // Any non-positive value disables throttling.
    var maxDrift = serverOptions.AofReplayMaxDrift;
    if (maxDrift <= 0)
        return;

    // throttleCounter is an int while the drift is a long. Clamp the check interval so a drift
    // above int.MaxValue cannot wrap the counter negative and silently disable throttling.
    var checkInterval = (int)Math.Min(maxDrift, int.MaxValue);
    if (++throttleCounter < checkInterval)
        return;
    throttleCounter = 0;
    ThrottleSlow(maxDrift);

    // Slow path, kept out of line: compares this driver's max against every peer.
    void ThrottleSlow(long allowedDrift)
    {
        var rcm = appendOnlyFile.readConsistencyManager;
        if (rcm == null)
            return;

        var myMax = rcm.GetPhysicalSublogMax(physicalSublogIdx);

        // If we're too far ahead, wait until all peers reach our current max.
        // NOTE(review): the floor of allowedDrift means that while myMax < 2 * allowedDrift the
        // threshold is allowedDrift rather than myMax - allowedDrift — confirm this is intended.
        if (!AllPeersReached(rcm, Math.Max(myMax - allowedDrift, allowedDrift)))
        {
            do
            {
                cts.Token.ThrowIfCancellationRequested();
                Thread.Yield();
            } while (!AllPeersReached(rcm, myMax));
        }

        // True when every physical sublog other than this driver's own reports a max >= target.
        bool AllPeersReached(ReadConsistencyManager manager, long target)
        {
            for (var i = 0; i < serverOptions.AofPhysicalSublogCount; i++)
            {
                if (i == physicalSublogIdx)
                    continue;
                if (manager.GetPhysicalSublogMax(i) < target)
                    return false;
            }
            return true;
        }
    }
}
#endregion

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
4 changes: 4 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ internal sealed class Options : ICloneable
[Option("aof-replay-task-count", Required = false, HelpText = "Number of replay tasks per physical sublog at the replica.")]
public int AofReplayTaskCount { get; set; }

[Option("aof-replay-max-drift", Required = false, HelpText = "Maximum allowed drift in key sequence numbers between physical sublog replay drivers. When a driver is ahead of the slowest peer by more than this value, it yields. Only effective when aof-physical-sublog-count > 1. -1 = disabled.")]
public long AofReplayMaxDrift { get; set; }

[IntRangeValidation(0, int.MaxValue)]
[Option("aof-tail-witness-freq", Required = false, HelpText = "Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1).")]
public int AofTailWitnessFreqMs { get; set; }
Expand Down Expand Up @@ -859,6 +862,7 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno
AofSegmentSize = AofSegmentSize,
AofPhysicalSublogCount = AofPhysicalSublogCount,
AofReplayTaskCount = AofReplayTaskCount,
AofReplayMaxDrift = AofReplayMaxDrift,
AofTailWitnessFreqMs = AofTailWitnessFreqMs,
CommitFrequencyMs = CommitFrequencyMs,
WaitForCommit = WaitForCommit.GetValueOrDefault(),
Expand Down
3 changes: 3 additions & 0 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@
/* Number of replay tasks per physical sublog at the replica. */
"AofReplayTaskCount": 1,

/* Maximum allowed drift in key sequence numbers between physical sublog replay drivers. Only effective when AofPhysicalSublogCount > 1. -1 = disabled. */
"AofReplayMaxDrift": -1,

/* Polling frequency of the background task responsible for moving time ahead for all physical sublogs (Used only with physical sublog value >1). */
"AofTailWitnessFreqMs": 10,

Expand Down
2 changes: 1 addition & 1 deletion libs/server/AOF/GarnetLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static long HASH(ReadOnlySpan<byte> key)
/// <summary>
/// Maps a hash to a replay task index: the hash (as unsigned) is divided by the physical
/// sublog count and the quotient is reduced modulo the replay task count.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetReplayTaskIdx(long hash)
{
    var quotient = (ulong)hash / (uint)physicalSublogCount;
    return (int)(quotient % (uint)replayTaskCount);
}
// Flattens (physical sublog index, replay task index) into one index:
// physical sublog index * replayTaskCount + replay task index.
// The two method lines below are the before/after versions shown by the diff; they differ
// only in the explicit parentheses around the multiplication.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetVirtualSublogIdx(long hash) => GetPhysicalSublogIdx(hash) * replayTaskCount + GetReplayTaskIdx(hash);
public int GetVirtualSublogIdx(long hash) => (GetPhysicalSublogIdx(hash) * replayTaskCount) + GetReplayTaskIdx(hash);

/// <summary>
/// Hashes the key with <c>HASH</c> and returns the physical sublog index for that hash.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPhysicalSublogIdx(ReadOnlySpan<byte> key)
{
    var hash = HASH(key);
    return GetPhysicalSublogIdx(hash);
}
Expand Down
Loading
Loading