From ddcaa378359f3e361135922097d04e7cbd494c4d Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Fri, 19 Jun 2026 12:41:11 +0300 Subject: [PATCH 1/3] feat(operator): discovered-endpoint enrichment and per-cluster persistent keepalive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Discovered-endpoint enrichment For clusters behind NAT (e.g. OpenStack tenants without floating IPs) the node's kilo.squat.ai/endpoint annotation only carries the internal IP, which is unreachable from the peer cluster. The real egress address is visible in kilo.squat.ai/discovered-endpoints on the target cluster's nodes: Kilo records the WireGuard handshake source IP there after each successful session. Add enrichEndpointsFromDiscovered: after building the desired Peer slice, the controller reads discovered-endpoints from every node in the target cluster and replaces the configured endpoint with the observed NAT/egress IP when a more-specific address is available. The enrichment is best-effort — a lookup failure falls back to the original endpoint and logs a warning. This makes the operator self-healing for NAT traversal: the source cluster's Kilo initiates the first handshake (it already knows the target's public IPs), the target nodes record the real source IP, and subsequent operator reconciles automatically publish the correct endpoint in the Peer CRD so the target can initiate back. ## Per-cluster PersistentKeepalive Add PersistentKeepalive int to ClusterEntry. When non-zero the value is forwarded to the PersistentKeepalive field of every Peer CRD the operator creates for nodes in that cluster. This keeps the stateful NAT mapping alive between reconciles — without it the mapping expires after the NAT timeout and the discovered endpoint can no longer be reached from the target side. Typical value for OpenStack-behind-NAT clusters: 25 (seconds). Signed-off-by: IvanHunters --- api/v1alpha1/clustermesh_types.go | 11 ++ internal/controller/clustermesh_controller.go | 126 ++++++++++++++++- internal/kilonode/discovered.go | 129 ++++++++++++++++++ internal/peer/builder.go | 7 +- 4 files changed, 267 insertions(+), 6 deletions(-) create mode 100644 internal/kilonode/discovered.go diff --git a/api/v1alpha1/clustermesh_types.go b/api/v1alpha1/clustermesh_types.go index 11b0e24..4c1476a 100644 --- a/api/v1alpha1/clustermesh_types.go +++ b/api/v1alpha1/clustermesh_types.go @@ -79,6 +79,17 @@ type ClusterEntry struct { // (e.g., host-network ranges, external subnets). // +optional AdditionalCIDRs []string `json:"additionalCIDRs,omitempty"` //nolint:tagliatelle // "additionalCIDRs" is the canonical field name; "CIDR" is a well-known acronym + + // PersistentKeepalive is the interval in seconds at which WireGuard + // sends keepalive packets to peers in this cluster. Set to a non-zero + // value (e.g. 25) for clusters behind NAT so that the stateful NAT + // mapping is refreshed before it expires, enabling bidirectional traffic + // even when the cluster has no directly-routable public IP. + // 0 disables persistent keepalive (default). + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:validation:Maximum=65535 + // +optional + PersistentKeepalive int `json:"persistentKeepalive,omitempty"` } // AllCIDRs returns the union of all CIDRs declared by this cluster entry. diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index 4f9a43d..34deac7 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -19,6 +19,8 @@ package controller import ( "context" "log/slog" + "net" + "strconv" "time" "github.com/cockroachdb/errors" @@ -393,8 +395,10 @@ func (r *ClusterMeshReconciler) handleDeletion(ctx context.Context, log *slog.Lo log.Info("deleting peers for mesh being removed") - for _, entry := range mesh.Spec.Clusters { - for _, targetEntry := range mesh.Spec.Clusters { + for ei := range mesh.Spec.Clusters { + entry := &mesh.Spec.Clusters[ei] + for ti := range mesh.Spec.Clusters { + targetEntry := &mesh.Spec.Clusters[ti] if targetEntry.Name == entry.Name { continue } @@ -639,7 +643,9 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo status.RegisteredPeers = len(desired) - for _, tgtEntry := range mesh.Spec.Clusters { + for i := range mesh.Spec.Clusters { + tgtEntry := &mesh.Spec.Clusters[i] + if tgtEntry.Name == srcEntry.Name { continue } @@ -651,6 +657,28 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo continue } + // Enrich peer endpoints with real NAT-observed IPs from the target + // cluster's nodes. Kilo on every target node records the actual source + // IP of each successful WireGuard handshake in + // kilo.squat.ai/discovered-endpoints. For source clusters behind NAT + // (no ExternalIP, only InternalIP) the discovered IP is the true + // reachable endpoint, whereas the Peer spec may contain only the + // internal address. Preferring the discovered value lets the operator + // self-heal: after the source cluster's Kilo initiates the first + // handshake, subsequent reconciles automatically use the correct + // external endpoint without any manual annotation. + enriched, enrichErr := r.enrichEndpointsFromDiscovered(ctx, log, tgtClient, desired) + if enrichErr != nil { + // Non-fatal: log and continue with the original endpoints. + log.Warn("enriching peer endpoints from discovered-endpoints failed; using configured endpoints", + slog.String("source", srcEntry.Name), + slog.String("target", tgtEntry.Name), + slog.String("error", enrichErr.Error()), + ) + } else { + desired = enriched + } + err = peer.ReconcilePeers(ctx, tgtClient, mesh.Name, srcEntry.Name, desired) if err != nil { return errors.Wrapf(err, "reconciling peers from %q to %q", srcEntry.Name, tgtEntry.Name) @@ -660,6 +688,98 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo return nil } +// enrichEndpointsFromDiscovered replaces the configured endpoint on each +// desired Peer with the endpoint observed via WireGuard handshakes on the +// target cluster's nodes, when a more-specific (non-internal) address is +// available. Source clusters behind NAT (no ExternalIP) only advertise their +// InternalIP as the configured endpoint; the discovered value is the actual +// egress IP after SNAT, which is what the target cluster must use to reach +// them. +// +// The function is best-effort: if the target cluster is unreachable or has no +// discovered-endpoint data, the original peers are returned unchanged. +func (r *ClusterMeshReconciler) enrichEndpointsFromDiscovered( + ctx context.Context, + log *slog.Logger, + tgtClient client.Client, + desired []*kilov1alpha1.Peer, +) ([]*kilov1alpha1.Peer, error) { + discoveredByKey, lookupErr := kilonode.DiscoveredEndpointsByKey(ctx, tgtClient) + if lookupErr != nil { + return desired, errors.Wrap(lookupErr, "listing nodes for discovered-endpoint lookup") + } + + if len(discoveredByKey) == 0 { + return desired, nil + } + + enriched := make([]*kilov1alpha1.Peer, len(desired)) + + for i, peerObj := range desired { + discoveredEndpoint, ok := discoveredByKey[peerObj.Spec.PublicKey] + if !ok { + enriched[i] = peerObj + + continue + } + + // Only override when the discovered address differs from the + // configured one. Skip if the Peer already has the right endpoint. + configured := "" + if peerObj.Spec.Endpoint != nil { + configured = peerObj.Spec.Endpoint.IP + } + + if configured == "" || discoveredEndpoint != configured+":"+strconv.Itoa(int(peerObj.Spec.Endpoint.Port)) { + parsedEndpoint, parseErr := parseDiscoveredEndpoint(discoveredEndpoint) + if parseErr != nil { + log.Warn("ignoring malformed discovered endpoint", + slog.String("peer", peerObj.Name), + slog.String("endpoint", discoveredEndpoint), + slog.String("error", parseErr.Error()), + ) + enriched[i] = peerObj + + continue + } + + updated := peerObj.DeepCopy() + updated.Spec.Endpoint = parsedEndpoint + enriched[i] = updated + + log.Debug("overriding peer endpoint with discovered value", + slog.String("peer", peerObj.Name), + slog.String("configured", configured), + slog.String("discovered", discoveredEndpoint), + ) + + continue + } + + enriched[i] = peerObj + } + + return enriched, nil +} + +// parseDiscoveredEndpoint parses a "host:port" string into a *kilov1alpha1.PeerEndpoint. +func parseDiscoveredEndpoint(hostPort string) (*kilov1alpha1.PeerEndpoint, error) { + host, portStr, splitErr := net.SplitHostPort(hostPort) + if splitErr != nil { + return nil, errors.Wrap(splitErr, "splitting discovered endpoint host:port") + } + + port, parseErr := strconv.ParseUint(portStr, 10, 16) + if parseErr != nil { + return nil, errors.Wrapf(parseErr, "parsing port in discovered endpoint %q", hostPort) + } + + return &kilov1alpha1.PeerEndpoint{ + DNSOrIP: kilov1alpha1.DNSOrIP{IP: host}, + Port: uint32(port), + }, nil +} + // updateStatus sets Ready=True and writes the cluster statuses. func (r *ClusterMeshReconciler) updateStatus(ctx context.Context, mesh *v1alpha1.ClusterMesh, statuses []v1alpha1.ClusterStatus) error { mesh.Status.Clusters = statuses diff --git a/internal/kilonode/discovered.go b/internal/kilonode/discovered.go new file mode 100644 index 0000000..3cec4c9 --- /dev/null +++ b/internal/kilonode/discovered.go @@ -0,0 +1,129 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kilonode + +import ( + "context" + "encoding/json" + "net" + + "github.com/cockroachdb/errors" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // AnnotationDiscoveredEndpoints is the node annotation written by Kilo + // containing the WireGuard endpoints observed via successful handshakes. + // Value is a JSON map of base64-encoded WireGuard public key → {IP, Port}. + AnnotationDiscoveredEndpoints = "kilo.squat.ai/discovered-endpoints" +) + +// discoveredEndpointEntry is the JSON shape of a single entry in the +// kilo.squat.ai/discovered-endpoints annotation value. Kilo serialises +// these fields with upper-case keys ("IP", "Port") so the json tags must +// match exactly. +// +//nolint:tagliatelle // Kilo uses upper-case JSON keys: {"IP":"…","Port":…} +type discoveredEndpointEntry struct { + IP string `json:"IP"` + Port int `json:"Port"` +} + +// DiscoveredEndpointsByKey reads kilo.squat.ai/discovered-endpoints from every +// node in the cluster reachable via kubeClient and returns a map of +// WireGuard public-key string → "host:port" endpoint string. Only entries +// where the IP parses as a valid global-scope unicast address are included. +// +// The returned map represents what endpoints peer nodes have actually observed +// for each WireGuard key via successful handshakes. For nodes behind NAT this +// is the real public/NAT IP — more accurate than kilo.squat.ai/endpoint which +// reflects the node's own interface address. +func DiscoveredEndpointsByKey(ctx context.Context, kubeClient client.Client) (map[string]string, error) { + var nodeList corev1.NodeList + + err := kubeClient.List(ctx, &nodeList) + if err != nil { + return nil, errors.Wrap(err, "listing nodes for discovered-endpoint lookup") + } + + result := make(map[string]string) + + for i := range nodeList.Items { + raw, ok := nodeList.Items[i].Annotations[AnnotationDiscoveredEndpoints] + if !ok || raw == "" { + continue + } + + var entries map[string]discoveredEndpointEntry + + unmarshalErr := json.Unmarshal([]byte(raw), &entries) + if unmarshalErr != nil { + // Malformed annotation — skip this node silently. + continue + } + + for pubKey, entry := range entries { + if entry.IP == "" || entry.Port <= 0 { + continue + } + + ip := net.ParseIP(entry.IP) + if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { + continue + } + + // First occurrence wins; all nodes that observed a handshake with + // the same key should have seen the same source IP (the peer's NAT + // egress), so any entry is equally authoritative. + if _, seen := result[pubKey]; !seen { + result[pubKey] = net.JoinHostPort(entry.IP, itoa(entry.Port)) + } + } + } + + return result, nil +} + +// itoa converts an int to its decimal string representation without importing +// strconv at the call site. +func itoa(n int) string { + if n == 0 { + return "0" + } + + buf := [20]byte{} + pos := len(buf) + neg := n < 0 + + if neg { + n = -n + } + + for n > 0 { + pos-- + buf[pos] = byte('0' + n%10) + n /= 10 + } + + if neg { + pos-- + buf[pos] = '-' + } + + return string(buf[pos:]) +} diff --git a/internal/peer/builder.go b/internal/peer/builder.go index 4bb4df4..bd5cc09 100644 --- a/internal/peer/builder.go +++ b/internal/peer/builder.go @@ -92,9 +92,10 @@ func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node, Labels: Labels(meshName, entry.Name), }, Spec: kilov1alpha1.PeerSpec{ - AllowedIPs: allowedIPs, - PublicKey: pubKey, - Endpoint: endpoint, + AllowedIPs: allowedIPs, + PublicKey: pubKey, + Endpoint: endpoint, + PersistentKeepalive: entry.PersistentKeepalive, }, } From 24cd3c109d65098f4331fc356da98be7a893a38e Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Fri, 19 Jun 2026 12:51:04 +0300 Subject: [PATCH 2/3] chore: regenerate CRD manifests for PersistentKeepalive field Signed-off-by: IvanHunters --- config/crd/bases/kilo.squat.ai_clustermeshes.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/config/crd/bases/kilo.squat.ai_clustermeshes.yaml b/config/crd/bases/kilo.squat.ai_clustermeshes.yaml index 64bb653..9274636 100644 --- a/config/crd/bases/kilo.squat.ai_clustermeshes.yaml +++ b/config/crd/bases/kilo.squat.ai_clustermeshes.yaml @@ -89,6 +89,17 @@ spec: maxLength: 63 pattern: ^[a-z0-9]([a-z0-9\-]{0,61}[a-z0-9])?$ type: string + persistentKeepalive: + description: |- + PersistentKeepalive is the interval in seconds at which WireGuard + sends keepalive packets to peers in this cluster. Set to a non-zero + value (e.g. 25) for clusters behind NAT so that the stateful NAT + mapping is refreshed before it expires, enabling bidirectional traffic + even when the cluster has no directly-routable public IP. + 0 disables persistent keepalive (default). + maximum: 65535 + minimum: 0 + type: integer podCIDRs: description: |- PodCIDRs is the list of pod network CIDRs for this cluster. From 34f45072b4955d769e366acac85111798f34c59a Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Fri, 19 Jun 2026 12:55:10 +0300 Subject: [PATCH 3/3] fix(operator): enrich endpoints independently per target; drop custom itoa - desired is no longer mutated across target cluster iterations: each target now receives independently enriched peers (pushDesired) so that different target clusters observing different NAT IPs for the same source peer don't clobber each other. - Replace hand-rolled itoa in kilonode/discovered.go with strconv.Itoa. Signed-off-by: IvanHunters --- internal/controller/clustermesh_controller.go | 10 ++++-- internal/kilonode/discovered.go | 32 ++----------------- 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index 34deac7..31cdeb1 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -667,6 +667,12 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo // self-heal: after the source cluster's Kilo initiates the first // handshake, subsequent reconciles automatically use the correct // external endpoint without any manual annotation. + // + // Enrichment is computed independently per target cluster: each target + // may observe different source IPs for the same peer (e.g. different + // NAT gateways), so we must not reuse enriched peers across targets. + pushDesired := desired + enriched, enrichErr := r.enrichEndpointsFromDiscovered(ctx, log, tgtClient, desired) if enrichErr != nil { // Non-fatal: log and continue with the original endpoints. @@ -676,10 +682,10 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo slog.String("error", enrichErr.Error()), ) } else { - desired = enriched + pushDesired = enriched } - err = peer.ReconcilePeers(ctx, tgtClient, mesh.Name, srcEntry.Name, desired) + err = peer.ReconcilePeers(ctx, tgtClient, mesh.Name, srcEntry.Name, pushDesired) if err != nil { return errors.Wrapf(err, "reconciling peers from %q to %q", srcEntry.Name, tgtEntry.Name) } diff --git a/internal/kilonode/discovered.go b/internal/kilonode/discovered.go index 3cec4c9..7044216 100644 --- a/internal/kilonode/discovered.go +++ b/internal/kilonode/discovered.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "net" + "strconv" "github.com/cockroachdb/errors" corev1 "k8s.io/api/core/v1" @@ -91,39 +92,10 @@ func DiscoveredEndpointsByKey(ctx context.Context, kubeClient client.Client) (ma // the same key should have seen the same source IP (the peer's NAT // egress), so any entry is equally authoritative. if _, seen := result[pubKey]; !seen { - result[pubKey] = net.JoinHostPort(entry.IP, itoa(entry.Port)) + result[pubKey] = net.JoinHostPort(entry.IP, strconv.Itoa(entry.Port)) } } } return result, nil } - -// itoa converts an int to its decimal string representation without importing -// strconv at the call site. -func itoa(n int) string { - if n == 0 { - return "0" - } - - buf := [20]byte{} - pos := len(buf) - neg := n < 0 - - if neg { - n = -n - } - - for n > 0 { - pos-- - buf[pos] = byte('0' + n%10) - n /= 10 - } - - if neg { - pos-- - buf[pos] = '-' - } - - return string(buf[pos:]) -}