Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func run() error {
return errors.Wrap(err, "installing CRD")
}

registry, err := buildInitialRegistry(ctx, cfg)
registry, err := buildInitialRegistry(ctx, cfg, slogger)
if err != nil {
return errors.Wrap(err, "building cluster registry")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func readNamespace() (string, error) {
// namespace and constructs a registry that holds clients for every declared
// cluster. If no ClusterMesh resources exist yet, an empty registry is
// returned and the change-watcher will trigger a restart once one is created.
func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.ClusterRegistry, error) {
func buildInitialRegistry(ctx context.Context, cfg *rest.Config, log *slog.Logger) (*multicluster.ClusterRegistry, error) {
preClient, err := client.New(cfg, client.Options{Scheme: scheme})
if err != nil {
return nil, errors.Wrap(err, "building pre-manager client")
Expand All @@ -226,7 +226,7 @@ func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.

entries := mergeClusterEntries(meshes.Items)

registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme)
registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme, log)

return registry, errors.Wrap(err, "constructing registry")
}
Expand Down
234 changes: 208 additions & 26 deletions internal/controller/clustermesh_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package controller
import (
"context"
"log/slog"
"time"

"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -108,6 +110,14 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// persist forever in the surviving clusters.
r.cleanupStaleSourceClusters(ctx, log, mesh)

// Sweep peers whose owning ClusterMesh CR no longer exists. handleDeletion
// normally cleans these via the finalizer, but the finalizer may be
// skipped (force-delete, finalizer manually removed, operator crashloop
// that prevented the finalizer reconcile from running). Without this
// sweep, such peers persist forever as ghosts. Any live reconcile takes
// the global cleanup pass so the cluster always converges.
r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace)

return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses)
}

Expand All @@ -124,33 +134,196 @@ func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error {
return errors.Wrap(err, "building clustermesh controller")
}

// cleanupStaleSourceClusters walks every cluster currently present in the
// mesh's spec and asks each one to drop any Peer it holds whose
// source-cluster label points at a cluster no longer in the spec. Failures
// are logged and swallowed: a stale peer not deleted this tick will be
// retried on the next reconcile, and we should not block the rest of the
// status update on a single client error.
// cleanupSweepTimeout caps the per-target list/delete pass time so a single
// unreachable or slow cluster does not block the whole reconcile loop. The
// reconciler retries on every tick anyway, so a brief budget is enough to
// either make progress or move on.
const cleanupSweepTimeout = 5 * time.Second

// cleanupStaleSourceClusters walks every cluster the operator knows about
// (the merged registry built from all ClusterMesh resources, not just this
// mesh's current spec) and removes Peer objects this mesh has no business
// owning anymore. Two situations are swept:
//
// - target cluster was removed from this mesh's spec.Clusters: the mesh
// should hold no peers at all in that cluster, so every Peer labeled
// with this mesh's name is deleted there.
//
// - target cluster is still in spec: only Peers whose source-cluster
// label points at a cluster no longer in spec are deleted.
//
// Visiting only the current spec.Clusters misses the first case — a peer
// pushed into a now-removed target stays reachable via another
// ClusterMesh's kubeconfig and would otherwise never be touched again.
// Failures are logged and swallowed: a stale peer not deleted this tick
// will be retried on the next reconcile, and we should not block the rest
// of the status update on a single client error.
func (r *ClusterMeshReconciler) cleanupStaleSourceClusters(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) {
validSources := make([]string, 0, len(mesh.Spec.Clusters))
for i := range mesh.Spec.Clusters {
validSources = append(validSources, mesh.Spec.Clusters[i].Name)
}
validSet := make(map[string]struct{}, len(mesh.Spec.Clusters))

for i := range mesh.Spec.Clusters {
tgtEntry := &mesh.Spec.Clusters[i]
name := mesh.Spec.Clusters[i].Name
validSources = append(validSources, name)
validSet[name] = struct{}{}
}

tgtClient, ok := r.Registry.Client(tgtEntry.Name)
for _, name := range r.Registry.Clusters() {
tgtClient, ok := r.Registry.Client(name)
if !ok {
continue
}

err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, validSources)
if err != nil {
log.Warn("cleaning stale source-cluster peers",
slog.String("target", tgtEntry.Name),
slog.String("error", err.Error()),
// Target cluster not in this mesh's spec → drop every Peer this
// mesh owns there. Target still in spec → keep peers whose source
// is still in spec, drop the rest.
sources := validSources
if _, inSpec := validSet[name]; !inSpec {
sources = nil
}

// Per-target deadline so one unreachable cluster cannot stall the
// whole reconcile pass — see cleanupSweepTimeout for the rationale.
// Anonymous function so defer cancel() fires per iteration instead
// of waiting for the enclosing reconcile to return.
func() {
sweepCtx, cancel := context.WithTimeout(ctx, cleanupSweepTimeout)
defer cancel()

err := peer.DeleteStaleSourceClusters(sweepCtx, tgtClient, mesh.Name, sources)
if err != nil {
log.Warn("cleaning stale source-cluster peers",
slog.String("target", name),
slog.String("error", err.Error()),
)
}
}()
}
}

// cleanupOrphanMeshPeers deletes Peer objects whose kilo-clustermesh.io/mesh
// label names a ClusterMesh that no longer exists in the given namespace.
// It is a global self-healing pass: it scopes by the operator-namespace of
// living ClusterMesh CRs, so it cannot accidentally clobber peers managed
// from another namespace.
//
// Why this exists: handleDeletion is the primary cleanup path, but it
// requires the finalizer to actually run. The finalizer is skipped if:
// - the CR was force-deleted (e.g. operator was crashlooping and the user
// manually removed the finalizer to unblock teardown),
// - the finalizer was never present (legacy CR predating the finalizer),
// - reconcile-time errors caused the finalizer reconcile to never make it
// to the peer-deletion step.
//
// Without this sweep, the cluster accumulates ghost peers that no future
// reconcile would ever notice — none of the per-CR cleanup paths look at
// peers labeled for CRs other than their own.
//
// Failures are logged and swallowed: each peer is deleted independently and
// a single client error must not abort the whole pass.
func (r *ClusterMeshReconciler) cleanupOrphanMeshPeers(ctx context.Context, log *slog.Logger, namespace string) {
living, ok := r.collectLivingMeshes(ctx, log)
if !ok {
return
}

for _, clusterName := range r.Registry.Clusters() {
tgtClient, present := r.Registry.Client(clusterName)
if !present {
continue
}

// Per-target deadline so one unreachable cluster cannot stall the
// whole reconcile pass — see cleanupSweepTimeout. Wrapped in an
// anonymous function so defer cancel() fires per iteration.
func() {
sweepCtx, cancel := context.WithTimeout(ctx, cleanupSweepTimeout)
defer cancel()

r.sweepOrphanPeersInCluster(sweepCtx, log, clusterName, tgtClient, living)
}()
}

// `namespace` is currently unused — see collectLivingMeshes for the
// reasoning. Keeping the parameter on the function signature documents
// the reconciling-mesh context for future per-namespace heuristics and
// avoids breaking the small set of internal call sites.
_ = namespace
}

// collectLivingMeshes returns the names of every ClusterMesh in the cluster,
// not just the reconciler's own namespace. The operator builds its registry
// cluster-wide (cmd.buildInitialRegistry merges entries from every
// ClusterMesh it sees), so a target cluster's Peer labelled mesh=foo could
// legitimately belong to a foo CR in any namespace — narrowing the lookup
// to one namespace would let the orphan sweep delete a peer owned by a
// living foo CR sitting elsewhere. ok==false indicates the list call itself
// failed; caller should bail without deleting anything.
func (r *ClusterMeshReconciler) collectLivingMeshes(ctx context.Context, log *slog.Logger) (map[string]struct{}, bool) {
var meshes v1alpha1.ClusterMeshList

err := r.List(ctx, &meshes)
if err != nil {
log.Warn("listing meshes for orphan sweep",
slog.String("error", err.Error()),
)

return nil, false
}

living := make(map[string]struct{}, len(meshes.Items))
for i := range meshes.Items {
living[meshes.Items[i].Name] = struct{}{}
}

return living, true
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// sweepOrphanPeersInCluster lists every Peer in a target cluster and deletes
// those whose kilo-clustermesh.io/mesh label names a mesh not in living.
// Listing or per-peer delete failures are logged and the sweep continues
// with the remaining peers.
func (r *ClusterMeshReconciler) sweepOrphanPeersInCluster(ctx context.Context, log *slog.Logger, clusterName string, tgtClient client.Client, living map[string]struct{}) {
var peers kilov1alpha1.PeerList

err := tgtClient.List(ctx, &peers)
if err != nil {
log.Warn("listing peers for orphan sweep",
slog.String("target", clusterName),
slog.String("error", err.Error()),
)

return
}

for i := range peers.Items {
peerObj := &peers.Items[i]

meshLabel := peerObj.Labels[peer.LabelMesh]
if meshLabel == "" {
continue
}

if _, alive := living[meshLabel]; alive {
continue
}

deleteErr := tgtClient.Delete(ctx, peerObj)
if deleteErr != nil && !apierrors.IsNotFound(deleteErr) {
log.Warn("deleting orphan peer",
slog.String("target", clusterName),
slog.String("peer", peerObj.Name),
slog.String("error", deleteErr.Error()),
)

continue
}

log.Info("deleted orphan peer whose ClusterMesh CR no longer exists",
slog.String("target", clusterName),
slog.String("peer", peerObj.Name),
slog.String("orphan-mesh", meshLabel),
)
}
}

Expand Down Expand Up @@ -372,25 +545,34 @@ func (r *ClusterMeshReconciler) updateStatus(ctx context.Context, mesh *v1alpha1
return errors.Wrap(r.Status().Update(ctx, mesh), "updating status")
}

// buildDesiredPeers constructs the desired Peer slice for all valid nodes plus an optional anchor.
// buildDesiredPeers constructs the desired Peer slice for all valid nodes.
// The first valid node carries the cluster-wide CIDRs (serviceCIDR and any
// AdditionalCIDRs) folded into its Peer.AllowedIPs. The older design emitted
// a separate anchor Peer that reused the anchor node's WireGuard public key;
// WireGuard's per-pubkey dedup made the second `wg setconf` call to apply
// either the node or the anchor entry clobber the AllowedIPs of the other,
// silently losing pod-CIDR or service-CIDR routing in a racy way. Folding
// the anchor CIDRs into the first node Peer keeps a single WG peer entry
// per pubkey with the full union of AllowedIPs.
func buildDesiredPeers(meshName string, entry *v1alpha1.ClusterEntry, nodes []*corev1.Node) ([]*kilov1alpha1.Peer, error) {
peers := make([]*kilov1alpha1.Peer, 0, len(nodes)+1)
peers := make([]*kilov1alpha1.Peer, 0, len(nodes))

anchorExtras := peer.CollectAnchorCIDRs(entry)

for i, node := range nodes {
var extras []string
if i == 0 {
extras = anchorExtras
}

for _, node := range nodes {
p, err := peer.BuildPeer(meshName, entry, node)
p, err := peer.BuildPeer(meshName, entry, node, extras)
if err != nil {
return nil, errors.Wrapf(err, "building peer for node %q", node.Name)
}

peers = append(peers, p)
}

if len(nodes) > 0 {
if anchor := peer.BuildAnchorPeer(meshName, entry, nodes[0]); anchor != nil {
peers = append(peers, anchor)
}
}

return peers, nil
}

Expand Down
35 changes: 33 additions & 2 deletions internal/multicluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package multicluster

import (
"context"
"log/slog"
"net/http"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -108,13 +109,31 @@ type EntrySource struct {
// localCfg is the rest.Config of the cluster where the controller runs.
// kubeClient is used to read kubeconfig Secrets for remote clusters from the
// namespace of the ClusterMesh resource that contributed each entry.
//
// Per-entry failures (missing kubeconfig Secret, malformed kubeconfig,
// failure to construct the cluster.Cluster) are logged via the provided
// logger and the entry is skipped — they do not abort the build. This
// matters during teardown: if a tenant's KubernetesSwitchcloud is being
// deleted, its admin-kubeconfig Secret may be removed before the
// ClusterMesh CR that references it. An intolerant Build would crash the
// operator on startup, blocking the finalizer that would otherwise clean
// up Peer objects in still-reachable clusters and release the
// ClusterMesh. The reconciler already does best-effort sweeps over the
// registry and skips clusters it cannot reach, so a partial registry is
// safe and forward-progress-preserving. A nil logger is accepted and
// treated as a discard logger.
func Build(
ctx context.Context,
entries []EntrySource,
localCfg *rest.Config,
kubeClient client.Client,
scheme *runtime.Scheme,
log *slog.Logger,
) (*ClusterRegistry, error) {
if log == nil {
log = slog.New(slog.DiscardHandler)
}

reg := &ClusterRegistry{
clusters: make(map[string]cluster.Cluster, len(entries)),
}
Expand All @@ -125,7 +144,13 @@ func Build(

cfg, err := configForEntry(ctx, &entry, localCfg, src.MeshNamespace, kubeClient)
if err != nil {
return nil, err
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", err.Error()),
)

continue
}

if entry.Local {
Expand All @@ -136,7 +161,13 @@ func Build(
o.Scheme = scheme
})
if err != nil {
return nil, errors.Wrapf(err, "creating cluster object for %q", entry.Name)
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", errors.Wrapf(err, "creating cluster object for %q", entry.Name).Error()),
)

continue
}

reg.clusters[entry.Name] = c
Expand Down
Loading
Loading