Updates from feedback
zeroshade committed Oct 25, 2024
1 parent c6bd66b commit 099e38e
Showing 2 changed files with 34 additions and 14 deletions.
18 changes: 11 additions & 7 deletions cpp/src/arrow/c/abi.h
@@ -231,8 +231,8 @@ struct ArrowDeviceArrayStream {
#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
# define ARROW_C_ASYNC_STREAM_INTERFACE

-// ArrowAsyncTask represents available data from a producer that was passed to
-// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
+// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed
+// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler.
//
// The reason for this Task approach instead of the Async interface returning
// the Array directly is to allow for more complex thread handling and reducing
@@ -275,8 +275,8 @@ struct ArrowAsyncTask {
void* private_data;
};

-// ArrowAsyncProducer represents a 1-to-1 relationship between an async producer
-// and consumer. This object allows the consumer to perform backpressure and flow
+// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async
+// producer and consumer. This object allows the consumer to perform backpressure and flow
// control on the asynchronous stream processing. This object must be owned by the
// producer that creates it; the producer is thus responsible for cleaning it up.
struct ArrowAsyncProducer {
@@ -323,7 +323,7 @@ struct ArrowAsyncProducer {
void* private_data;
};

-// Similar to ArrowDeviceArrayStream, except designed for an asynchronous
+// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous
// style of interaction. While ArrowDeviceArrayStream provides producer
// defined callbacks, this is intended to be created by the consumer instead.
// The consumer passes this handler to the producer, which in turn uses the
@@ -356,8 +356,7 @@ struct ArrowAsyncDeviceStreamHandler {
// A producer that receives a non-zero return here should stop producing and eventually
// call release instead.
int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
-struct ArrowAsyncProducer* producer, struct ArrowSchema* stream_schema,
-const char* addl_metadata);
+struct ArrowSchema* stream_schema, const char* addl_metadata);

// Handler for receiving data. This is called when data is available providing an
// ArrowAsyncTask struct to signify it. The producer indicates the end of the stream
@@ -423,6 +422,11 @@ struct ArrowAsyncDeviceStreamHandler {
// The release callback must not call any methods of an ArrowAsyncProducer object.
void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

+// MUST be populated by the producer BEFORE calling any callbacks other than release.
+// This provides the connection between a handler and its producer, and must exist until
+// the release callback is called.
+struct ArrowAsyncProducer* producer;

// Opaque handler-specific data
void* private_data;
};
30 changes: 23 additions & 7 deletions docs/source/format/CDeviceDataInterface.rst
@@ -655,6 +655,12 @@ serialized.
Async Device Stream Interface
=============================

+.. warning::
+
+Experimental: The Async C Device Stream interface is experimental in its current
+form. Based on feedback and usage the protocol definition may change until
+it is fully standardized.

The :ref:`C stream interface <c-device-stream-interface>` provides a synchronous
API centered around the consumer calling the producer functions to retrieve
the next record batch. For concurrent communication between producer and consumer,
@@ -699,7 +705,6 @@ The C device async stream interface consists of three ``struct`` definitions:
struct ArrowAsyncDeviceStreamHandler {
// consumer-specific handlers
int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
-struct ArrowAsyncProducer* producer,
struct ArrowSchema* stream_schema, const char* addl_metadata);
int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
struct ArrowAsyncTask* task, const char* metadata);
@@ -709,6 +714,9 @@ The C device async stream interface consists of three ``struct`` definitions:
// release callback
void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
+// must be populated before calling any callbacks
+struct ArrowAsyncProducer* producer;
// opaque handler-specific data
void* private_data;
};
@@ -727,7 +735,7 @@ The ArrowAsyncDeviceStreamHandler structure

The structure has the following fields:

-.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncProducer*, struct ArrowSchema*, const char*)
+.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*, const char*)
*Mandatory.* Handler for receiving the schema of the stream. All incoming records should
match the provided schema. If successful, the function should return 0, otherwise
@@ -740,8 +748,8 @@ The structure has the following fields:
the additional metadata beyond the lifetime of this call *MUST* copy the value themselves.

Unless the ``on_error`` handler is called, this will always get called exactly once and will be
-the first method called on this object. As such the producer *MUST* provide an ``ArrowAsyncProducer``
-object when calling this function to allow the consumer to apply back-pressure and control the flow of data.
+the first method called on this object. As such the producer *MUST* populate the ``ArrowAsyncProducer``
+member before calling this function to allow the consumer to apply back-pressure and control the flow of data.
The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after*
calling the release callback on the ``ArrowAsyncDeviceStreamHandler``.

@@ -799,6 +807,14 @@ The structure has the following fields:
:c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer``
object.

+.. c:member:: struct ArrowAsyncProducer* ArrowAsyncDeviceStreamHandler.producer
+*Mandatory.* The producer object that the consumer will use to request additional data or cancel.
+
+This object *MUST* be populated before calling the :c:member:`ArrowAsyncDeviceStreamHandler.on_schema`
+callback. The producer maintains ownership of this object and must clean it up *after* calling
+the release callback on the ``ArrowAsyncDeviceStreamHandler``.

.. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data
*Optional.* An opaque pointer to consumer-provided private data.
@@ -938,9 +954,9 @@ usage as in :ref:`C data interface <c-data-interface-released>`.
ArrowAsyncProducer Lifetime
'''''''''''''''''''''''''''

-The lifetime of the ``ArrowAsyncProducer`` passed to ``on_schema`` is owned by the producer
-itself and should be managed by it. It *MUST* remain valid at least until just before
-calling ``release`` on the stream handler object.
+The lifetime of the ``ArrowAsyncProducer`` is owned by the producer itself and should
+be managed by it. It *MUST* be populated before calling any methods other than ``release``
+and *MUST* remain valid at least until just before calling ``release`` on the stream handler object.
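
The ordering this change requires can be sketched in C as follows. This is a minimal illustration, not the Arrow implementation: the two structs are trimmed stand-ins that keep only the members needed here (so they are not ABI-compatible with ``abi.h``), the ``request``/``cancel`` signatures are assumed, every ``demo_*``/``Demo*`` name is hypothetical, and a ``NULL`` schema is passed purely to keep the sketch short. It also assumes the consumer is allowed to call ``request`` from inside ``on_schema``.

```c
#include <stddef.h>
#include <stdint.h>

struct ArrowSchema; /* opaque here; defined by the Arrow C data interface */

/* Trimmed stand-in (assumed shape): only what this sketch needs. */
struct ArrowAsyncProducer {
  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);
  void* private_data;
};

/* Trimmed stand-in: on_next_task and on_error are omitted for brevity. */
struct ArrowAsyncDeviceStreamHandler {
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema, const char* addl_metadata);
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);
  struct ArrowAsyncProducer* producer; /* set by the producer, read by the consumer */
  void* private_data;
};

/* Hypothetical producer-side bookkeeping. */
struct DemoProducerState { int64_t requested; int cancelled; };

static void demo_request(struct ArrowAsyncProducer* self, int64_t n) {
  ((struct DemoProducerState*)self->private_data)->requested += n;
}

static void demo_cancel(struct ArrowAsyncProducer* self) {
  ((struct DemoProducerState*)self->private_data)->cancelled = 1;
}

/* Hypothetical consumer-side bookkeeping. */
struct DemoConsumerState { int schema_seen; int released; };

static int demo_on_schema(struct ArrowAsyncDeviceStreamHandler* self,
                          struct ArrowSchema* stream_schema,
                          const char* addl_metadata) {
  struct DemoConsumerState* st = self->private_data;
  (void)stream_schema;
  (void)addl_metadata;
  st->schema_seen = 1;
  /* self->producer is already populated, so flow control is available here. */
  self->producer->request(self->producer, 1);
  return 0;
}

static void demo_release(struct ArrowAsyncDeviceStreamHandler* self) {
  /* Must not call any ArrowAsyncProducer methods from here. */
  ((struct DemoConsumerState*)self->private_data)->released = 1;
}

/* Producer side. Returns how many tasks the consumer requested, or -1 if
 * on_schema reported an error. */
static int64_t demo_run_producer(struct ArrowAsyncDeviceStreamHandler* handler) {
  struct DemoProducerState state = {0, 0};
  struct ArrowAsyncProducer producer = {demo_request, demo_cancel, &state};
  int rc;

  /* 1. Populate the member before any callback other than release. */
  handler->producer = &producer;

  /* 2. on_schema is the first callback; a non-zero return means stop producing. */
  rc = handler->on_schema(handler, NULL, NULL);

  /* A real producer would deliver requested tasks via on_next_task here. */

  /* 3. The producer object is still valid when release is called. */
  handler->release(handler);

  /* 4. Clean up the producer only after release (here it leaves scope). */
  return rc == 0 ? state.requested : -1;
}
```

A consumer would fill in a handler with ``demo_on_schema`` and ``demo_release``, leave ``producer`` as ``NULL``, and hand it to ``demo_run_producer``; the producer must not touch the handler again once ``release`` has returned.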

Thread safety
'''''''''''''