From 9d2a96b4fa95ebf3f01a88245c46664372acb598 Mon Sep 17 00:00:00 2001 From: gschiavon Date: Wed, 27 Dec 2017 07:32:53 +0100 Subject: [PATCH] Unify vault token (#116) * Vault variables Unified * Unified Token --- CHANGELOG.md | 4 + .../org/apache/spark/deploy/SparkSubmit.scala | 52 +--------- .../spark/deploy/history/HistoryServer.scala | 2 +- .../CoarseGrainedExecutorBackend.scala | 3 +- .../spark/security/ConfigSecurity.scala | 77 ++++++++------- .../org/apache/spark/security/DBConfig.scala | 6 +- .../spark/security/KerberosConfig.scala | 6 +- .../org/apache/spark/security/SSLConfig.scala | 16 ++- .../apache/spark/security/VaultHelper.scala | 97 ++++++++----------- .../spark/security/ConfigSecuritySuite.scala | 24 ----- .../deploy/rest/mesos/MesosRestServer.scala | 31 ++---- .../cluster/mesos/MesosClusterScheduler.scala | 15 +-- .../MesosCoarseGrainedSchedulerBackend.scala | 17 +++- .../cluster/mesos/MesosSchedulerUtils.scala | 17 +--- 14 files changed, 123 insertions(+), 244 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82e7d9db1df67..8c0236db69f60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 2.2.0.3 (upcoming) + +* Unify Vault variables + ## 2.2.0.2 (December 26, 2017) * Added mesos constraints management to spark driver diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e298622e28b3c..63e866a80a731 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -709,18 +709,9 @@ object SparkSubmit extends CommandLineUtils { case _ => None } - val vaultProtocol = args.sparkProperties.get("spark.secret.vault.protocol") - val vaultHost = args.sparkProperties.get("spark.secret.vault.hosts") - val vaultPort = args.sparkProperties.get("spark.secret.vault.port") - - val vaultUrlParams = (vaultProtocol, vaultHost, vaultPort) - val vaultUrl = buildVaultUrl(vaultUrlParams) - lazy val vaultToken = getToken(tempToken, roleSecret, vaultUrl) - val (principal, keytab) = - if (vaultUrl.nonEmpty && vaultToken.isDefined) { - val environment = ConfigSecurity.prepareEnvironment( - Option (vaultToken.get), Option(vaultUrl)) + if (ConfigSecurity.vaultURI.isDefined) { + val environment = ConfigSecurity.prepareEnvironment val principal = environment.getOrElse("principal", args.principal) val keytab = environment.getOrElse("keytabPath", args.keytab) @@ -736,45 +727,6 @@ object SparkSubmit extends CommandLineUtils { (childArgs, childClasspath, sysProps, childMainClass, principal, keytab) } - /** - * - * @param tempToken Temporal token, either Property one or Environment one - * @param roleSecret Role and Secret ID, either Property one or Environment one - * @param vaultUrl a Vault Url protocol://vaultHost:vaultPort - * @return An option of a token - */ - private def getToken(tempToken: Option[String], - roleSecret: Option[(String, String)], - vaultUrl: String): Option[String] = { - - (tempToken, roleSecret) match { - case (Some(tempToken), _) => Some(VaultHelper.getRealToken(vaultUrl, tempToken)) - case (_, Some((role, secret))) => - Some(VaultHelper.getTokenFromAppRole(vaultUrl, role, secret)) - case _ => None - } - } - - /** - * - * @param vaultUrlParams Is composed of Vault Protocol, - * Vault Host and Vault Port - * @return a Vault Url protocol://vaultHost:vaultPort - */ - private def buildVaultUrl(vaultUrlParams: (Option[String], - Option[String], - Option[String])): String = { - - val vaultUrl = vaultUrlParams match { - case (Some(protocol), Some(hosts), Some(port)) => - s"${protocol}://${ - hosts.split(",") - .map(host => s"$host:${port}").mkString(",")}" - case _ => "" - } - vaultUrl - } - /** * Run the main method of the child class using the provided launch environment. * diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index b1c67a341b3c7..ab7cf2bf4397d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -268,7 +268,7 @@ object HistoryServer extends Logging { def main(argStrings: Array[String]): Unit = { Utils.initDaemon(log) - ConfigSecurity.prepareEnvironment() + ConfigSecurity.prepareEnvironment new HistoryServerArguments(conf, argStrings) initSecurity() val securityManager = createSecurityManager(conf) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 87e3dbc52d0c0..5a59c640fa236 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -286,8 +286,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { appId == null) { printUsageAndExit() } - ConfigSecurity.prepareEnvironment(scala.util.Try{ - VaultHelper.getRealToken(sys.env("VAULT_URI"), sys.env("VAULT_TEMP_TOKEN"))}.toOption) + ConfigSecurity.prepareEnvironment run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) System.exit(0) diff --git a/core/src/main/scala/org/apache/spark/security/ConfigSecurity.scala b/core/src/main/scala/org/apache/spark/security/ConfigSecurity.scala index 2aef44a2e12cb..c0e599edeb090 100644 --- a/core/src/main/scala/org/apache/spark/security/ConfigSecurity.scala +++ b/core/src/main/scala/org/apache/spark/security/ConfigSecurity.scala @@ -20,42 +20,50 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging -object ConfigSecurity extends Logging{ - - var vaultToken: Option[String] = None - val vaultUri: Option[String] = getVaultUri(sys.env.get("VAULT_PROTOCOL"), - sys.env.get("VAULT_HOSTS"), sys.env.get("VAULT_PORT")) - - def getVaultUri(vaultProtocol: Option[String], - vaultHost: Option[String], - vaultPort: Option[String]): Option[String] = { - (vaultProtocol, vaultHost, vaultPort) match { - case (Some (vaultProtocol), Some (vaultHost), Some (vaultPort) ) => - val vaultUri = s"$vaultProtocol://$vaultHost:$vaultPort" - logDebug (s"vault uri: $vaultUri found, any Vault Connection will use it") - Option (vaultUri) - case _ => - logDebug ("No Vault information found, any Vault Connection will fail") - None - } +object ConfigSecurity extends Logging { + + lazy val vaultToken: Option[String] = + + if (sys.env.get("VAULT_TOKEN").isDefined) { + logDebug("Obtaining vault token using VAULT_TOKEN") + sys.env.get("VAULT_TOKEN") + } else if (sys.env.get("VAULT_TEMP_TOKEN").isDefined) { + logDebug("Obtaining vault token using VAULT_TEMP_TOKEN") + scala.util.Try { + VaultHelper.getRealToken(sys.env.get("VAULT_TEMP_TOKEN")) + }.toOption + } else if (sys.env.get("VAULT_ROLE_ID").isDefined && sys.env.get("VAULT_SECRET_ID").isDefined) { + logDebug("Obtaining vault token using ROLE_ID and SECRET_ID") + Option(VaultHelper.getTokenFromAppRole( + sys.env("VAULT_ROLE_ID"), + sys.env("VAULT_SECRET_ID"))) + } else { + logInfo("No Vault token variables provided. Skipping Vault token retrieving") + None } - def prepareEnvironment(vaultAppToken: Option[String] = None, - vaulHost: Option[String] = None): Map[String, String] = { + lazy val vaultURI: Option[String] = { + if (sys.env.get("VAULT_PROTOCOL").isDefined + && sys.env.get("VAULT_HOSTS").isDefined + && sys.env.get("VAULT_PORT").isDefined) { + val vaultProtocol = sys.env.get("VAULT_PROTOCOL").get + val vaultHost = sys.env.get("VAULT_HOSTS").get + val vaultPort = sys.env.get("VAULT_PORT").get + Option(s"$vaultProtocol://$vaultHost:$vaultPort") + } else { + logInfo("No Vault variables provided") + None + } + } + + def prepareEnvironment: Map[String, String] = { logDebug(s"env VAR: ${sys.env.mkString("\n")}") val secretOptionsMap = ConfigSecurity.extractSecretFromEnv(sys.env) logDebug(s"secretOptionsMap: ${secretOptionsMap.mkString("\n")}") loadingConf(secretOptionsMap) - vaultToken = if (vaultAppToken.isDefined) { - vaultAppToken - } else sys.env.get("VAULT_TOKEN") - if(vaultToken.isDefined) { - require(vaultUri.isDefined, "A proper vault host is required") - logDebug(s"env VAR: ${sys.env.mkString("\n")}") - prepareEnvironment(vaultUri.get, vaultToken.get, secretOptionsMap) - } - else Map() + prepareEnvironment(secretOptionsMap) + } @@ -101,18 +109,15 @@ object ConfigSecurity extends Logging{ } } - private def prepareEnvironment(vaultHost: String, - vaultToken: String, - secretOptions: Map[String, + private def prepareEnvironment(secretOptions: Map[String, Map[String, String]]): Map[String, String] = secretOptions flatMap { case ("kerberos", options) => - KerberosConfig.prepareEnviroment(vaultHost, vaultToken, options) + KerberosConfig.prepareEnviroment(options) case ("datastore", options) => - SSLConfig.prepareEnvironment( - vaultHost, vaultToken, SSLConfig.sslTypeDataStore, options) + SSLConfig.prepareEnvironment(SSLConfig.sslTypeDataStore, options) case ("db", options) => - DBConfig.prepareEnvironment(vaultHost, vaultToken, options) + DBConfig.prepareEnvironment(options) case _ => Map.empty[String, String] } } diff --git a/core/src/main/scala/org/apache/spark/security/DBConfig.scala b/core/src/main/scala/org/apache/spark/security/DBConfig.scala index 3956ef5882724..2ad86bc0a76b4 100644 --- a/core/src/main/scala/org/apache/spark/security/DBConfig.scala +++ b/core/src/main/scala/org/apache/spark/security/DBConfig.scala @@ -17,11 +17,9 @@ package org.apache.spark.security object DBConfig { - def prepareEnvironment(vaultHost: String, - vaultToken: String, - options: Map[String, String]): Map[String, String] = { + def prepareEnvironment(options: Map[String, String]): Map[String, String] = { options.filter(_._1.endsWith("DB_USER_VAULT_PATH")).flatMap{case (_, path) => - val (pass, user) = VaultHelper.getPassPrincipalFromVault(vaultHost, path, vaultToken) + val (pass, user) = VaultHelper.getPassPrincipalFromVault(path) Seq(("spark.db.enable", "true"), ("spark.db.user", user), ("spark.db.pass", pass)) } } diff --git a/core/src/main/scala/org/apache/spark/security/KerberosConfig.scala b/core/src/main/scala/org/apache/spark/security/KerberosConfig.scala index 8bf7226bc0871..9ab272fdda86f 100644 --- a/core/src/main/scala/org/apache/spark/security/KerberosConfig.scala +++ b/core/src/main/scala/org/apache/spark/security/KerberosConfig.scala @@ -24,13 +24,11 @@ import org.apache.spark.internal.Logging object KerberosConfig extends Logging{ - def prepareEnviroment(vaultUrl: String, - vaultToken: String, - options: Map[String, String]): Map[String, String] = { + def prepareEnviroment(options: Map[String, String]): Map[String, String] = { val kerberosVaultPath = options.get("KERBEROS_VAULT_PATH") if(kerberosVaultPath.isDefined) { val (keytab64, principal) = - VaultHelper.getKeytabPrincipalFromVault(vaultUrl, vaultToken, kerberosVaultPath.get) + VaultHelper.getKeytabPrincipalFromVault(kerberosVaultPath.get) val keytabPath = getKeytabPrincipal(keytab64, principal) Map("principal" -> principal, "keytabPath" -> keytabPath) } else { diff --git a/core/src/main/scala/org/apache/spark/security/SSLConfig.scala b/core/src/main/scala/org/apache/spark/security/SSLConfig.scala index f79d2376017d0..d1d47563b5da9 100644 --- a/core/src/main/scala/org/apache/spark/security/SSLConfig.scala +++ b/core/src/main/scala/org/apache/spark/security/SSLConfig.scala @@ -34,18 +34,15 @@ object SSLConfig extends Logging { val sslTypeDataStore = "DATASTORE" - def prepareEnvironment(vaultHost: String, - vaultToken: String, - sslType: String, + def prepareEnvironment(sslType: String, options: Map[String, String]): Map[String, String] = { val sparkSSLPrefix = "spark.ssl." val vaultTrustStorePath = options.get(s"${sslType}_VAULT_TRUSTSTORE_PATH") val vaultTrustStorePassPath = options.get(s"${sslType}_VAULT_TRUSTSTORE_PASS_PATH") - val trustStore = VaultHelper.getTrustStore(vaultHost, vaultToken, vaultTrustStorePath.get) - val trustPass = VaultHelper.getCertPassForAppFromVault( - vaultHost, vaultTrustStorePassPath.get, vaultToken) + val trustStore = VaultHelper.getTrustStore(vaultTrustStorePath.get) + val trustPass = VaultHelper.getCertPassForAppFromVault(vaultTrustStorePassPath.get) val trustStorePath = generateTrustStore(sslType, trustStore, trustPass) logInfo(s"Setting SSL values for $sslType") @@ -63,14 +60,13 @@ object SSLConfig extends Logging { val keyStoreOptions = if (vaultKeystorePath.isDefined && vaultKeystorePassPath.isDefined) { val (key, certs) = - VaultHelper.getCertKeyForAppFromVault(vaultHost, vaultKeystorePath.get, vaultToken) + VaultHelper.getCertKeyForAppFromVault(vaultKeystorePath.get) pemToDer(key) generatePemFile(certs, "cert.crt") generatePemFile(trustStore, "ca.crt") - val pass = VaultHelper.getCertPassForAppFromVault( - vaultHost, vaultKeystorePassPath.get, vaultToken) + val pass = VaultHelper.getCertPassForAppFromVault( vaultKeystorePassPath.get) val keyStorePath = generateKeyStore(sslType, certs, key, pass) @@ -89,7 +85,7 @@ object SSLConfig extends Logging { val vaultKeyPassPath = options.get(s"${sslType}_VAULT_KEY_PASS_PATH") val keyPass = Map(s"$sparkSSLPrefix${sslType.toLowerCase}.keyPassword" - -> VaultHelper.getCertPassForAppFromVault(vaultHost, vaultKeyPassPath.get, vaultToken)) + -> VaultHelper.getCertPassForAppFromVault(vaultKeyPassPath.get)) val certFilesPath = Map(s"$sparkSSLPrefix${sslType.toLowerCase}.certPem.path" -> "/tmp/cert.crt", diff --git a/core/src/main/scala/org/apache/spark/security/VaultHelper.scala b/core/src/main/scala/org/apache/spark/security/VaultHelper.scala index b2237c95d55af..f16453c201ed7 100644 --- a/core/src/main/scala/org/apache/spark/security/VaultHelper.scala +++ b/core/src/main/scala/org/apache/spark/security/VaultHelper.scala @@ -20,15 +20,14 @@ import org.apache.spark.internal.Logging object VaultHelper extends Logging { - var token: Option[String] = None + lazy val jsonTempTokenTemplate: String = "{ \"token\" : \"_replace_\" }" lazy val jsonRoleSecretTemplate: String = "{ \"role_id\" : \"_replace_role_\"," + " \"secret_id\" : \"_replace_secret_\"}" - def getTokenFromAppRole(vaultUrl: String, - roleId: String, + def getTokenFromAppRole(roleId: String, secretId: String): String = { - val requestUrl = s"$vaultUrl/v1/auth/approle/login" + val requestUrl = s"${ConfigSecurity.vaultURI.get}/v1/auth/approle/login" logDebug(s"Requesting login from app and role: $requestUrl") val replace: String = jsonRoleSecretTemplate.replace("_replace_role_", roleId) .replace("_replace_secret_", secretId) @@ -39,111 +38,93 @@ object VaultHelper extends Logging { None, Some(jsonAppRole))("client_token").asInstanceOf[String] } - def getRoleIdFromVault(vaultUrl: String, - role: String): String = { - val requestUrl = s"$vaultUrl/v1/auth/approle/role/$role/role-id" - if (!token.isDefined) token = { - logDebug(s"Requesting token from app role: $role") - Option(VaultHelper.getTokenFromAppRole(vaultUrl, - sys.env("VAULT_ROLE_ID"), - sys.env("VAULT_SECRET_ID"))) - } + def getRoleIdFromVault(role: String): String = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/v1/auth/approle/role/$role/role-id" + logDebug(s"Requesting Role ID from Vault: $requestUrl") HTTPHelper.executeGet(requestUrl, "data", - Some(Seq(("X-Vault-Token", token.get))))("role_id").asInstanceOf[String] + Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get))))("role_id").asInstanceOf[String] } - def getSecretIdFromVault(vaultUrl: String, - role: String): String = { - val requestUrl = s"$vaultUrl/v1/auth/approle/role/$role/secret-id" - if (!token.isDefined) token = { - logDebug(s"Requesting token from app role: $role") - Option(VaultHelper.getTokenFromAppRole(vaultUrl, - sys.env("VAULT_ROLE_ID"), - sys.env("VAULT_SECRET_ID"))) - } + def getSecretIdFromVault(role: String): String = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/v1/auth/approle/role/$role/secret-id" logDebug(s"Requesting Secret ID from Vault: $requestUrl") HTTPHelper.executePost(requestUrl, "data", - Some(Seq(("X-Vault-Token", token.get))))("secret_id").asInstanceOf[String] + Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get))))("secret_id").asInstanceOf[String] } - def getTemporalToken(vaultUrl: String, token: String): String = { - val requestUrl = s"$vaultUrl/v1/sys/wrapping/wrap" + def getTemporalToken: String = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/v1/sys/wrapping/wrap" logDebug(s"Requesting temporal token: $requestUrl") - val jsonToken = jsonTempTokenTemplate.replace("_replace_", token) + val jsonToken = jsonTempTokenTemplate.replace("_replace_", ConfigSecurity.vaultToken.get) HTTPHelper.executePost(requestUrl, "wrap_info", - Some(Seq(("X-Vault-Token", token), ("X-Vault-Wrap-TTL", sys.env.get("VAULT_WRAP_TTL") + Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get), + ("X-Vault-Wrap-TTL", sys.env.get("VAULT_WRAP_TTL") .getOrElse("2000")))), Some(jsonToken))("token").asInstanceOf[String] } - def getKeytabPrincipalFromVault(vaultUrl: String, - token: String, - vaultPath: String): (String, String) = { - val requestUrl = s"$vaultUrl/$vaultPath" + def getKeytabPrincipalFromVault(vaultPath: String): (String, String) = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/$vaultPath" logDebug(s"Requesting Keytab and principal: $requestUrl") - val data = HTTPHelper.executeGet(requestUrl, "data", Some(Seq(("X-Vault-Token", token)))) + val data = HTTPHelper.executeGet(requestUrl, "data", Some(Seq(("X-Vault-Token", + ConfigSecurity.vaultToken.get)))) val keytab64 = data.find(_._1.contains("keytab")).get._2.asInstanceOf[String] val principal = data.find(_._1.contains("principal")).get._2.asInstanceOf[String] (keytab64, principal) } // TODO refactor these two functions into one - def getMesosPrincipalAndSecret(vaultUrl: String, - token: String, - instanceName: String): (String, String) = { - getPassPrincipalFromVault(vaultUrl, s"/v1/userland/passwords/$instanceName/mesos", token) + def getMesosPrincipalAndSecret(instanceName: String): (String, String) = { + getPassPrincipalFromVault(s"/v1/userland/passwords/$instanceName/mesos") } - def getPassPrincipalFromVault(vaultUrl: String, - vaultPath: String, - token: String): (String, String) = { - val requestUrl = s"$vaultUrl/$vaultPath" + def getPassPrincipalFromVault(vaultPath: String): (String, String) = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/$vaultPath" logDebug(s"Requesting user and pass: $requestUrl") - val data = HTTPHelper.executeGet(requestUrl, "data", Some(Seq(("X-Vault-Token", token)))) + val data = HTTPHelper.executeGet(requestUrl, "data", Some(Seq(("X-Vault-Token", + ConfigSecurity.vaultToken.get)))) val pass = data.find(_._1.contains("pass")).get._2.asInstanceOf[String] val principal = data.find(_._1.contains("user")).get._2.asInstanceOf[String] (pass, principal) } - def getTrustStore(vaultUrl: String, token: String, certVaultPath: String): String = { - val requestUrl = s"$vaultUrl/$certVaultPath" + def getTrustStore(certVaultPath: String): String = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/$certVaultPath" val truststoreVaultPath = s"$requestUrl" logDebug(s"Requesting truststore: $truststoreVaultPath") val data = HTTPHelper.executeGet(requestUrl, - "data", Some(Seq(("X-Vault-Token", token)))) + "data", Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get)))) val trustStore = data.find(_._1.endsWith("_crt")).get._2.asInstanceOf[String] trustStore } - def getCertPassForAppFromVault(vaultUrl: String, - appPassVaulPath: String, - token: String): String = { + def getCertPassForAppFromVault(appPassVaulPath: String): String = { logDebug(s"Requesting Cert Pass For App: $appPassVaulPath") - val requestUrl = s"$vaultUrl/$appPassVaulPath" + val requestUrl = s"${ConfigSecurity.vaultURI.get}/$appPassVaulPath" HTTPHelper.executeGet(requestUrl, - "data", Some(Seq(("X-Vault-Token", token))))("pass").asInstanceOf[String] + "data", Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get))) + )("pass").asInstanceOf[String] } - def getCertKeyForAppFromVault(vaultUrl: String, - vaultPath: String, - token: String): (String, String) = { + def getCertKeyForAppFromVault(vaultPath: String): (String, String) = { logDebug(s"Requesting Cert Key For App: $vaultPath") - val requestUrl = s"$vaultUrl/$vaultPath" + val requestUrl = s"${ConfigSecurity.vaultURI.get}/$vaultPath" val data = HTTPHelper.executeGet(requestUrl, - "data", Some(Seq(("X-Vault-Token", token)))) + "data", Some(Seq(("X-Vault-Token", ConfigSecurity.vaultToken.get)))) val certs = data.find(_._1.endsWith("_crt")).get._2.asInstanceOf[String] val key = data.find(_._1.endsWith("_key")).get._2.asInstanceOf[String] (key, certs) } - def getRealToken(vaultUrl: String, token: String): String = { - val requestUrl = s"$vaultUrl/v1/sys/wrapping/unwrap" + def getRealToken(vaultTempToken: Option[String]): String = { + val requestUrl = s"${ConfigSecurity.vaultURI.get}/v1/sys/wrapping/unwrap" logDebug(s"Requesting real Token: $requestUrl") HTTPHelper.executePost(requestUrl, - "data", Some(Seq(("X-Vault-Token", token))))("token").asInstanceOf[String] + "data", Some(Seq(("X-Vault-Token", vaultTempToken.get))) + )("token").asInstanceOf[String] } } diff --git a/core/src/test/scala/org/apache/spark/security/ConfigSecuritySuite.scala b/core/src/test/scala/org/apache/spark/security/ConfigSecuritySuite.scala index 7da4fb130105f..a1323c9a4302b 100644 --- a/core/src/test/scala/org/apache/spark/security/ConfigSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/security/ConfigSecuritySuite.scala @@ -45,28 +45,4 @@ class ConfigSecuritySuite extends SparkFunSuite with Matchers { "datastore" -> Map("DATASTORE_ENABLE" -> "true"), "kafka" -> Map("KAFKA_ENABLE" -> "true"))) } - /** - * getVaultUri - */ - - val vaultProtocol = Option("https") - val vaultHost = Option("vault.labs.stratio.com") - val vaultPort = Option("8200") - val expectedResult = "https://vault.labs.stratio.com:8200" - - test("getVaultUri with all the parameters given") { - val vaultUri = ConfigSecurity.getVaultUri(vaultProtocol, vaultHost, vaultPort) - assert(vaultUri === Option(expectedResult)) - } - - test("getVaultUri without one of the parameters needed") { - val vaultUri = ConfigSecurity.getVaultUri(None, vaultHost, vaultPort) - assert(vaultUri === None) - } - - test("getVaultUri without none of the parameters needed") { - val vaultUri = ConfigSecurity.getVaultUri(None, None, None) - assert(vaultUri === None) - } - } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 47533a19ad660..fa79092f283ab 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.Command import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest._ import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler -import org.apache.spark.security.VaultHelper +import org.apache.spark.security.{ConfigSecurity, VaultHelper} import org.apache.spark.util.Utils /** @@ -45,21 +45,6 @@ private[spark] class MesosRestServer( scheduler: MesosClusterScheduler) extends RestSubmissionServer(host, requestedPort, masterConf) { - protected lazy val token = { - require((masterConf.getOption("spark.secret.vault.protocol").isDefined - && masterConf.getOption("spark.secret.vault.hosts").isDefined - && masterConf.getOption("spark.secret.vault.port").isDefined), - "You are attempting to login in Vault but no Vault obtained," + - " please configure spark.secret.vault.protocol," + - " spark.vault.hosts and spark.secret.vault.port" + - " in your Stratio Spark Dispatcher instance") - val vaultUrl = s"${masterConf.get("spark.secret.vault.protocol")}://" + - s"${masterConf.get("spark.secret.vault.hosts").split(",") - .map(host => s"$host:${masterConf.get("spark.secret.vault.port")}").mkString(",")}" - VaultHelper.getTokenFromAppRole(vaultUrl, - sys.env("VAULT_ROLE_ID"), - sys.env("VAULT_SECRET_ID")) - } protected override val submitRequestServlet = new MesosSubmitRequestServlet(scheduler, masterConf) @@ -125,12 +110,8 @@ private[mesos] class MesosSubmitRequestServlet( val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val securitySparkOpts: Map[String, String] = { - if (sparkProperties.get("spark.secret.vault.hosts").isDefined - && sparkProperties.get("spark.secret.vault.protocol").isDefined - && sparkProperties.get("spark.secret.vault.port").isDefined) { - val vaultUrl = s"${sparkProperties("spark.secret.vault.protocol")}://" + - s"${sparkProperties("spark.secret.vault.hosts").split(",") - .map(host => s"$host:${sparkProperties("spark.secret.vault.port")}").mkString(",")}" + if (ConfigSecurity.vaultURI.isDefined) { + val vaultURL = ConfigSecurity.vaultURI.get (sparkProperties.get("spark.secret.vault.role"), sys.env.get("VAULT_ROLE"), sparkProperties.get("spark.secret.vault.token"), @@ -138,14 +119,14 @@ private[mesos] class MesosSubmitRequestServlet( case (roleProp, roleEnv, None, None) if (roleEnv.isDefined || roleProp.isDefined) => val role = roleProp.getOrElse(roleEnv.get) logTrace(s"obtaining vault secretID and role ID using role: $role") - val driverSecretId = VaultHelper.getSecretIdFromVault(vaultUrl, role) - val driverRoleId = VaultHelper.getRoleIdFromVault(vaultUrl, role) + val driverSecretId = VaultHelper.getSecretIdFromVault(role) + val driverRoleId = VaultHelper.getRoleIdFromVault(role) Map("spark.mesos.driverEnv.VAULT_ROLE_ID" -> driverRoleId, "spark.mesos.driverEnv.VAULT_SECRET_ID" -> driverSecretId) case (None, None, tokenProp, tokenVal) if (tokenProp.isDefined || tokenVal.isDefined) => val vaultToken = tokenProp.getOrElse(tokenVal.get) - val temporalToken = VaultHelper.getTemporalToken(vaultUrl, vaultToken) + val temporalToken = VaultHelper.getTemporalToken logDebug(s"Obtained token from One time token; $temporalToken") Map("spark.secret.vault.tempToken" -> temporalToken) ++ request.sparkProperties .filter(_._1 != "spark.secret.vault.token") diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index b50a1401a1f70..9d1ffaa5f63c3 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -31,7 +31,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} import org.apache.spark.deploy.mesos.{MesosDriverDescription, config} import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.security.VaultHelper +import org.apache.spark.security.{ConfigSecurity, VaultHelper} import org.apache.spark.util.Utils /** @@ -706,17 +706,12 @@ private[spark] class MesosClusterScheduler( val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L) var sparkProperties = state.driverDescription.conf.getAll.toMap - if (sparkProperties.get("spark.secret.vault.protocol").isDefined - && sparkProperties.get("spark.secret.vault.hosts").isDefined - && sparkProperties.get("spark.secret.vault.port").isDefined) + if (ConfigSecurity.vaultURI.isDefined) { - val vaultUrl = s"${sparkProperties("spark.secret.vault.protocol")}://" + - s"${sparkProperties("spark.secret.vault.hosts").split(",") - .map(host => s"$host:${sparkProperties("spark.secret.vault.port")}") - .mkString(",")}" + val vaultURI = ConfigSecurity.vaultURI.get val role = sparkProperties("spark.secret.vault.role") - val driverSecretId = VaultHelper.getSecretIdFromVault(vaultUrl, role) - val driverRoleId = VaultHelper.getRoleIdFromVault(vaultUrl, role) + val driverSecretId = VaultHelper.getSecretIdFromVault(role) + val driverRoleId = VaultHelper.getRoleIdFromVault(role) sparkProperties = sparkProperties.updated("spark.secret.roleID", driverRoleId) .updated("spark.secret.secretID", driverSecretId) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7ce0571edf3a3..e32df1ee4f673 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -212,15 +212,22 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .build()) } - if (ConfigSecurity.vaultToken.isDefined) { + if (ConfigSecurity.vaultToken.isDefined && ConfigSecurity.vaultURI.isDefined) { environment.addVariables(Environment.Variable.newBuilder() .setName("VAULT_TEMP_TOKEN") - .setValue(VaultHelper.getTemporalToken(ConfigSecurity.vaultUri.get, - ConfigSecurity.vaultToken.get)) + .setValue(VaultHelper.getTemporalToken) .build()) environment.addVariables(Environment.Variable.newBuilder() - .setName("VAULT_URI") - .setValue(ConfigSecurity.vaultUri.get) + .setName("VAULT_PROTOCOL") + .setValue(sys.env.get("VAULT_PROTOCOL").get) + .build()) + environment.addVariables(Environment.Variable.newBuilder() + .setName("VAULT_HOSTS") + .setValue(sys.env.get("VAULT_HOSTS").get) + .build()) + environment.addVariables(Environment.Variable.newBuilder() + .setName("VAULT_PORT") + .setValue(sys.env.get("VAULT_PORT").get) .build()) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index e38a61ecc3699..858494ba884ce 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -79,23 +79,10 @@ trait MesosSchedulerUtils extends Logging { fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse( conf.get(DRIVER_HOST_ADDRESS))) - val vaultUri = ConfigSecurity.vaultUri - - if(vaultUri.isDefined && conf.getOption("spark.mesos.role").isDefined) { - - val token = - if (sys.env.get("VAULT_TOKEN").isDefined) { - logDebug("VAULT_TOKEN provided as environment variable") - sys.env.get("VAULT_TOKEN").get - } else { - logDebug("VAULT_TOKEN obtained from VAULT_ROLE_ID and VAULT_SECRET_ID") - VaultHelper.getTokenFromAppRole(vaultUri.get, - sys.env.get("VAULT_ROLE_ID").get, sys.env.get("VAULT_SECRET_ID").get) - } + if(ConfigSecurity.vaultURI.isDefined && conf.getOption("spark.mesos.role").isDefined) { val(mSecret, mPrincipal) = - VaultHelper.getMesosPrincipalAndSecret(vaultUri.get, - token, conf.getOption("spark.mesos.role").get) + VaultHelper.getMesosPrincipalAndSecret(conf.getOption("spark.mesos.role").get) conf.set("spark.mesos.principal", mPrincipal) fwInfoBuilder.setPrincipal(mPrincipal)