Intelligent estimation of manifest entry size #355

Merged: 25 commits merged on Sep 30, 2024
Changes from 8 commits
Commits (25)
a25f45d
use parquet meta to estimate size
raghumdani Sep 24, 2024
7a2d359
enable intelligent size estimation
raghumdani Sep 25, 2024
8b96429
simplify
raghumdani Sep 25, 2024
edff05d
fix by array size estimator
raghumdani Sep 25, 2024
b5d0d05
append content type params if intelligent estimation is enabled
raghumdani Sep 25, 2024
ecc0c53
Few more changes
raghumdani Sep 25, 2024
95a7e67
Adding UTs
raghumdani Sep 25, 2024
0e527c3
Add invalid column UT
raghumdani Sep 25, 2024
028f171
Fix log messages
raghumdani Sep 25, 2024
bd36e06
Adding enums to perform each type of estimation
raghumdani Sep 26, 2024
f08e2c1
address comments
raghumdani Sep 26, 2024
f2775ab
Adding delta size estimation
raghumdani Sep 27, 2024
d947ebc
rename manifest module
raghumdani Sep 27, 2024
6535254
Add more tests
raghumdani Sep 27, 2024
9e02e23
Fix requires content type params
raghumdani Sep 27, 2024
c6a1b16
Upgrade dependencies and bump version
raghumdani Sep 27, 2024
4fbc4c9
Adding a case where files to sample is zero
raghumdani Sep 27, 2024
f054c61
Export operation type
raghumdani Sep 27, 2024
150359b
Support case when parquet to pyarrow inflation is none
raghumdani Sep 27, 2024
0bba5f6
Add caching in append_content_type_params to avoid redownloading parq…
raghumdani Sep 28, 2024
8ab22d1
Only cache when the number of entries is high to avoid constant calls…
raghumdani Sep 28, 2024
1a7a559
Add json context to logs
raghumdani Sep 28, 2024
173116d
Ensure appropriate log level
raghumdani Sep 28, 2024
047f2cf
Fix circular imports
raghumdani Sep 28, 2024
86c5d11
Adding
raghumdani Sep 28, 2024
79 changes: 77 additions & 2 deletions deltacat/compute/compactor/model/compact_partition_params.py
@@ -23,6 +23,8 @@
TOTAL_MEMORY_BUFFER_PERCENTAGE,
DEFAULT_DISABLE_COPY_BY_REFERENCE,
DEFAULT_NUM_ROUNDS,
PARQUET_TO_PYARROW_INFLATION,
MAX_PARQUET_METADATA_SIZE,
)
from deltacat.constants import PYARROW_INFLATION_MULTIPLIER
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
@@ -104,6 +106,23 @@ def of(params: Optional[Dict]) -> CompactPartitionParams:
result.metrics_config = params.get("metrics_config")

result.num_rounds = params.get("num_rounds", DEFAULT_NUM_ROUNDS)
result.parquet_to_pyarrow_inflation = params.get(
"parquet_to_pyarrow_inflation", PARQUET_TO_PYARROW_INFLATION
)
result.force_using_previous_inflation_for_memory_calculation = params.get(
"force_using_previous_inflation_for_memory_calculation", False
)
result.enable_intelligent_size_estimation = params.get(
"enable_intelligent_size_estimation", False
)

# disable input split during rebase as the rebase files are already uniform
result.enable_input_split = (
params.get("rebase_source_partition_locator") is None
)
result.max_parquet_meta_size_bytes = params.get(
"max_parquet_meta_size_bytes", MAX_PARQUET_METADATA_SIZE
)

if not importlib.util.find_spec("memray"):
result.enable_profiler = False
@@ -414,13 +433,69 @@ def num_rounds(self, num_rounds: int) -> None:
self["num_rounds"] = num_rounds

@property
def parquet_to_pyarrow_inflation(self) -> int:
def parquet_to_pyarrow_inflation(self) -> float:
"""
The inflation factor applied to the parquet uncompressed_size_bytes to estimate the pyarrow table size.
"""
return self["parquet_to_pyarrow_inflation"]

@parquet_to_pyarrow_inflation.setter
def parquet_to_pyarrow_inflation(self, value: int) -> None:
def parquet_to_pyarrow_inflation(self, value: float) -> None:
self["parquet_to_pyarrow_inflation"] = value

@property
def enable_input_split(self) -> bool:
"""
When this is True, input split will always be enabled for parquet files.
The input split feature splits parquet files into individual row groups
so that they can be processed in parallel on different nodes.
By default, input split is enabled for incremental compaction and disabled for rebase.
"""
return self["enable_input_split"]

@enable_input_split.setter
def enable_input_split(self, value: bool) -> None:
self["enable_input_split"] = value

@property
def force_using_previous_inflation_for_memory_calculation(self) -> bool:
"""
When this is True, memory estimation will always use the previous inflation
and average record size for all data formats, even if format-specific metadata
is available to make better predictions of memory requirements.

By default, previous inflation is used to estimate memory for non-parquet files,
while parquet metadata is used for parquet files. We only fall back to previous
inflation if parquet metadata isn't available.
"""
return self["force_using_previous_inflation_for_memory_calculation"]

@force_using_previous_inflation_for_memory_calculation.setter
def force_using_previous_inflation_for_memory_calculation(
self, value: bool
) -> None:
self["force_using_previous_inflation_for_memory_calculation"] = value

@property
def max_parquet_meta_size_bytes(self) -> int:
return self["max_parquet_meta_size_bytes"]

@max_parquet_meta_size_bytes.setter
def max_parquet_meta_size_bytes(self, value: int) -> None:
self["max_parquet_meta_size_bytes"] = value

@property
def enable_intelligent_size_estimation(self) -> bool:
"""
When this is True, intelligent memory estimation is enabled. It considers
encoding, min/max values, and other statistics to estimate memory requirements.
"""
return self["enable_intelligent_size_estimation"]

@enable_intelligent_size_estimation.setter
def enable_intelligent_size_estimation(self, value: bool) -> None:
self["enable_intelligent_size_estimation"] = value

@staticmethod
def json_handler_for_compact_partition_params(obj):
"""
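The new knobs are plain dict-backed properties, so they ride along in the same `params` dict handed to `CompactPartitionParams.of`. A minimal sketch of opting in (illustrative values only; the other required compaction parameters such as source/destination locators are omitted):

```python
from deltacat.compute.compactor.model.compact_partition_params import (
    CompactPartitionParams,
)

params = CompactPartitionParams.of(
    {
        # Override the default parquet -> pyarrow inflation factor (a float).
        "parquet_to_pyarrow_inflation": 2.5,
        # Keep format-aware estimation (the default); True forces the legacy
        # previous-inflation / average-record-size path for all formats.
        "force_using_previous_inflation_for_memory_calculation": False,
        # Opt in to encoding/statistics-aware size estimation.
        "enable_intelligent_size_estimation": True,
        # Maximum parquet metadata size to work with, in bytes (default 100 MB).
        "max_parquet_meta_size_bytes": 50_000_000,
    }
)

# No "rebase_source_partition_locator" key, so input split stays enabled.
assert params.enable_input_split is True
```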
26 changes: 26 additions & 0 deletions deltacat/compute/compactor/model/compaction_session_audit_info.py
@@ -436,6 +436,22 @@ def compactor_version(self) -> str:
"""
return self.get("compactorVersion")

@property
def observed_input_inflation(self) -> float:
"""
The average inflation observed for input files only.
This only accounts for files in the source.
"""
return self.get("observedInputInflation")

@property
def observed_input_average_record_size_bytes(self) -> float:
"""
The average record size observed for input files only.
This only accounts for files in the source.
"""
return self.get("observedInputAverageRecordSizeBytes")

# Setters follow

def set_audit_url(self, audit_url: str) -> CompactionSessionAuditInfo:
@@ -756,6 +772,16 @@ def set_compactor_version(self, value: str) -> CompactionSessionAuditInfo:
self["compactorVersion"] = value
return self

def set_observed_input_inflation(self, value: float) -> CompactionSessionAuditInfo:
self["observedInputInflation"] = value
return self

def set_observed_input_average_record_size_bytes(
self, value: float
) -> CompactionSessionAuditInfo:
self["observedInputAverageRecordSizeBytes"] = value
return self

# High level methods to save stats
def save_step_stats(
self,
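These two audit fields are populated at the end of a compaction round (see `_write_new_round_completion_file` further down). A small sketch of the getter/setter round trip, assuming `CompactionSessionAuditInfo` can be instantiated empty like the other dict-backed models (the values are made up):

```python
from deltacat.compute.compactor.model.compaction_session_audit_info import (
    CompactionSessionAuditInfo,
)

audit = CompactionSessionAuditInfo()  # assumed no-arg construction for this sketch

# Setters return self, so they can be chained.
audit.set_observed_input_inflation(3.2).set_observed_input_average_record_size_bytes(148.0)

print(audit.observed_input_inflation)                  # 3.2
print(audit.observed_input_average_record_size_bytes)  # 148.0
```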
19 changes: 13 additions & 6 deletions deltacat/compute/compactor/model/delta_annotated.py
@@ -69,6 +69,7 @@ def rebatch(
estimation_function: Optional[
Callable[[ManifestEntry], float]
] = lambda entry: entry.meta.content_length,
enable_input_split: Optional[bool] = False,
) -> List[DeltaAnnotated]:
"""
Simple greedy algorithm to split/merge 1 or more annotated deltas into
@@ -86,13 +87,19 @@
new_da_bytes = 0
da_group_entry_count = 0

for delta_annotated in annotated_deltas:
split_annotated_deltas.extend(DeltaAnnotated._split_single(delta_annotated))
if enable_input_split:
for delta_annotated in annotated_deltas:
split_annotated_deltas.extend(
DeltaAnnotated._split_single(delta_annotated)
)

logger.info(
f"Split the {len(annotated_deltas)} annotated deltas "
f"into {len(split_annotated_deltas)} groups."
)
logger.info(
f"Split the {len(annotated_deltas)} annotated deltas "
f"into {len(split_annotated_deltas)} groups."
)
else:
logger.info("Skipping input split as it is disabled...")
split_annotated_deltas = annotated_deltas

for src_da in split_annotated_deltas:
src_da_annotations = src_da.annotations
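The essence of the new `enable_input_split` flag in `rebatch`: row-group splitting only happens when the flag is set; otherwise the annotated deltas pass through untouched. A self-contained illustration of that control flow (simplified stand-ins rather than the real `DeltaAnnotated` API):

```python
from typing import Callable, List

def rebatch_split_phase(
    annotated_deltas: List[dict],
    split_single: Callable[[dict], List[dict]],
    enable_input_split: bool = False,
) -> List[dict]:
    """Simplified sketch of the branching added to DeltaAnnotated.rebatch."""
    if enable_input_split:
        split = []
        for delta in annotated_deltas:
            # Split each delta into per-row-group pieces so they can be
            # processed in parallel on different nodes.
            split.extend(split_single(delta))
        print(f"Split the {len(annotated_deltas)} annotated deltas into {len(split)} groups.")
        return split
    print("Skipping input split as it is disabled...")
    return annotated_deltas
```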
3 changes: 3 additions & 0 deletions deltacat/compute/compactor_v2/constants.py
@@ -41,6 +41,9 @@
# size in metadata to pyarrow table size.
PARQUET_TO_PYARROW_INFLATION = 4

# Maximum size of the parquet metadata
MAX_PARQUET_METADATA_SIZE = 100_000_000 # 100 MB

# By default, copy by reference is enabled
DEFAULT_DISABLE_COPY_BY_REFERENCE = False

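For context, the inflation constant is applied multiplicatively when falling back to metadata-only estimation: a rough sketch of how an entry's in-memory size could be derived from its uncompressed parquet size (an illustrative helper, not the library's API):

```python
PARQUET_TO_PYARROW_INFLATION = 4  # default from compactor_v2 constants

def estimate_pyarrow_table_size(uncompressed_parquet_bytes: int,
                                inflation: float = PARQUET_TO_PYARROW_INFLATION) -> float:
    # Estimated in-memory pyarrow size for a parquet manifest entry.
    return uncompressed_parquet_bytes * inflation

# A 250 MB (uncompressed) parquet entry is estimated at ~1 GB in memory.
print(estimate_pyarrow_table_size(250_000_000))  # 1000000000
```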
22 changes: 20 additions & 2 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -152,9 +152,13 @@ def _build_uniform_deltas(
previous_inflation=params.previous_inflation,
min_delta_bytes=params.min_delta_bytes_in_batch,
min_file_counts=params.min_files_in_batch,
# disable input split during rebase as the rebase files are already uniform
enable_input_split=params.rebase_source_partition_locator is None,
enable_input_split=params.enable_input_split,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
parquet_to_pyarrow_inflation=params.parquet_to_pyarrow_inflation,
force_use_previous_inflation=params.force_using_previous_inflation_for_memory_calculation,
enable_intelligent_size_estimation=params.enable_intelligent_size_estimation,
task_max_parallelism=params.task_max_parallelism,
max_parquet_meta_size_bytes=params.max_parquet_meta_size_bytes,
)
delta_discovery_end: float = time.monotonic()

@@ -400,6 +404,9 @@ def _merge(
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
parquet_to_pyarrow_inflation=params.parquet_to_pyarrow_inflation,
force_use_previous_inflation=params.force_using_previous_inflation_for_memory_calculation,
enable_intelligent_size_estimation=params.enable_intelligent_size_estimation,
)

def merge_input_provider(index, item) -> dict[str, MergeInput]:
@@ -463,6 +470,9 @@ def _hash_bucket(
primary_keys=params.primary_keys,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
parquet_to_pyarrow_inflation=params.parquet_to_pyarrow_inflation,
force_use_previous_inflation=params.force_using_previous_inflation_for_memory_calculation,
enable_intelligent_size_estimation=params.enable_intelligent_size_estimation,
)

def hash_bucket_input_provider(index, item) -> dict[str, HashBucketInput]:
@@ -537,6 +547,9 @@ def _run_local_merge(
ray_custom_resources=params.ray_custom_resources,
primary_keys=params.primary_keys,
memory_logs_enabled=params.memory_logs_enabled,
parquet_to_pyarrow_inflation=params.parquet_to_pyarrow_inflation,
force_use_previous_inflation=params.force_using_previous_inflation_for_memory_calculation,
enable_intelligent_size_estimation=params.enable_intelligent_size_estimation,
)
local_merge_result = ray.get(
mg.merge.options(**local_merge_options).remote(local_merge_input)
@@ -666,6 +679,11 @@ def _write_new_round_completion_file(
f" and average record size={input_average_record_size_bytes}"
)

mutable_compaction_audit.set_observed_input_inflation(input_inflation)
mutable_compaction_audit.set_observed_input_average_record_size_bytes(
input_average_record_size_bytes
)

_update_and_upload_compaction_audit(
params,
mutable_compaction_audit,
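The hunk above persists to the audit the same figures that were previously only logged. Their derivation happens earlier in `_write_new_round_completion_file` and isn't shown in this diff; a plausible sketch of the arithmetic, assuming totals for in-memory pyarrow bytes, on-disk input bytes, and record count are at hand (`observed_input_stats` is a hypothetical helper, not part of the codebase):

```python
from typing import Tuple

def observed_input_stats(
    pyarrow_bytes: int, input_file_bytes: int, record_count: int
) -> Tuple[float, float]:
    """Hypothetical helper: derive the two new audit figures from raw totals."""
    # Inflation: how much larger the input is in memory than on disk.
    inflation = pyarrow_bytes / input_file_bytes if input_file_bytes else 0.0
    # Average in-memory size of a single input record.
    avg_record_size = pyarrow_bytes / record_count if record_count else 0.0
    return inflation, avg_record_size

# e.g. 12 GiB in memory read from 3 GiB of input files holding 80M records
inflation, avg_record_size = observed_input_stats(12 * 1024**3, 3 * 1024**3, 80_000_000)
# mutable_compaction_audit.set_observed_input_inflation(inflation)
# mutable_compaction_audit.set_observed_input_average_record_size_bytes(avg_record_size)
```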