From 01022198018921d27acd25cb43547d14b253a320 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 15 Mar 2024 14:24:44 -0400 Subject: [PATCH 1/2] readd hadoop stream de/comp classes --- src/snappy/snappy.py | 67 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/snappy/snappy.py b/src/snappy/snappy.py index b4a6284..aa1a22e 100644 --- a/src/snappy/snappy.py +++ b/src/snappy/snappy.py @@ -206,6 +206,73 @@ def copy(self): return self +class HadoopStreamCompressor(): + def add_chunk(self, data: bytes, compress=None): + """Add a chunk, returning a string that is framed and compressed. + + Outputs a single snappy chunk; if it is the very start of the stream, + will also contain the stream header chunk. + """ + cdata = _compress(data) + return b"".join((len(data).to_bytes(4, "big"), len(cdata).to_bytes(4, "big"), cdata)) + + compress = add_chunk + + def flush(self): + # never maintains a buffer + return b"" + + def copy(self): + """This method exists for compatibility with the zlib compressobj. + """ + return self + + +class HadoopStreamDecompressor(): + def __init__(self): + self.remains = b"" + + @staticmethod + def check_format(data): + """Checks that there are enough bytes for a hadoop header + + We cannot actually determine if the data is really hadoop-snappy + """ + if len(data) < 8: + raise UncompressError("Too short data length") + chunk_length = int.from_bytes(data[4:8], "big") + + def decompress(self, data: bytes): + """Decompress 'data', returning a string containing the uncompressed + data corresponding to at least part of the data in string. This data + should be concatenated to the output produced by any preceding calls to + the decompress() method. Some of the input data may be preserved in + internal buffers for later processing. + """ + if self.remains: + data = self.remains + data + self.remains = None + if len(data) < 8: + self.remains = data + return b"" + out = [] + while True: + chunk_length = int.from_bytes(data[4:8], "big") + if len(data) < 8 + chunk_length: + self.remains = data + break + out.append(_uncompress(data[8:8 + chunk_length])) + data = data[8 + chunk_length:] + return b"".join(out) + + def flush(self): + return b"" + + def copy(self): + return self + + + def stream_compress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE, From d0ca9e5cfe0f5ea093d78ede7052ec3ce58d1de2 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 15 Mar 2024 14:26:56 -0400 Subject: [PATCH 2/2] add to top level --- src/snappy/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/snappy/__init__.py b/src/snappy/__init__.py index 9b6c9b4..e7e83e3 100644 --- a/src/snappy/__init__.py +++ b/src/snappy/__init__.py @@ -9,6 +9,8 @@ StreamCompressor, StreamDecompressor, UncompressError, + HadoopStreamCompressor, + HadoopStreamDecompressor, isValidCompressed, )