diff --git a/fdbserver/clustercontroller/ClusterRecovery.h b/fdbserver/clustercontroller/ClusterRecovery.h index 240cd7e7fdc..54ed93ee98f 100644 --- a/fdbserver/clustercontroller/ClusterRecovery.h +++ b/fdbserver/clustercontroller/ClusterRecovery.h @@ -270,7 +270,8 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted databaseLocked(false), minKnownCommittedVersion(invalidVersion), hasConfiguration(false), coordinators(coordinators), lastVersionTime(0), txnStateStore(nullptr), memoryLimit(2e9), dbId(dbId), masterInterface(masterInterface), masterLifetime(masterLifetimeToken), clusterController(clusterController), - cstate(coordinators, addActor, dbgid), dbInfo(dbInfo), registrationCount(0), addActor(addActor), + cstate(coordinators, addActor, dbgid), dbInfo(dbInfo), registrationCount(0), + recoveryState(RecoveryState::UNINITIALIZED), addActor(addActor), recruitmentStalled(makeReference>(false)), forceRecovery(forceRecovery), neverCreated(false), safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid), cc("ClusterRecoveryData", dbgid.toString()), changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc), @@ -289,10 +290,12 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_DURATION_EVENT_NAME)); clusterRecoveryAvailableEventHolder = makeReference( getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_AVAILABLE_EVENT_NAME)); - logger = cc.traceCounters(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME), - dbgid, - SERVER_KNOBS->WORKER_LOGGING_INTERVAL, - getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME)); + logger = + cc.traceCounters(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME), + dbgid, + SERVER_KNOBS->WORKER_LOGGING_INTERVAL, + getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME), + [this](TraceEvent& te) { te.detail("RecoveryState", static_cast(recoveryState)); }); if (forceRecovery && !controllerData->clusterControllerDcId.present()) { TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log(); forceRecovery = false; diff --git a/fdbserver/commitproxy/CommitProxyServer.cpp b/fdbserver/commitproxy/CommitProxyServer.cpp index 0a05a5461c9..0247ac2f249 100644 --- a/fdbserver/commitproxy/CommitProxyServer.cpp +++ b/fdbserver/commitproxy/CommitProxyServer.cpp @@ -252,6 +252,11 @@ Future commitBatcher(ProxyCommitData* commitData, Future timeout; std::vector batch; int batchBytes = 0; + auto flushBatch = [&](ProxyStats::CommitBatchFlushReason reason) { + commitData->stats.recordCommitBatchFlush(reason); + out.send({ std::move(batch), batchBytes }); + lastBatch = now(); + }; // TODO: Enable this assertion (currently failing with gcc) // static_assert(std::is_nothrow_move_constructible_v); @@ -308,9 +313,11 @@ Future commitBatcher(ProxyCommitData* commitData, if ((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && !batch.empty()) { + auto reason = batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT + ? ProxyStats::CommitBatchFlushReason::TRANSACTION_SIZE_LIMIT + : ProxyStats::CommitBatchFlushReason::FIRST_IN_BATCH; commitData->triggerCommit.set(false); - out.send({ std::move(batch), batchBytes }); - lastBatch = now(); + flushBatch(reason); timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher); batch.clear(); batchBytes = 0; @@ -332,9 +339,12 @@ Future commitBatcher(ProxyCommitData* commitData, UNREACHABLE(); } } + auto reason = batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX + ? ProxyStats::CommitBatchFlushReason::COUNT_LIMIT + : batchBytes >= desiredBytes ? ProxyStats::CommitBatchFlushReason::BYTE_LIMIT + : ProxyStats::CommitBatchFlushReason::TIMEOUT; commitData->triggerCommit.set(false); - out.send({ std::move(batch), batchBytes }); - lastBatch = now(); + flushBatch(reason); } } @@ -2835,6 +2845,12 @@ Future logDetailedMetrics(ProxyCommitData* commitData) { double startTime = now(); int64_t commitBatchInBaseline = commitData->stats.commitBatchIn.getValue(); + int64_t commitBatchFlushByteLimitBaseline = commitData->stats.commitBatchFlushByteLimit.getValue(); + int64_t commitBatchFlushCountLimitBaseline = commitData->stats.commitBatchFlushCountLimit.getValue(); + int64_t commitBatchFlushTimeoutBaseline = commitData->stats.commitBatchFlushTimeout.getValue(); + int64_t commitBatchFlushFirstInBatchBaseline = commitData->stats.commitBatchFlushFirstInBatch.getValue(); + int64_t commitBatchFlushTransactionSizeLimitBaseline = + commitData->stats.commitBatchFlushTransactionSizeLimit.getValue(); int64_t txnCommitInBaseline = commitData->stats.txnCommitIn.getValue(); int64_t mutationsBaseline = commitData->stats.mutations.getValue(); int64_t mutationBytesBaseline = commitData->stats.mutationBytes.getValue(); @@ -2842,20 +2858,39 @@ Future logDetailedMetrics(ProxyCommitData* commitData) { co_await delay(SERVER_KNOBS->BURSTINESS_METRICS_LOG_INTERVAL); int64_t commitBatchInReal = commitData->stats.commitBatchIn.getValue(); + int64_t commitBatchFlushByteLimitReal = commitData->stats.commitBatchFlushByteLimit.getValue(); + int64_t commitBatchFlushCountLimitReal = commitData->stats.commitBatchFlushCountLimit.getValue(); + int64_t commitBatchFlushTimeoutReal = commitData->stats.commitBatchFlushTimeout.getValue(); + int64_t commitBatchFlushFirstInBatchReal = commitData->stats.commitBatchFlushFirstInBatch.getValue(); + int64_t commitBatchFlushTransactionSizeLimitReal = + commitData->stats.commitBatchFlushTransactionSizeLimit.getValue(); int64_t txnCommitInReal = commitData->stats.txnCommitIn.getValue(); int64_t mutationsReal = commitData->stats.mutations.getValue(); int64_t mutationBytesReal = commitData->stats.mutationBytes.getValue(); // Don't log anything if any of the counters got reset during the wait // interval. Assume that typically all the counters get reset at once. - if (commitBatchInReal < commitBatchInBaseline || txnCommitInReal < txnCommitInBaseline || - mutationsReal < mutationsBaseline || mutationBytesReal < mutationBytesBaseline) { + if (commitBatchInReal < commitBatchInBaseline || + commitBatchFlushByteLimitReal < commitBatchFlushByteLimitBaseline || + commitBatchFlushCountLimitReal < commitBatchFlushCountLimitBaseline || + commitBatchFlushTimeoutReal < commitBatchFlushTimeoutBaseline || + commitBatchFlushFirstInBatchReal < commitBatchFlushFirstInBatchBaseline || + commitBatchFlushTransactionSizeLimitReal < commitBatchFlushTransactionSizeLimitBaseline || + txnCommitInReal < txnCommitInBaseline || mutationsReal < mutationsBaseline || + mutationBytesReal < mutationBytesBaseline) { continue; } TraceEvent("ProxyDetailedMetrics") .detail("Elapsed", now() - startTime) .detail("CommitBatchIn", commitBatchInReal - commitBatchInBaseline) + .detail("CommitBatchFlushByteLimit", commitBatchFlushByteLimitReal - commitBatchFlushByteLimitBaseline) + .detail("CommitBatchFlushCountLimit", commitBatchFlushCountLimitReal - commitBatchFlushCountLimitBaseline) + .detail("CommitBatchFlushTimeout", commitBatchFlushTimeoutReal - commitBatchFlushTimeoutBaseline) + .detail("CommitBatchFlushFirstInBatch", + commitBatchFlushFirstInBatchReal - commitBatchFlushFirstInBatchBaseline) + .detail("CommitBatchFlushTransactionSizeLimit", + commitBatchFlushTransactionSizeLimitReal - commitBatchFlushTransactionSizeLimitBaseline) .detail("TxnCommitIn", txnCommitInReal - txnCommitInBaseline) .detail("Mutations", mutationsReal - mutationsBaseline) .detail("MutationBytes", mutationBytesReal - mutationBytesBaseline) diff --git a/fdbserver/commitproxy/ProxyCommitData.h b/fdbserver/commitproxy/ProxyCommitData.h index a5684948ddd..83a74e5ae15 100644 --- a/fdbserver/commitproxy/ProxyCommitData.h +++ b/fdbserver/commitproxy/ProxyCommitData.h @@ -54,12 +54,22 @@ struct Descriptor class LogSystemDiskQueueAdapter; struct ProxyStats { + enum class CommitBatchFlushReason { + BYTE_LIMIT, + COUNT_LIMIT, + TIMEOUT, + FIRST_IN_BATCH, + TRANSACTION_SIZE_LIMIT, + }; + CounterCollection cc; Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess, txnCommitErrors; Counter txnConflicts; Counter txnRejectedForQueuedTooLong; Counter commitBatchIn, commitBatchOut; + Counter commitBatchFlushByteLimit, commitBatchFlushCountLimit, commitBatchFlushTimeout, + commitBatchFlushFirstInBatch, commitBatchFlushTransactionSizeLimit; Counter mutationBytes; Counter mutations; Counter conflictRanges; @@ -120,6 +130,26 @@ struct ProxyStats { return r; } + void recordCommitBatchFlush(CommitBatchFlushReason reason) { + switch (reason) { + case CommitBatchFlushReason::BYTE_LIMIT: + ++commitBatchFlushByteLimit; + break; + case CommitBatchFlushReason::COUNT_LIMIT: + ++commitBatchFlushCountLimit; + break; + case CommitBatchFlushReason::TIMEOUT: + ++commitBatchFlushTimeout; + break; + case CommitBatchFlushReason::FIRST_IN_BATCH: + ++commitBatchFlushFirstInBatch; + break; + case CommitBatchFlushReason::TRANSACTION_SIZE_LIMIT: + ++commitBatchFlushTransactionSizeLimit; + break; + } + } + explicit ProxyStats(UID id, NotifiedVersion* pVersion, NotifiedVersion* pCommittedVersion, @@ -129,8 +159,13 @@ struct ProxyStats { txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc), txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc), txnConflicts("TxnConflicts", cc), txnRejectedForQueuedTooLong("TxnRejectedForQueuedTooLong", cc), - commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), - mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), + commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), + commitBatchFlushByteLimit("CommitBatchFlushByteLimit", cc), + commitBatchFlushCountLimit("CommitBatchFlushCountLimit", cc), + commitBatchFlushTimeout("CommitBatchFlushTimeout", cc), + commitBatchFlushFirstInBatch("CommitBatchFlushFirstInBatch", cc), + commitBatchFlushTransactionSizeLimit("CommitBatchFlushTransactionSizeLimit", cc), + mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc), txnExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), rangeLockFastPath("RangeLockFastPath", cc), diff --git a/fdbserver/tlog/TLogServer.cpp b/fdbserver/tlog/TLogServer.cpp index c66408efc75..846858e7fde 100644 --- a/fdbserver/tlog/TLogServer.cpp +++ b/fdbserver/tlog/TLogServer.cpp @@ -134,7 +134,7 @@ struct TLogQueue final : public IClosable { Future initializeRecovery(IDiskQueue::location recoverAt) { return queue->initializeRecovery(recoverAt); } template - void push(T const& qe, Reference logData); + uint32_t push(T const& qe, Reference logData); void forgetBefore(Version upToVersion, Reference logData); void pop(IDiskQueue::location upToLocation); Future commit() { return queue->commit(); } @@ -325,6 +325,7 @@ struct TLogData : NonCopyable { TLogQueue* persistentQueue; // Logical queue the log operates on and persist its data. int64_t diskQueueCommitBytes; + uint64_t diskQueueCommitSerializedBytes; AsyncVar largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES @@ -380,6 +381,12 @@ struct TLogData : NonCopyable { // and ends when the data is flushed and durable. Reference timeUntilDurableDist; + // Distribution of serialized queue entry sizes passed to IDiskQueue::push. + Reference diskQueueWriteSizeDist; + + // Distribution of aggregate serialized bytes included in each IDiskQueue::commit. + Reference diskQueueCommitSizeDist; + // Controls whether the health monitoring running in this TLog force checking any other processes are degraded. Reference> enablePrimaryTxnSystemHealthCheck; @@ -394,19 +401,30 @@ struct TLogData : NonCopyable { Reference> enablePrimaryTxnSystemHealthCheck) : dbgid(dbgid), workerID(workerID), persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)), diskQueueCommitBytes(0), - largeDiskQueueCommitBytes(false), dbInfo(dbInfo), queueCommitEnd(0), queueCommitBegin(0), - instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), bytesDurable(0), - targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0), - peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES), + diskQueueCommitSerializedBytes(0), largeDiskQueueCommitBytes(false), dbInfo(dbInfo), queueCommitEnd(0), + queueCommitBegin(0), instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), + bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), + overheadBytesDurable(0), peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), persistentDataCommitLock(new FlowLock()), ignorePopDeadline(0), dataFolder(folder), degraded(degraded), lowDiskTLogExclusion(lowDiskTLogExclusion), commitLatencyDist(Histogram::getHistogram("tLog"_sr, "commit"_sr, Histogram::Unit::milliseconds)), queueWaitLatencyDist(Histogram::getHistogram("tLog"_sr, "QueueWait"_sr, Histogram::Unit::milliseconds)), timeUntilDurableDist(Histogram::getHistogram("tLog"_sr, "TimeUntilDurable"_sr, Histogram::Unit::milliseconds)), + diskQueueWriteSizeDist(Histogram::getHistogram("tLog"_sr, "DiskQueueWriteSize"_sr, Histogram::Unit::bytes)), + diskQueueCommitSizeDist(Histogram::getHistogram("tLog"_sr, "DiskQueueCommitSize"_sr, Histogram::Unit::bytes)), enablePrimaryTxnSystemHealthCheck(enablePrimaryTxnSystemHealthCheck) { cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True); } + void recordDiskQueueWrite(uint32_t serializedBytes, size_t mutationBytes) { + diskQueueWriteSizeDist->sample(serializedBytes); + diskQueueCommitSerializedBytes += serializedBytes; + diskQueueCommitBytes += mutationBytes; + if (diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { + largeDiskQueueCommitBytes.set(true); + } + } + double availableSpaceRatio(StorageBytes const& kvStoreBytes, StorageBytes const& queueBytes) const { auto ratio = [](StorageBytes const& storageBytes) -> double { return storageBytes.total > 0 ? double(storageBytes.available) / storageBytes.total : 1.0; @@ -822,7 +840,7 @@ struct LogData : NonCopyable, public ReferenceCounted { }; template -void TLogQueue::push(T const& qe, Reference logData) { +uint32_t TLogQueue::push(T const& qe, Reference logData) { BinaryWriter wr(Unversioned()); // outer framing is not versioned wr << uint32_t(0); IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned @@ -834,6 +852,7 @@ void TLogQueue::push(T const& qe, Reference logData) { const IDiskQueue::location endloc = queue->push(wr.toValue()); //TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc); logData->versionLocation[qe.version] = std::make_pair(startloc, endloc); + return static_cast(wr.getLength()); } void TLogQueue::forgetBefore(Version upToVersion, Reference logData) { @@ -2301,8 +2320,13 @@ Future doQueueCommit(TLogData* self, logData->queueCommittingVersion = ver; g_network->setCurrentTask(TaskPriority::TLogCommitReply); + if (self->diskQueueCommitSerializedBytes > 0) { + self->diskQueueCommitSizeDist->sample(static_cast( + std::min(self->diskQueueCommitSerializedBytes, std::numeric_limits::max()))); + } Future c = self->persistentQueue->commit(); self->diskQueueCommitBytes = 0; + self->diskQueueCommitSerializedBytes = 0; self->largeDiskQueueCommitBytes.set(false); co_await ioDegradedOrTimeoutError( @@ -2479,12 +2503,7 @@ Future tLogCommit(TLogData* self, qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = req.messages; qe.id = logData->logId; - self->persistentQueue->push(qe, logData); - - self->diskQueueCommitBytes += qe.expectedSize(); - if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { - self->largeDiskQueueCommitBytes.set(true); - } + self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize()); // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors logData->version.set(req.version); if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { @@ -3157,12 +3176,7 @@ Future pullAsyncData(TLogData* self, qe.knownCommittedVersion = logData->knownCommittedVersion; qe.alternativeMessages = &messages; qe.id = logData->logId; - self->persistentQueue->push(qe, logData); - - self->diskQueueCommitBytes += qe.expectedSize(); - if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { - self->largeDiskQueueCommitBytes.set(true); - } + self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize()); // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages // actors @@ -3201,12 +3215,7 @@ Future pullAsyncData(TLogData* self, qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; - self->persistentQueue->push(qe, logData); - - self->diskQueueCommitBytes += qe.expectedSize(); - if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { - self->largeDiskQueueCommitBytes.set(true); - } + self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize()); // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages // actors @@ -3831,12 +3840,7 @@ Future tLogStart(TLogData* self, InitializeTLogRequest req, LocalityData l qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; - self->persistentQueue->push(qe, logData); - - self->diskQueueCommitBytes += qe.expectedSize(); - if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) { - self->largeDiskQueueCommitBytes.set(true); - } + self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize()); logData->version.set(lastVersionPrevEpoch); }