Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some improvements for Airflow IO code #36259

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions airflow/io/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from contextlib import suppress
from functools import cached_property
from typing import TYPE_CHECKING, ClassVar

from airflow.io import get_fs, has_fs
Expand All @@ -37,8 +39,6 @@ class ObjectStore:
protocol: str
storage_options: Properties | None

_fs: AbstractFileSystem | None = None

def __init__(
self,
protocol: str,
Expand All @@ -48,15 +48,17 @@ def __init__(
):
self.conn_id = conn_id
self.protocol = protocol
self._fs = fs
if fs is not None:
self.fs = fs
bolkedebruin marked this conversation as resolved.
Show resolved Hide resolved
self.storage_options = storage_options

def __str__(self):
return f"{self.protocol}-{self.conn_id}" if self.conn_id else self.protocol

@property
@cached_property
def fs(self) -> AbstractFileSystem:
return self._connect()
# if the fs is provided in init, the next statement will be ignored
return get_fs(self.protocol, self.conn_id)

@property
def fsid(self) -> str:
Expand All @@ -68,17 +70,16 @@ def fsid(self) -> str:

:return: deterministic the filesystem ID
"""
fs = self._connect()
try:
return fs.fsid
return self.fs.fsid
except NotImplementedError:
return f"{self.fs.protocol}-{self.conn_id or 'env'}"

def serialize(self):
return {
"protocol": self.protocol,
"conn_id": self.conn_id,
"filesystem": qualname(self._fs) if self._fs else None,
"filesystem": qualname(self.fs) if self.fs else None,
"storage_options": self.storage_options,
}

Expand All @@ -103,13 +104,14 @@ def deserialize(cls, data: dict[str, str], version: int):

return attach(protocol=protocol, conn_id=conn_id, storage_options=data["storage_options"])

def _connect(self) -> AbstractFileSystem:
if self._fs is None:
self._fs = get_fs(self.protocol, self.conn_id)
return self._fs

def __eq__(self, other):
return isinstance(other, type(self)) and other.conn_id == self.conn_id and other._fs == self._fs
self_fs = None
other_fs = None
with suppress(ValueError):
self_fs = self.fs
with suppress(ValueError):
other_fs = other.fs
return isinstance(other, type(self)) and other.conn_id == self.conn_id and self_fs == other_fs


_STORE_CACHE: dict[str, ObjectStore] = {}
Expand Down
2 changes: 1 addition & 1 deletion tests/io/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def test_serde_store(self):

assert s["protocol"] == "file"
assert s["conn_id"] == "mock"
assert s["filesystem"] is None
assert s["filesystem"] == qualname(LocalFileSystem)
assert store == d

store = attach("localfs", fs=LocalFileSystem())
Expand Down