diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy index a5e7978..15a157c 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sDriverLauncher.groovy @@ -132,6 +132,8 @@ class K8sDriverLauncher { */ private String plugins + private K8sNodeInitDeployer initDeployer + /** * Launcher entry point. Set-up the environment and create a pod that run the Nextflow * application (which in turns executed each task as a pod) @@ -149,12 +151,11 @@ class K8sDriverLauncher { this.k8sConfig = makeK8sConfig(config.toMap()) this.k8sClient = makeK8sClient(k8sConfig) this.k8sConfig.checkStorageAndPaths(k8sClient) + this.initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName) createK8sConfigMap() - K8sNodeInitDeployer initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName) initDeployer.deploy() - createK8sLauncherPod() waitPodStart() // login into container session @@ -178,6 +179,9 @@ class K8sDriverLauncher { if( k8sConfig.getCleanup(deleteOnSuccessByDefault) ) { deleteConfigMap() } + // cleanup pre-workflow pods + initDeployer.cleanup() + return exitCode } @@ -532,7 +536,9 @@ class K8sDriverLauncher { assert k8sClient // -- setup config file - String cmd = "source /etc/nextflow/init.sh; ${getLaunchCli()}" + String cmd = "source /etc/nextflow/init.sh; ${getLaunchCli()}" + + "; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; " + + "exit \$status" // create the launcher pod PodSpecBuilder builder = new PodSpecBuilder() @@ -662,6 +668,10 @@ class K8sDriverLauncher { paramsFile = "/etc/nextflow/$file.name" } + // pre-workflow pod cleanup + if ( background ) + configMap['node-init-cleanup.sh'] = initDeployer.buildCleanupScript() + // create the config map configMapName = makeConfigMapName(configMap) tryCreateConfigMap(configMapName, configMap) diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitConfig.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitConfig.groovy index 69a9840..7ebbad9 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitConfig.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitConfig.groovy @@ -29,6 +29,10 @@ class K8sNodeInitConfig implements ConfigScope { @Description("the pod state to wait on") final String wait; + @ConfigOption + @Description("enables cleanup of pre-workflow nodes.") + final boolean cleanup; + K8sNodeInitConfig() { this(Collections.emptyMap()) } @@ -38,5 +42,6 @@ class K8sNodeInitConfig implements ConfigScope { image = opts.image as String command = opts.command as List wait = opts.wait as String + cleanup = opts.cleanup as boolean } } diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy index 8a34f60..b2be339 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sNodeInitDeployer.groovy @@ -29,6 +29,78 @@ class K8sNodeInitDeployer { waitForPods(nodes) } + void cleanup() { + final init = config.nodeInit + if ( !init?.enabled || !init?.cleanup ) + return + + final nodes = getNodes() + for ( String nodeName : nodes ) { + final podName = buildPodName(nodeName) + client.podDelete(podName) + } + } + + String buildCleanupScript() { + if ( !config.nodeInit?.enabled || !config.nodeInit?.cleanup ) + return "#!/usr/bin/env/bash\nexit 0" + + final nodes = getNodes() + final podNames = nodes.collect {buildPodName(it)} + + String script = ''' +#!/usr/bin/env bash +set -u + +PODS=(''' + for ( String podName : podNames ) { + script += "\"${podName}\"\n" + } + script += ''' +) + +SERVICE_ACCOUNT_DIR="/var/run/secrets/kubernetes.io/serviceaccount" +TOKEN="$(cat "${SERVICE_ACCOUNT_DIR}/token")" +NAMESPACE="$(cat "${SERVICE_ACCOUNT_DIR}/namespace")" +CA_CERT="${SERVICE_ACCOUNT_DIR}/ca.crt" + +KUBE_API="https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS:-443}" + +for POD in "${PODS[@]}"; do + echo "Deleting pod: ${POD}" + + HTTP_CODE="$( + curl \\ + --silent \\ + --show-error \\ + --output /tmp/delete-pod-response.json \\ + --write-out "%{http_code}" \\ + --request DELETE \\ + --cacert "${CA_CERT}" \\ + --header "Authorization: Bearer ${TOKEN}" \\ + --header "Accept: application/json" \\ + "${KUBE_API}/api/v1/namespaces/${NAMESPACE}/pods/${POD}" + )" + + case "${HTTP_CODE}" in + 200|202) + echo "Deleted pod: ${POD}" + ;; + 404) + echo "Pod already absent: ${POD}" + ;; + *) + echo "Failed to delete pod: ${POD}; HTTP ${HTTP_CODE}" >&2 + cat /tmp/delete-pod-response.json >&2 || true + ;; + esac +done + +exit 0 + ''' + return script + } + private List getNodes() { final resp = client.nodeList() ArrayList nodes = new ArrayList() @@ -60,6 +132,7 @@ class K8sNodeInitDeployer { .withPrivileged(true) .withHostMounts(mounts) .withPodName(buildPodName(nodeName)) + .withNamespace(client.config.namespace) .build() } @@ -69,7 +142,7 @@ class K8sNodeInitDeployer { String podName = buildPodName(nodeName) while (true) { sleep 1000 - final state = k8sClient.podState(podName) + final state = client.podState(podName) if (state && !state.containsKey('waiting')) { break } diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sDriverLauncherTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sDriverLauncherTest.groovy index 437aa00..516b014 100644 --- a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sDriverLauncherTest.groovy +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sDriverLauncherTest.groovy @@ -138,6 +138,7 @@ class K8sDriverLauncherTest extends Specification { def pod = Mock(PodOptions) pod.getVolumeClaims() >> [ new PodVolumeClaim('pvc-1', '/mnt/path/data') ] pod.getMountConfigMaps() >> [ new PodMountConfig('cfg-2', '/mnt/path/cfg') ] + pod.automountServiceAccountToken = false def k8s = Mock(K8sConfig) k8s.getNextflowImageName() >> 'the-image' @@ -162,13 +163,12 @@ class K8sDriverLauncherTest extends Specification { kind: 'Pod', metadata: [name:'foo-boo', namespace:'foo', labels:[app:'nextflow', runName:'foo-boo']], spec: [ - automountServiceAccountToken: false, restartPolicy: 'Never', containers: [ [ name: 'foo-boo', image: 'the-image', - command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo"], + command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; exit \$status"], env: [ [name:'NXF_WORK', value:'/the/work/dir'], [name:'NXF_ASSETS', value:'/the/project/dir'], @@ -182,6 +182,7 @@ class K8sDriverLauncherTest extends Specification { ] ], serviceAccountName: 'bar', + automountServiceAccountToken: false, volumes: [ [name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']], [name:'vol-2', configMap:[name:'cfg-2']] @@ -229,13 +230,12 @@ class K8sDriverLauncherTest extends Specification { template: [ metadata: metadata, spec: [ - automountServiceAccountToken: false, restartPolicy: 'Never', containers: [ [ name: 'foo-boo', image: 'the-image', - command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo"], + command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; exit \$status"], env: [ [name:'NXF_WORK', value:'/the/work/dir'], [name:'NXF_ASSETS', value:'/the/project/dir'], @@ -249,6 +249,7 @@ class K8sDriverLauncherTest extends Specification { ] ], serviceAccountName: 'bar', + automountServiceAccountToken: false, volumes: [ [name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']], [name:'vol-2', configMap:[name:'cfg-2']] @@ -643,6 +644,7 @@ class K8sDriverLauncherTest extends Specification { def driver = Spy(K8sDriverLauncher) driver.@k8sConfig = config driver.@runName = POD_NAME + driver.@initDeployer = new K8sNodeInitDeployer(driver.k8sClient, config, driver.runName) when: driver.shutdown()