Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deduplication #15

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d1bef15
verify checksum before decoding
mdekstrand Jun 11, 2020
3204e34
Factor out index parsing
mdekstrand Jun 11, 2020
dd11695
Add repickl command for testing
mdekstrand Jun 11, 2020
17c60a2
Implement V2 format with deduplication
mdekstrand Jun 11, 2020
b52ea45
Fix RW test from merge
mdekstrand Jun 12, 2020
4c6050f
Use composite example for repickling
mdekstrand Jun 12, 2020
74120ed
Improved hypothesis-based repickle tests
mdekstrand Jun 12, 2020
03c58ae
Actually run repickle
mdekstrand Jun 12, 2020
2789cb0
Add compatibility tests
mdekstrand Jun 12, 2020
3bfa237
Fix compatibility tests
mdekstrand Jun 12, 2020
5b191de
no extras on multiple platforms
mdekstrand Jun 12, 2020
387789c
shut up lint
mdekstrand Jun 12, 2020
03fa105
Constrain Ninja
mdekstrand Jun 12, 2020
2de5909
use binary for ninja
mdekstrand Jun 12, 2020
1862403
Fix env syntax
mdekstrand Jun 12, 2020
b7fc0bf
Update natural dep
mdekstrand Jun 12, 2020
3819a96
Fix Windows compat for v2 rerun
mdekstrand Jun 12, 2020
6ec75ec
Only run compat tests in git
mdekstrand Jun 12, 2020
e349ea9
Improve repickle test coverage
mdekstrand Jun 13, 2020
d2cef2b
Refactor index unpacking
mdekstrand Jun 13, 2020
29fef3b
missing version field
mdekstrand Jun 13, 2020
3b83264
Fix serialization & tests
mdekstrand Jun 14, 2020
3d30639
Refactor index API to base class
mdekstrand Jun 16, 2020
b696b17
Test integrity logic
mdekstrand Jun 16, 2020
67b3da9
Use logger in frob test
mdekstrand Jun 16, 2020
dd13c1f
Remove deadline on integrity test
mdekstrand Jun 16, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .github/workflows/package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ jobs:
- name: Install package
run: |
pip install -e '.[dev,test,blosc,numcodecs]'
env:
PIP_PREFER_BINARY: yes

- name: Run tests
run: python -m pytest --cov=binpickle --cov-report=xml tests
Expand All @@ -107,15 +109,18 @@ jobs:
uses: codecov/codecov-action@v1

no-extras:
name: Test without extras
runs-on: ubuntu-latest
name: Test without extras on ${{matrix.platform}}
runs-on: ${{matrix.platform}}-latest
strategy:
matrix:
platform: [ubuntu,windows,macos]

steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Set up Python ${{matrix.python}}
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8
Expand All @@ -135,7 +140,7 @@ jobs:
uses: actions/cache@v1
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: no-extras-pip-${{ hashFiles('*.egg-info/requires.txt')}}
key: ${{matrix.platform}}-no-extras-pip-${{ hashFiles('*.egg-info/requires.txt')}}

- name: Install package
run: |
Expand Down
160 changes: 154 additions & 6 deletions binpickle/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

import struct
from typing import NamedTuple
from abc import ABCMeta, abstractmethod
import msgpack

MAGIC = b'BPCK'
VERSION = 1
MIN_VERSION = 1
DEFAULT_VERSION = 2
MAX_VERSION = 2
HEADER_FORMAT = struct.Struct('!4sHHq')
TRAILER_FORMAT = struct.Struct('!QLL')

Expand All @@ -16,12 +20,12 @@ class FileHeader(NamedTuple):
File header for a BinPickle file. The header is a 16-byte sequence containing the
magic (``BPCK``) followed by version and offset information:

1. File version (2 bytes, big-endian). Currently only version 1 exists.
1. File version (2 bytes, big-endian). Currently only versions 1 and 2 exist.
2. Reserved (2 bytes). Set to 0.
3. File length (8 bytes, big-endian). Length is signed; if the file length is not known,
this field is set to -1.
"""
version: int = VERSION
version: int = DEFAULT_VERSION
"The NumPy file version."
length: int = -1
"The length of the file (-1 for unknown)."
Expand All @@ -36,7 +40,7 @@ def decode(cls, buf, *, verify=True):
m, v, pad, off = HEADER_FORMAT.unpack(buf)
if verify and m != MAGIC:
raise ValueError('invalid magic {}'.format(m))
if verify and v != VERSION:
if verify and (v > MAX_VERSION or v < MIN_VERSION):
raise ValueError('invalid version {}'.format(v))
if verify and pad != 0:
raise ValueError('invalid padding')
Expand Down Expand Up @@ -85,7 +89,8 @@ def decode(cls, buf, *, verify=True):

class IndexEntry(NamedTuple):
"""
Index entry for a buffer in the BinPickle index.
Index entry for a buffer in the BinPickle index. In the BinPickle file,
these are saved in MsgPack format in the file index.
"""
offset: int
"The position in the file where the buffer begins (bytes)."
Expand All @@ -95,16 +100,159 @@ class IndexEntry(NamedTuple):
"The decoded length of the buffer in bytes."
checksum: int
"The Adler-32 checksum of the encoded buffer data."
content_hash: bytes or None = None
"""
The SHA1 checksum of the *decoded* buffer data. In a V1 BinPickle file,
this will be empty.
"""
codec: tuple = None
"The codec used to encode the buffer, or None."

def to_repr(self):
"Convert an index entry to its MsgPack-compatible representation"
return dict((k, getattr(self, k)) for k in self._fields)
repr = dict((k, getattr(self, k)) for k in self._fields)
if self.content_hash is None:
del repr['content_hash']
return repr

@classmethod
def from_repr(cls, repr):
"Convert an index entry from its MsgPack-compatible representation"
if not isinstance(repr, dict):
raise TypeError("IndexEntry representation must be a dict")
return cls(**repr)

def __hash__(self):
return hash((self.offset, self.enc_length, self.dec_length,
self.checksum, self.content_hash))


class FileIndex(metaclass=ABCMeta):
    """
    Index of a BinPickle file.  This is stored in MsgPack format in the
    BinPickle file.

    The class doubles as a version-dispatching factory: constructing a
    ``FileIndex`` yields the concrete implementation for the requested
    file version (V1 or V2).
    """
    def __new__(cls, *args, version=DEFAULT_VERSION, **kwargs):
        # Pick the concrete index class for the requested file version.
        if version == 1:
            impl = FileIndexV1
        elif version == 2:
            impl = FileIndexV2
        else:
            raise ValueError(f'unknown version {version}')
        return super().__new__(impl)

    @classmethod
    def unpack(cls, index, version=DEFAULT_VERSION):
        """
        Decode an index from its packed MsgPack bytes, dispatching on the
        decoded shape (V1 indexes are lists, V2 indexes are dicts).
        """
        decoded = msgpack.unpackb(index)
        if isinstance(decoded, list):
            return FileIndexV1.from_repr(decoded, version)
        if isinstance(decoded, dict) and version >= 2:
            return FileIndexV2.from_repr(decoded, version)
        raise ValueError('unknown index format')

    @abstractmethod
    def buffers(self):
        """
        Return the buffer entries in order, as needed to reconstitute the
        pickled object. Duplicates are copied in proper positions.
        """
        pass

    @abstractmethod
    def stored_buffers(self):
        """
        Return the actually stored buffer entries in the order they appear
        in the file. Does not include duplicates.
        """
        pass

    @abstractmethod
    def add_entry(self, hash, entry: IndexEntry = None):
        """
        Add an entry to the index.
        """
        pass

    @abstractmethod
    def pack(self):
        """
        Pack the index into a binary buffer.
        """


class FileIndexV1(FileIndex):
    """
    Version-1 index of a BinPickle file, stored in MsgPack format as a
    flat list of entries.  V1 has no deduplication, so the logical buffer
    sequence and the stored buffer sequence are the same list.
    """
    def __init__(self, entries=None, version=1):
        assert version == 1
        self._entries = [] if entries is None else entries

    def buffers(self):
        "All buffer entries, in pickling order."
        return self._entries

    def stored_buffers(self):
        "Identical to :meth:`buffers` — V1 stores every buffer."
        return self._entries

    def add_entry(self, hash, entry: IndexEntry = None):
        # V1 accepts either (entry) or (hash, entry); it cannot store a
        # bare hash because there is no deduplication table to resolve it.
        if entry is None:
            if not isinstance(hash, IndexEntry):
                raise RuntimeError('Version 1 does not support deduplication')
            entry = hash
        self._entries.append(entry)

    def pack(self):
        "Serialize the index to MsgPack bytes."
        return msgpack.packb([e.to_repr() for e in self.buffers()])

    @classmethod
    def from_repr(cls, repr, version):
        "Build a V1 index from its unpacked MsgPack representation."
        entries = [IndexEntry.from_repr(r) for r in repr]
        return cls(entries, version=1)

    def __len__(self):
        return len(self._entries)


class FileIndexV2(FileIndex):
    """
    Version-2 index of a BinPickle file, stored in MsgPack format.  V2
    deduplicates buffers: entries are keyed by content hash, and a
    separate buffer list records the (possibly repeating) hash of each
    logical buffer in pickling order.
    """
    def __init__(self, entries=None, buffers=None, version=2):
        assert version == 2
        if entries is None:
            self._entries = {}
            self._buf_list = []
        else:
            self._entries = dict((e.content_hash, e) for e in entries)
            if buffers is None:
                # Default to each stored entry once, in order, so an index
                # built from bare entries is still iterable and sized.
                self._buf_list = [e.content_hash for e in entries]
            else:
                self._buf_list = buffers

    def buffers(self):
        "The logical buffers, with duplicates repeated in position."
        return [self._entries[h] for h in self._buf_list]

    def stored_buffers(self):
        "The deduplicated entries actually stored in the file."
        return list(self._entries.values())

    def add_entry(self, hash, entry: IndexEntry = None):
        """
        Add a buffer to the index.  May be called as ``add_entry(entry)``
        (matching the V1 API), ``add_entry(hash, entry)`` for a newly
        stored buffer, or ``add_entry(hash)`` to reference an
        already-stored buffer by content hash.
        """
        if entry is None and isinstance(hash, IndexEntry):
            # Consistency with FileIndexV1: accept a bare entry and key
            # it on its own content hash.
            entry = hash
            hash = entry.content_hash
        if entry is not None and entry.content_hash is None:
            raise ValueError('V2 index requires content hashes')
        if entry is not None:
            self._entries[hash] = entry
        self._buf_list.append(hash)

    def pack(self):
        "Serialize the entry table and buffer list to MsgPack bytes."
        return msgpack.packb({
            'entries': list(e.to_repr() for e in self._entries.values()),
            'buffers': self._buf_list
        })

    @classmethod
    def from_repr(cls, repr, version):
        "Build a V2 index from its unpacked MsgPack representation."
        entries = [IndexEntry.from_repr(r) for r in repr['entries']]
        bufs = repr['buffers']
        return cls(entries, bufs, version=version)

    def __len__(self):
        return len(self._buf_list)
31 changes: 17 additions & 14 deletions binpickle/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
import logging
import io
from zlib import adler32
import msgpack

from .compat import pickle
from .format import FileHeader, IndexEntry, FileTrailer
from .format import FileHeader, IndexEntry, FileTrailer, FileIndex
from .codecs import get_codec

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -46,13 +45,14 @@ def load(self):
"""
Load the object from the binpickle file.
"""
if not self.entries:
if not self.index or len(self.index) == 0:
raise ValueError('empty pickle file has no objects')
p_bytes = self._read_buffer(self.entries[-1], direct=True)
buffers = self.index.buffers()
p_bytes = self._read_buffer(buffers[-1], direct=True)
_log.debug('unpickling %d bytes and %d buffers',
len(p_bytes), len(self.entries) - 1)
len(p_bytes), len(self.index) - 1)

buf_gen = (self._read_buffer(e) for e in self.entries[:-1])
buf_gen = (self._read_buffer(e) for e in buffers[:-1])
up = pickle.Unpickler(io.BytesIO(p_bytes), buffers=buf_gen)
return up.load()

Expand All @@ -72,16 +72,19 @@ def find_errors(self):
errors.append(f'invalid index checksum ({i_sum} != {self.trailer.checksum})')

position = 16
for i, e in enumerate(self.entries):
for i, e in enumerate(self.index.stored_buffers()):
if e.offset < position:
errors.append(f'entry {i}: offset {e.offset} before expected start {position}')
buf = self._read_buffer(e, direct=True)
ndec = len(buf)
if ndec != e.dec_length:
errors.append(f'entry {i}: decoded to {ndec} bytes, expected {e.dec_length}')
position = e.offset + e.enc_length

cks = adler32(self._read_buffer(e, direct=True, decode=False))
if cks != e.checksum:
errors.append('entry {i}: invalid checksum ({cks} != {e.checksum}')
errors.append(f'entry {i}: invalid checksum ({cks} != {e.checksum}')
else:
buf = self._read_buffer(e, direct=True)
ndec = len(buf)
if ndec != e.dec_length:
errors.append(f'entry {i}: decoded to {ndec} bytes, expected {e.dec_length}')

return errors

Expand All @@ -108,8 +111,8 @@ def _read_index(self):
i_start = self.trailer.offset
i_end = i_start + self.trailer.length
self._index_buf = self._mv[i_start:i_end]
self.entries = [IndexEntry.from_repr(e) for e in msgpack.unpackb(self._index_buf)]
_log.debug('read %d entries from file', len(self.entries))
self.index = FileIndex.unpack(self._index_buf)
_log.debug('read %d entries from file', len(self.index))

def _read_buffer(self, entry: IndexEntry, *, direct=None, decode=True):
start = entry.offset
Expand Down
80 changes: 80 additions & 0 deletions binpickle/repickle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Re-pickle a file.

Usage:
repickle [options] SRC DST

Options:
-f FORMAT
The source format [default: pickle].
-t FORMAT
The destination format [default: binpickle].
-p PROTOCOL
Protocol version number.
-v, --verbose
Output debug logging.
SRC
The source file.
DST
The destination file.
"""

import warnings
import pathlib
import logging

from natural.size import binarysize
from docopt import docopt

import binpickle
import pickle
if pickle.HIGHEST_PROTOCOL < 5:
try:
import pickle5 as pickle
except ImportError:
warnings.warn('No pickle5 module, only protocol 4 supported', ImportWarning)

_log = logging.getLogger('binpickle.repickle')


def read_object(src, format):
    """
    Read an object from a file.

    Args:
        src: path to the input file.
        format: the input format, ``'pickle'`` or ``'binpickle'``.

    Returns:
        the deserialized object.

    Raises:
        ValueError: if ``format`` is not a recognized source format.
    """
    path = pathlib.Path(src)
    size = path.stat().st_size
    _log.info('reading %s file %s', format, path)
    _log.info('input size %s', binarysize(size))
    if format == 'pickle':
        with path.open('rb') as fp:
            return pickle.load(fp)
    if format == 'binpickle':
        with binpickle.BinPickleFile(path) as bpk:
            return bpk.load()
    _log.error('invalid source format %s', format)
    raise ValueError('invalid source format ' + format)


def write_object(obj, dst, format, protocol):
    """
    Write an object to a file.

    Args:
        obj: the object to serialize.
        dst: path to the output file.
        format: the output format, ``'pickle'`` or ``'binpickle'``.
        protocol: the pickle protocol number (or ``None`` for the default);
            accepted as a string and converted to ``int``.

    Raises:
        ValueError: if ``format`` is not a recognized destination format.
    """
    protocol = int(protocol) if protocol is not None else None
    path = pathlib.Path(dst)
    _log.info('writing %s file %s', format, path)
    if format == 'pickle':
        with path.open('wb') as fp:
            pickle.dump(obj, fp, protocol=protocol)
    elif format == 'binpickle':
        binpickle.dump(obj, path)
    else:
        _log.error('invalid destination format %s', format)
        raise ValueError('invalid destination format ' + format)


def main(opts):
    "Re-pickle SRC into DST according to the parsed docopt options."
    src, src_fmt = opts['SRC'], opts['-f']
    dst, dst_fmt = opts['DST'], opts['-t']
    obj = read_object(src, src_fmt)
    write_object(obj, dst, dst_fmt, opts['-p'])


if __name__ == '__main__':
    # Parse command-line options from the module docstring (docopt usage).
    opts = docopt(__doc__)
    # --verbose switches on debug-level logging; otherwise info level.
    level = logging.DEBUG if opts['--verbose'] else logging.INFO
    logging.basicConfig(level=level)
    main(opts)
Loading