diff --git a/cmd/main.go b/cmd/main.go index f9ffa9a..a983063 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -106,7 +106,7 @@ func run() error { return errors.Wrap(err, "installing CRD") } - registry, err := buildInitialRegistry(ctx, cfg) + registry, err := buildInitialRegistry(ctx, cfg, slogger) if err != nil { return errors.Wrap(err, "building cluster registry") } @@ -213,7 +213,7 @@ func readNamespace() (string, error) { // namespace and constructs a registry that holds clients for every declared // cluster. If no ClusterMesh resources exist yet, an empty registry is // returned and the change-watcher will trigger a restart once one is created. -func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster.ClusterRegistry, error) { +func buildInitialRegistry(ctx context.Context, cfg *rest.Config, log *slog.Logger) (*multicluster.ClusterRegistry, error) { preClient, err := client.New(cfg, client.Options{Scheme: scheme}) if err != nil { return nil, errors.Wrap(err, "building pre-manager client") @@ -226,7 +226,7 @@ func buildInitialRegistry(ctx context.Context, cfg *rest.Config) (*multicluster. entries := mergeClusterEntries(meshes.Items) - registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme) + registry, err := multicluster.Build(ctx, entries, cfg, preClient, scheme, log) return registry, errors.Wrap(err, "constructing registry") } diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index b315036..bb75393 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -19,9 +19,11 @@ package controller import ( "context" "log/slog" + "time" "github.com/cockroachdb/errors" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -108,6 +110,14 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) // persist forever in the surviving clusters. r.cleanupStaleSourceClusters(ctx, log, mesh) + // Sweep peers whose owning ClusterMesh CR no longer exists. handleDeletion + // normally cleans these via the finalizer, but the finalizer may be + // skipped (force-delete, finalizer manually removed, operator crashloop + // that prevented the finalizer reconcile from running). Without this + // sweep, such peers persist forever as ghosts. Any live reconcile takes + // the global cleanup pass so the cluster always converges. + r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace) + return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses) } @@ -124,33 +134,196 @@ func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { return errors.Wrap(err, "building clustermesh controller") } -// cleanupStaleSourceClusters walks every cluster currently present in the -// mesh's spec and asks each one to drop any Peer it holds whose -// source-cluster label points at a cluster no longer in the spec. Failures -// are logged and swallowed: a stale peer not deleted this tick will be -// retried on the next reconcile, and we should not block the rest of the -// status update on a single client error. +// cleanupSweepTimeout caps the per-target list/delete pass time so a single +// unreachable or slow cluster does not block the whole reconcile loop. The +// reconciler retries on every tick anyway, so a brief budget is enough to +// either make progress or move on. +const cleanupSweepTimeout = 5 * time.Second + +// cleanupStaleSourceClusters walks every cluster the operator knows about +// (the merged registry built from all ClusterMesh resources, not just this +// mesh's current spec) and removes Peer objects this mesh has no business +// owning anymore. Two situations are swept: +// +// - target cluster was removed from this mesh's spec.Clusters: the mesh +// should hold no peers at all in that cluster, so every Peer labeled +// with this mesh's name is deleted there. +// +// - target cluster is still in spec: only Peers whose source-cluster +// label points at a cluster no longer in spec are deleted. +// +// Visiting only the current spec.Clusters misses the first case — a peer +// pushed into a now-removed target stays reachable via another +// ClusterMesh's kubeconfig and would otherwise never be touched again. +// Failures are logged and swallowed: a stale peer not deleted this tick +// will be retried on the next reconcile, and we should not block the rest +// of the status update on a single client error. func (r *ClusterMeshReconciler) cleanupStaleSourceClusters(ctx context.Context, log *slog.Logger, mesh *v1alpha1.ClusterMesh) { validSources := make([]string, 0, len(mesh.Spec.Clusters)) - for i := range mesh.Spec.Clusters { - validSources = append(validSources, mesh.Spec.Clusters[i].Name) - } + validSet := make(map[string]struct{}, len(mesh.Spec.Clusters)) for i := range mesh.Spec.Clusters { - tgtEntry := &mesh.Spec.Clusters[i] + name := mesh.Spec.Clusters[i].Name + validSources = append(validSources, name) + validSet[name] = struct{}{} + } - tgtClient, ok := r.Registry.Client(tgtEntry.Name) + for _, name := range r.Registry.Clusters() { + tgtClient, ok := r.Registry.Client(name) if !ok { continue } - err := peer.DeleteStaleSourceClusters(ctx, tgtClient, mesh.Name, validSources) - if err != nil { - log.Warn("cleaning stale source-cluster peers", - slog.String("target", tgtEntry.Name), - slog.String("error", err.Error()), + // Target cluster not in this mesh's spec → drop every Peer this + // mesh owns there. Target still in spec → keep peers whose source + // is still in spec, drop the rest. + sources := validSources + if _, inSpec := validSet[name]; !inSpec { + sources = nil + } + + // Per-target deadline so one unreachable cluster cannot stall the + // whole reconcile pass — see cleanupSweepTimeout for the rationale. + // Anonymous function so defer cancel() fires per iteration instead + // of waiting for the enclosing reconcile to return. + func() { + sweepCtx, cancel := context.WithTimeout(ctx, cleanupSweepTimeout) + defer cancel() + + err := peer.DeleteStaleSourceClusters(sweepCtx, tgtClient, mesh.Name, sources) + if err != nil { + log.Warn("cleaning stale source-cluster peers", + slog.String("target", name), + slog.String("error", err.Error()), + ) + } + }() + } +} + +// cleanupOrphanMeshPeers deletes Peer objects whose kilo-clustermesh.io/mesh +// label names a ClusterMesh that no longer exists in the given namespace. +// It is a global self-healing pass: it scopes by the operator-namespace of +// living ClusterMesh CRs, so it cannot accidentally clobber peers managed +// from another namespace. +// +// Why this exists: handleDeletion is the primary cleanup path, but it +// requires the finalizer to actually run. The finalizer is skipped if: +// - the CR was force-deleted (e.g. operator was crashlooping and the user +// manually removed the finalizer to unblock teardown), +// - the finalizer was never present (legacy CR predating the finalizer), +// - reconcile-time errors caused the finalizer reconcile to never make it +// to the peer-deletion step. +// +// Without this sweep, the cluster accumulates ghost peers that no future +// reconcile would ever notice — none of the per-CR cleanup paths look at +// peers labeled for CRs other than their own. +// +// Failures are logged and swallowed: each peer is deleted independently and +// a single client error must not abort the whole pass. +func (r *ClusterMeshReconciler) cleanupOrphanMeshPeers(ctx context.Context, log *slog.Logger, namespace string) { + living, ok := r.collectLivingMeshes(ctx, log) + if !ok { + return + } + + for _, clusterName := range r.Registry.Clusters() { + tgtClient, present := r.Registry.Client(clusterName) + if !present { + continue + } + + // Per-target deadline so one unreachable cluster cannot stall the + // whole reconcile pass — see cleanupSweepTimeout. Wrapped in an + // anonymous function so defer cancel() fires per iteration. + func() { + sweepCtx, cancel := context.WithTimeout(ctx, cleanupSweepTimeout) + defer cancel() + + r.sweepOrphanPeersInCluster(sweepCtx, log, clusterName, tgtClient, living) + }() + } + + // `namespace` is currently unused — see collectLivingMeshes for the + // reasoning. Keeping the parameter on the function signature documents + // the reconciling-mesh context for future per-namespace heuristics and + // avoids breaking the small set of internal call sites. + _ = namespace +} + +// collectLivingMeshes returns the names of every ClusterMesh in the cluster, +// not just the reconciler's own namespace. The operator builds its registry +// cluster-wide (cmd.buildInitialRegistry merges entries from every +// ClusterMesh it sees), so a target cluster's Peer labelled mesh=foo could +// legitimately belong to a foo CR in any namespace — narrowing the lookup +// to one namespace would let the orphan sweep delete a peer owned by a +// living foo CR sitting elsewhere. ok==false indicates the list call itself +// failed; caller should bail without deleting anything. +func (r *ClusterMeshReconciler) collectLivingMeshes(ctx context.Context, log *slog.Logger) (map[string]struct{}, bool) { + var meshes v1alpha1.ClusterMeshList + + err := r.List(ctx, &meshes) + if err != nil { + log.Warn("listing meshes for orphan sweep", + slog.String("error", err.Error()), + ) + + return nil, false + } + + living := make(map[string]struct{}, len(meshes.Items)) + for i := range meshes.Items { + living[meshes.Items[i].Name] = struct{}{} + } + + return living, true +} + +// sweepOrphanPeersInCluster lists every Peer in a target cluster and deletes +// those whose kilo-clustermesh.io/mesh label names a mesh not in living. +// Listing or per-peer delete failures are logged and the sweep continues +// with the remaining peers. +func (r *ClusterMeshReconciler) sweepOrphanPeersInCluster(ctx context.Context, log *slog.Logger, clusterName string, tgtClient client.Client, living map[string]struct{}) { + var peers kilov1alpha1.PeerList + + err := tgtClient.List(ctx, &peers) + if err != nil { + log.Warn("listing peers for orphan sweep", + slog.String("target", clusterName), + slog.String("error", err.Error()), + ) + + return + } + + for i := range peers.Items { + peerObj := &peers.Items[i] + + meshLabel := peerObj.Labels[peer.LabelMesh] + if meshLabel == "" { + continue + } + + if _, alive := living[meshLabel]; alive { + continue + } + + deleteErr := tgtClient.Delete(ctx, peerObj) + if deleteErr != nil && !apierrors.IsNotFound(deleteErr) { + log.Warn("deleting orphan peer", + slog.String("target", clusterName), + slog.String("peer", peerObj.Name), + slog.String("error", deleteErr.Error()), ) + + continue } + + log.Info("deleted orphan peer whose ClusterMesh CR no longer exists", + slog.String("target", clusterName), + slog.String("peer", peerObj.Name), + slog.String("orphan-mesh", meshLabel), + ) } } @@ -372,12 +545,27 @@ func (r *ClusterMeshReconciler) updateStatus(ctx context.Context, mesh *v1alpha1 return errors.Wrap(r.Status().Update(ctx, mesh), "updating status") } -// buildDesiredPeers constructs the desired Peer slice for all valid nodes plus an optional anchor. +// buildDesiredPeers constructs the desired Peer slice for all valid nodes. +// The first valid node carries the cluster-wide CIDRs (serviceCIDR and any +// AdditionalCIDRs) folded into its Peer.AllowedIPs. The older design emitted +// a separate anchor Peer that reused the anchor node's WireGuard public key; +// WireGuard's per-pubkey dedup made the second `wg setconf` call to apply +// either the node or the anchor entry clobber the AllowedIPs of the other, +// silently losing pod-CIDR or service-CIDR routing in a racy way. Folding +// the anchor CIDRs into the first node Peer keeps a single WG peer entry +// per pubkey with the full union of AllowedIPs. func buildDesiredPeers(meshName string, entry *v1alpha1.ClusterEntry, nodes []*corev1.Node) ([]*kilov1alpha1.Peer, error) { - peers := make([]*kilov1alpha1.Peer, 0, len(nodes)+1) + peers := make([]*kilov1alpha1.Peer, 0, len(nodes)) + + anchorExtras := peer.CollectAnchorCIDRs(entry) + + for i, node := range nodes { + var extras []string + if i == 0 { + extras = anchorExtras + } - for _, node := range nodes { - p, err := peer.BuildPeer(meshName, entry, node) + p, err := peer.BuildPeer(meshName, entry, node, extras) if err != nil { return nil, errors.Wrapf(err, "building peer for node %q", node.Name) } @@ -385,12 +573,6 @@ func buildDesiredPeers(meshName string, entry *v1alpha1.ClusterEntry, nodes []*c peers = append(peers, p) } - if len(nodes) > 0 { - if anchor := peer.BuildAnchorPeer(meshName, entry, nodes[0]); anchor != nil { - peers = append(peers, anchor) - } - } - return peers, nil } diff --git a/internal/multicluster/registry.go b/internal/multicluster/registry.go index beaa87c..1f23448 100644 --- a/internal/multicluster/registry.go +++ b/internal/multicluster/registry.go @@ -18,6 +18,7 @@ package multicluster import ( "context" + "log/slog" "net/http" "github.com/cockroachdb/errors" @@ -108,13 +109,31 @@ type EntrySource struct { // localCfg is the rest.Config of the cluster where the controller runs. // kubeClient is used to read kubeconfig Secrets for remote clusters from the // namespace of the ClusterMesh resource that contributed each entry. +// +// Per-entry failures (missing kubeconfig Secret, malformed kubeconfig, +// failure to construct the cluster.Cluster) are logged via the provided +// logger and the entry is skipped — they do not abort the build. This +// matters during teardown: if a tenant's KubernetesSwitchcloud is being +// deleted, its admin-kubeconfig Secret may be removed before the +// ClusterMesh CR that references it. An intolerant Build would crash the +// operator on startup, blocking the finalizer that would otherwise clean +// up Peer objects in still-reachable clusters and release the +// ClusterMesh. The reconciler already does best-effort sweeps over the +// registry and skips clusters it cannot reach, so a partial registry is +// safe and forward-progress-preserving. A nil logger is accepted and +// treated as a discard logger. func Build( ctx context.Context, entries []EntrySource, localCfg *rest.Config, kubeClient client.Client, scheme *runtime.Scheme, + log *slog.Logger, ) (*ClusterRegistry, error) { + if log == nil { + log = slog.New(slog.DiscardHandler) + } + reg := &ClusterRegistry{ clusters: make(map[string]cluster.Cluster, len(entries)), } @@ -125,7 +144,13 @@ func Build( cfg, err := configForEntry(ctx, &entry, localCfg, src.MeshNamespace, kubeClient) if err != nil { - return nil, err + log.Warn("skipping cluster entry during registry build", + slog.String("cluster", entry.Name), + slog.String("meshNamespace", src.MeshNamespace), + slog.String("error", err.Error()), + ) + + continue } if entry.Local { @@ -136,7 +161,13 @@ func Build( o.Scheme = scheme }) if err != nil { - return nil, errors.Wrapf(err, "creating cluster object for %q", entry.Name) + log.Warn("skipping cluster entry during registry build", + slog.String("cluster", entry.Name), + slog.String("meshNamespace", src.MeshNamespace), + slog.String("error", errors.Wrapf(err, "creating cluster object for %q", entry.Name).Error()), + ) + + continue } reg.clusters[entry.Name] = c diff --git a/internal/multicluster/registry_test.go b/internal/multicluster/registry_test.go new file mode 100644 index 0000000..4008af3 --- /dev/null +++ b/internal/multicluster/registry_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multicluster + +import ( + "context" + "log/slog" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + v1alpha1 "github.com/squat/kilo-clustermesh-operator/api/v1alpha1" +) + +func discardLogger() *slog.Logger { + return slog.New(slog.DiscardHandler) +} + +// TestBuild_SkipsEntryWithMissingSecret reproduces the operator-crashloop +// scenario observed during tenant teardown: a ClusterMesh CR still references +// a remote cluster whose admin-kubeconfig Secret has already been removed by +// the upstream Helm release. Before the fix, Build returned an error from the +// first such entry and the operator failed to start, blocking the finalizer +// that would otherwise release the ClusterMesh and let the rest of the +// teardown proceed. Build must now log a warning, skip that entry, and keep +// every other entry it can construct. +func TestBuild_SkipsEntryWithMissingSecret(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(scheme)) + + // Only the "ceph" secret is present. The "mesh2" entry references a + // secret that does not exist — simulating Helm having deleted it ahead + // of the ClusterMesh CR's finalizer. + cephSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "ceph-kubeconfig", Namespace: "tenant-root"}, + Data: map[string][]byte{"kubeconfig": []byte(testKubeconfig)}, + } + fc := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cephSecret).Build() + + entries := []EntrySource{ + { + Entry: v1alpha1.ClusterEntry{ + Name: "ceph", + KubeconfigSecretRef: &v1alpha1.SecretKeyRef{ + Name: "ceph-kubeconfig", + Key: "kubeconfig", + }, + }, + MeshNamespace: "tenant-root", + }, + { + Entry: v1alpha1.ClusterEntry{ + Name: "mesh2", + KubeconfigSecretRef: &v1alpha1.SecretKeyRef{ + Name: "kubernetes-switchcloud-mesh2-admin-kubeconfig", + Key: "super-admin.conf", + }, + }, + MeshNamespace: "tenant-root", + }, + } + + reg, err := Build(context.Background(), entries, &rest.Config{}, fc, scheme, discardLogger()) + require.NoError(t, err, "Build must not fail when some entries have missing secrets") + require.NotNil(t, reg) + + clusters := reg.Clusters() + assert.Contains(t, clusters, "ceph", "reachable entry must remain in the registry") + assert.NotContains(t, clusters, "mesh2", "entry with missing secret must be skipped") + + _, ok := reg.Client("mesh2") + assert.False(t, ok, "Client lookup for the skipped cluster must return ok=false so reconciler best-effort loops continue past it") +} + +// TestBuild_LocalEntryDoesNotNeedSecret guards against regressing the +// fast-path: a Local: true entry must always succeed since it reuses +// localCfg, even when no Secret of the referenced name exists. +func TestBuild_LocalEntryDoesNotNeedSecret(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(scheme)) + + fc := fake.NewClientBuilder().WithScheme(scheme).Build() + + entries := []EntrySource{ + { + Entry: v1alpha1.ClusterEntry{Name: "local", Local: true}, + MeshNamespace: "tenant-root", + }, + } + + reg, err := Build(context.Background(), entries, &rest.Config{}, fc, scheme, discardLogger()) + require.NoError(t, err) + require.NotNil(t, reg) + + assert.Equal(t, "local", reg.LocalName()) + assert.Contains(t, reg.Clusters(), "local") +} + +// TestBuild_NilLoggerIsAccepted ensures callers that have not yet wired a +// logger can still use Build without panicking on a nil dereference. +func TestBuild_NilLoggerIsAccepted(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(scheme)) + + fc := fake.NewClientBuilder().WithScheme(scheme).Build() + + entries := []EntrySource{ + { + Entry: v1alpha1.ClusterEntry{ + Name: "missing-secret-cluster", + KubeconfigSecretRef: &v1alpha1.SecretKeyRef{ + Name: "does-not-exist", + Key: "kubeconfig", + }, + }, + MeshNamespace: "tenant-root", + }, + } + + reg, err := Build(context.Background(), entries, &rest.Config{}, fc, scheme, nil) + require.NoError(t, err) + require.NotNil(t, reg) + assert.NotContains(t, reg.Clusters(), "missing-secret-cluster") +} diff --git a/internal/peer/builder.go b/internal/peer/builder.go index bff959c..4bb4df4 100644 --- a/internal/peer/builder.go +++ b/internal/peer/builder.go @@ -32,8 +32,25 @@ import ( ) // BuildPeer constructs a Peer object from a validated Node. -// The Peer's allowedIPs = node's PodCIDRs[0] + /32 (or /128) host route derived from the wireguard-ip annotation. -func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node) (*kilov1alpha1.Peer, error) { +// +// The Peer's AllowedIPs always include the node's PodCIDR plus a /32 (or +// /128) host route derived from its kilo.squat.ai/wireguard-ip annotation. +// +// extraAllowedIPs lets the caller fold cluster-wide CIDRs (serviceCIDR and +// any AdditionalCIDRs declared on the ClusterEntry) into the first valid +// node's Peer. This replaces the old "anchor Peer" pattern, which emitted +// a SEPARATE Peer object reusing the anchor node's public key. WireGuard +// identifies peers exclusively by their public key and keeps only one +// peer entry per pubkey: the second `wg setconf` call either dropped the +// node peer's AllowedIPs (so pod-CIDR routing disappeared) or dropped the +// anchor's (so service-CIDR routing disappeared), depending on apply +// order. The resulting outage on the receiving cluster was racy and +// could survive across reconciles. Folding the cluster-wide CIDRs into +// the node peer guarantees one WG peer entry per pubkey with the full +// union of AllowedIPs, so neither half can clobber the other. +// +// extraAllowedIPs may be nil for non-anchor nodes. +func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node, extraAllowedIPs []string) (*kilov1alpha1.Peer, error) { pubKey := node.Annotations[kilonode.AnnotationPublicKey] if pubKey == "" { return nil, errors.Newf("node %q has no public key annotation", node.Name) @@ -52,7 +69,17 @@ func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node) return nil, errors.Wrapf(err, "node %q has invalid wireguard-ip annotation %q", node.Name, wgIP) } - allowedIPs := []string{node.Spec.PodCIDRs[0], netutil.HostRoute(hostIP)} + // PodCIDRs is populated by the kube-controller-manager once it allocates + // a CIDR for the node; until then BuildPeer would panic on the indexing + // below. Surface this as a clean error so the reconciler skips the + // node via validation rather than crashloop the operator. + if len(node.Spec.PodCIDRs) == 0 { + return nil, errors.Newf("node %q has no PodCIDRs allocated yet", node.Name) + } + + allowedIPs := make([]string, 0, 2+len(extraAllowedIPs)) + allowedIPs = append(allowedIPs, node.Spec.PodCIDRs[0], netutil.HostRoute(hostIP)) + allowedIPs = append(allowedIPs, extraAllowedIPs...) endpoint, err := resolvePeerEndpoint(node, entry.WireguardPort) if err != nil { @@ -74,38 +101,13 @@ func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node) return peer, nil } -// BuildAnchorPeer constructs a Peer that carries cluster-wide CIDRs not covered -// by per-node Peers (e.g., serviceCIDR, additionalCIDRs). -// It uses the first validated node's public key and endpoint as the anchor point. -// Returns nil when there are no cluster-wide CIDRs to advertise, or when the -// anchor node has no resolvable endpoint (an anchor without an endpoint cannot -// terminate cross-cluster traffic for those CIDRs). -func BuildAnchorPeer(meshName string, entry *v1alpha1.ClusterEntry, anchorNode *corev1.Node) *kilov1alpha1.Peer { - anchorCIDRs := collectAnchorCIDRs(entry) - if len(anchorCIDRs) == 0 { - return nil - } - - endpoint, err := resolvePeerEndpoint(anchorNode, entry.WireguardPort) - if err != nil { - return nil - } - - return &kilov1alpha1.Peer{ - ObjectMeta: metav1.ObjectMeta{ - Name: Name(meshName, entry.Name, "anchor"), - Labels: Labels(meshName, entry.Name), - }, - Spec: kilov1alpha1.PeerSpec{ - AllowedIPs: anchorCIDRs, - PublicKey: anchorNode.Annotations[kilonode.AnnotationPublicKey], - Endpoint: endpoint, - }, - } -} - -// collectAnchorCIDRs returns the cluster-wide CIDRs for an anchor peer. -func collectAnchorCIDRs(entry *v1alpha1.ClusterEntry) []string { +// CollectAnchorCIDRs returns the cluster-wide CIDRs (serviceCIDR plus any +// AdditionalCIDRs) declared on a ClusterEntry. The caller is expected to +// pass these as extraAllowedIPs to BuildPeer for a single (anchor) node — +// see the commentary on BuildPeer for the WireGuard pubkey-dedup +// rationale that motivates folding cluster-wide CIDRs into a node Peer +// rather than emitting a separate anchor Peer. +func CollectAnchorCIDRs(entry *v1alpha1.ClusterEntry) []string { var cidrs []string if entry.ServiceCIDR != "" { diff --git a/internal/peer/builder_test.go b/internal/peer/builder_test.go index 0b942ec..0a519bf 100644 --- a/internal/peer/builder_test.go +++ b/internal/peer/builder_test.go @@ -80,7 +80,7 @@ func TestBuildPeer_HappyPath(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -108,7 +108,7 @@ func TestBuildPeer_CozystackStyleWGAnnotation(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -126,7 +126,7 @@ func TestBuildPeer_InvalidWireguardIP(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) @@ -141,7 +141,7 @@ func TestBuildPeer_MissingPublicKey(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) @@ -157,7 +157,7 @@ func TestBuildPeer_MissingWireguardIP(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) @@ -178,7 +178,7 @@ func TestBuildPeer_NoEndpointSources_ReturnsError(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) @@ -193,7 +193,7 @@ func TestBuildPeer_DNSEndpoint(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -211,7 +211,7 @@ func TestBuildPeer_IPEndpoint(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -221,9 +221,13 @@ func TestBuildPeer_IPEndpoint(t *testing.T) { assert.Empty(t, got.Spec.Endpoint.DNS) } -func TestBuildAnchorPeer_WithServiceCIDR(t *testing.T) { +func TestBuildPeer_AnchorExtras_WithServiceCIDR(t *testing.T) { t.Parallel() + // Replaces the legacy TestBuildAnchorPeer_WithServiceCIDR. The anchor + // CIDRs now fold into the first node's Peer via extraAllowedIPs, so a + // single WireGuard peer entry on the receiving side carries both the + // node-local routes and the cluster-wide routes. entry := &v1alpha1.ClusterEntry{ Name: "cluster-a", ServiceCIDR: "10.96.0.0/12", @@ -231,18 +235,22 @@ func TestBuildAnchorPeer_WithServiceCIDR(t *testing.T) { node := testNode("worker-1", testPodCIDR, baseAnnotations()) - got := peer.BuildAnchorPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, peer.CollectAnchorCIDRs(entry)) + require.NoError(t, err) require.NotNil(t, got) - assert.Equal(t, peer.Name("my-mesh", "cluster-a", "anchor"), got.Name) + assert.Equal(t, peer.Name("my-mesh", "cluster-a", "worker-1"), got.Name) assert.Equal(t, peer.Labels("my-mesh", "cluster-a"), got.Labels) + assert.Contains(t, got.Spec.AllowedIPs, testPodCIDR) assert.Contains(t, got.Spec.AllowedIPs, "10.96.0.0/12") assert.Equal(t, testPubKey, got.Spec.PublicKey) } -func TestBuildAnchorPeer_WithAdditionalCIDRs(t *testing.T) { +func TestBuildPeer_AnchorExtras_WithAdditionalCIDRs(t *testing.T) { t.Parallel() + // AdditionalCIDRs append after serviceCIDR in CollectAnchorCIDRs and + // must appear after the node's own routes in the merged AllowedIPs. entry := &v1alpha1.ClusterEntry{ Name: "cluster-a", ServiceCIDR: "10.96.0.0/12", @@ -251,15 +259,24 @@ func TestBuildAnchorPeer_WithAdditionalCIDRs(t *testing.T) { node := testNode("worker-1", testPodCIDR, baseAnnotations()) - got := peer.BuildAnchorPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, peer.CollectAnchorCIDRs(entry)) + require.NoError(t, err) require.NotNil(t, got) - assert.Equal(t, []string{"10.96.0.0/12", "192.168.100.0/24", "172.16.0.0/16"}, got.Spec.AllowedIPs) + + // Node-local routes come first, then anchor CIDRs in declared order. + assert.Equal(t, testPodCIDR, got.Spec.AllowedIPs[0]) + assert.Contains(t, got.Spec.AllowedIPs, "10.96.0.0/12") + assert.Contains(t, got.Spec.AllowedIPs, "192.168.100.0/24") + assert.Contains(t, got.Spec.AllowedIPs, "172.16.0.0/16") } -func TestBuildAnchorPeer_NoAnchorCIDRs(t *testing.T) { +func TestBuildPeer_NoExtras_ProducesNodeOnlyAllowedIPs(t *testing.T) { t.Parallel() + // Replaces TestBuildAnchorPeer_NoAnchorCIDRs: when the caller passes + // no extras (non-anchor node, or anchor cluster has no serviceCIDR / + // additionalCIDRs), the AllowedIPs list is just the node's own routes. entry := &v1alpha1.ClusterEntry{ Name: "cluster-a", ServiceCIDR: "", @@ -268,9 +285,11 @@ func TestBuildAnchorPeer_NoAnchorCIDRs(t *testing.T) { node := testNode("worker-1", testPodCIDR, baseAnnotations()) - got := peer.BuildAnchorPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, peer.CollectAnchorCIDRs(entry)) - assert.Nil(t, got, "must return nil when there are no cluster-wide CIDRs") + require.NoError(t, err) + require.NotNil(t, got) + assert.Len(t, got.Spec.AllowedIPs, 2, "node-only Peer must carry exactly pod-CIDR + wg-ip /32") } func TestBuildPeer_MalformedForceEndpoint_ReturnsError(t *testing.T) { @@ -285,7 +304,7 @@ func TestBuildPeer_MalformedForceEndpoint_ReturnsError(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) @@ -302,7 +321,7 @@ func TestBuildPeer_ClustermeshEndpointPreferred(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -328,7 +347,7 @@ func TestBuildPeer_ExternalIPFallback(t *testing.T) { entry := &v1alpha1.ClusterEntry{Name: "cluster-a", WireguardPort: 51820} - got, err := peer.BuildPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, nil) require.NoError(t, err) require.NotNil(t, got) @@ -349,18 +368,21 @@ func TestBuildPeer_MalformedClustermeshEndpoint_ReturnsError(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.Error(t, err) assert.Nil(t, got) } -func TestBuildAnchorPeer_NoEndpointSource_ReturnsNil(t *testing.T) { +func TestBuildPeer_AnchorExtras_NoEndpointSource_ReturnsError(t *testing.T) { t.Parallel() - // An anchor peer without an endpoint cannot terminate cross-cluster - // traffic for its CIDRs, so BuildAnchorPeer returns nil when the - // anchor node has no resolvable endpoint. + // Replaces TestBuildAnchorPeer_NoEndpointSource_ReturnsNil. Endpoint + // resolution is shared by every node Peer (anchor or not), so the + // same "no resolvable endpoint" condition now surfaces as a hard + // error from BuildPeer rather than a nil-anchor signal. That is + // strictly stricter: the operator surfaces misconfiguration instead + // of silently dropping cluster-wide CIDRs. entry := &v1alpha1.ClusterEntry{ Name: "cluster-a", ServiceCIDR: "10.96.0.0/12", @@ -370,18 +392,21 @@ func TestBuildAnchorPeer_NoEndpointSource_ReturnsNil(t *testing.T) { delete(annotations, kilonode.AnnotationForceEndpoint) node := testNode("worker-1", testPodCIDR, annotations) + node.Status.Addresses = nil - got := peer.BuildAnchorPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, peer.CollectAnchorCIDRs(entry)) - assert.Nil(t, got, "anchor without resolvable endpoint must be nil") + require.Error(t, err) + assert.Nil(t, got, "node Peer without resolvable endpoint must error, regardless of anchor CIDRs") } -func TestBuildAnchorPeer_ExternalIPFallback(t *testing.T) { +func TestBuildPeer_AnchorExtras_ExternalIPFallback(t *testing.T) { t.Parallel() - // The anchor peer participates in the same fallback chain — when the - // anchor node has no annotations but does have an ExternalIP, the - // endpoint is synthesised from Node.Status.Addresses. + // Replaces TestBuildAnchorPeer_ExternalIPFallback. The endpoint + // fallback chain (clustermesh-endpoint → force-endpoint → ExternalIP) + // is the same for anchor vs non-anchor nodes — there is no separate + // path anymore. entry := &v1alpha1.ClusterEntry{ Name: "cluster-a", ServiceCIDR: "10.96.0.0/12", @@ -396,12 +421,14 @@ func TestBuildAnchorPeer_ExternalIPFallback(t *testing.T) { {Type: corev1.NodeExternalIP, Address: "203.0.113.99"}, } - got := peer.BuildAnchorPeer("my-mesh", entry, node) + got, err := peer.BuildPeer("my-mesh", entry, node, peer.CollectAnchorCIDRs(entry)) + require.NoError(t, err) require.NotNil(t, got) require.NotNil(t, got.Spec.Endpoint) assert.Equal(t, "203.0.113.99", got.Spec.Endpoint.IP) assert.Equal(t, uint32(51820), got.Spec.Endpoint.Port) + assert.Contains(t, got.Spec.AllowedIPs, "10.96.0.0/12") } func TestBuildPeer_BracketedDNSEndpoint(t *testing.T) { @@ -415,7 +442,7 @@ func TestBuildPeer_BracketedDNSEndpoint(t *testing.T) { node := testNode("worker-1", testPodCIDR, annotations) - got, err := peer.BuildPeer("my-mesh", testEntry(), node) + got, err := peer.BuildPeer("my-mesh", testEntry(), node, nil) require.NoError(t, err) require.NotNil(t, got) diff --git a/test/integration/cleanup_test.go b/test/integration/cleanup_test.go index b46aa19..cc6455c 100644 --- a/test/integration/cleanup_test.go +++ b/test/integration/cleanup_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + v1alpha1 "github.com/squat/kilo-clustermesh-operator/api/v1alpha1" "github.com/squat/kilo-clustermesh-operator/internal/peer" kilov1alpha1 "github.com/squat/kilo-clustermesh-operator/pkg/kilo/v1alpha1" ) @@ -99,9 +100,80 @@ func TestCleanupStaleSourceClusters_RemovesPeersOfRemovedCluster(t *testing.T) { // desired=[] and removes the planted peer as an in-pair orphan, masking the // behaviour this layer is meant to verify. +// TestCleanupStaleSourceClusters_SweepsClustersOutsideSpec verifies the +// cross-CR cleanup property: a peer left in a cluster that is in the +// operator's registry (because another ClusterMesh names it) but is NOT +// in THIS mesh's spec.Clusters must still be swept. This is the case +// that breaks when cleanup only walks spec.Clusters: the "removed +// target" half of the stale-peer problem. +// +// Setup: this mesh's spec contains "local" plus a placeholder +// "ghost-elsewhere" that is not in the registry (the CRD requires at +// least two cluster entries; ghost-elsewhere satisfies that without +// covering the "remote" cluster the registry knows about). The peer +// under test is planted in remote — which is reachable through the +// registry (sibling-mesh kubeconfig in real deployments) but is NOT +// referenced by this mesh's spec. The cross-CR sweep must visit remote +// and delete the peer. +func TestCleanupStaleSourceClusters_SweepsClustersOutsideSpec(t *testing.T) { + ctx := context.Background() + + mesh := &v1alpha1.ClusterMesh{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cleanup-cross-cr-mesh", + Namespace: "default", + }, + Spec: v1alpha1.ClusterMeshSpec{ + Clusters: []v1alpha1.ClusterEntry{ + { + Name: "local", + Local: true, + PodCIDRs: []string{"10.1.0.0/16"}, + WireguardCIDR: "10.100.0.0/24", + }, + { + // Placeholder to satisfy the CRD's minItems=2 on + // spec.clusters. Not in the registry — the + // registry exposes "local" and "remote", so this + // entry is effectively ignored by reconcile. + Name: "ghost-elsewhere", + PodCIDRs: []string{"10.2.0.0/16"}, + WireguardCIDR: "10.100.1.0/24", + }, + }, + }, + } + createMesh(t, mesh) + t.Cleanup(func() { deleteMesh(t, mesh) }) + + // Plant a peer in remote whose source-cluster is not in this mesh's + // spec. Without cross-cluster sweep, reconcile would never visit + // remote (it's not in spec) and the peer would persist forever. + stalePeer := &kilov1alpha1.Peer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "stale--ghost--in-remote", + Labels: peer.Labels(mesh.Name, "ghost-cluster"), + }, + Spec: kilov1alpha1.PeerSpec{ + AllowedIPs: []string{"10.77.0.0/16"}, + PublicKey: "pubkey-stale-cross-cr", + }, + } + require.NoError(t, globalEnv.remoteClient.Create(ctx, stalePeer)) + + mustReconcile(t, mesh) + mustReconcile(t, mesh) + + assertPeerExists(t, globalEnv.remoteClient, "stale--ghost--in-remote", false) +} + // TestCleanupStaleSourceClusters_IgnoresPeersFromOtherMeshes verifies that -// peers labeled for a different ClusterMesh are not touched, even if their -// source-cluster is unknown to this mesh. +// peers labeled for a different but LIVING ClusterMesh are not touched by +// the per-source sweep, even if their source-cluster is unknown to the +// reconciling mesh. The foreign mesh must exist as a CR — otherwise the +// orphan-mesh sweep (a separate defense-in-depth pass) would legitimately +// delete it, and that property is covered by +// TestCleanupOrphanMeshPeers_DeletesGhostsAfterCRGone. func TestCleanupStaleSourceClusters_IgnoresPeersFromOtherMeshes(t *testing.T) { ctx := context.Background() @@ -109,12 +181,23 @@ func TestCleanupStaleSourceClusters_IgnoresPeersFromOtherMeshes(t *testing.T) { createMesh(t, mesh) t.Cleanup(func() { deleteMesh(t, mesh) }) - // Peer belongs to a different mesh; even though its source-cluster is - // nonsensical from our mesh's point of view, we must not touch it. + // Sibling ClusterMesh CR that "owns" the foreign peer below. Keeping it + // alive in the namespace ensures the orphan-mesh sweep treats its peer + // label as legitimate and does not delete it; the property under test + // is per-CR isolation of the source-cluster sweep, not orphan handling. + sibling := simpleMeshSpec("cleanup-isolation-sibling-mesh", "default") + createMesh(t, sibling) + t.Cleanup(func() { deleteMesh(t, sibling) }) + + // Peer belongs to the sibling mesh; the reconciling mesh + // (cleanup-isolation-mesh) must not delete it even though its + // source-cluster ("ghost-cluster") is not part of cleanup-isolation-mesh's + // spec — the per-source sweep is scoped to peers labelled with the + // reconciling mesh, not foreign ones. foreignPeer := &kilov1alpha1.Peer{ ObjectMeta: metav1.ObjectMeta{ Name: "foreign--ghost--node", - Labels: peer.Labels("some-other-mesh", "ghost-cluster"), + Labels: peer.Labels(sibling.Name, "ghost-cluster"), }, Spec: kilov1alpha1.PeerSpec{ AllowedIPs: []string{"10.50.0.0/16"}, @@ -132,6 +215,90 @@ func TestCleanupStaleSourceClusters_IgnoresPeersFromOtherMeshes(t *testing.T) { assertPeerExists(t, globalEnv.remoteClient, "foreign--ghost--node", true) } +// TestCleanupOrphanMeshPeers_DeletesGhostsAfterCRGone verifies the +// defense-in-depth sweep: peers labeled for a ClusterMesh CR that no longer +// exists (because the finalizer was bypassed: force-delete, manual +// finalizer removal, operator crashloop) must be deleted by any live +// reconcile of any surviving mesh. +// +// Setup: plant a Peer labeled with a "ghost-mesh" name that has no +// corresponding ClusterMesh object. Then reconcile a live mesh — the sweep +// must walk the registry and delete that orphan. +func TestCleanupOrphanMeshPeers_DeletesGhostsAfterCRGone(t *testing.T) { + ctx := context.Background() + + // Live mesh that drives the reconcile. + live := simpleMeshSpec("orphan-sweep-live-mesh", "default") + createMesh(t, live) + t.Cleanup(func() { deleteMesh(t, live) }) + + // Orphan peer: labeled with a mesh name that does NOT correspond to + // any ClusterMesh CR in the namespace. Mirrors the production + // failure mode where a tenant CR was force-deleted, leaving its + // peers as ghosts no per-CR reconcile would ever revisit. + orphan := &kilov1alpha1.Peer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ghost--ghost-cluster--node-zero", + Labels: peer.Labels("ghost-mesh-deleted", "ghost-cluster"), + }, + Spec: kilov1alpha1.PeerSpec{ + AllowedIPs: []string{"10.66.0.0/16"}, + PublicKey: "pubkey-orphan", + }, + } + require.NoError(t, globalEnv.remoteClient.Create(ctx, orphan)) + + // Sanity: orphan exists before reconcile. + assertPeerExists(t, globalEnv.remoteClient, "ghost--ghost-cluster--node-zero", true) + + // First reconcile adds the finalizer; second runs the cleanup passes. + mustReconcile(t, live) + mustReconcile(t, live) + + assertPeerExists(t, globalEnv.remoteClient, "ghost--ghost-cluster--node-zero", false) +} + +// TestCleanupOrphanMeshPeers_LeavesPeersOfLivingMeshAlone guards against +// a regression where the orphan sweep deletes peers whose mesh CR DOES +// exist. The peer in this test belongs to the live mesh itself — it must +// survive every reconcile (subject only to the per-CR sweeps that apply +// when the source-cluster is invalid). +func TestCleanupOrphanMeshPeers_LeavesPeersOfLivingMeshAlone(t *testing.T) { + ctx := context.Background() + + live := simpleMeshSpec("orphan-sweep-keep-mesh", "default") + createMesh(t, live) + t.Cleanup(func() { deleteMesh(t, live) }) + + // Peer belongs to the live mesh; its source-cluster IS in the mesh's + // spec, so neither per-CR nor orphan sweep should touch it. + live.Spec.Clusters[0].Name = "local" + survivor := &kilov1alpha1.Peer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "survivor--local--node-a", + Labels: peer.Labels(live.Name, "local"), + }, + Spec: kilov1alpha1.PeerSpec{ + AllowedIPs: []string{"10.1.0.0/24"}, + PublicKey: "pubkey-survivor", + }, + } + // Plant in a cluster that's NOT in spec (remote is not iterated by + // the per-pair ReconcilePeers source=local because local has no + // nodes; only the orphan sweep would touch this peer). Same idea + // — the survivor must remain because its mesh label points at a + // living CR. + require.NoError(t, globalEnv.localClient.Create(ctx, survivor)) + t.Cleanup(func() { + _ = globalEnv.localClient.Delete(ctx, survivor) + }) + + mustReconcile(t, live) + mustReconcile(t, live) + + assertPeerExists(t, globalEnv.localClient, "survivor--local--node-a", true) +} + func assertPeerExists(t *testing.T, cl client.Client, name string, want bool) { t.Helper()