Skip to content

Commit

Permalink
SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225.
Browse files Browse the repository at this point in the history
Author: Reynold Xin <[email protected]>

Closes apache#113 from rxin/jetty9 and squashes the following commits:

867a2ce [Reynold Xin] Updated Jetty version to 9.1.3.v20140225 in Maven build file.
d7c97ca [Reynold Xin] Return the correctly bound port.
d14706f [Reynold Xin] Upgrade Jetty to 9.1.3.v20140225.
  • Loading branch information
rxin authored and pwendell committed Mar 13, 2014
1 parent 6983732 commit ca4bf8c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 46 deletions.
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import java.io.File

import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
import org.eclipse.jetty.server.{Server, ServerConnector}
import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool

Expand All @@ -43,24 +42,24 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
*/
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
private var server: Server = null
private var server: Server = _
private var port: Int = -1

def start() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)

server = new Server(threadPool)
val connector = new ServerConnector(server)
connector.setIdleTimeout(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(0)
server.addConnector(connector)

val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

Expand All @@ -79,7 +78,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
}

server.start()
port = server.getConnectors()(0).getLocalPort()
port = connector.getLocalPort
}
}

Expand Down
58 changes: 31 additions & 27 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.ui

import java.net.InetSocketAddress
import java.net.URL
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}

import scala.annotation.tailrec
Expand All @@ -28,7 +29,7 @@ import scala.xml.Node
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{pretty, render}

import org.eclipse.jetty.server.{DispatcherType, Server}
import org.eclipse.jetty.server.{NetworkConnector, Server}
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.QueuedThreadPool
Expand Down Expand Up @@ -60,8 +61,7 @@ private[spark] object JettyUtils extends Logging {
def createServlet[T <% AnyRef](servletParams: ServletParams[T],
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
override def doGet(request: HttpServletRequest,
response: HttpServletResponse) {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
response.setStatus(HttpServletResponse.SC_OK)
Expand All @@ -72,7 +72,7 @@ private[spark] object JettyUtils extends Logging {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
"User is not authorized to access this page.");
"User is not authorized to access this page.")
}
}
}
Expand Down Expand Up @@ -120,26 +120,25 @@ private[spark] object JettyUtils extends Logging {

private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
filters.foreach {
case filter : String =>
if (!filter.isEmpty) {
logInfo("Adding filter: " + filter)
val holder : FilterHolder = new FilterHolder()
holder.setClassName(filter)
// get any parameters for each filter
val paramName = "spark." + filter + ".params"
val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
params.foreach {
case param : String =>
if (!param.isEmpty) {
val parts = param.split("=")
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
}
}
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
filters.foreach { filter =>
if (!filter.isEmpty) {
logInfo("Adding filter: " + filter)
val holder : FilterHolder = new FilterHolder()
holder.setClassName(filter)
// get any parameters for each filter
val paramName = "spark." + filter + ".params"
val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
params.foreach {
case param : String =>
if (!param.isEmpty) {
val parts = param.split("=")
if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
}
}
val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
handlers.foreach { handler => handler.addFilter(holder, "/*", enumDispatcher) }
}
}
}

Expand All @@ -150,7 +149,10 @@ private[spark] object JettyUtils extends Logging {
* If the desired port number is contented, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler],
def startJettyServer(
hostName: String,
port: Int,
handlers: Seq[ServletContextHandler],
conf: SparkConf): (Server, Int) = {

addFilters(handlers, conf)
Expand All @@ -160,16 +162,18 @@ private[spark] object JettyUtils extends Logging {
@tailrec
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val pool = new QueuedThreadPool
// Unfortunately Jetty 9 doesn't allow us to set both the thread pool and the port number in
// constructor. But fortunately the pool allocated by Jetty is always a QueuedThreadPool.
val pool = server.getThreadPool.asInstanceOf[QueuedThreadPool]
pool.setDaemon(true)
server.setThreadPool(pool)

server.setHandler(handlerList)

Try {
server.start()
} match {
case s: Success[_] =>
(server, server.getConnectors.head.getLocalPort)
(server, server.getConnectors.head.asInstanceOf[NetworkConnector].getLocalPort)
case f: Failure[_] =>
server.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,22 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>7.6.8.v20121106</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<version>7.6.8.v20121106</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
<version>7.6.8.v20121106</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>7.6.8.v20121106</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
8 changes: 4 additions & 4 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ object SparkBuild extends Build {

libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.17.Final",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
"org.eclipse.jetty" % "jetty-util" % "7.6.8.v20121106",
"org.eclipse.jetty" % "jetty-plus" % "7.6.8.v20121106",
"org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106",
"org.eclipse.jetty" % "jetty-server" % "9.1.3.v20140225",
"org.eclipse.jetty" % "jetty-util" % "9.1.3.v20140225",
"org.eclipse.jetty" % "jetty-plus" % "9.1.3.v20140225",
"org.eclipse.jetty" % "jetty-security" % "9.1.3.v20140225",
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
Expand Down

0 comments on commit ca4bf8c

Please sign in to comment.