diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 20718363c8d79..810b8bc0ca7ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -27,8 +27,7 @@ import akka.actor.ActorRef
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 
-@DeveloperApi
-class ApplicationInfo(
+private[spark] class ApplicationInfo(
     val startTime: Long,
     val id: String,
     val desc: ApplicationDescription,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 5e59bca0ddbe4..d9860912e5a08 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -22,8 +22,7 @@ import java.util.Date
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.DriverDescription
 
-@DeveloperApi
-class DriverInfo(
+private[spark] class DriverInfo(
     val startTime: Long,
     val id: String,
     val desc: DriverDescription,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 4bdd3364c2217..6ff2aa5244847 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -48,8 +48,8 @@ private[spark] class FileSystemPersistenceEngine(
     new File(dir + File.separator + name).delete()
   }
 
-  override def read[T: ClassTag](name: String) = {
-    val files = new File(dir).listFiles().filter(_.getName.startsWith(name))
+  override def read[T: ClassTag](prefix: String) = {
+    val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
     files.map(deserializeFromFile[T])
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 52c0bb2a3d7e8..65f5cc3fce608 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -740,7 +740,8 @@ private[spark] class Master(
       msg = URLEncoder.encode(msg, "UTF-8")
       app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
       false
-    } }
+    }
+  }
 
   /** Generate a new app ID given a app's submission date */
   def newApplicationId(submitDate: Date): String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index b22b31682ee05..355bd2fef2dd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -29,37 +29,50 @@ import scala.reflect.ClassTag
  * Given these two requirements, we will have all apps and workers persisted, but
  * we might not have yet deleted apps or workers that finished (so their liveness must be verified
  * during recovery).
+ *
+ * The implementation of this trait defines how name-object pairs are stored or retrieved.
  */
 @DeveloperApi
 trait PersistenceEngine {
 
+  /**
+   * Defines how the object is serialized and persisted. Implementation will
+   * depend on the store used.
+   */
   def persist(name: String, obj: Object)
 
+  /**
+   * Defines how the object referred by its name is removed from the store.
+   */
   def unpersist(name: String)
 
-  def read[T: ClassTag](name: String): Seq[T]
+  /**
+   * Gives all objects, matching a prefix. This defines how objects are
+   * read/deserialized back.
+   */
+  def read[T: ClassTag](prefix: String): Seq[T]
 
-  def addApplication(app: ApplicationInfo): Unit = {
+  final def addApplication(app: ApplicationInfo): Unit = {
     persist("app_" + app.id, app)
   }
 
-  def removeApplication(app: ApplicationInfo): Unit = {
+  final def removeApplication(app: ApplicationInfo): Unit = {
     unpersist("app_" + app.id)
   }
 
-  def addWorker(worker: WorkerInfo): Unit = {
+  final def addWorker(worker: WorkerInfo): Unit = {
     persist("worker_" + worker.id, worker)
   }
 
-  def removeWorker(worker: WorkerInfo): Unit = {
+  final def removeWorker(worker: WorkerInfo): Unit = {
     unpersist("worker_" + worker.id)
   }
 
-  def addDriver(driver: DriverInfo): Unit = {
+  final def addDriver(driver: DriverInfo): Unit = {
     persist("driver_" + driver.id, driver)
   }
 
-  def removeDriver(driver: DriverInfo): Unit = {
+  final def removeDriver(driver: DriverInfo): Unit = {
     unpersist("driver_" + driver.id)
   }
 
@@ -67,7 +80,7 @@ trait PersistenceEngine {
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
     (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index 2b4ec297d1275..d9d36c1ed5f9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -27,7 +27,6 @@ import org.apache.spark.serializer.JavaSerializer
  * Implementation of this class can be plugged in as recovery mode alternative for Spark's
  * Standalone mode.
  *
- * @param conf SparkConf
  */
 @DeveloperApi
 abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
@@ -36,14 +35,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
    * is handled for recovery.
    *
-   * @return
    */
   def createPersistenceEngine(): PersistenceEngine
 
   /**
    * Create an instance of LeaderAgent that decides who gets elected as master.
-   * @param master
-   * @return
    */
   def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent
 }
@@ -51,8 +47,6 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
 /**
  * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
  * recovery is made by restoring from filesystem.
- *
- * @param conf SparkConf
  */
 private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
   extends StandaloneRecoveryModeFactory(conf) with Logging {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index dc604b0fd0364..4af1037e700b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -24,8 +24,7 @@ import akka.actor.ActorRef
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
-@DeveloperApi
-class WorkerInfo(
+private[spark] class WorkerInfo(
     val id: String,
     val host: String,
     val port: Int,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 53a847eaa5c04..96c2139eb02f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -49,8 +49,8 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, c
     zk.delete().forPath(WORKING_DIR + "/" + name)
   }
 
-  override def read[T: ClassTag](name: String) = {
-    val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(name))
+  override def read[T: ClassTag](prefix: String) = {
+    val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
     file.map(deserializeFromFile[T]).flatten
   }
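
Note on the PersistenceEngine changes above: after this patch the trait leaves three methods abstract (persist, unpersist, and the prefix-based read), while the add*/remove*/readPersistedData helpers become final and encode the "app_"/"worker_"/"driver_" naming scheme. A minimal sketch of what a custom engine could look like against this contract, using an invented class name and a plain in-memory map in place of a real store (illustration only, not part of the patch):

    package org.example.recovery

    import scala.collection.mutable
    import scala.reflect.ClassTag

    import org.apache.spark.deploy.master.PersistenceEngine

    // Hypothetical engine: keeps name -> object pairs in memory. How objects are
    // serialized is left to each implementation; this sketch just holds references.
    class InMemoryPersistenceEngine extends PersistenceEngine {

      private val store = mutable.Map.empty[String, Object]

      override def persist(name: String, obj: Object): Unit = {
        store(name) = obj
      }

      override def unpersist(name: String): Unit = {
        store -= name
      }

      // Must return every stored object whose name starts with `prefix`; the final
      // helpers on the trait rely on the "app_", "worker_" and "driver_" prefixes.
      override def read[T: ClassTag](prefix: String): Seq[T] = {
        store.collect { case (name, obj) if name.startsWith(prefix) => obj.asInstanceOf[T] }.toSeq
      }
    }

In practice such an engine would be handed to the Master through a StandaloneRecoveryModeFactory implementation (createPersistenceEngine/createLeaderElectionAgent above); the wiring of a custom factory is outside the scope of this diff.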