feat: simpler test workflow with configuration

This commit is contained in:
2026-05-18 21:06:30 +02:00
parent ad0d91aee0
commit 0ec084b720
8 changed files with 87 additions and 54 deletions

12
nextflow/Dockerfile.dev Normal file
View File

@@ -0,0 +1,12 @@
FROM eclipse-temurin:17-jre
WORKDIR /opt/nextflow
COPY . /opt/nextflow
RUN chmod +x /opt/nextflow/launch.sh /opt/nextflow/nextflow || true
ENV PATH="/opt/nextflow:${PATH}"
ENV NXF_HOME="/opt/nextflow/.nextflow"
ENTRYPOINT ["/opt/nextflow/launch.sh"]

View File

@@ -1,4 +1,4 @@
build=0 build=0
version=26.04.0 version=26.04.0
timestamp=1777432778635 timestamp=1778418181930
commitId=f9ab19000 commitId=ad0d91a

View File

@@ -212,10 +212,15 @@ class K8sConfig implements ConfigScope {
""") """)
final String workDir final String workDir
@ConfigOption
@Description("Node initialization config") @Description("Node initialization config")
final K8sNodeInitConfig nodeInit final K8sNodeInitConfig nodeInit
@ConfigOption
@Description("""
The image name of the nextflow launcher image
""")
final String nextflowImage
/* required by extension point -- do not remove */ /* required by extension point -- do not remove */
K8sConfig() { K8sConfig() {
this(Collections.emptyMap()) this(Collections.emptyMap())
@@ -245,6 +250,7 @@ class K8sConfig implements ConfigScope {
storageMountPath = opts.storageMountPath ?: '/workspace' storageMountPath = opts.storageMountPath ?: '/workspace'
storageSubPath = opts.storageSubPath storageSubPath = opts.storageSubPath
userName = opts.userName userName = opts.userName
nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}" launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
projectDir = opts.projectDir ?: "${storageMountPath}/projects" projectDir = opts.projectDir ?: "${storageMountPath}/projects"
@@ -266,6 +272,7 @@ class K8sConfig implements ConfigScope {
else if( securityContext ) else if( securityContext )
pod.securityContext = new PodSecurityContext(securityContext) pod.securityContext = new PodSecurityContext(securityContext)
log.info("Hello sailor")
nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap()) nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap())
} }
@@ -327,7 +334,7 @@ class K8sConfig implements ConfigScope {
boolean useJobResource() { ResourceType.Job.name() == computeResourceType } boolean useJobResource() { ResourceType.Job.name() == computeResourceType }
String getNextflowImageName() { String getNextflowImageName() {
return "nextflow/nextflow:${BuildInfo.version}" return nextflowImage
} }
PodOptions getPodOptions() { PodOptions getPodOptions() {

View File

@@ -1,9 +1,11 @@
package nextflow.k8s package nextflow.k8s
import groovy.util.logging.Slf4j
import nextflow.k8s.client.K8sClient import nextflow.k8s.client.K8sClient
import nextflow.k8s.model.PodHostMount import nextflow.k8s.model.PodHostMount
import nextflow.k8s.model.PodSpecBuilder import nextflow.k8s.model.PodSpecBuilder
@Slf4j
class K8sNodeInitDeployer { class K8sNodeInitDeployer {
private K8sClient client private K8sClient client
private K8sConfig config private K8sConfig config
@@ -20,12 +22,16 @@ class K8sNodeInitDeployer {
if ( !init?.enabled ) if ( !init?.enabled )
return return
log.info("deploying init pods")
final nodes = getNodes() final nodes = getNodes()
for ( String nodeName : nodes ) { for ( String nodeName : nodes ) {
log.info(" ... deploying to " + nodeName)
final spec = makePodSpec(init, nodeName) final spec = makePodSpec(init, nodeName)
client.podCreate(spec) client.podCreate(spec)
} }
log.info("waiting for init pods")
waitForPods(nodes) waitForPods(nodes)
} }

View File

@@ -20,7 +20,10 @@ class K8sTaskScheduler {
} }
void submit(K8sTaskHandler handler) { void submit(K8sTaskHandler handler) {
log.debug "[K8s] received queued task ${handler.task.name}"
queue.add(new K8sSchedulingRequest(handler)) queue.add(new K8sSchedulingRequest(handler))
drain()
} }
protected synchronized void drain() { protected synchronized void drain() {
@@ -34,7 +37,7 @@ class K8sTaskScheduler {
if ( !queue.remove(decision.request) ) if ( !queue.remove(decision.request) )
continue continue
log.trace "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}" log.debug "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
decision.request.handler.submitNow(decision.nodeName) decision.request.handler.submitNow(decision.nodeName)
} }
} }

View File

@@ -1,28 +1,31 @@
#!/bin/bash #!/bin/bash
# This creates a persistent volume and claim. # This creates a persistent volume and claim.
# Params: # Params:
# -p|--pv <pv-name> -c|--pvc <pvc-name> -s|--nfs-server <nfs-server> -n|--nfs-path <nfs-path> # -p|--pv <pv-name> -c|--pvc <pvc-name> -s|--nfs-server <nfs-server> -n|--nfs-path <nfs-path> -N|--namespace <name>
# pv-name - the name used for the persistent volume # pv-name - the name used for the persistent volume
# pvc-name - the name used for the claim # pvc-name - the name used for the claim
# nfs-server - IP or hostname of the NFS server # nfs-server - IP or hostname of the NFS server
# nfs-path - Path on the NFS server # nfs-path - Path on the NFS server
# namespace - Kubernetes namespace
PV_NAME= PV_NAME=
PVC_NAME= PVC_NAME=
NFS_SERVER= NFS_SERVER=
NFS_PATH= NFS_PATH=
SIZE= SIZE=
NAMESPACE=default
usage() usage()
{ {
echo "Usage: provision-volume-claim.sh < -p | --pv-name name > < -c | --pvc-name name > echo "Usage: provision-volume-claim.sh < -p | --pv-name name > < -c | --pvc-name name >
< -n | --nfs-server server > < -n | --nfs-server server >
< -m | --nfs-path path > < -m | --nfs-path path >
< -s | --size size >" < -s | --size size >
< -N | --namespace name >"
exit 2 exit 2
} }
PARSED_ARGUMENTS=$(getopt -a -n provision-volume-claim -o p:c:n:m:s: --long pv-name:,pvc-name:,nfs-server:,nfs-path:,size: -- "$@") PARSED_ARGUMENTS=$(getopt -a -n provision-volume-claim -o p:c:n:m:s:N: --long pv-name:,pvc-name:,nfs-server:,nfs-path:,size:,namespace: -- "$@")
VALID_ARGUMENTS=$? VALID_ARGUMENTS=$?
if [ "$VALID_ARGUMENTS" != "0" ]; then if [ "$VALID_ARGUMENTS" != "0" ]; then
usage usage
@@ -37,6 +40,7 @@ do
-n | --nfs-server) NFS_SERVER="$2" ; shift 2 ;; -n | --nfs-server) NFS_SERVER="$2" ; shift 2 ;;
-m | --nfs-path) NFS_PATH="$2" ; shift 2 ;; -m | --nfs-path) NFS_PATH="$2" ; shift 2 ;;
-s | --size) SIZE="$2" ; shift 2 ;; -s | --size) SIZE="$2" ; shift 2 ;;
-N | --namespace) NAMESPACE="$2" ; shift 2 ;;
# -- means the end of the arguments; drop this, and break out of the while loop # -- means the end of the arguments; drop this, and break out of the while loop
--) shift; break ;; --) shift; break ;;
*) echo "Unexpected option: $1 - this should not happen." *) echo "Unexpected option: $1 - this should not happen."
@@ -69,7 +73,6 @@ spec:
- ReadWriteMany - ReadWriteMany
persistentVolumeReclaimPolicy: Recycle persistentVolumeReclaimPolicy: Recycle
storageClassName: slow storageClassName: slow
mountOptions: mountOptions:
- hard - hard
- nfsvers=4.1 - nfsvers=4.1
@@ -88,6 +91,7 @@ apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
name: $PVC_NAME name: $PVC_NAME
namespace: $NAMESPACE
spec: spec:
accessModes: accessModes:
- ReadWriteMany - ReadWriteMany
@@ -106,7 +110,7 @@ cat $PVC_MANIFEST
# Generate cleanup script # Generate cleanup script
cat > cleanup-volume-claim.sh << EOM cat > cleanup-volume-claim.sh << EOM
#!/bin/bash #!/bin/bash
kubectl delete pvc $PVC_NAME kubectl delete pvc $PVC_NAME -n $NAMESPACE
kubectl delete pv $PV_NAME kubectl delete pv $PV_NAME
EOM EOM
chmod +x cleanup-volume-claim.sh chmod +x cleanup-volume-claim.sh

View File

@@ -1,52 +1,18 @@
params.str = "Hello, world!" nextflow.enable.dsl = 2
process split { process hello {
publishDir "results/lower" container 'ubuntu:22.04'
input:
val x
output: output:
path 'chunk_*' path 'hello.txt'
script: script:
""" """
printf '${x}' | split -b 6 - chunk_ echo "Hello, from kubernetes" > hello.txt
""" hostname >> hello.txt
}
process to_upper {
tag "$y"
input:
path y
output:
path 'upper_*'
script:
"""
cat $y | tr '[a-z]' '[A-Z]' > upper_${y}
""" """
} }
workflow { workflow {
main: hello()
ch_str = channel.of(params.str)
ch_chunks = split(ch_str)
ch_upper = to_upper(ch_chunks.flatten())
publish:
lower = ch_chunks.flatten()
upper = ch_upper
}
output {
lower {
path 'lower'
}
upper {
path 'upper'
}
} }

35
test/nextflow.config Normal file
View File

@@ -0,0 +1,35 @@
plugins {
id 'nf-k8s'
}
workDir = '/workspace'
process {
executor = 'k8s'
container = 'ubuntu:22.04'
pod = [
imagePullPolicy: 'IfNotPresent'
]
}
k8s {
namespace = 'default'
storageClaimName = 'my-claim'
storageMountPath = '/workspace'
launchDir = '/workspace/launch'
projectDir = '/workspace/projects'
cleanup = false
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.1'
imagePullPolicy = 'IfNotPresent'
nodeInit {
enabled = false
image = 'gitea.kleine.eulenhexe.de/kevin/ma/dvfs-agent:0.1'
command = ['/bin/agent']
wait = 'Running'
cleanup = false
}
}