diff --git a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy index ed1c3b5..c919931 100644 --- a/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy +++ b/nextflow/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy @@ -175,6 +175,16 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { ContainerHelper.fixOwnership(task.containerConfig) } + /** + * Creates a Pod specification that executed that specified task + * + * @param task A {@link TaskRun} instance representing the task to execute + * @return A {@link Map} object modeling a Pod specification + */ + protected Map newSubmitRequest(TaskRun task) { + return newSubmitRequest(task, null) + } + /** * Creates a Pod specification that executed that specified task * @@ -182,7 +192,6 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { * @param nodeName The kubernetes node on which the task should run or {@code null}, if no specific node is requested * @return A {@link Map} object modeling a Pod specification */ - protected Map newSubmitRequest(TaskRun task, String nodeName) { def imageName = task.container if( !imageName ) diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sSchedulingRequestTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sSchedulingRequestTest.groovy new file mode 100644 index 0000000..250890c --- /dev/null +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sSchedulingRequestTest.groovy @@ -0,0 +1,23 @@ +package nextflow.k8s + +import nextflow.processor.TaskRun +import spock.lang.Specification + +class K8sSchedulingRequestTest extends Specification { + + def 'should create scheduling request from task handler' () { + given: + def task = Mock(TaskRun) + def handler = Spy(K8sTaskHandler) + handler.task = task + + when: + def request = new K8sSchedulingRequest(handler) + + then: + request.handler == handler + request.task == task + request.submitTimeMillis > 0 + request.submitTimeMillis <= System.currentTimeMillis() + } +} diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy index e63e36b..5b777cc 100644 --- a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -348,41 +348,6 @@ class K8sTaskHandlerTest extends Specification { } - def 'should submit a pod' () { - - given: - def task = Mock(TaskRun) - def client = Mock(K8sClient) - def builder = Mock(K8sWrapperBuilder) - def handler = Spy(new K8sTaskHandler(client: client, task:task)) - - def POD_NAME = 'new-pod-id' - def REQUEST = [foo: 'bar'] - def RESPONSE = new K8sResponseJson([metadata: [name:POD_NAME]]) - def YAML = Paths.get('file.yaml') - when: - handler.submit() - then: - 1 * handler.createBashWrapper(task) >> builder - 1 * builder.build() >> null - 1 * handler.yamlDebugPath() >> YAML - 1 * handler.newSubmitRequest(task) >> REQUEST - 1 * client.podCreate(REQUEST,YAML) >> RESPONSE - handler.podName == POD_NAME - handler.status == TaskStatus.SUBMITTED - - when: - handler.submit() - then: - 1 * handler.createBashWrapper(task) >> builder - 1 * builder.build() >> null - 1 * handler.yamlDebugPath() >> YAML - 1 * handler.newSubmitRequest(task) >> REQUEST - 1 * client.podCreate(REQUEST,YAML) >> new K8sResponseJson([missing: 'meta']) - then: - thrown(K8sResponseException) - } - def 'should submit a job' () { given: def WORK_DIR = Paths.get('/some/work/dir') @@ -1140,4 +1105,94 @@ class K8sTaskHandlerTest extends Specification { } + def 'should enqueue task instead of submitting pod immediately' () { + given: + def task = Mock(TaskRun) + def wrapper = Mock(K8sWrapperBuilder) + def scheduler = Mock(K8sTaskScheduler) + def executor = Mock(K8sExecutor) + def handler = Spy(new K8sTaskHandler(task: task, executor: executor)) + + when: + handler.submit() + + then: + 1 * handler.createBashWrapper(task) >> wrapper + 1 * wrapper.build() + 1 * executor.getTaskScheduler() >> scheduler + 1 * scheduler.submit(handler) + + and: + 0 * handler.newSubmitRequest(_, _) + } + + def 'should return false for queued task that has no pod name yet' () { + given: + def handler = Spy(K8sTaskHandler) + + expect: + !handler.checkIfRunning() + !handler.checkIfCompleted() + } + + def 'should create a pod request assigned to selected node' () { + given: + def WORK_DIR = Paths.get('/some/work/dir') + def config = Mock(TaskConfig) + def task = Mock(TaskRun) + def client = Mock(K8sClient) + def builder = Mock(K8sWrapperBuilder) + def handler = Spy(new K8sTaskHandler(builder: builder, client: client)) + Map result + + when: + result = handler.newSubmitRequest(task, 'node-a') + + then: + _ * handler.fusionEnabled() >> false + 1 * handler.fixOwnership() >> false + 1 * handler.entrypointOverride() >> false + 1 * handler.cpuLimitsEnabled() >> false + 1 * handler.getPodOptions() >> new PodOptions() + 1 * handler.getSyntheticPodName(task) >> 'nf-123' + 1 * handler.getLabels(task) >> [:] + 1 * handler.getAnnotations() >> [:] + 1 * handler.getContainerMounts() >> [] + 1 * task.getContainer() >> 'debian:latest' + 1 * task.getWorkDir() >> WORK_DIR + 1 * task.getConfig() >> config + 1 * config.getCpus() >> 0 + 1 * config.getMemory() >> null + 1 * client.getConfig() >> new ClientConfig() + + and: + result.spec.nodeName == 'node-a' + result.spec.containers[0].image == 'debian:latest' + } + + def 'should submit now to selected node and set pod name' () { + given: + def task = Mock(TaskRun) + def client = Mock(K8sClient) + def handler = Spy(new K8sTaskHandler(task: task, client: client)) + def request = [ + apiVersion: 'v1', + kind: 'Pod', + metadata: [name: 'nf-123'], + spec: [:] + ] + + when: + handler.submitNow('node-a') + + then: + 1 * handler.newSubmitRequest(task, 'node-a') >> request + 1 * handler.useJobResource() >> false + 1 * handler.yamlDebugPath() >> null + 1 * client.podCreate(request, null) >> [metadata: [name: 'nf-123']] + + and: + handler.getPodName() == 'nf-123' + handler.status == TaskStatus.SUBMITTED + } } diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy new file mode 100644 index 0000000..0ab11e5 --- /dev/null +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskSchedulerTest.groovy @@ -0,0 +1,146 @@ +package nextflow.k8s + +import nextflow.k8s.client.K8sClient +import nextflow.processor.TaskRun +import spock.lang.Specification + +class K8sTaskSchedulerTest extends Specification { + + def 'should enqueue task handler without immediately submitting it' () { + given: + def client = Mock(K8sClient) + def strategy = Mock(K8sSchedulingStrategy) + def scheduler = new K8sTaskScheduler(client, strategy) + + def task = Mock(TaskRun) + def handler = Spy(K8sTaskHandler) + handler.task = task + + when: + scheduler.submit(handler) + + then: + 0 * strategy._ + 0 * client._ + 0 * handler.submitNow(_) + + and: + scheduler.@queue.size() == 1 + } + + def 'should drain queued task and submit it to selected node' () { + given: + def client = Mock(K8sClient) + def strategy = Mock(K8sSchedulingStrategy) + def scheduler = new K8sTaskScheduler(client, strategy) + + def task = Mock(TaskRun) + def handler = Spy(K8sTaskHandler) + handler.task = task + + when: + scheduler.submit(handler) + scheduler.drain() + + then: + 1 * client.nodeList() >> [ + items: [ + [metadata: [name: 'node-a']], + [metadata: [name: 'node-b']] + ] + ] + + then: + 1 * strategy.schedule({ List queue -> + queue.size() == 1 && queue[0].handler == handler && queue[0].task == task + }, ['node-a', 'node-b']) >> { List queue, List nodes -> + new K8sSchedulingDecision(queue[0], 'node-b') + } + + then: + 1 * handler.submitNow('node-b') >> {} + + then: + 1 * strategy.schedule([], ['node-a', 'node-b']) >> null + + and: + scheduler.@queue.size() == 0 + } + + def 'should keep task queued when strategy returns no decision' () { + given: + def client = Mock(K8sClient) + def strategy = Mock(K8sSchedulingStrategy) + def scheduler = new K8sTaskScheduler(client, strategy) + + def task = Mock(TaskRun) + def handler = Spy(K8sTaskHandler) + handler.task = task + + when: + scheduler.submit(handler) + scheduler.drain() + + then: + 1 * client.nodeList() >> [ + items: [ + [metadata: [name: 'node-a']] + ] + ] + + then: + 1 * strategy.schedule({ List queue -> + queue.size() == 1 && queue[0].handler == handler + }, ['node-a']) >> null + + and: + 0 * handler.submitNow(_) + scheduler.@queue.size() == 1 + } + + def 'should continue draining when selected request was already removed' () { + given: + def client = Mock(K8sClient) + def strategy = Mock(K8sSchedulingStrategy) + def scheduler = new K8sTaskScheduler(client, strategy) + + def task1 = Mock(TaskRun) + def handler1 = Spy(K8sTaskHandler) + handler1.task = task1 + def staleRequest = new K8sSchedulingRequest(handler1) + + def task2 = Mock(TaskRun) + def handler2 = Spy(K8sTaskHandler) + handler2.task = task2 + + when: + scheduler.submit(handler2) + scheduler.drain() + + then: + 1 * client.nodeList() >> [ + items: [ + [metadata: [name: 'node-a']] + ] + ] + + then: + 1 * strategy.schedule(_, ['node-a']) >> new K8sSchedulingDecision(staleRequest, 'node-a') + + then: + 1 * strategy.schedule({ List queue -> + queue.size() == 1 && queue[0].handler == handler2 + }, ['node-a']) >> { List queue, List nodes -> + new K8sSchedulingDecision(queue[0], 'node-a') + } + + then: + 1 * handler2.submitNow('node-a') >> {} + + then: + 1 * strategy.schedule([], ['node-a']) >> null + + and: + scheduler.@queue.size() == 0 + } +} diff --git a/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/strategies/K8sHashSchedulingStrategyTest.groovy b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/strategies/K8sHashSchedulingStrategyTest.groovy new file mode 100644 index 0000000..af5b080 --- /dev/null +++ b/nextflow/plugins/nf-k8s/src/test/nextflow/k8s/strategies/K8sHashSchedulingStrategyTest.groovy @@ -0,0 +1,83 @@ +package nextflow.k8s.strategies + +import nextflow.k8s.K8sSchedulingRequest +import nextflow.k8s.K8sTaskHandler +import nextflow.processor.TaskRun +import nextflow.util.HashBuilder +import spock.lang.Specification + +class K8sHashSchedulingStrategyTest extends Specification { + + def 'should return null when queue is empty' () { + given: + def strategy = new K8sHashSchedulingStrategy() + + expect: + strategy.schedule([], ['node-a']) == null + } + + def 'should return null when nodes are empty' () { + given: + def strategy = new K8sHashSchedulingStrategy() + def request = Mock(K8sSchedulingRequest) + + expect: + strategy.schedule([request], []) == null + } + + def 'should select first queued request and assign a node using task hash' () { + given: + def strategy = new K8sHashSchedulingStrategy() + def task = Mock(TaskRun) + def handler = Spy(K8sTaskHandler) + handler.task = task + def request = new K8sSchedulingRequest(handler) + def nodes = ['node-a', 'node-b', 'node-c'] + + when: + def decision = strategy.schedule([request], nodes) + + then: + 1 * task.getHash() >> taskHash + + and: + decision.request == request + decision.nodeName == expectedNode + + where: + taskHash | expectedNode + new HashBuilder().with(0).build() | 'node-a' + new HashBuilder().with(1).build() | 'node-b' + new HashBuilder().with(2).build() | 'node-c' + new HashBuilder().with(3).build() | 'node-b' + new HashBuilder().with(-1).build() | 'node-b' + } + + def 'should always schedule the head of the queue' () { + given: + def strategy = new K8sHashSchedulingStrategy() + + def task1 = Mock(TaskRun) + def task2 = Mock(TaskRun) + + def handler1 = Spy(K8sTaskHandler) + handler1.task = task1 + + def handler2 = Spy(K8sTaskHandler) + handler2.task = task2 + + def request1 = new K8sSchedulingRequest(handler1) + def request2 = new K8sSchedulingRequest(handler2) + + when: + def decision = strategy.schedule([request1, request2], ['node-a', 'node-b']) + + then: + 1 * task1.getHash() >> new HashBuilder().with(0).build() + 0 * task2.getHash() + + and: + decision.request == request1 + decision.nodeName == 'node-a' + } +}