Skip to content
Merged
6 changes: 2 additions & 4 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ public bool AllowDataLoss
=> serverOptions.AllowDataLoss;

/// <inheritdoc />
public void Recover()
{
replicationManager.Recover();
}
public ValueTask RecoverAsync()
=> replicationManager.RecoverAsync();

/// <inheritdoc />
public bool PreventRoleChange()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async Task<string> ReplicaSyncAttachTaskAsync(bool downgradeLock, bool forceAsyn
cEntry = GetLatestCheckpointEntryFromDisk();
logger?.LogCheckpointEntry(LogLevel.Information, nameof(ReplicaSyncAttachTaskAsync), cEntry);

storeWrapper.RecoverAOF();
await storeWrapper.RecoverAOFAsync().ConfigureAwait(false);
logger?.LogInformation("InitiateReplicaSync: AOF BeginAddress:{beginAddress} AOF TailAddress:{tailAddress}", storeWrapper.appendOnlyFile.Log.BeginAddress, storeWrapper.appendOnlyFile.Log.TailAddress);

var beginAddress = storeWrapper.appendOnlyFile.Log.BeginAddress;
Expand Down Expand Up @@ -301,10 +301,12 @@ public AofAddress TryReplicaDiskbasedRecovery(
remoteCheckpoint.metadata.storeIndexToken,
remoteCheckpoint.metadata.storeHlogToken);

storeWrapper.RecoverCheckpoint(
#pragma warning disable VSTHRD002 // The replica-recovery RESP path is synchronous and must complete before sending a response.
storeWrapper.RecoverCheckpointAsync(
replicaRecover: true,
recoverStoreFromToken,
remoteCheckpoint.metadata);
remoteCheckpoint.metadata).AsTask().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002

if (replayAOFMap > 0)
{
Expand Down
14 changes: 7 additions & 7 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,20 +509,20 @@ public void Dispose()
/// <summary>
/// Main recover method for replication
/// </summary>
public void Recover()
public async ValueTask RecoverAsync()
{
var nodeRole = clusterProvider.clusterManager.CurrentConfig.LocalNodeRole;

switch (nodeRole)
{
case NodeRole.PRIMARY:
RecoverCheckpointAndAOF();
await RecoverCheckpointAndAOFAsync().ConfigureAwait(false);
break;
case NodeRole.REPLICA:
// If configured, load from disk - otherwise wait to connect with a Primary
if (clusterProvider.serverOptions.ClusterReplicaResumeWithData)
{
RecoverCheckpointAndAOF();
await RecoverCheckpointAndAOFAsync().ConfigureAwait(false);
}

break;
Expand All @@ -535,10 +535,10 @@ public void Recover()
/// <summary>
/// Recover whatever is available from <see cref="storeWrapper"/>.
/// </summary>
private void RecoverCheckpointAndAOF()
private async ValueTask RecoverCheckpointAndAOFAsync()
{
storeWrapper.RecoverCheckpoint();
storeWrapper.RecoverAOF();
await storeWrapper.RecoverCheckpointAsync().ConfigureAwait(false);
await storeWrapper.RecoverAOFAsync().ConfigureAwait(false);
if (clusterProvider.serverOptions.EnableAOF)
{
// If recovered checkpoint corresponds to an unavailable AOF address, we initialize AOF to that address
Expand All @@ -555,7 +555,7 @@ private void RecoverCheckpointAndAOF()

// First recover and then load latest checkpoint info in-memory
if (!InitializeCheckpointStore())
logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(RecoverCheckpointAndAOF));
logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(RecoverCheckpointAndAOFAsync));
}

/// <summary>
Expand Down
4 changes: 3 additions & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ private GarnetAppendOnlyFile CreateAOF(int dbId)
/// </summary>
public void Start()
{
Provider.Recover();
#pragma warning disable VSTHRD002 // Server startup is synchronous and must complete recovery before accepting connections.
Provider.RecoverAsync().AsTask().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
for (var i = 0; i < servers.Length; i++)
servers[i].Start();
Provider.Start();
Expand Down
9 changes: 2 additions & 7 deletions libs/server/AOF/GarnetLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,8 @@ public AofAddress MemorySizeBytes
}
}

public void Recover()
{
if (singleLog != null)
singleLog.Recover();
else
shardedLog.Recover();
}
public ValueTask RecoverAsync()
=> singleLog != null ? singleLog.RecoverAsync() : shardedLog.RecoverAsync();

public bool RecoverLatestSequenceNumber(out long recoverUntilSequenceNumber)
{
Expand Down
5 changes: 3 additions & 2 deletions libs/server/AOF/ShardedLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
Expand Down Expand Up @@ -166,10 +167,10 @@ public AofAddress MemorySizeBytes
}
}

public void Recover()
public async ValueTask RecoverAsync()
{
foreach (var log in sublog)
log.Recover();
await log.RecoverAsync().ConfigureAwait(false);
}

public void Reset()
Expand Down
3 changes: 2 additions & 1 deletion libs/server/AOF/SingleLog.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

Expand Down Expand Up @@ -39,7 +40,7 @@ public class SingleLog(TsavoriteLogSettings logSettings, ILogger logger = null)

public AofAddress MemorySizeBytes => AofAddress.Create(1, value: log.MemorySizeBytes);

public void Recover() => log.Recover();
public ValueTask RecoverAsync() => log.RecoverAsync();
public void Reset() => log.Reset();

public void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Cluster/IClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public interface IClusterProvider : IDisposable
/// <summary>
/// Recover the cluster
/// </summary>
void Recover();
ValueTask RecoverAsync();

/// <summary>
/// Reset gossip stats
Expand Down
19 changes: 8 additions & 11 deletions libs/server/Databases/DatabaseManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ internal abstract class DatabaseManagerBase : IDatabaseManager
public abstract void ResumeCheckpoints(int dbId);

/// <inheritdoc/>
public abstract void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);
public abstract ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);

/// <inheritdoc/>
public abstract Task<bool> TakeCheckpointAsync(bool background, int dbId = -1, CancellationToken token = default, ILogger logger = null);
Expand All @@ -57,7 +57,7 @@ public abstract Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit,
public abstract Task WaitForCommitToAofAsync(CancellationToken token = default, ILogger logger = null);

/// <inheritdoc/>
public abstract void RecoverAOF();
public abstract ValueTask RecoverAOFAsync();

/// <inheritdoc/>
public abstract AofAddress ReplayAOF(AofAddress untilAddress);
Expand Down Expand Up @@ -164,18 +164,15 @@ protected DatabaseManagerBase(StoreWrapper.DatabaseCreatorDelegate createDatabas
/// Recover single database from checkpoint
/// </summary>
/// <param name="db">Database to recover</param>
/// <param name="storeVersion">Store version</param>
protected void RecoverDatabaseCheckpoint(GarnetDatabase db, out long storeVersion)
protected async ValueTask<long> RecoverDatabaseCheckpointAsync(GarnetDatabase db)
{
storeVersion = 0;

storeVersion = db.Store.Recover();
var storeVersion = await db.Store.RecoverAsync().ConfigureAwait(false);
Logger?.LogInformation("Recovered store to version {storeVersion}", storeVersion);

if (storeVersion > 0)
{
db.LastSaveTime = DateTimeOffset.UtcNow;
}

return storeVersion;
}

/// <summary>
Expand Down Expand Up @@ -227,11 +224,11 @@ protected static void ResumeCheckpoints(GarnetDatabase db)
/// Recover a single database from AOF
/// </summary>
/// <param name="db">Database to recover</param>
protected void RecoverDatabaseAOF(GarnetDatabase db)
protected async ValueTask RecoverDatabaseAOFAsync(GarnetDatabase db)
{
if (db.AppendOnlyFile == null) return;

db.AppendOnlyFile.Log.Recover();
await db.AppendOnlyFile.Log.RecoverAsync().ConfigureAwait(false);
Logger?.LogInformation("Recovered AOF: begin address = {beginAddress}, tail address = {tailAddress}, DB ID: {id}",
db.AppendOnlyFile.Log.BeginAddress, db.AppendOnlyFile.Log.TailAddress, db.Id);
}
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Databases/IDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public interface IDatabaseManager : IDisposable
/// </summary>
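/// <param name="replicaRecover">True when the checkpoint is being recovered as part of replica recovery (cluster mode only; the multi-database manager rejects this with a GarnetException)</param>
/// <param name="recoverFromToken">When true, recover the store from the index and hybrid-log tokens supplied in <paramref name="metadata"/> instead of using the store's default recovery</param>
/// <param name="metadata">Checkpoint metadata carrying the store index token and store hybrid-log token; may be null when not recovering as a replica</param>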
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);
public ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);

/// <summary>
/// Take checkpoint of all active databases (or a specified database) if checkpointing is not in progress
Expand Down Expand Up @@ -140,7 +140,7 @@ public Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, Cancellati
/// <summary>
/// Recover AOF
/// </summary>
public void RecoverAOF();
public ValueTask RecoverAOFAsync();

/// <summary>
/// When replaying AOF we do not want to write AOF records again.
Expand Down
10 changes: 5 additions & 5 deletions libs/server/Databases/MultiDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ public MultiDatabaseManager(SingleDatabaseManager src) :
}

/// <inheritdoc/>
public override void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public override async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
if (replicaRecover)
throw new GarnetException(
$"Unexpected call to {nameof(MultiDatabaseManager)}.{nameof(RecoverCheckpoint)} with {nameof(replicaRecover)} == true.");
$"Unexpected call to {nameof(MultiDatabaseManager)}.{nameof(RecoverCheckpointAsync)} with {nameof(replicaRecover)} == true.");

var checkpointParentDir = StoreWrapper.serverOptions.StoreCheckpointBaseDirectory;
var checkpointDirBaseName = GarnetServerOptions.GetCheckpointDirectoryName(0);
Expand Down Expand Up @@ -116,7 +116,7 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover

try
{
RecoverDatabaseCheckpoint(db, out storeVersion);
storeVersion = await RecoverDatabaseCheckpointAsync(db).ConfigureAwait(false);
}
catch (TsavoriteNoHybridLogException ex)
{
Expand Down Expand Up @@ -416,7 +416,7 @@ public override async Task WaitForCommitToAofAsync(CancellationToken token = def
}

/// <inheritdoc/>
public override void RecoverAOF()
public override async ValueTask RecoverAOFAsync()
{
var aofParentDir = StoreWrapper.serverOptions.AppendOnlyFileBaseDirectory;
var aofDirBaseName = GarnetServerOptions.GetAppendOnlyFileDirectoryName(0);
Expand All @@ -442,7 +442,7 @@ public override void RecoverAOF()
if (!success)
throw new GarnetException($"Failed to retrieve or create database for AOF recovery (DB ID = {dbId}).");

RecoverDatabaseAOF(db);
await RecoverDatabaseAOFAsync(db).ConfigureAwait(false);
}
}

Expand Down
10 changes: 6 additions & 4 deletions libs/server/Databases/SingleDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override GarnetDatabase TryGetOrAddDatabase(int dbId, out bool success, o
}

/// <inheritdoc/>
public override void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public override async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
long storeVersion = 0;
try
Expand All @@ -64,15 +64,17 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover
// Note: Since replicaRecover only pertains to cluster-mode, we can use the default store pointers (since multi-db mode is disabled in cluster-mode)
if (metadata!.storeIndexToken != default && metadata.storeHlogToken != default)
{
storeVersion = !recoverFromToken ? Store.Recover() : Store.Recover(metadata.storeIndexToken, metadata.storeHlogToken);
storeVersion = !recoverFromToken
? await Store.RecoverAsync().ConfigureAwait(false)
: await Store.RecoverAsync(metadata.storeIndexToken, metadata.storeHlogToken).ConfigureAwait(false);
}

if (storeVersion > 0)
defaultDatabase.LastSaveTime = DateTimeOffset.UtcNow;
}
else
{
RecoverDatabaseCheckpoint(defaultDatabase, out storeVersion);
storeVersion = await RecoverDatabaseCheckpointAsync(defaultDatabase).ConfigureAwait(false);
}
}
catch (TsavoriteNoHybridLogException ex)
Expand Down Expand Up @@ -239,7 +241,7 @@ public override async Task WaitForCommitToAofAsync(CancellationToken token = def
}

/// <inheritdoc/>
public override void RecoverAOF() => RecoverDatabaseAOF(defaultDatabase);
public override ValueTask RecoverAOFAsync() => RecoverDatabaseAOFAsync(defaultDatabase);

/// <inheritdoc/>
public override AofAddress ReplayAOF(AofAddress untilAddress)
Expand Down
5 changes: 3 additions & 2 deletions libs/server/Providers/GarnetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Tsavorite.core;
Expand Down Expand Up @@ -43,8 +44,8 @@ public void Start()
/// <summary>
/// Recover
/// </summary>
public void Recover()
=> storeWrapper.Recover();
public ValueTask RecoverAsync()
=> storeWrapper.RecoverAsync();

/// <summary>
/// Dispose
Expand Down
14 changes: 7 additions & 7 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,19 @@ public IPEndPoint GetClusterEndpoint()
return localEndPoint;
}

internal void Recover()
internal async ValueTask RecoverAsync()
{
if (serverOptions.EnableCluster)
{
if (serverOptions.Recover)
clusterProvider.Recover();
await clusterProvider.RecoverAsync().ConfigureAwait(false);
}
else
{
if (serverOptions.Recover)
{
RecoverCheckpoint();
RecoverAOF();
await RecoverCheckpointAsync().ConfigureAwait(false);
await RecoverAOFAsync().ConfigureAwait(false);
ReplayAOF(AofAddress.Create(length: serverOptions.AofPhysicalSublogCount, value: -1));
}
}
Expand Down Expand Up @@ -413,10 +413,10 @@ public async Task TakeOnDemandCheckpointAsync(DateTimeOffset entryTime, int dbId
/// <summary>
/// Recover checkpoint
/// </summary>
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
StartSizeTrackers(); // We need to start this before recovery to have size tracking during the recovery process.
databaseManager.RecoverCheckpoint(replicaRecover, recoverFromToken, metadata);
await databaseManager.RecoverCheckpointAsync(replicaRecover, recoverFromToken, metadata).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -447,7 +447,7 @@ public void ResumeCheckpoints(int dbId = 0)
/// <summary>
/// Recover AOF
/// </summary>
public void RecoverAOF() => databaseManager.RecoverAOF();
public ValueTask RecoverAOFAsync() => databaseManager.RecoverAOFAsync();

/// <summary>
/// When replaying AOF we do not want to write AOF records again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ internal bool MaybeRecoverStore<SF, A>(TsavoriteKV<SF, A> store)
try
{
var sw = Stopwatch.StartNew();
store.Recover();
store.RecoverAsync().AsTask().GetAwaiter().GetResult();
sw.Stop();
Console.WriteLine($" Completed recovery in {(double)sw.ElapsedMilliseconds / 1000:N3} seconds");
return true;
Expand Down
Loading
Loading