diff --git a/fdbserver/logsystem/LogSystem.cpp b/fdbserver/logsystem/LogSystem.cpp index 75a200abae1..d5454a35e68 100644 --- a/fdbserver/logsystem/LogSystem.cpp +++ b/fdbserver/logsystem/LogSystem.cpp @@ -614,10 +614,6 @@ void LogSystem::coreStateWritten(DBCoreState const& newState) { } Future LogSystem::onError() const { - return onError_internal(this); -} - -Future LogSystem::onError_internal(LogSystem const* self) { // Never returns normally, but throws an error if the subsystem stops working while (true) { std::vector> failed; @@ -625,7 +621,7 @@ Future LogSystem::onError_internal(LogSystem const* self) { std::vector> backupFailed(1, Never()); std::vector> changes; - for (auto& it : self->tLogs) { + for (auto& it : tLogs) { for (auto& t : it->logServers) { if (t->get().present()) { failed.push_back(waitFailureClient(t->get().interf().waitFailure, @@ -664,10 +660,10 @@ Future LogSystem::onError_internal(LogSystem const* self) { } } - if (!self->recoveryCompleteWrittenToCoreState.get()) { + if (!recoveryCompleteWrittenToCoreState.get()) { failed.insert(failed.end(), routerFailed.begin(), routerFailed.end()); routerFailed.clear(); - for (auto& old : self->oldLogData) { + for (auto& old : oldLogData) { for (auto& it : old.tLogs) { for (auto& t : it->logRouters) { if (t->get().present()) { @@ -707,12 +703,12 @@ Future LogSystem::onError_internal(LogSystem const* self) { failed.insert(failed.end(), routerFailed.begin(), routerFailed.end()); } - if (self->hasRemoteServers && (!self->remoteRecovery.isReady() || self->remoteRecovery.isError())) { - changes.push_back(self->remoteRecovery); + if (hasRemoteServers && (!remoteRecovery.isReady() || remoteRecovery.isError())) { + changes.push_back(remoteRecovery); } - changes.push_back(self->recoveryCompleteWrittenToCoreState.onChange()); - changes.push_back(self->backupWorkerChanged.onTrigger()); + changes.push_back(recoveryCompleteWrittenToCoreState.onChange()); + changes.push_back(backupWorkerChanged.onTrigger()); ASSERT(!failed.empty()); co_await ( @@ -908,8 +904,7 @@ Future LogSystem::onKnownCommittedVersionChange() { return waitForAny(result); } -Future LogSystem::popFromLog(LogSystem* self, - Reference>> log, +Future LogSystem::popFromLog(Reference>> log, Tag tag, double delayBeforePop, bool popLogRouter) { @@ -918,10 +913,10 @@ Future LogSystem::popFromLog(LogSystem* self, co_await delay(delayBeforePop, TaskPriority::TLogPop); // to: first is upto version, second is durableKnownComittedVersion - std::pair to = self->outstandingPops[std::make_pair(log->get().id(), tag)]; + std::pair to = outstandingPops[std::make_pair(log->get().id(), tag)]; if (to.first <= last) { - self->outstandingPops.erase(std::make_pair(log->get().id(), tag)); + outstandingPops.erase(std::make_pair(log->get().id(), tag)); co_return; } @@ -932,14 +927,14 @@ Future LogSystem::popFromLog(LogSystem* self, TaskPriority::TLogPop); if (popLogRouter) { - self->logRouterLastPops[std::make_pair(log->get().id(), tag)] = to.first; + logRouterLastPops[std::make_pair(log->get().id(), tag)] = to.first; } last = to.first; } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw; - TraceEvent((e.code() == error_code_broken_promise) ? SevInfo : SevError, "LogPopError", self->dbgid) + TraceEvent((e.code() == error_code_broken_promise) ? SevInfo : SevError, "LogPopError", dbgid) .error(e) .detail("Log", log->get().id()); co_return; // Leaving outstandingPops filled in means no further pop requests to this tlog from this @@ -969,12 +964,12 @@ Future LogSystem::getPoppedFromTLog(Reference LogSystem::getPoppedTxs(LogSystem* self) { +Future LogSystem::getPoppedTxs() { std::vector>> poppedFutures; std::vector> poppedReady; - if (!self->tLogs.empty()) { + if (!tLogs.empty()) { poppedFutures.push_back(std::vector>()); - for (auto& it : self->tLogs) { + for (auto& it : tLogs) { for (auto& log : it->logServers) { poppedFutures.back().push_back(LogSystem::getPoppedFromTLog(log, Tag(tagLocalityTxs, 0))); } @@ -982,7 +977,7 @@ Future LogSystem::getPoppedTxs(LogSystem* self) { poppedReady.push_back(waitForAny(poppedFutures.back())); } - for (auto& old : self->oldLogData) { + for (auto& old : oldLogData) { if (!old.tLogs.empty()) { poppedFutures.push_back(std::vector>()); for (auto& it : old.tLogs) { @@ -994,7 +989,7 @@ Future LogSystem::getPoppedTxs(LogSystem* self) { } } - UID dbgid = self->dbgid; + UID dbgid = this->dbgid; Future maxGetPoppedDuration = delay(SERVER_KNOBS->TXS_POPPED_MAX_DELAY); co_await (waitForAll(poppedReady) || maxGetPoppedDuration); @@ -2117,8 +2112,7 @@ Future LogSystem::epochEnd(Reference>> outLo } } -Future LogSystem::recruitOldLogRouters(LogSystem* self, - std::vector workers, +Future LogSystem::recruitOldLogRouters(std::vector workers, LogEpoch recoveryCount, int8_t locality, Version startVersion, @@ -2131,17 +2125,17 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, Version lastStart = std::numeric_limits::max(); if (!forRemote) { - Version maxStart = LogSystem::getMaxLocalStartVersion(self->tLogs); + Version maxStart = LogSystem::getMaxLocalStartVersion(tLogs); lastStart = std::max(startVersion, maxStart); - if (self->logRouterTags == 0) { + if (logRouterTags == 0) { ASSERT_WE_THINK(false); - self->logSystemConfigChanged.trigger(); + logSystemConfigChanged.trigger(); co_return; } bool found = false; - for (auto& tLogs : self->tLogs) { + for (auto& tLogs : this->tLogs) { if (tLogs->locality == locality) { found = true; } @@ -2157,17 +2151,15 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, newLogSet->locality = locality; newLogSet->startVersion = lastStart; newLogSet->isLocal = false; - self->tLogs.push_back(newLogSet); + tLogs.push_back(newLogSet); } - for (auto& tLogs : self->tLogs) { + for (auto& tLogs : this->tLogs) { // Recruit log routers for old generations of the primary locality if (tLogs->locality == locality) { logRouterInitializationReplies.emplace_back(); - TraceEvent("LogRouterInitReqSent1") - .detail("Locality", locality) - .detail("LogRouterTags", self->logRouterTags); - for (int i = 0; i < self->logRouterTags; i++) { + TraceEvent("LogRouterInitReqSent1").detail("Locality", locality).detail("LogRouterTags", logRouterTags); + for (int i = 0; i < logRouterTags; i++) { InitializeLogRouterRequest req; req.reqId = deterministicRandom()->randomUniqueID(); req.recoveryCount = recoveryCount; @@ -2176,8 +2168,8 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, req.tLogLocalities = tLogLocalities; req.tLogPolicy = tLogPolicy; req.locality = locality; - req.recoverAt = self->recoverAt.get(); - req.knownLockedTLogIds = self->knownLockedTLogIds; + req.recoverAt = recoverAt.get(); + req.knownLockedTLogIds = knownLockedTLogIds; req.allowDropInSim = SERVER_KNOBS->CC_RECOVERY_INIT_REQ_ALLOW_DROP_IN_SIM && !forRemote; req.isReplacement = false; auto reply = transformErrors( @@ -2192,7 +2184,7 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, } } - for (auto& old : self->oldLogData) { + for (auto& old : oldLogData) { Version maxStart = LogSystem::getMaxLocalStartVersion(old.tLogs); if (old.logRouterTags == 0 || maxStart >= lastStart) { @@ -2255,10 +2247,10 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, std::vector> failed; if (!forRemote) { - Version maxStart = LogSystem::getMaxLocalStartVersion(self->tLogs); + Version maxStart = LogSystem::getMaxLocalStartVersion(tLogs); lastStart = std::max(startVersion, maxStart); - for (auto& tLogs : self->tLogs) { + for (auto& tLogs : this->tLogs) { if (tLogs->locality == locality) { for (int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++) { tLogs->logRouters.push_back(makeReference>>( @@ -2274,7 +2266,7 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, } } - for (auto& old : self->oldLogData) { + for (auto& old : oldLogData) { Version maxStart = LogSystem::getMaxLocalStartVersion(old.tLogs); if (old.logRouterTags == 0 || maxStart >= lastStart) { break; @@ -2299,7 +2291,7 @@ Future LogSystem::recruitOldLogRouters(LogSystem* self, } if (!forRemote) { - self->logSystemConfigChanged.trigger(); + logSystemConfigChanged.trigger(); co_await (!failed.empty() ? tagError(quorum(failed, 1), tlog_failed()) : Future(Never())); throw internal_error(); } @@ -2325,8 +2317,7 @@ std::vector LogSystem::getLocalTags(int8_t locality, const std::vector return localTags; } -Future LogSystem::newRemoteEpoch(LogSystem* self, - Reference oldLogSystem, +Future LogSystem::newRemoteEpoch(Reference oldLogSystem, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, @@ -2351,7 +2342,7 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, for (int lockNum = 0; lockNum < oldLogSystem->lockResults.size(); ++lockNum) { if (oldLogSystem->lockResults[lockNum].logSet->locality == remoteLocality) { while (true) { - auto durableVersionInfo = LogSystem::getDurableVersion(self->dbgid, oldLogSystem->lockResults[lockNum]); + auto durableVersionInfo = LogSystem::getDurableVersion(dbgid, oldLogSystem->lockResults[lockNum]); if (durableVersionInfo.present()) { logSet->startVersion = std::min(std::min(durableVersionInfo.get().knownCommittedVersion + 1, oldLogSystem->lockResults[lockNum].epochEnd), @@ -2373,22 +2364,21 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, Future oldRouterRecruitment = Void(); if (logSet->startVersion < oldLogSystem->knownCommittedVersion + 1) { ASSERT(oldLogSystem->logRouterTags > 0); - oldRouterRecruitment = LogSystem::recruitOldLogRouters(self, - remoteWorkers.logRouters, - recoveryCount, - remoteLocality, - logSet->startVersion, - localities, - logSet->tLogPolicy, - /* forRemote */ true); + oldRouterRecruitment = recruitOldLogRouters(remoteWorkers.logRouters, + recoveryCount, + remoteLocality, + logSet->startVersion, + localities, + logSet->tLogPolicy, + /* forRemote */ true); } std::vector> logRouterInitializationReplies; const Version startVersion = oldLogSystem->logRouterTags == 0 ? oldLogSystem->recoverAt.get() + 1 - : std::max(self->tLogs[0]->startVersion, logSet->startVersion); - TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", self->logRouterTags); - for (int i = 0; i < self->logRouterTags; i++) { + : std::max(tLogs[0]->startVersion, logSet->startVersion); + TraceEvent("LogRouterInitReqSent3").detail("Locality", remoteLocality).detail("LogRouterTags", logRouterTags); + for (int i = 0; i < logRouterTags; i++) { InitializeLogRouterRequest req; req.reqId = deterministicRandom()->randomUniqueID(); req.recoveryCount = recoveryCount; @@ -2399,7 +2389,7 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, req.locality = remoteLocality; req.allowDropInSim = false; req.isReplacement = false; - TraceEvent("RemoteTLogRouterReplies", self->dbgid) + TraceEvent("RemoteTLogRouterReplies", dbgid) .detail("WorkerID", remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].id()); logRouterInitializationReplies.push_back(transformErrors( throwErrorOr( @@ -2438,7 +2428,7 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, } for (int i = 0; i < maxTxsTags; i++) { Tag tag = Tag(tagLocalityTxs, i); - Tag pushTag = Tag(tagLocalityTxs, i % self->txsTags); + Tag pushTag = Tag(tagLocalityTxs, i % txsTags); locations.clear(); logSet->getPushLocations(VectorRef(&pushTag, 1), locations, 0); for (int loc : locations) { @@ -2449,14 +2439,14 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, } if (!oldLogSystem->tLogs.empty()) { - for (int i = 0; i < self->txsTags; i++) { + for (int i = 0; i < txsTags; i++) { localTags.push_back(Tag(tagLocalityTxs, i)); } } for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) { InitializeTLogRequest& req = remoteTLogReqs[i]; - req.recruitmentID = self->recruitmentID; + req.recruitmentID = recruitmentID; req.logVersion = configuration.tLogVersion; req.storeType = configuration.tLogDataStoreType; req.spillType = configuration.tLogSpillType; @@ -2470,14 +2460,14 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, req.allTags = localTags; req.startVersion = logSet->startVersion; req.logRouterTags = 0; - req.txsTags = self->txsTags; + req.txsTags = txsTags; req.recoveryTransactionVersion = recoveryTransactionVersion; req.oldGenerationRecoverAtVersions = oldGenerationRecoverAtVersions; } remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size()); for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) { - TraceEvent("RemoteTLogInitReqSent", self->dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id()); + TraceEvent("RemoteTLogInitReqSent", dbgid).detail("WorkerID", remoteWorkers.remoteTLogs[i].id()); remoteTLogInitializationReplies.push_back(transformErrors( throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)), @@ -2486,8 +2476,8 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs") .detail("StartVersion", logSet->startVersion) - .detail("LocalStart", self->tLogs[0]->startVersion) - .detail("LogRouterTags", self->logRouterTags); + .detail("LocalStart", tLogs[0]->startVersion) + .detail("LogRouterTags", logRouterTags); co_await (traceAfter(waitForAll(remoteTLogInitializationReplies), "RemoteTLogInitializationRepliesReceived") && traceAfter(waitForAll(logRouterInitializationReplies), "LogRouterInitializationRepliesReceived") && traceAfter(oldRouterRecruitment, "OldRouterRecruitmentFinished")); @@ -2516,10 +2506,9 @@ Future LogSystem::newRemoteEpoch(LogSystem* self, allRemoteTLogServers.push_back(logSet->logServers[i]); } - self->remoteRecoveryComplete = waitForAll(recoveryComplete); - self->remoteTrackTLogRecovery = - LogSystem::trackTLogRecoveryActor(allRemoteTLogServers, self->remoteRecoveredVersion); - self->tLogs.push_back(logSet); + remoteRecoveryComplete = waitForAll(recoveryComplete); + remoteTrackTLogRecovery = LogSystem::trackTLogRecoveryActor(allRemoteTLogServers, remoteRecoveredVersion); + tLogs.push_back(logSet); TraceEvent("RemoteLogRecruitment_CompletingRecovery").log(); } @@ -2692,14 +2681,13 @@ Future> LogSystem::newEpoch(Reference oldLogSyst if (oldLogSystem->logRouterTags > 0 || logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) { // Use log routers to recover [knownCommittedVersion, recoveryVersion] from the old generation. - oldRouterRecruitment = LogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), - recr.oldLogRouters, - recoveryCount, - primaryLocality, - logSystem->tLogs[0]->startVersion, - localities, - logSystem->tLogs[0]->tLogPolicy, - /* forRemote */ false); + oldRouterRecruitment = oldLogSystem->recruitOldLogRouters(recr.oldLogRouters, + recoveryCount, + primaryLocality, + logSystem->tLogs[0]->startVersion, + localities, + logSystem->tLogs[0]->tLogPolicy, + /* forRemote */ false); if (oldLogSystem->knownCommittedVersion - logSystem->tLogs[0]->startVersion > SERVER_KNOBS->MAX_RECOVERY_VERSIONS) { // make sure we can recover in the other DC. @@ -2919,8 +2907,7 @@ Future> LogSystem::newEpoch(Reference oldLogSyst if (configuration.usableRegions > 1) { logSystem->hasRemoteServers = true; - logSystem->remoteRecovery = LogSystem::newRemoteEpoch(logSystem.getPtr(), - oldLogSystem, + logSystem->remoteRecovery = logSystem->newRemoteEpoch(oldLogSystem, fRemoteWorkers, configuration, recoveryCount, diff --git a/fdbserver/logsystem/LogSystemConsumer.cpp b/fdbserver/logsystem/LogSystemConsumer.cpp index 816fe221616..024aced946d 100644 --- a/fdbserver/logsystem/LogSystemConsumer.cpp +++ b/fdbserver/logsystem/LogSystemConsumer.cpp @@ -815,13 +815,11 @@ void LogSystemConsumer::popLogRouter(Version upTo, Tag tag, Version durableKnown std::make_pair(upTo, durableKnownCommittedVersion); } if (prev == 0) { - ls.popActors.add( - LogSystem::popFromLog(&ls, - log, - tag, - /*delayBeforePop=*/0.0, - /*popLogRouter=*/true)); // Fast pop time because log routers can only - // hold 5 seconds of data. + ls.popActors.add(ls.popFromLog(log, + tag, + /*delayBeforePop=*/0.0, + /*popLogRouter=*/true)); // Fast pop time because log routers can + // only hold 5 seconds of data. } } } @@ -848,8 +846,8 @@ void LogSystemConsumer::popLogRouter(Version upTo, Tag tag, Version durableKnown std::make_pair(upTo, durableKnownCommittedVersion); } if (prev == 0) { - ls.popActors.add(LogSystem::popFromLog( - &ls, log, tag, /*delayBeforePop=*/0.0, /*popLogRouter=*/true)); + ls.popActors.add( + ls.popFromLog(log, tag, /*delayBeforePop=*/0.0, /*popLogRouter=*/true)); } } } @@ -888,8 +886,7 @@ void LogSystemConsumer::pop(Version upTo, Tag tag, Version durableKnownCommitted } if (prev == 0) { // pop tag from log upto version defined in ls.outstandingPops[].first - ls.popActors.add( - LogSystem::popFromLog(&ls, log, tag, SERVER_KNOBS->POP_FROM_LOG_DELAY, /*popLogRouter=*/false)); + ls.popActors.add(ls.popFromLog(log, tag, SERVER_KNOBS->POP_FROM_LOG_DELAY, /*popLogRouter=*/false)); } } } @@ -898,7 +895,7 @@ void LogSystemConsumer::pop(Version upTo, Tag tag, Version durableKnownCommitted Future LogSystemConsumer::getTxsPoppedVersion() { auto& ls = *logSystem; - return LogSystem::getPoppedTxs(&ls); + return ls.getPoppedTxs(); } Version LogSystemConsumer::getEnd() const { diff --git a/fdbserver/logsystem/include/fdbserver/logsystem/LogSystem.h b/fdbserver/logsystem/include/fdbserver/logsystem/LogSystem.h index 8ee836ef40f..53feac666af 100644 --- a/fdbserver/logsystem/include/fdbserver/logsystem/LogSystem.h +++ b/fdbserver/logsystem/include/fdbserver/logsystem/LogSystem.h @@ -383,8 +383,6 @@ struct LogSystem : ReferenceCounted { Future onError() const; - static Future onError_internal(LogSystem const* self); - static Future pushResetChecker(Reference self, NetworkAddress addr); static Future recordPushMetrics(Reference self, @@ -410,16 +408,15 @@ struct LogSystem : ReferenceCounted { Future onKnownCommittedVersionChange(); - // pop tag from log up to the version defined in self->outstandingPops[].first - static Future popFromLog(LogSystem* self, - Reference>> log, - Tag tag, - double delayBeforePop, - bool popLogRouter); + // pop tag from log up to the version defined in outstandingPops[].first + Future popFromLog(Reference>> log, + Tag tag, + double delayBeforePop, + bool popLogRouter); static Future getPoppedFromTLog(Reference>> log, Tag tag); - static Future getPoppedTxs(LogSystem* self); + Future getPoppedTxs(); static Future confirmEpochLive_internal(Reference logSet, Optional debugID); @@ -508,28 +505,26 @@ struct LogSystem : ReferenceCounted { LocalityData locality, bool* forceRecovery); - static Future recruitOldLogRouters(LogSystem* self, - std::vector workers, - LogEpoch recoveryCount, - int8_t locality, - Version startVersion, - std::vector tLogLocalities, - Reference tLogPolicy, - bool forRemote); + Future recruitOldLogRouters(std::vector workers, + LogEpoch recoveryCount, + int8_t locality, + Version startVersion, + std::vector tLogLocalities, + Reference tLogPolicy, + bool forRemote); static Version getMaxLocalStartVersion(const std::vector>& tLogs); static std::vector getLocalTags(int8_t locality, const std::vector& allTags); - static Future newRemoteEpoch(LogSystem* self, - Reference oldLogSystem, - Future fRemoteWorkers, - DatabaseConfiguration configuration, - LogEpoch recoveryCount, - Version recoveryTransactionVersion, - int8_t remoteLocality, - std::vector allTags, - std::vector oldGenerationRecoverAtVersions); + Future newRemoteEpoch(Reference oldLogSystem, + Future fRemoteWorkers, + DatabaseConfiguration configuration, + LogEpoch recoveryCount, + Version recoveryTransactionVersion, + int8_t remoteLocality, + std::vector allTags, + std::vector oldGenerationRecoverAtVersions); static Future> newEpoch(Reference oldLogSystem, RecruitFromConfigurationReply recr,