Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 64 additions & 77 deletions fdbserver/logsystem/LogSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,18 +614,14 @@ void LogSystem::coreStateWritten(DBCoreState const& newState) {
}

Future<Void> LogSystem::onError() const {
return onError_internal(this);
}

Future<Void> LogSystem::onError_internal(LogSystem const* self) {
// Never returns normally, but throws an error if the subsystem stops working
while (true) {
std::vector<Future<Void>> failed;
std::vector<Future<Void>> routerFailed;
std::vector<Future<Void>> backupFailed(1, Never());
std::vector<Future<Void>> changes;

for (auto& it : self->tLogs) {
for (auto& it : tLogs) {
for (auto& t : it->logServers) {
if (t->get().present()) {
failed.push_back(waitFailureClient(t->get().interf().waitFailure,
Expand Down Expand Up @@ -664,10 +660,10 @@ Future<Void> LogSystem::onError_internal(LogSystem const* self) {
}
}

if (!self->recoveryCompleteWrittenToCoreState.get()) {
if (!recoveryCompleteWrittenToCoreState.get()) {
failed.insert(failed.end(), routerFailed.begin(), routerFailed.end());
routerFailed.clear();
for (auto& old : self->oldLogData) {
for (auto& old : oldLogData) {
for (auto& it : old.tLogs) {
for (auto& t : it->logRouters) {
if (t->get().present()) {
Expand Down Expand Up @@ -707,12 +703,12 @@ Future<Void> LogSystem::onError_internal(LogSystem const* self) {
failed.insert(failed.end(), routerFailed.begin(), routerFailed.end());
}

if (self->hasRemoteServers && (!self->remoteRecovery.isReady() || self->remoteRecovery.isError())) {
changes.push_back(self->remoteRecovery);
if (hasRemoteServers && (!remoteRecovery.isReady() || remoteRecovery.isError())) {
changes.push_back(remoteRecovery);
}

changes.push_back(self->recoveryCompleteWrittenToCoreState.onChange());
changes.push_back(self->backupWorkerChanged.onTrigger());
changes.push_back(recoveryCompleteWrittenToCoreState.onChange());
changes.push_back(backupWorkerChanged.onTrigger());

ASSERT(!failed.empty());
co_await (
Expand Down Expand Up @@ -908,8 +904,7 @@ Future<Void> LogSystem::onKnownCommittedVersionChange() {
return waitForAny(result);
}

Future<Void> LogSystem::popFromLog(LogSystem* self,
Reference<AsyncVar<OptionalInterface<TLogInterface>>> log,
Future<Void> LogSystem::popFromLog(Reference<AsyncVar<OptionalInterface<TLogInterface>>> log,
Tag tag,
double delayBeforePop,
bool popLogRouter) {
Expand All @@ -918,10 +913,10 @@ Future<Void> LogSystem::popFromLog(LogSystem* self,
co_await delay(delayBeforePop, TaskPriority::TLogPop);

// to: first is upto version, second is durableKnownComittedVersion
std::pair<Version, Version> to = self->outstandingPops[std::make_pair(log->get().id(), tag)];
std::pair<Version, Version> to = outstandingPops[std::make_pair(log->get().id(), tag)];

if (to.first <= last) {
self->outstandingPops.erase(std::make_pair(log->get().id(), tag));
outstandingPops.erase(std::make_pair(log->get().id(), tag));
co_return;
}

Expand All @@ -932,14 +927,14 @@ Future<Void> LogSystem::popFromLog(LogSystem* self,
TaskPriority::TLogPop);

if (popLogRouter) {
self->logRouterLastPops[std::make_pair(log->get().id(), tag)] = to.first;
logRouterLastPops[std::make_pair(log->get().id(), tag)] = to.first;
}

last = to.first;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
TraceEvent((e.code() == error_code_broken_promise) ? SevInfo : SevError, "LogPopError", self->dbgid)
TraceEvent((e.code() == error_code_broken_promise) ? SevInfo : SevError, "LogPopError", dbgid)
.error(e)
.detail("Log", log->get().id());
co_return; // Leaving outstandingPops filled in means no further pop requests to this tlog from this
Expand Down Expand Up @@ -969,20 +964,20 @@ Future<Version> LogSystem::getPoppedFromTLog(Reference<AsyncVar<OptionalInterfac
}
}

Future<Version> LogSystem::getPoppedTxs(LogSystem* self) {
Future<Version> LogSystem::getPoppedTxs() {
std::vector<std::vector<Future<Version>>> poppedFutures;
std::vector<Future<Void>> poppedReady;
if (!self->tLogs.empty()) {
if (!tLogs.empty()) {
poppedFutures.push_back(std::vector<Future<Version>>());
for (auto& it : self->tLogs) {
for (auto& it : tLogs) {
for (auto& log : it->logServers) {
poppedFutures.back().push_back(LogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0)));
}
}
poppedReady.push_back(waitForAny(poppedFutures.back()));
}

for (auto& old : self->oldLogData) {
for (auto& old : oldLogData) {
if (!old.tLogs.empty()) {
poppedFutures.push_back(std::vector<Future<Version>>());
for (auto& it : old.tLogs) {
Expand All @@ -994,7 +989,7 @@ Future<Version> LogSystem::getPoppedTxs(LogSystem* self) {
}
}

UID dbgid = self->dbgid;
UID dbgid = this->dbgid;
Future<Void> maxGetPoppedDuration = delay(SERVER_KNOBS->TXS_POPPED_MAX_DELAY);
co_await (waitForAll(poppedReady) || maxGetPoppedDuration);

Expand Down Expand Up @@ -2117,8 +2112,7 @@ Future<Void> LogSystem::epochEnd(Reference<AsyncVar<Reference<LogSystem>>> outLo
}
}

Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
std::vector<WorkerInterface> workers,
Future<Void> LogSystem::recruitOldLogRouters(std::vector<WorkerInterface> workers,
LogEpoch recoveryCount,
int8_t locality,
Version startVersion,
Expand All @@ -2131,17 +2125,17 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
Version lastStart = std::numeric_limits<Version>::max();

if (!forRemote) {
Version maxStart = LogSystem::getMaxLocalStartVersion(self->tLogs);
Version maxStart = LogSystem::getMaxLocalStartVersion(tLogs);

lastStart = std::max(startVersion, maxStart);
if (self->logRouterTags == 0) {
if (logRouterTags == 0) {
ASSERT_WE_THINK(false);
self->logSystemConfigChanged.trigger();
logSystemConfigChanged.trigger();
co_return;
}

bool found = false;
for (auto& tLogs : self->tLogs) {
for (auto& tLogs : this->tLogs) {
if (tLogs->locality == locality) {
found = true;
}
Expand All @@ -2157,17 +2151,15 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
newLogSet->locality = locality;
newLogSet->startVersion = lastStart;
newLogSet->isLocal = false;
self->tLogs.push_back(newLogSet);
tLogs.push_back(newLogSet);
}

for (auto& tLogs : self->tLogs) {
for (auto& tLogs : this->tLogs) {
// Recruit log routers for old generations of the primary locality
if (tLogs->locality == locality) {
logRouterInitializationReplies.emplace_back();
TraceEvent("LogRouterInitReqSent1")
.detail("Locality", locality)
.detail("LogRouterTags", self->logRouterTags);
for (int i = 0; i < self->logRouterTags; i++) {
TraceEvent("LogRouterInitReqSent1").detail("Locality", locality).detail("LogRouterTags", logRouterTags);
for (int i = 0; i < logRouterTags; i++) {
InitializeLogRouterRequest req;
req.reqId = deterministicRandom()->randomUniqueID();
req.recoveryCount = recoveryCount;
Expand All @@ -2176,8 +2168,8 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
req.tLogLocalities = tLogLocalities;
req.tLogPolicy = tLogPolicy;
req.locality = locality;
req.recoverAt = self->recoverAt.get();
req.knownLockedTLogIds = self->knownLockedTLogIds;
req.recoverAt = recoverAt.get();
req.knownLockedTLogIds = knownLockedTLogIds;
req.allowDropInSim = SERVER_KNOBS->CC_RECOVERY_INIT_REQ_ALLOW_DROP_IN_SIM && !forRemote;
req.isReplacement = false;
auto reply = transformErrors(
Expand All @@ -2192,7 +2184,7 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
}
}

for (auto& old : self->oldLogData) {
for (auto& old : oldLogData) {
Version maxStart = LogSystem::getMaxLocalStartVersion(old.tLogs);

if (old.logRouterTags == 0 || maxStart >= lastStart) {
Expand Down Expand Up @@ -2255,10 +2247,10 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
std::vector<Future<Void>> failed;

if (!forRemote) {
Version maxStart = LogSystem::getMaxLocalStartVersion(self->tLogs);
Version maxStart = LogSystem::getMaxLocalStartVersion(tLogs);

lastStart = std::max(startVersion, maxStart);
for (auto& tLogs : self->tLogs) {
for (auto& tLogs : this->tLogs) {
if (tLogs->locality == locality) {
for (int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++) {
tLogs->logRouters.push_back(makeReference<AsyncVar<OptionalInterface<TLogInterface>>>(
Expand All @@ -2274,7 +2266,7 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
}
}

for (auto& old : self->oldLogData) {
for (auto& old : oldLogData) {
Version maxStart = LogSystem::getMaxLocalStartVersion(old.tLogs);
if (old.logRouterTags == 0 || maxStart >= lastStart) {
break;
Expand All @@ -2299,7 +2291,7 @@ Future<Void> LogSystem::recruitOldLogRouters(LogSystem* self,
}

if (!forRemote) {
self->logSystemConfigChanged.trigger();
logSystemConfigChanged.trigger();
co_await (!failed.empty() ? tagError<Void>(quorum(failed, 1), tlog_failed()) : Future<Void>(Never()));
throw internal_error();
}
Expand All @@ -2325,8 +2317,7 @@ std::vector<Tag> LogSystem::getLocalTags(int8_t locality, const std::vector<Tag>
return localTags;
}

Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
Reference<LogSystem> oldLogSystem,
Future<Void> LogSystem::newRemoteEpoch(Reference<LogSystem> oldLogSystem,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
Expand All @@ -2351,7 +2342,7 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
for (int lockNum = 0; lockNum < oldLogSystem->lockResults.size(); ++lockNum) {
if (oldLogSystem->lockResults[lockNum].logSet->locality == remoteLocality) {
while (true) {
auto durableVersionInfo = LogSystem::getDurableVersion(self->dbgid, oldLogSystem->lockResults[lockNum]);
auto durableVersionInfo = LogSystem::getDurableVersion(dbgid, oldLogSystem->lockResults[lockNum]);
if (durableVersionInfo.present()) {
logSet->startVersion = std::min(std::min(durableVersionInfo.get().knownCommittedVersion + 1,
oldLogSystem->lockResults[lockNum].epochEnd),
Expand All @@ -2373,22 +2364,21 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
Future<Void> oldRouterRecruitment = Void();
if (logSet->startVersion < oldLogSystem->knownCommittedVersion + 1) {
ASSERT(oldLogSystem->logRouterTags > 0);
oldRouterRecruitment = LogSystem::recruitOldLogRouters(self,
remoteWorkers.logRouters,
recoveryCount,
remoteLocality,
logSet->startVersion,
localities,
logSet->tLogPolicy,
/* forRemote */ true);
oldRouterRecruitment = recruitOldLogRouters(remoteWorkers.logRouters,
recoveryCount,
remoteLocality,
logSet->startVersion,
localities,
logSet->tLogPolicy,
/* forRemote */ true);
}

std::vector<Future<TLogInterface>> logRouterInitializationReplies;
const Version startVersion = oldLogSystem->logRouterTags == 0
? oldLogSystem->recoverAt.get() + 1
: std::max(self->tLogs[0]->startVersion, logSet->startVersion);
TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", self->logRouterTags);
for (int i = 0; i < self->logRouterTags; i++) {
: std::max(tLogs[0]->startVersion, logSet->startVersion);
TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", logRouterTags);
for (int i = 0; i < logRouterTags; i++) {
InitializeLogRouterRequest req;
req.reqId = deterministicRandom()->randomUniqueID();
req.recoveryCount = recoveryCount;
Expand All @@ -2399,7 +2389,7 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
req.locality = remoteLocality;
req.allowDropInSim = false;
req.isReplacement = false;
TraceEvent("RemoteTLogRouterReplies", self->dbgid)
TraceEvent("RemoteTLogRouterReplies", dbgid)
.detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id());
logRouterInitializationReplies.push_back(transformErrors(
throwErrorOr(
Expand Down Expand Up @@ -2438,7 +2428,7 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
}
for (int i = 0; i < maxTxsTags; i++) {
Tag tag = Tag(tagLocalityTxs, i);
Tag pushTag = Tag(tagLocalityTxs, i % self->txsTags);
Tag pushTag = Tag(tagLocalityTxs, i % txsTags);
locations.clear();
logSet->getPushLocations(VectorRef<Tag>(&pushTag, 1), locations, 0);
for (int loc : locations) {
Expand All @@ -2449,14 +2439,14 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
}

if (!oldLogSystem->tLogs.empty()) {
for (int i = 0; i < self->txsTags; i++) {
for (int i = 0; i < txsTags; i++) {
localTags.push_back(Tag(tagLocalityTxs, i));
}
}

for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
InitializeTLogRequest& req = remoteTLogReqs[i];
req.recruitmentID = self->recruitmentID;
req.recruitmentID = recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
Expand All @@ -2470,14 +2460,14 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
req.allTags = localTags;
req.startVersion = logSet->startVersion;
req.logRouterTags = 0;
req.txsTags = self->txsTags;
req.txsTags = txsTags;
req.recoveryTransactionVersion = recoveryTransactionVersion;
req.oldGenerationRecoverAtVersions = oldGenerationRecoverAtVersions;
}

remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size());
for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
TraceEvent("RemoteTLogInitReqSent", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
TraceEvent("RemoteTLogInitReqSent", dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id());
remoteTLogInitializationReplies.push_back(transformErrors(
throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
Expand All @@ -2486,8 +2476,8 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,

TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs")
.detail("StartVersion", logSet->startVersion)
.detail("LocalStart", self->tLogs[0]->startVersion)
.detail("LogRouterTags", self->logRouterTags);
.detail("LocalStart", tLogs[0]->startVersion)
.detail("LogRouterTags", logRouterTags);
co_await (traceAfter(waitForAll(remoteTLogInitializationReplies), "RemoteTLogInitializationRepliesReceived") &&
traceAfter(waitForAll(logRouterInitializationReplies), "LogRouterInitializationRepliesReceived") &&
traceAfter(oldRouterRecruitment, "OldRouterRecruitmentFinished"));
Expand Down Expand Up @@ -2516,10 +2506,9 @@ Future<Void> LogSystem::newRemoteEpoch(LogSystem* self,
allRemoteTLogServers.push_back(logSet->logServers[i]);
}

self->remoteRecoveryComplete = waitForAll(recoveryComplete);
self->remoteTrackTLogRecovery =
LogSystem::trackTLogRecoveryActor(allRemoteTLogServers, self->remoteRecoveredVersion);
self->tLogs.push_back(logSet);
remoteRecoveryComplete = waitForAll(recoveryComplete);
remoteTrackTLogRecovery = LogSystem::trackTLogRecoveryActor(allRemoteTLogServers, remoteRecoveredVersion);
tLogs.push_back(logSet);
TraceEvent("RemoteLogRecruitment_CompletingRecovery").log();
}

Expand Down Expand Up @@ -2692,14 +2681,13 @@ Future<Reference<LogSystem>> LogSystem::newEpoch(Reference<LogSystem> oldLogSyst
if (oldLogSystem->logRouterTags > 0 ||
logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
// Use log routers to recover [knownCommittedVersion, recoveryVersion] from the old generation.
oldRouterRecruitment = LogSystem::recruitOldLogRouters(oldLogSystem.getPtr(),
recr.oldLogRouters,
recoveryCount,
primaryLocality,
logSystem->tLogs[0]->startVersion,
localities,
logSystem->tLogs[0]->tLogPolicy,
/* forRemote */ false);
oldRouterRecruitment = oldLogSystem->recruitOldLogRouters(recr.oldLogRouters,
recoveryCount,
primaryLocality,
logSystem->tLogs[0]->startVersion,
localities,
logSystem->tLogs[0]->tLogPolicy,
/* forRemote */ false);
if (oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion >
SERVER_KNOBS->MAX_RECOVERY_VERSIONS) {
// make sure we can recover in the other DC.
Expand Down Expand Up @@ -2919,8 +2907,7 @@ Future<Reference<LogSystem>> LogSystem::newEpoch(Reference<LogSystem> oldLogSyst

if (configuration.usableRegions > 1) {
logSystem->hasRemoteServers = true;
logSystem->remoteRecovery = LogSystem::newRemoteEpoch(logSystem.getPtr(),
oldLogSystem,
logSystem->remoteRecovery = logSystem->newRemoteEpoch(oldLogSystem,
fRemoteWorkers,
configuration,
recoveryCount,
Expand Down
Loading
Loading