diff --git a/ray_elasticsearch/__init__.py b/ray_elasticsearch/__init__.py index 6c54dcf..b8a756b 100644 --- a/ray_elasticsearch/__init__.py +++ b/ray_elasticsearch/__init__.py @@ -77,7 +77,7 @@ def _num_rows(self) -> int: index=self._index, body={ "query": self._query, - }, + } if self._query is not None else {}, )["count"] def num_rows(self) -> int: @@ -97,6 +97,7 @@ def estimate_inmemory_data_size(self) -> int: @staticmethod def _get_read_task( pit_id: str, + query: Optional[Mapping[str, Any]], slice_id: int, slice_max: int, chunk_size: int, @@ -116,6 +117,7 @@ def iter_blocks() -> Iterator[Table]: while True: response = elasticsearch.search( pit={"id": pit_id}, + query=query, slice={"id": slice_id, "max": slice_max}, size=chunk_size, search_after=search_after, @@ -144,6 +146,7 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]: return [ self._get_read_task( pit_id=pit_id, + query=self._query, slice_id=i, slice_max=parallelism, chunk_size=self._chunk_size,