Skip to content

Commit

Permalink
LIVY-212. Implemented session recovery for interactive session. Only …
Browse files Browse the repository at this point in the history
…works with YARN.

livy-server can now recover interactive sessions after restart.
There are some special cases:
- If livy-server crashes while an interactive session is starting, the session may be unrecoverable, depending on timing (i.e. whether livy-repl has already registered with livy-server).
- If livy-server is down for longer than the value of server.idle_timeout (default: 10min), livy-repl will time out and quit.

Note: All previous statements are lost after recovery. This will be fixed in a different commit.

Closes #208
  • Loading branch information
alex-the-man committed Oct 27, 2016
1 parent e5796de commit 5e8474e
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import com.cloudera.livy.client.common.{BufferUtils, Serializer}
import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.server.WebServer
import com.cloudera.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet}
import com.cloudera.livy.sessions.{SessionState, Spark}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState, Spark}
import com.cloudera.livy.test.jobs.Echo
import com.cloudera.livy.utils.AppInfo

Expand Down Expand Up @@ -264,7 +265,10 @@ private class HttpClientTestBootstrap extends LifeCycle {
private implicit def executor: ExecutionContext = ExecutionContext.global

override def init(context: ServletContext): Unit = {
val servlet = new InteractiveSessionServlet(new LivyConf()) {
val conf = new LivyConf()
val stateStore = mock(classOf[SessionStore])
val sessionManager = new InteractiveSessionManager(conf, stateStore, Some(Seq.empty))
val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf) {
override protected def createSession(req: HttpServletRequest): InteractiveSession = {
val session = mock(classOf[InteractiveSession])
val id = sessionManager.nextId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo
} catch { case NonFatal(_) => }
throw e
} finally {
try { s.stop() } catch { case NonFatal(_) => }
try {
s.stop()
} catch {
case NonFatal(e) => alert(s"Failed to stop session: $e")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
// Keeping the original timeout to avoid slowing down local development.
eventually(timeout(t), interval(1 second)) {
val s = snapshot().state
assert(strStates.contains(s), s"Session state $s doesn't equal one of $strStates")
assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
}
}

Expand Down Expand Up @@ -145,8 +145,10 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
val data = output("data").asInstanceOf[Map[String, Any]]
Left(data("text/plain").asInstanceOf[String])
case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
case Some(status) => throw new Exception(s"Unknown statement status: $status")
case None => throw new Exception(s"Unknown statement output: $newStmt")
case Some(status) =>
throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
case None =>
throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
}
}
}
Expand All @@ -158,15 +160,15 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
matchStrings(result, expectedRegex)
}
case Right(error) =>
assert(false, s"Got error from statement $code: ${error.evalue}")
assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
}
}

def verifyError(
ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
result() match {
case Left(result) =>
assert(false, s"Statement `$code` expected to fail, but succeeded.")
assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
case Right(error) =>
val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,42 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}
}

// Integration test for interactive session recovery across livy-server restarts. It checks that
// (1) a running session is still usable after a restart, (2) a stopped session does not come
// back after a restart, and (3) a session created afterwards gets a larger id than the old one.
test("recover interactive session") {
withNewSession(Spark()) { s =>
// Run one statement before the restart; its result is named res0.
s.run("1").verifyResult("res0: Int = 1")

// Restart Livy while the session is alive.
cluster.stopLivy()
cluster.runLivy()

// Verify the session still exists and accepts statements after the restart.
s.verifySessionIdle()
// The expected result name is res1, i.e. it follows on from res0 produced before the restart.
s.run("2").verifyResult("res1: Int = 2")
// TODO: verify previous statement results still exist.

// Stop the session explicitly so the next restart exercises recovery of a deleted session.
s.stop()

// Restart Livy again, this time with the session already stopped.
cluster.stopLivy()
cluster.runLivy()

// Verify deleted session doesn't show up after recovery.
s.verifySessionDoesNotExist()

// Verify new session doesn't reuse old session id.
// The third argument is waitForIdle = false: only the new session's id is inspected, so the
// helper is told not to wait for the session to become idle.
withNewSession(Spark(), Map.empty, false) { s1 =>
s1.id should be > s.id
}
}
}

private def withNewSession[R]
(kind: Kind, sparkConf: Map[String, String] = Map.empty)
(kind: Kind, sparkConf: Map[String, String] = Map.empty, waitForIdle: Boolean = true)
(f: (LivyRestClient#InteractiveSession) => R): R = {
withSession(livyClient.startSession(kind, sparkConf)) { s =>
s.verifySessionIdle()
if (waitForIdle) {
s.verifySessionIdle()
}
f(s)
}
}
Expand Down
10 changes: 10 additions & 0 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class RSCClient implements LivyClient {
private final Promise<Rpc> driverRpc;
private final int executorGroupId;
private final EventLoopGroup eventLoopGroup;
private final Promise<URI> serverUriPromise;

private ContextInfo contextInfo;
private volatile boolean isAlive;
Expand All @@ -71,16 +72,21 @@ public class RSCClient implements LivyClient {
this.eventLoopGroup = new NioEventLoopGroup(
conf.getInt(RPC_MAX_THREADS),
Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d"));
this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();

Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
@Override
public void onSuccess(ContextInfo info) throws Exception {
connectToContext(info);
String url = String.format("rsc://%s:%s@%s:%d",
info.clientId, info.secret, info.remoteAddress, info.remotePort);
serverUriPromise.setSuccess(URI.create(url));
}

@Override
public void onFailure(Throwable error) {
connectionError(error);
serverUriPromise.setFailure(error);
}
});

Expand Down Expand Up @@ -174,6 +180,10 @@ public void onFailure(Throwable error) throws Exception {
return promise;
}

/**
 * Returns a future that yields the URI of the remote context this client talks to.
 *
 * The returned future is the client's serverUriPromise. It is completed by the listener
 * attached to contextInfoPromise: on success it holds a URI of the form
 * rsc://clientId:secret@remoteAddress:remotePort built from the ContextInfo, and on failure
 * it fails with the same error.
 *
 * Note that the URI embeds the context secret, so callers should treat it as sensitive.
 *
 * @return a future for the "rsc://" URI of the remote context.
 */
public Future<URI> getServerUri() {
return serverUriPromise;
}

@Override
public <T> JobHandle<T> submit(Job<T> job) {
return protocol.submit(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import com.cloudera.livy._
import com.cloudera.livy.server.batch.BatchSessionServlet
import com.cloudera.livy.server.interactive.InteractiveSessionServlet
import com.cloudera.livy.server.recovery.{SessionStore, StateStore}
import com.cloudera.livy.sessions.BatchSessionManager
import com.cloudera.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import com.cloudera.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import com.cloudera.livy.utils.LivySparkUtils._
import com.cloudera.livy.utils.SparkYarnApp
Expand Down Expand Up @@ -104,6 +104,7 @@ class LivyServer extends Logging {
StateStore.init(livyConf)
val sessionStore = new SessionStore(livyConf)
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)

server = new WebServer(livyConf, host, port)
server.context.setResourceBase("src/main/com/cloudera/livy/server")
Expand All @@ -125,7 +126,9 @@ class LivyServer extends Logging {
val context = sce.getServletContext()
context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)

mount(context, new InteractiveSessionServlet(livyConf), "/sessions/*")
val interactiveServlet =
new InteractiveSessionServlet(interactiveSessionManager, sessionStore, livyConf)
mount(context, interactiveServlet, "/sessions/*")

val batchServlet = new BatchSessionServlet(batchSessionManager, sessionStore, livyConf)
mount(context, batchServlet, "/batches/*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.scalatra._

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.sessions.{Session, SessionManager}
import com.cloudera.livy.sessions.Session.RecoveryMetadata

object SessionServlet extends Logging

Expand All @@ -37,8 +38,8 @@ object SessionServlet extends Logging
* Type parameters:
* S: the session type
*/
abstract class SessionServlet[S <: Session](
private[livy] val sessionManager: SessionManager[S],
abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
private[livy] val sessionManager: SessionManager[S, R],
livyConf: LivyConf)
extends JsonServlet
with ApiVersioningSupport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest
import com.cloudera.livy.LivyConf
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.SessionManager
import com.cloudera.livy.sessions.BatchSessionManager
import com.cloudera.livy.utils.AppInfo

case class BatchSessionView(
Expand All @@ -34,10 +34,10 @@ case class BatchSessionView(
log: Seq[String])

class BatchSessionServlet(
sessionManager: SessionManager[BatchSession],
sessionManager: BatchSessionManager,
sessionStore: SessionStore,
livyConf: LivyConf)
extends SessionServlet[BatchSession](sessionManager, livyConf)
extends SessionServlet(sessionManager, livyConf)
{

override protected def createSession(req: HttpServletRequest): BatchSession = {
Expand Down
Loading

0 comments on commit 5e8474e

Please sign in to comment.