Skip to content

Commit

Permalink
Merge pull request #10724 from rouault/parquet_dataset_close
Browse files Browse the repository at this point in the history
Parquet: in dataset mode, make sure all files are closed before closing the GDALDataset
  • Loading branch information
rouault committed Sep 18, 2024
2 parents 34cca5d + b0f0244 commit 84db033
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 75 deletions.
40 changes: 0 additions & 40 deletions apps/ogr2ogr_lib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6052,46 +6052,6 @@ bool LayerTranslator::TranslateArrow(

schema.release(&schema);

// Ugly hack to work around https://github.com/OSGeo/gdal/issues/9497
// Deleting a RecordBatchReader obtained from arrow::dataset::Scanner.ToRecordBatchReader()
// is a lengthy operation since all batches are read in its destructors.
// Here we ask to our custom I/O layer to return in error to short circuit
// that lengthy operation.
if (auto poDS = psInfo->m_poSrcLayer->GetDataset())
{
if (poDS->GetLayerCount() == 1 && poDS->GetDriver() &&
EQUAL(poDS->GetDriver()->GetDescription(), "PARQUET"))
{
bool bStopIO = false;
const char *pszArrowStopIO =
CPLGetConfigOption("OGR_ARROW_STOP_IO", nullptr);
if (pszArrowStopIO && CPLTestBool(pszArrowStopIO))
{
bStopIO = true;
}
else if (!pszArrowStopIO)
{
std::string osExePath;
osExePath.resize(1024);
if (CPLGetExecPath(osExePath.data(),
static_cast<int>(osExePath.size())))
{
osExePath.resize(strlen(osExePath.data()));
if (strcmp(CPLGetBasename(osExePath.data()), "ogr2ogr") ==
0)
{
bStopIO = true;
}
}
}
if (bStopIO)
{
CPLSetConfigOption("OGR_ARROW_STOP_IO", "YES");
CPLDebug("OGR2OGR", "Forcing interruption of Parquet I/O");
}
}
}

stream.release(&stream);
return bRet;
}
Expand Down
14 changes: 8 additions & 6 deletions ogr/ogrsf_frmts/arrow/ogrfeatherdriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
auto fp = VSIVirtualHandleUniquePtr(VSIFileFromMemBuffer(
osTmpFilename.c_str(), poOpenInfo->pabyHeader, nSizeToRead,
false));
auto infile =
std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
auto infile = std::make_shared<OGRArrowRandomAccessFile>(
osTmpFilename.c_str(), std::move(fp));
auto options = arrow::ipc::IpcReadOptions::Defaults();
auto result =
arrow::ipc::RecordBatchStreamReader::Open(infile, options);
Expand All @@ -113,8 +113,8 @@ static bool IsArrowIPCStream(GDALOpenInfo *poOpenInfo)
return false;

// Do not give ownership of poOpenInfo->fpL to infile
auto infile =
std::make_shared<OGRArrowRandomAccessFile>(poOpenInfo->fpL, false);
auto infile = std::make_shared<OGRArrowRandomAccessFile>(
poOpenInfo->pszFilename, poOpenInfo->fpL, false);
auto options = arrow::ipc::IpcReadOptions::Defaults();
auto result =
arrow::ipc::RecordBatchStreamReader::Open(infile, options);
Expand Down Expand Up @@ -164,14 +164,16 @@ static GDALDataset *OGRFeatherDriverOpen(GDALOpenInfo *poOpenInfo)
osFilename.c_str());
return nullptr;
}
infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
infile = std::make_shared<OGRArrowRandomAccessFile>(osFilename.c_str(),
std::move(fp));
}
else if (STARTS_WITH(poOpenInfo->pszFilename, "/vsi") ||
CPLTestBool(CPLGetConfigOption("OGR_ARROW_USE_VSI", "NO")))
{
VSIVirtualHandleUniquePtr fp(poOpenInfo->fpL);
poOpenInfo->fpL = nullptr;
infile = std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
infile = std::make_shared<OGRArrowRandomAccessFile>(
poOpenInfo->pszFilename, std::move(fp));
}
else
{
Expand Down
7 changes: 7 additions & 0 deletions ogr/ogrsf_frmts/arrow_common/ogr_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,13 @@ class OGRArrowDataset CPL_NON_FINAL : public GDALPamDataset
std::vector<std::string> m_aosDomainNames{};
std::map<std::string, int> m_oMapDomainNameToCol{};

protected:
void close()
{
m_poLayer.reset();
m_poMemoryPool.reset();
}

public:
explicit OGRArrowDataset(
const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
Expand Down
111 changes: 100 additions & 11 deletions ogr/ogrsf_frmts/arrow_common/ograrrowrandomaccessfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,68 @@
#include "arrow/io/file.h"
#include "arrow/io/interfaces.h"

#include <atomic>
#include <cinttypes>

/************************************************************************/
/* OGRArrowRandomAccessFile */
/************************************************************************/

class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile
{
int64_t m_nSize = -1;
const std::string m_osFilename;
VSILFILE *m_fp;
bool m_bOwnFP;
const bool m_bOwnFP;
std::atomic<bool> m_bAskedToClosed = false;

#ifdef OGR_ARROW_USE_PREAD
const bool m_bDebugReadAt;
const bool m_bUsePRead;
#endif

OGRArrowRandomAccessFile(const OGRArrowRandomAccessFile &) = delete;
OGRArrowRandomAccessFile &
operator=(const OGRArrowRandomAccessFile &) = delete;

public:
explicit OGRArrowRandomAccessFile(VSILFILE *fp, bool bOwnFP)
: m_fp(fp), m_bOwnFP(bOwnFP)
OGRArrowRandomAccessFile(const std::string &osFilename, VSILFILE *fp,
bool bOwnFP)
: m_osFilename(osFilename), m_fp(fp), m_bOwnFP(bOwnFP)
#ifdef OGR_ARROW_USE_PREAD
,
m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
// Due to the lack of caching for current /vsicurl PRead(), do not
// use the PRead() implementation on those files
m_bUsePRead(m_fp->HasPRead() &&
CPLTestBool(CPLGetConfigOption(
"OGR_ARROW_USE_PREAD",
VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
#endif
{
}

OGRArrowRandomAccessFile(const std::string &osFilename,
VSIVirtualHandleUniquePtr &&fp)
: m_osFilename(osFilename), m_fp(fp.release()), m_bOwnFP(true)
#ifdef OGR_ARROW_USE_PREAD
,
m_bDebugReadAt(!VSIIsLocal(m_osFilename.c_str())),
// Due to the lack of caching for current /vsicurl PRead(), do not
// use the PRead() implementation on those files
m_bUsePRead(m_fp->HasPRead() &&
CPLTestBool(CPLGetConfigOption(
"OGR_ARROW_USE_PREAD",
VSIIsLocal(m_osFilename.c_str()) ? "YES" : "NO")))
#endif
{
}

explicit OGRArrowRandomAccessFile(VSIVirtualHandleUniquePtr &&fp)
: m_fp(fp.release()), m_bOwnFP(true)
void AskToClose()
{
m_bAskedToClosed = true;
if (m_fp)
m_fp->Interrupt();
}

~OGRArrowRandomAccessFile() override
Expand All @@ -85,31 +124,35 @@ class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile

bool closed() const override
{
return m_fp == nullptr;
return m_bAskedToClosed || m_fp == nullptr;
}

arrow::Status Seek(int64_t position) override
{
if (m_bAskedToClosed)
return arrow::Status::IOError("File requested to close");

if (VSIFSeekL(m_fp, static_cast<vsi_l_offset>(position), SEEK_SET) == 0)
return arrow::Status::OK();
return arrow::Status::IOError("Error while seeking");
}

arrow::Result<int64_t> Read(int64_t nbytes, void *out) override
{
if (m_bAskedToClosed)
return arrow::Status::IOError("File requested to close");

CPLAssert(static_cast<int64_t>(static_cast<size_t>(nbytes)) == nbytes);
return static_cast<int64_t>(
VSIFReadL(out, 1, static_cast<size_t>(nbytes), m_fp));
}

arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override
{
if (m_bAskedToClosed)
return arrow::Status::IOError("File requested to close");

// CPLDebug("ARROW", "Reading %d bytes", int(nbytes));
// Ugly hack for https://github.com/OSGeo/gdal/issues/9497
if (CPLGetConfigOption("OGR_ARROW_STOP_IO", nullptr))
{
return arrow::Result<std::shared_ptr<arrow::Buffer>>();
}
auto buffer = arrow::AllocateResizableBuffer(nbytes);
if (!buffer.ok())
{
Expand All @@ -122,8 +165,54 @@ class OGRArrowRandomAccessFile final : public arrow::io::RandomAccessFile
return buffer;
}

#ifdef OGR_ARROW_USE_PREAD
using arrow::io::RandomAccessFile::ReadAt;

arrow::Result<std::shared_ptr<arrow::Buffer>>
ReadAt(int64_t position, int64_t nbytes) override
{
if (m_bAskedToClosed)
return arrow::Status::IOError("File requested to close");

if (m_bUsePRead)
{
auto buffer = arrow::AllocateResizableBuffer(nbytes);
if (!buffer.ok())
{
return buffer;
}
if (m_bDebugReadAt)
{
CPLDebug(
"ARROW",
"Start ReadAt() called on %s (this=%p) from "
"thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
}
uint8_t *buffer_data = (*buffer)->mutable_data();
auto nread = m_fp->PRead(buffer_data, static_cast<size_t>(nbytes),
static_cast<vsi_l_offset>(position));
CPL_IGNORE_RET_VAL(
(*buffer)->Resize(nread)); // shrink --> cannot fail
if (m_bDebugReadAt)
{
CPLDebug(
"ARROW",
"End ReadAt() called on %s (this=%p) from "
"thread=" CPL_FRMT_GIB ": pos=%" PRId64 ", nbytes=%" PRId64,
m_osFilename.c_str(), this, CPLGetPID(), position, nbytes);
}
return buffer;
}
return arrow::io::RandomAccessFile::ReadAt(position, nbytes);
}
#endif

arrow::Result<int64_t> GetSize() override
{
if (m_bAskedToClosed)
return arrow::Status::IOError("File requested to close");

if (m_nSize < 0)
{
const auto nPos = VSIFTellL(m_fp);
Expand Down
62 changes: 59 additions & 3 deletions ogr/ogrsf_frmts/arrow_common/vsiarrowfilesystem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@

#include "ograrrowrandomaccessfile.h"

#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
#include <utility>

/************************************************************************/
/* VSIArrowFileSystem */
/************************************************************************/
Expand All @@ -42,14 +48,54 @@ class VSIArrowFileSystem final : public arrow::fs::FileSystem
const std::string m_osEnvVarPrefix;
const std::string m_osQueryParameters;

std::atomic<bool> m_bAskedToClosed = false;
std::mutex m_oMutex{};
std::vector<std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
m_oSetFiles{};

public:
explicit VSIArrowFileSystem(const std::string &osEnvVarPrefix,
const std::string &osQueryParameters)
VSIArrowFileSystem(const std::string &osEnvVarPrefix,
const std::string &osQueryParameters)
: m_osEnvVarPrefix(osEnvVarPrefix),
m_osQueryParameters(osQueryParameters)
{
}

// Cf comment in OGRParquetDataset::~OGRParquetDataset() for rationale
// for this method
void AskToClose()
{
m_bAskedToClosed = true;
std::vector<
std::pair<std::string, std::weak_ptr<OGRArrowRandomAccessFile>>>
oSetFiles;
{
std::lock_guard oLock(m_oMutex);
oSetFiles = m_oSetFiles;
}
for (auto &[osName, poFile] : oSetFiles)
{
bool bWarned = false;
while (!poFile.expired())
{
if (!bWarned)
{
bWarned = true;
auto poFileLocked = poFile.lock();
if (poFileLocked)
{
CPLDebug("PARQUET",
"Still on-going reads on %s. Waiting for it "
"to be closed.",
osName.c_str());
poFileLocked->AskToClose();
}
}
CPLSleep(0.01);
}
}
}

std::string type_name() const override
{
return "vsi" + m_osEnvVarPrefix;
Expand Down Expand Up @@ -203,14 +249,24 @@ class VSIArrowFileSystem final : public arrow::fs::FileSystem
arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>
OpenInputFile(const std::string &path) override
{
if (m_bAskedToClosed)
return arrow::Status::IOError(
"OpenInputFile(): file system in shutdown");

std::string osPath(path);
osPath += m_osQueryParameters;
CPLDebugOnly(m_osEnvVarPrefix.c_str(), "Opening %s", osPath.c_str());
auto fp = VSIVirtualHandleUniquePtr(VSIFOpenL(osPath.c_str(), "rb"));
if (fp == nullptr)
return arrow::Status::IOError("OpenInputFile() failed for " +
osPath);
return std::make_shared<OGRArrowRandomAccessFile>(std::move(fp));
auto poFile =
std::make_shared<OGRArrowRandomAccessFile>(osPath, std::move(fp));
{
std::lock_guard oLock(m_oMutex);
m_oSetFiles.emplace_back(path, poFile);
}
return poFile;
}

using arrow::fs::FileSystem::OpenOutputStream;
Expand Down
8 changes: 8 additions & 0 deletions ogr/ogrsf_frmts/parquet/ogr_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,24 @@ class OGRParquetDatasetLayer final : public OGRParquetLayerBase

class OGRParquetDataset final : public OGRArrowDataset
{
std::shared_ptr<arrow::fs::FileSystem> m_poFS{};

public:
explicit OGRParquetDataset(
const std::shared_ptr<arrow::MemoryPool> &poMemoryPool);
~OGRParquetDataset();

OGRLayer *ExecuteSQL(const char *pszSQLCommand,
OGRGeometry *poSpatialFilter,
const char *pszDialect) override;
void ReleaseResultSet(OGRLayer *poResultsSet) override;

int TestCapability(const char *) override;

void SetFileSystem(const std::shared_ptr<arrow::fs::FileSystem> &fs)
{
m_poFS = fs;
}
};

/************************************************************************/
Expand Down
Loading

0 comments on commit 84db033

Please sign in to comment.