Skip to content

Commit

Permalink
Queue server over ssh in config
Browse files Browse the repository at this point in the history
  • Loading branch information
Damian Gola committed Oct 9, 2014
1 parent 29369d9 commit a17396e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions R/clusterFunctionsHelpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ cfBrewTemplate = function(conf, template, rscript, extension) {
assertString(template)
assertString(rscript)
assertString(extension)
if (conf$debug) {
# if debug, place in jobs dir
if (conf$debug | conf$ssh) {
# if debug or job node over ssh, place in jobs dir
outfile = sub("\\.R$", sprintf(".%s", extension), rscript)
} else {
outfile = tempfile("template")
Expand Down
12 changes: 3 additions & 9 deletions R/clusterFunctionsTorque.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,13 @@
#' @template ret_cf
#' @family clusterFunctions
#' @export
makeClusterFunctionsTorque = function(template.file, ssh = FALSE, loginnode, list.jobs.cmd = c("qselect", "-u $USER", "-s EHQRTW")) {
assertLogical(ssh, any.missing = FALSE)
if(ssh) {
assertCharacter(loginnode, len = 1L, any.missing = FALSE)
} else {
loginnode = NULL
}
makeClusterFunctionsTorque = function(template.file, list.jobs.cmd = c("qselect", "-u $USER", "-s EHQRTW")) {
assertCharacter(list.jobs.cmd, min.len = 1L, any.missing = FALSE)
template = cfReadBrewTemplate(template.file)

submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs) {
outfile = cfBrewTemplate(conf, template, rscript, "pbs")
res = runOSCommandLinux("qsub", outfile, stop.on.exit.code = FALSE, ssh = ssh, nodename = loginnode)
res = runOSCommandLinux("qsub", outfile, stop.on.exit.code = FALSE, ssh = conf$ssh, nodename = conf$node)

max.jobs.msg = "Maximum number of jobs already in queue"
output = collapse(res$output, sep = "\n")
Expand All @@ -52,7 +46,7 @@ makeClusterFunctionsTorque = function(template.file, ssh = FALSE, loginnode, lis

listJobs = function(conf, reg) {
# Result is lines of fully quantified batch.job.ids
batch.jobs = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L], ssh = ssh, nodename = loginnode)$output
batch.jobs = runOSCommandLinux(list.jobs.cmd[1L], list.jobs.cmd[-1L], ssh = conf$ssh, nodename = conf$node)$output
if(any(grepl('\\[\\d+\\]', batch.jobs))) {
batch.jobs = unique(c(batch.jobs, gsub('(.+\\[)\\d+(\\].login)', '\\1\\2', batch.jobs)))
}
Expand Down
14 changes: 11 additions & 3 deletions R/conf.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ assignConfDefaults = function() {
conf$staged.queries = FALSE
conf$max.concurrent.jobs = Inf
conf$fs.timeout = NA_real_
conf$ssh = FALSE
conf$node = "none"
}

# loads conf into namespace on slave
Expand Down Expand Up @@ -104,7 +106,7 @@ getConfNames = function() {
c("cluster.functions", "mail.start", "mail.done", "mail.error",
"mail.from", "mail.to", "mail.control", "db.driver", "db.options",
"default.resources", "debug", "raise.warnings", "staged.queries",
"max.concurrent.jobs", "fs.timeout")
"max.concurrent.jobs", "fs.timeout", "ssh", "node")
}

checkConf = function(conf) {
Expand All @@ -117,7 +119,7 @@ checkConf = function(conf) {

checkConfElements = function(cluster.functions, mail.to, mail.from,
mail.start, mail.done, mail.error, mail.control, db.driver, db.options, default.resources, debug,
raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout) {
raise.warnings, staged.queries, max.concurrent.jobs, fs.timeout, ssh, node) {

mail.choices = c("none", "first", "last", "first+last", "all")

Expand Down Expand Up @@ -151,6 +153,10 @@ checkConfElements = function(cluster.functions, mail.to, mail.from,
assertCount(max.concurrent.jobs)
if (!missing(fs.timeout))
assertNumber(fs.timeout)
if (!missing(ssh))
assertFlag(ssh)
if (!missing(node))
assertString(node)
}

getClusterFunctions = function(conf) {
Expand All @@ -176,10 +182,12 @@ printableConf = function(conf) {
" staged.queries: %s",
" max.concurrent.jobs: %s",
" fs.timeout: %s\n",
" ssh: %s",
" node: %s",
sep = "\n")
sprintf(fmt, x$cluster.functions$name, x$mail.from, x$mail.to, x$mail.start, x$mail.done,
x$mail.error, convertToShortString(x$default.resources), x$debug, x$raise.warnings,
x$staged.queries, x$max.concurrent.jobs, x$fs.timeout)
x$staged.queries, x$max.concurrent.jobs, x$fs.timeout, x$ssh, x$node)
}


Expand Down

0 comments on commit a17396e

Please sign in to comment.