Skip to content

Commit

Permalink
Add disk resource with type option for google batch (#3861)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Jordi Deu-Pons <[email protected]>
Co-authored-by: Jordi Deu-Pons <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
3 people authored May 29, 2023
1 parent d3788f9 commit 166b363
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 103 deletions.
27 changes: 25 additions & 2 deletions docs/google.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ Notes:

Read the {ref}`Google configuration<config-google>` section to learn more about advanced configuration options.

(google-batch-process)=

### Process definition

Processes can be defined as usual and by default the `cpus` and `memory` directives are used to find the cheapest machine type available at current location that fits the requested resources. If `memory` is not specified, 1GB of memory is allocated per cpu.
Expand Down Expand Up @@ -127,6 +129,24 @@ process predefined_resources_task {
}
```

:::{versionadded} 23.06.0-edge
:::

The `disk` directive can be used to set the boot disk size or provision a disk for scratch storage. If the disk type is specified with the `type` option, a new disk will be mounted to the task VM at `/tmp` with the requested size and type. Otherwise, it will set the boot disk size, overriding the `google.batch.bootDiskSize` config option. See the [Google Batch documentation](https://cloud.google.com/compute/docs/disks) for more information about the available disk types.

Examples:

```groovy
// set the boot disk size
disk 100.GB
// mount a persistent disk at '/tmp'
disk 100.GB, type: 'pd-standard'
// mount a local SSD disk at '/tmp' (should be a multiple of 375 GB)
disk 375.GB, type: 'local-ssd'
```

### Pipeline execution

The pipeline can be launched either in a local computer or a cloud instance. Pipeline input data can be stored either locally or in a Google Storage bucket.
Expand Down Expand Up @@ -182,10 +202,13 @@ tower.accessToken = '<YOUR ACCESS TOKEN>'

The [Tower](https://cloud.tower.nf) access token is optional, but it enables higher API rate limits for the {ref}`wave-page` service required by Fusion.

:::{tip}
When Fusion is enabled, by default, only machine types that can attach local SSD disks will be used. If you specify your own machine type or machine series, they should be able to attach local SSD disks, otherwise the job scheduling will fail.
By default, Fusion mounts a local SSD disk to the VM at `/tmp`, using a machine type that can attach local SSD disks. If you specify your own machine type or machine series, they should be able to attach local SSD disks, otherwise the job scheduling will fail.

:::{versionadded} 23.06.0-edge
:::

The `disk` directive can be used to override the disk requested by Fusion. See the {ref}`Process definition <google-batch-process>` section above for examples. Note that local SSD disks must be a multiple of 375 GB in size, otherwise the size will be increased to the next multiple of 375 GB.

### Supported directives

The integration with Google Batch is a developer preview feature. Currently, the following Nextflow directives are supported:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.executor.res

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import nextflow.util.MemoryUnit

/**
* Models disk resource request
*
* @author Ben Sherman <[email protected]>
*/
@ToString(includeNames = true, includePackage = false)
@CompileStatic
@EqualsAndHashCode
class DiskResource {

final MemoryUnit request
final String type

DiskResource( value ) {
this(request: value)
}

DiskResource( Map opts ) {
this.request = toMemoryUnit(opts.request)

if( opts.type )
this.type = opts.type as String
}

private static MemoryUnit toMemoryUnit( value ) {
if( value instanceof MemoryUnit )
return (MemoryUnit)value

try {
return new MemoryUnit(value.toString().trim())
}
catch( Exception e ) {
throw new IllegalArgumentException("Not a valid disk value: $value")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import nextflow.exception.AbortOperationException
import nextflow.exception.FailedGuardException
import nextflow.executor.BashWrapperBuilder
import nextflow.executor.res.AcceleratorResource
import nextflow.executor.res.DiskResource
import nextflow.k8s.model.PodOptions
import nextflow.script.TaskClosure
import nextflow.util.CmdLineHelper
Expand Down Expand Up @@ -252,21 +253,20 @@ class TaskConfig extends LazyMap implements Cloneable {
}
}

MemoryUnit getDisk() {
DiskResource getDiskResource() {
def value = get('disk')

if( !value )
return null
if( value instanceof Map )
return new DiskResource((Map)value)

if( value instanceof MemoryUnit )
return (MemoryUnit)value
if( value != null )
return new DiskResource(value)

try {
new MemoryUnit(value.toString().trim())
}
catch( Exception e ) {
throw new AbortOperationException("Not a valid 'disk' value in process definition: $value")
}
return null
}

MemoryUnit getDisk() {
getDiskResource()?.getRequest()
}

Duration getTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,42 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
return this
}

/**
* Allow user to specify `disk` directive as a value with a list of options, eg:
*
* disk 375.GB, type: 'local-ssd'
*
* @param opts
* A map representing the disk options
* @param value
* The default disk value
* @return
* The {@link ProcessConfig} instance itself
*/
ProcessConfig disk( Map opts, value ) {
opts.request = value
return disk(opts)
}

/**
* Allow user to specify `disk` directive as a value or a list of options, eg:
*
* disk 100.GB
* disk request: 375.GB, type: 'local-ssd'
*
* @param value
* The default disk value or map of options
* @return
* The {@link ProcessConfig} instance itself
*/
ProcessConfig disk( value ) {
if( value instanceof Map || value instanceof Closure )
configProperties.put('disk', value)
else
configProperties.put('disk', [request: value])
return this
}

ProcessConfig arch( Map params, value ) {
if( value instanceof String ) {
if( params.name==null )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.executor.res

import nextflow.util.MemoryUnit
import spock.lang.Specification

/**
*
* @author Ben Sherman <[email protected]>
*/
class DiskResourceTest extends Specification {

static final _100_GB = MemoryUnit.of('100GB')
static final _375_GB = MemoryUnit.of('375GB')

def 'should create a disk resource' () {

when:
def disk = new DiskResource(VALUE)
then:
disk.request == REQ
disk.type == TYPE

where:
VALUE | REQ | TYPE
_100_GB | _100_GB | null
[request: _100_GB] | _100_GB | null
[request: _375_GB, type: 'local-ssd'] | _375_GB | 'local-ssd'
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class TaskConfigTest extends Specification {
then:
config.disk == expected
config.getDisk() == expected
config.getDiskResource()?.getRequest() == expected

where:
expected || value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,27 @@ class ProcessConfigTest extends Specification {
process.accelerator == [request: 1, limit:5]
}

def 'should apply disk config' () {

given:
def process = new ProcessConfig(Mock(BaseScript))

when:
process.disk '100 GB'
then:
process.disk == [request: '100 GB']

when:
process.disk '375 GB', type: 'local-ssd'
then:
process.disk == [request: '375 GB', type: 'local-ssd']

when:
process.disk request: '375 GB', type: 'local-ssd'
then:
process.disk == [request: '375 GB', type: 'local-ssd']
}

def 'should apply architecture config' () {

given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ class GoogleBatchFusionAdapter implements GoogleBatchLauncherSpec {

@Override
List<Volume> getVolumes() {
return [
Volume.newBuilder()
.setDeviceName('fusion')
.setMountPath('/tmp')
.build()
]
return List.of()
}

@Override
Expand Down
Loading

0 comments on commit 166b363

Please sign in to comment.