[Don't Merge] Setting to control delta job count for each delta write #2031

Closed · wants to merge 3 commits
2 changes: 2 additions & 0 deletions dlt/destinations/impl/filesystem/configuration.py
@@ -22,6 +22,8 @@ class FilesystemDestinationClientConfiguration(FilesystemConfiguration, Destinat
)
current_datetime: Optional[TCurrentDateTime] = None
extra_placeholders: Optional[TExtraPlaceholders] = None
delta_jobs_per_write: Optional[int] = None
"""how many jobs to write in a single commit for Delta tables"""

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
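For context (not part of the diff): a minimal sketch of how the new option might be enabled, mirroring the bare environment variable the test further down sets; the namespaced DESTINATION__FILESYSTEM__DELTA_JOBS_PER_WRITE spelling is an assumption based on dlt's usual config key resolution.

import os

import dlt

# Assumed naming: the bare field name is what the test below uses; the fully
# namespaced form would be DESTINATION__FILESYSTEM__DELTA_JOBS_PER_WRITE.
os.environ["DELTA_JOBS_PER_WRITE"] = "3"  # commit at most 3 load jobs per Delta write

pipeline = dlt.pipeline(pipeline_name="delta_demo", destination="filesystem")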
16 changes: 12 additions & 4 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -726,13 +726,21 @@ def create_table_chain_completed_followup_jobs(
for job in completed_table_chain_jobs
if job.job_file_info.table_name == table["name"]
]
if table_job_paths:
file_name = FileStorage.get_file_name_from_file_path(table_job_paths[0])
jobs.append(ReferenceFollowupJobRequest(file_name, table_job_paths))
else:
if len(table_job_paths) == 0:
# file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name()
# TODO: if we implement removal of orphaned rows, we may need to propagate such a job without files
# to the delta load job
pass
else:
files_per_job = self.config.delta_jobs_per_write or len(table_job_paths)
for i in range(0, len(table_job_paths), files_per_job):
jobs_chunk = table_job_paths[i : i + files_per_job]
file_name = FileStorage.get_file_name_from_file_path(jobs_chunk[0])
jobs.append(ReferenceFollowupJobRequest(file_name, jobs_chunk))

return jobs

def _iter_chunks(self, lst: List[Any], n: int) -> Iterator[List[Any]]:
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]
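
As an aside (not part of the diff), a small self-contained sketch of the chunking logic above: with ten completed parquet jobs, leaving delta_jobs_per_write unset produces a single reference job (one Delta commit), while a value of 3 produces four.

from typing import List, Optional

def chunk_job_paths(paths: List[str], delta_jobs_per_write: Optional[int]) -> List[List[str]]:
    # mirrors the loop above: fall back to one chunk containing all paths when the setting is unset
    files_per_job = delta_jobs_per_write or len(paths)
    return [paths[i : i + files_per_job] for i in range(0, len(paths), files_per_job)]

paths = [f"delta_table.{n}.parquet" for n in range(10)]
assert len(chunk_job_paths(paths, None)) == 1  # one reference job -> one Delta commit
assert len(chunk_job_paths(paths, 3)) == 4     # four reference jobs -> four Delta commits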
32 changes: 27 additions & 5 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -377,8 +377,10 @@ def delta_table():
),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("delta_jobs_per_write", [None, 3])
def test_delta_table_multiple_files(
destination_config: DestinationTestConfiguration,
delta_jobs_per_write: int,
) -> None:
"""Tests loading multiple files into a Delta table.

@@ -388,10 +388,15 @@ def test_delta_table_multiple_files(
from dlt.common.libs.deltalake import get_delta_tables

os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "2" # force multiple files
if delta_jobs_per_write is not None:
os.environ["DELTA_JOBS_PER_WRITE"] = str(
delta_jobs_per_write
) # only write 3 jobs at a time

@dlt.resource(table_format="delta")
def delta_table():
yield [{"foo": True}] * 10
for i in range(0, 20):
yield [{"foo": i}]

pipeline = destination_config.setup_pipeline("fs_pipe", dev_mode=True)

@@ -406,12 +406,27 @@ def delta_table():
if job.job_file_info.table_name == "delta_table"
and job.job_file_info.file_format == "parquet"
]
assert len(delta_table_parquet_jobs) == 5 # 10 records, max 2 per file
assert len(delta_table_parquet_jobs) == 10 # 20 records, max 2 per file

# all 10 records should have been loaded into a Delta table in a single commit
assert get_delta_tables(pipeline, "delta_table")["delta_table"].version() == 0
delta_table_reference_jobs = [
job
for job in completed_jobs
if job.job_file_info.table_name == "delta_table"
and job.job_file_info.file_format == "reference"
]
assert len(delta_table_reference_jobs) == (
    1 if not delta_jobs_per_write else 4
)  # number of reference jobs needed to load all the files

# all 20 records should have been loaded into the Delta table: in a single commit
# (version 0) when delta_jobs_per_write is unset, otherwise across 4 commits (final version 3)
assert get_delta_tables(pipeline, "delta_table")["delta_table"].version() == (
    0 if not delta_jobs_per_write else 3
)
rows = load_tables_to_dicts(pipeline, "delta_table", exclude_system_cols=True)["delta_table"]
assert len(rows) == 10
assert len(rows) == 20
assert {row["foo"] for row in rows} == set(range(0, 20))
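
The expected counts in the parametrized case follow from simple arithmetic; a sketch, assuming the 20 single-record yields and DATA_WRITER__FILE_MAX_ITEMS=2 set above:

import math

records, max_items_per_file, delta_jobs_per_write = 20, 2, 3

parquet_jobs = math.ceil(records / max_items_per_file)           # 10 parquet files
reference_jobs = math.ceil(parquet_jobs / delta_jobs_per_write)  # 4 reference jobs
final_version = reference_jobs - 1                               # commits get versions 0..3

assert (parquet_jobs, reference_jobs, final_version) == (10, 4, 3)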


@pytest.mark.parametrize(