
Add FileIO, InputFile, and OutputFile abstract base classes #3691

Merged: 11 commits into apache:master on Jan 24, 2022
Conversation

@samredai (Collaborator) commented Dec 8, 2021

UPDATE: This has been updated to only include the abstract base classes FileIO, InputFile, and OutputFile. The S3FileIO implementation can be opened in a follow-up PR.


This brings over the FileIO abstraction and includes an S3FileIO implementation. Implementing FileIO requires overriding the __enter__() and __exit__() methods, where __enter__() assigns a byte stream to self.byte_stream.

There have been a few discussions lately around file IO, and hopefully this PR helps continue them. I think we should aim to maintain the FileIO abstraction for all file IO operations (metadata files, manifest lists, manifest files, and data files) and allow the flexibility to plug in either an implementation that's packaged with the library or a custom FileIO implementation that a user brings. An example of how we can do this can be found in the from_file() method in PR #3677.

This still leaves an open question about how we manage dependencies for all of the implementations. For example, if a user does not plan on using S3FileIO, or has their own S3 FileIO implementation that does not depend on boto3, then boto3 should not be forced as a hard dependency.
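
For reference, here is a minimal sketch of what the three abstract base classes might look like (the docstrings follow the excerpts quoted in the review threads below; the exact signatures in this PR may differ):

from abc import ABC, abstractmethod


class InputFile(ABC):
    """A base class for reading bytes from a file at a given location"""

    def __init__(self, location: str):
        self._location = location

    @property
    def location(self) -> str:
        """The fully-qualified location of the input file"""
        return self._location

    @abstractmethod
    def exists(self) -> bool:
        """Checks whether the file exists"""

    @abstractmethod
    def open(self):
        """Returns a seekable input stream for reading the file"""


class OutputFile(ABC):
    """A base class for writing bytes to a file at a given location"""

    def __init__(self, location: str):
        self._location = location

    @property
    def location(self) -> str:
        """The fully-qualified location of the output file"""
        return self._location

    @abstractmethod
    def exists(self) -> bool:
        """Checks whether the file exists"""

    @abstractmethod
    def to_input_file(self) -> InputFile:
        """Returns an InputFile for the location of this output file"""

    @abstractmethod
    def create(self, overwrite: bool = False):
        """Returns an output stream; should fail if the file exists and overwrite is False"""


class FileIO(ABC):
    """A base class for reading, writing, and deleting files at a given location"""

    @abstractmethod
    def new_input(self, location: str) -> InputFile:
        """Get an InputFile instance to read bytes from the file at the given location"""

    @abstractmethod
    def new_output(self, location: str) -> OutputFile:
        """Get an OutputFile instance to write bytes to the file at the given location"""

    @abstractmethod
    def delete(self, location: str) -> None:
        """Delete the file at the given location"""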

@samredai changed the title from "Add FileIO abstraction and S3FileIO implementation" to "Add FileIO, InputFile, and OutputFile abstract base classes" on Dec 13, 2021
@samredai (Collaborator, Author):

Updated this PR to only include the abstract base classes FileIO, InputFile, and OutputFile. The S3FileIO implementation can be opened in a follow-up PR.

@samredai marked this pull request as ready for review on December 14, 2021 03:07
@samredai (Collaborator, Author) commented Jan 4, 2022

As an example of what an implementation of these base classes would look like, I put together an S3FileIO implementation using smart_open to create seekable file-like objects (smart_open.s3.Reader instances) and validated that these can be fed directly into pyarrow. I also validated that smart_open.s3.MultipartWriter instances work as the where argument to pyarrow's write_table method.

Implementation, s3.py

from iceberg.io.base import FileIO, InputFile, OutputFile
from smart_open import open, parse_uri
import boto3

class S3InputFile(InputFile):

    def __len__(self) -> int:
        return 0
    
    @property
    def exists(self) -> bool:
        try:
            with open(self.location, 'rb') as f:
                pass
        except OSError:
            return False
        return True
    
    def __enter__(self):
        self._stream = open(self.location, 'rb', transport_params={"defer_seek": True})
        return self._stream
    
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._stream.close()
        return


class S3OutputFile(OutputFile):

    def __init__(self, location: str, overwrite: bool = False):
        super().__init__(location=location)
        self._overwrite = overwrite

    def __len__(self) -> int:
        return 0
    
    @property
    def location(self) -> str:
        """The fully-qualified location of the output file"""
        return self._location

    @property
    def exists(self) -> bool:
        try:
            with open(self.location, 'rb') as f:
                pass
        except OSError:
            return False
        return True
    
    def to_input_file(self) -> S3InputFile:
        return S3InputFile(self.location)

    def __enter__(self):
        if not self._overwrite and self.exists:
            raise FileExistsError(
                f"{self.location} already exists. To overwrite, "
                "set overwrite=True when initializing the S3OutputFile."
            )
            
        self._stream = open(self.location, 'wb')
        return self._stream
    
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self._stream.close()
        return

class S3FileIO(FileIO):
    def new_input(self, location: str):
        return S3InputFile(location=location)

    def new_output(self, location: str, overwrite: bool = False):
        return S3OutputFile(location=location, overwrite=overwrite)

    def delete(self, location: str):
        uri = parse_uri(location)
        s3 = boto3.resource('s3')
        s3.Object(uri.bucket_id, uri.key_id).delete()
        return

example.py

from pyarrow import parquet as pq
from s3 import S3FileIO

f1 = "s3://samstestbucket3412/userdata1.parquet"
f2 = "s3://samstestbucket3412/userdata2.parquet"
file_io = S3FileIO()

# Read f1 (a parquet file)
with file_io.new_input(f1) as f:
    table = pq.read_table(f)

# see output below
print("####################\n")
print(type(table), end="\n\n")
print(table, end="\n\n")
print("####################\n")

# Delete f2 if it exists
file_io.delete(f2)

# Write the pyarrow table to f2
with file_io.new_output(f2) as f:
    pq.write_table(table, f)

# Read the newly written table back in
with file_io.new_input(f2) as f:
    table2 = pq.read_table(f)

# see output below
print("####################\n")
print(type(table), end="\n\n")
print(table, end="\n\n")
print("####################\n")


# Try to write to f2 again without overwrite=True
# with file_io.new_output(f2) as f:
#     pq.write_table(table, f)  # Raises a FileExistsError

# Writing f2 again with overwrite=True
with file_io.new_output(f2, overwrite=True) as f:
    pq.write_table(table2, f)

# Delete f2
file_io.delete(f2)

# Write f2 without setting overwrite since it's been deleted
with file_io.new_output(f2) as f:
    pq.write_table(table2, f)

output:

####################

<class 'pyarrow.lib.Table'>

pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string

####################

####################

<class 'pyarrow.lib.Table'>

pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string

####################

@samredai (Collaborator, Author):

> I think, as a generic answer, looking at what fsspec has done, having these as separate packages that the user can install in their environment probably makes sense.

Thanks for pointing out the entry_point mechanism that fsspec uses. I have to take a closer look at it but I really like the idea of the user simply plugging in a custom implementation while we maintain "known implementations" in the main library.

> Specifically for S3, if pyarrow is a hard dependency for Parquet reading, providing reference implementations based off of its file systems (it comes prepackaged with S3) could make sense.

I'm wondering if the FileIO implementations need to be storage-specific. For example, pyarrow, boto, and smartopen could all be used as implementations for various cloud storage solutions. Instead of having something like a PyarrowS3FileIO to differentiate it from, say, a BotoS3FileIO, we could instead have a PyarrowFileIO that can be an entry point to any of the storage IO options provided by pyarrow. I don't think this has any implications for this PR in particular, so I'll work on updating it ASAP with the suggestions and we can tackle these other questions in follow-up discussions.
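
As a rough sketch of the fsspec-style entry_point idea (not part of this PR; the group name "iceberg.file_io" below is purely hypothetical), a third-party package could register its FileIO class and the library could discover it at runtime:

from importlib.metadata import entry_points

def load_file_io(name: str):
    """Load a registered FileIO implementation by name (hypothetical entry point group)."""
    # Python 3.10+ supports the group keyword; older versions return a dict keyed by group.
    for ep in entry_points(group="iceberg.file_io"):
        if ep.name == name:
            return ep.load()()  # import and instantiate the registered FileIO class
    raise ValueError(f"No FileIO implementation registered under {name!r}")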

@emkornfield (Contributor):

> I'm wondering if the FileIO implementations need to be storage-specific. For example, pyarrow, boto, and smartopen could all be used as implementations for various cloud storage solutions. Instead of having something like a PyarrowS3FileIO to differentiate it from, say, a BotoS3FileIO, we could instead have a PyarrowFileIO that can be an entry point to any of the storage IO options provided by pyarrow. I don't think this has any implications for this PR in particular, so I'll work on updating it ASAP with the suggestions and we can tackle these other questions in follow-up discussions.

Agreed. I was pointing out that for common connection types, additional dependencies can be avoided for a large number of systems if pyarrow is assumed as a dependency.

@rdblue (Contributor) commented Jan 20, 2022

I think we will definitely have a mode where pyarrow is used. Certainly if you're reading or writing data, you'd probably want pyarrow. But it isn't unreasonable to have a service that only does metadata interaction and that doesn't need all of pyarrow. It could be done entirely with Python libraries like fastavro, smartopen, and the Iceberg core library.

@emkornfield (Contributor):

> I think we will definitely have a mode where pyarrow is used. Certainly if you're reading or writing data, you'd probably want pyarrow.

Yeah, wasn't sure if pyarrow was going to be considered optional or not.

"""Checks whether the file exists"""

@abstractmethod
def open(self):
Contributor:

Is there not a return type that we require here? What about IOBase? @emkornfield do you have a suggestion for this?

Contributor:

I think IOBase is probably heavyweight. Using protocols seems like the right thing here?

Contributor:

This seems to require https://pypi.org/project/typing-extensions/ for python <= 3.7

@samredai (Collaborator, Author):

I like that idea! @rdblue does this protocol capture the required methods?

from typing import Protocol


class InputStream(Protocol):
    def read(self, n: int) -> bytes:
        ...

    def readable(self) -> bool:
        ...

    def close(self) -> None:
        ...

    def seek(self, offset: int, whence: int = 0) -> int:
        ...

    def tell(self) -> int:
        ...

Contributor:

Thinking about this some more, I think IOBase is the right option here. It would guarantee interop with the Python standard library. Also, if you look at its design, I think it lends credence to my comment below about potentially having one file type which can be inspected to determine whether it is readable and writable.
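
For illustration, a minimal sketch of inspecting a single IOBase-typed handle (this is plain standard-library behavior, not code from this PR):

import io

def describe(stream: io.IOBase) -> str:
    """Report what a file handle supports; works for any IOBase subclass."""
    return f"readable={stream.readable()} writable={stream.writable()} seekable={stream.seekable()}"

with open("/tmp/example.txt", "wb") as f:
    print(describe(f))  # readable=False writable=True seekable=True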

"""Get an OutputFile instance to write bytes to the file at the given location"""

@abstractmethod
def delete(self, location: str) -> None:
Contributor:

Minor: location could also be an InputFile or an OutputFile, in which case this would delete that location. I'm not sure if there's an easy way to express that in type annotations, though.

@samredai (Collaborator, Author) commented Jan 23, 2022:

Updated the typehint to include InputFile and OutputFile (using typing.Union) and also updated the LocalFileIO.delete method defined in the tests to handle these.

relevant commit: 51ea645
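
A minimal sketch of the updated signature (the exact docstring in base.py may differ):

from abc import ABC, abstractmethod
from typing import Union


class FileIO(ABC):
    @abstractmethod
    def delete(self, location: Union[str, "InputFile", "OutputFile"]) -> None:
        """Delete the file at the given location, or at the location of the given InputFile/OutputFile"""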

"""Returns an InputFile for the location of this output file"""

@abstractmethod
def create(self, overwrite: bool = False):
Contributor:

Same here, is there an IO type that this should return?

Contributor:

If not, we could create one that has the __enter__ and __exit__ methods so that a with block automatically closes the file?

"""An InputFile implementation for local files (for test use only)"""

def __init__(self, location: str):
    if not location.startswith("file://"):
Contributor:

This could be file:/// or file:/. The first has an authority section (after //) but it is empty. The second variation leaves out authority and just has a path. Either way, the path is a full path starting from /. Also, one case that is not allowed is a URI like file://one/two/three/a.parquet because the authority is one and no authority is allowed for local FS URIs.

@samredai (Collaborator, Author) commented Jan 23, 2022:

I switched to using urllib.parse.urlparse which is commonly used in other packages. This returns a ParseResult where you can check ParseResult.scheme, ParseResult.path, etc. and I set that to a property called parsed_location.

In addition to checking that the scheme is file, I also added a check that there's no ParseResult.netloc for a LocalInputFile or LocalOutputFile, which is the authority section.

relevant commit: fcb7dc4
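
For reference, a small sketch of the checks described above using urllib.parse.urlparse (variable names are illustrative):

from urllib.parse import urlparse

parsed_location = urlparse("file:///tmp/foo.parquet")
print(parsed_location.scheme)  # 'file'
print(parsed_location.netloc)  # '' (no authority)
print(parsed_location.path)    # '/tmp/foo.parquet'

if parsed_location.scheme != "file":
    raise ValueError("Locations for local files must use the file scheme")
if parsed_location.netloc:
    raise ValueError("A URI authority is not allowed for local filesystem URIs")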

    super().__init__(location=location.split("file://")[1])

def __len__(self):
    return len(self._file_obj)
@rdblue (Contributor) commented Jan 23, 2022:

Should this be os.path.getsize(self.location)? I don't see any other reference to self._file_obj.

@samredai (Collaborator, Author):

That's right, thanks! I updated it to use the parsed URI added in another commit, so it's os.path.getsize(self.parsed_location.path) now. I also added validation of len in the tests.

relevant commit: 7b625cf
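
A simplified sketch of the resulting __len__ (a stand-in, not the exact test helper from this PR):

import os
from urllib.parse import urlparse


class LocalInputFile:
    """Simplified stand-in showing the size lookup via the parsed location"""

    def __init__(self, location: str):
        self.parsed_location = urlparse(location)

    def __len__(self) -> int:
        return os.path.getsize(self.parsed_location.path)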


def create(self, overwrite: bool = False) -> None:
    if not overwrite and self.exists():
        raise FileExistsError(f"{self.location} already exists")
Contributor:

Instead of checking for existence directly, I think this should use mode wbx when not overwriting, which will fail if the file already exists. That ensures that the check is atomic. With the check here, there is a race condition between two writers that are in this method. Both check that the file doesn't exist and succeed, but then both try to create the file.

@samredai (Collaborator, Author):

Awesome, it looks like 'xb' is the right mode, so I updated this and it looks much cleaner now:

    def create(self, overwrite: bool = False) -> None:
        return open(self.parsed_location.path, "wb" if overwrite else "xb")

relevant commit: 25836bf

Contributor:

Do you get the correct FileExistsError from open?

@samredai (Collaborator, Author):

Yep! I validate that here in one of the tests.
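
As a quick illustration of the exclusive-create mode (standard Python behavior, not code from this PR):

path = "/tmp/already_exists.txt"
open(path, "wb").close()  # create the file first

try:
    open(path, "xb")  # 'x' requests exclusive creation
except FileExistsError as e:
    print(e)  # [Errno 17] File exists: '/tmp/already_exists.txt'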

output_file_location = os.path.join(tmpdirname, "foo.txt")

# Instantiate an output file
output_file = CustomOutputFile(location=f"file://{output_file_location}")
Contributor:

Do you have a test that location is the original location that was passed in?

@samredai (Collaborator, Author) commented Jan 23, 2022:

I added some tests to validate the location and also validate that a ValueError is raised when an authority is provided in the uri (for the LocalInputFile and LocalOutputFile implementations in the test file).

The test for validating the location is parameterized so it's easy to add to. For example in the future we could simply add (S3FileIO, "s3://foo/bar/baz.parquet", "s3", "", "/bar/baz.parquet") to the list.

relevant commit: cc490a5
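
A hedged sketch of what such a parameterized test could look like (the actual test names, fixtures, and expected tuples in this PR may differ; LocalFileIO refers to the test-only implementation mentioned above):

import pytest

@pytest.mark.parametrize(
    "CustomFileIO, location, scheme, netloc, path",
    [
        (LocalFileIO, "file:///foo/bar/baz.parquet", "file", "", "/foo/bar/baz.parquet"),
        # Future example from the comment above:
        # (S3FileIO, "s3://foo/bar/baz.parquet", "s3", "", "/bar/baz.parquet"),
    ],
)
def test_custom_file_location(CustomFileIO, location, scheme, netloc, path):
    input_file = CustomFileIO().new_input(location)
    assert input_file.location == location
    assert input_file.parsed_location.scheme == scheme
    assert input_file.parsed_location.netloc == netloc
    assert input_file.parsed_location.path == path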

@rdblue (Contributor) commented Jan 24, 2022

Looks great. Thanks for updating this, @samredai!

I'm going to go ahead and merge this. I think there's still an open question about what type to return from the open and create methods (and how to support with) but the overall structure looks great and we can solve that problem later.

@rdblue merged commit 095754c into apache:master on Jan 24, 2022
"""Get an InputFile instance to read bytes from the file at the given location"""

@abstractmethod
def new_output(self, location: str) -> OutputFile:
Contributor:

Is there a reason to distinguish between input and output files? It seems like for the most part the APIs are very similar. If a file is going to be only readable or writable, wouldn't having the implementation raise a not-implemented error be a better choice?

Contributor:

This gives the flexibility to the implementation. You can always implement both base classes, right?

Contributor:

I guess this might be more a philosophical question. There are two ways to achieve the flexibility:

  1. Provide a single class and have users implement only the methods they want (you can document a set of methods that should always be implemented together), giving users run-time errors when unimplemented methods are called.
  2. Separate the functionality into two different interfaces and require all methods be implemented.

My sense is that #2 is more of a Java design pattern. I think (but I'm no expert) that #1 is the more Pythonic, dynamically-typed-language pattern.
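
A toy illustration of option 1 (names are illustrative, not from this PR): a single file class where unsupported operations raise at runtime.

class File:
    """A single file abstraction; subclasses override only the operations they support"""

    def open(self):
        raise NotImplementedError("This file does not support reading")

    def create(self, overwrite: bool = False):
        raise NotImplementedError("This file does not support writing")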


@abstractmethod
def create(self, overwrite: bool = False):
    """This method should return a file-like object.
Contributor:

"File-like object" is confusing given that these classes are also called File. Is it supposed to only support write() methods?

Contributor:

I should add that I understand it because I am familiar with the Python idiom of "file-like", and maybe most users of these classes will be too, since after all this is Python.

@samredai (Collaborator, Author):

Should I specify the required methods, which I believe are write, close, flush, and tell? Maybe also mention that close should flush. Or better yet, just add a protocol here too with those methods and specify that protocol in the docstring for create?

"""This method should return an object that matches the OutputStream protocol
...
"""

OutputStream protocol

from typing import Protocol

class OutputStream(Protocol):
    def write(self, b: bytes) -> None:
        ...

    def close(self) -> None:
        ...

    def flush(self) -> None:
        ...

    def tell(self) -> int:
        ...

Contributor:

Yeah, I think documenting via the returned type makes sense here if we want to go with a protocol.
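
For example, the abstract method could then be annotated with the protocol (a sketch; the final signature may differ):

from abc import ABC, abstractmethod


class OutputFile(ABC):
    @abstractmethod
    def create(self, overwrite: bool = False) -> "OutputStream":
        """This method should return an object that matches the OutputStream protocol"""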

"""


class FileIO(ABC):
Contributor:

Nit: add a docstring here.


@abstractmethod
def open(self):
    """This method should return an instance of a seekable input stream."""
Contributor:

Nit: be consistent about ending docstrings with periods or not.

Contributor:

Also specify what should happen if the file doesn't exist.

"""Checks whether the file exists"""

@abstractmethod
def open(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to require https://pypi.org/project/typing-extensions/ for python <= 3.7

"""Get an InputFile instance to read bytes from the file at the given location"""

@abstractmethod
def new_output(self, location: str) -> OutputFile:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this might be more a philosophical question. There are two ways to achieve the flexibility:

  1. Provide a single class and have users only implement the methods they want (you can document a set of methods that should always be implemented together). Giving users run-time errors when not implemented.
  2. Separate the functionality into two different interfaces and require all methods be implemented.

My sense is that #2 is more of a java design pattern. I think (but I'm no expert) option #1 is more pythonic/dynamically typed language pattern.


@abstractmethod
def create(self, overwrite: bool = False):
"""This method should return a file-like object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should add that I understand it because I am familiar with the python idiom of "file-like" and maybe most users of these class will be since, because after all this is python.

@samredai (Collaborator, Author):

> This seems to require https://pypi.org/project/typing-extensions/ for python <= 3.7

In some of the original design discussions, the idea was that we would stick with the NEP 29 deprecation policy, so we wouldn't be supporting Python versions <= 3.7. @jun-he, does that sound right?
