Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions design/cluster_health_metric.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ Behavior:
- Returns `SELF_HEALING` if recovery is at or above `ACCEPTING_COMMITS` but below `FULLY_RECOVERED`.
- Returns `HEALTHY` at `FULLY_RECOVERED`.

### `CoordinatorReachability`

Source:
- `GetLeaderRequest` sent directly to each coordinator configured in the cluster connection string

Fields used:
- none; this factor checks whether each coordinator replies before the probe timeout

Behavior:
- Returns `INTERVENTION_REQUIRED` if any configured coordinator is missing or unreachable.
- Returns `HEALTHY` if every configured coordinator replies.
- Returns `METRICS_MISSING` if no coordinator list is available.

### `ProcessErrors`

Source event:
Expand Down Expand Up @@ -162,6 +175,7 @@ Fields:
- `FactorTLogSpace`: same enum
- `FactorStorageReplication`: same enum
- `FactorRecoveryState`: same enum
- `FactorCoordinatorReachability`: same enum
- `FactorProcessErrors`: same enum
- `FactorRkThrottling`: same enum
- `Aggregate`: same enum, computed from the most limiting factor
Expand Down
1 change: 1 addition & 0 deletions fdbserver/clustercontroller/ClusterController.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ ClusterControllerData::ClusterControllerData(ClusterControllerFullInterface cons
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, LockAware::True);

specialCounter(clusterControllerMetrics, "ClientCount", [this]() { return db.clientCount; });
clusterHealthWorkerEventProvider->setCoordinators(coordinators);
updateClusterHealthMonitorInputs();
}

Expand Down
19 changes: 19 additions & 0 deletions fdbserver/clustercontroller/ClusterHealthIFactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,25 @@ Future<Level> RecoveryStateFactor::fetchLevel(Reference<IWorkerEventProvider con
co_return level;
}

std::string_view CoordinatorReachabilityFactor::getName() const {
return "CoordinatorReachability";
}

Future<Level> CoordinatorReachabilityFactor::fetchLevel(Reference<IWorkerEventProvider const> workerEventProvider,
TrackCodeProbes trackCodeProbes) {
Optional<bool> allCoordinatorsReachable = co_await workerEventProvider->areAllCoordinatorsReachable();
if (!allCoordinatorsReachable.present()) {
co_return Level::METRICS_MISSING;
}

Level level = allCoordinatorsReachable.get() ? Level::HEALTHY : Level::INTERVENTION_REQUIRED;
CODE_PROBE(trackCodeProbes && level == Level::HEALTHY,
"ClusterHealth CoordinatorReachabilityFactor returns HEALTHY");
CODE_PROBE(trackCodeProbes && level == Level::INTERVENTION_REQUIRED,
"ClusterHealth CoordinatorReachabilityFactor returns INTERVENTION_REQUIRED");
co_return level;
}

std::string_view ProcessErrorsFactor::getName() const {
return "ProcessErrors";
}
Expand Down
7 changes: 7 additions & 0 deletions fdbserver/clustercontroller/ClusterHealthIFactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class RecoveryStateFactor final : public IFactor {
Future<Level> fetchLevel(Reference<IWorkerEventProvider const> workerEventProvider, TrackCodeProbes) override;
};

// Evaluates whether every configured coordinator is reachable.
class CoordinatorReachabilityFactor final : public IFactor {
public:
std::string_view getName() const override;
Future<Level> fetchLevel(Reference<IWorkerEventProvider const> workerEventProvider, TrackCodeProbes) override;
};

// Evaluates whether any worker is currently reporting a latest process error.
class ProcessErrorsFactor final : public IFactor {
public:
Expand Down
45 changes: 45 additions & 0 deletions fdbserver/clustercontroller/ClusterHealthMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <utility>

#include "fmt/format.h"
#include "fdbrpc/genericactors.h"
#include "fdbserver/core/CoordinationInterface.h"
#include "fdbserver/core/Knobs.h"
#include "fdbserver/core/WorkerEvents.h"
#include "flow/Trace.h"
Expand Down Expand Up @@ -154,6 +156,11 @@ void WorkerEventProvider::setStorageTeamOneReplicaLeftIsCritical(bool storageTea
this->storageTeamOneReplicaLeftIsCritical = storageTeamOneReplicaLeftIsCritical;
}

void WorkerEventProvider::setCoordinators(ServerCoordinators const& coordinators) {
coordinatorClusterKey = coordinators.clusterKey;
this->coordinators = coordinators.clientLeaderServers;
}

void WorkerEventProvider::setRatekeeperWorker(Optional<WorkerInterface> ratekeeperWorker) {
this->ratekeeperWorker = std::move(ratekeeperWorker);
}
Expand All @@ -178,6 +185,43 @@ bool WorkerEventProvider::shouldTreatStorageTeamOneReplicaLeftAsCritical() const
return storageTeamOneReplicaLeftIsCritical;
}

AsyncResult<Optional<bool>> WorkerEventProvider::areAllCoordinatorsReachable() const {
if (coordinators.empty()) {
co_return Optional<bool>();
}

try {
std::vector<Future<ErrorOr<Optional<LeaderInfo>>>> coordinatorReplies;
coordinatorReplies.reserve(coordinators.size());
for (auto const& coordinator : coordinators) {
Future<Optional<LeaderInfo>> reply;
if (coordinator.hostname.present()) {
reply = retryGetReplyFromHostname(GetLeaderRequest(coordinatorClusterKey, UID()),
coordinator.hostname.get(),
WLTOKEN_CLIENTLEADERREG_GETLEADER,
TaskPriority::CoordinationReply);
} else {
reply = retryBrokenPromise(coordinator.getLeader,
GetLeaderRequest(coordinatorClusterKey, UID()),
TaskPriority::CoordinationReply);
}
coordinatorReplies.push_back(errorOr(timeoutError(reply, 2.0)));
}

co_await waitForAll(coordinatorReplies);
for (auto const& reply : coordinatorReplies) {
if (reply.get().isError()) {
co_return false;
}
}
co_return true;
} catch (Error& e) {
ASSERT_EQ(e.code(),
error_code_actor_cancelled); // All errors should be filtering through the errorOr actor above
throw;
}
}

AsyncResult<LatestWorkerEvents> WorkerEventProvider::getLatestEvents(std::string const& eventName) const {
return latestEventOnWorkers(workers, eventName);
}
Expand Down Expand Up @@ -265,6 +309,7 @@ Monitor Monitor::create(Reference<IWorkerEventProvider const> workerEventProvide
SERVER_KNOBS->CLUSTER_HEALTH_METRIC_TLOG_CRITICAL_THRESHOLD));
factors.push_back(std::make_unique<StorageReplicationFactor>());
factors.push_back(std::make_unique<RecoveryStateFactor>());
factors.push_back(std::make_unique<CoordinatorReachabilityFactor>());
factors.push_back(std::make_unique<ProcessErrorsFactor>());
factors.push_back(std::make_unique<RkThrottlingFactor>(
SERVER_KNOBS->CLUSTER_HEALTH_METRIC_RK_CRITICAL_RELEASED_TPS_RATIO_THRESHOLD));
Expand Down
8 changes: 8 additions & 0 deletions fdbserver/clustercontroller/ClusterHealthMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
#include <vector>

#include "ClusterHealthIFactor.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/core/RecoveryState.h"
#include "fdbserver/core/TLogInterface.h"
#include "fdbserver/core/WorkerEvents.h"
#include "flow/flow.h"

class ServerCoordinators;

namespace cluster_health {

enum class Level {
Expand All @@ -53,6 +56,7 @@ class IWorkerEventProvider {
virtual void delref() const = 0;
virtual Optional<RecoveryState> getRecoveryState() const = 0;
virtual bool shouldTreatStorageTeamOneReplicaLeftAsCritical() const = 0;
virtual AsyncResult<Optional<bool>> areAllCoordinatorsReachable() const = 0;
virtual AsyncResult<LatestWorkerEvents> getLatestEvents(std::string const& eventName) const = 0;
virtual AsyncResult<LatestWorkerEvents> getLatestRatekeeperEvents(std::string const& eventName) const = 0;
virtual AsyncResult<LatestWorkerEvents> getLatestDataDistributorEvents(std::string const& eventName) const = 0;
Expand All @@ -65,6 +69,8 @@ class WorkerEventProvider final : public IWorkerEventProvider, public ReferenceC
std::vector<WorkerDetails> workers;
Optional<RecoveryState> recoveryState;
bool storageTeamOneReplicaLeftIsCritical = false;
Key coordinatorClusterKey;
std::vector<ClientLeaderRegInterface> coordinators;
Optional<WorkerInterface> ratekeeperWorker;
Optional<WorkerInterface> dataDistributorWorker;
std::vector<StorageServerInterface> storageServers;
Expand All @@ -76,12 +82,14 @@ class WorkerEventProvider final : public IWorkerEventProvider, public ReferenceC
void setWorkers(std::vector<WorkerDetails> workers);
void setRecoveryState(RecoveryState recoveryState);
void setStorageTeamOneReplicaLeftIsCritical(bool storageTeamOneReplicaLeftIsCritical);
void setCoordinators(ServerCoordinators const& coordinators);
void setRatekeeperWorker(Optional<WorkerInterface> ratekeeperWorker);
void setDataDistributorWorker(Optional<WorkerInterface> dataDistributorWorker);
void setStorageServers(std::vector<StorageServerInterface> storageServers);
void setTLogs(std::vector<TLogInterface> tlogs);
Optional<RecoveryState> getRecoveryState() const override;
bool shouldTreatStorageTeamOneReplicaLeftAsCritical() const override;
AsyncResult<Optional<bool>> areAllCoordinatorsReachable() const override;
AsyncResult<LatestWorkerEvents> getLatestEvents(std::string const& eventName) const override;
AsyncResult<LatestWorkerEvents> getLatestRatekeeperEvents(std::string const& eventName) const override;
AsyncResult<LatestWorkerEvents> getLatestDataDistributorEvents(std::string const& eventName) const override;
Expand Down
25 changes: 25 additions & 0 deletions fdbserver/clustercontroller/ClusterHealthMonitorTesting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace {
class FakeWorkerEventProvider final : public IWorkerEventProvider, public ReferenceCounted<FakeWorkerEventProvider> {
Optional<RecoveryState> recoveryState;
bool storageTeamOneReplicaLeftIsCritical = false;
Optional<bool> allCoordinatorsReachable;
std::map<std::string, LatestWorkerEvents> latestEventsByName;
std::map<std::string, LatestWorkerEvents> latestRatekeeperEventsByName;
std::map<std::string, LatestWorkerEvents> latestDataDistributorEventsByName;
Expand All @@ -53,6 +54,10 @@ class FakeWorkerEventProvider final : public IWorkerEventProvider, public Refere
this->storageTeamOneReplicaLeftIsCritical = storageTeamOneReplicaLeftIsCritical;
}

void setAllCoordinatorsReachable(bool allCoordinatorsReachable) {
this->allCoordinatorsReachable = allCoordinatorsReachable;
}

void setLatestRatekeeperEvents(std::string eventName, LatestWorkerEvents latestEvents) {
latestRatekeeperEventsByName[std::move(eventName)] = std::move(latestEvents);
}
Expand Down Expand Up @@ -81,6 +86,8 @@ class FakeWorkerEventProvider final : public IWorkerEventProvider, public Refere

bool shouldTreatStorageTeamOneReplicaLeftAsCritical() const override { return storageTeamOneReplicaLeftIsCritical; }

AsyncResult<Optional<bool>> areAllCoordinatorsReachable() const override { co_return allCoordinatorsReachable; }

AsyncResult<LatestWorkerEvents> getLatestRatekeeperEvents(std::string const& eventName) const override {
auto it = latestRatekeeperEventsByName.find(eventName);
if (it != latestRatekeeperEventsByName.end()) {
Expand Down Expand Up @@ -335,6 +342,24 @@ TEST_CASE("/fdbserver/clustercontroller/ClusterHealthMonitor/RecoveryStateFactor
ASSERT_EQ(level, Level::METRICS_MISSING);
}

TEST_CASE("/fdbserver/clustercontroller/ClusterHealthMonitor/CoordinatorReachabilityFactor") {
CoordinatorReachabilityFactor factor;
auto provider = makeReference<FakeWorkerEventProvider>();
Level level;

provider->setAllCoordinatorsReachable(true);
level = co_await factor.fetchLevel(provider, TrackCodeProbes::False);
ASSERT_EQ(level, Level::HEALTHY);

provider->setAllCoordinatorsReachable(false);
level = co_await factor.fetchLevel(provider, TrackCodeProbes::False);
ASSERT_EQ(level, Level::INTERVENTION_REQUIRED);

auto missingProvider = makeReference<FakeWorkerEventProvider>();
level = co_await factor.fetchLevel(missingProvider, TrackCodeProbes::False);
ASSERT_EQ(level, Level::METRICS_MISSING);
}

TEST_CASE("/fdbserver/clustercontroller/ClusterHealthMonitor/ProcessErrorsFactor") {
ProcessErrorsFactor factor;
auto provider = makeReference<FakeWorkerEventProvider>();
Expand Down
Loading