From 933611af553f67c1c26d7bb1d85d61a3035499d7 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Sun, 24 May 2026 23:04:13 +0200 Subject: [PATCH] feat: keep track of node occupancy --- .../nextflow/k8s/K8sRuntimeRecorder.groovy | 2 +- .../nextflow/k8s/K8sSchedulingStrategy.groovy | 3 +- .../main/nextflow/k8s/K8sTaskScheduler.groovy | 34 +++++++++++++++++-- .../K8sHashSchedulingStrategy.groovy | 12 +++++-- .../nextflow/k8s/K8sTaskSchedulerTest.groovy | 1 + test/nextflow.config | 2 +- 6 files changed, 46 insertions(+), 8 deletions(-) diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy index 67f6f60..277ae93 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy @@ -38,7 +38,7 @@ class K8sRuntimeRecorder { } } - records.add(new K8sRuntimeRecord(task.task.name, inputSizeSum, runtimeMilis)) + records.add(new K8sRuntimeRecord(task.task.processor.name, inputSizeSum, runtimeMilis)) } void write() { diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy index efbb64e..78e19e7 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sSchedulingStrategy.groovy @@ -5,9 +5,10 @@ interface K8sSchedulingStrategy { * Selects the next task to run from the given task queue. * If the scheduler should wait, returns {@code null} instead * + * @param scheduler the calling scheduler object * @param queue Pending scheduling requests * @param nodes Available Kubernetes node names * @return A launch decision, or {@code null} when no task should be launched now */ - K8sSchedulingDecision schedule(List queue, List nodes) + K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List queue, List nodes) } diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy index 883482b..c74f5b0 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy @@ -16,9 +16,17 @@ class K8sTaskScheduler implements Runnable { private synchronized boolean shouldStop + private HashMap> occupancy; + private HashMap taskToNode + K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) { this.executor = executor this.strategy = strategy + this.occupancy = new HashMap<>() + for (String node : getNodes()) { + this.occupancy.put(node, new ArrayList()) + } + this.taskToNode = new HashMap<>() } /** @@ -37,13 +45,25 @@ class K8sTaskScheduler implements Runnable { * @param handler */ void taskFinished(K8sTaskHandler handler) { - + /* Remove from occupancy list */ + String nodeName = taskToNode.get(handler.task.name) + if (nodeName == null) { + log.error "[K8s] no node saved for task ${handler.task.name}" + return + } + ArrayList nodeTasks = occupancy.get(nodeName) + if (nodeTasks == null) { + log.error "[K8s] tried to remove task ${handler.task.name} from node ${nodeName} but no tasks are saved for that node" + return + } + nodeTasks.remove(handler) + log.info "[K8s] removed task ${handler.task.name} from node ${nodeName}" } protected synchronized void drain() { while( true ) { final pending = new ArrayList(queue) - final decision = strategy.schedule(pending, getNodes()) + final decision = strategy.schedule(this, pending, getNodes()) if ( !decision ) return @@ -53,9 +73,19 @@ class K8sTaskScheduler implements Runnable { log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}" decision.request.handler.submitNow(decision.nodeName) + ArrayList nodeTasks = occupancy.get(decision.nodeName) + assert nodeTasks != null + nodeTasks.add(decision.request.handler) + taskToNode.put(decision.request.task.name, decision.nodeName) } } + /* Scheduling Strategy Interface */ + List getFreeNodes() { + def unoccupied = occupancy.findAll {it.value == null || it.value.size() == 0 } + unoccupied.keySet().toList() + } + /** * Run is the scheduler threads main function * */ diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy index ddbfce7..6ad559c 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/strategies/K8sHashSchedulingStrategy.groovy @@ -4,16 +4,22 @@ import groovy.transform.CompileStatic import nextflow.k8s.K8sSchedulingDecision import nextflow.k8s.K8sSchedulingRequest import nextflow.k8s.K8sSchedulingStrategy +import nextflow.k8s.K8sTaskScheduler @CompileStatic class K8sHashSchedulingStrategy implements K8sSchedulingStrategy { @Override - K8sSchedulingDecision schedule(List queue, List nodes) { + K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List queue, List nodes) { if ( !queue || !nodes ) return null + + final freeNodes = scheduler.freeNodes + if ( freeNodes.size() == 0 ) + return null + final request = queue[0] - final index = Math.floorMod(request.task.hash.asInt(), nodes.size()) - return new K8sSchedulingDecision(request, nodes[index]) + final index = Math.floorMod(request.task.hash.asInt(), freeNodes.size()) + return new K8sSchedulingDecision(request, freeNodes[index]) } } diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy index fb0dd83..62ba242 100644 --- a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy @@ -20,6 +20,7 @@ class K8sTaskSchedulerTest extends Specification { scheduler.submit(handler) then: + 1 * scheduler.() >> ['A', 'B', 'C'] 0 * strategy._ 0 * executor._ 0 * handler.submitNow(_) diff --git a/test/nextflow.config b/test/nextflow.config index 8850a15..e3229cc 100644 --- a/test/nextflow.config +++ b/test/nextflow.config @@ -22,7 +22,7 @@ k8s { projectDir = '/workspace/projects' cleanup = false - nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.4.2' + nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.5.3' imagePullPolicy = 'IfNotPresent' schedulerInterval = '10s' recordTaskRuntimes = true