From 78fe51a40dbc0c5b95252a27b86a724b10027a9e Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Thu, 21 May 2026 20:37:55 +0200 Subject: [PATCH] feat: invoke scheduler at regular interval --- .../src/main/nextflow/k8s/K8sConfig.groovy | 11 ++++- .../src/main/nextflow/k8s/K8sExecutor.groovy | 19 ++++++-- .../main/nextflow/k8s/K8sTaskHandler.groovy | 1 + .../main/nextflow/k8s/K8sTaskScheduler.groovy | 43 +++++++++++++++---- .../nextflow/k8s/K8sTaskSchedulerTest.groovy | 18 +++++--- test/nextflow.config | 3 +- 6 files changed, 75 insertions(+), 20 deletions(-) diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy index 6ef912b..eff1638 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy @@ -39,6 +39,9 @@ import nextflow.k8s.model.PodSecurityContext import nextflow.k8s.model.PodVolumeClaim import nextflow.k8s.model.ResourceType import nextflow.util.Duration + +import java.util.concurrent.TimeUnit + /** * Model Kubernetes specific settings defined in the nextflow * configuration file @@ -221,6 +224,12 @@ class K8sConfig implements ConfigScope { """) final String nextflowImage + @ConfigOption + @Description(""" + The run interval of the kubernetes scheduler + """) + final Duration schedulerInterval + /* required by extension point -- do not remove */ K8sConfig() { this(Collections.emptyMap()) @@ -251,6 +260,7 @@ class K8sConfig implements ConfigScope { storageSubPath = opts.storageSubPath userName = opts.userName nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}" + schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS) launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}" projectDir = opts.projectDir ?: "${storageMountPath}/projects" @@ -272,7 +282,6 @@ class K8sConfig implements ConfigScope { else if( securityContext ) pod.securityContext = new PodSecurityContext(securityContext) - log.info("Hello sailor") nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap()) } diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy index 31a79bb..4e4d981 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy @@ -52,19 +52,19 @@ class K8sExecutor extends Executor implements ExtensionPoint { */ private Cache clientCache - private K8sTaskScheduler taskScheduler + private K8sTaskScheduler taskScheduler + private Thread schedulerThread /** * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes * the client (including the service account token) when the configured interval expires. */ - protected K8sClient getClient() { + K8sClient getClient() { clientCache.get('client', () -> new K8sClient(k8sConfig.getClient())) } protected K8sTaskScheduler getTaskScheduler() { - if( taskScheduler == null ) - taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy()) + assert taskScheduler != null return taskScheduler } @@ -88,7 +88,18 @@ class K8sExecutor extends Executor implements ExtensionPoint { .expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS) .build() final client = getClient() + log.debug "[K8s] config=$k8sConfig; API client config=$client.config" + + this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy()) + this.schedulerThread = new Thread(this.taskScheduler) + this.schedulerThread.start() + } + + @Override + void shutdown() { + this.taskScheduler.stop() + this.schedulerThread.join() } /** diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy index c919931..9bf99d1 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy @@ -334,6 +334,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { void submit() { builder = createBashWrapper(task) builder.build() + log.info "[K8s] submitting task ${this.task.name}" executor.taskScheduler.submit(this) } diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy index 4d825ab..07a4929 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy @@ -8,22 +8,28 @@ import java.util.concurrent.LinkedBlockingQueue @Slf4j @CompileStatic -class K8sTaskScheduler { - private final K8sClient client +class K8sTaskScheduler implements Runnable { + private final K8sExecutor executor private final K8sSchedulingStrategy strategy private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>() private List cachedNodes - K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) { - this.client = client + private synchronized boolean shouldStop + + K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) { + this.executor = executor this.strategy = strategy } + /** + * Adds a task to the queue of outstanding tasks + * @param handler + */ void submit(K8sTaskHandler handler) { - log.debug "[K8s] received queued task ${handler.task.name}" + log.info "[K8s] received queued task ${handler.task.name}" queue.add(new K8sSchedulingRequest(handler)) - drain() + /* TODO: Decide if we should invoke the scheduler immediately */ } protected synchronized void drain() { @@ -37,11 +43,32 @@ class K8sTaskScheduler { if ( !queue.remove(decision.request) ) continue - log.debug "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}" + log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}" decision.request.handler.submitNow(decision.nodeName) } } + /** + * Run is the scheduler threads main function + * */ + void run() { + this.shouldStop = false + def interval = this.executor.getK8sConfig().schedulerInterval + while (!shouldStop) { + sleep(interval.toMillis()) + drain() + } + log.info("[K8s] terminated scheduler loop") + } + + /** + * Stop terminates the scheduler thread + */ + void stop() { + log.info("[K8s] stopping scheduler loop") + this.shouldStop = true + } + protected List getNodes() { if ( cachedNodes == null ) cachedNodes = fetchNodes() @@ -50,7 +77,7 @@ class K8sTaskScheduler { @CompileDynamic private List fetchNodes() { - final resp = client.nodeList() + final resp = executor.getClient().nodeList() ArrayList nodes = new ArrayList() for ( Map item : resp.items ) { nodes.add(item.metadata.name as String) diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy index 0ab11e5..fb0dd83 100644 --- a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy @@ -8,9 +8,9 @@ class K8sTaskSchedulerTest extends Specification { def 'should enqueue task handler without immediately submitting it' () { given: - def client = Mock(K8sClient) + def executor = Mock(K8sExecutor) def strategy = Mock(K8sSchedulingStrategy) - def scheduler = new K8sTaskScheduler(client, strategy) + def scheduler = new K8sTaskScheduler(executor, strategy) def task = Mock(TaskRun) def handler = Spy(K8sTaskHandler) @@ -21,7 +21,7 @@ class K8sTaskSchedulerTest extends Specification { then: 0 * strategy._ - 0 * client._ + 0 * executor._ 0 * handler.submitNow(_) and: @@ -31,8 +31,9 @@ class K8sTaskSchedulerTest extends Specification { def 'should drain queued task and submit it to selected node' () { given: def client = Mock(K8sClient) + def executor = Mock(K8sExecutor) def strategy = Mock(K8sSchedulingStrategy) - def scheduler = new K8sTaskScheduler(client, strategy) + def scheduler = new K8sTaskScheduler(executor, strategy) def task = Mock(TaskRun) def handler = Spy(K8sTaskHandler) @@ -43,6 +44,7 @@ class K8sTaskSchedulerTest extends Specification { scheduler.drain() then: + 1 * executor.getClient() >> client 1 * client.nodeList() >> [ items: [ [metadata: [name: 'node-a']], @@ -70,8 +72,9 @@ class K8sTaskSchedulerTest extends Specification { def 'should keep task queued when strategy returns no decision' () { given: def client = Mock(K8sClient) + def executor = Mock(K8sExecutor) def strategy = Mock(K8sSchedulingStrategy) - def scheduler = new K8sTaskScheduler(client, strategy) + def scheduler = new K8sTaskScheduler(executor, strategy) def task = Mock(TaskRun) def handler = Spy(K8sTaskHandler) @@ -82,6 +85,7 @@ class K8sTaskSchedulerTest extends Specification { scheduler.drain() then: + 1 * executor.getClient() >> client 1 * client.nodeList() >> [ items: [ [metadata: [name: 'node-a']] @@ -101,8 +105,9 @@ class K8sTaskSchedulerTest extends Specification { def 'should continue draining when selected request was already removed' () { given: def client = Mock(K8sClient) + def executor = Mock(K8sExecutor) def strategy = Mock(K8sSchedulingStrategy) - def scheduler = new K8sTaskScheduler(client, strategy) + def scheduler = new K8sTaskScheduler(executor, strategy) def task1 = Mock(TaskRun) def handler1 = Spy(K8sTaskHandler) @@ -118,6 +123,7 @@ class K8sTaskSchedulerTest extends Specification { scheduler.drain() then: + 1 * executor.getClient() >> client 1 * client.nodeList() >> [ items: [ [metadata: [name: 'node-a']] diff --git a/test/nextflow.config b/test/nextflow.config index 32813ef..db8c569 100644 --- a/test/nextflow.config +++ b/test/nextflow.config @@ -22,8 +22,9 @@ k8s { projectDir = '/workspace/projects' cleanup = false - nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.1' + nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.3.2' imagePullPolicy = 'IfNotPresent' + schedulerInterval = '10s' nodeInit { enabled = false