Skip to content

Commit

Permalink
refactor(excel2json-lists): remove file hierarchy from code (#1094)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nora-Olivia-Ammann authored Aug 12, 2024
1 parent 56bd84d commit 3b554a3
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@
import pandas as pd


@dataclass
class ExcelFile:
filename: str
sheets: list[ExcelSheet]


@dataclass
class ExcelSheet:
excel_name: str
Expand Down
250 changes: 113 additions & 137 deletions src/dsp_tools/commands/excel2json/new_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from dsp_tools.commands.excel2json.models.input_error_lists import MultipleListPerSheetProblem
from dsp_tools.commands.excel2json.models.input_error_lists import NodesPerRowProblem
from dsp_tools.commands.excel2json.models.input_error_lists import SheetProblem
from dsp_tools.commands.excel2json.models.new_lists_deserialise import ExcelFile
from dsp_tools.commands.excel2json.models.new_lists_deserialise import ExcelSheet
from dsp_tools.commands.excel2json.models.new_lists_serialise import ListNode
from dsp_tools.commands.excel2json.models.new_lists_serialise import ListRoot
Expand All @@ -59,9 +58,9 @@ def new_excel2lists(
Returns:
a tuple consisting of the "lists" section as Python list, and the success status (True if everything went well)
"""
list_files = _parse_files(excelfolder)
sheet_list = _parse_files(excelfolder)

finished_lists = _make_serialised_lists(list_files)
finished_lists = _make_serialised_lists(sheet_list)
validate_lists_section_with_schema(lists_section=finished_lists)

if path_to_output_file:
Expand All @@ -72,88 +71,79 @@ def new_excel2lists(
return finished_lists, True


def _parse_files(excelfolder: Path | str) -> list[ExcelFile]:
def _parse_files(excelfolder: Path | str) -> list[ExcelSheet]:
file_names = [file for file in Path(excelfolder).glob("*list*.xlsx", case_sensitive=False) if _non_hidden(file)]
all_files = []
all_sheets = []
for file in file_names:
sheets = [
ExcelSheet(excel_name=str(file), sheet_name=name, df=df)
for name, df in read_and_clean_all_sheets(file).items()
]
all_files.append(ExcelFile(filename=str(file), sheets=sheets))
return all_files
all_sheets.extend(
[
ExcelSheet(excel_name=str(file), sheet_name=name, df=df)
for name, df in read_and_clean_all_sheets(file).items()
]
)
return all_sheets


def _non_hidden(path: Path) -> bool:
    """Return True unless the file is hidden (leading '.') or an Office lock file (leading '~$')."""
    return regex.search(r"^(\.|~\$).+", path.name) is None


def _make_serialised_lists(list_files: list[ExcelFile]) -> list[dict[str, Any]]:
list_files = _prepare_dfs(list_files)
all_lists = _make_all_lists(list_files)
def _make_serialised_lists(sheet_list: list[ExcelSheet]) -> list[dict[str, Any]]:
    """Turn the parsed Excel sheets into the JSON-ready "lists" section.

    Args:
        sheet_list: sheets parsed from the Excel files

    Returns:
        the "lists" section as a list of dictionaries

    Raises:
        InputError: if any sheet could not be converted into a valid list
    """
    prepared_sheets = _prepare_dfs(sheet_list)
    creation_result = _make_all_lists(prepared_sheets)
    if isinstance(creation_result, ListCreationProblem):
        err_msg = creation_result.execute_error_protocol()
        logger.error(err_msg)
        raise InputError(err_msg)
    return [lst.to_dict() for lst in creation_result]


def _make_all_lists(list_files: list[ExcelFile]) -> list[ListRoot] | ListCreationProblem:
def _make_all_lists(sheet_list: list[ExcelSheet]) -> list[ListRoot] | ListCreationProblem:
good_lists = []
problem_lists: list[SheetProblem] = []
for file in list_files:
for sheet in file.sheets:
if isinstance(new_list := _make_one_list(sheet), ListRoot):
good_lists.append(new_list)
else:
problem_lists.append(new_list)
for sheet in sheet_list:
if isinstance(new_list := _make_one_list(sheet), ListRoot):
good_lists.append(new_list)
else:
problem_lists.append(new_list)
if problem_lists:
return ListCreationProblem([CollectedSheetProblems(problem_lists)])
return good_lists


def _prepare_dfs(list_files: list[ExcelFile]) -> list[ExcelFile]:
list_files = _add_id_optional_column_if_not_exists(list_files)
_make_all_formal_excel_compliance_checks(list_files)
return _construct_ids(list_files)


def _add_id_optional_column_if_not_exists(list_files: list[ExcelFile]) -> list[ExcelFile]:
all_files = []
for file in list_files:
all_sheets = []
for sheet in file.sheets:
if "id (optional)" not in sheet.df.columns:
df = sheet.df
df["id (optional)"] = pd.NA
all_sheets.append(ExcelSheet(excel_name=file.filename, sheet_name=sheet.sheet_name, df=df))
else:
all_sheets.append(sheet)
all_files.append(ExcelFile(filename=file.filename, sheets=all_sheets))
return all_files


def _construct_ids(list_files: list[ExcelFile]) -> list[ExcelFile]:
all_files = []
for file in list_files:
all_sheets = []
for sheet in file.sheets:
df = _complete_id_one_df(sheet.df, _get_preferred_language(sheet.df.columns))
all_sheets.append(ExcelSheet(excel_name=file.filename, sheet_name=sheet.sheet_name, df=df))
all_files.append(ExcelFile(filename=file.filename, sheets=all_sheets))
all_files = _resolve_duplicate_ids_all_excels(all_files)
return _fill_parent_id_col_all_excels(all_files)


def _fill_parent_id_col_all_excels(list_files: list[ExcelFile]) -> list[ExcelFile]:
all_files = []
for file in list_files:
all_sheets = []
for sheet in file.sheets:
df = _fill_parent_id_col_one_df(sheet.df, _get_preferred_language(sheet.df.columns))
all_sheets.append(ExcelSheet(excel_name=file.filename, sheet_name=sheet.sheet_name, df=df))
all_files.append(ExcelFile(filename=file.filename, sheets=all_sheets))
return all_files
def _prepare_dfs(sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
    """Normalise the sheets: ensure the optional ID column exists, run all compliance checks, then fill in the IDs."""
    sheets_with_id_col = _add_id_optional_column_if_not_exists(sheet_list)
    # the checks raise InputError on failure, so ID construction only runs on valid sheets
    _make_all_formal_excel_compliance_checks(sheets_with_id_col)
    return _construct_ids(sheets_with_id_col)


def _add_id_optional_column_if_not_exists(sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
    """Ensure every sheet's DataFrame has an "id (optional)" column.

    Sheets that already have the column are passed through unchanged; for the
    others a new ExcelSheet is created whose DataFrame gains the column filled
    with pd.NA.

    Args:
        sheet_list: sheets parsed from the Excel files

    Returns:
        a list in which every sheet's DataFrame has an "id (optional)" column
    """
    all_sheets = []
    for sheet in sheet_list:
        if "id (optional)" in sheet.df.columns:
            all_sheets.append(sheet)
        else:
            # copy before adding the column: the previous version assigned into
            # sheet.df directly, mutating the caller's DataFrame as a side effect
            df = sheet.df.copy()
            df["id (optional)"] = pd.NA
            all_sheets.append(ExcelSheet(excel_name=sheet.excel_name, sheet_name=sheet.sheet_name, df=df))
    return all_sheets


def _construct_ids(sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
    """Fill the "id" column of every sheet, de-duplicate IDs across all sheets, then resolve the parent IDs."""
    sheets_with_ids = [
        ExcelSheet(
            excel_name=sheet.excel_name,
            sheet_name=sheet.sheet_name,
            df=_complete_id_one_df(sheet.df, _get_preferred_language(sheet.df.columns)),
        )
        for sheet in sheet_list
    ]
    deduplicated_sheets = _resolve_duplicate_ids_all_excels(sheets_with_ids)
    return _fill_parent_id_col_all_excels(deduplicated_sheets)


def _fill_parent_id_col_all_excels(sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
    """Return new sheets whose DataFrames have the parent-ID column filled in."""
    return [
        ExcelSheet(
            excel_name=sheet.excel_name,
            sheet_name=sheet.sheet_name,
            df=_fill_parent_id_col_one_df(sheet.df, _get_preferred_language(sheet.df.columns)),
        )
        for sheet in sheet_list
    ]


def _fill_parent_id_col_one_df(df: pd.DataFrame, preferred_language: str) -> pd.DataFrame:
Expand All @@ -171,30 +161,26 @@ def _fill_parent_id_col_one_df(df: pd.DataFrame, preferred_language: str) -> pd.
return df


def _resolve_duplicate_ids_all_excels(list_files: list[ExcelFile]) -> list[ExcelFile]:
def _resolve_duplicate_ids_all_excels(sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
ids = []
for file in list_files:
for sheet in file.sheets:
ids.extend(sheet.df["id"].tolist())
for sheet in sheet_list:
ids.extend(sheet.df["id"].tolist())
counter = Counter(ids)
if duplicate_ids := [item for item, count in counter.items() if count > 1]:
return _remove_duplicate_ids_in_all_excels(duplicate_ids, list_files)
return list_files
return _remove_duplicate_ids_in_all_excels(duplicate_ids, sheet_list)
return sheet_list


def _remove_duplicate_ids_in_all_excels(duplicate_ids: list[str], list_files: list[ExcelFile]) -> list[ExcelFile]:
all_files = []
for file in list_files:
all_sheets = []
for sheet in file.sheets:
preferred_lang = _get_preferred_language(sheet.df.columns)
df = sheet.df
for i, row in df.iterrows():
if row["id"] in duplicate_ids and pd.isna(row["id (optional)"]):
df.at[i, "id"] = _construct_non_duplicate_id_string(df.iloc[int(str(i))], preferred_lang)
all_sheets.append(ExcelSheet(file.filename, sheet.sheet_name, df))
all_files.append(ExcelFile(file.filename, all_sheets))
return list_files
def _remove_duplicate_ids_in_all_excels(duplicate_ids: list[str], sheet_list: list[ExcelSheet]) -> list[ExcelSheet]:
    """Make auto-generated IDs unique again.

    Rows whose "id" is one of the duplicates and whose ID was auto-generated
    (i.e. "id (optional)" is empty) get a new, more specific ID string.
    IDs entered by the user in "id (optional)" are left untouched.

    Args:
        duplicate_ids: IDs that occur more than once across all sheets
        sheet_list: sheets parsed from the Excel files

    Returns:
        the sheets with the affected "id" cells rewritten
    """
    all_sheets = []
    for sheet in sheet_list:
        preferred_lang = _get_preferred_language(sheet.df.columns)
        df = sheet.df
        for i, row in df.iterrows():
            if row["id"] in duplicate_ids and pd.isna(row["id (optional)"]):
                df.at[i, "id"] = _construct_non_duplicate_id_string(df.iloc[int(str(i))], preferred_lang)
        all_sheets.append(ExcelSheet(sheet.excel_name, sheet.sheet_name, df))
    # bug fix: the rebuilt list was previously discarded and the input list
    # returned instead (correct only by accident, via in-place df mutation)
    return all_sheets


def _complete_id_one_df(df: pd.DataFrame, preferred_language: str) -> pd.DataFrame:
Expand Down Expand Up @@ -369,60 +355,53 @@ def _get_preferred_language(columns: pd.Index[str], ending: str = r"(\d+|list)")
raise InputError(msg)


def _make_all_formal_excel_compliance_checks(list_files: list[ExcelFile]) -> None:
def _make_all_formal_excel_compliance_checks(sheet_list: list[ExcelSheet]) -> None:
"""Check if the excel files are compliant with the expected format."""
# These functions must be called in this order,
# as some of the following checks only work if the previous have passed.
_check_duplicates_all_excels(list_files)
_check_for_unique_list_names(list_files)
_make_shape_compliance_all_excels(list_files)
_make_all_content_compliance_checks_all_excels(list_files)
_check_duplicates_all_excels(sheet_list)
_check_for_unique_list_names(sheet_list)
_make_shape_compliance_all_excels(sheet_list)
_make_all_content_compliance_checks_all_excels(sheet_list)


def _check_duplicates_all_excels(list_files: list[ExcelFile]) -> None:
def _check_duplicates_all_excels(sheet_list: list[ExcelSheet]) -> None:
"""
Check if the excel files contain duplicates with regard to the node names,
and if the custom IDs are unique across all excel files.
A duplicate in the node names is defined as several rows with the same entries in the columns with the node names.
Args:
list_files: class instances representing an excel file with sheets
sheet_list: class instances each representing one sheet of an excel file
Raises:
InputError: If any complete duplicates are found in the excel files.
"""
problems: list[Problem] = []
duplicate_problems: list[SheetProblem] = []
for file in list_files:
duplicate_problems.extend(
[p for sheet in file.sheets if (p := _check_for_duplicate_nodes_one_df(sheet)) is not None]
)
duplicate_problems: list[SheetProblem] = [
p for sheet in sheet_list if (p := _check_for_duplicate_nodes_one_df(sheet)) is not None
]
if duplicate_problems:
problems.append(CollectedSheetProblems(duplicate_problems))
if id_problem := _check_for_duplicate_custom_id_all_excels(list_files):
if id_problem := _check_for_duplicate_custom_id_all_excels(sheet_list):
problems.append(id_problem)
if problems:
msg = ListCreationProblem(problems).execute_error_protocol()
logger.error(msg)
raise InputError(msg)


def _check_for_unique_list_names(list_files: list[ExcelFile]) -> None:
def _check_for_unique_list_names(sheet_list: list[ExcelSheet]) -> None:
"""This function checks that one sheet only has one list and that all lists have unique names."""
list_names: list[ListInformation] = []
all_problems: list[Problem] = []
sheet_problems: list[SheetProblem] = []
for excel_file in list_files:
for sheet in excel_file.sheets:
preferred_language = _get_preferred_language(sheet.df.columns, r"list")
unique_list_names = list(sheet.df[f"{preferred_language}_list"].unique())
if len(unique_list_names) != 1:
sheet_problems.append(
MultipleListPerSheetProblem(sheet.excel_name, sheet.sheet_name, unique_list_names)
)
list_names.extend(
[ListInformation(excel_file.filename, sheet.sheet_name, name) for name in unique_list_names]
)
for sheet in sheet_list:
preferred_language = _get_preferred_language(sheet.df.columns, r"list")
unique_list_names = list(sheet.df[f"{preferred_language}_list"].unique())
if len(unique_list_names) != 1:
sheet_problems.append(MultipleListPerSheetProblem(sheet.excel_name, sheet.sheet_name, unique_list_names))
list_names.extend([ListInformation(sheet.excel_name, sheet.sheet_name, name) for name in unique_list_names])
if sheet_problems:
all_problems.append(CollectedSheetProblems(sheet_problems))
list_info_dict = defaultdict(list)
Expand Down Expand Up @@ -450,20 +429,19 @@ def _check_for_duplicate_nodes_one_df(sheet: ExcelSheet) -> DuplicatesInSheetPro
return None


def _check_for_duplicate_custom_id_all_excels(list_files: list[ExcelFile]) -> DuplicatesCustomIDInProblem | None:
def _check_for_duplicate_custom_id_all_excels(sheet_list: list[ExcelSheet]) -> DuplicatesCustomIDInProblem | None:
id_list = []
for file in list_files:
for sheet in file.sheets:
for i, row in sheet.df.iterrows():
if not pd.isna(row["id (optional)"]):
id_list.append(
{
"filename": file.filename,
"sheet_name": sheet.sheet_name,
"id": row["id (optional)"],
"row_number": int(str(i)) + 2,
}
)
for sheet in sheet_list:
for i, row in sheet.df.iterrows():
if not pd.isna(row["id (optional)"]):
id_list.append(
{
"filename": sheet.excel_name,
"sheet_name": sheet.sheet_name,
"id": row["id (optional)"],
"row_number": int(str(i)) + 2,
}
)
id_df = pd.DataFrame.from_records(id_list)
if (duplicate_ids := id_df.duplicated("id", keep=False)).any():
problems: dict[str, DuplicateIDProblem] = defaultdict(lambda: DuplicateIDProblem())
Expand All @@ -477,11 +455,11 @@ def _check_for_duplicate_custom_id_all_excels(list_files: list[ExcelFile]) -> Du
return None


def _make_shape_compliance_all_excels(list_files: list[ExcelFile]) -> None:
def _make_shape_compliance_all_excels(sheet_list: list[ExcelSheet]) -> None:
"""Check if the excel files are compliant with the expected format."""
problems: list[SheetProblem] = []
for file in list_files:
problems.extend([p for sheet in file.sheets if (p := _make_shape_compliance_one_sheet(sheet)) is not None])
problems: list[SheetProblem] = [
p for sheet in sheet_list if (p := _make_shape_compliance_one_sheet(sheet)) is not None
]
if problems:
msg = ListCreationProblem([CollectedSheetProblems(problems)]).execute_error_protocol()
logger.error(msg)
Expand Down Expand Up @@ -554,18 +532,16 @@ def make_col_names(lang: str) -> set[str]:
return {}


def _make_all_content_compliance_checks_all_excels(list_files: list[ExcelFile]) -> None:
def _make_all_content_compliance_checks_all_excels(sheet_list: list[ExcelSheet]) -> None:
"""Check if the content of the excel files is compliant with the expected format."""
_check_for_missing_translations_all_excels(list_files)
_check_for_erroneous_entries_all_excels(list_files)
_check_for_missing_translations_all_excels(sheet_list)
_check_for_erroneous_entries_all_excels(sheet_list)


def _check_for_missing_translations_all_excels(list_files: list[ExcelFile]) -> None:
problems: list[SheetProblem] = []
for file in list_files:
problems.extend(
[p for sheet in file.sheets if (p := _check_for_missing_translations_one_sheet(sheet)) is not None]
)
def _check_for_missing_translations_all_excels(sheet_list: list[ExcelSheet]) -> None:
problems: list[SheetProblem] = [
p for sheet in sheet_list if (p := _check_for_missing_translations_one_sheet(sheet)) is not None
]
if problems:
msg = ListCreationProblem([CollectedSheetProblems(problems)]).execute_error_protocol()
logger.error(msg)
Expand Down Expand Up @@ -610,10 +586,10 @@ def _check_for_missing_translations_one_node(
return None


def _check_for_erroneous_entries_all_excels(list_files: list[ExcelFile]) -> None:
problems: list[SheetProblem] = []
for file in list_files:
problems.extend([p for sheet in file.sheets if (p := _check_for_erroneous_entries_one_list(sheet)) is not None])
def _check_for_erroneous_entries_all_excels(sheet_list: list[ExcelSheet]) -> None:
problems: list[SheetProblem] = [
p for sheet in sheet_list if (p := _check_for_erroneous_entries_one_list(sheet)) is not None
]
if problems:
msg = ListCreationProblem([CollectedSheetProblems(problems)]).execute_error_protocol()
logger.error(msg)
Expand Down
Loading

0 comments on commit 3b554a3

Please sign in to comment.