add nextflow d30e48d

This commit is contained in:
2026-04-29 23:01:54 +02:00
parent d0b12d668d
commit 97cc9058d3
2840 changed files with 730250 additions and 0 deletions

View File

@@ -0,0 +1,116 @@
# Seqera Executor plugin for Nextflow
## Summary
The Seqera Executor plugin provides integration with Seqera Cloud for executing Nextflow tasks using Seqera's managed compute infrastructure.
## Get Started
To use this plugin, add it to your `nextflow.config`:
```groovy
plugins {
id 'nf-seqera'
}
```
Configure the Seqera executor:
```groovy
process {
executor = 'seqera'
}
seqera {
executor {
region = 'eu-west-1'
autoLabels = true
}
}
tower {
accessToken = '<SEQERA ACCESS TOKEN>'
}
```
Alternatively, set the access token via the `TOWER_ACCESS_TOKEN` environment variable:
```bash
export TOWER_ACCESS_TOKEN='<YOUR ACCESS TOKEN>'
```
## Examples
### Running a workflow with the Seqera executor
`nextflow.config`:
```groovy
plugins {
id 'nf-seqera'
}
process {
executor = 'seqera'
}
tower {
accessToken = '<SEQERA ACCESS TOKEN>'
}
seqera {
executor {
region = 'eu-west-1'
}
}
```
`main.nf`:
```groovy
process HELLO {
output:
path 'hello.txt'
script:
'''
echo "Hello from Seqera Cloud" > hello.txt
'''
}
workflow {
HELLO()
}
```
### Using resource labels for cost tracking
```groovy
seqera {
executor {
region = 'us-east-1'
labels = [team: 'genomics', project: 'wgs-analysis']
autoLabels = true
}
}
```
### Using the resource prediction model
```groovy
seqera {
executor {
region = 'eu-west-1'
predictionModel = 'qr/v1'
}
}
```
## Resources
- [Seqera Platform Documentation](https://docs.seqera.io/)
## License
[Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0)

View File

@@ -0,0 +1 @@
0.19.0

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Build plugins: the Nextflow plugin convention plugin (its version is taken from the
// `nextflowPluginVersion` build property) plus Gradle's test-fixtures support.
plugins {
id 'io.nextflow.nextflow-plugin' version "${nextflowPluginVersion}"
id 'java-test-fixtures'
}
// Plugin descriptor consumed by the `io.nextflow.nextflow-plugin` Gradle plugin.
nextflowPlugin {
// Nextflow version this plugin is declared against (presumably the minimum required version -- confirm)
nextflowVersion = '26.03.4-edge'
// Provider name is taken from the `nextflowPluginProvider` build property
provider = "${nextflowPluginProvider}"
description = 'Integrates with Seqera Platform for comprehensive workflow monitoring, resource tracking, and cache management capabilities'
// Plugin entry-point class
className = 'io.seqera.plugin.SeqeraPlugin'
// NOTE(review): with default dependencies disabled, every dependency this module needs
// has to be listed in the `dependencies` block below -- confirm against the plugin docs
useDefaultDependencies = false
generateSpec = false
// Classes registered as extension points of this plugin
extensionPoints = [
'io.seqera.executor.SeqeraExecutor',
'io.seqera.config.SeqeraConfig'
]
}
// Non-standard source layout: Groovy sources live directly under `src/main` and `src/test`,
// resources under `src/resources`; the Java source dirs and the test resources are emptied.
sourceSets {
main.java.srcDirs = []
main.groovy.srcDirs = ['src/main']
main.resources.srcDirs = ['src/resources']
test.groovy.srcDirs = ['src/test']
test.java.srcDirs = []
test.resources.srcDirs = []
}
configurations {
// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies
// slf4j-api is kept off the runtime classpath; it is only declared `compileOnly` below
// (presumably because the hosting Nextflow runtime supplies it -- confirm)
runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api'
}
dependencies {
// Needed to compile only; not part of this module's runtime classpath
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.17'
compileOnly 'org.pf4j:pf4j:3.14.1'
// Seqera scheduler client, exposed as part of this module's API
api 'io.seqera:sched-client:0.52.0'
// Test-only dependencies, including the test fixtures of the core `nextflow` project
testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.31"
testImplementation "org.apache.groovy:groovy-nio:4.0.31"
}

View File

@@ -0,0 +1,31 @@
nf-seqera changelog
====================
0.19.0 - 25 Apr 2026
- Add hints process directive for executor-specific scheduling hints (#7034) [406358e03]
- Add workspaceId/computeEnvId to nf-seqera auto labels (#7059) [5e8276c00]
- Remove arch config option from Seqera MachineRequirement (#7063) [da06e9a9d]
- Fix IllegalArgumentException when process.resourceLabels is a closure (#7068) [944977e3f]
0.18.0 - 20 Apr 2026
- Honour process.resourceLabels in nf-seqera executor (#7048) [979f684ff]
- Filter autoLabels to selected workflow-metadata fields (#7049) [ddc974fe6]
0.17.0 - 7 Apr 2026
- Add resourceAllocation field to trace record (#6973) [a2742939c]
- Add compute env ID and provider support to Seqera executor (#6906) [4c2eb9390]
- Bump sched-client@0.49.0 [784a14353]
0.16.0 - 17 Mar 2026
- Default Fusion to v2.6 for Seqera executor (#6933) [8a7e53957]
0.15.0 - 10 Mar 2026
- Add provider option to Seqera executor config (#6908) [c70376d0a]
0.14.0 - 4 Mar 2026
- Use PipelineSpec model in CreateRunRequest (#6887) [4bd7e4f02]
0.13.0 - 3 Mar 2026
- Add NVMe disk allocation and diskMountPath support (#6879) [6644d3c90]
0.12.0 - 28 Feb 2026
- Sched core implementation alpha1 (#6242) [870c858af]

View File

@@ -0,0 +1,205 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import groovy.transform.CompileStatic
import nextflow.config.spec.ConfigOption
import nextflow.config.spec.ConfigScope
import nextflow.script.dsl.Description
import nextflow.util.Duration
/**
 * Configuration options for the Seqera executor, i.e. the {@code seqera.executor}
 * config scope.
 *
 * Instances are immutable: every option is parsed once by the map constructor and
 * exposed through a getter. The multi-line description strings are kept flush-left
 * so that their content is preserved verbatim.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@Description("""
The `seqera.executor` scope provides configuration for the Seqera compute executor.
""")
@CompileStatic
class ExecutorOpts implements ConfigScope {

    /** Short names accepted by the {@code autoLabels} option, in declaration order. */
    static final Set<String> VALID_AUTO_LABELS = Collections.unmodifiableSet(new LinkedHashSet<>([
        'projectName', 'userName', 'runName', 'sessionId', 'resume',
        'revision', 'commitId', 'repository', 'manifestName',
        'runtimeVersion', 'workflowId', 'workspaceId', 'computeEnvId'
    ]))

    /** Retry settings built from the {@code retryPolicy} sub-map (defaults apply when absent). */
    final RetryOpts retryPolicy

    @ConfigOption
    @Description("""
The Seqera scheduler service endpoint URL.
""")
    final String endpoint

    @ConfigOption
    @Description("""
The compute backend provider type (e.g. `aws`, `local`).
When specified, used together with region to select the matching compute environment.
""")
    final String provider

    @ConfigOption
    @Description("""
The AWS region for task execution (default: `eu-central-1`).
""")
    final String region

    @ConfigOption
    @Description("""
The EC2 key pair name for SSH access to instances.
""")
    final String keyPairName

    @ConfigOption
    @Description("""
The interval for batching task submissions (default: `1 sec`).
""")
    final Duration batchFlushInterval

    @Description("""
Machine/infrastructure requirements for session tasks.
""")
    final MachineRequirementOpts machineRequirement

    @ConfigOption
    @Description("""
Automatically attach workflow metadata labels (with the `nextflow.io/` and
`seqera.io/platform/` prefixes) to the session. Accepts:
- `true`: include all available metadata labels
- `false` (default): disable
- a list or comma-separated string of short names: e.g.
`['runName', 'projectName']` or `'runName,projectName'`
Valid names: `projectName`, `userName`, `runName`, `sessionId`, `resume`,
`revision`, `commitId`, `repository`, `manifestName`, `runtimeVersion`,
`workflowId`, `workspaceId`, `computeEnvId`.
""")
    final Set<String> autoLabels

    @ConfigOption
    @Description("""
The resource prediction model to use for estimating task resource requirements
based on historical execution metrics. Supported values: `qr/v1`, `qr/v2` (quantile regression).
When not set, no resource estimation is applied.
""")
    final String predictionModel

    @ConfigOption
    @Description("""
Custom environment variables to apply to all tasks submitted by the Seqera executor.
These are merged with the Fusion environment variables, with Fusion variables taking precedence.
""")
    final Map<String, String> taskEnvironment

    @ConfigOption
    @Description("""
The Seqera Platform compute environment ID. When specified, the scheduler resolves
the compute environment directly by this ID instead of listing all workspace CEs.
Used as a fallback when the workflow launch does not include a CE reference.
""")
    final String computeEnvId

    /* required by config scope -- do not remove */
    ExecutorOpts() {}

    /**
     * Build the executor options from the {@code seqera.executor} config map.
     *
     * @param opts The raw config map for the scope
     * @throws IllegalArgumentException when {@code endpoint} is missing or empty,
     *      or when {@code autoLabels} holds an unsupported value or name
     */
    ExecutorOpts(Map opts) {
        this.retryPolicy = new RetryOpts(opts.retryPolicy as Map ?: Map.of())
        this.endpoint = opts.endpoint as String
        if (!endpoint)
            throw new IllegalArgumentException("Missing Seqera endpoint - make sure to specify 'seqera.executor.endpoint' settings")
        this.provider = opts.provider as String
        this.region = opts.region as String
        this.keyPairName = opts.keyPairName as String
        // fall back to a one second flush interval when none is configured
        this.batchFlushInterval = opts.batchFlushInterval
            ? Duration.of(opts.batchFlushInterval as String)
            : Duration.of('1 sec')
        // machine requirement settings
        this.machineRequirement = new MachineRequirementOpts(opts.machineRequirement as Map ?: Map.of())
        this.autoLabels = parseAutoLabels(opts.get('autoLabels'))
        // prediction model (an empty value is normalised to null)
        this.predictionModel = opts.predictionModel as String ?: null
        // custom task environment variables
        this.taskEnvironment = opts.taskEnvironment as Map<String, String>
        // compute environment ID
        this.computeEnvId = opts.computeEnvId as String
    }

    /** @return the retry settings held by {@link #retryPolicy} */
    RetryOpts retryOpts() {
        this.retryPolicy
    }

    String getEndpoint() {
        return endpoint
    }

    String getProvider() {
        return provider
    }

    String getRegion() {
        return region
    }

    String getKeyPairName() {
        return keyPairName
    }

    Duration getBatchFlushInterval() {
        return batchFlushInterval
    }

    MachineRequirementOpts getMachineRequirement() {
        return machineRequirement
    }

    Set<String> getAutoLabels() {
        return autoLabels
    }

    /**
     * Normalise the {@code autoLabels} setting into the set of label short names to emit.
     *
     * Accepted values:
     * - {@code null} or {@code false} (boolean or text): no labels
     * - {@code true} (boolean or text): all of {@link #VALID_AUTO_LABELS}
     * - a comma-separated string of short names
     * - any collection of short names (list, set, ...)
     *
     * Blank entries are dropped and the order of the names is preserved.
     *
     * @param value The raw config value
     * @return An unmodifiable set of valid label short names, possibly empty
     * @throws IllegalArgumentException when the value has an unsupported type or
     *      contains a name that is not listed in {@link #VALID_AUTO_LABELS}
     */
    protected static Set<String> parseAutoLabels(Object value) {
        if( value == null || value == false )
            return Collections.<String>emptySet()
        if( value == true )
            return VALID_AUTO_LABELS
        List<String> raw
        if( value instanceof CharSequence ) {
            final String text = value.toString().trim()
            // a boolean supplied as text must behave like the boolean itself instead of
            // being rejected as an unknown label name; neither word is a valid label
            if( text.equalsIgnoreCase('true') )
                return VALID_AUTO_LABELS
            if( text.equalsIgnoreCase('false') )
                return Collections.<String>emptySet()
            raw = text.tokenize(',').collect { String s -> s.trim() }.findAll { String s -> s }
        }
        else if( value instanceof Collection )
            // any collection is accepted, not only lists
            raw = ((Collection) value).collect { it?.toString()?.trim() }.findAll { String s -> s } as List<String>
        else
            throw new IllegalArgumentException("Invalid 'seqera.executor.autoLabels' value '${value}' - expected true, false, a list, or a comma-separated string")
        final invalid = raw.findAll { String s -> !(s in VALID_AUTO_LABELS) }
        if( invalid )
            throw new IllegalArgumentException("Invalid 'seqera.executor.autoLabels' name(s) ${invalid} - valid names are: ${VALID_AUTO_LABELS.join(', ')}")
        return Collections.unmodifiableSet(new LinkedHashSet<>(raw))
    }

    String getPredictionModel() {
        return predictionModel
    }

    Map<String, String> getTaskEnvironment() {
        return taskEnvironment
    }

    String getComputeEnvId() {
        return computeEnvId
    }
}

View File

@@ -0,0 +1,171 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import groovy.transform.CompileStatic
import nextflow.config.spec.ConfigOption
import nextflow.config.spec.ConfigScope
import nextflow.script.dsl.Description
import nextflow.util.MemoryUnit
/**
 * Machine/infrastructure requirements configuration options.
 *
 * Instances are immutable: every option is parsed once by the map constructor and
 * exposed through a getter. An option that is not configured is left as {@code null};
 * the defaults mentioned in the descriptions are not applied by this class.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@CompileStatic
class MachineRequirementOpts implements ConfigScope {

    @ConfigOption
    @Description("""
The instance provisioning mode: `spot`, `ondemand`, or `spotFirst`.
""")
    final String provisioning

    @ConfigOption
    @Description("""
Maximum number of spot retry attempts before falling back to on-demand.
Only used when provisioning is `spot` or `spotFirst`.
""")
    final Integer maxSpotAttempts

    @ConfigOption
    @Description("""
List of acceptable machine type patterns. Supports exact types (e.g., `t3.small`),
family prefixes (e.g., `m5` matches all m5 sizes), and glob wildcards (e.g., `t*.small`).
""")
    final List<String> machineTypes

    @ConfigOption
    @Description("""
The EBS volume type for task scratch disk (e.g., `ebs/gp3`, `ebs/io1`).
Default: `ebs/gp3`.
""")
    final String diskType

    @ConfigOption
    @Description("""
The throughput in MiB/s for gp3 volumes (125-1000).
Default: 325 (Fusion recommended).
""")
    final Integer diskThroughputMiBps

    @ConfigOption
    @Description("""
The IOPS for io1/io2/gp3 volumes. Required for io1/io2.
""")
    final Integer diskIops

    @ConfigOption
    @Description("""
Enable KMS encryption for the EBS volume.
Default: false.
""")
    final Boolean diskEncrypted

    @ConfigOption
    @Description("""
The disk allocation strategy: `task` or `node`.
- `task`: Per-task EBS volume created at task launch (default)
- `node`: Per-node instance storage attached at cluster level
""")
    final String diskAllocation

    @ConfigOption
    @Description("""
The container path where the disk is mounted (e.g., `/data`).
Default: `/tmp`.
""")
    final String diskMountPath

    @ConfigOption
    @Description("""
The disk size for session-level storage (e.g., `100.GB`).
""")
    final MemoryUnit diskSize

    @ConfigOption
    @Description("""
The ECS capacity provider mode: `managed` (default) or `asg`.
- `managed`: ECS Managed Instances
- `asg`: Auto Scaling Group-backed capacity provider
""")
    final String capacityMode

    /* required by config scope -- do not remove */
    MachineRequirementOpts() {}

    /**
     * Build the machine requirement options from the raw config map.
     *
     * @param opts The raw config map; {@code machineFamilies} is read as a fallback
     *      key when {@code machineTypes} is not set
     */
    MachineRequirementOpts(Map opts) {
        this.provisioning = opts.provisioning as String
        this.maxSpotAttempts = opts.maxSpotAttempts as Integer
        this.machineTypes = parseMachineTypes(opts.machineTypes ?: opts.machineFamilies)
        this.diskType = opts.diskType as String
        this.diskThroughputMiBps = opts.diskThroughputMiBps as Integer
        this.diskIops = opts.diskIops as Integer
        this.diskEncrypted = opts.diskEncrypted as Boolean
        this.diskAllocation = opts.diskAllocation as String
        this.diskMountPath = opts.diskMountPath as String
        // accept either a ready-made MemoryUnit or anything its string form can be parsed from
        this.diskSize = opts.diskSize instanceof MemoryUnit
            ? opts.diskSize as MemoryUnit
            : (opts.diskSize ? MemoryUnit.of(opts.diskSize as String) : null)
        this.capacityMode = opts.capacityMode as String
    }

    /**
     * Normalise the machine types setting into a list of patterns.
     *
     * A string is parsed as a comma-separated list with blank entries dropped,
     * because coercing a string with {@code as List} splits it into single
     * characters. Any other value is coerced to a list as-is.
     *
     * @param value The raw config value, possibly {@code null}
     * @return The list of patterns, or {@code null} when no value is given
     */
    private static List<String> parseMachineTypes(Object value) {
        if( value == null )
            return null
        if( value instanceof CharSequence )
            return value.toString().tokenize(',').collect { String s -> s.trim() }.findAll { String s -> s }
        return value as List<String>
    }

    String getProvisioning() {
        return provisioning
    }

    Integer getMaxSpotAttempts() {
        return maxSpotAttempts
    }

    List<String> getMachineTypes() {
        return machineTypes
    }

    String getDiskType() {
        return diskType
    }

    Integer getDiskThroughputMiBps() {
        return diskThroughputMiBps
    }

    Integer getDiskIops() {
        return diskIops
    }

    Boolean getDiskEncrypted() {
        return diskEncrypted
    }

    String getDiskAllocation() {
        return diskAllocation
    }

    String getDiskMountPath() {
        return diskMountPath
    }

    MemoryUnit getDiskSize() {
        return diskSize
    }

    String getCapacityMode() {
        return capacityMode
    }
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import groovy.transform.CompileStatic
import groovy.transform.ToString
import io.seqera.util.retry.Retryable
import nextflow.config.spec.ConfigOption
import nextflow.config.spec.ConfigScope
import nextflow.script.dsl.Description
import nextflow.util.Duration
/**
 * Model retry options for Seqera scheduler HTTP requests.
 * Implements {@link Retryable.Config} for integration with lib-retry.
 *
 * Every option starts from its default value and is overridden only when the
 * config map provides a value for it.
 */
@ToString(includeNames = true, includePackage = false)
@CompileStatic
class RetryOpts implements ConfigScope, Retryable.Config {

    @ConfigOption
    @Description("""
The initial delay when a failing HTTP request is retried (default: `450ms`).
""")
    Duration delay = Duration.of('450ms')

    @ConfigOption
    @Description("""
The max delay when a failing HTTP request is retried (default: `90s`).
""")
    Duration maxDelay = Duration.of('90s')

    @ConfigOption
    @Description("""
The maximum number of retry attempts (default: `10`).
""")
    int maxAttempts = 10

    @ConfigOption
    @Description("""
The jitter factor for randomizing retry delays (default: `0.25`).
""")
    double jitter = 0.25

    @ConfigOption
    @Description("""
The multiplier for exponential backoff (default: `2.0`).
""")
    double multiplier = 2.0d

    /** Create retry options holding the default values only. */
    RetryOpts() {
        this(Collections.emptyMap())
    }

    /**
     * Create retry options from a config map.
     *
     * @param config The raw config map; a missing or empty entry keeps the default.
     *      A zero value also keeps the default for every option except {@code jitter}.
     */
    RetryOpts(Map config) {
        if( config.delay )
            delay = config.delay as Duration
        if( config.maxDelay )
            maxDelay = config.maxDelay as Duration
        if( config.maxAttempts )
            maxAttempts = config.maxAttempts as int
        // Zero is a meaningful jitter value (no randomisation), so this option is
        // checked for presence rather than for truthiness, which would drop `0`.
        // A blank string is still treated as "not set".
        final Object jitterValue = config.jitter
        if( jitterValue != null && jitterValue.toString().trim() )
            jitter = jitterValue as double
        if( config.multiplier )
            multiplier = config.multiplier as double
    }

    // Methods required by Retryable.Config interface

    /** @return the initial retry delay converted to a {@code java.time.Duration} */
    @Override
    java.time.Duration getDelayAsDuration() {
        return java.time.Duration.ofMillis(delay.toMillis())
    }

    /** @return the maximum retry delay converted to a {@code java.time.Duration} */
    @Override
    java.time.Duration getMaxDelayAsDuration() {
        return java.time.Duration.ofMillis(maxDelay.toMillis())
    }
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import groovy.transform.CompileStatic
import nextflow.config.spec.ConfigScope
import nextflow.config.spec.ScopeName
import nextflow.script.dsl.Description
/**
 * Root of the {@code seqera} configuration scope.
 *
 * It currently exposes a single nested scope, {@code executor}, which stays
 * {@code null} when the config map holds no (or an empty) {@code executor} entry.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@ScopeName("seqera")
@Description("""
The `seqera` scope provides configuration for Seqera services.
""")
@CompileStatic
class SeqeraConfig implements ConfigScope {

    @Description("""
Configuration for the Seqera compute executor.
""")
    final ExecutorOpts executor

    /* required by config scope -- do not remove */
    SeqeraConfig() {}

    /**
     * Build the scope from the raw {@code seqera} config map.
     *
     * @param opts The raw config map for the scope
     */
    SeqeraConfig(Map opts) {
        ExecutorOpts resolved = null
        // only materialise the nested scope when the section is actually provided
        if( opts.executor )
            resolved = new ExecutorOpts(opts.executor as Map)
        this.executor = resolved
    }

    /** @return the executor options, or {@code null} when not configured */
    ExecutorOpts getExecutor() {
        return executor
    }
}

View File

@@ -0,0 +1,143 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import java.nio.file.FileVisitResult
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.SimpleFileVisitor
import java.nio.file.attribute.BasicFileAttributes
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.sched.api.schema.v1a1.InputFilesMetrics
import nextflow.file.FileHolder
import nextflow.processor.TaskRun
/**
 * Computes input files metrics for a task.
 * Follows symlinks and recursively computes directory sizes.
 *
 * Each input entry contributes one size value: the file size for a regular file,
 * or the accumulated size of all contained files for a directory. The min/max
 * metrics are computed over these per-entry values.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@Slf4j
@CompileStatic
class InputFilesProfiler {

    /**
     * Compute input files metrics for a task.
     *
     * @param task The task to compute metrics for
     * @return InputFilesMetrics or null if no input files
     */
    static InputFilesMetrics compute(TaskRun task) {
        final files = task?.inputFiles
        if( !files || files.isEmpty() )
            return null
        return compute0(files)
    }

    /**
     * Compute input files metrics from a list of file holders.
     *
     * @param files List of FileHolder objects
     * @return InputFilesMetrics or null if list is empty
     */
    static InputFilesMetrics compute(List<FileHolder> files) {
        if( !files || files.isEmpty() )
            return null
        return compute0(files)
    }

    /**
     * Aggregate the stats of every entry into a single metrics object.
     *
     * @param files A non-empty list of file holders
     * @return The aggregated metrics
     */
    private static InputFilesMetrics compute0(List<FileHolder> files) {
        int totalCount = 0
        long totalBytes = 0
        // sentinels: the callers guarantee at least one entry, so both get overwritten
        long maxFileBytes = Long.MIN_VALUE
        long minFileBytes = Long.MAX_VALUE
        for( FileHolder fh : files ) {
            final long[] result = getFileStats(fh.storePath)
            final count = (int) result[0]
            final size = result[1]
            totalCount += count
            totalBytes += size
            if( size > maxFileBytes )
                maxFileBytes = size
            if( size < minFileBytes )
                minFileBytes = size
        }
        return new InputFilesMetrics()
            .count(totalCount)
            .totalBytes(totalBytes)
            .maxFileBytes(maxFileBytes)
            .minFileBytes(minFileBytes)
    }

    /**
     * Get file stats for a path: file count and total size.
     * For regular files, count is 1. For directories, count is the number of files within.
     * A failure is logged and reported as one file of size zero.
     *
     * @param path The path to measure
     * @return A two-element array: [fileCount, totalSize]
     */
    private static long[] getFileStats(Path path) {
        if( path == null )
            return new long[]{0, 0}
        try {
            if( Files.isDirectory(path) ) {
                return computeDirStats(path)
            }
            // Files.size() follows symlinks by default
            return new long[]{1, Files.size(path)}
        }
        catch( Exception e ) {
            log.warn "Unable to determine size for input file: ${path} - ${e.message}"
            return new long[]{1, 0}
        }
    }

    /**
     * Recursively compute file count and total size of a directory.
     *
     * Symbolic links are followed, both for the directory itself and for the
     * entries found while walking it. Without this option a symlinked directory
     * is reported as a single file whose size is that of the link. Files that
     * cannot be visited are logged and skipped.
     *
     * @param dir The directory path
     * @return A two-element array: [fileCount, totalSize]
     */
    private static long[] computeDirStats(Path dir) {
        final long[] result = [0L, 0L] // [count, size]
        final Set<java.nio.file.FileVisitOption> options = EnumSet.of(java.nio.file.FileVisitOption.FOLLOW_LINKS)
        Files.walkFileTree(dir, options, Integer.MAX_VALUE, new SimpleFileVisitor<Path>() {
            @Override
            FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                result[0]++
                result[1] += attrs.size()
                return FileVisitResult.CONTINUE
            }

            @Override
            FileVisitResult visitFileFailed(Path file, IOException exc) {
                log.warn "Unable to access file during size computation: ${file} - ${exc.message}"
                return FileVisitResult.CONTINUE
            }
        })
        return result
    }
}

View File

@@ -0,0 +1,171 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import com.google.common.hash.Hashing
import groovy.transform.CompileStatic
import nextflow.NextflowMeta
import nextflow.script.WorkflowMetadata
/**
 * Fluent builder for the labels attached to a run.
 *
 * Labels come from three sources: workflow metadata ({@code nextflow.io/*} and
 * {@code seqera.io/platform/*}), scheduler identifiers ({@code seqera:sched:*}),
 * and the user-configured {@code process.resourceLabels}. A later entry with the
 * same key replaces an earlier one.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@CompileStatic
class Labels {

    /** Every short name supported by {@link #withWorkflowMetadata}. */
    static final Set<String> ALL_AUTO_LABELS = Collections.unmodifiableSet(new LinkedHashSet<>([
        'projectName', 'userName', 'runName', 'sessionId', 'resume',
        'revision', 'commitId', 'repository', 'manifestName',
        'runtimeVersion', 'workflowId', 'workspaceId', 'computeEnvId'
    ]))

    /** Collected labels, kept in insertion order. */
    private final Map<String,String> entries = new LinkedHashMap<>(20)

    /**
     * Add every supported workflow metadata label.
     *
     * @param workflow The workflow metadata to read from
     * @return this builder
     */
    Labels withWorkflowMetadata(WorkflowMetadata workflow) {
        return withWorkflowMetadata(workflow, ALL_AUTO_LABELS)
    }

    /**
     * Add the workflow metadata labels selected by {@code include}, a set of
     * short names such as {@code 'runName'}. Names that are not recognised are
     * skipped silently; validating them is the caller's job. Apart from
     * {@code resume}, a label is only added when its metadata value is set.
     *
     * @param workflow The workflow metadata to read from
     * @param include The short names of the labels to add
     * @return this builder
     */
    Labels withWorkflowMetadata(WorkflowMetadata workflow, Set<String> include) {
        if( !include )
            return this

        if( include.contains('projectName') ) {
            final projectName = workflow.projectName
            if( projectName )
                entries.put('nextflow.io/projectName', projectName)
        }
        if( include.contains('userName') ) {
            final userName = workflow.userName
            if( userName )
                entries.put('nextflow.io/userName', userName)
        }
        if( include.contains('runName') ) {
            final runName = workflow.runName
            if( runName )
                entries.put('nextflow.io/runName', runName)
        }
        if( include.contains('sessionId') ) {
            final sessionId = workflow.sessionId
            if( sessionId )
                entries.put('nextflow.io/sessionId', sessionId.toString())
        }
        // the resume flag is always reported, whatever its value
        if( include.contains('resume') ) {
            entries.put('nextflow.io/resume', String.valueOf(workflow.resume))
        }
        if( include.contains('revision') ) {
            final revision = workflow.revision
            if( revision )
                entries.put('nextflow.io/revision', revision)
        }
        if( include.contains('commitId') ) {
            final commitId = workflow.commitId
            if( commitId )
                entries.put('nextflow.io/commitId', commitId)
        }
        if( include.contains('repository') ) {
            final repository = workflow.repository
            if( repository )
                entries.put('nextflow.io/repository', repository)
        }
        if( include.contains('manifestName') ) {
            final manifestName = workflow.manifest?.name
            if( manifestName )
                entries.put('nextflow.io/manifestName', manifestName)
        }
        if( include.contains('runtimeVersion') ) {
            final runtimeVersion = NextflowMeta.instance.version
            if( runtimeVersion )
                entries.put('nextflow.io/runtimeVersion', runtimeVersion.toString())
        }
        if( include.contains('workflowId') ) {
            final workflowId = workflow.platform?.workflowId
            if( workflowId )
                entries.put('seqera.io/platform/workflowId', workflowId)
        }
        if( include.contains('workspaceId') ) {
            final workspaceId = workflow.platform?.workspace?.id
            if( workspaceId )
                entries.put('seqera.io/platform/workspaceId', workspaceId)
        }
        if( include.contains('computeEnvId') ) {
            final computeEnvId = workflow.platform?.computeEnv?.id
            if( computeEnvId )
                entries.put('seqera.io/platform/computeEnvId', computeEnvId)
        }
        return this
    }

    /**
     * Add the {@code seqera:sched:runId} label; an empty id is ignored.
     *
     * @param runId The scheduler run identifier
     * @return this builder
     */
    Labels withSchedRunId(String runId) {
        if( !runId )
            return this
        entries.put('seqera:sched:runId', runId)
        return this
    }

    /**
     * Add the {@code seqera:sched:clusterId} label; an empty id is ignored.
     *
     * @param clusterId The scheduler cluster identifier
     * @return this builder
     */
    Labels withSchedClusterId(String clusterId) {
        if( !clusterId )
            return this
        entries.put('seqera:sched:clusterId', clusterId)
        return this
    }

    /**
     * Add the config-level {@code process.resourceLabels}. Keys and values are
     * turned into strings (values via {@link String#valueOf}) to satisfy the
     * scheduler API typing.
     *
     * @param map The resource labels, possibly null or empty
     * @return this builder
     */
    Labels withProcessResourceLabels(Map<String,?> map) {
        if( map )
            entries.putAll(toStringMap(map))
        return this
    }

    /**
     * @return a read-only view of all the labels collected so far
     */
    Map<String,String> getEntries() {
        return Collections.unmodifiableMap(entries)
    }

    /**
     * Compute a run identifier as the SipHash-2-4 of the session id followed by
     * the run name.
     *
     * @param sessionId The session identifier
     * @param runName The run name
     * @return The string form of the resulting hash code
     */
    protected static String runId(String sessionId, String runName) {
        final hasher = Hashing.sipHash24().newHasher()
        hasher.putUnencodedChars(sessionId)
        hasher.putUnencodedChars(runName)
        return hasher.hash().toString()
    }

    /**
     * Convert an arbitrary map into a string-to-string map, turning keys into
     * strings with {@code toString()} and values with {@link String#valueOf}.
     *
     * @param value The value to convert, expected to be a map or null
     * @return The converted map, or an empty map for a null or empty input
     * @throws IllegalArgumentException when the value is not a {@link Map}, so
     *      that a misconfigured {@code process.resourceLabels} (e.g. a list)
     *      fails with a clear message
     */
    static Map<String,String> toStringMap(Object value) {
        if( value == null )
            return Collections.<String,String>emptyMap()
        if( !(value instanceof Map) )
            throw new IllegalArgumentException("Invalid value for 'resourceLabels' directive - expected a map of key/value pairs, got '${value.getClass().getName()}'")

        final source = (Map<?,?>) value
        if( source.isEmpty() )
            return Collections.<String,String>emptyMap()

        final converted = new LinkedHashMap<String,String>(source.size())
        for( Object key : source.keySet() )
            converted.put(key.toString(), String.valueOf(source.get(key)))
        return converted
    }

    /**
     * Select the entries of {@code task} that {@code run} does not hold with the
     * same value, i.e. those that are missing from it or differ.
     *
     * @param task The task-level labels
     * @param run The run-level labels, possibly null
     * @return The differing entries, or {@code null} when there are none so that
     *      callers can omit the field
     */
    static Map<String,String> delta(Map<String,String> task, Map<String,String> run) {
        if( !task )
            return null
        final changed = new LinkedHashMap<String,String>()
        for( Map.Entry<String,String> item : task.entrySet() ) {
            final name = item.key
            final label = item.value
            final boolean sameInRun = run != null && run.containsKey(name) && run.get(name) == label
            if( !sameInRun )
                changed.put(name, label)
        }
        return changed ?: null
    }
}

View File

@@ -0,0 +1,317 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import groovy.transform.CompileStatic
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import io.seqera.sched.api.schema.v1a1.InputFilesMetrics
import io.seqera.sched.api.schema.v1a1.Task
import io.seqera.sched.client.SchedClient
import nextflow.SysEnv
import nextflow.util.Duration
import nextflow.util.ThreadPoolBuilder
import nextflow.util.Threads
/**
* Batches task submissions to the Seqera scheduler API.
*
* @author Lorenzo Fontana <fontanalorenz@gmail.com>
*/
@Slf4j
@CompileStatic
class SeqeraBatchSubmitter {
/** Maximum tasks per API call; looked up under `NXF_SEQERA_TASK_PER_REQUEST`, with `100` given as the fallback argument */
static final int TASKS_PER_REQUEST = SysEnv.getInteger('NXF_SEQERA_TASK_PER_REQUEST', 100)
/** Default flush interval; looked up under `NXF_SEQERA_REQUEST_INTERVAL`, with `1 sec` given as the fallback argument */
static final Duration REQUEST_INTERVAL = SysEnv.get('NXF_SEQERA_REQUEST_INTERVAL', '1 sec') as Duration
/** Keep-alive interval - send empty submission to maintain run; looked up under `NXF_SEQERA_KEEP_ALIVE_INTERVAL`, with `60 sec` given as the fallback argument */
static final Duration KEEP_ALIVE_INTERVAL = SysEnv.get('NXF_SEQERA_KEEP_ALIVE_INTERVAL', '60 sec') as Duration
/** Timeout for waiting on metrics computation; looked up under `NXF_SEQERA_METRICS_TIMEOUT`, with `30 sec` given as the fallback argument */
static final Duration METRICS_TIMEOUT = SysEnv.get('NXF_SEQERA_METRICS_TIMEOUT', '30 sec') as Duration
/**
* Holds a task handler, its prepared Task object, and async metrics computation.
*
* The field order defines the positional constructor generated by
* {@code @TupleConstructor}: (handler, task, metricsFuture).
*/
@TupleConstructor
static class PendingTask {
// handler of the task being submitted
SeqeraTaskHandler handler
// prepared scheduler task object for the handler
Task task
// pending result of the input files metrics computation started at submission time
CompletableFuture<InputFilesMetrics> metricsFuture
}
// Scheduler API client supplied to the constructor
private final SchedClient client
// Identifier of the run supplied to the constructor
private final String runId
// Batching interval; the sender loop derives its polling delay from it
private final Duration requestInterval
// Keep-alive interval supplied to the constructor
private final Duration keepAliveInterval
// Optional callback supplied to the constructor (null by default)
private final Closure onError
// Tasks waiting to be sent: filled by submit() and drained by the sender thread
private final LinkedBlockingQueue<PendingTask> pendingQueue = new LinkedBlockingQueue<>()
// Background sender thread, created by start()
private Thread sender
// Set by shutdown(); once true submit() rejects new tasks and the sender loop ends when the queue is empty
private volatile boolean completed = false
/** Executor pool for async input file metrics computation */
private final ExecutorService metricsExecutor
/**
 * Create a submitter using the default request and keep-alive intervals.
 */
SeqeraBatchSubmitter(SchedClient client, String runId) {
    this(client, runId, REQUEST_INTERVAL)
}

/**
 * Create a submitter using the given request interval and the default
 * keep-alive interval.
 */
SeqeraBatchSubmitter(SchedClient client, String runId, Duration requestInterval) {
    this(client, runId, requestInterval, KEEP_ALIVE_INTERVAL)
}

/**
 * Create a submitter with explicit intervals and an optional error callback.
 * No thread is started here; see {@code start()}.
 *
 * @param client The scheduler API client
 * @param runId The identifier of the run
 * @param requestInterval The batching interval
 * @param keepAliveInterval The keep-alive interval
 * @param onError Optional error callback, {@code null} by default
 */
SeqeraBatchSubmitter(SchedClient client, String runId, Duration requestInterval, Duration keepAliveInterval, Closure onError=null) {
    // Thread pool dedicated to the input files metrics computation
    final ExecutorService pool = new ThreadPoolBuilder()
        .withName('seqera-metrics')
        .withMinSize(0)
        .withMaxSize(10)
        .withKeepAliveTime(60_000L)
        .withAllowCoreThreadTimeout(true)
        .build()
    this.metricsExecutor = pool
    this.onError = onError
    this.keepAliveInterval = keepAliveInterval
    this.requestInterval = requestInterval
    this.runId = runId
    this.client = client
}
/**
 * Start the sender thread that processes the batch queue.
 * The thread is named `Seqera-batch-submitter` and runs {@code sendTasks0}.
 */
void start() {
log.debug "[SEQERA] Starting batch submitter - interval=${requestInterval}"
this.sender = Threads.start('Seqera-batch-submitter', this.&sendTasks0)
}
/**
 * Queue a task so that it is sent with the next batch.
 * The input files metrics computation is kicked off right away on the
 * metrics executor, so it can overlap with the batching delay.
 *
 * @param handler the handler to notify about the submission outcome
 * @param task the scheduler task payload
 * @throws IllegalStateException when the submitter no longer accepts tasks
 */
void submit(SeqeraTaskHandler handler, Task task) {
    if (completed)
        throw new IllegalStateException("Batch submitter has been shutdown")

    // launch the metrics computation in the background
    final run = handler.task
    final future = CompletableFuture.supplyAsync(
        ()-> InputFilesProfiler.compute(run),
        metricsExecutor
    )
    final entry = new PendingTask(handler, task, future)
    pendingQueue.add(entry)
}
/**
 * Signal completion, wait for the sender thread to finish, then stop the
 * metrics executor.
 *
 * The executor shutdown runs in a {@code finally} block so that the thread
 * pool is released even when waiting for the sender thread fails
 * (e.g. the calling thread is interrupted while joining).
 */
void shutdown() {
    log.debug "[SEQERA] Shutting down batch submitter"
    completed = true
    try {
        if (sender) {
            sender.join()
        }
    }
    finally {
        // Shutdown metrics executor, giving running computations a bounded grace period
        metricsExecutor.shutdown()
        try {
            if (!metricsExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                metricsExecutor.shutdownNow()
            }
        }
        catch (InterruptedException e) {
            metricsExecutor.shutdownNow()
            // preserve the interrupt status for the caller
            Thread.currentThread().interrupt()
        }
    }
    log.debug "[SEQERA] Batch submitter shutdown complete"
}
/**
 * Sender thread loop.
 *
 * Polls the pending queue, accumulates tasks into a batch and flushes it when
 * the request interval elapsed, the batch is full, or the submitter is shutting
 * down. While idle, an empty submission is sent every keep-alive interval.
 *
 * On any unexpected failure the submitter is marked as completed so that
 * {@link #submit} rejects further tasks instead of queueing them for a thread
 * that no longer exists; the tasks in the current batch and in the queue are
 * failed and the error callback, if any, is invoked.
 *
 * @param dummy unused argument, required by the thread start helper
 */
protected void sendTasks0(dummy) {
    final List<PendingTask> batch = new ArrayList<>(TASKS_PER_REQUEST)
    long previous = System.currentTimeMillis()
    final long period = requestInterval.millis
    final long delay = period / 10 as long
    try {
        while (!completed || !pendingQueue.isEmpty()) {
            // Poll with timeout
            final PendingTask pending = pendingQueue.poll(delay, TimeUnit.MILLISECONDS)
            if (pending) {
                // Start the batch timer when first task arrives
                if (batch.isEmpty()) {
                    previous = System.currentTimeMillis()
                }
                batch.add(pending)
            }
            // Check if we should flush
            final now = System.currentTimeMillis()
            final delta = now - previous
            if (!batch.isEmpty()) {
                // Flush if: time elapsed OR batch full OR shutting down
                if (delta > period || batch.size() >= TASKS_PER_REQUEST || completed) {
                    flushBatch(batch)
                    previous = System.currentTimeMillis()
                    batch.clear()
                }
            }
            else if (delta > keepAliveInterval.millis) {
                // Keep-alive: send empty submission to maintain run
                try {
                    log.debug "[SEQERA] Sending keep-alive for run ${runId}"
                    client.createTasks(runId, Collections.emptyList())
                }
                catch (Exception e) {
                    log.warn "[SEQERA] Keep-alive failed: ${e.message}"
                    // Don't crash the thread for keep-alive failures
                }
                // Always update timestamp to avoid rapid retry on failure
                previous = System.currentTimeMillis()
            }
        }
        // Final flush of any remaining tasks
        if (!batch.isEmpty()) {
            flushBatch(batch)
        }
    }
    catch (Throwable e) {
        log.error "[SEQERA] Fatal error in batch submitter thread", e
        // Stop accepting new tasks: nothing will consume the queue after this point
        completed = true
        // Convert Throwable to Exception for handler API
        final Exception exception = e instanceof Exception ? (Exception) e : new RuntimeException(e)
        // Fail any tasks in the current batch
        for (PendingTask pending : batch) {
            try {
                pending.handler.onBatchSubmitFailure(exception)
            }
            catch (Exception ex) {
                log.warn "[SEQERA] Error failing batch task", ex
            }
        }
        // Drain and fail any remaining pending tasks
        drainAndFailPendingTasks(exception)
        // Invoke error callback to abort run
        if (onError) {
            try {
                onError.call(e)
            }
            catch (Throwable t) {
                log.warn "[SEQERA] Error in failure callback", t
            }
        }
    }
}
/**
 * Empty the pending queue, reporting the given error to the handler of
 * every task removed. A handler that throws is logged and skipped.
 *
 * @param cause the failure to report to each pending task
 */
private void drainAndFailPendingTasks(Exception cause) {
    for (PendingTask entry = pendingQueue.poll(); entry != null; entry = pendingQueue.poll()) {
        try {
            entry.handler.onBatchSubmitFailure(cause)
        }
        catch (Exception e) {
            log.warn "[SEQERA] Error failing pending task", e
        }
    }
}
/**
 * Send one batch of tasks to the scheduler API and hand the returned task
 * ids back to the handlers, matching them by position.
 *
 * Any failure (including a response whose id count differs from the batch
 * size) is logged and reported to every handler in the batch; it is not
 * rethrown.
 *
 * @param batch the tasks to submit
 */
protected void flushBatch(List<PendingTask> batch) {
    log.debug "[SEQERA] Submitting batch of ${batch.size()} tasks"
    try {
        // make sure the input files metrics are attached before sending
        resolveMetrics(batch)

        // build the request payload preserving the batch order
        final List<Task> payload = new ArrayList<Task>(batch.size())
        for (PendingTask entry : batch) {
            payload.add(entry.task)
        }

        final response = client.createTasks(runId, payload)
        final List<String> taskIds = response.getTaskIds()

        // the API must return exactly one id per submitted task
        if (taskIds.size() != batch.size()) {
            throw new IllegalStateException("Seqera Scheduler API returned ${taskIds.size()} task IDs but submitted ${batch.size()} tasks")
        }

        // ids are positional: the n-th id belongs to the n-th task
        int index = 0
        for (PendingTask entry : batch) {
            final target = entry.handler
            final assignedId = taskIds[index]
            target.setBatchTaskId(assignedId)
            index++
        }
        log.debug "[SEQERA] Batch submission complete: ${taskIds.size()} tasks submitted"
    }
    catch (Exception e) {
        log.error "[SEQERA] Batch submission failed for ${batch.size()} tasks", e
        // notify every handler in the batch, isolating handler errors from each other
        for (PendingTask entry : batch) {
            try {
                entry.handler.onBatchSubmitFailure(e)
            }
            catch (Exception ex) {
                log.warn "[SEQERA] Error handling batch failure for task", ex
            }
        }
    }
}
/**
 * Wait for and attach metrics to all tasks in the batch.
 * Uses timeout to avoid blocking indefinitely on slow computations.
 *
 * The {@link #METRICS_TIMEOUT} is applied to each task separately. A task whose
 * metrics time out or fail is still submitted, just without input files metrics.
 *
 * @param batch the tasks whose metrics futures are resolved
 */
private void resolveMetrics(List<PendingTask> batch) {
final timeout = METRICS_TIMEOUT.millis
for (PendingTask pending : batch) {
try {
final metrics = pending.metricsFuture.get(timeout, TimeUnit.MILLISECONDS)
if (metrics) {
pending.task.inputFiles(metrics)
log.debug "[SEQERA] Task `${pending.handler.task.name}` input files metrics: ${metrics}"
}
}
catch (TimeoutException e) {
log.warn "[SEQERA] Timeout computing input files metrics for task: ${pending.handler.task.name}"
// give up on the computation, requesting interruption of the worker
pending.metricsFuture.cancel(true)
}
catch (Exception e) {
// NOTE(review): this also catches InterruptedException without restoring the interrupt flag - confirm this is intended
log.warn "[SEQERA] Failed to compute input files metrics for task: ${pending.handler.task.name} - ${e.message}"
}
}
}
}

View File

@@ -0,0 +1,239 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.config.SeqeraConfig
import io.seqera.config.ExecutorOpts
import io.seqera.util.SchemaMapperUtil
import io.seqera.sched.api.schema.v1a1.CreateRunRequest
import io.seqera.sched.api.schema.v1a1.PipelineSpec
import io.seqera.sched.api.schema.v1a1.PredictionModel
import io.seqera.sched.client.SchedClient
import io.seqera.sched.api.schema.v1a1.TerminateRunRequest
import io.seqera.sched.client.SchedClientConfig
import nextflow.exception.AbortOperationException
import nextflow.executor.Executor
import nextflow.fusion.FusionHelper
import nextflow.platform.PlatformHelper
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskRun
import nextflow.SysEnv
import nextflow.util.Duration
import nextflow.util.ServiceName
import org.pf4j.ExtensionPoint
/**
* Nextflow executor that delegates task execution to the Seqera scheduler API.
*
* <p>This executor creates a run on the Seqera scheduler, submits tasks in batches
* via {@link SeqeraBatchSubmitter}, and monitors their lifecycle through the scheduler API.
* It requires Fusion file system to be enabled and all processes to specify a container image.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@ServiceName(SEQERA)
@CompileStatic
class SeqeraExecutor extends Executor implements ExtensionPoint {
/** Executor name, used as the {@code @ServiceName} value of this class */
public static final String SEQERA = 'seqera'
// value written to the `targetVersion` Fusion setting by applyFusionDefaults()
private static final String DEFAULT_FUSION_VERSION = '2.6'
// `seqera.executor` settings, assigned by createClient()
private ExecutorOpts seqeraConfig
// scheduler API client, assigned by createClient()
private SchedClient client
// run identifier returned by the scheduler; null until createRun() assigns it
private volatile String runId
// run-level labels derived from `process.resourceLabels` by computeRunResourceLabels()
private volatile Map<String,String> runResourceLabels = Collections.<String,String>emptyMap()
// batch submitter created and started by createRun()
private SeqeraBatchSubmitter batchSubmitter
/**
 * Executor registration hook: applies the Fusion defaults and creates the
 * scheduler API client. The run itself is created later, see {@code ensureRunCreated}.
 */
@Override
protected void register() {
applyFusionDefaults()
createClient()
}
/**
 * Set the Fusion `targetVersion` setting to {@code DEFAULT_FUSION_VERSION}
 * when a `fusion` config scope exists and it does not define `containerConfigUrl`.
 * Nothing is changed when the scope is missing.
 */
protected void applyFusionDefaults() {
    final fusionConfig = session.config.fusion as Map
    if( fusionConfig == null )
        return
    if( fusionConfig.containerConfigUrl )
        return
    fusionConfig.put('targetVersion', DEFAULT_FUSION_VERSION)
}
/**
 * Flush any pending batch submissions, then terminate the run on the scheduler.
 *
 * The run termination is performed in a {@code finally} block so that the
 * run is not left active on the scheduler when the batch submitter shutdown
 * fails.
 */
@Override
void shutdown() {
    try {
        // Flush any pending batch jobs before terminating run
        batchSubmitter?.shutdown()
    }
    finally {
        terminateRun()
    }
}
/**
 * Read the `seqera.executor` settings and build the scheduler API client.
 * Credentials and the Platform endpoint are resolved through {@code PlatformHelper}
 * from the `tower` config scope and the environment.
 *
 * @throws IllegalArgumentException when the executor settings are missing
 */
protected void createClient() {
    final seqera = new SeqeraConfig(session.config.seqera as Map ?: Collections.<String,Object>emptyMap())
    this.seqeraConfig = seqera.executor
    if (!seqeraConfig)
        throw new IllegalArgumentException("Missing Seqera executor configuration - make sure to specify 'seqera.executor' settings")

    // authentication is shared with Platform: tokens come from the tower config
    final towerConfig = session.config.tower as Map ?: Collections.emptyMap()
    final accessToken = PlatformHelper.getAccessToken(towerConfig, SysEnv.get())
    final refreshToken = PlatformHelper.getRefreshToken(towerConfig, SysEnv.get())
    final platformUrl = PlatformHelper.getEndpoint(towerConfig, SysEnv.get())

    final builder = SchedClientConfig.builder()
        .endpoint(seqeraConfig.endpoint)
        .platformUrl(platformUrl)
        .accessToken(accessToken)
        .refreshToken(refreshToken)
        .retryConfig(seqeraConfig.retryOpts())
    this.client = new SchedClient(builder.build())
}
/**
 * Create the run on the scheduler and start the batch submitter bound to it.
 *
 * The {@code runId} field is assigned last, after the batch submitter has
 * been stored: {@code runId} is volatile and is what {@code ensureRunCreated}
 * checks without holding the lock, so a caller observing a non-null run id is
 * guaranteed to also observe the batch submitter. The assignment is done in a
 * {@code finally} block so the run id is recorded (and the run can be
 * terminated on shutdown) even if the submitter fails to start.
 */
protected void createRun() {
    final towerConfig = session.config.tower as Map ?: Collections.emptyMap()
    final workflowId = session.workflowMetadata?.platform?.workflowId
    final workflowUrl = session.workflowMetadata?.platform?.workflowUrl
    final workspaceId = PlatformHelper.getWorkspaceId(towerConfig, SysEnv.get()) as Long
    final computeEnvId = PlatformHelper.getComputeEnvId(towerConfig, SysEnv.get()) ?: seqeraConfig.computeEnvId
    computeRunResourceLabels()
    final labels = new Labels()
    if( seqeraConfig.autoLabels )
        labels.withWorkflowMetadata(session.workflowMetadata, seqeraConfig.autoLabels)
    labels.withProcessResourceLabels(runResourceLabels)
    final predictionModel = seqeraConfig.predictionModel ? PredictionModel.fromValue(seqeraConfig.predictionModel) : null
    final pipeline = new PipelineSpec()
        .workflowId(workflowId)
        .workflowUrl(workflowUrl)
        .workDir(session.workDir?.toUriString())
    final request = new CreateRunRequest()
        .provider(seqeraConfig.provider)
        .region(seqeraConfig.region)
        .name(session.runName)
        .machineRequirement(SchemaMapperUtil.toMachineRequirement(seqeraConfig.machineRequirement))
        .labels(labels.entries)
        .workspaceId(workspaceId)
        .pipeline(pipeline)
        .predictionModel(predictionModel)
        .computeEnvId(computeEnvId)
    log.debug "[SEQERA] Creating run: ${request}"
    final response = client.createRun(request)
    final String createdRunId = response.getRunId()
    log.debug "[SEQERA] Run created id: ${createdRunId}; workflowId: '${workflowId}'; workflowUrl: '${workflowUrl}'"
    try {
        // Initialize and start batch submitter with error callback to abort on fatal errors
        final submitter = new SeqeraBatchSubmitter(
            client,
            createdRunId,
            seqeraConfig.batchFlushInterval,
            SeqeraBatchSubmitter.KEEP_ALIVE_INTERVAL,
            { Throwable t -> session.abort(t) }
        )
        submitter.start()
        this.batchSubmitter = submitter
    }
    finally {
        // publish the run id last: the volatile write makes the submitter visible to lock-free readers
        this.runId = createdRunId
    }
}
/**
 * Terminate the run on the scheduler, passing the session fault report
 * (truncated to 10,000 characters) as stop reason. Does nothing when no run was created.
 */
protected void terminateRun() {
if (!runId) {
return
}
final stopReason = truncate(session.fault?.report, 10_000)
log.debug "[SEQERA] Terminating run: ${runId}; stopReason: ${stopReason}"
client.terminateRun(runId, new TerminateRunRequest().stopReason(stopReason))
log.debug "[SEQERA] Run terminated"
}
/**
 * Create the polling monitor for this executor.
 */
@Override
protected TaskMonitor createTaskMonitor() {
// presumably 1000 is the default queue size and `10 sec` the default poll interval - TODO confirm against TaskPollingMonitor.create
TaskPollingMonitor.create(session, config, name, 1000, Duration.of('10 sec'))
}
/**
 * @return a new {@link SeqeraTaskHandler} for the given task, bound to this executor
 */
@Override
TaskHandler createTaskHandler(TaskRun task) {
return new SeqeraTaskHandler(task, this)
}
/**
 * @return {@code true} whenever the containerization is managed by the executor itself;
 *      this executor always returns {@code true}
 */
boolean isContainerNative() {
return true
}
/**
 * Check that Fusion is enabled for the session.
 *
 * @return always {@code true}
 * @throws AbortOperationException when Fusion is not enabled
 */
@Override
boolean isFusionEnabled() {
    if (!FusionHelper.isFusionEnabled(session))
        throw new AbortOperationException("Seqera executor requires the use of Fusion file system")
    return true
}
/**
 * Lazily creates the run on first access, ensuring workflowId and labels
 * are available (they are set by TowerClient.onFlowCreate before tasks are submitted).
 *
 * The run id is checked once without the lock and again while holding it,
 * so {@code createRun} is invoked by at most one caller.
 */
void ensureRunCreated() {
// fast path: run already created
if (runId) return
synchronized (this) {
// re-check under the lock: another thread may have created the run meanwhile
if (runId) return
createRun()
}
}
/**
 * @return the scheduler API client, or {@code null} if {@code createClient} has not run yet
 */
SchedClient getClient() {
return client
}
/**
 * @return the scheduler run id, or {@code null} if the run has not been created yet
 */
String getRunId() {
return runId
}
/**
 * @return a read-only view of the run-level resource labels
 */
Map<String,String> getRunResourceLabels() {
return Collections.unmodifiableMap(runResourceLabels)
}
/**
 * Derive the run-level resource labels from the `process.resourceLabels`
 * config setting. A closure value cannot be resolved at run level, so in
 * that case the run-level labels are left empty.
 */
@PackageScope
void computeRunResourceLabels() {
    final processScope = session.config.process as Map
    final declared = processScope?.get('resourceLabels')
    if( !(declared instanceof Closure) ) {
        this.runResourceLabels = Labels.toStringMap(declared)
        return
    }
    log.debug "Skipping run-level process.resourceLabels: dynamic (closure) values are only resolved per-task"
    this.runResourceLabels = Collections.<String,String>emptyMap()
}
/**
 * @return the batch submitter, or {@code null} if the run has not been created yet
 */
SeqeraBatchSubmitter getBatchSubmitter() {
return batchSubmitter
}
/**
 * @return the `seqera.executor` settings, or {@code null} if {@code createClient} has not run yet
 */
ExecutorOpts getSeqeraConfig() {
return seqeraConfig
}
/**
 * Shorten a string to at most {@code maxLen} characters, appending a
 * truncation marker when characters were dropped.
 *
 * @param value the text to shorten; {@code null} and empty values are returned unchanged
 * @param maxLen the number of characters to keep
 * @return the original value when it fits, otherwise its first {@code maxLen}
 *      characters followed by the marker
 */
protected static String truncate(String value, int maxLen) {
    final boolean fits = !value || value.length() <= maxLen
    return fits ? value : value.take(maxLen) + '\n.. [TRUNCATED]'
}
}

View File

@@ -0,0 +1,432 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import io.seqera.executor.Labels
import io.seqera.sched.api.schema.v1a1.AcceleratorType
import io.seqera.sched.api.schema.v1a1.GetTaskLogsResponse
import io.seqera.sched.api.schema.v1a1.NextflowTask
import io.seqera.sched.api.schema.v1a1.ResourceLimit
import io.seqera.sched.api.schema.v1a1.ResourceRequirement
import io.seqera.sched.api.schema.v1a1.Task
import io.seqera.sched.api.schema.v1a1.TaskState as SchedTaskState
import io.seqera.sched.api.schema.v1a1.TaskStatus as SchedTaskStatus
import io.seqera.sched.client.SchedClient
import io.seqera.util.HintHelper
import io.seqera.util.SchemaMapperUtil
import nextflow.cloud.types.CloudMachineInfo
import nextflow.exception.ProcessException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.util.Duration
import nextflow.util.MemoryUnit
import nextflow.fusion.FusionAwareTask
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
/**
* Task handler for the Seqera scheduler executor.
*
* <p>Manages the lifecycle of a single task submitted to the Seqera scheduler,
* including submission via batch submitter, status polling, completion handling,
* and trace record enrichment with machine info and spot interruption metadata.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@Slf4j
@CompileStatic
class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask {
// scheduler API client obtained from the executor
private SchedClient client
// owning executor, used for run creation, config access and batch submission
private SeqeraExecutor executor
// task exit status file in the task work directory
private Path exitFile
// task standard output file in the task work directory
private Path outputFile
// task standard error file in the task work directory
private Path errorFile
// scheduler task id, assigned once the batch submission succeeds
private volatile String taskId
/**
 * Cached task state from last describeTask call, used for trace record metadata
 */
private volatile SchedTaskState cachedTaskState
/**
 * Cached machine info extracted from task attempts
 */
private volatile CloudMachineInfo machineInfo
/**
 * Create the handler for the given task.
 *
 * @param task the task to run
 * @param executor the owning executor, from which the scheduler client is taken
 */
SeqeraTaskHandler(TaskRun task, SeqeraExecutor executor) {
super(task)
this.client = executor.getClient()
this.executor = executor
// these files are accessed via the NF runtime, keep them based on CloudStoragePath
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
}
/**
 * Build the Fusion launcher for the task. Fusion must be enabled.
 */
@Override
void prepareLauncher() {
assert fusionEnabled()
final launcher = fusionLauncher()
launcher.build()
}
/**
 * Build the scheduler task for this Nextflow task and enqueue it for batch submission.
 *
 * The run is created on first use. CPU and memory default to 1 CPU and 1024 MiB when
 * not specified by the task config. The task status is not changed here: it is
 * updated later through {@code setBatchTaskId} or {@code onBatchSubmitFailure}.
 *
 * @throws ProcessUnrecoverableException when the task does not define a container image
 */
@Override
void submit() {
executor.ensureRunCreated()
// 1024 cpu shares are requested for each CPU
int cpuShares = (task.config.getCpus() ?: 1) * 1024
int memoryMiB = task.config.getMemory() ? (int) (task.config.getMemory().toBytes() / (1024 * 1024)) : 1024
final resourceReq = new ResourceRequirement()
.cpuShares(cpuShares)
.memoryMiB(memoryMiB)
// add accelerator settings if defined
final accelerator = task.config.getAccelerator()
if( accelerator ) {
// number of accelerators requested, fallback to limit if request is not specified
resourceReq.acceleratorCount(accelerator.request ?: accelerator.limit)
// accelerator type is GPU by default (most common in scientific computing)
resourceReq.acceleratorType(AcceleratorType.GPU)
// specific accelerator model name e.g. "nvidia-tesla-v100", "nvidia-a10g"
if( accelerator.type )
resourceReq.acceleratorName(accelerator.type)
}
// build machine requirement merging config settings with task arch, disk, and snapshot settings
// overlay any seqera/machineRequirement.* hints on top of config-scope values (hints win)
final baseMachineOpts = HintHelper.overlayHints(
executor.getSeqeraConfig().machineRequirement,
task.config.getHints()
)
final machineReq = SchemaMapperUtil.toMachineRequirement(
baseMachineOpts,
task.getContainerPlatform(),
task.config.getDisk(),
fusionConfig().snapshotsEnabled()
)
// build resource limit from process resourceLimits directive (upper bound for OOM retry scaling)
final resourceLim = toResourceLimit()
// validate container - Seqera executor requires all processes to specify a container image
final container = task.getContainer()
if( !container )
throw new ProcessUnrecoverableException("Process `${task.lazyName()}` failed because the container image was not specified -- the Seqera executor requires all processes define a container image")
// build the scheduler task with all required attributes
final schedTask = new Task()
.name(task.lazyName()) // process name for identification
.image(container) // container image to run
.command(fusionSubmitCli()) // fusion-based command launcher
.environment(getTaskEnvironment()) // fusion + user-configured environment variables
.resourceRequirement(resourceReq) // cpu, memory, accelerators
.resourceLimit(resourceLim) // resource upper bounds for OOM retry
.machineRequirement(machineReq) // machine type and disk requirements
.nextflow(new NextflowTask()
.taskId(task.id?.intValue())
.hash(task.hash?.toString())
.workDir(task.getWorkDirStr()))
// attach per-task resource labels delta (over run-level baseline)
final taskLabels = Labels.toStringMap(task.config.getResourceLabels())
final delta = Labels.delta(taskLabels, executor.runResourceLabels)
if( delta )
schedTask.labels(delta)
log.debug "[SEQERA] Enqueueing task for batch submission: ${schedTask}"
// Enqueue for batch submission - status will be set by setBatchTaskId callback
executor.getBatchSubmitter().submit(this, schedTask)
}
/**
 * Compute the environment passed to the task: the variables configured in the
 * executor settings overlaid with the Fusion ones. On a name clash the Fusion
 * value wins. Without configured variables the Fusion environment is returned as is.
 *
 * @return the task environment variables
 */
protected Map<String, String> getTaskEnvironment() {
    final configEnv = executor.getSeqeraConfig()?.taskEnvironment
    final fusionEnv = fusionLauncher().fusionEnv()
    if( configEnv ) {
        // copy first, so the configured map is never modified
        final merged = new LinkedHashMap<String, String>(configEnv)
        merged.putAll(fusionEnv)
        return merged
    }
    return fusionEnv
}
/**
 * Called by batch submitter after successful batch submission.
 * Stores the scheduler task id and marks the task as submitted.
 *
 * @param taskId the task id assigned by the scheduler
 */
void setBatchTaskId(String taskId) {
this.taskId = taskId
this.status = TaskStatus.SUBMITTED
log.debug "[SEQERA] Process `${task.lazyName()}` submitted > taskId=$taskId; work-dir=${task.getWorkDirStr()}"
}
/**
 * Called by batch submitter when batch submission fails.
 * Records the failure as the task error and marks the task as completed, so that
 * {@code checkIfCompleted} reports it on the next poll.
 *
 * @param cause the submission failure
 */
void onBatchSubmitFailure(Exception cause) {
log.debug "[SEQERA] Batch submission failed for task ${task.lazyName()}: ${cause.message}"
task.error = cause
this.status = TaskStatus.COMPLETED
}
/**
 * Translate the process {@code resourceLimits} directive into a {@link ResourceLimit}.
 * Only the `memory` and `cpus` limits are considered.
 *
 * @return the resource limit, or {@code null} when neither limit is defined
 */
protected ResourceLimit toResourceLimit() {
    final memoryLimit = task.config.getResourceLimit('memory') as MemoryUnit
    final cpusLimit = task.config.getResourceLimit('cpus') as Integer
    if( memoryLimit || cpusLimit ) {
        final limit = new ResourceLimit()
        if( memoryLimit )
            limit.memoryMiB((int)(memoryLimit.toBytes() / (1024 * 1024)))
        if( cpusLimit )
            limit.cpuShares(cpusLimit * 1024)
        return limit
    }
    return null
}
/**
 * Fetch the current task state from the scheduler, cache it, and return its status.
 *
 * @return the scheduler-side status of the task
 */
protected SchedTaskStatus schedTaskStatus() {
    final state = client.describeTask(taskId).getTaskState()
    cachedTaskState = state
    return state.getStatus()
}
/**
 * Poll the scheduler for a submitted task and move it to the running state
 * as soon as the scheduler reports it as running or already terminated.
 *
 * @return {@code true} when the task was moved to the running state by this call
 */
@Override
boolean checkIfRunning() {
    if (!isSubmitted())
        return false
    final schedStatus = schedTaskStatus()
    log.debug "[SEQERA] checkIfRunning taskId=${taskId}; status=${schedStatus}"
    if (!isRunningOrTerminated(schedStatus))
        return false
    status = TaskStatus.RUNNING
    return true
}
/**
 * Poll the scheduler for a running task and finalize it once terminated.
 *
 * On termination the exit status is read from the exit file. For a failed task
 * without a readable exit status the scheduler error message is set as task error,
 * and stdout/stderr are taken from the scheduler logs when available, otherwise from
 * the task output files.
 *
 * @return {@code true} when the task is completed
 */
@Override
boolean checkIfCompleted() {
// Handle batch submission failure - task error was set but never reached RUNNING state
if (task.error && isCompleted()) {
return true
}
if (!isRunning())
return false
final schedStatus = schedTaskStatus()
log.debug "[SEQERA] checkIfCompleted status=${schedStatus}"
if (isTerminated(schedStatus)) {
log.debug "[SEQERA] Process `${task.lazyName()}` - terminated taskId=$taskId; status=$schedStatus"
// finalize the task
task.exitStatus = readExitFile()
if (isFailed(schedStatus)) {
// When no exit code available, get the error message from task state
if (task.exitStatus == Integer.MAX_VALUE) {
final errorMessage = cachedTaskState?.getErrorMessage() ?: "Task failed for unknown reason"
task.error = new ProcessException(errorMessage)
}
// prefer the logs returned by the scheduler, fall back to the work directory files
final logs = getTaskLogs(taskId)
task.stdout = logs?.stdout ?: outputFile
task.stderr = logs?.stderr ?: errorFile
} else {
task.stdout = outputFile
task.stderr = errorFile
}
status = TaskStatus.COMPLETED
return true
}
return false
}
/**
 * @return {@code true} when the scheduler status is RUNNING or a terminal one
 */
protected boolean isRunningOrTerminated(SchedTaskStatus status) {
return status == SchedTaskStatus.RUNNING || isTerminated(status)
}
/**
 * @return {@code true} when the scheduler status is SUCCEEDED, FAILED or CANCELLED
 */
protected boolean isTerminated(SchedTaskStatus status) {
return status in [SchedTaskStatus.SUCCEEDED, SchedTaskStatus.FAILED, SchedTaskStatus.CANCELLED]
}
/**
 * @return {@code true} when the scheduler status is FAILED
 */
protected boolean isFailed(SchedTaskStatus status) {
return status == SchedTaskStatus.FAILED
}
/**
 * Retrieve the task logs from the scheduler API.
 *
 * @param taskId the scheduler task id
 * @return the response returned by the client for the given task
 */
protected GetTaskLogsResponse getTaskLogs(String taskId) {
return client.getTaskLogs(taskId)
}
/**
 * Cancel the task on the scheduler. Nothing is done when the task id has not been
 * assigned yet; a cancellation failure is logged and not propagated.
 */
@Override
protected void killTask() {
if( !taskId ) {
log.trace "[SEQERA] Skip cancel - taskId not yet assigned"
return
}
log.debug "[SEQERA] Cancel taskId=${taskId}"
try {
client.cancelTask(taskId)
}
catch (Throwable t) {
log.warn "[SEQERA] Failed to cancel task ${taskId}", t
}
}
/**
 * Read the task exit status from the exit file.
 *
 * @return the exit status, or {@code Integer.MAX_VALUE} when the file cannot be
 *      read or does not contain an integer
 */
@PackageScope
Integer readExitFile() {
try {
final result = exitFile.text as Integer
log.trace "[SEQERA] Read exit file for taskId $taskId; exit=${result}"
return result
}
catch (Exception e) {
log.debug "[SEQERA] Cannot read exit status for task: `${task.lazyName()}` - ${e.message}"
// return MAX_VALUE to signal it was unable to retrieve the exit code
return Integer.MAX_VALUE
}
}
/**
 * Resolve the machine the task ran on, taken from the most recent task attempt
 * of the cached task state. The value is cached once resolved; any error while
 * resolving it is logged and results in {@code null}.
 *
 * @return the machine type, zone and price model, or {@code null} if not available
 */
protected CloudMachineInfo getMachineInfo() {
    if (machineInfo)
        return machineInfo
    if (!cachedTaskState)
        return null
    try {
        final history = cachedTaskState.getAttempts()
        if (!history || history.isEmpty())
            return null
        // only the latest attempt is relevant
        final source = history.get(history.size() - 1).getMachineInfo()
        if (!source)
            return null
        // map the Sched API machine info to the Nextflow type
        final resolved = new CloudMachineInfo(
            type: source.getType(),
            zone: source.getZone(),
            priceModel: SchemaMapperUtil.toPriceModel(source.getPriceModel())
        )
        machineInfo = resolved
        log.trace "[SEQERA] taskId=$taskId => machineInfo=$machineInfo"
        return resolved
    }
    catch (Exception e) {
        log.debug "[SEQERA] Unable to get machine info for taskId=$taskId - ${e.message}"
        return null
    }
}
/**
 * Report how many spot interruptions the task went through, as stored in the
 * cached task state.
 *
 * @return the number of spot interruptions, or {@code null} when the task has no id,
 *      is not completed, or no task state has been cached
 */
protected Integer getNumSpotInterruptions() {
    final boolean unavailable = !taskId || !isCompleted() || !cachedTaskState
    return unavailable ? null : cachedTaskState.getNumSpotInterruptions()
}
/**
 * Get the log stream identifier for this task from the cached task state.
 *
 * @return the log stream ID, or null if no task state has been cached
 */
protected String getLogStreamId() {
return cachedTaskState?.getLogStreamId()
}
/**
 * Get the native backend ID for this task (ECS task ARN or Docker container ID).
 *
 * NOTE(review): the value is read from the cached task state itself, not from a
 * task attempt - confirm that the state id is the native backend id.
 *
 * @return the id held by the cached task state, or null if no task state has been cached
 */
protected String getNativeId() {
return cachedTaskState?.getId()
}
/**
 * Get the allocated resources for this task from the last task attempt.
 * Falls back to the resource allocation of the task state when there are no
 * attempts or the last attempt has no resources.
 *
 * Only non-null fields are included in the result.
 *
 * @return a map of allocated resource fields, or null if not available
 */
protected Map<String,Object> getResourceAllocation() {
if (!cachedTaskState)
return null
def resources = null
final attempts = cachedTaskState.getAttempts()
if (attempts && !attempts.isEmpty()) {
resources = attempts.get(attempts.size() - 1).getResources()
}
if (!resources) {
resources = cachedTaskState.getResourceAllocation()
}
if (!resources)
return null
final result = new LinkedHashMap<String,Object>()
if (resources.getCpuShares() != null)
result.put('cpuShares', resources.getCpuShares())
if (resources.getMemoryMiB() != null)
result.put('memoryMiB', resources.getMemoryMiB())
if (resources.getAcceleratorCount() != null)
result.put('acceleratorCount', resources.getAcceleratorCount())
if (resources.getAcceleratorType() != null)
result.put('acceleratorType', resources.getAcceleratorType().toString())
if (resources.getAcceleratorName() != null)
result.put('acceleratorName', resources.getAcceleratorName())
if (resources.getTime() != null)
result.put('time', resources.getTime())
return result.isEmpty() ? null : result
}
/**
 * Get the time granted to the task, in milliseconds.
 * Uses the `time` value of the state-level resource allocation when present,
 * otherwise the time defined by the task config.
 *
 * NOTE(review): unlike {@code getResourceAllocation}, the last task attempt is not
 * consulted here - confirm this is intended.
 *
 * @return the granted time in milliseconds, or null if neither value is available
 */
protected Long getGrantedTime() {
final String time = cachedTaskState?.getResourceAllocation()?.getTime()
return time != null ? Duration.of(time).toMillis() : task.config.getTime()?.toMillis()
}
/**
 * Get the trace record for this task, including machine info and spot interruptions metadata.
 * The native id, log stream id and resource allocation are added as well.
 *
 * @return the trace record with additional metadata fields
 */
@Override
TraceRecord getTraceRecord() {
final result = super.getTraceRecord()
result.put('native_id', getNativeId())
result.machineInfo = getMachineInfo()
result.numSpotInterruptions = getNumSpotInterruptions()
result.logStreamId = getLogStreamId()
result.resourceAllocation = getResourceAllocation()
// Override executor name to include cloud backend for cost tracking
// NOTE(review): the `aws` suffix is hard-coded regardless of the configured provider - confirm
result.executorName = "${SeqeraExecutor.SEQERA}/aws"
return result
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.plugin
import groovy.transform.CompileStatic
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
/**
 * Seqera plugin entry point.
 *
 * Adds no behavior of its own on top of {@link BasePlugin}.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@CompileStatic
class SeqeraPlugin extends BasePlugin {
/**
 * @param wrapper the plugin wrapper, passed through to the base plugin
 */
SeqeraPlugin(PluginWrapper wrapper) {
super(wrapper)
}
}

View File

@@ -0,0 +1,127 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.util
import java.lang.reflect.Field
import groovy.transform.CompileStatic
import io.seqera.config.MachineRequirementOpts
import nextflow.config.spec.ConfigOption
/**
* Helper for processing {@code seqera/machineRequirement.*} hints from the
* {@code hints} process directive and overlaying them onto
* {@link MachineRequirementOpts} config-scope values.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class HintHelper {
/** Namespace prefix of the hints explicitly targeted at the Seqera executor */
static final String PREFIX = 'seqera/'
/** Prefix shared by all machine requirement hint names */
static final String MR_PREFIX = 'machineRequirement.'
// fields of MachineRequirementOpts annotated with @ConfigOption, made accessible for reflective reads
private static final List<Field> MR_FIELDS = Collections.unmodifiableList(
MachineRequirementOpts.declaredFields
.findAll { Field f -> f.isAnnotationPresent(ConfigOption) }
.collect { Field f -> f.setAccessible(true); f } as List<Field>
)
/** Recognized hint names without the {@code seqera/} prefix, i.e. {@code machineRequirement.<field>} */
static final Set<String> KNOWN_KEYS = Collections.unmodifiableSet(
MR_FIELDS.collect { MR_PREFIX + it.name }.toSet()
)
// sorted, comma-separated list of the prefixed hint names, used in error messages
private static final String SUPPORTED_KEYS_MSG =
KNOWN_KEYS.collect { PREFIX + it }.sort().join(', ')
/**
 * Select and validate the hints that apply to the Seqera executor.
 *
 * <p>A hint is selected when its name, with or without the {@code seqera/}
 * prefix, is one of the {@link #KNOWN_KEYS}; the returned names never carry
 * the prefix. If a hint is given in both forms, the {@code seqera/} one
 * takes precedence.</p>
 *
 * <p>Hints in another namespace (e.g. {@code awsbatch/...}) and unknown
 * unprefixed hints are skipped silently, since they may be meant for a
 * different executor. An unknown {@code seqera/} hint is rejected because it
 * was explicitly addressed to this executor.</p>
 *
 * @param hints the full hints map from task config
 * @return a map of known hint names (no prefix) to values
 * @throws IllegalArgumentException when a {@code seqera/} hint is not recognized
 */
static Map<String, Object> extractSeqeraHints(Map<String, Object> hints) {
    if( !hints )
        return Collections.emptyMap()
    final general = new LinkedHashMap<String, Object>()
    final targeted = new LinkedHashMap<String, Object>()
    for( Map.Entry<String, Object> hint : hints.entrySet() ) {
        final name = hint.key
        if( !name )
            continue
        if( !name.startsWith(PREFIX) ) {
            // unprefixed form: accepted only for known, non-namespaced names
            if( !name.contains('/') && KNOWN_KEYS.contains(name) )
                general.put(name, hint.value)
            continue
        }
        // executor-targeted form: the name must be a known one
        final bare = name.substring(PREFIX.length())
        if( !KNOWN_KEYS.contains(bare) )
            throw new IllegalArgumentException("Unknown Seqera Platform hint: '${name}' — supported keys are: ${SUPPORTED_KEYS_MSG}")
        targeted.put(bare, hint.value)
    }
    // targeted hints override the general ones
    general.putAll(targeted)
    return general
}
/**
 * Overlay {@code machineRequirement.*} hints onto existing config-scope
 * {@link MachineRequirementOpts}. Hint values take precedence over
 * config-scope values; a hint with a {@code null} value removes the
 * corresponding config-scope setting.
 *
 * <p>When no applicable hint is present the given options are returned
 * unchanged. When the given options are {@code null} the result is built
 * from the hints alone.</p>
 *
 * @param baseOpts the config-scope machine requirement options, may be {@code null}
 * @param hints the full hints map from task config
 * @return {@code baseOpts} itself when there are no applicable hints, otherwise
 *      a new {@link MachineRequirementOpts} with hints overlaid
 * @throws IllegalArgumentException when a {@code seqera/} hint is not recognized
 */
static MachineRequirementOpts overlayHints(MachineRequirementOpts baseOpts, Map<String, Object> hints) {
    final seqeraHints = extractSeqeraHints(hints)
    if( !seqeraHints )
        return baseOpts
    final Map<String, Object> merged = new LinkedHashMap<>()
    // start from the config-scope values; a reflective read requires an instance
    if( baseOpts != null ) {
        for( final field : MR_FIELDS ) {
            final value = field.get(baseOpts)
            if( value != null )
                merged.put(field.name, value)
        }
    }
    for( Map.Entry<String, Object> entry : seqeraHints.entrySet() ) {
        // hint names are `machineRequirement.<field>`: keep the field part only
        final fieldName = entry.key.substring(MR_PREFIX.length())
        final value = entry.value
        if( value == null ) {
            merged.remove(fieldName)
            continue
        }
        merged.put(fieldName, value)
    }
    return new MachineRequirementOpts(merged)
}
}

View File

@@ -0,0 +1,288 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.util
import groovy.transform.CompileStatic
import io.seqera.config.MachineRequirementOpts
import io.seqera.sched.api.schema.v1a1.DiskAllocation
import io.seqera.sched.api.schema.v1a1.DiskRequirement
import io.seqera.sched.api.schema.v1a1.EcsCapacityMode
import io.seqera.sched.api.schema.v1a1.MachineRequirement
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
import io.seqera.sched.api.schema.v1a1.ProvisioningModel
import nextflow.cloud.types.PriceModel
import nextflow.fusion.FusionConfig
import nextflow.util.MemoryUnit
/**
* Utility class to map Nextflow config objects to Sched API schema objects.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class SchemaMapperUtil {
/**
 * Default EBS volume type - gp3 provides good balance of price and performance.
 * Used when no {@code diskType} option is given; it is also the only type for
 * which a throughput value is set on the disk requirement.
 */
static final String DEFAULT_DISK_TYPE = 'ebs/gp3'
/**
 * Default throughput in MiB/s - Fusion recommended setting for optimal I/O.
 * Used when the {@code diskThroughputMiBps} option is unset (or zero).
 */
static final int DEFAULT_DISK_THROUGHPUT_MIBPS = 325
/** Supported EBS volume types; any other disk type is rejected with an IllegalArgumentException */
static final Set<String> SUPPORTED_DISK_TYPES = Set.of(
'ebs/gp3', // General purpose SSD (default)
'ebs/gp2', // General purpose SSD (legacy)
'ebs/io1', // Provisioned IOPS SSD
'ebs/io2', // Provisioned IOPS SSD (higher durability)
'ebs/st1', // Throughput optimized HDD
'ebs/sc1' // Cold HDD
)
/**
 * Converts config-scope {@link MachineRequirementOpts} into the Sched API
 * {@link MachineRequirement}.
 *
 * @param opts the config options (can be null)
 * @return the populated MachineRequirement, or null when {@code opts} is null
 *         or none of provisioning, spot attempts, machine types, disk or
 *         capacity mode is set
 */
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts) {
    if (!opts)
        return null

    final disk = toDiskRequirement(opts.diskSize, opts)
    final ecsMode = toEcsCapacityMode(opts.capacityMode)

    // nothing worth sending unless at least one setting is present
    final boolean configured = opts.provisioning || opts.maxSpotAttempts || opts.machineTypes || disk || ecsMode
    if (!configured)
        return null

    final MachineRequirement result = new MachineRequirement()
        .provisioning(toProvisioningModel(opts.provisioning))
        .maxSpotAttempts(opts.maxSpotAttempts)
        .machineTypes(opts.machineTypes)
        .disk(disk)
        .capacityMode(ecsMode)
    return result
}
/**
 * Converts {@link MachineRequirementOpts} into a {@link MachineRequirement},
 * adding the task architecture. Shorthand for the four-argument overload with
 * no task-level disk size and snapshots disabled.
 *
 * @param opts the config options (can be null)
 * @param taskArch the task container platform/arch (can be null)
 * @return the MachineRequirement API object, or null if no settings
 */
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts, String taskArch) {
    final MemoryUnit noTaskDisk = null
    final boolean snapshots = false
    toMachineRequirement(opts, taskArch, noTaskDisk, snapshots)
}
/**
 * Converts {@link MachineRequirementOpts} into a {@link MachineRequirement},
 * combining it with the task architecture, the task disk size and the Fusion
 * snapshot flag.
 *
 * @param opts the config options (can be null)
 * @param taskArch the task container platform/arch (can be null)
 * @param diskSize the disk size from task config (can be null); takes precedence over the config disk size
 * @param snapshotEnabled whether Fusion snapshots are enabled
 * @return the MachineRequirement API object, or null if no settings
 */
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts, String taskArch, MemoryUnit diskSize, boolean snapshotEnabled) {
    final provisioning = opts?.provisioning
    final machineTypes = opts?.machineTypes

    // an unset (or zero) attempts value falls back to the snapshot default, when snapshots are on
    final snapshotDefault = snapshotEnabled ? FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS : null
    final maxSpotAttempts = opts?.maxSpotAttempts ?: snapshotDefault

    // task disk overrides config disk
    final disk = toDiskRequirement(diskSize ? diskSize : opts?.diskSize, opts)
    final ecsMode = toEcsCapacityMode(opts?.capacityMode)

    final boolean configured = taskArch || provisioning || maxSpotAttempts || machineTypes || disk || snapshotEnabled || ecsMode
    if (!configured)
        return null

    final MachineRequirement result = new MachineRequirement()
        .arch(taskArch)
        .provisioning(toProvisioningModel(provisioning))
        .maxSpotAttempts(maxSpotAttempts)
        .machineTypes(machineTypes)
        .disk(disk)
        .snapshotEnabled(snapshotEnabled ? Boolean.TRUE : null)
        .capacityMode(ecsMode)
    return result
}
/**
 * Maps a disk size to DiskRequirement API object.
 * Uses config options if provided, otherwise defaults to Fusion recommended settings:
 * EBS gp3 volume with 325 MiB/s throughput.
 *
 * <ul>
 * <li>'nvme' allocation: a requirement is always returned, even without a disk
 *     size (size is reported as 0); EBS options are rejected.</li>
 * <li>'node' allocation (default when none is configured): only sizeGiB and
 *     mountPath are applicable; EBS options are rejected.</li>
 * <li>any other allocation ('task'): all EBS options can be specified.</li>
 * </ul>
 *
 * @param diskSize the disk size (can be null)
 * @param opts the machine requirement options with disk settings (can be null)
 * @return the DiskRequirement API object; null when the allocation is not
 *         'nvme' and {@code diskSize} is null or smaller than 1 GiB
 * @throws IllegalArgumentException if EBS-only options are combined with 'node'
 *         or 'nvme' allocation, or the disk type is not in {@link #SUPPORTED_DISK_TYPES}
 */
static DiskRequirement toDiskRequirement(MemoryUnit diskSize, MachineRequirementOpts opts=null) {
    final allocation = toDiskAllocation(opts?.diskAllocation)
    // For NVMe allocation, disk size is instance-determined — no size required
    if (allocation == DiskAllocation.NVME) {
        validateNvmeAllocationOpts(opts)
        final DiskRequirement req = new DiskRequirement()
        req.sizeGiB(diskSize ? diskSize.toGiga() as Integer : 0)
        req.allocation(allocation)
        req.mountPath(opts?.diskMountPath)
        return req
    }
    if (!diskSize || diskSize.toGiga() <= 0)
        return null
    final effectiveAllocation = allocation ?: DiskAllocation.NODE
    // For 'node' allocation (default), only size and mountPath are valid
    if (effectiveAllocation == DiskAllocation.NODE) {
        validateNodeAllocationOpts(opts)
        final DiskRequirement req = new DiskRequirement()
        req.sizeGiB(diskSize.toGiga() as Integer)
        req.allocation(effectiveAllocation)
        req.mountPath(opts?.diskMountPath)
        return req
    }
    // For 'task' allocation, apply EBS-specific options
    final type = opts?.diskType ?: DEFAULT_DISK_TYPE
    // Validate disk type is supported. The supported set has no defined
    // iteration order, so sort it to keep the error message stable across runs.
    if (!SUPPORTED_DISK_TYPES.contains(type)) {
        throw new IllegalArgumentException("Invalid disk type: ${type}. Supported types: ${SUPPORTED_DISK_TYPES.toSorted().join(', ')}")
    }
    final throughput = opts?.diskThroughputMiBps ?: DEFAULT_DISK_THROUGHPUT_MIBPS
    final iops = opts?.diskIops
    final encrypted = opts?.diskEncrypted ?: false
    final DiskRequirement req = new DiskRequirement()
    req.sizeGiB(diskSize.toGiga() as Integer)
    req.volumeType(type)
    req.encrypted(encrypted)
    req.allocation(allocation)
    // Only set throughput for gp3 volumes
    if (type == DEFAULT_DISK_TYPE) {
        req.throughputMiBps(throughput)
    }
    // Set IOPS if provided
    if (iops) {
        req.iops(iops)
    }
    req.mountPath(opts?.diskMountPath)
    return req
}
/**
 * Validates that no EBS-specific options are set when using 'node' allocation.
 * Node allocation uses instance storage, not EBS volumes.
 *
 * @param opts the machine requirement options (can be null, in which case nothing is checked)
 * @throws IllegalArgumentException if EBS-specific options are set with node allocation
 */
private static void validateNodeAllocationOpts(MachineRequirementOpts opts) {
    final List<String> invalidOpts = findEbsOnlyOpts(opts)
    if (invalidOpts) {
        throw new IllegalArgumentException(
            "The following options are not valid with 'node' disk allocation: ${invalidOpts.join(', ')}. " +
            "Node allocation uses instance storage; only disk size is applicable."
        )
    }
}

/**
 * Validates that no EBS-specific options are set when using 'nvme' allocation.
 * NVMe uses instance store disks, not EBS volumes.
 *
 * @param opts the machine requirement options (can be null, in which case nothing is checked)
 * @throws IllegalArgumentException if EBS-specific options are set with nvme allocation
 */
private static void validateNvmeAllocationOpts(MachineRequirementOpts opts) {
    final List<String> invalidOpts = findEbsOnlyOpts(opts)
    if (invalidOpts) {
        throw new IllegalArgumentException(
            "The following options are not valid with 'nvme' disk allocation: ${invalidOpts.join(', ')}. " +
            "NVMe uses instance store disks; EBS options are not applicable."
        )
    }
}

/**
 * Collects the names of the EBS-only options that are set (Groovy-truthy) on the given options.
 *
 * @param opts the machine requirement options (can be null)
 * @return the offending option names in a fixed order; empty when {@code opts} is null or none is set
 */
private static List<String> findEbsOnlyOpts(MachineRequirementOpts opts) {
    final List<String> found = []
    if (!opts)
        return found
    if (opts.diskType)
        found.add('diskType')
    if (opts.diskThroughputMiBps)
        found.add('diskThroughputMiBps')
    if (opts.diskIops)
        found.add('diskIops')
    if (opts.diskEncrypted)
        found.add('diskEncrypted')
    return found
}
/**
 * Resolves a disk allocation name to the {@link DiskAllocation} enum.
 *
 * @param value the disk allocation string (e.g. task, node, nvme)
 * @return the DiskAllocation enum value, or null if value is null or empty
 */
static DiskAllocation toDiskAllocation(String value) {
    if (!value)
        return null
    return DiskAllocation.fromValue(value)
}
/**
 * Resolves a capacity mode name to the {@link EcsCapacityMode} enum.
 *
 * @param value the capacity mode string (managed, asg)
 * @return the EcsCapacityMode enum value, or null if value is null or empty
 */
static EcsCapacityMode toEcsCapacityMode(String value) {
    if (!value)
        return null
    return EcsCapacityMode.fromValue(value)
}
/**
 * Resolves a provisioning name to the {@link ProvisioningModel} enum.
 *
 * @param value the provisioning string (spot, ondemand, spotFirst)
 * @return the ProvisioningModel enum value, or null if value is null or empty
 */
static ProvisioningModel toProvisioningModel(String value) {
    if (!value)
        return null
    return ProvisioningModel.fromValue(value)
}
/**
 * Translates the Sched API price model into the Nextflow {@link PriceModel}.
 *
 * @param schedPriceModel the Sched API price model (can be null)
 * @return {@code PriceModel.spot} or {@code PriceModel.standard} for the
 *         matching input; null when the input is null or any other value
 */
static PriceModel toPriceModel(SchedPriceModel schedPriceModel) {
    if (schedPriceModel == SchedPriceModel.SPOT)
        return PriceModel.spot
    if (schedPriceModel == SchedPriceModel.STANDARD)
        return PriceModel.standard
    // null or a value with no Nextflow counterpart
    return null
}
}

View File

@@ -0,0 +1,348 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import nextflow.util.Duration
import spock.lang.Specification
/**
 * Unit tests for {@link ExecutorOpts}
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class ExecutorOptsTest extends Specification {

    /** Endpoint used by every feature that needs a valid configuration */
    private static final String ENDPOINT = 'https://sched.example.com'

    /** Creates an {@link ExecutorOpts} from the mandatory endpoint followed by the given extra settings */
    private static ExecutorOpts optsWith(Map extra = [:]) {
        return new ExecutorOpts([endpoint: ENDPOINT] + extra)
    }

    def 'should throw error when endpoint is missing' () {
        when:
        new ExecutorOpts([:])

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('Missing Seqera endpoint')
    }

    def 'should create config with minimal settings' () {
        given:
        def opts = optsWith()

        expect:
        opts.endpoint == ENDPOINT
        opts.region == null
        opts.provider == null
        opts.keyPairName == null
        opts.batchFlushInterval == Duration.of('1 sec')
        opts.machineRequirement != null
        opts.machineRequirement.provisioning == null
        !opts.autoLabels
    }

    def 'should create config with custom region' () {
        given:
        def opts = optsWith(region: 'us-west-2')

        expect:
        opts.endpoint == ENDPOINT
        opts.region == 'us-west-2'
    }

    def 'should create config with custom batch flush interval' () {
        given:
        def opts = optsWith(batchFlushInterval: '5 sec')

        expect:
        opts.batchFlushInterval == Duration.of('5 sec')
    }

    def 'should create config with machine requirement settings' () {
        given:
        def requirement = [provisioning: 'spotFirst', maxSpotAttempts: 3, machineTypes: ['m6g', 'c6g']]
        def opts = optsWith(machineRequirement: requirement)

        expect:
        opts.machineRequirement != null
        opts.machineRequirement.provisioning == 'spotFirst'
        opts.machineRequirement.maxSpotAttempts == 3
        opts.machineRequirement.machineTypes == ['m6g', 'c6g']
    }

    def 'should create config with retry policy' () {
        given:
        def opts = optsWith(retryPolicy: [maxAttempts: 5, delay: '2s'])

        expect:
        opts.retryOpts().maxAttempts == 5
        opts.retryOpts().delay == Duration.of('2s')
    }

    def 'should create config with all settings' () {
        given:
        def opts = optsWith(
            region: 'eu-west-1',
            keyPairName: 'my-key',
            batchFlushInterval: '2 sec',
            machineRequirement: [provisioning: 'spot']
        )

        expect:
        opts.endpoint == ENDPOINT
        opts.region == 'eu-west-1'
        opts.keyPairName == 'my-key'
        opts.batchFlushInterval == Duration.of('2 sec')
        opts.machineRequirement.provisioning == 'spot'
    }

    def 'should enable all auto labels when set to true' () {
        given:
        def opts = optsWith(autoLabels: true)

        expect:
        opts.autoLabels == ExecutorOpts.VALID_AUTO_LABELS
    }

    def 'should disable auto labels when set to false' () {
        given:
        def opts = optsWith(autoLabels: false)

        expect:
        opts.autoLabels.isEmpty()
    }

    def 'should accept auto labels as a list of short names' () {
        given:
        def opts = optsWith(autoLabels: ['runName', 'projectName'])

        expect:
        opts.autoLabels == ['runName', 'projectName'] as Set
    }

    def 'should accept workspaceId and computeEnvId in auto labels' () {
        given:
        def opts = optsWith(autoLabels: ['workspaceId', 'computeEnvId'])

        expect:
        opts.autoLabels == ['workspaceId', 'computeEnvId'] as Set
    }

    def 'should trim whitespace in auto labels list entries' () {
        given:
        def opts = optsWith(autoLabels: [' runName', 'projectName '])

        expect:
        opts.autoLabels == ['runName', 'projectName'] as Set
    }

    def 'should accept auto labels as a comma-separated string' () {
        given:
        def opts = optsWith(autoLabels: 'runName,projectName,workflowId')

        expect:
        opts.autoLabels == ['runName', 'projectName', 'workflowId'] as Set
    }

    def 'should tolerate whitespace around comma-separated auto labels' () {
        given:
        def opts = optsWith(autoLabels: 'runName, projectName ,workflowId')

        expect:
        opts.autoLabels == ['runName', 'projectName', 'workflowId'] as Set
    }

    def 'should treat empty auto labels list as disabled' () {
        given:
        def opts = optsWith(autoLabels: [])

        expect:
        opts.autoLabels.isEmpty()
    }

    def 'should treat empty auto labels string as disabled' () {
        given:
        def opts = optsWith(autoLabels: '')

        expect:
        opts.autoLabels.isEmpty()
    }

    def 'should reject unknown auto labels name' () {
        when:
        optsWith(autoLabels: ['runName', 'foo'])

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains("'seqera.executor.autoLabels'")
        failure.message.contains('foo')
        failure.message.contains('valid names')
    }

    def 'should create config with prediction model' () {
        given:
        def opts = optsWith(predictionModel: 'qr/v1')

        expect:
        opts.predictionModel == 'qr/v1'
    }

    def 'should default prediction model to null' () {
        given:
        def opts = optsWith()

        expect:
        opts.predictionModel == null
    }

    def 'should create config with taskEnvironment' () {
        given:
        def opts = optsWith(taskEnvironment: [FOO: 'bar', BAZ: 'qux'])

        expect:
        opts.taskEnvironment == [FOO: 'bar', BAZ: 'qux']
    }

    def 'should handle null taskEnvironment' () {
        given:
        def opts = optsWith()

        expect:
        opts.taskEnvironment == null
    }

    def 'should handle empty taskEnvironment' () {
        given:
        def opts = optsWith(taskEnvironment: [:])

        expect:
        opts.taskEnvironment == [:]
    }

    def 'should create config with computeEnvId' () {
        given:
        def opts = optsWith(computeEnvId: 'ce-12345')

        expect:
        opts.computeEnvId == 'ce-12345'
    }

    def 'should default computeEnvId to null' () {
        given:
        def opts = optsWith()

        expect:
        opts.computeEnvId == null
    }

    def 'should create config with provider' () {
        given:
        def opts = optsWith(provider: 'aws')

        expect:
        opts.provider == 'aws'
    }

    def 'should default provider to null' () {
        given:
        def opts = optsWith()

        expect:
        opts.provider == null
    }

    def 'should create config with provider and region' () {
        given:
        def opts = optsWith(provider: 'aws', region: 'us-west-2')

        expect:
        opts.provider == 'aws'
        opts.region == 'us-west-2'
    }
}

View File

@@ -0,0 +1,62 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import spock.lang.Specification
/**
 * Unit tests for {@link MachineRequirementOpts}
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class MachineRequirementOptsTest extends Specification {

    def 'should create with empty config' () {
        given:
        def requirement = new MachineRequirementOpts([:])

        expect: 'every setting is left unset'
        requirement.provisioning == null
        requirement.maxSpotAttempts == null
        requirement.machineTypes == null
    }

    def 'should create with all settings' () {
        given:
        def settings = [
            provisioning: 'spotFirst',
            maxSpotAttempts: 3,
            machineTypes: ['m5', 'c5', 'r5']
        ]
        def requirement = new MachineRequirementOpts(settings)

        expect:
        requirement.provisioning == 'spotFirst'
        requirement.maxSpotAttempts == 3
        requirement.machineTypes == ['m5', 'c5', 'r5']
    }

    def 'should create with partial settings' () {
        given:
        def requirement = new MachineRequirementOpts([provisioning: 'spot'])

        expect: 'only the given setting is populated'
        requirement.provisioning == 'spot'
        requirement.maxSpotAttempts == null
        requirement.machineTypes == null
    }
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import nextflow.util.Duration
import spock.lang.Specification
/**
 * Unit tests for {@link RetryOpts}
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class RetryOptsTest extends Specification {

    def 'should create retry config' () {
        given:
        def defaults = new RetryOpts()

        expect: 'defaults apply when nothing is configured'
        defaults.delay == Duration.of('450ms')
        defaults.maxDelay == Duration.of('90s')
        defaults.maxAttempts == 10
        defaults.jitter == 0.25d
        defaults.multiplier == 2.0d

        and: 'each setting can be overridden on its own'
        new RetryOpts([maxAttempts: 20]).maxAttempts == 20
        new RetryOpts([delay: '1s']).delay == Duration.of('1s')
        new RetryOpts([maxDelay: '1m']).maxDelay == Duration.of('1m')
        new RetryOpts([jitter: '0.5']).jitter == 0.5d
        new RetryOpts([multiplier: '3.0']).multiplier == 3.0d
    }

    def 'should implement Retryable.Config interface' () {
        given:
        def retry = new RetryOpts([
            delay: '1s',
            maxDelay: '2m',
            maxAttempts: 5,
            jitter: '0.3',
            multiplier: '1.5'
        ])

        expect:
        retry.getDelayAsDuration() == java.time.Duration.ofSeconds(1)
        retry.getMaxDelayAsDuration() == java.time.Duration.ofMinutes(2)
        retry.getMaxAttempts() == 5
        retry.getJitter() == 0.3d
        retry.getMultiplier() == 1.5d
    }
}

View File

@@ -0,0 +1,88 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.config
import nextflow.util.Duration
import spock.lang.Specification
/**
 * Unit tests for {@link SeqeraConfig}
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class SeqeraConfigTest extends Specification {

    def 'should create config with no executor' () {
        given:
        def seqera = new SeqeraConfig([:])

        expect:
        seqera.executor == null
    }

    def 'should create config with executor settings' () {
        given:
        def executorSettings = [endpoint: 'https://sched.example.com', region: 'us-west-2']
        def seqera = new SeqeraConfig([executor: executorSettings])

        expect:
        seqera.executor != null
        seqera.executor.endpoint == 'https://sched.example.com'
        seqera.executor.region == 'us-west-2'
    }

    def 'should create config with full executor settings' () {
        given:
        def executorSettings = [
            endpoint: 'https://sched.example.com',
            region: 'eu-west-1',
            keyPairName: 'my-key',
            batchFlushInterval: '2 sec',
            machineRequirement: [provisioning: 'spot']
        ]
        def seqera = new SeqeraConfig([executor: executorSettings])

        expect:
        seqera.executor != null
        seqera.executor.endpoint == 'https://sched.example.com'
        seqera.executor.region == 'eu-west-1'
        seqera.executor.keyPairName == 'my-key'
        seqera.executor.batchFlushInterval == Duration.of('2 sec')
        seqera.executor.machineRequirement.provisioning == 'spot'
    }

    def 'should throw error when executor endpoint is missing' () {
        when:
        new SeqeraConfig([executor: [region: 'us-west-2']])

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('Missing Seqera endpoint')
    }
}

View File

@@ -0,0 +1,201 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import java.nio.file.Files
import java.nio.file.Path
import nextflow.file.FileHolder
import nextflow.processor.TaskRun
import spock.lang.Specification
import spock.lang.TempDir
/**
 * Tests for {@link InputFilesProfiler}
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class InputFilesProfilerTest extends Specification {

    @TempDir
    Path tempDir

    /** Writes {@code size} zero bytes into {@code name} under {@code parent} and returns the file path */
    private static Path zeros(Path parent, String name, int size) {
        return Files.write(parent.resolve(name), new byte[size])
    }

    def 'should return null for null task'() {
        expect:
        InputFilesProfiler.compute((TaskRun) null) == null
    }

    def 'should return null for task with no input files'() {
        given:
        def emptyTask = Mock(TaskRun) {
            getInputFiles() >> []
        }

        expect:
        InputFilesProfiler.compute(emptyTask) == null
    }

    def 'should return null for empty file list'() {
        expect:
        InputFilesProfiler.compute([]) == null
    }

    def 'should return null for null file list'() {
        expect:
        InputFilesProfiler.compute((List) null) == null
    }

    def 'should compute metrics for single small file'() {
        given:
        def small = Files.write(tempDir.resolve('small.txt'), 'hello'.bytes)

        when:
        def profile = InputFilesProfiler.compute([new FileHolder(small)])

        then:
        profile.count == 1
        profile.totalBytes == 5
        profile.maxFileBytes == 5
        profile.minFileBytes == 5
    }

    def 'should compute metrics for multiple files'() {
        given:
        def inputs = [
            new FileHolder(zeros(tempDir, 'small.txt', 500)),
            new FileHolder(zeros(tempDir, 'medium.dat', 2000)),
            new FileHolder(zeros(tempDir, 'large.dat', 50000))
        ]

        when:
        def profile = InputFilesProfiler.compute(inputs)

        then:
        profile.count == 3
        profile.totalBytes == 500 + 2000 + 50000
        profile.maxFileBytes == 50000
        profile.minFileBytes == 500
    }

    def 'should count files in directory recursively'() {
        given:
        def root = Files.createDirectory(tempDir.resolve('mydir'))
        zeros(root, 'file1.txt', 100)
        zeros(root, 'file2.txt', 200)
        def nested = Files.createDirectory(root.resolve('subdir'))
        zeros(nested, 'file3.txt', 300)

        when:
        def profile = InputFilesProfiler.compute([new FileHolder(root)])

        then:
        profile.count == 3 // 3 actual files inside the directory
        profile.totalBytes == 600 // 100 + 200 + 300
        // min/max are expected to equal the directory total, not a single inner file
        profile.maxFileBytes == 600
        profile.minFileBytes == 600
    }

    def 'should count files and directory contents together'() {
        given:
        def reads = zeros(tempDir, 'input.fq', 5000)
        def index = Files.createDirectory(tempDir.resolve('index'))
        zeros(index, 'a.bin', 100)
        zeros(index, 'b.bin', 200)

        when:
        def profile = InputFilesProfiler.compute([new FileHolder(reads), new FileHolder(index)])

        then:
        profile.count == 3 // 1 regular file + 2 files in directory
        profile.totalBytes == 5300
        profile.maxFileBytes == 5000
        profile.minFileBytes == 300
    }

    def 'should follow symlinks'() {
        given:
        def target = zeros(tempDir, 'real.txt', 1000)
        def link = tempDir.resolve('link.txt')
        Files.createSymbolicLink(link, target)

        when:
        def profile = InputFilesProfiler.compute([new FileHolder(link)])

        then:
        profile.count == 1
        profile.totalBytes == 1000
        profile.maxFileBytes == 1000
        profile.minFileBytes == 1000
    }

    def 'should handle non-existent file gracefully'() {
        given:
        def absent = tempDir.resolve('does-not-exist.txt')

        when:
        def profile = InputFilesProfiler.compute([new FileHolder(absent)])

        then:
        profile.count == 1
        profile.totalBytes == 0
    }

    def 'should compute from TaskRun'() {
        given:
        def input = zeros(tempDir, 'task-input.txt', 2048)
        def task = Mock(TaskRun) {
            getInputFiles() >> [new FileHolder(input)]
        }

        when:
        def profile = InputFilesProfiler.compute(task)

        then:
        profile.count == 1
        profile.totalBytes == 2048
        profile.maxFileBytes == 2048
        profile.minFileBytes == 2048
    }
}

View File

@@ -0,0 +1,320 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import nextflow.NextflowMeta
import nextflow.config.Manifest
import nextflow.script.PlatformMetadata
import nextflow.script.WorkflowMetadata
import spock.lang.Specification
/**
* Tests for Labels helper
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class LabelsTest extends Specification {
def 'should create labels with all workflow metadata'() {
    given: 'workflow metadata with every attribute populated'
    def sid = UUID.randomUUID()
    def metadata = Mock(WorkflowMetadata) {
        getProjectName() >> 'nf-core/rnaseq'
        getUserName() >> 'pditommaso'
        getRunName() >> 'crazy_darwin'
        getSessionId() >> sid
        isResume() >> true
        getRevision() >> '3.12.0'
        getCommitId() >> 'abc1234'
        getRepository() >> 'https://github.com/nf-core/rnaseq'
        getManifest() >> new Manifest([name: 'nf-core/rnaseq'])
    }

    when:
    def entries = new Labels().withWorkflowMetadata(metadata).entries

    then: 'each attribute is published under its label key'
    entries['nextflow.io/projectName'] == 'nf-core/rnaseq'
    entries['nextflow.io/userName'] == 'pditommaso'
    entries['nextflow.io/runName'] == 'crazy_darwin'
    entries['nextflow.io/sessionId'] == sid.toString()
    entries['nextflow.io/resume'] == 'true'
    entries['nextflow.io/revision'] == '3.12.0'
    entries['nextflow.io/commitId'] == 'abc1234'
    entries['nextflow.io/repository'] == 'https://github.com/nf-core/rnaseq'
    entries['nextflow.io/manifestName'] == 'nf-core/rnaseq'
    entries['nextflow.io/runtimeVersion'] == NextflowMeta.instance.version.toString()
}
def 'should compute stable runId from sessionId and runName'() {
    given:
    // Well-formed UUID literal. The previous fixture contained non-hex
    // characters, so it was not a valid session id.
    def sid = 'e2315a82-49b0-4ac3-a58a-0d7d52f7e3a1'
    def runName = 'crazy_darwin'

    expect: 'same inputs give the same id'
    Labels.runId(sid, runName) == Labels.runId(sid, runName)

    and: 'changing either input gives a different id'
    Labels.runId(sid, runName) != Labels.runId(sid, 'other_name')
    Labels.runId(sid, runName) != Labels.runId(UUID.randomUUID().toString(), runName)
}
def 'should omit null workflow metadata from labels'() {
    given: 'workflow metadata without revision, commit, repository and manifest name'
    def metadata = Mock(WorkflowMetadata) {
        getProjectName() >> 'hello'
        getUserName() >> 'user1'
        getRunName() >> 'happy_turing'
        getSessionId() >> UUID.randomUUID()
        isResume() >> false
        getRevision() >> null
        getCommitId() >> null
        getRepository() >> null
        getManifest() >> new Manifest([:])
    }

    when:
    def entries = new Labels().withWorkflowMetadata(metadata).entries

    then: 'available attributes are present'
    entries.containsKey('nextflow.io/projectName')
    entries.containsKey('nextflow.io/userName')
    entries.containsKey('nextflow.io/runName')
    entries.containsKey('nextflow.io/sessionId')
    entries['nextflow.io/resume'] == 'false'

    and: 'missing attributes produce no label'
    !entries.containsKey('nextflow.io/revision')
    !entries.containsKey('nextflow.io/commitId')
    !entries.containsKey('nextflow.io/repository')
    !entries.containsKey('nextflow.io/manifestName')
}
def 'should add scheduler labels'() {
    given:
    def entries = new Labels()
        .withSchedRunId('run-123')
        .withSchedClusterId('cluster-456')
        .entries

    expect:
    entries['seqera:sched:runId'] == 'run-123'
    entries['seqera:sched:clusterId'] == 'cluster-456'
}
def 'should skip null scheduler labels'() {
    given:
    def entries = new Labels()
        .withSchedRunId(null)
        .withSchedClusterId(null)
        .entries

    expect: 'null ids leave no label behind'
    !entries.containsKey('seqera:sched:runId')
    !entries.containsKey('seqera:sched:clusterId')
}
def 'should include platform workspaceId and computeEnvId when available'() {
    given: 'platform metadata carrying a workspace and a compute environment'
    def platformInfo = new PlatformMetadata('wf-abc123')
    platformInfo.workspace = new PlatformMetadata.Workspace(workspaceId: '1234')
    platformInfo.computeEnv = new PlatformMetadata.ComputeEnv(id: 'ce-abc')
    def metadata = Mock(WorkflowMetadata) {
        getRunName() >> 'happy_turing'
        getSessionId() >> UUID.randomUUID()
        isResume() >> false
        getManifest() >> new Manifest([:])
        getPlatform() >> platformInfo
    }
    def included = ['workspaceId', 'computeEnvId'] as Set

    when:
    def entries = new Labels().withWorkflowMetadata(metadata, included).entries

    then: 'only the two requested platform labels are emitted'
    entries.keySet() == ['seqera.io/platform/workspaceId', 'seqera.io/platform/computeEnvId'] as Set
    entries['seqera.io/platform/workspaceId'] == '1234'
    entries['seqera.io/platform/computeEnvId'] == 'ce-abc'
}
/**
 * A platform workflow id present in the metadata is published as
 * 'seqera.io/platform/workflowId'.
 */
def 'should include platform workflowId when available'() {
    given:
    def platformMeta = new PlatformMetadata('wf-abc123')
    def workflow = Mock(WorkflowMetadata) {
        getPlatform() >> platformMeta
        getManifest() >> new Manifest([:])
        isResume() >> false
        getSessionId() >> UUID.randomUUID()
        getRunName() >> 'happy_turing'
        getUserName() >> 'user1'
        getProjectName() >> 'hello'
    }

    when:
    def result = new Labels().withWorkflowMetadata(workflow)

    then:
    result.entries['seqera.io/platform/workflowId'] == 'wf-abc123'
}
/**
 * Platform metadata built without a workflow id must not yield a
 * 'seqera.io/platform/workflowId' label.
 */
def 'should omit platform workflowId when not set'() {
    given:
    def emptyPlatform = new PlatformMetadata()
    def workflow = Mock(WorkflowMetadata) {
        getPlatform() >> emptyPlatform
        getManifest() >> new Manifest([:])
        isResume() >> false
        getSessionId() >> UUID.randomUUID()
        getRunName() >> 'happy_turing'
        getUserName() >> 'user1'
        getProjectName() >> 'hello'
    }

    when:
    def result = new Labels().withWorkflowMetadata(workflow)

    then:
    !result.entries.containsKey('seqera.io/platform/workflowId')
}
/**
 * Only the metadata fields named in the include set ('runName' and
 * 'revision' here) may appear among the emitted labels.
 */
def 'should emit only included workflow metadata labels'() {
    given:
    def workflow = Mock(WorkflowMetadata) {
        getManifest() >> new Manifest([name: 'nf-core/rnaseq'])
        getRevision() >> '3.12.0'
        isResume() >> false
        getSessionId() >> UUID.randomUUID()
        getRunName() >> 'crazy_darwin'
        getProjectName() >> 'nf-core/rnaseq'
    }
    def included = ['runName', 'revision'] as Set

    when:
    def result = new Labels().withWorkflowMetadata(workflow, included)

    then:
    result.entries.keySet() == ['nextflow.io/runName', 'nextflow.io/revision'] as Set
}
/**
 * Filtering on 'workflowId' alone leaves the platform workflow id as the
 * single emitted label.
 */
def 'should emit only the workflowId label when filtered to workflowId'() {
    given:
    def platformMeta = new PlatformMetadata('wf-abc123')
    def workflow = Mock(WorkflowMetadata) {
        getPlatform() >> platformMeta
        getManifest() >> new Manifest([:])
        isResume() >> false
        getSessionId() >> UUID.randomUUID()
        getRunName() >> 'happy_turing'
        getProjectName() >> 'hello'
    }
    def included = ['workflowId'] as Set

    when:
    def result = new Labels().withWorkflowMetadata(workflow, included)

    then:
    result.entries.keySet() == ['seqera.io/platform/workflowId'] as Set
}
/**
 * An empty include set suppresses every workflow metadata label.
 */
def 'should emit nothing for an empty include set'() {
    given:
    def workflow = Mock(WorkflowMetadata) {
        getProjectName() >> 'hello'
    }
    def nothingIncluded = [] as Set

    when:
    def result = new Labels().withWorkflowMetadata(workflow, nothingIncluded)

    then:
    result.entries.isEmpty()
}
/**
 * Process resource labels are stored with string values, whatever the
 * original value type (string, integer, boolean).
 */
def 'should add process resource labels coercing values to string'() {
    given:
    def resourceLabels = [team: 'genomics', priority: 7, retain: true]

    when:
    def result = new Labels().withProcessResourceLabels(resourceLabels)

    then:
    result.entries['team'] == 'genomics'
    result.entries['priority'] == '7'
    result.entries['retain'] == 'true'
}
/**
 * Neither a null nor an empty resource-labels map contributes any entry.
 */
def 'should ignore null or empty process resource labels'() {
    when:
    def fromNull = new Labels().withProcessResourceLabels(null)
    def fromEmpty = new Labels().withProcessResourceLabels([:])

    then:
    fromNull.entries.isEmpty()
    fromEmpty.entries.isEmpty()
}
/**
 * Process resource labels applied after the workflow metadata win on a key
 * collision, while non-colliding metadata labels are kept.
 */
def 'should let process resource labels override workflow metadata on key collision'() {
    given:
    def workflow = Mock(WorkflowMetadata) {
        getManifest() >> new Manifest([:])
        isResume() >> false
        getSessionId() >> UUID.randomUUID()
        getRunName() >> 'happy_turing'
        getProjectName() >> 'hello'
    }
    def overrides = ['nextflow.io/runName': 'custom', team: 'a']

    when:
    def withMetadata = new Labels().withWorkflowMetadata(workflow)
    def result = withMetadata.withProcessResourceLabels(overrides)

    then:
    result.entries['nextflow.io/runName'] == 'custom'
    result.entries['team'] == 'a'
    result.entries['nextflow.io/projectName'] == 'hello'
}
/**
 * Labels.toStringMap yields an empty map for null/empty input and
 * stringifies every value otherwise.
 */
def 'should coerce map values to strings'() {
    given:
    def mixedValues = [a: 1, b: 'x', c: true]

    expect: 'null and empty inputs collapse to an empty map'
    Labels.toStringMap(null) == [:]
    Labels.toStringMap([:]) == [:]

    and: 'values of any type become strings'
    Labels.toStringMap(mixedValues) == [a: '1', b: 'x', c: 'true']
}
/**
 * Passing a list instead of a map must fail with an IllegalArgumentException
 * whose message names the setting, the expected shape and the offending type.
 */
def 'should reject non-map resourceLabels with a clear error'() {
    given:
    def notAMap = ['foo', 'bar']

    when:
    Labels.toStringMap(notAMap)

    then:
    def failure = thrown(IllegalArgumentException)
    failure.message.contains("'resourceLabels'")
    failure.message.contains('map of key/value pairs')
    failure.message.contains('java.util.ArrayList')
}
/**
 * With no task labels (null or empty) there is nothing to report, so the
 * delta is null.
 */
def 'should compute null delta when task labels are empty'() {
    expect: 'null task labels'
    Labels.delta(null, [team: 'a']) == null

    and: 'empty task labels'
    Labels.delta([:], [team: 'a']) == null
}
/**
 * With no run labels (null or empty) every task label is part of the delta.
 */
def 'should return full task labels when run labels are empty'() {
    expect: 'null run labels'
    Labels.delta([team: 'a', region: 'us'], null) == [team: 'a', region: 'us']

    and: 'empty run labels'
    Labels.delta([team: 'a', region: 'us'], [:]) == [team: 'a', region: 'us']
}
/**
 * The delta keeps task labels that are missing from, or differ from, the run
 * labels; identical label sets give a null delta.
 */
def 'should keep only differing or missing keys in delta'() {
    expect: 'a key missing from the run labels is kept'
    Labels.delta([team: 'a', region: 'us'], [team: 'a']) == [region: 'us']

    and: 'a key with a different value is kept'
    Labels.delta([team: 'b'], [team: 'a']) == [team: 'b']

    and: 'identical labels produce no delta'
    Labels.delta([team: 'a', region: 'us'], [team: 'a', region: 'us']) == null
}
}

View File

@@ -0,0 +1,592 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicInteger
import io.seqera.sched.api.schema.v1a1.CreateTasksResponse
import io.seqera.sched.api.schema.v1a1.Task
import io.seqera.sched.client.SchedClient
import nextflow.file.FileHolder
import nextflow.processor.TaskRun
import nextflow.util.Duration
import spock.lang.Specification
import spock.lang.TempDir
import spock.lang.Timeout
/**
* Tests for SeqeraBatchSubmitter
*
* @author Lorenzo Fontana <fontanalorenz@gmail.com>
*/
@Timeout(30)
class SeqeraBatchSubmitterTest extends Specification {
static final String TEST_RUN = 'run-test:local'
/**
 * Enqueues three tasks well inside a 500ms request interval and checks that
 * all of them reach SchedClient.createTasks in submission order, and that
 * every handler is given a batch task id exactly once.
 */
def 'should batch multiple tasks submitted within the interval'() {
    given:
    def taskIds = ['task-1', 'task-2', 'task-3']
    // The stub below is driven by the submitter rather than by this test method,
    // so capture into a synchronized list, as every sibling test in this spec does.
    def capturedTasks = [].asSynchronized()
    def client = Stub(SchedClient) {
        createTasks(_, _) >> { String runId, List<Task> tasks ->
            capturedTasks.addAll(tasks)
            Stub(CreateTasksResponse) {
                // return as many ids as tasks received in this call
                getTaskIds() >> taskIds.take(tasks.size())
            }
        }
    }
    def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('500ms'))
    def handlers = (1..3).collect { createMockHandler() }
    def tasks = (1..3).collect { new Task().image("image-$it") }

    when: 'start submitter and enqueue tasks quickly'
    submitter.start()
    handlers.eachWithIndex { handler, i ->
        submitter.submit(handler, tasks[i])
    }
    // Wait for the batch interval to elapse and tasks to be submitted
    sleep(800)
    submitter.shutdown()

    then: 'all tasks should be submitted in a single batch'
    capturedTasks.size() == 3
    capturedTasks*.image == ['image-1', 'image-2', 'image-3']

    and: 'task IDs should be assigned to handlers'
    handlers.each { handler ->
        1 * handler.setBatchTaskId(_)
    }
}
/**
 * Submits two tasks, waits longer than the 200ms request interval, then
 * submits a third task. Expects two createTasks calls: the first carrying
 * s1 and s2, the second carrying only s3.
 */
def 'should submit tasks in separate batches when interval elapses between them'() {
given:
def batchCount = new AtomicInteger()
def batches = [].asSynchronized() // thread-safe list to capture batches
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
// record the call sequence number together with the images it carried
def batchNum = batchCount.incrementAndGet()
def tasksInBatch = tasks.collect { it.image }
batches << [batch: batchNum, tasks: tasksInBatch]
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${it.image}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('200ms'))
when: 'enqueue first batch, wait for flush, then enqueue second batch'
submitter.start()
// First batch - tasks s1 and s2
submitter.submit(createMockHandler(), new Task().image('s1'))
submitter.submit(createMockHandler(), new Task().image('s2'))
// Wait for first batch to flush (interval + buffer)
sleep(400)
// Second batch - task s3
submitter.submit(createMockHandler(), new Task().image('s3'))
// Wait for second batch to flush
sleep(400)
submitter.shutdown()
then: 'should have two separate batches with correct tasks'
batches.size() == 2
and: 'first batch contains s1 and s2'
batches[0].batch == 1
batches[0].tasks == ['s1', 's2']
and: 'second batch contains only s3'
batches[1].batch == 2
batches[1].tasks == ['s3']
}
/**
 * Enqueues exactly SeqeraBatchSubmitter.TASKS_PER_REQUEST tasks with a 10s
 * request interval and expects a createTasks call of that size after only a
 * 200ms wait, i.e. without waiting for the interval to elapse.
 */
def 'should flush batch immediately when reaching max size'() {
given:
def batchSizes = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
// record how many tasks each call carried
batchSizes << tasks.size()
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('10s')) // Long interval
when: 'enqueue exactly TASKS_PER_REQUEST tasks'
submitter.start()
(1..SeqeraBatchSubmitter.TASKS_PER_REQUEST).each {
submitter.submit(createMockHandler(), new Task().image("img$it"))
}
// Give a small delay for the batch to be processed
sleep(200)
submitter.shutdown()
then: 'should flush immediately due to batch size limit'
batchSizes.size() >= 1
batchSizes[0] == SeqeraBatchSubmitter.TASKS_PER_REQUEST
}
/**
 * Enqueues TASKS_PER_REQUEST + 20 tasks and expects exactly two createTasks
 * calls: one full batch followed by the 20 remaining tasks.
 */
def 'should split into multiple batches when exceeding max size'() {
given:
// NOTE(review): the "120" / "100" figures in the comments below assume TASKS_PER_REQUEST is 100 — confirm against SeqeraBatchSubmitter
def totalTasks = SeqeraBatchSubmitter.TASKS_PER_REQUEST + 20 // 120 tasks
def batchSizes = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
batchSizes << tasks.size()
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('10s')) // Long interval
when: 'enqueue more than TASKS_PER_REQUEST tasks'
submitter.start()
(1..totalTasks).each {
submitter.submit(createMockHandler(), new Task().image("img$it"))
}
// Wait for batches to be processed
sleep(500)
submitter.shutdown()
then: 'should create two batches: 100 + 20'
batchSizes.size() == 2
batchSizes[0] == SeqeraBatchSubmitter.TASKS_PER_REQUEST // 100
batchSizes[1] == 20
}
/**
 * Submits two tasks with a 10s request interval and shuts down right away;
 * both tasks must still have been passed to createTasks by the time
 * shutdown() returns.
 */
def 'should flush remaining tasks on shutdown'() {
given:
def capturedTasks = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
capturedTasks.addAll(tasks)
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('10s')) // Long interval
when: 'enqueue tasks and immediately shutdown'
submitter.start()
submitter.submit(createMockHandler(), new Task().image('img1'))
submitter.submit(createMockHandler(), new Task().image('img2'))
// Shutdown without waiting for interval
submitter.shutdown()
then: 'tasks should still be submitted'
capturedTasks.size() == 2
}
/**
 * Once the submitter has been started and shut down, a further submit()
 * must be rejected with an IllegalStateException.
 */
def 'should throw exception when enqueueing after shutdown'() {
    given:
    def client = Stub(SchedClient)
    def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('1s'))
    def lateTask = new Task().image('img1')

    when: 'the submitter lifecycle is already over'
    submitter.start()
    submitter.shutdown()

    and: 'a task is submitted anyway'
    submitter.submit(createMockHandler(), lateTask)

    then:
    thrown(IllegalStateException)
}
/**
 * Makes createTasks throw a RuntimeException and expects each handler of
 * the submitted tasks to be notified once, with that same exception, via
 * onBatchSubmitFailure.
 */
def 'should propagate failure to handlers on API error'() {
given:
def apiError = new RuntimeException('API error')
def client = Stub(SchedClient) {
createTasks(_, _) >> { throw apiError }
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'))
def handler1 = Mock(SeqeraTaskHandler)
def handler2 = Mock(SeqeraTaskHandler)
when:
submitter.start()
submitter.submit(handler1, new Task().image('img1'))
submitter.submit(handler2, new Task().image('img2'))
// wait past the 100ms request interval so the batch is flushed
sleep(300)
submitter.shutdown()
then: 'all handlers should receive the failure'
1 * handler1.onBatchSubmitFailure(apiError)
1 * handler2.onBatchSubmitFailure(apiError)
}
/**
 * Makes createTasks answer with a single task id for three submitted tasks
 * and expects every handler to be notified once with an
 * IllegalStateException via onBatchSubmitFailure.
 */
def 'should propagate failure to handlers when API returns wrong number of task IDs'() {
given:
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
Stub(CreateTasksResponse) {
// Return fewer IDs than tasks submitted
getTaskIds() >> ['task-1']
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'))
def handler1 = Mock(SeqeraTaskHandler)
def handler2 = Mock(SeqeraTaskHandler)
def handler3 = Mock(SeqeraTaskHandler)
when:
submitter.start()
submitter.submit(handler1, new Task().image('img1'))
submitter.submit(handler2, new Task().image('img2'))
submitter.submit(handler3, new Task().image('img3'))
// wait past the 100ms request interval so the batch is flushed
sleep(300)
submitter.shutdown()
then: 'handlers should receive failure due to mismatched IDs'
1 * handler1.onBatchSubmitFailure({ it instanceof IllegalStateException })
1 * handler2.onBatchSubmitFailure({ it instanceof IllegalStateException })
1 * handler3.onBatchSubmitFailure({ it instanceof IllegalStateException })
}
/**
 * Starts the submitter, idles for longer than the 300ms request interval,
 * then enqueues two tasks. The single createTasks call must happen between
 * 250ms and 600ms after the enqueue, showing the interval is measured from
 * the first task and not from start().
 */
def 'should start batch timer only when first task arrives not when thread starts'() {
given:
def submitTimes = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
// record the wall-clock time of every createTasks call
submitTimes << System.currentTimeMillis()
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('300ms'))
when: 'start submitter, wait longer than interval, then enqueue tasks'
submitter.start()
// Wait longer than the batch interval before enqueueing
sleep(500)
def enqueueTime = System.currentTimeMillis()
submitter.submit(createMockHandler(), new Task().image('img1'))
submitter.submit(createMockHandler(), new Task().image('img2'))
// Wait for batch to flush
sleep(500)
submitter.shutdown()
then: 'batch should be submitted ~300ms after enqueue, not immediately'
submitTimes.size() == 1
// The submit should happen ~300ms after enqueue
def timeSinceEnqueue = submitTimes[0] - enqueueTime
timeSinceEnqueue >= 250 // Allow some tolerance
timeSinceEnqueue < 600 // But not too long
}
/**
 * The two-argument constructor must fall back to the REQUEST_INTERVAL and
 * KEEP_ALIVE_INTERVAL constants of SeqeraBatchSubmitter.
 */
def 'should use default interval when not specified'() {
    given:
    def client = Stub(SchedClient)

    when:
    def submitter = new SeqeraBatchSubmitter(client, TEST_RUN)

    then: 'request interval defaults'
    submitter.requestInterval == SeqeraBatchSubmitter.REQUEST_INTERVAL

    and: 'keep-alive interval defaults'
    submitter.keepAliveInterval == SeqeraBatchSubmitter.KEEP_ALIVE_INTERVAL
}
/**
 * Runs the submitter with a 200ms keep-alive interval and no tasks, and
 * expects at least one createTasks call carrying an empty task list.
 */
def 'should send keep-alive when no tasks received within keep-alive interval'() {
given:
def submissions = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
// record the run id and the number of tasks of every call
submissions << [runId: runId, taskCount: tasks.size()]
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
// Short keep-alive interval for testing
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('10s'), Duration.of('200ms'))
when: 'start submitter and wait for keep-alive interval without enqueueing tasks'
submitter.start()
// Wait for keep-alive to trigger (interval + buffer)
sleep(400)
submitter.shutdown()
then: 'should have sent at least one keep-alive (empty submission)'
submissions.size() >= 1
submissions.any { it.taskCount == 0 }
}
/**
 * Submits five tasks 80ms apart with a 100ms request interval and a 300ms
 * keep-alive interval, and expects every createTasks call to carry at least
 * one task (no empty keep-alive call).
 */
def 'should not send keep-alive when tasks are being submitted regularly'() {
given:
def submissions = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
submissions << [runId: runId, taskCount: tasks.size()]
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
// Short intervals for testing
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'), Duration.of('300ms'))
when: 'start submitter and continuously enqueue tasks'
submitter.start()
// Enqueue tasks at intervals shorter than keep-alive
5.times { i ->
submitter.submit(createMockHandler(), new Task().image("img$i"))
sleep(80)
}
sleep(200) // Wait for final batch to flush
submitter.shutdown()
then: 'all submissions should have tasks, no keep-alive (empty) submissions'
submissions.size() >= 1
submissions.every { it.taskCount > 0 }
}
/**
 * Makes createTasks throw an OutOfMemoryError and expects two things: the
 * handler is notified with a RuntimeException whose cause is that error,
 * and the error callback passed to the constructor receives the original
 * error.
 */
def 'should invoke error callback on fatal error in thread'() {
given:
// Use an Error to simulate a truly fatal condition that escapes flushBatch
def fatalError = new OutOfMemoryError('Fatal thread error')
def errorReceived = null
// callback handed to the submitter; stores whatever throwable it is given
def onError = { Throwable t -> errorReceived = t }
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
// Throw a fatal Error that will escape flushBatch
throw fatalError
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'), Duration.of('10s'), onError)
def handler = Mock(SeqeraTaskHandler)
when:
submitter.start()
submitter.submit(handler, new Task().image('img1'))
sleep(300)
submitter.shutdown()
then: 'handler should receive failure (wrapped in RuntimeException since it was an Error)'
1 * handler.onBatchSubmitFailure({ it instanceof RuntimeException && it.cause == fatalError })
and: 'error callback should be invoked with original error'
errorReceived == fatalError
}
/**
 * Makes every createTasks call throw and expects each of the three
 * submitted handlers to be notified exactly once, with that same exception,
 * via onBatchSubmitFailure.
 *
 * The handlers are plain mocks: the strict `1 * ...` expectations in the
 * `then:` block are the verification. The creation-time stub that used to
 * record notifications into a side list was dropped because the list was
 * never asserted, and its closure referenced `it` while declaring `args`,
 * so it would not have recorded the handler anyway.
 */
def 'should drain and fail pending tasks on fatal error'() {
    given:
    def fatalError = new RuntimeException('Fatal thread error')
    def onError = { Throwable t -> /* no-op */ }
    def client = Stub(SchedClient) {
        createTasks(_, _) >> { String runId, List<Task> tasks ->
            // Always throw to simulate fatal error
            throw fatalError
        }
    }
    def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'), Duration.of('10s'), onError)
    def handlers = (1..3).collect { Mock(SeqeraTaskHandler) }

    when:
    submitter.start()
    handlers.each { h ->
        submitter.submit(h, new Task().image('img'))
    }
    // wait past the 100ms request interval so the batch is attempted
    sleep(300)
    submitter.shutdown()

    then: 'all handlers should receive failure notification'
    handlers.each { h ->
        1 * h.onBatchSubmitFailure(fatalError)
    }
}
/**
 * Makes every empty (keep-alive) createTasks call throw, lets a few of them
 * fail, then submits a real task. Expects at least one keep-alive failure
 * to have been counted and the later non-empty submission to succeed.
 */
def 'should continue after keep-alive failure'() {
given:
def keepAliveFailCount = new AtomicInteger()
def taskSubmissions = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
if (tasks.isEmpty()) {
// Keep-alive call - fail it
keepAliveFailCount.incrementAndGet()
throw new RuntimeException('Keep-alive failed')
}
// Normal task submission
taskSubmissions << tasks.size()
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
// Short keep-alive interval to trigger failures quickly
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('500ms'), Duration.of('100ms'))
when: 'start submitter, let keep-alive fail, then submit tasks'
submitter.start()
// Wait for a few keep-alive failures
sleep(350)
// Now submit a task
submitter.submit(createMockHandler(), new Task().image('img1'))
// Wait for task to be submitted
sleep(700)
submitter.shutdown()
then: 'keep-alive should have failed but thread should continue'
keepAliveFailCount.get() >= 1
and: 'task submission should still succeed'
taskSubmissions.size() >= 1
}
/**
 * Builds the submitter with a null error callback and checks that a
 * submitted task still reaches createTasks.
 */
def 'should work without error callback'() {
given:
def taskSubmissions = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
taskSubmissions << tasks.size()
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
// Constructor without error callback (null)
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'), Duration.of('10s'), null)
when:
submitter.start()
submitter.submit(createMockHandler(), new Task().image('img1'))
// wait past the 100ms request interval so the batch is flushed
sleep(300)
submitter.shutdown()
then: 'should work normally'
taskSubmissions.size() >= 1
}
// -- input files metrics tests --
// Temporary directory supplied by Spock's @TempDir; the input-file metrics
// tests below create their sample files inside it.
@TempDir
Path tempDir
/**
 * Gives the handler's TaskRun two input files of 500 and 3000 bytes and
 * expects the task passed to createTasks to carry input-file metrics:
 * count 2, total 3500 bytes, max 3000 bytes, min 500 bytes.
 */
def 'should attach input files metrics to task before submission'() {
given:
// two real files of known size in the temporary directory
def file1 = tempDir.resolve('a.txt')
Files.write(file1, new byte[500])
def file2 = tempDir.resolve('b.txt')
Files.write(file2, new byte[3000])
def capturedTasks = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
capturedTasks.addAll(tasks)
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'))
def taskRun = Mock(TaskRun) {
getInputFiles() >> [new FileHolder(file1), new FileHolder(file2)]
}
def handler = Mock(SeqeraTaskHandler) {
getTask() >> taskRun
}
when:
submitter.start()
submitter.submit(handler, new Task().image('img1'))
sleep(300)
submitter.shutdown()
then: 'metrics should be set on the submitted task'
capturedTasks.size() == 1
def metrics = capturedTasks[0].getInputFiles()
metrics != null
metrics.count == 2
metrics.totalBytes == 3500
metrics.maxFileBytes == 3000
metrics.minFileBytes == 500
}
/**
 * Gives the handler's TaskRun an empty input-file list and expects the
 * task to be submitted with getInputFiles() left null.
 */
def 'should submit task without metrics when no input files'() {
given:
def capturedTasks = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
capturedTasks.addAll(tasks)
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'))
def taskRun = Mock(TaskRun) {
getInputFiles() >> []
}
def handler = Mock(SeqeraTaskHandler) {
getTask() >> taskRun
}
when:
submitter.start()
submitter.submit(handler, new Task().image('img1'))
sleep(300)
submitter.shutdown()
then: 'task should be submitted without metrics'
capturedTasks.size() == 1
capturedTasks[0].getInputFiles() == null
}
/**
 * Makes TaskRun.getInputFiles() throw and expects the task to be submitted
 * to createTasks regardless.
 */
def 'should submit task even when metrics computation fails'() {
given:
def capturedTasks = [].asSynchronized()
def client = Stub(SchedClient) {
createTasks(_, _) >> { String runId, List<Task> tasks ->
capturedTasks.addAll(tasks)
Stub(CreateTasksResponse) {
getTaskIds() >> tasks.collect { "task-${System.nanoTime()}" }
}
}
}
def submitter = new SeqeraBatchSubmitter(client, TEST_RUN, Duration.of('100ms'))
def taskRun = Mock(TaskRun) {
// simulate a failure while reading the task input files
getInputFiles() >> { throw new RuntimeException('Cannot access input files') }
}
def handler = Mock(SeqeraTaskHandler) {
getTask() >> taskRun
}
when:
submitter.start()
submitter.submit(handler, new Task().image('img1'))
sleep(300)
submitter.shutdown()
then: 'task should still be submitted despite metrics failure'
capturedTasks.size() == 1
}
/**
 * Factory for a fresh Spock mock of {@link SeqeraTaskHandler}, so that tests
 * can verify setBatchTaskId and onBatchSubmitFailure interactions on it.
 *
 * @return a new mock handler with no stubbed behaviour
 */
private SeqeraTaskHandler createMockHandler() {
    return Mock(SeqeraTaskHandler)
}
}

View File

@@ -0,0 +1,270 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.executor
import io.seqera.config.ExecutorOpts
import io.seqera.config.SeqeraConfig
import io.seqera.sched.api.schema.v1a1.CreateRunRequest
import io.seqera.sched.api.schema.v1a1.CreateRunResponse
import io.seqera.sched.client.SchedClient
import io.seqera.sched.client.SchedClientConfig
import nextflow.Session
import nextflow.SysEnv
import nextflow.platform.PlatformHelper
import nextflow.script.WorkflowMetadata
import spock.lang.Specification
/**
* Tests for SeqeraExecutor client configuration
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class SeqeraExecutorTest extends Specification {
/**
 * Runs after every feature method: discards the environment that the
 * feature pushed onto SysEnv in its given block.
 */
def cleanup() {
    // every feature method in this spec starts with SysEnv.push(...)
    SysEnv.pop()
}
/**
 * With an empty environment, the endpoint, platform URL and tokens all
 * come from the configuration maps.
 */
def 'should create client config with config settings'() {
    given:
    SysEnv.push([:])
    def executorOpts = [endpoint: 'https://sched.example.com', region: 'us-west-2']
    def towerOpts = [endpoint: 'https://api.platform.example.com', accessToken: 'config-access-token', refreshToken: 'config-refresh-token']

    when:
    def clientConfig = buildClientConfig(executorOpts, towerOpts)

    then:
    clientConfig.endpoint == 'https://sched.example.com'
    clientConfig.platformUrl == 'https://api.platform.example.com'
    clientConfig.accessToken == 'config-access-token'
    clientConfig.refreshToken == 'config-refresh-token'
}
/**
 * With an empty tower config, the platform URL and tokens are taken from
 * the TOWER_* environment variables.
 */
def 'should create client config with env variable settings'() {
    given:
    def env = [
        TOWER_API_ENDPOINT: 'https://api.env.example.com',
        TOWER_ACCESS_TOKEN: 'env-access-token',
        TOWER_REFRESH_TOKEN: 'env-refresh-token'
    ]
    SysEnv.push(env)
    def executorOpts = [endpoint: 'https://sched.example.com', region: 'us-west-2']

    when:
    def clientConfig = buildClientConfig(executorOpts, [:])

    then:
    clientConfig.endpoint == 'https://sched.example.com'
    clientConfig.platformUrl == 'https://api.env.example.com'
    clientConfig.accessToken == 'env-access-token'
    clientConfig.refreshToken == 'env-refresh-token'
}
/**
 * Without a configured or environment endpoint the platform URL falls back
 * to 'https://api.cloud.seqera.io', and the refresh token stays null.
 */
def 'should use default platform url when not configured'() {
    given:
    SysEnv.push([:])
    def executorOpts = [endpoint: 'https://sched.example.com', region: 'us-west-2']
    def towerOpts = [accessToken: 'my-token']

    when:
    def clientConfig = buildClientConfig(executorOpts, towerOpts)

    then:
    clientConfig.endpoint == 'https://sched.example.com'
    clientConfig.platformUrl == 'https://api.cloud.seqera.io'
    clientConfig.accessToken == 'my-token'
    clientConfig.refreshToken == null
}
/**
 * When both the tower config and the TOWER_* environment variables are
 * set, the configuration values take precedence.
 */
def 'should prefer config over env variables'() {
    given:
    def env = [
        TOWER_API_ENDPOINT: 'https://api.env.example.com',
        TOWER_ACCESS_TOKEN: 'env-access-token',
        TOWER_REFRESH_TOKEN: 'env-refresh-token'
    ]
    SysEnv.push(env)
    def executorOpts = [endpoint: 'https://sched.example.com', region: 'us-west-2']
    def towerOpts = [endpoint: 'https://api.config.example.com', accessToken: 'config-access-token', refreshToken: 'config-refresh-token']

    when:
    def clientConfig = buildClientConfig(executorOpts, towerOpts)

    then:
    clientConfig.endpoint == 'https://sched.example.com'
    clientConfig.platformUrl == 'https://api.config.example.com'
    clientConfig.accessToken == 'config-access-token'
    clientConfig.refreshToken == 'config-refresh-token'
}
/**
 * applyFusionDefaults() must set fusion.targetVersion to '2.6' when the
 * fusion config only enables Fusion.
 */
def 'should set fusion default version when not configured' () {
    given:
    SysEnv.push([:])
    def fusionOpts = [enabled: true]
    def sessionConfig = [fusion: fusionOpts]
    def session = Mock(Session) { getConfig() >> sessionConfig }
    def executor = new SeqeraExecutor(session: session)

    when:
    executor.applyFusionDefaults()

    then: 'the default version was written into the fusion config map'
    fusionOpts.targetVersion == '2.6'
}
/**
 * applyFusionDefaults() must leave fusion.targetVersion unset when a
 * containerConfigUrl is already configured.
 */
def 'should not override fusion version when containerConfigUrl is set' () {
    given:
    SysEnv.push([:])
    def fusionOpts = [enabled: true, containerConfigUrl: 'https://custom.url/v3.0-amd64.json']
    def sessionConfig = [fusion: fusionOpts]
    def session = Mock(Session) { getConfig() >> sessionConfig }
    def executor = new SeqeraExecutor(session: session)

    when:
    executor.applyFusionDefaults()

    then: 'no target version was injected'
    fusionOpts.targetVersion == null
}
/**
 * computeRunResourceLabels() must expose the config-level
 * process.resourceLabels with every value converted to a string.
 */
def 'should expose run resource labels coerced from config-level process.resourceLabels'() {
    given:
    SysEnv.push([:])
    def sessionConfig = [process: [resourceLabels: [team: 'a', priority: 7]]]
    def executor = new SeqeraExecutor()
    executor.session = Mock(Session) {
        getConfig() >> sessionConfig
    }

    when:
    executor.computeRunResourceLabels()

    then:
    executor.runResourceLabels == [team: 'a', priority: '7']
}
/**
 * With no process.resourceLabels in the session config the run resource
 * labels are an empty map.
 */
def 'should yield empty run resource labels when process.resourceLabels is absent'() {
    given:
    SysEnv.push([:])
    def sessionConfig = [:]
    def executor = new SeqeraExecutor()
    executor.session = Mock(Session) {
        getConfig() >> sessionConfig
    }

    when:
    executor.computeRunResourceLabels()

    then:
    executor.runResourceLabels == [:]
}
/**
 * A closure-valued process.resourceLabels must be skipped without an
 * error, leaving the run resource labels empty.
 */
def 'should skip run resource labels when process.resourceLabels is a closure'() {
    given:
    SysEnv.push([:])
    def dynamicLabels = { [team: 'a', priority: 7] }
    def sessionConfig = [process: [resourceLabels: dynamicLabels]]
    def executor = new SeqeraExecutor()
    executor.session = Mock(Session) {
        getConfig() >> sessionConfig
    }

    when:
    executor.computeRunResourceLabels()

    then:
    noExceptionThrown()
    executor.runResourceLabels == [:]
}
/**
 * createRun() must send a CreateRunRequest whose labels combine the
 * config-level process.resourceLabels (as strings) with the workflow and
 * platform labels checked below.
 */
def 'createRun populates CreateRunRequest.labels with config-level resourceLabels merged with auto-labels'() {
    given: 'a scheduler client that records the request it receives'
    SysEnv.push([:])
    CreateRunRequest sentRequest = null
    def mockClient = Mock(SchedClient) {
        createRun(_) >> { args ->
            sentRequest = args[0] as CreateRunRequest
            new CreateRunResponse().runId('run-1')
        }
    }

    and: 'workflow metadata with platform workspace and compute environment'
    def platformMeta = new nextflow.script.PlatformMetadata('wf-abc123')
    platformMeta.workspace = new nextflow.script.PlatformMetadata.Workspace(workspaceId: '1234')
    platformMeta.computeEnv = new nextflow.script.PlatformMetadata.ComputeEnv(id: 'ce-abc')
    def workflowMeta = Mock(WorkflowMetadata) {
        getProjectName() >> 'my-project'
        getUserName() >> 'alice'
        getRunName() >> 'test-run'
        getSessionId() >> UUID.fromString('00000000-0000-0000-0000-000000000001')
        getResume() >> false
        getRevision() >> null
        getCommitId() >> null
        getRepository() >> null
        getManifest() >> null
        getPlatform() >> platformMeta
    }

    and: 'a session configured with resource labels and auto-labels enabled'
    def sessionConfig = [
        process: [resourceLabels: [team: 'platform', priority: 3]],
        seqera: [executor: [endpoint: 'https://sched.example.com', provider: 'aws', region: 'us-east-1', autoLabels: true]],
        tower: [:]
    ]
    def session = Mock(Session) {
        getConfig() >> sessionConfig
        getWorkflowMetadata() >> workflowMeta
        getWorkDir() >> java.nio.file.Paths.get('/work')
        getRunName() >> 'test-run'
    }

    and: 'an executor wired directly to the mocked client and options'
    def seqeraOpts = new ExecutorOpts(endpoint: 'https://sched.example.com', provider: 'aws', region: 'us-east-1', autoLabels: true)
    def executor = new SeqeraExecutor()
    executor.session = session
    executor.@seqeraConfig = seqeraOpts
    executor.@client = mockClient

    when:
    executor.createRun()

    then: 'the request was captured'
    sentRequest != null

    and: 'config-level resource labels are present as strings'
    sentRequest.getLabels()['team'] == 'platform'
    sentRequest.getLabels()['priority'] == '3'

    and: 'workflow and platform labels are present'
    sentRequest.getLabels()['nextflow.io/projectName'] == 'my-project'
    sentRequest.getLabels()['nextflow.io/runName'] == 'test-run'
    sentRequest.getLabels()['seqera.io/platform/workspaceId'] == '1234'
    sentRequest.getLabels()['seqera.io/platform/computeEnvId'] == 'ce-abc'

    cleanup:
    executor.batchSubmitter?.shutdown()
}
/**
 * Assembles a SchedClientConfig following the same steps as
 * {@link SeqeraExecutor#createClient()}: the scheduler endpoint and retry
 * options come from the executor options, while the platform URL and the
 * tokens are resolved by PlatformHelper from the tower config and SysEnv.
 *
 * @param executorOpts the 'seqera.executor' options map
 * @param towerConfig the 'tower' config map
 * @return the resulting client configuration
 */
private SchedClientConfig buildClientConfig(Map executorOpts, Map towerConfig) {
    final opts = new SeqeraConfig([executor: executorOpts]).executor
    final token = PlatformHelper.getAccessToken(towerConfig, SysEnv.get())
    final refresh = PlatformHelper.getRefreshToken(towerConfig, SysEnv.get())
    final url = PlatformHelper.getEndpoint(towerConfig, SysEnv.get())

    final builder = SchedClientConfig.builder()
        .endpoint(opts.endpoint)
        .platformUrl(url)
        .accessToken(token)
        .refreshToken(refresh)
        .retryConfig(opts.retryOpts())
    return builder.build()
}
}

View File

@@ -0,0 +1,269 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.util
import io.seqera.config.MachineRequirementOpts
import nextflow.util.MemoryUnit
import spock.lang.Specification
/**
* Tests for {@link HintHelper}
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class HintHelperTest extends Specification {
/**
 * An empty hints map leaves the base provisioning untouched.
 */
def 'should return base opts when no hints'() {
    given:
    def defaults = new MachineRequirementOpts([provisioning: 'spot'])
    def hints = [:]

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.provisioning == 'spot'
}
/**
 * A hint whose key has no 'seqera/' prefix leaves the base provisioning
 * untouched.
 */
def 'should return base opts when no seqera hints'() {
    given:
    def defaults = new MachineRequirementOpts([provisioning: 'spot'])
    def hints = [consumableResources: 'my-license']

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.provisioning == 'spot'
}
/**
 * The 'seqera/machineRequirement.provisioning' hint replaces the base
 * provisioning value.
 */
def 'should overlay provisioning hint'() {
    given:
    def defaults = new MachineRequirementOpts([provisioning: 'ondemand'])
    def hints = ['seqera/machineRequirement.provisioning': 'spotFirst']

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.provisioning == 'spotFirst'
}
/**
 * The 'seqera/machineRequirement.maxSpotAttempts' hint is applied to the
 * resulting options.
 */
def 'should overlay maxSpotAttempts hint'() {
    given:
    def defaults = new MachineRequirementOpts([:])
    def hints = ['seqera/machineRequirement.maxSpotAttempts': 3]

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.maxSpotAttempts == 3
}
/**
 * A list-valued 'seqera/machineRequirement.machineTypes' hint is carried
 * over as a list.
 */
def 'should overlay machineTypes as list'() {
    given:
    def defaults = new MachineRequirementOpts([:])
    def hints = ['seqera/machineRequirement.machineTypes': ['m5', 'm5a', 'm6i']]

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.machineTypes == ['m5', 'm5a', 'm6i']
}
/**
 * The 'seqera/machineRequirement.diskType' hint is applied to the
 * resulting options.
 */
def 'should overlay diskType hint'() {
    given:
    def defaults = new MachineRequirementOpts([:])
    def hints = ['seqera/machineRequirement.diskType': 'ebs/gp3']

    when:
    def merged = HintHelper.overlayHints(defaults, hints)

    then:
    merged.diskType == 'ebs/gp3'
}
def 'should overlay diskThroughputMiBps hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskThroughputMiBps': 500])
then:
result.diskThroughputMiBps == 500
}
def 'should overlay diskIops hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskIops': 10000])
then:
result.diskIops == 10000
}
def 'should overlay diskEncrypted hint as boolean'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskEncrypted': true])
then:
result.diskEncrypted == true
}
def 'should overlay diskAllocation hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskAllocation': 'node'])
then:
result.diskAllocation == 'node'
}
def 'should overlay diskMountPath hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskMountPath': '/data'])
then:
result.diskMountPath == '/data'
}
def 'should overlay diskSize hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskSize': '100.GB'])
then:
result.diskSize == MemoryUnit.of('100.GB')
}
def 'should overlay capacityMode hint'() {
given:
def base = new MachineRequirementOpts([:])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.capacityMode': 'asg'])
then:
result.capacityMode == 'asg'
}
def 'should overlay multiple hints at once'() {
given:
def base = new MachineRequirementOpts([provisioning: 'ondemand'])
when:
def result = HintHelper.overlayHints(base, [
'seqera/machineRequirement.provisioning': 'spotFirst',
'seqera/machineRequirement.maxSpotAttempts': 3,
'seqera/machineRequirement.diskType': 'ebs/gp3'
])
then:
result.provisioning == 'spotFirst'
result.maxSpotAttempts == 3
result.diskType == 'ebs/gp3'
}
def 'should preserve base values not overridden by hints'() {
given:
def base = new MachineRequirementOpts([provisioning: 'spot', diskType: 'ebs/gp3', diskMountPath: '/data'])
when:
def result = HintHelper.overlayHints(base, ['seqera/machineRequirement.diskType': 'ebs/io1'])
then:
result.provisioning == 'spot'
result.diskType == 'ebs/io1'
result.diskMountPath == '/data'
}
def 'should derive known keys from MachineRequirementOpts declared fields'() {
expect: 'KNOWN_KEYS covers every declared field of MachineRequirementOpts'
HintHelper.KNOWN_KEYS.size() > 0
for( final field : MachineRequirementOpts.declaredFields ) {
if( field.synthetic || java.lang.reflect.Modifier.isStatic(field.modifiers) || field.name.startsWith('$') || field.name == 'metaClass' )
continue
assert HintHelper.KNOWN_KEYS.contains("machineRequirement.${field.name}".toString())
}
}
def 'should error on unknown seqera hint'() {
when:
HintHelper.extractSeqeraHints(['seqera/machineRequirement.unknownField': 'value'])
then:
def e = thrown(IllegalArgumentException)
e.message.contains('Unknown Seqera Platform hint')
e.message.contains('seqera/machineRequirement.unknownField')
}
def 'should ignore non-seqera hints in extraction'() {
when:
def result = HintHelper.extractSeqeraHints([
consumableResources: 'my-license',
'k8s/scheduling.nodeSelector': 'gpu=true',
'seqera/machineRequirement.provisioning': 'spot'
])
then:
result.size() == 1
result['machineRequirement.provisioning'] == 'spot'
}
def 'should handle null hints map'() {
when:
def result = HintHelper.extractSeqeraHints(null)
then:
result.isEmpty()
}
def 'should accept unprefixed known keys'() {
when:
def result = HintHelper.extractSeqeraHints([
'machineRequirement.provisioning': 'spot',
'machineRequirement.diskType': 'ebs/gp3',
])
then:
result['machineRequirement.provisioning'] == 'spot'
result['machineRequirement.diskType'] == 'ebs/gp3'
}
def 'should give prefixed form precedence over unprefixed'() {
when:
def result = HintHelper.extractSeqeraHints([
'machineRequirement.provisioning': 'ondemand',
'seqera/machineRequirement.provisioning': 'spotFirst',
])
then:
result['machineRequirement.provisioning'] == 'spotFirst'
}
def 'should overlay unprefixed hint onto base opts'() {
given:
def base = new MachineRequirementOpts([provisioning: 'ondemand'])
when:
def result = HintHelper.overlayHints(base, ['machineRequirement.provisioning': 'spotFirst'])
then:
result.provisioning == 'spotFirst'
}
def 'should ignore unknown unprefixed keys'() {
when:
def result = HintHelper.extractSeqeraHints([
consumableResources: 'license-a=1',
somethingElse: 'x',
'machineRequirement.provisioning': 'spot',
])
then:
result.size() == 1
result['machineRequirement.provisioning'] == 'spot'
}
}

View File

@@ -0,0 +1,613 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seqera.util
import io.seqera.config.MachineRequirementOpts
import io.seqera.sched.api.schema.v1a1.DiskAllocation
import io.seqera.sched.api.schema.v1a1.EcsCapacityMode
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
import io.seqera.sched.api.schema.v1a1.ProvisioningModel
import nextflow.cloud.types.PriceModel
import nextflow.fusion.FusionConfig
import nextflow.util.MemoryUnit
import spock.lang.Specification
/**
 * Spock specification for {@code SchemaMapperUtil}.
 *
 * Covers the mapping of {@link MachineRequirementOpts}, task architecture, disk size and the
 * snapshot flag into scheduler machine/disk requirements, plus the string-to-enum helpers for
 * provisioning model, price model, disk allocation and ECS capacity mode.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class MapperUtilTest extends Specification {

    // ---- toMachineRequirement(opts) ----

    def 'should return null for null opts' () {
        expect:
        SchemaMapperUtil.toMachineRequirement(null) == null
    }

    def 'should return null for empty opts' () {
        expect:
        SchemaMapperUtil.toMachineRequirement(new MachineRequirementOpts([:])) == null
    }

    def 'should map all fields' () {
        when:
        def config = new MachineRequirementOpts([
            provisioning: 'spotFirst',
            maxSpotAttempts: 3,
            machineTypes: ['m5', 'c5']
        ])
        def requirement = SchemaMapperUtil.toMachineRequirement(config)

        then:
        requirement.provisioning == ProvisioningModel.SPOT_FIRST
        requirement.maxSpotAttempts == 3
        requirement.machineTypes == ['m5', 'c5']
    }

    // ---- enum helpers: provisioning and price model ----

    def 'should map provisioning model' () {
        expect:
        SchemaMapperUtil.toProvisioningModel(null) == null
        SchemaMapperUtil.toProvisioningModel('spot') == ProvisioningModel.SPOT
        SchemaMapperUtil.toProvisioningModel('ondemand') == ProvisioningModel.ONDEMAND
        SchemaMapperUtil.toProvisioningModel('spotFirst') == ProvisioningModel.SPOT_FIRST
    }

    def 'should map price model' () {
        expect:
        SchemaMapperUtil.toPriceModel(null) == null
        SchemaMapperUtil.toPriceModel(SchedPriceModel.SPOT) == PriceModel.spot
        SchemaMapperUtil.toPriceModel(SchedPriceModel.STANDARD) == PriceModel.standard
    }

    // ---- toMachineRequirement(opts, taskArch) ----

    def 'should return null when both opts and taskArch are null' () {
        expect:
        SchemaMapperUtil.toMachineRequirement(null, null) == null
    }

    def 'should use taskArch when opts is null' () {
        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(null, 'arm64')

        then:
        requirement.arch == 'arm64'
        requirement.provisioning == null
    }

    def 'should use taskArch with config settings' () {
        when:
        def config = new MachineRequirementOpts([provisioning: 'spot'])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'arm64')

        then:
        requirement.arch == 'arm64'
        requirement.provisioning == ProvisioningModel.SPOT
    }

    def 'should have null arch when taskArch is null' () {
        when:
        def config = new MachineRequirementOpts([provisioning: 'spot'])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, null)

        then:
        requirement.arch == null
        requirement.provisioning == ProvisioningModel.SPOT
    }

    def 'should merge config settings with taskArch' () {
        when:
        def config = new MachineRequirementOpts([
            provisioning: 'spotFirst',
            maxSpotAttempts: 3,
            machineTypes: ['m5', 'c5']
        ])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'arm64')

        then:
        requirement.arch == 'arm64'
        requirement.provisioning == ProvisioningModel.SPOT_FIRST
        requirement.maxSpotAttempts == 3
        requirement.machineTypes == ['m5', 'c5']
    }

    // ---- toDiskRequirement(diskSize) and disk inside the machine requirement ----

    def 'should return null disk requirement for null disk size' () {
        expect:
        SchemaMapperUtil.toDiskRequirement(null) == null
    }

    def 'should return null disk requirement for zero disk size' () {
        expect:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of(0)) == null
    }

    def 'should map disk size to disk requirement with defaults' () {
        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'))

        then: 'disk size is set'
        disk.sizeGiB == 100

        and: 'allocation defaults to node'
        disk.allocation == DiskAllocation.NODE

        and: 'node allocation does not set EBS options'
        disk.volumeType == null
        disk.throughputMiBps == null
        disk.encrypted == null
        disk.iops == null
    }

    def 'should map disk size in different units' () {
        expect:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).sizeGiB == 1024
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('50 GB')).sizeGiB == 50

        and: 'defaults to node allocation'
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).allocation == DiskAllocation.NODE
    }

    def 'should include disk in machine requirement' () {
        when:
        def config = new MachineRequirementOpts([:])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'x86_64', MemoryUnit.of('200 GB'), false)

        then:
        requirement.arch == 'x86_64'
        requirement.disk != null
        requirement.disk.sizeGiB == 200
    }

    def 'should return machine requirement with only disk' () {
        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(null, null, MemoryUnit.of('100 GB'), false)

        then:
        requirement != null
        requirement.arch == null
        requirement.disk != null
        requirement.disk.sizeGiB == 100
    }

    def 'should return null when no taskArch, no opts, and no disk' () {
        expect:
        SchemaMapperUtil.toMachineRequirement(null, null, null, false) == null
    }

    // ---- toDiskRequirement(diskSize, opts): custom disk options with task allocation ----

    def 'should throw exception for invalid disk type' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'task', diskType: 'local/nvme'])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains("Invalid disk type: local/nvme")
        failure.message.contains("Supported types:")
    }

    def 'should use custom disk type from config' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'task', diskType: 'ebs/io1', diskIops: 10000])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.sizeGiB == 100
        disk.allocation == DiskAllocation.TASK
        disk.volumeType == 'ebs/io1'
        disk.iops == 10000
        // no throughput is expected for an io1 volume
        disk.throughputMiBps == null
    }

    def 'should use custom throughput from config' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'task', diskThroughputMiBps: 500])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.allocation == DiskAllocation.TASK
        // no diskType configured, so the mapper's default type is expected
        disk.volumeType == SchemaMapperUtil.DEFAULT_DISK_TYPE
        disk.throughputMiBps == 500
    }

    def 'should use encryption from config' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'task', diskEncrypted: true])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.allocation == DiskAllocation.TASK
        disk.encrypted == true
    }

    def 'should use all disk options from config' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'task',
            diskType: 'ebs/gp3',
            diskThroughputMiBps: 600,
            diskIops: 8000,
            diskEncrypted: true
        ])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('200 GB'), config)

        then:
        disk.sizeGiB == 200
        disk.allocation == DiskAllocation.TASK
        disk.volumeType == 'ebs/gp3'
        disk.throughputMiBps == 600
        disk.iops == 8000
        disk.encrypted == true
    }

    def 'should pass disk options through machine requirement' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'task',
            diskType: 'ebs/io2',
            diskIops: 15000,
            diskEncrypted: true
        ])

        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'arm64', MemoryUnit.of('500 GB'), false)

        then:
        requirement.arch == 'arm64'
        requirement.disk.sizeGiB == 500
        requirement.disk.allocation == DiskAllocation.TASK
        requirement.disk.volumeType == 'ebs/io2'
        requirement.disk.iops == 15000
        requirement.disk.encrypted == true
        // no throughput is expected for an io2 volume
        requirement.disk.throughputMiBps == null
    }

    // ---- disk allocation mapping ----

    def 'should map disk allocation' () {
        expect:
        SchemaMapperUtil.toDiskAllocation(null) == null
        SchemaMapperUtil.toDiskAllocation('task') == DiskAllocation.TASK
        SchemaMapperUtil.toDiskAllocation('node') == DiskAllocation.NODE
        SchemaMapperUtil.toDiskAllocation('nvme') == DiskAllocation.NVME
    }

    def 'should throw exception for invalid disk allocation' () {
        when:
        SchemaMapperUtil.toDiskAllocation('invalid')

        then:
        thrown(IllegalArgumentException)
    }

    def 'should use task disk allocation from config' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'task'])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.allocation == DiskAllocation.TASK
    }

    def 'should use node disk allocation from config' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'node'])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.allocation == DiskAllocation.NODE
        disk.sizeGiB == 100
        // with node allocation none of the EBS-specific fields are expected to be set
        disk.volumeType == null
        disk.throughputMiBps == null
        disk.iops == null
        disk.encrypted == null
    }

    def 'should default to node disk allocation when not specified' () {
        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'))

        then:
        disk.allocation == DiskAllocation.NODE
    }

    def 'should include disk allocation in machine requirement' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node'
        ])

        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'x86_64', MemoryUnit.of('200 GB'), false)

        then:
        requirement.arch == 'x86_64'
        requirement.disk.sizeGiB == 200
        requirement.disk.allocation == DiskAllocation.NODE
    }

    // ---- validation: EBS-only options are rejected with node allocation ----

    def 'should throw error when diskType is set with node allocation' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node',
            diskType: 'ebs/gp3'
        ])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskType')
        failure.message.contains('node')
    }

    def 'should throw error when diskIops is set with node allocation' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node',
            diskIops: 10000
        ])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskIops')
    }

    def 'should throw error when diskThroughputMiBps is set with node allocation' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node',
            diskThroughputMiBps: 500
        ])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskThroughputMiBps')
    }

    def 'should throw error when diskEncrypted is set with node allocation' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node',
            diskEncrypted: true
        ])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskEncrypted')
    }

    def 'should report all invalid options with node allocation' () {
        given:
        def config = new MachineRequirementOpts([
            diskAllocation: 'node',
            diskType: 'ebs/io1',
            diskIops: 10000,
            diskEncrypted: true
        ])

        when:
        SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        // a single error message is expected to name every offending option
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskType')
        failure.message.contains('diskIops')
        failure.message.contains('diskEncrypted')
    }

    // ---- snapshot flag and maxSpotAttempts defaulting ----

    def 'should return machine requirement with only snapshot enabled' () {
        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(null, null, null, true)

        then:
        requirement != null
        requirement.snapshotEnabled == true
        requirement.maxSpotAttempts == FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS
    }

    def 'should use explicit maxSpotAttempts when snapshot enabled' () {
        when:
        def config = new MachineRequirementOpts([maxSpotAttempts: 2])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, null, null, true)

        then:
        requirement.snapshotEnabled == true
        requirement.maxSpotAttempts == 2
    }

    def 'should not default maxSpotAttempts when snapshot disabled' () {
        when:
        def config = new MachineRequirementOpts([:])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'x86_64', null, false)

        then:
        requirement.snapshotEnabled == null
        requirement.maxSpotAttempts == null
    }

    // ---- ECS capacity mode mapping ----

    def 'should map capacity mode' () {
        expect:
        SchemaMapperUtil.toEcsCapacityMode(null) == null
        SchemaMapperUtil.toEcsCapacityMode('managed') == EcsCapacityMode.MANAGED
        SchemaMapperUtil.toEcsCapacityMode('asg') == EcsCapacityMode.ASG
    }

    def 'should throw exception for invalid capacity mode' () {
        when:
        SchemaMapperUtil.toEcsCapacityMode('invalid')

        then:
        thrown(IllegalArgumentException)
    }

    def 'should include capacity mode in machine requirement' () {
        when:
        def config = new MachineRequirementOpts([capacityMode: 'asg'])
        def requirement = SchemaMapperUtil.toMachineRequirement(config)

        then:
        requirement != null
        requirement.capacityMode == EcsCapacityMode.ASG
    }

    def 'should include capacity mode in machine requirement with task arch' () {
        when:
        def config = new MachineRequirementOpts([capacityMode: 'managed'])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'arm64', null, false)

        then:
        requirement.arch == 'arm64'
        requirement.capacityMode == EcsCapacityMode.MANAGED
    }

    def 'should combine snapshot with other machine requirement settings' () {
        when:
        def config = new MachineRequirementOpts([provisioning: 'spot'])
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'arm64', MemoryUnit.of('100 GB'), true)

        then:
        requirement.arch == 'arm64'
        requirement.provisioning == ProvisioningModel.SPOT
        requirement.disk.sizeGiB == 100
        requirement.snapshotEnabled == true
        requirement.maxSpotAttempts == FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS
    }

    // ---- NVMe disk allocation ----

    def 'should create nvme disk requirement without disk size' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'nvme'])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(null, config)

        then:
        // unlike the default allocation, a null size still yields a requirement here
        disk != null
        disk.allocation == DiskAllocation.NVME
        disk.sizeGiB == 0
        disk.volumeType == null
        disk.throughputMiBps == null
        disk.iops == null
        disk.encrypted == null
    }

    // NOTE(review): the feature name says the size is "ignored", yet the assertion below
    // expects sizeGiB == 100 - confirm which of the two reflects the intended contract.
    def 'should create nvme disk requirement with disk size (ignored)' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'nvme'])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk != null
        disk.allocation == DiskAllocation.NVME
        disk.sizeGiB == 100
    }

    def 'should include nvme disk in machine requirement' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'nvme', capacityMode: 'asg'])

        when:
        def requirement = SchemaMapperUtil.toMachineRequirement(config, 'x86_64', null, false)

        then:
        requirement != null
        requirement.arch == 'x86_64'
        requirement.disk != null
        requirement.disk.allocation == DiskAllocation.NVME
        requirement.disk.sizeGiB == 0
    }

    def 'should set diskMountPath for all allocation types' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: ALLOCATION, diskMountPath: '/data'])

        when:
        def disk = SchemaMapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), config)

        then:
        disk.allocation == EXPECTED_ALLOCATION
        disk.mountPath == '/data'

        where:
        ALLOCATION | EXPECTED_ALLOCATION
        'node'     | DiskAllocation.NODE
        'task'     | DiskAllocation.TASK
        'nvme'     | DiskAllocation.NVME
    }

    def 'should throw error when EBS options are set with nvme allocation' () {
        given:
        def config = new MachineRequirementOpts([diskAllocation: 'nvme', diskType: 'ebs/gp3'])

        when:
        SchemaMapperUtil.toDiskRequirement(null, config)

        then:
        def failure = thrown(IllegalArgumentException)
        failure.message.contains('diskType')
        failure.message.contains('nvme')
    }
}