feat: keep track of node occupancy
This commit is contained in:
@@ -38,7 +38,7 @@ class K8sRuntimeRecorder {
|
||||
}
|
||||
}
|
||||
|
||||
records.add(new K8sRuntimeRecord(task.task.name, inputSizeSum, runtimeMilis))
|
||||
records.add(new K8sRuntimeRecord(task.task.processor.name, inputSizeSum, runtimeMilis))
|
||||
}
|
||||
|
||||
void write() {
|
||||
|
||||
@@ -5,9 +5,10 @@ interface K8sSchedulingStrategy {
|
||||
* Selects the next task to run from the given task queue.
|
||||
* If the scheduler should wait, returns {@code null} instead
|
||||
*
|
||||
* @param scheduler the calling scheduler object
|
||||
* @param queue Pending scheduling requests
|
||||
* @param nodes Available Kubernetes node names
|
||||
* @return A launch decision, or {@code null} when no task should be launched now
|
||||
*/
|
||||
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes)
|
||||
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes)
|
||||
}
|
||||
|
||||
@@ -16,9 +16,17 @@ class K8sTaskScheduler implements Runnable {
|
||||
|
||||
private synchronized boolean shouldStop
|
||||
|
||||
private HashMap<String, ArrayList<K8sTaskHandler>> occupancy;
|
||||
private HashMap<String, String> taskToNode
|
||||
|
||||
K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) {
|
||||
this.executor = executor
|
||||
this.strategy = strategy
|
||||
this.occupancy = new HashMap<>()
|
||||
for (String node : getNodes()) {
|
||||
this.occupancy.put(node, new ArrayList<K8sTaskHandler>())
|
||||
}
|
||||
this.taskToNode = new HashMap<>()
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -37,13 +45,25 @@ class K8sTaskScheduler implements Runnable {
|
||||
* @param handler
|
||||
*/
|
||||
void taskFinished(K8sTaskHandler handler) {
|
||||
|
||||
/* Remove from occupancy list */
|
||||
String nodeName = taskToNode.get(handler.task.name)
|
||||
if (nodeName == null) {
|
||||
log.error "[K8s] no node saved for task ${handler.task.name}"
|
||||
return
|
||||
}
|
||||
ArrayList<K8sTaskHandler> nodeTasks = occupancy.get(nodeName)
|
||||
if (nodeTasks == null) {
|
||||
log.error "[K8s] tried to remove task ${handler.task.name} from node ${nodeName} but no tasks are saved for that node"
|
||||
return
|
||||
}
|
||||
nodeTasks.remove(handler)
|
||||
log.info "[K8s] removed task ${handler.task.name} from node ${nodeName}"
|
||||
}
|
||||
|
||||
protected synchronized void drain() {
|
||||
while( true ) {
|
||||
final pending = new ArrayList<K8sSchedulingRequest>(queue)
|
||||
final decision = strategy.schedule(pending, getNodes())
|
||||
final decision = strategy.schedule(this, pending, getNodes())
|
||||
|
||||
if ( !decision )
|
||||
return
|
||||
@@ -53,9 +73,19 @@ class K8sTaskScheduler implements Runnable {
|
||||
|
||||
log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
|
||||
decision.request.handler.submitNow(decision.nodeName)
|
||||
ArrayList nodeTasks = occupancy.get(decision.nodeName)
|
||||
assert nodeTasks != null
|
||||
nodeTasks.add(decision.request.handler)
|
||||
taskToNode.put(decision.request.task.name, decision.nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
/* Scheduling Strategy Interface */
|
||||
List<String> getFreeNodes() {
|
||||
def unoccupied = occupancy.findAll {it.value == null || it.value.size() == 0 }
|
||||
unoccupied.keySet().toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Run is the scheduler threads main function
|
||||
* */
|
||||
|
||||
@@ -4,16 +4,22 @@ import groovy.transform.CompileStatic
|
||||
import nextflow.k8s.K8sSchedulingDecision
|
||||
import nextflow.k8s.K8sSchedulingRequest
|
||||
import nextflow.k8s.K8sSchedulingStrategy
|
||||
import nextflow.k8s.K8sTaskScheduler
|
||||
|
||||
@CompileStatic
|
||||
class K8sHashSchedulingStrategy implements K8sSchedulingStrategy {
|
||||
|
||||
@Override
|
||||
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes) {
|
||||
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes) {
|
||||
if ( !queue || !nodes )
|
||||
return null
|
||||
|
||||
final freeNodes = scheduler.freeNodes
|
||||
if ( freeNodes.size() == 0 )
|
||||
return null
|
||||
|
||||
final request = queue[0]
|
||||
final index = Math.floorMod(request.task.hash.asInt(), nodes.size())
|
||||
return new K8sSchedulingDecision(request, nodes[index])
|
||||
final index = Math.floorMod(request.task.hash.asInt(), freeNodes.size())
|
||||
return new K8sSchedulingDecision(request, freeNodes[index])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
scheduler.submit(handler)
|
||||
|
||||
then:
|
||||
1 * scheduler.() >> ['A', 'B', 'C']
|
||||
0 * strategy._
|
||||
0 * executor._
|
||||
0 * handler.submitNow(_)
|
||||
|
||||
@@ -22,7 +22,7 @@ k8s {
|
||||
projectDir = '/workspace/projects'
|
||||
cleanup = false
|
||||
|
||||
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.4.2'
|
||||
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.5.3'
|
||||
imagePullPolicy = 'IfNotPresent'
|
||||
schedulerInterval = '10s'
|
||||
recordTaskRuntimes = true
|
||||
|
||||
Reference in New Issue
Block a user