
[SPARK-18758][SS] StreamingQueryListener events from a StreamingQuery should be sent only to the listeners in the same session as the query #16186

Closed
wants to merge 6 commits

Conversation

@tdas tdas commented Dec 7, 2016

What changes were proposed in this pull request?

Listeners added with `sparkSession.streams.addListener(l)` are added to a SparkSession, so only events from queries in the same session as a listener should be posted to that listener. Currently, all events get rerouted through Spark's main listener bus, as follows:

  • StreamingQuery posts events to StreamingQueryListenerBus. Only the queries associated with the same session as the bus post events to it.
  • StreamingQueryListenerBus posts the event to Spark's main LiveListenerBus as a SparkEvent.
  • StreamingQueryListenerBus also subscribes to LiveListenerBus events, thus getting back the posted event in a different thread.
  • The received event is posted to the registered listeners.

The problem is that all StreamingQueryListenerBuses in all sessions get the events and post them to their listeners. This is wrong.

In this PR, I solve it by making StreamingQueryListenerBus track active queries (by their runIds) when a query posts the QueryStarted event to the bus. This allows the rerouted events to be filtered using the tracked queries.

Note that this list needs to be maintained separately from `StreamingQueryManager.activeQueries`, because a terminated query is cleared from `StreamingQueryManager.activeQueries` as soon as it is stopped, but this ListenerBus must clear a query only after that query's termination event has been posted, which happens lazily, well after the query has terminated.
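As a minimal, self-contained sketch of this idea, with hypothetical simplified event and bus types (not Spark's actual StreamingQueryListenerBus API):

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical, simplified sketch: the bus records a query's runId when the
// QueryStarted event is posted directly to it, and later drops any rerouted
// event whose runId it never registered (i.e. a query from another session).
sealed trait Event { def runId: UUID }
case class QueryStartedEvent(runId: UUID) extends Event
case class QueryTerminatedEvent(runId: UUID) extends Event

class SessionListenerBus {
  private val activeQueryRunIds = new mutable.HashSet[UUID]
  val delivered = mutable.Buffer[Event]()

  // Called synchronously by queries belonging to this session.
  def postDirectly(started: QueryStartedEvent): Unit = {
    activeQueryRunIds.synchronized { activeQueryRunIds += started.runId }
  }

  // Called for every event rerouted through the shared LiveListenerBus,
  // including events originating from queries in other sessions.
  def onReroutedEvent(event: Event): Unit = {
    val known = activeQueryRunIds.synchronized { activeQueryRunIds.contains(event.runId) }
    if (known) {
      delivered += event
      event match {
        // Clear the runId only after its termination event has been delivered.
        case _: QueryTerminatedEvent =>
          activeQueryRunIds.synchronized { activeQueryRunIds -= event.runId }
        case _ =>
      }
    }
  }
}

val mine = new SessionListenerBus
val runId = UUID.randomUUID()
val otherRunId = UUID.randomUUID() // a query from a different session

mine.postDirectly(QueryStartedEvent(runId))
// Both queries' events are rerouted to every bus, but only this session's survive.
mine.onReroutedEvent(QueryStartedEvent(runId))
mine.onReroutedEvent(QueryStartedEvent(otherRunId))
mine.onReroutedEvent(QueryTerminatedEvent(runId))

assert(mine.delivered.size == 2)
assert(mine.delivered.forall(_.runId == runId))
```

The key detail the note above calls out is visible here: the runId is removed only when the termination event is actually delivered, not when the query stops.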

Credit goes to @zsxwing for coming up with the initial idea.

How was this patch tested?

Updated test harness code to use the correct session, and added new unit test.

@@ -70,11 +70,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

def schema: StructType = encoder.schema

def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
Contributor Author

Removed this because it is not needed; the sqlContext is in the constructor. Rather, it causes confusion in a multi-session environment.
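The multi-session confusion alluded to here can be sketched as follows; the types and method names are hypothetical simplifications, not Spark's actual MemoryStream:

```scala
// Hypothetical sketch: if toDS() took an implicit context, a stream built
// against session A could silently pick up session B's context at the call
// site. Using the constructor's context removes the ambiguity.
case class SQLContext(name: String)

case class MemoryStream(sqlContext: SQLContext) {
  // Old shape: the implicit parameter can differ from the constructor's context.
  def toDSImplicit()(implicit ctx: SQLContext): SQLContext = ctx
  // New shape: always use the context the stream was constructed with.
  def toDS(): SQLContext = sqlContext
}

val sessionA = SQLContext("A")
val sessionB = SQLContext("B")
val stream = MemoryStream(sessionA)

implicit val current: SQLContext = sessionB // ambient session at the call site
assert(stream.toDSImplicit() == sessionB)   // surprising: B's context wins
assert(stream.toDS() == sessionA)           // unambiguous
```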

var pos = 0
var currentPlan: LogicalPlan = stream.logicalPlan
Contributor Author

not used.

@@ -319,7 +319,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
""".stripMargin)
}

val testThread = Thread.currentThread()
Contributor Author

not used.

@SparkQA

SparkQA commented Dec 7, 2016

Test build #69769 has finished for PR 16186 at commit 9585ae4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas

tdas commented Dec 7, 2016

@marmbrus @zsxwing @brkyvz Please review.

@zsxwing zsxwing left a comment


Looks good overall. Just a minor style issue.

override protected def doPostEvent(
listener: StreamingQueryListener,
event: StreamingQueryListener.Event): Unit = {
val runIdsToReportTo = activeQueryRunIds.synchronized { activeQueryRunIds.toSet }
Member

Why do you need to clone the set? You can just use `activeQueryRunIds.synchronized { activeQueryRunIds.contains(...) }`, right?

@tdas tdas Dec 7, 2016


Otherwise the code would look like:

      case queryStarted: QueryStartedEvent =>
        if (activeQueryRunIds.synchronized { activeQueryRunIds.contains(queryStarted.runId) }) {
          listener.onQueryStarted(queryStarted)
        }
      case queryProgress: QueryProgressEvent =>
        if (activeQueryRunIds.synchronized { activeQueryRunIds.contains(queryProgress.progress.runId) }) {
          listener.onQueryProgress(queryProgress)
        }
      case queryTerminated: QueryTerminatedEvent =>
        if (activeQueryRunIds.synchronized { activeQueryRunIds.contains(queryTerminated.runId) }) {
          listener.onQueryTerminated(queryTerminated)
          activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
        }

Member

Looks good? It also reduces the number of lines :)

Contributor Author

i think this looks uglier. repeated code. longer lines? :)

Contributor Author

Actually found a middle ground :D

    def shouldReport(runId: UUID): Boolean = {
      activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
    }

    event match {
      case queryStarted: QueryStartedEvent =>
        if (shouldReport(queryStarted.runId)) {
          listener.onQueryStarted(queryStarted)
        }
      case queryProgress: QueryProgressEvent =>
        if (shouldReport(queryProgress.progress.runId)) {
          listener.onQueryProgress(queryProgress)
        }
      case queryTerminated: QueryTerminatedEvent =>
        if (shouldReport(queryTerminated.runId)) {
          listener.onQueryTerminated(queryTerminated)
          activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
        }
      case _ =>
    }

* `StreamingQueryManager.activeQueries` as soon as it is stopped, but this ListenerBus must
* clear a query only after the termination event of that query has been posted.
*/
private val activeQueryRunIds = new mutable.HashSet[UUID]
Contributor

just a qq: why do we use runIds instead of the ids of the streams? We already don't want concurrent runs for streams, since the offset log directories would get messed up.

Wait, ok, got it. onStarts are called synchronously whereas onTerminations are asynchronous. Basically we can get a second stream start report before the first run completes. Do you think that's worth adding to the docs? You don't have to if you don't need a second pass.

@zsxwing zsxwing Dec 7, 2016


+1

It's worth documenting. This is different from Spark's other listener buses because of the synchronous events.

Contributor Author

Even if this behavior were not different (that is, all async), this component should not be responsible for preventing concurrent runs. This component should be simple and not deal with such issues. I have added more docs on why runIds are used instead of ids.
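The runId-vs-id distinction discussed above can be sketched as follows; `RunHandle` and these names are hypothetical illustrations, not Spark API:

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical sketch: restarting a query keeps its id but assigns a fresh
// runId. Because termination events arrive asynchronously, the second run can
// start before the first run's QueryTerminated event has been delivered.
case class RunHandle(id: UUID, runId: UUID)

val queryId = UUID.randomUUID()
val firstRun = RunHandle(queryId, UUID.randomUUID())
val secondRun = RunHandle(queryId, UUID.randomUUID()) // restart of the same query

assert(firstRun.id == secondRun.id)     // same query...
assert(firstRun.runId != secondRun.runId) // ...but distinguishable runs

// Keyed by runId, the late termination of the first run cannot evict the
// restarted run; keyed by id, it would.
val active = mutable.HashSet[UUID]()
active += firstRun.runId
active += secondRun.runId // both runs tracked while events are in flight
active -= firstRun.runId  // late QueryTerminated for the first run arrives
assert(active.contains(secondRun.runId))
```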

@zsxwing

zsxwing commented Dec 8, 2016

LGTM pending tests

@SparkQA

SparkQA commented Dec 8, 2016

Test build #69831 has finished for PR 16186 at commit 1e4411f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 8, 2016

Test build #69835 has finished for PR 16186 at commit 986d7b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Dec 8, 2016
Author: Tathagata Das <[email protected]>

Closes #16186 from tdas/SPARK-18758.

(cherry picked from commit 9ab725e)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 9ab725e Dec 8, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017