Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,71 @@ Set the `podTemplateSelectionKey` key in a task's context to pick a configured p

This is gated by the runtime property `druid.indexer.runner.allowTaskPodTemplateSelection`, which defaults to `false`. If the key doesn't match any configured template, the task fails to launch.

##### Override task pod resources via context

Set the `k8sTaskResources` key in a task's context to override Kubernetes resource requests and limits for the task
container. Resource names are Kubernetes resource names, so this supports standard resources such as `cpu`, `memory`,
and `ephemeral-storage`, as well as cluster-specific extended resources.

Resource entries at the top level are applied to both requests and limits. Use the `requests` and `limits` objects when
the request and limit should differ.

```json
"context": {
"k8sTaskResources": {
"cpu": "2",
"memory": "8Gi",
"ephemeral-storage": "100Gi",
"requests": {
"example.com/custom-resource": "1"
},
"limits": {
"example.com/custom-resource": "1"
}
}
}
```

Use `byTaskType` to override resources for specific task types. Druid applies the generic resource entries first, then
applies the section matching the task's concrete `type`. This is useful for `index_parallel`, whose supervisor task
creates subtasks with different task types. The subtask context inherits the supervisor context, so each subtask can
resolve its own resource override when the Kubernetes task runner launches it.

```json
"context": {
"k8sTaskResources": {
"cpu": "1",
"memory": "4Gi",
"byTaskType": {
"partial_index_generate": {
"cpu": "2",
"memory": "8Gi",
"ephemeral-storage": "200Gi"
},
"partial_index_generic_merge": {
"requests": {
"cpu": "500m",
"memory": "2Gi"
},
"limits": {
"cpu": "1",
"memory": "4Gi"
}
}
}
}
}
```

Common parallel indexing task types include `index_parallel`, `single_phase_sub_task`, `partial_dimension_cardinality`,
`partial_dimension_distribution`, `partial_index_generate`, `partial_range_index_generate`, and
`partial_index_generic_merge`.

For `overlordSingleContainer` and `overlordMultiContainer`, Druid still computes default `cpu` and `memory` resources
from `druid.indexer.runner.cpuCoreInMicro` and `druid.indexer.runner.javaOptsArray`; `k8sTaskResources` overrides only
the resources it names. For `customTemplateAdapter`, Druid applies the overrides to the first container in the selected
pod template. Existing resource entries that are not overridden are preserved.

#### Running Task Pods in Another Namespace

It is possible to run task pods in a different namespace from the rest of your Druid cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DruidK8sConstants
public static final String DRUID_LABEL_PREFIX = "druid.";
public static final String BASE_TEMPLATE_NAME = "base";
public static final String TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY = "podTemplateSelectionKey";
public static final String TASK_CONTEXT_RESOURCES_KEY = "k8sTaskResources";
public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB

public static final ImmutableList<String> BLACKLISTED_PEON_POD_ERROR_MESSAGES = ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context

protected Container setupMainContainer(
PodSpec podSpec,
Task task,
PeonCommandContext context,
long containerSize,
String taskContents
Expand Down Expand Up @@ -374,6 +375,7 @@ protected Container setupMainContainer(
containerSize,
context.getCpuMicroCore()
);
requirements = K8sTaskResourceContext.applyTaskResourceOverrides(requirements, task);
mainContainer.setResources(requirements);
return mainContainer;
}
Expand Down Expand Up @@ -517,26 +519,26 @@ private List<String> generateCommand(Task task)
@VisibleForTesting
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore)
{
Map<String, Quantity> resourceMap = new HashMap<>();
final Map<String, Quantity> resourceMap = new HashMap<>();
resourceMap.put(
"cpu",
new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m")
);
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
final ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
if (requirements != null) {
if (requirements.getRequests() == null || requirements.getRequests().isEmpty()) {
requirements.setRequests(resourceMap);
requirements.setRequests(new HashMap<>(resourceMap));
} else {
requirements.getRequests().putAll(resourceMap);
}
if (requirements.getLimits() == null || requirements.getLimits().isEmpty()) {
requirements.setLimits(resourceMap);
requirements.setLimits(new HashMap<>(resourceMap));
} else {
requirements.getLimits().putAll(resourceMap);
}
} else {
requirements = result.withRequests(resourceMap).withLimits(resourceMap).build();
requirements = result.withRequests(new HashMap<>(resourceMap)).withLimits(new HashMap<>(resourceMap)).build();
}
return requirements;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.overlord.taskadapter;

import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;

import java.util.HashMap;
import java.util.Map;

class K8sTaskResourceContext
{
static final String REQUESTS_KEY = "requests";
static final String LIMITS_KEY = "limits";
static final String BY_TASK_TYPE_KEY = "byTaskType";

static ResourceRequirements applyTaskResourceOverrides(ResourceRequirements requirements, Task task)
{
final Object rawResourceContext = task.getContextValue(DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY);
if (rawResourceContext == null) {
return requirements;
}

final Map<?, ?> resourceContext = asMap(rawResourceContext, DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY);
final ResourceRequirements result = applyResourceConfig(
requirements,
resourceContext,
DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY
);

final Object rawByTaskType = resourceContext.get(BY_TASK_TYPE_KEY);
if (rawByTaskType == null) {
return result;
}

final String taskTypePath = DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY + "." + BY_TASK_TYPE_KEY;
final Map<?, ?> byTaskType = asMap(rawByTaskType, taskTypePath);
final Object rawTaskTypeResources = byTaskType.get(task.getType());
if (rawTaskTypeResources == null) {
return result;
}

return applyResourceConfig(
result,
asMap(rawTaskTypeResources, taskTypePath + "." + task.getType()),
taskTypePath + "." + task.getType()
);
}

private static ResourceRequirements applyResourceConfig(
ResourceRequirements requirements,
Map<?, ?> resourceConfig,
String contextPath
)
{
final Map<String, Quantity> sharedResources = getSharedResources(resourceConfig, contextPath);
final Map<String, Quantity> requestResources = getResourceMap(resourceConfig, REQUESTS_KEY, contextPath);
final Map<String, Quantity> limitResources = getResourceMap(resourceConfig, LIMITS_KEY, contextPath);

if (sharedResources.isEmpty() && requestResources.isEmpty() && limitResources.isEmpty()) {
return requirements;
}

final ResourceRequirements result = requirements == null
? new ResourceRequirementsBuilder().build()
: requirements;

if (!sharedResources.isEmpty()) {
addRequests(result, sharedResources);
addLimits(result, sharedResources);
}
if (!requestResources.isEmpty()) {
addRequests(result, requestResources);
}
if (!limitResources.isEmpty()) {
addLimits(result, limitResources);
}

return result;
}

private static Map<String, Quantity> getSharedResources(Map<?, ?> resourceConfig, String contextPath)
{
final Map<String, Quantity> resources = new HashMap<>();
for (Map.Entry<?, ?> entry : resourceConfig.entrySet()) {
final String key = asStringKey(entry.getKey(), contextPath);
if (REQUESTS_KEY.equals(key) || LIMITS_KEY.equals(key) || BY_TASK_TYPE_KEY.equals(key)) {
continue;
}
resources.put(key, asQuantity(entry.getValue(), contextPath + "." + key));
}
return resources;
}

private static Map<String, Quantity> getResourceMap(Map<?, ?> resourceConfig, String resourceType, String contextPath)
{
final Object rawResources = resourceConfig.get(resourceType);
if (rawResources == null) {
return Map.of();
}

final String resourcePath = contextPath + "." + resourceType;
final Map<?, ?> rawResourceMap = asMap(rawResources, resourcePath);
final Map<String, Quantity> resources = new HashMap<>();
for (Map.Entry<?, ?> entry : rawResourceMap.entrySet()) {
final String key = asStringKey(entry.getKey(), resourcePath);
resources.put(key, asQuantity(entry.getValue(), resourcePath + "." + key));
}
return resources;
}

private static void addRequests(ResourceRequirements requirements, Map<String, Quantity> resources)
{
if (requirements.getRequests() == null) {
requirements.setRequests(new HashMap<>());
}
requirements.getRequests().putAll(resources);
}

private static void addLimits(ResourceRequirements requirements, Map<String, Quantity> resources)
{
if (requirements.getLimits() == null) {
requirements.setLimits(new HashMap<>());
}
requirements.getLimits().putAll(resources);
}

private static Map<?, ?> asMap(Object value, String contextPath)
{
if (value instanceof Map) {
return (Map<?, ?>) value;
}

throw InvalidInput.exception("Task context value [%s] must be an object.", contextPath);
}

private static String asStringKey(Object key, String contextPath)
{
if (key instanceof String) {
return (String) key;
}

throw InvalidInput.exception("Task context value [%s] must contain only string keys.", contextPath);
}

private static Quantity asQuantity(Object value, String contextPath)
{
if (value instanceof Quantity) {
return (Quantity) value;
} else if (value instanceof String || value instanceof Number) {
return new Quantity(String.valueOf(value));
}

throw InvalidInput.exception("Task context value [%s] must be a string or number.", contextPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context)
// compress the task.json to set as an env variables
String taskContents = Base64Compression.compressBase64(mapper.writeValueAsString(task));

setupMainContainer(podSpec, context, containerSize, taskContents);
setupMainContainer(podSpec, task, context, containerSize, taskContents);

// add any optional annotations or labels.
Map<String, String> annotations = addJobSpecificAnnotations(context, k8sTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
Expand Down Expand Up @@ -119,17 +120,17 @@ public String getAdapterType()
@Override
public Job fromTask(Task task) throws IOException
{
Optional<PodTemplateWithName> selectedPodTemplate = podTemplateSelector.getPodTemplateForTask(task);
final Optional<PodTemplateWithName> selectedPodTemplate = podTemplateSelector.getPodTemplateForTask(task);
if (selectedPodTemplate == null || !selectedPodTemplate.isPresent()) {
throw InternalServerError.exception(
"Could not find pod template for task [%s]."
+ " Check the overlord logs for errors selecting the pod template",
task.getId()
);
}
PodTemplateWithName podTemplateWithName = selectedPodTemplate.get();
final PodTemplateWithName podTemplateWithName = selectedPodTemplate.get();

return new JobBuilder()
final Job job = new JobBuilder()
.withNewMetadata()
.withName(new K8sTaskId(taskRunnerConfig.getK8sTaskPodNamePrefix(), task).getK8sJobName())
.addToLabels(getJobLabels(taskRunnerConfig, task))
Expand Down Expand Up @@ -157,6 +158,16 @@ public Job fromTask(Task task) throws IOException
.getStandardSeconds())
.endSpec()
.build();
applyTaskResourceOverrides(job, task);
return job;
}

private void applyTaskResourceOverrides(Job job, Task task)
{
final Container mainContainer = job.getSpec().getTemplate().getSpec().getContainers().get(0);
mainContainer.setResources(
K8sTaskResourceContext.applyTaskResourceOverrides(mainContainer.getResources(), task)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context)
// compress the task.json to set as an env variables
String taskContents = Base64Compression.compressBase64(mapper.writeValueAsString(task));

Container mainContainer = setupMainContainer(podSpec, context, containerSize, taskContents);
Container mainContainer = setupMainContainer(podSpec, task, context, containerSize, taskContents);

// add any optional annotations or labels.
Map<String, String> annotations = addJobSpecificAnnotations(context, k8sTaskId);
Expand Down
Loading
Loading