feat: invoke scheduler at regular interval

This commit is contained in:
2026-05-21 20:37:55 +02:00
parent a8d0af6d77
commit 78fe51a40d
6 changed files with 75 additions and 20 deletions

View File

@@ -39,6 +39,9 @@ import nextflow.k8s.model.PodSecurityContext
import nextflow.k8s.model.PodVolumeClaim import nextflow.k8s.model.PodVolumeClaim
import nextflow.k8s.model.ResourceType import nextflow.k8s.model.ResourceType
import nextflow.util.Duration import nextflow.util.Duration
import java.util.concurrent.TimeUnit
/** /**
* Model Kubernetes specific settings defined in the nextflow * Model Kubernetes specific settings defined in the nextflow
* configuration file * configuration file
@@ -221,6 +224,12 @@ class K8sConfig implements ConfigScope {
""") """)
final String nextflowImage final String nextflowImage
// Time the scheduler loop sleeps between two passes over the queue of
// outstanding tasks. The constructor falls back to 10 seconds when the
// option is not set in the configuration.
@ConfigOption
@Description("""
The run interval of the kubernetes scheduler
""")
final Duration schedulerInterval
/* required by extension point -- do not remove */ /* required by extension point -- do not remove */
K8sConfig() { K8sConfig() {
this(Collections.emptyMap()) this(Collections.emptyMap())
@@ -251,6 +260,7 @@ class K8sConfig implements ConfigScope {
storageSubPath = opts.storageSubPath storageSubPath = opts.storageSubPath
userName = opts.userName userName = opts.userName
nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}" nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS)
launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}" launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
projectDir = opts.projectDir ?: "${storageMountPath}/projects" projectDir = opts.projectDir ?: "${storageMountPath}/projects"
@@ -272,7 +282,6 @@ class K8sConfig implements ConfigScope {
else if( securityContext ) else if( securityContext )
pod.securityContext = new PodSecurityContext(securityContext) pod.securityContext = new PodSecurityContext(securityContext)
log.info("Hello sailor")
nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap()) nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap())
} }

View File

@@ -52,19 +52,19 @@ class K8sExecutor extends Executor implements ExtensionPoint {
*/ */
private Cache<String, K8sClient> clientCache private Cache<String, K8sClient> clientCache
private K8sTaskScheduler taskScheduler private K8sTaskScheduler taskScheduler
private Thread schedulerThread
/** /**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires. * the client (including the service account token) when the configured interval expires.
*/ */
protected K8sClient getClient() { K8sClient getClient() {
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient())) clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
} }
protected K8sTaskScheduler getTaskScheduler() { protected K8sTaskScheduler getTaskScheduler() {
if( taskScheduler == null ) assert taskScheduler != null
taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy())
return taskScheduler return taskScheduler
} }
@@ -88,7 +88,18 @@ class K8sExecutor extends Executor implements ExtensionPoint {
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS) .expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
.build() .build()
final client = getClient() final client = getClient()
log.debug "[K8s] config=$k8sConfig; API client config=$client.config" log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy())
this.schedulerThread = new Thread(this.taskScheduler)
this.schedulerThread.start()
}
/**
 * Stops the background scheduler loop and waits for its thread to finish.
 *
 * The scheduler and its thread are only assigned once the scheduler has been
 * started, so both are accessed null-safely: shutting down an executor whose
 * scheduler was never created is a no-op instead of a NullPointerException.
 */
@Override
void shutdown() {
    // Ask the loop to end first, then block until the thread has actually exited
    this.taskScheduler?.stop()
    this.schedulerThread?.join()
}
/** /**

View File

@@ -334,6 +334,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
void submit() { void submit() {
builder = createBashWrapper(task) builder = createBashWrapper(task)
builder.build() builder.build()
log.info "[K8s] submitting task ${this.task.name}"
executor.taskScheduler.submit(this) executor.taskScheduler.submit(this)
} }

View File

@@ -8,22 +8,28 @@ import java.util.concurrent.LinkedBlockingQueue
@Slf4j @Slf4j
@CompileStatic @CompileStatic
class K8sTaskScheduler { class K8sTaskScheduler implements Runnable {
private final K8sClient client private final K8sExecutor executor
private final K8sSchedulingStrategy strategy private final K8sSchedulingStrategy strategy
private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>() private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>()
private List<String> cachedNodes private List<String> cachedNodes
K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) { private synchronized boolean shouldStop
this.client = client
K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) {
this.executor = executor
this.strategy = strategy this.strategy = strategy
} }
/**
* Adds a task to the queue of outstanding tasks
* @param handler
*/
void submit(K8sTaskHandler handler) { void submit(K8sTaskHandler handler) {
log.debug "[K8s] received queued task ${handler.task.name}" log.info "[K8s] received queued task ${handler.task.name}"
queue.add(new K8sSchedulingRequest(handler)) queue.add(new K8sSchedulingRequest(handler))
drain() /* TODO: Decide if we should invoke the scheduler immediately */
} }
protected synchronized void drain() { protected synchronized void drain() {
@@ -37,11 +43,32 @@ class K8sTaskScheduler {
if ( !queue.remove(decision.request) ) if ( !queue.remove(decision.request) )
continue continue
log.debug "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}" log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
decision.request.handler.submitNow(decision.nodeName) decision.request.handler.submitNow(decision.nodeName)
} }
} }
/**
 * Main function of the scheduler thread.
 *
 * Reads the scheduler interval from the executor's K8s configuration once,
 * then repeatedly sleeps for that interval and calls {@link #drain()} until
 * {@link #stop()} has been requested.
 */
@Override
void run() {
    // The stop flag is deliberately NOT reset here: a stop() issued before this
    // thread starts executing would be overwritten, the loop would never end
    // and a caller joining this thread would block forever.
    // NOTE(review): the flag is written from another thread -- confirm the field
    // is declared so that the write is visible here (e.g. `volatile`).
    final long intervalMillis = this.executor.getK8sConfig().schedulerInterval.toMillis()
    while (!shouldStop) {
        sleep(intervalMillis)
        try {
            drain()
        }
        catch (Exception e) {
            // A failing pass must not kill the thread, otherwise the tasks still
            // waiting in the queue would never be launched; retry on the next pass
            log.error("[K8s] scheduler iteration failed -- retrying at next interval", e)
        }
    }
    log.info("[K8s] terminated scheduler loop")
}
/**
 * Requests termination of the scheduler loop.
 *
 * Only raises the stop flag and returns immediately; the loop in
 * {@code run()} checks the flag at the start of each iteration, so it ends
 * once the sleep/drain cycle in progress has completed.
 */
void stop() {
    log.info "[K8s] stopping scheduler loop"
    shouldStop = true
}
protected List<String> getNodes() { protected List<String> getNodes() {
if ( cachedNodes == null ) if ( cachedNodes == null )
cachedNodes = fetchNodes() cachedNodes = fetchNodes()
@@ -50,7 +77,7 @@ class K8sTaskScheduler {
@CompileDynamic @CompileDynamic
private List<String> fetchNodes() { private List<String> fetchNodes() {
final resp = client.nodeList() final resp = executor.getClient().nodeList()
ArrayList<String> nodes = new ArrayList<String>() ArrayList<String> nodes = new ArrayList<String>()
for ( Map item : resp.items ) { for ( Map item : resp.items ) {
nodes.add(item.metadata.name as String) nodes.add(item.metadata.name as String)

View File

@@ -8,9 +8,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should enqueue task handler without immediately submitting it' () { def 'should enqueue task handler without immediately submitting it' () {
given: given:
def client = Mock(K8sClient) def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy) def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy) def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun) def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler) def handler = Spy(K8sTaskHandler)
@@ -21,7 +21,7 @@ class K8sTaskSchedulerTest extends Specification {
then: then:
0 * strategy._ 0 * strategy._
0 * client._ 0 * executor._
0 * handler.submitNow(_) 0 * handler.submitNow(_)
and: and:
@@ -31,8 +31,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should drain queued task and submit it to selected node' () { def 'should drain queued task and submit it to selected node' () {
given: given:
def client = Mock(K8sClient) def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy) def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy) def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun) def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler) def handler = Spy(K8sTaskHandler)
@@ -43,6 +44,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain() scheduler.drain()
then: then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [ 1 * client.nodeList() >> [
items: [ items: [
[metadata: [name: 'node-a']], [metadata: [name: 'node-a']],
@@ -70,8 +72,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should keep task queued when strategy returns no decision' () { def 'should keep task queued when strategy returns no decision' () {
given: given:
def client = Mock(K8sClient) def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy) def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy) def scheduler = new K8sTaskScheduler(executor, strategy)
def task = Mock(TaskRun) def task = Mock(TaskRun)
def handler = Spy(K8sTaskHandler) def handler = Spy(K8sTaskHandler)
@@ -82,6 +85,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain() scheduler.drain()
then: then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [ 1 * client.nodeList() >> [
items: [ items: [
[metadata: [name: 'node-a']] [metadata: [name: 'node-a']]
@@ -101,8 +105,9 @@ class K8sTaskSchedulerTest extends Specification {
def 'should continue draining when selected request was already removed' () { def 'should continue draining when selected request was already removed' () {
given: given:
def client = Mock(K8sClient) def client = Mock(K8sClient)
def executor = Mock(K8sExecutor)
def strategy = Mock(K8sSchedulingStrategy) def strategy = Mock(K8sSchedulingStrategy)
def scheduler = new K8sTaskScheduler(client, strategy) def scheduler = new K8sTaskScheduler(executor, strategy)
def task1 = Mock(TaskRun) def task1 = Mock(TaskRun)
def handler1 = Spy(K8sTaskHandler) def handler1 = Spy(K8sTaskHandler)
@@ -118,6 +123,7 @@ class K8sTaskSchedulerTest extends Specification {
scheduler.drain() scheduler.drain()
then: then:
1 * executor.getClient() >> client
1 * client.nodeList() >> [ 1 * client.nodeList() >> [
items: [ items: [
[metadata: [name: 'node-a']] [metadata: [name: 'node-a']]

View File

@@ -22,8 +22,9 @@ k8s {
projectDir = '/workspace/projects' projectDir = '/workspace/projects'
cleanup = false cleanup = false
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.1' nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.3.2'
imagePullPolicy = 'IfNotPresent' imagePullPolicy = 'IfNotPresent'
schedulerInterval = '10s'
nodeInit { nodeInit {
enabled = false enabled = false