diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index 289d376a4db9b..c16a721637bb6 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -23,22 +23,23 @@ arrow_dir=${1} gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration pip install -e $arrow_dir/dev/archery[integration] -# For C# C Data Interface testing -pip install pythonnet +# For C Data Interface testing +pip install jpype1 pythonnet # Get more detailed context on crashes export PYTHONFAULTHANDLER=1 +# --run-ipc \ +# --run-flight \ + # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1 time archery integration \ --run-c-data \ - --run-ipc \ - --run-flight \ --with-cpp=1 \ - --with-csharp=1 \ + --with-csharp=0 \ --with-java=1 \ - --with-js=1 \ - --with-go=1 \ + --with-js=0 \ + --with-go=0 \ --gold-dirs=$gold_dir/0.14.1 \ --gold-dirs=$gold_dir/0.17.1 \ --gold-dirs=$gold_dir/1.0.0-bigendian \ diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 01672fbe7488a..3b4ac6c5f0944 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1700,9 +1700,9 @@ def generate_unions_case(): def generate_dictionary_case(): - dict0 = Dictionary(0, StringField('dictionary1'), size=10, name='DICT0') - dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1') - dict2 = Dictionary(2, get_field('dictionary2', 'int64'), + dict0 = Dictionary(1, StringField('dictionary1'), size=10, name='DICT0') + dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1') + dict2 = Dictionary(3, get_field('dictionary2', 'int64'), size=50, name='DICT2') fields = [ @@ -1716,14 +1716,13 @@ def generate_dictionary_case(): def generate_dictionary_unsigned_case(): - dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0') - dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1') - dict2 = Dictionary(2, StringField('dictionary2'), size=5, name='DICT2') + dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0') + dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1') + dict2 = Dictionary(3, StringField('dictionary2'), size=5, name='DICT2') # TODO: JavaScript does not support uint64 dictionary indices, so disabled # for now - - # dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3') + # dict3 = Dictionary(4, StringField('dictionary3'), size=5, name='DICT3') fields = [ DictionaryField('f0', get_field('', 'uint8'), dict0), DictionaryField('f1', get_field('', 'uint16'), dict1), @@ -1736,18 +1735,18 @@ def generate_dictionary_unsigned_case(): def generate_nested_dictionary_case(): - dict0 = Dictionary(0, StringField('str'), size=10, name='DICT0') + dict0 = Dictionary(1, StringField('str'), size=10, name='DICT0') list_of_dict = ListField( 'list', DictionaryField('str_dict', get_field('', 'int8'), dict0)) - dict1 = Dictionary(1, list_of_dict, size=30, name='DICT1') + dict1 = Dictionary(2, list_of_dict, size=30, name='DICT1') struct_of_dict = StructField('struct', [ DictionaryField('str_dict_a', get_field('', 'int8'), dict0), DictionaryField('str_dict_b', get_field('', 'int8'), dict0) ]) - dict2 = Dictionary(2, struct_of_dict, size=30, name='DICT2') + dict2 = Dictionary(3, struct_of_dict, size=30, name='DICT2') fields = [ DictionaryField('list_dict', get_field('', 'int8'), dict1), @@ -1760,7 +1759,7 @@ def generate_nested_dictionary_case(): def generate_extension_case(): - dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0') + dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0') uuid_type = ExtensionType('uuid', 'uuid-serialized', FixedSizeBinaryField('', 16)) diff --git a/dev/archery/archery/integration/tester_java.py b/dev/archery/archery/integration/tester_java.py index 45855079eb72e..122b15aa560aa 100644 --- a/dev/archery/archery/integration/tester_java.py +++ b/dev/archery/archery/integration/tester_java.py @@ -16,10 +16,12 @@ # under the License. import contextlib +import functools import os import subprocess -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log from ..utils.source import ARROW_ROOT_DEFAULT @@ -42,18 +44,25 @@ def load_version_from_pom(): "ARROW_JAVA_INTEGRATION_JAR", os.path.join( ARROW_ROOT_DEFAULT, - "java/tools/target/arrow-tools-{}-" - "jar-with-dependencies.jar".format(_arrow_version), - ), + "java/tools/target", + f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar" + ) +) +_ARROW_C_DATA_JAR = os.environ.get( + "ARROW_C_DATA_JAVA_INTEGRATION_JAR", + os.path.join( + ARROW_ROOT_DEFAULT, + "java/c/target", + f"arrow-c-data-{_arrow_version}.jar" + ) ) _ARROW_FLIGHT_JAR = os.environ.get( "ARROW_FLIGHT_JAVA_INTEGRATION_JAR", os.path.join( ARROW_ROOT_DEFAULT, - "java/flight/flight-integration-tests/target/" - "flight-integration-tests-{}-jar-with-dependencies.jar".format( - _arrow_version), - ), + "java/flight/flight-integration-tests/target", + f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar" + ) ) _ARROW_FLIGHT_SERVER = ( "org.apache.arrow.flight.integration.tests.IntegrationTestServer" @@ -63,11 +72,151 @@ def load_version_from_pom(): ) +@functools.lru_cache +def setup_jpype(): + import jpype + jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}" + # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO) + jpype.startJVM(jpype.getDefaultJVMPath(), + "-Djava.class.path=" + jar_path) + + +class _CDataBase: + + def __init__(self, debug, args): + import jpype + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + setup_jpype() + # JPype pointers to java.io, org.apache.arrow... + self.java_io = jpype.JPackage("java").io + self.java_arrow = jpype.JPackage("org").apache.arrow + self.java_allocator = self._make_java_allocator() + + def _pointer_to_int(self, c_ptr): + return int(self.ffi.cast('uintptr_t', c_ptr)) + + def _wrap_c_schema_ptr(self, c_schema_ptr): + return self.java_arrow.c.ArrowSchema.wrap( + self._pointer_to_int(c_schema_ptr)) + + def _wrap_c_array_ptr(self, c_array_ptr): + return self.java_arrow.c.ArrowArray.wrap( + self._pointer_to_int(c_array_ptr)) + + def _make_java_allocator(self): + # Return a new allocator + return self.java_arrow.memory.RootAllocator() + + def _assert_schemas_equal(self, expected, actual): + # XXX This is fragile for dictionaries, as Schema.equals compares + # dictionary ids! + # Should perhaps instead add a logical comparison function in + # org.apache.arrow.vector.util.DictionaryUtil + if not expected.equals(actual): + raise AssertionError( + f"Java Schemas are not equal:\n" + f"* expected = {expected.toString()}\n" + f"* actual = {actual.toString()}") + + +class JavaCDataExporter(CDataExporter, _CDataBase): + + def export_schema_from_json(self, json_path, c_schema_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + schema = json_reader.start() + dict_provider = json_reader + self.java_arrow.c.Data.exportSchema( + self.java_allocator, schema, dict_provider, + self._wrap_c_schema_ptr(c_schema_ptr) + ) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + json_reader.start() + if num_batch > 0: + actually_skipped = json_reader.skip(num_batch) + assert actually_skipped == num_batch + with json_reader.read() as batch: + dict_provider = json_reader + self.java_arrow.c.Data.exportVectorSchemaRoot( + self.java_allocator, batch, dict_provider, + self._wrap_c_array_ptr(c_array_ptr)) + + @property + def supports_releasing_memory(self): + return True + + def record_allocation_state(self): + return self.java_allocator.getAllocatedMemory() + + def compare_allocation_state(self, recorded, gc_until): + def pred(): + return self.java_allocator.getAllocatedMemory() == recorded + + return gc_until(pred) + + +class JavaCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + json_schema = json_reader.start() + with self.java_arrow.c.CDataDictionaryProvider() as dict_provider: + imported_schema = self.java_arrow.c.Data.importSchema( + self.java_allocator, + self._wrap_c_schema_ptr(c_schema_ptr), + dict_provider) + self._assert_schemas_equal(json_schema, imported_schema) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + json_file = self.java_io.File(json_path) + with self.java_arrow.vector.ipc.JsonFileReader( + json_file, self.java_allocator) as json_reader: + schema = json_reader.start() + if num_batch > 0: + actually_skipped = json_reader.skip(num_batch) + assert actually_skipped == num_batch + with (json_reader.read() as batch, + self.java_arrow.vector.VectorSchemaRoot.create( + schema, self.java_allocator) as imported_batch): + # We need to pass a dict provider primed with dictionary ids + # matching those in the schema, hence an empty + # CDataDictionaryProvider would not work here! + dict_provider = json_reader + self.java_arrow.c.Data.importIntoVectorSchemaRoot( + self.java_allocator, + self._wrap_c_array_ptr(c_array_ptr), + imported_batch, dict_provider) + # TODO print nice error message if not equal + assert imported_batch.equals(batch) + + @property + def supports_releasing_memory(self): + return True + + def gc_until(self, predicate): + # No need to call the Java GC thanks to AutoCloseable (?) + return predicate() + + class JavaTester(Tester): PRODUCER = True CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True + C_DATA_SCHEMA_EXPORTER = True + C_DATA_SCHEMA_IMPORTER = True + C_DATA_ARRAY_EXPORTER = True + C_DATA_ARRAY_IMPORTER = True name = 'Java' @@ -186,3 +335,9 @@ def flight_server(self, scenario_name=None): finally: server.kill() server.wait(5) + + def make_c_data_exporter(self): + return JavaCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return JavaCDataImporter(self.debug, self.args) diff --git a/docker-compose.yml b/docker-compose.yml index 62e5aee0a841c..133c7740e1b97 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1730,16 +1730,21 @@ services: volumes: *conda-volumes environment: <<: [*common, *ccache] - # tell archery where the arrow binaries are located + ARCHERY_INTEGRATION_WITH_RUST: 0 + # Tell Archery where the arrow C++ binaries are located ARROW_CPP_EXE_PATH: /build/cpp/debug ARROW_GO_INTEGRATION: 1 - ARCHERY_INTEGRATION_WITH_RUST: 0 + ARROW_JAVA_CDATA: "ON" + JAVA_JNI_CMAKE_ARGS: >- + -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF + -DARROW_JAVA_JNI_ENABLE_C=ON command: ["/arrow/ci/scripts/rust_build.sh /arrow /build && /arrow/ci/scripts/cpp_build.sh /arrow /build && /arrow/ci/scripts/csharp_build.sh /arrow /build && /arrow/ci/scripts/go_build.sh /arrow && - /arrow/ci/scripts/java_build.sh /arrow /build && + /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) && + /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java && /arrow/ci/scripts/js_build.sh /arrow /build && /arrow/ci/scripts/integration_arrow.sh /arrow /build"] diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index 315d3caad7da2..2875e46f749c4 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -138,6 +138,8 @@ static String asString(ArrowType arrowType) { return "tiD"; case YEAR_MONTH: return "tiM"; + case MONTH_DAY_NANO: + return "tin"; default: throw new UnsupportedOperationException( String.format("Interval type with unit %s is unsupported", type.getUnit())); @@ -277,6 +279,8 @@ static ArrowType asType(String format, long flags) return new ArrowType.Interval(IntervalUnit.YEAR_MONTH); case "tiD": return new ArrowType.Interval(IntervalUnit.DAY_TIME); + case "tin": + return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO); case "+l": return new ArrowType.List(); case "+L": diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 742daeef255f8..dca1fdfe92c5d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -237,6 +237,27 @@ public VectorSchemaRoot read() throws IOException { } } + /** + * Skips a number of record batches in the file. + * + * @param numBatches the number of batches to skip + * @return the actual number of skipped batches. + */ + public int skip(int numBatches) throws IOException { + for (int i = 0; i < numBatches; ++i) { + JsonToken t = parser.nextToken(); + if (t == START_OBJECT) { + parser.skipChildren(); + assert parser.getCurrentToken() == END_OBJECT; + } else if (t == END_ARRAY) { + return i; + } else { + throw new IllegalArgumentException("Invalid token: " + t); + } + } + return numBatches; + } + private abstract class BufferReader { protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 83a8ece0bfb06..f81d049a9257f 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -112,8 +112,8 @@ public ArrowRecordBatch( } long size = arrowBuf.readableBytes(); arrowBuffers.add(new ArrowBuffer(offset, size)); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size); } offset += size; if (alignBuffers) { // align on 8 byte boundaries