From aba13249350bca0b45fd4aa5c31171afa32e1fd3 Mon Sep 17 00:00:00 2001 From: Kevin Trogant Date: Thu, 30 Apr 2026 14:56:34 +0200 Subject: [PATCH] feat: helper for deploying pods before workflow execution starts --- .../nextflow/k8s/K8sDriverLauncher.groovy | 5 ++ .../nextflow/k8s/K8sNodeInitDeployer.groovy | 83 +++++++++++++++++++ .../nextflow/k8s/model/PodSpecBuilder.groovy | 10 +++ 3 files changed, 98 insertions(+) create mode 100644 nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy index aa803d1..a5e7978 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy @@ -149,7 +149,12 @@ class K8sDriverLauncher { this.k8sConfig = makeK8sConfig(config.toMap()) this.k8sClient = makeK8sClient(k8sConfig) this.k8sConfig.checkStorageAndPaths(k8sClient) + createK8sConfigMap() + + K8sNodeInitDeployer initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName) + initDeployer.deploy() + createK8sLauncherPod() waitPodStart() // login into container session diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy new file mode 100644 index 0000000..ac47800 --- /dev/null +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy @@ -0,0 +1,83 @@ +package nextflow.k8s + +import nextflow.k8s.client.K8sClient +import nextflow.k8s.model.PodHostMount +import nextflow.k8s.model.PodSpecBuilder +import nextflow.k8s.model.PodVolumeClaim + +class K8sNodeInitDeployer { + private K8sClient client + private K8sConfig config + private String runName + + K8sNodeInitDeployer(K8sClient client, K8sConfig config, String runName) { + this.client = client + this.config = config + this.runName = runName + } + + void deploy() { + final init = config.nodeInit + if ( !init?.enabled ) + return + + final nodes = getNodes() + for ( String nodeName : nodes ) { + final spec = makePodSpec(init, nodeName) + client.podCreate(spec) + } + + waitForPods(nodes) + } + + private List getNodes() { + final resp = client.nodeList() + ArrayList nodes = new ArrayList() + for ( Map item : resp.items ) { + nodes.add(item.metadata.name as String) + } + return nodes + } + + private String buildPodName(String nodeName) { + // TODO: Remove anything that is not lowercase alpha-numeric or dash + String sanitizedNodeName = nodeName.toLowerCase() + String name = "nf-init-${runName}-${sanitizedNodeName}" + if ( name.length() > 63 ) + name = name.substring(0, 63) + return name + } + + private Map makePodSpec(K8sNodeInitConfig config, String nodeName) { + ArrayList mounts = new ArrayList() + mounts.add(new PodHostMount("/sys", "/sys")) + mounts.add(new PodHostMount("/dev", "/dev")) + mounts.add(new PodHostMount("/lib/modules", "/lib/modules")) + + PodSpecBuilder builder = new PodSpecBuilder() + return builder.withNodeName(nodeName) + .withImageName(config.image) + .withCommand(config.command) + .withPrivileged(true) + .withHostMounts(mounts) + .withPodName(buildPodName(nodeName)) + .build() + } + + private void waitForPods(List nodes) { + for ( String nodeName : nodes ) { + String podName = buildPodName(nodeName) + + // TODO: Support differentiate between wait = running + // and wait = Succeeded + + while( true ) { + sleep 1000 + final state = k8sClient.podState(podName) + if( state && !state.containsKey('waiting') ) { + break + } + } + } + } +} diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/model/PodSpecBuilder.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/model/PodSpecBuilder.groovy index 2c70832..d8a6607 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/model/PodSpecBuilder.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/model/PodSpecBuilder.groovy @@ -126,6 +126,8 @@ class PodSpecBuilder { String runtimeClassName + String nodeName + /** * @return A sequential volume unique identifier */ @@ -399,6 +401,11 @@ class PodSpecBuilder { return this } + PodSpecBuilder withNodeName(String value) { + this.nodeName = value + return this + } + @PackageScope List createPullSecret() { def result = new ArrayList(1) def entry = new LinkedHashMap(1) @@ -496,6 +503,9 @@ class PodSpecBuilder { if ( runtimeClassName ) spec.runtimeClassName = runtimeClassName + if ( nodeName ) + spec.nodeName = nodeName + final pod = [ apiVersion: 'v1', kind: 'Pod',