[WIP] Task-based and actor-based S3 Parquet file compactor examples #8707
Conversation
Can one of the admins verify this patch?
Test PASSed.
Hey @pdames thanks for this example!
The source code looks pretty clean, but could you write up a short explanation of what it does/how to use it (ideally something easily reproducible).
I think this is a good example/template of documentation for an example.
    hb_tasks_pending.append(hb_task_promise)
while len(hb_tasks_pending):
    hb_task_complete, hb_tasks_pending = ray.wait(hb_tasks_pending)
    all_hash_bucket_indices.update(ray.get(hb_task_complete[0]))
Why don't we have to call update on the rest of the elements in hb_task_complete?
In this case, ray.wait(hb_tasks_pending) uses the default value of num_returns=1, so it should only ever return one item at a time in hb_task_complete.
However, since we're not doing anything useful here by waiting for one item at a time, I suppose we could get rid of the wait and while loop altogether to just write:
hb_tasks_complete = ray.get(hb_tasks_pending)
all_hash_bucket_indices.update(itertools.chain.from_iterable(hb_tasks_complete))
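For anyone following along, a minimal, self-contained sketch of both patterns (using a made-up toy_hash_bucket task in place of the real hash_bucket) might look roughly like this:

import itertools

import ray

ray.init()

@ray.remote
def toy_hash_bucket(i):
    # stand-in for the real hash_bucket task; returns a list of bucket indices
    return [i % 4]

pending = [toy_hash_bucket.remote(i) for i in range(8)]

# ray.wait returns (ready, not_ready); with the default num_returns=1,
# "ready" always holds exactly one object ref, hence hb_task_complete[0]
all_indices = set()
while pending:
    ready, pending = ray.wait(pending)
    all_indices.update(ray.get(ready[0]))

# the simpler, fully synchronous equivalent suggested above
results = ray.get([toy_hash_bucket.remote(i) for i in range(8)])
assert all_indices == set(itertools.chain.from_iterable(results))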
        max_records_per_output_file,
    )
    dd_tasks_pending.append(dd_task_promise)
while len(dd_tasks_pending):
Perhaps we can just do ray.get(dd_tasks_pending)
or if we care about the status of the queue we can do something along the lines of:
with tqdm(total=len(dd_tasks_pending)) as pbar:
    while len(dd_tasks_pending):
        dd_task_complete, dd_tasks_pending = ray.wait(dd_tasks_pending)
        pbar.update(len(dd_task_complete))
Agreed - it's purely synchronous as written, so it would be simpler to just replace it with ray.get(...).
    hb_tasks_pending.append(hb_task_promise)
while len(hb_tasks_pending):
    hb_task_complete, hb_tasks_pending = ray.wait(hb_tasks_pending)
    all_hash_bucket_indices.update(ray.get(hb_task_complete[0]))
Same as above
    )
    dd_tasks_pending.append(dd_task_promise)
while len(dd_tasks_pending):
    dd_task_complete, dd_tasks_pending = ray.wait(dd_tasks_pending)
Same as above
@ray.remote
def dedupe(bucket, table_stream_id, hash_bucket_index, primary_keys, sort_keys,
Are the performance characteristics of this similar to the actor based example?
The actor-based compactor tends to be more performant, as long as it doesn't exhaust object store memory and fail completely. So the task-based compactor tends to be slower but more resilient to failure.
Related to this, I have another example compactor I've developed that just uses the actor to hold S3 file-refs, and kicks off the dedupe task to eagerly listen for and read intermediate S3 files coming out of the hash_bucket step. The HashBucket actor is also modified to contain a "finalized" flag that is set once all hash-bucketing tasks complete, so that the dedupe task knows when to stop listening and proceed to sorting/deduplicating (@edoakes has also mentioned that Async Actors should help clean up the finalization sleep/poll step I currently use).
So far, this seems to provide the best combination of performance and resilience. Here's the relevant code:
def compact(
        input_bucket,
        output_bucket,
        table_stream_id,
        primary_keys,
        sort_keys,
        max_records_per_output_file,
        num_hash_buckets):

    # check preconditions before doing any computationally expensive work
    check_preconditions(
        primary_keys,
        sort_keys,
        max_records_per_output_file,
    )

    # define distinct, but constant, pk hash and event timestamp column names
    col_uuid = "4000f124-dfbd-48c6-885b-7b22621a6d41"
    pk_hash_column_name = "{}_hash".format(col_uuid)
    event_timestamp_column_name = "{}_event_timestamp".format(col_uuid)

    # append the event timestamp column to the sort key list
    sort_keys.append(event_timestamp_column_name)

    # first group like primary keys together by hashing them into buckets
    input_file_paths = filter_file_paths_by_prefix(
        input_bucket,
        "input/{}".format(table_stream_id))

    all_hash_bucket_indices = set()

    hb_actors = []
    for i in range(num_hash_buckets):
        hb_actors.append(HashBucket.remote())

    # create dedupe tasks listening for datasets to compact in each hash bucket
    dd_tasks_pending = []
    for hb_index in range(num_hash_buckets):
        dd_task_promise = dedupe.remote(
            output_bucket,
            table_stream_id,
            hb_index,
            pk_hash_column_name,
            sort_keys,
            max_records_per_output_file,
            hb_actors[hb_index],
        )
        dd_tasks_pending.append(dd_task_promise)

    # group like primary keys together by hashing them into buckets
    hb_tasks_pending = []
    for input_file_path in input_file_paths:
        hb_task_promise = hash_bucket.remote(
            [input_file_path],
            output_bucket,
            table_stream_id,
            primary_keys,
            num_hash_buckets,
            pk_hash_column_name,
            event_timestamp_column_name,
            hb_actors,
        )
        hb_tasks_pending.append(hb_task_promise)
    while len(hb_tasks_pending):
        hb_task_complete, hb_tasks_pending = ray.wait(hb_tasks_pending)
        hash_bucket_indices = ray.get(hb_task_complete[0])
        all_hash_bucket_indices.update(hash_bucket_indices)

    # finalize hash bucketing so that all dedupe tasks flush their pending work
    for hb_index in all_hash_bucket_indices:
        hb_actors[hb_index].finalize.remote()

    # wait for all pending dedupe tasks to complete
    ray.get(dd_tasks_pending)
@ray.remote
class HashBucket:
    def __init__(self):
        self.file_paths = set()
        self.finalized = False

    def append(self, file_path):
        self.file_paths.add(file_path)

    def get(self):
        return self.file_paths

    def finalize(self):
        self.finalized = True

    def is_finalized(self):
        return self.finalized
@ray.remote
def hash_bucket(
        input_file_paths,
        output_bucket,
        table_stream_id,
        primary_keys,
        num_buckets,
        hash_column_name,
        event_timestamp_column_name,
        hb_actors):

    # read input parquet path into a single dataframe
    dataframe = read_files_add_event_timestamp(
        input_file_paths,
        event_timestamp_column_name,
    )

    # group the data by primary key hash value
    df_groups = group_by_pk_hash_bucket(
        dataframe,
        num_buckets,
        primary_keys,
        hash_column_name,
    )

    # write grouped output data to files including the group name
    hash_bucket_indices = []
    for hash_bucket_index, df_group in df_groups:
        hash_bucket_indices.append(hash_bucket_index)
        output = drop_hash_bucket_column(df_group)
        output_file_path = get_hash_bucket_output_file_path(
            output_bucket,
            table_stream_id,
            hash_bucket_index)
        output.to_parquet(output_file_path, flavor="spark")
        hb_actors[hash_bucket_index].append.remote(output_file_path)
    return hash_bucket_indices
@ray.remote
def dedupe(
        output_bucket,
        table_stream_id,
        hash_bucket_index,
        primary_keys,
        sort_keys,
        max_records_per_output_file,
        hb_actor):

    input_file_to_df = {}
    observed_file_paths = set()
    finalized = False
    dedupe_out_file_prefix = get_dedupe_output_file_prefix(
        output_bucket,
        table_stream_id,
        hash_bucket_index,
    )

    # read any previously compacted parquet files for this bucket
    prev_deduped_df = read_parquet_files_by_url_prefix(dedupe_out_file_prefix)
    input_file_to_df[dedupe_out_file_prefix] = prev_deduped_df

    while not finalized:
        finalized = ray.get(hb_actor.is_finalized.remote())
        # run one post-finalization concat to catch the last set of files
        input_file_paths_promise = hb_actor.get.remote()
        input_file_paths = ray.get(input_file_paths_promise)
        new_file_paths = input_file_paths.difference(observed_file_paths)
        if len(new_file_paths):
            input_file_to_df.update(read_parquet_files(new_file_paths))
            observed_file_paths.update(new_file_paths)
        time.sleep(1)

    # concatenate all input dataframes
    dataframe = pd.concat(input_file_to_df.values(), axis=0, copy=False)

    # sort by sort keys
    dataframe.sort_values(sort_keys, inplace=True)

    # drop duplicates by primary key
    dataframe.drop_duplicates(primary_keys, inplace=True)

    # write sorted, compacted table back
    dedupe_out_file_prefix = get_dedupe_output_file_prefix(
        output_bucket,
        table_stream_id,
        hash_bucket_index,
    )
    write_parquet_files(
        dataframe,
        dedupe_out_file_prefix,
        max_records_per_output_file,
    )
Also, we change the hash bucket file prefix to put its output in a separate "intermediate" folder:
def get_hash_bucket_file_prefix(table_stream_id, hash_bucket_index):
    return "intermediate/{}_{}_".format(table_stream_id, hash_bucket_index)
On the performance note, I'm also working on a pyarrow-table based implementation that could drastically improve overall end-to-end compaction latency while also consuming substantially less memory.
However, the pyarrow implementation comes at the cost of generally being less readable than the pandas-dataframe-based example, and isn't interoperable with the output from the pandas-dataframe-based example (due to schema changes implicitly incurred by type casting parquet files into and out of pandas).
Regardless, I'll see if we can maybe add an option at the end to just let users choose whether they want to run the pyarrow compactor or the pandas compactor, and ensure that they keep their respective outputs in separate S3 paths.
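For a sense of what the pyarrow-table-based dedupe step might look like, here is a minimal sketch (illustrative only — it assumes pyarrow >= 7 for Table.sort_by and Table.group_by, and every name in it is made up rather than taken from this PR):

import pyarrow as pa
import pyarrow.parquet as pq

def dedupe_table_sketch(parquet_paths, primary_keys, sort_keys):
    # read and concatenate the bucket's parquet files into one Table
    table = pa.concat_tables(pq.read_table(p) for p in parquet_paths)

    # sort by the sort keys so the row kept per primary key is deterministic
    table = table.sort_by([(k, "ascending") for k in sort_keys])

    # tag each row with its position, then keep the first row per primary key
    table = table.append_column("__row_idx", pa.array(range(table.num_rows)))
    keep = table.group_by(primary_keys).aggregate([("__row_idx", "min")])
    return table.take(keep["__row_idx_min"]).drop(["__row_idx"])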
@wuisawesome @pdames What's the status of this PR?
@rkooo567 I think this is still active, just a low priority for now.
I will mark it with @author-action-required. @pdames Please remove the label or ping Alex if you'd like the next review!
Will do - thanks @rkooo567! @wuisawesome is right - it's still active and has some updates forthcoming, but has been deprioritized for the past few months.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message. Please feel free to reopen or open a new issue if you'd still like it to be addressed. Again, you can always ask for help on our discussion forum or Ray's public slack channel. Thanks again for opening the issue!
Why are these changes needed?
These changes provide a simple example of using Ray to compact an input table's stream of S3 Parquet file deltas into a single output file given a set of primary keys and sort keys.
The pure-task-based compactor launches one distributed task per input delta in a table's stream. Each task hashes the primary key modulo a desired number of buckets to group "like" primary keys together and saves its bucket groupings to distinct S3 files. The compactor then "compacts" these like-groupings (i.e. sorts by sort key columns and dedupes by primary keys) by creating one parallel compaction task per hash bucket index.
The actor-based compactor tries to reduce the number of small hash bucket grouping files produced by passing a list of hash bucket actor handles into the task.
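As a rough illustration of the bucketing idea (not the PR's actual code — the helper and column names below are made up), each record's primary keys are hashed and the hash is taken modulo the desired number of buckets:

import hashlib

import pandas as pd

def assign_hash_buckets(df, primary_keys, num_buckets, bucket_col="pk_hash_bucket"):
    # join the primary key columns into one string per row, hash it, and
    # map the hash into one of num_buckets buckets
    pk_strings = df[primary_keys].astype(str).agg("|".join, axis=1)
    df[bucket_col] = pk_strings.map(
        lambda s: int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % num_buckets)
    return df

# each bucket can then be written to its own intermediate Parquet file and
# compacted (sorted by sort keys, deduped by primary keys) by its own task:
# for bucket_index, group in df.groupby("pk_hash_bucket"): ...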
Related issue number
#8687
Checks
I've run scripts/format.sh to lint the changes in this PR.