Skip to content

Commit

Permalink
Jlopezmalla secret varibles (apache#119)
Browse files Browse the repository at this point in the history
* secret Broadcast variables

* added temporal VaultURI variable

* vaultToken

Signed-off-by: jlopezmalla <[email protected]>

* POST bug

* token for Driver and Executors

* delete code for merging
  • Loading branch information
Gschiavon authored and pianista215 committed Dec 27, 2017
1 parent 9d2a96b commit 7d4e256
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 2.2.0.3 (upcoming)

* Unify Vault variables
* Secret Broadcast variables (Experimental)

## 2.2.0.2 (December 26, 2017)

Expand Down
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,27 @@ class SparkContext(config: SparkConf) extends Logging {
bc
}


/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*
* @param value value to broadcast to the Spark nodes
* @return `Broadcast` object, a read-only variable cached on each machine
*/
def secretBroadcast(secretVaultPath: String,
idJson: String): Broadcast[String] = {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[String].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newSecretBroadcast(secretVaultPath, idJson, isLocal)
val callSite = getCallSite
logInfo("Created secret broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}

/**
* Add a file to be downloaded with this Spark job on every node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ private[spark] trait BroadcastFactory {
*/
def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]

/**
* Creates a new broadcast variable.
*
* @param secretRepositoryValue secret repository access variable to broadcast
* @param isLocal whether we are in local mode (single JVM process)
* @param id unique id representing this broadcast variable
*/
def newSecretBroadcast(secretVaultPath: String,
idJson: String,
isLocal: Boolean,
id: Long): Broadcast[String]

def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit

def stop(): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ private[spark] class BroadcastManager(
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
def newSecretBroadcast(secretVaultPath: String,
idJson: String,
isLocal: Boolean): Broadcast[String] = {
broadcastFactory.newSecretBroadcast(secretVaultPath, idJson,
isLocal, nextBroadcastId.getAndIncrement())
}

def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
out.defaultWriteObject()
}

private def readBroadcastBlock(): T = Utils.tryOrIOException {
private[spark] def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
new TorrentBroadcast[T](value_, id)
}

/**
* Creates a new broadcast variable.
*
* @param secretRepositoryValue secret repository access variable to broadcast
* @param isLocal whether we are in local mode (single JVM process)
* @param id unique id representing this broadcast variable
*/
override def newSecretBroadcast(secretVaultPath: String,
idJson: String,
isLocal: Boolean,
id: Long): Broadcast[String] = {
new TorrentSecretBroadcast(secretVaultPath, idJson, isLocal, id)
}

override def stop() { }

/**
Expand All @@ -44,4 +58,5 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}

}
Loading

0 comments on commit 7d4e256

Please sign in to comment.