feat: helper for deploying pods before workflow execution starts
This commit is contained in:
@@ -149,7 +149,12 @@ class K8sDriverLauncher {
|
||||
this.k8sConfig = makeK8sConfig(config.toMap())
|
||||
this.k8sClient = makeK8sClient(k8sConfig)
|
||||
this.k8sConfig.checkStorageAndPaths(k8sClient)
|
||||
|
||||
createK8sConfigMap()
|
||||
|
||||
K8sNodeInitDeployer initDeployer = new K8sNodeInitDeployer(k8sClient, k8sConfig, runName)
|
||||
initDeployer.deploy()
|
||||
|
||||
createK8sLauncherPod()
|
||||
waitPodStart()
|
||||
// login into container session
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
package nextflow.k8s
|
||||
|
||||
import nextflow.k8s.client.K8sClient
|
||||
import nextflow.k8s.model.PodHostMount
|
||||
import nextflow.k8s.model.PodSpecBuilder
|
||||
import nextflow.k8s.model.PodVolumeClaim
|
||||
|
||||
class K8sNodeInitDeployer {
|
||||
private K8sClient client
|
||||
private K8sConfig config
|
||||
private String runName
|
||||
|
||||
K8sNodeInitDeployer(K8sClient client, K8sConfig config, String runName) {
|
||||
this.client = client
|
||||
this.config = config
|
||||
this.runName = runName
|
||||
}
|
||||
|
||||
void deploy() {
|
||||
final init = config.nodeInit
|
||||
if ( !init?.enabled )
|
||||
return
|
||||
|
||||
final nodes = getNodes()
|
||||
for ( String nodeName : nodes ) {
|
||||
final spec = makePodSpec(init, nodeName)
|
||||
client.podCreate(spec)
|
||||
}
|
||||
|
||||
waitForPods(nodes)
|
||||
}
|
||||
|
||||
private List<String> getNodes() {
|
||||
final resp = client.nodeList()
|
||||
ArrayList<String> nodes = new ArrayList<String>()
|
||||
for ( Map item : resp.items ) {
|
||||
nodes.add(item.metadata.name as String)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
private String buildPodName(String nodeName) {
|
||||
// TODO: Remove anything that is not lowercase alpha-numeric or dash
|
||||
String sanitizedNodeName = nodeName.toLowerCase()
|
||||
String name = "nf-init-${runName}-${sanitizedNodeName}"
|
||||
if ( name.length() > 63 )
|
||||
name = name.substring(0, 63)
|
||||
return name
|
||||
}
|
||||
|
||||
private Map makePodSpec(K8sNodeInitConfig config, String nodeName) {
|
||||
ArrayList<PodHostMount> mounts = new ArrayList<PodHostMount>()
|
||||
mounts.add(new PodHostMount("/sys", "/sys"))
|
||||
mounts.add(new PodHostMount("/dev", "/dev"))
|
||||
mounts.add(new PodHostMount("/lib/modules", "/lib/modules"))
|
||||
|
||||
PodSpecBuilder builder = new PodSpecBuilder()
|
||||
return builder.withNodeName(nodeName)
|
||||
.withImageName(config.image)
|
||||
.withCommand(config.command)
|
||||
.withPrivileged(true)
|
||||
.withHostMounts(mounts)
|
||||
.withPodName(buildPodName(nodeName))
|
||||
.build()
|
||||
}
|
||||
|
||||
private void waitForPods(List<String> nodes) {
|
||||
for ( String nodeName : nodes ) {
|
||||
String podName = buildPodName(nodeName)
|
||||
|
||||
// TODO: Support differentiate between wait = running
|
||||
// and wait = Succeeded
|
||||
|
||||
while( true ) {
|
||||
sleep 1000
|
||||
final state = k8sClient.podState(podName)
|
||||
if( state && !state.containsKey('waiting') ) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -126,6 +126,8 @@ class PodSpecBuilder {
|
||||
|
||||
String runtimeClassName
|
||||
|
||||
String nodeName
|
||||
|
||||
/**
|
||||
* @return A sequential volume unique identifier
|
||||
*/
|
||||
@@ -399,6 +401,11 @@ class PodSpecBuilder {
|
||||
return this
|
||||
}
|
||||
|
||||
PodSpecBuilder withNodeName(String value) {
|
||||
this.nodeName = value
|
||||
return this
|
||||
}
|
||||
|
||||
@PackageScope List<Map> createPullSecret() {
|
||||
def result = new ArrayList(1)
|
||||
def entry = new LinkedHashMap(1)
|
||||
@@ -496,6 +503,9 @@ class PodSpecBuilder {
|
||||
if ( runtimeClassName )
|
||||
spec.runtimeClassName = runtimeClassName
|
||||
|
||||
if ( nodeName )
|
||||
spec.nodeName = nodeName
|
||||
|
||||
final pod = [
|
||||
apiVersion: 'v1',
|
||||
kind: 'Pod',
|
||||
|
||||
Reference in New Issue
Block a user