From 6e2e42cbbed31298ce645804885d35dcdba8dcc4 Mon Sep 17 00:00:00 2001
From: Kevin Trogant
Date: Sun, 3 May 2026 18:33:34 +0200
Subject: [PATCH] feature: Basic k8s scheduler

---
 .../src/main/nextflow/k8s/K8sExecutor.groovy  | 13 ++++
 .../nextflow/k8s/K8sSchedulingDecision.groovy | 19 +++++
 .../nextflow/k8s/K8sSchedulingRequest.groovy  | 21 +++++
 .../nextflow/k8s/K8sSchedulingStrategy.groovy | 16 ++++
 .../main/nextflow/k8s/K8sTaskHandler.groovy   | 32 +++++---
 .../main/nextflow/k8s/K8sTaskScheduler.groovy | 78 +++++++++++++++++++
 .../K8sHashSchedulingStrategy.groovy          | 23 ++++++
 7 files changed, 193 insertions(+), 9 deletions(-)
 create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingDecision.groovy
 create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingRequest.groovy
 create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy
 create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
 create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy

diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
index f03eb88..31a79bb 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
@@ -16,6 +16,8 @@
 
 package nextflow.k8s
 
+import nextflow.k8s.strategies.K8sHashSchedulingStrategy
+
 import java.util.concurrent.TimeUnit
 
 import com.google.common.cache.Cache
@@ -50,6 +52,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
      */
     private Cache<String, K8sClient> clientCache
 
+    private K8sTaskScheduler taskScheduler
+
     /**
      * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
      * the client (including the service account token) when the configured interval expires.
@@ -58,6 +62,15 @@ class K8sExecutor extends Executor implements ExtensionPoint {
         clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
     }
 
+    /**
+     * @return The task scheduler shared by all handlers of this executor, created on first use
+     */
+    protected synchronized K8sTaskScheduler getTaskScheduler() {
+        if( taskScheduler == null )
+            taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy())
+        return taskScheduler
+    }
+
     /**
      * @return The `k8s` configuration scope in the nextflow configuration object
      */
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingDecision.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingDecision.groovy
new file mode 100644
index 0000000..020ca9e
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingDecision.groovy
@@ -0,0 +1,19 @@
+package nextflow.k8s
+
+import groovy.transform.CompileStatic
+
+/**
+ * Outcome of a scheduling round: the request to launch and the node assigned to it.
+ */
+@CompileStatic
+class K8sSchedulingDecision {
+    /** Name of the Kubernetes node the task should run on */
+    final String nodeName
+    /** The pending request selected for launch */
+    final K8sSchedulingRequest request
+
+    K8sSchedulingDecision(K8sSchedulingRequest request, String nodeName) {
+        this.request = request
+        this.nodeName = nodeName
+    }
+}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingRequest.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingRequest.groovy
new file mode 100644
index 0000000..a580064
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingRequest.groovy
@@ -0,0 +1,21 @@
+package nextflow.k8s
+
+import groovy.transform.CompileStatic
+import nextflow.processor.TaskRun
+
+/**
+ * A task waiting in the scheduler queue, captured when it was enqueued.
+ */
+@CompileStatic
+class K8sSchedulingRequest {
+    final K8sTaskHandler handler
+    final TaskRun task
+    /** Wall-clock time (epoch millis) at which this request was created */
+    final long submitTimeMillis
+
+    K8sSchedulingRequest(K8sTaskHandler handler) {
+        this.handler = handler
+        this.task = handler.task
+        this.submitTimeMillis = System.currentTimeMillis()
+    }
+}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy
new file mode 100644
index 0000000..efbb64e
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy
@@ -0,0 +1,16 @@
+package nextflow.k8s
+
+/**
+ * Pluggable policy deciding which queued task is launched next and on which node.
+ */
+interface K8sSchedulingStrategy {
+    /**
+     * Selects the next task to run from the given task queue.
+     * If the scheduler should wait, returns {@code null} instead
+     *
+     * @param queue Pending scheduling requests
+     * @param nodes Available Kubernetes node names
+     * @return A launch decision, or {@code null} when no task should be launched now
+     */
+    K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes)
+}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
index ae035ee..d72a3f6 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
@@ -179,16 +179,17 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
      * Creates a Pod specification that executed that specified task
      *
      * @param task A {@link TaskRun} instance representing the task to execute
+     * @param nodeName The kubernetes node on which the task should run or {@code null}, if no specific node is requested
      * @return A {@link Map} object modeling a Pod specification
      */
 
-    protected Map newSubmitRequest(TaskRun task) {
+    protected Map newSubmitRequest(TaskRun task, String nodeName) {
         def imageName = task.container
         if( !imageName )
             throw new ProcessSubmitException("Missing container image for process `$task.processor.name`")
 
         try {
-            newSubmitRequest0(task, imageName)
+            newSubmitRequest0(task, imageName, nodeName)
         }
         catch( Throwable e ) {
             throw new ProcessSubmitException("Failed to submit K8s ${resourceType.lower()} -- Cause: ${e.message ?: e}", e)
@@ -203,7 +204,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
         return executor.getK8sConfig().cpuLimitsEnabled()
     }
 
-    protected Map newSubmitRequest0(TaskRun task, String imageName) {
+    protected Map newSubmitRequest0(TaskRun task, String imageName, String nodeName) {
 
         final launcher = getSubmitCommand(task)
         final taskCfg = task.getConfig()
@@ -228,6 +229,9 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
             builder.withCommand(launcher)
         }
 
+        if( nodeName )
+            builder.withNodeName(nodeName)
+
         // note: task environment is managed by the task bash wrapper
         // do not add here -- see also #680
         if( fixOwnership() )
@@ -314,18 +318,27 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
     }
 
     /**
-     * Creates a new K8s pod executing the associated task
+     * Prepares the task execution and enqueues it at the scheduler
      */
     @Override
     @CompileDynamic
     void submit() {
         builder = createBashWrapper(task)
         builder.build()
+        executor.taskScheduler.submit(this)
+    }
 
-        final req = newSubmitRequest(task)
+    /**
+     * Creates a new K8s pod executing the associated task
+     *
+     * @param nodeName The kubernetes node on which the task should run or {@code null}, if no specific node is requested
+     */
+    @CompileDynamic
+    void submitNow(String nodeName) {
+        final req = newSubmitRequest(task, nodeName)
         final resp = useJobResource()
-                ? client.jobCreate(req, yamlDebugPath())
-                : client.podCreate(req, yamlDebugPath())
+            ? client.jobCreate(req, yamlDebugPath())
+            : client.podCreate(req, yamlDebugPath())
 
         if( !resp.metadata?.name )
             throw new K8sResponseException("Missing created ${resourceType.lower()} name", resp)
@@ -373,7 +386,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
 
     @Override
     boolean checkIfRunning() {
-        if( !podName ) throw new IllegalStateException("Missing K8s ${resourceType.lower()} name -- cannot check if running")
+        if( !podName )
+            return false
         if(isSubmitted()) {
             def state = getState()
             // include `terminated` state to allow the handler status to progress
@@ -419,7 +433,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
     @Override
     boolean checkIfCompleted() {
         if( !podName )
-            throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete")
+            return false
 
         final state = getState()
         if( state && state.terminated ) {
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
new file mode 100644
index 0000000..57957c0
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
@@ -0,0 +1,78 @@
+package nextflow.k8s
+
+import groovy.transform.CompileDynamic
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.k8s.client.K8sClient
+import java.util.concurrent.LinkedBlockingQueue
+
+/**
+ * Queues submitted task handlers and launches them on the node chosen
+ * by the configured {@link K8sSchedulingStrategy}.
+ */
+@Slf4j
+@CompileStatic
+class K8sTaskScheduler {
+    private final K8sClient client
+    private final K8sSchedulingStrategy strategy
+    private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>()
+    private List<String> cachedNodes
+
+    K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) {
+        this.client = client
+        this.strategy = strategy
+    }
+
+    /**
+     * Enqueues the handler and immediately runs a scheduling round.
+     * Errors raised while launching a task propagate to the caller.
+     *
+     * @param handler The handler whose task should be scheduled
+     */
+    void submit(K8sTaskHandler handler) {
+        queue.add(new K8sSchedulingRequest(handler))
+        // nothing else triggers a scheduling round, so run one on every submission
+        drain()
+    }
+
+    /**
+     * Launches queued tasks until the strategy returns {@code null}.
+     */
+    protected synchronized void drain() {
+        while( true ) {
+            final pending = new ArrayList<K8sSchedulingRequest>(queue)
+            final decision = strategy.schedule(pending, getNodes())
+
+            if ( !decision )
+                return
+
+            if ( !queue.remove(decision.request) )
+                continue
+
+            log.trace "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
+            decision.request.handler.submitNow(decision.nodeName)
+        }
+    }
+
+    /**
+     * @return The known node names; an empty result is not cached, so the lookup is retried
+     */
+    protected List<String> getNodes() {
+        if ( !cachedNodes )
+            cachedNodes = fetchNodes()
+        return cachedNodes
+    }
+
+    /**
+     * @return The names of the nodes reported by the Kubernetes client
+     */
+    @CompileDynamic
+    private List<String> fetchNodes() {
+        final resp = client.nodeList()
+        ArrayList<String> nodes = new ArrayList<String>()
+        for ( Map item : resp.items ) {
+            nodes.add(item.metadata.name as String)
+        }
+        return nodes
+    }
+}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy
new file mode 100644
index 0000000..ddbfce7
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy
@@ -0,0 +1,23 @@
+package nextflow.k8s.strategies
+
+import groovy.transform.CompileStatic
+import nextflow.k8s.K8sSchedulingDecision
+import nextflow.k8s.K8sSchedulingRequest
+import nextflow.k8s.K8sSchedulingStrategy
+
+/**
+ * Picks the first queued request and maps it to a node by the task hash.
+ */
+@CompileStatic
+class K8sHashSchedulingStrategy implements K8sSchedulingStrategy {
+
+    @Override
+    K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes) {
+        if ( !queue || !nodes )
+            return null
+        final request = queue[0]
+        // floorMod keeps the index non-negative for negative hash values
+        final index = Math.floorMod(request.task.hash.asInt(), nodes.size())
+        return new K8sSchedulingDecision(request, nodes[index])
+    }
+}