
[SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast example for Streaming #10385

Closed
wants to merge 4 commits into from

Conversation

zsxwing
Member

@zsxwing zsxwing commented Dec 18, 2015

This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

@zsxwing
Member Author

zsxwing commented Dec 18, 2015

@tdas could you take a look before I start to add Java and Python examples?

@SparkQA

SparkQA commented Dec 18, 2015

Test build #48018 has finished for PR 10385 at commit 9928ca5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class JavaTwitterHashTagJoinSentiments
    • case class UnresolvedAlias(child: Expression, aliasName: Option[String] = None)

@@ -1415,6 +1415,95 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulator and Broadcast

Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you'll have to create lazily instantiated singleton instances for them so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
Contributor

I'd say: "in Spark Streaming. If you enable checkpointing and use an Accumulator or Broadcast as well, you**'ll** have to create ..."
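The lazily instantiated singleton the doc text calls for can be sketched in plain Python. This is only an illustration of the pattern, not Spark's API: there is no SparkContext here, and all names (`get_instance`, `word_blacklist`) are made up for the example.

```python
# Minimal sketch of the lazily instantiated singleton pattern described
# above (plain Python, no Spark; all names here are illustrative).
_instances = {}

def get_instance(name, factory):
    """Create the named object on first use and cache it.

    After a driver restart the cache starts empty, so the object is
    simply re-created by the factory instead of being recovered from
    a checkpoint.
    """
    if name not in _instances:
        _instances[name] = factory()
    return _instances[name]

# Usage: the same cached object is returned on every later call,
# and the second factory is never invoked.
blacklist = get_instance("word_blacklist", lambda: {"a", "b", "c"})
again = get_instance("word_blacklist", lambda: set())
```

In the real Spark examples added by this PR, the factory call would be `sparkContext.broadcast(...)` or an accumulator creation, guarded the same way.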

@zsxwing zsxwing changed the title [SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast Scala example for Streaming [SPARK-12429][Streaming][Doc][WIP]Add Accumulator and Broadcast example for Streaming Dec 18, 2015
@SparkQA

SparkQA commented Dec 19, 2015

Test build #48038 has finished for PR 10385 at commit 78d15bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing zsxwing changed the title [SPARK-12429][Streaming][Doc][WIP]Add Accumulator and Broadcast example for Streaming [SPARK-12429][Streaming][Doc]Add Accumulator and Broadcast example for Streaming Dec 21, 2015
@SparkQA

SparkQA commented Dec 21, 2015

Test build #48120 has finished for PR 10385 at commit 9e241e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JavaWordBlacklist
    • class JavaDroppedWordsCounter

@zsxwing
Member Author

zsxwing commented Dec 21, 2015

Added Java and Python examples.

@BenFradet
Contributor

lgtm

@@ -1415,6 +1415,185 @@ Note that the connections in the pool should be lazily created on demand and tim

***

## Accumulator and Broadcast
Contributor

Accumulators and Broadcast variables

@tdas
Contributor

tdas commented Dec 22, 2015

Small comments, otherwise LGTM.

@@ -806,7 +806,7 @@ However, in `cluster` mode, what happens is more complicated, and the above may

What is happening here is that the variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.

To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#AccumLink). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
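The copy semantics described above can be imitated without a cluster. The following is a hedged sketch, not Spark code: `run_task` is a hypothetical stand-in for an executor task, and `copy.deepcopy` stands in for the serialize/deserialize round trip that a closure goes through on its way to an executor.

```python
import copy

# "Driver-side" variable captured by the closure.
counter = {"value": 0}

def run_task(captured):
    # Each executor deserializes its own copy of the closure's
    # variables, so the increment lands on the copy, not on the
    # driver's object.
    local = copy.deepcopy(captured)
    local["value"] += 1
    return local["value"]

# Run four "tasks"; every one sees its own fresh copy starting at 0.
results = [run_task(counter) for _ in range(4)]
# The driver's counter is unchanged; only the per-task copies moved.
```

An Accumulator avoids this trap because updates are shipped back to the driver and merged there, rather than being lost in per-task copies.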
Member Author

I also fixed the broken Accumulator link in programming-guide.md

Contributor

nice!

@SparkQA

SparkQA commented Dec 23, 2015

Test build #48222 has finished for PR 10385 at commit 455968a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JavaWordBlacklist
    • class JavaDroppedWordsCounter

asfgit pushed a commit that referenced this pull request Dec 23, 2015
…or Streaming

This PR adds Scala, Java and Python examples to show how to use Accumulator and Broadcast in Spark Streaming to support checkpointing.

Author: Shixiong Zhu <[email protected]>

Closes #10385 from zsxwing/accumulator-broadcast-example.

(cherry picked from commit 20591af)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 20591af Dec 23, 2015
@zsxwing zsxwing deleted the accumulator-broadcast-example branch December 23, 2015 05:53