From 5e8474e32d81d794d2d7eca7cfa7c81c9be689aa Mon Sep 17 00:00:00 2001 From: Alex Man Date: Thu, 27 Oct 2016 15:46:57 -0700 Subject: [PATCH] LIVY-212. Implemented session recovery for interactive session. Only works with YARN. livy-server can now recover interactive sessions after restart. There are some special cases: - If livy-server crashes while an interactive session is starting, there's a chance the session is unrecoverable, depending on timing (whether livy-repl has registered with livy-server). - If livy-server is down longer than the value of server.idle_timeout (default: 10min), livy-repl will time out and quit. Note: All previous statements are lost after recovery. This will be fixed in a different commit. Closes #208 --- .../livy/client/http/HttpClientSpec.scala | 8 +- .../framework/BaseIntegrationTestSuite.scala | 6 +- .../livy/test/framework/LivyRestClient.scala | 12 +- .../cloudera/livy/test/InteractiveIT.scala | 35 +- .../java/com/cloudera/livy/rsc/RSCClient.java | 10 + .../com/cloudera/livy/server/LivyServer.scala | 7 +- .../cloudera/livy/server/SessionServlet.scala | 5 +- .../server/batch/BatchSessionServlet.scala | 6 +- .../interactive/InteractiveSession.scala | 484 +++++++++++------- .../InteractiveSessionServlet.scala | 19 +- .../server/recovery/ZooKeeperStateStore.scala | 7 +- .../livy/sessions/SessionManager.scala | 123 ++--- .../livy/server/BaseSessionServletSpec.scala | 7 +- .../livy/server/SessionServletSpec.scala | 17 +- .../livy/server/batch/BatchServletSpec.scala | 9 +- .../BaseInteractiveServletSpec.scala | 5 +- .../InteractiveSessionServletSpec.scala | 14 +- .../interactive/InteractiveSessionSpec.scala | 61 ++- .../livy/server/interactive/JobApiSpec.scala | 16 +- .../livy/sessions/SessionManagerSpec.scala | 8 +- 20 files changed, 548 insertions(+), 311 deletions(-) diff --git a/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala index 
c8403d2f1..b354d0cc7 100644 --- a/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala @@ -40,7 +40,8 @@ import com.cloudera.livy.client.common.{BufferUtils, Serializer} import com.cloudera.livy.client.common.HttpMessages._ import com.cloudera.livy.server.WebServer import com.cloudera.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet} -import com.cloudera.livy.sessions.{SessionState, Spark} +import com.cloudera.livy.server.recovery.SessionStore +import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState, Spark} import com.cloudera.livy.test.jobs.Echo import com.cloudera.livy.utils.AppInfo @@ -264,7 +265,10 @@ private class HttpClientTestBootstrap extends LifeCycle { private implicit def executor: ExecutionContext = ExecutionContext.global override def init(context: ServletContext): Unit = { - val servlet = new InteractiveSessionServlet(new LivyConf()) { + val conf = new LivyConf() + val stateStore = mock(classOf[SessionStore]) + val sessionManager = new InteractiveSessionManager(conf, stateStore, Some(Seq.empty)) + val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf) { override protected def createSession(req: HttpServletRequest): InteractiveSession = { val session = mock(classOf[InteractiveSession]) val id = sessionManager.nextId() diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala index b78621517..bca80ad53 100644 --- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala +++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala @@ -91,7 +91,11 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo } catch { case NonFatal(_) => 
} throw e } finally { - try { s.stop() } catch { case NonFatal(_) => } + try { + s.stop() + } catch { + case NonFatal(e) => alert(s"Failed to stop session: $e") + } } } diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala index c3533ffda..58037a016 100644 --- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala +++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala @@ -99,7 +99,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) // Keeping the original timeout to avoid slowing down local development. eventually(timeout(t), interval(1 second)) { val s = snapshot().state - assert(strStates.contains(s), s"Session state $s doesn't equal one of $strStates") + assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates") } } @@ -145,8 +145,10 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) val data = output("data").asInstanceOf[Map[String, Any]] Left(data("text/plain").asInstanceOf[String]) case Some("error") => Right(mapper.convertValue(output, classOf[StatementError])) - case Some(status) => throw new Exception(s"Unknown statement status: $status") - case None => throw new Exception(s"Unknown statement output: $newStmt") + case Some(status) => + throw new IllegalStateException(s"Unknown statement $stmtId status: $status") + case None => + throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt") } } } @@ -158,7 +160,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String) matchStrings(result, expectedRegex) } case Right(error) => - assert(false, s"Got error from statement $code: ${error.evalue}") + assert(false, s"Got error from statement $stmtId $code: ${error.evalue}") } } @@ -166,7 +168,7 @@ class LivyRestClient(val 
httpClient: AsyncHttpClient, val livyEndpoint: String) ename: String = null, evalue: String = null, stackTrace: String = null): Unit = { result() match { case Left(result) => - assert(false, s"Statement `$code` expected to fail, but succeeded.") + assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.") case Right(error) => val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n") Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach { diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala index 0e4ec3e63..dc9c520fa 100644 --- a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala @@ -126,11 +126,42 @@ class InteractiveIT extends BaseIntegrationTestSuite { } } + test("recover interactive session") { + withNewSession(Spark()) { s => + s.run("1").verifyResult("res0: Int = 1") + + // Restart Livy. + cluster.stopLivy() + cluster.runLivy() + + // Verify session still exists. + s.verifySessionIdle() + s.run("2").verifyResult("res1: Int = 2") + // TODO, verify previous statement results still exist. + + s.stop() + + // Restart Livy. + cluster.stopLivy() + cluster.runLivy() + + // Verify deleted session doesn't show up after recovery. + s.verifySessionDoesNotExist() + + // Verify new session doesn't reuse old session id. 
+ withNewSession(Spark(), Map.empty, false) { s1 => + s1.id should be > s.id + } + } + } + private def withNewSession[R] - (kind: Kind, sparkConf: Map[String, String] = Map.empty) + (kind: Kind, sparkConf: Map[String, String] = Map.empty, waitForIdle: Boolean = true) (f: (LivyRestClient#InteractiveSession) => R): R = { withSession(livyClient.startSession(kind, sparkConf)) { s => - s.verifySessionIdle() + if (waitForIdle) { + s.verifySessionIdle() + } f(s) } } diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java index 244afd60d..525c9a6c1 100644 --- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java @@ -57,6 +57,7 @@ public class RSCClient implements LivyClient { private final Promise driverRpc; private final int executorGroupId; private final EventLoopGroup eventLoopGroup; + private final Promise serverUriPromise; private ContextInfo contextInfo; private volatile boolean isAlive; @@ -71,16 +72,21 @@ public class RSCClient implements LivyClient { this.eventLoopGroup = new NioEventLoopGroup( conf.getInt(RPC_MAX_THREADS), Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d")); + this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise(); Utils.addListener(this.contextInfoPromise, new FutureListener() { @Override public void onSuccess(ContextInfo info) throws Exception { connectToContext(info); + String url = String.format("rsc://%s:%s@%s:%d", + info.clientId, info.secret, info.remoteAddress, info.remotePort); + serverUriPromise.setSuccess(URI.create(url)); } @Override public void onFailure(Throwable error) { connectionError(error); + serverUriPromise.setFailure(error); } }); @@ -174,6 +180,10 @@ public void onFailure(Throwable error) throws Exception { return promise; } + public Future getServerUri() { + return serverUriPromise; + } + @Override public JobHandle submit(Job job) { return protocol.submit(job); 
diff --git a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala index 3b9a377e4..26421b32d 100644 --- a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala +++ b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala @@ -36,7 +36,7 @@ import com.cloudera.livy._ import com.cloudera.livy.server.batch.BatchSessionServlet import com.cloudera.livy.server.interactive.InteractiveSessionServlet import com.cloudera.livy.server.recovery.{SessionStore, StateStore} -import com.cloudera.livy.sessions.BatchSessionManager +import com.cloudera.livy.sessions.{BatchSessionManager, InteractiveSessionManager} import com.cloudera.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF import com.cloudera.livy.utils.LivySparkUtils._ import com.cloudera.livy.utils.SparkYarnApp @@ -104,6 +104,7 @@ class LivyServer extends Logging { StateStore.init(livyConf) val sessionStore = new SessionStore(livyConf) val batchSessionManager = new BatchSessionManager(livyConf, sessionStore) + val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore) server = new WebServer(livyConf, host, port) server.context.setResourceBase("src/main/com/cloudera/livy/server") @@ -125,7 +126,9 @@ class LivyServer extends Logging { val context = sce.getServletContext() context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT) - mount(context, new InteractiveSessionServlet(livyConf), "/sessions/*") + val interactiveServlet = + new InteractiveSessionServlet(interactiveSessionManager, sessionStore, livyConf) + mount(context, interactiveServlet, "/sessions/*") val batchServlet = new BatchSessionServlet(batchSessionManager, sessionStore, livyConf) mount(context, batchServlet, "/batches/*") diff --git a/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala index d7c328460..2e8bbd420 100644 --- 
a/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala @@ -27,6 +27,7 @@ import org.scalatra._ import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.sessions.{Session, SessionManager} +import com.cloudera.livy.sessions.Session.RecoveryMetadata object SessionServlet extends Logging @@ -37,8 +38,8 @@ object SessionServlet extends Logging * Type parameters: * S: the session type */ -abstract class SessionServlet[S <: Session]( - private[livy] val sessionManager: SessionManager[S], +abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( + private[livy] val sessionManager: SessionManager[S, R], livyConf: LivyConf) extends JsonServlet with ApiVersioningSupport diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala index 8f10fd5bb..f14554207 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala @@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest import com.cloudera.livy.LivyConf import com.cloudera.livy.server.SessionServlet import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.SessionManager +import com.cloudera.livy.sessions.BatchSessionManager import com.cloudera.livy.utils.AppInfo case class BatchSessionView( @@ -34,10 +34,10 @@ case class BatchSessionView( log: Seq[String]) class BatchSessionServlet( - sessionManager: SessionManager[BatchSession], + sessionManager: BatchSessionManager, sessionStore: SessionStore, livyConf: LivyConf) - extends SessionServlet[BatchSession](sessionManager, livyConf) + extends SessionServlet(sessionManager, livyConf) { override protected def createSession(req: HttpServletRequest): BatchSession = { diff --git 
a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala index 44396c92b..94f66fcec 100644 --- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala @@ -30,54 +30,228 @@ import scala.collection.mutable import scala.concurrent.{Future, _} import scala.util.{Failure, Random, Success, Try} +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.apache.spark.launcher.SparkLauncher import org.json4s._ -import org.json4s.{DefaultFormats, Formats, JValue} import org.json4s.JsonAST.JString import org.json4s.jackson.JsonMethods._ import com.cloudera.livy._ import com.cloudera.livy.client.common.HttpMessages._ import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf} +import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions._ +import com.cloudera.livy.sessions.Session._ +import com.cloudera.livy.sessions.SessionState.Dead import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener} -object InteractiveSession { - val LivyReplJars = "livy.repl.jars" - val SparkYarnIsPython = "spark.yarn.isPython" -} - -class InteractiveSession( +@JsonIgnoreProperties(ignoreUnknown = true) +case class InteractiveRecoveryMetadata( id: Int, + appId: Option[String], + appTag: String, + kind: Kind, owner: String, - override val proxyUser: Option[String], - livyConf: LivyConf, - request: CreateInteractiveRequest, - mockApp: Option[SparkApp] = None) // For unit test. 
- extends Session(id, owner, livyConf) - with SparkAppListener { - - import Session._ - import InteractiveSession._ - - private implicit def jsonFormats: Formats = DefaultFormats + proxyUser: Option[String], + rscDriverUri: Option[URI], + version: Int = 1) + extends RecoveryMetadata + +object InteractiveSession extends Logging { + private[interactive] val LIVY_REPL_JARS = "livy.repl.jars" + private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython" + + val RECOVERY_SESSION_TYPE = "interactive" + + def create( + id: Int, + owner: String, + proxyUser: Option[String], + livyConf: LivyConf, + request: CreateInteractiveRequest, + sessionStore: SessionStore, + mockApp: Option[SparkApp] = None, + mockClient: Option[RSCClient] = None): InteractiveSession = { + val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}" + + val client = mockClient.orElse { + val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( + request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) + + val builderProperties = prepareBuilderProp(conf, request.kind, livyConf) + + val userOpts: Map[String, Option[String]] = Map( + "spark.driver.cores" -> request.driverCores.map(_.toString), + SparkLauncher.DRIVER_MEMORY -> request.driverMemory.map(_.toString), + SparkLauncher.EXECUTOR_CORES -> request.executorCores.map(_.toString), + SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString), + "spark.executor.instances" -> request.numExecutors.map(_.toString) + ) + + userOpts.foreach { case (key, opt) => + opt.foreach { value => builderProperties.put(key, value) } + } - private var _state: SessionState = SessionState.Starting() + info(s"Creating LivyClient for sessionId: $id") + val builder = new LivyClientBuilder() + .setAll(builderProperties.asJava) + .setConf("spark.app.name", s"livy-session-$id") + .setConf("livy.client.sessionId", id.toString) + .setConf(RSCConf.Entry.DRIVER_CLASS.key(), 
"com.cloudera.livy.repl.ReplDriver") + .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull) + .setURI(new URI("rsc:/")) - private val operations = mutable.Map[Long, String]() - private val operationCounter = new AtomicLong(0) + Option(builder.build().asInstanceOf[RSCClient]) + } - val kind = request.kind + new InteractiveSession( + id, + None, + appTag, + client, + SessionState.Starting(), + request.kind, + livyConf, + owner, + proxyUser, + sessionStore, + mockApp) + } - private val (client: RSCClient, app: Option[SparkApp]) = { - val uniqueAppTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}" + def recover( + metadata: InteractiveRecoveryMetadata, + livyConf: LivyConf, + sessionStore: SessionStore, + mockApp: Option[SparkApp] = None, + mockClient: Option[RSCClient] = None): InteractiveSession = { + val client = mockClient.orElse(metadata.rscDriverUri.map { uri => + val builder = new LivyClientBuilder().setURI(uri) + builder.build().asInstanceOf[RSCClient] + }) + + new InteractiveSession( + metadata.id, + metadata.appId, + metadata.appTag, + client, + SessionState.Recovering(), + metadata.kind, + livyConf, + metadata.owner, + metadata.proxyUser, + sessionStore, + mockApp) + } - val conf = SparkApp.prepareSparkConf(uniqueAppTag, livyConf, prepareConf( - request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf)) + private def prepareBuilderProp( + conf: Map[String, String], + kind: Kind, + livyConf: LivyConf): mutable.Map[String, String] = { val builderProperties = mutable.Map[String, String]() builderProperties ++= conf + def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = { + Option(livyConf.get(LIVY_REPL_JARS)).map(_.split(",").toList).getOrElse { + val home = sys.env("LIVY_HOME") + val jars = Option(new File(home, s"repl_$scalaVersion-jars")) + .filter(_.isDirectory()) + .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars")) + require(jars.isDirectory(), "Cannot find Livy REPL 
jars.") + jars.listFiles().map(_.getAbsolutePath()).toList + } + } + + def findSparkRArchive(): Option[String] = { + Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse { + sys.env.get("SPARK_HOME").map { case sparkHome => + val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator) + val rArchivesFile = new File(path) + require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.") + rArchivesFile.getAbsolutePath() + } + } + } + + def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = { + if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", "false").toBoolean) { + // datanucleus jars has already been in classpath in integration test + Seq.empty + } else { + val sparkHome = livyConf.sparkHome().get + val libdir = sparkMajorVersion match { + case 1 => + if (new File(sparkHome, "RELEASE").isFile) { + new File(sparkHome, "lib") + } else { + new File(sparkHome, "lib_managed/jars") + } + case 2 => + if (new File(sparkHome, "RELEASE").isFile) { + new File(sparkHome, "jars") + } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { + new File(sparkHome, "assembly/target/scala-2.11/jars") + } else { + new File(sparkHome, "assembly/target/scala-2.10/jars") + } + case v => + throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion) + } + val jars = if (!libdir.isDirectory) { + Seq.empty[String] + } else { + libdir.listFiles().filter(_.getName.startsWith("datanucleus-")) + .map(_.getAbsolutePath).toSeq + } + if (jars.isEmpty) { + warn("datanucleus jars can not be found") + } + jars + } + } + + /** + * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf) + * 1. First look for hive-site.xml in user request + * 2. 
Then look for that under classpath + * @param livyConf + * @return (hive-site.xml path, whether it is provided by user) + */ + def hiveSiteFile(sparkFiles: Array[String], livyConf: LivyConf): (Option[File], Boolean) = { + if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) { + (None, true) + } else { + val hiveSiteURL = getClass.getResource("/hive-site.xml") + if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") { + (Some(new File(hiveSiteURL.toURI)), false) + } else { + (None, false) + } + } + } + + def findPySparkArchives(): Seq[String] = { + Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES)) + .map(_.split(",").toSeq) + .getOrElse { + sys.env.get("SPARK_HOME") .map { case sparkHome => + val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + require(pyArchivesFile.exists(), + "pyspark.zip not found; cannot run pyspark application in YARN mode.") + + val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip") + .iterator() + .next() + .toFile + + require(py4jFile.exists(), + "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.") + Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath) + }.getOrElse(Seq()) + } + } + def mergeConfList(list: Seq[String], key: String): Unit = { if (list.nonEmpty) { builderProperties.get(key) match { @@ -111,7 +285,7 @@ class InteractiveSession( case PySpark() | PySpark3() => val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES) - builderProperties.put(SparkYarnIsPython, "true") + builderProperties.put(SPARK_YARN_IS_PYTHON, "true") case SparkR() => val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None sparkRArchive.foreach { archive => @@ -145,83 +319,100 @@ class InteractiveSession( mergeHiveSiteAndHiveDeps(sparkMajorVersion) } - val userOpts: Map[String, Option[String]] = Map( - "spark.driver.cores" 
-> request.driverCores.map(_.toString), - SparkLauncher.DRIVER_MEMORY -> request.driverMemory.map(_.toString), - SparkLauncher.EXECUTOR_CORES -> request.executorCores.map(_.toString), - SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString), - "spark.executor.instances" -> request.numExecutors.map(_.toString) - ) - - userOpts.foreach { case (key, opt) => - opt.foreach { value => builderProperties.put(key, value) } - } - - info(s"Creating LivyClient for sessionId: $id") - val builder = new LivyClientBuilder() - .setAll(builderProperties.asJava) - .setConf("spark.app.name", s"livy-session-$id") - .setConf("livy.client.sessionId", id.toString) - .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "com.cloudera.livy.repl.ReplDriver") - .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull) - .setURI(new URI("rsc:/")) - val client = builder.build().asInstanceOf[RSCClient] - - val app = mockApp.orElse { - if (livyConf.isRunningOnYarn()) { - // When Livy is running with YARN, SparkYarnApp can provide better YARN integration. - // (e.g. Reflect YARN application state to session state). - Option(SparkApp.create(uniqueAppTag, None, None, livyConf, Some(this))) - } else { - // When Livy is running with other cluster manager, SparkApp doesn't provide any additional - // benefit over controlling RSCDriver using RSCClient. Don't use it. - None - } - } - (client, app) + builderProperties } +} - // Send a dummy job that will return once the client is ready to be used, and set the - // state to "idle" at that point. 
- client.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() { - override def onJobQueued(job: JobHandle[Void]): Unit = { } - override def onJobStarted(job: JobHandle[Void]): Unit = { } +class InteractiveSession( + id: Int, + appIdHint: Option[String], + appTag: String, + client: Option[RSCClient], + initialState: SessionState, + val kind: Kind, + livyConf: LivyConf, + owner: String, + override val proxyUser: Option[String], + sessionStore: SessionStore, + mockApp: Option[SparkApp]) // For unit test. + extends Session(id, owner, livyConf) + with SparkAppListener { - override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut() + import InteractiveSession._ - override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut() + private var _state: SessionState = initialState - override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = { - transition(SessionState.Idle()) + private val operations = mutable.Map[Long, String]() + private val operationCounter = new AtomicLong(0) + private var rscDriverUri: Option[URI] = None + private var sessionLog: IndexedSeq[String] = IndexedSeq.empty + private val sessionSaveLock = new Object() + + _appId = appIdHint + sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + + private val app = mockApp.orElse { + if (livyConf.isRunningOnYarn()) { + // When Livy is running with YARN, SparkYarnApp can provide better YARN integration. + // (e.g. Reflect YARN application state to session state). + Option(SparkApp.create(appTag, appId, None, livyConf, Some(this))) + } else { + // When Livy is running with other cluster manager, SparkApp doesn't provide any + // additional benefit over controlling RSCDriver using RSCClient. Don't use it. + None } + } - private def errorOut(): Unit = { - // Other code might call stop() to close the RPC channel. When RPC channel is closing, - // this callback might be triggered. 
Check and don't call stop() to avoid nested called - // if the session is already shutting down. - if (_state != SessionState.ShuttingDown()) { - transition(SessionState.Error()) - stop() + if (client.isEmpty) { + transition(Dead()) + val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown." + info(msg) + sessionLog = IndexedSeq(msg) + } else { + // Send a dummy job that will return once the client is ready to be used, and set the + // state to "idle" at that point. + client.get.submit(new PingJob()).addListener(new JobHandle.Listener[Void]() { + override def onJobQueued(job: JobHandle[Void]): Unit = { } + override def onJobStarted(job: JobHandle[Void]): Unit = { } + + override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut() + + override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut() + + override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = { + rscDriverUri = Option(client.get.getServerUri.get()) + sessionSaveLock.synchronized { + sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + } + transition(SessionState.Idle()) } - } - }) + private def errorOut(): Unit = { + // Other code might call stop() to close the RPC channel. When RPC channel is closing, + // this callback might be triggered. Check and don't call stop() to avoid nested called + // if the session is already shutting down. 
+ if (_state != SessionState.ShuttingDown()) { + transition(SessionState.Error()) + stop() + } + } + }) + } private[this] var _executedStatements = 0 private[this] var _statements = IndexedSeq[Statement]() - override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(IndexedSeq.empty) + override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog) - override def recoveryMetadata: RecoveryMetadata = { - throw new NotImplementedError("TODO") - } + override def recoveryMetadata: RecoveryMetadata = + InteractiveRecoveryMetadata(id, appId, appTag, kind, owner, proxyUser, rscDriverUri) override def state: SessionState = _state override def stopSession(): Unit = { try { transition(SessionState.ShuttingDown()) - client.stop(true) + client.foreach { _.stop(true) } } catch { case _: Exception => app.foreach { @@ -245,7 +436,7 @@ class InteractiveSession( recordActivity() val future = Future { - val id = client.submitReplCode(content.code) + val id = client.get.submitReplCode(content.code) waitForStatement(id) } @@ -274,31 +465,36 @@ class InteractiveSession( } def addFile(uri: URI): Unit = { + ensureRunning() recordActivity() - client.addFile(resolveURI(uri, livyConf)).get() + client.get.addFile(resolveURI(uri, livyConf)).get() } def addJar(uri: URI): Unit = { + ensureRunning() recordActivity() - client.addJar(resolveURI(uri, livyConf)).get() + client.get.addJar(resolveURI(uri, livyConf)).get() } def jobStatus(id: Long): Any = { + ensureRunning() val clientJobId = operations(id) recordActivity() // TODO: don't block indefinitely? 
- val status = client.getBypassJobStatus(clientJobId).get() + val status = client.get.getBypassJobStatus(clientJobId).get() new JobStatus(id, status.state, status.result, status.error) } def cancelJob(id: Long): Unit = { + ensureRunning() recordActivity() - operations.remove(id).foreach { client.cancel } + operations.remove(id).foreach { client.get.cancel } } @tailrec private def waitForStatement(id: String): JValue = { - Try(client.getReplJobResult(id).get()) match { + ensureRunning() + Try(client.get.getReplJobResult(id).get()) match { case Success(null) => Thread.sleep(1000) waitForStatement(id) @@ -309,7 +505,7 @@ class InteractiveSession( // it's still running. result \ "status" match { case JString("error") => - val state = client.getReplState().get() match { + val state = client.get.getReplState().get() match { case "error" => SessionState.Error() case _ => SessionState.Idle() } @@ -327,107 +523,6 @@ class InteractiveSession( } } - private def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = { - Option(livyConf.get(LivyReplJars)).map(_.split(",").toList).getOrElse { - val home = sys.env("LIVY_HOME") - val jars = Option(new File(home, s"repl_$scalaVersion-jars")) - .filter(_.isDirectory()) - .getOrElse(new File(home, s"repl/scala-$scalaVersion/target/jars")) - require(jars.isDirectory(), "Cannot find Livy REPL jars.") - jars.listFiles().map(_.getAbsolutePath()).toList - } - } - - private def findSparkRArchive(): Option[String] = { - Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse { - sys.env.get("SPARK_HOME").map { case sparkHome => - val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator) - val rArchivesFile = new File(path) - require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.") - rArchivesFile.getAbsolutePath() - } - } - } - - private def datanucleusJars(livyConf: LivyConf, sparkMajorVersion: Int): Seq[String] = { - if (sys.env.getOrElse("LIVY_INTEGRATION_TEST", 
"false").toBoolean) { - // datanucleus jars has already been in classpath in integration test - Seq.empty - } else { - val sparkHome = livyConf.sparkHome().get - val libdir = sparkMajorVersion match { - case 1 => - if (new File(sparkHome, "RELEASE").isFile) { - new File(sparkHome, "lib") - } else { - new File(sparkHome, "lib_managed/jars") - } - case 2 => - if (new File(sparkHome, "RELEASE").isFile) { - new File(sparkHome, "jars") - } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { - new File(sparkHome, "assembly/target/scala-2.11/jars") - } else { - new File(sparkHome, "assembly/target/scala-2.10/jars") - } - case v => throw new RuntimeException("Unsupported spark major version:" + sparkMajorVersion) - } - val jars = if (!libdir.isDirectory) { - Seq.empty[String] - } else { - libdir.listFiles().filter(_.getName.startsWith("datanucleus-")) - .map(_.getAbsolutePath).toSeq - } - if (jars.isEmpty) { - warn("datanucleus jars can not be found") - } - jars - } - } - - /** - * Look for hive-site.xml (for now just ignore spark.files defined in spark-defaults.conf) - * 1. First look for hive-site.xml in user request - * 2. 
Then look for that under classpath - * @param livyConf - * @return (hive-site.xml path, whether it is provided by user) - */ - private def hiveSiteFile(sparkFiles: Array[String], - livyConf: LivyConf): (Option[File], Boolean) = { - if (sparkFiles.exists(_.split("/").last == "hive-site.xml")) { - (None, true) - } else { - val hiveSiteURL = getClass.getResource("/hive-site.xml") - if (hiveSiteURL != null && hiveSiteURL.getProtocol == "file") { - (Some(new File(hiveSiteURL.toURI)), false) - } else { - (None, false) - } - } - } - - private def findPySparkArchives(): Seq[String] = { - Option(livyConf.get(RSCConf.Entry.PYSPARK_ARCHIVES)) - .map(_.split(",").toSeq) - .getOrElse { - sys.env.get("SPARK_HOME") .map { case sparkHome => - val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) - val pyArchivesFile = new File(pyLibPath, "pyspark.zip") - require(pyArchivesFile.exists(), - "pyspark.zip not found; cannot run pyspark application in YARN mode.") - - val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip") - .iterator() - .next() - .toFile - - require(py4jFile.exists(), - "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.") - Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath) - }.getOrElse(Seq()) - } - } - private def transition(state: SessionState) = synchronized { // When a statement returns an error, the session should transit to error state. // If the session crashed because of the error, the session should instead go to dead state. 
@@ -450,7 +545,7 @@ class InteractiveSession( private def performOperation(job: Array[Byte], sync: Boolean): Long = { ensureRunning() recordActivity() - val future = client.bypass(ByteBuffer.wrap(job), sync) + val future = client.get.bypass(ByteBuffer.wrap(job), sync) val opId = operationCounter.incrementAndGet() operations(opId) = future opId @@ -458,6 +553,9 @@ class InteractiveSession( override def appIdKnown(appId: String): Unit = { _appId = Option(appId) + sessionSaveLock.synchronized { + sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) + } } override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala index 6f58ae72e..26f1dcca9 100644 --- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala @@ -34,14 +34,16 @@ import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyConf, Logging} import com.cloudera.livy.client.common.HttpMessages import com.cloudera.livy.client.common.HttpMessages._ import com.cloudera.livy.server.SessionServlet +import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions._ object InteractiveSessionServlet extends Logging -class InteractiveSessionServlet(livyConf: LivyConf) - extends SessionServlet[InteractiveSession]( - new SessionManager[InteractiveSession](livyConf), - livyConf) +class InteractiveSessionServlet( + sessionManager: InteractiveSessionManager, + sessionStore: SessionStore, + livyConf: LivyConf) + extends SessionServlet(sessionManager, livyConf) with FileUploadSupport { @@ -51,8 +53,13 @@ class InteractiveSessionServlet(livyConf: LivyConf) override protected def createSession(req: HttpServletRequest): InteractiveSession = { val 
createRequest = bodyAs[CreateInteractiveRequest](req) val proxyUser = checkImpersonation(createRequest.proxyUser, req) - new InteractiveSession(sessionManager.nextId(), remoteUser(req), proxyUser, livyConf, - createRequest) + InteractiveSession.create( + sessionManager.nextId(), + remoteUser(req), + proxyUser, + livyConf, + createRequest, + sessionStore) } override protected[interactive] def clientSessionView( diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala index 0f5292aa1..883383590 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala @@ -23,6 +23,7 @@ import scala.reflect.ClassTag import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.framework.api.UnhandledErrorListener import org.apache.curator.retry.RetryNTimes +import org.apache.zookeeper.KeeperException.NoNodeException import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.LivyConf.Entry @@ -105,7 +106,11 @@ class ZooKeeperStateStore( } override def remove(key: String): Unit = { - curatorClient.delete().guaranteed().forPath(prefixKey(key)) + try { + curatorClient.delete().guaranteed().forPath(prefixKey(key)) + } catch { + case _: NoNodeException => + } } private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala index 39b8d634c..ac0561fb6 100644 --- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala @@ -24,10 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable import scala.concurrent.{Await, 
ExecutionContext, Future} import scala.concurrent.duration.Duration +import scala.reflect.ClassTag +import scala.util.control.NonFatal import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession} +import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession} import com.cloudera.livy.server.recovery.SessionStore +import com.cloudera.livy.sessions.Session.RecoveryMetadata object SessionManager { val SESSION_RECOVERY_MODE_OFF = "off" @@ -35,66 +39,34 @@ object SessionManager { val SESSION_TIMEOUT = LivyConf.Entry("livy.server.session.timeout", "1h") } -// TODO Replace SessionManager with this class when interactive sessions support recovery. class BatchSessionManager( livyConf: LivyConf, sessionStore: SessionStore, mockSessions: Option[Seq[BatchSession]] = None) - extends SessionManager[BatchSession](livyConf) { + extends SessionManager[BatchSession, BatchRecoveryMetadata] ( + livyConf, BatchSession.recover(_, livyConf, sessionStore), sessionStore, "batch", mockSessions) + +class InteractiveSessionManager( + livyConf: LivyConf, + sessionStore: SessionStore, + mockSessions: Option[Seq[InteractiveSession]] = None) + extends SessionManager[InteractiveSession, InteractiveRecoveryMetadata] ( + livyConf, + InteractiveSession.recover(_, livyConf, sessionStore), + sessionStore, + "interactive", + mockSessions) + +class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( + livyConf: LivyConf, + sessionRecovery: R => S, + sessionStore: SessionStore, + sessionType: String, + mockSessions: Option[Seq[S]] = None) + extends Logging { import SessionManager._ - private val sessionType: String = "batch" - - mockSessions.getOrElse(recover()).foreach(register) - - override def nextId(): Int = synchronized { - val id = idCounter.getAndIncrement() - sessionStore.saveNextSessionId(sessionType, idCounter.get()) - id - } - - override def delete(session: BatchSession): Future[Unit] = { - 
session.stop().map { case _ => - sessionStore.remove(sessionType, session.id) - synchronized { - sessions.remove(session.id) - } - } - } - - override def shutdown(): Unit = { - val recoveryEnabled = livyConf.get(LivyConf.RECOVERY_MODE) != SESSION_RECOVERY_MODE_OFF - if (!recoveryEnabled) { - super.shutdown() - } - } - - private def recover(): Seq[BatchSession] = { - // Recover next session id from state store and create SessionManager. - idCounter.set(sessionStore.getNextSessionId(sessionType)) - - // Retrieve session recovery metadata from state store. - val sessionMetadata = sessionStore.getAllSessions[BatchRecoveryMetadata](sessionType) - - // Recover session from session recovery metadata. - val recoveredSessions = sessionMetadata.flatMap(_.toOption).map( - BatchSession.recover(_, livyConf, sessionStore)) - - info(s"Recovered ${recoveredSessions.length} $sessionType sessions." + - s" Next session id: $idCounter") - - // Print recovery error. - val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get) - recoveryFailure.foreach(ex => error(ex.getMessage, ex.getCause)) - - recoveredSessions - } -} - -class SessionManager[S <: Session](private val livyConf: LivyConf) - extends Logging { - protected implicit def executor: ExecutionContext = ExecutionContext.global protected[this] final val idCounter = new AtomicInteger(0) @@ -103,9 +75,14 @@ class SessionManager[S <: Session](private val livyConf: LivyConf) private[this] final val sessionTimeout = TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(SessionManager.SESSION_TIMEOUT)) + mockSessions.getOrElse(recover()).foreach(register) new GarbageCollector().start() - def nextId(): Int = idCounter.getAndIncrement() + def nextId(): Int = synchronized { + val id = idCounter.getAndIncrement() + sessionStore.saveNextSessionId(sessionType, idCounter.get()) + id + } def register(session: S): S = { info(s"Registering new session ${session.id}") @@ -127,15 +104,25 @@ class SessionManager[S <: Session](private 
val livyConf: LivyConf) def delete(session: S): Future[Unit] = { session.stop().map { case _ => - synchronized { - sessions.remove(session.id) + try { + sessionStore.remove(sessionType, session.id) + synchronized { + sessions.remove(session.id) + } + } catch { + case NonFatal(e) => + error("Exception was thrown during stop session:", e) + throw e } } } def shutdown(): Unit = { - sessions.values.map(_.stop).foreach { future => - Await.ready(future, Duration.Inf) + val recoveryEnabled = livyConf.get(LivyConf.RECOVERY_MODE) != SESSION_RECOVERY_MODE_OFF + if (!recoveryEnabled) { + sessions.values.map(_.stop).foreach { future => + Await.ready(future, Duration.Inf) + } } } @@ -148,6 +135,26 @@ class SessionManager[S <: Session](private val livyConf: LivyConf) Future.sequence(all().filter(expired).map(delete)) } + private def recover(): Seq[S] = { + // Recover next session id from state store and create SessionManager. + idCounter.set(sessionStore.getNextSessionId(sessionType)) + + // Retrieve session recovery metadata from state store. + val sessionMetadata = sessionStore.getAllSessions[R](sessionType) + + // Recover session from session recovery metadata. + val recoveredSessions = sessionMetadata.flatMap(_.toOption).map(sessionRecovery) + + info(s"Recovered ${recoveredSessions.length} $sessionType sessions." + + s" Next session id: $idCounter") + + // Print recovery error. 
+ val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get) + recoveryFailure.foreach(ex => error(ex.getMessage, ex.getCause)) + + recoveredSessions + } + private class GarbageCollector extends Thread("session gc thread") { setDaemon(true) diff --git a/server/src/test/scala/com/cloudera/livy/server/BaseSessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/BaseSessionServletSpec.scala index 3efe70d1f..86729cefa 100644 --- a/server/src/test/scala/com/cloudera/livy/server/BaseSessionServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/BaseSessionServletSpec.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterAll import com.cloudera.livy.LivyConf import com.cloudera.livy.sessions.Session +import com.cloudera.livy.sessions.Session.RecoveryMetadata object BaseSessionServletSpec { @@ -32,7 +33,7 @@ object BaseSessionServletSpec { } -abstract class BaseSessionServletSpec[S <: Session] +abstract class BaseSessionServletSpec[S <: Session, R <: RecoveryMetadata] extends BaseJsonServletSpec with BeforeAndAfterAll { @@ -62,7 +63,7 @@ abstract class BaseSessionServletSpec[S <: Session] servlet.shutdown() } - def createServlet(): SessionServlet[S] + def createServlet(): SessionServlet[S, R] protected val servlet = createServlet() @@ -73,7 +74,7 @@ abstract class BaseSessionServletSpec[S <: Session] } trait RemoteUserOverride { - this: SessionServlet[_] => + this: SessionServlet[_, _] => override protected def remoteUser(req: HttpServletRequest): String = { req.getHeader(BaseSessionServletSpec.REMOTE_USER_HEADER) diff --git a/server/src/test/scala/com/cloudera/livy/server/SessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/SessionServletSpec.scala index fa62ed635..73ba90c74 100644 --- a/server/src/test/scala/com/cloudera/livy/server/SessionServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/SessionServletSpec.scala @@ -22,7 +22,10 @@ package com.cloudera.livy.server import 
javax.servlet.http.HttpServletRequest import javax.servlet.http.HttpServletResponse._ +import org.scalatest.mock.MockitoSugar.mock + import com.cloudera.livy.LivyConf +import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions.{Session, SessionManager, SessionState} import com.cloudera.livy.sessions.Session.RecoveryMetadata @@ -52,14 +55,20 @@ object SessionServletSpec { } class SessionServletSpec - extends BaseSessionServletSpec[Session] { + extends BaseSessionServletSpec[Session, RecoveryMetadata] { import SessionServletSpec._ - override def createServlet(): SessionServlet[Session] = { + override def createServlet(): SessionServlet[Session, RecoveryMetadata] = { val conf = createConf() - val sessionManager = new SessionManager[Session](conf) - new SessionServlet[Session](sessionManager, conf) with RemoteUserOverride { + val sessionManager = new SessionManager[Session, RecoveryMetadata]( + conf, + { _ => assert(false).asInstanceOf[Session] }, + mock[SessionStore], + "test", + Some(Seq.empty)) + + new SessionServlet(sessionManager, conf) with RemoteUserOverride { override protected def createSession(req: HttpServletRequest): Session = { val params = bodyAs[Map[String, String]](req) checkImpersonation(params.get(PROXY_USER), req) diff --git a/server/src/test/scala/com/cloudera/livy/server/batch/BatchServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/batch/BatchServletSpec.scala index 0750d3e34..ae75d46d2 100644 --- a/server/src/test/scala/com/cloudera/livy/server/batch/BatchServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/batch/BatchServletSpec.scala @@ -32,10 +32,10 @@ import org.scalatest.mock.MockitoSugar.mock import com.cloudera.livy.Utils import com.cloudera.livy.server.BaseSessionServletSpec import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.{SessionManager, SessionState} +import com.cloudera.livy.sessions.{BatchSessionManager, SessionState} import 
com.cloudera.livy.utils.AppInfo -class BatchServletSpec extends BaseSessionServletSpec[BatchSession] { +class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecoveryMetadata] { val script: Path = { val script = Files.createTempFile("livy-test", ".py") @@ -54,9 +54,10 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession] { override def createServlet(): BatchSessionServlet = { val livyConf = createConf() + val sessionStore = mock[SessionStore] new BatchSessionServlet( - new SessionManager[BatchSession](livyConf), - mock[SessionStore], + new BatchSessionManager(livyConf, sessionStore, Some(Seq.empty)), + sessionStore, livyConf) } diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala index 3bfe92fe7..4aa16c1ce 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala @@ -29,7 +29,8 @@ import com.cloudera.livy.rsc.RSCConf import com.cloudera.livy.server.BaseSessionServletSpec import com.cloudera.livy.sessions.{Kind, SessionKindModule, Spark} -abstract class BaseInteractiveServletSpec extends BaseSessionServletSpec[InteractiveSession] { +abstract class BaseInteractiveServletSpec + extends BaseSessionServletSpec[InteractiveSession, InteractiveRecoveryMetadata] { mapper.registerModule(new SessionKindModule()) @@ -49,7 +50,7 @@ abstract class BaseInteractiveServletSpec extends BaseSessionServletSpec[Interac } super.createConf() .set(LivyConf.SESSION_STAGING_DIR, tempDir.toURI().toString()) - .set(InteractiveSession.LivyReplJars, "") + .set(InteractiveSession.LIVY_REPL_JARS, "") .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") } diff --git 
a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala index 1ef155718..8c3175610 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala @@ -33,8 +33,9 @@ import org.mockito.stubbing.Answer import org.scalatest.Entry import org.scalatest.mock.MockitoSugar.mock -import com.cloudera.livy.ExecuteRequest +import com.cloudera.livy.{ExecuteRequest, LivyConf} import com.cloudera.livy.client.common.HttpMessages.SessionInfo +import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions._ import com.cloudera.livy.utils.AppInfo @@ -42,7 +43,10 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { mapper.registerModule(new Json4sScalaModule()) - class MockInteractiveSessionServlet extends InteractiveSessionServlet(createConf()) { + class MockInteractiveSessionServlet( + sessionManager: InteractiveSessionManager, + conf: LivyConf) + extends InteractiveSessionServlet(sessionManager, mock[SessionStore], conf) { private var statements = IndexedSeq[Statement]() @@ -81,7 +85,11 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { } - override def createServlet(): InteractiveSessionServlet = new MockInteractiveSessionServlet() + override def createServlet(): InteractiveSessionServlet = { + val conf = createConf() + val sessionManager = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq.empty)) + new MockInteractiveSessionServlet(sessionManager, conf) + } it("should setup and tear down an interactive session") { jget[Map[String, Any]]("/") { data => diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala 
b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala index 89dbb40f4..077b33024 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala @@ -18,26 +18,32 @@ package com.cloudera.livy.server.interactive +import java.net.URI + import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps import org.apache.spark.launcher.SparkLauncher import org.json4s.{DefaultFormats, Extraction} +import org.mockito.{Matchers => MockitoMatchers} +import org.mockito.Matchers._ +import org.mockito.Mockito.{atLeastOnce, verify, when} import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers} import org.scalatest.concurrent.Eventually._ import org.scalatest.mock.MockitoSugar.mock -import com.cloudera.livy.{ExecuteRequest, LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.rsc.RSCConf -import com.cloudera.livy.sessions.{PySpark, SessionState} +import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, LivyConf} +import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf} +import com.cloudera.livy.server.recovery.SessionStore +import com.cloudera.livy.sessions.{PySpark, SessionState, Spark} import com.cloudera.livy.utils.{AppInfo, SparkApp} class InteractiveSessionSpec extends FunSpec with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite { private val livyConf = new LivyConf() - livyConf.set(InteractiveSession.LivyReplJars, "") + livyConf.set(InteractiveSession.LIVY_REPL_JARS, "") .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") @@ -45,7 +51,9 @@ class InteractiveSessionSpec extends FunSpec private var session: InteractiveSession = null - private def createSession(mockApp: Option[SparkApp] = None): InteractiveSession = { + private def createSession( + sessionStore: SessionStore = mock[SessionStore], + 
mockApp: Option[SparkApp] = None): InteractiveSession = { assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") val req = new CreateInteractiveRequest() @@ -59,7 +67,7 @@ class InteractiveSessionSpec extends FunSpec SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"), RSCConf.Entry.LIVY_JARS.key() -> "" ) - new InteractiveSession(0, null, None, livyConf, req, mockApp) + InteractiveSession.create(0, null, None, livyConf, req, sessionStore, mockApp) } override def afterAll(): Unit = { @@ -74,7 +82,7 @@ class InteractiveSessionSpec extends FunSpec it(desc) { assume(session != null, "No active session.") eventually(timeout(30 seconds), interval(100 millis)) { - session.state should be (SessionState.Idle()) + session.state shouldBe a[SessionState.Idle] } fn(session) } @@ -83,12 +91,13 @@ class InteractiveSessionSpec extends FunSpec describe("A spark session") { it("should start in the idle state") { session = createSession() - session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle())) + session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle])) } - it("should update appId and appInfo") { + it("should update appId and appInfo and session store") { val mockApp = mock[SparkApp] - val session = createSession(Some(mockApp)) + val sessionStore = mock[SessionStore] + val session = createSession(sessionStore, Some(mockApp)) val expectedAppId = "APPID" session.appIdKnown(expectedAppId) @@ -97,6 +106,9 @@ class InteractiveSessionSpec extends FunSpec val expectedAppInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) session.infoChanged(expectedAppInfo) session.appInfo shouldEqual expectedAppInfo + + verify(sessionStore, atLeastOnce()).save( + MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject()) } withSession("should execute `1 + 2` == 3") { session => @@ -129,7 +141,7 @@ class InteractiveSessionSpec extends FunSpec )) result should equal (expectedResult) - session.state 
should equal (SessionState.Idle()) + session.state shouldBe a[SessionState.Idle] } withSession("should error out the session if the interpreter dies") { session => @@ -142,4 +154,31 @@ class InteractiveSessionSpec extends FunSpec } } + describe("recovery") { + it("should recover session") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + val mockClient = mock[RSCClient] + when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]]) + val m = + InteractiveRecoveryMetadata(78, None, "appTag", Spark(), null, None, Some(URI.create(""))) + val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) + + s.state shouldBe a[SessionState.Recovering] + + s.appIdKnown("appId") + verify(sessionStore, atLeastOnce()).save( + MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject()) + } + + it("should recover session to dead state if rscDriverUri is unknown") { + val conf = new LivyConf() + val sessionStore = mock[SessionStore] + val m = InteractiveRecoveryMetadata(78, Some("appId"), "appTag", Spark(), null, None, None) + val s = InteractiveSession.recover(m, conf, sessionStore, None) + + s.state shouldBe a[SessionState.Dead] + s.logLines().mkString should include("RSCDriver URI is unknown") + } + } } diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala index 7f1182377..6939b35c0 100644 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala @@ -22,24 +22,21 @@ import java.io.File import java.net.URI import java.nio.ByteBuffer import java.nio.file.{Files, Paths} -import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletResponse._ -import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.io.Source import scala.language.postfixOps 
-import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.api.java.function.VoidFunction import org.scalatest.concurrent.Eventually._ +import org.scalatest.mock.MockitoSugar.mock -import com.cloudera.livy.{Job, JobContext, JobHandle} +import com.cloudera.livy.{Job, JobHandle} import com.cloudera.livy.client.common.{BufferUtils, Serializer} import com.cloudera.livy.client.common.HttpMessages._ -import com.cloudera.livy.rsc.RSCConf import com.cloudera.livy.server.RemoteUserOverride -import com.cloudera.livy.sessions.SessionState +import com.cloudera.livy.server.recovery.SessionStore +import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState} import com.cloudera.livy.test.jobs.{Echo, GetCurrentUser} class JobApiSpec extends BaseInteractiveServletSpec { @@ -49,7 +46,10 @@ class JobApiSpec extends BaseInteractiveServletSpec { private var sessionId: Int = -1 override def createServlet(): InteractiveSessionServlet = { - new InteractiveSessionServlet(createConf()) with RemoteUserOverride + val conf = createConf() + val sessionStore = mock[SessionStore] + val sessionManager = new InteractiveSessionManager(conf, sessionStore, Some(Seq.empty)) + new InteractiveSessionServlet(sessionManager, sessionStore, conf) with RemoteUserOverride } def withSessionId(desc: String)(fn: (Int) => Unit): Unit = { diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala index 130003b51..8195bf8f6 100644 --- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala @@ -31,13 +31,19 @@ import org.scalatest.mock.MockitoSugar.mock import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession} import com.cloudera.livy.server.recovery.SessionStore +import 
com.cloudera.livy.sessions.Session.RecoveryMetadata class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { describe("SessionManager") { it("should garbage collect old sessions") { val livyConf = new LivyConf() livyConf.set(SessionManager.SESSION_TIMEOUT, "100ms") - val manager = new SessionManager[MockSession](livyConf) + val manager = new SessionManager[MockSession, RecoveryMetadata]( + livyConf, + { _ => assert(false).asInstanceOf[MockSession] }, + mock[SessionStore], + "test", + Some(Seq.empty)) val session = manager.register(new MockSession(manager.nextId(), null, livyConf)) manager.get(session.id).isDefined should be(true) eventually(timeout(5 seconds), interval(100 millis)) {