Skip to content

Commit

Permalink
apacheGH-37910: [Java][Integration] Implement C Data Interface integration testing
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Oct 19, 2023
1 parent 883a439 commit 1b9f547
Show file tree
Hide file tree
Showing 21 changed files with 372 additions and 134 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]
# For C# C Data Interface testing
pip install pythonnet
# For C Data Interface testing
pip install jpype1 pythonnet

# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1
Expand Down
18 changes: 14 additions & 4 deletions dev/archery/archery/integration/cdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ def ffi() -> cffi.FFI:
return ffi


def _release_memory_steps(exporter: CDataExporter, importer: CDataImporter):
    """
    Generate the successive points at which the caller should re-check
    whether exported memory has been released.

    Yields once before any GC activity (memory may already be released),
    then, for as many rounds as the slower of the two sides requires,
    yields after the importer's GC run and again after the exporter's.
    The caller is expected to stop iterating as soon as its check passes.
    """
    # First checkpoint: no GC at all — release may already have happened.
    yield
    n_rounds = max(exporter.required_gc_runs, importer.required_gc_runs)
    for _ in range(n_rounds):
        # Run the importer's GC first so that release callbacks fire and
        # the exporter's memory can actually be freed afterwards.
        importer.run_gc()
        yield
        exporter.run_gc()
        yield


@contextmanager
def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
"""
Expand All @@ -96,12 +105,13 @@ def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
if do_check:
before = exporter.record_allocation_state()
yield
# We don't use a `finally` clause: if the enclosed block raised an
# exception, no need to add another one.
# Only check for memory state if `yield` didn't raise.
if do_check:
ok = exporter.compare_allocation_state(before, importer.gc_until)
if not ok:
for _ in _release_memory_steps(exporter, importer):
after = exporter.record_allocation_state()
if after == before:
break
if after != before:
raise RuntimeError(
f"Memory was not released correctly after roundtrip: "
f"before = {before}, after = {after} (should have been equal)")
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1722,7 +1722,6 @@ def generate_dictionary_unsigned_case():

# TODO: JavaScript does not support uint64 dictionary indices, so disabled
# for now

# dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
fields = [
DictionaryField('f0', get_field('', 'uint8'), dict0),
Expand Down
23 changes: 12 additions & 11 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,17 +421,18 @@ def _compare_c_data_implementations(
# Serial execution is required for proper memory accounting
serial = True

exporter = producer.make_c_data_exporter()
importer = consumer.make_c_data_importer()

case_runner = partial(self._run_c_schema_test_case, producer, consumer,
exporter, importer)
self._run_test_cases(case_runner, self.json_files, serial=serial)

if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER:
case_runner = partial(self._run_c_array_test_cases, producer, consumer,
exporter, importer)
self._run_test_cases(case_runner, self.json_files, serial=serial)
with producer.make_c_data_exporter() as exporter:
with consumer.make_c_data_importer() as importer:
case_runner = partial(self._run_c_schema_test_case,
producer, consumer,
exporter, importer)
self._run_test_cases(case_runner, self.json_files, serial=serial)

if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER:
case_runner = partial(self._run_c_array_test_cases,
producer, consumer,
exporter, importer)
self._run_test_cases(case_runner, self.json_files, serial=serial)

def _run_c_schema_test_case(self,
producer: Tester, consumer: Tester,
Expand Down
104 changes: 56 additions & 48 deletions dev/archery/archery/integration/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,52 +68,52 @@ def supports_releasing_memory(self) -> bool:
Whether the implementation is able to release memory deterministically.
Here, "release memory" means that, after the `release` callback of
a C Data Interface export is called, `compare_allocation_state` is
able to trigger the deallocation of the memory underlying the export
(for example buffer data).
a C Data Interface export is called, `run_gc` is able to trigger
the deallocation of the memory underlying the export (such as buffer data).
If false, then `record_allocation_state` and `compare_allocation_state`
are allowed to raise NotImplementedError.
If false, then `record_allocation_state` is allowed to raise
NotImplementedError.
"""

def record_allocation_state(self) -> object:
"""
Record the current memory allocation state.
Return the current memory allocation state.
Returns
-------
state : object
Opaque object representing the allocation state,
for example the number of allocated bytes.
Equality-comparable object representing the allocation state,
for example the number of allocated or exported bytes.
"""
raise NotImplementedError

def compare_allocation_state(self, recorded: object,
gc_until: typing.Callable[[_Predicate], bool]
) -> bool:
def run_gc(self):
"""
Compare the current memory allocation state with the recorded one.
Run the GC if necessary.
Parameters
----------
recorded : object
The previous allocation state returned by
`record_allocation_state()`
gc_until : callable
A callable itself accepting a callable predicate, and
returning a boolean.
`gc_until` should try to release memory until the predicate
becomes true, or until it decides to give up. The final value
of the predicate should be returned.
`gc_until` is typically provided by the C Data Interface importer.
This should ensure that any temporary objects and data created by
previous exporter calls are collected.
"""

Returns
-------
success : bool
Whether memory allocation state finally reached its previously
recorded value.
@property
def required_gc_runs(self):
"""
raise NotImplementedError
The maximum number of calls to `run_gc` that need to be issued to
ensure proper deallocation. Some implementations may require this
to be greater than one.
"""
return 1

def close(self):
"""
Final cleanup after usage.
"""

def __enter__(self):
return self

def __exit__(self, *exc):
self.close()


class CDataImporter(ABC):
Expand Down Expand Up @@ -163,32 +163,40 @@ def supports_releasing_memory(self) -> bool:
"""
Whether the implementation is able to release memory deterministically.
Here, "release memory" means calling the `release` callback of
a C Data Interface export (which should then trigger a deallocation
mechanism on the exporter).
Here, "release memory" means `run_gc()` is able to trigger the
`release` callback of a C Data Interface export (which would then
induce a deallocation mechanism on the exporter).
"""

If false, then `gc_until` is allowed to raise NotImplementedError.
def run_gc(self):
"""
Run the GC if necessary.
def gc_until(self, predicate: _Predicate):
This should ensure that any imported data has its release callback called.
"""
Try to release memory until the predicate becomes true, or fail.

Depending on the CDataImporter implementation, this may for example
try once, or run a garbage collector a given number of times, or
any other implementation-specific strategy for releasing memory.
@property
def required_gc_runs(self):
"""
The maximum number of calls to `run_gc` that need to be issued to
ensure release callbacks are triggered. Some implementations may
require this to be greater than one.
"""
return 1

The running time should be kept reasonable and compatible with
execution of multiple C Data integration tests.
def close(self):
"""
Final cleanup after usage.
"""

This should not raise if `supports_releasing_memory` is true.
def __enter__(self):
return self

Returns
-------
success : bool
The final value of the predicate.
"""
raise NotImplementedError
def __exit__(self, *exc):
# Make sure any exported data is released.
for i in range(self.required_gc_runs):
self.run_gc()
self.close()


class Tester:
Expand Down
11 changes: 0 additions & 11 deletions dev/archery/archery/integration/tester_cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,6 @@ def supports_releasing_memory(self):
def record_allocation_state(self):
return self.dll.ArrowCpp_BytesAllocated()

def compare_allocation_state(self, recorded, gc_until):
def pred():
# No GC on our side, so just compare allocation state
return self.record_allocation_state() == recorded

return gc_until(pred)


class CppCDataImporter(CDataImporter, _CDataBase):

Expand All @@ -247,7 +240,3 @@ def import_batch_and_compare_to_json(self, json_path, num_batch,
@property
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
# No GC on our side, so can evaluate predicate immediately
return predicate()
19 changes: 9 additions & 10 deletions dev/archery/archery/integration/tester_csharp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.

from contextlib import contextmanager
import gc
import os

from . import cdata
Expand Down Expand Up @@ -82,6 +81,10 @@ def _read_batch_from_json(self, json_path, num_batch):
schema = jf.Schema.ToArrow()
return schema, jf.Batches[num_batch].ToArrow(schema)

def _run_gc(self):
from Apache.Arrow.IntegrationTest import CDataInterface
CDataInterface.RunGC()


class CSharpCDataExporter(CDataExporter, _CDataBase):

Expand All @@ -105,6 +108,9 @@ def supports_releasing_memory(self):
# XXX the C# GC doesn't give reliable allocation measurements
return False

def run_gc(self):
self._run_gc()


class CSharpCDataImporter(CDataImporter, _CDataBase):

Expand Down Expand Up @@ -134,15 +140,8 @@ def import_batch_and_compare_to_json(self, json_path, num_batch,
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
from Apache.Arrow.IntegrationTest import CDataInterface
for i in range(3):
if predicate():
return True
# Collect any C# objects hanging around through Python
gc.collect()
CDataInterface.RunGC()
return predicate()
def run_gc(self):
self._run_gc()


class CSharpTester(Tester):
Expand Down
18 changes: 2 additions & 16 deletions dev/archery/archery/integration/tester_go.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ def _check_go_error(self, go_error):
finally:
self.dll.ArrowGo_FreeError(go_error)

def _run_gc(self):
self.dll.ArrowGo_RunGC()


class GoCDataExporter(CDataExporter, _CDataBase):
# Note: the Arrow Go C Data export functions expect their output
Expand All @@ -225,14 +222,10 @@ def supports_releasing_memory(self):
return True

def record_allocation_state(self):
self._run_gc()
return self.dll.ArrowGo_BytesAllocated()

def compare_allocation_state(self, recorded, gc_until):
def pred():
return self.record_allocation_state() == recorded

return gc_until(pred)
# Note: no need to call the Go GC anywhere thanks to Arrow Go's
# explicit refcounting.


class GoCDataImporter(CDataImporter, _CDataBase):
Expand All @@ -252,10 +245,3 @@ def import_batch_and_compare_to_json(self, json_path, num_batch,
@property
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
for i in range(10):
if predicate():
return True
self._run_gc()
return False
Loading

0 comments on commit 1b9f547

Please sign in to comment.