From 9f0bf34443898607ce2de5b4b84161d78a40efaa Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 17 Jan 2023 15:32:18 +0800 Subject: [PATCH] Vineyard: from unordered chunks. (#3324) * Vineyard: from unordered chunks. Signed-off-by: Tao He * Skip coverage check for those guard code. Signed-off-by: Tao He Signed-off-by: Tao He --- mars/dataframe/datasource/from_vineyard.py | 40 +++++++++++++++++----- mars/tensor/datasource/from_vineyard.py | 30 ++++++++++++---- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/mars/dataframe/datasource/from_vineyard.py b/mars/dataframe/datasource/from_vineyard.py index 14f6778ba4..f1df58d420 100644 --- a/mars/dataframe/datasource/from_vineyard.py +++ b/mars/dataframe/datasource/from_vineyard.py @@ -142,8 +142,8 @@ def execute(cls, ctx, op): dtypes.append(dtype) dtypes = pd.Series(dtypes, index=columns) chunk_index = ( - chunk_meta["partition_index_row_"], - chunk_meta["partition_index_column_"], + chunk_meta.get("partition_index_row_", -1), + chunk_meta.get("partition_index_column_", -1), ) # chunk: (chunk_id, worker_address, dtype, shape, index, columns) chunks.append( @@ -173,7 +173,13 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw): super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw) def __call__(self, meta): - return self.new_dataframe([meta]) + return self.new_dataframe( + [meta], + shape=meta.shape, + dtypes=meta.dtypes, + index_value=meta.index_value, + columns_value=meta.columns_value, + ) @classmethod def tile(cls, op): @@ -182,13 +188,25 @@ def tile(cls, op): ctx = get_context() - in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks] out_chunks = [] chunk_map = dict() dtypes, columns = None, None - for chunk, infos in zip( - op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys) - ): + + in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks] + in_chunk_results = ctx.get_chunks_result(in_chunk_keys) + + # check if chunk indexes has unknown value + has_unknown_chunk_index = False + for infos in in_chunk_results: + for _, info in infos.iterrows(): # pragma: no cover + if len(info["index"]) == 0 or -1 in info["index"]: + has_unknown_chunk_index = True + break + + # assume chunks are row-splitted if chunk index is unknown + chunk_location = 0 + + for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results): for _, info in infos.iterrows(): chunk_op = op.copy().reset_key() chunk_op.object_id = info["id"] @@ -196,7 +214,11 @@ def tile(cls, op): dtypes = info["dtypes"] columns = info["columns"] shape = info["shape"] - chunk_index = info["index"] + if has_unknown_chunk_index: # pragma: no cover + chunk_index = (chunk_location, 0) + chunk_location += 1 + else: + chunk_index = info["index"] chunk_map[chunk_index] = info["shape"] out_chunk = chunk_op.new_chunk( [chunk], @@ -251,7 +273,7 @@ def from_vineyard(df, vineyard_socket=None): gpu=None, ) meta = metaop( - shape=(np.nan,), + shape=(np.nan, np.nan), dtypes=pd.Series([]), index_value=parse_index(pd.Index([])), columns_value=parse_index(pd.Index([])), diff --git a/mars/tensor/datasource/from_vineyard.py b/mars/tensor/datasource/from_vineyard.py index 45f77d9a01..a611fd5318 100644 --- a/mars/tensor/datasource/from_vineyard.py +++ b/mars/tensor/datasource/from_vineyard.py @@ -95,7 +95,7 @@ def execute(cls, ctx, op): chunk_meta["value_type_"], chunk_meta.get("value_type_meta_", None) ) shape = tuple(json.loads(chunk_meta["shape_"])) - chunk_index = tuple(json.loads(chunk_meta["partition_index_"])) + chunk_index = tuple(json.loads(chunk_meta.get("partition_index_", "[]"))) # chunk: (chunk_id, worker_address, dtype, shape, index) chunks.append( (repr(chunk_meta.id), ctx.worker_address, dtype, shape, chunk_index) @@ -119,7 +119,7 @@ def __init__(self, vineyard_socket=None, object_id=None, **kw): super().__init__(vineyard_socket=vineyard_socket, object_id=object_id, **kw) def __call__(self, meta): - return self.new_tensor([meta], shape=(np.nan,)) + return self.new_tensor([meta], shape=meta.shape, dtype=meta.dtype) @classmethod def tile(cls, op): @@ -128,20 +128,35 @@ def tile(cls, op): ctx = get_context() - in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks] out_chunks = [] chunk_map = dict() dtype = None - for chunk, infos in zip( - op.inputs[0].chunks, ctx.get_chunks_result(in_chunk_keys) - ): + in_chunk_keys = [chunk.key for chunk in op.inputs[0].chunks] + in_chunk_results = ctx.get_chunks_result(in_chunk_keys) + + # check if chunk indexes has unknown value + has_unknown_chunk_index = False + for infos in in_chunk_results: + for info in infos[0]: # pragma: no cover + if len(info[4]) == 0 or -1 in info[4]: + has_unknown_chunk_index = True + break + + # assume chunks are row-splitted if chunk index is unknown + chunk_location = 0 + + for chunk, infos in zip(op.inputs[0].chunks, in_chunk_results): for info in infos[0]: # n.b. 1-element ndarray chunk_op = op.copy().reset_key() chunk_op.object_id = info[0] chunk_op.expect_worker = info[1] dtype = info[2] shape = info[3] - chunk_index = info[4] + if has_unknown_chunk_index: # pragma: no cover + chunk_index = (chunk_location,) + chunk_location += 1 + else: + chunk_index = info[4] chunk_map[chunk_index] = info[3] out_chunk = chunk_op.new_chunk( [chunk], shape=shape, dtype=dtype, index=chunk_index @@ -181,6 +196,7 @@ def fromvineyard(tensor, vineyard_socket=None): metaop = TensorFromVineyard( vineyard_socket=vineyard_socket, object_id=object_id, + shape=(np.nan,), dtype=np.dtype("byte"), gpu=None, )