From 472904b49c5a8173cd4f076054b5d06e1371965f Mon Sep 17 00:00:00 2001 From: Rohit Agrawal Date: Thu, 25 Jun 2026 17:04:45 +0000 Subject: [PATCH] dynamic_modules: per-cluster persistent cross-priority host map via cluster ABI The cross-priority host map rebuilds itself with a full O(N) make_shared copy on every membership delta. MainPrioritySetImpl can now back it with an immer persistent map so a delta is applied in O(delta), selected per cluster via setUsePersistentCrossPriorityHostMap(). crossPriorityHostMap() returns an abstract HostLookupTable backed by either the legacy flat map (the default, preserving the exact prior behavior) or the immer map, and updateDynamicHostList takes a host-lookup functor so it stays backing-agnostic. Dynamic-modules clusters opt in through the new envoy_dynamic_module_callback_cluster_use_persistent_host_map ABI callback (the DynamicModuleCluster::setUsePersistentHostMap forwarder) and the Rust SDK EnvoyCluster::set_use_persistent_host_map method. Membership behavior is identical under either backing. 
Signed-off-by: Rohit Agrawal --- bazel/deps.yaml | 11 + bazel/external/immer.BUILD | 11 + bazel/repositories.bzl | 7 + bazel/repository_locations.bzl | 9 + ..._added-cluster-use-persistent-host-map.rst | 5 + envoy/upstream/BUILD | 1 + envoy/upstream/upstream.h | 56 ++++- source/common/upstream/BUILD | 15 ++ .../common/upstream/cluster_manager_impl.cc | 13 +- source/common/upstream/cluster_manager_impl.h | 12 +- source/common/upstream/host_utility.cc | 9 +- source/common/upstream/host_utility.h | 5 +- source/common/upstream/persistent_host_map.cc | 98 ++++++++ source/common/upstream/persistent_host_map.h | 54 ++++ source/common/upstream/upstream_impl.cc | 165 +++++++++--- source/common/upstream/upstream_impl.h | 55 +++- .../reverse_connection_io_handle.cc | 15 +- source/extensions/clusters/dns/dns_cluster.cc | 9 +- .../clusters/dynamic_modules/abi_impl.cc | 22 +- .../clusters/dynamic_modules/cluster.cc | 16 +- .../clusters/dynamic_modules/cluster.h | 1 + source/extensions/clusters/eds/eds.cc | 10 +- source/extensions/clusters/eds/eds.h | 2 +- .../clusters/redis/redis_cluster.cc | 14 +- .../clusters/strict_dns/strict_dns_cluster.cc | 9 +- source/extensions/dynamic_modules/abi/abi.h | 13 + source/extensions/dynamic_modules/abi_impl.cc | 6 + .../dynamic_modules/sdk/rust/src/cluster.rs | 12 + .../dynamic_modules/sdk/rust/src/lib_test.rs | 7 + .../dynamic_modules/abi_impl.cc | 6 +- .../override_host/load_balancer.cc | 24 +- test/common/upstream/host_utility_test.cc | 43 ++-- test/common/upstream/upstream_impl_test.cc | 236 +++++++++++++++++- .../rc_connection_wrapper_test.cc | 18 +- .../reverse_connection_io_handle_test.cc | 72 ++++-- .../clusters/dynamic_modules/cluster_test.cc | 49 ++++ .../dynamic_modules/abi_impl_test.cc | 2 + .../dynamic_modules/config_test.cc | 3 +- .../override_host/BUILD | 1 + .../override_host/load_balancer_test.cc | 30 ++- test/mocks/upstream/priority_set.h | 7 +- 41 files changed, 982 insertions(+), 171 deletions(-) create mode 100644 
bazel/external/immer.BUILD create mode 100644 changelogs/current/new_features/dynamic_modules__added-cluster-use-persistent-host-map.rst create mode 100644 source/common/upstream/persistent_host_map.cc create mode 100644 source/common/upstream/persistent_host_map.h diff --git a/bazel/deps.yaml b/bazel/deps.yaml index 5fbe4dfadeef9..b6623075a86bd 100644 --- a/bazel/deps.yaml +++ b/bazel/deps.yaml @@ -939,6 +939,17 @@ fp16: extensions: - envoy.wasm.runtime.v8 +immer: + project_name: "immer" + project_desc: "Header-only library of persistent and immutable data structures for C++" + project_url: "https://github.com/arximboldi/immer" + release_date: "2026-01-29" + use_category: + - dataplane_core + cpe: "N/A" + license: "BSL-1.0" + license_url: "https://github.com/arximboldi/immer/blob/{version}/LICENSE" + googleurl: project_name: "Chrome URL parsing library" project_desc: "Chrome URL parsing library" diff --git a/bazel/external/immer.BUILD b/bazel/external/immer.BUILD new file mode 100644 index 0000000000000..5110288f76222 --- /dev/null +++ b/bazel/external/immer.BUILD @@ -0,0 +1,11 @@ +load("@rules_cc//cc:defs.bzl", "cc_library") + +licenses(["notice"]) # BSL-1.0 + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "immer", + hdrs = glob(["immer/**/*.hpp"]), + includes = ["."], +) diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 5caa7e2868bb4..48d8827336c3c 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -213,6 +213,7 @@ def envoy_dependencies(skip_targets = []): _highway() _dragonbox() _fp16() + _immer() _simdutf() _quiche() _googleurl() @@ -811,6 +812,12 @@ def _fp16(): build_file = "@envoy//bazel/external:fp16.BUILD", ) +def _immer(): + external_http_archive( + name = "immer", + build_file = "@envoy//bazel/external:immer.BUILD", + ) + def _simdutf(): external_http_archive( name = "simdutf", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index da2f88214649d..4835e31e7b26e 
100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -546,6 +546,15 @@ REPOSITORY_LOCATIONS_SPEC = dict( sha256 = "e2da4f41bae8869f8dee56f4c104e699e7de3a483b5e451fda8e76fbcc66c59a", urls = ["https://github.com/Maratyszcza/FP16/archive/{version}.zip"], ), + immer = dict( + # Pinned past the v0.9.1 release to pick up the upstream fix for + # https://github.com/arximboldi/immer/issues/274 (pull request 323), which corrects the + # allocation size of the empty map node. Move back to a tagged release once one ships it. + version = "71813305ed1af2173f199e8ee209df5d6a16e766", + strip_prefix = "immer-{version}", + sha256 = "470a1926a1ff57d0de42bc6f68710d988eda31e30ac142c81bcea9a59cdb7828", + urls = ["https://github.com/arximboldi/immer/archive/{version}.tar.gz"], + ), simdutf = dict( # NOTE: Update together with v8 and proxy_wasm_cpp_host. version = "8.1.0", diff --git a/changelogs/current/new_features/dynamic_modules__added-cluster-use-persistent-host-map.rst b/changelogs/current/new_features/dynamic_modules__added-cluster-use-persistent-host-map.rst new file mode 100644 index 0000000000000..e4bc58ae34c4a --- /dev/null +++ b/changelogs/current/new_features/dynamic_modules__added-cluster-use-persistent-host-map.rst @@ -0,0 +1,5 @@ +Added the ``envoy_dynamic_module_callback_cluster_use_persistent_host_map`` ABI callback so that a +dynamic-modules cluster can opt into a persistent cross-priority host map in place of the +default flat copy-on-write map, making each membership delta O(delta) instead of an O(N) flat copy. +Membership behavior is identical under either backing, and the flat map remains the default. The +Rust SDK exposes this as ``EnvoyCluster::set_use_persistent_host_map``. 
diff --git a/envoy/upstream/BUILD b/envoy/upstream/BUILD index e45226fb90113..c493f34ee6828 100644 --- a/envoy/upstream/BUILD +++ b/envoy/upstream/BUILD @@ -170,6 +170,7 @@ envoy_cc_library( "//envoy/ssl:context_interface", "//envoy/ssl:context_manager_interface", "//envoy/upstream:types_interface", + "@abseil-cpp//absl/functional:function_ref", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", ], diff --git a/envoy/upstream/upstream.h b/envoy/upstream/upstream.h index d8d04f9bdc90b..93f6f768f46db 100644 --- a/envoy/upstream/upstream.h +++ b/envoy/upstream/upstream.h @@ -29,6 +29,7 @@ #include "envoy/upstream/resource_manager.h" #include "envoy/upstream/types.h" +#include "absl/functional/function_ref.h" #include "absl/strings/string_view.h" #include "fmt/format.h" @@ -362,6 +363,53 @@ using ExcludedHostVector = Phantom; using HostMap = absl::flat_hash_map; using HostMapSharedPtr = std::shared_ptr; using HostMapConstSharedPtr = std::shared_ptr; + +/** + * Read-only host-by-address lookup published to workers. Backed by either the flat HostMap (the + * default) or a persistent map, selected per cluster via + * `MainPrioritySetImpl::setUsePersistentCrossPriorityHostMap`, so the backing is swappable without + * changing consumers. + */ +class HostLookupTable { +public: + virtual ~HostLookupTable() = default; + + /** + * @param address the host address to look up. + * @return HostSharedPtr the host for the address, or nullptr if absent. + */ + virtual HostSharedPtr findHost(absl::string_view address) const PURE; + + /** + * @return size_t the number of hosts in the table. + */ + virtual size_t size() const PURE; + + /** + * @return bool true if the table holds no hosts. + */ + virtual bool empty() const PURE; + + /** + * Invokes a callback for each (address, host) entry. Iteration order is unspecified. + * + * @param cb the callback to invoke for each entry. 
+ */ + virtual void + forEach(absl::FunctionRef cb) const PURE; +}; +using HostLookupTableConstSharedPtr = std::shared_ptr; + +/** + * Wraps a flat HostMap in the HostLookupTable interface. Defined in upstream_impl.cc so the + * concrete type stays private. Available to tests and consumers that hold a HostMap but need to + * publish a HostLookupTable. + * + * @param map the flat host map to wrap. + * @return HostLookupTableConstSharedPtr a lookup table backed by the given map. + */ +HostLookupTableConstSharedPtr makeFlatHostLookupTable(HostMapConstSharedPtr map); + using HostVectorSharedPtr = std::shared_ptr; using HostVectorConstSharedPtr = std::shared_ptr; @@ -590,10 +638,10 @@ class PrioritySet { virtual const std::vector& hostSetsPerPriority() const PURE; /** - * @return HostMapConstSharedPtr read only cross priority host map that indexed by host address - * string. + * @return HostLookupTableConstSharedPtr read only cross priority host lookup table indexed by + * host address string. */ - virtual HostMapConstSharedPtr crossPriorityHostMap() const PURE; + virtual HostLookupTableConstSharedPtr crossPriorityHostMap() const PURE; /** * Parameter class for updateHosts. @@ -627,7 +675,7 @@ class PrioritySet { const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map = nullptr) PURE; + HostLookupTableConstSharedPtr cross_priority_host_map = nullptr) PURE; /** * Callback provided during batch updates that can be used to update hosts. 
diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index f4cb8581c683f..131b578fcf3a6 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -385,6 +385,19 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "persistent_host_map_lib", + srcs = ["persistent_host_map.cc"], + hdrs = ["persistent_host_map.h"], + rbe_pool = "6gig", + deps = [ + "//envoy/upstream:upstream_interface", + "@abseil-cpp//absl/functional:function_ref", + "@abseil-cpp//absl/strings:string_view", + "@immer", + ], +) + envoy_cc_library( name = "upstream_lib", srcs = ["upstream_impl.cc"], @@ -393,6 +406,7 @@ envoy_cc_library( ":cluster_factory_lib", ":default_local_address_selector_factory", ":health_checker_lib", + ":persistent_host_map_lib", # TODO(mattklein123): Move the clusters to extensions so they can be compiled out. ":upstream_includes", ":transport_socket_match_lib", @@ -504,6 +518,7 @@ envoy_cc_library( "//source/extensions/upstreams/http:config", "//source/extensions/upstreams/tcp:config", "//source/server:transport_socket_config_lib", + "@abseil-cpp//absl/functional:function_ref", "@abseil-cpp//absl/synchronization", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 0ae93e8814ade..5d34096e3c224 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1194,7 +1194,8 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor(); } - HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap(); + HostLookupTableConstSharedPtr host_map = + cm_cluster.cluster().prioritySet().crossPriorityHostMap(); pending_cluster_creations_.erase(cm_cluster.cluster().info()->name()); @@ -1304,7 +1305,7 @@ void 
ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported( const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, - LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map, + LoadBalancerFactorySharedPtr load_balancer_factory, HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category) { if (!deferralIsSupportedForCluster(cluster_info)) { return nullptr; @@ -1386,7 +1387,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject( const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, - LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map, + LoadBalancerFactorySharedPtr load_balancer_factory, HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category) : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) { @@ -1399,7 +1400,7 @@ ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject( ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject( const absl::flat_hash_map& per_priority_state, const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info, - LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map, + LoadBalancerFactorySharedPtr load_balancer_factory, HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category) : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map), @@ -1513,7 +1514,7 @@ 
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHost LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map) { + HostLookupTableConstSharedPtr cross_priority_host_map) { ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, hosts_added.size(), hosts_removed.size()); priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights), @@ -1841,7 +1842,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, bool weighted_priority_health, - uint64_t overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) { + uint64_t overprovisioning_factor, HostLookupTableConstSharedPtr cross_priority_host_map) { ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); const auto& cluster_entry = thread_local_clusters_[name]; cluster_entry->updateHosts(name, priority, std::move(update_hosts_params), diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 7b55cf796303c..f59ac9632d444 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -454,20 +454,20 @@ class ClusterManagerImpl : public ClusterManager, ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, LoadBalancerFactorySharedPtr load_balancer_factory, - HostMapConstSharedPtr map, UnitFloat drop_overload, + HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category); 
ClusterInitializationObject( const absl::flat_hash_map& per_priority_state, const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info, - LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map, + LoadBalancerFactorySharedPtr load_balancer_factory, HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category); absl::flat_hash_map per_priority_state_; const ClusterInfoConstSharedPtr cluster_info_; const LoadBalancerFactorySharedPtr load_balancer_factory_; - const HostMapConstSharedPtr cross_priority_host_map_; + const HostLookupTableConstSharedPtr cross_priority_host_map_; UnitFloat drop_overload_{0}; const std::string drop_category_; }; @@ -628,7 +628,7 @@ class ClusterManagerImpl : public ClusterManager, const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map); + HostLookupTableConstSharedPtr cross_priority_host_map); // Drains any connection pools associated with the removed hosts. All connections will be // closed gracefully and no new connections will be created. 
@@ -720,7 +720,7 @@ class ClusterManagerImpl : public ClusterManager, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, bool weighted_priority_health, uint64_t overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map); + HostLookupTableConstSharedPtr cross_priority_host_map); void onHostHealthFailure(const HostSharedPtr& host); ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host, @@ -931,7 +931,7 @@ class ClusterManagerImpl : public ClusterManager, */ ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported( const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info, - LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map, + LoadBalancerFactorySharedPtr load_balancer_factory, HostLookupTableConstSharedPtr map, UnitFloat drop_overload, absl::string_view drop_category); bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const; diff --git a/source/common/upstream/host_utility.cc b/source/common/upstream/host_utility.cc index 6b44548ac7f78..4156be39b094f 100644 --- a/source/common/upstream/host_utility.cc +++ b/source/common/upstream/host_utility.cc @@ -138,7 +138,7 @@ HostUtility::HostStatusSet HostUtility::createOverrideHostStatus( } HostUtility::OverrideHostSelectionResult -HostUtility::selectOverrideHost(const HostMap* host_map, HostStatusSet status, +HostUtility::selectOverrideHost(const HostLookupTable* host_map, HostStatusSet status, LoadBalancerContext* context) { if (context == nullptr) { return {}; @@ -156,16 +156,13 @@ HostUtility::selectOverrideHost(const HostMap* host_map, HostStatusSet status, return {nullptr, strict_mode, OverrideHostSelectionStatus::NotFound}; } - auto host_iter = host_map->find(override_host->host); + HostConstSharedPtr host = host_map->findHost(override_host->host); // The override host cannot be found in the host map. 
- if (host_iter == host_map->end()) { + if (host == nullptr) { return {nullptr, strict_mode, OverrideHostSelectionStatus::NotFound}; } - HostConstSharedPtr host = host_iter->second; - ASSERT(host != nullptr); - if (status[static_cast(host->healthStatus())]) { return {host, strict_mode, OverrideHostSelectionStatus::Success}; } diff --git a/source/common/upstream/host_utility.h b/source/common/upstream/host_utility.h index f2c38b103d3c5..2d8ae3c60fc56 100644 --- a/source/common/upstream/host_utility.h +++ b/source/common/upstream/host_utility.h @@ -52,8 +52,9 @@ class HostUtility { * @return OverrideHostSelectionResult containing the selected host, whether strict mode was * requested, and the reason for the selection outcome. */ - static OverrideHostSelectionResult - selectOverrideHost(const HostMap* host_map, HostStatusSet status, LoadBalancerContext* context); + static OverrideHostSelectionResult selectOverrideHost(const HostLookupTable* host_map, + HostStatusSet status, + LoadBalancerContext* context); // Iterate over all per-endpoint metrics, for clusters with `per_endpoint_stats` enabled. static void diff --git a/source/common/upstream/persistent_host_map.cc b/source/common/upstream/persistent_host_map.cc new file mode 100644 index 0000000000000..0215b49e2fc2b --- /dev/null +++ b/source/common/upstream/persistent_host_map.cc @@ -0,0 +1,98 @@ +#include "source/common/upstream/persistent_host_map.h" + +#include +#include +#include +#include + +#include "envoy/upstream/upstream.h" + +#include "absl/functional/function_ref.h" +#include "absl/strings/string_view.h" +#include "immer/map.hpp" + +namespace Envoy { +namespace Upstream { + +// Persistent `immer` CHAMP hash map indexed by host address string. A delta produces a new map that +// structurally shares unchanged nodes with the prior map, so a membership delta is O(delta) instead +// of the O(N) copy the flat HostMap requires. 
+using PersistentHostMap = immer::map; + +// The map is published from the main thread and read on workers, which share structural nodes, so +// the node ref-counts must be atomic. Guard against a build that disables `immer` thread safety. +static_assert( + std::is_same_v, + "PersistentHostMap is shared across threads, so immer must use atomic refcounting. Do not " + "define IMMER_NO_THREAD_SAFETY."); + +namespace { + +// Wraps a persistent `immer` map behind the HostLookupTable interface. Used when the cluster opts +// into the persistent backing via `setUsePersistentCrossPriorityHostMap()`. +class PersistentHostLookupTable : public HostLookupTable { +public: + explicit PersistentHostLookupTable(PersistentHostMap map) : map_(std::move(map)) {} + HostSharedPtr findHost(absl::string_view address) const override { + // `immer` keys on `std::string`, so the opt-in persistent path constructs the key. The default + // flat path stays allocation-free. + const HostSharedPtr* host = map_.find(std::string(address)); + return host != nullptr ? 
*host : nullptr; + } + size_t size() const override { return map_.size(); } + bool empty() const override { return map_.empty(); } + void + forEach(absl::FunctionRef cb) const override { + for (const auto& [address, host] : map_) { + cb(address, host); + } + } + +private: + const PersistentHostMap map_; +}; + +} // namespace + +struct PersistentCrossPriorityHostMap::Impl { + PersistentHostMap map; +}; + +PersistentCrossPriorityHostMap::PersistentCrossPriorityHostMap() + : impl_(std::make_unique()) {} + +PersistentCrossPriorityHostMap::~PersistentCrossPriorityHostMap() = default; + +void PersistentCrossPriorityHostMap::seedFrom(const HostMap& flat_map) { + PersistentHostMap seeded; + for (const auto& [address, host] : flat_map) { + seeded = std::move(seeded).set(address, host); + } + impl_->map = std::move(seeded); +} + +void PersistentCrossPriorityHostMap::exportTo(HostMap& flat_map) const { + for (const auto& [address, host] : impl_->map) { + flat_map.insert({address, host}); + } +} + +HostSharedPtr PersistentCrossPriorityHostMap::find(const std::string& address) const { + const HostSharedPtr* host = impl_->map.find(address); + return host != nullptr ? 
*host : nullptr; +} + +void PersistentCrossPriorityHostMap::set(const std::string& address, const HostSharedPtr& host) { + impl_->map = std::move(impl_->map).set(address, host); +} + +void PersistentCrossPriorityHostMap::erase(const std::string& address) { + impl_->map = std::move(impl_->map).erase(address); +} + +HostLookupTableConstSharedPtr PersistentCrossPriorityHostMap::publish() const { + return std::make_shared(impl_->map); +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/persistent_host_map.h b/source/common/upstream/persistent_host_map.h new file mode 100644 index 0000000000000..bd14321bfbb40 --- /dev/null +++ b/source/common/upstream/persistent_host_map.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include + +#include "envoy/upstream/upstream.h" + +namespace Envoy { +namespace Upstream { + +// Persistent cross-priority host map backing for `MainPrioritySetImpl`. This wraps an `immer` +// persistent map so a membership delta produces a new map that structurally shares unchanged nodes +// with the prior map, making updates O(delta) rather than the O(N) copy the flat HostMap requires. +// +// All `immer` usage is confined to persistent_host_map.cc so the `immer` headers stay out of this +// widely-included header, and the `pimpl` below keeps the `immer` type out of the public interface. +// +// NOTE: This relies on `immer` past the v0.9.1 release, which under-allocated the empty map node +// and tripped the vptr sanitizer. +class PersistentCrossPriorityHostMap { +public: + PersistentCrossPriorityHostMap(); + ~PersistentCrossPriorityHostMap(); + + // Replaces the persistent map with the contents of `flat_map`. Used when a cluster switches from + // the flat backing to the persistent backing so the accumulated membership carries across. + void seedFrom(const HostMap& flat_map); + + // Copies the persistent map into `flat_map`. 
Used when a cluster switches back to the flat + // backing so the accumulated membership carries across. + void exportTo(HostMap& flat_map) const; + + // Returns the host stored for `address`, or nullptr if absent. + HostSharedPtr find(const std::string& address) const; + + // Inserts or replaces the entry for `address`. + void set(const std::string& address, const HostSharedPtr& host); + + // Removes any entry for `address`. + void erase(const std::string& address); + + // Publishes the current map as a read-only lookup table snapshot. The snapshot is unaffected by + // later mutations because the persistent map shares structure across versions. + HostLookupTableConstSharedPtr publish() const; + +private: + // Holds the `immer` map. Defined in persistent_host_map.cc so the `immer` type stays out of this + // header. + struct Impl; + std::unique_ptr impl_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 345f8a12869ec..a749b2ef33e67 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "envoy/common/optref.h" @@ -61,6 +62,7 @@ #include "source/common/upstream/cluster_factory_impl.h" #include "source/common/upstream/health_checker_impl.h" #include "source/common/upstream/locality_pool.h" +#include "source/common/upstream/persistent_host_map.h" #include "source/server/transport_socket_config_impl.h" #include "absl/container/node_hash_set.h" @@ -68,7 +70,33 @@ namespace Envoy { namespace Upstream { + namespace { + +// Wraps a flat HostMap behind the HostLookupTable interface. This is the default backing, used +// unless the cluster opts into the persistent map via `setUsePersistentCrossPriorityHostMap()`. 
+class FlatHostLookupTable : public HostLookupTable { +public: + explicit FlatHostLookupTable(HostMapConstSharedPtr map) : map_(std::move(map)) {} + HostSharedPtr findHost(absl::string_view address) const override { + // `flat_hash_map` supports transparent lookup, so a `string_view` key + // avoids an allocation on the host-selection hot path. + const auto it = map_->find(address); + return it != map_->end() ? it->second : nullptr; + } + size_t size() const override { return map_->size(); } + bool empty() const override { return map_->empty(); } + void + forEach(absl::FunctionRef cb) const override { + for (const auto& [address, host] : *map_) { + cb(address, host); + } + } + +private: + const HostMapConstSharedPtr map_; +}; + const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig& defaultHappyEyeballsConfig() { CONSTRUCT_ON_FIRST_USE( @@ -407,6 +435,10 @@ createUpstreamLocalAddressSelector( } // namespace +HostLookupTableConstSharedPtr makeFlatHostLookupTable(HostMapConstSharedPtr map) { + return std::make_shared(std::move(map)); +} + // Allow disabling ALPN checks for transport sockets. See // https://github.com/envoyproxy/envoy/issues/22876 const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey = @@ -873,11 +905,11 @@ void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_ const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map) { + HostLookupTableConstSharedPtr cross_priority_host_map) { // Update cross priority host map first. In this way, when the update callbacks of the priority // set are executed, the latest host map can always be obtained. 
if (cross_priority_host_map != nullptr) { - const_cross_priority_host_map_ = std::move(cross_priority_host_map); + cross_priority_lookup_ = std::move(cross_priority_host_map); } // Ensure that we have a HostSet for the given priority. @@ -925,13 +957,17 @@ void PrioritySetImpl::BatchUpdateScope::updateHosts( hosts_removed, weighted_priority_health, overprovisioning_factor); } +MainPrioritySetImpl::MainPrioritySetImpl() = default; + +MainPrioritySetImpl::~MainPrioritySetImpl() = default; + void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map) { + HostLookupTableConstSharedPtr cross_priority_host_map) { ASSERT(cross_priority_host_map == nullptr, "External cross-priority host map is meaningless to MainPrioritySetImpl"); updateCrossPriorityHostMap(priority, hosts_added, hosts_removed); @@ -941,13 +977,20 @@ void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& upd overprovisioning_factor); } -HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const { - // Check if the host set in the main thread PrioritySet has been updated. +HostLookupTableConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const { + if (last_update_used_persistent_) { + // The persistent path republishes the lookup table inside `updateCrossPriorityHostMap` whenever + // the map changes, so the cached table is already current here. + return cross_priority_lookup_; + } + // Legacy flat path. Check if the host set in the main thread PrioritySet has been updated. 
if (mutable_cross_priority_host_map_ != nullptr) { const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_); ASSERT(mutable_cross_priority_host_map_ == nullptr); + // Publish the flat map through the lookup table so consumers always see the latest. + cross_priority_lookup_ = std::make_shared(const_cross_priority_host_map_); } - return const_cross_priority_host_map_; + return cross_priority_lookup_; } void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority, @@ -958,8 +1001,61 @@ void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority, return; } - // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes, - // we cannot directly modify read_only_all_host_map_. + const bool use_persistent = use_persistent_cross_priority_host_map_; + + if (use_persistent) { + // Build the persistent backing lazily so default flat-path clusters never construct an `immer` + // map. This keeps the flat path allocation-free and avoids `immer`'s shared empty node, whose + // layout the vptr sanitizer rejects. + if (persistent_host_map_ == nullptr) { + persistent_host_map_ = std::make_unique(); + } + // If the persistent backing was just selected, seed it from the flat backing so the accumulated + // membership carries across the switch. + if (!last_update_used_persistent_) { + const HostMap& flat = mutable_cross_priority_host_map_ != nullptr + ? *mutable_cross_priority_host_map_ + : *const_cross_priority_host_map_; + persistent_host_map_->seedFrom(flat); + } + + for (const auto& host : hosts_removed) { + const auto host_address = addressToString(host->address()); + const HostSharedPtr existing_host = persistent_host_map_->find(host_address); + // Only delete from the current priority to protect from situations where + // the add operation was already executed and has already moved the metadata of the host + // from a higher priority value to a lower priority value. 
+ if (existing_host != nullptr && existing_host->priority() == priority) { + persistent_host_map_->erase(host_address); + } + } + for (const auto& host : hosts_added) { + // Like the flat path's `insert`, the first host wins on an address collision across + // priorities, so the persistent backing is a pure performance lever with no behavior change. + const auto host_address = addressToString(host->address()); + if (persistent_host_map_->find(host_address) == nullptr) { + persistent_host_map_->set(host_address, host); + } + } + + // Republish the lookup table so the cached wrapper is rebuilt only when the map changes. + cross_priority_lookup_ = persistent_host_map_->publish(); + last_update_used_persistent_ = true; + return; + } + + // Legacy flat path. If the persistent backing was just deselected, seed the flat backing from + // the persistent map so the accumulated membership carries across the switch. + if (last_update_used_persistent_) { + auto seeded = std::make_shared(); + persistent_host_map_->exportTo(*seeded); + const_cross_priority_host_map_ = std::move(seeded); + mutable_cross_priority_host_map_ = nullptr; + } + last_update_used_persistent_ = false; + + // Since the read only host map may be shared by multiple threads, when the host set changes, + // we cannot directly modify it. if (mutable_cross_priority_host_map_ == nullptr) { // Copy old read only host map to mutable host map. mutable_cross_priority_host_map_ = std::make_shared(*const_cross_priority_host_map_); @@ -2318,7 +2414,8 @@ void PriorityStateManager::updateClusterPrioritySet( bool BaseDynamicClusterImpl::updateDynamicHostList( const HostVector& new_hosts, HostVector& current_priority_hosts, HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, - const HostMap& all_hosts, const absl::flat_hash_set& all_new_hosts) { + absl::FunctionRef host_lookup, + const absl::flat_hash_set& all_new_hosts) { uint64_t max_host_weight = 1; // Did hosts change? 
@@ -2356,13 +2453,13 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( for (const HostSharedPtr& host : new_hosts) { // To match a new host with an existing host means comparing their addresses. const auto host_address_string = addressToString(host->address()); - auto existing_host = all_hosts.find(host_address_string); - const bool existing_host_found = existing_host != all_hosts.end(); + const HostSharedPtr existing_host = host_lookup(host_address_string); + const bool existing_host_found = existing_host != nullptr; // Clear any pending deletion flag on an existing host in case it came back while it was // being stabilized. We will set it again below if needed. if (existing_host_found) { - existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); + existing_host->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); } // Check if in-place host update should be skipped, i.e. when the following criteria are met @@ -2371,19 +2468,19 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( // but the health check address is different. 
const bool health_check_address_changed = (health_checker_ != nullptr && existing_host_found && - *existing_host->second->healthCheckAddress() != *host->healthCheckAddress()); + *existing_host->healthCheckAddress() != *host->healthCheckAddress()); bool locality_changed = false; - locality_changed = (existing_host_found && - (!LocalityEqualTo()(host->locality(), existing_host->second->locality()))); + locality_changed = + (existing_host_found && (!LocalityEqualTo()(host->locality(), existing_host->locality()))); if (locality_changed) { - hosts_with_updated_locality_for_current_priority.emplace(existing_host->first); + hosts_with_updated_locality_for_current_priority.emplace(host_address_string); } const bool active_health_check_flag_changed = (health_checker_ != nullptr && existing_host_found && - existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck()); + existing_host->disableActiveHealthCheck() != host->disableActiveHealthCheck()); if (active_health_check_flag_changed) { - hosts_with_active_health_check_flag_changed.emplace(existing_host->first); + hosts_with_active_health_check_flag_changed.emplace(host_address_string); } const bool skip_inplace_host_update = health_check_address_changed || locality_changed || active_health_check_flag_changed; @@ -2392,14 +2489,14 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( // host's health check flag and metadata. Afterwards, the host is pushed back into the // final_hosts, i.e. hosts that should be preserved in the current priority. if (existing_host_found && !skip_inplace_host_update) { - existing_hosts_for_current_priority.emplace(existing_host->first); + existing_hosts_for_current_priority.emplace(host_address_string); // If we find a host matched based on address, we keep it. However we do change weight // inline so do that here. 
if (host->weight() > max_host_weight) { max_host_weight = host->weight(); } - if (existing_host->second->weight() != host->weight()) { - existing_host->second->weight(host->weight()); + if (existing_host->weight() != host->weight()) { + existing_host->weight(host->weight()); // We do full host set rebuilds so that load balancers can do pre-computation of data // structures based on host weight. This may become a performance problem in certain // deployments so it is runtime feature guarded and may also need to be configurable @@ -2407,32 +2504,32 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( hosts_changed = true; } - hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second); + hosts_changed |= updateEdsHealthFlag(*host, *existing_host); // Did metadata change? Compare cached hashes for O(1) comparison. - const bool metadata_changed = host->metadataHash() != existing_host->second->metadataHash(); + const bool metadata_changed = host->metadataHash() != existing_host->metadataHash(); if (metadata_changed) { // First, update the entire metadata for the endpoint. - existing_host->second->metadata(host->metadata()); + existing_host->metadata(host->metadata()); // Also, given that the canary attribute of an endpoint is derived from its metadata // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing // to see if it actually changed. We must update this besides just updating the metadata, // because it'll be used by the router filter to compute upstream stats. - existing_host->second->canary(host->canary()); + existing_host->canary(host->canary()); // If metadata changed, we need to rebuild. See github issue #3810. hosts_changed = true; } // Did the priority change? 
- if (host->priority() != existing_host->second->priority()) { - existing_host->second->priority(host->priority()); - hosts_added_to_current_priority.emplace_back(existing_host->second); + if (host->priority() != existing_host->priority()) { + existing_host->priority(host->priority()); + hosts_added_to_current_priority.emplace_back(existing_host); } - final_hosts.push_back(existing_host->second); + final_hosts.push_back(existing_host); } else { new_hosts_for_current_priority.emplace(host_address_string); if (host->weight() > max_host_weight) { @@ -2448,10 +2545,10 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( // If there's an existing host, use the same active health-status. // The existing host can be marked PENDING_ACTIVE_HC or // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC. - ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || - existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); - ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) || - existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + ASSERT(!existing_host->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || + existing_host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); + ASSERT(!existing_host->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) || + existing_host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); constexpr uint32_t active_hc_statuses_mask = enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) | @@ -2459,7 +2556,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList( enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) | enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT); - const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll(); + const uint32_t existing_host_statuses = existing_host->healthFlagsGetAll(); host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask); } else { // No previous known host, mark it as failed 
active HC. diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index c2ce4b8b308e6..cdaa95c7f1b74 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -69,6 +69,7 @@ #include "absl/container/inlined_vector.h" #include "absl/container/node_hash_set.h" +#include "absl/functional/function_ref.h" #include "absl/synchronization/mutex.h" namespace Envoy { @@ -725,12 +726,12 @@ class PrioritySetImpl : public PrioritySet { const HostVector& hosts_removed, std::optional weighted_priority_health = std::nullopt, std::optional overprovisioning_factor = std::nullopt, - HostMapConstSharedPtr cross_priority_host_map = nullptr) override; + HostLookupTableConstSharedPtr cross_priority_host_map = nullptr) override; void batchHostUpdate(BatchUpdateCb& callback) override; - HostMapConstSharedPtr crossPriorityHostMap() const override { - return const_cross_priority_host_map_; + HostLookupTableConstSharedPtr crossPriorityHostMap() const override { + return cross_priority_lookup_; } protected: @@ -754,8 +755,12 @@ class PrioritySetImpl : public PrioritySet { // avoid any potential lifetime issues. std::vector> host_sets_; - // Read only all host map for fast host searching. This will never be null. - mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared()}; + // Read only cross-priority host lookup table for fast host searching. This will never be null. + // The worker-local set just stores and serves whatever `updateHosts` published. The flat or + // persistent backing is built by `MainPrioritySetImpl::updateCrossPriorityHostMap` per that + // cluster's setting (see `setUsePersistentCrossPriorityHostMap()`). + mutable HostLookupTableConstSharedPtr cross_priority_lookup_{ + makeFlatHostLookupTable(std::make_shared())}; private: // This is a matching vector to store the callback handles for host_sets_. 
It is kept separately @@ -793,26 +798,56 @@ class PrioritySetImpl : public PrioritySet { }; }; +// Persistent cross-priority host map backing. Forward-declared to keep the `immer` dependency out +// of this widely-included header. The definition lives in persistent_host_map.h and .cc. +class PersistentCrossPriorityHostMap; + /** * Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read * only cross priority host map when the host set changes. */ class MainPrioritySetImpl : public PrioritySetImpl, public Logger::Loggable { public: + MainPrioritySetImpl(); + ~MainPrioritySetImpl() override; + // PrioritySet void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health = std::nullopt, std::optional overprovisioning_factor = std::nullopt, - HostMapConstSharedPtr cross_priority_host_map = nullptr) override; - HostMapConstSharedPtr crossPriorityHostMap() const override; + HostLookupTableConstSharedPtr cross_priority_host_map = nullptr) override; + HostLookupTableConstSharedPtr crossPriorityHostMap() const override; + + // Selects the persistent cross-priority host map backing for this cluster in place of the legacy + // flat copy-on-write map, trading the per-update O(N) copy for O(delta) updates. Must be called + // on the main thread before the first host update. Dynamic-modules clusters opt in through the + // cluster ABI. + void setUsePersistentCrossPriorityHostMap(bool use_persistent) { + use_persistent_cross_priority_host_map_ = use_persistent; + } protected: void updateCrossPriorityHostMap(uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed); + // Legacy flat backing, used unless the persistent backing is selected. This preserves the exact + // pre-existing behavior. Read only map will never be null. 
+ mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared()}; mutable HostMapSharedPtr mutable_cross_priority_host_map_; + +private: + // Persistent backing, used when `use_persistent_cross_priority_host_map_` is set. Built lazily on + // the first persistent update so flat-path clusters never construct an `immer` map. `immer` is + // confined to the PersistentCrossPriorityHostMap implementation. + std::unique_ptr persistent_host_map_; + // Whether this cluster uses the persistent backing. Set once before the first host update. See + // `setUsePersistentCrossPriorityHostMap()`. + bool use_persistent_cross_priority_host_map_{false}; + // Tracks the backing used by the last update so a switch of the backing reseeds the newly-active + // backing from the currently-published map. + bool last_update_used_persistent_{false}; }; /** @@ -1363,14 +1398,16 @@ class BaseDynamicClusterImpl : public ClusterImplBase { * @param hosts_added_to_current_priority will be populated with hosts added to the priority. * @param hosts_removed_from_current_priority will be populated with hosts removed from the * priority. - * @param all_hosts all known hosts prior to this host update across all priorities. + * @param host_lookup resolves a host address string to the existing host across all priorities + * prior to this update, or returns nullptr if absent. Backed by the cross-priority host lookup + * table or by a cluster-local flat map. * @param all_new_hosts addresses of all hosts in the new configuration across all priorities. * @return whether the hosts for the priority changed. 
*/ bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts, HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, - const HostMap& all_hosts, + absl::FunctionRef host_lookup, const absl::flat_hash_set& all_new_hosts); }; diff --git a/source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle.cc b/source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle.cc index c227131a6964f..71181e2894ae6 100644 --- a/source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle.cc +++ b/source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle.cc @@ -520,20 +520,27 @@ void ReverseConnectionIOHandle::maintainClusterConnections( ReverseConnectionState::CannotConnect); return; } + // `HostLookupTable` provides `forEach()` but is not `range-iterable`, so collect the entries into + // a local vector once to iterate and size them below. + std::vector> resolved_host_entries; + host_map_ptr->forEach( + [&resolved_host_entries](const std::string& address, const Upstream::HostSharedPtr& host) { + resolved_host_entries.emplace_back(address, host); + }); // Retrieve the resolved hosts for a cluster and update the corresponding maps. std::vector resolved_hosts; - for (const auto& host_itr : *host_map_ptr) { - const std::string& resolved = host_itr.first; + resolved_hosts.reserve(resolved_host_entries.size()); + for (const auto& [resolved, host] : resolved_host_entries) { resolved_hosts.emplace_back(resolved); } maybeUpdateHostsMappingsAndConnections(cluster_name, std::move(resolved_hosts)); // Track successful connections for this cluster. 
uint32_t total_successful_connections = 0; uint32_t total_required_connections = - host_map_ptr->size() * cluster_config.reverse_connection_count; + resolved_host_entries.size() * cluster_config.reverse_connection_count; // Create connections to each host in the cluster. - for (const auto& [host_address, host] : *host_map_ptr) { + for (const auto& [host_address, host] : resolved_host_entries) { ENVOY_LOG(debug, "reverse_tunnel: Checking reverse connection count for host {} of cluster {}", host_address, cluster_name); diff --git a/source/extensions/clusters/dns/dns_cluster.cc b/source/extensions/clusters/dns/dns_cluster.cc index cb95998615fb1..6a7b79d0af7c9 100644 --- a/source/extensions/clusters/dns/dns_cluster.cc +++ b/source/extensions/clusters/dns/dns_cluster.cc @@ -359,8 +359,13 @@ void DnsClusterImpl::ResolveTarget::updateLogicalDnsHosts( void DnsClusterImpl::ResolveTarget::updateStrictDnsHosts(const ParsedHosts& new_hosts) { HostVector hosts_added; HostVector hosts_removed; - if (parent_.updateDynamicHostList(new_hosts.hosts, hosts_, hosts_added, hosts_removed, all_hosts_, - new_hosts.host_addresses)) { + if (parent_.updateDynamicHostList( + new_hosts.hosts, hosts_, hosts_added, hosts_removed, + [this](const std::string& address) -> HostSharedPtr { + const auto it = all_hosts_.find(address); + return it != all_hosts_.end() ? 
it->second : nullptr; + }, + new_hosts.host_addresses)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) { return host->priority() == locality_lb_endpoints_.priority(); diff --git a/source/extensions/clusters/dynamic_modules/abi_impl.cc b/source/extensions/clusters/dynamic_modules/abi_impl.cc index 4da97aa6db730..73034025e1a8d 100644 --- a/source/extensions/clusters/dynamic_modules/abi_impl.cc +++ b/source/extensions/clusters/dynamic_modules/abi_impl.cc @@ -236,6 +236,16 @@ void envoy_dynamic_module_callback_cluster_pre_init_complete( getCluster(cluster_envoy_ptr)->preInitComplete(); } +void envoy_dynamic_module_callback_cluster_use_persistent_host_map( + envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr, bool use_persistent_host_map) { + if (!Envoy::Thread::MainThread::isMainOrTestThread()) { + IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_use_persistent_host_map must be called on " + "the main thread"); + return; + } + getCluster(cluster_envoy_ptr)->setUsePersistentHostMap(use_persistent_host_map); +} + size_t envoy_dynamic_module_callback_cluster_lb_get_healthy_host_count( envoy_dynamic_module_type_cluster_lb_envoy_ptr lb_envoy_ptr, uint32_t priority) { if (lb_envoy_ptr == nullptr) { @@ -401,11 +411,11 @@ bool envoy_dynamic_module_callback_cluster_lb_get_host_health_by_address( return false; } std::string address_str(address.ptr, address.length); - const auto it = host_map->find(address_str); - if (it == host_map->end()) { + const auto host = host_map->findHost(address_str); + if (host == nullptr) { return false; } - switch (it->second->coarseHealth()) { + switch (host->coarseHealth()) { case Envoy::Upstream::Host::Health::Unhealthy: *result = envoy_dynamic_module_type_host_health_Unhealthy; break; @@ -431,11 +441,11 @@ envoy_dynamic_module_callback_cluster_lb_find_host_by_address( return nullptr; } std::string address_str(address.ptr, address.length); - 
const auto it = host_map->find(address_str); - if (it == host_map->end()) { + const auto host = host_map->findHost(address_str); + if (host == nullptr) { return nullptr; } - return const_cast(it->second.get()); + return const_cast(host.get()); } envoy_dynamic_module_type_cluster_host_envoy_ptr envoy_dynamic_module_callback_cluster_lb_get_host( diff --git a/source/extensions/clusters/dynamic_modules/cluster.cc b/source/extensions/clusters/dynamic_modules/cluster.cc index 8b734732cf7a4..067807153d316 100644 --- a/source/extensions/clusters/dynamic_modules/cluster.cc +++ b/source/extensions/clusters/dynamic_modules/cluster.cc @@ -36,6 +36,9 @@ struct DynamicModuleThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBa Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams params) override { return std::make_unique(handle_, params.priority_set); } + + // The module LB applies host churn incrementally, so don't rebuild the worker LB on host + // change. bool recreateOnHostChangeDeprecated() const override { return false; } DynamicModuleClusterHandleSharedPtr handle_; @@ -286,6 +289,10 @@ void DynamicModuleCluster::startPreInit() { void DynamicModuleCluster::preInitComplete() { onPreInitComplete(); } +void DynamicModuleCluster::setUsePersistentHostMap(bool use_persistent) { + priority_set_.setUsePersistentCrossPriorityHostMap(use_persistent); +} + void DynamicModuleCluster::onScheduled(uint64_t event_id) { if (in_module_cluster_ != nullptr && config_->on_cluster_scheduled_ != nullptr) { config_->on_cluster_scheduled_(this, in_module_cluster_, event_id); @@ -416,7 +423,8 @@ bool DynamicModuleCluster::addHosts( } // Skip addresses already in the host set. This does not deduplicate within the batch. 
- if (existing_hosts != nullptr && existing_hosts->contains(resolved_address->asString())) { + if (existing_hosts != nullptr && + existing_hosts->findHost(resolved_address->asString()) != nullptr) { continue; } @@ -536,11 +544,7 @@ Upstream::HostSharedPtr DynamicModuleCluster::findHostByAddress(const std::strin if (host_map == nullptr) { return nullptr; } - const auto it = host_map->find(address); - if (it == host_map->end()) { - return nullptr; - } - return it->second; + return host_map->findHost(address); } Upstream::HostSharedPtr DynamicModuleCluster::findHost(void* raw_host_ptr) { diff --git a/source/extensions/clusters/dynamic_modules/cluster.h b/source/extensions/clusters/dynamic_modules/cluster.h index 1e8220c68f0d8..93241d6eb9e57 100644 --- a/source/extensions/clusters/dynamic_modules/cluster.h +++ b/source/extensions/clusters/dynamic_modules/cluster.h @@ -340,6 +340,7 @@ class DynamicModuleCluster : public Upstream::ClusterImplBase, Upstream::HostSharedPtr findHost(void* raw_host_ptr); Upstream::HostSharedPtr findHostByAddress(const std::string& address); void preInitComplete(); + void setUsePersistentHostMap(bool use_persistent); /** * Called when an event is scheduled via DynamicModuleClusterScheduler::commit. diff --git a/source/extensions/clusters/eds/eds.cc b/source/extensions/clusters/eds/eds.cc index f0039ee7e9991..385f3466a797e 100644 --- a/source/extensions/clusters/eds/eds.cc +++ b/source/extensions/clusters/eds/eds.cc @@ -104,7 +104,7 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h // Get the map of all the latest existing hosts, which is used to filter out the existing // hosts in the process of updating cluster memberships. 
- HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap(); + HostLookupTableConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap(); ASSERT(all_hosts != nullptr); const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT( @@ -411,7 +411,7 @@ bool EdsClusterImpl::updateHostsPerLocality( const uint32_t priority, bool weighted_priority_health, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, PriorityStateManager& priority_state_manager, - const HostMap& all_hosts, const absl::flat_hash_set& all_new_hosts) { + const HostLookupTable& all_hosts, const absl::flat_hash_set& all_new_hosts) { const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor); HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts())); @@ -429,8 +429,10 @@ bool EdsClusterImpl::updateHostsPerLocality( // performance implications, since this has the knock on effect that we rebuild the load balancers // and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList // about this. In the future we may need to do better here. 
- const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, - hosts_removed, all_hosts, all_new_hosts); + const bool hosts_updated = updateDynamicHostList( + new_hosts, *current_hosts_copy, hosts_added, hosts_removed, + [&all_hosts](const std::string& address) { return all_hosts.findHost(address); }, + all_new_hosts); if (hosts_updated || host_set.weightedPriorityHealth() != weighted_priority_health || host_set.overprovisioningFactor() != overprovisioning_factor || locality_weights_map != new_locality_weights_map) { diff --git a/source/extensions/clusters/eds/eds.h b/source/extensions/clusters/eds/eds.h index f067aa93b70fd..7df97d56dc2fc 100644 --- a/source/extensions/clusters/eds/eds.h +++ b/source/extensions/clusters/eds/eds.h @@ -60,7 +60,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, PriorityStateManager& priority_state_manager, - const HostMap& all_hosts, + const HostLookupTable& all_hosts, const absl::flat_hash_set& all_new_hosts); bool validateUpdateSize(int num_resources); const std::string& edsServiceName() const { diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 7d33baf2d9208..7e0349f2ce4bc 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -186,16 +186,22 @@ void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, // Get the map of all the latest existing hosts, which is used to filter out the existing // hosts in the process of updating cluster memberships. 
- Upstream::HostMapConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap(); + Upstream::HostLookupTableConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap(); ASSERT(all_hosts != nullptr); Upstream::HostVector hosts_added; Upstream::HostVector hosts_removed; - const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - *all_hosts, all_new_hosts); + const bool host_updated = updateDynamicHostList( + new_hosts, hosts_, hosts_added, hosts_removed, + [&all_hosts](const std::string& address) { return all_hosts->findHost(address); }, + all_new_hosts); // Create a map containing all the latest hosts to determine whether the slots are updated. - Upstream::HostMap updated_hosts = *all_hosts; + Upstream::HostMap updated_hosts; + all_hosts->forEach( + [&updated_hosts](const std::string& address, const Upstream::HostSharedPtr& host) { + updated_hosts[address] = host; + }); for (const auto& host : hosts_removed) { updated_hosts.erase(host->address()->asString()); } diff --git a/source/extensions/clusters/strict_dns/strict_dns_cluster.cc b/source/extensions/clusters/strict_dns/strict_dns_cluster.cc index 67a681cf46b84..988066d0d127b 100644 --- a/source/extensions/clusters/strict_dns/strict_dns_cluster.cc +++ b/source/extensions/clusters/strict_dns/strict_dns_cluster.cc @@ -177,8 +177,13 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { HostVector hosts_added; HostVector hosts_removed; - if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, - all_hosts_, all_new_hosts)) { + if (parent_.updateDynamicHostList( + new_hosts, hosts_, hosts_added, hosts_removed, + [this](const std::string& address) -> HostSharedPtr { + const auto it = all_hosts_.find(address); + return it != all_hosts_.end() ? 
it->second : nullptr; + }, + all_new_hosts)) { ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_); ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) { return host->priority() == locality_lb_endpoints_.priority(); diff --git a/source/extensions/dynamic_modules/abi/abi.h b/source/extensions/dynamic_modules/abi/abi.h index 00b5f65757c86..4d0ea9f844eaf 100644 --- a/source/extensions/dynamic_modules/abi/abi.h +++ b/source/extensions/dynamic_modules/abi/abi.h @@ -9592,6 +9592,19 @@ envoy_dynamic_module_callback_cluster_find_host_by_address( void envoy_dynamic_module_callback_cluster_pre_init_complete( envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr); +/** + * envoy_dynamic_module_callback_cluster_use_persistent_host_map selects the persistent + * cross-priority host map backing for this cluster, trading the per-update O(N) copy for O(delta) + * updates. The module may call this during envoy_dynamic_module_on_cluster_init, before the cluster + * discovers any hosts. + * + * @param cluster_envoy_ptr is the pointer to the Envoy cluster. + * @param use_persistent_host_map is true to use the persistent backing, false to use the default + * flat map. + */ +void envoy_dynamic_module_callback_cluster_use_persistent_host_map( + envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr, bool use_persistent_host_map); + /** * envoy_dynamic_module_callback_cluster_lb_get_healthy_host_count returns the number of healthy * hosts at the given priority level in the cluster's host set. 
diff --git a/source/extensions/dynamic_modules/abi_impl.cc b/source/extensions/dynamic_modules/abi_impl.cc index 6fd6f827322e7..9226c91234556 100644 --- a/source/extensions/dynamic_modules/abi_impl.cc +++ b/source/extensions/dynamic_modules/abi_impl.cc @@ -524,6 +524,12 @@ __attribute__((weak)) void envoy_dynamic_module_callback_cluster_pre_init_comple "not implemented in this context"); } +__attribute__((weak)) void envoy_dynamic_module_callback_cluster_use_persistent_host_map( + envoy_dynamic_module_type_cluster_envoy_ptr, bool) { + IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_use_persistent_host_map: " + "not implemented in this context"); +} + __attribute__((weak)) size_t envoy_dynamic_module_callback_cluster_lb_get_healthy_host_count( envoy_dynamic_module_type_cluster_lb_envoy_ptr, uint32_t) { IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_get_healthy_host_count: " diff --git a/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs b/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs index c8fedb9d8e9a4..c15c3ad0a1f66 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs @@ -331,6 +331,12 @@ pub trait EnvoyCluster: Send + Sync { /// routing traffic to this cluster. fn pre_init_complete(&self); + /// Select the persistent cross-priority host map backing for this cluster. + /// + /// This may be called during [`Cluster::on_init`], before the cluster discovers any hosts, to + /// trade the per-update O(N) host-map copy for O(delta) updates. + fn set_use_persistent_host_map(&self, use_persistent: bool); + /// Add multiple hosts to the cluster with per-host locality and metadata. /// /// Each address must be in `ip:port` format (e.g., `127.0.0.1:8080`). 
@@ -1027,6 +1033,12 @@ impl EnvoyCluster for EnvoyClusterImpl { } } + fn set_use_persistent_host_map(&self, use_persistent: bool) { + unsafe { + abi::envoy_dynamic_module_callback_cluster_use_persistent_host_map(self.raw, use_persistent); + } + } + fn new_scheduler(&self) -> Box { unsafe { let scheduler_ptr = abi::envoy_dynamic_module_callback_cluster_scheduler_new(self.raw); diff --git a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs index 55431de029d30..669a440ca6aac 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs @@ -4256,6 +4256,13 @@ pub extern "C" fn envoy_dynamic_module_callback_cluster_pre_init_complete( ) { } +#[no_mangle] +pub extern "C" fn envoy_dynamic_module_callback_cluster_use_persistent_host_map( + _cluster_envoy_ptr: abi::envoy_dynamic_module_type_cluster_envoy_ptr, + _use_persistent_host_map: bool, +) { +} + #[no_mangle] pub extern "C" fn envoy_dynamic_module_callback_cluster_update_host_health( _cluster_envoy_ptr: abi::envoy_dynamic_module_type_cluster_envoy_ptr, diff --git a/source/extensions/load_balancing_policies/dynamic_modules/abi_impl.cc b/source/extensions/load_balancing_policies/dynamic_modules/abi_impl.cc index a896c24cf6f5c..f34c82b709bce 100644 --- a/source/extensions/load_balancing_policies/dynamic_modules/abi_impl.cc +++ b/source/extensions/load_balancing_policies/dynamic_modules/abi_impl.cc @@ -200,11 +200,11 @@ bool envoy_dynamic_module_callback_lb_get_host_health_by_address( return false; } std::string address_str(address.ptr, address.length); - const auto it = host_map->find(address_str); - if (it == host_map->end()) { + const auto host = host_map->findHost(address_str); + if (host == nullptr) { return false; } - switch (it->second->coarseHealth()) { + switch (host->coarseHealth()) { case Envoy::Upstream::Host::Health::Unhealthy: *result = 
envoy_dynamic_module_type_host_health_Unhealthy; break; diff --git a/source/extensions/load_balancing_policies/override_host/load_balancer.cc b/source/extensions/load_balancing_policies/override_host/load_balancer.cc index cd2bdcbeae383..4e082fd9f3661 100644 --- a/source/extensions/load_balancing_policies/override_host/load_balancer.cc +++ b/source/extensions/load_balancing_policies/override_host/load_balancer.cc @@ -38,7 +38,8 @@ using ::envoy::extensions::load_balancing_policies::override_host::v3::OverrideH using ::Envoy::Http::HeaderMap; using ::Envoy::Server::Configuration::ServerFactoryContext; using ::Envoy::Upstream::HostConstSharedPtr; -using ::Envoy::Upstream::HostMapConstSharedPtr; +using ::Envoy::Upstream::HostLookupTableConstSharedPtr; +using ::Envoy::Upstream::HostSharedPtr; using ::Envoy::Upstream::LoadBalancerConfig; using ::Envoy::Upstream::LoadBalancerContext; using ::Envoy::Upstream::LoadBalancerParams; @@ -292,22 +293,21 @@ OverrideHostLoadBalancer::LoadBalancerImpl::getSelectedHosts(LoadBalancerContext HostConstSharedPtr OverrideHostLoadBalancer::LoadBalancerImpl::findHost(absl::string_view endpoint) { - HostMapConstSharedPtr hosts = priority_set_.crossPriorityHostMap(); + HostLookupTableConstSharedPtr hosts = priority_set_.crossPriorityHostMap(); if (hosts == nullptr) { return nullptr; } - ENVOY_LOG(trace, "Looking up {} in {}", endpoint, - absl::StrJoin(*hosts, ", ", - [](std::string* out, Envoy::Upstream::HostMap::const_reference entry) { - absl::StrAppend(out, entry.first); - })); - - if (const auto host_iterator = hosts->find(endpoint); host_iterator != hosts->end()) { - // TODO(yanavlasov): Validate that host health status did not change. 
- return host_iterator->second; + if (ENVOY_LOG_CHECK_LEVEL(trace)) { + std::vector addresses; + hosts->forEach([&addresses](const std::string& address, const HostSharedPtr&) { + addresses.push_back(address); + }); + ENVOY_LOG(trace, "Looking up {} in {}", endpoint, absl::StrJoin(addresses, ", ")); } - return nullptr; + + // TODO(yanavlasov): Validate that host health status did not change. + return hosts->findHost(endpoint); } HostConstSharedPtr OverrideHostLoadBalancer::LoadBalancerImpl::getEndpoint( diff --git a/test/common/upstream/host_utility_test.cc b/test/common/upstream/host_utility_test.cc index 4e6b18d1cc60e..203a215ffa26a 100644 --- a/test/common/upstream/host_utility_test.cc +++ b/test/common/upstream/host_utility_test.cc @@ -167,7 +167,8 @@ TEST(HostUtilityTest, SelectOverrideHostTest) { { // No valid load balancer context. auto host_map = std::make_shared(); - expect_result(HostUtility::selectOverrideHost(host_map.get(), AllStatuses, nullptr), nullptr, + auto host_lookup = makeFlatHostLookupTable(host_map); + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), AllStatuses, nullptr), nullptr, false, Status::Success); } @@ -176,8 +177,9 @@ TEST(HostUtilityTest, SelectOverrideHostTest) { EXPECT_CALL(context, overrideHostToSelect()) .WillOnce(Return(OptRef())); auto host_map = std::make_shared(); - expect_result(HostUtility::selectOverrideHost(host_map.get(), AllStatuses, &context), nullptr, - false, Status::Success); + auto host_lookup = makeFlatHostLookupTable(host_map); + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), AllStatuses, &context), + nullptr, false, Status::Success); } // Test overriding host in strict and non-strict mode. 
@@ -197,8 +199,9 @@ TEST(HostUtilityTest, SelectOverrideHostTest) { EXPECT_CALL(context, overrideHostToSelect()) .WillOnce(Return(OptRef(override_host))); auto host_map = std::make_shared(); - expect_result(HostUtility::selectOverrideHost(host_map.get(), AllStatuses, &context), nullptr, - strict_mode, Status::NotFound); + auto host_lookup = makeFlatHostLookupTable(host_map); + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), AllStatuses, &context), + nullptr, strict_mode, Status::NotFound); } { auto mock_host = std::make_shared>(); @@ -211,21 +214,22 @@ TEST(HostUtilityTest, SelectOverrideHostTest) { auto host_map = std::make_shared(); host_map->insert({"1.2.3.4", mock_host}); + auto host_lookup = makeFlatHostLookupTable(host_map); - expect_result(HostUtility::selectOverrideHost(host_map.get(), UnhealthyStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), UnhealthyStatus, &context), mock_host, strict_mode, Status::Success); - expect_result(HostUtility::selectOverrideHost(host_map.get(), AllStatuses, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), AllStatuses, &context), mock_host, strict_mode, Status::Success); - expect_result(HostUtility::selectOverrideHost(host_map.get(), HealthyStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), HealthyStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), DegradedStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), DegradedStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), TimeoutStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), TimeoutStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), DrainingStatus, &context), + 
expect_result(HostUtility::selectOverrideHost(host_lookup.get(), DrainingStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), UnknownStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), UnknownStatus, &context), nullptr, strict_mode, Status::Unhealthy); } { @@ -239,21 +243,22 @@ TEST(HostUtilityTest, SelectOverrideHostTest) { auto host_map = std::make_shared(); host_map->insert({"1.2.3.4", mock_host}); + auto host_lookup = makeFlatHostLookupTable(host_map); - expect_result(HostUtility::selectOverrideHost(host_map.get(), DegradedStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), DegradedStatus, &context), mock_host, strict_mode, Status::Success); - expect_result(HostUtility::selectOverrideHost(host_map.get(), AllStatuses, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), AllStatuses, &context), mock_host, strict_mode, Status::Success); - expect_result(HostUtility::selectOverrideHost(host_map.get(), HealthyStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), HealthyStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), UnhealthyStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), UnhealthyStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), TimeoutStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), TimeoutStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), DrainingStatus, &context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), DrainingStatus, &context), nullptr, strict_mode, Status::Unhealthy); - expect_result(HostUtility::selectOverrideHost(host_map.get(), UnknownStatus, 
&context), + expect_result(HostUtility::selectOverrideHost(host_lookup.get(), UnknownStatus, &context), nullptr, strict_mode, Status::Unhealthy); } } diff --git a/test/common/upstream/upstream_impl_test.cc b/test/common/upstream/upstream_impl_test.cc index d3aa08ba7a5f6..7f8c7712d61b5 100644 --- a/test/common/upstream/upstream_impl_test.cc +++ b/test/common/upstream/upstream_impl_test.cc @@ -4312,7 +4312,8 @@ TEST(PrioritySet, Extend) { auto time_source = std::make_unique>(); HostVectorSharedPtr hosts(new HostVector({makeTestHost(info, "tcp://127.0.0.1:80")})); HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); - HostMapConstSharedPtr fake_cross_priority_host_map = std::make_shared(); + HostLookupTableConstSharedPtr fake_cross_priority_host_map = + makeFlatHostLookupTable(std::make_shared()); { HostVector hosts_added{hosts->front()}; HostVector hosts_removed{}; @@ -4329,7 +4330,7 @@ TEST(PrioritySet, Extend) { EXPECT_EQ(1, priority_set.hostSetsPerPriority()[1]->hosts().size()); // Simply verify the set and get the cross-priority host map is working properly in the priority - // set. + // set. The worker-local set publishes exactly the lookup table it was handed. EXPECT_EQ(fake_cross_priority_host_map.get(), priority_set.crossPriorityHostMap().get()); // Test iteration. @@ -4441,14 +4442,18 @@ class TestMainPrioritySetImpl : public MainPrioritySetImpl { HostMapSharedPtr mutableHostMapForTest() { return mutable_cross_priority_host_map_; } }; -// Test that the priority set in the main thread can work correctly. +// Test that the priority set in the main thread can work correctly with the legacy flat backing +// (the default). 
TEST(PrioritySet, MainPrioritySetTest) { TestMainPrioritySetImpl priority_set; + priority_set.setUsePersistentCrossPriorityHostMap(false); priority_set.getOrCreateHostSet(0); std::shared_ptr info{new NiceMock()}; auto time_source = std::make_unique>(); - HostVectorSharedPtr hosts(new HostVector({makeTestHost(info, "tcp://127.0.0.1:80")})); + // Create the host at priority 1 so the priority-guarded removal below (also at priority 1) takes + // effect. + HostVectorSharedPtr hosts(new HostVector({makeTestHost(info, "tcp://127.0.0.1:80", 1, 1)})); HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); // The host map is initially empty or null. @@ -4473,8 +4478,11 @@ TEST(PrioritySet, MainPrioritySetTest) { // Mutable host map will be moved to read only host map after `crossPriorityHostMap` is called. HostMapSharedPtr host_map = priority_set.mutableHostMapForTest(); - EXPECT_EQ(host_map.get(), priority_set.crossPriorityHostMap().get()); + HostLookupTableConstSharedPtr lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(host_map.get(), priority_set.constHostMapForTest().get()); EXPECT_EQ(nullptr, priority_set.mutableHostMapForTest().get()); + EXPECT_EQ(1, lookup->size()); + EXPECT_NE(nullptr, lookup->findHost("127.0.0.1:80")); { HostVector hosts_added{}; @@ -4496,8 +4504,224 @@ TEST(PrioritySet, MainPrioritySetTest) { // Again, mutable host map will be moved to read only host map after `crossPriorityHostMap` is // called. 
host_map = priority_set.mutableHostMapForTest(); - EXPECT_EQ(host_map.get(), priority_set.crossPriorityHostMap().get()); + lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(host_map.get(), priority_set.constHostMapForTest().get()); EXPECT_EQ(nullptr, priority_set.mutableHostMapForTest().get()); + EXPECT_EQ(0, lookup->size()); + EXPECT_EQ(nullptr, lookup->findHost("127.0.0.1:80")); +} + +// Test that the priority set in the main thread can work correctly under both the legacy flat +// backing and the persistent backing, and that a snapshot taken before a later removal is +// isolated from that removal (structural-sharing isolation) on the persistent path. +class MainPrioritySetParamTest : public testing::TestWithParam {}; + +INSTANTIATE_TEST_SUITE_P(PersistentBacking, MainPrioritySetParamTest, testing::Bool()); + +TEST_P(MainPrioritySetParamTest, AddRemoveAndSnapshotIsolation) { + MainPrioritySetImpl priority_set; + priority_set.setUsePersistentCrossPriorityHostMap(GetParam()); + priority_set.getOrCreateHostSet(0); + + std::shared_ptr info{new NiceMock()}; + HostSharedPtr host_a = makeTestHost(info, "tcp://127.0.0.1:80"); + HostSharedPtr host_b = makeTestHost(info, "tcp://127.0.0.1:81"); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + + // Add A and B. + { + HostVectorSharedPtr hosts(new HostVector({host_a, host_b})); + HostVector hosts_added{host_a, host_b}; + HostVector hosts_removed{}; + priority_set.updateHosts(0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, std::nullopt); + } + HostLookupTableConstSharedPtr snapshot = priority_set.crossPriorityHostMap(); + EXPECT_EQ(2, snapshot->size()); + EXPECT_EQ(host_a, snapshot->findHost("127.0.0.1:80")); + EXPECT_EQ(host_b, snapshot->findHost("127.0.0.1:81")); + + // Exercise the rest of the HostLookupTable interface on the published table for both backings. 
+ EXPECT_FALSE(snapshot->empty()); + HostMap for_each_hosts; + snapshot->forEach([&for_each_hosts](const std::string& address, const HostSharedPtr& host) { + for_each_hosts.emplace(address, host); + }); + EXPECT_EQ((HostMap{{"127.0.0.1:80", host_a}, {"127.0.0.1:81", host_b}}), for_each_hosts); + // A lookup table with no hosts reports empty. + EXPECT_TRUE(MainPrioritySetImpl().crossPriorityHostMap()->empty()); + + // Remove A. + { + HostVectorSharedPtr hosts(new HostVector({host_b})); + HostVector hosts_added{}; + HostVector hosts_removed{host_a}; + priority_set.updateHosts(0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, std::nullopt); + } + HostLookupTableConstSharedPtr after_remove = priority_set.crossPriorityHostMap(); + EXPECT_EQ(1, after_remove->size()); + EXPECT_EQ(nullptr, after_remove->findHost("127.0.0.1:80")); + EXPECT_EQ(host_b, after_remove->findHost("127.0.0.1:81")); + + // The persistent path structurally shares nodes, so a snapshot taken before the removal must + // still observe the old contents. The flat path copies on publish, so it is also isolated. + EXPECT_EQ(2, snapshot->size()); + EXPECT_EQ(host_a, snapshot->findHost("127.0.0.1:80")); + EXPECT_EQ(host_b, snapshot->findHost("127.0.0.1:81")); + + // Remove the last host so the published table reports empty. This exercises `empty()` returning + // true on the active backing. The fresh-object check above reaches it only for the flat table. 
+ { + HostVectorSharedPtr hosts(new HostVector({})); + HostVector hosts_added{}; + HostVector hosts_removed{host_b}; + priority_set.updateHosts(0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, std::nullopt); + } + HostLookupTableConstSharedPtr after_remove_all = priority_set.crossPriorityHostMap(); + EXPECT_TRUE(after_remove_all->empty()); + EXPECT_EQ(0, after_remove_all->size()); +} + +// A removal that names an address never published must be a no-op under either backing. This +// exercises the absent-address branch where the lookup misses before the priority guard. +TEST_P(MainPrioritySetParamTest, RemoveAbsentAddressIsNoOp) { + MainPrioritySetImpl priority_set; + priority_set.setUsePersistentCrossPriorityHostMap(GetParam()); + priority_set.getOrCreateHostSet(0); + + std::shared_ptr info{new NiceMock()}; + HostSharedPtr host_a = makeTestHost(info, "tcp://127.0.0.1:80"); + HostSharedPtr host_absent = makeTestHost(info, "tcp://127.0.0.1:99"); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + + // Publish A. + { + HostVectorSharedPtr hosts(new HostVector({host_a})); + HostVector hosts_added{host_a}; + HostVector hosts_removed{}; + priority_set.updateHosts(0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, std::nullopt); + } + + // Remove an address that was never added. The lookup misses, so the published map is unchanged. 
+ { + HostVectorSharedPtr hosts(new HostVector({host_a})); + HostVector hosts_added{}; + HostVector hosts_removed{host_absent}; + priority_set.updateHosts(0, + updateHostsParams(hosts, hosts_per_locality, + std::make_shared(*hosts), + hosts_per_locality), + {}, hosts_added, hosts_removed, std::nullopt); + } + + HostLookupTableConstSharedPtr lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(1, lookup->size()); + EXPECT_EQ(host_a, lookup->findHost("127.0.0.1:80")); + EXPECT_EQ(nullptr, lookup->findHost("127.0.0.1:99")); +} + +// By contract the per-cluster setter is set once before the first host update, not toggled in +// production. The reseed-on-switch path exists only for setter robustness and this test. Verify the +// accumulated membership carries across a switch in both directions with no host lost or +// duplicated. The flat-to-persistent switch seeds from the un-promoted flat copy, and the +// persistent-to-flat switch seeds from the persistent map and then honors the priority-guarded +// removal. +TEST(MainPrioritySetTest, CrossPriorityHostMapBackingSwitch) { + MainPrioritySetImpl priority_set; + priority_set.getOrCreateHostSet(0); + auto set_backing = [&](bool on) { priority_set.setUsePersistentCrossPriorityHostMap(on); }; + std::shared_ptr info{new NiceMock()}; + HostSharedPtr host_a = makeTestHost(info, "tcp://127.0.0.1:80"); + HostSharedPtr host_b = makeTestHost(info, "tcp://127.0.0.1:81"); + HostSharedPtr host_c = makeTestHost(info, "tcp://127.0.0.1:82"); + HostsPerLocalitySharedPtr per_locality = std::make_shared(); + + auto apply = [&](const HostVector& all, const HostVector& added, const HostVector& removed) { + HostVectorSharedPtr hosts(new HostVector(all)); + priority_set.updateHosts(0, + updateHostsParams(hosts, per_locality, + std::make_shared(*hosts), + per_locality), + {}, added, removed, std::nullopt); + }; + + // Flat backing, add A. 
The map is not read here, so the flat copy stays un-promoted and the next + // switch must seed from it. + set_backing(false); + apply({host_a}, {host_a}, {}); + + // Switch to persistent. The first update seeds the persistent map from the un-promoted flat + // backing, A carries across and B is added. + set_backing(true); + apply({host_a, host_b}, {host_b}, {}); + { + HostLookupTableConstSharedPtr lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(2, lookup->size()); + EXPECT_EQ(host_a, lookup->findHost("127.0.0.1:80")); + EXPECT_EQ(host_b, lookup->findHost("127.0.0.1:81")); + } + + // Switch back to flat. The first update seeds the flat backing from the persistent map, A and B + // carry across, then A is removed and C is added. + set_backing(false); + apply({host_b, host_c}, {host_c}, {host_a}); + { + HostLookupTableConstSharedPtr lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(2, lookup->size()); + EXPECT_EQ(nullptr, lookup->findHost("127.0.0.1:80")); + EXPECT_EQ(host_b, lookup->findHost("127.0.0.1:81")); + EXPECT_EQ(host_c, lookup->findHost("127.0.0.1:82")); + } +} + +// Two hosts can share an address across priorities. The first writer must win the cross-priority +// entry (matching the flat map's `insert`), and a removal at a non-matching priority must not evict +// it (the priority guard). Both must hold identically under either backing. 
+TEST_P(MainPrioritySetParamTest, CollisionKeepFirstAndPriorityGuardedRemovalSkip) { + MainPrioritySetImpl priority_set; + priority_set.setUsePersistentCrossPriorityHostMap(GetParam()); + priority_set.getOrCreateHostSet(0); + std::shared_ptr info{new NiceMock()}; + HostSharedPtr host_p0 = makeTestHost(info, "tcp://127.0.0.1:80", 1, 0); + HostSharedPtr host_p1 = makeTestHost(info, "tcp://127.0.0.1:80", 1, 1); + HostsPerLocalitySharedPtr per_locality = std::make_shared(); + + auto apply = [&](uint32_t priority, const HostVector& all, const HostVector& added, + const HostVector& removed) { + HostVectorSharedPtr hosts(new HostVector(all)); + priority_set.updateHosts(priority, + updateHostsParams(hosts, per_locality, + std::make_shared(*hosts), + per_locality), + {}, added, removed, std::nullopt); + }; + + // Record the priority-0 host, then deliver the same address at priority 1. The first writer wins. + apply(0, {host_p0}, {host_p0}, {}); + apply(1, {host_p1}, {host_p1}, {}); + EXPECT_EQ(host_p0, priority_set.crossPriorityHostMap()->findHost("127.0.0.1:80")); + + // Removing the priority-1 host must not evict the recorded priority-0 host, the stored host's + // priority does not match the removal priority so the guard skips the erase. 
+ apply(1, {}, {}, {host_p1}); + HostLookupTableConstSharedPtr lookup = priority_set.crossPriorityHostMap(); + EXPECT_EQ(1, lookup->size()); + EXPECT_EQ(host_p0, lookup->findHost("127.0.0.1:80")); } class ClusterInfoImplTest : public testing::Test, public UpstreamImplTestBase { diff --git a/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/rc_connection_wrapper_test.cc b/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/rc_connection_wrapper_test.cc index 8c6d902fe8abf..3e95fc1a299a1 100644 --- a/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/rc_connection_wrapper_test.cc +++ b/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/rc_connection_wrapper_test.cc @@ -762,7 +762,8 @@ TEST_F(RCConnectionWrapperTest, OnHandshakeSuccess) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -855,7 +856,8 @@ TEST_F(RCConnectionWrapperTest, OnHandshakeFailure) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. 
addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -953,7 +955,8 @@ TEST_F(RCConnectionWrapperTest, OnHandshakeFailureEncodeError) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1039,7 +1042,8 @@ TEST_F(RCConnectionWrapperTest, OnEventRemoteClose) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1104,7 +1108,8 @@ TEST_F(RCConnectionWrapperTest, OnEventConnected) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. 
addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1167,7 +1172,8 @@ TEST_F(RCConnectionWrapperTest, OnEventWithNullConnection) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); diff --git a/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle_test.cc b/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle_test.cc index bf6839fb5193a..f196f6136cc08 100644 --- a/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle_test.cc +++ b/test/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle_test.cc @@ -622,7 +622,8 @@ TEST_F(ReverseConnectionIOHandleTest, NoHostsInClusterCannotConnect) { // Set up empty cross priority host map. auto empty_host_map = std::make_shared(); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(empty_host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(empty_host_map))); // Call maintainClusterConnections with empty cluster. 
RemoteClusterConnectionConfig cluster_config("empty-cluster", 2); @@ -659,7 +660,8 @@ TEST_F(ReverseConnectionIOHandleTest, MaybeUpdateHostsMappingsValidHosts) { (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host1); (*host_map)["192.168.1.2"] = std::const_pointer_cast(mock_host2); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Call maintainClusterConnections which will create HostConnectionInfo entries and call. // maybeUpdateHostsMappingsAndConnections @@ -701,7 +703,8 @@ TEST_F(ReverseConnectionIOHandleTest, MaybeUpdateHostsMappingsNoNewHosts) { (*host_map)["192.168.1.2"] = std::const_pointer_cast(mock_host2); (*host_map)["192.168.1.3"] = std::const_pointer_cast(mock_host3); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Call maintainClusterConnections which will create HostConnectionInfo entries and call. // maybeUpdateHostsMappingsAndConnections @@ -753,7 +756,8 @@ TEST_F(ReverseConnectionIOHandleTest, ShouldAttemptConnectionToHostValidHost) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Call maintainClusterConnections to create HostConnectionInfo entries. RemoteClusterConnectionConfig cluster_config("test-cluster", 2); @@ -779,7 +783,8 @@ TEST_F(ReverseConnectionIOHandleTest, ShouldAttemptConnectionToHostValidHost) { // Set up the same thread local cluster for the new IO handle. 
EXPECT_CALL(cluster_manager_, getThreadLocalCluster("test-cluster")) .WillRepeatedly(Return(mock_thread_local_cluster.get())); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Call maintainClusterConnections to create HostConnectionInfo entries in the new IO handle. maintainClusterConnections("test-cluster", cluster_config); @@ -815,7 +820,8 @@ TEST_F(ReverseConnectionIOHandleTest, TrackConnectionFailurePutsHostInBackoff) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // First call maintainClusterConnections to create HostConnectionInfo entries. RemoteClusterConnectionConfig cluster_config("test-cluster", 2); @@ -879,7 +885,8 @@ TEST_F(ReverseConnectionIOHandleTest, ResetHostBackoff) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // First call maintainClusterConnections to create HostConnectionInfo entries. 
RemoteClusterConnectionConfig cluster_config("test-cluster", 2); @@ -955,7 +962,8 @@ TEST_F(ReverseConnectionIOHandleTest, TrackConnectionFailureExponentialBackoff) auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // First call maintainClusterConnections to create HostConnectionInfo entries. RemoteClusterConnectionConfig cluster_config("test-cluster", 2); @@ -1034,7 +1042,8 @@ TEST_F(ReverseConnectionIOHandleTest, HostMappingAndBackoffIntegration) { (*host_map_a)["192.168.1.2"] = std::const_pointer_cast(mock_host_a2); (*host_map_a)["192.168.1.3"] = std::const_pointer_cast(mock_host_a3); - EXPECT_CALL(*mock_priority_set_a, crossPriorityHostMap()).WillRepeatedly(Return(host_map_a)); + EXPECT_CALL(*mock_priority_set_a, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map_a))); // Set up mock thread local cluster for cluster-B. auto mock_thread_local_cluster_b = std::make_shared>(); @@ -1053,7 +1062,8 @@ TEST_F(ReverseConnectionIOHandleTest, HostMappingAndBackoffIntegration) { (*host_map_b)["192.168.2.1"] = std::const_pointer_cast(mock_host_b1); (*host_map_b)["192.168.2.2"] = std::const_pointer_cast(mock_host_b2); - EXPECT_CALL(*mock_priority_set_b, crossPriorityHostMap()).WillRepeatedly(Return(host_map_b)); + EXPECT_CALL(*mock_priority_set_b, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map_b))); // Step 1: Create initial host mappings for cluster-A. 
RemoteClusterConnectionConfig cluster_config_a("cluster-A", 2); @@ -1138,7 +1148,8 @@ TEST_F(ReverseConnectionIOHandleTest, InitiateOneReverseConnectionFailure) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // First call maintainClusterConnections to create HostConnectionInfo entries. RemoteClusterConnectionConfig cluster_config("test-cluster", 2); @@ -1187,7 +1198,8 @@ TEST_F(ReverseConnectionIOHandleTest, InitiateOneReverseConnectionSuccess) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry using helper method. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1262,7 +1274,8 @@ TEST_F(ReverseConnectionIOHandleTest, InitiateReverseConnectionWithCustomScope) auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry using helper method. 
addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1313,7 +1326,8 @@ TEST_F(ReverseConnectionIOHandleTest, MaintainClusterConnectionsSkipsHostsWithEn auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // First call maintainClusterConnections to create HostConnectionInfo entries. RemoteClusterConnectionConfig cluster_config("test-cluster", 1); // Only need 1 connection @@ -1436,7 +1450,8 @@ TEST_F(ReverseConnectionIOHandleTest, InitiateMultipleConnectionsMixedResults) { (*host_map)["192.168.1.2"] = std::const_pointer_cast(mock_host2); (*host_map)["192.168.1.3"] = std::const_pointer_cast(mock_host3); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entries for all hosts with target count of 3. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); // Host 1 @@ -1620,7 +1635,8 @@ TEST_F(ReverseConnectionIOHandleTest, RemoveStaleHostAndCloseConnections) { (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host1); (*host_map)["192.168.1.2"] = std::const_pointer_cast(mock_host2); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Set up successful connections for both hosts. 
auto mock_connection1 = getDeletableConn(); @@ -1852,7 +1868,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnConnectionDoneSuccess) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1930,7 +1947,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnConnectionDoneSuccessTriggerWriteFailure auto host_map = std::make_shared(); auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -1991,7 +2009,8 @@ TEST_F(ReverseConnectionIOHandleTest, InitiateOneReverseConnectionLogsWithoutPor EXPECT_CALL(*mock_thread_local_cluster, prioritySet()) .WillRepeatedly(ReturnRef(*mock_priority_set)); auto host_map = std::make_shared(); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); auto mock_host = createMockPipeHost("/tmp/rev.sock"); auto mock_connection = setupMockConnection(); @@ -2029,7 +2048,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnConnectionDoneFailureAndRecovery) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, 
crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -2193,7 +2213,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnDownstreamConnectionClosedTriggersReInit auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); // Create HostConnectionInfo entry. addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -2333,7 +2354,8 @@ TEST_F(ReverseConnectionIOHandleTest, SkipNewConnectionIfAttemptInProgress) { auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); EXPECT_CALL(*mock_thread_local_cluster, tcpConn_(_)).Times(0); @@ -3057,7 +3079,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnConnectionDoneTlsConnectionQuietShutdown auto host_map = std::make_shared(); auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); addHostConnectionInfo("192.168.1.1", "test-cluster", 1); @@ -3142,7 +3165,8 @@ TEST_F(ReverseConnectionIOHandleTest, OnConnectionDoneTlsConnectionDynamicCastFa auto host_map = std::make_shared(); auto mock_host = createMockHost("192.168.1.1"); (*host_map)["192.168.1.1"] = 
std::const_pointer_cast(mock_host); - EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()).WillRepeatedly(Return(host_map)); + EXPECT_CALL(*mock_priority_set, crossPriorityHostMap()) + .WillRepeatedly(Return(Upstream::makeFlatHostLookupTable(host_map))); addHostConnectionInfo("192.168.1.1", "test-cluster", 1); diff --git a/test/extensions/clusters/dynamic_modules/cluster_test.cc b/test/extensions/clusters/dynamic_modules/cluster_test.cc index 7a2245169670f..a65ba08dc3763 100644 --- a/test/extensions/clusters/dynamic_modules/cluster_test.cc +++ b/test/extensions/clusters/dynamic_modules/cluster_test.cc @@ -3952,6 +3952,55 @@ TEST_F(DynamicModuleClusterTest, PreInitCompleteOffMainThreadFailsClosed) { "envoy_dynamic_module_callback_cluster_pre_init_complete must be called on the main thread"); } +// Verifies that `cluster_use_persistent_host_map` is fail-closed when called off the main thread. +TEST_F(DynamicModuleClusterTest, UsePersistentHostMapOffMainThreadFailsClosed) { + auto result = createCluster(makeYamlConfig("cluster_no_op")); + ASSERT_TRUE(result.ok()) << result.status().message(); + auto cluster = std::dynamic_pointer_cast(result->first); + ASSERT_NE(nullptr, cluster); + + void* cluster_ptr = cluster.get(); + + EXPECT_ENVOY_BUG( + { + std::thread t([&] { + envoy_dynamic_module_callback_cluster_use_persistent_host_map(cluster_ptr, true); + }); + t.join(); + }, + "envoy_dynamic_module_callback_cluster_use_persistent_host_map must be called on the main " + "thread"); +} + +// Verifies that selecting the persistent host map backing on the main thread is accepted and keeps +// identical cross-priority membership behavior. 
+TEST_F(DynamicModuleClusterTest, UsePersistentHostMapOnMainThreadAccepted) { + auto result = createCluster(makeYamlConfig("cluster_no_op")); + ASSERT_TRUE(result.ok()) << result.status().message(); + auto cluster = std::dynamic_pointer_cast(result->first); + ASSERT_NE(nullptr, cluster); + + // Select the persistent backing through the ABI callback before the first host update, as a + // module would during init. + void* cluster_ptr = cluster.get(); + envoy_dynamic_module_callback_cluster_use_persistent_host_map(cluster_ptr, true); + + std::vector addresses = {"127.0.0.1:10001", "127.0.0.1:10002"}; + std::vector weights = {1, 2}; + std::vector hosts; + ASSERT_TRUE(addSimpleHosts(*cluster, addresses, weights, hosts)); + + // The cross-priority host map is populated under the persistent backing. + EXPECT_EQ(2, DynamicModuleClusterTestPeer::getHostMapSize(*cluster)); + EXPECT_EQ(hosts[0], cluster->findHostByAddress("127.0.0.1:10001")); + EXPECT_EQ(hosts[1], cluster->findHostByAddress("127.0.0.1:10002")); + + // Removal applies under the persistent backing too. + EXPECT_EQ(1, cluster->removeHosts({hosts[0]})); + EXPECT_EQ(nullptr, cluster->findHostByAddress("127.0.0.1:10001")); + EXPECT_EQ(hosts[1], cluster->findHostByAddress("127.0.0.1:10002")); +} + // Verifies that `cluster_find_host_by_address` is fail-closed when called off the main thread. 
TEST_F(DynamicModuleClusterTest, FindHostByAddressOffMainThreadFailsClosed) { auto result = createCluster(makeYamlConfig("cluster_no_op")); diff --git a/test/extensions/dynamic_modules/abi_impl_test.cc b/test/extensions/dynamic_modules/abi_impl_test.cc index 61c1ed1ee61bc..8f56bfe7ebc1c 100644 --- a/test/extensions/dynamic_modules/abi_impl_test.cc +++ b/test/extensions/dynamic_modules/abi_impl_test.cc @@ -367,6 +367,8 @@ WEAK_STUB(ClusterAddHosts, WEAK_STUB(ClusterRemoveHosts, envoy_dynamic_module_callback_cluster_remove_hosts(nullptr, nullptr, 0)) WEAK_STUB(ClusterPreInitComplete, envoy_dynamic_module_callback_cluster_pre_init_complete(nullptr)) +WEAK_STUB(ClusterUsePersistentHostMap, + envoy_dynamic_module_callback_cluster_use_persistent_host_map(nullptr, true)) WEAK_STUB(ClusterLbGetHealthyHostCount, envoy_dynamic_module_callback_cluster_lb_get_healthy_host_count(nullptr, 0)) WEAK_STUB(ClusterLbGetHealthyHost, diff --git a/test/extensions/load_balancing_policies/dynamic_modules/config_test.cc b/test/extensions/load_balancing_policies/dynamic_modules/config_test.cc index 6ce53b3c61f68..ecf618235fc5e 100644 --- a/test/extensions/load_balancing_policies/dynamic_modules/config_test.cc +++ b/test/extensions/load_balancing_policies/dynamic_modules/config_test.cc @@ -905,7 +905,8 @@ TEST_F(DynamicModulesLoadBalancerTest, HostHealthByAddressSuccess) { host_map->insert({"10.0.0.1:8080", host1_}); host_map->insert({"10.0.0.2:8080", host2_}); host_map->insert({"10.0.0.3:8080", host3_}); - ON_CALL(priority_set_, crossPriorityHostMap()).WillByDefault(Return(host_map)); + ON_CALL(priority_set_, crossPriorityHostMap()) + .WillByDefault(Return(Upstream::makeFlatHostLookupTable(host_map))); envoy::extensions::load_balancing_policies::dynamic_modules::v3::DynamicModulesLoadBalancerConfig config; diff --git a/test/extensions/load_balancing_policies/override_host/BUILD b/test/extensions/load_balancing_policies/override_host/BUILD index a375feacf1316..a0b9b9f499557 100644 --- 
a/test/extensions/load_balancing_policies/override_host/BUILD +++ b/test/extensions/load_balancing_policies/override_host/BUILD @@ -59,6 +59,7 @@ envoy_extension_cc_test( "//test/mocks/upstream:host_set_mocks", "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:priority_set_mocks", + "//test/test_common:logging_lib", "//test/test_common:registry_lib", "//test/test_common:utility_lib", "@abseil-cpp//absl/strings:string_view", diff --git a/test/extensions/load_balancing_policies/override_host/load_balancer_test.cc b/test/extensions/load_balancing_policies/override_host/load_balancer_test.cc index 7c3c907a52a44..6368bde12859d 100644 --- a/test/extensions/load_balancing_policies/override_host/load_balancer_test.cc +++ b/test/extensions/load_balancing_policies/override_host/load_balancer_test.cc @@ -19,6 +19,7 @@ #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" +#include "test/test_common/logging.h" #include "test/test_common/registry.h" #include "test/test_common/utility.h" @@ -166,7 +167,8 @@ class OverrideHostLoadBalancerTest : public ::testing::Test { host_map->insert({host->address()->asString(), host}); } } - thread_local_priority_set_.cross_priority_host_map_ = host_map; + thread_local_priority_set_.cross_priority_host_map_ = + Envoy::Upstream::makeFlatHostLookupTable(host_map); } Locality makeLocality(absl::string_view region, absl::string_view zone) { @@ -220,6 +222,32 @@ TEST_F(OverrideHostLoadBalancerTest, NoMetadataOrHeaders) { EXPECT_EQ(load_balancer_->peekAnotherHost(&load_balancer_context_), nullptr); } +TEST_F(OverrideHostLoadBalancerTest, FindHostTraceLogsCandidateAddresses) { + Locality us_central1_a = makeLocality("us-central1", "us-central1-a"); + + MockHostSet* host_set = thread_local_priority_set_.getMockHostSet(0); + host_set->hosts_ = {Envoy::Upstream::makeTestHost( + cluster_info_, "tcp://127.0.0.1:80", us_central1_a, 1, 0, 
Host::HealthStatus::HEALTHY)}; + host_set->hosts_per_locality_ = ::Envoy::Upstream::makeHostsPerLocality({{host_set->hosts_[0]}}); + makeCrossPriorityHostMap(); + + createLoadBalancer(makeDefaultConfig()); + + setSelectedEndpointsMetadata("envoy.lb", R"pb( + fields { + key: "x-gateway-destination-endpoint" + value: { string_value: "127.0.0.1:80" } + } + )pb"); + EXPECT_CALL(stream_info_, dynamicMetadata()).WillRepeatedly(ReturnRef(metadata_)); + + // With trace logging enabled, findHost enumerates the lookup table before resolving the endpoint. + EXPECT_LOG_CONTAINS("trace", "Looking up 127.0.0.1:80 in 127.0.0.1:80", { + HostConstSharedPtr host = load_balancer_->chooseHost(&load_balancer_context_).host; + EXPECT_EQ(host->address()->asString(), "127.0.0.1:80"); + }); +} + TEST_F(OverrideHostLoadBalancerTest, NullptrHeaders) { Locality us_central1_a = makeLocality("us-central1", "us-central1-a"); diff --git a/test/mocks/upstream/priority_set.h b/test/mocks/upstream/priority_set.h index 02ba558c8e64b..965a05c92084a 100644 --- a/test/mocks/upstream/priority_set.h +++ b/test/mocks/upstream/priority_set.h @@ -27,9 +27,9 @@ class MockPrioritySet : public PrioritySet { LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, const HostVector& hosts_removed, std::optional weighted_priority_health, std::optional overprovisioning_factor, - HostMapConstSharedPtr cross_priority_host_map)); + HostLookupTableConstSharedPtr cross_priority_host_map)); MOCK_METHOD(void, batchHostUpdate, (BatchUpdateCb&)); - MOCK_METHOD(HostMapConstSharedPtr, crossPriorityHostMap, (), (const)); + MOCK_METHOD(HostLookupTableConstSharedPtr, crossPriorityHostMap, (), (const)); MockHostSet* getMockHostSet(uint32_t priority) { getHostSet(priority); // Ensure the host set exists. 
@@ -42,7 +42,8 @@ class MockPrioritySet : public PrioritySet { Common::CallbackManager priority_update_cb_helper_; - HostMapConstSharedPtr cross_priority_host_map_{std::make_shared()}; + HostLookupTableConstSharedPtr cross_priority_host_map_{ + makeFlatHostLookupTable(std::make_shared())}; }; } // namespace Upstream } // namespace Envoy