[stream] Does RayDP support Spark streaming data? #393

Open

fwensen opened this issue Dec 12, 2023 · 1 comment

fwensen commented Dec 12, 2023

I wrote some code to consume Kafka stream data, but got an error. The following is my RayDP code:

import ray
import raydp

ray.init()
# Kafka connector matching Spark 3.5.0 / Scala 2.12
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB',
                         configs={
                             "spark.jars.packages": packages
                         })

# Streaming read from the Kafka topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:29092") \
    .option("subscribe", "ray-dev") \
    .load()


def duf_fun(v):
    # Identity map that logs each record as it passes through
    print(f"print {v}")
    return v


# This is the line that fails: from_spark cannot consume a streaming DataFrame
df2 = ray.data.from_spark(df).map(duf_fun).to_spark(spark)
query = (
    df2
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
)

query.awaitTermination()

but got this error:

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/Users/xxxxxxx/workspace/work/ray-source/raydp_kafka.py", line 27, in <module>
    df2 = ray.data.from_spark(df).map(duf_fun).to_spark(spark)
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/ray/data/read_api.py", line 2301, in from_spark
    return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/raydp/spark/dataset.py", line 175, in spark_dataframe_to_ray_dataset
    num_part = df.rdd.getNumPartitions()
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 175, in rdd
    jrdd = self._jdf.javaToPython()
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/xxxxxxx/miniforge3/envs/ray-dev/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka
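
The traceback pinpoints the cause: ray.data.from_spark delegates to raydp.spark.spark_dataframe_to_ray_dataset, which calls df.rdd.getNumPartitions(), and PySpark forbids .rdd access on a streaming DataFrame. The failure therefore happens inside Spark's analyzer, before any RayDP-specific logic runs. A minimal sketch that reproduces the same AnalysisException without Kafka, using Spark's built-in "rate" test source (assuming a spark session created as above):

# Reproduces the same AnalysisException with no Kafka broker, assuming
# `spark` was created by raydp.init_spark as above. "rate" is Spark's
# built-in test stream that emits (timestamp, value) rows.
stream_df = spark.readStream.format("rate").load()

# Accessing .rdd on a streaming DataFrame trips Spark's
# unsupported-operation check, which is exactly the code path
# ray.data.from_spark takes internally:
stream_df.rdd.getNumPartitions()
# pyspark.sql.utils.AnalysisException: Queries with streaming sources
# must be executed with writeStream.start()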
kira-lin (Collaborator) commented

No, I'm afraid RayDP does not support Kafka streaming. I'm not sure how large the gap is; we have never tested it.
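
One possible, untested direction is Spark's foreachBatch sink: inside the callback each micro-batch arrives as a static DataFrame, so the .rdd restriction no longer applies and ray.data.from_spark can at least be attempted per batch. A minimal sketch under that assumption; whether RayDP's conversion actually behaves correctly inside foreachBatch is exactly the untested gap noted above:

import ray
import raydp

ray.init()
spark = raydp.init_spark(app_name='RayDP foreachBatch sketch',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB',
                         configs={
                             "spark.jars.packages":
                                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
                         })

stream_df = (spark
             .readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:29092")
             .option("subscribe", "ray-dev")
             .load())


def process_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a *static* DataFrame, so .rdd is
    # legal and from_spark does not hit the streaming-source check.
    ds = ray.data.from_spark(batch_df)
    ds.show(limit=5)  # replace with real per-record processing


query = (stream_df
         .writeStream
         .foreachBatch(process_batch)
         .start())

query.awaitTermination()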
