From 2afcf2e0768254aab47e18c82c09e130329c86ae Mon Sep 17 00:00:00 2001 From: Juarez Rudsatz Date: Mon, 29 Jun 2020 13:48:42 -0300 Subject: [PATCH] Petl use fsspec (#494) * use fsspec for handling remote protocols use fsspec for handling remote protocols Add RemoteSource for handling all remote sources with fsspec Remove S3Source replaced by fsspec Keep SMBSource as it isn't handled by fsspec Refactored namespaces to remotes * use compression from fsspec remove codec implementation in petl * update docs with changes for fsspec * added another test case for fsspec * make appveyor happy * Apply suggestions from code review of #494 Ready to merge. Co-authored-by: Alistair Miles Co-authored-by: Juarez Rudsatz Co-authored-by: Juarez Rudsatz Co-authored-by: Alistair Miles --- docs/changes.rst | 16 +++ docs/io.rst | 35 ++--- petl/io/__init__.py | 4 + petl/io/codec/__init__.py | 7 - petl/io/codec/lz4.py | 50 ------- petl/io/codec/xz.py | 38 ------ petl/io/codec/zstd.py | 58 --------- petl/io/remotes.py | 245 +++++++++++++++++++++++++++++++++++ petl/io/source/__init__.py | 5 - petl/io/source/s3.py | 67 ---------- petl/io/source/smb.py | 120 ----------------- petl/io/sources.py | 13 +- petl/test/io/test_codec.py | 93 ------------- petl/test/io/test_remotes.py | 157 ++++++++++++++++++++++ petl/test/io/test_source.py | 114 ---------------- 15 files changed, 440 insertions(+), 582 deletions(-) delete mode 100644 petl/io/codec/__init__.py delete mode 100644 petl/io/codec/lz4.py delete mode 100644 petl/io/codec/xz.py delete mode 100644 petl/io/codec/zstd.py create mode 100644 petl/io/remotes.py delete mode 100644 petl/io/source/__init__.py delete mode 100644 petl/io/source/s3.py delete mode 100644 petl/io/source/smb.py delete mode 100644 petl/test/io/test_codec.py create mode 100644 petl/test/io/test_remotes.py delete mode 100644 petl/test/io/test_source.py diff --git a/docs/changes.rst b/docs/changes.rst index 1fe4ab71..80e6193f 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,6 +1,22 @@ Changes ======= +Version 1.6.0 +------------- + +* Added class :class:`petl.io.remotes.RemoteSource` using package **fsspec** + for reading and writing files in remote servers by using the protocol in the + url for selecting the implementation. + By :user:`juarezr`, :issue:`494`. + +* Removed classes :class:`petl.io.source.s3.S3Source` as it's handled by fsspec + By :user:`juarezr`, :issue:`494`. + +* Removed classes :class:`petl.io.codec.xz.XZCodec`, :class:`petl.io.codec.xz.LZ4Codec` + and :class:`petl.io.codec.zstd.ZstandardCodec` as it's handled by fsspec. + By :user:`juarezr`, :issue:`494`. + + Version 1.5.0 ------------- diff --git a/docs/io.rst b/docs/io.rst index d725691b..a28bfed4 100644 --- a/docs/io.rst +++ b/docs/io.rst @@ -359,13 +359,8 @@ The behaviour of each source can usually be configured by passing arguments to the constructor, see the source code of the :mod:`petl.io.sources` module for full details. -.. autoclass:: petl.io.sources.FileSource -.. autoclass:: petl.io.sources.GzipSource -.. autoclass:: petl.io.sources.BZ2Source -.. autoclass:: petl.io.sources.ZipSource .. autoclass:: petl.io.sources.StdinSource .. autoclass:: petl.io.sources.StdoutSource -.. autoclass:: petl.io.sources.URLSource .. autoclass:: petl.io.sources.MemorySource .. autoclass:: petl.io.sources.PopenSource @@ -383,26 +378,23 @@ in :ref:`Extract ` and :ref:`Load `. It's possible to read and write just by prefixing the protocol (e.g: `s3://`) in the source path of the file. -.. autoclass:: petl.io.source.s3.S3Source -.. autoclass:: petl.io.source.smb.SMBSource +.. autoclass:: petl.io.remotes.RemoteSource +.. autoclass:: petl.io.remotes.SMBSource -.. _io_codecs: +.. _io_deprecated: -Compression I/O helper classes ------------------------------- +Deprecated I/O helper classes +----------------------------- -The following classes are helpers for decompressing (``from...()``) and -compressing (``to...()``) in functions transparently as a file-like source. +The following helpers are deprecated and will be removed in a future version. -There are no need to instantiate them. They are used in the mecanism described -in :ref:`Extract ` and :ref:`Load `. +It's functionality was replaced by helpers in :ref:`Remote helpers `. -It's possible to compress and decompress just by specifying the file extension -(e.g: `.csv.xz`) in end of the source filename. - -.. autoclass:: petl.io.codec.xz.XZCodec -.. autoclass:: petl.io.codec.zstd.ZstandardCodec -.. autoclass:: petl.io.codec.lz4.LZ4Codec +.. autoclass:: petl.io.sources.FileSource +.. autoclass:: petl.io.sources.GzipSource +.. autoclass:: petl.io.sources.BZ2Source +.. autoclass:: petl.io.sources.ZipSource +.. autoclass:: petl.io.sources.URLSource .. _io_custom_helpers: @@ -410,11 +402,10 @@ Custom I/O helper classes ------------------------------ For creating custom helpers for :ref:`remote I/O ` or -:ref:`compression ` use the following functions: +`compression` use the following functions: .. autofunction:: petl.io.sources.register_reader .. autofunction:: petl.io.sources.register_writer -.. autofunction:: petl.io.sources.register_codec See the source code of the classes in :mod:`petl.io.sources` module for more details. diff --git a/petl/io/__init__.py b/petl/io/__init__.py index b89e6aab..28497317 100644 --- a/petl/io/__init__.py +++ b/petl/io/__init__.py @@ -39,3 +39,7 @@ from petl.io.avro import fromavro, toavro, appendavro from petl.io.sources import register_codec, register_reader, register_writer + +from petl.io.remotes import RemoteSource + +from petl.io.remotes import SMBSource diff --git a/petl/io/codec/__init__.py b/petl/io/codec/__init__.py deleted file mode 100644 index 28411f32..00000000 --- a/petl/io/codec/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import absolute_import, print_function, division - -from petl.io.codec.zstd import ZstandardCodec - -from petl.io.codec.lz4 import LZ4Codec - -from petl.io.codec.xz import XZCodec diff --git a/petl/io/codec/lz4.py b/petl/io/codec/lz4.py deleted file mode 100644 index 035ccc02..00000000 --- a/petl/io/codec/lz4.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -from contextlib import contextmanager - -from petl.io.sources import register_codec - - -class LZ4Codec(object): - ''' - Allows compressing and decompressing .lz4 files - - `LZ4`_ is lossless compression algorithm, providing compression - speed greather than 500 MB/s per core (>0.15 Bytes/cycle). It features an - extremely fast decoder, with speed in multiple GB/s per core (~1Byte/cycle) - - .. note:: - - For working this codec require `python-lz4`_ to be installed, e.g.:: - - $ pip install lz4 - - .. versionadded:: 1.5.0 - - .. _python-lz4: https://github.com/python-lz4/python-lz4 - .. _LZ4: http://www.lz4.org - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import lz4.frame - source = lz4.frame.open(self.filename, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.lz4', LZ4Codec) - -# end # diff --git a/petl/io/codec/xz.py b/petl/io/codec/xz.py deleted file mode 100644 index d6192a1a..00000000 --- a/petl/io/codec/xz.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -from contextlib import contextmanager - -from petl.io.sources import register_codec - -class XZCodec(object): - ''' - Allows compressing and decompressing .xz files compressed with `lzma`_. - - .. versionadded:: 1.5.0 - - .. _lzma: https://docs.python.org/3/library/lzma.html - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import lzma - source = lzma.open(self.filename, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.xz', XZCodec) - -# end # diff --git a/petl/io/codec/zstd.py b/petl/io/codec/zstd.py deleted file mode 100644 index 6e6c0a07..00000000 --- a/petl/io/codec/zstd.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import io -from contextlib import contextmanager - -from petl.io.sources import register_codec - - -class ZstandardCodec(object): - ''' - Allows compressing and decompressing .zstd files - - `Zstandard`_ is a real-time compression algorithm, providing - high compression ratios. It offers a very wide range of compression / speed - trade-off, while being backed by a very fast decoder. - - .. note:: - - For working this codec require `zstd`_ to be installed, e.g.:: - - $ pip install zstandard - - .. versionadded:: 1.5.0 - - .. _zstd: https://github.com/indygreg/python-zstandard - .. _Zstandard: http://www.zstd.net - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import zstandard as zstd - if mode.startswith('r'): - cctx = zstd.ZstdDecompressor(**self.kwargs) - compressed = io.open(self.filename, mode) - source = cctx.stream_reader(compressed) - else: - cctx = zstd.ZstdCompressor(**self.kwargs) - uncompressed = io.open(self.filename, mode) - source = cctx.stream_writer(uncompressed) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.zst', ZstandardCodec) - -# end # diff --git a/petl/io/remotes.py b/petl/io/remotes.py new file mode 100644 index 00000000..c7041fcb --- /dev/null +++ b/petl/io/remotes.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import io +import logging +from contextlib import contextmanager + +from petl.compat import PY3 +from petl.io.sources import register_reader, register_writer + +logger = logging.getLogger(__name__) + +# region RemoteSource + + +class RemoteSource(object): + """Read or write directly from files in remote filesystems. + + This source handles many filesystems that are selected based on the + protocol passed in the `url` argument. + + The url should be specified in `to..()` and `from...()` functions. E.g.:: + + >>> import petl as etl + >>> + >>> def example_s3(): + ... url = 's3://mybucket/prefix/to/myfilename.csv' + ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' + ... + ... etl.tocsv(data, url) + ... tbl = etl.fromcsv(url) + ... + >>> example_s3() # doctest: +SKIP + +-----+-----+ + | foo | bar | + +=====+=====+ + | 'a' | '1' | + +-----+-----+ + | 'b' | '2' | + +-----+-----+ + | 'c' | '2' | + +-----+-----+ + + This source uses `fsspec`_ to provide the data transfer with the remote + filesystem. Check the `Built-in Implementations `_ for available + remote implementations. + + Some filesystem can use `URL chaining `_ for compound I/O. + + .. note:: + + For working this source require `fsspec`_ to be installed, e.g.:: + + $ pip install fsspec + + Some remote filesystems require aditional packages to be installed. + Check `Known Implementations `_ for checking what packages + need to be installed, e.g.:: + + $ pip install s3fs # AWS S3 + $ pip install gcsfs # Google Cloud Storage + $ pip install adlfs # Azure Blob service + $ pip install paramiko # SFTP + $ pip install requests # HTTP, github + + .. versionadded:: 1.6.0 + + .. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/ + .. _fs_builtin: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + .. _fs_known: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + .. _fs_chain: https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining + """ + + def __init__(self, url, **kwargs): + self.url = url + self.kwargs = kwargs + + def open_file(self, mode="rb"): + import fsspec + + fs = fsspec.open(self.url, mode=mode, compression='infer', **self.kwargs) + return fs + + @contextmanager + def open(self, mode="rb"): + mode2 = mode[:1] + r"b" # python2 + fs = self.open_file(mode=mode2) + with fs as source: + yield source + + +# registering filesystems with packages installed + + +def _register_filesystems(only_available=False): + """Search for python packages supporting remote filesystems.""" + + from fsspec.registry import known_implementations + + impls = known_implementations.items() + q = 0 + for protocol, spec in impls: + # use the available for for compatibility reasons until next spring cleaning + if protocol.startswith("http"): + continue + if "err" in spec: + emsg = "# WARN: fsspec {} unavailable: {}".format(protocol, spec["err"]) + logger.warning(emsg) + if only_available: + continue + register_reader(protocol, RemoteSource) + register_writer(protocol, RemoteSource) + q += 1 + logger.debug("# fsspec: registered {} providers".format(q)) + + +def _try_register_filesystems(): + try: + import fsspec + except ImportError as ie: + logger.warning("# Missing fsspec package. Install with: pip install fsspec") + else: + try: + _register_filesystems() + except Exception as ex: + raise Exception("# ERROR: failed to register fsspec filesystems", ex) + + +if PY3: + _try_register_filesystems() + +# endregion + +# region SMBSource + + +class SMBSource(object): + """Downloads or uploads to Windows and Samba network drives. E.g.:: + + >>> def example_smb(): + ... import petl as etl + ... url = 'smb://user:password@server/share/folder/file.csv' + ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' + ... etl.tocsv(data, url) + ... tbl = etl.fromcsv(url) + ... + >>> example_smb() # doctest: +SKIP + +-----+-----+ + | foo | bar | + +=====+=====+ + | 'a' | '1' | + +-----+-----+ + | 'b' | '2' | + +-----+-----+ + | 'c' | '2' | + +-----+-----+ + + The argument `url` (str) must have a URI with format: + `smb://workgroup;user:password@server:port/share/folder/file.csv`. + + Note that you need to pass in a valid hostname or IP address for the host + component of the URL. Do not use the Windows/NetBIOS machine name for the + host component. + + The first component of the path in the URL points to the name of the shared + folder. Subsequent path components will point to the directory/folder/file. + + .. note:: + + For working this source require `smbprotocol`_ to be installed, e.g.:: + + $ pip install smbprotocol[kerberos] + + .. versionadded:: 1.5.0 + + .. _smbprotocol: https://github.com/jborean93/smbprotocol#requirements + """ + + def __init__(self, url, **kwargs): + self.url = url + self.kwargs = kwargs + + @contextmanager + def open(self, mode="rb"): + mode2 = mode[:1] + r"b" # python2 + source = _open_file_smbprotocol(self.url, mode=mode2, **self.kwargs) + try: + yield source + finally: + source.close() + + +def _open_file_smbprotocol(url, mode="rb", **kwargs): + + domain, host, port, user, passwd, server_path = _parse_smb_url(url) + import smbclient + + try: + # register the server with explicit credentials + if user: + session = smbclient.register_session( + host, username=user, password=passwd, port=port + ) + # Read an existing file as bytes + mode2 = mode[:1] + r"b" + filehandle = smbclient.open_file(server_path, mode=mode2, **kwargs) + return filehandle + + except Exception as ex: + raise ConnectionError("SMB error: %s" % ex).with_traceback(sys.exc_info()[2]) + + +def _parse_smb_url(url): + e = "SMB url must be smb://workgroup;user:password@server:port/share/folder/file.txt: " + + if not url: + raise ValueError("SMB error: no host given") + elif not url.startswith("smb://"): + raise ValueError(e + url) + + if PY3: + from urllib.parse import urlparse + else: + from urlparse import urlparse + parsed = urlparse(url) + if not parsed.path: + raise ValueError(e + url) + + unc_path = parsed.path.replace("/", "\\") + server_path = "\\\\{}{}".format(parsed.hostname, unc_path) + + if not parsed.username: + domain, username = None + elif ";" in parsed.username: + domain, username = parsed.username.split(";") + else: + domain, username = None, parsed.username + port = 445 if not parsed.port else int(parsed.port) + return domain, parsed.hostname, port, username, parsed.password, server_path + + +register_reader("smb", SMBSource) +register_writer("smb", SMBSource) + +# endregion diff --git a/petl/io/source/__init__.py b/petl/io/source/__init__.py deleted file mode 100644 index 3aaaacbe..00000000 --- a/petl/io/source/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from __future__ import absolute_import, print_function, division - -from petl.io.source.s3 import S3Source - -from petl.io.source.smb import SMBSource diff --git a/petl/io/source/s3.py b/petl/io/source/s3.py deleted file mode 100644 index 2eb59b62..00000000 --- a/petl/io/source/s3.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import io -from petl.io.sources import register_reader, register_writer -from contextlib import contextmanager - - -class S3Source(object): - '''Downloads or uploads to AWS S3 filesystem. E.g.:: - - >>> def example_s3(): - ... import petl as etl - ... url = 's3://mybucket/prefix/to/myfilename.csv' - ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' - ... etl.tocsv(data, url) - ... tbl = etl.fromcsv(url) - ... - >>> example_s3() # doctest: +SKIP - +-----+-----+ - | foo | bar | - +=====+=====+ - | 'a' | '1' | - +-----+-----+ - | 'b' | '2' | - +-----+-----+ - | 'c' | '2' | - +-----+-----+ - - .. note:: - - For working this source require `s3fs`_ to be installed, e.g.:: - - $ pip install s3fs - - It is strongly recommended that you open files in binary mode. - - For authentication check `credentials`_. - - .. versionadded:: 1.5.0 - - .. _s3fs: https://github.com/dask/s3fs/ - .. _credentials: https://s3fs.readthedocs.io/en/latest/#credentials - ''' - - def __init__(self, url, **kwargs): - self.url = url - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import s3fs - fs = s3fs.S3FileSystem() - source = fs.open(self.url, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='rb'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_reader('s3', S3Source) -register_writer('s3', S3Source) diff --git a/petl/io/source/smb.py b/petl/io/source/smb.py deleted file mode 100644 index 8cdd4efb..00000000 --- a/petl/io/source/smb.py +++ /dev/null @@ -1,120 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -from contextlib import contextmanager - -from petl.compat import PY3 -from petl.io.sources import register_reader, register_writer - - -class SMBSource(object): - '''Downloads or uploads to Windows and Samba network drives. E.g.:: - - >>> def example_smb(): - ... import petl as etl - ... url = 'smb://user:password@server/share/folder/file.csv' - ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' - ... etl.tocsv(data, url) - ... tbl = etl.fromcsv(url) - ... - >>> example_smb() # doctest: +SKIP - +-----+-----+ - | foo | bar | - +=====+=====+ - | 'a' | '1' | - +-----+-----+ - | 'b' | '2' | - +-----+-----+ - | 'c' | '2' | - +-----+-----+ - - The argument `url` (str) must have a URI with format: - `smb://workgroup;user:password@server:port/share/folder/file.csv`. - - Note that you need to pass in a valid hostname or IP address for the host - component of the URL. Do not use the Windows/NetBIOS machine name for the - host component. - - The first component of the path in the URL points to the name of the shared - folder. Subsequent path components will point to the directory/folder/file. - - .. note:: - - For working this source require `smbprotocol`_ to be installed, e.g.:: - - $ pip install smbprotocol[kerberos] - - .. versionadded:: 1.5.0 - - .. _smbprotocol: https://github.com/jborean93/smbprotocol#requirements - ''' - - def __init__(self, url, **kwargs): - self.url = url - self.kwargs = kwargs - - @contextmanager - def open(self, mode='rb'): - mode2 = mode[:1] + r'b' # python2 - source = _open_file_smbprotocol(self.url, mode=mode2, **self.kwargs) - try: - yield source - finally: - source.close() - -# region SMBHandler - - -def _open_file_smbprotocol(url, mode='rb', **kwargs): - - domain, host, port, user, passwd, server_path = _parse_smb_url(url) - import smbclient - try: - # register the server with explicit credentials - if user: - session = smbclient.register_session( - host, username=user, password=passwd, port=port) - # Read an existing file as bytes - mode2 = mode[:1] + r'b' - filehandle = smbclient.open_file(server_path, mode=mode2, **kwargs) - return filehandle - - except Exception as ex: - raise ConnectionError('SMB error: %s' % - ex).with_traceback(sys.exc_info()[2]) - - -def _parse_smb_url(url): - e = 'SMB url must be smb://workgroup;user:password@server:port/share/folder/file.txt: ' - - if not url: - raise ValueError('SMB error: no host given') - elif not url.startswith('smb://'): - raise ValueError(e + url) - - if PY3: - from urllib.parse import urlparse - else: - from urlparse import urlparse - parsed = urlparse(url) - if not parsed.path: - raise ValueError(e + url) - - unc_path = parsed.path.replace("/", "\\") - server_path = "\\\\{}{}".format(parsed.hostname, unc_path) - - if not parsed.username: - domain, username = None - elif ';' in parsed.username: - domain, username = parsed.username.split(';') - else: - domain, username = None, parsed.username - port = 555 if not parsed.port else int(parsed.port) - return domain, parsed.hostname, port, username, parsed.password, server_path - -# endregion - - -register_reader('smb', SMBSource) -register_writer('smb', SMBSource) diff --git a/petl/io/sources.py b/petl/io/sources.py index 423b1252..6736014d 100644 --- a/petl/io/sources.py +++ b/petl/io/sources.py @@ -411,7 +411,7 @@ def _get_handler_from(source, handlers): return None -def _resolve_source_from_arg(source, handlers, sync_mode): +def _resolve_source_from_arg(source, handlers): if source is None: return StdinSource() elif isinstance(source, string_types): @@ -420,12 +420,9 @@ def _resolve_source_from_arg(source, handlers, sync_mode): if handler is None: if codec is not None: return codec(source) + assert '://' not in source, _invalid_source_msg % source return FileSource(source) - io_handler = handler(source) - if codec is None: - return io_handler - handler = CompressedSource(io_handler, codec) - return handler + return handler(source) else: assert (hasattr(source, 'open') and callable(getattr(source, 'open'))), \ @@ -434,8 +431,8 @@ def _resolve_source_from_arg(source, handlers, sync_mode): def read_source_from_arg(source): - return _resolve_source_from_arg(source, _READERS, 'rb') + return _resolve_source_from_arg(source, _READERS) def write_source_from_arg(source, mode='wb'): - return _resolve_source_from_arg(source, _WRITERS, mode) + return _resolve_source_from_arg(source, _WRITERS) diff --git a/petl/test/io/test_codec.py b/petl/test/io/test_codec.py deleted file mode 100644 index ff28149c..00000000 --- a/petl/test/io/test_codec.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -import os -from tempfile import NamedTemporaryFile - -from petl.compat import PY3 -from petl.test.helpers import ieq -from petl.io.avro import fromavro, toavro -from petl.io.csv import fromcsv, tocsv -from petl.util.vis import look - -# region Codec test cases - -try: - import lzma -except ImportError as e: - print('SKIP XZ helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_xz(): - _write_read_with_codec('.xz') - -try: - import lz4.frame -except ImportError as e: - print('SKIP LZ4 helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_lz4(): - _write_read_with_codec('.lz4') - -try: - import zstandard as zstd -except ImportError as e: - print('SKIP ZSTANDARD helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_zstd(): - _write_read_with_codec('.zstd') - -# endregion - -# region Execution - - -def _write_read_with_codec(file_ext): - - _table = ((u'name', u'friends', u'age'), - (u'Bob', '42', '33'), - (u'Jim', '13', '69'), - (u'Joe', '86', '17'), - (u'Ted', '23', '51')) - - _show__rows_from("Expected:", _table) - - has_avro = _test_avro_too() - - compressed_csv = _get_temp_file_for('.csv' + file_ext) - compressed_avr = _get_temp_file_for('.avro' + file_ext) - - tocsv(_table, compressed_csv, encoding='ascii', lineterminator='\n') - if has_avro: - toavro(_table, compressed_avr) - - csv_actual = fromcsv(compressed_csv, encoding='ascii') - if has_avro: - avr_actual = fromavro(compressed_avr) - - _show__rows_from("Actual:", csv_actual) - - ieq(_table, csv_actual) - ieq(_table, csv_actual) # verify can iterate twice - if has_avro: - ieq(_table, avr_actual) - ieq(_table, avr_actual) # verify can iterate twice - - -def _get_temp_file_for(file_ext): - with NamedTemporaryFile(delete=False, mode='wb') as fo: - return fo.name + file_ext - - -def _show__rows_from(label, test_rows, limit=0): - print(label) - print(look(test_rows, limit=limit)) - -def _test_avro_too(): - try: - import fastavro - return True - except: - return False - -# endregion diff --git a/petl/test/io/test_remotes.py b/petl/test/io/test_remotes.py new file mode 100644 index 00000000..9ccf46e2 --- /dev/null +++ b/petl/test/io/test_remotes.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import sys +import os + +from petl.compat import PY3 +from petl.test.helpers import ieq, eq_ +from petl.io.avro import fromavro, toavro +from petl.io.csv import fromcsv, tocsv +from petl.util.vis import look + +# region Codec test cases + + +def test_helper_local(): + if PY3: + _write_read_into_url("./example.") + + +try: + import fsspec +except ImportError as e: + print("SKIP FSSPEC helper tests: %s" % e, file=sys.stderr) +else: + + def test_helper_fsspec(): + # _write_read_from_env_url('PETL_TEST_S3') + _write_read_from_env_matching("PETL_TEST_") + + +try: + import smbclient +except ImportError as e: + print("SKIP SMB helper tests: %s" % e, file=sys.stderr) +else: + + def test_helper_smb(): + _write_read_from_env_url("PETL_SMB_URL") + + def test_helper_smb_url_parse(): + from petl.io.remotes import _parse_smb_url + + url = r"smb://workgroup;user:password@server:444/share/folder/file.csv" + domain, host, port, user, passwd, server_path = _parse_smb_url(url) + print("Parsed:", domain, host, port, user, passwd, server_path) + eq_(domain, r"workgroup") + eq_(host, r"server") + eq_(port, 444) + eq_(user, r"user") + eq_(passwd, r"password") + eq_(server_path, "\\\\server\\share\\folder\\file.csv") + + +# endregion + +# region Execution + + +def _write_read_from_env_matching(prefix): + q = 0 + for variable, base_url in os.environ.items(): + if variable.upper().startswith(prefix.upper()): + fmsg = "\n {}: {} -> ".format(variable, base_url) + print(fmsg, file=sys.stderr, end="") + _write_read_into_url(base_url) + print("DONE ", file=sys.stderr, end="") + q += 1 + if q < 1: + msg = """SKIPPED + For testing remote source define a environment variable: + $ export PETL_TEST_='://myuser:mypassword@host:port/path/to/file.ext'""" + print(msg, file=sys.stderr) + + +def _write_read_from_env_url(env_var_name): + base_url = os.getenv(env_var_name, "skip") + if base_url == "skip": + print("SKIPPED ", file=sys.stderr, end="") + else: + _write_read_into_url(base_url) + print("DONE ", file=sys.stderr, end="") + + +def _write_read_into_url(base_url): + _write_read_file_into_url(base_url, "filename1.csv") + _write_read_file_into_url(base_url, "filename2.avro") + _write_read_file_into_url(base_url, "filename3.csv", "gz") + _write_read_file_into_url(base_url, "filename4.avro", "gz") + _write_read_file_into_url(base_url, "filename5.csv", "xz") + _write_read_file_into_url(base_url, "filename6.csv", "zst") + _write_read_file_into_url(base_url, "filename7.csv", "lz4") + _write_read_file_into_url(base_url, "filename8.csv", "snappy") + + +def _write_read_file_into_url(base_url, filename, compression=None): + if ".avro" in filename and not _has_avro: + return + is_local = base_url.startswith("./") + if compression is not None: + if is_local: + return + filename = filename + "." + compression + codec = fsspec.utils.infer_compression(filename) + if codec is None: + print("\n - %s SKIPPED " % filename, file=sys.stderr, end="") + return + print("\n - %s " % filename, file=sys.stderr, end="") + + if is_local: + source_url = base_url + filename + else: + source_url = os.path.join(base_url, filename) + + _show__rows_from("Expected:", _table) + + if ".avro" in filename: + toavro(_table, source_url) + actual = fromavro(source_url) + else: + tocsv(_table, source_url, encoding="ascii", lineterminator="\n") + actual = fromcsv(source_url, encoding="ascii") + + _show__rows_from("Actual:", actual) + ieq(_table, actual) + ieq(_table, actual) # verify can iterate twice + + +def _show__rows_from(label, test_rows, limit=0): + print(label) + print(look(test_rows, limit=limit)) + + +def _test_avro_too(): + try: + import fastavro + + return True + except: + return False + + +# endregion + +# region Mockup data + +_has_avro = _test_avro_too() + +_table = ( + (u"name", u"friends", u"age"), + (u"Bob", "42", "33"), + (u"Jim", "13", "69"), + (u"Joe", "86", "17"), + (u"Ted", "23", "51"), +) + +# endregion diff --git a/petl/test/io/test_source.py b/petl/test/io/test_source.py deleted file mode 100644 index 3beaa929..00000000 --- a/petl/test/io/test_source.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -import os - -from petl.compat import PY3 -from petl.test.helpers import ieq, eq_ -from petl.io.avro import fromavro, toavro -from petl.io.csv import fromcsv, tocsv -from petl.util.vis import look - -from petl.io.source.smb import _parse_smb_url - -# region Codec test cases - -try: - import s3fs -except ImportError as e: - print('SKIP S3 helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_s3(): - _write_read_from_url('PETL_S3_URL', "export PETL_S3_URL='s3://mybucket/path/folder'") - -try: - import smbclient -except ImportError as e: - print('SKIP SMB helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_smb(): - _write_read_from_url('PETL_SMB_URL', "export PETL_SMB_URL='smb://DOMAIN;myuserID:mypassword@host/share'") - - def test_helper_smb_url_parse(): - url = r'smb://workgroup;user:password@server:444/share/folder/file.csv' - domain, host, port, user, passwd, server_path = _parse_smb_url(url) - print("Parsed:", domain, host, port, user, passwd, server_path) - eq_(domain, r'workgroup') - eq_(host, r'server') - eq_(port, 444) - eq_(user, r'user') - eq_(passwd, r'password') - eq_(server_path, "\\\\server\\share\\folder\\file.csv") - -# endregion - -# region Execution - -def _write_read_from_url(env_var_name, example): - - base_url = os.getenv(env_var_name, 'skip') - - csv_url = os.path.join(base_url, 'filename1.csv') - gzc_url = os.path.join(base_url, 'filename3.csv.gz') - gza_url = os.path.join(base_url, 'filename4.avro.gz') - avr_url = os.path.join(base_url, 'filename2.avro') - - _table = ( (u'name', u'friends', u'age'), - (u'Bob', '42', '33'), - (u'Jim', '13', '69'), - (u'Joe', '86', '17'), - (u'Ted', '23', '51')) - - _show__rows_from("Expected:", _table) - has_avro = _test_avro_too() - - if base_url == 'skip': - m = "# Skipping test because env var '{}' is not defined. Try this:\n$ {}" - msg = m.format(env_var_name, example) - print(msg) - return - - tocsv(_table, csv_url, encoding='ascii', lineterminator='\n') - if PY3: - tocsv(_table, gzc_url, encoding='ascii', lineterminator='\n') - if has_avro: - toavro(_table, avr_url) - if PY3: - toavro(_table, gza_url) - - csv_actual = fromcsv(csv_url, encoding='ascii') - if PY3: - gzp_actual = fromcsv(gzc_url, encoding='ascii') - if has_avro: - avr_actual = fromavro(avr_url) - if PY3: - gza_actual = fromavro(gza_url) - - _show__rows_from("Actual:", csv_url) - - ieq(_table, csv_actual) - ieq(_table, csv_actual) # verify can iterate twice - if PY3: - ieq(_table, gzp_actual) - ieq(_table, gzp_actual) # verify can iterate twice - if has_avro: - ieq(_table, avr_actual) - ieq(_table, avr_actual) # verify can iterate twice - if PY3: - ieq(_table, gza_actual) - ieq(_table, gza_actual) # verify can iterate twice - - -def _show__rows_from(label, test_rows, limit=0): - print(label) - print(look(test_rows, limit=limit)) - -def _test_avro_too(): - try: - import fastavro - return True - except: - return False - -# endregion