diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index 5faa4d095eb1e..6a6fbf40f9628 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -269,7 +269,7 @@ class CompressedInputStream::Impl { // Read compressed data if necessary Status EnsureCompressedData() { - int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0; + int64_t compressed_avail = compressed_buffer_available(); if (compressed_avail == 0) { // Ensure compressed_ buffer is allocated with kChunkSize. if (!supports_zero_copy_from_raw_) { @@ -297,10 +297,14 @@ class CompressedInputStream::Impl { return Status::OK(); } - // Decompress some data from the compressed_ buffer. - // Call this function only if the decompressed_ buffer is empty. + // Decompress some data from the compressed_ buffer into decompressor_. + // Call this function only if the decompressed_ buffer is fully consumed. Status DecompressData() { + // compressed_buffer_available() could be 0 here because there might + // still be some decompressed data left to emit even though the compressed + // data was entirely consumed (especially if the expansion factor is large) DCHECK_NE(compressed_->data(), nullptr); + DCHECK_EQ(0, decompressed_buffer_available()); int64_t decompress_size = kDecompressSize; @@ -352,8 +356,10 @@ class CompressedInputStream::Impl { } // Try to feed more data into the decompressed_ buffer. - Status RefillDecompressed(bool* has_data) { - // First try to read data from the decompressor + // Returns whether there is more data to read. + Result RefillDecompressed() { + // First try to read data from the decompressor, unless we haven't read any + // compressed data yet. if (compressed_ && compressed_->size() != 0) { if (decompressor_->IsFinished()) { // We just went over the end of a previous compressed stream. @@ -362,21 +368,21 @@ class CompressedInputStream::Impl { } RETURN_NOT_OK(DecompressData()); } - if (!decompressed_ || decompressed_->size() == 0) { - // Got nothing, need to read more compressed data + int64_t decompress_avail = decompressed_buffer_available(); + if (decompress_avail == 0) { + // Got nothing from existing `compressed_` and `decompressor_`, + // need to read more compressed data. RETURN_NOT_OK(EnsureCompressedData()); - if (compressed_pos_ == compressed_->size()) { + if (compressed_buffer_available() == 0) { // No more data to decompress if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); } - *has_data = false; - return Status::OK(); + return false; } RETURN_NOT_OK(DecompressData()); } - *has_data = true; - return Status::OK(); + return true; } Result Read(int64_t nbytes, void* out) { @@ -394,7 +400,7 @@ class CompressedInputStream::Impl { // At this point, no more decompressed data remains, so we need to // decompress more - RETURN_NOT_OK(RefillDecompressed(&decompressor_has_data)); + ARROW_ASSIGN_OR_RAISE(decompressor_has_data, RefillDecompressed()); } total_pos_ += total_read; @@ -405,13 +411,22 @@ class CompressedInputStream::Impl { ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_)); ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data())); RETURN_NOT_OK(buf->Resize(bytes_read)); - // Using std::move because the some compiler might has issue below: + // Using std::move because some compiler might has issue below: // https://wg21.cmeerw.net/cwg/issue1579 return std::move(buf); } const std::shared_ptr& raw() const { return raw_; } + private: + int64_t compressed_buffer_available() const { + return compressed_ ? compressed_->size() - compressed_pos_ : 0; + } + + int64_t decompressed_buffer_available() const { + return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0; + } + private: // Read 64 KB compressed data at a time static const int64_t kChunkSize = 64 * 1024;