Compare commits

...

4 Commits

Author SHA1 Message Date
933611af55 feat: keep track of node occupancy 2026-05-24 23:04:13 +02:00
7f18e5aae8 feat: basic task runtime recording 2026-05-24 21:53:06 +02:00
2ae1ac6e38 chore: add nf-k8s test
Some checks failed
ci/woodpecker/push/nextflow-k8s Pipeline failed
2026-05-21 21:09:04 +02:00
78fe51a40d feat: invoke scheduler at regular interval 2026-05-21 20:37:55 +02:00
11 changed files with 216 additions and 25 deletions

View File

@@ -0,0 +1,8 @@
when:
- event: push
steps:
- name: Test Kubernetes Plugin
image: groovy:4-jdk21
commands:
- cd nextflow && ./gradlew :plugins:nf-k8s:test

View File

@@ -39,6 +39,9 @@ import nextflow.k8s.model.PodSecurityContext
import nextflow.k8s.model.PodVolumeClaim
import nextflow.k8s.model.ResourceType
import nextflow.util.Duration
import java.util.concurrent.TimeUnit
/**
* Model Kubernetes specific settings defined in the nextflow
* configuration file
@@ -221,6 +224,24 @@ class K8sConfig implements ConfigScope {
""")
final String nextflowImage
@ConfigOption
@Description("""
The run interval of the kubernetes scheduler
""")
final Duration schedulerInterval
@ConfigOption
@Description("""
Enables task runtime recording
""")
final boolean recordTaskRuntimes
@ConfigOption
@Description("""
The runtime recording file
""")
final String runtimeRecordPath
/* required by extension point -- do not remove */
K8sConfig() {
this(Collections.emptyMap())
@@ -251,6 +272,9 @@ class K8sConfig implements ConfigScope {
storageSubPath = opts.storageSubPath
userName = opts.userName
nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS)
recordTaskRuntimes = opts.recordTaskRuntimes as boolean ?: false
runtimeRecordPath = opts.runtimeRecordPath as String ?: "${launchDir}/work/runtimes.csv"
launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
projectDir = opts.projectDir ?: "${storageMountPath}/projects"
@@ -272,7 +296,6 @@ class K8sConfig implements ConfigScope {
else if( securityContext )
pod.securityContext = new PodSecurityContext(securityContext)
log.info("Hello sailor")
nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap())
}

View File

@@ -52,19 +52,21 @@ class K8sExecutor extends Executor implements ExtensionPoint {
*/
private Cache<String, K8sClient> clientCache
private K8sTaskScheduler taskScheduler
private K8sTaskScheduler taskScheduler
private Thread schedulerThread
K8sRuntimeRecorder runtimeRecorder
/**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires.
*/
protected K8sClient getClient() {
K8sClient getClient() {
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
}
protected K8sTaskScheduler getTaskScheduler() {
if( taskScheduler == null )
taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy())
assert taskScheduler != null
return taskScheduler
}
@@ -88,7 +90,21 @@ class K8sExecutor extends Executor implements ExtensionPoint {
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
.build()
final client = getClient()
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
this.runtimeRecorder = new K8sRuntimeRecorder(k8sConfig.recordTaskRuntimes, k8sConfig.runtimeRecordPath)
this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy())
this.schedulerThread = new Thread(this.taskScheduler)
this.schedulerThread.start()
}
@Override
void shutdown() {
this.runtimeRecorder.write()
this.taskScheduler.stop()
this.schedulerThread.join()
}
/**

View File

@@ -0,0 +1,7 @@
package nextflow.k8s
record K8sRuntimeRecord(
String taskName,
long inputSize,
long runtimeMillis
) {}

View File

@@ -0,0 +1,49 @@
package nextflow.k8s
import groovy.util.logging.Slf4j
import java.nio.file.Files
import java.nio.file.Path
@Slf4j
class K8sRuntimeRecorder {
private final boolean enabled
private final String recordPath
private ArrayList<K8sRuntimeRecord> records
K8sRuntimeRecorder(boolean enabled, String recordPath) {
this.enabled = enabled
this.recordPath = recordPath
this.records = new ArrayList<>()
}
/**
* Records the runtime of a task.
* This should be called for a finished task, which has its
* start and end timestamps set.
* @param task
*/
synchronized void record(K8sTaskHandler task) {
if (!enabled)
return;
long runtimeMilis = task.completeTimeMillis - task.startTimeMillis
long inputSizeSum = 0
def inputFiles = task.task.getInputFilesMap()
for (Map.Entry<String, Path> f : inputFiles) {
try {
inputSizeSum += Files.size(f.value)
} catch (IOException ex) {
log.error "[K8s] failed to get size of input file ${f.value} of task ${task.task.name}: ${ex.message}"
}
}
records.add(new K8sRuntimeRecord(task.task.processor.name, inputSizeSum, runtimeMilis))
}
void write() {
for (K8sRuntimeRecord record : records) {
log.info("[K8s] ${record.taskName} - ${record.inputSize} Bytes - ${record.runtimeMillis} ms")
}
}
}

View File

@@ -5,9 +5,10 @@ interface K8sSchedulingStrategy {
* Selects the next task to run from the given task queue.
* If the scheduler should wait, returns {@code null} instead
*
* @param scheduler the calling scheduler object
* @param queue Pending scheduling requests
* @param nodes Available Kubernetes node names
* @return A launch decision, or {@code null} when no task should be launched now
*/
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes)
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes)
}

View File

@@ -334,6 +334,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
void submit() {
builder = createBashWrapper(task)
builder.build()
log.info "[K8s] submitting task ${this.task.name}"
executor.taskScheduler.submit(this)
}
@@ -470,6 +471,12 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
deleteJobIfSuccessful(task)
updateTimestamps(state.terminated as Map)
determineNode()
// Signal the scheduler that this task has finished running
if (executor != null) {
executor.taskScheduler.taskFinished(this)
executor.runtimeRecorder.record(this)
}
return true
}

View File

@@ -8,28 +8,62 @@ import java.util.concurrent.LinkedBlockingQueue
@Slf4j
@CompileStatic
class K8sTaskScheduler {
private final K8sClient client
class K8sTaskScheduler implements Runnable {
private final K8sExecutor executor
private final K8sSchedulingStrategy strategy
private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>()
private List<String> cachedNodes
K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) {
this.client = client
private synchronized boolean shouldStop
private HashMap<String, ArrayList<K8sTaskHandler>> occupancy;
private HashMap<String, String> taskToNode
K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) {
this.executor = executor
this.strategy = strategy
this.occupancy = new HashMap<>()
for (String node : getNodes()) {
this.occupancy.put(node, new ArrayList<K8sTaskHandler>())
}
this.taskToNode = new HashMap<>()
}
/**
* Adds a task to the queue of outstanding tasks
* @param handler
*/
void submit(K8sTaskHandler handler) {
log.debug "[K8s] received queued task ${handler.task.name}"
log.info "[K8s] received queued task ${handler.task.name}"
queue.add(new K8sSchedulingRequest(handler))
drain()
/* TODO: Decide if we should invoke the scheduler immediately */
}
/**
* Notify the scheduler that a task has finished execution
* @param handler
*/
void taskFinished(K8sTaskHandler handler) {
/* Remove from occupancy list */
String nodeName = taskToNode.get(handler.task.name)
if (nodeName == null) {
log.error "[K8s] no node saved for task ${handler.task.name}"
return
}
ArrayList<K8sTaskHandler> nodeTasks = occupancy.get(nodeName)
if (nodeTasks == null) {
log.error "[K8s] tried to remove task ${handler.task.name} from node ${nodeName} but no tasks are saved for that node"
return
}
nodeTasks.remove(handler)
log.info "[K8s] removed task ${handler.task.name} from node ${nodeName}"
}
protected synchronized void drain() {
while( true ) {
final pending = new ArrayList<K8sSchedulingRequest>(queue)
final decision = strategy.schedule(pending, getNodes())
final decision = strategy.schedule(this, pending, getNodes())
if ( !decision )
return
@@ -37,11 +71,42 @@ class K8sTaskScheduler {
if ( !queue.remove(decision.request) )
continue
log.debug "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
decision.request.handler.submitNow(decision.nodeName)
ArrayList nodeTasks = occupancy.get(decision.nodeName)
assert nodeTasks != null
nodeTasks.add(decision.request.handler)
taskToNode.put(decision.request.task.name, decision.nodeName)
}
}
/* Scheduling Strategy Interface */
List<String> getFreeNodes() {
def unoccupied = occupancy.findAll {it.value == null || it.value.size() == 0 }
unoccupied.keySet().toList()
}
/**
* Run is the scheduler threads main function
* */
void run() {
this.shouldStop = false
def interval = this.executor.getK8sConfig().schedulerInterval
while (!shouldStop) {
sleep(interval.toMillis())
drain()
}
log.info("[K8s] terminated scheduler loop")
}
/**
* Stop terminates the scheduler thread
*/
void stop() {
log.info("[K8s] stopping scheduler loop")
this.shouldStop = true
}
protected List<String> getNodes() {
if ( cachedNodes == null )
cachedNodes = fetchNodes()
@@ -50,7 +115,7 @@ class K8sTaskScheduler {
@CompileDynamic
private List<String> fetchNodes() {
final resp = client.nodeList()
final resp = executor.getClient().nodeList()
ArrayList<String> nodes = new ArrayList<String>()
for ( Map item : resp.items ) {
nodes.add(item.metadata.name as String)

View File

@@ -4,16 +4,22 @@ import groovy.transform.CompileStatic
import nextflow.k8s.K8sSchedulingDecision
import nextflow.k8s.K8sSchedulingRequest
import nextflow.k8s.K8sSchedulingStrategy
import nextflow.k8s.K8sTaskScheduler
@CompileStatic
class K8sHashSchedulingStrategy implements K8sSchedulingStrategy {
@Override
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes) {
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes) {
if ( !queue || !nodes )
return null
final freeNodes = scheduler.freeNodes
if ( freeNodes.size() == 0 )
return null
final request = queue[0]
final index = Math.floorMod(request.task.hash.asInt(), nodes.size())
return new K8sSchedulingDecision(request, nodes[index])
final index = Math.floorMod(request.task.hash.asInt(), freeNodes.size())
return new K8sSchedulingDecision(request, freeNodes[index])
}
}

View File

@@ -8,9 +8,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should enqueue task handler without immediately submitting it' () {
given:
def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
@@ -20,8 +20,9 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.submit(handler)
then:
1 * scheduler.() >> ['A', 'B', 'C']
0 * strategy._
0 * client._
0 * executor._
0 * handler.submitNow(_)
and:
@@ -31,8 +32,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should drain queued task and submit it to selected node' () {
given:
def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
@@ -43,6 +45,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain()
then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']],
@@ -70,8 +73,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should keep task queued when strategy returns no decision' () {
given:
def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
@@ -82,6 +86,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain()
then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']]
@@ -101,8 +106,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should continue draining when selected request was already removed' () {
given:
def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def scheduler = new K8sTaskScheduler(executor, strategy)
def task1 = Mock(TaskRun)
def handler1 = Spy(K8sTaskHandler)
@@ -118,6 +124,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain()
then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']]

View File

@@ -22,8 +22,10 @@ k8s {
projectDir = '/workspace/projects'
cleanup = false
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.1'
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.5.3'
imagePullPolicy = 'IfNotPresent'
schedulerInterval = '10s'
recordTaskRuntimes = true
nodeInit {
enabled = false