Merge pull request #4 from martasoricetti/main
Fix jalc_processing.py (no venues' ids validation) and fix jalc_process.py (citing entities' dois save in temporary storage manager)
ivanhb committed Oct 25, 2023
2 parents b7f0c75 + 903c9b9 commit a5316f0
Showing 8 changed files with 946 additions and 140 deletions.
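The second fix described in the commit message relies on the two-tier storage pattern used by the converter: identifiers are first recorded in a temporary, in-memory storage manager and flushed to the persistent one only after a whole file has been processed. The sketch below is a minimal, self-contained illustration of that pattern; the class and function names (InMemoryStorage, process_file) are hypothetical stand-ins, not the project's actual API.

# Illustrative sketch of the temporary-vs-persistent storage pattern (hypothetical names).
class InMemoryStorage:
    # dict-backed stand-in for the project's storage managers
    def __init__(self):
        self._data = {}

    def set_value(self, key, value):
        self._data[key] = value

    def get_value(self, key):
        return self._data.get(key)

    def items(self):
        return list(self._data.items())

    def delete_storage(self):
        self._data.clear()


def process_file(entities, tmp_storage, main_storage):
    # Validate ids into tmp_storage; flush to main_storage only once the whole file succeeds.
    for entity in entities:
        doi = entity.get("doi")
        if doi:
            # citing DOIs are marked as valid in the *temporary* manager first
            tmp_storage.set_value(f"doi:{doi}", True)
    # the flush happens once, after the file has been fully processed
    for key, value in tmp_storage.items():
        main_storage.set_value(key, value)
    tmp_storage.delete_storage()


tmp_storage, main_storage = InMemoryStorage(), InMemoryStorage()
process_file([{"doi": "10.11221/jima.51.86"}], tmp_storage, main_storage)
assert main_storage.get_value("doi:10.11221/jima.51.86") is True and tmp_storage.items() == []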
5 changes: 0 additions & 5 deletions oc_ds_converter/crossref/crossref_processing.py
@@ -179,11 +179,6 @@ def get_id_manager(self, schema_or_id, id_man_dict):
id_man = id_man_dict.get(schema)
return id_man

def normalise_any_id(self, id_with_prefix):
id_man = self.doi_m
id_no_pref = ":".join(id_with_prefix.split(":")[1:])
norm_id_w_pref = id_man.normalise(id_no_pref, include_prefix=True)
return norm_id_w_pref

def dict_to_cache(self, dict_to_be_saved, path):
path = Path(path)
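For context, the helper removed above (and from the JaLC processor below) stripped the schema prefix before normalising the identifier. A stand-alone sketch of that prefix handling, with a dummy normaliser in place of the real DOIManager:

# Sketch of the prefix handling performed by the removed normalise_any_id
# (dummy lowercase/strip normaliser instead of the real DOIManager).
def normalise_any_id_sketch(id_with_prefix, normalise=lambda s, include_prefix=True: f"doi:{s.strip().lower()}"):
    id_no_pref = ":".join(id_with_prefix.split(":")[1:])  # drop the "doi:" prefix, keep the rest intact
    return normalise(id_no_pref, include_prefix=True)


assert normalise_any_id_sketch("doi:10.11221/JIMA.51.86") == "doi:10.11221/jima.51.86"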
127 changes: 54 additions & 73 deletions oc_ds_converter/jalc/jalc_processing.py
@@ -47,6 +47,8 @@
class JalcProcessing(RaProcessor):

def __init__(self, orcid_index: str = None, doi_csv: str = None, publishers_filepath_jalc: str = None, testing: bool = True, storage_manager: Optional[StorageManager] = None, citing=True):
"""This class is responsible for producing CSV tables to be used as input for the META process
aimed at ingesting data from the sources."""
super(JalcProcessing, self).__init__(orcid_index, doi_csv)
self.citing = citing
if storage_manager is None:
@@ -61,12 +63,13 @@ def __init__(self, orcid_index: str = None, doi_csv: str = None, publishers_file
self.jid_m = JIDManager(storage_manager=self.storage_manager)

self.venue_id_man_dict = {"issn":self.issn_m, "jid":self.jid_m}
# Temporary storage managers : all data must be stored in tmp storage manager and passed all together to the
# main storage_manager only once the full file is processed. Checks must be done both on tmp and in
# storage_manager, so that in case the process breaks while processing a file which does not complete (so
# without writing the final file) all the data concerning the ids are not stored. Otherwise, the ids saved in
# a storage_manager db would be considered to have been processed and thus would be ignored by the process
# and lost.

'''Temporary storage managers: all data must be stored in the tmp storage manager and passed all together to the
main storage_manager only once a full file has been processed. Checks must be done both on the tmp and on the main
storage_manager, so that if the process breaks while processing a file which does not complete (i.e.,
without writing the final file), all the data concerning the ids are not stored. Otherwise, the ids saved in
a storage_manager db would be considered to have been processed and would thus be ignored by the process
and lost.'''

self.tmp_doi_m = DOIManager(storage_manager=self.temporary_manager)
self.tmp_jid_m = JIDManager(storage_manager=self.temporary_manager)
@@ -75,15 +78,14 @@ def __init__(self, orcid_index: str = None, doi_csv: str = None, publishers_file


if testing:
self.BR_redis= fakeredis.FakeStrictRedis()
self.BR_redis = fakeredis.FakeStrictRedis()

else:
self.BR_redis = RedisDataSource("DB-META-BR")

self._redis_values_br = []

if not publishers_filepath_jalc:
#we have removed the creation of the file if it is not passed as input
self.publishers_filepath = None
else:
self.publishers_filepath = publishers_filepath_jalc
@@ -106,22 +108,23 @@ def update_redis_values(self, br):
self._redis_values_br = br

def validated_as(self, id):
# Check if the validity was already retrieved and thus
# a) if it is now saved either in the in-memory database, which only concerns data validated
# during the current file processing;
# b) or if it is now saved in the storage_manager database, which only concerns data validated
# during the previous files processing.
# In memory db is checked first because the dimension is smaller and the check is faster and
# Because we assume that it is more likely to find the same ids in close positions, e.g.: same
# citing id in several citations with different cited ids.
"""Check if the validity was already retrieved and thus
a) if it is now saved either in the in-memory database, which only concerns data validated
during the current file processing;
b) or if it is now saved in the storage_manager database, which only concerns data validated
during the previous files processing.
In memory db is checked first because the dimension is smaller and the check is faster and
because we assume that it is more likely to find the same ids in close positions, e.g.: same
citing id in several citations with different cited ids.
In conclusion, if the id is found with this method, it means that this has been found in the dump we are processing"""
validity_value = self.tmp_doi_m.validated_as_id(id)
if validity_value is None:
validity_value = self.doi_m.validated_as_id(id)
return validity_value
# if the identifier is found here, it means that it has already been encountered within the dump we are processing
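As an illustration of the lookup order documented above, the following snippet mimics the tmp-first, persistent-second check with plain dictionaries; tmp_validity and stored_validity are hypothetical stand-ins for the two storage managers.

# Hedged illustration of the tmp-first lookup order (hypothetical dict-backed stores).
def validated_as_illustration(identifier, tmp_validity, stored_validity):
    # 1) in-memory store: ids seen while processing the current file
    value = tmp_validity.get(identifier)
    if value is None:
        # 2) persistent store: ids validated while processing previous files
        value = stored_validity.get(identifier)
    return value  # True, False, or None if never seen


tmp_validity = {"doi:10.11221/jima.51.86": True}
stored_validity = {"doi:10.1000/old-file-id": False}
assert validated_as_illustration("doi:10.11221/jima.51.86", tmp_validity, stored_validity) is True
assert validated_as_illustration("doi:10.9999/unknown", tmp_validity, stored_validity) is None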


def get_id_manager(self, schema_or_id, id_man_dict):
"""Given as input the string of a schema (e.g.:'pmid') and a dictionary mapping strings of
"""Given as input the string of a schema (e.g.:'jid') and a dictionary mapping strings of
the schemas to their id managers, the method returns the correct id manager. Note that each
instance of the Preprocessing class needs its own instances of the id managers, in order to
avoid conflicts while validating data"""
@@ -133,12 +136,6 @@ def get_id_manager(self, schema_or_id, id_man_dict):
id_man = id_man_dict.get(schema)
return id_man

def normalise_any_id(self, id_with_prefix):
id_man = self.doi_m
id_no_pref = ":".join(id_with_prefix.split(":")[1:])
norm_id_w_pref = id_man.normalise(id_no_pref, include_prefix=True)
return norm_id_w_pref

def dict_to_cache(self, dict_to_be_saved, path):
path = Path(path)
parent_dir_path = path.parent.absolute()
@@ -148,10 +145,11 @@ def dict_to_cache(self, dict_to_be_saved, path):
json.dump(dict_to_be_saved, fd, ensure_ascii=False, indent=4)

def csv_creator(self, item:dict) -> dict:
"""This is the method that actually creates the csv table for Meta process given an entity dictionary"""
doi = item["doi"]
if (doi and self.doi_set and doi in self.doi_set) or (doi and not self.doi_set):
norm_id = self.doi_m.normalise(doi, include_prefix=True)
title = self.get_ja(item['title_list'])[0]['title'] if 'title_list' in item else '' # Future Water Availability in the Asian Monsoon Region: A Case Study in Indonesia (no available in japanese)
title = self.get_ja(item['title_list'])[0]['title'] if 'title_list' in item else ''
authors_list = self.get_authors(item)
authors_string_list, editors_string_list = self.get_agents_strings_list(doi, authors_list)
issue = item['issue'] if 'issue' in item else ''
@@ -175,7 +173,10 @@


@classmethod
def get_ja(cls, field: list) -> list: # [{'publisher_name': '筑波大学農林技術センター', 'lang': 'ja'}]
def get_ja(cls, field: list) -> list:
"""This method accepts as parameter a list containing dictionaries with the key "lang".
If a metadata is originally furnished both in the original language and in the english translation,
the method returns the japanese version, otherwise the english translation is returned."""
if all('lang' in item for item in field):
ja = [item for item in field if item['lang'] == 'ja']
ja = list(filter(lambda x: x['type'] != 'before' if 'type' in x else x, ja))
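A hedged, stand-alone sketch of the language-selection behaviour documented in the new docstring (the 'before'-type filtering visible above is omitted, and the Japanese title is made up):

# Minimal stand-alone sketch of the documented behaviour (not the project's code):
def prefer_japanese(field):
    if all("lang" in item for item in field):
        ja = [item for item in field if item["lang"] == "ja"]
        if ja:
            return ja
        en = [item for item in field if item["lang"] == "en"]
        if en:
            return en
    return field


field = [
    {"journal_title_name": "Journal of Developments in Sustainable Agriculture", "lang": "en"},
    {"journal_title_name": "持続可能な農業開発ジャーナル", "lang": "ja"},  # made-up Japanese title
]
assert prefer_japanese(field)[0]["lang"] == "ja"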
@@ -199,11 +200,11 @@ def get_jalc_pages(self, item: dict) -> str:


def get_publisher_name(self, item: dict) -> str:
'''
This function acts differently for citing and cited entities.
If it processes a citing entity it simply returns a string with the name of the publisher if it has been provided in the input dictionary, giving priority to the japanese name. If there is no publisher, the output is an empty string.
"""This method acts differently for citing and cited entities.
If it processes a citing entity it simply returns a string with the name of the publisher if it has been provided in the input dictionary, giving priority to the japanese name.
If there is no publisher, the output is an empty string.
When it processes a cited entity, if a file containing a mapping of publishers' prefixes, names and crossref ids is provided, it extracts the prefix from the doi of the cited publication and checks if it is present in the mapping.
If yes, it returns the linked publisher's name, otherwise an empty string. '''
If yes, it returns the linked publisher's name, otherwise an empty string. """
if self.citing:
publisher = self.get_ja(item['publisher_list'])[0]['publisher_name'] if 'publisher_list' in item else ''
elif not self.citing and self.publishers_mapping:
@@ -243,6 +244,7 @@ def get_authors(self, data: dict) -> list:

def get_venue(self, data: dict) -> str:
venue_name = ''
journal_ids = []
if 'journal_title_name_list' in data:
candidate_venues = self.get_ja(data['journal_title_name_list'])
if candidate_venues:
@@ -252,38 +254,17 @@
elif candidate_venues:
venue_name = candidate_venues[0]['journal_title_name']
if 'journal_id_list' in data:
# validation of venue ids
journal_ids = self.to_validated_venue_id_list(data['journal_id_list'])
else:
journal_ids = list()
for v in data['journal_id_list']:
if isinstance(v, dict):
if v.get("journal_id"):
if v.get("type").lower().strip() in ["issn", "jid"]:
schema = v.get("type").lower().strip()
venue_id = v.get("journal_id")
tmp_id_man = self.get_id_manager(schema, self.venue_tmp_id_man_dict)
if tmp_id_man:
norm_id = tmp_id_man.normalise(venue_id, include_prefix=True)
journal_ids.append(norm_id)
return f"{venue_name} [{' '.join(journal_ids)}]" if journal_ids else venue_name
# 'Journal of Developments in Sustainable Agriculture [issn:1880-3016 issn:1880-3024 jid:jdsa]'

def to_validated_venue_id_list(self, journal_id_list: list):
valid_identifiers = list()
for v in journal_id_list:
if isinstance(v, dict):
if v.get("journal_id"):
if v.get("type").lower().strip() in ["issn", "jid"]:
schema = v.get("type").lower().strip()
id = v.get("journal_id")
tmp_id_man = self.get_id_manager(schema, self.venue_tmp_id_man_dict)
if tmp_id_man:
if tmp_id_man == self.tmp_jid_m:
norm_id = tmp_id_man.normalise(id, include_prefix=True)
# if self.BR_redis.get(norm_id):
if norm_id and norm_id in self._redis_values_br:
tmp_id_man.storage_manager.set_value(norm_id, True)  # this way, the id found in redis is also inserted
# into the storage and is already taken into account in the subsequent steps
valid_identifiers.append(norm_id)
elif norm_id and tmp_id_man.is_valid(norm_id):
valid_identifiers.append(norm_id)
else:
norm_id = tmp_id_man.normalise(id, include_prefix=True)
if tmp_id_man.is_valid(norm_id):
valid_identifiers.append(norm_id)
return sorted(valid_identifiers)
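For orientation, a hypothetical example of the venue string that get_venue assembles once the ids have been validated; the values mirror the example already given in the inline comment above.

# Hypothetical input/output for the venue assembly (values taken from the example
# string that appears in the original inline comment):
venue_name = "Journal of Developments in Sustainable Agriculture"
journal_ids = ["issn:1880-3016", "issn:1880-3024", "jid:jdsa"]  # already validated and sorted
venue = f"{venue_name} [{' '.join(journal_ids)}]" if journal_ids else venue_name
assert venue == "Journal of Developments in Sustainable Agriculture [issn:1880-3016 issn:1880-3024 jid:jdsa]"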



@classmethod
@@ -319,40 +300,41 @@ def get_pub_date(cls, data) -> str:
pub_date_list.append(day)
return '-'.join(pub_date_list)

#id_dict = {"identifier": "doi:10.11221/jima.51.86", "is_valid": None}

def to_validated_id_list(self, norm_id):
"""this method takes in input a normalized DOI identifier and the information of validity and returns a list valid and existent ids with prefixes.
For each id, a first validation try is made by checking its presence in META db. If the id is not in META db yet,
a second attempt is made by using the specific id-schema API"""
#if self.BR_redis.get(norm_id):

valid_id_list = []
if norm_id in self._redis_values_br:
self.tmp_doi_m.storage_manager.set_value(norm_id, True)  # this way, the id found in redis is also inserted
# into the storage and is already taken into account in the subsequent steps
self.tmp_doi_m.storage_manager.set_value(norm_id, True)
valid_id_list.append(norm_id)
# if the id is not in redis db, validate it before appending
elif self.tmp_doi_m.is_valid(norm_id):  # this way, the id found in redis is also inserted into the storage
# and is already taken into account in the subsequent steps
elif self.tmp_doi_m.is_valid(norm_id):
valid_id_list.append(norm_id)
return valid_id_list
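A hedged sketch of the redis-first, API-second check described in the docstring; redis_values, tmp_store and validate_via_api are illustrative stand-ins for the Redis value set, the temporary storage manager and the DOIManager validation call.

# Illustrative sketch of the redis-first, API-second validation (hypothetical helpers).
def to_validated_id_list_sketch(norm_id, redis_values, tmp_store, validate_via_api):
    valid_id_list = []
    if norm_id in redis_values:
        # id already known to META: mark it as valid so later steps can skip re-validating it
        tmp_store[norm_id] = True
        valid_id_list.append(norm_id)
    elif validate_via_api(norm_id):
        valid_id_list.append(norm_id)
    return valid_id_list


tmp_store = {}
ids_in_meta = {"doi:10.11221/jima.51.86"}
result = to_validated_id_list_sketch("doi:10.11221/jima.51.86", ids_in_meta, tmp_store, lambda _id: False)
assert result == ["doi:10.11221/jima.51.86"] and tmp_store["doi:10.11221/jima.51.86"] is True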

def memory_to_storage(self):
kv_in_memory = self.temporary_manager.get_validity_list_of_tuples()
#if kv_in_memory:
self.storage_manager.set_multi_value(kv_in_memory)
self.temporary_manager.delete_storage()

def extract_all_ids(self, citation, is_first_iteration: bool):
if is_first_iteration:
"""Given an entity dictionary, this method extracts all the DOIs.
If the parameter "is_first_iteration" is True, just the DOI of the citing entity is retrieved, while
if it is False, all the DOIs of cited entities are extracted."""
'''if is_first_iteration:
list_id_citing = list()
d1_br = citation["data"]["doi"]
norm_id = self.doi_m.normalise(d1_br, include_prefix=True)
if norm_id:
list_id_citing.append(norm_id)
return list_id_citing
#for citing entities the validation is not necessary, so we add the normalized doi as valid to the temporary storage manager
#self.tmp_doi_m.storage_manager.set_value(norm_id, True)
return list_id_citing'''

# this way we are collecting all the ids of the cited entities for a given citing entity
else:
if not is_first_iteration:
all_br = list()
d2_br = [x["doi"] for x in citation["data"]["citation_list"] if x.get("doi")]
for d in d2_br:
@@ -363,7 +345,6 @@ def extract_all_ids(self, citation, is_first_iteration: bool):

def get_reids_validity_list(self, id_list):
valid_br_ids = []
# DO NOT UPDATE (REDIS RETRIEVAL METHOD HERE)
validity_list_br = self.BR_redis.mget(id_list)
for i, e in enumerate(id_list):
if validity_list_br[i]:
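The batch lookup above relies on Redis mget, which returns one entry per requested key (None for misses). A stand-alone illustration using fakeredis, as in the testing branch of the constructor; the stored value is made up:

# Stand-alone illustration of the batch validity lookup with fakeredis
# (the real code uses a RedisDataSource in production; this is only a sketch).
import fakeredis

br_redis = fakeredis.FakeStrictRedis()
br_redis.set("doi:10.11221/jima.51.86", "omid:br/123")  # made-up value

id_list = ["doi:10.11221/jima.51.86", "doi:10.9999/not-in-meta"]
validity_list = br_redis.mget(id_list)  # one entry per id, None for misses
valid_br_ids = [e for i, e in enumerate(id_list) if validity_list[i]]
assert valid_br_ids == ["doi:10.11221/jima.51.86"]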
2 changes: 1 addition & 1 deletion oc_ds_converter/run/crossref_process.py
@@ -217,7 +217,7 @@ def get_all_redis_ids_and_save_updates(sli_da, is_first_iteration_par:bool):
has_doi_references = True if [x for x in entity["reference"] if x.get("DOI")] else False
if has_doi_references:
if is_first_iteration_par:
ent_all_br, ent_all_ra = crossref_csv.extract_all_ids(entity, True)
ent_all_br, ent_all_ra = crossref_csv.extract_all_ids(entity, True)
else:
ent_all_br, ent_all_ra = crossref_csv.extract_all_ids(entity, False)
all_br.extend(ent_all_br)