Skip to content

Commit

Permalink
track files
Browse files Browse the repository at this point in the history
  • Loading branch information
stemangiola committed Aug 28, 2024
1 parent e75e881 commit 11e6118
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 72 deletions.
21 changes: 14 additions & 7 deletions R/factories.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,32 @@ hpc_internal = function(
other_arguments_to_map = c(),
packages = targets::tar_option_get("packages") ,
deployment = targets::tar_option_get("deployment"),
format = targets::tar_option_get("format"),
...
){

args <- list(...) # Capture the ... arguments as a list

# Construct the full call expression with the pipeline substituted into the function
fx_call <- as.call(c(user_function, args))

# If format is file just pass the argument
if(format != "file")

# Construct the full call expression with the pipeline substituted into the function
user_function <- as.call(c(user_function, args))

if(tiers |> is.null() || tiers |> length() < 2){

tar_target_raw(
name = target_output |> as.character(),
command = fx_call,
command = user_function,

# This is in case I am not tiering (e.g. DE analyses) but I need to map
pattern = build_pattern(other_arguments_to_map = other_arguments_to_map),

iteration = "list",
packages = packages,
deployment = deployment
deployment = deployment,
format = format


)
Expand All @@ -78,7 +84,7 @@ hpc_internal = function(

else {

if(fx_call |> deparse() |> str_detect("%>%") |> any())
if(user_function |> deparse() |> str_detect("%>%") |> any())
stop("HPCell says: no \"%>%\" allowed in the command, please use \"|>\" ")

# Filter out arguments to be tiered from the input command
Expand All @@ -93,7 +99,7 @@ hpc_internal = function(

# This is needed because using glue
as.character() ,
command = fx_call |> add_tier_inputs(arguments_already_tiered, .y),
command = user_function |> add_tier_inputs(arguments_already_tiered, .y),
pattern =
build_pattern(
other_arguments_to_map = glue("{other_arguments_to_map}_{.y}"),
Expand All @@ -103,7 +109,8 @@ hpc_internal = function(
iteration = "list",
packages = packages,
deployment = deployment,
resources = tar_resources(crew = tar_resources_crew(.y))
resources = tar_resources(crew = tar_resources_crew(.y)) ,
format = format
)
})

Expand Down
8 changes: 7 additions & 1 deletion R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -1530,4 +1530,10 @@ annotation_consensus = function(single_cell_data, .sample_column, .cell_type, .a


#' @export
is_target = function(x) as.name(x)
is_target = function(x) {

if(x |> is("character") |> not())
stop("HPCell says: the input to `is_target` must be a character")

as.name(x)
}
48 changes: 32 additions & 16 deletions R/modules_grammar_hpc.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ initialise_hpc <- function(input_hpc,
input_hpc = input_hpc |> set_names(seq_len(length(input_hpc)))

input_hpc |> names() |> saveRDS("sample_names.rds")
#cell_count |> saveRDS("cell_count.rds")


# Optionally, you can evaluate the arguments if they are expressions
args_list <- lapply(args_list, eval, envir = parent.frame())

Expand All @@ -72,8 +71,6 @@ initialise_hpc <- function(input_hpc,
computing_resources |> saveRDS("temp_computing_resources.rds")
tiers = tier |>
get_positions()
tiers |>
saveRDS("temp_tiers.rds")

# Write pipeline to a file
{
Expand All @@ -99,12 +96,7 @@ initialise_hpc <- function(input_hpc,
packages = c("HPCell")
)

target_list = list(
tar_target(gene_nomenclature, readRDS("temp_gene_nomenclature.rds"), iteration = "list", deployment = "main"),
tar_target(data_container_type, readRDS("data_container_type.rds"), deployment = "main")

)

target_list = list( )

} |>
substitute(env = list(d = debug_step)) |>
Expand All @@ -113,25 +105,50 @@ initialise_hpc <- function(input_hpc,

input_hpc =
list(initialisation = args_list ) |>
c(list(sample_names = list(iterate = "map")) ) |>


add_class("HPCell")


input_hpc |>

# Nomenclature
hpc_single("temp_gene_nomenclature_file", "temp_gene_nomenclature.rds", format = "file") |>

hpc_single(
target_output = "gene_nomenclature",
user_function = readRDS |> quote(),
file = "temp_gene_nomenclature_file" |> is_target(),
deployment = "main"
) |>

# Container class
hpc_single("data_container_type_file", "data_container_type.rds", format = "file") |>

hpc_single(
target_output = "data_container_type",
user_function = readRDS |> quote(),
file = "data_container_type_file" |> is_target(),
deployment = "main"
) |>

# Sample names
hpc_single("sample_names_file", "sample_names.rds", format = "file") |>

hpc_single(
target_output = "sample_names",
user_function = readRDS |> quote(),
file = "sample_names.rds",
file = "sample_names_file" |> is_target(),
deployment = "main",
iterate = "map"
) |>

# Files
hpc_single("read_file_list_file", "input_file.rds", format = "file") |>

hpc_single(
target_output = "read_file_list",
user_function = readRDS |> quote(),
file = "input_file.rds",
file = "read_file_list_file" |> is_target(),
deployment = "main",
iterate = "map"
) |>
Expand Down Expand Up @@ -612,7 +629,7 @@ evaluate_hpc.HPCell = function(input_hpc) {

# Call final list
tar_script_append({
target_list
target_list
}, script = glue("{input_hpc$initialisation$store}.R"))

if(input_hpc$initialisation$debug_step |> is.null())
Expand All @@ -637,7 +654,6 @@ evaluate_hpc.HPCell = function(input_hpc) {
"temp_group_by.rds",
"factors_to_regress.rds",
"pseudobulk_group_by.rds",
"temp_tiers.rds",
"temp_gene_nomenclature.rds"
) |>
remove_files_safely()
Expand Down
5 changes: 4 additions & 1 deletion R/tranform_assay.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ tranform_assay.HPCell = function(

input_hpc |>

# Track the file
hpc_single("transform_file", "temp_fx.rds", format = "file") |>

hpc_iterate(
target_output = "transform",
user_function = readRDS |> quote() ,
file = "temp_fx.rds"
file = "transform_file" |> is_target()
# ,
# iteration = "list",
# deployment = "main"
Expand Down
96 changes: 49 additions & 47 deletions tests/testthat/test_single_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,50 @@ library(crew.cluster)
# InstallData("pbmcsca")
# pbmcsca <- LoadData("pbmcsca") # save this to disk, so you can recall every time you execute HPCell

computing_resources = crew_controller_local(workers = 8) #resource_tuned_slurm

# tier = rep(c("tier_1", "tier_2"), times = 6),
# computing_resources = list(
#
# crew_controller_local(
# name = "tier_1",
# workers = 4
# ),
# crew_controller_local(
# name = "tier_2",
# workers = 4
# )
# )

# computing_resources = list(
#
# crew_controller_slurm(
# name = "tier_1",
# slurm_memory_gigabytes_per_cpu = 5,
# slurm_cpus_per_task = 1,
# workers = 50,
# tasks_max = 5,
# verbose = T
# ),
# crew_controller_slurm(
# name = "tier_2",
# slurm_memory_gigabytes_per_cpu = 10,
# slurm_cpus_per_task = 1,
# workers = 50,
# tasks_max = 5,
# verbose = T
# )
# )

# # Slurm resources
# computing_resources =
# crew.cluster::crew_controller_slurm(
# slurm_memory_gigabytes_per_cpu = 5,
# workers = 500,
# tasks_max = 5,
# verbose = T,
# slurm_cpus_per_task = 1
# )

# # Define and execute the pipeline
file_list =
Expand All @@ -504,59 +548,19 @@ file_list =

# Initialise pipeline characteristics
file_list |>
head(2) |>
initialise_hpc(
gene_nomenclature = "symbol",
data_container_type = "sce_hdf5",

computing_resources = computing_resources
# debug_step = "non_batch_variation_removal_S_1",

# Default resourced
computing_resources = crew_controller_local(workers = 8), #resource_tuned_slurm

# tier = rep(c("tier_1", "tier_2"), times = 6),
# computing_resources = list(
#
# crew_controller_local(
# name = "tier_1",
# workers = 4
# ),
# crew_controller_local(
# name = "tier_2",
# workers = 4
# )
# )

# computing_resources = list(
#
# crew_controller_slurm(
# name = "tier_1",
# slurm_memory_gigabytes_per_cpu = 5,
# slurm_cpus_per_task = 1,
# workers = 50,
# tasks_max = 5,
# verbose = T
# ),
# crew_controller_slurm(
# name = "tier_2",
# slurm_memory_gigabytes_per_cpu = 10,
# slurm_cpus_per_task = 1,
# workers = 50,
# tasks_max = 5,
# verbose = T
# )
# )

# # Slurm resources
# computing_resources =
# crew.cluster::crew_controller_slurm(
# slurm_memory_gigabytes_per_cpu = 5,
# workers = 500,
# tasks_max = 5,
# verbose = T,
# slurm_cpus_per_task = 1
# )
) |>

# ONLY APPLICABLE TO SCE FOR NOW
tranform_assay(fx = file_list |> purrr::map(~identity), target_output = "sce_transformed") |>

hpc_report(
"empty_report",
Expand All @@ -565,9 +569,7 @@ file_list |>
sample_names = "sample_names" |> is_target()
) |>

# ONLY APPLICABLE TO SCE FOR NOW
tranform_assay(fx = file_list |> purrr::map(~identity), target_output = "sce_transformed") |>


hpc_iterate(
target_output = "o",
user_function = function(x, y){x |> dplyr::mutate(bla = y)},
Expand Down

0 comments on commit 11e6118

Please sign in to comment.