Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fdbclient/include/fdbclient/CommitTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct MutationRef {
CompareAndClear,
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
Reserved_For_PartitionMapMessage /* See fdbserver/core/PartitionMapMessage.h */,
Reserved_For_PartitionMapMessage /* See fdbserver/backupworker/PartitionMapMessage.h */,
MAX_ATOMIC_OP
};

Expand Down
27 changes: 14 additions & 13 deletions fdbrpc/FlowTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/HealthMonitor.h"
#include "HealthMonitor.h"
#include "JsonWebKeySet.h"
#include "fdbrpc/genericactors.h"
#include "fdbrpc/IPAllowList.h"
Expand Down Expand Up @@ -762,19 +762,19 @@ Future<Void> connectionWriter(Reference<Peer> self, Reference<IConnection> conn)
}
}

Future<Void> delayedHealthUpdate(NetworkAddress address, bool* tooManyConnectionsClosed) {
Future<Void> delayedHealthUpdate(HealthMonitor* healthMonitor, NetworkAddress address, bool* tooManyConnectionsClosed) {
double start = now();
while (true) {
if (FLOW_KNOBS->HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS &&
FlowTransport::transport().healthMonitor()->tooManyConnectionsClosed(address) && address.isPublic()) {
healthMonitor->tooManyConnectionsClosed(address) && address.isPublic()) {
co_await delayJittered(FLOW_KNOBS->MAX_RECONNECTION_TIME * 2.0);
} else {
if (*tooManyConnectionsClosed) {
TraceEvent("TooManyConnectionsClosedMarkAvailable")
.detail("Dest", address)
.detail("StartTime", start)
.detail("TimeElapsed", now() - start)
.detail("ClosedCount", FlowTransport::transport().healthMonitor()->closedConnectionsCount(address));
.detail("ClosedCount", healthMonitor->closedConnectionsCount(address));
*tooManyConnectionsClosed = false;
}
IFailureMonitor::failureMonitor().setStatus(address, FailureStatus(false));
Expand Down Expand Up @@ -855,7 +855,8 @@ Future<Void> connectionKeeper(Reference<Peer> self,
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
delayedHealthUpdateF = delayedHealthUpdate(self->destination, &tooManyConnectionsClosed);
delayedHealthUpdateF = delayedHealthUpdate(
&self->transport->healthMonitor, self->destination, &tooManyConnectionsClosed);
auto healthRes = co_await race(delayedHealthUpdateF, self->dataToSend.onTrigger());
if (healthRes.index() == 0) {
conn->close();
Expand Down Expand Up @@ -897,7 +898,8 @@ Future<Void> connectionKeeper(Reference<Peer> self,
try {
self->transport->countConnEstablished++;
if (!delayedHealthUpdateF.isValid())
delayedHealthUpdateF = delayedHealthUpdate(self->destination, &tooManyConnectionsClosed);
delayedHealthUpdateF = delayedHealthUpdate(
&self->transport->healthMonitor, self->destination, &tooManyConnectionsClosed);
self->connected = true;
co_await (connectionWriter(self, conn) || reader || connectionMonitor(self) ||
self->resetConnection.onTrigger());
Expand Down Expand Up @@ -992,15 +994,14 @@ Future<Void> connectionKeeper(Reference<Peer> self,

if (conn) {
if (self->destination.isPublic() && e.code() == error_code_connection_failed) {
FlowTransport::transport().healthMonitor()->reportPeerClosed(self->destination);
self->transport->healthMonitor.reportPeerClosed(self->destination);
if (FLOW_KNOBS->HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS &&
FlowTransport::transport().healthMonitor()->tooManyConnectionsClosed(self->destination) &&
self->transport->healthMonitor.tooManyConnectionsClosed(self->destination) &&
self->destination.isPublic()) {
TraceEvent("TooManyConnectionsClosedMarkFailed")
.detail("Dest", self->destination)
.detail(
"ClosedCount",
FlowTransport::transport().healthMonitor()->closedConnectionsCount(self->destination));
.detail("ClosedCount",
self->transport->healthMonitor.closedConnectionsCount(self->destination));
tooManyConnectionsClosed = true;
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
Expand Down Expand Up @@ -2182,8 +2183,8 @@ void FlowTransport::createInstance(bool isClient,
g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr);
}

HealthMonitor* FlowTransport::healthMonitor() {
return &self->healthMonitor;
std::unordered_set<NetworkAddress> FlowTransport::getRecentClosedPeers() {
return self->healthMonitor.getRecentClosedPeers();
}

Optional<PublicKey> FlowTransport::getPublicKeyByName(StringRef name) const {
Expand Down
2 changes: 1 addition & 1 deletion fdbrpc/HealthMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* limitations under the License.
*/

#include "fdbrpc/HealthMonitor.h"
#include "HealthMonitor.h"

void HealthMonitor::reportPeerClosed(const NetworkAddress& peerAddress) {
purgeOutdatedHistory();
Expand Down
File renamed without changes.
6 changes: 4 additions & 2 deletions fdbrpc/include/fdbrpc/FlowTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@

#include <algorithm>
#include <map>
#include <unordered_map>
#include <unordered_set>

#include "fdbrpc/DDSketch.h"
#include "fdbrpc/HealthMonitor.h"
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/FileIdentifier.h"
Expand Down Expand Up @@ -293,7 +294,8 @@ class FlowTransport : NonCopyable {
Endpoint loadedEndpoint(const UID& token);
Future<Void> loadedDisconnect();

HealthMonitor* healthMonitor();
// Returns peers whose recent failed connections have already been evicted from the transport peer map.
std::unordered_set<NetworkAddress> getRecentClosedPeers();

bool currentDeliveryPeerIsTrusted() const;
NetworkAddress currentDeliveryPeerAddress() const;
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/backupworker/BackupWorkerRangePartitioned.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "fdbserver/core/BackupPartitionMap.h"
#include "fdbserver/core/BackupProgress.h"
#include "fdbserver/core/Knobs.h"
#include "fdbserver/core/PartitionMapMessage.h"
#include "PartitionMapMessage.h"
#include "fdbserver/core/WaitFailure.h"
#include "fdbserver/logsystem/LogSystem.h"
#include "fdbserver/logsystem/LogSystemConsumer.h"
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/tester/KnobProtectiveGroups.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* limitations under the License.
*/

#include "fdbserver/tester/KnobProtectiveGroups.h"
#include "KnobProtectiveGroups.h"

#include <array>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,9 @@
#ifndef FDBSERVER_TESTER_KNOBPROTECTIVEGROUPS_H
#define FDBSERVER_TESTER_KNOBPROTECTIVEGROUPS_H

#include <array>
#include <unordered_map>
#include "fdbserver/tester/KnobKeyValuePairs.h"

#include "flow/Knobs.h"

// A list of knob key value pairs
// Only declarations appear here; the member functions have no inline bodies in this class.
class KnobKeyValuePairs {
public:
// Storage type: an unordered map from a std::string key to a ParsedKnobValue.
// NOTE(review): the key is presumably the knob's name (see set() below) - confirm against the definition.
using container_t = std::unordered_map<std::string, ParsedKnobValue>;

private:
// Here the knob value is directly stored, unlike KnobValue, for simplicity
container_t knobs;

public:
// Sets a value for a given knob
// name:  identifies the knob.
// value: the ParsedKnobValue to associate with that knob (taken by value).
void set(const std::string& name, const ParsedKnobValue value);

// Gets a list of knobs for given type
// Returns a const reference to a container_t, so callers cannot modify the result through it.
// NOTE(review): no type argument is taken here; "for given type" may be stale wording - confirm.
const container_t& getKnobs() const;
};

// For knobs, temporarily change the values, the original values will be recovered
// For knobs, temporarily change the values, the original values will be recovered.
class KnobProtectiveGroup {
KnobKeyValuePairs overriddenKnobs;
KnobKeyValuePairs originalKnobs;
Expand Down
47 changes: 47 additions & 0 deletions fdbserver/tester/include/fdbserver/tester/KnobKeyValuePairs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* KnobKeyValuePairs.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2026 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FDBSERVER_TESTER_KNOBKEYVALUEPAIRS_H
#define FDBSERVER_TESTER_KNOBKEYVALUEPAIRS_H
#pragma once

#include <string>
#include <unordered_map>

#include "flow/Knobs.h"

// A collection of knob settings, each stored as a (name, parsed value) pair.
class KnobKeyValuePairs {
public:
	// Container used to hold the pairs: knob name -> parsed knob value.
	using container_t = std::unordered_map<std::string, ParsedKnobValue>;

	// Associates `value` with the knob identified by `name`.
	void set(const std::string& name, const ParsedKnobValue value);

	// Read-only access to the knob name/value pairs as a container_t.
	const container_t& getKnobs() const;

private:
	// For simplicity the knob value itself is stored directly here, unlike KnobValue.
	container_t knobs;
};

#endif // FDBSERVER_TESTER_KNOBKEYVALUEPAIRS_H
2 changes: 1 addition & 1 deletion fdbserver/tester/include/fdbserver/tester/WorkloadUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/core/FDBSimulationPolicy.h"
#include "fdbserver/tester/KnobProtectiveGroups.h"
#include "fdbserver/tester/KnobKeyValuePairs.h"
#include "fdbserver/core/TesterInterface.h"
#include "fdbserver/tester/workloads.h"
#include "fdbrpc/PerfMetric.h"
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/tester/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include "fdbserver/core/QuietDatabase.h"
#include "fdbserver/core/WorkerInterface.actor.h"
#include "fdbserver/core/FDBSimulationPolicy.h"
#include "fdbserver/tester/KnobProtectiveGroups.h"
#include "KnobProtectiveGroups.h"
#include "ConsistencyChecker.h"
#include "DatabaseMaintenance.h"
#include "TestSpecParser.h"
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/worker/worker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ UpdateWorkerHealthRequest doPeerHealthCheck(const WorkerInterface& interf,
// part of the transaction sub system.
// Note that we don't need to calculate recovered peer in this case since all the recently closed peers are
// considered permanently closed peers.
for (const auto& address : FlowTransport::transport().healthMonitor()->getRecentClosedPeers()) {
for (const auto& address : FlowTransport::transport().getRecentClosedPeers()) {
if (allPeers.find(address) != allPeers.end()) {
// We have checked this peer in the above for loop.
continue;
Expand Down
Loading