diff --git a/charts/kamaji/README.md b/charts/kamaji/README.md index 69a9cefd..e06040d3 100644 --- a/charts/kamaji/README.md +++ b/charts/kamaji/README.md @@ -122,6 +122,7 @@ Here the values you can override: | telemetry | object | `{"disabled":false}` | Disable the analytics traces collection | | temporaryDirectoryPath | string | `"/tmp/kamaji"` | Directory which will be used to work with temporary files. (default "/tmp/kamaji") | | tolerations | list | `[]` | Kubernetes node taints that the Kamaji controller pods would tolerate | +| watchNamespaces | list | `[]` | Optional list of namespaces the operator restricts its watch to. When empty (default) Kamaji watches every namespace. Cluster-scoped resources (DataStore, ClusterRole, ...) are never affected, and the release namespace is always watched implicitly to keep the migration Job lifecycle working. | ## Installing and managing etcd as DataStore diff --git a/charts/kamaji/templates/controller.yaml b/charts/kamaji/templates/controller.yaml index 63178382..6bd3f608 100644 --- a/charts/kamaji/templates/controller.yaml +++ b/charts/kamaji/templates/controller.yaml @@ -32,6 +32,9 @@ spec: {{- if not (eq .Values.defaultDatastoreName "") }} - --datastore={{ .Values.defaultDatastoreName }} {{- end }} + {{- with .Values.watchNamespaces }} + - --watch-namespaces={{ join "," . 
}} + {{- end }} {{- if .Values.telemetry.disabled }} - --disable-telemetry {{- end }} diff --git a/charts/kamaji/templates/kubeconfiggenerator-deployment.yaml b/charts/kamaji/templates/kubeconfiggenerator-deployment.yaml index d7199d21..c090ec83 100644 --- a/charts/kamaji/templates/kubeconfiggenerator-deployment.yaml +++ b/charts/kamaji/templates/kubeconfiggenerator-deployment.yaml @@ -28,10 +28,18 @@ spec: - kubeconfig-generator - --health-probe-bind-address={{ .Values.kubeconfigGenerator.healthProbeBindAddress }} - --leader-elect={{ .Values.kubeconfigGenerator.enableLeaderElect }} + {{- with .Values.watchNamespaces }} + - --watch-namespaces={{ join "," . }} + {{- end }} {{- if .Values.kubeconfigGenerator.loggingDevel.enable }}- --zap-devel{{- end }} {{- with .Values.kubeconfigGenerator.extraArgs }} {{- toYaml . | nindent 10 }} {{- end }} + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} name: controller diff --git a/charts/kamaji/values.yaml b/charts/kamaji/values.yaml index 0e99af10..a3dac538 100644 --- a/charts/kamaji/values.yaml +++ b/charts/kamaji/values.yaml @@ -98,6 +98,13 @@ loggingDevel: # -- If specified, all the Kamaji instances with an unassigned DataStore will inherit this default value. defaultDatastoreName: default +# -- Optional list of namespaces the operator restricts its watch to. +# When empty (default) Kamaji watches every namespace. +# Cluster-scoped resources (DataStore, ClusterRole, ...) are never affected, +# and the release namespace is always watched implicitly to keep the migration +# Job lifecycle working. 
+watchNamespaces: [] + # -- Subchart: See https://github.com/clastix/kamaji-etcd/blob/master/charts/kamaji-etcd/values.yaml kamaji-etcd: deploy: true diff --git a/cmd/kubeconfig-generator/cmd.go b/cmd/kubeconfig-generator/cmd.go index 7654bf57..24be8614 100644 --- a/cmd/kubeconfig-generator/cmd.go +++ b/cmd/kubeconfig-generator/cmd.go @@ -14,17 +14,17 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + cmdutils "github.com/clastix/kamaji/cmd/utils" "github.com/clastix/kamaji/controllers" "github.com/clastix/kamaji/internal" + kamajimanager "github.com/clastix/kamaji/internal/manager" "github.com/clastix/kamaji/internal/metrics" ) @@ -38,6 +38,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { cacheResyncPeriod time.Duration managerNamespace string certificateExpirationDeadline time.Duration + watchNamespaces []string ) cmd := &cobra.Command{ @@ -45,15 +46,31 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { Short: "Start the Kubeconfig Generator manager", SilenceErrors: false, SilenceUsage: true, - PreRunE: func(*cobra.Command, []string) error { + PreRunE: func(cmd *cobra.Command, _ []string) error { // Avoid polluting stdout with useless details by the underlying klog implementations klog.SetOutput(io.Discard) klog.LogToStderr(false) + // pod-namespace is required: the operator merges it into the + // cache watch set when --watch-namespaces is non-empty (so + // leader-election Lease access stays in scope), and the + // migration Job watch needs it as well. The chart projects + // it from metadata.namespace; binary-direct callers must + // pass --pod-namespace=$NS or set POD_NAMESPACE. 
+ err := cmdutils.CheckFlags(cmd.Flags(), "pod-namespace") + if err != nil { + return err + } + if certificateExpirationDeadline < 24*time.Hour { return fmt.Errorf("certificate expiration deadline must be at least 24 hours") } + err = kamajimanager.ValidateNamespaces(watchNamespaces) + if err != nil { + return err + } + return nil }, RunE: func(*cobra.Command, []string) error { @@ -67,6 +84,18 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { setupLog.Info(fmt.Sprintf("Go Version: %s", goRuntime.Version())) setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", goRuntime.GOOS, goRuntime.GOARCH)) + // kubeconfig-generator watches the cluster-scoped KubeconfigGenerator + // CRD plus TenantControlPlane resources and labelled kubeconfig + // Secrets in tenant namespaces. The install namespace is included + // for symmetry with the main controller and to keep + // leader-election working defensively if controller-runtime ever + // routes the Lease informer through the manager cache. + cachedNamespaces := kamajimanager.MergeWatchedNamespaces(watchNamespaces, managerNamespace) + + if len(cachedNamespaces) > 0 { + setupLog.Info("restricting cache to namespaces", "namespaces", cachedNamespaces) + } + ctrlOpts := ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ @@ -76,11 +105,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { LeaderElection: leaderElect, LeaderElectionNamespace: managerNamespace, LeaderElectionID: "kubeconfiggenerator.kamaji.clastix.io", - NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { - opts.SyncPeriod = &cacheResyncPeriod - - return cache.New(config, opts) - }, + NewCache: kamajimanager.NewCacheFunc(cacheResyncPeriod, cachedNamespaces), } triggerChan := make(chan event.GenericEvent) @@ -160,6 +185,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { cmd.Flags().DurationVar(&cacheResyncPeriod, "cache-resync-period", 10*time.Hour, "The controller-runtime.Manager cache resync period.") 
cmd.Flags().StringVar(&managerNamespace, "pod-namespace", os.Getenv("POD_NAMESPACE"), "The Kubernetes Namespace on which the Operator is running in, required for the TenantControlPlane migration jobs.") cmd.Flags().DurationVar(&certificateExpirationDeadline, "certificate-expiration-deadline", 24*time.Hour, "Define the deadline upon certificate expiration to start the renewal process, cannot be less than a 24 hours.") + cmd.Flags().StringSliceVar(&watchNamespaces, "watch-namespaces", nil, "Optional, comma-separated list of namespaces the controller should watch for TenantControlPlane (and dependent) resources. When empty every namespace is watched. Cluster-scoped resources are never affected by this flag, and the install namespace is always watched implicitly.") cobra.OnInitialize(func() { viper.AutomaticEnv() diff --git a/cmd/manager/cmd.go b/cmd/manager/cmd.go index aa264ae0..a2bbb575 100644 --- a/cmd/manager/cmd.go +++ b/cmd/manager/cmd.go @@ -17,10 +17,8 @@ import ( "github.com/spf13/viper" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/discovery" - "k8s.io/client-go/rest" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -33,6 +31,7 @@ import ( "github.com/clastix/kamaji/controllers/soot" "github.com/clastix/kamaji/internal" "github.com/clastix/kamaji/internal/builders/controlplane" + kamajimanager "github.com/clastix/kamaji/internal/manager" "github.com/clastix/kamaji/internal/metrics" "github.com/clastix/kamaji/internal/utilities" "github.com/clastix/kamaji/internal/webhook" @@ -60,6 +59,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { maxConcurrentReconciles int disableTelemetry bool certificateExpirationDeadline time.Duration + watchNamespaces []string webhookCAPath string ) @@ -90,6 +90,11 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { return 
fmt.Errorf("the controller reconcile timeout must be greater than zero") } + err = kamajimanager.ValidateNamespaces(watchNamespaces) + if err != nil { + return err + } + return nil }, RunE: func(*cobra.Command, []string) error { @@ -109,6 +114,16 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { telemetryClient = telemetryclient.NewNewOp() } + // When namespace scoping is requested, the operator's install namespace + // must remain in the watch set: TenantControlPlane migration Jobs are + // created and watched there (see TenantControlPlaneReconciler.Watches + // on batchv1.Job, predicated by KamajiNamespace). + cachedNamespaces := kamajimanager.MergeWatchedNamespaces(watchNamespaces, managerNamespace) + + if len(cachedNamespaces) > 0 { + setupLog.Info("restricting cache to namespaces", "namespaces", cachedNamespaces) + } + ctrlOpts := ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ @@ -121,11 +136,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { LeaderElection: leaderElect, LeaderElectionNamespace: managerNamespace, LeaderElectionID: "kamaji.clastix.io", - NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { - opts.SyncPeriod = &cacheResyncPeriod - - return cache.New(config, opts) - }, + NewCache: kamajimanager.NewCacheFunc(cacheResyncPeriod, cachedNamespaces), } mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrlOpts) @@ -336,6 +347,7 @@ func NewCmd(scheme *runtime.Scheme) *cobra.Command { cmd.Flags().DurationVar(&cacheResyncPeriod, "cache-resync-period", 10*time.Hour, "The controller-runtime.Manager cache resync period.") cmd.Flags().BoolVar(&disableTelemetry, "disable-telemetry", false, "Disable the analytics traces collection.") cmd.Flags().DurationVar(&certificateExpirationDeadline, "certificate-expiration-deadline", 24*time.Hour, "Define the deadline upon certificate expiration to start the renewal process, cannot be less than a 24 hours.") + cmd.Flags().StringSliceVar(&watchNamespaces, 
"watch-namespaces", nil, "Optional, comma-separated list of namespaces the operator should watch for TenantControlPlane (and dependent) resources. When empty Kamaji watches every namespace. Cluster-scoped resources are never affected by this flag, and the operator's own install namespace is always watched implicitly.") cobra.OnInitialize(func() { viper.AutomaticEnv() diff --git a/docs/content/guides/namespace-scoping.md b/docs/content/guides/namespace-scoping.md new file mode 100644 index 00000000..373c7141 --- /dev/null +++ b/docs/content/guides/namespace-scoping.md @@ -0,0 +1,84 @@ +# Restricting the namespaces watched by Kamaji + +By default Kamaji watches every namespace of the management cluster for +`TenantControlPlane` resources and the dependent objects it owns +(`Secret`, `ConfigMap`, `Deployment`, `Service`, `Ingress`, ...). + +In multi-team or multi-tenant management clusters this is sometimes too broad. +You can opt-in to a smaller watch surface with the `--watch-namespaces` flag, +which leverages controller-runtime's per-namespace cache. + +## The `--watch-namespaces` flag + +```text +--watch-namespaces=team-a,team-b +``` + +- Accepts a comma-separated list (or repeated `--watch-namespaces=...` flags). +- Each entry must be a valid Kubernetes namespace identifier (DNS-1123 + label); invalid values cause the operator to fail fast at startup rather + than surface opaque watch errors at runtime. +- When **omitted or empty**, Kamaji keeps the default cluster-wide behaviour. +- When set, namespaced informers only watch the listed namespaces. +- The operator's own install namespace is always added to the watch set + implicitly: `TenantControlPlane` migration `Job`s live there and would not + be reconciled otherwise. +- The flag is honoured by both the main controller and the optional + **kubeconfig-generator** deployment; the Helm chart threads the same + `watchNamespaces` value into both. 
+ +## What is and is not affected + +Namespace scoping only constrains **namespaced** resources. Cluster-scoped +resources continue to be cached cluster-wide and keep working unchanged: + +| Resource | Scope | Honours `--watch-namespaces`? | +| ----------------------------------- | -------------- | ----------------------------- | +| `TenantControlPlane` | Namespaced | Yes | +| `Secret`, `ConfigMap`, `Deployment`, `Service`, `Ingress` (TCP children) | Namespaced | Yes | +| Migration `Job` in the install ns | Namespaced | Implicitly always included | +| `DataStore` | Cluster | No (always cluster-wide) | +| `ValidatingWebhookConfiguration` | Cluster | No | +| `ClusterRole`, `ClusterRoleBinding` | Cluster | No | +| Soot controllers (kubeadm phases, kube-proxy, CoreDNS, ...) | N/A | These run against the **tenant** cluster's API server with their own cache, so the management-cluster scoping does not apply to them. | + +## Caveats + +- **Gateway API**: when a `TenantControlPlane` references a `Gateway` that + lives in a namespace **outside** the watch set, the operator will not be able + to read it from the cache. Add every namespace hosting referenced + `Gateway` resources to `--watch-namespaces`. +- **RBAC**: scoping only affects what the cache subscribes to, not what the + Kubernetes API authorises. The default Helm chart still installs a + `ClusterRole` to keep upgrades and cluster-scoped reconciliations safe. +- **Scaling**: controller-runtime allocates one informer per `(namespace, + kind)` pair when `DefaultNamespaces` is set. Kamaji watches roughly seven + namespaced kinds in the management cluster (`TenantControlPlane` plus its + owned `Secret`, `ConfigMap`, `Deployment`, `Service`, `Ingress`, `Job`). + Listing a few dozen namespaces is comfortable; listing several hundred + multiplies the number of `LIST/WATCH` connections and goroutines and may + exceed client-go QPS defaults — at that point the cluster-wide single + informer is cheaper. 
Prefer per-cluster Kamaji instances over very long + watch lists. + +## Helm + +The chart exposes a top-level `watchNamespaces` value that maps to the flag: + +```yaml +# values.yaml +watchNamespaces: + - team-a + - team-b +``` + +Or via `--set` on the command line: + +```bash +helm upgrade --install kamaji clastix/kamaji \ + --namespace kamaji-system --create-namespace \ + --set 'watchNamespaces={team-a,team-b}' +``` + +Leaving the list empty (the default) renders no `--watch-namespaces` argument +and preserves the cluster-wide behaviour. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 69eb3e97..39a0f425 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -76,6 +76,7 @@ nav: - guides/backup-and-restore.md - guides/certs-lifecycle.md - guides/pausing.md + - guides/namespace-scoping.md - guides/write-permissions.md - guides/datastore-migration.md - guides/datastore-overrides.md diff --git a/e2e/worker_kubeadm_join_test.go b/e2e/worker_kubeadm_join_test.go index 094e1c0c..9584251c 100644 --- a/e2e/worker_kubeadm_join_test.go +++ b/e2e/worker_kubeadm_join_test.go @@ -140,6 +140,11 @@ var _ = Describe("starting a kind worker with kubeadm", func() { By("enabling br_netfilter", func() { exitCode, stdout, err := workerContainer.Exec(ctx, []string{"modprobe", "br_netfilter"}) + if err != nil { + _, _ = fmt.Fprintln(GinkgoWriter, "modprobe error: "+err.Error()) + + return + } out, _ := io.ReadAll(stdout) if len(out) > 0 { @@ -149,14 +154,15 @@ var _ = Describe("starting a kind worker with kubeadm", func() { if exitCode != 0 { _, _ = fmt.Fprintln(GinkgoWriter, "modprobe exit code: "+strconv.FormatUint(uint64(exitCode), 10)) } - - if err != nil { - _, _ = fmt.Fprintln(GinkgoWriter, "modprobe error: "+err.Error()) - } }) By("disabling swap", func() { exitCode, stdout, err := workerContainer.Exec(ctx, []string{"swapoff", "-a"}) + if err != nil { + _, _ = fmt.Fprintln(GinkgoWriter, "swapoff error: "+err.Error()) + + return + } out, _ := io.ReadAll(stdout) if len(out) > 0 
{ @@ -166,16 +172,13 @@ var _ = Describe("starting a kind worker with kubeadm", func() { if exitCode != 0 { _, _ = fmt.Fprintln(GinkgoWriter, "swapoff exit code: "+strconv.FormatUint(uint64(exitCode), 10)) } - - if err != nil { - _, _ = fmt.Fprintln(GinkgoWriter, "swapoff error: "+err.Error()) - } }) By("executing the command in the worker node", func() { cmds := append(strings.Split(strings.TrimSpace(joinCommandBuffer.String()), " "), "--ignore-preflight-errors=SystemVerification,FileExisting") exitCode, stdout, err := workerContainer.Exec(ctx, cmds) + Expect(err).ToNot(HaveOccurred()) out, _ := io.ReadAll(stdout) if len(out) > 0 { @@ -183,7 +186,6 @@ var _ = Describe("starting a kind worker with kubeadm", func() { } Expect(exitCode).To(Equal(0)) - Expect(err).ToNot(HaveOccurred()) }) By("waiting for nodes", func() { diff --git a/e2e/zz_watch_namespaces_test.go b/e2e/zz_watch_namespaces_test.go new file mode 100644 index 00000000..65489dde --- /dev/null +++ b/e2e/zz_watch_namespaces_test.go @@ -0,0 +1,324 @@ +// Copyright 2022 Clastix Labs +// SPDX-License-Identifier: Apache-2.0 + +// This spec mutates the running kamaji controller-manager Deployment to assert +// that --watch-namespaces correctly scopes the cache. The `zz_` prefix makes it +// register after every other spec to limit the blast radius of a failed restore; +// Ginkgo shuffles top-level containers by default, so that order is best-effort. +// Every state change registers a DeferCleanup immediately after capture so the +// original Deployment args are restored even when an assertion aborts the run. + +package e2e + +import ( + "context" + "fmt" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + pointer "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1" +) + +// isWebhookUnreachable returns true when err describes a transient inability +// to reach the kamaji validating/mutating webhook service. The kube-apiserver +// surfaces these as 500 InternalError with a "failed calling webhook" prefix; +// past that prefix we treat any common network-layer or post-rollout warm-up +// signature as retriable. Anything else (validation errors, conflicts, etc.) +// falls through and aborts the spec. +func isWebhookUnreachable(err error) bool { + if err == nil { + return false + } + + msg := err.Error() + if !strings.Contains(msg, "failed calling webhook") { + return false + } + + switch { + case strings.Contains(msg, "connection refused"), + strings.Contains(msg, "EOF"), + strings.Contains(msg, "context deadline exceeded"), + strings.Contains(msg, "no endpoints available"), + strings.Contains(msg, "no route to host"), + strings.Contains(msg, "i/o timeout"), + strings.Contains(msg, "service unavailable"), + strings.Contains(msg, "tls: handshake failure"), + strings.Contains(msg, "x509:"), + strings.Contains(msg, "not found"): + return true + } + + return false +} + +// Ordered guarantees in-suite ordering; Serial additionally prevents this +// describe from running in parallel with any other one (e.g. if the suite +// is later invoked with `ginkgo -p`). Both are required because we mutate +// the shared kamaji controller-manager Deployment in BeforeAll. 
+var _ = Describe("--watch-namespaces", Ordered, Serial, func() { + const ( + kamajiNamespace = "kamaji-system" + controllerLabelKey = "app.kubernetes.io/component" + controllerLabelValue = "controller-manager" + managerContainerName = "manager" + watchedNs = "watch-ns-allowed" + unwatchedNs = "watch-ns-blocked" + rolloutTimeout = 90 * time.Second + reconcileWindow = 2 * time.Minute + ignoreWindow = 60 * time.Second + ) + + var ( + deploymentKey types.NamespacedName + originalArgs []string + ) + + // The kubeconfig-generator Deployment shares the exact same selector + // labels as the controller-manager (both use kamaji.selectorLabels in + // the chart), so we further filter by the container name to + // disambiguate. The chart pins the controller container to "manager" + // and the kubeconfig-generator container to "controller", which is + // stable across `extraArgs` rendering changes. + findManagerDeployment := func() *appsv1.Deployment { + GinkgoHelper() + + list := &appsv1.DeploymentList{} + Expect(k8sClient.List(context.Background(), list, + client.InNamespace(kamajiNamespace), + client.MatchingLabels{controllerLabelKey: controllerLabelValue}, + )).To(Succeed()) + + var matches []*appsv1.Deployment + + for i := range list.Items { + d := &list.Items[i] + for _, c := range d.Spec.Template.Spec.Containers { + if c.Name == managerContainerName { + matches = append(matches, d) + + break + } + } + } + + Expect(matches).To(HaveLen(1), "expected exactly one kamaji controller-manager Deployment in %s (container name=%q)", kamajiNamespace, managerContainerName) + + return matches[0] + } + + // waitForRollout waits until the Deployment status is fully rolled out AND + // only the new generation's pods remain. 
The standard Deployment-status + // gate (UpdatedReplicas / AvailableReplicas) flips before the old + // replica-set's pods finish terminating, which leaves PrintKamajiLogs in + // utils_test.go observing two pods (its HaveLen(1) assertion fails) and + // briefly leaves the webhook Service Endpoints with a stale address. The + // strict pod-count check (len(items) == replicas) covers both: terminating + // pods are still listed until kubelet finishes their grace period, and + // only that strict count guarantees a clean steady state. + waitForRollout := func() { + GinkgoHelper() + + Eventually(func(g Gomega) { + d := &appsv1.Deployment{} + g.Expect(k8sClient.Get(context.Background(), deploymentKey, d)).To(Succeed()) + g.Expect(d.Status.ObservedGeneration).To(BeNumerically(">=", d.Generation)) + g.Expect(d.Status.UpdatedReplicas).To(Equal(d.Status.Replicas)) + g.Expect(d.Status.AvailableReplicas).To(Equal(d.Status.Replicas)) + + want := int32(1) + if d.Spec.Replicas != nil { + want = *d.Spec.Replicas + } + + pods := &corev1.PodList{} + g.Expect(k8sClient.List(context.Background(), pods, + client.InNamespace(kamajiNamespace), + client.MatchingLabels{controllerLabelKey: controllerLabelValue}, + )).To(Succeed()) + + g.Expect(pods.Items).To(HaveLen(int(want)), "expected exactly %d controller-manager pod(s) and no leftover terminating ones", want) + + for _, p := range pods.Items { + g.Expect(p.DeletionTimestamp).To(BeNil(), "pod %s/%s is still terminating", p.Namespace, p.Name) + g.Expect(p.Status.Phase).To(Equal(corev1.PodRunning), "pod %s/%s phase=%s", p.Namespace, p.Name, p.Status.Phase) + } + }, rolloutTimeout, 2*time.Second).Should(Succeed()) + } + + // waitForWebhookReady polls the kamaji-webhook-service EndpointSlices + // until at least one endpoint is Ready. 
The container's readiness probe + // targets the manager's healthz port (8081), not the webhook port (9443), + // so EndpointSlice readiness only proves that the pod is up — not that + // the webhook server has finished binding 9443 or that kube-proxy has + // finished syncing the new ClusterIP backend. The actual webhook reach + // is exercised by retryUntilWebhookReady around each Create call. + waitForWebhookReady := func() { + GinkgoHelper() + + Eventually(func(g Gomega) { + slices := &discoveryv1.EndpointSliceList{} + g.Expect(k8sClient.List(context.Background(), slices, + client.InNamespace(kamajiNamespace), + client.MatchingLabels{discoveryv1.LabelServiceName: "kamaji-webhook-service"}, + )).To(Succeed()) + + ready := 0 + + for _, s := range slices.Items { + for _, ep := range s.Endpoints { + if ep.Conditions.Ready != nil && *ep.Conditions.Ready { + ready += len(ep.Addresses) + } + } + } + + g.Expect(ready).To(BeNumerically(">=", 1), "kamaji-webhook-service has no Ready endpoints yet") + }, 60*time.Second, time.Second).Should(Succeed()) + } + + setManagerArgs := func(args []string) { + GinkgoHelper() + + Expect(retry.RetryOnConflict(retry.DefaultRetry, func() error { + d := &appsv1.Deployment{} + if err := k8sClient.Get(context.Background(), deploymentKey, d); err != nil { + return err + } + + d.Spec.Template.Spec.Containers[0].Args = append([]string(nil), args...) + + return k8sClient.Update(context.Background(), d) + })).To(Succeed()) + + waitForRollout() + waitForWebhookReady() + } + + // createTCPWithRetry calls k8sClient.Create and retries while the kamaji + // validating/mutating webhook is unreachable. After a Deployment rollout + // the webhook Service routes can briefly serve "connection refused" — the + // pod's readiness probe targets healthz (8081) and flips green before the + // webhook server (9443) has finished binding, and kube-proxy may also + // lag by one informer tick. 
Retrying with a tight backoff makes the test + // independent of those timing details. Any other error fails immediately. + createTCPWithRetry := func(tcp *kamajiv1alpha1.TenantControlPlane) { + GinkgoHelper() + + Eventually(func(g Gomega) { + err := k8sClient.Create(context.Background(), tcp) + if err == nil || apierrors.IsAlreadyExists(err) { + return + } + + if isWebhookUnreachable(err) { + g.Expect(err).NotTo(HaveOccurred(), "webhook still unreachable, retrying") + + return + } + + Fail(fmt.Sprintf("unexpected non-retriable error from Create: %v", err)) + }, 60*time.Second, time.Second).Should(Succeed()) + } + + createNamespace := func(name string) { + GinkgoHelper() + + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} + if err := k8sClient.Create(context.Background(), ns); err != nil && !apierrors.IsAlreadyExists(err) { + Fail(err.Error()) + } + } + + tcpFor := func(name, namespace string) *kamajiv1alpha1.TenantControlPlane { + return &kamajiv1alpha1.TenantControlPlane{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: kamajiv1alpha1.TenantControlPlaneSpec{ + ControlPlane: kamajiv1alpha1.ControlPlane{ + Deployment: kamajiv1alpha1.DeploymentSpec{Replicas: pointer.To(int32(1))}, + Service: kamajiv1alpha1.ServiceSpec{ServiceType: "ClusterIP"}, + }, + NetworkProfile: kamajiv1alpha1.NetworkProfileSpec{Address: "172.18.0.2"}, + Kubernetes: kamajiv1alpha1.KubernetesSpec{ + Version: "v1.23.6", + Kubelet: kamajiv1alpha1.KubeletSpec{CGroupFS: "cgroupfs"}, + }, + }, + } + } + + statusOf := func(tcp *kamajiv1alpha1.TenantControlPlane) *kamajiv1alpha1.KubernetesVersionStatus { + got := &kamajiv1alpha1.TenantControlPlane{} + if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(tcp), got); err != nil { + return nil + } + + return got.Status.Kubernetes.Version.Status + } + + BeforeAll(func() { + By("snapshotting the kamaji controller-manager Deployment") + d := findManagerDeployment() + deploymentKey = 
client.ObjectKeyFromObject(d) + originalArgs = append([]string(nil), d.Spec.Template.Spec.Containers[0].Args...) + DeferCleanup(func() { + By("restoring the kamaji controller-manager Deployment args") + setManagerArgs(originalArgs) + }) + + By("creating the test namespaces") + for _, name := range []string{watchedNs, unwatchedNs} { + createNamespace(name) + DeferCleanup(func(target string) func() { + return func() { + By("deleting test namespace " + target) + _ = k8sClient.Delete(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: target}}) + } + }(name)) + } + + By("patching the kamaji Deployment with --watch-namespaces=" + watchedNs) + setManagerArgs(append(append([]string(nil), originalArgs...), "--watch-namespaces="+watchedNs)) + }) + + It("reconciles a TenantControlPlane in a watched namespace", func() { + tcp := tcpFor("tcp-watched", watchedNs) + + createTCPWithRetry(tcp) + DeferCleanup(func() { + _ = k8sClient.Delete(context.Background(), tcp) + }) + + Eventually(func() *kamajiv1alpha1.KubernetesVersionStatus { + return statusOf(tcp) + }, reconcileWindow, 2*time.Second).ShouldNot(BeNil(), "operator must reconcile TCPs in watched namespaces") + }) + + It("ignores a TenantControlPlane in an unwatched namespace", func() { + tcp := tcpFor("tcp-ignored", unwatchedNs) + + createTCPWithRetry(tcp) + DeferCleanup(func() { + _ = k8sClient.Delete(context.Background(), tcp) + }) + + Consistently(func() *kamajiv1alpha1.KubernetesVersionStatus { + return statusOf(tcp) + }, ignoreWindow, 5*time.Second).Should(BeNil(), "operator must not touch TCPs outside the watched namespaces") + }) +}) diff --git a/internal/manager/cache.go b/internal/manager/cache.go new file mode 100644 index 00000000..2601887c --- /dev/null +++ b/internal/manager/cache.go @@ -0,0 +1,148 @@ +// Copyright 2022 Clastix Labs +// SPDX-License-Identifier: Apache-2.0 + +// Package manager hosts helpers that wire Kamaji-specific configuration into +// controller-runtime's manager and 
cache. +package manager + +import ( + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" +) + +// NewCacheFunc returns a cache.NewCacheFunc suitable for ctrl.Options.NewCache. +// +// The returned function applies the given resync period to every informer and, +// when namespaces is non-empty, restricts namespaced informers to that set via +// cache.Options.DefaultNamespaces. Cluster-scoped resources (CRDs, ClusterRole, +// ClusterRoleBinding, ValidatingWebhookConfiguration, the cluster-scoped +// DataStore CRD, ...) are never affected by namespace scoping. +// +// The helper does not implicitly add any namespace to the set: callers that +// must keep watching their own install namespace are responsible for including +// it before invoking NewCacheFunc. +func NewCacheFunc(resyncPeriod time.Duration, namespaces []string) cache.NewCacheFunc { + return func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + return cache.New(config, applyOptions(opts, resyncPeriod, namespaces)) + } +} + +// MergeWatchedNamespaces composes the final list of namespaces a manager must +// keep in its cache scope. When the user-supplied list is empty (nil, zero +// length, or only whitespace entries) the result is nil and cluster-wide +// caching stays on — extras are intentionally dropped in that case so a +// stray "--watch-namespaces= " on the command line cannot silently scope the +// cache to the install namespace alone. Otherwise the user list is +// concatenated with extras, then trimmed, deduplicated and stripped of +// blanks while preserving first-seen order so log lines and informer setup +// stay deterministic. Neither input slice is mutated. +// +// Typical use: the caller passes the operator's install namespace through +// extras to guarantee the migration Job watch keeps working when scoping is +// enabled. 
+func MergeWatchedNamespaces(user []string, extras ...string) []string { + hasUserEntry := false + + for _, v := range user { + if strings.TrimSpace(v) != "" { + hasUserEntry = true + + break + } + } + + if !hasUserEntry { + return nil + } + + out := make([]string, 0, len(user)+len(extras)) + seen := make(map[string]struct{}, len(user)+len(extras)) + + add := func(values []string) { + for _, v := range values { + v = strings.TrimSpace(v) + if v == "" { + continue + } + + if _, ok := seen[v]; ok { + continue + } + + seen[v] = struct{}{} + out = append(out, v) + } + } + + add(user) + add(extras) + + return out +} + +// ValidateNamespaces checks that every non-blank entry in namespaces is a +// valid Kubernetes namespace identifier — an RFC 1123 label, max 63 +// characters, lowercase alphanumerics and dashes only, no leading or +// trailing dash. Blank entries are ignored — they are dropped later by +// MergeWatchedNamespaces anyway. The function returns a single error aggregating +// every offender so that the caller can surface them all at once. +func ValidateNamespaces(namespaces []string) error { + var invalid []string + + for _, ns := range namespaces { + ns = strings.TrimSpace(ns) + if ns == "" { + continue + } + + if errs := validation.IsDNS1123Label(ns); len(errs) > 0 { + invalid = append(invalid, fmt.Sprintf("%q: %s", ns, strings.Join(errs, "; "))) + } + } + + if len(invalid) == 0 { + return nil + } + + return fmt.Errorf("invalid --watch-namespaces value(s): %s", strings.Join(invalid, ", ")) +} + +// buildNamespaceConfig converts a canonicalised namespace list into a map +// suitable for cache.Options.DefaultNamespaces. The caller is expected to +// have already trimmed, deduplicated and stripped blanks via +// MergeWatchedNamespaces — this helper does no further normalisation. The +// result is nil when the effective set is empty so that callers can detect +// "no scoping requested" and leave the cache cluster-wide. 
+func buildNamespaceConfig(namespaces []string) map[string]cache.Config {
+	count := len(namespaces)
+	if count == 0 {
+		return nil
+	}
+
+	// Every namespace maps to the zero cache.Config; no per-namespace
+	// tuning is applied here.
+	scoped := make(map[string]cache.Config, count)
+	for i := 0; i < count; i++ {
+		scoped[namespaces[i]] = cache.Config{}
+	}
+
+	return scoped
+}
+
+// applyOptions takes opts by value and returns the copy with SyncPeriod
+// pointing at resyncPeriod. The namespaces slice is run through
+// MergeWatchedNamespaces first, so a raw flag value with whitespace, blanks
+// or duplicates still yields a clean set. When that set is non-empty it
+// replaces DefaultNamespaces; when it is empty DefaultNamespaces is left
+// exactly as the caller provided it. Kept package-private so the behaviour
+// can be unit-tested without spinning up a real cache.
+func applyOptions(opts cache.Options, resyncPeriod time.Duration, namespaces []string) cache.Options {
+	opts.SyncPeriod = &resyncPeriod
+
+	canonical := MergeWatchedNamespaces(namespaces)
+	scoped := buildNamespaceConfig(canonical)
+
+	if len(scoped) == 0 {
+		return opts
+	}
+
+	opts.DefaultNamespaces = scoped
+
+	return opts
+}
diff --git a/internal/manager/cache_test.go b/internal/manager/cache_test.go
new file mode 100644
index 00000000..90d7b45e
--- /dev/null
+++ b/internal/manager/cache_test.go
@@ -0,0 +1,272 @@
+// Copyright 2022 Clastix Labs
+// SPDX-License-Identifier: Apache-2.0
+
+package manager
+
+import (
+	"sort"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+)
+
+// Test fixture namespaces. Pulled out to package-level consts so goconst is
+// happy and so an eventual rename does not require a sweeping diff.
+const (
+	teamA           = "team-a"
+	teamB           = "team-b"
+	kamajiNamespace = "kamaji-system"
+)
+
+// buildNamespaceConfig is a thin map-builder that assumes its input has
+// already been canonicalised by MergeWatchedNamespaces. Trim/dedup/blank
+// behaviour is exercised by TestMergeWatchedNamespaces; here we only verify
+// the empty/nil contract and the zero cache.Config invariant.
+func TestBuildNamespaceConfig(t *testing.T) {
+	t.Parallel()
+
+	type scenario struct {
+		name string
+		in   []string
+		want []string
+	}
+
+	scenarios := []scenario{
+		{name: "nil input keeps cluster-wide cache", in: nil, want: nil},
+		{name: "empty slice keeps cluster-wide cache", in: []string{}, want: nil},
+		{name: "single namespace", in: []string{teamA}, want: []string{teamA}},
+		{name: "multiple namespaces preserve every key", in: []string{teamA, teamB}, want: []string{teamA, teamB}},
+	}
+
+	for i := range scenarios {
+		sc := scenarios[i]
+
+		t.Run(sc.name, func(t *testing.T) {
+			t.Parallel()
+
+			scoped := buildNamespaceConfig(sc.in)
+
+			if sc.want == nil {
+				require.Nil(t, scoped, "expected nil map so the cache stays cluster-wide")
+
+				return
+			}
+
+			require.Len(t, scoped, len(sc.want))
+
+			// Map iteration order is random: compare sorted copies.
+			expected := make([]string, len(sc.want))
+			copy(expected, sc.want)
+			sort.Strings(expected)
+
+			actual := make([]string, 0, len(scoped))
+			for ns, cfg := range scoped {
+				actual = append(actual, ns)
+				require.Equal(t, cache.Config{}, cfg, "namespace %q must use the zero cache.Config so controller-runtime applies its defaults", ns)
+			}
+
+			sort.Strings(actual)
+
+			require.Equal(t, expected, actual)
+		})
+	}
+}
+
+// TestBuildNamespaceConfig_DoesNotMutateInput checks that the slice handed
+// to buildNamespaceConfig is left untouched.
+func TestBuildNamespaceConfig_DoesNotMutateInput(t *testing.T) {
+	t.Parallel()
+
+	input := []string{teamA, teamB}
+	before := make([]string, len(input))
+	copy(before, input)
+
+	_ = buildNamespaceConfig(input)
+
+	require.Equal(t, before, input, "buildNamespaceConfig must not mutate its caller-provided slice")
+}
+
+// TestApplyOptions_NoNamespacesKeepsCacheClusterWide checks that a nil
+// namespace list still sets the sync period but leaves DefaultNamespaces nil.
+func TestApplyOptions_NoNamespacesKeepsCacheClusterWide(t *testing.T) {
+	t.Parallel()
+
+	const resync = 7 * time.Hour
+
+	got := applyOptions(cache.Options{}, resync, nil)
+
+	require.NotNil(t, got.SyncPeriod, "the resync period must always be applied")
+	require.Equal(t, resync, *got.SyncPeriod)
+	require.Nil(t, got.DefaultNamespaces, "no namespaces means controller-runtime watches cluster-wide")
+}
+
+// TestApplyOptions_WithNamespacesScopesCache checks that a non-empty list
+// ends up, entry for entry, in DefaultNamespaces.
+func TestApplyOptions_WithNamespacesScopesCache(t *testing.T) {
+	t.Parallel()
+
+	requested := []string{teamA, teamB}
+
+	got := applyOptions(cache.Options{}, time.Minute, requested)
+
+	require.NotNil(t, got.SyncPeriod)
+	require.Equal(t, time.Minute, *got.SyncPeriod)
+	require.Len(t, got.DefaultNamespaces, 2)
+
+	for _, ns := range requested {
+		require.Contains(t, got.DefaultNamespaces, ns)
+	}
+}
+
+// TestApplyOptions_AllBlankNamespacesKeepsCacheClusterWide checks that a
+// list made only of blank entries is treated like no list at all.
+func TestApplyOptions_AllBlankNamespacesKeepsCacheClusterWide(t *testing.T) {
+	t.Parallel()
+
+	blanks := []string{"", " "}
+
+	got := applyOptions(cache.Options{}, time.Minute, blanks)
+
+	require.Nil(t, got.DefaultNamespaces, "an effectively-empty namespace list must not collapse the cache to zero namespaces")
+}
+
+// TestApplyOptions_PreservesCallerProvidedDefaultNamespacesWhenScopingDisabled
+// checks that pre-populated DefaultNamespaces survive when no scoping is
+// requested.
+func TestApplyOptions_PreservesCallerProvidedDefaultNamespacesWhenScopingDisabled(t *testing.T) {
+	t.Parallel()
+
+	preset := map[string]cache.Config{"caller-set": {}}
+	base := cache.Options{DefaultNamespaces: preset}
+
+	got := applyOptions(base, time.Minute, nil)
+
+	require.Equal(t, base.DefaultNamespaces, got.DefaultNamespaces, "applyOptions must not overwrite a non-empty caller-provided DefaultNamespaces when the user did not request scoping")
+}
+
+// TestApplyOptions_UserScopingOverridesCallerProvidedDefaultNamespaces checks
+// that an explicit namespace list replaces pre-populated DefaultNamespaces
+// instead of being merged with them.
+func TestApplyOptions_UserScopingOverridesCallerProvidedDefaultNamespaces(t *testing.T) {
+	t.Parallel()
+
+	preset := map[string]cache.Config{"caller-set": {}}
+	base := cache.Options{DefaultNamespaces: preset}
+
+	got := applyOptions(base, time.Minute, []string{teamA})
+
+	require.Len(t, got.DefaultNamespaces, 1)
+	require.Contains(t, got.DefaultNamespaces, teamA, "an explicit user-provided namespace list must take precedence over caller defaults")
+	require.NotContains(t, got.DefaultNamespaces, "caller-set")
+}
+
+// TestApplyOptions_CanonicalisesRawInput feeds applyOptions a raw flag value
+// containing padding, a duplicate and a blank entry, and expects a clean
+// two-key DefaultNamespaces map, since applyOptions runs its input through
+// MergeWatchedNamespaces.
+func TestApplyOptions_CanonicalisesRawInput(t *testing.T) {
+	t.Parallel()
+
+	raw := []string{" team-a", teamA, "", "team-b\t"}
+
+	got := applyOptions(cache.Options{}, time.Minute, raw)
+
+	require.Len(t, got.DefaultNamespaces, 2)
+
+	for _, ns := range []string{teamA, teamB} {
+		require.Contains(t, got.DefaultNamespaces, ns)
+	}
+}
+
+// TestNewCacheFunc_ReturnsNonNil checks that a constructor is returned both
+// without and with a namespace list.
+func TestNewCacheFunc_ReturnsNonNil(t *testing.T) {
+	t.Parallel()
+
+	for _, namespaces := range [][]string{nil, {teamA}} {
+		require.NotNil(t, NewCacheFunc(time.Hour, namespaces))
+	}
+}
+
+// TestMergeWatchedNamespaces is the table-driven contract for
+// MergeWatchedNamespaces: the opt-in rule on the user list, plus trimming,
+// blank removal and order-preserving deduplication across user and extras.
+func TestMergeWatchedNamespaces(t *testing.T) {
+	t.Parallel()
+
+	table := []struct {
+		name     string
+		user     []string
+		extras   []string
+		expected []string
+	}{
+		{
+			name:     "empty user list returns nil regardless of extras",
+			user:     nil,
+			extras:   []string{kamajiNamespace},
+			expected: nil,
+		},
+		{
+			name:     "all-blank user list returns nil regardless of extras",
+			user:     []string{"", " ", "\t"},
+			extras:   []string{kamajiNamespace},
+			expected: nil,
+		},
+		{
+			name:     "extras are appended when user provided some namespaces",
+			user:     []string{teamA, teamB},
+			extras:   []string{kamajiNamespace},
+			expected: []string{teamA, teamB, kamajiNamespace},
+		},
+		{
+			name:     "blank extras are dropped",
+			user:     []string{teamA},
+			extras:   []string{"", " "},
+			expected: []string{teamA},
+		},
+		{
+			name:     "duplicates between user and extras collapse, user-first wins",
+			user:     []string{teamA, kamajiNamespace},
+			extras:   []string{kamajiNamespace},
+			expected: []string{teamA, kamajiNamespace},
+		},
+		{
+			name:     "duplicates inside the user list collapse without reordering",
+			user:     []string{teamA, teamA, teamB},
+			extras:   nil,
+			expected: []string{teamA, teamB},
+		},
+		{
+			name:     "whitespace is normalised before deduplication",
+			user:     []string{" team-a", "team-a "},
+			extras:   []string{"\tteam-a"},
+			expected: []string{teamA},
+		},
+	}
+
+	for i := range table {
+		row := table[i]
+
+		t.Run(row.name, func(t *testing.T) {
+			t.Parallel()
+
+			require.Equal(t, row.expected, MergeWatchedNamespaces(row.user, row.extras...))
+		})
+	}
+}
+
+// TestMergeWatchedNamespaces_DoesNotMutateInput checks that neither the user
+// slice nor the extras slice is modified by the merge, even when entries
+// need trimming.
+func TestMergeWatchedNamespaces_DoesNotMutateInput(t *testing.T) {
+	t.Parallel()
+
+	user := []string{" team-a ", teamA}
+	extras := []string{kamajiNamespace}
+
+	userBefore := make([]string, len(user))
+	copy(userBefore, user)
+
+	extrasBefore := make([]string, len(extras))
+	copy(extrasBefore, extras)
+
+	_ = MergeWatchedNamespaces(user, extras...)
+
+	require.Equal(t, userBefore, user, "MergeWatchedNamespaces must not mutate the user slice")
+	require.Equal(t, extrasBefore, extras, "MergeWatchedNamespaces must not mutate the extras slice")
+}
+
+// TestValidateNamespaces is the table-driven contract for ValidateNamespaces:
+// blank and padded entries are tolerated, anything that is not a valid
+// DNS-1123 label yields an error.
+func TestValidateNamespaces(t *testing.T) {
+	t.Parallel()
+
+	table := []struct {
+		name      string
+		in        []string
+		expectErr bool
+	}{
+		{name: "nil is valid", in: nil, expectErr: false},
+		{name: "empty slice is valid", in: []string{}, expectErr: false},
+		{name: "valid single label", in: []string{teamA}, expectErr: false},
+		{name: "valid multiple labels", in: []string{teamA, teamB, "kube-system"}, expectErr: false},
+		{name: "surrounding whitespace is tolerated", in: []string{" team-a ", "\tteam-b"}, expectErr: false},
+		{name: "blank entries are tolerated and ignored", in: []string{teamA, "", " "}, expectErr: false},
+		{name: "uppercase rejected", in: []string{"Team-A"}, expectErr: true},
+		{name: "underscore rejected", in: []string{"team_a"}, expectErr: true},
+		{name: "leading hyphen rejected", in: []string{"-team-a"}, expectErr: true},
+		{name: "trailing hyphen rejected", in: []string{"team-a-"}, expectErr: true},
+		{name: "too long rejected", in: []string{strings.Repeat("a", 64)}, expectErr: true},
+		{name: "empty after trim is allowed (caller skips it)", in: []string{" "}, expectErr: false},
+		{name: "valid + invalid mix surfaces error", in: []string{teamA, "Bad Name"}, expectErr: true},
+	}
+
+	for i := range table {
+		row := table[i]
+
+		t.Run(row.name, func(t *testing.T) {
+			t.Parallel()
+
+			err := ValidateNamespaces(row.in)
+
+			if row.expectErr {
+				require.Error(t, err)
+
+				return
+			}
+
+			require.NoError(t, err)
+		})
+	}
+}