
Commit

(temporary) pubmed plugin - new workflow update
step 1
ariannamorettj committed Apr 2, 2024
1 parent 23f4c56 commit ae474d8
Showing 3 changed files with 748 additions and 351 deletions.
133 changes: 94 additions & 39 deletions oc_ds_converter/run/pubmed_process_new.py
@@ -18,6 +18,8 @@
import os.path
import sys
from argparse import ArgumentParser
from filelock import FileLock

from datetime import datetime
from pathlib import Path
from os.path import exists
@@ -62,7 +64,7 @@ def preprocess(jalc_json_dir:str, publishers_filepath:str, orcid_doi_filepath:st

preprocessed_citations_dir = csv_dir + "_citations"
if not os.path.exists(preprocessed_citations_dir):
makedirs(preprocessed_citations_dir)
os.makedirs(preprocessed_citations_dir)

if verbose:
if publishers_filepath or orcid_doi_filepath or wanted_doi_filepath:
@@ -149,6 +151,84 @@ def preprocess(jalc_json_dir:str, publishers_filepath:str, orcid_doi_filepath:st
storage_manager = get_storage_manager(storage_path, redis_storage_manager, testing=testing)
storage_manager.delete_storage()


def find_missing_chunks(list_of_tuples, interval):
all_missing_chunks = []
first_row_to_be_processed = 0

if len(list_of_tuples) < 1:
pass

elif len(list_of_tuples) == 1:
# if it contains a single element, start the distribution from the last element + 1:
# by definition, there are no missing chunks
first_row_to_be_processed = list_of_tuples[0][1] + 1
else:
past_interval = list_of_tuples[0][1] - list_of_tuples[0][0]
if past_interval != interval:
return None
tuple_0s = [x[0] for x in list_of_tuples if x[0] != 0]
tuple_1s = [x[1] for x in list_of_tuples]
missing_tuple_1s = [num - 1 for num in tuple_0s if num - 1 not in tuple_1s]
all_chunks = list_of_tuples

if missing_tuple_1s:
all_missing_chunks = []
while missing_tuple_1s:
missing_chunks = [(x - interval, x) for x in missing_tuple_1s]
all_missing_chunks.extend(missing_chunks)
all_chunks = set(all_missing_chunks + list_of_tuples)
tuple_0s = [x[0] for x in all_chunks if x[0] != 0]
tuple_1s = [x[1] for x in all_chunks]
missing_tuple_1s = [num - 1 for num in tuple_0s if num - 1 not in tuple_1s]

all_tuple_1s = [x[1] for x in all_chunks]
last_row_assigned = max(all_tuple_1s)
first_row_to_be_processed = last_row_assigned + 1

return all_missing_chunks, first_row_to_be_processed
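
For illustration only, a minimal usage sketch of the helper above (not part of the commit). It assumes the function is importable from the file shown in this diff and that each chunk is a (start, end) row range whose width end - start equals interval; the expected values follow from the committed logic.

# Hypothetical usage of the chunk-gap helper defined above.
from oc_ds_converter.run.pubmed_process_new import find_missing_chunks

# Two chunks were recorded so far; the range ending at row 7 was never recorded.
assigned_chunks = [(0, 3), (8, 11)]

missing, next_row = find_missing_chunks(assigned_chunks, interval=3)
print(missing)   # [(4, 7)]  -> the gap between the two recorded chunks
print(next_row)  # 12        -> first row after the last chunk recorded so far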

def assign_chunks(cache, lock, n_processes, interval, n_total_rows) -> dict:
intervals_dict = {}
# open the cache, now structured as a dictionary of lists of tuples, no longer as a string
if os.path.exists(cache):
if not lock:
lock = FileLock(cache + ".lock")
with lock:
with open(cache, "r", encoding="utf-8") as c:
try:
cache_dict = json.load(c)
except json.JSONDecodeError:
cache_dict = dict()
# check whether the file is not empty
if not cache_dict.get("first_iteration") and not cache_dict.get("second_iteration"):
# as things stand, nothing has been processed yet. Start assigning the first process (production of the Meta tables) from the beginning of the dump
starting_iteration = "first"

first_row_position_to_be_processed = 0
list_of_rows_to_be_processed = list(range(first_row_position_to_be_processed, 15))
first_row_to_be_processed = list_of_rows_to_be_processed[first_row_position_to_be_processed]
process_to_be_assigned = 7
interval = 3
row_ranges = []

for _ in range(process_to_be_assigned):
while len(list_of_rows_to_be_processed) > first_row_position_to_be_processed + interval:
last_row_position_to_be_processed = first_row_position_to_be_processed + interval - 1
row_range_assigned = (list_of_rows_to_be_processed[first_row_position_to_be_processed],
list_of_rows_to_be_processed[last_row_position_to_be_processed])
row_ranges.append(row_range_assigned)
del list_of_rows_to_be_processed[
first_row_position_to_be_processed:last_row_position_to_be_processed + 1]

pass
elif cache_dict.get("first_iteration") and not cache_dict.get("second_iteration"):
pass
elif cache_dict.get("first_iteration") and cache_dict.get("second_iteration"):
pass

return intervals_dict
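
As a rough sketch of the cache layout that this assignment step appears to expect: the two keys come from the checks above, the values are assumptions, and JSON serialises the row-range tuples as two-element lists. Presumably find_missing_chunks can then be run per iteration to recover ranges that were assigned but never completed.

# Hypothetical content of the cache file read by assign_chunks.
example_cache = {
    "first_iteration": [[0, 2], [3, 5]],  # row ranges already turned into Meta tables
    "second_iteration": [[0, 2]]          # row ranges already processed for citations
}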

def get_citations_and_metadata(zip_file: str, preprocessed_citations_dir: str, csv_dir: str,
orcid_index: str,
doi_csv: str, publishers_filepath_jalc: str, storage_path: str,
@@ -181,7 +261,7 @@ def get_citations_and_metadata(zip_file: str, preprocessed_citations_dir: str, c
with open(cache, "w", encoding="utf-8") as c:
json.dump(cache_dict, c)

# skip if in cache
# skip if in cache TO BE REPLACED WITH TASK ASSIGNMENT
filename = Path(zip_file).name
if cache_dict.get("first_iteration"):
if is_first_iteration and filename in cache_dict["first_iteration"]:
@@ -236,7 +316,7 @@ def get_all_redis_ids_and_save_updates(sli_da, is_first_iteration_par: bool):
ent_all_br = jalc_csv.extract_all_ids(entity, True)'''
if not is_first_iteration_par:
ent_all_br = jalc_csv.extract_all_ids(entity, False)
all_br.extend(ent_all_br)
all_br = all_br + ent_all_br
redis_validity_values_br = jalc_csv.get_reids_validity_list(all_br)
jalc_csv.update_redis_values(redis_validity_values_br)
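
A brief aside on the changed line above: list.extend mutates the existing list in place, while + builds a new list and rebinds the name, which matters if other names still point to the old object. Toy values, only to show the difference:

# In-place extend vs. rebinding with +.
all_br = ["id:1"]
alias = all_br
all_br.extend(["id:2"])      # in place: alias sees the new element
print(alias)                 # ['id:1', 'id:2']
all_br = all_br + ["id:3"]   # rebinding: alias still points to the old list
print(alias)                 # ['id:1', 'id:2']
print(all_br)                # ['id:1', 'id:2', 'id:3']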

@@ -271,54 +351,28 @@ def save_files(ent_list, citation_list, is_first_iteration_par: bool):
return ent_list, citation_list

def task_done(is_first_iteration_par: bool) -> None:
# UPDATE THE CACHE BY WRITING THE COMPLETED INTERVAL
try:

if is_first_iteration_par and "first_iteration" not in cache_dict.keys():
cache_dict["first_iteration"] = set()

if not is_first_iteration_par and "second_iteration" not in cache_dict.keys():
cache_dict["second_iteration"] = set()

for k,v in cache_dict.items():
cache_dict[k] = set(v)

if is_first_iteration_par:
cache_dict["first_iteration"].add(Path(zip_file).name)

if not is_first_iteration_par:
cache_dict["second_iteration"].add(Path(zip_file).name)


with lock:
with open(cache, 'r', encoding='utf-8') as aux_file:
cur_cache_dict = json.load(aux_file)
if is_first_iteration_par:
if "first_iteration" not in cur_cache_dict.keys():
cur_cache_dict["first_iteration"] = list()
cur_cache_dict["first_iteration"].append(assigned_chunk)

for k,v in cur_cache_dict.items():
cur_cache_dict[k] = set(v)
if not cache_dict.get(k) and cur_cache_dict.get(k):
cache_dict[k] = v
elif cache_dict[k] != v:
zip_files_processed_values_list = cache_dict[k]
cur_zip_files_processed_values_list = cur_cache_dict[k]

# union of the sets, then back to a list
list_updated = list(cur_zip_files_processed_values_list.union(zip_files_processed_values_list))
cache_dict[k] = list_updated

for k,v in cache_dict.items():
if k not in cur_cache_dict:
cur_cache_dict[k] = v

for k,v in cache_dict.items():
if isinstance(v, set):
cache_dict[k] = list(v)
else:
if "second_iteration" not in cur_cache_dict.keys():
cur_cache_dict["second_iteration"] = list()
cur_cache_dict["second_iteration"].append(assigned_chunk)

with open(cache, 'w', encoding='utf-8') as aux_file:
json.dump(cache_dict, aux_file)

except Exception as e:
print(e)
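
For context, a self-contained sketch of the read-modify-write pattern that task_done applies to the shared cache via filelock's FileLock. The file name and the chunk value here are made up for illustration.

# Illustrative only: re-read, update and rewrite the cache under a file lock.
import json
import os
from filelock import FileLock

cache_path = "pubmed_cache.json"   # hypothetical path
assigned_chunk = [0, 2]            # hypothetical row range just completed

with FileLock(cache_path + ".lock"):
    # Re-read inside the lock so chunks recorded by other processes are kept.
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as f:
            cur_cache_dict = json.load(f)
    else:
        cur_cache_dict = {}
    cur_cache_dict.setdefault("first_iteration", []).append(assigned_chunk)
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(cur_cache_dict, f)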


if is_first_iteration:
# the last file has to be processed first
for entity in tqdm(source_dict):
@@ -382,6 +436,7 @@ def task_done(is_first_iteration_par: bool) -> None:
citation["cited"] = target_id
index_citations_to_csv.append(citation)
save_files(data_cited, index_citations_to_csv, False)

def get_storage_manager(storage_path: str, redis_storage_manager: bool, testing: bool):
if not redis_storage_manager:
if storage_path:
