From de19af928a83c09d12cd0f3d83932bfa8cca9b19 Mon Sep 17 00:00:00 2001 From: "Seb. V" Date: Tue, 23 Jul 2024 13:47:26 +0200 Subject: [PATCH] GH-43359: [Go][Parquet] ReadRowGroups panics with canceled context (#43360) ### Rationale for this change `ReadRowGroups` needs to support externally canceled contexts, e.g. for request-scoped contexts in servers like gRPC. ### What changes are included in this PR? Additionnaly, `releaseColumns` needs to ignore columns with uninitialized data as it used in a `defer` statement. ### Are these changes tested? Yes: a new test `TestArrowReaderCanceledContext` is included. ### Are there any user-facing changes? None * GitHub Issue: #43359 Authored-by: sebdotv Signed-off-by: Joel Lubinitsky --- go/parquet/pqarrow/file_reader.go | 5 +++++ go/parquet/pqarrow/file_reader_test.go | 23 +++++++++++++++++++++++ go/parquet/pqarrow/helpers.go | 4 +++- 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go index 208ac9ceebadf..a2e84d9ce2795 100755 --- a/go/parquet/pqarrow/file_reader.go +++ b/go/parquet/pqarrow/file_reader.go @@ -18,6 +18,7 @@ package pqarrow import ( "context" + "errors" "fmt" "io" "sync" @@ -375,6 +376,10 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in data.data.Release() } + // if the context is in error, but we haven't set an error yet, then it means that the parent context + // was cancelled. In this case, we should exit early as some columns may not have been read yet. + err = errors.Join(err, ctx.Err()) + if err != nil { // if we encountered an error, consume any waiting data on the channel // so the goroutines don't leak and so memory can get cleaned up. we already diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index b7d178f8644de..fe5a4547a775c 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -167,6 +167,29 @@ func TestArrowReaderAdHocReadFloat16s(t *testing.T) { } } +func TestArrowReaderCanceledContext(t *testing.T) { + dataDir := getDataDir() + + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + filename := filepath.Join(dataDir, "int32_decimal.parquet") + require.FileExists(t, filename) + + rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem))) + require.NoError(t, err) + defer rdr.Close() + arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem) + require.NoError(t, err) + + // create a canceled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = arrowRdr.ReadTable(ctx) + require.ErrorIs(t, err, context.Canceled) +} + func TestRecordReaderParallel(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) diff --git a/go/parquet/pqarrow/helpers.go b/go/parquet/pqarrow/helpers.go index 800cd84192005..237de4366c03e 100644 --- a/go/parquet/pqarrow/helpers.go +++ b/go/parquet/pqarrow/helpers.go @@ -38,6 +38,8 @@ func releaseArrayData(data []arrow.ArrayData) { func releaseColumns(columns []arrow.Column) { for _, col := range columns { - col.Release() + if col.Data() != nil { // data can be nil due to the way columns are constructed in ReadRowGroups + col.Release() + } } }