diff --git a/docs/google.md b/docs/google.md
index 806491263d..36a8846598 100644
--- a/docs/google.md
+++ b/docs/google.md
@@ -86,6 +86,8 @@ Notes:
 
 Read the {ref}`Google configuration` section to learn more about advanced configuration options.
 
+(google-batch-process)=
+
 ### Process definition
 
 Processes can be defined as usual and by default the `cpus` and `memory` directives are used to find the cheapest machine type available at current location that fits the requested resources. If `memory` is not specified, 1GB of memory is allocated per cpu.
@@ -127,6 +129,24 @@ process predefined_resources_task {
 }
 ```
 
+:::{versionadded} 23.06.0-edge
+:::
+
+The `disk` directive can be used to set the boot disk size or provision a disk for scratch storage. If the disk type is specified with the `type` option, a new disk will be mounted to the task VM at `/tmp` with the requested size and type. Otherwise, the directive sets the boot disk size, overriding the `google.batch.bootDiskSize` config option. See the [Google Batch documentation](https://cloud.google.com/compute/docs/disks) for more information about the available disk types.
+
+Examples:
+
+```groovy
+// set the boot disk size
+disk 100.GB
+
+// mount a persistent disk at '/tmp'
+disk 100.GB, type: 'pd-standard'
+
+// mount a local SSD disk at '/tmp' (the size should be a multiple of 375 GB)
+disk 375.GB, type: 'local-ssd'
+```
+
 ### Pipeline execution
 
 The pipeline can be launched either in a local computer or a cloud instance. Pipeline input data can be stored either locally or in a Google Storage bucket.
@@ -182,10 +202,23 @@ tower.accessToken = ''
 
 The [Tower](https://cloud.tower.nf) access token is optional, but it enables higher API rate limits for the {ref}`wave-page` service required by Fusion.
 
-:::{tip}
-When Fusion is enabled, by default, only machine types that can attach local SSD disks will be used. If you specify your own machine type or machine series, they should be able to attach local SSD disks, otherwise the job scheduling will fail.
+By default, Fusion mounts a local SSD disk to the VM at `/tmp`, using a machine type that can attach local SSD disks. If you specify your own machine type or machine series, make sure it can attach local SSD disks; otherwise, job scheduling will fail.
+
+:::{versionadded} 23.06.0-edge
 :::
 
+The `disk` directive can be used to override the disk requested by Fusion. See the {ref}`Process definition <google-batch-process>` section above for examples. Note that local SSD disks must be a multiple of 375 GB in size; otherwise, the size will be increased to the next multiple of 375 GB.
+
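+For example, the following config snippet requests a larger Fusion scratch disk for a single process, using the map form of the directive (the `withName` selector and size shown here are illustrative):
+
+```groovy
+process {
+    withName: 'ALIGN' {
+        disk = [request: '750 GB', type: 'local-ssd']
+    }
+}
+```
+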
 ### Supported directives
 
 The integration with Google Batch is a developer preview feature. Currently, the following Nextflow directives are supported:
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy
new file mode 100644
index 0000000000..3ba0104cb1
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.executor.res
+
+import groovy.transform.CompileStatic
+import groovy.transform.EqualsAndHashCode
+import groovy.transform.ToString
+import nextflow.util.MemoryUnit
+
+/**
+ * Models disk resource request
+ *
+ * @author Ben Sherman
+ */
+@ToString(includeNames = true, includePackage = false)
+@CompileStatic
+@EqualsAndHashCode
+class DiskResource {
+
+    final MemoryUnit request
+    final String type
+
+    DiskResource( value ) {
+        this(request: value)
+    }
+
+    DiskResource( Map opts ) {
+        this.request = toMemoryUnit(opts.request)
+
+        if( opts.type )
+            this.type = opts.type as String
+    }
+
+    private static MemoryUnit toMemoryUnit( value ) {
+        if( value instanceof MemoryUnit )
+            return (MemoryUnit)value
+
+        try {
+            return new MemoryUnit(value.toString().trim())
+        }
+        catch( Exception e ) {
+            throw new IllegalArgumentException("Not a valid disk value: $value")
+        }
+    }
+
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
index e49dc12775..cf88eab3af 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy
@@ -29,6 +29,7 @@ import nextflow.exception.AbortOperationException
 import nextflow.exception.FailedGuardException
 import nextflow.executor.BashWrapperBuilder
 import nextflow.executor.res.AcceleratorResource
+import nextflow.executor.res.DiskResource
 import nextflow.k8s.model.PodOptions
 import nextflow.script.TaskClosure
 import nextflow.util.CmdLineHelper
@@ -252,21 +253,20 @@ class TaskConfig extends LazyMap implements Cloneable {
         }
     }
 
-    MemoryUnit getDisk() {
+    DiskResource getDiskResource() {
         def value = get('disk')
 
-        if( !value )
-            return null
+        if( value instanceof Map )
+            return new DiskResource((Map)value)
 
-        if( value instanceof MemoryUnit )
-            return (MemoryUnit)value
+        if( value != null )
+            return new DiskResource(value)
 
-        try {
-            new MemoryUnit(value.toString().trim())
-        }
-        catch( Exception e ) {
-            throw new AbortOperationException("Not a valid 'disk' value in process definition: $value")
-        }
+        return null
+    }
+
+    MemoryUnit getDisk() {
+        getDiskResource()?.getRequest()
     }
 
     Duration getTime() {
diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy
index 483adfa827..5e3ddaa42c 100644
--- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy
@@ -932,6 +932,42 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
         return this
     }
 
+    /**
+     * Allow user to specify `disk` directive as a value with a map of options, eg:
+     *
+     *     disk 375.GB, type: 'local-ssd'
+     *
+     * @param opts
+     *      A map representing the disk options
+     * @param value
+     *      The default disk value
+     * @return
+     *      The {@link ProcessConfig} instance itself
+     */
+    ProcessConfig disk( Map opts, value ) {
+        opts.request = value
+        return disk(opts)
+    }
+
+    /**
+     * Allow user to specify `disk` directive as a value or a map of options, eg:
+     *
+     *     disk 100.GB
+     *     disk request: 375.GB, type: 'local-ssd'
+     *
+     * @param value
+     *      The default disk value or map of options
+     * @return
+     *      The {@link ProcessConfig} instance itself
+     */
+    ProcessConfig disk( value ) {
+        if( value instanceof Map || value instanceof Closure )
+            configProperties.put('disk', value)
+        else
+            configProperties.put('disk', [request: value])
+        return this
+    }
+
     ProcessConfig arch( Map params, value ) {
         if( value instanceof String ) {
             if( params.name==null )
diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy
new file mode 100644
index 0000000000..18f1e4b562
--- /dev/null
+++ b/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.executor.res
+
+import nextflow.util.MemoryUnit
+import spock.lang.Specification
+
+/**
+ *
+ * @author Ben Sherman
+ */
+class DiskResourceTest extends Specification {
+
+    static final _100_GB = MemoryUnit.of('100GB')
+    static final _375_GB = MemoryUnit.of('375GB')
+
+    def 'should create a disk resource' () {
+
+        when:
+        def disk = new DiskResource(VALUE)
+        then:
+        disk.request == REQ
+        disk.type == TYPE
+
+        where:
+        VALUE                                 | REQ     | TYPE
+        _100_GB                               | _100_GB | null
+        [request: _100_GB]                    | _100_GB | null
+        [request: _375_GB, type: 'local-ssd'] | _375_GB | 'local-ssd'
+    }
+
+}
\ No newline at end of file
diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy
index 3a07c530d6..d290723992 100644
--- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy
+++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy
@@ -261,6 +261,7 @@ class TaskConfigTest extends Specification {
         then:
         config.disk == expected
         config.getDisk() == expected
+        config.getDiskResource()?.getRequest() == expected
 
         where:
         expected || value
diff --git a/modules/nextflow/src/test/groovy/nextflow/script/ProcessConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/ProcessConfigTest.groovy
index a056e09fae..5fc7f31367 100644
--- a/modules/nextflow/src/test/groovy/nextflow/script/ProcessConfigTest.groovy
+++ b/modules/nextflow/src/test/groovy/nextflow/script/ProcessConfigTest.groovy
@@ -678,6 +678,27 @@ class ProcessConfigTest extends Specification {
         process.accelerator == [request: 1, limit:5]
     }
 
+    def 'should apply disk config' () {
+
+        given:
+        def process = new ProcessConfig(Mock(BaseScript))
+
+        when:
+        process.disk '100 GB'
+        then:
+        process.disk == [request: '100 GB']
+
+        when:
+        process.disk '375 GB', type: 'local-ssd'
+        then:
+        process.disk == [request: '375 GB', type: 'local-ssd']
+
+        when:
+        process.disk request: '375 GB', type: 'local-ssd'
+        then:
+        process.disk == [request: '375 GB', type: 'local-ssd']
+    }
+
     def 'should apply architecture config' () {
 
         given:
diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFusionAdapter.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFusionAdapter.groovy
index 36f97afe01..84ddd86486 100644
--- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFusionAdapter.groovy
+++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchFusionAdapter.groovy
@@ -47,12 +47,7 @@ class GoogleBatchFusionAdapter implements GoogleBatchLauncherSpec {
 
     @Override
     List<Volume> getVolumes() {
-        return [
-            Volume.newBuilder()
-                .setDeviceName('fusion')
-                .setMountPath('/tmp')
-                .build()
-        ]
+        return List.of()
     }
 
     @Override
diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelector.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelector.groovy
index 1ac1b8e8e3..2e21d54042 100644
--- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelector.groovy
+++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelector.groovy
@@ -16,10 +16,14 @@
  */
 package nextflow.cloud.google.batch
 
+import java.math.RoundingMode
+
 import groovy.json.JsonSlurper
 import groovy.transform.CompileStatic
 import groovy.transform.Immutable
 import groovy.transform.Memoized
+import nextflow.cloud.types.PriceModel
+import nextflow.util.MemoryUnit
 
 /**
  * Choose best machine type that fits the requested resources and
@@ -77,14 +81,16 @@ class GoogleBatchMachineTypeSelector {
     static class MachineType {
         String type
         String family
+        String location
         float spotPrice
        float onDemandPrice
         int cpusPerVm
         int memPerVm
+        PriceModel priceModel
     }
 
-    String bestMachineType(int cpus, int memoryMB, String region, boolean spot, boolean fusionEnabled, List<String> families) {
-        final machineTypes = getAvailableMachineTypes(region)
+    MachineType bestMachineType(int cpus, int memoryMB, String region, boolean spot, boolean fusionEnabled, List<String> families) {
+        final machineTypes = getAvailableMachineTypes(region, spot)
 
         if (families == null)
             families = Collections.emptyList()
 
@@ -92,10 +98,11 @@ class GoogleBatchMachineTypeSelector {
         if (families.size() == 1) {
             final familyOrType = families.get(0)
             if (familyOrType.contains("custom-"))
-                return familyOrType
+                return new MachineType(type: familyOrType, family: 'custom', cpusPerVm: cpus, memPerVm: memoryMB, location: region, priceModel: spot ? PriceModel.spot : PriceModel.standard)
 
-            if (machineTypes.find { it.type == familyOrType })
-                return familyOrType
+            final machineType = machineTypes.find { it.type == familyOrType }
+            if( machineType )
+                return machineType
         }
 
         final memoryGB = Math.ceil(memoryMB / 1024.0 as float) as int
@@ -120,7 +127,7 @@ class GoogleBatchMachineTypeSelector {
             (it.cpusPerVm > 2 || it.memPerVm > 2 ? FAMILY_COST_CORRECTION.get(it.family, 1.0) : 1.0) * (spot ? it.spotPrice : it.onDemandPrice)
         }
 
-        return sortedByCost.first().type
+        return sortedByCost.first()
     }
 
     protected boolean matchType(String family, String vmType) {
@@ -135,7 +142,8 @@
     }
 
     @Memoized
-    protected List<MachineType> getAvailableMachineTypes(String region) {
+    protected List<MachineType> getAvailableMachineTypes(String region, boolean spot) {
+        final priceModel = spot ? PriceModel.spot : PriceModel.standard
        final json = "${CLOUD_INFO_API}/providers/google/services/compute/regions/${region}/products".toURL().text
         final data = new JsonSlurper().parseText(json)
         final products = data['products'] as List
@@ -148,8 +156,98 @@
                 spotPrice: averageSpotPrice(it.spotPrice as List),
                 onDemandPrice: it.onDemandPrice as float,
                 cpusPerVm: it.cpusPerVm as int,
-                memPerVm: it.memPerVm as int
+                memPerVm: it.memPerVm as int,
+                location: region,
+                priceModel: priceModel
             )
         }
     }
+
+    /**
+     * Find valid local SSD size. See: https://cloud.google.com/compute/docs/disks#local_ssd_machine_type_restrictions
+     *
+     * @param requested Amount of disk requested
+     * @param machineType Machine type
+     * @return Next greater multiple of 375 GB that is a valid size for the given machine type
+     */
+    protected MemoryUnit findValidLocalSSDSize(MemoryUnit requested, MachineType machineType) {
+
+        if( machineType.family == "n1" )
+            return findFirstValidSize(requested, [1,2,3,4,5,6,7,8,16,24])
+
+        if( machineType.family == "n2" ) {
+            if( machineType.cpusPerVm < 12 )
+                return findFirstValidSize(requested, [1,2,4,8,16,24])
+            if( machineType.cpusPerVm < 22 )
+                return findFirstValidSize(requested, [2,4,8,16,24])
+            if( machineType.cpusPerVm < 42 )
+                return findFirstValidSize(requested, [4,8,16,24])
+            if( machineType.cpusPerVm < 82 )
+                return findFirstValidSize(requested, [8,16,24])
+            return findFirstValidSize(requested, [16,24])
+        }
+
+        if( machineType.family == "n2d" ) {
+            if( machineType.cpusPerVm < 32 )
+                return findFirstValidSize(requested, [1,2,4,8,16,24])
+            if( machineType.cpusPerVm < 64 )
+                return findFirstValidSize(requested, [2,4,8,16,24])
+            if( machineType.cpusPerVm < 96 )
+                return findFirstValidSize(requested, [4,8,16,24])
+            return findFirstValidSize(requested, [8,16,24])
+        }
+
+        if( machineType.family == "c2" ) {
+            if( machineType.cpusPerVm < 16 )
+                return findFirstValidSize(requested, [1,2,4,8])
+            if( machineType.cpusPerVm < 30 )
+                return findFirstValidSize(requested, [2,4,8])
+            if( machineType.cpusPerVm < 60 )
+                return findFirstValidSize(requested, [4,8])
+            return findFirstValidSize(requested, [8])
+        }
+
+        if( machineType.family == "c2d" ) {
+            if( machineType.cpusPerVm < 32 )
+                return findFirstValidSize(requested, [1,2,4,8])
+            if( machineType.cpusPerVm < 56 )
+                return findFirstValidSize(requested, [2,4,8])
+            if( machineType.cpusPerVm < 112 )
+                return findFirstValidSize(requested, [4,8])
+            return findFirstValidSize(requested, [8])
+        }
+
+        if( machineType.family == "m3" ) {
+            if ( machineType.type == 'm3-megamem-128' || machineType.type == 'm3-ultramem-128' )
+                return findFirstValidSize(requested, [8])
+            return findFirstValidSize(requested, [4,8])
+        }
+
+        // for other special families, the user must provide a valid size
+        return requested
+    }
+
+    /**
+     * Find the first valid disk size given the allowed numbers of attached disks
+     *
+     * @param requested Requested disk size
+     * @param allowedPartitions Allowed numbers of 375 GB disks
+     * @return The first allowed size that fits the request, or the largest allowed size if none fits
+     */
+    protected MemoryUnit findFirstValidSize(MemoryUnit requested, List<Integer> allowedPartitions) {
+
+        // Sort the possible number of disks
+        allowedPartitions.sort()
+
+        // Minimum number of 375 GB disks to fulfill the requested size
+        final disks = (requested.toGiga() / 375).setScale(0, RoundingMode.UP).toInteger()
+
+        // Find first valid number of disks
+        def numberOfDisks = allowedPartitions.find { it >= disks }
+        if( !numberOfDisks )
+            numberOfDisks = allowedPartitions.last()
+
+        return new MemoryUnit( numberOfDisks * 375L * (1<<30) )
+    }
+
+}
diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy
index a76a79c322..4fc9a1290c 100644
--- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy
+++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy
@@ -17,9 +17,6 @@
 
 package nextflow.cloud.google.batch
 
-import nextflow.cloud.types.CloudMachineInfo
-import nextflow.cloud.types.PriceModel
-import nextflow.processor.TaskConfig
 import java.nio.file.Path
 
@@ -32,21 +29,24 @@ import com.google.cloud.batch.v1.Runnable
 import com.google.cloud.batch.v1.ServiceAccount
 import com.google.cloud.batch.v1.TaskGroup
 import com.google.cloud.batch.v1.TaskSpec
+import com.google.cloud.batch.v1.Volume
 import com.google.protobuf.Duration
 import groovy.transform.CompileStatic
 import groovy.transform.PackageScope
 import groovy.util.logging.Slf4j
 import nextflow.cloud.google.batch.client.BatchClient
+import nextflow.cloud.types.CloudMachineInfo
+import nextflow.cloud.types.PriceModel
 import nextflow.exception.ProcessUnrecoverableException
 import nextflow.executor.BashWrapperBuilder
+import nextflow.executor.res.DiskResource
 import nextflow.fusion.FusionAwareTask
 import nextflow.fusion.FusionScriptLauncher
+import nextflow.processor.TaskConfig
 import nextflow.processor.TaskHandler
 import nextflow.processor.TaskRun
 import nextflow.processor.TaskStatus
 import nextflow.trace.TraceRecord
-
-
 /**
  * Implements a task handler for Google Batch executor
  *
@@ -135,7 +135,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
         final resp = client.submitJob(jobId, req)
         this.uid = resp.getUid()
         this.status = TaskStatus.SUBMITTED
-        log.debug "[GOOGLE BATCH] submitted > job=$jobId; uid=$uid; work-dir=${task.getWorkDirStr()}"
+        log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` submitted > job=$jobId; uid=$uid; work-dir=${task.getWorkDirStr()}"
     }
 
     protected Job newSubmitRequest(TaskRun task, GoogleBatchLauncherSpec launcher) {
@@ -154,9 +154,13 @@
                 .setSeconds( task.config.getTime().toSeconds() )
             )
 
-        final disk = task.config.getDisk() ?: executor.config.bootDiskSize
-        if( disk )
-            computeResource.setBootDiskMib( disk.getMega() )
+        def disk = task.config.getDiskResource()
+        // apply disk directive to boot disk if type is not specified
+        if( disk && !disk.type )
+            computeResource.setBootDiskMib( disk.request.getMega() )
+        // otherwise use config setting
+        else if( executor.config.bootDiskSize )
+            computeResource.setBootDiskMib( executor.config.bootDiskSize.getMega() )
 
         // container
         if( !task.container )
@@ -225,12 +229,52 @@
             instancePolicyOrTemplate.setInstallGpuDrivers(true)
         }
 
-        if( executor.config.cpuPlatform )
+        if( fusionEnabled() && !disk ) {
+            disk = new DiskResource(request: '375 GB', type: 'local-ssd')
+            log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - adding local volume as fusion scratch: $disk"
+        }
+
+        if( executor.config.cpuPlatform ) {
             instancePolicy.setMinCpuPlatform( executor.config.cpuPlatform )
+        }
+
+        final machineType = findBestMachineType(task.config, disk?.type == 'local-ssd')
+        if( machineType ) {
+            instancePolicy.setMachineType(machineType.type)
+            machineInfo = new CloudMachineInfo(
+                type: machineType.type,
+                zone: machineType.location,
+                priceModel: machineType.priceModel
+            )
+        }
 
-        machineInfo = findBestMachineType(task.config)
-        if( machineInfo )
-            instancePolicy.setMachineType(machineInfo.type)
+        // when using local SSDs, not all disk sizes are valid; the valid sizes depend on the machine type
+        if( disk?.type == 'local-ssd' && machineType ) {
+            final validSize = GoogleBatchMachineTypeSelector.INSTANCE.findValidLocalSSDSize(disk.request, machineType)
+            if( validSize != disk.request ) {
+                disk = new DiskResource(request: validSize, type: 'local-ssd')
+                log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - adjusting local disk size to: $validSize"
+            }
+        }
+
+        // use disk directive for an attached disk if type is specified
+        if( disk?.type ) {
+            instancePolicy.addDisks(
+                AllocationPolicy.AttachedDisk.newBuilder()
+                    .setNewDisk(
+                        AllocationPolicy.Disk.newBuilder()
+                            .setType(disk.type)
+                            .setSizeGb(disk.request.toGiga())
+                    )
+                    .setDeviceName('scratch')
+            )
+
+            taskSpec.addVolumes(
+                Volume.newBuilder()
+                    .setDeviceName('scratch')
+                    .setMountPath('/tmp')
+            )
+        }
 
         if( executor.config.serviceAccountEmail )
             allocationPolicy.setServiceAccount(
@@ -244,17 +288,6 @@
         if( executor.config.spot )
             instancePolicy.setProvisioningModel( AllocationPolicy.ProvisioningModel.SPOT )
 
-        // Fusion configuration
-        if( fusionEnabled() ) {
-            instancePolicy.addDisks(AllocationPolicy.AttachedDisk.newBuilder()
-                .setNewDisk(AllocationPolicy.Disk.newBuilder()
-                    .setType("local-ssd")
-                    .setSizeGb(375)
-                )
-                .setDeviceName("fusion")
-            )
-        }
-
         allocationPolicy.addInstances(
             instancePolicyOrTemplate
                 .setPolicy(instancePolicy)
@@ -344,7 +377,7 @@
     boolean checkIfCompleted() {
         final state = getJobState()
         if( state in TERMINATED ) {
-            log.debug "[GOOGLE BATCH] Terminated job=$jobId; state=$state"
+            log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - terminated job=$jobId; state=$state"
             // finalize the task
             task.exitStatus = readExitFile()
             if( state == 'FAILED' ) {
@@ -367,7 +400,7 @@
             exitFile.text as Integer
         }
         catch (Exception e) {
-            log.debug "[GOOGLE BATCH] Cannot read exit status for task: `$task.name` | ${e.message}"
+            log.debug "[GOOGLE BATCH] Cannot read exit status for task: `${task.lazyName()}` - ${e.message}"
             // return MAX_VALUE to signal it was unable to retrieve the exit code
             return Integer.MAX_VALUE
         }
@@ -376,11 +409,11 @@
     @Override
     void kill() {
         if( isSubmitted() ) {
-            log.trace "[GOOGLE BATCH] deleting job name=$jobId"
+            log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId"
             client.deleteJob(jobId)
         }
         else {
-            log.debug "[GOOGLE BATCH] Oops.. invalid delete action"
+            log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - invalid delete action"
         }
     }
 
@@ -398,7 +431,7 @@
         return result
     }
 
-    protected CloudMachineInfo findBestMachineType(TaskConfig config) {
+    protected GoogleBatchMachineTypeSelector.MachineType findBestMachineType(TaskConfig config, boolean localSSD) {
         final location = client.location
         final cpus = config.getCpus()
         final memory = config.getMemory() ? config.getMemory().toMega().toInteger() : 1024
@@ -407,20 +440,16 @@
         final priceModel = spot ? PriceModel.spot : PriceModel.standard
 
         try {
-            return new CloudMachineInfo(
-                type: GoogleBatchMachineTypeSelector.INSTANCE.bestMachineType(cpus, memory, location, spot, fusionEnabled(), families),
-                zone: location,
-                priceModel: priceModel
-            )
+            return GoogleBatchMachineTypeSelector.INSTANCE.bestMachineType(cpus, memory, location, spot, localSSD, families)
         }
         catch (Exception e) {
-            log.debug "[GOOGLE BATCH] Cannot select machine type using cloud info for task: `$task.name` | ${e.message}"
+            log.debug "[GOOGLE BATCH] Cannot select machine type using cloud info for task: `${task.lazyName()}` - ${e.message}"
 
             // Check if a specific machine type was provided by the user
             if( config.getMachineType() && !config.getMachineType().contains(',') && !config.getMachineType().contains('*') )
-                return new CloudMachineInfo(
+                return new GoogleBatchMachineTypeSelector.MachineType(
                     type: config.getMachineType(),
-                    zone: location,
+                    location: location,
                     priceModel: priceModel
                 )
diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelectorTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelectorTest.groovy
index aa5bfe50e1..fa91093496 100644
--- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelectorTest.groovy
+++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchMachineTypeSelectorTest.groovy
@@ -1,6 +1,8 @@
 package nextflow.cloud.google.batch
 
 import nextflow.cloud.google.batch.GoogleBatchMachineTypeSelector.MachineType
+import nextflow.cloud.types.PriceModel
+import nextflow.util.MemoryUnit
 import spock.lang.Specification
 
 class GoogleBatchMachineTypeSelectorTest extends Specification {
@@ -21,24 +23,24 @@
     def 'should select best machine type'() {
         given:
         final selector = Spy(GoogleBatchMachineTypeSelector) {
-            getAvailableMachineTypes(REGION) >> MACHINE_TYPES
+            getAvailableMachineTypes(REGION, SPOT) >> MACHINE_TYPES
         }
 
         expect:
-        selector.bestMachineType(CPUS, MEM, REGION, SPOT, FUSION, FAMILIES) == EXPECTED
+        selector.bestMachineType(CPUS, MEM, REGION, SPOT, FUSION, FAMILIES).type == EXPECTED
 
         where:
-        CPUS | MEM  | REGION | SPOT  | FUSION | FAMILIES         | EXPECTED
-        1    | 1000 | 'reg'  | true  | false  | null             | 'e2-type01'
-        1    | 1000 | 'reg'  | false | true   | null             | 'n1-type02'
-        4    | 4000 | 'reg'  | false | false  | []               | 'e2-type03'
-        4    | 4000 | 'reg'  | true  | false  | []               | 'n2-type04'
-        4    | 4000 | 'reg'  | false | true   | []               | 'n2-type04'
-        6    | 6000 | 'reg'  | true  | false  | null             | 'e2-type05'
-        6    | 6000 | 'reg'  | true  | true   | null             | 'n1-type06'
-        6    | 6000 | 'reg'  | true  | false  | ['n1-*', 'm1-*'] | 'n1-type06'
-        8    | 8000 | 'reg'  | true  | false  | null             | 'm1-type07'
-        8    | 8000 | 'reg'  | false | false  | ['m?-*', 'c2-*'] | 'm2-type08'
-        8    | 8000 | 'reg'  | false | false  | ['m1-type07', 'm2-type66'] | 'm1-type07'
+        CPUS | MEM  | REGION | SPOT  | FUSION | FAMILIES                   | EXPECTED
+        1    | 1000 | 'reg'  | true  | false  | null                       | 'e2-type01'
+        1    | 1000 | 'reg'  | false | true   | null                       | 'n1-type02'
+        4    | 4000 | 'reg'  | false | false  | []                         | 'e2-type03'
+        4    | 4000 | 'reg'  | true  | false  | []                         | 'n2-type04'
+        4    | 4000 | 'reg'  | false | true   | []                         | 'n2-type04'
+        6    | 6000 | 'reg'  | true  | false  | null                       | 'e2-type05'
+        6    | 6000 | 'reg'  | true  | true   | null                       | 'n1-type06'
+        6    | 6000 | 'reg'  | true  | false  | ['n1-*', 'm1-*']           | 'n1-type06'
+        8    | 8000 | 'reg'  | true  | false  | null                       | 'm1-type07'
+        8    | 8000 | 'reg'  | false | false  | ['m?-*', 'c2-*']           | 'm2-type08'
+        8    | 8000 | 'reg'  | false | false  | ['m1-type07', 'm2-type66'] | 'm1-type07'
 
     }
@@ -46,7 +48,7 @@
     def 'should not select a machine type'() {
         given:
         final selector = Spy(GoogleBatchMachineTypeSelector) {
-            getAvailableMachineTypes(REGION) >> MACHINE_TYPES
+            getAvailableMachineTypes(REGION, SPOT) >> MACHINE_TYPES
         }
         when:
         selector.bestMachineType(CPUS, MEM, REGION, SPOT, SSD, FAMILIES)
@@ -63,9 +65,36 @@
 
     def 'should parse Seqera cloud info API'() {
         when:
-        GoogleBatchMachineTypeSelector.INSTANCE.getAvailableMachineTypes("europe-west2")
+        GoogleBatchMachineTypeSelector.INSTANCE.getAvailableMachineTypes("europe-west2", true)
 
         then:
         noExceptionThrown()
     }
+
+    def 'should find first valid disk size'() {
+        expect:
+        GoogleBatchMachineTypeSelector.INSTANCE.findFirstValidSize(MemoryUnit.of(REQUESTED), ALLOWED) == MemoryUnit.of(EXPECTED)
+
+        where:
+        REQUESTED | ALLOWED   | EXPECTED
+        '100 GB'  | [2, 4, 8] | '750 GB'
+        '100 GB'  | [1, 2, 4] | '375 GB'
+        '500 GB'  | [1, 2]    | '750 GB'
+        '1 TB'    | [1, 2]    | '750 GB'
+    }
+
+    def 'should find valid local disk size given the machine type'() {
+        expect:
+        final machineType = new MachineType(type: TYPE, family: FAMILY, cpusPerVm: CPUS)
+        GoogleBatchMachineTypeSelector.INSTANCE.findValidLocalSSDSize(MemoryUnit.of(REQUESTED), machineType) == MemoryUnit.of(EXPECTED)
+
+        where:
+        REQUESTED | TYPE              | FAMILY | CPUS | EXPECTED
+        '100 GB'  | 'n1-highmem-8'    | 'n1'   | 8    | '375 GB'
+        '375 GB'  | 'n2-highcpu-16'   | 'n2'   | 16   | '750 GB'
+        '780 GB'  | 'n2d-standard-48' | 'n2d'  | 48   | '1500 GB'
+        '200 GB'  | 'c2-standard-4'   | 'c2'   | 4    | '375 GB'
+        '50 GB'   | 'c2d-highmem-56'  | 'c2d'  | 56   | '1500 GB'
+        '750 GB'  | 'm3-megamem-64'   | 'm3'   | 64   | '1500 GB'
+    }
 }
diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
index d3b47e87c2..846969f971 100644
--- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
+++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
@@ -26,6 +26,7 @@ import nextflow.cloud.types.CloudMachineInfo
 import nextflow.cloud.types.PriceModel
 import nextflow.executor.Executor
 import nextflow.executor.res.AcceleratorResource
+import nextflow.executor.res.DiskResource
 import nextflow.processor.TaskBean
 import nextflow.processor.TaskConfig
 import nextflow.processor.TaskProcessor
@@ -73,7 +74,7 @@
         def req = handler.newSubmitRequest(task, launcher)
         then:
         handler.fusionEnabled() >> false
-        handler.findBestMachineType(_) >> null
+        handler.findBestMachineType(_, false) >> null
 
         and:
         def taskGroup = req.getTaskGroups(0)
@@ -92,6 +93,7 @@ class GoogleBatchTaskHandlerTest extends Specification {
         runnable.getContainer().getVolumesList() == ['/mnt/disks/foo/scratch:/mnt/disks/foo/scratch:rw']
         and:
         instancePolicy.getAcceleratorsCount() == 0
+        instancePolicy.getDisksCount() == 0
         instancePolicy.getMachineType() == ''
         instancePolicy.getMinCpuPlatform() == ''
         instancePolicy.getProvisioningModel().toString() == 'PROVISIONING_MODEL_UNSPECIFIED'
@@ -115,7 +117,7 @@
         def CONTAINER_OPTS = '--this --that'
         def CPU_PLATFORM = 'Intel Skylake'
         def CPUS = 4
-        def DISK = MemoryUnit.of('50 GB')
+        def DISK = new DiskResource(request: '100 GB', type: 'pd-standard')
         def MACHINE_TYPE = 'vm-type-2'
         def MEM = MemoryUnit.of('8 GB')
         def TIMEOUT = Duration.of('1 hour')
@@ -143,7 +145,7 @@
             getAccelerator() >> ACCELERATOR
             getContainerOptions() >> CONTAINER_OPTS
             getCpus() >> CPUS
-            getDisk() >> DISK
+            getDiskResource() >> DISK
             getMachineType() >> MACHINE_TYPE
             getMemory() >> MEM
             getTime() >> TIMEOUT
@@ -161,7 +163,7 @@
         def req = handler.newSubmitRequest(task, launcher)
         then:
         handler.fusionEnabled() >> false
-        handler.findBestMachineType(_) >> new CloudMachineInfo(type: MACHINE_TYPE, zone: "location", priceModel: PriceModel.spot)
+        handler.findBestMachineType(_, false) >> new GoogleBatchMachineTypeSelector.MachineType(type: MACHINE_TYPE, location: "location", priceModel: PriceModel.spot)
 
         and:
         def taskGroup = req.getTaskGroups(0)
@@ -170,10 +172,11 @@
         def instancePolicy = allocationPolicy.getInstances(0).getPolicy()
         def networkInterface = allocationPolicy.getNetwork().getNetworkInterfaces(0)
         and:
-        taskGroup.getTaskSpec().getComputeResource().getBootDiskMib() == DISK.toMega()
+        taskGroup.getTaskSpec().getComputeResource().getBootDiskMib() == BOOT_DISK.toMega()
         taskGroup.getTaskSpec().getComputeResource().getCpuMilli() == CPUS * 1_000
         taskGroup.getTaskSpec().getComputeResource().getMemoryMib() == MEM.toMega()
         taskGroup.getTaskSpec().getMaxRunDuration().getSeconds() == TIMEOUT.seconds
+        taskGroup.getTaskSpec().getVolumes(0).getMountPath() == '/tmp'
         and:
         runnable.getContainer().getCommandsList().join(' ') == '/bin/bash -o pipefail -c bash .command.run'
         runnable.getContainer().getImageUri() == CONTAINER_IMAGE
@@ -195,6 +198,8 @@
         and:
         instancePolicy.getAccelerators(0).getCount() == 1
         instancePolicy.getAccelerators(0).getType() == ACCELERATOR.type
+        instancePolicy.getDisks(0).getNewDisk().getSizeGb() == DISK.request.toGiga()
+        instancePolicy.getDisks(0).getNewDisk().getType() == DISK.type
         instancePolicy.getMachineType() == MACHINE_TYPE
         instancePolicy.getMinCpuPlatform() == CPU_PLATFORM
         instancePolicy.getProvisioningModel().toString() == 'SPOT'
@@ -204,6 +209,15 @@
         networkInterface.getNoExternalIpAddress() == true
         and:
         req.getLogsPolicy().getDestination().toString() == 'CLOUD_LOGGING'
+
+        when:
+        req = handler.newSubmitRequest(task, launcher)
+        then:
+        task.getConfig().getDiskResource() >> new DiskResource(request: '100 GB')
+        handler.fusionEnabled() >> false
+        handler.findBestMachineType(_, false) >> null
+        and:
+        req.getTaskGroups(0).getTaskSpec().getComputeResource().getBootDiskMib() == 100 * 1024
     }
 
     def 'should create the trace record' () {
@@ -235,7 +249,6 @@
     def 'should create submit request with fusion enabled' () {
         given:
-        def GCS_VOL = Volume.newBuilder().setGcs(GCS.newBuilder().setRemotePath('foo').build() ).build()
         def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch')
         def CONTAINER_IMAGE = 'debian:latest'
         def exec = Mock(GoogleBatchExecutor) {
@@ -264,36 +277,25 @@
         def req = handler.newSubmitRequest(task, launcher)
         then:
         handler.fusionEnabled() >> true
-        handler.findBestMachineType(_) >> null
+        handler.findBestMachineType(_, true) >> null
 
         and:
         def taskGroup = req.getTaskGroups(0)
         def runnable = taskGroup.getTaskSpec().getRunnables(0)
         def allocationPolicy = req.getAllocationPolicy()
         def instancePolicy = allocationPolicy.getInstances(0).getPolicy()
         and:
-        taskGroup.getTaskSpec().getComputeResource().getBootDiskMib() == 0
         taskGroup.getTaskSpec().getComputeResource().getCpuMilli() == 2_000
-        taskGroup.getTaskSpec().getComputeResource().getMemoryMib() == 0
-        taskGroup.getTaskSpec().getMaxRunDuration().getSeconds() == 0
+        taskGroup.getTaskSpec().getVolumes(0).getMountPath() == '/tmp'
         and:
         runnable.getContainer().getCommandsList().join(' ') == '/bin/bash -o pipefail -c bash .command.run'
         runnable.getContainer().getImageUri() == CONTAINER_IMAGE
         runnable.getContainer().getOptions() == '--privileged'
-        runnable.getContainer().getVolumesList() == []
+        runnable.getContainer().getVolumesCount() == 0
         and:
         runnable.getEnvironment().getVariablesMap() == env
         and:
-        instancePolicy.getAcceleratorsCount() == 0
-        instancePolicy.getMachineType() == ''
-        instancePolicy.getMinCpuPlatform() == ''
-        instancePolicy.getProvisioningModel().toString() == 'PROVISIONING_MODEL_UNSPECIFIED'
-        and:
-        allocationPolicy.getLocation().getAllowedLocationsCount() == 0
-        allocationPolicy.getNetwork().getNetworkInterfacesCount() == 0
-        and:
-        req.getLogsPolicy().getDestination().toString() == 'CLOUD_LOGGING'
-        and:
-        taskGroup.getTaskSpec().getVolumesList().size()==0
+        instancePolicy.getDisks(0).getNewDisk().getSizeGb() == 375
+        instancePolicy.getDisks(0).getNewDisk().getType() == 'local-ssd'
     }
 
     def 'should not set wildcard expressions as machine type'() {
@@ -323,7 +325,7 @@
         def req = handler.newSubmitRequest(task, launcher)
         then:
         handler.fusionEnabled() >> false
-        handler.findBestMachineType(_) >> null
+        handler.findBestMachineType(_, _) >> null
         and:
         req.getAllocationPolicy().getInstances(0).policy.getMachineType() == ""