Skip to content

Commit

Permalink
Refactoring, suggestions from lukaspiel
Browse files (browse the repository at this point in the history)
  • Loading branch information
atomprobe-tc committed Aug 18, 2024
1 parent 226e6be commit 8e1eb01
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 61 deletions.
8 changes: 7 additions & 1 deletion src/pynxtools_em/concepts/mapping_functors_pint.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def var_path_to_spcfc_path(path: str, instance_identifier: list):


def get_case(arg):
"""Identify which case an instruction from the configuration belongs to.
Each case comes with specific instructions to resolve that are detailed
in the README.md in this source code directory."""
if isinstance(arg, str): # str
return "case_one"
elif isinstance(arg, tuple):
Expand Down Expand Up @@ -261,7 +264,7 @@ def set_value(template: dict, trg: str, src_val: Any, trg_dtype: str = "") -> di
def use_functor(
cmds: list, mdata: fd.FlatDict, prfx_trg: str, ids: list, template: dict
) -> dict:
"""Process the use functor."""
"""Process concept mapping for simple predefined strings and pint quantities."""
for cmd in cmds:
if isinstance(cmd, tuple):
if len(cmd) == 2:
Expand All @@ -284,6 +287,7 @@ def map_functor(
template: dict,
trg_dtype_key: str = "",
) -> dict:
"""Process concept mapping, datatype and unit conversion for quantities."""
for cmd in cmds:
case = get_case(cmd)
if case == "case_one": # str
Expand Down Expand Up @@ -405,6 +409,7 @@ def timestamp_functor(
ids: list,
template: dict,
) -> dict:
"""Process concept mapping and time format conversion."""
for cmd in cmds:
if isinstance(cmd, tuple):
if 2 <= len(cmd) <= 3: # trg, src, timestamp or empty string (meaning utc)
Expand Down Expand Up @@ -437,6 +442,7 @@ def filehash_functor(
ids: list,
template: dict,
) -> dict:
"""Process concept mapping and checksums to add context from which file NeXus content was processed."""
for cmd in cmds:
if isinstance(cmd, tuple):
if len(cmd) == 2:
Expand Down
13 changes: 6 additions & 7 deletions src/pynxtools_em/parsers/image_png_protochips.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def check_if_zipped_png_protochips(self):
self.supported = True

def get_xml_metadata(self, file, fp):
"""Parse content from the XML payload that PNGs from AXON Studio have."""
try:
fp.seek(0)
with Image.open(fp) as png:
Expand Down Expand Up @@ -205,8 +206,8 @@ def get_xml_metadata(self, file, fp):
def get_file_hash(self, file, fp):
self.tmp["meta"][file]["sha256"] = get_sha256_of_file_content(fp)

def parse_and_normalize(self):
"""Perform actual parsing filling cache self.tmp."""
def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing via Protochips-specific metadata...")
# may need to set self.supported = False on error
Expand All @@ -223,19 +224,17 @@ def parse_and_normalize(self):
f"{self.file_path} metadata within PNG collection processed "
f"successfully ({len(self.tmp['meta'].keys())} PNGs evaluated)."
)
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
else:
print(
f"{self.file_path} is not a Protochips-specific "
f"PNG file that this parser can process !"
)

def process_into_template(self, template: dict) -> dict:
if self.supported is True:
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
return template

def sort_event_data_em(self) -> List:
"""Sort event data by datetime."""
events: List = []
for file_name, mdata in self.tmp["meta"].items():
key = f"MicroscopeControlImageMetadata.MicroscopeDateTime"
Expand Down
8 changes: 0 additions & 8 deletions src/pynxtools_em/parsers/image_tiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,3 @@ def get_tags(self, verbose: bool = False):
self.tags = {TAGS[key]: fp.tag[key] for key in fp.tag_v2}
for key, val in self.tags.items():
print(f"{key}, {val}")

def parse_and_normalize(self):
"""Perform actual parsing filling cache self.tmp."""
if self.supported is True:
print(f"Parsing via TiffParser...")
self.get_tags()
else:
print(f"{self.file_path} is not a TIFF file this parser can process !")
2 changes: 2 additions & 0 deletions src/pynxtools_em/parsers/image_tiff_hitachi.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

class HitachiTiffParser(TiffParser):
def __init__(self, file_paths: List[str], entry_id: int = 1, verbose=False):
# TODO::instantiate super.__init__
tif_txt = ["", ""]
if (
len(file_paths) == 2
Expand Down Expand Up @@ -117,6 +118,7 @@ def check_if_tiff_hitachi(self):
self.supported = True

def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing via Hitachi...")
# metadata have at this point already been collected into an fd.FlatDict
Expand Down
1 change: 1 addition & 0 deletions src/pynxtools_em/parsers/image_tiff_jeol.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def check_if_tiff_jeol(self):
)

def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing via JEOL...")
# metadata have at this point already been collected into an fd.FlatDict
Expand Down
12 changes: 5 additions & 7 deletions src/pynxtools_em/parsers/image_tiff_point_electronic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, file_path: str = "", entry_id: int = 1, verbose: bool = False
self.check_if_tiff_point_electronic()

def xmpmeta_to_flat_dict(self, meta: fd.FlatDict):
"""Flatten point-electronic formatting of XMPMeta data."""
for entry in meta["xmpmeta/RDF/Description"]:
tmp = fd.FlatDict(entry, "/")
for key, obj in tmp.items():
Expand Down Expand Up @@ -123,21 +124,18 @@ def check_if_tiff_point_electronic(self):
f"Parser {self.__class__.__name__} finds no content in {self.file_path} that it supports"
)

def parse_and_normalize(self):
"""Perform actual parsing filling cache self.tmp."""
def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing via point electronic DISS-specific metadata...")
# metadata have at this point already been collected into an fd.FlatDict
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
else:
print(
f"{self.file_path} is not a point electronic DISS-specific "
f"TIFF file that this parser can process !"
)

def process_into_template(self, template: dict) -> dict:
if self.supported is True:
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
return template

def process_event_data_em_data(self, template: dict) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion src/pynxtools_em/parsers/image_tiff_tescan.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def check_if_tiff_tescan(self):
# with TESCAN-specific concept names to make this here more robust

def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache self.tmp."""
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing via TESCAN...")
# metadata have at this point already been collected into an fd.FlatDict
Expand Down
9 changes: 3 additions & 6 deletions src/pynxtools_em/parsers/image_tiff_tfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,18 @@ def get_metadata(self):
else:
break

def parse_and_normalize(self):
def parse(self, template: dict) -> dict:
"""Perform actual parsing filling cache self.tmp."""
if self.supported is True:
print(f"Parsing via ThermoFisher-specific metadata...")
self.get_metadata()
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
else:
print(
f"{self.file_path} is not a ThermoFisher-specific "
f"TIFF file that this parser can process !"
)

def process_into_template(self, template: dict) -> dict:
if self.supported is True:
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
return template

def process_event_data_em_data(self, template: dict) -> dict:
Expand Down
32 changes: 15 additions & 17 deletions src/pynxtools_em/parsers/nxs_imgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,28 @@ def identify_image_type(self):
return None

def parse(self, template: dict) -> dict:
image_parser_type = self.identify_image_type()
if image_parser_type is None:
parser_type = self.identify_image_type()
if parser_type is None:
print(
f"Parser {self.__class__.__name__} finds no content in {self.file_path} that it supports"
)
return template
print(f"{self.__class__.__name__} identified content as {image_parser_type}")
print(f"{self.__class__.__name__} identified content as {parser_type}")
# see also comments for respective nxs_pyxem parser
# and its interaction with tech-partner-specific hfive_* parsers
if image_parser_type == "tiff_tfs":
if parser_type == "tiff_tfs":
tfs = TfsTiffParser(self.file_path, self.entry_id)
tfs.parse_and_normalize()
tfs.process_into_template(template)
elif image_parser_type == "tiff_zeiss":
tfs.parse(template)
elif parser_type == "tiff_zeiss":
zss = ZeissTiffParser(self.file_path, self.entry_id)
zss.parse(template)
elif image_parser_type == "tiff_point_electronic":
pe = PointElectronicTiffParser(self.file_path, self.entry_id)
pe.parse_and_normalize()
pe.process_into_template(template)
elif image_parser_type == "set_of_zipped_png_protochips":
axon = ProtochipsPngSetParser(self.file_path, self.entry_id)
axon.parse_and_normalize()
axon.process_into_template(template)
# add here further specific content (sub-)parsers for formats from other
# tech partner or other custom parsing of images
elif parser_type == "tiff_point_electronic":
pel = PointElectronicTiffParser(self.file_path, self.entry_id)
pel.parse(template)
elif parser_type == "set_of_zipped_png_protochips":
axn = ProtochipsPngSetParser(self.file_path, self.entry_id)
axn.parse(template)
else:
print(f"No parser available for image_parser_type {parser_type}")
# logger.warning
return template
31 changes: 17 additions & 14 deletions src/pynxtools_em/parsers/rsciio_bruker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, file_path: str = ""):
self.check_if_supported()

def check_if_supported(self):
"""Check if provided content matches Bruker concepts."""
try:
self.objs = bruker.file_reader(self.file_path)
# TODO::what to do if the content of the file is larger than the available
Expand All @@ -44,38 +45,40 @@ def check_if_supported(self):
# in the template and stream out accordingly
self.supported = True
except IOError:
print(f"Loading {self.file_path} using {self.__name__} is not supported !")
print(f"Loading {self.file_path} using Bruker is not supported !")

def parse_and_normalize(self):
"""Perform actual parsing filling cache self.tmp."""
def parse_and_normalize(self, template: dict) -> dict:
"""Perform actual parsing filling cache."""
if self.supported is True:
print(f"Parsing with {self.__name__}...")
self.tech_partner_to_nexus_normalization()
print(f"Parsing via Bruker...")
self.normalize_eds_content(template)
self.normalize_eels_content(template)
else:
print(
f"{self.file_path} is not a Bruker-specific "
f"BCF file that this parser can process !"
)
return template

def tech_partner_to_nexus_normalization(self):
"""Translate tech partner concepts to NeXus concepts."""
self.normalize_eds_content()
self.normalize_eels_content()

def normalize_eds_content(self):
pass
def normalize_eds_content(self, template: dict) -> dict:
"""TODO implementation."""
return template

def normalize_eels_content(self):
pass
def normalize_eels_content(self, template: dict) -> dict:
"""TODO implementation."""
return template

def process_into_template(self, template: dict) -> dict:
"""TODO implementation."""
if self.supported is True:
self.process_event_data_em_metadata(template)
self.process_event_data_em_data(template)
return template

def process_event_data_em_metadata(self, template: dict) -> dict:
"""TODO implementation."""
return template

def process_event_data_em_data(self, template: dict) -> dict:
"""TODO implementation."""
return template

0 comments on commit 8e1eb01

Please sign in to comment.