Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge pa table when reading data from data servers #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion python/pyjava/data/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, Generic, List, Callable, Union, Tuple, Iterable

import ray
import pyarrow as pa
from ray.data.datasource.datasource import WriteResult
from ray.types import ObjectRef

Expand All @@ -16,17 +17,36 @@ def __init__(self, data_refs: List[str], num_tables_per_block: int = 1):
self.data_refs = data_refs
self.block_refs = []

def merged_pa_generator(pa_generator):
merged_tables = []
while True:
try:
for i in range(0, 7):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of tables merged should be configured by parameter num_tables_per_block. The value is 1 by default.

patable = next(pa_generator)
merged_tables.append(patable)
yield pa.concat_tables(merged_tables)
merged_tables.clear()
except StopIteration as e:
if len(merged_tables) > 0:
yield pa.concat_tables(merged_tables)
print("Reading from pa table iterator is done!")
break

@ray.remote
def make_block(_data_ref):
block_refs = []
data_iter = RayContext.fetch_data_from_single_data_server_as_arrow(_data_ref)
temp_box = []
for arrow_table in data_iter:

merged_data_iters = merged_pa_generator(data_iter)

for arrow_table in merged_data_iters:
temp_box.append(arrow_table)
if len(temp_box) == num_tables_per_block:
for t in temp_box:
block_refs.append(ray.put(t))
temp_box.clear()

if len(temp_box) != 0:
for t in temp_box:
block_refs.append(ray.put(t))
Expand Down