Skip to content

Commit

Permalink
Add examples
Browse files Browse the repository at this point in the history
  • Loading branch information
janheinrichmerker committed Jul 3, 2024
1 parent 7effece commit 8dc33f2
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 1 deletion.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ sink = ElasticsearchDslDatasink(index=Foo)

Note that, unlike in [Elasticsearch DSL](https://elasticsearch-dsl.readthedocs.io/en/latest/), the results are not parsed into Python objects but remain Python dictionaries, because Ray internally converts all data to the [Arrow format](https://arrow.apache.org/docs/python/index.html).

### Examples

More examples can be found in the [`examples`](examples/) directory.

## Development

To build this package and contribute to its development you need to install the `build`, `setuptools` and `wheel` packages:
Expand Down
19 changes: 19 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Examples

1. Configure your Ray cluster address in the `RAY_ADDRESS` environment variable.
2. Adapt the runtime environment in [`env.yml`](env.yml) to include your Elasticsearch host, index, and credentials.
3. Launch the examples like this:

```shell
# Write some test data into the index.
ray job submit --runtime-env examples/env.yml -- python write.py

# Sum the numbers of all documents in the index.
ray job submit --runtime-env examples/env.yml -- python read.py

# Sum the numbers of documents in the index that match a query.
ray job submit --runtime-env examples/env.yml -- python read_query.py

# Sum the numbers of documents in the index that match a query (using the `elasticsearch-dsl` query DSL).
ray job submit --runtime-env examples/env.yml -- python read_query_dsl.py
```
7 changes: 6 additions & 1 deletion examples/env.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# Ray runtime environment passed to `ray job submit --runtime-env` for the example scripts.
working_dir: examples/
# NOTE(review): read_query_dsl.py imports `elasticsearch_dsl`, which is not listed below — confirm it gets installed some other way (e.g. as a transitive dependency).
pip:
- elasticsearch~=8.14
- ray-elasticsearch~=0.1.0

# Placeholder connection settings that the example scripts read from the environment; replace them with your own host, index, and credentials.
env_vars:
ELASTICSEARCH_HOST: https://example.com
ELASTICSEARCH_INDEX: test
ELASTICSEARCH_USERNAME: example
ELASTICSEARCH_PASSWORD: example
21 changes: 21 additions & 0 deletions examples/read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from os import environ
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource

# Initialize Ray before any dataset is created.
init()

# Read the connection settings from the environment in a fixed order:
# index first, then host, user name, and password.
index_name = environ["ELASTICSEARCH_INDEX"]
connection = {
    "hosts": environ["ELASTICSEARCH_HOST"],
    "http_auth": (
        environ["ELASTICSEARCH_USERNAME"],
        environ["ELASTICSEARCH_PASSWORD"],
    ),
}
source = ElasticsearchDatasource(index=index_name, client_kwargs=connection)

row_count = source.num_rows()
print(f"Num rows: {row_count}")

# Unwrap the `_source` field of every row, then sum the `id` column.
documents = read_datasource(source).map(lambda hit: hit["_source"])
res = documents.sum("id")
print(f"Read complete. Sum: {res}")
28 changes: 28 additions & 0 deletions examples/read_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from os import environ
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource

# Initialize Ray before any dataset is created.
init()

# Read the connection settings from the environment in a fixed order:
# index first, then host, user name, and password.
index_name = environ["ELASTICSEARCH_INDEX"]
connection = {
    "hosts": environ["ELASTICSEARCH_HOST"],
    "http_auth": (
        environ["ELASTICSEARCH_USERNAME"],
        environ["ELASTICSEARCH_PASSWORD"],
    ),
}

# Range query restricting the read to documents whose `id` is at most 100.
id_at_most_100 = {"range": {"id": {"lte": 100}}}

source = ElasticsearchDatasource(
    index=index_name,
    client_kwargs=connection,
    query=id_at_most_100,
)

row_count = source.num_rows()
print(f"Num rows: {row_count}")

# Unwrap the `_source` field of every row, then sum the `id` column.
documents = read_datasource(source).map(lambda hit: hit["_source"])
res = documents.sum("id")
print(f"Read complete. Sum: {res}")
23 changes: 23 additions & 0 deletions examples/read_query_dsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from os import environ
from elasticsearch_dsl.query import Range
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDslDatasource

# Initialize Ray before any dataset is created.
init()

# Read the connection settings from the environment in a fixed order:
# index first, then host, user name, and password.
index_name = environ["ELASTICSEARCH_INDEX"]
connection = {
    "hosts": environ["ELASTICSEARCH_HOST"],
    "http_auth": (
        environ["ELASTICSEARCH_USERNAME"],
        environ["ELASTICSEARCH_PASSWORD"],
    ),
}

# Query object restricting the read to documents whose `id` is at most 100.
id_at_most_100 = Range(id={"lte": 100})

source = ElasticsearchDslDatasource(
    index=index_name,
    client_kwargs=connection,
    query=id_at_most_100,
)

row_count = source.num_rows()
print(f"Num rows: {row_count}")

# Unwrap the `_source` field of every row, then sum the `id` column.
documents = read_datasource(source).map(lambda hit: hit["_source"])
res = documents.sum("id")
print(f"Read complete. Sum: {res}")
21 changes: 21 additions & 0 deletions examples/write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from os import environ

from ray import init
# Imported under an alias so the built-in `range` is not shadowed at module level.
from ray.data import range as ray_range
from ray_elasticsearch import ElasticsearchDatasink

# Initialize Ray before any dataset is created.
init()

# The sink targets the index named in the environment and forwards the host
# and credentials as client keyword arguments.
sink = ElasticsearchDatasink(
    index=environ["ELASTICSEARCH_INDEX"],
    client_kwargs=dict(
        hosts=environ["ELASTICSEARCH_HOST"],
        http_auth=(
            environ["ELASTICSEARCH_USERNAME"],
            environ["ELASTICSEARCH_PASSWORD"],
        ),
    ),
)

# Generate 10,000 rows and nest each one under `_source` before writing.
# NOTE(review): the rows presumably have the shape {"id": n}, since the read
# examples sum the `id` field of `_source` — confirm.
(
    ray_range(10_000)
    .map(lambda row: {"_source": row})
    .write_datasink(sink)
)
print("Write complete.")

0 comments on commit 8dc33f2

Please sign in to comment.