feat: basic task runtime recording

This commit is contained in:
2026-05-24 21:53:06 +02:00
parent 2ae1ac6e38
commit 7f18e5aae8
7 changed files with 91 additions and 1 deletions

View File

@@ -230,6 +230,18 @@ class K8sConfig implements ConfigScope {
""") """)
final Duration schedulerInterval final Duration schedulerInterval
@ConfigOption
@Description("""
Enables task runtime recording
""")
final boolean recordTaskRuntimes
@ConfigOption
@Description("""
The runtime recording file
""")
final String runtimeRecordPath
/* required by extension point -- do not remove */ /* required by extension point -- do not remove */
K8sConfig() { K8sConfig() {
this(Collections.emptyMap()) this(Collections.emptyMap())
@@ -261,6 +273,8 @@ class K8sConfig implements ConfigScope {
userName = opts.userName userName = opts.userName
nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}" nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS) schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS)
recordTaskRuntimes = opts.recordTaskRuntimes as boolean ?: false
runtimeRecordPath = opts.runtimeRecordPath as String ?: "${launchDir}/work/runtimes.csv"
launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}" launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
projectDir = opts.projectDir ?: "${storageMountPath}/projects" projectDir = opts.projectDir ?: "${storageMountPath}/projects"

View File

@@ -55,6 +55,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
private K8sTaskScheduler taskScheduler private K8sTaskScheduler taskScheduler
private Thread schedulerThread private Thread schedulerThread
K8sRuntimeRecorder runtimeRecorder
/** /**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires. * the client (including the service account token) when the configured interval expires.
@@ -91,6 +93,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
log.debug "[K8s] config=$k8sConfig; API client config=$client.config" log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
this.runtimeRecorder = new K8sRuntimeRecorder(k8sConfig.recordTaskRuntimes, k8sConfig.runtimeRecordPath)
this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy()) this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy())
this.schedulerThread = new Thread(this.taskScheduler) this.schedulerThread = new Thread(this.taskScheduler)
this.schedulerThread.start() this.schedulerThread.start()
@@ -98,6 +102,7 @@ class K8sExecutor extends Executor implements ExtensionPoint {
@Override @Override
void shutdown() { void shutdown() {
this.runtimeRecorder.write()
this.taskScheduler.stop() this.taskScheduler.stop()
this.schedulerThread.join() this.schedulerThread.join()
} }

View File

@@ -0,0 +1,7 @@
package nextflow.k8s
record K8sRuntimeRecord(
String taskName,
long inputSize,
long runtimeMillis
) {}

View File

@@ -0,0 +1,49 @@
package nextflow.k8s
import groovy.util.logging.Slf4j
import java.nio.file.Files
import java.nio.file.Path
@Slf4j
class K8sRuntimeRecorder {
private final boolean enabled
private final String recordPath
private ArrayList<K8sRuntimeRecord> records
K8sRuntimeRecorder(boolean enabled, String recordPath) {
this.enabled = enabled
this.recordPath = recordPath
this.records = new ArrayList<>()
}
/**
* Records the runtime of a task.
* This should be called for a finished task, which has its
* start and end timestamps set.
* @param task
*/
synchronized void record(K8sTaskHandler task) {
if (!enabled)
return;
long runtimeMilis = task.completeTimeMillis - task.startTimeMillis
long inputSizeSum = 0
def inputFiles = task.task.getInputFilesMap()
for (Map.Entry<String, Path> f : inputFiles) {
try {
inputSizeSum += Files.size(f.value)
} catch (IOException ex) {
log.error "[K8s] failed to get size of input file ${f.value} of task ${task.task.name}: ${ex.message}"
}
}
records.add(new K8sRuntimeRecord(task.task.name, inputSizeSum, runtimeMilis))
}
void write() {
for (K8sRuntimeRecord record : records) {
log.info("[K8s] ${record.taskName} - ${record.inputSize} Bytes - ${record.runtimeMillis} ms")
}
}
}

View File

@@ -471,6 +471,12 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
deleteJobIfSuccessful(task) deleteJobIfSuccessful(task)
updateTimestamps(state.terminated as Map) updateTimestamps(state.terminated as Map)
determineNode() determineNode()
// Signal the scheduler that this task has finished running
if (executor != null) {
executor.taskScheduler.taskFinished(this)
executor.runtimeRecorder.record(this)
}
return true return true
} }

View File

@@ -32,6 +32,14 @@ class K8sTaskScheduler implements Runnable {
/* TODO: Decide if we should invoke the scheduler immediately */ /* TODO: Decide if we should invoke the scheduler immediately */
} }
/**
* Notify the scheduler that a task has finished execution
* @param handler
*/
void taskFinished(K8sTaskHandler handler) {
}
protected synchronized void drain() { protected synchronized void drain() {
while( true ) { while( true ) {
final pending = new ArrayList<K8sSchedulingRequest>(queue) final pending = new ArrayList<K8sSchedulingRequest>(queue)

View File

@@ -22,9 +22,10 @@ k8s {
projectDir = '/workspace/projects' projectDir = '/workspace/projects'
cleanup = false cleanup = false
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.3.2' nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.4.2'
imagePullPolicy = 'IfNotPresent' imagePullPolicy = 'IfNotPresent'
schedulerInterval = '10s' schedulerInterval = '10s'
recordTaskRuntimes = true
nodeInit { nodeInit {
enabled = false enabled = false