MM-less Druid in K8s
Apache Druid extension that enables using Kubernetes for launching and managing tasks instead of Middle Managers. This extension allows you to launch tasks as Kubernetes Jobs, removing the need for Middle Managers.
Consider this an EXPERIMENTAL feature, mostly because it has not yet been tested on a wide variety of long-running Druid clusters.
How it works
The K8s extension builds a pod spec for each task using the specified pod adapter. All Jobs are natively restorable: they are decoupled from the Druid deployment, so restarting pods or performing upgrades has no effect on tasks in flight. Tasks continue to run, and when the Overlord comes back up it starts tracking them again.
Kubernetes Client Mode
"Direct" K8s API Interaction per task (Default)
Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster.
SharedInformer "Caching" (Experimental)
Enabled by setting druid.indexer.runner.useK8sSharedInformers=true, this mode uses Fabric8 SharedInformer objects for monitoring state changes in the remote K8s cluster, reducing the number of direct API calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks.
This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community.
The core idea is to use two SharedInformers, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster.
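The pattern described above can be illustrated with a minimal, self-contained sketch. This is a conceptual model only, not the Fabric8 API: a single watch connection feeds change events into a local cache, tasks read from that cache instead of calling the API server, and registered listeners are notified on every change.

```python
class SharedInformerSketch:
    """Conceptual sketch of a SharedInformer: maintains a local cache of
    cluster objects and notifies listeners on changes (not the Fabric8 API)."""

    def __init__(self):
        self.cache = {}       # local store: object name -> latest state
        self.listeners = []   # callbacks fired on every change event

    def add_listener(self, callback):
        self.listeners.append(callback)

    def handle_event(self, event_type, name, obj):
        # In a real informer, a single persistent watch connection delivers
        # these events; here we feed them in manually to illustrate the flow.
        if event_type == "DELETED":
            self.cache.pop(name, None)
        else:  # ADDED / MODIFIED
            self.cache[name] = obj
        for cb in self.listeners:
            cb(event_type, name, obj)

    def get(self, name):
        # Tasks read from the in-memory cache: no API server round trip.
        return self.cache.get(name)


# Two informers as in the extension's design: one for Jobs, one for Pods.
jobs = SharedInformerSketch()
events = []
jobs.add_listener(lambda event_type, name, obj: events.append((event_type, name)))

jobs.handle_event("ADDED", "task-abc", {"phase": "Running"})
jobs.handle_event("MODIFIED", "task-abc", {"phase": "Succeeded"})

print(jobs.get("task-abc"))  # -> {'phase': 'Succeeded'}, served from the cache
print(events)                # -> [('ADDED', 'task-abc'), ('MODIFIED', 'task-abc')]
```

However many tasks query `get()`, the API server sees only the one watch stream feeding `handle_event`, which is the source of the reduction in API calls described above.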
Architecture: Direct vs. Caching Mode
Key Differences:
- DirectKubernetesPeonClient (Default): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling.
- CachingKubernetesPeonClient (Experimental): All read operations query an in-memory cache maintained by SharedInformers. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls.
Shared Operations:
Both implementations share the same code for write operations (Job creation and deletion) and log reads, which always use direct API calls.
Configuration
To use this extension, include druid-kubernetes-overlord-extensions in the extensions load list for your Overlord process.
The extension uses druid.indexer.runner.capacity to limit the number of K8s Jobs in flight. A good initial value is the sum of the total task slots of all the Middle Managers you were running before switching to K8s-based ingestion. The K8s task runner uses one thread per Job created, so setting this number too high can cause memory issues on the Overlord. Additionally, set druid.indexer.runner.namespace to the namespace in which you are running Druid.
Other configurations required are:
druid.indexer.runner.type: k8s and druid.indexer.task.encapsulatedTask: true
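Putting the settings above together, a minimal Overlord runtime.properties fragment might look like the following (the namespace and capacity values are placeholders; your load list will typically include other extensions as well):

```properties
druid.extensions.loadList=["druid-kubernetes-overlord-extensions"]
druid.indexer.runner.type=k8s
druid.indexer.runner.namespace=druid
druid.indexer.runner.capacity=8
druid.indexer.task.encapsulatedTask=true
```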
Dynamic config
Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord service for these changes to take effect.
Druid can dynamically tune pod template selection and capacity, where capacity refers to druid.indexer.runner.capacity.
Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the custom template pod adapter.
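For reference, configuring the custom template pod adapter typically involves Overlord properties like the following. The property names and the base template path are given for illustration and may differ by Druid version, so check the pod adapter documentation for your release:

```properties
druid.indexer.runner.k8s.adapter.type=customTemplateAdapter
druid.indexer.runner.k8s.podTemplate.base=/path/to/basePodSpec.yaml
```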
Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner.
To use these APIs, ensure you have read and write permissions for the CONFIG resource type with the resource name "CONFIG". For more information on permissions, see User authentication and authorization.
Get dynamic configuration
Retrieves the current dynamic execution config for the Kubernetes task runner. Returns a JSON object with the dynamic configuration properties.
URL
GET /druid/indexer/v1/k8s/taskrunner/executionconfig
Responses
- 200 SUCCESS
Successfully retrieved dynamic configuration
Sample request
- cURL
- HTTP
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconfig"
GET /druid/indexer/v1/k8s/taskrunner/executionconfig HTTP/1.1
Host: ROUTER_IP:ROUTER_PORT
Sample response
View the response
{
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "podSpec1",
"context.tags": {
"userProvidedTag": ["tag1", "tag2"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "podSpec2",
"type": ["index_kafka"]
}
]
},
"capacity": 12
}
Update dynamic configuration
Updates the dynamic configuration for the Kubernetes task runner.
Note: Both podTemplateSelectStrategy and capacity are optional fields. A POST request may include either, both, or neither.
URL
POST /druid/indexer/v1/k8s/taskrunner/executionconfig
Header parameters
The endpoint supports the following optional header parameters to populate the author and comment fields in the configuration history.
- X-Druid-Author (String): Author of the configuration change.
- X-Druid-Comment (String): Description of the update.
Responses
- 200 SUCCESS
Successfully updated dynamic configuration
Sample request
- cURL
- HTTP
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconfig" \
--header 'Content-Type: application/json' \
--data '{
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "podSpec1",
"context.tags":
{
"userProvidedTag": ["tag1", "tag2"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "podSpec2",
"type": ["index_kafka"]
}
]
},
"capacity": 6
}'
POST /druid/indexer/v1/k8s/taskrunner/executionconfig HTTP/1.1
Host: ROUTER_IP:ROUTER_PORT
Content-Type: application/json
{
"type": "default",
"podTemplateSelectStrategy":
{
"type": "selectorBased",
"selectors": [
{
"selectionKey": "podSpec1",
"context.tags":
{
"userProvidedTag": ["tag1", "tag2"]
},
"dataSource": ["wikipedia"]
},
{
"selectionKey": "podSpec2",
"type": ["index_kafka"]
}
]
},
"capacity": 6
}
Sample response
A successful request returns an HTTP 200 OK message code and an empty response body.
Get dynamic configuration history
Retrieves the history of changes to Kubernetes task runner's dynamic execution config over an interval of time. Returns an empty array if there are no history records available.
URL
GET /druid/indexer/v1/k8s/taskrunner/executionconfig/history
Query parameters
The endpoint supports the following optional query parameters to filter results.
- interval (String): Limit the results to the specified time interval in ISO 8601 format delimited with /. For example, 2023-07-13/2023-07-19. The default interval is one week. You can change this period by setting druid.audit.manager.auditHistoryMillis in the runtime.properties file for the Coordinator.
- count (Integer): Limit the number of results to the last n entries.
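For example, to limit the history to a one-week interval and at most 10 entries (the interval values are illustrative, and the / delimiter must be URL-encoded as %2F inside a query parameter):

```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconfig/history?interval=2023-07-13%2F2023-07-19&count=10"
```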
Responses
- 200 SUCCESS
Successfully retrieved dynamic configuration history
Sample request
- cURL
- HTTP
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconfig/history"
GET /druid/indexer/v1/k8s/taskrunner/executionconfig/history HTTP/1.1
Host: ROUTER_IP:ROUTER_PORT
Sample response
View the response
[
{
"key": "k8s.taskrunner.config",
"type": "k8s.taskrunner.config",
"auditInfo": {
"author": "",
"comment": "",
"ip": "127.0.0.1"
},
"payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6}",
"auditTime": "2024-06-13T20:59:51.622Z"
}
]
Pod adapters
The logic defining how the pod template is built for your Kubernetes Job depends on which pod adapter you have specified.