diff --git a/setup.py b/setup.py
index eb42a6d..29944e6 100644
--- a/setup.py
+++ b/setup.py
@@ -39,15 +39,15 @@
 """
 packages = ['snappy']
-install_requires = ['cramjam>=2.6.0', 'crc32c']
-setup_requires = ['cramjam>=2.6.0', 'crc32c']
+install_requires = ["cramjam"]
+setup_requires = ['cramjam>=2.6.0', 'google-crc32c']
 
 setup(
     name='python-snappy',
     version=version,
     author='Andres Moreira',
     author_email='andres@andresmoreira.com',
-    url='http://github.com/andrix/python-snappy',
+    url='http://github.com/intake/python-snappy',
     description='Python library for the snappy compression library from Google',
     long_description=long_description,
     keywords='snappy, compression, google',
diff --git a/src/snappy/__init__.py b/src/snappy/__init__.py
index d4f1790..8aa2b82 100644
--- a/src/snappy/__init__.py
+++ b/src/snappy/__init__.py
@@ -12,9 +12,4 @@
     isValidCompressed,
 )
 
-from .hadoop_snappy import (
-    stream_compress as hadoop_stream_compress,
-    stream_decompress as hadoop_stream_decompress,
-)
-
 __version__ = '0.7.0'
diff --git a/src/snappy/hadoop_snappy.py b/src/snappy/hadoop_snappy.py
deleted file mode 100644
index 0f8931a..0000000
--- a/src/snappy/hadoop_snappy.py
+++ /dev/null
@@ -1,216 +0,0 @@
-"""The module implements compression/decompression with snappy using
-Hadoop snappy format: https://github.com/kubo/snzip#hadoop-snappy-format
-
-Expected usage like:
-
-    import snappy
-
-    src = 'uncompressed'
-    dst = 'compressed'
-    dst2 = 'decompressed'
-
-    with open(src, 'rb') as fin, open(dst, 'wb') as fout:
-        snappy.hadoop_stream_compress(src, dst)
-
-    with open(dst, 'rb') as fin, open(dst2, 'wb') as fout:
-        snappy.hadoop_stream_decompress(fin, fout)
-
-    with open(src, 'rb') as fin1, open(dst2, 'rb') as fin2:
-        assert fin1.read() == fin2.read()
-
-"""
-from __future__ import absolute_import
-
-import struct
-
-from .snappy import (
-    _compress, _uncompress,
-    stream_compress as _stream_compress,
-    stream_decompress as _stream_decompress,
-    check_format as _check_format,
-    UncompressError,
-    _CHUNK_MAX)
-
-
-SNAPPY_BUFFER_SIZE_DEFAULT = 256 * 1024
-_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX
-
-_INT_SIZE = 4
-
-
-def pack_int(num):
-    big_endian_uint = struct.pack('>I', num)
-    return big_endian_uint
-
-
-def unpack_int(data):
-    return struct.unpack('>I', data)[0]
-
-
-class StreamCompressor(object):
-
-    """This class implements the compressor-side of the hadoop snappy
-    format, taken from https://github.com/kubo/snzip#hadoop-snappy-format
-
-    Keep in mind that this compressor object does no buffering for you to
-    appropriately size chunks. Every call to StreamCompressor.compress results
-    in a unique call to the underlying snappy compression method.
-    """
-
-    def __init__(self):
-        pass
-
-    def add_chunk(self, data):
-        """Add a chunk containing 'data', returning a string that is
-        compressed. This data should be concatenated to
-        the tail end of an existing Snappy stream. In the absence of any
-        internal buffering, no data is left in any internal buffers, and so
-        unlike zlib.compress, this method returns everything.
-        """
-        out = []
-        uncompressed_length = len(data)
-        out.append(pack_int(uncompressed_length))
-        compressed_chunk = _compress(data)
-        compressed_length = len(compressed_chunk)
-        out.append(pack_int(compressed_length))
-        out.append(compressed_chunk)
-        return b"".join(out)
-
-    def compress(self, data):
-        """This method is simply an alias for compatibility with zlib
-        compressobj's compress method.
-        """
-        return self.add_chunk(data)
-
-    def flush(self, mode=None):
-        """This method does nothing and only exists for compatibility with
-        the zlib compressobj
-        """
-        pass
-
-    def copy(self):
-        """This method exists for compatibility with the zlib compressobj.
-        """
-        return StreamCompressor()
-
-
-class StreamDecompressor(object):
-
-    """This class implements the decompressor-side of the hadoop snappy
-    format.
-
-    This class matches a subset of the interface found for the zlib module's
-    decompression objects (see zlib.decompressobj). Specifically, it currently
-    implements the decompress method without the max_length option, the flush
-    method without the length option, and the copy method.
-    """
-
-    __slots__ = ["_buf", "_block_length", "_uncompressed_length"]
-
-    def __init__(self):
-        self._buf = b""
-        # current block length
-        self._block_length = 0
-        # total uncompressed data length of the current block
-        self._uncompressed_length = 0
-
-    @staticmethod
-    def check_format(data):
-        """Just checks that first two integers (big endian four-bytes int)
-        in the given data block comply to: first int >= second int.
-        This is a simple assumption that we have in the data a start of a
-        block for hadoop snappy format. It should contain uncompressed block
-        length as the first integer, and compressed subblock length as the
-        second integer.
-        Raises UncompressError if the condition is not fulfilled.
-        :return: None
-        """
-        int_size = _INT_SIZE
-        if len(data) < int_size * 2:
-            raise UncompressError("Too short data length")
-        # We cant actually be sure abot the format here.
-        # Assumption that compressed data length is less than uncompressed
-        # is not true in general.
-        # So, just don't check anything
-        return
-
-    def decompress(self, data):
-        """Decompress 'data', returning a string containing the uncompressed
-        data corresponding to at least part of the data in string. This data
-        should be concatenated to the output produced by any preceding calls to
-        the decompress() method. Some of the input data may be preserved in
-        internal buffers for later processing.
-        """
-        int_size = _INT_SIZE
-        self._buf += data
-        uncompressed = []
-        while True:
-            if len(self._buf) < int_size:
-                return b"".join(uncompressed)
-            next_start = 0
-            if not self._block_length:
-                self._block_length = unpack_int(self._buf[:int_size])
-                self._buf = self._buf[int_size:]
-                if len(self._buf) < int_size:
-                    return b"".join(uncompressed)
-            compressed_length = unpack_int(
-                self._buf[next_start:next_start + int_size]
-            )
-            next_start += int_size
-            if len(self._buf) < compressed_length + next_start:
-                return b"".join(uncompressed)
-            chunk = self._buf[
-                next_start:next_start + compressed_length
-            ]
-            self._buf = self._buf[next_start + compressed_length:]
-            uncompressed_chunk = _uncompress(chunk)
-            self._uncompressed_length += len(uncompressed_chunk)
-            uncompressed.append(uncompressed_chunk)
-            if self._uncompressed_length == self._block_length:
-                # Here we have uncompressed all subblocks of the current block
-                self._uncompressed_length = 0
-                self._block_length = 0
-                continue
-
-    def flush(self):
-        """All pending input is processed, and a string containing the
-        remaining uncompressed output is returned. After calling flush(), the
-        decompress() method cannot be called again; the only realistic action
-        is to delete the object.
-        """
-        if self._buf != b"":
-            raise UncompressError("chunk truncated")
-        return b""
-
-    def copy(self):
-        """Returns a copy of the decompression object. This can be used to save
-        the state of the decompressor midway through the data stream in order
-        to speed up random seeks into the stream at a future point.
-        """
-        copy = StreamDecompressor()
-        copy._buf = self._buf
-        copy._block_length = self._block_length
-        copy._uncompressed_length = self._uncompressed_length
-        return copy
-
-
-def stream_compress(src, dst, blocksize=SNAPPY_BUFFER_SIZE_DEFAULT):
-    return _stream_compress(
-        src, dst, blocksize=blocksize, compressor_cls=StreamCompressor
-    )
-
-
-def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
-                      start_chunk=None):
-    return _stream_decompress(
-        src, dst, blocksize=blocksize,
-        decompressor_cls=StreamDecompressor,
-        start_chunk=start_chunk
-    )
-
-
-def check_format(fin=None, chunk=None, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
-    return _check_format(
-        fin=fin, chunk=chunk, blocksize=blocksize,
-        decompressor_cls=StreamDecompressor
-    )
diff --git a/src/snappy/snappy.py b/src/snappy/snappy.py
index 7179a7a..b4a6284 100644
--- a/src/snappy/snappy.py
+++ b/src/snappy/snappy.py
@@ -41,29 +41,15 @@
 """
 from __future__ import absolute_import
 
-import sys
 import struct
 
 import cramjam
-import crc32c
 
 _CHUNK_MAX = 65536
 _STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX
 _STREAM_IDENTIFIER = b"sNaPpY"
-_COMPRESSED_CHUNK = 0x00
-_UNCOMPRESSED_CHUNK = 0x01
 _IDENTIFIER_CHUNK = 0xff
-_RESERVED_UNSKIPPABLE = (0x02, 0x80)  # chunk ranges are [inclusive, exclusive)
-_RESERVED_SKIPPABLE = (0x80, 0xff)
-
-# the minimum percent of bytes compression must save to be enabled in automatic
-# mode
-_COMPRESSION_THRESHOLD = .125
-
-def _masked_crc32c(data):
-    # see the framing format specification
-    crc = crc32c.crc32c(data)
-    return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff
+_STREAM_HEADER_BLOCK = b"\xff\x06\x00\x00sNaPpY"
 
 _compress = cramjam.snappy.compress_raw
 _uncompress = cramjam.snappy.decompress_raw
@@ -73,14 +59,8 @@ class UncompressError(Exception):
     pass
 
 
-py3k = False
-if sys.hexversion > 0x03000000:
-    unicode = str
-    py3k = True
-
-
 def isValidCompressed(data):
-    if isinstance(data, unicode):
+    if isinstance(data, str):
         data = data.encode('utf-8')
 
     ok = True
@@ -92,13 +72,13 @@ def compress(data, encoding='utf-8'):
-    if isinstance(data, unicode):
+    if isinstance(data, str):
         data = data.encode(encoding)
 
     return bytes(_compress(data))
 
 
 def uncompress(data, decoding=None):
-    if isinstance(data, unicode):
+    if isinstance(data, str):
         raise UncompressError("It's only possible to uncompress bytes")
     try:
         out = bytes(_uncompress(data))
@@ -111,8 +91,7 @@ def uncompress(data, decoding=None):
 
 decompress = uncompress
 
-
-class StreamCompressor(object):
+class StreamCompressor():
 
     """This class implements the compressor-side of the proposed Snappy framing
     format, found at
@@ -129,71 +108,30 @@ class StreamCompressor(object):
     in a unique call to the underlying snappy compression method.
     """
 
-    __slots__ = ["_header_chunk_written"]
-    def __init__(self):
-        self._header_chunk_written = False
-
-    def add_chunk(self, data, compress=None):
-        """Add a chunk containing 'data', returning a string that is framed and
-        (optionally, default) compressed. This data should be concatenated to
-        the tail end of an existing Snappy stream. In the absence of any
-        internal buffering, no data is left in any internal buffers, and so
-        unlike zlib.compress, this method returns everything.
-
-        If compress is None, compression is determined automatically based on
-        snappy's performance. If compress == True, compression always happens,
-        and if compress == False, compression never happens.
-        """
-        out = bytearray()
-        if not self._header_chunk_written:
-            self._header_chunk_written = True
-            out.extend(struct.pack(" ldata:
+            # not even enough for one block
+            self.remains = data
+            return b""
         while True:
-            if len(self._buf) < 4:
-                return bytes(uncompressed)
-            chunk_type = struct.unpack("<L", self._buf[:4])[0]
-            size = (chunk_type >> 8)
-            chunk_type &= 0xff
-            if not self._header_found:
-                if (chunk_type != _IDENTIFIER_CHUNK or
-                        size != len(_STREAM_IDENTIFIER)):
-                    raise UncompressError("stream missing snappy identifier")
-                self._header_found = True
-            if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and
-                    chunk_type < _RESERVED_UNSKIPPABLE[1]):
-                raise UncompressError(
-                    "stream received unskippable but unknown chunk")
-            if len(self._buf) < 4 + size:
-                return bytes(uncompressed)
-            chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]
-            if chunk_type == _IDENTIFIER_CHUNK:
-                if chunk != _STREAM_IDENTIFIER:
-                    raise UncompressError(
-                        "stream has invalid snappy identifier")
-                continue
-            if (_RESERVED_SKIPPABLE[0] <= chunk_type and
-                    chunk_type < _RESERVED_SKIPPABLE[1]):
-                continue
-            assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)
-            crc, chunk = chunk[:4], chunk[4:]
-            if chunk_type == _COMPRESSED_CHUNK:
-                chunk = _uncompress(chunk)
-            if struct.pack(" ldata:
+                # last block incomplete
+                self.remains = data[bsize:]
+                data = data[:bsize]
+                break
+            bsize += this_size
+        self.c.decompress(data)
+        return self.flush()
 
     def flush(self):
-        """All pending input is processed, and a string containing the
-        remaining uncompressed output is returned. After calling flush(), the
-        decompress() method cannot be called again; the only realistic action
-        is to delete the object.
-        """
-        if self._buf != b"":
-            raise UncompressError("chunk truncated")
-        return b""
+        return bytes(self.c.flush())
 
     def copy(self):
-        """Returns a copy of the decompression object. This can be used to save
-        the state of the decompressor midway through the data stream in order
-        to speed up random seeks into the stream at a future point.
-        """
-        copy = StreamDecompressor()
-        copy._buf, copy._header_found = bytearray(self._buf), self._header_found
-        return copy
+        return self
 
 
 def stream_compress(src,
diff --git a/src/snappy/snappy_formats.py b/src/snappy/snappy_formats.py
index fdfc4bb..51a54dd 100644
--- a/src/snappy/snappy_formats.py
+++ b/src/snappy/snappy_formats.py
@@ -9,16 +9,10 @@
 from .snappy import (
     stream_compress, stream_decompress, check_format, UncompressError)
 
-from .hadoop_snappy import (
-    stream_compress as hadoop_stream_compress,
-    stream_decompress as hadoop_stream_decompress,
-    check_format as hadoop_check_format)
 
 
 FRAMING_FORMAT = 'framing'
 
-HADOOP_FORMAT = 'hadoop_snappy'
-
 # Means format auto detection.
 # For compression will be used framing format.
 # In case of decompression will try to detect a format from the input stream
@@ -27,16 +21,14 @@
 DEFAULT_FORMAT = FORMAT_AUTO
 
-ALL_SUPPORTED_FORMATS = [FRAMING_FORMAT, HADOOP_FORMAT, FORMAT_AUTO]
+ALL_SUPPORTED_FORMATS = [FRAMING_FORMAT, FORMAT_AUTO]
 
 _COMPRESS_METHODS = {
     FRAMING_FORMAT: stream_compress,
-    HADOOP_FORMAT: hadoop_stream_compress,
 }
 
 _DECOMPRESS_METHODS = {
     FRAMING_FORMAT: stream_decompress,
-    HADOOP_FORMAT: hadoop_stream_decompress,
 }
 
 # We will use framing format as the default to compression.
@@ -47,13 +39,8 @@
 # The tuple contains an ordered sequence of a format checking function and
 # a format-specific decompression function.
 # Framing format has it's header, that may be recognized.
-# Hadoop snappy format hasn't any special headers, it contains only
-# uncompressed block length integer and length of compressed subblock.
-# So we first check framing format and if it is not the case, then
-# check for snappy format.
 _DECOMPRESS_FORMAT_FUNCS = (
     (check_format, stream_decompress),
-    (hadoop_check_format, hadoop_stream_decompress),
 )
diff --git a/test_formats.py b/test_formats.py
index aa83be2..43afb91 100644
--- a/test_formats.py
+++ b/test_formats.py
@@ -18,18 +18,6 @@ def runTest(self):
         compressed_stream = io.BytesIO()
         compress_func(instream, compressed_stream)
         compressed_stream.seek(0)
-        if not self.success:
-            with self.assertRaises(UncompressError) as err:
-                decompress_func, read_chunk = formats.get_decompress_function(
-                    self.decompress_format, compressed_stream
-                )
-                decompressed_stream = io.BytesIO()
-                decompress_func(
-                    compressed_stream,
-                    decompressed_stream,
-                    start_chunk=read_chunk
-                )
-            return
         decompress_func, read_chunk = formats.get_decompress_function(
             self.decompress_format, compressed_stream
         )
@@ -49,48 +37,18 @@ class TestFormatFramingFraming(TestFormatBase):
     success = True
 
 
-class TestFormatFramingHadoop(TestFormatBase):
-    compress_format = formats.FRAMING_FORMAT
-    decompress_format = formats.HADOOP_FORMAT
-    success = False
-
-
 class TestFormatFramingAuto(TestFormatBase):
     compress_format = formats.FRAMING_FORMAT
     decompress_format = formats.FORMAT_AUTO
     success = True
 
 
-class TestFormatHadoopHadoop(TestFormatBase):
-    compress_format = formats.HADOOP_FORMAT
-    decompress_format = formats.HADOOP_FORMAT
-    success = True
-
-
-class TestFormatHadoopFraming(TestFormatBase):
-    compress_format = formats.HADOOP_FORMAT
-    decompress_format = formats.FRAMING_FORMAT
-    success = False
-
-
-class TestFormatHadoopAuto(TestFormatBase):
-    compress_format = formats.HADOOP_FORMAT
-    decompress_format = formats.FORMAT_AUTO
-    success = True
-
-
 class TestFormatAutoFraming(TestFormatBase):
     compress_format = formats.FORMAT_AUTO
     decompress_format = formats.FRAMING_FORMAT
     success = True
 
 
-class TestFormatAutoHadoop(TestFormatBase):
-    compress_format = formats.FORMAT_AUTO
-    decompress_format = formats.HADOOP_FORMAT
-    success = False
-
-
 if __name__ == "__main__":
     import unittest
     unittest.main()
diff --git a/test_hadoop_snappy.py b/test_hadoop_snappy.py
deleted file mode 100644
index a6741de..0000000
--- a/test_hadoop_snappy.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import random
-import struct
-from unittest import TestCase
-
-import snappy.hadoop_snappy
-
-
-class SnappyStreaming(TestCase):
-
-    def test_random(self):
-        for _ in range(100):
-            compressor = snappy.hadoop_snappy.StreamCompressor()
-            decompressor = snappy.hadoop_snappy.StreamDecompressor()
-            data = b""
-            compressed = b""
-            for _ in range(random.randint(0, 3)):
-                chunk = os.urandom(
-                    random.randint(0, snappy.hadoop_snappy._CHUNK_MAX * 2)
-                )
-                data += chunk
-                compressed += compressor.add_chunk(
-                    chunk
-                )
-
-            upper_bound = random.choice(
-                [256, snappy.hadoop_snappy._CHUNK_MAX * 2]
-            )
-            while compressed:
-                size = random.randint(0, upper_bound)
-                chunk, compressed = compressed[:size], compressed[size:]
-                chunk = decompressor.decompress(chunk)
-                self.assertEqual(data[:len(chunk)], chunk)
-                data = data[len(chunk):]
-
-            decompressor.flush()
-            self.assertEqual(len(data), 0)
-
-    def test_concatenation(self):
-        data1 = os.urandom(snappy.hadoop_snappy._CHUNK_MAX * 2)
-        data2 = os.urandom(4096)
-        decompressor = snappy.hadoop_snappy.StreamDecompressor()
-        self.assertEqual(
-            decompressor.decompress(
-                snappy.hadoop_snappy.StreamCompressor().compress(data1) +
-                snappy.hadoop_snappy.StreamCompressor().compress(data2)),
-            data1 + data2)
-
-
-if __name__ == "__main__":
-    import unittest
-    unittest.main()
diff --git a/test_snappy.py b/test_snappy.py
index 4c943e7..68e4359 100644
--- a/test_snappy.py
+++ b/test_snappy.py
@@ -141,113 +141,6 @@ def test_random(self):
         decompressor.flush()
         self.assertEqual(len(data), 0)
 
-    def test_compression(self):
-        # test that we can add compressed chunks
-        compressor = snappy.StreamCompressor()
-        data = b"\0" * 50
-        compressed_data = snappy.compress(data)
-        crc = struct.pack("