From 80adcdd1f13cd6a622def7a9d34facf09ed8f8d5 Mon Sep 17 00:00:00 2001 From: michael stack Date: Wed, 1 Jul 2026 11:54:22 -0700 Subject: [PATCH] DD: refactor finishMoveKeys and finishMoveShards into focused helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both functions grew past 400-500 lines of a single deeply-nested try/loop. Extract the four sequential phases each performs — decode planning snapshot, build dest SS interfaces, wait outside a transaction for destination readiness, re-verify and commit — into standalone helpers so the top-level functions read as orchestration loops. finishMoveKeys (~440 → ~180 lines) delegates to: * DecodedKeyServersState + decodeKeyServersState() — two-pass keyServers validation with the empty-dest / subset-of-team tolerance from the planning loop; returns nullopt when the iteration's range is already fully moved. * buildKeysDestServerInterfaces() — reads serverList entries and returns (interfaces, read version) so the caller can drop the txn before the long wait. * waitForKeysDestServers() — issues waitForShardReady() for SSes and TSSes, applies the counter-based TSS ignore policy, returns the ready count. * reverifyKeysDestAndCommit() — post-wait re-read, KRM-boundary re-truncation, destUnchanged() check, keyServers/serverKeys writes, commit. Returns false on dest change so the caller retries via the existing retryAfterPostWaitChange(). finishMoveShards (~525 → ~271 lines) delegates to shards-specific analogs (the two flavors' TSS policy, dest-decode invariants, and post-wait writes differ enough that forcing shared code hurts readability): * DecodedShardsKeyServers + decodeAndPreCheckShards() — dataMoveId stamp check on every sub-range and the per-sub-range AUDIT_DATAMOVE_PRE_CHECK (coroutine because of the audit call). Throws retry() and sets cancelDataMove on stamp mismatch, matching prior behavior. * buildShardsDestServerInterfaces() — like the keys flavor but throws retry() on missing serverList entries rather than asserting, matching the pre-refactor shards code. * waitForShardsDestServers() — implements the time-based skipTss latch (all SSes ready for DD_WAIT_TSS_DATA_MOVE_DELAY without TSS catching up → skip TSS for the remainder of the move). * ReverifyShardsResult (enum) + reverifyShardsAndCommit() — handles all three concurrent-change branches (dataMove deleted / phase changed / dest changed) uniformly via retryAfterPostWaitChange(), plus KRM re-truncation, bulk-load task completion, partial-vs-full commit, and dataMove metadata clear. Returns {RetryLoop, PartialCommitted, FullyCommitted} so the outer loop can distinguish "keep going on the same move" from "move complete". Pure refactor: no behavior change intended. Every CODE_PROBE, TraceEvent, ASSERT, error-code branch, and control-flow decision from the current finishMoveKeys / finishMoveShards on origin/main is preserved — including the newer post-#13364 machinery (retryAfterPostWaitChange, KRM boundary re-truncation with rare probes, ASSERT rereadEnd <= end, per-iteration FlowLock take/release). The catch handlers, backoff paths, and retry-cap semantics are untouched. --- fdbserver/core/MoveKeys.cpp | 1482 ++++++++++++++++++++--------------- 1 file changed, 855 insertions(+), 627 deletions(-) diff --git a/fdbserver/core/MoveKeys.cpp b/fdbserver/core/MoveKeys.cpp index 96f0c21fdb9..799e276cef9 100644 --- a/fdbserver/core/MoveKeys.cpp +++ b/fdbserver/core/MoveKeys.cpp @@ -1409,6 +1409,370 @@ static bool destUnchanged(const RangeResult& keyServers, return true; } +// Decoded per-iteration state used by finishMoveKeys after it reads the +// planning-era keyServers snapshot. finishMoveShards has its own analog +// (DecodedShardsKeyServers): the two flavors differ in `dest` provenance +// (keys decodes it from the snapshot; shards already knows it from +// dataMove.dest) and in `allServers` container type. +struct DecodedKeyServersState { + std::vector dest; + std::vector completeSrc; + std::set allServers; +}; + +// Decode + validate the keyServers range read at the top of finishMoveKeys. +// Two-pass: the first pass consumes prefix sub-ranges that were already +// completed by sibling iterations (empty dest with src ⊆ intendedTeam) and +// re-adopts `intendedTeam` from the on-disk dest if it doesn't match; the +// second pass then requires every remaining sub-range to agree on `dest`. +// Returns nullopt if the entire range has already been moved. +static Optional decodeKeyServersState(RangeResult const& UIDtoTagMap, + RangeResult const& keyServers, + std::vector const& destinationTeam, + UID relocationIntervalId, + KeyRange const& keys, + Key const& begin, + Key const& endKey) { + bool alreadyMoved = true; + std::vector dest; + std::vector src; + std::vector completeSrc; + std::set allServers; + std::set intendedTeam(destinationTeam.begin(), destinationTeam.end()); + + // Iterate through the beginning of keyServers until we find one that hasn't already been processed + int currentIndex; + for (currentIndex = 0; currentIndex < keyServers.size() - 1 && alreadyMoved; currentIndex++) { + decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src, dest); + + std::set srcSet; + for (int s = 0; s < src.size(); s++) { + srcSet.insert(src[s]); + } + + if (currentIndex == 0) { + completeSrc = src; + } else { + for (int i = 0; i < completeSrc.size(); i++) { + if (!srcSet.contains(completeSrc[i])) { + swapAndPop(&completeSrc, i--); + } + } + } + + std::set destSet; + for (int s = 0; s < dest.size(); s++) { + destSet.insert(dest[s]); + } + + allServers.insert(srcSet.begin(), srcSet.end()); + allServers.insert(destSet.begin(), destSet.end()); + + // Because marking a server as failed can shrink a team, do not check for exact equality + // Instead, check for a subset of the intended team, which also covers the equality case + bool isSubset = std::includes(intendedTeam.begin(), intendedTeam.end(), srcSet.begin(), srcSet.end()); + alreadyMoved = destSet.empty() && isSubset; + if (destSet != intendedTeam && !alreadyMoved) { + TraceEvent(SevWarn, "MoveKeysDestTeamNotIntended", relocationIntervalId) + .detail("KeyBegin", keys.begin) + .detail("KeyEnd", keys.end) + .detail("IterationBegin", begin) + .detail("IterationEnd", endKey) + .detail("SrcSet", describe(srcSet)) + .detail("DestSet", describe(destSet)) + .detail("IntendedTeam", describe(intendedTeam)) + .detail("KeyServers", keyServers); + // ASSERT( false ); + + ASSERT(!dest.empty()); // The range has already been moved, but to a different dest (or + // maybe dest was cleared) + + intendedTeam.clear(); + for (int i = 0; i < dest.size(); i++) + intendedTeam.insert(dest[i]); + } else if (alreadyMoved) { + dest.clear(); + src.clear(); + CODE_PROBE(true, "FinishMoveKeys first key in iteration sub-range has already been processed"); + } + } + + // Process the rest of the key servers + for (; currentIndex < keyServers.size() - 1; currentIndex++) { + std::vector src2, dest2; + decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src2, dest2); + + std::set srcSet; + for (int s = 0; s < src2.size(); s++) + srcSet.insert(src2[s]); + + for (int i = 0; i < completeSrc.size(); i++) { + if (!srcSet.contains(completeSrc[i])) { + swapAndPop(&completeSrc, i--); + } + } + + allServers.insert(srcSet.begin(), srcSet.end()); + + // Because marking a server as failed can shrink a team, do not check for exact equality + // Instead, check for a subset of the intended team, which also covers the equality case + bool isSubset = std::includes(intendedTeam.begin(), intendedTeam.end(), srcSet.begin(), srcSet.end()); + alreadyMoved = dest2.empty() && isSubset; + if (dest2 != dest && !alreadyMoved) { + TraceEvent(SevError, "FinishMoveKeysError", relocationIntervalId) + .detail("Reason", "dest mismatch") + .detail("Dest", describe(dest)) + .detail("Dest2", describe(dest2)); + ASSERT(false); + } + } + + if (dest.empty()) { + CODE_PROBE(true, + "A previous finishMoveKeys for this range committed just as it was cancelled to " + "start this one?"); + TraceEvent("FinishMoveKeysNothingToDo", relocationIntervalId) + .detail("KeyBegin", keys.begin) + .detail("KeyEnd", keys.end) + .detail("IterationBegin", begin) + .detail("IterationEnd", endKey); + return {}; + } + + return DecodedKeyServersState{ std::move(dest), std::move(completeSrc), std::move(allServers) }; +} + +// Read serverList entries for the new destination servers and build their +// StorageServerInterfaces. Must be called with an active transaction (which +// owns the FlowLock slot). Returns the interfaces plus the read version at +// which they were fetched — the read version is what waitForShardReady() +// needs (see finishMoveKeys where we save it before dropping the txn). +static Future, Version>> +buildKeysDestServerInterfaces(Transaction* tr, std::vector const& dest, std::vector const& completeSrc, + bool hasRemote) { + std::set completeSrcSet(completeSrc.begin(), completeSrc.end()); + std::vector newDestinations; + for (auto& it : dest) { + if (!hasRemote || !completeSrcSet.contains(it)) { + newDestinations.push_back(it); + } + } + + std::vector>> serverListEntries; + serverListEntries.reserve(newDestinations.size()); + for (int s = 0; s < newDestinations.size(); s++) + serverListEntries.push_back(tr->get(serverListKeyFor(newDestinations[s]))); + std::vector> serverListValues = co_await getAll(serverListEntries); + + std::vector storageServerInterfaces; + for (int s = 0; s < serverListValues.size(); s++) { + ASSERT(serverListValues[s].present()); // There should always be server list entries for servers in keyServers + auto si = decodeServerListValue(serverListValues[s].get()); + ASSERT(si.id() == newDestinations[s]); + storageServerInterfaces.push_back(si); + } + + Version readVersion = tr->getReadVersion().get(); + co_return std::make_pair(std::move(storageServerInterfaces), readVersion); +} + +// Wait for the new destination SSes (and paired TSSes) to report keys as +// READABLE at `readVersion`. Runs OUTSIDE any transaction: the caller drops +// its txn first so the 15 s SERVER_READY_QUORUM_TIMEOUT does not race the +// ~5 s txn lifetime. `waitForTSSCounter` is decremented (and `tssToIgnore` +// populated) when only TSS is slow, so subsequent iterations can skip it. +// Returns `destSize - (SSes not ready)`; the caller retries if not equal to +// `destSize`. `keys` is only used for tracing. +static Future waitForKeysDestServers(std::vector const& storageServerInterfaces, + int destSize, + KeyRange const& keys, + Version readVersion, + std::map const& tssMapping, + int* waitForTSSCounter, + std::unordered_set* tssToIgnore, + UID relocationIntervalId) { + TraceInterval waitInterval("RelocateShard_FinishMoveKeysWaitDurable"); + TraceEvent(SevDebug, waitInterval.begin(), relocationIntervalId) + .detail("KeyBegin", keys.begin) + .detail("KeyEnd", keys.end); + + std::vector> serverReady; // only for count below + std::vector> tssReady; // for waiting in parallel with tss + std::vector tssReadyInterfs; + + serverReady.reserve(storageServerInterfaces.size()); + tssReady.reserve(storageServerInterfaces.size()); + tssReadyInterfs.reserve(storageServerInterfaces.size()); + for (int s = 0; s < storageServerInterfaces.size(); s++) { + serverReady.push_back( + waitForShardReady(storageServerInterfaces[s], keys, readVersion, GetShardStateRequest::READABLE)); + + auto tssPair = tssMapping.find(storageServerInterfaces[s].id()); + + if (tssPair != tssMapping.end() && *waitForTSSCounter > 0 && !tssToIgnore->contains(tssPair->second.id())) { + tssReadyInterfs.push_back(tssPair->second); + tssReady.push_back( + waitForShardReady(tssPair->second, keys, readVersion, GetShardStateRequest::READABLE)); + } + } + + // Wait for all storage server moves, and explicitly swallow errors for tss ones with + // waitForAllReady. A long timeout is safe here — no transaction clock is ticking. + co_await timeout(waitForAll(serverReady) && waitForAllReady(tssReady), + SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, + Void(), + TaskPriority::MoveKeys); + + // Check to see if we're waiting only on tss. If so, decrement the waiting counter. + // If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing + // the data move. + if (!tssReady.empty()) { + bool allSSDone = true; + for (auto& f : serverReady) { + allSSDone &= f.isReady() && !f.isError(); + if (!allSSDone) { + break; + } + } + + if (allSSDone) { + bool anyTssNotDone = false; + + for (auto& f : tssReady) { + if (!f.isReady() || f.isError()) { + anyTssNotDone = true; + (*waitForTSSCounter)--; + break; + } + } + + if (anyTssNotDone && *waitForTSSCounter == 0) { + for (int i = 0; i < tssReady.size(); i++) { + if (!tssReady[i].isReady() || tssReady[i].isError()) { + tssToIgnore->insert(tssReadyInterfs[i].id()); + } + } + } + } + } + + int count = destSize - storageServerInterfaces.size(); + for (int s = 0; s < serverReady.size(); s++) + count += serverReady[s].isReady() && !serverReady[s].isError(); + + int tssCount = 0; + for (int s = 0; s < tssReady.size(); s++) + tssCount += tssReady[s].isReady() && !tssReady[s].isError(); + + TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId); + readyServersEv.detail("ReadyServers", count).detail("Dests", destSize); + if (!tssReady.empty()) { + readyServersEv.detail("ReadyTSS", tssCount); + } + + co_return count; +} + +// Post-wait: open a fresh txn (`tr` has been reset), re-read the keyServers +// range, and if every sub-range still maps to the planned `dest`, commit the +// keyServers/serverKeys metadata writes for `currentKeys`. Also handles the +// KRM-boundary re-truncation (see the comment inline). +// +// Returns true on committed. Returns false when the reread showed a dest +// change during the wait; the caller retries via retryAfterPostWaitChange(). +// `currentKeys` and `endKey` are in/out because a KRM boundary re-truncation +// can shorten them. +static Future reverifyKeysDestAndCommit(Transaction* tr, + MoveKeysLock lock, + const DDEnabledState* ddEnabledState, + KeyRange* currentKeys, + Key* endKey, + std::vector const& dest, + std::set const& allServers, + KeyRange const& keys, + UID relocationIntervalId, + int* retries, + TxnCounters* counters) { + tr->trState->taskID = TaskPriority::MoveKeys; + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + ShardStateReads reread = co_await readShardState(tr, + lock, + ddEnabledState, + *currentKeys, + /*dataMoveId=*/{}, + SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, + SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES); + + // Re-truncate currentKeys/endKey to the boundary the reread + // actually covered. krmGetRanges only emits the synthetic + // upper-bound row when its row/byte limit is NOT hit + // (see krmDecodeRanges in KeyRangeMap.cpp), so if a + // concurrent split or value-size growth pushed the byte + // limit forward, reread.keyServers.back().key can be + // strictly less than the planning-era currentKeys.end. + // The destUnchanged loop below only validates sub-ranges + // within reread.keyServers; the krmSetRangeCoalescing + // commit would otherwise clear and rewrite an + // unverified-and-possibly-foreign tail. Common under + // MOVE_KEYS_KRM_LIMIT buggify (2 rows -- which happens often + // in simulation). + Key rereadEnd = reread.keyServers.end()[-1].key; + // The reread is bounded by `currentKeys` (the upper-bound + // passed to readShardState), so a `>` result would mean + // readShardState broke its contract — fail loudly rather + // than silently expand the commit past the verified end. + ASSERT(rereadEnd <= currentKeys->end); + if (rereadEnd < currentKeys->end) { + CODE_PROBE(true, + "finishMoveKeys reread keyServers boundary shorter than planning", + probe::decoration::rare); + *currentKeys = KeyRangeRef(currentKeys->begin, rereadEnd); + *endKey = rereadEnd; + } + + // Verify every sub-range still maps to the planned dest (or has + // been already-moved-into-empty by a sibling iteration). If + // another DD reassigned a sub-range during the wait, retry + // rather than clobber its write with our stale plan. + if (!destUnchanged(reread.keyServers, reread.uidToTagMap, dest, /*expectedDataMoveId=*/{})) { + CODE_PROBE(true, "finishMoveKeys dest changed during waitForShardReady", probe::decoration::rare); + TraceEvent(SevWarn, "FinishMoveKeysDestChanged", relocationIntervalId) + .detail("KeyBegin", keys.begin) + .detail("KeyEnd", keys.end) + .detail("OrigDest", describe(dest)); + co_await retryAfterPostWaitChange(retries, tr); + co_return false; + } + + // update keyServers, serverKeys + // SOMEDAY: Doing these in parallel is safe because none of them overlap or touch (one per + // server) + co_await krmSetRangeCoalescing( + tr, keyServersPrefix, *currentKeys, keys, keyServersValue(reread.uidToTagMap, dest)); + + auto asi = allServers.begin(); + std::vector> actors; + while (asi != allServers.end()) { + bool destHasServer = std::find(dest.begin(), dest.end(), *asi) != dest.end(); + actors.push_back(krmSetRangeCoalescing( + tr, serverKeysPrefixFor(*asi), *currentKeys, allKeys, destHasServer ? serverKeysTrue : serverKeysFalse)); + ++asi; + } + + co_await waitForAll(actors); + + // Inject transaction_too_old before commit to exercise the + // retry limit and finish_move_keys_too_many_retries path. + if (buggify(0.01)) { + CODE_PROBE(true, "finishMoveKeys injecting transaction_too_old before commit"); + throw transaction_too_old(); + } + co_await tr->commit(); + counters->committed->increment(1); + co_return true; +} + // Set keyServers[keys].src = keyServers[keys].dest and keyServers[keys].dest=[], return when successful // keyServers[k].dest must be the same for all k in keys // Set serverKeys[dest][keys] = true; serverKeys[src][keys] = false for all src not in dest @@ -1423,7 +1787,6 @@ static Future finishMoveKeys(Database occ, std::map tssMapping, const DDEnabledState* ddEnabledState) { TraceInterval interval("RelocateShard_FinishMoveKeys"); - TraceInterval waitInterval(""); Future warningLogger = logWarningAfter("FinishMoveKeysTooLong", 600, destinationTeam); static auto* counters = makeCounters("/movekeys/finishMoveKeys"); Key begin = keys.begin; @@ -1469,336 +1832,63 @@ static Future finishMoveKeys(Database occ, /*dataMoveId=*/{}, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES); - RangeResult& UIDtoTagMap = state.uidToTagMap; - RangeResult& keyServers = state.keyServers; // Determine the last processed key (which will be the beginning for the next iteration) - endKey = keyServers.end()[-1].key; + endKey = state.keyServers.end()[-1].key; currentKeys = KeyRangeRef(currentKeys.begin, endKey); - // printf(" finishMoveKeys( '%s'-'%s' ): read keyServers at %lld\n", keys.begin.toString().c_str(), - // keys.end.toString().c_str(), tr.getReadVersion().get()); - - // Decode and sanity check the result (dest must be the same for all ranges) - bool alreadyMoved = true; - - std::vector dest; - std::set allServers; - std::set intendedTeam(destinationTeam.begin(), destinationTeam.end()); - std::vector src; - std::vector completeSrc; - - // Iterate through the beginning of keyServers until we find one that hasn't already been processed - int currentIndex; - for (currentIndex = 0; currentIndex < keyServers.size() - 1 && alreadyMoved; currentIndex++) { - decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src, dest); - - std::set srcSet; - for (int s = 0; s < src.size(); s++) { - srcSet.insert(src[s]); - } - - if (currentIndex == 0) { - completeSrc = src; - } else { - for (int i = 0; i < completeSrc.size(); i++) { - if (!srcSet.contains(completeSrc[i])) { - swapAndPop(&completeSrc, i--); - } - } - } - - std::set destSet; - for (int s = 0; s < dest.size(); s++) { - destSet.insert(dest[s]); - } - - allServers.insert(srcSet.begin(), srcSet.end()); - allServers.insert(destSet.begin(), destSet.end()); - - // Because marking a server as failed can shrink a team, do not check for exact equality - // Instead, check for a subset of the intended team, which also covers the equality case - bool isSubset = - std::includes(intendedTeam.begin(), intendedTeam.end(), srcSet.begin(), srcSet.end()); - alreadyMoved = destSet.empty() && isSubset; - if (destSet != intendedTeam && !alreadyMoved) { - TraceEvent(SevWarn, "MoveKeysDestTeamNotIntended", relocationIntervalId) - .detail("KeyBegin", keys.begin) - .detail("KeyEnd", keys.end) - .detail("IterationBegin", begin) - .detail("IterationEnd", endKey) - .detail("SrcSet", describe(srcSet)) - .detail("DestSet", describe(destSet)) - .detail("IntendedTeam", describe(intendedTeam)) - .detail("KeyServers", keyServers); - // ASSERT( false ); - - ASSERT(!dest.empty()); // The range has already been moved, but to a different dest (or - // maybe dest was cleared) - - intendedTeam.clear(); - for (int i = 0; i < dest.size(); i++) - intendedTeam.insert(dest[i]); - } else if (alreadyMoved) { - dest.clear(); - src.clear(); - CODE_PROBE(true, - "FinishMoveKeys first key in iteration sub-range has already been processed"); - } - } - - // Process the rest of the key servers - for (; currentIndex < keyServers.size() - 1; currentIndex++) { - std::vector src2, dest2; - decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src2, dest2); - - std::set srcSet; - for (int s = 0; s < src2.size(); s++) - srcSet.insert(src2[s]); - - for (int i = 0; i < completeSrc.size(); i++) { - if (!srcSet.contains(completeSrc[i])) { - swapAndPop(&completeSrc, i--); - } - } - - allServers.insert(srcSet.begin(), srcSet.end()); - - // Because marking a server as failed can shrink a team, do not check for exact equality - // Instead, check for a subset of the intended team, which also covers the equality case - bool isSubset = - std::includes(intendedTeam.begin(), intendedTeam.end(), srcSet.begin(), srcSet.end()); - alreadyMoved = dest2.empty() && isSubset; - if (dest2 != dest && !alreadyMoved) { - TraceEvent(SevError, "FinishMoveKeysError", relocationIntervalId) - .detail("Reason", "dest mismatch") - .detail("Dest", describe(dest)) - .detail("Dest2", describe(dest2)); - ASSERT(false); - } - } - if (dest.empty()) { - CODE_PROBE(true, - "A previous finishMoveKeys for this range committed just as it was cancelled to " - "start this one?"); - TraceEvent("FinishMoveKeysNothingToDo", relocationIntervalId) - .detail("KeyBegin", keys.begin) - .detail("KeyEnd", keys.end) - .detail("IterationBegin", begin) - .detail("IterationEnd", endKey); - begin = keyServers.end()[-1].key; + // Decode + validate keyServers. Nullopt = entire range already moved + // by a sibling iteration; nothing more to do this iteration. + Optional decoded = decodeKeyServersState( + state.uidToTagMap, state.keyServers, destinationTeam, relocationIntervalId, keys, begin, endKey); + if (!decoded.present()) { + begin = endKey; break; } - waitInterval = TraceInterval("RelocateShard_FinishMoveKeysWaitDurable"); - TraceEvent(SevDebug, waitInterval.begin(), relocationIntervalId) - .detail("KeyBegin", keys.begin) - .detail("KeyEnd", keys.end); - - // Wait for a durable quorum of servers in destServers to have keys available (readWrite) - // They must also have at least the transaction read version so they can't "forget" the shard - // between now and when this transaction commits. - std::vector> serverReady; // only for count below - std::vector> tssReady; // for waiting in parallel with tss - std::vector tssReadyInterfs; - std::vector newDestinations; - std::set completeSrcSet(completeSrc.begin(), completeSrc.end()); - for (auto& it : dest) { - if (!hasRemote || !completeSrcSet.contains(it)) { - newDestinations.push_back(it); - } - } - - // for smartQuorum + // Read the destination SSes' interfaces and record the read version + // waitForShardReady will need after we drop the txn below. std::vector storageServerInterfaces; - std::vector>> serverListEntries; - serverListEntries.reserve(newDestinations.size()); - for (int s = 0; s < newDestinations.size(); s++) - serverListEntries.push_back(tr.get(serverListKeyFor(newDestinations[s]))); - std::vector> serverListValues = co_await getAll(serverListEntries); + Version readVersion; + std::tie(storageServerInterfaces, readVersion) = co_await buildKeysDestServerInterfaces( + &tr, decoded->dest, decoded->completeSrc, hasRemote); releaser.release(); - for (int s = 0; s < serverListValues.size(); s++) { - ASSERT(serverListValues[s] - .present()); // There should always be server list entries for servers in keyServers - auto si = decodeServerListValue(serverListValues[s].get()); - ASSERT(si.id() == newDestinations[s]); - storageServerInterfaces.push_back(si); - } - - // update client info in case tss mapping changed or server got updated - - // Save the read version before dropping the transaction. waitForShardReady - // needs a minimum version the dest must reach; saving the older version is - // sufficient because servers will already be past it by the time we - // re-verify in the second transaction below. - Version readVersion = tr.getReadVersion().get(); - // Drop the transaction BEFORE the potentially long wait. The 15-second // SERVER_READY_QUORUM_TIMEOUT exceeds the ~5-second transaction lifetime, // so waiting inside the transaction guarantees transaction_too_old on // commit when destination servers are slow to respond. tr.reset(); - // Wait for new destination servers to fetch the keys (OUTSIDE any transaction) - - serverReady.reserve(storageServerInterfaces.size()); - tssReady.reserve(storageServerInterfaces.size()); - tssReadyInterfs.reserve(storageServerInterfaces.size()); - for (int s = 0; s < storageServerInterfaces.size(); s++) { - serverReady.push_back(waitForShardReady( - storageServerInterfaces[s], keys, readVersion, GetShardStateRequest::READABLE)); - - auto tssPair = tssMapping.find(storageServerInterfaces[s].id()); - - if (tssPair != tssMapping.end() && waitForTSSCounter > 0 && - !tssToIgnore.contains(tssPair->second.id())) { - tssReadyInterfs.push_back(tssPair->second); - tssReady.push_back( - waitForShardReady(tssPair->second, keys, readVersion, GetShardStateRequest::READABLE)); - } - } - - // Wait for all storage server moves, and explicitly swallow errors for tss ones with - // waitForAllReady. A long timeout is safe here — no transaction clock is ticking. - co_await timeout(waitForAll(serverReady) && waitForAllReady(tssReady), - SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, - Void(), - TaskPriority::MoveKeys); - - // Check to see if we're waiting only on tss. If so, decrement the waiting counter. - // If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing - // the data move. - if (!tssReady.empty()) { - bool allSSDone = true; - for (auto& f : serverReady) { - allSSDone &= f.isReady() && !f.isError(); - if (!allSSDone) { - break; - } - } - - if (allSSDone) { - bool anyTssNotDone = false; - - for (auto& f : tssReady) { - if (!f.isReady() || f.isError()) { - anyTssNotDone = true; - waitForTSSCounter--; - break; - } - } - - if (anyTssNotDone && waitForTSSCounter == 0) { - for (int i = 0; i < tssReady.size(); i++) { - if (!tssReady[i].isReady() || tssReady[i].isError()) { - tssToIgnore.insert(tssReadyInterfs[i].id()); - } - } - } - } - } - - int count = dest.size() - newDestinations.size(); - for (int s = 0; s < serverReady.size(); s++) - count += serverReady[s].isReady() && !serverReady[s].isError(); - - int tssCount = 0; - for (int s = 0; s < tssReady.size(); s++) - tssCount += tssReady[s].isReady() && !tssReady[s].isError(); - - TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId); - readyServersEv.detail("ReadyServers", count).detail("Dests", dest.size()); - if (!tssReady.empty()) { - readyServersEv.detail("ReadyTSS", tssCount); - } - - if (count == dest.size()) { + // Wait for new destination servers to fetch the keys (OUTSIDE any transaction). + int count = co_await waitForKeysDestServers(storageServerInterfaces, + decoded->dest.size(), + keys, + readVersion, + tssMapping, + &waitForTSSCounter, + &tssToIgnore, + relocationIntervalId); + + if (count == (int)decoded->dest.size()) { // All destination servers are ready. Open a fresh transaction to // re-verify dest hasn't changed during the wait, then commit. - tr.trState->taskID = TaskPriority::MoveKeys; - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - ShardStateReads reread = co_await readShardState(&tr, - lock, - ddEnabledState, - currentKeys, - /*dataMoveId=*/{}, - SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, - SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES); - - // Re-truncate currentKeys/endKey to the boundary the reread - // actually covered. krmGetRanges only emits the synthetic - // upper-bound row when its row/byte limit is NOT hit - // (see krmDecodeRanges in KeyRangeMap.cpp), so if a - // concurrent split or value-size growth pushed the byte - // limit forward, reread.keyServers.back().key can be - // strictly less than the planning-era currentKeys.end. - // The destUnchanged loop below only validates sub-ranges - // within reread.keyServers; the krmSetRangeCoalescing - // commit would otherwise clear and rewrite an - // unverified-and-possibly-foreign tail. Common under - // MOVE_KEYS_KRM_LIMIT buggify (2 rows -- which happens often - // in simulation). - Key rereadEnd = reread.keyServers.end()[-1].key; - // The reread is bounded by `currentKeys` (the upper-bound - // passed to readShardState), so a `>` result would mean - // readShardState broke its contract — fail loudly rather - // than silently expand the commit past the verified end. - ASSERT(rereadEnd <= currentKeys.end); - if (rereadEnd < currentKeys.end) { - CODE_PROBE(true, - "finishMoveKeys reread keyServers boundary shorter than planning", - probe::decoration::rare); - currentKeys = KeyRangeRef(currentKeys.begin, rereadEnd); - endKey = rereadEnd; - } - - // Verify every sub-range still maps to the planned dest (or has - // been already-moved-into-empty by a sibling iteration). If - // another DD reassigned a sub-range during the wait, retry - // rather than clobber its write with our stale plan. - if (!destUnchanged(reread.keyServers, reread.uidToTagMap, dest, /*expectedDataMoveId=*/{})) { - CODE_PROBE( - true, "finishMoveKeys dest changed during waitForShardReady", probe::decoration::rare); - TraceEvent(SevWarn, "FinishMoveKeysDestChanged", relocationIntervalId) - .detail("KeyBegin", keys.begin) - .detail("KeyEnd", keys.end) - .detail("OrigDest", describe(dest)); - co_await retryAfterPostWaitChange(&retries, &tr); + bool committed = co_await reverifyKeysDestAndCommit(&tr, + lock, + ddEnabledState, + ¤tKeys, + &endKey, + decoded->dest, + decoded->allServers, + keys, + relocationIntervalId, + &retries, + counters); + if (!committed) { + // dest changed during wait — retry the inner loop continue; } - - // update keyServers, serverKeys - // SOMEDAY: Doing these in parallel is safe because none of them overlap or touch (one per - // server) - co_await krmSetRangeCoalescing( - &tr, keyServersPrefix, currentKeys, keys, keyServersValue(reread.uidToTagMap, dest)); - - auto asi = allServers.begin(); - std::vector> actors; - while (asi != allServers.end()) { - bool destHasServer = std::find(dest.begin(), dest.end(), *asi) != dest.end(); - actors.push_back(krmSetRangeCoalescing(&tr, - serverKeysPrefixFor(*asi), - currentKeys, - allKeys, - destHasServer ? serverKeysTrue : serverKeysFalse)); - ++asi; - } - - co_await waitForAll(actors); - - // Inject transaction_too_old before commit to exercise the - // retry limit and finish_move_keys_too_many_retries path. - if (buggify(0.01)) { - CODE_PROBE(true, "finishMoveKeys injecting transaction_too_old before commit"); - throw transaction_too_old(); - } - co_await tr.commit(); - counters->committed->increment(1); - begin = endKey; retries = 0; break; @@ -1813,11 +1903,9 @@ static Future finishMoveKeys(Database occ, .detail("KeyEnd", keys.end) .detail("Retries", retries) .detail("ReadyCount", count) - .detail("DestCount", dest.size()); + .detail("DestCount", decoded->dest.size()); throw finish_move_keys_too_many_retries(); } - serverReady.clear(); - tssReady.clear(); co_await delay(finishMoveKeysBackoff(retries)); tr.reset(); continue; @@ -2319,8 +2407,393 @@ static Future checkDataMoveComplete(Database occ, UID dataMoveId, KeyRange } } +// Decoded per-iteration state used by finishMoveShards after it reads the +// planning-era keyServers snapshot. See DecodedKeyServersState above for +// the finishMoveKeys analog and why the two are kept separate. +struct DecodedShardsKeyServers { + std::vector completeSrc; + std::unordered_set allServers; +}; + +// Decode + validate the keyServers range read at the top of finishMoveShards, +// asserting that every sub-range is stamped with the current dataMoveId, and +// running the per-sub-range AUDIT_DATAMOVE_PRE_CHECK when enabled. On a +// stamp mismatch, sets *cancelDataMove=true and throws retry() so the outer +// loop enters the cancel path. `dataMove` is only used for tracing. +static Future +decodeAndPreCheckShards(Database occ, Transaction* tr, RangeResult const& UIDtoTagMap, RangeResult const& keyServers, + std::vector const& destServers, UID dataMoveId, bool runPreCheck, + DataMoveMetaData const& dataMove, UID relocationIntervalId, Severity sevDm, + bool* cancelDataMove) { + std::vector completeSrc; + std::unordered_set allServers; + + for (int currentIndex = 0; currentIndex < keyServers.size() - 1; ++currentIndex) { + std::vector src; + std::vector dest; + UID srcId; + UID destId; + decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src, dest, srcId, destId); + const KeyRange currentRange = KeyRangeRef(keyServers[currentIndex].key, keyServers[currentIndex + 1].key); + TraceEvent(sevDm, "FinishMoveShardsProcessingShard", relocationIntervalId) + .detail("Range", currentRange) + .detail("SrcID", srcId) + .detail("Src", describe(src)) + .detail("DestID", destId) + .detail("Dest", describe(dest)) + .detail("DataMove", dataMove.toString()); + allServers.insert(src.begin(), src.end()); + allServers.insert(dest.begin(), dest.end()); + if (destId != dataMoveId) { + TraceEvent(SevWarnAlways, "FinishMoveShardsInconsistentIDs", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("ExistingShardID", destId) + .detail("DataMove", dataMove.toString()); + *cancelDataMove = true; + throw retry(); + } + + // Pre validate consistency of update of keyServers and serverKeys + if (SERVER_KNOBS->AUDIT_DATAMOVE_PRE_CHECK && runPreCheck) { + std::vector servers(src.size() + dest.size()); + std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin()); + co_await auditLocationMetadataPreCheck( + occ, tr, currentRange, servers, "finishMoveShards_precheck", dataMoveId); + } + + std::sort(dest.begin(), dest.end()); + ASSERT(std::equal(destServers.begin(), destServers.end(), dest.begin(), dest.end())); + + std::set srcSet; + for (int s = 0; s < src.size(); s++) { + srcSet.insert(src[s]); + } + + if (currentIndex == 0) { + completeSrc = src; + } else { + for (int i = 0; i < completeSrc.size(); i++) { + if (!srcSet.contains(completeSrc[i])) { + swapAndPop(&completeSrc, i--); + } + } + } + } + + co_return DecodedShardsKeyServers{ std::move(completeSrc), std::move(allServers) }; +} + +// finishMoveShards analog of buildKeysDestServerInterfaces. Only difference: +// a missing serverList entry throws retry() rather than asserting — shards +// tolerates the SS-removed race by re-reading dataMove and starting over. +static Future, Version>> +buildShardsDestServerInterfaces(Transaction* tr, std::vector const& destServers, + std::vector const& completeSrc, bool hasRemote) { + std::set completeSrcSet(completeSrc.begin(), completeSrc.end()); + std::vector newDestinations; + for (const UID& id : destServers) { + if (!hasRemote || !completeSrcSet.contains(id)) { + newDestinations.push_back(id); + } + } + + std::vector>> serverListEntries; + serverListEntries.reserve(newDestinations.size()); + for (const UID& id : newDestinations) { + serverListEntries.push_back(tr->get(serverListKeyFor(id))); + } + std::vector> serverListValues = co_await getAll(serverListEntries); + + std::vector storageServerInterfaces; + for (int s = 0; s < serverListValues.size(); s++) { + // TODO: if the server is removed, + if (!serverListValues[s].present()) { + throw retry(); + } + auto si = decodeServerListValue(serverListValues[s].get()); + ASSERT(si.id() == newDestinations[s]); + storageServerInterfaces.push_back(si); + } + + Version readVersion = tr->getReadVersion().get(); + co_return std::make_pair(std::move(storageServerInterfaces), readVersion); +} + +// Wait for destination SSes (and paired TSSes) to report the shard as +// READABLE at `readVersion`. Runs OUTSIDE any transaction. TSS policy +// differs from the keys flavor: shards uses a time-based skipTss latch +// (once all SSes have been ready for DD_WAIT_TSS_DATA_MOVE_DELAY without +// TSS catching up, TSS is skipped for the remainder of the move). +// `skipTss` and `ssReadyTime` are in/out. `newDestinationIds` is the UID +// list corresponding 1:1 with storageServerInterfaces, threaded through +// for trace-event fidelity with pre-refactor code. Returns the ready +// count for the caller's ready-versus-target comparison; also fills +// `readyServers_out` and `tssCount_out` for the caller's post-wait +// tracing. +static Future waitForShardsDestServers(std::vector const& storageServerInterfaces, + std::vector const& newDestinationIds, + KeyRange const& range, + Version readVersion, + std::map const& tssMapping, + bool* skipTss, + double* ssReadyTime, + std::vector* readyServers_out, + int* tssCount_out, + UID dataMoveId, + UID relocationIntervalId, + Severity sevDm, + DataMoveMetaData const& dataMove) { + std::vector> serverReady; // only for count below + std::vector> tssReady; // for waiting in parallel with tss + std::vector tssReadyInterfs; + + // Wait for new destination servers to fetch the data range. + serverReady.reserve(storageServerInterfaces.size()); + for (int s = 0; s < storageServerInterfaces.size(); s++) { + serverReady.push_back( + waitForShardReady(storageServerInterfaces[s], range, readVersion, GetShardStateRequest::READABLE)); + + if (*skipTss) + continue; + + auto tssPair = tssMapping.find(storageServerInterfaces[s].id()); + + if (tssPair != tssMapping.end()) { + tssReadyInterfs.push_back(tssPair->second); + tssReady.push_back( + waitForShardReady(tssPair->second, range, readVersion, GetShardStateRequest::READABLE)); + } + } + + TraceEvent(sevDm, "FinishMoveShardsWaitingServers", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("NewDestinations", describe(newDestinationIds)) + .detail("DataMove", dataMove.toString()); + + // Wait OUTSIDE any transaction. A long timeout is safe — no + // transaction clock is ticking. + co_await timeout(waitForAll(serverReady) && waitForAllReady(tssReady), + SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, + Void(), + TaskPriority::MoveKeys); + + for (int s = 0; s < serverReady.size(); ++s) { + if (serverReady[s].isReady() && !serverReady[s].isError()) { + readyServers_out->push_back(storageServerInterfaces[s].uniqueID); + } + } + *tssCount_out = 0; + for (int s = 0; s < tssReady.size(); s++) { + if (tssReady[s].isReady() && !tssReady[s].isError()) { + (*tssCount_out) += 1; + } + } + + if ((int)readyServers_out->size() == (int)serverReady.size() && !*skipTss) { + *ssReadyTime = std::min(now(), *ssReadyTime); + if (*tssCount_out < (int)tssReady.size() && now() - *ssReadyTime >= SERVER_KNOBS->DD_WAIT_TSS_DATA_MOVE_DELAY) { + *skipTss = true; + TraceEvent(SevWarnAlways, "FinishMoveShardsSkipTSS") + .detail("DataMoveID", dataMoveId) + .detail("ReadyServers", describe(*readyServers_out)) + .detail("NewDestinations", describe(newDestinationIds)) + .detail("ReadyTSS", *tssCount_out) + .detail("TSSInfo", describe(tssReadyInterfs)) + .detail("SSReadyTime", *ssReadyTime); + } + } + + co_return (int)readyServers_out->size(); +} + +// Result of reverifyShardsAndCommit — encodes the three outcomes the caller +// has to distinguish: retry the inner loop, keep looping to finish more of +// the same data move, or the whole move is complete. +enum class ReverifyShardsResult { RetryLoop, PartialCommitted, FullyCommitted }; + +// Post-wait: open a fresh txn (`tr` has been reset), re-read the keyServers +// range + dataMove metadata, and if all invariants still hold, commit the +// keyServers/serverKeys/dataMove writes for `range`. Handles the three +// concurrent-change branches (dataMove deleted, phase changed, dest +// changed), the KRM-boundary re-truncation, bulk-load task completion, and +// partial-vs-full data move commit. +// `range` is in/out because a KRM boundary re-truncation can shorten it. +// `runPreCheck` is set to false on all retry-via-concurrent-change paths so +// the outer loop skips the per-sub-range AUDIT precheck on the next attempt. +// `cancelDataMove` is set on bulk-load-outdated so the outer catch enters +// the cancel path. +static Future reverifyShardsAndCommit(Transaction* tr, + Database occ, + MoveKeysLock lock, + const DDEnabledState* ddEnabledState, + UID dataMoveId, + std::vector const& destServers, + KeyRange* range, + std::unordered_set const& allServers, + Optional bulkLoadTaskState, + DataMoveMetaData* postWaitDataMove_out, + UID relocationIntervalId, + Severity sevDm, + int* retries, + bool* runPreCheck, + bool* cancelDataMove, + TxnCounters* counters) { + tr->trState->taskID = TaskPriority::MoveKeys; + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + ShardStateReads reread = co_await readShardState(tr, + lock, + ddEnabledState, + *range, + dataMoveId, + SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, + SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); + + if (!reread.dataMove.present()) { + CODE_PROBE(true, + "finishMoveShards data move deleted during waitForShardReady", + probe::decoration::rare); + TraceEvent(SevWarn, "FinishMoveShardsDataMoveDeletedAfterWait", relocationIntervalId) + .detail("DataMoveID", dataMoveId); + *runPreCheck = false; + co_await retryAfterPostWaitChange(retries, tr); + co_return ReverifyShardsResult::RetryLoop; + } + if (reread.dataMove.get().getPhase() != DataMoveMetaData::Running) { + CODE_PROBE(true, + "finishMoveShards data move phase changed during waitForShardReady", + probe::decoration::rare); + TraceEvent(SevWarn, "FinishMoveShardsPhaseChangedAfterWait", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("Phase", static_cast(reread.dataMove.get().getPhase())); + *runPreCheck = false; + co_await retryAfterPostWaitChange(retries, tr); + co_return ReverifyShardsResult::RetryLoop; + } + ASSERT(!reread.keyServers.empty()); + + // Re-truncate `range` to the boundary the reread actually + // covered. krmGetRanges only emits the synthetic upper-bound + // row when its row/byte limit is NOT hit (see krmDecodeRanges + // in KeyRangeMap.cpp), so if a concurrent split or value-size + // growth pushed the byte limit forward, + // reread.keyServers.back().key can be strictly less than the + // planning-era range.end. The destUnchanged loop only + // validates sub-ranges within reread.keyServers; the + // krmSetRangeCoalescing commits below would otherwise clear + // and rewrite an unverified-and-possibly-foreign tail. + Key rereadEnd = reread.keyServers.back().key; + // The reread is bounded by `range`, so a `>` result would + // mean readShardState broke its contract — fail loudly + // rather than silently expand the commit past the verified + // end. + ASSERT(rereadEnd <= range->end); + if (rereadEnd < range->end) { + CODE_PROBE(true, + "finishMoveShards reread keyServers boundary shorter than planning", + probe::decoration::rare); + *range = KeyRangeRef(range->begin, rereadEnd); + } + + if (!destUnchanged(reread.keyServers, reread.uidToTagMap, destServers, dataMoveId)) { + CODE_PROBE(true, "finishMoveShards dest changed during waitForShardReady", probe::decoration::rare); + TraceEvent(SevWarn, "FinishMoveShardsDestChanged", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("Range", *range); + *runPreCheck = false; + co_await retryAfterPostWaitChange(retries, tr); + co_return ReverifyShardsResult::RetryLoop; + } + + // Use the freshly-read dataMove snapshot for partial-complete / + // checkpoint-deletion / dataMoveValue writes below; the caller's + // function-level `dataMove` retains the pre-wait snapshot used + // only by the post-loop trace at the end of finishMoveShards. + DataMoveMetaData postWaitDataMove = reread.dataMove.get(); + + std::vector> actors; + actors.push_back(krmSetRangeCoalescing( + tr, keyServersPrefix, *range, allKeys, keyServersValue(destServers, {}, dataMoveId, UID()))); + + for (const UID& ssId : allServers) { + const bool destHasServer = std::find(destServers.begin(), destServers.end(), ssId) != destServers.end(); + actors.push_back(krmSetRangeCoalescing(tr, + serverKeysPrefixFor(ssId), + *range, + allKeys, + destHasServer ? serverKeysValue(dataMoveId) : serverKeysFalse)); + TraceEvent(sevDm, "FinishMoveShardsSetServerKeyRange", relocationIntervalId) + .detail("StorageServerID", ssId) + .detail("KeyRange", *range) + .detail("ShardID", destHasServer ? dataMoveId : UID()) + .detail("DataMove", postWaitDataMove.toString()); + } + + co_await waitForAll(actors); + + const bool isFullMove = range->end == postWaitDataMove.ranges.front().end; + if (isFullMove) { + if (bulkLoadTaskState.present()) { + BulkLoadTaskState newBulkLoadTaskState; + try { + newBulkLoadTaskState = co_await getBulkLoadTask(tr, + bulkLoadTaskState.get().getRange(), + bulkLoadTaskState.get().getTaskId(), + { BulkLoadPhase::Running, BulkLoadPhase::Complete }); + newBulkLoadTaskState.phase = BulkLoadPhase::Complete; + } catch (Error& e) { + if (e.code() == error_code_bulkload_task_outdated) { + *cancelDataMove = true; + throw retry(); + } + throw e; + } + ASSERT(newBulkLoadTaskState.getDataMoveId().present() && + newBulkLoadTaskState.getDataMoveId().get() == dataMoveId); + newBulkLoadTaskState.completeTime = now(); + co_await krmSetRange( + tr, bulkLoadTaskPrefix, newBulkLoadTaskState.getRange(), bulkLoadTaskStateValue(newBulkLoadTaskState)); + TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskSetCompleteTransaction", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("JobID", newBulkLoadTaskState.getJobId()) + .detail("TaskID", newBulkLoadTaskState.getTaskId()); + postWaitDataMove.bulkLoadTaskState = newBulkLoadTaskState; + } + co_await deleteCheckpoints(tr, postWaitDataMove.checkpoints, dataMoveId); + tr->clear(dataMoveKeyFor(dataMoveId)); + TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", relocationIntervalId) + .detail("DataMove", postWaitDataMove.toString()); + } else if (!bulkLoadTaskState.present()) { + // Bulk Loading data move does not allow partial complete + TraceEvent(SevInfo, "FinishMoveShardsPartialComplete", relocationIntervalId) + .detail("DataMoveID", dataMoveId) + .detail("CurrentRange", *range) + .detail("NewDataMoveMetaData", postWaitDataMove.toString()) + .detail("DataMove", postWaitDataMove.toString()); + postWaitDataMove.ranges.front() = KeyRangeRef(range->end, postWaitDataMove.ranges.front().end); + tr->set(dataMoveKeyFor(dataMoveId), dataMoveValue(postWaitDataMove)); + } + + co_await tr->commit(); + counters->committed->increment(1); + + if (isFullMove && bulkLoadTaskState.present()) { + Version commitVersion = tr->getCommittedVersion(); + TraceEvent(bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistCompleteState", relocationIntervalId) + .detail("JobID", bulkLoadTaskState.get().getJobId()) + .detail("DataMoveID", dataMoveId) + .detail("TaskID", bulkLoadTaskState.get().getTaskId()) + .detail("TaskRange", bulkLoadTaskState.get().getRange()) + .detail("CommitVersion", commitVersion); + } + + *postWaitDataMove_out = std::move(postWaitDataMove); + co_return isFullMove ? ReverifyShardsResult::FullyCommitted : ReverifyShardsResult::PartialCommitted; +} + // Set keyServers[keys].src = keyServers[keys].dest and keyServers[keys].dest=[], return when successful -// keyServers[k].dest must be the same for all k in keys. +// keyServers[k].dest must be the same for all k in keys // Set serverKeys[dest][keys] = dataMoveId; serverKeys[src][keys] = false for all src not in dest. // Clear dataMoves[dataMoveId]. static Future finishMoveShards(Database occ, @@ -2364,9 +2837,7 @@ static Future finishMoveShards(Database occ, // target range. while (true) { counters->started->increment(1); - std::vector completeSrc; std::vector destServers; - std::unordered_set allServers; KeyRange range; Transaction tr(occ); Error err; @@ -2434,131 +2905,33 @@ static Future finishMoveShards(Database occ, /*dataMoveId=*/{}, SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); - RangeResult& UIDtoTagMap = state.uidToTagMap; - RangeResult& keyServers = state.keyServers; - ASSERT(!keyServers.empty()); - range = KeyRangeRef(range.begin, keyServers.back().key); + ASSERT(!state.keyServers.empty()); + range = KeyRangeRef(range.begin, state.keyServers.back().key); ASSERT(!range.empty()); - int currentIndex = 0; - for (; currentIndex < keyServers.size() - 1; ++currentIndex) { - std::vector src; - std::vector dest; - UID srcId; - UID destId; - decodeKeyServersValue(UIDtoTagMap, keyServers[currentIndex].value, src, dest, srcId, destId); - const KeyRange currentRange = - KeyRangeRef(keyServers[currentIndex].key, keyServers[currentIndex + 1].key); - TraceEvent(sevDm, "FinishMoveShardsProcessingShard", relocationIntervalId) - .detail("Range", currentRange) - .detail("SrcID", srcId) - .detail("Src", describe(src)) - .detail("DestID", destId) - .detail("Dest", describe(dest)) - .detail("DataMove", dataMove.toString()); - allServers.insert(src.begin(), src.end()); - allServers.insert(dest.begin(), dest.end()); - if (destId != dataMoveId) { - TraceEvent(SevWarnAlways, "FinishMoveShardsInconsistentIDs", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("ExistingShardID", destId) - .detail("DataMove", dataMove.toString()); - cancelDataMove = true; - throw retry(); - } - - // Pre validate consistency of update of keyServers and serverKeys - if (SERVER_KNOBS->AUDIT_DATAMOVE_PRE_CHECK && runPreCheck) { - std::vector servers(src.size() + dest.size()); - std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin()); - co_await auditLocationMetadataPreCheck( - occ, &tr, currentRange, servers, "finishMoveShards_precheck", dataMoveId); - } - - std::sort(dest.begin(), dest.end()); - ASSERT(std::equal(destServers.begin(), destServers.end(), dest.begin(), dest.end())); - - std::set srcSet; - for (int s = 0; s < src.size(); s++) { - srcSet.insert(src[s]); - } - - if (currentIndex == 0) { - completeSrc = src; - } else { - for (int i = 0; i < completeSrc.size(); i++) { - if (!srcSet.contains(completeSrc[i])) { - swapAndPop(&completeSrc, i--); - } - } - } - } - - // Wait for a durable quorum of servers in destServers to have keys available (readWrite) - // They must also have at least the transaction read version so they can't "forget" the shard - // between now and when this transaction commits. - std::vector> serverReady; // only for count below - std::vector> tssReady; // for waiting in parallel with tss - std::vector tssReadyInterfs; - std::vector newDestinations; - std::set completeSrcSet(completeSrc.begin(), completeSrc.end()); - for (const UID& id : destServers) { - if (!hasRemote || !completeSrcSet.contains(id)) { - newDestinations.push_back(id); - } - } - + // Decode + per-sub-range AUDIT precheck. Throws retry() on + // stamp mismatch (and sets cancelDataMove). + DecodedShardsKeyServers decoded = co_await decodeAndPreCheckShards(occ, + &tr, + state.uidToTagMap, + state.keyServers, + destServers, + dataMoveId, + runPreCheck, + dataMove, + relocationIntervalId, + sevDm, + &cancelDataMove); + + // Read the destination SSes' interfaces and record the read + // version waitForShardReady will need after we drop the txn. std::vector storageServerInterfaces; - std::vector>> serverListEntries; - serverListEntries.reserve(newDestinations.size()); - for (const UID& id : newDestinations) { - serverListEntries.push_back(tr.get(serverListKeyFor(id))); - } - std::vector> serverListValues = co_await getAll(serverListEntries); + Version readVersion; + std::tie(storageServerInterfaces, readVersion) = co_await buildShardsDestServerInterfaces( + &tr, destServers, decoded.completeSrc, hasRemote); releaser.release(); - for (int s = 0; s < serverListValues.size(); s++) { - // TODO: if the server is removed, - if (!serverListValues[s].present()) { - throw retry(); - } - auto si = decodeServerListValue(serverListValues[s].get()); - ASSERT(si.id() == newDestinations[s]); - storageServerInterfaces.push_back(si); - } - - // update client info in case tss mapping changed or server got updated - - // Save the read version before dropping the transaction (mirrors the - // pattern in finishMoveKeys above): waitForShardReady needs a minimum - // version the dest must reach; servers will already be past it by the - // time we re-verify in the second transaction below. - Version readVersion = tr.getReadVersion().get(); - - // Wait for new destination servers to fetch the data range. - serverReady.reserve(storageServerInterfaces.size()); - for (int s = 0; s < storageServerInterfaces.size(); s++) { - serverReady.push_back(waitForShardReady( - storageServerInterfaces[s], range, readVersion, GetShardStateRequest::READABLE)); - - if (skipTss) - continue; - - auto tssPair = tssMapping.find(storageServerInterfaces[s].id()); - - if (tssPair != tssMapping.end()) { - tssReadyInterfs.push_back(tssPair->second); - tssReady.push_back( - waitForShardReady(tssPair->second, range, readVersion, GetShardStateRequest::READABLE)); - } - } - - TraceEvent(sevDm, "FinishMoveShardsWaitingServers", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("NewDestinations", describe(newDestinations)) - .detail("DataMove", dataMove.toString()); - // Drop the transaction BEFORE the potentially long wait. The 15 s // SERVER_READY_QUORUM_TIMEOUT exceeds the ~5 s txn lifetime; waiting // inside the transaction guarantees transaction_too_old on commit @@ -2566,211 +2939,65 @@ static Future finishMoveShards(Database occ, // finishMoveKeys above. tr.reset(); - // Wait OUTSIDE any transaction. A long timeout is safe — no - // transaction clock is ticking. - co_await timeout(waitForAll(serverReady) && waitForAllReady(tssReady), - SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, - Void(), - TaskPriority::MoveKeys); + // Recover the UID list for tracing. Since buildShardsDestServerInterfaces + // throws retry() on any missing serverList entry, storageServerInterfaces + // is 1:1 with the newDestinations subset — extract the UIDs here for the + // trace-event details below. + std::vector newDestinationIds; + newDestinationIds.reserve(storageServerInterfaces.size()); + for (auto const& si : storageServerInterfaces) { + newDestinationIds.push_back(si.id()); + } std::vector readyServers; - for (int s = 0; s < serverReady.size(); ++s) { - if (serverReady[s].isReady() && !serverReady[s].isError()) { - readyServers.push_back(storageServerInterfaces[s].uniqueID); - } - } int tssCount = 0; - for (int s = 0; s < tssReady.size(); s++) { - if (tssReady[s].isReady() && !tssReady[s].isError()) { - tssCount += 1; - } - } - - if (readyServers.size() == serverReady.size() && !skipTss) { - ssReadyTime = std::min(now(), ssReadyTime); - if (tssCount < tssReady.size() && - now() - ssReadyTime >= SERVER_KNOBS->DD_WAIT_TSS_DATA_MOVE_DELAY) { - skipTss = true; - TraceEvent(SevWarnAlways, "FinishMoveShardsSkipTSS") - .detail("DataMoveID", dataMoveId) - .detail("ReadyServers", describe(readyServers)) - .detail("NewDestinations", describe(newDestinations)) - .detail("ReadyTSS", tssCount) - .detail("TSSInfo", describe(tssReadyInterfs)) - .detail("SSReadyTime", ssReadyTime); - } - } + int newDestinationsCount = (int)storageServerInterfaces.size(); + int readyCount = co_await waitForShardsDestServers(storageServerInterfaces, + newDestinationIds, + range, + readVersion, + tssMapping, + &skipTss, + &ssReadyTime, + &readyServers, + &tssCount, + dataMoveId, + relocationIntervalId, + sevDm, + dataMove); TraceEvent(sevDm, "FinishMoveShardsWaitedServers", relocationIntervalId) .detail("DataMoveID", dataMoveId) .detail("ReadyServers", describe(readyServers)) - .detail("NewDestinations", describe(newDestinations)) + .detail("NewDestinations", describe(newDestinationIds)) .detail("ReadyTSS", tssCount) .detail("DataMove", dataMove.toString()); - if (readyServers.size() == newDestinations.size()) { - + if (readyCount == newDestinationsCount) { // All destination servers are ready. Open a fresh transaction to // re-verify dataMove and shard assignments haven't changed during // the wait, then commit. - tr.trState->taskID = TaskPriority::MoveKeys; - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - ShardStateReads reread = co_await readShardState(&tr, - lock, - ddEnabledState, - range, - dataMoveId, - SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT, - SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); - - if (!reread.dataMove.present()) { - CODE_PROBE(true, - "finishMoveShards data move deleted during waitForShardReady", - probe::decoration::rare); - TraceEvent(SevWarn, "FinishMoveShardsDataMoveDeletedAfterWait", relocationIntervalId) - .detail("DataMoveID", dataMoveId); - runPreCheck = false; - co_await retryAfterPostWaitChange(&retries, &tr); + DataMoveMetaData postWaitDataMove; + ReverifyShardsResult res = co_await reverifyShardsAndCommit(&tr, + occ, + lock, + ddEnabledState, + dataMoveId, + destServers, + &range, + decoded.allServers, + bulkLoadTaskState, + &postWaitDataMove, + relocationIntervalId, + sevDm, + &retries, + &runPreCheck, + &cancelDataMove, + counters); + if (res == ReverifyShardsResult::RetryLoop) { continue; } - if (reread.dataMove.get().getPhase() != DataMoveMetaData::Running) { - CODE_PROBE(true, - "finishMoveShards data move phase changed during waitForShardReady", - probe::decoration::rare); - TraceEvent(SevWarn, "FinishMoveShardsPhaseChangedAfterWait", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("Phase", static_cast(reread.dataMove.get().getPhase())); - runPreCheck = false; - co_await retryAfterPostWaitChange(&retries, &tr); - continue; - } - ASSERT(!reread.keyServers.empty()); - - // Re-truncate `range` to the boundary the reread actually - // covered. krmGetRanges only emits the synthetic upper-bound - // row when its row/byte limit is NOT hit (see krmDecodeRanges - // in KeyRangeMap.cpp), so if a concurrent split or value-size - // growth pushed the byte limit forward, - // reread.keyServers.back().key can be strictly less than the - // planning-era range.end. The destUnchanged loop only - // validates sub-ranges within reread.keyServers; the - // krmSetRangeCoalescing commits below would otherwise clear - // and rewrite an unverified-and-possibly-foreign tail. - Key rereadEnd = reread.keyServers.back().key; - // The reread is bounded by `range`, so a `>` result would - // mean readShardState broke its contract — fail loudly - // rather than silently expand the commit past the verified - // end. - ASSERT(rereadEnd <= range.end); - if (rereadEnd < range.end) { - CODE_PROBE(true, - "finishMoveShards reread keyServers boundary shorter than planning", - probe::decoration::rare); - range = KeyRangeRef(range.begin, rereadEnd); - } - - if (!destUnchanged(reread.keyServers, reread.uidToTagMap, destServers, dataMoveId)) { - CODE_PROBE( - true, "finishMoveShards dest changed during waitForShardReady", probe::decoration::rare); - TraceEvent(SevWarn, "FinishMoveShardsDestChanged", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("Range", range); - runPreCheck = false; - co_await retryAfterPostWaitChange(&retries, &tr); - continue; - } - - // Use the freshly-read dataMove snapshot for partial-complete / - // checkpoint-deletion / dataMoveValue writes below; the - // function-level `dataMove` retains the pre-wait snapshot used - // only by the post-loop trace at the end of the function. - DataMoveMetaData postWaitDataMove = reread.dataMove.get(); - - std::vector> actors; - actors.push_back(krmSetRangeCoalescing( - &tr, keyServersPrefix, range, allKeys, keyServersValue(destServers, {}, dataMoveId, UID()))); - - for (const UID& ssId : allServers) { - const bool destHasServer = - std::find(destServers.begin(), destServers.end(), ssId) != destServers.end(); - actors.push_back( - krmSetRangeCoalescing(&tr, - serverKeysPrefixFor(ssId), - range, - allKeys, - destHasServer ? serverKeysValue(dataMoveId) : serverKeysFalse)); - TraceEvent(sevDm, "FinishMoveShardsSetServerKeyRange", relocationIntervalId) - .detail("StorageServerID", ssId) - .detail("KeyRange", range) - .detail("ShardID", destHasServer ? dataMoveId : UID()) - .detail("DataMove", postWaitDataMove.toString()); - } - - co_await waitForAll(actors); - - if (range.end == postWaitDataMove.ranges.front().end) { - if (bulkLoadTaskState.present()) { - BulkLoadTaskState newBulkLoadTaskState; - try { - newBulkLoadTaskState = - co_await getBulkLoadTask(&tr, - bulkLoadTaskState.get().getRange(), - bulkLoadTaskState.get().getTaskId(), - { BulkLoadPhase::Running, BulkLoadPhase::Complete }); - newBulkLoadTaskState.phase = BulkLoadPhase::Complete; - } catch (Error& e) { - if (e.code() == error_code_bulkload_task_outdated) { - cancelDataMove = true; - throw retry(); - } - throw e; - } - ASSERT(newBulkLoadTaskState.getDataMoveId().present() && - newBulkLoadTaskState.getDataMoveId().get() == dataMoveId); - newBulkLoadTaskState.completeTime = now(); - co_await krmSetRange(&tr, - bulkLoadTaskPrefix, - newBulkLoadTaskState.getRange(), - bulkLoadTaskStateValue(newBulkLoadTaskState)); - TraceEvent( - bulkLoadVerboseEventSev(), "DDBulkLoadTaskSetCompleteTransaction", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("JobID", newBulkLoadTaskState.getJobId()) - .detail("TaskID", newBulkLoadTaskState.getTaskId()); - postWaitDataMove.bulkLoadTaskState = newBulkLoadTaskState; - } - co_await deleteCheckpoints(&tr, postWaitDataMove.checkpoints, dataMoveId); - tr.clear(dataMoveKeyFor(dataMoveId)); - TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", relocationIntervalId) - .detail("DataMove", postWaitDataMove.toString()); - } else if (!bulkLoadTaskState.present()) { - // Bulk Loading data move does not allow partial complete - TraceEvent(SevInfo, "FinishMoveShardsPartialComplete", relocationIntervalId) - .detail("DataMoveID", dataMoveId) - .detail("CurrentRange", range) - .detail("NewDataMoveMetaData", postWaitDataMove.toString()) - .detail("DataMove", postWaitDataMove.toString()); - postWaitDataMove.ranges.front() = KeyRangeRef(range.end, postWaitDataMove.ranges.front().end); - tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(postWaitDataMove)); - } - - co_await tr.commit(); - counters->committed->increment(1); - - if (range.end == postWaitDataMove.ranges.front().end && bulkLoadTaskState.present()) { - Version commitVersion = tr.getCommittedVersion(); - TraceEvent( - bulkLoadVerboseEventSev(), "DDBulkLoadTaskPersistCompleteState", relocationIntervalId) - .detail("JobID", bulkLoadTaskState.get().getJobId()) - .detail("DataMoveID", dataMoveId) - .detail("TaskID", bulkLoadTaskState.get().getTaskId()) - .detail("TaskRange", bulkLoadTaskState.get().getRange()) - .detail("CommitVersion", commitVersion); - } - - if (range.end == postWaitDataMove.ranges.front().end) { + if (res == ReverifyShardsResult::FullyCommitted) { // Post validate consistency of update of keyServers and serverKeys if (SERVER_KNOBS->AUDIT_DATAMOVE_POST_CHECK) { co_await auditLocationMetadataPostCheck(occ, @@ -2780,6 +3007,7 @@ static Future finishMoveShards(Database occ, } break; } + // PartialCommitted: keep looping to finish more of this move. continue; } else { // Slow-but-not-stuck dest readiness: do NOT cap with a hard @@ -2801,8 +3029,8 @@ static Future finishMoveShards(Database occ, .detail("DataMoveID", dataMoveId) .detail("Range", range) .detail("Retries", retries) - .detail("ReadyCount", readyServers.size()) - .detail("DestCount", newDestinations.size()); + .detail("ReadyCount", readyCount) + .detail("DestCount", newDestinationsCount); } runPreCheck = false; co_await delay(finishMoveKeysBackoff(retries));