Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrapCodes committed Oct 22, 2014
1 parent 57ee6f0 commit fef35ec
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import akka.actor.ActorRef
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription

@DeveloperApi
class ApplicationInfo(
private[spark] class ApplicationInfo(
val startTime: Long,
val id: String,
val desc: ApplicationDescription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util.Date
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.DriverDescription

@DeveloperApi
class DriverInfo(
private[spark] class DriverInfo(
val startTime: Long,
val id: String,
val desc: DriverDescription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ private[spark] class FileSystemPersistenceEngine(
new File(dir + File.separator + name).delete()
}

override def read[T: ClassTag](name: String) = {
val files = new File(dir).listFiles().filter(_.getName.startsWith(name))
override def read[T: ClassTag](prefix: String) = {
val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
files.map(deserializeFromFile[T])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,8 @@ private[spark] class Master(
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
false
} }
}
}

/** Generate a new app ID given a app's submission date */
def newApplicationId(submitDate: Date): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,45 +29,58 @@ import scala.reflect.ClassTag
* Given these two requirements, we will have all apps and workers persisted, but
* we might not have yet deleted apps or workers that finished (so their liveness must be verified
* during recovery).
*
* The implementation of this trait defines how name-object pairs are stored or retrieved.
*/
@DeveloperApi
trait PersistenceEngine {

/**
* Defines how the object is serialized and persisted. Implementation will
* depend on the store used.
*/
def persist(name: String, obj: Object)

/**
* Defines how the object referred by its name is removed from the store.
*/
def unpersist(name: String)

def read[T: ClassTag](name: String): Seq[T]
/**
* Gives all objects, matching a prefix. This defines how objects are
* read/deserialized back.
*/
def read[T: ClassTag](prefix: String): Seq[T]

def addApplication(app: ApplicationInfo): Unit = {
final def addApplication(app: ApplicationInfo): Unit = {
persist("app_" + app.id, app)
}

def removeApplication(app: ApplicationInfo): Unit = {
final def removeApplication(app: ApplicationInfo): Unit = {
unpersist("app_" + app.id)
}

def addWorker(worker: WorkerInfo): Unit = {
final def addWorker(worker: WorkerInfo): Unit = {
persist("worker_" + worker.id, worker)
}

def removeWorker(worker: WorkerInfo): Unit = {
final def removeWorker(worker: WorkerInfo): Unit = {
unpersist("worker_" + worker.id)
}

def addDriver(driver: DriverInfo): Unit = {
final def addDriver(driver: DriverInfo): Unit = {
persist("driver_" + driver.id, driver)
}

def removeDriver(driver: DriverInfo): Unit = {
final def removeDriver(driver: DriverInfo): Unit = {
unpersist("driver_" + driver.id)
}

/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
(read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.spark.serializer.JavaSerializer
* Implementation of this class can be plugged in as recovery mode alternative for Spark's
* Standalone mode.
*
* @param conf SparkConf
*/
@DeveloperApi
abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
Expand All @@ -36,23 +35,18 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
* PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
* is handled for recovery.
*
* @return
*/
def createPersistenceEngine(): PersistenceEngine

/**
* Create an instance of LeaderAgent that decides who gets elected as master.
* @param master
* @return
*/
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent
}

/**
* LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
* recovery is made by restoring from filesystem.
*
* @param conf SparkConf
*/
private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
extends StandaloneRecoveryModeFactory(conf) with Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import akka.actor.ActorRef
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

@DeveloperApi
class WorkerInfo(
private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, c
zk.delete().forPath(WORKING_DIR + "/" + name)
}

override def read[T: ClassTag](name: String) = {
val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(name))
override def read[T: ClassTag](prefix: String) = {
val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
file.map(deserializeFromFile[T]).flatten
}

Expand Down

0 comments on commit fef35ec

Please sign in to comment.