
Commit

chore(data-warehouse): Second attempt at rolling this out for some testing (#26162)
Gilbert09 authored Nov 14, 2024
1 parent ee8d0f9 commit aafaea1
Showing 4 changed files with 86 additions and 60 deletions.
2 changes: 2 additions & 0 deletions mypy-baseline.txt
@@ -801,6 +801,8 @@ posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict k
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "FilesystemDestinationClientConfiguration" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: "type[FilesystemDestinationClientConfiguration]" has no attribute "delta_jobs_per_write" [attr-defined]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
posthog/temporal/data_imports/pipelines/pipeline_sync.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
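The two new [attr-defined] entries exist because pipeline_sync.py assigns delta_jobs_per_write onto dlt's FilesystemDestinationClientConfiguration at runtime (see the monkey patch in the diff below); mypy only sees the class as dlt declares it, so the errors are parked in the baseline rather than fixed in dlt. A minimal sketch of the pattern, using a simplified stand-in class rather than the real dlt configuration:

class DestinationConfig:  # simplified stand-in, not the real dlt class
    pass

# Runs fine at runtime, but mypy reports [attr-defined] because the
# attribute is not declared on the class:
DestinationConfig.delta_jobs_per_write = 1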
141 changes: 81 additions & 60 deletions posthog/temporal/data_imports/pipelines/pipeline_sync.py
@@ -1,11 +1,13 @@
from dataclasses import dataclass
from typing import Literal
from typing import Any, Literal, Optional
from collections.abc import Iterator, Sequence
import uuid

import dlt
from django.conf import settings
from django.db.models import Prefetch
from dlt.pipeline.exceptions import PipelineStepFailed
from deltalake import DeltaTable

from posthog.settings.base_variables import TEST
from structlog.typing import FilteringBoundLogger
@@ -14,6 +16,21 @@
from dlt.common.schema.typing import TSchemaTables
from dlt.load.exceptions import LoadClientJobRetry
from dlt.sources import DltSource
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration
from dlt.common.destination.reference import (
FollowupJobRequest,
)
from dlt.common.destination.typing import (
PreparedTableSchema,
)
from dlt.destinations.job_impl import (
ReferenceFollowupJobRequest,
)
from dlt.common.storages import FileStorage
from dlt.common.storages.load_package import (
LoadJobInfo,
)
from deltalake.exceptions import DeltaError
from collections import Counter
from clickhouse_driver.errors import ServerException
@@ -25,7 +42,7 @@
from posthog.warehouse.models.external_data_schema import ExternalDataSchema
from posthog.warehouse.models.external_data_source import ExternalDataSource
from posthog.warehouse.models.table import DataWarehouseTable
from posthog.temporal.data_imports.util import prepare_s3_files_for_querying
from posthog.temporal.data_imports.util import is_posthog_team, prepare_s3_files_for_querying


@dataclass
@@ -100,51 +117,54 @@ def _create_pipeline(self):
pipeline_name = self._get_pipeline_name()
destination = self._get_destination()

# def create_table_chain_completed_followup_jobs(
# self: FilesystemClient,
# table_chain: Sequence[PreparedTableSchema],
# completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
# ) -> list[FollowupJobRequest]:
# assert completed_table_chain_jobs is not None
# jobs = super(FilesystemClient, self).create_table_chain_completed_followup_jobs(
# table_chain, completed_table_chain_jobs
# )
# if table_chain[0].get("table_format") == "delta":
# for table in table_chain:
# table_job_paths = [
# job.file_path
# for job in completed_table_chain_jobs
# if job.job_file_info.table_name == table["name"]
# ]
# if len(table_job_paths) == 0:
# # file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name()
# # TODO: if we implement removal od orphaned rows, we may need to propagate such job without files
# # to the delta load job
# pass
# else:
# files_per_job = self.config.delta_jobs_per_write or len(table_job_paths)
# for i in range(0, len(table_job_paths), files_per_job):
# jobs_chunk = table_job_paths[i : i + files_per_job]
# file_name = FileStorage.get_file_name_from_file_path(jobs_chunk[0])
# jobs.append(ReferenceFollowupJobRequest(file_name, jobs_chunk))

# return jobs

# def _iter_chunks(self, lst: list[Any], n: int) -> Iterator[list[Any]]:
# """Yield successive n-sized chunks from lst."""
# for i in range(0, len(lst), n):
# yield lst[i : i + n]
def create_table_chain_completed_followup_jobs(
self: FilesystemClient,
table_chain: Sequence[PreparedTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> list[FollowupJobRequest]:
assert completed_table_chain_jobs is not None
jobs = super(FilesystemClient, self).create_table_chain_completed_followup_jobs(
table_chain, completed_table_chain_jobs
)
if table_chain[0].get("table_format") == "delta":
for table in table_chain:
table_job_paths = [
job.file_path
for job in completed_table_chain_jobs
if job.job_file_info.table_name == table["name"]
]
if len(table_job_paths) == 0:
# file_name = ParsedLoadJobFileName(table["name"], "empty", 0, "reference").file_name()
# TODO: if we implement removal od orphaned rows, we may need to propagate such job without files
# to the delta load job
pass
else:
files_per_job = self.config.delta_jobs_per_write or len(table_job_paths)
for i in range(0, len(table_job_paths), files_per_job):
jobs_chunk = table_job_paths[i : i + files_per_job]
file_name = FileStorage.get_file_name_from_file_path(jobs_chunk[0])
jobs.append(ReferenceFollowupJobRequest(file_name, jobs_chunk))

# Monkey patch to fix large memory consumption until https://github.com/dlt-hub/dlt/pull/2031 gets merged in
# if self._incremental or is_posthog_team(self.inputs.team_id):
# FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1
# FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore
# FilesystemClient._iter_chunks = _iter_chunks # type: ignore
return jobs

# dlt.config["data_writer.file_max_items"] = 500_000
# dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB
# dlt.config["loader_parallelism_strategy"] = "table-sequential"
# dlt.config["delta_jobs_per_write"] = 1
def _iter_chunks(self, lst: list[Any], n: int) -> Iterator[list[Any]]:
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

# Monkey patch to fix large memory consumption until https://github.com/dlt-hub/dlt/pull/2031 gets merged in
if (
is_posthog_team(self.inputs.team_id)
and str(self.inputs.source_id) == "01932521-63e7-0000-087c-527af4a2bc4d" # toms test source in prod
):
FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1
FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore
FilesystemClient._iter_chunks = _iter_chunks # type: ignore

dlt.config["data_writer.file_max_items"] = 500_000
dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB
dlt.config["loader_parallelism_strategy"] = "table-sequential"
dlt.config["delta_jobs_per_write"] = 1

dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True
dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True
@@ -173,21 +193,22 @@ def _run(self) -> dict[str, int]:
pipeline = self._create_pipeline()

# Workaround for full refresh schemas while we wait for Rust to fix memory issue
# if is_posthog_team(self.inputs.team_id):
# for name, resource in self.source._resources.items():
# if resource.write_disposition == "replace":
# try:
# delta_uri = f"{settings.BUCKET_URL}/{self.inputs.dataset_name}/{name}"
# delta_table = DeltaTable(delta_uri, storage_options=self._get_credentials())
# except TableNotFoundError:
# delta_table = None

# if delta_table:
# self.logger.debug("Deleting existing delta table")
# delta_table.delete()

# self.logger.debug("Updating table write_disposition to append")
# resource.apply_hints(write_disposition="append")
if (
is_posthog_team(self.inputs.team_id)
and str(self.inputs.source_id) == "01932521-63e7-0000-087c-527af4a2bc4d" # toms test source in prod
):
for name, resource in self.source._resources.items():
if resource.write_disposition == "replace":
delta_uri = f"{settings.BUCKET_URL}/{self.inputs.dataset_name}/{name}"
storage_options = self._get_credentials()

if DeltaTable.is_deltatable(delta_uri, storage_options):
delta_table = DeltaTable(delta_uri, storage_options=self._get_credentials())
self.logger.debug("Deleting existing delta table")
delta_table.delete()

self.logger.debug("Updating table write_disposition to append")
resource.apply_hints(write_disposition="append")

total_counts: Counter[str] = Counter({})

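The full-refresh workaround in _run above boils down to: if a Delta table already exists for a resource whose write_disposition is "replace", delete its rows up front and let the load append instead. A hedged sketch of that check, with a placeholder URI and credentials:

from deltalake import DeltaTable

delta_uri = "s3://example-bucket/example_dataset/customer"  # hypothetical
storage_options = {"AWS_ACCESS_KEY_ID": "...", "AWS_SECRET_ACCESS_KEY": "..."}  # hypothetical

if DeltaTable.is_deltatable(delta_uri, storage_options):
    delta_table = DeltaTable(delta_uri, storage_options=storage_options)
    delta_table.delete()  # no predicate: removes all existing rows
# afterwards the resource is switched to write_disposition="append"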
@@ -77,6 +77,7 @@ def mock_create_pipeline(local_self: Any):
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.update_last_synced_at_sync"),
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False),
):
pipeline = self._create_pipeline("Customer", False)
res = pipeline.run()
@@ -98,6 +99,7 @@ def mock_create_pipeline(local_self: Any):
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.get_delta_tables"),
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.update_last_synced_at_sync"),
patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False),
):
pipeline = self._create_pipeline("Customer", True)
res = pipeline.run()
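Both this test file and test_end_to_end.py below add the same guard: is_posthog_team is patched to return False so the team-gated monkey patch and full-refresh workaround stay disabled under test. A minimal sketch of the pattern (the surrounding test body is elided):

from unittest.mock import patch

with patch(
    "posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team",
    return_value=False,
):
    ...  # run the pipeline under test; the PostHog-only code paths are skipped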
1 change: 1 addition & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -202,6 +202,7 @@ def mock_to_object_store_rs_credentials(class_self):
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
mock.patch("posthog.temporal.data_imports.pipelines.pipeline_sync.is_posthog_team", return_value=False),
):
async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
