test: Add scheduler tests

This commit is contained in:
2026-05-04 20:49:00 +02:00
parent 2d77fe678a
commit 51d2a44250
5 changed files with 352 additions and 36 deletions

View File

@@ -175,6 +175,16 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
ContainerHelper.fixOwnership(task.containerConfig)
}
/**
* Creates a Pod specification that executed that specified task
*
* @param task A {@link TaskRun} instance representing the task to execute
* @return A {@link Map} object modeling a Pod specification
*/
protected Map newSubmitRequest(TaskRun task) {
return newSubmitRequest(task, null)
}
/**
* Creates a Pod specification that executed that specified task
*
@@ -182,7 +192,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* @param nodeName The kubernetes node on which the task should run or {@code null}, if no specific node is requested
* @return A {@link Map} object modeling a Pod specification
*/
protected Map newSubmitRequest(TaskRun task, String nodeName) {
def imageName = task.container
if( !imageName )

View File

@@ -0,0 +1,23 @@
package nextflow.k8s
import nextflow.processor.TaskRun
import spock.lang.Specification
class K8sSchedulingRequestTest extends Specification {
def 'should create scheduling request from task handler' () {
given:
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
handler.task = task
when:
def request = new K8sSchedulingRequest(handler)
then:
request.handler == handler
request.task == task
request.submitTimeMillis > 0
request.submitTimeMillis <= System.currentTimeMillis()
}
}

View File

@@ -348,41 +348,6 @@ class K8sTaskHandlerTest extends Specification {
}
def 'should submit a pod' () {
given:
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(client: client, task:task))
def POD_NAME = 'new-pod-id'
def REQUEST = [foo: 'bar']
def RESPONSE = new K8sResponseJson([metadata: [name:POD_NAME]])
def YAML = Paths.get('file.yaml')
when:
handler.submit()
then:
1 * handler.createBashWrapper(task) >> builder
1 * builder.build() >> null
1 * handler.yamlDebugPath() >> YAML
1 * handler.newSubmitRequest(task) >> REQUEST
1 * client.podCreate(REQUEST,YAML) >> RESPONSE
handler.podName == POD_NAME
handler.status == TaskStatus.SUBMITTED
when:
handler.submit()
then:
1 * handler.createBashWrapper(task) >> builder
1 * builder.build() >> null
1 * handler.yamlDebugPath() >> YAML
1 * handler.newSubmitRequest(task) >> REQUEST
1 * client.podCreate(REQUEST,YAML) >> new K8sResponseJson([missing: 'meta'])
then:
thrown(K8sResponseException)
}
def 'should submit a job' () {
given:
def WORK_DIR = Paths.get('/some/work/dir')
@@ -1140,4 +1105,94 @@ class K8sTaskHandlerTest extends Specification {
}
def 'should enqueue task instead of submitting pod immediately' () {
given:
def task = Mock(TaskRun)
def wrapper = Mock(K8sWrapperBuilder)
def scheduler = Mock(K8sTaskScheduler)
def executor = Mock(K8sExecutor)
def handler = Spy(new K8sTaskHandler(task: task, executor: executor))
when:
handler.submit()
then:
1 * handler.createBashWrapper(task) >> wrapper
1 * wrapper.build()
1 * executor.getTaskScheduler() >> scheduler
1 * scheduler.submit(handler)
and:
0 * handler.newSubmitRequest(_, _)
}
def 'should return false for queued task that has no pod name yet' () {
given:
def handler = Spy(K8sTaskHandler)
expect:
!handler.checkIfRunning()
!handler.checkIfCompleted()
}
def 'should create a pod request assigned to selected node' () {
given:
def WORK_DIR = Paths.get('/some/work/dir')
def config = Mock(TaskConfig)
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def handler = Spy(new K8sTaskHandler(builder: builder, client: client))
Map result
when:
result = handler.newSubmitRequest(task, 'node-a')
then:
_ * handler.fusionEnabled() >> false
1 * handler.fixOwnership() >> false
1 * handler.entrypointOverride() >> false
1 * handler.cpuLimitsEnabled() >> false
1 * handler.getPodOptions() >> new PodOptions()
1 * handler.getSyntheticPodName(task) >> 'nf-123'
1 * handler.getLabels(task) >> [:]
1 * handler.getAnnotations() >> [:]
1 * handler.getContainerMounts() >> []
1 * task.getContainer() >> 'debian:latest'
1 * task.getWorkDir() >> WORK_DIR
1 * task.getConfig() >> config
1 * config.getCpus() >> 0
1 * config.getMemory() >> null
1 * client.getConfig() >> new ClientConfig()
and:
result.spec.nodeName == 'node-a'
result.spec.containers[0].image == 'debian:latest'
}
def 'should submit now to selected node and set pod name' () {
given:
def task = Mock(TaskRun)
def client = Mock(K8sClient)
def handler = Spy(new K8sTaskHandler(task: task, client: client))
def request = [
apiVersion: 'v1',
kind: 'Pod',
metadata: [name: 'nf-123'],
spec: [:]
]
when:
handler.submitNow('node-a')
then:
1 * handler.newSubmitRequest(task, 'node-a') >> request
1 * handler.useJobResource() >> false
1 * handler.yamlDebugPath() >> null
1 * client.podCreate(request, null) >> [metadata: [name: 'nf-123']]
and:
handler.getPodName() == 'nf-123'
handler.status == TaskStatus.SUBMITTED
}
}

View File

@@ -0,0 +1,146 @@
package nextflow.k8s
import nextflow.k8s.client.K8sClient
import nextflow.processor.TaskRun
import spock.lang.Specification
class K8sTaskSchedulerTest extends Specification {
def 'should enqueue task handler without immediately submitting it' () {
given:
def client = Mock(K8sClient)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
handler.task = task
when:
scheduler.submit(handler)
then:
0 * strategy._
0 * client._
0 * handler.submitNow(_)
and:
scheduler.@queue.size() == 1
}
def 'should drain queued task and submit it to selected node' () {
given:
def client = Mock(K8sClient)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
handler.task = task
when:
scheduler.submit(handler)
scheduler.drain()
then:
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']],
[metadata: [name: 'node-b']]
]
]
then:
1 * strategy.schedule({ List<K8sSchedulingRequest> queue ->
queue.size() == 1 && queue[0].handler == handler && queue[0].task == task
}, ['node-a', 'node-b']) >> { List<K8sSchedulingRequest> queue, List<String> nodes ->
new K8sSchedulingDecision(queue[0], 'node-b')
}
then:
1 * handler.submitNow('node-b') >> {}
then:
1 * strategy.schedule([], ['node-a', 'node-b']) >> null
and:
scheduler.@queue.size() == 0
}
def 'should keep task queued when strategy returns no decision' () {
given:
def client = Mock(K8sClient)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
handler.task = task
when:
scheduler.submit(handler)
scheduler.drain()
then:
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']]
]
]
then:
1 * strategy.schedule({ List<K8sSchedulingRequest> queue ->
queue.size() == 1 && queue[0].handler == handler
}, ['node-a']) >> null
and:
0 * handler.submitNow(_)
scheduler.@queue.size() == 1
}
def 'should continue draining when selected request was already removed' () {
given:
def client = Mock(K8sClient)
def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy)
def task1 = Mock(TaskRun)
def handler1 = Spy(K8sTaskHandler)
handler1.task = task1
def staleRequest = new K8sSchedulingRequest(handler1)
def task2 = Mock(TaskRun)
def handler2 = Spy(K8sTaskHandler)
handler2.task = task2
when:
scheduler.submit(handler2)
scheduler.drain()
then:
1 * client.nodeList() >> [
items: [
[metadata: [name: 'node-a']]
]
]
then:
1 * strategy.schedule(_, ['node-a']) >> new K8sSchedulingDecision(staleRequest, 'node-a')
then:
1 * strategy.schedule({ List<K8sSchedulingRequest> queue ->
queue.size() == 1 && queue[0].handler == handler2
}, ['node-a']) >> { List<K8sSchedulingRequest> queue, List<String> nodes ->
new K8sSchedulingDecision(queue[0], 'node-a')
}
then:
1 * handler2.submitNow('node-a') >> {}
then:
1 * strategy.schedule([], ['node-a']) >> null
and:
scheduler.@queue.size() == 0
}
}

View File

@@ -0,0 +1,83 @@
package nextflow.k8s.strategies
import nextflow.k8s.K8sSchedulingRequest
import nextflow.k8s.K8sTaskHandler
import nextflow.processor.TaskRun
import nextflow.util.HashBuilder
import spock.lang.Specification
class K8sHashSchedulingStrategyTest extends Specification {
def 'should return null when queue is empty' () {
given:
def strategy = new K8sHashSchedulingStrategy()
expect:
strategy.schedule([], ['node-a']) == null
}
def 'should return null when nodes are empty' () {
given:
def strategy = new K8sHashSchedulingStrategy()
def request = Mock(K8sSchedulingRequest)
expect:
strategy.schedule([request], []) == null
}
def 'should select first queued request and assign a node using task hash' () {
given:
def strategy = new K8sHashSchedulingStrategy()
def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler)
handler.task = task
def request = new K8sSchedulingRequest(handler)
def nodes = ['node-a', 'node-b', 'node-c']
when:
def decision = strategy.schedule([request], nodes)
then:
1 * task.getHash() >> taskHash
and:
decision.request == request
decision.nodeName == expectedNode
where:
taskHash | expectedNode
new HashBuilder().with(0).build() | 'node-a'
new HashBuilder().with(1).build() | 'node-b'
new HashBuilder().with(2).build() | 'node-c'
new HashBuilder().with(3).build() | 'node-b'
new HashBuilder().with(-1).build() | 'node-b'
}
def 'should always schedule the head of the queue' () {
given:
def strategy = new K8sHashSchedulingStrategy()
def task1 = Mock(TaskRun)
def task2 = Mock(TaskRun)
def handler1 = Spy(K8sTaskHandler)
handler1.task = task1
def handler2 = Spy(K8sTaskHandler)
handler2.task = task2
def request1 = new K8sSchedulingRequest(handler1)
def request2 = new K8sSchedulingRequest(handler2)
when:
def decision = strategy.schedule([request1, request2], ['node-a', 'node-b'])
then:
1 * task1.getHash() >> new HashBuilder().with(0).build()
0 * task2.getHash()
and:
decision.request == request1
decision.nodeName == 'node-a'
}
}