Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37212: [C++] IO: BufferReader always owned buffer #37271

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/examples/arrow/join_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData
std::string csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData;
std::cout << csv_data << std::endl;
std::string_view sv = csv_data;
input = std::make_shared<arrow::io::BufferReader>(sv);
input = std::make_shared<arrow::io::BufferReader>(std::string(sv));
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
auto convert_options = arrow::csv::ConvertOptions::Defaults();
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/adapters/orc/adapter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,7 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) {
writer->close();

std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength())));
std::string(mem_stream.getData(), mem_stream.getLength())));
ASSERT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
ASSERT_EQ(row_count, reader->NumberOfRows());
Expand Down Expand Up @@ -558,8 +557,7 @@ TEST(TestAdapterRead, ReadFieldAttributes) {
writer->close();

std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength())));
std::string(mem_stream.getData(), mem_stream.getLength())));
ASSERT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
ASSERT_EQ(0, reader->NumberOfRows());
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ void Buffer::CheckCPU() const {

Result<std::shared_ptr<io::RandomAccessFile>> Buffer::GetReader(
std::shared_ptr<Buffer> buf) {
return buf->memory_manager_->GetBufferReader(buf);
return buf->memory_manager_->GetBufferReader(std::move(buf));
}

Result<std::shared_ptr<io::OutputStream>> Buffer::GetWriter(std::shared_ptr<Buffer> buf) {
if (!buf->is_mutable()) {
return Status::Invalid("Expected mutable buffer");
}
return buf->memory_manager_->GetBufferWriter(buf);
return buf->memory_manager_->GetBufferWriter(std::move(buf));
}

Result<std::shared_ptr<Buffer>> Buffer::Copy(std::shared_ptr<Buffer> source,
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/function_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ Result<std::unique_ptr<FunctionOptions>> GenericOptionsType::Deserialize(

Result<std::unique_ptr<FunctionOptions>> DeserializeFunctionOptions(
const Buffer& buffer) {
io::BufferReader stream(buffer);
// Create a non-owned Buffer to avoid copying
io::BufferReader stream(std::make_shared<Buffer>(std::string_view(buffer)));
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream));
ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0));
if (batch->num_rows() != 1) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class DatasetWriterTestFixture : public testing::Test {

std::shared_ptr<RecordBatch> ReadAsBatch(std::string_view data, int* num_batches) {
std::shared_ptr<io::RandomAccessFile> in_stream =
std::make_shared<io::BufferReader>(data);
std::make_shared<io::BufferReader>(std::string(data));
EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader,
ipc::RecordBatchFileReader::Open(in_stream));
RecordBatchVector batches;
Expand Down
11 changes: 2 additions & 9 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,8 @@ BufferReader::BufferReader(std::shared_ptr<Buffer> buffer)
position_(0),
is_open_(true) {}

BufferReader::BufferReader(const uint8_t* data, int64_t size)
: buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {}

BufferReader::BufferReader(const Buffer& buffer)
: BufferReader(buffer.data(), buffer.size()) {}

BufferReader::BufferReader(std::string_view data)
: BufferReader(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(data.size())) {}
BufferReader::BufferReader(std::string buffer)
: BufferReader(Buffer::FromString(std::move(buffer))) {}

Status BufferReader::DoClose() {
is_open_ = false;
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,9 @@ class ARROW_EXPORT BufferReader
: public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
public:
explicit BufferReader(std::shared_ptr<Buffer> buffer);
explicit BufferReader(const Buffer& buffer);
BufferReader(const uint8_t* data, int64_t size);

/// \brief Instantiate from std::string or std::string_view. Does not
/// own data
explicit BufferReader(std::string_view data);
Comment on lines -149 to -154
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should deprecate instead of remove?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could even introduce static factories that have the same behavior as these constructors, but whose names explicitly indicate that they may be unsafe

Copy link
Member Author

@mapleFU mapleFU Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, seems that I should mark them deprecated, but keep them as before (including zero-copy)?

Then how do I fix #37271 (comment) ? Using a BufferReader::FromString? Or other way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's annoying.

FromString is probably best.

/// \brief Instantiate from std::string. Own data.
explicit BufferReader(std::string data);

bool closed() const override;

Expand Down Expand Up @@ -185,7 +182,6 @@ class ARROW_EXPORT BufferReader
}
return Status::OK();
}

std::shared_ptr<Buffer> buffer_;
const uint8_t* data_;
int64_t size_;
Expand Down
52 changes: 36 additions & 16 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,15 @@ TEST(TestFixedSizeBufferWriter, InvalidWrites) {
}

TEST(TestBufferReader, FromStrings) {
// ARROW-3291: construct BufferReader from std::string or
// std::string_view
// ARROW-3291: construct BufferReader from std::string

std::string data = "data123456";
auto view = std::string_view(data);

BufferReader reader1(data);
BufferReader reader2(view);

std::shared_ptr<Buffer> piece;
ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4));
ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4));

ASSERT_OK(reader2.Seek(2));
ASSERT_OK_AND_ASSIGN(piece, reader2.Read(4));
ASSERT_EQ(0, memcmp(piece->data(), data.data() + 2, 4));
}

TEST(TestBufferReader, FromNullBuffer) {
Expand Down Expand Up @@ -283,14 +276,41 @@ TEST(TestBufferReader, WillNeed) {
}
{
std::string data = "data123456";
BufferReader reader(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(data.size()));
BufferReader reader(data);

ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}}));
ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds
}
}

void TestBufferReaderLifetime(std::function<BufferReader(std::string&)> fn,
bool supports_zero_copy) {
std::shared_ptr<Buffer> result;
std::string data = "data12345678910111213";
{
std::string data_inner = data;
BufferReader reader = fn(data_inner);
EXPECT_EQ(supports_zero_copy, reader.supports_zero_copy());
ASSERT_OK_AND_ASSIGN(result, reader.Read(data.length()));
}
EXPECT_EQ(std::string_view(data), std::string_view(*result));
}

TEST(TestBufferReader, Lifetime) {
// BufferReader(std::shared_ptr<Buffer>)
TestBufferReaderLifetime(
[](std::string& data) -> BufferReader {
auto buffer = Buffer::FromString(std::move(data));
return BufferReader(std::move(buffer));
},
/*supports_zero_copy=*/true);

// BufferReader(std::string)
TestBufferReaderLifetime(
[](std::string& data) -> BufferReader { return BufferReader(std::move(data)); },
/*supports_zero_copy=*/true);
}

TEST(TestRandomAccessFile, GetStream) {
std::string data = "data1data2data3data4data5";

Expand Down Expand Up @@ -379,7 +399,7 @@ template <typename SlowStreamType>
void TestSlowInputStream() {
using clock = std::chrono::high_resolution_clock;

auto stream = std::make_shared<BufferReader>(std::string_view("abcdefghijkl"));
auto stream = std::make_shared<BufferReader>("abcdefghijkl");
const double latency = 0.6;
auto slow = std::make_shared<SlowStreamType>(stream, latency);

Expand Down Expand Up @@ -494,7 +514,7 @@ class TestTransformInputStream : public ::testing::Test {
TransformInputStream::TransformFunc transform() const { return T(); }

void TestEmptyStream() {
auto wrapped = std::make_shared<BufferReader>(std::string_view());
auto wrapped = std::make_shared<BufferReader>(std::string());
auto stream = std::make_shared<TransformInputStream>(wrapped, transform());

ASSERT_OK_AND_EQ(0, stream->Tell());
Expand Down Expand Up @@ -730,7 +750,7 @@ TEST(RangeReadCache, Basics) {
for (auto lazy : std::vector<bool>{false, true}) {
SCOPED_TRACE(lazy);
options.lazy = lazy;
auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(data);
internal::ReadRangeCache cache(file, {}, options);

ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
Expand Down Expand Up @@ -772,7 +792,7 @@ TEST(RangeReadCache, Basics) {
TEST(RangeReadCache, Concurrency) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

auto file = std::make_shared<BufferReader>(Buffer(data));
auto file = std::make_shared<BufferReader>(data);
std::vector<ReadRange> ranges{{1, 2}, {3, 2}, {8, 2}, {20, 2},
{25, 0}, {10, 4}, {14, 0}, {15, 4}};

Expand Down Expand Up @@ -808,7 +828,7 @@ TEST(RangeReadCache, Concurrency) {
TEST(RangeReadCache, Lazy) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(data);
CacheOptions options = CacheOptions::LazyDefaults();
options.hole_size_limit = 2;
options.range_size_limit = 10;
Expand Down Expand Up @@ -849,7 +869,7 @@ TEST(RangeReadCache, Lazy) {
TEST(RangeReadCache, LazyWithPrefetching) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(data);
CacheOptions options = CacheOptions::LazyDefaults();
options.hole_size_limit = 1;
options.range_size_limit = 3;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class ARROW_EXPORT RecordBatchFileReader
/// \return the read batch
virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0;

/// \brief Read a particular record batch along with its custom metadada from the file.
/// \brief Read a particular record batch along with its custom metadata from the file.
/// Does not copy memory if the input source supports zero-copy.
///
/// \param[in] i the index of the record batch to return
Expand Down
Loading