Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions fdbserver/clustercontroller/ClusterRecovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
databaseLocked(false), minKnownCommittedVersion(invalidVersion), hasConfiguration(false),
coordinators(coordinators), lastVersionTime(0), txnStateStore(nullptr), memoryLimit(2e9), dbId(dbId),
masterInterface(masterInterface), masterLifetime(masterLifetimeToken), clusterController(clusterController),
cstate(coordinators, addActor, dbgid), dbInfo(dbInfo), registrationCount(0), addActor(addActor),
cstate(coordinators, addActor, dbgid), dbInfo(dbInfo), registrationCount(0),
recoveryState(RecoveryState::UNINITIALIZED), addActor(addActor),
recruitmentStalled(makeReference<AsyncVar<bool>>(false)), forceRecovery(forceRecovery), neverCreated(false),
safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid),
cc("ClusterRecoveryData", dbgid.toString()), changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
Expand All @@ -289,10 +290,12 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_DURATION_EVENT_NAME));
clusterRecoveryAvailableEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_AVAILABLE_EVENT_NAME));
logger = cc.traceCounters(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME),
dbgid,
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME));
logger =
cc.traceCounters(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME),
dbgid,
SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_METRICS_EVENT_NAME),
[this](TraceEvent& te) { te.detail("RecoveryState", static_cast<int>(recoveryState)); });
if (forceRecovery && !controllerData->clusterControllerDcId.present()) {
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
forceRecovery = false;
Expand Down
47 changes: 41 additions & 6 deletions fdbserver/commitproxy/CommitProxyServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ Future<Void> commitBatcher(ProxyCommitData* commitData,
Future<Void> timeout;
std::vector<CommitTransactionRequest> batch;
int batchBytes = 0;
auto flushBatch = [&](ProxyStats::CommitBatchFlushReason reason) {
commitData->stats.recordCommitBatchFlush(reason);
out.send({ std::move(batch), batchBytes });
lastBatch = now();
};
// TODO: Enable this assertion (currently failing with gcc)
// static_assert(std::is_nothrow_move_constructible_v<CommitTransactionRequest>);

Expand Down Expand Up @@ -308,9 +313,11 @@ Future<Void> commitBatcher(ProxyCommitData* commitData,

if ((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) &&
!batch.empty()) {
auto reason = batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT
? ProxyStats::CommitBatchFlushReason::TRANSACTION_SIZE_LIMIT
: ProxyStats::CommitBatchFlushReason::FIRST_IN_BATCH;
commitData->triggerCommit.set(false);
out.send({ std::move(batch), batchBytes });
lastBatch = now();
flushBatch(reason);
timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher);
batch.clear();
batchBytes = 0;
Expand All @@ -332,9 +339,12 @@ Future<Void> commitBatcher(ProxyCommitData* commitData,
UNREACHABLE();
}
}
auto reason = batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX
? ProxyStats::CommitBatchFlushReason::COUNT_LIMIT
: batchBytes >= desiredBytes ? ProxyStats::CommitBatchFlushReason::BYTE_LIMIT
: ProxyStats::CommitBatchFlushReason::TIMEOUT;
commitData->triggerCommit.set(false);
out.send({ std::move(batch), batchBytes });
lastBatch = now();
flushBatch(reason);
}
}

Expand Down Expand Up @@ -2835,27 +2845,52 @@ Future<Void> logDetailedMetrics(ProxyCommitData* commitData) {

double startTime = now();
int64_t commitBatchInBaseline = commitData->stats.commitBatchIn.getValue();
int64_t commitBatchFlushByteLimitBaseline = commitData->stats.commitBatchFlushByteLimit.getValue();
int64_t commitBatchFlushCountLimitBaseline = commitData->stats.commitBatchFlushCountLimit.getValue();
int64_t commitBatchFlushTimeoutBaseline = commitData->stats.commitBatchFlushTimeout.getValue();
int64_t commitBatchFlushFirstInBatchBaseline = commitData->stats.commitBatchFlushFirstInBatch.getValue();
int64_t commitBatchFlushTransactionSizeLimitBaseline =
commitData->stats.commitBatchFlushTransactionSizeLimit.getValue();
int64_t txnCommitInBaseline = commitData->stats.txnCommitIn.getValue();
int64_t mutationsBaseline = commitData->stats.mutations.getValue();
int64_t mutationBytesBaseline = commitData->stats.mutationBytes.getValue();

co_await delay(SERVER_KNOBS->BURSTINESS_METRICS_LOG_INTERVAL);

int64_t commitBatchInReal = commitData->stats.commitBatchIn.getValue();
int64_t commitBatchFlushByteLimitReal = commitData->stats.commitBatchFlushByteLimit.getValue();
int64_t commitBatchFlushCountLimitReal = commitData->stats.commitBatchFlushCountLimit.getValue();
int64_t commitBatchFlushTimeoutReal = commitData->stats.commitBatchFlushTimeout.getValue();
int64_t commitBatchFlushFirstInBatchReal = commitData->stats.commitBatchFlushFirstInBatch.getValue();
int64_t commitBatchFlushTransactionSizeLimitReal =
commitData->stats.commitBatchFlushTransactionSizeLimit.getValue();
int64_t txnCommitInReal = commitData->stats.txnCommitIn.getValue();
int64_t mutationsReal = commitData->stats.mutations.getValue();
int64_t mutationBytesReal = commitData->stats.mutationBytes.getValue();

// Don't log anything if any of the counters got reset during the wait
// interval. Assume that typically all the counters get reset at once.
if (commitBatchInReal < commitBatchInBaseline || txnCommitInReal < txnCommitInBaseline ||
mutationsReal < mutationsBaseline || mutationBytesReal < mutationBytesBaseline) {
if (commitBatchInReal < commitBatchInBaseline ||
commitBatchFlushByteLimitReal < commitBatchFlushByteLimitBaseline ||
commitBatchFlushCountLimitReal < commitBatchFlushCountLimitBaseline ||
commitBatchFlushTimeoutReal < commitBatchFlushTimeoutBaseline ||
commitBatchFlushFirstInBatchReal < commitBatchFlushFirstInBatchBaseline ||
commitBatchFlushTransactionSizeLimitReal < commitBatchFlushTransactionSizeLimitBaseline ||
txnCommitInReal < txnCommitInBaseline || mutationsReal < mutationsBaseline ||
mutationBytesReal < mutationBytesBaseline) {
continue;
}

TraceEvent("ProxyDetailedMetrics")
.detail("Elapsed", now() - startTime)
.detail("CommitBatchIn", commitBatchInReal - commitBatchInBaseline)
.detail("CommitBatchFlushByteLimit", commitBatchFlushByteLimitReal - commitBatchFlushByteLimitBaseline)
.detail("CommitBatchFlushCountLimit", commitBatchFlushCountLimitReal - commitBatchFlushCountLimitBaseline)
.detail("CommitBatchFlushTimeout", commitBatchFlushTimeoutReal - commitBatchFlushTimeoutBaseline)
.detail("CommitBatchFlushFirstInBatch",
commitBatchFlushFirstInBatchReal - commitBatchFlushFirstInBatchBaseline)
.detail("CommitBatchFlushTransactionSizeLimit",
commitBatchFlushTransactionSizeLimitReal - commitBatchFlushTransactionSizeLimitBaseline)
.detail("TxnCommitIn", txnCommitInReal - txnCommitInBaseline)
.detail("Mutations", mutationsReal - mutationsBaseline)
.detail("MutationBytes", mutationBytesReal - mutationBytesBaseline)
Expand Down
39 changes: 37 additions & 2 deletions fdbserver/commitproxy/ProxyCommitData.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,22 @@ struct Descriptor<SingleKeyMutationDescriptor>
class LogSystemDiskQueueAdapter;

struct ProxyStats {
enum class CommitBatchFlushReason {
BYTE_LIMIT,
COUNT_LIMIT,
TIMEOUT,
FIRST_IN_BATCH,
TRANSACTION_SIZE_LIMIT,
};

CounterCollection cc;
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut,
txnCommitOutSuccess, txnCommitErrors;
Counter txnConflicts;
Counter txnRejectedForQueuedTooLong;
Counter commitBatchIn, commitBatchOut;
Counter commitBatchFlushByteLimit, commitBatchFlushCountLimit, commitBatchFlushTimeout,
commitBatchFlushFirstInBatch, commitBatchFlushTransactionSizeLimit;
Counter mutationBytes;
Counter mutations;
Counter conflictRanges;
Expand Down Expand Up @@ -120,6 +130,26 @@ struct ProxyStats {
return r;
}

void recordCommitBatchFlush(CommitBatchFlushReason reason) {
switch (reason) {
case CommitBatchFlushReason::BYTE_LIMIT:
++commitBatchFlushByteLimit;
break;
case CommitBatchFlushReason::COUNT_LIMIT:
++commitBatchFlushCountLimit;
break;
case CommitBatchFlushReason::TIMEOUT:
++commitBatchFlushTimeout;
break;
case CommitBatchFlushReason::FIRST_IN_BATCH:
++commitBatchFlushFirstInBatch;
break;
case CommitBatchFlushReason::TRANSACTION_SIZE_LIMIT:
++commitBatchFlushTransactionSizeLimit;
break;
}
}

explicit ProxyStats(UID id,
NotifiedVersion* pVersion,
NotifiedVersion* pCommittedVersion,
Expand All @@ -129,8 +159,13 @@ struct ProxyStats {
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnRejectedForQueuedTooLong("TxnRejectedForQueuedTooLong", cc),
commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc),
mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc),
commitBatchFlushByteLimit("CommitBatchFlushByteLimit", cc),
commitBatchFlushCountLimit("CommitBatchFlushCountLimit", cc),
commitBatchFlushTimeout("CommitBatchFlushTimeout", cc),
commitBatchFlushFirstInBatch("CommitBatchFlushFirstInBatch", cc),
commitBatchFlushTransactionSizeLimit("CommitBatchFlushTransactionSizeLimit", cc),
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
keyServerLocationIn("KeyServerLocationIn", cc), keyServerLocationOut("KeyServerLocationOut", cc),
keyServerLocationErrors("KeyServerLocationErrors", cc),
txnExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), rangeLockFastPath("RangeLockFastPath", cc),
Expand Down
64 changes: 34 additions & 30 deletions fdbserver/tlog/TLogServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct TLogQueue final : public IClosable {
Future<bool> initializeRecovery(IDiskQueue::location recoverAt) { return queue->initializeRecovery(recoverAt); }

template <class T>
void push(T const& qe, Reference<LogData> logData);
uint32_t push(T const& qe, Reference<LogData> logData);
void forgetBefore(Version upToVersion, Reference<LogData> logData);
void pop(IDiskQueue::location upToLocation);
Future<Void> commit() { return queue->commit(); }
Expand Down Expand Up @@ -325,6 +325,7 @@ struct TLogData : NonCopyable {
TLogQueue* persistentQueue; // Logical queue the log operates on and persist its data.

int64_t diskQueueCommitBytes;
uint64_t diskQueueCommitSerializedBytes;
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES

Expand Down Expand Up @@ -380,6 +381,12 @@ struct TLogData : NonCopyable {
// and ends when the data is flushed and durable.
Reference<Histogram> timeUntilDurableDist;

// Distribution of serialized queue entry sizes passed to IDiskQueue::push.
Reference<Histogram> diskQueueWriteSizeDist;

// Distribution of aggregate serialized bytes included in each IDiskQueue::commit.
Reference<Histogram> diskQueueCommitSizeDist;

// Controls whether the health monitoring running in this TLog force checking any other processes are degraded.
Reference<AsyncVar<bool>> enablePrimaryTxnSystemHealthCheck;

Expand All @@ -394,19 +401,30 @@ struct TLogData : NonCopyable {
Reference<AsyncVar<bool>> enablePrimaryTxnSystemHealthCheck)
: dbgid(dbgid), workerID(workerID), persistentData(persistentData), rawPersistentQueue(persistentQueue),
persistentQueue(new TLogQueue(persistentQueue, dbgid)), diskQueueCommitBytes(0),
largeDiskQueueCommitBytes(false), dbInfo(dbInfo), queueCommitEnd(0), queueCommitBegin(0),
instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), bytesDurable(0),
targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
diskQueueCommitSerializedBytes(0), largeDiskQueueCommitBytes(false), dbInfo(dbInfo), queueCommitEnd(0),
queueCommitBegin(0), instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0),
bytesDurable(0), targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0),
overheadBytesDurable(0), peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), persistentDataCommitLock(new FlowLock()),
ignorePopDeadline(0), dataFolder(folder), degraded(degraded), lowDiskTLogExclusion(lowDiskTLogExclusion),
commitLatencyDist(Histogram::getHistogram("tLog"_sr, "commit"_sr, Histogram::Unit::milliseconds)),
queueWaitLatencyDist(Histogram::getHistogram("tLog"_sr, "QueueWait"_sr, Histogram::Unit::milliseconds)),
timeUntilDurableDist(Histogram::getHistogram("tLog"_sr, "TimeUntilDurable"_sr, Histogram::Unit::milliseconds)),
diskQueueWriteSizeDist(Histogram::getHistogram("tLog"_sr, "DiskQueueWriteSize"_sr, Histogram::Unit::bytes)),
diskQueueCommitSizeDist(Histogram::getHistogram("tLog"_sr, "DiskQueueCommitSize"_sr, Histogram::Unit::bytes)),
enablePrimaryTxnSystemHealthCheck(enablePrimaryTxnSystemHealthCheck) {
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
}

void recordDiskQueueWrite(uint32_t serializedBytes, size_t mutationBytes) {
diskQueueWriteSizeDist->sample(serializedBytes);
diskQueueCommitSerializedBytes += serializedBytes;
diskQueueCommitBytes += mutationBytes;
if (diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
largeDiskQueueCommitBytes.set(true);
}
}

double availableSpaceRatio(StorageBytes const& kvStoreBytes, StorageBytes const& queueBytes) const {
auto ratio = [](StorageBytes const& storageBytes) -> double {
return storageBytes.total > 0 ? double(storageBytes.available) / storageBytes.total : 1.0;
Expand Down Expand Up @@ -822,7 +840,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
};

template <class T>
void TLogQueue::push(T const& qe, Reference<LogData> logData) {
uint32_t TLogQueue::push(T const& qe, Reference<LogData> logData) {
BinaryWriter wr(Unversioned()); // outer framing is not versioned
wr << uint32_t(0);
IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned
Expand All @@ -834,6 +852,7 @@ void TLogQueue::push(T const& qe, Reference<LogData> logData) {
const IDiskQueue::location endloc = queue->push(wr.toValue());
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
logData->versionLocation[qe.version] = std::make_pair(startloc, endloc);
return static_cast<uint32_t>(wr.getLength());
}

void TLogQueue::forgetBefore(Version upToVersion, Reference<LogData> logData) {
Expand Down Expand Up @@ -2301,8 +2320,13 @@ Future<Void> doQueueCommit(TLogData* self,
logData->queueCommittingVersion = ver;

g_network->setCurrentTask(TaskPriority::TLogCommitReply);
if (self->diskQueueCommitSerializedBytes > 0) {
self->diskQueueCommitSizeDist->sample(static_cast<uint32_t>(
std::min<uint64_t>(self->diskQueueCommitSerializedBytes, std::numeric_limits<uint32_t>::max())));
}
Future<Void> c = self->persistentQueue->commit();
self->diskQueueCommitBytes = 0;
self->diskQueueCommitSerializedBytes = 0;
self->largeDiskQueueCommitBytes.set(false);

co_await ioDegradedOrTimeoutError(
Expand Down Expand Up @@ -2479,12 +2503,7 @@ Future<Void> tLogCommit(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = req.messages;
qe.id = logData->logId;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
self->largeDiskQueueCommitBytes.set(true);
}
self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize());
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
Expand Down Expand Up @@ -3157,12 +3176,7 @@ Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.alternativeMessages = &messages;
qe.id = logData->logId;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
self->largeDiskQueueCommitBytes.set(true);
}
self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize());

// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages
// actors
Expand Down Expand Up @@ -3201,12 +3215,7 @@ Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
self->largeDiskQueueCommitBytes.set(true);
}
self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize());

// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages
// actors
Expand Down Expand Up @@ -3831,12 +3840,7 @@ Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, LocalityData l
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
self->largeDiskQueueCommitBytes.set(true);
}
self->recordDiskQueueWrite(self->persistentQueue->push(qe, logData), qe.expectedSize());
logData->version.set(lastVersionPrevEpoch);
}

Expand Down
Loading