diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
index eff1638..b80ac81 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy
@@ -230,6 +230,18 @@ class K8sConfig implements ConfigScope {
     """)
     final Duration schedulerInterval
 
+    @ConfigOption
+    @Description("""
+        Enables task runtime recording
+    """)
+    final boolean recordTaskRuntimes
+
+    @ConfigOption
+    @Description("""
+        The runtime recording file
+    """)
+    final String runtimeRecordPath
+
     /* required by extension point -- do not remove */
     K8sConfig() {
         this(Collections.emptyMap())
@@ -261,6 +273,9 @@ class K8sConfig implements ConfigScope {
         userName = opts.userName
         nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
         schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS)
+        recordTaskRuntimes = opts.recordTaskRuntimes as boolean ?: false
 
         launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
+        // must be assigned after launchDir: the default record path is derived from it
+        runtimeRecordPath = opts.runtimeRecordPath as String ?: "${launchDir}/work/runtimes.csv"
         projectDir = opts.projectDir ?: "${storageMountPath}/projects"
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
index 4e4d981..f7b5af6 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy
@@ -55,6 +55,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
     private K8sTaskScheduler taskScheduler
     private Thread schedulerThread
 
+    K8sRuntimeRecorder runtimeRecorder
+
     /**
      * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
      * the client (including the service account token) when the configured interval expires.
@@ -91,6 +93,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
 
         log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
 
+        this.runtimeRecorder = new K8sRuntimeRecorder(k8sConfig.recordTaskRuntimes, k8sConfig.runtimeRecordPath)
+
         this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy())
         this.schedulerThread = new Thread(this.taskScheduler)
         this.schedulerThread.start()
@@ -98,6 +102,7 @@ class K8sExecutor extends Executor implements ExtensionPoint {
 
     @Override
     void shutdown() {
+        this.runtimeRecorder.write()
         this.taskScheduler.stop()
         this.schedulerThread.join()
     }
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecord.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecord.groovy
new file mode 100644
index 0000000..047b850
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecord.groovy
@@ -0,0 +1,7 @@
+package nextflow.k8s
+
+record K8sRuntimeRecord(
+    String taskName,
+    long inputSize,
+    long runtimeMillis
+) {}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy
new file mode 100644
index 0000000..67f6f60
--- /dev/null
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sRuntimeRecorder.groovy
@@ -0,0 +1,58 @@
+package nextflow.k8s
+
+import groovy.util.logging.Slf4j
+
+import java.nio.file.Files
+import java.nio.file.Path
+
+/**
+ * Collects the runtime and total input size of finished tasks in memory
+ * and reports the collected records when {@link #write()} is called.
+ */
+@Slf4j
+class K8sRuntimeRecorder {
+    private final boolean enabled
+    // NOTE(review): stored but never read in this class; write() only logs -- confirm intent
+    private final String recordPath
+
+    private final List<K8sRuntimeRecord> records = new ArrayList<>()
+
+    K8sRuntimeRecorder(boolean enabled, String recordPath) {
+        this.enabled = enabled
+        this.recordPath = recordPath
+    }
+
+    /**
+     * Records the runtime of a task.
+     * This should be called for a finished task, which has its
+     * start and end timestamps set. Does nothing when recording is disabled.
+     * Input files whose size cannot be read are logged and contribute 0 bytes.
+     * @param task handler of the finished task
+     */
+    synchronized void record(K8sTaskHandler task) {
+        if (!enabled)
+            return
+        final long runtimeMillis = task.completeTimeMillis - task.startTimeMillis
+        long inputSizeSum = 0
+        final Map<String, Path> inputFiles = task.task.getInputFilesMap()
+        for (Path inputFile : inputFiles.values()) {
+            try {
+                inputSizeSum += Files.size(inputFile)
+            } catch (IOException ex) {
+                log.error "[K8s] failed to get size of input file ${inputFile} of task ${task.task.name}: ${ex.message}"
+            }
+        }
+
+        records.add(new K8sRuntimeRecord(task.task.name, inputSizeSum, runtimeMillis))
+    }
+
+    /**
+     * Logs every collected record at info level.
+     * Holds the same lock as {@link #record} so the list cannot change while it is iterated.
+     */
+    synchronized void write() {
+        for (K8sRuntimeRecord rec : records) {
+            log.info("[K8s] ${rec.taskName} - ${rec.inputSize} Bytes - ${rec.runtimeMillis} ms")
+        }
+    }
+}
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
index 9bf99d1..0bcc81b 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy
@@ -471,6 +471,12 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
             deleteJobIfSuccessful(task)
             updateTimestamps(state.terminated as Map)
             determineNode()
+
+            // Signal the scheduler that this task has finished running and record its runtime
+            if (executor != null) {
+                executor.taskScheduler.taskFinished(this)
+                executor.runtimeRecorder.record(this)
+            }
             return true
         }
 
diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
index 07a4929..883482b 100644
--- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
+++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskScheduler.groovy
@@ -32,6 +32,14 @@ class K8sTaskScheduler implements Runnable {
         /* TODO: Decide if we should invoke the scheduler immediately */
     }
 
+    /**
+     * Notify the scheduler that a task has finished execution
+     * @param handler the handler of the finished task
+     */
+    void taskFinished(K8sTaskHandler handler) {
+        /* currently a no-op */
+    }
+
     protected synchronized void drain() {
         while( true ) {
             final pending = new ArrayList(queue)
diff --git a/test/nextflow.config b/test/nextflow.config
index db8c569..8850a15 100644
--- a/test/nextflow.config
+++ b/test/nextflow.config
@@ -22,9 +22,10 @@ k8s {
     projectDir = '/workspace/projects'
     cleanup = false
 
-    nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.3.2'
+    nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.4.2'
     imagePullPolicy = 'IfNotPresent'
     schedulerInterval = '10s'
+    recordTaskRuntimes = true
 
     nodeInit {
         enabled = false