Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func run() error {
return errors.Wrap(err, "installing CRD")
}

registry, err := buildInitialRegistry(ctx, cfg)
registry, err := buildInitialRegistry(ctx, cfg, slogger)
if err != nil {
return errors.Wrap(err, "building cluster registry")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func readNamespace() (string, error) {
// namespace and constructs a registry that holds clients for every declared
// cluster. If no ClusterMesh resources exist yet, an empty registry is
// returned and the change-watcher will trigger a restart once one is created.
func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.ClusterRegistry, error) {
func buildInitialRegistry(ctx context.Context, cfg *rest.Config, log *slog.Logger) (*multicluster.ClusterRegistry, error) {
preClient, err := client.New(cfg, client.Options{Scheme: scheme})
if err != nil {
return nil, errors.Wrap(err, "building pre-manager client")
Expand All @@ -226,7 +226,7 @@ func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.

entries := mergeClusterEntries(meshes.Items)

registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme)
registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme, log)

return registry, errors.Wrap(err, "constructing registry")
}
Expand Down
195 changes: 172 additions & 23 deletions internal/controller/clustermesh_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -108,6 +109,14 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// persist forever in the surviving clusters.
r.cleanupStaleSourceClusters(ctx, log, mesh)

// Sweep peers whose owning ClusterMesh CR no longer exists. handleDeletion
// normally cleans these via the finalizer, but the finalizer may be
// skipped (force-delete, finalizer manually removed, operator crashloop
// that prevented the finalizer reconcile from running). Without this
// sweep, such peers persist forever as ghosts. Any live reconcile takes
// the global cleanup pass so the cluster always converges.
r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace)

return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses)
}

Expand All @@ -124,36 +133,167 @@ func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error {
return errors.Wrap(err, "building clustermesh controller")
}

// cleanupStaleSourceClusters walks every cluster currently present in the
// mesh's spec and asks each one to drop any Peer it holds whose
// source-cluster label points at a cluster no longer in the spec. Failures
// are logged and swallowed: a stale peer not deleted this tick will be
// retried on the next reconcile, and we should not block the rest of the
// status update on a single client error.
// cleanupStaleSourceClusters walks every cluster the operator knows about
// (the merged registry built from all ClusterMesh resources, not just this
// mesh's current spec) and removes Peer objects this mesh has no business
// owning anymore. Two situations are swept:
//
// - target cluster was removed from this mesh's spec.Clusters: the mesh
// should hold no peers at all in that cluster, so every Peer labeled
// with this mesh's name is deleted there.
//
// - target cluster is still in spec: only Peers whose source-cluster
// label points at a cluster no longer in spec are deleted.
//
// Visiting only the current spec.Clusters misses the first case — a peer
// pushed into a now-removed target stays reachable via another
// ClusterMesh's kubeconfig and would otherwise never be touched again.
// Failures are logged and swallowed: a stale peer not deleted this tick
// will be retried on the next reconcile, and we should not block the rest
// of the status update on a single client error.
func (r *ClusterMeshReconciler) cleanupStaleSourceClusters(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) {
validSources := make([]string, 0, len(mesh.Spec.Clusters))
for i := range mesh.Spec.Clusters {
validSources = append(validSources, mesh.Spec.Clusters[i].Name)
}
validSet := make(map[string]struct{}, len(mesh.Spec.Clusters))

for i := range mesh.Spec.Clusters {
tgtEntry := &mesh.Spec.Clusters[i]
name := mesh.Spec.Clusters[i].Name
validSources = append(validSources, name)
validSet[name] = struct{}{}
}

tgtClient, ok := r.Registry.Client(tgtEntry.Name)
for _, name := range r.Registry.Clusters() {
tgtClient, ok := r.Registry.Client(name)
if !ok {
continue
}

err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, validSources)
// Target cluster not in this mesh's spec → drop every Peer this
// mesh owns there. Target still in spec → keep peers whose source
// is still in spec, drop the rest.
sources := validSources
if _, inSpec := validSet[name]; !inSpec {
sources = nil
}

err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, sources)
if err != nil {
log.Warn("cleaning stale source-cluster peers",
slog.String("target", tgtEntry.Name),
slog.String("target", name),
slog.String("error", err.Error()),
)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Reconciling stale source-cluster peers synchronously across all clusters in the registry can block the entire reconciliation loop if any remote cluster is offline or slow. To prevent a single unhealthy cluster from degrading the operator, we should use a context with a short timeout for the cleanup call.

		cleanupCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err := peer.DeleteStaleSourceClusters(cleanupCtx, tgtClient, mesh.Name, sources)
		cancel()
		if err != nil {
			log.Warn("cleaning stale source-cluster peers",
				slog.String("target", name),
				slog.String("error", err.Error()),
			)
		}

}
}

// cleanupOrphanMeshPeers deletes Peer objects whose kilo-clustermesh.io/mesh
// label names a ClusterMesh that no longer exists in the given namespace.
// It is a global self-healing pass: every cluster currently held by the
// registry is visited, regardless of which mesh triggered the reconcile.
//
// Why this exists: handleDeletion is the primary cleanup path, but it
// requires the finalizer to actually run. The finalizer is skipped if:
//   - the CR was force-deleted (e.g. operator was crashlooping and the user
//     manually removed the finalizer to unblock teardown),
//   - the finalizer was never present (legacy CR predating the finalizer),
//   - reconcile-time errors caused the finalizer reconcile to never make it
//     to the peer-deletion step.
//
// Without this sweep, the cluster accumulates ghost peers that no future
// reconcile would ever notice — none of the per-CR cleanup paths look at
// peers labeled for CRs other than their own.
//
// Scope: a Peer counts as owned only if its mesh label matches the name of
// a ClusterMesh listed in namespace. Ownership is decided by name alone.
// NOTE(review): a Peer labeled for a mesh that lives in another namespace,
// or that is written by a different operator instance, is indistinguishable
// from an orphan here and would be deleted — confirm that mesh names are
// unique across everything that can write Peers into the registered clusters.
//
// Failures are logged and swallowed: if the living-mesh list cannot be
// obtained nothing is deleted, and an error in one cluster does not stop
// the sweep of the others. The pass ends early once ctx is cancelled.
func (r *ClusterMeshReconciler) cleanupOrphanMeshPeers(ctx context.Context, log *slog.Logger, namespace string) {
	living, ok := r.collectLivingMeshes(ctx, log, namespace)
	if !ok {
		// Without a trustworthy view of the living meshes every peer would
		// look like an orphan, so do nothing this round.
		return
	}

	for _, clusterName := range r.Registry.Clusters() {
		// A cancelled context makes every remaining call fail; stop instead
		// of logging one warning per remaining cluster.
		if ctx.Err() != nil {
			return
		}

		tgtClient, present := r.Registry.Client(clusterName)
		if !present {
			continue
		}

		r.sweepOrphanPeersInCluster(ctx, log, clusterName, tgtClient, living)
	}
}

// collectLivingMeshes returns the names of every ClusterMesh in the given
// namespace, used by the orphan sweep to decide which mesh labels are still
// owned. ok==false indicates the list call itself failed; caller should
// bail without deleting anything.
func (r *ClusterMeshReconciler) collectLivingMeshes(ctx context.Context, log *slog.Logger, namespace string) (map[string]struct{}, bool) {
var meshes v1alpha1.ClusterMeshList

err := r.List(ctx, &meshes, client.InNamespace(namespace))
if err != nil {
log.Warn("listing meshes for orphan sweep",
slog.String("namespace", namespace),
slog.String("error", err.Error()),
)

return nil, false
}

living := make(map[string]struct{}, len(meshes.Items))
for i := range meshes.Items {
living[meshes.Items[i].Name] = struct{}{}
}

return living, true
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// sweepOrphanPeersInCluster lists the Peers in a target cluster that carry
// the mesh label (peer.LabelMesh) and deletes those whose label value names
// a mesh not present in living.
//
// Parameters:
//   - clusterName is used only for log attribution.
//   - tgtClient is the client for the target cluster.
//   - living is the set of mesh names that still own their peers; peers with
//     an empty label value are never touched.
//
// A list failure is logged and ends the sweep for this cluster. A delete
// failure is logged and the sweep continues with the remaining peers. A
// peer that is already gone (NotFound) is skipped silently, since this call
// did not delete it. The sweep stops early once ctx is cancelled.
func (r *ClusterMeshReconciler) sweepOrphanPeersInCluster(ctx context.Context, log *slog.Logger, clusterName string, tgtClient client.Client, living map[string]struct{}) {
	var peers kilov1alpha1.PeerList

	// Only peers carrying the mesh label can be orphans, so let the list
	// call filter instead of fetching every Peer in the cluster.
	err := tgtClient.List(ctx, &peers, client.HasLabels{peer.LabelMesh})
	if err != nil {
		log.Warn("listing peers for orphan sweep",
			slog.String("target", clusterName),
			slog.String("error", err.Error()),
		)

		return
	}

	for i := range peers.Items {
		// Once the context is cancelled every further Delete fails; stop
		// rather than emitting one warning per remaining peer.
		if ctx.Err() != nil {
			return
		}

		peerObj := &peers.Items[i]

		// The selector matches on key presence only, so an empty value can
		// still show up here; it identifies no mesh and is left alone.
		meshLabel := peerObj.Labels[peer.LabelMesh]
		if meshLabel == "" {
			continue
		}

		if _, alive := living[meshLabel]; alive {
			continue
		}

		deleteErr := tgtClient.Delete(ctx, peerObj)

		switch {
		case deleteErr == nil:
			log.Info("deleted orphan peer whose ClusterMesh CR no longer exists",
				slog.String("target", clusterName),
				slog.String("peer", peerObj.Name),
				slog.String("orphan-mesh", meshLabel),
			)
		case apierrors.IsNotFound(deleteErr):
			// Someone else removed it between List and Delete; the desired
			// state is reached and there is nothing to report.
		default:
			log.Warn("deleting orphan peer",
				slog.String("target", clusterName),
				slog.String("peer", peerObj.Name),
				slog.String("error", deleteErr.Error()),
			)
		}
	}
}

// handleDeletion removes all Peers for this mesh from every cluster, then drops the finalizer.
func (r *ClusterMeshReconciler) handleDeletion(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) error {
if !controllerutil.ContainsFinalizer(mesh, finalizerName) {
Expand Down Expand Up @@ -372,25 +512,34 @@ func (r *ClusterMeshReconciler) updateStatus(ctx context.Context, mesh *v1alpha1
return errors.Wrap(r.Status().Update(ctx, mesh), "updating status")
}

// buildDesiredPeers constructs the desired Peer slice for all valid nodes plus an optional anchor.
// buildDesiredPeers constructs the desired Peer slice for all valid nodes.
// The first valid node carries the cluster-wide CIDRs (serviceCIDR and any
// AdditionalCIDRs) folded into its Peer.AllowedIPs. The older design emitted
// a separate anchor Peer that reused the anchor node's WireGuard public key;
// WireGuard's per-pubkey dedup made the second `wg setconf` call to apply
// either the node or the anchor entry clobber the AllowedIPs of the other,
// silently losing pod-CIDR or service-CIDR routing in a racy way. Folding
// the anchor CIDRs into the first node Peer keeps a single WG peer entry
// per pubkey with the full union of AllowedIPs.
func buildDesiredPeers(meshName string, entry *v1alpha1.ClusterEntry, nodes []*corev1.Node) ([]*kilov1alpha1.Peer, error) {
peers := make([]*kilov1alpha1.Peer, 0, len(nodes)+1)
peers := make([]*kilov1alpha1.Peer, 0, len(nodes))

anchorExtras := peer.CollectAnchorCIDRs(entry)

for i, node := range nodes {
var extras []string
if i == 0 {
extras = anchorExtras
}

for _, node := range nodes {
p, err := peer.BuildPeer(meshName, entry, node)
p, err := peer.BuildPeer(meshName, entry, node, extras)
if err != nil {
return nil, errors.Wrapf(err, "building peer for node %q", node.Name)
}

peers = append(peers, p)
}

if len(nodes) > 0 {
if anchor := peer.BuildAnchorPeer(meshName, entry, nodes[0]); anchor != nil {
peers = append(peers, anchor)
}
}

return peers, nil
}

Expand Down
35 changes: 33 additions & 2 deletions internal/multicluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package multicluster

import (
"context"
"log/slog"
"net/http"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -108,13 +109,31 @@ type EntrySource struct {
// localCfg is the rest.Config of the cluster where the controller runs.
// kubeClient is used to read kubeconfig Secrets for remote clusters from the
// namespace of the ClusterMesh resource that contributed each entry.
//
// Per-entry failures (missing kubeconfig Secret, malformed kubeconfig,
// failure to construct the cluster.Cluster) are logged via the provided
// logger and the entry is skipped — they do not abort the build. This
// matters during teardown: if a tenant's KubernetesSwitchcloud is being
// deleted, its admin-kubeconfig Secret may be removed before the
// ClusterMesh CR that references it. An intolerant Build would crash the
// operator on startup, blocking the finalizer that would otherwise clean
// up Peer objects in still-reachable clusters and release the
// ClusterMesh. The reconciler already does best-effort sweeps over the
// registry and skips clusters it cannot reach, so a partial registry is
// safe and forward-progress-preserving. A nil logger is accepted and
// treated as a discard logger.
func Build(
ctx context.Context,
entries []EntrySource,
localCfg *rest.Config,
kubeClient client.Client,
scheme *runtime.Scheme,
log *slog.Logger,
) (*ClusterRegistry, error) {
if log == nil {
log = slog.New(slog.DiscardHandler)
}

reg := &ClusterRegistry{
clusters: make(map[string]cluster.Cluster, len(entries)),
}
Expand All @@ -125,7 +144,13 @@ func Build(

cfg, err := configForEntry(ctx, &entry, localCfg, src.MeshNamespace, kubeClient)
if err != nil {
return nil, err
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", err.Error()),
)

continue
}

if entry.Local {
Expand All @@ -136,7 +161,13 @@ func Build(
o.Scheme = scheme
})
if err != nil {
return nil, errors.Wrapf(err, "creating cluster object for %q", entry.Name)
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", errors.Wrapf(err, "creating cluster object for %q", entry.Name).Error()),
)

continue
}

reg.clusters[entry.Name] = c
Expand Down
Loading
Loading