Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share events to Python without copying via arrow crate #228

Merged
merged 43 commits into from
Apr 18, 2023
Merged

Conversation

phil-opp
Copy link
Collaborator

@phil-opp phil-opp commented Mar 21, 2023

Implements the proposal in #224 on top of #213.

Future work:

  • Avoid copying outputs by adding a method to prepare a zero-initialized ArrowArray backed by shared memory
  • On sender side: When no drop token is received in reasonable time, remove the shared memory region
    • This will prevent the creation of new mappings to that region
    • Already mapped data is kept and only removed when all processes have unmapped it again -> no undefined behavior
    • Removing the shared memory region makes sense because we probably can't reuse it safely.
  • Support additional Arrow data types (not just arrays)
  • Use Arrow data types in Rust/C/C++ APIs too?

@phil-opp phil-opp changed the base branch from arrow to main April 4, 2023 13:50
@phil-opp phil-opp marked this pull request as ready for review April 4, 2023 16:09
@phil-opp
Copy link
Collaborator Author

phil-opp commented Apr 4, 2023

@haixuanTao I updated the PR with the latest changes from main and I made the changes backwards-compatible by defaulting to PyBytes again. We now pass events as a custom object instead of using a PyDict, which makes it possible to construct the PyBytes on demand on event["data"] access. To avoid copying the data, it is now possible to use event["data_arrow"] instead, which returns an arrow array.

For sending data, there are now two different methods. The send_output method expects data of type PyBytes to be backwards compatible. To send an array in arrow format, there is now a new send_output_arrow method. Right now it also needs to copy the data because we need to move it to shared memory. In the future, we should add an API to construct an output array directly in shared memory, but I think we can do that in a later PR.

@phil-opp phil-opp requested a review from haixuanTao April 4, 2023 16:15
@haixuanTao
Copy link
Collaborator

Hi Philipp, thanks for making this possible!

I don´t really want to maintain both PyBytes and Arrow. PyBytes has its fair share of edges as well in Python. I would prefer to only keep one if that's possible. Did you keep the PyBytes because you feel like Arrow is still quite unsafe?

Or it's too not break backward compatibility only, which I can manage for dora-drives. PyArrow and np.array are quite interchangeable so in the case of dora-drives. The replacement should be quite easy.

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 5, 2023

Avoid copying outputs by adding a method to prepare a zero-initialized ArrowArray backed by shared memory.

I think we can postpone this. Especially if it requires to add another API endpoint. If we have zero-copy on read the performance will already be really good.

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 5, 2023

Support additional Arrow data types (not just arrays)

So, I think that we don't have to support all data-types. But one that can be interesting is a HashMap data-types. This way we can encapsulate our metadata type as Arrow. We can even use something like serde_arrow and derive our struct without having to do all the ser/de implementation at the API level. ( e.g. : metadata_to_pydict ). But, it seems like a big undertaking on the deserialization side, and i'm not sure about the quality and usability of Pyarrow API on those exotic type (See). In case it's too difficult I would suggest we postpone it until people request it.

One thing that I personally think is really important is to be able to forward the data-shape, the number of column and row.

I don't know how hard are those implementation, we can postpone it if it's too time-consuming and we want to ship the arrow API first.

We can postpone all other data-types for now.

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 5, 2023

Use Arrow data types in Rust/C/C++ APIs too?

I think that we can postpone this until we have more clarity on the Arrow feature set we fill comfortable with. This would avoid having to maintain the 3 API while proving the concept.

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 5, 2023

I'm having issue running the python operator example. It seems that the data is not been received by the receiving node, even though it's sent by the sent node. I have tried to use data_arrow and send_output_arrow but it did not work.

@phil-opp phil-opp mentioned this pull request Apr 5, 2023
@phil-opp
Copy link
Collaborator Author

phil-opp commented Apr 5, 2023

Thanks for testing! I'm not sure what's going wrong yet, by I see these issues too. To make debugging easier, I started to split off unrelated changes into separate PRs, starting with #246.

@phil-opp phil-opp changed the base branch from main to update-pyo3 April 5, 2023 08:22
haixuanTao and others added 8 commits April 5, 2023 10:25
Don't bind the lifetime of the event to the `next` call anymore. This makes it possible to use the original event in Python, which does not support borrowed data or lifetimes. The drawback is that we no longer have the guarantee that an event is freed before the next call to `recv`.
…sical array

Some applications might not need the data in arrow format. By creating a custom object, we can provide the data both as a classical `PyBytes` and as a zero-copy arrow array.
…ibility

Sending bytes using arrow is possible through new `send_output_arrow` method.
Base automatically changed from update-pyo3 to main April 5, 2023 09:16
There might still be some pending drop tokens after the receiving end of the event stream was closed. So we don't want to break from the receiver thread directly. Instead we keep it running until the control channel signals that it expects no more drop tokens by closing the `finished_drop_tokens` channel. This happens either when all required drop tokens were received, or because of a timeout.
The Python garbage collection will drop them non-deterministically in the background. Also, the dropping cannot happen while the GIL is held by our wait code.
We only support `UInt8` arrays right now.
This is mostly about convenience when working with arrow data types, as the data is still being copied once.
@phil-opp
Copy link
Collaborator Author

Thanks for testing! I just added support for sending arrow arrays from operators in 235f6da.

@haixuanTao
Copy link
Collaborator

So, there is a weird error that prevent user from calling two times dora_event["data"], in the sense that:

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes], None],
    ) -> DoraStatus:
        if dora_event["type"] == "INPUT":
            length = dora_event["value"]

            length = dora_event["value"] # <--- this will fail

        return DoraStatus.CONTINUE

The error is going to be the following:

value.ok_or_else(|| PyLookupError::new_err(format!("event has no property `{key}`")))

We can either fix it or give a heads up.

@phil-opp
Copy link
Collaborator Author

Thanks for testing! I pushed c1544f8 to fix this issue.

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 14, 2023

Just FYI, I have tested it again but i'm seeing slightly longer latency with the same code replacing node with operator. I will investigate more later on.

Size Latency Node Latency Operator
8 514.0151 7823.535460006497
64 518.3472199999999 1139.437851472196
512 539.93719 1319.1025445623814
2048 507.9925899999999 1319.3751484989364
4096 610.57567 1377.7049504851404
16384 760.21873 1210.5615544644727
40960 2484.4595199999994 549.8982871241685
409600 2773.14837 841.9508415788033
4096000 1728.2458500000002 4740.129366341941
40960000 6397.578890000001 29820.334663385765

edit (phil-opp): formatted as table

original data
dora Node,8,514.0151
dora Node,64,518.3472199999999
dora Node,512,539.93719
dora Node,2048,507.9925899999999
dora Node,4096,610.57567
dora Node,16384,760.21873
dora Node,40960,2484.4595199999994
dora Node,409600,2773.14837
dora Node,4096000,1728.2458500000002
dora Node,40960000,6397.578890000001
dora Operator,8,7823.535460006497
dora Operator,64,1139.437851472196
dora Operator,512,1319.1025445623814
dora Operator,2048,1319.3751484989364
dora Operator,4096,1377.7049504851404
dora Operator,16384,1210.5615544644727
dora Operator,40960,549.8982871241685
dora Operator,409600,841.9508415788033
dora Operator,4096000,4740.129366341941
dora Operator,40960000,29820.334663385765

@haixuanTao
Copy link
Collaborator

haixuanTao commented Apr 14, 2023

So, I think that's its linked to the current implementation of the SendOuputCallback

impl SendOutputCallback {
fn __call__(
that requires an additional ownership to share the data with the runtime through a channel.

The additional ownership hapens here:

let data = process_python_output(&data, py, |data| Ok(data.to_owned()))?;

because sharing data between operator and runtime is defined as

pub enum OperatorEvent {
Output {
output_id: DataId,
metadata: MetadataParameters<'static>,
data: Vec<u8>,
},

I wonder if we can remove the runtime channel copy easily.

This additional copy was likely already present before this pull request. Therefore we can make it a separate PR as it is only loosely linked to the current changes.

@phil-opp
Copy link
Collaborator Author

phil-opp commented Apr 15, 2023

Thanks for investigating!

Just FYI, I have tested it again but i'm seeing slightly longer latency with the same code replacing node with operator.

It's a bit strange that the operator latency is much smaller than the node latency for message sizes 40960 and 409600. For all other data sizes, it's the other way around. Do you have any idea why this is the case?

So, I think that's its linked to the current implementation of the SendOuputCallback that requires an additional ownership to share the data with the runtime through a channel.

Yeah, this definitely has some additional costs for larger messages. I'll look into it.

I'm not sure what's causing the large latency for small messages though..

@haixuanTao
Copy link
Collaborator

Thanks for investigating!

Just FYI, I have tested it again but i'm seeing slightly longer latency with the same code replacing node with operator.

It's a bit strange that the operator latency is much smaller than the node latency for message sizes 40960 and 409600. For all other data sizes, it's the other way around. Do you have any idea why this is the case?

will check absolutely as well as for small message!

Instead of doing an additional copy to send them from the operator thread to the runtime thread.

This commit uses the new `allocate_data_sample` and `send_output_sample` methods introduced in d7cd370.
@phil-opp
Copy link
Collaborator Author

So, I think that's its linked to the current implementation of the SendOuputCallback that requires an additional ownership to share the data with the runtime through a channel.

Yeah, this definitely has some additional costs for larger messages. I'll look into it.

Should be fixed by 1948a45.

@haixuanTao
Copy link
Collaborator

I know get the close latency value for both node and operator:

dora Node,8,545.4713700000001
dora Node,64,533.91939
dora Node,512,548.10928
dora Node,2048,851.3406300000001
dora Node,4096,1078.9238400000002
dora Node,16384,1121.3838500000002
dora Node,40960,1113.2833500000002
dora Node,409600,1230.84563
dora Node,4096000,1636.1486600000003
dora Node,40960000,4848.450280000001
dora Node,8,908.44376
dora Operator,8,817.0425798016367
dora Operator,64,688.4469109383357
dora Operator,512,1090.3208020692437
dora Operator,2048,1117.994148606356
dora Operator,4096,1177.8792475452785
dora Operator,16384,1150.2645544835716
dora Operator,40960,1158.827108732715
dora Operator,409600,1305.7527920152845
dora Operator,4096000,1759.6348711101587
dora Operator,40960000,4455.981317012557

Thanks for the quick fixe.

@phil-opp
Copy link
Collaborator Author

Great, thanks for testing! Is there anything else blocking this PR or do you think it's ready to be merged?

@haixuanTao
Copy link
Collaborator

Great, thanks for testing! Is there anything else blocking this PR or do you think it's ready to be merged?

I'm working on the dora-drives export to arrow. There is couple of things I'm still working on. I should be able to approve this PR by today.

@haixuanTao
Copy link
Collaborator

Ok, so I finally find out my issue within dora-drives which is unrelated to our implementation.

I had an issue with numpy that was badly converting types to uint8 using .view(np.uint8) when the array was non-contiguous.

This has been fix in numpy>=1.24 but is a silent error for previous version.

@haixuanTao haixuanTao merged commit 86b2a53 into main Apr 18, 2023
@haixuanTao haixuanTao deleted the arrow-fixes branch April 18, 2023 03:56
@phil-opp phil-opp linked an issue Apr 18, 2023 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Zero-copy Python inputs using arrow
2 participants