Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func run() error {
return errors.Wrap(err, "installing CRD")
}

registry, err := buildInitialRegistry(ctx, cfg)
registry, err := buildInitialRegistry(ctx, cfg, slogger)
if err != nil {
return errors.Wrap(err, "building cluster registry")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func readNamespace() (string, error) {
// namespace and constructs a registry that holds clients for every declared
// cluster. If no ClusterMesh resources exist yet, an empty registry is
// returned and the change-watcher will trigger a restart once one is created.
func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.ClusterRegistry, error) {
func buildInitialRegistry(ctx context.Context, cfg *rest.Config, log *slog.Logger) (*multicluster.ClusterRegistry, error) {
preClient, err := client.New(cfg, client.Options{Scheme: scheme})
if err != nil {
return nil, errors.Wrap(err, "building pre-manager client")
Expand All @@ -226,7 +226,7 @@ func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.

entries := mergeClusterEntries(meshes.Items)

registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme)
registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme, log)

return registry, errors.Wrap(err, "constructing registry")
}
Expand Down
174 changes: 151 additions & 23 deletions internal/controller/clustermesh_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -108,6 +109,14 @@
// persist forever in the surviving clusters.
r.cleanupStaleSourceClusters(ctx, log, mesh)

// Sweep peers whose owning ClusterMesh CR no longer exists. handleDeletion
// normally cleans these via the finalizer, but the finalizer may be
// skipped (force-delete, finalizer manually removed, operator crashloop
// that prevented the finalizer reconcile from running). Without this
// sweep, such peers persist forever as ghosts. Any live reconcile takes
// the global cleanup pass so the cluster always converges.
r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace)

return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses)
}

Expand All @@ -124,36 +133,146 @@
return errors.Wrap(err, "building clustermesh controller")
}

// cleanupStaleSourceClusters walks every cluster currently present in the
// mesh's spec and asks each one to drop any Peer it holds whose
// source-cluster label points at a cluster no longer in the spec. Failures
// are logged and swallowed: a stale peer not deleted this tick will be
// retried on the next reconcile, and we should not block the rest of the
// status update on a single client error.
// cleanupStaleSourceClusters walks every cluster the operator knows about
// (the merged registry built from all ClusterMesh resources, not just this
// mesh's current spec) and removes Peer objects this mesh has no business
// owning anymore. Two situations are swept:
//
// - target cluster was removed from this mesh's spec.Clusters: the mesh
// should hold no peers at all in that cluster, so every Peer labeled
// with this mesh's name is deleted there.
//
// - target cluster is still in spec: only Peers whose source-cluster
// label points at a cluster no longer in spec are deleted.
//
// Visiting only the current spec.Clusters misses the first case — a peer
// pushed into a now-removed target stays reachable via another
// ClusterMesh's kubeconfig and would otherwise never be touched again.
// Failures are logged and swallowed: a stale peer not deleted this tick
// will be retried on the next reconcile, and we should not block the rest
// of the status update on a single client error.
func (r *ClusterMeshReconciler) cleanupStaleSourceClusters(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) {
validSources := make([]string, 0, len(mesh.Spec.Clusters))
for i := range mesh.Spec.Clusters {
validSources = append(validSources, mesh.Spec.Clusters[i].Name)
}
validSet := make(map[string]struct{}, len(mesh.Spec.Clusters))

for i := range mesh.Spec.Clusters {
tgtEntry := &mesh.Spec.Clusters[i]
name := mesh.Spec.Clusters[i].Name
validSources = append(validSources, name)
validSet[name] = struct{}{}
}

tgtClient, ok := r.Registry.Client(tgtEntry.Name)
for _, name := range r.Registry.Clusters() {
tgtClient, ok := r.Registry.Client(name)
if !ok {
continue
}

err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, validSources)
// Target cluster not in this mesh's spec → drop every Peer this
// mesh owns there. Target still in spec → keep peers whose source
// is still in spec, drop the rest.
sources := validSources
if _, inSpec := validSet[name]; !inSpec {
sources = nil
}

err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, sources)
if err != nil {
log.Warn("cleaning stale source-cluster peers",
slog.String("target", tgtEntry.Name),
slog.String("target", name),
slog.String("error", err.Error()),
)
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Reconciling stale source-cluster peers synchronously across all clusters in the registry can block the entire reconciliation loop if any remote cluster is offline or slow. To prevent a single unhealthy cluster from degrading the operator, we should use a context with a short timeout for the cleanup call.

		cleanupCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err := peer.DeleteStaleSourceClusters(cleanupCtx, tgtClient, mesh.Name, sources)
		cancel()
		if err != nil {
			log.Warn("cleaning stale source-cluster peers",
				slog.String("target", name),
				slog.String("error", err.Error()),
			)
		}

}
}

// cleanupOrphanMeshPeers deletes Peer objects whose kilo-clustermesh.io/mesh
// label names a ClusterMesh that no longer exists in the given namespace.
// It is a global self-healing pass: it scopes by the operator-namespace of
// living ClusterMesh CRs, so it cannot accidentally clobber peers managed
// from another namespace.
//
// Why this exists: handleDeletion is the primary cleanup path, but it
// requires the finalizer to actually run. The finalizer is skipped if:
// - the CR was force-deleted (e.g. operator was crashlooping and the user
// manually removed the finalizer to unblock teardown),
// - the finalizer was never present (legacy CR predating the finalizer),
// - reconcile-time errors caused the finalizer reconcile to never make it
// to the peer-deletion step.
//
// Without this sweep, the cluster accumulates ghost peers that no future
// reconcile would ever notice — none of the per-CR cleanup paths look at
// peers labeled for CRs other than their own.
//
// Failures are logged and swallowed: each peer is deleted independently and
// a single client error must not abort the whole pass.
func (r *ClusterMeshReconciler) cleanupOrphanMeshPeers(ctx context.Context, log *slog.Logger, namespace string) {

Check failure on line 208 in internal/controller/clustermesh_controller.go

View workflow job for this annotation

GitHub Actions / lint

Function 'cleanupOrphanMeshPeers' is too long (65 > 60) (funlen)
var meshes v1alpha1.ClusterMeshList

err := r.List(ctx, &meshes, client.InNamespace(namespace))
if err != nil {
log.Warn("listing meshes for orphan sweep",
slog.String("namespace", namespace),
slog.String("error", err.Error()),
)

return
}

living := make(map[string]struct{}, len(meshes.Items))
for i := range meshes.Items {
living[meshes.Items[i].Name] = struct{}{}
}

for _, clusterName := range r.Registry.Clusters() {
tgtClient, ok := r.Registry.Client(clusterName)
if !ok {
continue
}

var peers kilov1alpha1.PeerList

err := tgtClient.List(ctx, &peers)
if err != nil {
log.Warn("listing peers for orphan sweep",
slog.String("target", clusterName),
slog.String("error", err.Error()),
)

continue
}

for i := range peers.Items {
peerObj := &peers.Items[i]

meshLabel := peerObj.Labels[peer.LabelMesh]
if meshLabel == "" {
continue
}

if _, alive := living[meshLabel]; alive {
continue
}

deleteErr := tgtClient.Delete(ctx, peerObj)
if deleteErr != nil && !apierrors.IsNotFound(deleteErr) {
log.Warn("deleting orphan peer",
slog.String("target", clusterName),
slog.String("peer", peerObj.Name),
slog.String("error", deleteErr.Error()),
)

continue
}

log.Info("deleted orphan peer whose ClusterMesh CR no longer exists",
slog.String("target", clusterName),
slog.String("peer", peerObj.Name),
slog.String("orphan-mesh", meshLabel),
)
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Listing and deleting orphan peers synchronously across all clusters in the registry can block the reconciliation loop if any remote cluster is offline or slow. We should wrap the operations for each cluster with a short timeout context. Make sure to call cancel() at the end of each iteration to avoid context leaks.

	for _, clusterName := range r.Registry.Clusters() {
		tgtClient, ok := r.Registry.Client(clusterName)
		if !ok {
			continue
		}

		clusterCtx, cancel := context.WithTimeout(ctx, 5*time.Second)

		var peers kilov1alpha1.PeerList

		err := tgtClient.List(clusterCtx, &peers)
		if err != nil {
			log.Warn("listing peers for orphan sweep",
				slog.String("target", clusterName),
				slog.String("error", err.Error()),
			)
			cancel()

			continue
		}

		for i := range peers.Items {
			peerObj := &peers.Items[i]

			meshLabel := peerObj.Labels[peer.LabelMesh]
			if meshLabel == "" {
				continue
			}

			if _, alive := living[meshLabel]; alive {
				continue
			}

			deleteErr := tgtClient.Delete(clusterCtx, peerObj)
			if deleteErr != nil && !apierrors.IsNotFound(deleteErr) {
				log.Warn("deleting orphan peer",
					slog.String("target", clusterName),
					slog.String("peer", peerObj.Name),
					slog.String("error", deleteErr.Error()),
				)

				continue
			}

			log.Info("deleted orphan peer whose ClusterMesh CR no longer exists",
				slog.String("target", clusterName),
				slog.String("peer", peerObj.Name),
				slog.String("orphan-mesh", meshLabel),
			)
		}
		cancel()
	}

}

// handleDeletion removes all Peers for this mesh from every cluster, then drops the finalizer.
func (r *ClusterMeshReconciler) handleDeletion(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) error {
if !controllerutil.ContainsFinalizer(mesh, finalizerName) {
Expand Down Expand Up @@ -372,25 +491,34 @@
return errors.Wrap(r.Status().Update(ctx, mesh), "updating status")
}

// buildDesiredPeers constructs the desired Peer slice for all valid nodes plus an optional anchor.
// buildDesiredPeers constructs the desired Peer slice for all valid nodes.
// The first valid node carries the cluster-wide CIDRs (serviceCIDR and any
// AdditionalCIDRs) folded into its Peer.AllowedIPs. The older design emitted
// a separate anchor Peer that reused the anchor node's WireGuard public key;
// WireGuard's per-pubkey dedup made the second `wg setconf` call to apply
// either the node or the anchor entry clobber the AllowedIPs of the other,
// silently losing pod-CIDR or service-CIDR routing in a racy way. Folding
// the anchor CIDRs into the first node Peer keeps a single WG peer entry
// per pubkey with the full union of AllowedIPs.
func buildDesiredPeers(meshName string, entry *v1alpha1.ClusterEntry, nodes []*corev1.Node) ([]*kilov1alpha1.Peer, error) {
peers := make([]*kilov1alpha1.Peer, 0, len(nodes)+1)
peers := make([]*kilov1alpha1.Peer, 0, len(nodes))

for _, node := range nodes {
p, err := peer.BuildPeer(meshName, entry, node)
anchorExtras := peer.CollectAnchorCIDRs(entry)

for i, node := range nodes {
var extras []string
if i == 0 {
extras = anchorExtras
}

p, err := peer.BuildPeer(meshName, entry, node, extras)
if err != nil {
return nil, errors.Wrapf(err, "building peer for node %q", node.Name)
}

peers = append(peers, p)
}

if len(nodes) > 0 {
if anchor := peer.BuildAnchorPeer(meshName, entry, nodes[0]); anchor != nil {
peers = append(peers, anchor)
}
}

return peers, nil
}

Expand Down
35 changes: 33 additions & 2 deletions internal/multicluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package multicluster

import (
"context"
"log/slog"
"net/http"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -108,13 +109,31 @@ type EntrySource struct {
// localCfg is the rest.Config of the cluster where the controller runs.
// kubeClient is used to read kubeconfig Secrets for remote clusters from the
// namespace of the ClusterMesh resource that contributed each entry.
//
// Per-entry failures (missing kubeconfig Secret, malformed kubeconfig,
// failure to construct the cluster.Cluster) are logged via the provided
// logger and the entry is skipped — they do not abort the build. This
// matters during teardown: if a tenant's KubernetesSwitchcloud is being
// deleted, its admin-kubeconfig Secret may be removed before the
// ClusterMesh CR that references it. An intolerant Build would crash the
// operator on startup, blocking the finalizer that would otherwise clean
// up Peer objects in still-reachable clusters and release the
// ClusterMesh. The reconciler already does best-effort sweeps over the
// registry and skips clusters it cannot reach, so a partial registry is
// safe and forward-progress-preserving. A nil logger is accepted and
// treated as a discard logger.
func Build(
ctx context.Context,
entries []EntrySource,
localCfg *rest.Config,
kubeClient client.Client,
scheme *runtime.Scheme,
log *slog.Logger,
) (*ClusterRegistry, error) {
if log == nil {
log = slog.New(slog.DiscardHandler)
}

reg := &ClusterRegistry{
clusters: make(map[string]cluster.Cluster, len(entries)),
}
Expand All @@ -125,7 +144,13 @@ func Build(

cfg, err := configForEntry(ctx, &entry, localCfg, src.MeshNamespace, kubeClient)
if err != nil {
return nil, err
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", err.Error()),
)

continue
}

if entry.Local {
Expand All @@ -136,7 +161,13 @@ func Build(
o.Scheme = scheme
})
if err != nil {
return nil, errors.Wrapf(err, "creating cluster object for %q", entry.Name)
log.Warn("skipping cluster entry during registry build",
slog.String("cluster", entry.Name),
slog.String("meshNamespace", src.MeshNamespace),
slog.String("error", errors.Wrapf(err, "creating cluster object for %q", entry.Name).Error()),
)

continue
}

reg.clusters[entry.Name] = c
Expand Down
Loading
Loading