diff --git a/docs/changes.rst b/docs/changes.rst index 1fe4ab71..80e6193f 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -1,6 +1,22 @@ Changes ======= +Version 1.6.0 +------------- + +* Added class :class:`petl.io.remotes.RemoteSource` using package **fsspec** + for reading and writing files on remote servers, using the protocol in the + URL to select the implementation. + By :user:`juarezr`, :issue:`494`. + +* Removed class :class:`petl.io.source.s3.S3Source` as it is now handled by fsspec. + By :user:`juarezr`, :issue:`494`. + +* Removed classes :class:`petl.io.codec.xz.XZCodec`, :class:`petl.io.codec.lz4.LZ4Codec` + and :class:`petl.io.codec.zstd.ZstandardCodec` as they are now handled by fsspec. + By :user:`juarezr`, :issue:`494`. + + Version 1.5.0 ------------- diff --git a/docs/io.rst b/docs/io.rst index d725691b..a28bfed4 100644 --- a/docs/io.rst +++ b/docs/io.rst @@ -359,13 +359,8 @@ The behaviour of each source can usually be configured by passing arguments to the constructor, see the source code of the :mod:`petl.io.sources` module for full details. -.. autoclass:: petl.io.sources.FileSource -.. autoclass:: petl.io.sources.GzipSource -.. autoclass:: petl.io.sources.BZ2Source -.. autoclass:: petl.io.sources.ZipSource .. autoclass:: petl.io.sources.StdinSource .. autoclass:: petl.io.sources.StdoutSource -.. autoclass:: petl.io.sources.URLSource .. autoclass:: petl.io.sources.MemorySource .. autoclass:: petl.io.sources.PopenSource @@ -383,26 +378,23 @@ in :ref:`Extract ` and :ref:`Load `. It's possible to read and write just by prefixing the protocol (e.g: `s3://`) in the source path of the file. -.. autoclass:: petl.io.source.s3.S3Source -.. autoclass:: petl.io.source.smb.SMBSource +.. autoclass:: petl.io.remotes.RemoteSource +.. autoclass:: petl.io.remotes.SMBSource -.. _io_codecs: +.. _io_deprecated: -Compression I/O helper classes ------------------------------- +Deprecated I/O helper classes +----------------------------- -The following classes are helpers for decompressing (``from...()``) and -compressing (``to...()``) in functions transparently as a file-like source. +The following helpers are deprecated and will be removed in a future version. -There are no need to instantiate them. They are used in the mecanism described -in :ref:`Extract ` and :ref:`Load `. +Their functionality was replaced by the helpers in :ref:`Remote helpers `. -It's possible to compress and decompress just by specifying the file extension -(e.g: `.csv.xz`) in end of the source filename. - -.. autoclass:: petl.io.codec.xz.XZCodec -.. autoclass:: petl.io.codec.zstd.ZstandardCodec -.. autoclass:: petl.io.codec.lz4.LZ4Codec +.. autoclass:: petl.io.sources.FileSource +.. autoclass:: petl.io.sources.GzipSource +.. autoclass:: petl.io.sources.BZ2Source +.. autoclass:: petl.io.sources.ZipSource +.. autoclass:: petl.io.sources.URLSource .. _io_custom_helpers: @@ -410,11 +402,10 @@ Custom I/O helper classes ------------------------------ For creating custom helpers for :ref:`remote I/O ` or -:ref:`compression ` use the following functions: +`compression`, use the following functions: .. autofunction:: petl.io.sources.register_reader .. autofunction:: petl.io.sources.register_writer -.. autofunction:: petl.io.sources.register_codec See the source code of the classes in :mod:`petl.io.sources` module for more details.
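For context on the ``register_reader``/``register_writer`` hook mentioned above: a custom helper only needs to be constructible from the URL string and to expose an ``open()`` context manager yielding a binary file-like object, which is how ``RemoteSource`` and ``SMBSource`` below are wired in. A minimal sketch, using a hypothetical ``myproto`` scheme with a plain local file standing in for the real transport::

    from contextlib import contextmanager

    import petl as etl
    from petl.io.sources import register_reader, register_writer


    class MyProtoSource(object):
        """Hypothetical helper for URLs like myproto://path/to/file.csv."""

        def __init__(self, url, **kwargs):
            self.url = url
            self.kwargs = kwargs

        @contextmanager
        def open(self, mode='rb'):
            # force binary mode, mirroring the other helpers in this patch
            path = self.url.replace('myproto://', '', 1)
            f = open(path, mode[:1] + 'b')
            try:
                yield f
            finally:
                f.close()


    register_reader('myproto', MyProtoSource)
    register_writer('myproto', MyProtoSource)

    # once registered, the scheme can be used directly in from.../to... calls:
    etl.tocsv([('foo', 'bar'), ('a', 1)], 'myproto:///tmp/example.csv')
    tbl = etl.fromcsv('myproto:///tmp/example.csv')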
diff --git a/petl/io/__init__.py b/petl/io/__init__.py index b89e6aab..28497317 100644 --- a/petl/io/__init__.py +++ b/petl/io/__init__.py @@ -39,3 +39,7 @@ from petl.io.avro import fromavro, toavro, appendavro from petl.io.sources import register_codec, register_reader, register_writer + +from petl.io.remotes import RemoteSource + +from petl.io.remotes import SMBSource diff --git a/petl/io/codec/__init__.py b/petl/io/codec/__init__.py deleted file mode 100644 index 28411f32..00000000 --- a/petl/io/codec/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from __future__ import absolute_import, print_function, division - -from petl.io.codec.zstd import ZstandardCodec - -from petl.io.codec.lz4 import LZ4Codec - -from petl.io.codec.xz import XZCodec diff --git a/petl/io/codec/lz4.py b/petl/io/codec/lz4.py deleted file mode 100644 index 035ccc02..00000000 --- a/petl/io/codec/lz4.py +++ /dev/null @@ -1,50 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -from contextlib import contextmanager - -from petl.io.sources import register_codec - - -class LZ4Codec(object): - ''' - Allows compressing and decompressing .lz4 files - - `LZ4`_ is lossless compression algorithm, providing compression - speed greather than 500 MB/s per core (>0.15 Bytes/cycle). It features an - extremely fast decoder, with speed in multiple GB/s per core (~1Byte/cycle) - - .. note:: - - For working this codec require `python-lz4`_ to be installed, e.g.:: - - $ pip install lz4 - - .. versionadded:: 1.5.0 - - .. _python-lz4: https://github.com/python-lz4/python-lz4 - .. _LZ4: http://www.lz4.org - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import lz4.frame - source = lz4.frame.open(self.filename, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.lz4', LZ4Codec) - -# end # diff --git a/petl/io/codec/xz.py b/petl/io/codec/xz.py deleted file mode 100644 index d6192a1a..00000000 --- a/petl/io/codec/xz.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -from contextlib import contextmanager - -from petl.io.sources import register_codec - -class XZCodec(object): - ''' - Allows compressing and decompressing .xz files compressed with `lzma`_. - - .. versionadded:: 1.5.0 - - .. 
_lzma: https://docs.python.org/3/library/lzma.html - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import lzma - source = lzma.open(self.filename, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.xz', XZCodec) - -# end # diff --git a/petl/io/codec/zstd.py b/petl/io/codec/zstd.py deleted file mode 100644 index 6e6c0a07..00000000 --- a/petl/io/codec/zstd.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import io -from contextlib import contextmanager - -from petl.io.sources import register_codec - - -class ZstandardCodec(object): - ''' - Allows compressing and decompressing .zstd files - - `Zstandard`_ is a real-time compression algorithm, providing - high compression ratios. It offers a very wide range of compression / speed - trade-off, while being backed by a very fast decoder. - - .. note:: - - For working this codec require `zstd`_ to be installed, e.g.:: - - $ pip install zstandard - - .. versionadded:: 1.5.0 - - .. _zstd: https://github.com/indygreg/python-zstandard - .. _Zstandard: http://www.zstd.net - ''' - - def __init__(self, filename, **kwargs): - self.filename = filename - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import zstandard as zstd - if mode.startswith('r'): - cctx = zstd.ZstdDecompressor(**self.kwargs) - compressed = io.open(self.filename, mode) - source = cctx.stream_reader(compressed) - else: - cctx = zstd.ZstdCompressor(**self.kwargs) - uncompressed = io.open(self.filename, mode) - source = cctx.stream_writer(uncompressed) - return source - - @contextmanager - def open(self, mode='r'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_codec('.zst', ZstandardCodec) - -# end # diff --git a/petl/io/remotes.py b/petl/io/remotes.py new file mode 100644 index 00000000..c7041fcb --- /dev/null +++ b/petl/io/remotes.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import io +import logging +from contextlib import contextmanager + +from petl.compat import PY3 +from petl.io.sources import register_reader, register_writer + +logger = logging.getLogger(__name__) + +# region RemoteSource + + +class RemoteSource(object): + """Read or write directly from files in remote filesystems. + + This source handles many filesystems that are selected based on the + protocol passed in the `url` argument. + + The url should be specified in `to..()` and `from...()` functions. E.g.:: + + >>> import petl as etl + >>> + >>> def example_s3(): + ... url = 's3://mybucket/prefix/to/myfilename.csv' + ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' + ... + ... etl.tocsv(data, url) + ... tbl = etl.fromcsv(url) + ... + >>> example_s3() # doctest: +SKIP + +-----+-----+ + | foo | bar | + +=====+=====+ + | 'a' | '1' | + +-----+-----+ + | 'b' | '2' | + +-----+-----+ + | 'c' | '2' | + +-----+-----+ + + This source uses `fsspec`_ to provide the data transfer with the remote + filesystem. Check the `Built-in Implementations `_ for available + remote implementations. + + Some filesystem can use `URL chaining `_ for compound I/O. + + .. 
note:: + + This source requires the package `fsspec`_ to be installed, e.g.:: + + $ pip install fsspec + + Some remote filesystems require additional packages to be installed. + Check `Known Implementations `_ to see what packages + need to be installed, e.g.:: + + $ pip install s3fs # AWS S3 + $ pip install gcsfs # Google Cloud Storage + $ pip install adlfs # Azure Blob service + $ pip install paramiko # SFTP + $ pip install requests # HTTP, github + + .. versionadded:: 1.6.0 + + .. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/ + .. _fs_builtin: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + .. _fs_known: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations + .. _fs_chain: https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining + """ + + def __init__(self, url, **kwargs): + self.url = url + self.kwargs = kwargs + + def open_file(self, mode="rb"): + import fsspec + + fs = fsspec.open(self.url, mode=mode, compression='infer', **self.kwargs) + return fs + + @contextmanager + def open(self, mode="rb"): + mode2 = mode[:1] + r"b" # python2 + fs = self.open_file(mode=mode2) + with fs as source: + yield source + + +# registering filesystems with packages installed + + +def _register_filesystems(only_available=False): + """Search for python packages supporting remote filesystems.""" + + from fsspec.registry import known_implementations + + impls = known_implementations.items() + q = 0 + for protocol, spec in impls: + # keep the existing URLSource handler for http(s), for compatibility, until the next spring cleaning + if protocol.startswith("http"): + continue + if "err" in spec: + emsg = "# WARN: fsspec {} unavailable: {}".format(protocol, spec["err"]) + logger.warning(emsg) + if only_available: + continue + register_reader(protocol, RemoteSource) + register_writer(protocol, RemoteSource) + q += 1 + logger.debug("# fsspec: registered {} providers".format(q)) + + +def _try_register_filesystems(): + try: + import fsspec + except ImportError as ie: + logger.warning("# Missing fsspec package. Install with: pip install fsspec") + else: + try: + _register_filesystems() + except Exception as ex: + raise Exception("# ERROR: failed to register fsspec filesystems", ex) + + +if PY3: + _try_register_filesystems() + +# endregion + +# region SMBSource + + +class SMBSource(object): + """Downloads or uploads to Windows and Samba network drives. E.g.:: + + >>> def example_smb(): + ... import petl as etl + ... url = 'smb://user:password@server/share/folder/file.csv' + ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' + ... etl.tocsv(data, url) + ... tbl = etl.fromcsv(url) + ... + >>> example_smb() # doctest: +SKIP + +-----+-----+ + | foo | bar | + +=====+=====+ + | 'a' | '1' | + +-----+-----+ + | 'b' | '2' | + +-----+-----+ + | 'c' | '2' | + +-----+-----+ + + The argument `url` (str) must have a URI with format: + `smb://workgroup;user:password@server:port/share/folder/file.csv`. + + Note that you need to pass in a valid hostname or IP address for the host + component of the URL. Do not use the Windows/NetBIOS machine name for the + host component. + + The first component of the path in the URL points to the name of the shared + folder. Subsequent path components will point to the directory/folder/file. + + .. note:: + + This source requires the package `smbprotocol`_ to be installed, e.g.:: + + $ pip install smbprotocol[kerberos] + + .. versionadded:: 1.5.0 + + .. 
_smbprotocol: https://github.com/jborean93/smbprotocol#requirements + """ + + def __init__(self, url, **kwargs): + self.url = url + self.kwargs = kwargs + + @contextmanager + def open(self, mode="rb"): + mode2 = mode[:1] + r"b" # python2 + source = _open_file_smbprotocol(self.url, mode=mode2, **self.kwargs) + try: + yield source + finally: + source.close() + + +def _open_file_smbprotocol(url, mode="rb", **kwargs): + + domain, host, port, user, passwd, server_path = _parse_smb_url(url) + import sys # needed by the exception handler below + import smbclient + + try: + # register the server with explicit credentials + if user: + session = smbclient.register_session( + host, username=user, password=passwd, port=port + ) + # Read an existing file as bytes + mode2 = mode[:1] + r"b" + filehandle = smbclient.open_file(server_path, mode=mode2, **kwargs) + return filehandle + + except Exception as ex: + raise ConnectionError("SMB error: %s" % ex).with_traceback(sys.exc_info()[2]) + + +def _parse_smb_url(url): + e = "SMB url must be smb://workgroup;user:password@server:port/share/folder/file.txt: " + + if not url: + raise ValueError("SMB error: no host given") + elif not url.startswith("smb://"): + raise ValueError(e + url) + + if PY3: + from urllib.parse import urlparse + else: + from urlparse import urlparse + parsed = urlparse(url) + if not parsed.path: + raise ValueError(e + url) + + unc_path = parsed.path.replace("/", "\\") + server_path = "\\\\{}{}".format(parsed.hostname, unc_path) + + if not parsed.username: + domain, username = None, None + elif ";" in parsed.username: + domain, username = parsed.username.split(";") + else: + domain, username = None, parsed.username + port = 445 if not parsed.port else int(parsed.port) + return domain, parsed.hostname, port, username, parsed.password, server_path + + +register_reader("smb", SMBSource) +register_writer("smb", SMBSource) + +# endregion diff --git a/petl/io/source/__init__.py b/petl/io/source/__init__.py deleted file mode 100644 index 3aaaacbe..00000000 --- a/petl/io/source/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from __future__ import absolute_import, print_function, division - -from petl.io.source.s3 import S3Source - -from petl.io.source.smb import SMBSource diff --git a/petl/io/source/s3.py b/petl/io/source/s3.py deleted file mode 100644 index 2eb59b62..00000000 --- a/petl/io/source/s3.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import io -from petl.io.sources import register_reader, register_writer -from contextlib import contextmanager - - -class S3Source(object): - '''Downloads or uploads to AWS S3 filesystem. E.g.:: - - >>> def example_s3(): - ... import petl as etl - ... url = 's3://mybucket/prefix/to/myfilename.csv' - ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' - ... etl.tocsv(data, url) - ... tbl = etl.fromcsv(url) - ... - >>> example_s3() # doctest: +SKIP - +-----+-----+ - | foo | bar | - +=====+=====+ - | 'a' | '1' | - +-----+-----+ - | 'b' | '2' | - +-----+-----+ - | 'c' | '2' | - +-----+-----+ - - .. note:: - - For working this source require `s3fs`_ to be installed, e.g.:: - - $ pip install s3fs - - It is strongly recommended that you open files in binary mode. - - For authentication check `credentials`_. - - .. versionadded:: 1.5.0 - - .. _s3fs: https://github.com/dask/s3fs/ - .. 
_credentials: https://s3fs.readthedocs.io/en/latest/#credentials - ''' - - def __init__(self, url, **kwargs): - self.url = url - self.kwargs = kwargs - - def open_file(self, mode='rb'): - import s3fs - fs = s3fs.S3FileSystem() - source = fs.open(self.url, mode=mode, **self.kwargs) - return source - - @contextmanager - def open(self, mode='rb'): - mode2 = mode[:1] + r'b' # python2 - source = self.open_file(mode=mode2) - try: - yield source - finally: - source.close() - - -register_reader('s3', S3Source) -register_writer('s3', S3Source) diff --git a/petl/io/source/smb.py b/petl/io/source/smb.py deleted file mode 100644 index 8cdd4efb..00000000 --- a/petl/io/source/smb.py +++ /dev/null @@ -1,120 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -from contextlib import contextmanager - -from petl.compat import PY3 -from petl.io.sources import register_reader, register_writer - - -class SMBSource(object): - '''Downloads or uploads to Windows and Samba network drives. E.g.:: - - >>> def example_smb(): - ... import petl as etl - ... url = 'smb://user:password@server/share/folder/file.csv' - ... data = b'foo,bar\\na,1\\nb,2\\nc,2\\n' - ... etl.tocsv(data, url) - ... tbl = etl.fromcsv(url) - ... - >>> example_smb() # doctest: +SKIP - +-----+-----+ - | foo | bar | - +=====+=====+ - | 'a' | '1' | - +-----+-----+ - | 'b' | '2' | - +-----+-----+ - | 'c' | '2' | - +-----+-----+ - - The argument `url` (str) must have a URI with format: - `smb://workgroup;user:password@server:port/share/folder/file.csv`. - - Note that you need to pass in a valid hostname or IP address for the host - component of the URL. Do not use the Windows/NetBIOS machine name for the - host component. - - The first component of the path in the URL points to the name of the shared - folder. Subsequent path components will point to the directory/folder/file. - - .. note:: - - For working this source require `smbprotocol`_ to be installed, e.g.:: - - $ pip install smbprotocol[kerberos] - - .. versionadded:: 1.5.0 - - .. 
_smbprotocol: https://github.com/jborean93/smbprotocol#requirements - ''' - - def __init__(self, url, **kwargs): - self.url = url - self.kwargs = kwargs - - @contextmanager - def open(self, mode='rb'): - mode2 = mode[:1] + r'b' # python2 - source = _open_file_smbprotocol(self.url, mode=mode2, **self.kwargs) - try: - yield source - finally: - source.close() - -# region SMBHandler - - -def _open_file_smbprotocol(url, mode='rb', **kwargs): - - domain, host, port, user, passwd, server_path = _parse_smb_url(url) - import smbclient - try: - # register the server with explicit credentials - if user: - session = smbclient.register_session( - host, username=user, password=passwd, port=port) - # Read an existing file as bytes - mode2 = mode[:1] + r'b' - filehandle = smbclient.open_file(server_path, mode=mode2, **kwargs) - return filehandle - - except Exception as ex: - raise ConnectionError('SMB error: %s' % - ex).with_traceback(sys.exc_info()[2]) - - -def _parse_smb_url(url): - e = 'SMB url must be smb://workgroup;user:password@server:port/share/folder/file.txt: ' - - if not url: - raise ValueError('SMB error: no host given') - elif not url.startswith('smb://'): - raise ValueError(e + url) - - if PY3: - from urllib.parse import urlparse - else: - from urlparse import urlparse - parsed = urlparse(url) - if not parsed.path: - raise ValueError(e + url) - - unc_path = parsed.path.replace("/", "\\") - server_path = "\\\\{}{}".format(parsed.hostname, unc_path) - - if not parsed.username: - domain, username = None - elif ';' in parsed.username: - domain, username = parsed.username.split(';') - else: - domain, username = None, parsed.username - port = 555 if not parsed.port else int(parsed.port) - return domain, parsed.hostname, port, username, parsed.password, server_path - -# endregion - - -register_reader('smb', SMBSource) -register_writer('smb', SMBSource) diff --git a/petl/io/sources.py b/petl/io/sources.py index 423b1252..6736014d 100644 --- a/petl/io/sources.py +++ b/petl/io/sources.py @@ -411,7 +411,7 @@ def _get_handler_from(source, handlers): return None -def _resolve_source_from_arg(source, handlers, sync_mode): +def _resolve_source_from_arg(source, handlers): if source is None: return StdinSource() elif isinstance(source, string_types): @@ -420,12 +420,9 @@ def _resolve_source_from_arg(source, handlers, sync_mode): if handler is None: if codec is not None: return codec(source) + assert '://' not in source, _invalid_source_msg % source return FileSource(source) - io_handler = handler(source) - if codec is None: - return io_handler - handler = CompressedSource(io_handler, codec) - return handler + return handler(source) else: assert (hasattr(source, 'open') and callable(getattr(source, 'open'))), \ @@ -434,8 +431,8 @@ def _resolve_source_from_arg(source, handlers, sync_mode): def read_source_from_arg(source): - return _resolve_source_from_arg(source, _READERS, 'rb') + return _resolve_source_from_arg(source, _READERS) def write_source_from_arg(source, mode='wb'): - return _resolve_source_from_arg(source, _WRITERS, mode) + return _resolve_source_from_arg(source, _WRITERS) diff --git a/petl/test/io/test_codec.py b/petl/test/io/test_codec.py deleted file mode 100644 index ff28149c..00000000 --- a/petl/test/io/test_codec.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -import os -from tempfile import NamedTemporaryFile - -from petl.compat import PY3 -from petl.test.helpers import ieq -from petl.io.avro import 
fromavro, toavro -from petl.io.csv import fromcsv, tocsv -from petl.util.vis import look - -# region Codec test cases - -try: - import lzma -except ImportError as e: - print('SKIP XZ helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_xz(): - _write_read_with_codec('.xz') - -try: - import lz4.frame -except ImportError as e: - print('SKIP LZ4 helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_lz4(): - _write_read_with_codec('.lz4') - -try: - import zstandard as zstd -except ImportError as e: - print('SKIP ZSTANDARD helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_zstd(): - _write_read_with_codec('.zstd') - -# endregion - -# region Execution - - -def _write_read_with_codec(file_ext): - - _table = ((u'name', u'friends', u'age'), - (u'Bob', '42', '33'), - (u'Jim', '13', '69'), - (u'Joe', '86', '17'), - (u'Ted', '23', '51')) - - _show__rows_from("Expected:", _table) - - has_avro = _test_avro_too() - - compressed_csv = _get_temp_file_for('.csv' + file_ext) - compressed_avr = _get_temp_file_for('.avro' + file_ext) - - tocsv(_table, compressed_csv, encoding='ascii', lineterminator='\n') - if has_avro: - toavro(_table, compressed_avr) - - csv_actual = fromcsv(compressed_csv, encoding='ascii') - if has_avro: - avr_actual = fromavro(compressed_avr) - - _show__rows_from("Actual:", csv_actual) - - ieq(_table, csv_actual) - ieq(_table, csv_actual) # verify can iterate twice - if has_avro: - ieq(_table, avr_actual) - ieq(_table, avr_actual) # verify can iterate twice - - -def _get_temp_file_for(file_ext): - with NamedTemporaryFile(delete=False, mode='wb') as fo: - return fo.name + file_ext - - -def _show__rows_from(label, test_rows, limit=0): - print(label) - print(look(test_rows, limit=limit)) - -def _test_avro_too(): - try: - import fastavro - return True - except: - return False - -# endregion diff --git a/petl/test/io/test_remotes.py b/petl/test/io/test_remotes.py new file mode 100644 index 00000000..9ccf46e2 --- /dev/null +++ b/petl/test/io/test_remotes.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import sys +import os + +from petl.compat import PY3 +from petl.test.helpers import ieq, eq_ +from petl.io.avro import fromavro, toavro +from petl.io.csv import fromcsv, tocsv +from petl.util.vis import look + +# region Codec test cases + + +def test_helper_local(): + if PY3: + _write_read_into_url("./example.") + + +try: + import fsspec +except ImportError as e: + print("SKIP FSSPEC helper tests: %s" % e, file=sys.stderr) +else: + + def test_helper_fsspec(): + # _write_read_from_env_url('PETL_TEST_S3') + _write_read_from_env_matching("PETL_TEST_") + + +try: + import smbclient +except ImportError as e: + print("SKIP SMB helper tests: %s" % e, file=sys.stderr) +else: + + def test_helper_smb(): + _write_read_from_env_url("PETL_SMB_URL") + + def test_helper_smb_url_parse(): + from petl.io.remotes import _parse_smb_url + + url = r"smb://workgroup;user:password@server:444/share/folder/file.csv" + domain, host, port, user, passwd, server_path = _parse_smb_url(url) + print("Parsed:", domain, host, port, user, passwd, server_path) + eq_(domain, r"workgroup") + eq_(host, r"server") + eq_(port, 444) + eq_(user, r"user") + eq_(passwd, r"password") + eq_(server_path, "\\\\server\\share\\folder\\file.csv") + + +# endregion + +# region Execution + + +def _write_read_from_env_matching(prefix): + q = 0 + for variable, base_url in os.environ.items(): + if variable.upper().startswith(prefix.upper()): + fmsg = "\n 
{}: {} -> ".format(variable, base_url) + print(fmsg, file=sys.stderr, end="") + _write_read_into_url(base_url) + print("DONE ", file=sys.stderr, end="") + q += 1 + if q < 1: + msg = """SKIPPED + For testing remote source define a environment variable: + $ export PETL_TEST_='://myuser:mypassword@host:port/path/to/file.ext'""" + print(msg, file=sys.stderr) + + +def _write_read_from_env_url(env_var_name): + base_url = os.getenv(env_var_name, "skip") + if base_url == "skip": + print("SKIPPED ", file=sys.stderr, end="") + else: + _write_read_into_url(base_url) + print("DONE ", file=sys.stderr, end="") + + +def _write_read_into_url(base_url): + _write_read_file_into_url(base_url, "filename1.csv") + _write_read_file_into_url(base_url, "filename2.avro") + _write_read_file_into_url(base_url, "filename3.csv", "gz") + _write_read_file_into_url(base_url, "filename4.avro", "gz") + _write_read_file_into_url(base_url, "filename5.csv", "xz") + _write_read_file_into_url(base_url, "filename6.csv", "zst") + _write_read_file_into_url(base_url, "filename7.csv", "lz4") + _write_read_file_into_url(base_url, "filename8.csv", "snappy") + + +def _write_read_file_into_url(base_url, filename, compression=None): + if ".avro" in filename and not _has_avro: + return + is_local = base_url.startswith("./") + if compression is not None: + if is_local: + return + filename = filename + "." + compression + codec = fsspec.utils.infer_compression(filename) + if codec is None: + print("\n - %s SKIPPED " % filename, file=sys.stderr, end="") + return + print("\n - %s " % filename, file=sys.stderr, end="") + + if is_local: + source_url = base_url + filename + else: + source_url = os.path.join(base_url, filename) + + _show__rows_from("Expected:", _table) + + if ".avro" in filename: + toavro(_table, source_url) + actual = fromavro(source_url) + else: + tocsv(_table, source_url, encoding="ascii", lineterminator="\n") + actual = fromcsv(source_url, encoding="ascii") + + _show__rows_from("Actual:", actual) + ieq(_table, actual) + ieq(_table, actual) # verify can iterate twice + + +def _show__rows_from(label, test_rows, limit=0): + print(label) + print(look(test_rows, limit=limit)) + + +def _test_avro_too(): + try: + import fastavro + + return True + except: + return False + + +# endregion + +# region Mockup data + +_has_avro = _test_avro_too() + +_table = ( + (u"name", u"friends", u"age"), + (u"Bob", "42", "33"), + (u"Jim", "13", "69"), + (u"Joe", "86", "17"), + (u"Ted", "23", "51"), +) + +# endregion diff --git a/petl/test/io/test_source.py b/petl/test/io/test_source.py deleted file mode 100644 index 3beaa929..00000000 --- a/petl/test/io/test_source.py +++ /dev/null @@ -1,114 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, print_function, division - -import sys -import os - -from petl.compat import PY3 -from petl.test.helpers import ieq, eq_ -from petl.io.avro import fromavro, toavro -from petl.io.csv import fromcsv, tocsv -from petl.util.vis import look - -from petl.io.source.smb import _parse_smb_url - -# region Codec test cases - -try: - import s3fs -except ImportError as e: - print('SKIP S3 helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_s3(): - _write_read_from_url('PETL_S3_URL', "export PETL_S3_URL='s3://mybucket/path/folder'") - -try: - import smbclient -except ImportError as e: - print('SKIP SMB helper tests: %s' % e, file=sys.stderr) -else: - def test_helper_smb(): - _write_read_from_url('PETL_SMB_URL', "export PETL_SMB_URL='smb://DOMAIN;myuserID:mypassword@host/share'") - - def 
test_helper_smb_url_parse(): - url = r'smb://workgroup;user:password@server:444/share/folder/file.csv' - domain, host, port, user, passwd, server_path = _parse_smb_url(url) - print("Parsed:", domain, host, port, user, passwd, server_path) - eq_(domain, r'workgroup') - eq_(host, r'server') - eq_(port, 444) - eq_(user, r'user') - eq_(passwd, r'password') - eq_(server_path, "\\\\server\\share\\folder\\file.csv") - -# endregion - -# region Execution - -def _write_read_from_url(env_var_name, example): - - base_url = os.getenv(env_var_name, 'skip') - - csv_url = os.path.join(base_url, 'filename1.csv') - gzc_url = os.path.join(base_url, 'filename3.csv.gz') - gza_url = os.path.join(base_url, 'filename4.avro.gz') - avr_url = os.path.join(base_url, 'filename2.avro') - - _table = ( (u'name', u'friends', u'age'), - (u'Bob', '42', '33'), - (u'Jim', '13', '69'), - (u'Joe', '86', '17'), - (u'Ted', '23', '51')) - - _show__rows_from("Expected:", _table) - has_avro = _test_avro_too() - - if base_url == 'skip': - m = "# Skipping test because env var '{}' is not defined. Try this:\n$ {}" - msg = m.format(env_var_name, example) - print(msg) - return - - tocsv(_table, csv_url, encoding='ascii', lineterminator='\n') - if PY3: - tocsv(_table, gzc_url, encoding='ascii', lineterminator='\n') - if has_avro: - toavro(_table, avr_url) - if PY3: - toavro(_table, gza_url) - - csv_actual = fromcsv(csv_url, encoding='ascii') - if PY3: - gzp_actual = fromcsv(gzc_url, encoding='ascii') - if has_avro: - avr_actual = fromavro(avr_url) - if PY3: - gza_actual = fromavro(gza_url) - - _show__rows_from("Actual:", csv_url) - - ieq(_table, csv_actual) - ieq(_table, csv_actual) # verify can iterate twice - if PY3: - ieq(_table, gzp_actual) - ieq(_table, gzp_actual) # verify can iterate twice - if has_avro: - ieq(_table, avr_actual) - ieq(_table, avr_actual) # verify can iterate twice - if PY3: - ieq(_table, gza_actual) - ieq(_table, gza_actual) # verify can iterate twice - - -def _show__rows_from(label, test_rows, limit=0): - print(label) - print(look(test_rows, limit=limit)) - -def _test_avro_too(): - try: - import fastavro - return True - except: - return False - -# endregion
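For readers skimming the patch, the round trip that the new ``test_remotes.py`` exercises reduces to the usage below. This is only a sketch: it assumes ``fsspec`` plus the matching backend (``s3fs`` here) are installed, the bucket and prefix in the URL are hypothetical, and the ``.gz`` suffix is picked up by the ``compression='infer'`` argument that ``RemoteSource`` passes to ``fsspec.open``, so the file is compressed on write and decompressed on read::

    import petl as etl

    table = [('foo', 'bar'),
             ('a', 1),
             ('b', 2),
             ('c', 2)]

    # hypothetical bucket/prefix; the 's3://' scheme routes through RemoteSource
    url = 's3://my-bucket/prefix/example.csv.gz'

    etl.tocsv(table, url)           # uploaded gzip-compressed because of '.gz'
    roundtrip = etl.fromcsv(url)    # downloaded and decompressed transparently
    print(etl.look(roundtrip))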