[SPARK-48560][SS][PYTHON] Make StreamingQueryListener.spark settable
### What changes were proposed in this pull request?

This PR proposes to make `StreamingQueryListener.spark` settable.

### Why are the changes needed?

```python
from pyspark.sql.streaming.listener import StreamingQueryListener

class MyListener(StreamingQueryListener):
  def __init__(self, spark):
    self.spark = spark

  def onQueryStarted(self, event):
    pass

  def onQueryProgress(self, event):
    pass

  def onQueryTerminated(self, event):
    pass

MyListener(spark)
```

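The snippet above has been broken since 3.5.0, after SPARK-42941: `StreamingQueryListener.spark` is a property with no setter, so the assignment `self.spark = spark` in `__init__` raises `AttributeError`.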
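
The failure can be reproduced without a Spark installation. The following is a minimal sketch, not the PySpark source: `ReadOnlyListener` is a hypothetical stand-in for the pre-fix base class, and a plain `object()` takes the place of a real `SparkSession`.

```python
class ReadOnlyListener:
    """Hypothetical stand-in for the pre-fix base class: a property with no setter."""

    @property
    def spark(self):
        return getattr(self, "_sparkSession", None)


class MyListener(ReadOnlyListener):
    def __init__(self, spark):
        # The inherited property has no setter, so this assignment raises.
        self.spark = spark


def try_construct(session):
    """Return the name of the error raised by the constructor, or None."""
    try:
        MyListener(session)
    except AttributeError as exc:
        return type(exc).__name__
    return None


# object() is a placeholder for a real SparkSession (assumption).
error_name = try_construct(object())
print(error_name)  # prints "AttributeError"
```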

### Does this PR introduce _any_ user-facing change?

Yes, end users who implement `StreamingQueryListener` can set a `spark` attribute in their implementation again.
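
As a rough illustration of the resulting behavior, the sketch below mirrors the getter/setter pair and the guarded `_set_spark_session` from this change in a standalone class. `SettableListener` is a hypothetical name, the sessions are plain `object()` placeholders, and the real class has further members not shown here.

```python
from typing import Any, Optional


class SettableListener:
    """Hypothetical stand-in mirroring the patched property pair."""

    def _set_spark_session(self, session: Any) -> None:
        # Only fill in the session if the user has not already set one.
        if self.spark is None:
            self.spark = session

    @property
    def spark(self) -> Optional[Any]:
        return getattr(self, "_sparkSession", None)

    @spark.setter
    def spark(self, session: Any) -> None:
        self._sparkSession = session


class MyListener(SettableListener):
    def __init__(self, spark: Any) -> None:
        self.spark = spark  # now goes through the setter


# Placeholders for real SparkSession objects (assumption).
user_session = object()
framework_session = object()

# A session assigned by the user is kept when the framework sets its own.
listener = MyListener(user_session)
listener._set_spark_session(framework_session)
kept_user_session = listener.spark is user_session

# A listener that never set one receives the framework's session.
bare = SettableListener()
bare._set_spark_session(framework_session)
filled_in = bare.spark is framework_session
```

Under these assumptions, both `kept_user_session` and `filled_in` are `True`: a user-assigned session takes precedence, and listeners that never assign one behave as before.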

### How was this patch tested?

Manually tested, and added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46909 from HyukjinKwon/compat-spark-prop.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Jun 9, 2024
1 parent 24bce72 commit d9394ee
Showing 2 changed files with 26 additions and 6 deletions.
15 changes: 9 additions & 6 deletions python/pyspark/sql/streaming/listener.py
@@ -64,16 +64,19 @@ class StreamingQueryListener(ABC):
     """

     def _set_spark_session(
-        self, spark: "SparkSession"  # type: ignore[name-defined] # noqa: F821
+        self, session: "SparkSession"  # type: ignore[name-defined] # noqa: F821
     ) -> None:
-        self._sparkSession = spark
+        if self.spark is None:
+            self.spark = session

     @property
     def spark(self) -> Optional["SparkSession"]:  # type: ignore[name-defined] # noqa: F821
-        if hasattr(self, "_sparkSession"):
-            return self._sparkSession
-        else:
-            return None
+        return getattr(self, "_sparkSession", None)
+
+    @spark.setter
+    def spark(self, session: "SparkSession") -> None:  # type: ignore[name-defined] # noqa: F821
+        # For backward compatibility
+        self._sparkSession = session

     def _init_listener_id(self) -> None:
         self._id = str(uuid.uuid4())
17 changes: 17 additions & 0 deletions python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -592,6 +592,23 @@ def test_streaming_query_progress_fromJson(self):
         self.assertEqual(sink.numOutputRows, -1)
         self.assertEqual(sink.metrics, {})

+    def test_spark_property_in_listener(self):
+        # SPARK-48560: Make StreamingQueryListener.spark settable
+        class TestListener(StreamingQueryListener):
+            def __init__(self, session):
+                self.spark = session
+
+            def onQueryStarted(self, event):
+                pass
+
+            def onQueryProgress(self, event):
+                pass
+
+            def onQueryTerminated(self, event):
+                pass
+
+        self.assertEqual(TestListener(self.spark).spark, self.spark)
+

 if __name__ == "__main__":
     import unittest