WIP: Go C Data Integration
pitrou committed Sep 19, 2023
1 parent 3b646ad commit 3d145ff
Showing 7 changed files with 392 additions and 29 deletions.
30 changes: 25 additions & 5 deletions ci/scripts/integration_arrow.sh
@@ -24,15 +24,35 @@ gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]

# --run-ipc \
# --run-flight \

# XXX Can we better integrate this with the rest of the Go build tooling?
pushd ${arrow_dir}/go/arrow/internal/cdata_integration

case "$(uname)" in
Linux)
go_lib="arrow_go_integration.so"
;;
Darwin)
go_lib="arrow_go_integration.so"
;;
MINGW*)
go_lib="arrow_go_integration.dll"
;;
esac

go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} .

popd

# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
--run-c-data \
--run-ipc \
--run-flight \
--with-cpp=1 \
--with-csharp=1 \
--with-java=1 \
--with-js=1 \
--with-csharp=0 \
--with-java=0 \
--with-js=0 \
--with-go=1 \
--gold-dirs=$gold_dir/0.14.1 \
--gold-dirs=$gold_dir/0.17.1 \
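For context on the `go build -tags cdata_integration,assert -buildmode=c-shared` step above: a c-shared build needs a `main` package whose C-callable entrypoints carry cgo `//export` comments. Below is a minimal sketch of what such a file can look like, using two of the entrypoint names the Python tester declares further down (`ArrowGo_RunGC`, `ArrowGo_FreeError`); the bodies are illustrative assumptions, not the contents of `go/arrow/internal/cdata_integration`.

```go
//go:build cdata_integration

// Minimal sketch of a cgo entrypoint file that -buildmode=c-shared turns into
// arrow_go_integration.so / .dll. Illustrative only.
package main

// #include <stdlib.h>
import "C"

import (
	"runtime"
	"unsafe"
)

//export ArrowGo_RunGC
func ArrowGo_RunGC() {
	// Lets the integration harness force a collection of exported data.
	runtime.GC()
}

//export ArrowGo_FreeError
func ArrowGo_FreeError(cErr *C.char) {
	// Error strings handed to the caller are malloc'ed (see the error
	// convention in tester_go.py below), so they are released with C.free.
	C.free(unsafe.Pointer(cErr))
}

// An (unused) main is required by -buildmode=c-shared.
func main() {}
```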
135 changes: 134 additions & 1 deletion dev/archery/archery/integration/tester_go.py
@@ -16,11 +16,14 @@
# under the License.

import contextlib
import functools
import os
import subprocess

from .tester import Tester
from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT


# FIXME(sbinet): revisit for Go modules
@@ -39,12 +42,21 @@
"localhost",
]

_dll_suffix = ".dll" if os.name == "nt" else ".so"

_DLL_PATH = os.path.join(
ARROW_ROOT_DEFAULT,
"go/arrow/internal/cdata_integration")
_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix)


class GoTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
C_DATA_EXPORTER = True
C_DATA_IMPORTER = True

name = 'Go'

Expand Down Expand Up @@ -119,3 +131,124 @@ def flight_request(self, port, json_path=None, scenario_name=None):
if self.debug:
log(' '.join(cmd))
run_cmd(cmd)

def make_c_data_exporter(self):
return GoCDataExporter(self.debug, self.args)

def make_c_data_importer(self):
return GoCDataImporter(self.debug, self.args)


_go_c_data_entrypoints = """
const char* ArrowGo_ExportSchemaFromJson(const char* json_path,
uintptr_t out);
const char* ArrowGo_ImportSchemaAndCompareToJson(
const char* json_path, uintptr_t c_schema);
const char* ArrowGo_ExportBatchFromJson(const char* json_path,
int num_batch,
uintptr_t out);
const char* ArrowGo_ImportBatchAndCompareToJson(
const char* json_path, int num_batch, uintptr_t c_array);
int64_t ArrowGo_BytesAllocated();
void ArrowGo_RunGC();
void ArrowGo_FreeError(const char*);
"""


@functools.lru_cache
def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
ffi.cdef(_go_c_data_entrypoints)
dll = ffi.dlopen(lib_path)
dll.ArrowGo_ExportSchemaFromJson
return dll


class _CDataBase:

def __init__(self, debug, args):
self.debug = debug
self.args = args
self.ffi = cdata.ffi()
self.dll = _load_ffi(self.ffi)

def _pointer_to_int(self, c_ptr):
return self.ffi.cast('uintptr_t', c_ptr)

def _check_go_error(self, go_error):
"""
Check a `const char*` error return from an integration entrypoint.
NULL means success; a non-NULL pointer carries an error message.
The string is dynamically allocated on the Go side.
"""
assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*")
if go_error != self.ffi.NULL:
try:
error = self.ffi.string(go_error).decode('utf8',
errors='replace')
raise RuntimeError(
f"Go C Data Integration call failed: {error}")
finally:
self.dll.ArrowGo_FreeError(go_error)

def _run_gc(self):
self.dll.ArrowGo_RunGC()
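The `_check_go_error` helper above assumes a convention on the Go side: each entrypoint returns NULL on success and a freshly allocated C string on failure, which the Python side then releases through `ArrowGo_FreeError`. A hedged sketch of how such a string is typically produced with cgo — the helper name `errToCString` and the demo `main` are made up for illustration:

```go
package main

// #include <stdlib.h>
import "C"

import (
	"errors"
	"fmt"
	"unsafe"
)

// errToCString (hypothetical name) converts a Go error into a malloc'ed,
// NUL-terminated C string. nil maps to NULL, which _check_go_error treats
// as success.
func errToCString(err error) *C.char {
	if err == nil {
		return nil
	}
	// C.CString copies into C memory, so the pointer stays valid after this
	// function returns and is never moved or freed by the Go GC.
	return C.CString(err.Error())
}

func main() {
	if p := errToCString(errors.New("schema mismatch")); p != nil {
		fmt.Println(C.GoString(p))
		C.free(unsafe.Pointer(p)) // what ArrowGo_FreeError does on the Go side
	}
}
```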


class GoCDataExporter(CDataExporter, _CDataBase):
# Note: the Arrow Go C Data export functions expect their output
# ArrowSchema or ArrowArray argument to be zero-initialized.
# This is currently ensured through the use of `ffi.new`.

def export_schema_from_json(self, json_path, c_schema_ptr):
go_error = self.dll.ArrowGo_ExportSchemaFromJson(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_go_error(go_error)

def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
go_error = self.dll.ArrowGo_ExportBatchFromJson(
str(json_path).encode(), num_batch,
self._pointer_to_int(c_array_ptr))
self._check_go_error(go_error)

@property
def supports_releasing_memory(self):
return True

def record_allocation_state(self):
self._run_gc()
return self.dll.ArrowGo_BytesAllocated()

def compare_allocation_state(self, recorded, gc_until):
def pred():
return self.record_allocation_state() == recorded

return gc_until(pred)


class GoCDataImporter(CDataImporter, _CDataBase):

def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson(
str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
self._check_go_error(go_error)

def import_batch_and_compare_to_json(self, json_path, num_batch,
c_array_ptr):
go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson(
str(json_path).encode(), num_batch,
self._pointer_to_int(c_array_ptr))
self._check_go_error(go_error)

@property
def supports_releasing_memory(self):
return True

def gc_until(self, predicate):
for i in range(10):
if predicate():
return True
self._run_gc()
return False
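On the Go side of these calls, the `uintptr_t` produced by `_pointer_to_int` has to be reinterpreted as a pointer to the caller-allocated C Data struct before anything can be exported into it. The sketch below shows that handoff for the schema case; the build tag, the `v14` module path, and the hard-coded schema standing in for one parsed from the JSON file are all illustrative assumptions (the real entrypoint reads the schema via the `arrjson` package).

```go
//go:build cdata_integration

package main

// #include <stdint.h>
import "C"

import (
	"unsafe"

	"github.com/apache/arrow/go/v14/arrow" // module version is an assumption
	"github.com/apache/arrow/go/v14/arrow/cdata"
)

//export ArrowGo_ExportSchemaFromJson
func ArrowGo_ExportSchemaFromJson(jsonPath *C.char, out C.uintptr_t) *C.char {
	// The Python tester passes the ArrowSchema* as an integer (uintptr_t);
	// turn it back into a typed pointer the cdata package can fill in.
	schemaOut := (*cdata.CArrowSchema)(unsafe.Pointer(uintptr(out)))

	// Placeholder: the real code parses the schema from the JSON file at
	// jsonPath instead of hard-coding one.
	_ = C.GoString(jsonPath)
	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int64}}, nil)

	cdata.ExportArrowSchema(schema, schemaOut)
	return nil // NULL: success, per _check_go_error
}

func main() {}
```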
2 changes: 1 addition & 1 deletion go/arrow/cdata/cdata.go
@@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {

// handle types with params via colon
typs := strings.Split(f, ":")
defaulttz := "UTC"
defaulttz := ""
switch typs[0] {
case "tss":
tz := typs[1]
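The one-line change above means that a format string whose timezone field is empty (e.g. `tss:`) now imports as a timestamp type with no timezone instead of silently defaulting to UTC, which is what the C Data integration tests expect. A small standalone illustration of the colon-split parsing involved (plain stdlib; not the actual importSchema code):

```go
package main

import (
	"fmt"
	"strings"
)

// parseTimestampFormat mimics the "handle types with params via colon" split
// above: everything before the first ':' is the unit code, the rest is the
// timezone. The default timezone is now "" (no timezone) rather than "UTC".
func parseTimestampFormat(f string) (unit, tz string) {
	typs := strings.SplitN(f, ":", 2)
	unit = typs[0]
	tz = "" // was "UTC" before this change
	if len(typs) > 1 && typs[1] != "" {
		tz = typs[1]
	}
	return unit, tz
}

func main() {
	for _, f := range []string{"tss:", "tss:UTC", "tsn:Europe/Paris"} {
		unit, tz := parseTimestampFormat(f)
		fmt.Printf("%s -> unit=%s tz=%q\n", f, unit, tz)
	}
	// tss: -> unit=tss tz=""
	// tss:UTC -> unit=tss tz="UTC"
	// tsn:Europe/Paris -> unit=tsn tz="Europe/Paris"
}
```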
39 changes: 21 additions & 18 deletions go/arrow/cdata/cdata_exports.go
@@ -368,34 +368,37 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
exportField(arrow.Field{Type: arr.DataType()}, outSchema)
}

nbuffers := len(arr.Data().Buffers())
buf_offset := 0
// Some types don't have validity bitmaps, but we still reserve the
// leading buffer slot for one, which shifts the real buffers by one.
// This means that we have to adjust when exporting.
has_validity_bitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID())
if nbuffers > 0 && !has_validity_bitmap {
nbuffers--
buf_offset++
}

out.dictionary = nil
out.null_count = C.int64_t(arr.NullN())
out.length = C.int64_t(arr.Len())
out.offset = C.int64_t(arr.Data().Offset())
out.n_buffers = C.int64_t(len(arr.Data().Buffers()))

if out.n_buffers > 0 {
var (
nbuffers = len(arr.Data().Buffers())
bufs = arr.Data().Buffers()
)
// unions don't have validity bitmaps, but we keep them shifted
// to make processing easier in other contexts. This means that
// we have to adjust for union arrays
if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
out.n_buffers--
nbuffers--
bufs = bufs[1:]
}
out.n_buffers = C.int64_t(nbuffers)
out.buffers = nil

if nbuffers > 0 {
bufs := arr.Data().Buffers()
buffers := allocateBufferPtrArr(nbuffers)
for i := range bufs {
buf := bufs[i]
for i := 0; i < nbuffers; i++ {
buf := bufs[i + buf_offset]
if buf == nil || buf.Len() == 0 {
if i > 0 || !internal.DefaultHasValidityBitmap(arr.DataType().ID()) {
if i > 0 || !has_validity_bitmap {
// apache/arrow#33936: export a dummy buffer to be friendly to
// implementations that don't import NULL properly
buffers[i] = (*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion))
} else {
// null pointer permitted for the validity bitmap
// (assuming null count is 0)
buffers[i] = nil
}
continue
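The `buf_offset` bookkeeping above exists because the Go arrays keep a leading validity-bitmap slot even for types that have none in the C Data Interface (unions, for instance), so the exporter has to skip that slot and report one fewer buffer. A toy illustration of the index math with plain slices, outside of any Arrow types:

```go
package main

import "fmt"

// exportedBuffers mirrors the nbuffers/buf_offset bookkeeping above: when the
// type has no validity bitmap in the C Data Interface, the leading Go-side
// buffer slot is skipped and one fewer buffer is exported.
func exportedBuffers(bufs []string, hasValidityBitmap bool) []string {
	nbuffers := len(bufs)
	bufOffset := 0
	if nbuffers > 0 && !hasValidityBitmap {
		nbuffers--
		bufOffset++
	}
	out := make([]string, 0, nbuffers)
	for i := 0; i < nbuffers; i++ {
		out = append(out, bufs[i+bufOffset])
	}
	return out
}

func main() {
	// Go keeps a placeholder validity slot even for union arrays.
	fmt.Println(exportedBuffers([]string{"<validity placeholder>", "type_ids", "offsets"}, false))
	// -> [type_ids offsets]

	// An int64 array does have a validity bitmap, so nothing is dropped.
	fmt.Println(exportedBuffers([]string{"validity", "values"}, true))
	// -> [validity values]
}
```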
12 changes: 8 additions & 4 deletions go/arrow/cdata/cdata_test.go
@@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) {
{arrow.FixedWidthTypes.MonthInterval, "tiM"},
{arrow.FixedWidthTypes.DayTimeInterval, "tiD"},
{arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"},
{arrow.FixedWidthTypes.Timestamp_s, "tss:"},
{arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"},
{&arrow.TimestampType{Unit: arrow.Second}, "tss:"},
{&arrow.TimestampType{Unit: arrow.Second, TimeZone: "Europe/Paris"}, "tss:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_ms, "tsm:"},
{arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"},
{&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"},
{&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "Europe/Paris"}, "tsm:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_us, "tsu:"},
{arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"},
{&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"},
{&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "Europe/Paris"}, "tsu:Europe/Paris"},
{arrow.FixedWidthTypes.Timestamp_ns, "tsn:"},
{arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"},
{&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"},
{&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "Europe/Paris"}, "tsn:Europe/Paris"},
}

10 changes: 10 additions & 0 deletions go/arrow/internal/arrjson/reader.go
@@ -82,6 +82,8 @@ func (r *Reader) Release() {
r.recs[i] = nil
}
}
r.memo.Clear()
r.memo = nil
}
}
func (r *Reader) Schema() *arrow.Schema { return r.schema }
@@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) {
return rec, nil
}

func (r *Reader) ReadAt(index int) (arrow.Record, error) {
if index >= r.NumRecords() {
return nil, io.EOF
}
rec := r.recs[index]
return rec, nil
}

var (
_ arrio.Reader = (*Reader)(nil)
)
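The new `ReadAt` accessor lets a caller fetch record batch N directly instead of iterating with `Read`, which is what the C Data integration entrypoints need when archery requests one batch at a time. A hedged sketch of such a caller — the helper name is hypothetical, the `v14` module path and the exact `cdata.ExportArrowRecordBatch` call are assumptions about code outside this diff, and `arrjson` is an internal package, so this only compiles inside the Arrow Go module:

```go
package cdataintegration // hypothetical package inside the Arrow Go module

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow/cdata"            // module version is an assumption
	"github.com/apache/arrow/go/v14/arrow/internal/arrjson" // internal: module-only import
)

// exportNthBatch pulls record batch numBatch out of an already-open
// integration JSON reader and exports it through the C Data Interface into
// the caller-provided ArrowArray.
func exportNthBatch(rdr *arrjson.Reader, numBatch int, out *cdata.CArrowArray) error {
	rec, err := rdr.ReadAt(numBatch) // io.EOF once numBatch >= rdr.NumRecords()
	if err != nil {
		return fmt.Errorf("reading batch %d of %d: %w", numBatch, rdr.NumRecords(), err)
	}
	// Assumed call: the schema is exported separately, so no schema output here.
	cdata.ExportArrowRecordBatch(rec, out, nil)
	return nil
}
```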
