
PySpark write to OpenSearch #153

Closed
manirudh78 opened this issue Mar 15, 2023 · 10 comments
Labels
enhancement New feature or request

Comments

@manirudh78

manirudh78 commented Mar 15, 2023

With the jar we can write a Spark (Scala) DataFrame to OpenSearch. Can we add the necessary functionality to support writing a PySpark DataFrame to OpenSearch as well?
@nknize can you please help?

manirudh78 added the enhancement and untriaged labels Mar 15, 2023
@wbeckler

What is needed for that? Is it possible that the current implementation already works by using this jar in this fashion: https://www.bmc.com/blogs/write-apache-spark-elasticsearch-python/

@junhl

junhl commented Mar 16, 2023

I think it's the same as #76

@manirudh78
Author

Thanks @junhl. I closed the other one.

@wbeckler that doesn't work. In the case of Elasticsearch you use the jar to write a PySpark DataFrame like this:

(dataFrame.write
.format("org.elasticsearch.spark.sql")
.option("inferSchema", "true")
.option("es.nodes", settings.all_es_hosts)
.option("es.port", str(settings.all_es_hosts_ports))
.option("es.net.http.auth.user", settings.username)
.option("es.net.http.auth.pass", settings.password)
# .option("es.net.ssl", "true")
# .option("es.nodes.wan.only","true")
.option("es.net.ssl.cert.allow.self.signed", "true")
.mode("append")
.save(index_name))
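
For anyone copying this: the connector jar also has to be on Spark's classpath, or the format string won't resolve. A minimal sketch of attaching it when building the session (the jar path is illustrative, not from this thread):

from pyspark.sql import SparkSession

# Illustrative path: point spark.jars at wherever the connector jar actually lives.
spark = (SparkSession.builder
    .appName("esWriteExample")
    .config("spark.jars", "/path/to/elasticsearch-hadoop-7.13.4.jar")
    .getOrCreate())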

@kobisalant

This functionality is relevant not just to PySpark but also to Scala Spark apps that use Spark Structured Streaming (df.write.format...).
I use the elasticsearch-hadoop integration 7.13.4; can we assume the OpenSearch client has the same functionality?
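
For reference, a minimal Structured Streaming sketch of what that would look like in PySpark, assuming the OpenSearch connector implements the same streaming sink that elasticsearch-hadoop does (the rate source, index name, and checkpoint path are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streamingWriteTest").getOrCreate()

# Placeholder source: Spark's built-in "rate" source emits (timestamp, value) rows.
stream_df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", "1")
    .load())

# Assumes org.opensearch.spark.sql also works as a streaming sink, mirroring
# elasticsearch-hadoop's writeStream support; a checkpoint location is required.
query = (stream_df.writeStream
    .format("org.opensearch.spark.sql")
    .option("opensearch.nodes", "127.0.0.1")
    .option("opensearch.port", "9200")
    .option("checkpointLocation", "/tmp/opensearch-checkpoint")
    .start("streaming_idx"))
# query.awaitTermination()  # block until the stream stops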

@wbeckler

Can you test it with the current client on the main branch?

@Thijsvandepoll

Hi, can PySpark even be used to read/write from an OpenSearch index?

@nknize
Collaborator

nknize commented May 17, 2023

> Hi, can PySpark even be used to read/write from an OpenSearch index?

Yes. For example w/ OpenSearch 2.7 Docker command:

docker run --name opensearch -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" opensearchproject/opensearch:2.7.0

Build the snapshot (until it's formally released) and load the opensearch-hadoop jar:

pyspark --jars /home/nknize/Downloads/opensearch-hadoop-3.0.0-SNAPSHOT.jar

Test it out with:

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("pysparkTest").getOrCreate()
df = sparkSession.createDataFrame([(1, "value1"), (2, "value2")], ["id", "value"])
df.show()
df.write\
    .format("org.opensearch.spark.sql")\
    .option("inferSchema", "true")\
    .option("opensearch.nodes", "127.0.0.1")\
    .option("opensearch.port", "9200")\
    .option("opensearch.net.http.auth.user", "admin")\
    .option("opensearch.net.http.auth.pass", "admin")\
    .option("opensearch.net.ssl", "true")\
    .option("opensearch.net.ssl.cert.allow.self.signed", "true")\
    .option("opensearch.batch.write.retry.count", "9")\
    .option("opensearch.http.retries", "9")\
    .option("opensearch.http.timeout", "18000")\
    .mode("append")\
    .save("pyspark_idx")

OpenSearch log output:

[2023-04-03T03:28:48,054][INFO ][o.o.p.PluginsService     ] [3bb1ea1407b1] PluginService:onIndexModule index:[security-auditlog-2023.04.03/ggrxtaa4SYemeKYux-Of4Q]
[2023-04-03T03:28:48,056][INFO ][o.o.c.m.MetadataCreateIndexService] [3bb1ea1407b1] [security-auditlog-2023.04.03] creating index, cause [auto(bulk api)], templates [], shards [1]/[1]
[2023-04-03T03:28:48,186][INFO ][o.o.p.PluginsService     ] [3bb1ea1407b1] PluginService:onIndexModule index:[security-auditlog-2023.04.03/ggrxtaa4SYemeKYux-Of4Q]
[2023-04-03T03:28:48,231][INFO ][o.o.a.u.d.DestinationMigrationCoordinator] [3bb1ea1407b1] Detected cluster change event for destination migration
[2023-04-03T03:28:48,253][INFO ][o.o.p.PluginsService     ] [3bb1ea1407b1] PluginService:onIndexModule index:[pyspark_idx/ln70gq_xSriafvX-rbi_8A]
[2023-04-03T03:28:48,256][INFO ][o.o.c.m.MetadataCreateIndexService] [3bb1ea1407b1] [pyspark_idx] creating index, cause [api], templates [], shards [1]/[1]
[2023-04-03T03:28:48,385][INFO ][o.o.p.PluginsService     ] [3bb1ea1407b1] PluginService:onIndexModule index:[pyspark_idx/ln70gq_xSriafvX-rbi_8A]
[2023-04-03T03:28:48,427][INFO ][o.o.a.u.d.DestinationMigrationCoordinator] [3bb1ea1407b1] Detected cluster change event for destination migration
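
To sanity-check from the Python side, the same data source can read the index back (a minimal sketch reusing the session and options from the write above):

# Read the freshly written index back into a DataFrame; options mirror the write.
read_df = (sparkSession.read
    .format("org.opensearch.spark.sql")
    .option("opensearch.nodes", "127.0.0.1")
    .option("opensearch.port", "9200")
    .option("opensearch.net.http.auth.user", "admin")
    .option("opensearch.net.http.auth.pass", "admin")
    .option("opensearch.net.ssl", "true")
    .option("opensearch.net.ssl.cert.allow.self.signed", "true")
    .load("pyspark_idx"))
read_df.show()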

Closing this issue as complete.

@ravi-craft

Has the 3.0.0 version been released yet? I still see 1.0.1 from May of last year.

@wbeckler

What's your motivation for opensearch-hadoop v3? Is it for compatibility with OpenSearch v3? The major versions should be cross-compatible, so the latest opensearch-hadoop should work with the latest 2.x and 3.x of OpenSearch.

@gsharma2907

df.write\
    .format("org.opensearch.spark.sql")\
    .option("inferSchema", "true")\
    .option("opensearch.nodes", "127.0.0.1")\
    .option("opensearch.port", "9200")\
    .option("opensearch.net.http.auth.user", "admin")\
    .option("opensearch.net.http.auth.pass", "admin")\
    .option("opensearch.net.ssl", "true")\
    .option("opensearch.net.ssl.cert.allow.self.signed", "true")\
    .option("opensearch.batch.write.retry.count", "9")\
    .option("opensearch.http.retries", "9")\
    .option("opensearch.http.timeout", "18000")\
    .mode("append")\
    .save("pyspark_idx")

This did not work @nknize; I'm receiving the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/zgaurash/.pyenv/versions/3.6.13/lib/python3.6/site-packages/pyspark/sql/readwriter.py", line 740, in save
    self._jwrite.save(path)
  File "/Users/zgaurash/.pyenv/versions/3.6.13/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/Users/zgaurash/.pyenv/versions/3.6.13/lib/python3.6/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/zgaurash/.pyenv/versions/3.6.13/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.save.
: java.lang.ClassNotFoundException:
Failed to find data source: org.opensearch.spark.sql. Please find packages at
http://spark.apache.org/third-party-projects.html

at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: java.lang.ClassNotFoundException: org.opensearch.spark.sql.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
... 16 more
I used the following jar: "opensearch-hadoop/build/libs/opensearch-hadoop-1.2.0-SNAPSHOT.jar"
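
That java.lang.ClassNotFoundException generally means Spark cannot find the connector on its classpath, i.e. the jar wasn't actually attached to the session. Two ways to attach it, sketched with the jar path from the comment above (adjust to the real location):

# Option 1: pass it on the command line when launching, as in the earlier comment:
#   pyspark --jars opensearch-hadoop/build/libs/opensearch-hadoop-1.2.0-SNAPSHOT.jar

from pyspark.sql import SparkSession

# Option 2: attach the jar when the session is created, before any read/write.
spark = (SparkSession.builder
    .appName("opensearchWrite")
    .config("spark.jars", "opensearch-hadoop/build/libs/opensearch-hadoop-1.2.0-SNAPSHOT.jar")
    .getOrCreate())

If the jar is attached and the error persists, it may be worth checking that the built artifact actually contains the Spark SQL classes (the project also publishes Spark-specific jars such as opensearch-spark-30).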
