From b1c4b66a550622ecf4064b0b2be49c60f562df89 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Fri, 15 May 2026 13:50:47 +0200 Subject: [PATCH 01/16] Update chart for compute-worker --- .../templates/compute-worker-deployment.yaml | 32 +++++++++++++++++-- charts/templates/compute-worker-rbac.yaml | 9 ++---- charts/values.yaml | 6 ++-- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/charts/templates/compute-worker-deployment.yaml b/charts/templates/compute-worker-deployment.yaml index 8a6ddaab8..55a8332d6 100644 --- a/charts/templates/compute-worker-deployment.yaml +++ b/charts/templates/compute-worker-deployment.yaml @@ -26,13 +26,39 @@ spec: - bash - -c - > - watchmedo auto-restart -p '*.py' --recursive -- celery -A compute_worker worker -l info -Q compute-worker -n compute-worker{{ if not $isDefault }}-{{ .name }}{{ end }}@%n - workingDir: /app + celery -A compute_worker worker -l info -Q compute-worker -n compute-worker{{ if not $isDefault }}-{{ .name }}{{ end }}@%n + workingDir: / env: + - name: CONTAINER_ENGINE_EXECUTABLE + value: '{{ $.Values.compute_worker.container_engine_executable }}' + - name: USE_GPU + value: '{{ .gpu.enabled }}' + - name: RESOURCE_LIMITS + value: '{{ toJson .gpu.resourceLimits }}' + - name: NODE_SELECTOR + value: '{{ toJson .gpu.nodeSelector }}' + - name: TOTAL_TIME_TO_WAIT_FOR_POD + value: '{{ $.Values.compute_worker.podCreationRetries.totalTimeToWaitForPod }}' + - name: SLEEP_TIME_BETWEEN_RETRIES + value: '{{ $.Values.compute_worker.podCreationRetries.sleepTimeBetweenRetries }}' + - name: USERID + value: '{{ $.Values.compute_worker.submissionPods.securityContext.runAsUser }}' + - name: GROUPID + value: '{{ $.Values.compute_worker.submissionPods.securityContext.runAsGroup }}' + - name: FSGROUP + value: '{{ $.Values.compute_worker.submissionPods.securityContext.fsGroup }}' + - name: COMPUTE_WORKER_LABELS + value: '{{ toJson $.Values.compute_worker.submissionPods.metadata.labels }}' + - name: CURRENT_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SHARED_JOB_PVC + value: '{{ $.Values.sharedJob.pvcName }}' - name: BROKER_URL value: "{{ if .url }}{{ .url }}{{ else }}pyamqp://{{ $.Values.env.RABBITMQ_DEFAULT_USER }}:{{ $.Values.env.RABBITMQ_DEFAULT_PASS }}@{{ $.Values.env.RABBITMQ_HOST }}:{{ $.Values.env.RABBITMQ_PORT }}//{{ end }}" - name: CODALAB_IGNORE_CLEANUP_STEP - value: "1" + value: '{{ $.Values.compute_worker.codalabIgnoreCleanupStep }}' {{- range $key, $value := $.Values.env }} - name: {{ $key }} value: "{{ $value }}" diff --git a/charts/templates/compute-worker-rbac.yaml b/charts/templates/compute-worker-rbac.yaml index 519db211e..f6e730047 100644 --- a/charts/templates/compute-worker-rbac.yaml +++ b/charts/templates/compute-worker-rbac.yaml @@ -10,15 +10,12 @@ metadata: name: compute-worker-role namespace: {{ .Release.Namespace }} rules: - - apiGroups: ["batch"] - resources: ["jobs"] - verbs: ["create", "get", "list", "watch", "delete"] - apiGroups: [""] resources: ["pods"] - verbs: ["get", "list", "watch", "delete"] + verbs: ["create", "get", "list", "watch", "delete", "deletecollection"] - apiGroups: [""] - resources: ["pods/exec", "pods/log"] - verbs: ["create", "get", "list"] + resources: ["pods/log"] + verbs: ["get", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/charts/values.yaml b/charts/values.yaml index 41d5d25e9..dd8a9ab02 100644 --- a/charts/values.yaml +++ b/charts/values.yaml @@ -38,9 +38,10 @@ compute_worker: repository: tag: pullPolicy: Always + container_engine_executable: "kubernetes" podCreationRetries: - numberOfRetries: 30 - sleepTimeBetweenRetries: 10 + totalTimeToWaitForPod: 300 + sleepTimeBetweenRetries: 0.5 submissionPods: securityContext: runAsUser: 1000 @@ -53,6 +54,7 @@ compute_worker: memory: 512Mi volumes: pvcName: shared-job-pvc + codalabIgnoreCleanupStep: "0" brokers: - name: "default" gpu: From 32de97ecb23280fb755ff130059a7b198b6d5f15 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Fri, 15 May 2026 15:31:20 +0200 Subject: [PATCH 02/16] Update cw dependencies --- compute_worker/pyproject.toml | 1 + compute_worker/uv.lock | 62 +++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/compute_worker/pyproject.toml b/compute_worker/pyproject.toml index ea41563ce..367f1a297 100644 --- a/compute_worker/pyproject.toml +++ b/compute_worker/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "docker>=7.1.0,<8", "rich>=14.2.0,<15", "setuptools>=82.0.0", + "kubernetes==35.0.0", ] name = "compute-worker" diff --git a/compute_worker/uv.lock b/compute_worker/uv.lock index 6715c4671..8e98ed97d 100644 --- a/compute_worker/uv.lock +++ b/compute_worker/uv.lock @@ -162,6 +162,7 @@ dependencies = [ { name = "argh" }, { name = "celery" }, { name = "docker" }, + { name = "kubernetes" }, { name = "loguru" }, { name = "pyyaml" }, { name = "requests" }, @@ -177,6 +178,7 @@ requires-dist = [ { name = "argh", specifier = "==0.31.3" }, { name = "celery", specifier = "==5.6.2" }, { name = "docker", specifier = ">=7.1.0,<8" }, + { name = "kubernetes", specifier = "==35.0.0" }, { name = "loguru", specifier = ">=0.7.3,<0.8" }, { name = "pyyaml", specifier = "==6.0.3" }, { name = "requests", specifier = ">=2.32.4,<3" }, @@ -200,6 +202,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e3/26/57c6fb270950d476074c087527a558ccb6f4436657314bfb6cdf484114c4/docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0", size = 147774, upload-time = "2024-05-23T11:13:55.01Z" }, ] +[[package]] +name = "durationpy" +version = "0.10" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/9d/a4/e44218c2b394e31a6dd0d6b095c4e1f32d0be54c2a4b250032d717647bab/durationpy-0.10.tar.gz", hash = "sha256:1fa6893409a6e739c9c72334fc65cca1f355dbdd93405d30f726deb5bde42fba", size = 3335, upload-time = "2025-05-17T13:52:37.26Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286", size = 3922, upload-time = "2025-05-17T13:52:36.463Z" }, +] + [[package]] name = "idna" version = "3.15" @@ -224,6 +235,26 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fb/0f/834427d8c03ff1d7e867d3db3d176470c64871753252b21b4f4897d1fa45/kombu-5.6.2-py3-none-any.whl", hash = "sha256:efcfc559da324d41d61ca311b0c64965ea35b4c55cc04ee36e55386145dace93", size = 214219, upload-time = "2025-12-29T20:30:05.74Z" }, ] +[[package]] +name = "kubernetes" +version = "35.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "durationpy" }, + { name = "python-dateutil" }, + { name = "pyyaml" }, + { name = "requests" }, + { name = "requests-oauthlib" }, + { name = "six" }, + { name = "urllib3" }, + { name = "websocket-client" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2c/8f/85bf51ad4150f64e8c665daf0d9dfe9787ae92005efb9a4d1cba592bd79d/kubernetes-35.0.0.tar.gz", hash = "sha256:3d00d344944239821458b9efd484d6df9f011da367ecb155dadf9513f05f09ee", size = 1094642, upload-time = "2026-01-16T01:05:27.76Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0c/70/05b685ea2dffcb2adbf3cdcea5d8865b7bc66f67249084cf845012a0ff13/kubernetes-35.0.0-py2.py3-none-any.whl", hash = "sha256:39e2b33b46e5834ef6c3985ebfe2047ab39135d41de51ce7641a7ca5b372a13d", size = 2017602, upload-time = "2026-01-16T01:05:25.991Z" }, +] + [[package]] name = "loguru" version = "0.7.3" @@ -258,6 +289,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0b/5f/19930f824ffeb0ad4372da4812c50edbd1434f678c90c2733e1188edfc63/oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9", size = 185918, upload-time = "2025-06-19T22:48:08.269Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" }, +] + [[package]] name = "packaging" version = "26.1" @@ -343,6 +383,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/8e/7540e8a2036f79a125c1d2ebadf69ed7901608859186c856fa0388ef4197/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a", size = 64947, upload-time = "2026-03-30T16:09:13.83Z" }, ] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "oauthlib" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/f2/05f29bc3913aea15eb670be136045bf5c5bbf4b99ecb839da9b422bb2c85/requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9", size = 55650, upload-time = "2024-03-22T20:32:29.939Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36", size = 24179, upload-time = "2024-03-22T20:32:28.055Z" }, +] + [[package]] name = "rich" version = "14.3.4" @@ -443,6 +496,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/68/5a/199c59e0a824a3db2b89c5d2dade7ab5f9624dbf6448dc291b46d5ec94d3/wcwidth-0.6.0-py3-none-any.whl", hash = "sha256:1a3a1e510b553315f8e146c54764f4fb6264ffad731b3d78088cdb1478ffbdad", size = 94189, upload-time = "2026-02-06T19:19:39.646Z" }, ] +[[package]] +name = "websocket-client" +version = "1.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2c/41/aa4bf9664e4cda14c3b39865b12251e8e7d239f4cd0e3cc1b6c2ccde25c1/websocket_client-1.9.0.tar.gz", hash = "sha256:9e813624b6eb619999a97dc7958469217c3176312b3a16a4bd1bc7e08a46ec98", size = 70576, upload-time = "2025-10-07T21:16:36.495Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/34/db/b10e48aa8fff7407e67470363eac595018441cf32d5e1001567a7aeba5d2/websocket_client-1.9.0-py3-none-any.whl", hash = "sha256:af248a825037ef591efbf6ed20cc5faa03d3b47b9e5a2230a529eeee1c1fc3ef", size = 82616, upload-time = "2025-10-07T21:16:34.951Z" }, +] + [[package]] name = "websockets" version = "16.0" From 78b74c2928af2359b70217781352a574446bb10a Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Fri, 15 May 2026 15:38:23 +0200 Subject: [PATCH 03/16] Update django Containerfile --- packaging/container/Containerfile | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packaging/container/Containerfile b/packaging/container/Containerfile index faa65f28f..331bc1826 100644 --- a/packaging/container/Containerfile +++ b/packaging/container/Containerfile @@ -1,3 +1,18 @@ +# Stage 1: Node.js builder +FROM node:lts-alpine3.23 AS builder + +WORKDIR /app + +COPY package.json package-lock.json* ./ +COPY src/static ./src/static + +RUN npm install \ + && export PATH=./node_modules/.bin:$PATH \ + && npm run build-stylus \ + && npm run build-riot \ + && npm run concat-riot + +# Stage 2: Python/Django FROM almalinux:10-minimal RUN microdnf install -y tar gzip @@ -16,4 +31,11 @@ COPY pyproject.toml uv.lock ./ RUN uv sync --all-extras --frozen WORKDIR /app + +COPY src ./src +COPY manage.py ./ + +# Copy built static files from Node builder +COPY --from=builder /app/src/static ./src/static + ENTRYPOINT ["/bin/bash", "-c"] From dede88106f728c155f34fb32a3bcf9b5ad7c980a Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 14:50:57 +0200 Subject: [PATCH 04/16] Load kubernetes parameters --- compute_worker/compute_worker.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 243dcd16d..0fa84b88a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -87,6 +87,7 @@ def to_bool(val): # Constants DOCKER = "docker" PODMAN = "podman" + KUBERNETES = "kubernetes" LOG_LEVEL_DEBUG = "debug" # Defaults @@ -113,6 +114,15 @@ def to_bool(val): WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "").strip() + # Kubernetes settings (only used when CONTAINER_ENGINE_EXECUTABLE=kubernetes) + SHARED_JOB_PVC = get("SHARED_JOB_PVC", "shared-job-pvc") + CURRENT_NAMESPACE = get("CURRENT_NAMESPACE", "default") + TOTAL_TIME_TO_WAIT_FOR_POD = float(get("TOTAL_TIME_TO_WAIT_FOR_POD", 300)) + SLEEP_TIME_BETWEEN_RETRIES = float(get("SLEEP_TIME_BETWEEN_RETRIES", 0.5)) + SUBMISSION_POD_USER_ID = int(get("USERID", 1000)) + SUBMISSION_POD_GROUP_ID = int(get("GROUPID", 1000)) + SUBMISSION_POD_FS_GROUP = int(get("FSGROUP", 1000)) + # ----------------------------------------------- # Program Kind From 4394ccf6f22e956ca002703461ca3b1999b6ed8d Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 14:52:39 +0200 Subject: [PATCH 05/16] Instanciate docker.APIClient only if not using kubernetes --- compute_worker/compute_worker.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 0fa84b88a..51eed3e33 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -174,11 +174,21 @@ class SubmissionStatus: ) # Intializing client -# NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must has either podman or docker -client = docker.APIClient( - base_url=Settings.CONTAINER_SOCKET, - version="auto", -) +if Settings.CONTAINER_ENGINE_EXECUTABLE != Settings.KUBERNETES: + # NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must have either podman or docker + client = docker.APIClient( + base_url=Settings.CONTAINER_SOCKET, + version="auto", + ) +else: + client = None + import kubernetes + try: + kubernetes.config.load_incluster_config() + logger.info("Kubernetes in-cluster config loaded") + except kubernetes.config.ConfigException: + kubernetes.config.load_kube_config() + logger.info("Kubernetes kubeconfig loaded") # ----------------------------------------------- From 6085c4a947c34324bf8851362debe7e27a939e76 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:01:50 +0200 Subject: [PATCH 06/16] Add _run_pod --- compute_worker/compute_worker.py | 188 +++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 51eed3e33..4466fae51 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1030,6 +1030,194 @@ async def _run_container_engine_cmd(self, container, kind): # Communicate that the program is closing self.completed_program_counter += 1 + async def _run_pod(self, kind, command, volumes_config): + """Kubernetes equivalent of _run_container_engine_cmd. + + Creates a Pod, streams its logs via websocket, waits for completion, + and populates self.logs[kind] in the same format as _run_container_engine_cmd. + K8s merges stdout/stderr, so stderr will be empty bytes. + """ + import kubernetes + + core_v1 = kubernetes.client.CoreV1Api() + start = time.time() + + websocket = None + websocket_url = f"{self.websocket_url}?kind={kind}" + try: + logger.debug(f"Connecting to {websocket_url}") + websocket = await asyncio.wait_for( + websockets.connect(websocket_url), timeout=10.0 + ) + except Exception as e: + logger.error(f"Failed to connect to websocket: {e}") + + # Build VolumeMount list from Docker-style volumes_config + # {host_path: {"bind": "/app/...", "mode": "z"/"ro"}} + # subPath is the path within the PVC (relative to HOST_DIRECTORY = PVC root) + volume_mounts = [] + for host_path, vol_config in volumes_config.items(): + volume_mounts.append( + kubernetes.client.V1VolumeMount( + name="shared-storage", + mount_path=vol_config["bind"], + sub_path=os.path.relpath(host_path, Settings.HOST_DIRECTORY), + read_only=(vol_config.get("mode") == "ro"), + ) + ) + + resources = node_selector = None + if Settings.USE_GPU: + try: + resources = kubernetes.client.V1ResourceRequirements( + limits=json.loads(os.getenv("RESOURCE_LIMITS", "{}")) + ) + node_selector = json.loads(os.getenv("NODE_SELECTOR", "{}")) or None + except json.JSONDecodeError: + logger.warning("Failed to parse RESOURCE_LIMITS or NODE_SELECTOR, ignoring GPU config") + + try: + labels = json.loads(os.getenv("COMPUTE_WORKER_LABELS", "{}")) + except json.JSONDecodeError: + labels = {} + labels["submission_id"] = str(self.submission_id) + + pod_spec = kubernetes.client.V1Pod( + metadata=kubernetes.client.V1ObjectMeta( + generate_name=f"codabench-{kind.replace('_', '-')}-", + namespace=Settings.CURRENT_NAMESPACE, + labels=labels, + ), + spec=kubernetes.client.V1PodSpec( + restart_policy="Never", + node_selector=node_selector, + security_context=kubernetes.client.V1PodSecurityContext( + fs_group=Settings.SUBMISSION_POD_FS_GROUP, + ), + containers=[ + kubernetes.client.V1Container( + name="runner", + image=self.container_image, + command=["sh", "-c", command], + working_dir="/app/program", + env=[kubernetes.client.V1EnvVar(name="PYTHONUNBUFFERED", value="1")], + volume_mounts=volume_mounts, + resources=resources, + security_context=kubernetes.client.V1SecurityContext( + allow_privilege_escalation=False, + run_as_non_root=True, + run_as_user=Settings.SUBMISSION_POD_USER_ID, + run_as_group=Settings.SUBMISSION_POD_GROUP_ID, + capabilities=kubernetes.client.V1Capabilities(drop=["ALL"]), + ), + ) + ], + volumes=[ + kubernetes.client.V1Volume( + name="shared-storage", + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=Settings.SHARED_JOB_PVC, + ), + ) + ], + ), + ) + + pod = core_v1.create_namespaced_pod(namespace=Settings.CURRENT_NAMESPACE, body=pod_spec) + pod_name = pod.metadata.name + logger.info(f"Created pod {pod_name} for {kind}") + + # Wait for pod to leave Pending state + elapsed = 0.0 + while elapsed < Settings.TOTAL_TIME_TO_WAIT_FOR_POD: + try: + pod = core_v1.read_namespaced_pod(pod_name, Settings.CURRENT_NAMESPACE) + if pod.status.phase in ("Running", "Succeeded", "Failed"): + logger.info(f"Pod {pod_name} is {pod.status.phase}") + break + if pod.status.container_statuses: + waiting = pod.status.container_statuses[0].state.waiting + if waiting and waiting.reason in ("ImagePullBackOff", "ErrImagePull"): + raise DockerImagePullException(f"Image pull failed: {waiting.message}") + except DockerImagePullException: + raise + except Exception as e: + logger.error(f"Error checking pod status: {e}") + await asyncio.sleep(Settings.SLEEP_TIME_BETWEEN_RETRIES) + elapsed += Settings.SLEEP_TIME_BETWEEN_RETRIES + + if elapsed >= Settings.TOTAL_TIME_TO_WAIT_FOR_POD: + raise SubmissionException( + f"Pod {pod_name} did not start within {Settings.TOTAL_TIME_TO_WAIT_FOR_POD}s" + ) + + # Stream logs (K8s merges stdout/stderr; all output goes to stdout) + stdout = b"" + try: + log_stream = core_v1.read_namespaced_pod_log( + name=pod_name, + namespace=Settings.CURRENT_NAMESPACE, + follow=True, + _preload_content=False, + ) + for line in log_stream: + stdout += line + decoded = line.decode(errors="ignore") + logger.info(decoded.rstrip()) + if websocket: + try: + await websocket.send(json.dumps({"kind": kind, "message": decoded})) + except Exception as e: + logger.error(f"Error sending log to websocket: {e}") + except Exception as e: + logger.error(f"Error streaming pod logs: {e}") + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: + logger.exception(e) + + # Get final exit code + return_code = 1 + elapsed = 0.0 + while elapsed < Settings.TOTAL_TIME_TO_WAIT_FOR_POD: + try: + pod = core_v1.read_namespaced_pod(pod_name, Settings.CURRENT_NAMESPACE) + if pod.status.container_statuses: + terminated = pod.status.container_statuses[0].state.terminated + if terminated: + return_code = terminated.exit_code + break + except Exception as e: + logger.error(f"Error getting pod exit code: {e}") + await asyncio.sleep(Settings.SLEEP_TIME_BETWEEN_RETRIES) + elapsed += Settings.SLEEP_TIME_BETWEEN_RETRIES + + logger.info(f"Pod {pod_name} exited with code {return_code}") + + if websocket: + try: + await websocket.close() + except Exception: + pass + + self.logs[kind] = { + "returncode": return_code, + "start": start, + "end": time.time(), + "stdout": { + "data": stdout, + "stream": stdout, + "continue": True, + "location": self.stdout if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stdout, + }, + "stderr": { + # K8s does not separate stderr; empty bytes keeps push_logs happy + "data": b"", + "stream": b"", + "continue": True, + "location": self.stderr if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stderr, + }, + } + self.completed_program_counter += 1 + def _get_host_path(self, *paths): """Turns an absolute path inside our container, into what the path would be on the host machine. We also ensure that the directory exists, From d7c2bd53838d92f23d88733efb83cb54a63baff2 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:03:35 +0200 Subject: [PATCH 07/16] Add _delete_submission_pods --- compute_worker/compute_worker.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 4466fae51..9fd30b277 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1738,3 +1738,16 @@ def clean_up(self): logger.info(f"Destroying submission temp dir: {self.root_dir}") shutil.rmtree(self.root_dir) + + def _delete_submission_pods(self): + import kubernetes + try: + core_v1 = kubernetes.client.CoreV1Api() + core_v1.delete_collection_namespaced_pod( + namespace=Settings.CURRENT_NAMESPACE, + label_selector=f"submission_id={self.submission_id}", + body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"), + ) + logger.info(f"Cleaned up Kubernetes pods for submission {self.submission_id}") + except Exception as e: + logger.warning(f"Could not clean up K8s pods for submission {self.submission_id}: {e}") From dfefb8ca2de50b0ad5a6ce4d73d45ea22cdfc1fe Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:06:54 +0200 Subject: [PATCH 08/16] Run submissions on k8s when CONTAINER_ENGINE_EXECUTABLE is set to kubernetes --- compute_worker/compute_worker.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 9fd30b277..91d343183 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1326,7 +1326,14 @@ async def _run_program_directory(self, kind, program_dir): ingestion_only_during_scoring=self.ingestion_only_during_scoring, ) - # Create container + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.KUBERNETES: + try: + return await self._run_pod(kind=kind, command=command, volumes_config=volumes_config) + except Exception as e: + logger.exception("Pod execution failed") + raise SubmissionException(str(e)) + + # Create container (Docker/Podman path) container_name = self.ingestion_program_container_name if kind == ProgramKind.INGESTION_PROGRAM else self.scoring_program_container_name container = self._create_container( container_name=container_name, From c2b09a3ddf6cd4f040a0f20d2ac30d6c8c9854ec Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:08:06 +0200 Subject: [PATCH 09/16] Skip image pulling when uisng kubernetes --- compute_worker/compute_worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 91d343183..362dc33ac 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1494,7 +1494,9 @@ def prepare(self): # Before the run starts we want to download images, they may take a while to download # and to do this during the run would subtract from the participants time. - self._get_container_image(self.container_image) + # Kubernetes pulls the image automatically when the pod starts. + if Settings.CONTAINER_ENGINE_EXECUTABLE != Settings.KUBERNETES: + self._get_container_image(self.container_image) self._update_status(SubmissionStatus.RUNNING) def start(self): From dd3e89cc0c6a6da4bdd13377d639516b3f62f8a0 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:12:08 +0200 Subject: [PATCH 10/16] Delete pods for timeout --- compute_worker/compute_worker.py | 68 +++++++++++++++++--------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 362dc33ac..ff2138ffd 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1555,22 +1555,24 @@ def start(self): "is_scoring": self.is_scoring, } - # Cleanup containers - containers_to_kill = [ - self.ingestion_program_container_name, - self.scoring_program_container_name - ] - logger.debug("Trying to kill and remove container " + str(containers_to_kill)) - - for container in containers_to_kill: - try: - client.remove_container(str(container), v=True, force=True) - except docker.errors.APIError as e: - logger.error(e) - except Exception as e: - logger.error(f"There was a problem killing {containers_to_kill}: {e}") - if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: - logger.exception(e) + # Cleanup containers / pods + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.KUBERNETES: + self._delete_submission_pods() + else: + containers_to_kill = [ + self.ingestion_program_container_name, + self.scoring_program_container_name + ] + logger.debug("Trying to kill and remove container " + str(containers_to_kill)) + for container in containers_to_kill: + try: + client.remove_container(str(container), force=True) + except docker.errors.APIError as e: + logger.error(e) + except Exception as e: + logger.error(f"There was a problem killing {containers_to_kill}: {e}") + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: + logger.exception(e) # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) @@ -1609,21 +1611,22 @@ def start(self): ) if return_code is None: logger.warning("No return code from Process. Killing it") - if kind == ProgramKind.INGESTION_PROGRAM: - containers_to_kill = self.ingestion_program_container_name - else: - containers_to_kill = self.scoring_program_container_name - try: - client.kill(containers_to_kill) - client.remove_container(containers_to_kill, v=True, force=True) - except docker.errors.APIError as e: - logger.error(e) - except Exception as e: - logger.error( - f"There was a problem killing {containers_to_kill}: {e}" - ) - if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: - logger.exception(e) + if Settings.CONTAINER_ENGINE_EXECUTABLE != Settings.KUBERNETES: + if kind == ProgramKind.INGESTION_PROGRAM: + containers_to_kill = self.ingestion_program_container_name + else: + containers_to_kill = self.scoring_program_container_name + try: + client.kill(containers_to_kill) + client.remove_container(containers_to_kill, force=True) + except docker.errors.APIError as e: + logger.error(e) + except Exception as e: + logger.error( + f"There was a problem killing {containers_to_kill}: {e}" + ) + if Settings.LOG_LEVEL == Settings.LOG_LEVEL_DEBUG: + logger.exception(e) if kind == ProgramKind.SCORING_PROGRAM: self.scoring_program_exit_code = return_code self.scoring_program_elapsed_time = elapsed_time @@ -1640,6 +1643,9 @@ def start(self): # set logs of this kind to None, since we handled them already logger.info("Program finished") + + if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.KUBERNETES: + self._delete_submission_pods() signal.alarm(0) if self.is_scoring: From 7787cb7b4abf5fb7c47565816d2d92f894462b32 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 15:13:18 +0200 Subject: [PATCH 11/16] Raise DockerImagePullException properly --- compute_worker/compute_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index ff2138ffd..10ca731f4 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1329,6 +1329,8 @@ async def _run_program_directory(self, kind, program_dir): if Settings.CONTAINER_ENGINE_EXECUTABLE == Settings.KUBERNETES: try: return await self._run_pod(kind=kind, command=command, volumes_config=volumes_config) + except DockerImagePullException: + raise except Exception as e: logger.exception("Pod execution failed") raise SubmissionException(str(e)) From a9c535a76a9248c3a2b4e14940ced0f3b607d3a9 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Wed, 27 May 2026 16:17:53 +0200 Subject: [PATCH 12/16] Make KUBERNETES_VERIFY_SSL configurable --- charts/templates/compute-worker-deployment.yaml | 2 ++ charts/values.yaml | 1 + compute_worker/compute_worker.py | 8 ++++++++ 3 files changed, 11 insertions(+) diff --git a/charts/templates/compute-worker-deployment.yaml b/charts/templates/compute-worker-deployment.yaml index 55a8332d6..f032d0241 100644 --- a/charts/templates/compute-worker-deployment.yaml +++ b/charts/templates/compute-worker-deployment.yaml @@ -55,6 +55,8 @@ spec: fieldPath: metadata.namespace - name: SHARED_JOB_PVC value: '{{ $.Values.sharedJob.pvcName }}' + - name: KUBERNETES_VERIFY_SSL + value: '{{ $.Values.compute_worker.kubernetesVerifySsl }}' - name: BROKER_URL value: "{{ if .url }}{{ .url }}{{ else }}pyamqp://{{ $.Values.env.RABBITMQ_DEFAULT_USER }}:{{ $.Values.env.RABBITMQ_DEFAULT_PASS }}@{{ $.Values.env.RABBITMQ_HOST }}:{{ $.Values.env.RABBITMQ_PORT }}//{{ end }}" - name: CODALAB_IGNORE_CLEANUP_STEP diff --git a/charts/values.yaml b/charts/values.yaml index dd8a9ab02..bee7290f2 100644 --- a/charts/values.yaml +++ b/charts/values.yaml @@ -39,6 +39,7 @@ compute_worker: tag: pullPolicy: Always container_engine_executable: "kubernetes" + kubernetesVerifySsl: "false" podCreationRetries: totalTimeToWaitForPod: 300 sleepTimeBetweenRetries: 0.5 diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 10ca731f4..e186b85d6 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -122,6 +122,8 @@ def to_bool(val): SUBMISSION_POD_USER_ID = int(get("USERID", 1000)) SUBMISSION_POD_GROUP_ID = int(get("GROUPID", 1000)) SUBMISSION_POD_FS_GROUP = int(get("FSGROUP", 1000)) + # Set to false for clusters whose CA cert is missing the Authority Key Identifier extension + KUBERNETES_VERIFY_SSL = to_bool(get("KUBERNETES_VERIFY_SSL", "true")) # ----------------------------------------------- @@ -189,6 +191,12 @@ class SubmissionStatus: except kubernetes.config.ConfigException: kubernetes.config.load_kube_config() logger.info("Kubernetes kubeconfig loaded") + if not Settings.KUBERNETES_VERIFY_SSL: + # https://github.com/kubernetes-client/python/issues/2329 + _k8s_conf = kubernetes.client.Configuration.get_default_copy() + _k8s_conf.verify_ssl = False + kubernetes.client.Configuration.set_default(_k8s_conf) + logger.warning("Kubernetes SSL verification disabled (KUBERNETES_VERIFY_SSL=false)") # ----------------------------------------------- From 41b35c244fc0b729acc7bbd2943682bc942d4961 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Tue, 30 Jun 2026 17:17:03 +0200 Subject: [PATCH 13/16] Fix loading of COMPUTE_WORKER_LABELS when it's none --- compute_worker/compute_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index e186b85d6..35216338a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1085,7 +1085,7 @@ async def _run_pod(self, kind, command, volumes_config): logger.warning("Failed to parse RESOURCE_LIMITS or NODE_SELECTOR, ignoring GPU config") try: - labels = json.loads(os.getenv("COMPUTE_WORKER_LABELS", "{}")) + labels = json.loads(os.getenv("COMPUTE_WORKER_LABELS", "{}")) or {} except json.JSONDecodeError: labels = {} labels["submission_id"] = str(self.submission_id) From edfd2c459c73d172e03d3260ddbfe9661c470fd2 Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Thu, 2 Jul 2026 09:06:06 +0200 Subject: [PATCH 14/16] Restore remove also volumes when removing container --- compute_worker/compute_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 35216338a..7a72a6f5a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1576,7 +1576,7 @@ def start(self): logger.debug("Trying to kill and remove container " + str(containers_to_kill)) for container in containers_to_kill: try: - client.remove_container(str(container), force=True) + client.remove_container(str(container), v=True, force=True) except docker.errors.APIError as e: logger.error(e) except Exception as e: @@ -1628,7 +1628,7 @@ def start(self): containers_to_kill = self.scoring_program_container_name try: client.kill(containers_to_kill) - client.remove_container(containers_to_kill, force=True) + client.remove_container(containers_to_kill, v=True, force=True) except docker.errors.APIError as e: logger.error(e) except Exception as e: From 1d36ca66f8dc4d1a09680cf43ed4cd8c6e2e9c8e Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Thu, 2 Jul 2026 09:09:12 +0200 Subject: [PATCH 15/16] Stop mounting docker socket --- charts/templates/compute-worker-deployment.yaml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/charts/templates/compute-worker-deployment.yaml b/charts/templates/compute-worker-deployment.yaml index f032d0241..988ad9123 100644 --- a/charts/templates/compute-worker-deployment.yaml +++ b/charts/templates/compute-worker-deployment.yaml @@ -68,15 +68,9 @@ spec: resources: {{- toYaml $.Values.compute_worker.resources | nindent 12 }} volumeMounts: - - name: docker-socket - mountPath: /var/run/docker.sock - name: codabench-storage mountPath: /codabench volumes: - - name: docker-socket - hostPath: - path: /var/run/docker.sock - type: Socket - name: codabench-storage persistentVolumeClaim: claimName: {{ $.Values.compute_worker.volumes.pvcName }} From ed7c02e55182e2fa724e0cf9ce1bb69c16ad5fee Mon Sep 17 00:00:00 2001 From: Paulo Guilherme Pinheiro Pereira Date: Thu, 2 Jul 2026 14:03:33 +0200 Subject: [PATCH 16/16] Add metadata field on values.yaml --- charts/values.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/charts/values.yaml b/charts/values.yaml index bee7290f2..195730c4b 100644 --- a/charts/values.yaml +++ b/charts/values.yaml @@ -48,6 +48,8 @@ compute_worker: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + metadata: + labels: {} resources: requests: memory: 256Mi