From c78bc5d11c8cfa0229903dd0fa4e8b8f0ae8f462 Mon Sep 17 00:00:00 2001 From: Harihara Date: Mon, 7 Oct 2024 14:42:38 -0700 Subject: [PATCH] Updated code to fail silently --- latch_metadata/ATACSeqQC_Plots.R | 14 +- wf/Post_Process_Tasks.py | 291 +++++++++++++++++-------------- 2 files changed, 165 insertions(+), 140 deletions(-) diff --git a/latch_metadata/ATACSeqQC_Plots.R b/latch_metadata/ATACSeqQC_Plots.R index ea0f2ea2..7ad6ab95 100644 --- a/latch_metadata/ATACSeqQC_Plots.R +++ b/latch_metadata/ATACSeqQC_Plots.R @@ -43,29 +43,25 @@ seqlev <- c("chr1","chr2","chr3","chr4","chr5","chr6","chr7","chr8","chr9", which <- as(GenomeInfoDb::seqinfo(Hsapiens)[seqlev], "GRanges") tags <- names(bamTop100)[lengths(bamTop100)>0] - -gal <- readBamFile(merged_bam_file, tag=tags, which=which, - asMates=TRUE, bigFile=TRUE) -shiftedBamFile <- file.path(outPath, "shifted.bam") -gal1 <- shiftGAlignmentsList(gal, outbam=shiftedBamFile) +genome <- Hsapiens flag = 0 if(genome_id == "hg19"){ txs <- transcripts(TxDb.Hsapiens.UCSC.hg19.knownGene) txs <- txs[seqnames(txs) %in% seqlev] - genome <- Hsapiens - objs <- splitGAlignmentsByCut(gal1, txs=txs, genome=genome, outPath = outPath) flag = 1 } + if(genome_id == "hg38"){ txs <- transcripts(TxDb.Hsapiens.UCSC.hg38.knownGene) txs <- txs[seqnames(txs) %in% seqlev] - genome <- Hsapiens - objs <- splitGAlignmentsByCut(gal1, txs=txs, genome=genome, outPath = outPath) flag = 1 } if(flag == 1){ + gal <- readBamFile(merged_bam_file, tag=tags, which=which, asMates=FALSE) + names(gal) <- mcols(gal)$qname + objs <- splitGAlignmentsByCut(gal, txs=txs, genome=genome, outPath = outPath) bamFiles <- file.path(outPath, c("NucleosomeFree.bam", "mononucleosome.bam", diff --git a/wf/Post_Process_Tasks.py b/wf/Post_Process_Tasks.py index e90fcb21..22f8e518 100644 --- a/wf/Post_Process_Tasks.py +++ b/wf/Post_Process_Tasks.py @@ -10,6 +10,7 @@ from dataclasses_json import dataclass_json from latch import map_task, medium_task, small_task from latch.account import Account +from latch.functions.messages import message from latch.ldata.path import LPath from latch.registry.project import Project from latch.registry.table import Table @@ -200,62 +201,72 @@ def Calculate_Plotting_Data( obj_list: List[Registry_Obj]: List of objects of type Registry_Obj, that is used to write files back to the registry tables. """ - d = {} - for f in rplots: - files = latch_listdir(f) - sample = Path(f.remote_path).name - plots = {} - for i in files: - if Path(i).name in [ - "Frag_Sizes.txt", - "featurealignment_coverage.txt", - "Saturation_Plots.txt", - ]: - plots[Path(i).name] = LatchFile(i) - flag = False - for k in plots.keys(): - if "featurealignment_coverage.txt" in k: - flag = True - if flag == False: - cmd = [ - "echo", - "NucleosomeFree\tmononucleosome", - ">", - "featurealignment_coverage.txt", - ] - subprocess.run(" ".join(cmd), shell=True, check=True) - print(os.listdir(Path())) - - directory = str(Path(plots["Frag_Sizes.txt"].remote_path).parent) - directory = directory.replace("latch:/", "latch:///") - print(directory) - - plots["featurealignment_coverage.txt"] = LatchFile( - "featurealignment_coverage.txt", - f"{directory}/featurealignment_coverage.txt", + try: + d = {} + for f in rplots: + files = latch_listdir(f) + sample = Path(f.remote_path).name + plots = {} + for i in files: + if Path(i).name in [ + "Frag_Sizes.txt", + "featurealignment_coverage.txt", + "Saturation_Plots.txt", + ]: + plots[Path(i).name] = LatchFile(i) + flag = False + for k in plots.keys(): + if "featurealignment_coverage.txt" in k: + flag = True + if flag == False: + cmd = [ + "echo", + "NucleosomeFree\tmononucleosome", + ">", + "featurealignment_coverage.txt", + ] + subprocess.run(" ".join(cmd), shell=True, check=True) + print(os.listdir(Path())) + + directory = str(Path(plots["Frag_Sizes.txt"].remote_path).parent) + directory = directory.replace("latch:/", "latch:///") + print(directory) + + plots["featurealignment_coverage.txt"] = LatchFile( + "featurealignment_coverage.txt", + f"{directory}/featurealignment_coverage.txt", + ) + print(plots) + print("\n") + + d[sample] = plots + + for f in cov_plots: + sample = Path(f.remote_path).name + d[sample]["Cov_Plots"] = f + + obj_list = [] + files = [] + for k in d: + print(k) + o = Registry_Obj( + k, + d[k]["Frag_Sizes.txt"], + d[k]["featurealignment_coverage.txt"], + d[k]["Saturation_Plots.txt"], + d[k]["Cov_Plots"], ) - print(plots) - print("\n") - - d[sample] = plots - - for f in cov_plots: - sample = Path(f.remote_path).name - d[sample]["Cov_Plots"] = f - - obj_list = [] - files = [] - for k in d: - print(k) - o = Registry_Obj( - k, - d[k]["Frag_Sizes.txt"], - d[k]["featurealignment_coverage.txt"], - d[k]["Saturation_Plots.txt"], - d[k]["Cov_Plots"], + obj_list.append(o) + files.append(d[k]["featurealignment_coverage.txt"]) + except Exception as e: + message( + "error", + { + "title": "Could not run Rscript for calculating ATAC seq QC metrics", + "body": f"Error: {str(e)}", + }, ) - obj_list.append(o) - files.append(d[k]["featurealignment_coverage.txt"]) + return [], [] return obj_list, files @@ -273,6 +284,7 @@ def Run_Rscript(map_input: InputMap_ATACQC) -> LatchDir: input_objects [List[InputMap_ATACQC]]: Objects of type InputMap_ATACQC for every sample [LatchDir]: Latch directory containing the data matrices used for visualization per sample. """ + run_flag = map_input.run_flag sample = map_input.sample genome = map_input.genome @@ -286,22 +298,30 @@ def Run_Rscript(map_input: InputMap_ATACQC) -> LatchDir: if not os.path.isdir(local_dir): os.mkdir(local_dir) - # bamfile.download() - # baifile.download() - - cmd_RunRscript = [ - "mamba", - "run", - "-n", - "atacseqqc", - "Rscript", - "/root/latch_metadata/ATACSeqQC_Plots.R", - Path(bamfile.local_path).name, - local_dir, - genome, - ] - print(" ".join(cmd_RunRscript)) - subprocess.run(" ".join(cmd_RunRscript), shell=True, check=True) + + try: + cmd_RunRscript = [ + "mamba", + "run", + "-n", + "atacseqqc", + "Rscript", + "/root/latch_metadata/ATACSeqQC_Plots.R", + Path(bamfile.local_path).name, + local_dir, + genome, + ] + print(" ".join(cmd_RunRscript)) + subprocess.run(" ".join(cmd_RunRscript), shell=True, check=True) + except Exception as e: + message( + "error", + { + "title": "Could not run Rscript for calculating ATAC seq QC metrics", + "body": f"Error: {str(e)}", + }, + ) + return LatchDir(local_dir, os.path.join(outdir.remote_directory, sample)) @@ -319,76 +339,85 @@ def UpdateRegistry(d: typing.List[Registry_Obj], run_name: str) -> str: Outputs: returns the project id """ - target_project_name = "ATAC_Seq_Results" - target_table_name = f"{run_name}_Results" - - account = Account.current() - target_project = next( - ( - project - for project in account.list_registry_projects() - if project.get_display_name() == target_project_name - ), - None, - ) + try: + target_project_name = "ATAC_Seq_Results" + target_table_name = f"{run_name}_Results" - if target_project is None: - with account.update() as account_updater: - account_updater.upsert_registry_project(target_project_name) + account = Account.current() target_project = next( - project - for project in account.list_registry_projects() - if project.get_display_name() == target_project_name + ( + project + for project in account.list_registry_projects() + if project.get_display_name() == target_project_name + ), + None, ) - print("Upserted project") - - target_table = next( - ( - table - for table in target_project.list_tables() - if table.get_display_name() == target_table_name - ), - None, - ) - columns = [ - "sample", - "Fragment_Size_Distribution", - "Feature_Alignment_Coverage", - "Saturation_Curves", - "Cov_Parquet_Files", - ] - col_types = [str, LatchFile, LatchFile, LatchFile, LatchDir] - if target_table == None: - ### Upsert_Table - with target_project.update() as project_updater: - project_updater.upsert_table(target_table_name) + if target_project is None: + with account.update() as account_updater: + account_updater.upsert_registry_project(target_project_name) + target_project = next( + project + for project in account.list_registry_projects() + if project.get_display_name() == target_project_name + ) + print("Upserted project") + target_table = next( - table - for table in target_project.list_tables() - if table.get_display_name() == target_table_name + ( + table + for table in target_project.list_tables() + if table.get_display_name() == target_table_name + ), + None, ) - print("Upserted table") - with target_table.update() as updater: - for i in range(0, len(columns)): - updater.upsert_column(columns[i], type=col_types[i]) - print("Upserted columns") - ctr = len(target_table.get_dataframe()) - print(d) - - with target_table.update() as updater: - for v in d: - updater.upsert_record( - name=str(ctr), - sample=v.sample, - Fragment_Size_Distribution=v.frag_file, - Feature_Alignment_Coverage=v.feat_alignment, - Saturation_Curves=v.sat_curves, - Cov_Parquet_Files=v.cov_parquet, + columns = [ + "sample", + "Fragment_Size_Distribution", + "Feature_Alignment_Coverage", + "Saturation_Curves", + "Cov_Parquet_Files", + ] + col_types = [str, LatchFile, LatchFile, LatchFile, LatchDir] + if target_table == None: + ### Upsert_Table + with target_project.update() as project_updater: + project_updater.upsert_table(target_table_name) + target_table = next( + table + for table in target_project.list_tables() + if table.get_display_name() == target_table_name ) - ctr += 1 + print("Upserted table") + + with target_table.update() as updater: + for i in range(0, len(columns)): + updater.upsert_column(columns[i], type=col_types[i]) + print("Upserted columns") + ctr = len(target_table.get_dataframe()) + print(d) + with target_table.update() as updater: + for v in d: + updater.upsert_record( + name=str(ctr), + sample=v.sample, + Fragment_Size_Distribution=v.frag_file, + Feature_Alignment_Coverage=v.feat_alignment, + Saturation_Curves=v.sat_curves, + Cov_Parquet_Files=v.cov_parquet, + ) + ctr += 1 + except Exception as e: + message( + "error", + { + "title": "Could not update registry tables", + "body": f"Error: {str(e)}", + }, + ) + return "-1" return target_table.id