Skip to content

Commit

Permalink
apacheGH-41116: [C++] IO: enhance boundary checking in CompressedInpu…
Browse files Browse the repository at this point in the history
…tStream (apache#41117)

### Rationale for this change

Enhance the boundary checking code style in `io::CompressedInputStream`.

### What changes are included in this PR?

* Add `compressed_buffer_available` and `decompressed_buffer_available` in the class, and uses them for checking the boundary
* Change `Status(bool*)` to `Result<bool>`

### Are these changes tested?

Already has testing. I don't know how to hacking into internal

### Are there any user-facing changes?

No

* GitHub Issue: apache#41116

Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
3 people authored and tolleybot committed May 2, 2024
1 parent 4f25e74 commit 9e58ac3
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions cpp/src/arrow/io/compressed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class CompressedInputStream::Impl {

// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
int64_t compressed_avail = compressed_buffer_available();
if (compressed_avail == 0) {
// Ensure compressed_ buffer is allocated with kChunkSize.
if (!supports_zero_copy_from_raw_) {
Expand Down Expand Up @@ -297,10 +297,14 @@ class CompressedInputStream::Impl {
return Status::OK();
}

// Decompress some data from the compressed_ buffer.
// Call this function only if the decompressed_ buffer is empty.
// Decompress some data from the compressed_ buffer into decompressor_.
// Call this function only if the decompressed_ buffer is fully consumed.
Status DecompressData() {
// compressed_buffer_available() could be 0 here because there might
// still be some decompressed data left to emit even though the compressed
// data was entirely consumed (especially if the expansion factor is large)
DCHECK_NE(compressed_->data(), nullptr);
DCHECK_EQ(0, decompressed_buffer_available());

int64_t decompress_size = kDecompressSize;

Expand Down Expand Up @@ -352,8 +356,10 @@ class CompressedInputStream::Impl {
}

// Try to feed more data into the decompressed_ buffer.
Status RefillDecompressed(bool* has_data) {
// First try to read data from the decompressor
// Returns whether there is more data to read.
Result<bool> RefillDecompressed() {
// First try to read data from the decompressor, unless we haven't read any
// compressed data yet.
if (compressed_ && compressed_->size() != 0) {
if (decompressor_->IsFinished()) {
// We just went over the end of a previous compressed stream.
Expand All @@ -362,21 +368,21 @@ class CompressedInputStream::Impl {
}
RETURN_NOT_OK(DecompressData());
}
if (!decompressed_ || decompressed_->size() == 0) {
// Got nothing, need to read more compressed data
int64_t decompress_avail = decompressed_buffer_available();
if (decompress_avail == 0) {
// Got nothing from existing `compressed_` and `decompressor_`,
// need to read more compressed data.
RETURN_NOT_OK(EnsureCompressedData());
if (compressed_pos_ == compressed_->size()) {
if (compressed_buffer_available() == 0) {
// No more data to decompress
if (!fresh_decompressor_ && !decompressor_->IsFinished()) {
return Status::IOError("Truncated compressed stream");
}
*has_data = false;
return Status::OK();
return false;
}
RETURN_NOT_OK(DecompressData());
}
*has_data = true;
return Status::OK();
return true;
}

Result<int64_t> Read(int64_t nbytes, void* out) {
Expand All @@ -394,7 +400,7 @@ class CompressedInputStream::Impl {

// At this point, no more decompressed data remains, so we need to
// decompress more
RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data));
ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed());
}

total_pos_ += total_read;
Expand All @@ -405,13 +411,22 @@ class CompressedInputStream::Impl {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
RETURN_NOT_OK(buf->Resize(bytes_read));
// Using std::move because the some compiler might has issue below:
// Using std::move because some compiler might has issue below:
// https://wg21.cmeerw.net/cwg/issue1579
return std::move(buf);
}

const std::shared_ptr<InputStream>& raw() const { return raw_; }

private:
int64_t compressed_buffer_available() const {
return compressed_ ? compressed_->size() - compressed_pos_ : 0;
}

int64_t decompressed_buffer_available() const {
return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0;
}

private:
// Read 64 KB compressed data at a time
static const int64_t kChunkSize = 64 * 1024;
Expand Down

0 comments on commit 9e58ac3

Please sign in to comment.