Backport typing
janheinrichmerker committed Jul 3, 2024
1 parent 26b3794 commit f31fda4
Showing 1 changed file with 18 additions and 17 deletions.
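In substance, the change replaces PEP 604 union syntax (X | None, float | int), which requires Python 3.10 wherever annotations are evaluated at runtime, with the equivalent Optional[...] and Union[...] forms from typing, and imports TypeAlias from typing_extensions because typing.TypeAlias only exists from Python 3.10 onwards. A minimal sketch of the pattern, using illustrative names that are not taken from the module:

    from typing import Any, Mapping, Optional, Union
    from typing_extensions import TypeAlias  # typing.TypeAlias only exists on 3.10+

    # Before: PEP 604 unions such as the following fail with a TypeError on
    # Python < 3.10, because module-, class-, and function-level annotations
    # are evaluated when the module is loaded:
    #     query: Mapping[str, Any] | None = None
    #     backoff: float | int = 2

    # After: equivalent spellings that also work on older interpreters.
    QueryLike: TypeAlias = Mapping[str, Any]  # hypothetical alias, for illustration only

    def scan(query: Optional[QueryLike] = None, backoff: Union[float, int] = 2) -> None:
        ...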
ray_elasticsearch/__init__.py (18 additions, 17 deletions)
@@ -8,15 +8,16 @@

 from functools import cached_property
 from itertools import chain
-from typing import Any, Iterable, Iterator, Literal, Mapping, TypeAlias
+from typing import Any, Iterable, Iterator, Literal, Mapping, Optional, Union
+from typing_extensions import TypeAlias

 from pandas import DataFrame
 from pyarrow import Table
 from ray.data import Datasource, ReadTask, Datasink
 from ray.data._internal.execution.interfaces import TaskContext
 from ray.data.block import BlockMetadata, Block

-_es_import_error: ImportError | None
+_es_import_error: Optional[ImportError]
 try:
     from elasticsearch import Elasticsearch  # type: ignore
     from elasticsearch.helpers import streaming_bulk  # type: ignore
@@ -43,7 +44,7 @@

 class ElasticsearchDatasource(Datasource):
     _index: str
-    _query: Mapping[str, Any] | None
+    _query: Optional[Mapping[str, Any]]
     _keep_alive: str
     _chunk_size: int
     _client_kwargs: dict[str, Any]
@@ -164,24 +165,24 @@ def supports_distributed_reads(self) -> bool:

 class ElasticsearchDatasink(Datasink):
     _index: str
-    _op_type: OpType | None
+    _op_type: Optional[OpType]
     _chunk_size: int
     _max_chunk_bytes: int
     _max_retries: int
-    _initial_backoff: float | int
-    _max_backoff: float | int
+    _initial_backoff: Union[float, int]
+    _max_backoff: Union[float, int]

     _client_kwargs: dict[str, Any]

     def __init__(
         self,
         index: str,
-        op_type: OpType | None = None,
+        op_type: Optional[OpType] = None,
         chunk_size: int = 500,
         max_chunk_bytes: int = 100 * 1024 * 1024,
         max_retries: int = 0,
-        initial_backoff: float | int = 2,
-        max_backoff: float | int = 600,
+        initial_backoff: Union[float, int] = 2,
+        max_backoff: Union[float, int] = 600,
         client_kwargs: dict[str, Any] = {},
     ) -> None:
         super().__init__()
@@ -251,11 +252,11 @@ def supports_distributed_writes(self) -> bool:
         return True

     @property
-    def num_rows_per_write(self) -> int | None:
+    def num_rows_per_write(self) -> Optional[int]:
         return None


-_es_dsl_import_error: ImportError | None
+_es_dsl_import_error: Optional[ImportError]
 try:
     from elasticsearch_dsl import Document  # type: ignore
     from elasticsearch_dsl.query import Query  # type: ignore
@@ -281,8 +282,8 @@ def num_rows_per_write(self) -> int | None:
 class ElasticsearchDslDatasource(ElasticsearchDatasource):
     def __init__(
         self,
-        index: type[Document] | str,
-        query: Query | None = None,
+        index: Union[type[Document], str],
+        query: Optional[Query] = None,
         keep_alive: str = "5m",
         chunk_size: int = 1000,
         client_kwargs: dict[str, Any] = {},
@@ -305,13 +306,13 @@ def __init__(
 class ElasticsearchDslDatasink(ElasticsearchDatasink):
     def __init__(
         self,
-        index: type[Document] | str,
-        op_type: OpType | None = None,
+        index: Union[type[Document], str],
+        op_type: Optional[OpType] = None,
         chunk_size: int = 500,
         max_chunk_bytes: int = 100 * 1024 * 1024,
         max_retries: int = 0,
-        initial_backoff: float | int = 2,
-        max_backoff: float | int = 600,
+        initial_backoff: Union[float, int] = 2,
+        max_backoff: Union[float, int] = 600,
         client_kwargs: dict[str, Any] = {},
     ) -> None:
         super().__init__(
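One consequence of the approach taken here is that typing_extensions becomes an unconditional import. A common alternative, not used in this commit, is to guard the backport behind a version check; a sketch, assuming only that typing_extensions is installed on interpreters older than 3.10:

    import sys

    # Use the standard-library TypeAlias where it exists (Python 3.10+);
    # fall back to the typing_extensions backport on older interpreters.
    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        from typing_extensions import TypeAlias

    JsonMapping: TypeAlias = dict  # hypothetical alias, purely for illustration

Common type checkers such as mypy recognize sys.version_info branches, so both import paths type-check cleanly; the trade-off is slightly more boilerplate than importing from typing_extensions everywhere.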
