Ray data source and sink for Elasticsearch.
Use this minimal library if you plan to read or write data from/to Elasticsearch in a massively parallel fashion for data processing in Ray. Internally, the library uses parallelized sliced point-in-time search for reading and parallelized bulk requests for writing, the two most efficient ways to read data from and write data to Elasticsearch. Note that this library does not guarantee any specific ordering of the results, even though the scores are returned.
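For intuition, sliced point-in-time search works by opening one point-in-time (PIT) snapshot of the index and letting each parallel worker claim one slice of it. The following is a Ray-free sketch of the per-worker request bodies; the helper name and PIT id are hypothetical, not part of this library's API:

```python
def sliced_search_bodies(pit_id: str, num_slices: int, query: dict) -> list[dict]:
    """Build one Elasticsearch search request body per parallel worker.

    All bodies share the same point-in-time snapshot; each claims one
    slice of it, so together the workers cover the index exactly once.
    """
    return [
        {
            "slice": {"id": i, "max": num_slices},  # this worker's share of the PIT
            "pit": {"id": pit_id, "keep_alive": "1m"},
            "query": query,
        }
        for i in range(num_slices)
    ]


bodies = sliced_search_bodies("pit-123", 4, {"match_all": {}})
```

Each body can then be sent as an independent search request, which is what makes the read embarrassingly parallel.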
Install the package from PyPI:

```shell
pip install ray-elasticsearch
```
This library makes use of Ray's `Datasource` and `Datasink` APIs. For reading, use `ElasticsearchDatasource` and, for writing, use `ElasticsearchDatasink`.
You can read results from a specified index by using an `ElasticsearchDatasource` with Ray's `read_datasource()` like so:
```python
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource

init()
source = ElasticsearchDatasource(index="test")
res = read_datasource(source)\
    .map(lambda x: x["_source"])\
    .sum("id")
print(f"Read complete. Sum: {res}")
```
Use an Elasticsearch query to filter the results:

```python
source = ElasticsearchDatasource(
    index="test",
    query={
        "match": {
            "text": "foo bar",
        },
    },
)
```
Note that the parallel read does not enforce any ordering of the results even though the results are scored by Elasticsearch.
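If you need score order back, you can sort the retrieved hits yourself afterwards, e.g. with Ray's `Dataset.sort()`. The idea in plain Python, with made-up sample hits:

```python
# Hypothetical hits as returned by a parallel read (arrival order is arbitrary).
hits = [
    {"_id": "b", "_score": 1.2},
    {"_id": "a", "_score": 3.4},
    {"_id": "c", "_score": 2.5},
]

# Restore descending score order; on a Ray Dataset the rough equivalent
# would be ds.sort("_score", descending=True).
ordered = sorted(hits, key=lambda hit: hit["_score"], reverse=True)
```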
Normally, it is not necessary to specify a fixed concurrency level. The data source automatically determines the optimal concurrency based on the disk size of the Elasticsearch index and the capabilities of the Ray cluster. You can, however, override the concurrency by setting the `concurrency` parameter in Ray's `read_datasource()`, e.g. `read_datasource(source, concurrency=8)`.
Writing documents works similarly, using an `ElasticsearchDatasink` with Ray's `write_datasink()`:
```python
from ray import init
from ray.data import range
from ray_elasticsearch import ElasticsearchDatasink

init()
sink = ElasticsearchDatasink(index="test")
range(10_000)\
    .map(lambda x: {"_source": x})\
    .write_datasink(sink)
print("Write complete.")
```
Concurrency can again be limited by specifying the `concurrency` parameter in Ray's `write_datasink()`.
By default, the data source and sink access Elasticsearch on `localhost:9200`. In most cases, however, you will instead want to connect to a remote Elasticsearch instance. To do so, specify the client settings as in the example below, using the same parameters as the `Elasticsearch()` constructor:
```python
source = ElasticsearchDatasource(
    index="test",
    client_kwargs=dict(
        hosts="<HOST>",
        http_auth=("<USERNAME>", "<PASSWORD>"),
        max_retries=10,
    ),
)
```
For the full list of allowed arguments in the `client_kwargs` dictionary, refer to the documentation of the `Elasticsearch()` constructor.
To simplify query construction, you can also use the Elasticsearch DSL and its corresponding data source (`ElasticsearchDslDatasource`) and sink (`ElasticsearchDslDatasink`):
```python
from elasticsearch7_dsl import Document, Text
from elasticsearch7_dsl.query import Exists
from ray_elasticsearch import ElasticsearchDslDatasource, ElasticsearchDslDatasink

class Foo(Document):
    class Index:
        name = "test_foo"
    text: str = Text()

source = ElasticsearchDslDatasource(
    index=Foo,
    query=Exists(field="doi"),
)
sink = ElasticsearchDslDatasink(index=Foo)
```
Note that, unlike in Elasticsearch DSL, the results are not parsed as Python objects but remain Python dictionaries, because Ray internally transforms everything into the Arrow format.
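In practice this means your `map()` functions receive plain dicts in the Elasticsearch response shape, with the document body under the `_source` key. A small, Ray-free sketch; the sample document and helper name are made up:

```python
def extract_text(hit: dict) -> dict:
    """Pull selected fields out of a raw hit dictionary.

    Hits keep the Elasticsearch response shape, so the document body
    lives under the "_source" key rather than as object attributes.
    """
    return {"id": hit["_id"], "text": hit["_source"]["text"]}


doc = {"_id": "42", "_source": {"text": "foo bar"}}
row = extract_text(doc)  # {"id": "42", "text": "foo bar"}
```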
More examples can be found in the `examples` directory.
To build this package and contribute to its development, you need to install the `build`, `setuptools`, and `wheel` packages:

```shell
pip install build setuptools wheel
```
(On most systems, these packages are already pre-installed.)
Install package and test dependencies:

```shell
pip install -e .[tests]
```
Verify your changes against the test suite:

```shell
ruff check .                   # Code format and lint
mypy .                         # Static typing
bandit -c pyproject.toml -r .  # Security
pytest .                       # Unit tests
```
Please also add tests for your newly developed code.
Wheels for this package can be built with:

```shell
python -m build
```
If you have any problems using this package, please file an issue. We're happy to help!
This repository is released under the MIT license.