From 11e6118115bc1829ca8fa559219ac4de8b1d0697 Mon Sep 17 00:00:00 2001 From: Stefano Mangiola Date: Wed, 28 Aug 2024 15:48:38 +0930 Subject: [PATCH] track files --- R/factories.R | 21 ++++-- R/functions.R | 8 ++- R/modules_grammar_hpc.R | 48 ++++++++----- R/tranform_assay.R | 5 +- tests/testthat/test_single_functions.R | 96 +++++++++++++------------- 5 files changed, 106 insertions(+), 72 deletions(-) diff --git a/R/factories.R b/R/factories.R index 48c8804..48130c8 100644 --- a/R/factories.R +++ b/R/factories.R @@ -49,26 +49,32 @@ hpc_internal = function( other_arguments_to_map = c(), packages = targets::tar_option_get("packages") , deployment = targets::tar_option_get("deployment"), + format = targets::tar_option_get("format"), ... ){ args <- list(...) # Capture the ... arguments as a list - # Construct the full call expression with the pipeline substituted into the function - fx_call <- as.call(c(user_function, args)) + + # If format is file just pass the argument + if(format != "file") + + # Construct the full call expression with the pipeline substituted into the function + user_function <- as.call(c(user_function, args)) if(tiers |> is.null() || tiers |> length() < 2){ tar_target_raw( name = target_output |> as.character(), - command = fx_call, + command = user_function, # This is in case I am not tiering (e.g. DE analyses) but I need to map pattern = build_pattern(other_arguments_to_map = other_arguments_to_map), iteration = "list", packages = packages, - deployment = deployment + deployment = deployment, + format = format ) @@ -78,7 +84,7 @@ hpc_internal = function( else { - if(fx_call |> deparse() |> str_detect("%>%") |> any()) + if(user_function |> deparse() |> str_detect("%>%") |> any()) stop("HPCell says: no \"%>%\" allowed in the command, please use \"|>\" ") # Filter out arguments to be tiered from the input command @@ -93,7 +99,7 @@ hpc_internal = function( # This is needed because using glue as.character() , - command = fx_call |> add_tier_inputs(arguments_already_tiered, .y), + command = user_function |> add_tier_inputs(arguments_already_tiered, .y), pattern = build_pattern( other_arguments_to_map = glue("{other_arguments_to_map}_{.y}"), @@ -103,7 +109,8 @@ hpc_internal = function( iteration = "list", packages = packages, deployment = deployment, - resources = tar_resources(crew = tar_resources_crew(.y)) + resources = tar_resources(crew = tar_resources_crew(.y)) , + format = format ) }) diff --git a/R/functions.R b/R/functions.R index 1999dd4..67dc70b 100644 --- a/R/functions.R +++ b/R/functions.R @@ -1530,4 +1530,10 @@ annotation_consensus = function(single_cell_data, .sample_column, .cell_type, .a #' @export -is_target = function(x) as.name(x) +is_target = function(x) { + + if(x |> is("character") |> not()) + stop("HPCell says: the input to `is_target` must be a character") + + as.name(x) +} diff --git a/R/modules_grammar_hpc.R b/R/modules_grammar_hpc.R index 66dc86e..2968da0 100644 --- a/R/modules_grammar_hpc.R +++ b/R/modules_grammar_hpc.R @@ -56,8 +56,7 @@ initialise_hpc <- function(input_hpc, input_hpc = input_hpc |> set_names(seq_len(length(input_hpc))) input_hpc |> names() |> saveRDS("sample_names.rds") - #cell_count |> saveRDS("cell_count.rds") - + # Optionally, you can evaluate the arguments if they are expressions args_list <- lapply(args_list, eval, envir = parent.frame()) @@ -72,8 +71,6 @@ initialise_hpc <- function(input_hpc, computing_resources |> saveRDS("temp_computing_resources.rds") tiers = tier |> get_positions() - tiers |> - saveRDS("temp_tiers.rds") # Write pipeline to a file { @@ -99,12 +96,7 @@ initialise_hpc <- function(input_hpc, packages = c("HPCell") ) - target_list = list( - tar_target(gene_nomenclature, readRDS("temp_gene_nomenclature.rds"), iteration = "list", deployment = "main"), - tar_target(data_container_type, readRDS("data_container_type.rds"), deployment = "main") - - ) - + target_list = list( ) } |> substitute(env = list(d = debug_step)) |> @@ -113,25 +105,50 @@ initialise_hpc <- function(input_hpc, input_hpc = list(initialisation = args_list ) |> - c(list(sample_names = list(iterate = "map")) ) |> - + add_class("HPCell") input_hpc |> + # Nomenclature + hpc_single("temp_gene_nomenclature_file", "temp_gene_nomenclature.rds", format = "file") |> + + hpc_single( + target_output = "gene_nomenclature", + user_function = readRDS |> quote(), + file = "temp_gene_nomenclature_file" |> is_target(), + deployment = "main" + ) |> + + # Container class + hpc_single("data_container_type_file", "data_container_type.rds", format = "file") |> + + hpc_single( + target_output = "data_container_type", + user_function = readRDS |> quote(), + file = "data_container_type_file" |> is_target(), + deployment = "main" + ) |> + + # Sample names + hpc_single("sample_names_file", "sample_names.rds", format = "file") |> + hpc_single( target_output = "sample_names", user_function = readRDS |> quote(), - file = "sample_names.rds", + file = "sample_names_file" |> is_target(), deployment = "main", iterate = "map" ) |> + # Files + hpc_single("read_file_list_file", "input_file.rds", format = "file") |> + hpc_single( target_output = "read_file_list", user_function = readRDS |> quote(), - file = "input_file.rds", + file = "read_file_list_file" |> is_target(), deployment = "main", iterate = "map" ) |> @@ -612,7 +629,7 @@ evaluate_hpc.HPCell = function(input_hpc) { # Call final list tar_script_append({ - target_list + target_list }, script = glue("{input_hpc$initialisation$store}.R")) if(input_hpc$initialisation$debug_step |> is.null()) @@ -637,7 +654,6 @@ evaluate_hpc.HPCell = function(input_hpc) { "temp_group_by.rds", "factors_to_regress.rds", "pseudobulk_group_by.rds", - "temp_tiers.rds", "temp_gene_nomenclature.rds" ) |> remove_files_safely() diff --git a/R/tranform_assay.R b/R/tranform_assay.R index 43982d0..1c6669b 100644 --- a/R/tranform_assay.R +++ b/R/tranform_assay.R @@ -21,10 +21,13 @@ tranform_assay.HPCell = function( input_hpc |> + # Track the file + hpc_single("transform_file", "temp_fx.rds", format = "file") |> + hpc_iterate( target_output = "transform", user_function = readRDS |> quote() , - file = "temp_fx.rds" + file = "transform_file" |> is_target() # , # iteration = "list", # deployment = "main" diff --git a/tests/testthat/test_single_functions.R b/tests/testthat/test_single_functions.R index 414737e..b8282fd 100644 --- a/tests/testthat/test_single_functions.R +++ b/tests/testthat/test_single_functions.R @@ -488,6 +488,50 @@ library(crew.cluster) # InstallData("pbmcsca") # pbmcsca <- LoadData("pbmcsca") # save this to disk, so you can recall every time you execute HPCell +computing_resources = crew_controller_local(workers = 8) #resource_tuned_slurm + +# tier = rep(c("tier_1", "tier_2"), times = 6), +# computing_resources = list( +# +# crew_controller_local( +# name = "tier_1", +# workers = 4 +# ), +# crew_controller_local( +# name = "tier_2", +# workers = 4 +# ) +# ) + +# computing_resources = list( +# +# crew_controller_slurm( +# name = "tier_1", +# slurm_memory_gigabytes_per_cpu = 5, +# slurm_cpus_per_task = 1, +# workers = 50, +# tasks_max = 5, +# verbose = T +# ), +# crew_controller_slurm( +# name = "tier_2", +# slurm_memory_gigabytes_per_cpu = 10, +# slurm_cpus_per_task = 1, +# workers = 50, +# tasks_max = 5, +# verbose = T +# ) +# ) + +# # Slurm resources +# computing_resources = +# crew.cluster::crew_controller_slurm( +# slurm_memory_gigabytes_per_cpu = 5, +# workers = 500, +# tasks_max = 5, +# verbose = T, +# slurm_cpus_per_task = 1 +# ) # # Define and execute the pipeline file_list = @@ -504,59 +548,19 @@ file_list = # Initialise pipeline characteristics file_list |> + head(2) |> initialise_hpc( gene_nomenclature = "symbol", data_container_type = "sce_hdf5", - + computing_resources = computing_resources # debug_step = "non_batch_variation_removal_S_1", # Default resourced - computing_resources = crew_controller_local(workers = 8), #resource_tuned_slurm - - # tier = rep(c("tier_1", "tier_2"), times = 6), - # computing_resources = list( - # - # crew_controller_local( - # name = "tier_1", - # workers = 4 - # ), - # crew_controller_local( - # name = "tier_2", - # workers = 4 - # ) - # ) - # computing_resources = list( - # - # crew_controller_slurm( - # name = "tier_1", - # slurm_memory_gigabytes_per_cpu = 5, - # slurm_cpus_per_task = 1, - # workers = 50, - # tasks_max = 5, - # verbose = T - # ), - # crew_controller_slurm( - # name = "tier_2", - # slurm_memory_gigabytes_per_cpu = 10, - # slurm_cpus_per_task = 1, - # workers = 50, - # tasks_max = 5, - # verbose = T - # ) - # ) - - # # Slurm resources - # computing_resources = - # crew.cluster::crew_controller_slurm( - # slurm_memory_gigabytes_per_cpu = 5, - # workers = 500, - # tasks_max = 5, - # verbose = T, - # slurm_cpus_per_task = 1 - # ) ) |> + # ONLY APPLICABLE TO SCE FOR NOW + tranform_assay(fx = file_list |> purrr::map(~identity), target_output = "sce_transformed") |> hpc_report( "empty_report", @@ -565,9 +569,7 @@ file_list |> sample_names = "sample_names" |> is_target() ) |> - # ONLY APPLICABLE TO SCE FOR NOW - tranform_assay(fx = file_list |> purrr::map(~identity), target_output = "sce_transformed") |> - + hpc_iterate( target_output = "o", user_function = function(x, y){x |> dplyr::mutate(bla = y)},