Compare commits
6 Commits
0ec084b720
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 933611af55 | |||
| 7f18e5aae8 | |||
| 2ae1ac6e38 | |||
| 78fe51a40d | |||
| a8d0af6d77 | |||
| 7a68546afa |
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
cleanup-volume-claim.sh
|
||||
8
.woodpecker/nextflow-k8s.yaml
Normal file
8
.woodpecker/nextflow-k8s.yaml
Normal file
@@ -0,0 +1,8 @@
|
||||
when:
|
||||
- event: push
|
||||
|
||||
steps:
|
||||
- name: Test Kubernetes Plugin
|
||||
image: groovy:4-jdk21
|
||||
commands:
|
||||
- cd nextflow && ./gradlew :plugins:nf-k8s:test
|
||||
4
mise.toml
Normal file
4
mise.toml
Normal file
@@ -0,0 +1,4 @@
|
||||
[tools]
|
||||
go='1.25.6'
|
||||
java = "21.0.2"
|
||||
groovy = "4.0.28"
|
||||
@@ -1,12 +1,63 @@
|
||||
FROM eclipse-temurin:17-jre
|
||||
FROM eclipse-temurin:17-jdk AS build
|
||||
|
||||
WORKDIR /opt/nextflow
|
||||
|
||||
COPY . /opt/nextflow
|
||||
|
||||
RUN chmod +x /opt/nextflow/launch.sh /opt/nextflow/nextflow || true
|
||||
RUN chmod +x ./gradlew ./launch.sh ./nextflow || true
|
||||
RUN ./gradlew clean assemble exportClasspath
|
||||
|
||||
RUN mkdir -p /opt/nextflow-runtime/lib \
|
||||
&& i=0; \
|
||||
tr ':' '\n' < /opt/nextflow/.launch.classpath | while read -r path; do \
|
||||
if [ -f "$path" ]; then \
|
||||
name="$(basename "$path")"; \
|
||||
target="/opt/nextflow-runtime/lib/$name"; \
|
||||
runtime_path="/opt/nextflow/lib/$name"; \
|
||||
if [ -e "$target" ]; then \
|
||||
i=$((i+1)); \
|
||||
target="/opt/nextflow-runtime/lib/${i}-$name"; \
|
||||
runtime_path="/opt/nextflow/lib/${i}-$name"; \
|
||||
fi; \
|
||||
cp "$path" "$target"; \
|
||||
echo "$runtime_path" >> /opt/nextflow-runtime/.launch.classpath.lines; \
|
||||
fi; \
|
||||
done \
|
||||
&& paste -sd ':' /opt/nextflow-runtime/.launch.classpath.lines > /opt/nextflow-runtime/.launch.classpath
|
||||
|
||||
RUN mkdir -p /opt/nextflow-runtime/plugins \
|
||||
&& cp /opt/nextflow/launch.sh /opt/nextflow-runtime/launch.sh \
|
||||
&& cp /opt/nextflow/nextflow /opt/nextflow-runtime/nextflow \
|
||||
&& cp -a /opt/nextflow/plugins/nf-k8s /opt/nextflow-runtime/plugins/nf-k8s
|
||||
|
||||
|
||||
FROM eclipse-temurin:17-jre
|
||||
|
||||
WORKDIR /opt/nextflow
|
||||
|
||||
COPY --from=build /opt/nextflow-runtime /opt/nextflow
|
||||
|
||||
RUN chmod +x /opt/nextflow/launch.sh \
|
||||
&& cp /opt/nextflow/launch.sh /opt/nextflow/nextflow \
|
||||
&& chmod +x /opt/nextflow/nextflow \
|
||||
&& echo "nextflow-dvfs-dev-image 0.1" > /opt/nextflow/DEV_IMAGE_MARKER
|
||||
|
||||
|
||||
RUN mkdir -p /opt/nextflow/conf \
|
||||
&& cat > /opt/nextflow/conf/scm <<'EOF'
|
||||
providers {
|
||||
mygitea {
|
||||
platform = 'gitea'
|
||||
server = 'https://gitea.kleine.eulenhexe.de'
|
||||
endpoint = 'https://gitea.kleine.eulenhexe.de/api/v1'
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
ENV PATH="/opt/nextflow:${PATH}"
|
||||
ENV NXF_HOME="/opt/nextflow/.nextflow"
|
||||
ENV NXF_HOME="/tmp/.nextflow"
|
||||
ENV NXF_PLUGINS_MODE="dev"
|
||||
ENV NXF_PLUGINS_DIR="/opt/nextflow/plugins"
|
||||
ENV NXF_SCM_FILE="/opt/nextflow/conf/scm"
|
||||
|
||||
ENTRYPOINT ["/opt/nextflow/launch.sh"]
|
||||
|
||||
@@ -39,6 +39,9 @@ import nextflow.k8s.model.PodSecurityContext
|
||||
import nextflow.k8s.model.PodVolumeClaim
|
||||
import nextflow.k8s.model.ResourceType
|
||||
import nextflow.util.Duration
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Model Kubernetes specific settings defined in the nextflow
|
||||
* configuration file
|
||||
@@ -221,6 +224,24 @@ class K8sConfig implements ConfigScope {
|
||||
""")
|
||||
final String nextflowImage
|
||||
|
||||
@ConfigOption
|
||||
@Description("""
|
||||
The run interval of the kubernetes scheduler
|
||||
""")
|
||||
final Duration schedulerInterval
|
||||
|
||||
@ConfigOption
|
||||
@Description("""
|
||||
Enables task runtime recording
|
||||
""")
|
||||
final boolean recordTaskRuntimes
|
||||
|
||||
@ConfigOption
|
||||
@Description("""
|
||||
The runtime recording file
|
||||
""")
|
||||
final String runtimeRecordPath
|
||||
|
||||
/* required by extension point -- do not remove */
|
||||
K8sConfig() {
|
||||
this(Collections.emptyMap())
|
||||
@@ -251,6 +272,9 @@ class K8sConfig implements ConfigScope {
|
||||
storageSubPath = opts.storageSubPath
|
||||
userName = opts.userName
|
||||
nextflowImage = opts.nextflowImage ?: "nextflow/nextflow:${BuildInfo.version}"
|
||||
schedulerInterval = opts.schedulerInterval as Duration ?: new Duration(10, TimeUnit.SECONDS)
|
||||
recordTaskRuntimes = opts.recordTaskRuntimes as boolean ?: false
|
||||
runtimeRecordPath = opts.runtimeRecordPath as String ?: "${launchDir}/work/runtimes.csv"
|
||||
|
||||
launchDir = opts.launchDir ?: "${storageMountPath}/${getUserName()}"
|
||||
projectDir = opts.projectDir ?: "${storageMountPath}/projects"
|
||||
@@ -272,7 +296,6 @@ class K8sConfig implements ConfigScope {
|
||||
else if( securityContext )
|
||||
pod.securityContext = new PodSecurityContext(securityContext)
|
||||
|
||||
log.info("Hello sailor")
|
||||
nodeInit = new K8sNodeInitConfig(opts.nodeInit as Map ?: Collections.emptyMap())
|
||||
}
|
||||
|
||||
|
||||
@@ -53,18 +53,20 @@ class K8sExecutor extends Executor implements ExtensionPoint {
|
||||
private Cache<String, K8sClient> clientCache
|
||||
|
||||
private K8sTaskScheduler taskScheduler
|
||||
private Thread schedulerThread
|
||||
|
||||
K8sRuntimeRecorder runtimeRecorder
|
||||
|
||||
/**
|
||||
* @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes
|
||||
* the client (including the service account token) when the configured interval expires.
|
||||
*/
|
||||
protected K8sClient getClient() {
|
||||
K8sClient getClient() {
|
||||
clientCache.get('client', () -> new K8sClient(k8sConfig.getClient()))
|
||||
}
|
||||
|
||||
protected K8sTaskScheduler getTaskScheduler() {
|
||||
if( taskScheduler == null )
|
||||
taskScheduler = new K8sTaskScheduler(getClient(), new K8sHashSchedulingStrategy())
|
||||
assert taskScheduler != null
|
||||
return taskScheduler
|
||||
}
|
||||
|
||||
@@ -88,7 +90,21 @@ class K8sExecutor extends Executor implements ExtensionPoint {
|
||||
.expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
.build()
|
||||
final client = getClient()
|
||||
|
||||
log.debug "[K8s] config=$k8sConfig; API client config=$client.config"
|
||||
|
||||
this.runtimeRecorder = new K8sRuntimeRecorder(k8sConfig.recordTaskRuntimes, k8sConfig.runtimeRecordPath)
|
||||
|
||||
this.taskScheduler = new K8sTaskScheduler(this, new K8sHashSchedulingStrategy())
|
||||
this.schedulerThread = new Thread(this.taskScheduler)
|
||||
this.schedulerThread.start()
|
||||
}
|
||||
|
||||
@Override
|
||||
void shutdown() {
|
||||
this.runtimeRecorder.write()
|
||||
this.taskScheduler.stop()
|
||||
this.schedulerThread.join()
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package nextflow.k8s
|
||||
|
||||
record K8sRuntimeRecord(
|
||||
String taskName,
|
||||
long inputSize,
|
||||
long runtimeMillis
|
||||
) {}
|
||||
@@ -0,0 +1,49 @@
|
||||
package nextflow.k8s
|
||||
|
||||
import groovy.util.logging.Slf4j
|
||||
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
|
||||
@Slf4j
|
||||
class K8sRuntimeRecorder {
|
||||
private final boolean enabled
|
||||
private final String recordPath
|
||||
|
||||
private ArrayList<K8sRuntimeRecord> records
|
||||
|
||||
K8sRuntimeRecorder(boolean enabled, String recordPath) {
|
||||
this.enabled = enabled
|
||||
this.recordPath = recordPath
|
||||
this.records = new ArrayList<>()
|
||||
}
|
||||
|
||||
/**
|
||||
* Records the runtime of a task.
|
||||
* This should be called for a finished task, which has its
|
||||
* start and end timestamps set.
|
||||
* @param task
|
||||
*/
|
||||
synchronized void record(K8sTaskHandler task) {
|
||||
if (!enabled)
|
||||
return;
|
||||
long runtimeMilis = task.completeTimeMillis - task.startTimeMillis
|
||||
long inputSizeSum = 0
|
||||
def inputFiles = task.task.getInputFilesMap()
|
||||
for (Map.Entry<String, Path> f : inputFiles) {
|
||||
try {
|
||||
inputSizeSum += Files.size(f.value)
|
||||
} catch (IOException ex) {
|
||||
log.error "[K8s] failed to get size of input file ${f.value} of task ${task.task.name}: ${ex.message}"
|
||||
}
|
||||
}
|
||||
|
||||
records.add(new K8sRuntimeRecord(task.task.processor.name, inputSizeSum, runtimeMilis))
|
||||
}
|
||||
|
||||
void write() {
|
||||
for (K8sRuntimeRecord record : records) {
|
||||
log.info("[K8s] ${record.taskName} - ${record.inputSize} Bytes - ${record.runtimeMillis} ms")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,9 +5,10 @@ interface K8sSchedulingStrategy {
|
||||
* Selects the next task to run from the given task queue.
|
||||
* If the scheduler should wait, returns {@code null} instead
|
||||
*
|
||||
* @param scheduler the calling scheduler object
|
||||
* @param queue Pending scheduling requests
|
||||
* @param nodes Available Kubernetes node names
|
||||
* @return A launch decision, or {@code null} when no task should be launched now
|
||||
*/
|
||||
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes)
|
||||
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes)
|
||||
}
|
||||
|
||||
@@ -334,6 +334,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
|
||||
void submit() {
|
||||
builder = createBashWrapper(task)
|
||||
builder.build()
|
||||
log.info "[K8s] submitting task ${this.task.name}"
|
||||
executor.taskScheduler.submit(this)
|
||||
}
|
||||
|
||||
@@ -470,6 +471,12 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
|
||||
deleteJobIfSuccessful(task)
|
||||
updateTimestamps(state.terminated as Map)
|
||||
determineNode()
|
||||
|
||||
// Signal the scheduler that this task has finished running
|
||||
if (executor != null) {
|
||||
executor.taskScheduler.taskFinished(this)
|
||||
executor.runtimeRecorder.record(this)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -8,28 +8,62 @@ import java.util.concurrent.LinkedBlockingQueue
|
||||
|
||||
@Slf4j
|
||||
@CompileStatic
|
||||
class K8sTaskScheduler {
|
||||
private final K8sClient client
|
||||
class K8sTaskScheduler implements Runnable {
|
||||
private final K8sExecutor executor
|
||||
private final K8sSchedulingStrategy strategy
|
||||
private final LinkedBlockingQueue<K8sSchedulingRequest> queue = new LinkedBlockingQueue<>()
|
||||
private List<String> cachedNodes
|
||||
|
||||
K8sTaskScheduler(K8sClient client, K8sSchedulingStrategy strategy) {
|
||||
this.client = client
|
||||
private synchronized boolean shouldStop
|
||||
|
||||
private HashMap<String, ArrayList<K8sTaskHandler>> occupancy;
|
||||
private HashMap<String, String> taskToNode
|
||||
|
||||
K8sTaskScheduler(K8sExecutor executor, K8sSchedulingStrategy strategy) {
|
||||
this.executor = executor
|
||||
this.strategy = strategy
|
||||
this.occupancy = new HashMap<>()
|
||||
for (String node : getNodes()) {
|
||||
this.occupancy.put(node, new ArrayList<K8sTaskHandler>())
|
||||
}
|
||||
this.taskToNode = new HashMap<>()
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a task to the queue of outstanding tasks
|
||||
* @param handler
|
||||
*/
|
||||
void submit(K8sTaskHandler handler) {
|
||||
log.debug "[K8s] received queued task ${handler.task.name}"
|
||||
log.info "[K8s] received queued task ${handler.task.name}"
|
||||
queue.add(new K8sSchedulingRequest(handler))
|
||||
|
||||
drain()
|
||||
/* TODO: Decide if we should invoke the scheduler immediately */
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the scheduler that a task has finished execution
|
||||
* @param handler
|
||||
*/
|
||||
void taskFinished(K8sTaskHandler handler) {
|
||||
/* Remove from occupancy list */
|
||||
String nodeName = taskToNode.get(handler.task.name)
|
||||
if (nodeName == null) {
|
||||
log.error "[K8s] no node saved for task ${handler.task.name}"
|
||||
return
|
||||
}
|
||||
ArrayList<K8sTaskHandler> nodeTasks = occupancy.get(nodeName)
|
||||
if (nodeTasks == null) {
|
||||
log.error "[K8s] tried to remove task ${handler.task.name} from node ${nodeName} but no tasks are saved for that node"
|
||||
return
|
||||
}
|
||||
nodeTasks.remove(handler)
|
||||
log.info "[K8s] removed task ${handler.task.name} from node ${nodeName}"
|
||||
}
|
||||
|
||||
protected synchronized void drain() {
|
||||
while( true ) {
|
||||
final pending = new ArrayList<K8sSchedulingRequest>(queue)
|
||||
final decision = strategy.schedule(pending, getNodes())
|
||||
final decision = strategy.schedule(this, pending, getNodes())
|
||||
|
||||
if ( !decision )
|
||||
return
|
||||
@@ -37,11 +71,42 @@ class K8sTaskScheduler {
|
||||
if ( !queue.remove(decision.request) )
|
||||
continue
|
||||
|
||||
log.debug "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
|
||||
log.info "[K8s] launching queued task ${decision.request.task.name} on node: ${decision.nodeName}"
|
||||
decision.request.handler.submitNow(decision.nodeName)
|
||||
ArrayList nodeTasks = occupancy.get(decision.nodeName)
|
||||
assert nodeTasks != null
|
||||
nodeTasks.add(decision.request.handler)
|
||||
taskToNode.put(decision.request.task.name, decision.nodeName)
|
||||
}
|
||||
}
|
||||
|
||||
/* Scheduling Strategy Interface */
|
||||
List<String> getFreeNodes() {
|
||||
def unoccupied = occupancy.findAll {it.value == null || it.value.size() == 0 }
|
||||
unoccupied.keySet().toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Run is the scheduler threads main function
|
||||
* */
|
||||
void run() {
|
||||
this.shouldStop = false
|
||||
def interval = this.executor.getK8sConfig().schedulerInterval
|
||||
while (!shouldStop) {
|
||||
sleep(interval.toMillis())
|
||||
drain()
|
||||
}
|
||||
log.info("[K8s] terminated scheduler loop")
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop terminates the scheduler thread
|
||||
*/
|
||||
void stop() {
|
||||
log.info("[K8s] stopping scheduler loop")
|
||||
this.shouldStop = true
|
||||
}
|
||||
|
||||
protected List<String> getNodes() {
|
||||
if ( cachedNodes == null )
|
||||
cachedNodes = fetchNodes()
|
||||
@@ -50,7 +115,7 @@ class K8sTaskScheduler {
|
||||
|
||||
@CompileDynamic
|
||||
private List<String> fetchNodes() {
|
||||
final resp = client.nodeList()
|
||||
final resp = executor.getClient().nodeList()
|
||||
ArrayList<String> nodes = new ArrayList<String>()
|
||||
for ( Map item : resp.items ) {
|
||||
nodes.add(item.metadata.name as String)
|
||||
|
||||
@@ -4,16 +4,22 @@ import groovy.transform.CompileStatic
|
||||
import nextflow.k8s.K8sSchedulingDecision
|
||||
import nextflow.k8s.K8sSchedulingRequest
|
||||
import nextflow.k8s.K8sSchedulingStrategy
|
||||
import nextflow.k8s.K8sTaskScheduler
|
||||
|
||||
@CompileStatic
|
||||
class K8sHashSchedulingStrategy implements K8sSchedulingStrategy {
|
||||
|
||||
@Override
|
||||
K8sSchedulingDecision schedule(List<K8sSchedulingRequest> queue, List<String> nodes) {
|
||||
K8sSchedulingDecision schedule(K8sTaskScheduler scheduler, List<K8sSchedulingRequest> queue, List<String> nodes) {
|
||||
if ( !queue || !nodes )
|
||||
return null
|
||||
|
||||
final freeNodes = scheduler.freeNodes
|
||||
if ( freeNodes.size() == 0 )
|
||||
return null
|
||||
|
||||
final request = queue[0]
|
||||
final index = Math.floorMod(request.task.hash.asInt(), nodes.size())
|
||||
return new K8sSchedulingDecision(request, nodes[index])
|
||||
final index = Math.floorMod(request.task.hash.asInt(), freeNodes.size())
|
||||
return new K8sSchedulingDecision(request, freeNodes[index])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,9 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
|
||||
def 'should enqueue task handler without immediately submitting it' () {
|
||||
given:
|
||||
def client = Mock(K8sClient)
|
||||
def executor = Mock(K8sExecutor)
|
||||
def strategy = Mock(K8sSchedulingStrategy)
|
||||
def scheduler = new K8sTaskScheduler(client, strategy)
|
||||
def scheduler = new K8sTaskScheduler(executor, strategy)
|
||||
|
||||
def task = Mock(TaskRun)
|
||||
def handler = Spy(K8sTaskHandler)
|
||||
@@ -20,8 +20,9 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
scheduler.submit(handler)
|
||||
|
||||
then:
|
||||
1 * scheduler.() >> ['A', 'B', 'C']
|
||||
0 * strategy._
|
||||
0 * client._
|
||||
0 * executor._
|
||||
0 * handler.submitNow(_)
|
||||
|
||||
and:
|
||||
@@ -31,8 +32,9 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
def 'should drain queued task and submit it to selected node' () {
|
||||
given:
|
||||
def client = Mock(K8sClient)
|
||||
def executor = Mock(K8sExecutor)
|
||||
def strategy = Mock(K8sSchedulingStrategy)
|
||||
def scheduler = new K8sTaskScheduler(client, strategy)
|
||||
def scheduler = new K8sTaskScheduler(executor, strategy)
|
||||
|
||||
def task = Mock(TaskRun)
|
||||
def handler = Spy(K8sTaskHandler)
|
||||
@@ -43,6 +45,7 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
scheduler.drain()
|
||||
|
||||
then:
|
||||
1 * executor.getClient() >> client
|
||||
1 * client.nodeList() >> [
|
||||
items: [
|
||||
[metadata: [name: 'node-a']],
|
||||
@@ -70,8 +73,9 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
def 'should keep task queued when strategy returns no decision' () {
|
||||
given:
|
||||
def client = Mock(K8sClient)
|
||||
def executor = Mock(K8sExecutor)
|
||||
def strategy = Mock(K8sSchedulingStrategy)
|
||||
def scheduler = new K8sTaskScheduler(client, strategy)
|
||||
def scheduler = new K8sTaskScheduler(executor, strategy)
|
||||
|
||||
def task = Mock(TaskRun)
|
||||
def handler = Spy(K8sTaskHandler)
|
||||
@@ -82,6 +86,7 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
scheduler.drain()
|
||||
|
||||
then:
|
||||
1 * executor.getClient() >> client
|
||||
1 * client.nodeList() >> [
|
||||
items: [
|
||||
[metadata: [name: 'node-a']]
|
||||
@@ -101,8 +106,9 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
def 'should continue draining when selected request was already removed' () {
|
||||
given:
|
||||
def client = Mock(K8sClient)
|
||||
def executor = Mock(K8sExecutor)
|
||||
def strategy = Mock(K8sSchedulingStrategy)
|
||||
def scheduler = new K8sTaskScheduler(client, strategy)
|
||||
def scheduler = new K8sTaskScheduler(executor, strategy)
|
||||
|
||||
def task1 = Mock(TaskRun)
|
||||
def handler1 = Spy(K8sTaskHandler)
|
||||
@@ -118,6 +124,7 @@ class K8sTaskSchedulerTest extends Specification {
|
||||
scheduler.drain()
|
||||
|
||||
then:
|
||||
1 * executor.getClient() >> client
|
||||
1 * client.nodeList() >> [
|
||||
items: [
|
||||
[metadata: [name: 'node-a']]
|
||||
|
||||
@@ -22,8 +22,10 @@ k8s {
|
||||
projectDir = '/workspace/projects'
|
||||
cleanup = false
|
||||
|
||||
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.1'
|
||||
nextflowImage = 'gitea.kleine.eulenhexe.de/kevin/ma/nextflow-dvfs:0.5.3'
|
||||
imagePullPolicy = 'IfNotPresent'
|
||||
schedulerInterval = '10s'
|
||||
recordTaskRuntimes = true
|
||||
|
||||
nodeInit {
|
||||
enabled = false
|
||||
|
||||
Reference in New Issue
Block a user