feat: implement background mode init-pod cleanup

This commit is contained in:
2026-05-03 15:37:14 +02:00
parent 022b6f9547
commit a54db66cb9
4 changed files with 98 additions and 8 deletions

View File

@@ -132,6 +132,8 @@ class K8sDriverLauncher {
*/ */
private String plugins private String plugins
private K8sNodeInitDeployer initDeployer
/** /**
* Launcher entry point. Set-up the environment and create a pod that run the Nextflow * Launcher entry point. Set-up the environment and create a pod that run the Nextflow
* application (which in turns executed each task as a pod) * application (which in turns executed each task as a pod)
@@ -149,12 +151,11 @@ class K8sDriverLauncher {
this.k8sConfig = makeK8sConfig(config.toMap()) this.k8sConfig = makeK8sConfig(config.toMap())
this.k8sClient = makeK8sClient(k8sConfig) this.k8sClient = makeK8sClient(k8sConfig)
this.k8sConfig.checkStorageAndPaths(k8sClient) this.k8sConfig.checkStorageAndPaths(k8sClient)
this.initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName)
createK8sConfigMap() createK8sConfigMap()
K8sNodeInitDeployer initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName)
initDeployer.deploy() initDeployer.deploy()
createK8sLauncherPod() createK8sLauncherPod()
waitPodStart() waitPodStart()
// login into container session // login into container session
@@ -178,6 +179,9 @@ class K8sDriverLauncher {
if( k8sConfig.getCleanup(deleteOnSuccessByDefault) ) { if( k8sConfig.getCleanup(deleteOnSuccessByDefault) ) {
deleteConfigMap() deleteConfigMap()
} }
// cleanup pre-workflow pods
initDeployer.cleanup()
return exitCode return exitCode
} }
@@ -532,7 +536,9 @@ class K8sDriverLauncher {
assert k8sClient assert k8sClient
// -- setup config file // -- setup config file
String cmd = "source /etc/nextflow/init.sh; ${getLaunchCli()}" String cmd = "source /etc/nextflow/init.sh; ${getLaunchCli()}" +
"; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; " +
"exit \$status"
// create the launcher pod // create the launcher pod
PodSpecBuilder builder = new PodSpecBuilder() PodSpecBuilder builder = new PodSpecBuilder()
@@ -662,6 +668,10 @@ class K8sDriverLauncher {
paramsFile = "/etc/nextflow/$file.name" paramsFile = "/etc/nextflow/$file.name"
} }
// pre-workflow pod cleanup
if ( background )
configMap['node-init-cleanup.sh'] = initDeployer.buildCleanupScript()
// create the config map // create the config map
configMapName = makeConfigMapName(configMap) configMapName = makeConfigMapName(configMap)
tryCreateConfigMap(configMapName, configMap) tryCreateConfigMap(configMapName, configMap)

View File

@@ -29,6 +29,10 @@ class K8sNodeInitConfig implements ConfigScope {
@Description("the pod state to wait on") @Description("the pod state to wait on")
final String wait; final String wait;
@ConfigOption
@Description("enables cleanup of pre-workflow nodes.")
final boolean cleanup;
K8sNodeInitConfig() { K8sNodeInitConfig() {
this(Collections.emptyMap()) this(Collections.emptyMap())
} }
@@ -38,5 +42,6 @@ class K8sNodeInitConfig implements ConfigScope {
image = opts.image as String image = opts.image as String
command = opts.command as List<String> command = opts.command as List<String>
wait = opts.wait as String wait = opts.wait as String
cleanup = opts.cleanup as boolean
} }
} }

View File

@@ -29,6 +29,78 @@ class K8sNodeInitDeployer {
waitForPods(nodes) waitForPods(nodes)
} }
/**
 * Delete the node-init pod of every node reported by {@code getNodes()}.
 *
 * Nothing happens unless the node-init configuration exists and has both
 * its {@code enabled} and {@code cleanup} flags set.
 */
void cleanup() {
    final nodeInit = config.nodeInit
    final boolean cleanupRequested = nodeInit?.enabled && nodeInit?.cleanup
    if ( cleanupRequested ) {
        // pod names are derived from node names, one init pod per node
        getNodes().each { String node ->
            client.podDelete(buildPodName(node))
        }
    }
}
/**
 * Build a Bash script that deletes the node-init pods from inside the cluster.
 *
 * The generated script reads the service-account token, namespace and CA
 * certificate mounted under /var/run/secrets/kubernetes.io/serviceaccount and
 * issues one HTTP DELETE per pod against the Kubernetes API server. Delete
 * failures are reported on stderr; the script itself always ends with
 * {@code exit 0}.
 *
 * @return the script text; a no-op script when node-init or its cleanup flag
 *         is disabled, or when there is no node to clean up
 */
String buildCleanupScript() {
    // the shebang must be `/usr/bin/env bash` ("env", a space, then the interpreter)
    final String noop = "#!/usr/bin/env bash\nexit 0"

    final init = config.nodeInit
    if ( !init?.enabled || !init?.cleanup )
        return noop

    final podNames = getNodes().collect { buildPodName(it) }
    // an empty array expanded under `set -u` aborts on bash < 4.4, so skip the loop entirely
    if ( !podNames )
        return noop

    // one double-quoted array element per line
    final String podEntries = podNames.collect { "\"${it}\"" }.join('\n')

    // the trailing backslash after ''' drops the first newline so the shebang is line 1
    final String head = '''\
#!/usr/bin/env bash
set -u

PODS=(
'''

    final String tail = '''
)

SERVICE_ACCOUNT_DIR="/var/run/secrets/kubernetes.io/serviceaccount"
TOKEN="$(cat "${SERVICE_ACCOUNT_DIR}/token")"
NAMESPACE="$(cat "${SERVICE_ACCOUNT_DIR}/namespace")"
CA_CERT="${SERVICE_ACCOUNT_DIR}/ca.crt"
KUBE_API="https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS:-443}"

for POD in "${PODS[@]}"; do
    echo "Deleting pod: ${POD}"
    HTTP_CODE="$(
        curl \\
            --silent \\
            --show-error \\
            --output /tmp/delete-pod-response.json \\
            --write-out "%{http_code}" \\
            --request DELETE \\
            --cacert "${CA_CERT}" \\
            --header "Authorization: Bearer ${TOKEN}" \\
            --header "Accept: application/json" \\
            "${KUBE_API}/api/v1/namespaces/${NAMESPACE}/pods/${POD}"
    )"
    case "${HTTP_CODE}" in
        200|202)
            echo "Deleted pod: ${POD}"
            ;;
        404)
            echo "Pod already absent: ${POD}"
            ;;
        *)
            echo "Failed to delete pod: ${POD}; HTTP ${HTTP_CODE}" >&2
            cat /tmp/delete-pod-response.json >&2 || true
            ;;
    esac
done

exit 0
'''

    return head + podEntries + tail
}
private List<String> getNodes() { private List<String> getNodes() {
final resp = client.nodeList() final resp = client.nodeList()
ArrayList<String> nodes = new ArrayList<String>() ArrayList<String> nodes = new ArrayList<String>()
@@ -60,6 +132,7 @@ class K8sNodeInitDeployer {
.withPrivileged(true) .withPrivileged(true)
.withHostMounts(mounts) .withHostMounts(mounts)
.withPodName(buildPodName(nodeName)) .withPodName(buildPodName(nodeName))
.withNamespace(client.config.namespace)
.build() .build()
} }
@@ -69,7 +142,7 @@ class K8sNodeInitDeployer {
String podName = buildPodName(nodeName) String podName = buildPodName(nodeName)
while (true) { while (true) {
sleep 1000 sleep 1000
final state = k8sClient.podState(podName) final state = client.podState(podName)
if (state && !state.containsKey('waiting')) { if (state && !state.containsKey('waiting')) {
break break
} }

View File

@@ -138,6 +138,7 @@ class K8sDriverLauncherTest extends Specification {
def pod = Mock(PodOptions) def pod = Mock(PodOptions)
pod.getVolumeClaims() >> [ new PodVolumeClaim('pvc-1', '/mnt/path/data') ] pod.getVolumeClaims() >> [ new PodVolumeClaim('pvc-1', '/mnt/path/data') ]
pod.getMountConfigMaps() >> [ new PodMountConfig('cfg-2', '/mnt/path/cfg') ] pod.getMountConfigMaps() >> [ new PodMountConfig('cfg-2', '/mnt/path/cfg') ]
pod.automountServiceAccountToken = false
def k8s = Mock(K8sConfig) def k8s = Mock(K8sConfig)
k8s.getNextflowImageName() >> 'the-image' k8s.getNextflowImageName() >> 'the-image'
@@ -162,13 +163,12 @@ class K8sDriverLauncherTest extends Specification {
kind: 'Pod', kind: 'Pod',
metadata: [name:'foo-boo', namespace:'foo', labels:[app:'nextflow', runName:'foo-boo']], metadata: [name:'foo-boo', namespace:'foo', labels:[app:'nextflow', runName:'foo-boo']],
spec: [ spec: [
automountServiceAccountToken: false,
restartPolicy: 'Never', restartPolicy: 'Never',
containers: [ containers: [
[ [
name: 'foo-boo', name: 'foo-boo',
image: 'the-image', image: 'the-image',
command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo"], command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; exit \$status"],
env: [ env: [
[name:'NXF_WORK', value:'/the/work/dir'], [name:'NXF_WORK', value:'/the/work/dir'],
[name:'NXF_ASSETS', value:'/the/project/dir'], [name:'NXF_ASSETS', value:'/the/project/dir'],
@@ -182,6 +182,7 @@ class K8sDriverLauncherTest extends Specification {
] ]
], ],
serviceAccountName: 'bar', serviceAccountName: 'bar',
automountServiceAccountToken: false,
volumes: [ volumes: [
[name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']], [name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']],
[name:'vol-2', configMap:[name:'cfg-2']] [name:'vol-2', configMap:[name:'cfg-2']]
@@ -229,13 +230,12 @@ class K8sDriverLauncherTest extends Specification {
template: [ template: [
metadata: metadata, metadata: metadata,
spec: [ spec: [
automountServiceAccountToken: false,
restartPolicy: 'Never', restartPolicy: 'Never',
containers: [ containers: [
[ [
name: 'foo-boo', name: 'foo-boo',
image: 'the-image', image: 'the-image',
command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo"], command: ['/bin/bash', '-c', "source /etc/nextflow/init.sh; nextflow run foo; if [ -x /etc/nextflow/node-init-cleanup.sh ]; then /etc/nextflow/node-init-cleanup.sh || true; fi; exit \$status"],
env: [ env: [
[name:'NXF_WORK', value:'/the/work/dir'], [name:'NXF_WORK', value:'/the/work/dir'],
[name:'NXF_ASSETS', value:'/the/project/dir'], [name:'NXF_ASSETS', value:'/the/project/dir'],
@@ -249,6 +249,7 @@ class K8sDriverLauncherTest extends Specification {
] ]
], ],
serviceAccountName: 'bar', serviceAccountName: 'bar',
automountServiceAccountToken: false,
volumes: [ volumes: [
[name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']], [name:'vol-1', persistentVolumeClaim:[claimName:'pvc-1']],
[name:'vol-2', configMap:[name:'cfg-2']] [name:'vol-2', configMap:[name:'cfg-2']]
@@ -643,6 +644,7 @@ class K8sDriverLauncherTest extends Specification {
def driver = Spy(K8sDriverLauncher) def driver = Spy(K8sDriverLauncher)
driver.@k8sConfig = config driver.@k8sConfig = config
driver.@runName = POD_NAME driver.@runName = POD_NAME
driver.@initDeployer = new K8sNodeInitDeployer(driver.k8sClient, config, driver.runName)
when: when:
driver.shutdown() driver.shutdown()