Refactor common Bash helper functions
pditommaso committed Sep 11, 2020
1 parent ed58bd2 commit 22a0ff3
Showing 8 changed files with 309 additions and 281 deletions.
@@ -0,0 +1,68 @@
package nextflow.executor

import nextflow.util.Duration

/**
 * Bash common functions library
 *
 * @author Paolo Di Tommaso <[email protected]>
 */
class BashFunLib {

    static String body(int maxConnect, int maxAttempts, Duration delayBetweenAttempts) {
        """
        # bash helper functions
        nxf_cp_retry() {
            local max_attempts=$maxAttempts
            local timeout=${delayBetweenAttempts.seconds}
            local attempt=0
            local exitCode=0
            while (( \$attempt < \$max_attempts ))
            do
                if "\$@"
                then
                    return 0
                else
                    exitCode=\$?
                fi
                if [[ \$exitCode == 0 ]]
                then
                    break
                fi
                sleep \$timeout
                attempt=\$(( attempt + 1 ))
                timeout=\$(( timeout * 2 ))
            done
        }
        nxf_parallel() {
            IFS=\$'\\n'
            local cmd=("\$@")
            local cpus=\$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c)
            local max=\$(if (( cpus>$maxConnect )); then echo $maxConnect; else echo \$cpus; fi)
            local i=0
            local pid=()
            (
            set +u
            while ((i<\${#cmd[@]})); do
                local copy=()
                for x in "\${pid[@]}"; do
                    [[ -e /proc/\$x ]] && copy+=(\$x)
                done
                pid=("\${copy[@]}")
                if ((\${#pid[@]}>=\$max)); then
                    sleep 1
                else
                    eval "\${cmd[\$i]}" &
                    pid+=(\$!)
                    ((i+=1))
                fi
            done
            ((\${#pid[@]}>0)) && wait \${pid[@]}
            )
            unset IFS
        }
        """.stripIndent()
    }
}
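For context, BashFunLib.body() only renders this preamble as a string; executors prepend it to the staging scripts they generate. A minimal sketch of the call, with illustrative values and assuming the usual nextflow.util.Duration.of(String) factory:

    import nextflow.executor.BashFunLib
    import nextflow.util.Duration

    def preamble = BashFunLib.body(
            5,                    // maxConnect: parallelism cap enforced by nxf_parallel
            3,                    // maxAttempts: how many times nxf_cp_retry re-runs a command
            Duration.of('10s') )  // delayBetweenAttempts: initial sleep, doubled after every retry

    // any flaky staging command can then be wrapped as:  nxf_cp_retry <cmd> [args...]
    assert preamble.contains('nxf_cp_retry()')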
@@ -117,14 +117,15 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {

def delete = []
def links = []
inputFiles.each { stageName, storePath ->
for( Map.Entry<String,Path> entry : inputFiles ) {
final stageName = entry.key
final storePath = entry.value

// delete all previous files with the same name
delete << "rm -f ${Escape.path(stageName)}"

// link them
links << stageInputFile( storePath, stageName )

}

// return a big string containing the command
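The hunk above only changes how the input-file map is traversed. A minimal standalone sketch (map contents invented for the example) showing that both forms visit the same entries:

    import java.nio.file.Path
    import java.nio.file.Paths

    Map<String,Path> inputFiles = [ 'sample.fq': Paths.get('/data/sample.fq') ]

    // closure-based form removed by this commit
    inputFiles.each { stageName, storePath -> println "$stageName <- $storePath" }

    // explicit loop over Map.Entry introduced in its place
    for( Map.Entry<String,Path> entry : inputFiles ) {
        println "${entry.key} <- ${entry.value}"
    }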
@@ -95,7 +95,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
String stageInputFile( Path path, String targetName ) {
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = opts.maxTransferAttempts > 1
? "downloads+=(\"nxf_s3_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
return stage_cmd
}
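For illustration, this one-line change only swaps the name of the retry wrapper in the emitted staging command. A self-contained sketch of the same ternary with placeholder values (the real code escapes the path and reads opts.maxTransferAttempts):

    def retryEnabled = true                 // stands in for opts.maxTransferAttempts > 1
    def path   = '/my-bucket/sample.fq'     // placeholder, already escaped in the real code
    def target = 'sample.fq'
    def stage_cmd = retryEnabled
            ? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/$path $target\")"
            : "downloads+=(\"nxf_s3_download s3:/$path $target\")"
    assert stage_cmd == 'downloads+=("nxf_cp_retry nxf_s3_download s3://my-bucket/sample.fq sample.fq")'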
@@ -19,6 +19,7 @@ package nextflow.cloud.aws.batch

import nextflow.Global
import nextflow.Session
import nextflow.executor.BashFunLib

/**
* AWS S3 helper class
@@ -33,6 +34,8 @@ class S3Helper
def attempts = opts.maxTransferAttempts ?: AwsOptions.MAX_TRANSFER_ATTEMPTS
def delayBetweenAttempts = opts.delayBetweenAttempts ?: AwsOptions.DEFAULT_DELAY_BETWEEN_ATTEMPTS

BashFunLib.body(maxConnect, attempts, delayBetweenAttempts) +

"""
# aws helper
nxf_s3_upload() {
@@ -49,29 +52,6 @@ class S3Helper
unset IFS
}
nxf_s3_retry() {
local max_attempts=$attempts
local timeout=${delayBetweenAttempts.seconds}
local attempt=0
local exitCode=0
while (( \$attempt < \$max_attempts ))
do
if "\$@"
then
return 0
else
exitCode=\$?
fi
if [[ \$exitCode == 0 ]]
then
break
fi
sleep \$timeout
attempt=\$(( attempt + 1 ))
timeout=\$(( timeout * 2 ))
done
}
nxf_s3_download() {
local source=\$1
local target=\$2
@@ -83,35 +63,6 @@ class S3Helper
$cli s3 cp --only-show-errors "\$source" "\$target"
fi
}
nxf_parallel() {
IFS=\$'\\n'
local cmd=("\$@")
local cpus=\$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c)
local max=\$(if (( cpus>$maxConnect )); then echo $maxConnect; else echo \$cpus; fi)
local i=0
local pid=()
(
set +u
while ((i<\${#cmd[@]})); do
local copy=()
for x in "\${pid[@]}"; do
[[ -e /proc/\$x ]] && copy+=(\$x)
done
pid=("\${copy[@]}")
if ((\${#pid[@]}>=\$max)); then
sleep 1
else
eval "\${cmd[\$i]}" &
pid+=(\$!)
((i+=1))
fi
done
((\${#pid[@]}>0)) && wait \${pid[@]}
)
unset IFS
}
""".stripIndent()
}

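The visible effect in S3Helper is that the generated preamble is now a concatenation: the shared helpers from BashFunLib come first, followed by the AWS-specific functions. A compressed, hypothetical sketch of that composition (the option values and the cut-down aws helpers are placeholders, not the real bodies):

    def maxConnect = 5
    def attempts   = 3
    def delay      = nextflow.util.Duration.of('10s')

    String preamble = nextflow.executor.BashFunLib.body(maxConnect, attempts, delay) +
        '''
        # aws helper
        nxf_s3_upload()   { aws s3 cp --only-show-errors "$1" "$2"; }
        nxf_s3_download() { aws s3 cp --only-show-errors "$1" "$2"; }
        '''.stripIndent()

    // shared helpers always precede the S3-specific ones in the generated script
    assert preamble.indexOf('nxf_cp_retry') < preamble.indexOf('nxf_s3_upload')

This ordering is also what the updated test expectations below assert: the bash helper functions appear before the aws helper section.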
@@ -110,22 +110,8 @@ class AwsBatchFileCopyStrategyTest extends Specification {
1 * opts.getStorageEncryption() >> null

script == '''
# aws helper
nxf_s3_upload() {
local pattern=$1
local s3path=$2
IFS=$'\\n'
for name in $(eval "ls -1d $pattern");do
if [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
fi
done
unset IFS
}
nxf_s3_retry() {
# bash helper functions
nxf_cp_retry() {
local max_attempts=1
local timeout=10
local attempt=0
@@ -147,18 +133,6 @@ class AwsBatchFileCopyStrategyTest extends Specification {
timeout=\$(( timeout * 2 ))
done
}
nxf_s3_download() {
local source=$1
local target=$2
local file_name=$(basename $1)
local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c)
if [[ $is_dir == 1 ]]; then
aws s3 cp --only-show-errors --recursive "$source" "$target"
else
aws s3 cp --only-show-errors "$source" "$target"
fi
}
nxf_parallel() {
IFS=$'\\n\'
@@ -188,6 +162,33 @@ class AwsBatchFileCopyStrategyTest extends Specification {
)
unset IFS
}
# aws helper
nxf_s3_upload() {
local pattern=$1
local s3path=$2
IFS=$'\\n'
for name in $(eval "ls -1d $pattern");do
if [[ -d "$name" ]]; then
aws s3 cp --only-show-errors --recursive --storage-class STANDARD "$name" "$s3path/$name"
else
aws s3 cp --only-show-errors --storage-class STANDARD "$name" "$s3path/$name"
fi
done
unset IFS
}
nxf_s3_download() {
local source=$1
local target=$2
local file_name=$(basename $1)
local is_dir=$(aws s3 ls $source | grep -F "PRE ${file_name}/" -c)
if [[ $is_dir == 1 ]]; then
aws s3 cp --only-show-errors --recursive "$source" "$target"
else
aws s3 cp --only-show-errors "$source" "$target"
fi
}
'''.stripIndent()

when:
@@ -198,22 +199,8 @@ class AwsBatchFileCopyStrategyTest extends Specification {
2 * opts.getStorageEncryption() >> 'AES256'

script == '''
# aws helper
nxf_s3_upload() {
local pattern=$1
local s3path=$2
IFS=$'\\n'
for name in $(eval "ls -1d $pattern");do
if [[ -d "$name" ]]; then
/foo/aws s3 cp --only-show-errors --recursive --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name"
else
/foo/aws s3 cp --only-show-errors --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name"
fi
done
unset IFS
}
nxf_s3_retry() {
# bash helper functions
nxf_cp_retry() {
local max_attempts=1
local timeout=10
local attempt=0
@@ -235,18 +222,6 @@ class AwsBatchFileCopyStrategyTest extends Specification {
timeout=\$(( timeout * 2 ))
done
}
nxf_s3_download() {
local source=$1
local target=$2
local file_name=$(basename $1)
local is_dir=$(/foo/aws s3 ls $source | grep -F "PRE ${file_name}/" -c)
if [[ $is_dir == 1 ]]; then
/foo/aws s3 cp --only-show-errors --recursive "$source" "$target"
else
/foo/aws s3 cp --only-show-errors "$source" "$target"
fi
}
nxf_parallel() {
IFS=$'\\n\'
@@ -276,6 +251,33 @@ class AwsBatchFileCopyStrategyTest extends Specification {
)
unset IFS
}
# aws helper
nxf_s3_upload() {
local pattern=$1
local s3path=$2
IFS=$'\\n'
for name in $(eval "ls -1d $pattern");do
if [[ -d "$name" ]]; then
/foo/aws s3 cp --only-show-errors --recursive --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name"
else
/foo/aws s3 cp --only-show-errors --sse AES256 --storage-class STANDARD_IA "$name" "$s3path/$name"
fi
done
unset IFS
}
nxf_s3_download() {
local source=$1
local target=$2
local file_name=$(basename $1)
local is_dir=$(/foo/aws s3 ls $source | grep -F "PRE ${file_name}/" -c)
if [[ $is_dir == 1 ]]; then
/foo/aws s3 cp --only-show-errors --recursive "$source" "$target"
else
/foo/aws s3 cp --only-show-errors "$source" "$target"
fi
}
'''.stripIndent()
}

[Diffs for the remaining changed files did not load and are omitted.]
