feature: Basic k8s scheduler

This commit is contained in:
2026-05-03 18:33:34 +02:00
parent d0231de041
commit 6e2e42cbbe
7 changed files with 150 additions and 8 deletions

View File

@@ -16,6 +16,8 @@
package nextflow.k8s package nextflow.k8s
import nextflow.k8s.strategies.K8sHashSchedulingStrategy
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import com.google.common.cache.Cache import com.google.common.cache.Cache
@@ -50,6 +52,8 @@ class K8sExecutor extends Executor implements ExtensionPoint {
*/ */
private Cache<String, K8sClient> clientCache private Cache<String, K8sClient> clientCache
private K8sTaskScheduler taskScheduler
/** /**
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
* the client (including the service account token) when the configured interval expires. * the client (including the service account token) when the configured interval expires.
@@ -58,6 +62,12 @@ class K8sExecutor extends Executor implements ExtensionPoint {
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient())) clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
} }
protected K8sTaskScheduler getTaskScheduler() {
if( taskScheduler == null )
taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy())
return taskScheduler
}
/** /**
* @return The `k8s` configuration scope in the nextflow configuration object * @return The `k8s` configuration scope in the nextflow configuration object
*/ */

View File

@@ -0,0 +1,14 @@
package nextflow.k8s
import groovy.transform.CompileStatic
@CompileStatic
class K8sSchedulingDecision {
final String nodeName
final K8sSchedulingRequest request
K8sSchedulingDecision(K8sSchedulingRequest request, String nodeName) {
this.request = request
this.nodeName = nodeName
}
}

View File

@@ -0,0 +1,17 @@
package nextflow.k8s
import groovy.transform.CompileStatic
import nextflow.processor.TaskRun
@CompileStatic
class K8sSchedulingRequest {
final K8sTaskHandler handler
final TaskRun task
final long submitTimeMillis
K8sSchedulingRequest(K8sTaskHandler handler) {
this.handler = handler
this.task = handler.task
this.submitTimeMillis = System.currentTimeMillis()
}
}

View File

@@ -0,0 +1,13 @@
package nextflow.k8s
interface K8sSchedulingStrategy {
/**
* Selects the next task to run from the given task queue.
* If the scheduler should wait, returns {@code null} instead
*
* @param queue Pending scheduling requests
* @param nodes Available Kubernetes node names
* @return A launch decision, or {@code null} when no task should be launched now
*/
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes)
}

View File

@@ -179,16 +179,17 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* Creates a Pod specification that executed that specified task * Creates a Pod specification that executed that specified task
* *
* @param task A {@link TaskRun} instance representing the task to execute * @param task A {@link TaskRun} instance representing the task to execute
* @param nodeName The kubernetes node on which the task should run or {@code null}, if no specific node is requested
* @return A {@link Map} object modeling a Pod specification * @return A {@link Map} object modeling a Pod specification
*/ */
protected Map newSubmitRequest(TaskRun task) { protected Map newSubmitRequest(TaskRun task, String nodeName) {
def imageName = task.container def imageName = task.container
if( !imageName ) if( !imageName )
throw new ProcessSubmitException("Missing container image for process `$task.processor.name`") throw new ProcessSubmitException("Missing container image for process `$task.processor.name`")
try { try {
newSubmitRequest0(task, imageName) newSubmitRequest0(task, imageName, nodeName)
} }
catch( Throwable e ) { catch( Throwable e ) {
throw new ProcessSubmitException("Failed to submit K8s ${resourceType.lower()} -- Cause: ${e.message ?: e}", e) throw new ProcessSubmitException("Failed to submit K8s ${resourceType.lower()} -- Cause: ${e.message ?: e}", e)
@@ -203,7 +204,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
return executor.getK8sConfig().cpuLimitsEnabled() return executor.getK8sConfig().cpuLimitsEnabled()
} }
protected Map newSubmitRequest0(TaskRun task, String imageName) { protected Map newSubmitRequest0(TaskRun task, String imageName, String nodeName) {
final launcher = getSubmitCommand(task) final launcher = getSubmitCommand(task)
final taskCfg = task.getConfig() final taskCfg = task.getConfig()
@@ -228,6 +229,9 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
builder.withCommand(launcher) builder.withCommand(launcher)
} }
if( nodeName )
builder.withNodeName(nodeName)
// note: task environment is managed by the task bash wrapper // note: task environment is managed by the task bash wrapper
// do not add here -- see also #680 // do not add here -- see also #680
if( fixOwnership() ) if( fixOwnership() )
@@ -314,14 +318,21 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
} }
/** /**
* Creates a new K8s pod executing the associated task * Prepares the task execution and enqueues it at the scheduler
*/ */
@Override @Override
@CompileDynamic @CompileDynamic
void submit() { void submit() {
builder = createBashWrapper(task) builder = createBashWrapper(task)
builder.build() builder.build()
executor.taskScheduler.submit(this)
}
/**
* Creates a new K8s pod executing the associated task
*/
@CompileDynamic
void submitNow(String nodeName) {
final req = newSubmitRequest(task) final req = newSubmitRequest(task)
final resp = useJobResource() final resp = useJobResource()
? client.jobCreate(req, yamlDebugPath()) ? client.jobCreate(req, yamlDebugPath())
@@ -373,7 +384,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
@Override @Override
boolean checkIfRunning() { boolean checkIfRunning() {
if( !podName ) throw new IllegalStateException("Missing K8s ${resourceType.lower()} name -- cannot check if running") if( !podName )
return false
if(isSubmitted()) { if(isSubmitted()) {
def state = getState() def state = getState()
// include `terminated` state to allow the handler status to progress // include `terminated` state to allow the handler status to progress
@@ -419,7 +431,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
@Override @Override
boolean checkIfCompleted() { boolean checkIfCompleted() {
if( !podName ) if( !podName )
throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete") return false
final state = getState() final state = getState()
if( state && state.terminated ) { if( state && state.terminated ) {

View File

@@ -0,0 +1,57 @@
package nextflow.k8s
import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.k8s.client.K8sClient
import java.util.concurrent.LinkedBlockingQueue
@Slf4j
@CompileStatic
class K8sTaskScheduler {
private final K8sClient client
private final K8sSchedulingStrategy strategy
private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>()
private List<String> cachedNodes
K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) {
this.client = client
this.strategy = strategy
}
void submit(K8sTaskHandler handler) {
queue.add(new K8sSchedulingRequest(handler))
}
protected synchronized void drain() {
while( true ) {
final pending = new ArrayList<K8sSchedulingRequest>(queue)
final decision = strategy.schedule(pending, getNodes())
if ( !decision )
return
if ( !queue.remove(decision.request) )
continue
log.trace "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
decision.request.handler.submitNow(decision.nodeName)
}
}
protected List<String> getNodes() {
if ( cachedNodes == null )
cachedNodes = fetchNodes()
return cachedNodes
}
@CompileDynamic
private List<String> fetchNodes() {
final resp = client.nodeList()
ArrayList<String> nodes = new ArrayList<String>()
for ( Map item : resp.items ) {
nodes.add(item.metadata.name as String)
}
return nodes
}
}

View File

@@ -0,0 +1,19 @@
package nextflow.k8s.strategies
import groovy.transform.CompileStatic
import nextflow.k8s.K8sSchedulingDecision
import nextflow.k8s.K8sSchedulingRequest
import nextflow.k8s.K8sSchedulingStrategy
@CompileStatic
class K8sHashSchedulingStrategy implements K8sSchedulingStrategy {
@Override
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes) {
if ( !queue || !nodes )
return null
final request = queue[0]
final index = Math.floorMod(request.task.hash.asInt(), nodes.size())
return new K8sSchedulingDecision(request, nodes[index])
}
}