diff --git a/DESCRIPTION b/DESCRIPTION index bcdc39d19..4e992f0a1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.8.7.9007 +Version: 0.8.7.9008 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. diff --git a/NAMESPACE b/NAMESPACE index e677bfebd..7e0672463 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -23,8 +23,8 @@ export(stop_mirai) export(unresolved) importFrom(nanonext,"opt<-") importFrom(nanonext,.context) +importFrom(nanonext,.unresolved) importFrom(nanonext,call_aio) -importFrom(nanonext,context) importFrom(nanonext,cv) importFrom(nanonext,cv_reset) importFrom(nanonext,cv_value) diff --git a/NEWS.md b/NEWS.md index 05413ad1c..0d603d7b4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# mirai 0.8.7.9007 (development) +# mirai 0.8.7.9008 (development) * Improvements to dispatcher: + Optimal scheduling when tasks are submitted prior to any servers coming online. diff --git a/R/mirai-package.R b/R/mirai-package.R index 6d76f11f1..c8dd7b670 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -49,10 +49,10 @@ #' @author Charlie Gao \email{charlie.gao@@shikokuchuo.net} #' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID}) #' -#' @importFrom nanonext call_aio context .context cv cv_reset cv_value dial +#' @importFrom nanonext call_aio .context cv cv_reset cv_value dial #' is_error_value listen lock mclock msleep opt opt<- parse_url pipe_notify #' random recv recv_aio_signal request request_signal send sha1 socket stat -#' stop_aio unresolved until wait +#' stop_aio unresolved .unresolved until wait #' #' @docType package #' @name mirai-package diff --git a/R/mirai.R b/R/mirai.R index 5e6325a56..7c8315f79 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -288,6 +288,12 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = FALSE, next } + for (i in which(activevec == 0L)) + if (length(queue[[i]]) == 2L && .unresolved(queue[[i]][["req"]])) { + stop_aio(queue[[i]][["req"]]) + queue[[i]] <- list() + } + free <- which(serverfree & activevec) if (length(free)) @@ -314,17 +320,11 @@ dispatcher <- function(client, url = NULL, n = NULL, asyncdial = FALSE, for (i in which(activevec == 1L)) if (!length(queue[[i]])) { - ctx <- context(sock) + ctx <- .context(sock) req <- recv_aio_signal(ctx, mode = 1L, cv = cv) queue[[i]] <- list(ctx = ctx, req = req) } - for (i in which(activevec == 0L)) - if (length(queue[[i]]) == 2L && unresolved(queue[[i]][["req"]])) { - close(queue[[i]][["ctx"]]) - queue[[i]] <- list() - } - } )