Skip to content

Commit

Permalink
FEAT-modin-project#2451: Reduce PR size
Browse files Browse the repository at this point in the history
Signed-off-by: William Ma <[email protected]>
  • Loading branch information
williamma12 committed Feb 3, 2021
1 parent bebe6f2 commit 04f042f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 114 deletions.
2 changes: 0 additions & 2 deletions modin/backends/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ def parse(fname, **kwargs):
else:
# This only happens when we are reading with only one worker (Default)
return pandas.read_csv(fname, **kwargs)

# Set internal index.
if index_col is not None:
index = pandas_df.index
else:
Expand Down
94 changes: 18 additions & 76 deletions modin/engines/base/io/file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

import os
import re
from typing import List

from modin.config import Backend

S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)")
Expand Down Expand Up @@ -53,22 +51,9 @@ def _read(cls, *args, **kwargs):
raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE)

@classmethod
def get_path(cls, file_path: str) -> str:
"""
Returns the path of the file(s).
Parameters
----------
file_path: str
String representing a path.
Returns
-------
str
String of the absolute file path.
"""
def get_path(cls, file_path):
if S3_ADDRESS_REGEX.search(file_path):
return cls._s3_path(file_path, False)[0]
return file_path
else:
return os.path.abspath(file_path)

Expand Down Expand Up @@ -131,67 +116,24 @@ def file_size(cls, f):
f.seek(cur_pos, os.SEEK_SET)
return size

@staticmethod
def _s3_path(s3_path: str, glob: bool) -> List[str]:
"""
Get s3 file paths.
Parameters
----------
s3_path: str
String of s3 path.
glob: bool
True if the path is a glob path.
Returns
-------
list[str]
List of s3 files based on the path.
"""
# S3FS does not allow capital S in s3 addresses.
if s3_path[0] == "S":
s3_path = "{}{}".format("s", s3_path[1:])

import s3fs as S3FS
from botocore.exceptions import NoCredentialsError

def get_file_path(fs_handle) -> List[str]:
if not glob:
if fs_handle.exists(s3_path):
return [s3_path]
else:
return []
else:
s3_paths = fs_handle.glob(s3_path)
s3_addresses = ["{}{}".format("s3://", path) for path in s3_paths]
return s3_addresses

s3fs = S3FS.S3FileSystem(anon=False)
try:
return get_file_path(s3fs)
except NoCredentialsError:
pass
s3fs = S3FS.S3FileSystem(anon=True)
return get_file_path(s3fs)

@classmethod
def file_exists(cls, file_path: str) -> bool:
"""
Checks if the file_path leads to an existing file.
Parameters
----------
file_path: str
String representing a path.
Returns
-------
bool
True if the file path is valid.
"""
def file_exists(cls, file_path):
if isinstance(file_path, str):
if S3_ADDRESS_REGEX.search(file_path):
return len(cls._s3_path(file_path, False)) > 0
match = S3_ADDRESS_REGEX.search(file_path)
if match is not None:
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])
import s3fs as S3FS
from botocore.exceptions import NoCredentialsError

s3fs = S3FS.S3FileSystem(anon=False)
exists = False
try:
exists = s3fs.exists(file_path) or exists
except NoCredentialsError:
pass
s3fs = S3FS.S3FileSystem(anon=True)
return exists or s3fs.exists(file_path)
return os.path.exists(file_path)

@classmethod
Expand Down
27 changes: 9 additions & 18 deletions modin/engines/base/io/text/csv_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,26 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import csv
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher
from modin.data_management.utils import compute_chunksize
from pandas.io.parsers import _validate_usecols_arg
import pandas
import csv
import sys

from pandas.io.parsers import _validate_usecols_arg
from pandas._typing import FilePathOrBuffer

from modin.config import NPartitions
from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher


class CSVDispatcher(TextFileDispatcher):
@classmethod
def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
# Ensures that the file is a string file path. Otherwise, default to pandas.
def _read(cls, filepath_or_buffer, **kwargs):
filepath_or_buffer = cls.get_path_or_buffer(filepath_or_buffer)
if isinstance(filepath_or_buffer, str):
if not cls.file_exists(filepath_or_buffer):
return cls.single_worker_read(filepath_or_buffer, **kwargs)
filepath_or_buffer = cls.get_path(filepath_or_buffer)
elif not cls.pathlib_or_pypath(filepath_or_buffer):
return cls.single_worker_read(filepath_or_buffer, **kwargs)

compression_type = cls.infer_compression(
filepath_or_buffer, kwargs.get("compression")
)
Expand Down Expand Up @@ -62,7 +58,6 @@ def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
skiprows = kwargs.get("skiprows")
if skiprows is not None and not isinstance(skiprows, int):
return cls.single_worker_read(filepath_or_buffer, **kwargs)

nrows = kwargs.pop("nrows", None)
names = kwargs.get("names", None)
index_col = kwargs.get("index_col", None)
Expand All @@ -72,7 +67,7 @@ def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
# For the sake of the empty df, we assume no `index_col` to get the correct
# column names before we build the index. Because we pass `names` in, this
# step has to happen without removing the `index_col` otherwise it will not
# be assigned correctly.
# be assigned correctly
names = pandas.read_csv(
filepath_or_buffer,
**dict(kwargs, usecols=None, nrows=0, skipfooter=0, index_col=None),
Expand Down Expand Up @@ -118,7 +113,6 @@ def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
encoding if encoding is not None else "UTF-8"
)
is_quoting = kwargs.get("quoting", "") != csv.QUOTE_NONE

with cls.file_open(filepath_or_buffer, "rb", compression_type) as f:
# Skip the header since we already have the header information and skip the
# rows we are told to skip.
Expand All @@ -127,14 +121,11 @@ def _read(cls, filepath_or_buffer: FilePathOrBuffer, **kwargs):
skiprows = 0
header = kwargs.get("header", "infer")
if header == "infer" and kwargs.get("names", None) is None:
skip_header = 1
skiprows += 1
elif isinstance(header, int):
skip_header = header + 1
skiprows += header + 1
elif hasattr(header, "__iter__") and not isinstance(header, str):
skip_header = max(header) + 1
else:
skip_header = 0
skiprows += skip_header
skiprows += max(header) + 1
if kwargs.get("encoding", None) is not None:
partition_kwargs["skiprows"] = 1
# Launch tasks to read partitions
Expand Down
53 changes: 42 additions & 11 deletions modin/engines/base/io/text/csv_glob_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def _read(cls, filepath_or_buffer, **kwargs):
# Ensures that the file is a string file path. Otherwise, default to pandas.
filepath_or_buffer = cls.get_path_or_buffer(filepath_or_buffer)
if isinstance(filepath_or_buffer, str):
if not cls.glob_file_exists(filepath_or_buffer):
if not cls.file_exists(filepath_or_buffer):
return cls.single_worker_read(filepath_or_buffer, **kwargs)
filepath_or_buffer = cls.get_glob_path(filepath_or_buffer)
filepath_or_buffer = cls.get_path(filepath_or_buffer)
elif not cls.pathlib_or_pypath(filepath_or_buffer):
return cls.single_worker_read(filepath_or_buffer, **kwargs)

Expand Down Expand Up @@ -260,27 +260,40 @@ def _read(cls, filepath_or_buffer, **kwargs):
return new_query_compiler

@classmethod
def glob_file_exists(cls, glob_path: str) -> bool:
def file_exists(cls, file_path: str) -> bool:
"""
Checks if the glob_path is valid.
Checks if the file_path is valid.
Parameters
----------
glob_path: str
file_path: str
String representing a path.
Returns
-------
bool
True if the glob path is valid.
"""
if isinstance(glob_path, str):
if S3_ADDRESS_REGEX.search(glob_path):
return len(cls._s3_path(glob_path, True)) > 0
return len(glob.glob(glob_path)) > 0
if isinstance(file_path, str):
match = S3_ADDRESS_REGEX.search(file_path)
if match is not None:
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])
import s3fs as S3FS
from botocore.exceptions import NoCredentialsError

s3fs = S3FS.S3FileSystem(anon=False)
exists = False
try:
exists = len(s3fs.glob(file_path)) > 0 or exists
except NoCredentialsError:
pass
s3fs = S3FS.S3FileSystem(anon=True)
return exists or len(s3fs.glob(file_path)) > 0
return len(glob.glob(file_path)) > 0

@classmethod
def get_glob_path(cls, file_path: str) -> list:
def get_path(cls, file_path: str) -> list:
"""
Returns the path of the file(s).
Expand All @@ -295,7 +308,25 @@ def get_glob_path(cls, file_path: str) -> list:
List of strings of absolute file paths.
"""
if S3_ADDRESS_REGEX.search(file_path):
return cls._s3_path(file_path, True)
# S3FS does not allow capital S in s3 addresses.
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])

import s3fs as S3FS
from botocore.exceptions import NoCredentialsError

def get_file_path(fs_handle) -> List[str]:
file_paths = fs_handle.glob(file_path)
s3_addresses = ["{}{}".format("s3://", path) for path in file_paths]
return s3_addresses

s3fs = S3FS.S3FileSystem(anon=False)
try:
return get_file_path(s3fs)
except NoCredentialsError:
pass
s3fs = S3FS.S3FileSystem(anon=True)
return get_file_path(s3fs)
else:
relative_paths = glob.glob(file_path)
abs_paths = [os.path.abspath(path) for path in relative_paths]
Expand Down
12 changes: 5 additions & 7 deletions modin/engines/base/io/text/text_file_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from modin.engines.base.io.file_dispatcher import FileDispatcher
import numpy as np
import warnings
import io
import os
from typing import List, Tuple
import warnings

import numpy as np
from pandas._typing import FilePathOrBuffer
from typing import Tuple, List

from modin.config import NPartitions
from modin.engines.base.io.file_dispatcher import FileDispatcher


class TextFileDispatcher(FileDispatcher):
@classmethod
def get_path_or_buffer(cls, filepath_or_buffer: FilePathOrBuffer) -> str:
def get_path_or_buffer(cls, filepath_or_buffer):
"""Given a buffer, try and extract the filepath from it so that we can
use it without having to fall back to Pandas and share file objects between
workers. Given a filepath, return it immediately.
Expand Down

0 comments on commit 04f042f

Please sign in to comment.