Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/v1alpha1/clustermesh_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ type ClusterEntry struct {
// (e.g., host-network ranges, external subnets).
// +optional
AdditionalCIDRs []string `json:"additionalCIDRs,omitempty"` //nolint:tagliatelle // "additionalCIDRs" is the canonical field name; "CIDR" is a well-known acronym

// PersistentKeepalive is the interval in seconds at which WireGuard
// sends keepalive packets to peers in this cluster. Set to a non-zero
// value (e.g. 25) for clusters behind NAT so that the stateful NAT
// mapping is refreshed before it expires, enabling bidirectional traffic
// even when the cluster has no directly-routable public IP.
// 0 disables persistent keepalive (default).
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=65535
// +optional
PersistentKeepalive int `json:"persistentKeepalive,omitempty"`
}

// AllCIDRs returns the union of all CIDRs declared by this cluster entry.
Expand Down
11 changes: 11 additions & 0 deletions config/crd/bases/kilo.squat.ai_clustermeshes.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 130 additions & 4 deletions internal/controller/clustermesh_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package controller
import (
"context"
"log/slog"
"net"
"strconv"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -393,8 +395,10 @@ func (r *ClusterMeshReconciler) handleDeletion(ctx context.Context, log *slog.Lo

log.Info("deleting peers for mesh being removed")

for _, entry := range mesh.Spec.Clusters {
for _, targetEntry := range mesh.Spec.Clusters {
for ei := range mesh.Spec.Clusters {
entry := &mesh.Spec.Clusters[ei]
for ti := range mesh.Spec.Clusters {
targetEntry := &mesh.Spec.Clusters[ti]
if targetEntry.Name == entry.Name {
continue
}
Expand Down Expand Up @@ -639,7 +643,9 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo

status.RegisteredPeers = len(desired)

for _, tgtEntry := range mesh.Spec.Clusters {
for i := range mesh.Spec.Clusters {
tgtEntry := &mesh.Spec.Clusters[i]

if tgtEntry.Name == srcEntry.Name {
continue
}
Expand All @@ -651,7 +657,35 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo
continue
}

err = peer.ReconcilePeers(ctx, tgtClient, mesh.Name, srcEntry.Name, desired)
// Enrich peer endpoints with real NAT-observed IPs from the target
// cluster's nodes. Kilo on every target node records the actual source
// IP of each successful WireGuard handshake in
// kilo.squat.ai/discovered-endpoints. For source clusters behind NAT
// (no ExternalIP, only InternalIP) the discovered IP is the true
// reachable endpoint, whereas the Peer spec may contain only the
// internal address. Preferring the discovered value lets the operator
// self-heal: after the source cluster's Kilo initiates the first
// handshake, subsequent reconciles automatically use the correct
// external endpoint without any manual annotation.
//
// Enrichment is computed independently per target cluster: each target
// may observe different source IPs for the same peer (e.g. different
// NAT gateways), so we must not reuse enriched peers across targets.
pushDesired := desired

enriched, enrichErr := r.enrichEndpointsFromDiscovered(ctx, log, tgtClient, desired)
if enrichErr != nil {
// Non-fatal: log and continue with the original endpoints.
log.Warn("enriching peer endpoints from discovered-endpoints failed; using configured endpoints",
slog.String("source", srcEntry.Name),
slog.String("target", tgtEntry.Name),
slog.String("error", enrichErr.Error()),
)
} else {
pushDesired = enriched
}

err = peer.ReconcilePeers(ctx, tgtClient, mesh.Name, srcEntry.Name, pushDesired)
if err != nil {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return errors.Wrapf(err, "reconciling peers from %q to %q", srcEntry.Name, tgtEntry.Name)
}
Expand All @@ -660,6 +694,98 @@ func (r *ClusterMeshReconciler) pushPeersToTargets(ctx context.Context, log *slo
return nil
}

// enrichEndpointsFromDiscovered returns the desired peers with their endpoint
// replaced by the endpoint observed via WireGuard handshakes on the target
// cluster's nodes, for every peer whose public key has a discovered entry
// that differs from the configured endpoint. Source clusters behind NAT (no
// ExternalIP) only advertise their InternalIP as the configured endpoint; the
// discovered value is the address the target actually saw the handshake come
// from, which is what the target cluster must use to reach them.
//
// Peers in desired are never mutated: an overridden peer is a DeepCopy, and
// every other slot of the result holds the original pointer.
//
// The function is best-effort:
//   - if the discovered-endpoint lookup fails, desired is returned unchanged
//     together with the wrapped error;
//   - if the target has no discovered-endpoint data, desired is returned
//     unchanged;
//   - if a discovered value cannot be parsed, a warning is logged and that
//     peer keeps its configured endpoint.
func (r *ClusterMeshReconciler) enrichEndpointsFromDiscovered(
	ctx context.Context,
	log *slog.Logger,
	tgtClient client.Client,
	desired []*kilov1alpha1.Peer,
) ([]*kilov1alpha1.Peer, error) {
	discoveredByKey, lookupErr := kilonode.DiscoveredEndpointsByKey(ctx, tgtClient)
	if lookupErr != nil {
		return desired, errors.Wrap(lookupErr, "listing nodes for discovered-endpoint lookup")
	}

	if len(discoveredByKey) == 0 {
		return desired, nil
	}

	enriched := make([]*kilov1alpha1.Peer, len(desired))

	for i, peerObj := range desired {
		// Default to the untouched peer; only a successful override below
		// replaces this slot.
		enriched[i] = peerObj

		discoveredEndpoint, ok := discoveredByKey[peerObj.Spec.PublicKey]
		if !ok {
			continue
		}

		configured := ""
		if peerObj.Spec.Endpoint != nil {
			configured = peerObj.Spec.Endpoint.IP
		}

		// Skip peers that already carry the discovered endpoint. The
		// discovered string is built with net.JoinHostPort, so the configured
		// side must be built the same way: plain "ip:port" concatenation
		// never matches for IPv6 literals (which JoinHostPort brackets) and
		// would trigger a pointless override on every reconcile.
		if configured != "" {
			configuredHostPort := net.JoinHostPort(configured, strconv.Itoa(int(peerObj.Spec.Endpoint.Port)))
			if discoveredEndpoint == configuredHostPort {
				continue
			}
		}

		parsedEndpoint, parseErr := parseDiscoveredEndpoint(discoveredEndpoint)
		if parseErr != nil {
			log.Warn("ignoring malformed discovered endpoint",
				slog.String("peer", peerObj.Name),
				slog.String("endpoint", discoveredEndpoint),
				slog.String("error", parseErr.Error()),
			)

			continue
		}

		// Copy before modifying so the caller's desired slice can be reused
		// for other target clusters.
		updated := peerObj.DeepCopy()
		updated.Spec.Endpoint = parsedEndpoint
		enriched[i] = updated

		log.Debug("overriding peer endpoint with discovered value",
			slog.String("peer", peerObj.Name),
			slog.String("configured", configured),
			slog.String("discovered", discoveredEndpoint),
		)
	}

	return enriched, nil
}

// parseDiscoveredEndpoint parses an "ip:port" string (IPv6 literals in
// brackets, as accepted by net.SplitHostPort) into a *kilov1alpha1.PeerEndpoint.
//
// An error is returned when:
//   - the input cannot be split into host and port;
//   - the host is not an IP literal (the host is stored in the endpoint's IP
//     field, so a DNS name there would produce an invalid endpoint);
//   - the port is not a decimal number in the range 1-65535.
func parseDiscoveredEndpoint(hostPort string) (*kilov1alpha1.PeerEndpoint, error) {
	host, portStr, splitErr := net.SplitHostPort(hostPort)
	if splitErr != nil {
		return nil, errors.Wrap(splitErr, "splitting discovered endpoint host:port")
	}

	if net.ParseIP(host) == nil {
		return nil, errors.Newf("host in discovered endpoint %q is not an IP address", hostPort)
	}

	// bitSize 16 makes ParseUint reject anything above 65535.
	port, parseErr := strconv.ParseUint(portStr, 10, 16)
	if parseErr != nil {
		return nil, errors.Wrapf(parseErr, "parsing port in discovered endpoint %q", hostPort)
	}

	if port == 0 {
		return nil, errors.Newf("port in discovered endpoint %q must be non-zero", hostPort)
	}

	return &kilov1alpha1.PeerEndpoint{
		DNSOrIP: kilov1alpha1.DNSOrIP{IP: host},
		Port:    uint32(port),
	}, nil
}

// updateStatus sets Ready=True and writes the cluster statuses.
func (r *ClusterMeshReconciler) updateStatus(ctx context.Context, mesh *v1alpha1.ClusterMesh, statuses []v1alpha1.ClusterStatus) error {
mesh.Status.Clusters = statuses
Expand Down
101 changes: 101 additions & 0 deletions internal/kilonode/discovered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2026 The Kilo Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kilonode

import (
"context"
"encoding/json"
"net"
"strconv"

"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Comment on lines +19 to +28

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Import the standard strconv package to format the port number instead of using a custom itoa implementation.

Suggested change
import (
"context"
"encoding/json"
"net"
"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
import (
"context"
"encoding/json"
"net"
"strconv"
"github.com/cockroachdb/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)


const (
// AnnotationDiscoveredEndpoints is the node annotation written by Kilo
// containing the WireGuard endpoints observed via successful handshakes.
// Value is a JSON map of base64-encoded WireGuard public key → {IP, Port}.
AnnotationDiscoveredEndpoints = "kilo.squat.ai/discovered-endpoints"
)

// discoveredEndpointEntry is the JSON shape of a single entry in the
// kilo.squat.ai/discovered-endpoints annotation value. Kilo serialises
// these fields with upper-case keys ("IP", "Port") so the json tags must
// match exactly.
//
//nolint:tagliatelle // Kilo uses upper-case JSON keys: {"IP":"…","Port":…}
type discoveredEndpointEntry struct {
// IP is the observed endpoint address in textual form. DiscoveredEndpointsByKey
// parses it with net.ParseIP and drops entries that are empty or fail to parse.
IP string `json:"IP"`
// Port is the observed endpoint port. DiscoveredEndpointsByKey ignores
// entries whose Port is zero or negative.
Port int `json:"Port"`
}

// DiscoveredEndpointsByKey reads kilo.squat.ai/discovered-endpoints from every
// node in the cluster reachable via kubeClient and returns a map of
// WireGuard public-key string → "host:port" endpoint string. Only entries
// with a positive port whose IP parses and is neither a loopback nor a
// link-local unicast address are included.
//
// The returned map represents what endpoints peer nodes have actually observed
// for each WireGuard key via successful handshakes. For nodes behind NAT this
// is the real public/NAT IP — more accurate than kilo.squat.ai/endpoint which
// reflects the node's own interface address.
func DiscoveredEndpointsByKey(ctx context.Context, kubeClient client.Client) (map[string]string, error) {
var nodeList corev1.NodeList

err := kubeClient.List(ctx, &nodeList)
if err != nil {
return nil, errors.Wrap(err, "listing nodes for discovered-endpoint lookup")
}

result := make(map[string]string)

for i := range nodeList.Items {
raw, ok := nodeList.Items[i].Annotations[AnnotationDiscoveredEndpoints]
if !ok || raw == "" {
continue
}

var entries map[string]discoveredEndpointEntry

unmarshalErr := json.Unmarshal([]byte(raw), &entries)
if unmarshalErr != nil {
// Malformed annotation — skip this node silently.
continue
}

for pubKey, entry := range entries {
if entry.IP == "" || entry.Port <= 0 {
continue
}

ip := net.ParseIP(entry.IP)
if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
Comment on lines +86 to +88

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject unspecified and multicast discovered IPs before accepting them.

Current validation still accepts addresses like 0.0.0.0 / :: (and multicast), which can propagate unusable endpoints into reconciliation.

Proposed fix
-			ip := net.ParseIP(entry.IP)
-			if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
+			ip := net.ParseIP(entry.IP)
+			if ip == nil || ip.IsUnspecified() || ip.IsMulticast() || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
 				continue
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ip := net.ParseIP(entry.IP)
if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
ip := net.ParseIP(entry.IP)
if ip == nil || ip.IsUnspecified() || ip.IsMulticast() || ip.IsLoopback() || ip.IsLinkLocalUnicast() {
continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/kilonode/discovered.go` around lines 85 - 87, The IP validation
logic in the discovered.go file around the ParseIP call is incomplete and allows
unspecified addresses like 0.0.0.0 and :: as well as multicast addresses to pass
through, which can later propagate unusable endpoints. Extend the existing
conditional check that validates ip.IsLoopback() and ip.IsLinkLocalUnicast() to
also call ip.IsUnspecified() and ip.IsMulticast() to reject these address types
before accepting them into the endpoints list.

}

// First occurrence wins; all nodes that observed a handshake with
// the same key should have seen the same source IP (the peer's NAT
// egress), so any entry is equally authoritative.
if _, seen := result[pubKey]; !seen {
result[pubKey] = net.JoinHostPort(entry.IP, strconv.Itoa(entry.Port))
}
}
}

return result, nil
}
7 changes: 4 additions & 3 deletions internal/peer/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ func BuildPeer(meshName string, entry *v1alpha1.ClusterEntry, node *corev1.Node,
Labels: Labels(meshName, entry.Name),
},
Spec: kilov1alpha1.PeerSpec{
AllowedIPs: allowedIPs,
PublicKey: pubKey,
Endpoint: endpoint,
AllowedIPs: allowedIPs,
PublicKey: pubKey,
Endpoint: endpoint,
PersistentKeepalive: entry.PersistentKeepalive,
},
}

Expand Down
Loading