Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-878 Single shared BlobFileSystemManager #6986

Merged
merged 2 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.validation.Validation._

Expand All @@ -20,6 +21,8 @@ import java.time.{Duration, Instant, OffsetDateTime}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

// We encapsulate this functionality here so that we can easily mock it out, to allow for testing without
// actually connecting to Blob storage.
case class FileSystemAPI() {
def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri))
def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava)
Expand All @@ -45,15 +48,27 @@ object BlobFileSystemManager {
// A token counts as expired once "now + buffer" lies strictly after its expiry instant,
// i.e. it is treated as expired `buffer` ahead of the actual expiry time.
def hasTokenExpired(tokenExpiry: Instant, buffer: Duration): Boolean = {
  val bufferedNow = Instant.now.plus(buffer)
  bufferedNow.isAfter(tokenExpiry)
}
// Builds the "azb://" URI for the given endpoint; the endpoint's toString is embedded
// verbatim as the `endpoint` query parameter.
def uri(endpoint: EndpointURL): URI = new URI(s"azb://?endpoint=$endpoint")
}
case class BlobFileSystemManager(
container: BlobContainerName,
endpoint: EndpointURL,
expiryBufferMinutes: Long,
blobTokenGenerator: BlobSasTokenGenerator,
fileSystemAPI: FileSystemAPI = FileSystemAPI(),
private val initialExpiration: Option[Instant] = None) extends LazyLogging {
private var expiry: Option[Instant] = initialExpiration

class BlobFileSystemManager(val container: BlobContainerName,
val endpoint: EndpointURL,
val expiryBufferMinutes: Long,
val blobTokenGenerator: BlobSasTokenGenerator,
val fileSystemAPI: FileSystemAPI = FileSystemAPI(),
private val initialExpiration: Option[Instant] = None) extends LazyLogging {

/**
  * Builds a manager from a parsed BlobFileSystemConfig.
  *
  * The container name, endpoint and expiry buffer are read straight from the config, and the
  * SAS token generator is derived from it via BlobSasTokenGenerator.createBlobTokenGeneratorFromConfig.
  * `fileSystemAPI` and `initialExpiration` keep their default values.
  *
  * @param config the blob filesystem configuration to build from
  */
def this(config: BlobFileSystemConfig) =
  this(
    config.blobContainerName,
    config.endpointURL,
    config.expiryBufferMinutes,
    BlobSasTokenGenerator.createBlobTokenGeneratorFromConfig(config)
  )

// Convenience constructor: wraps the raw Typesafe Config in a BlobFileSystemConfig and
// delegates to the BlobFileSystemConfig-based constructor.
// NOTE(review): presumably the entry point used when this class is named as the `global`
// filesystem class in the application config — confirm against the loader.
def this(rawConfig: Config) = this(BlobFileSystemConfig(rawConfig))

// The configured expiry buffer (given in minutes) expressed as a java.time.Duration.
val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES)
// Mutable expiry instant, seeded from the constructor's `initialExpiration`.
// NOTE(review): where this is reassigned is outside the visible hunk — confirm before relying on it.
private var expiry: Option[Instant] = initialExpiration

// Read-only view of the currently recorded expiry, if any.
def getExpiry: Option[Instant] = expiry
// The azb:// URI for this manager's endpoint, as built by the companion object's `uri`.
def uri: URI = BlobFileSystemManager.uri(endpoint)
Expand Down Expand Up @@ -87,6 +102,44 @@ case class BlobFileSystemManager(

/**
  * Source of SAS credentials for a blob container.
  *
  * Implementations decide how the token is obtained; failures are reported through the
  * returned Try rather than thrown to the caller.
  */
sealed trait BlobSasTokenGenerator {
  def generateBlobSasToken: Try[AzureSasCredential]
}
object BlobSasTokenGenerator {

/**
  * Picks the BlobSasTokenGenerator that matches the supplied config. The resulting generator
  * produces valid SAS tokens for accessing an Azure blob storage container.
  *
  * One of two generators is returned:
  *  - a Workspace Manager (WSM) mediated generator, chosen when the config carries a WSM section.
  *    It creates SAS tokens for blob containers mediated by the WSM and is intended for use
  *    inside Terra.
  *  - a native generator, the default when no WSM section is present. It obtains a SAS token from
  *    the local environment for containers the local Azure identity can reach, and is intended
  *    for use outside of Terra.
  *
  * Either generator needs an authentication token to authorize SAS token generation.
  * See BlobSasTokenGenerator for more information on how these generators work.
  *
  * @param config A BlobFileSystemConfig object
  * @return An appropriate BlobSasTokenGenerator
  */
def createBlobTokenGeneratorFromConfig(config: BlobFileSystemConfig): BlobSasTokenGenerator = {
  // WSM-mediated SAS token generator, present only when a WSM config was supplied
  val wsmMediatedGenerator = config.workspaceManagerConfig.map { wsm =>
    // A client (rather than a bare URL) is passed down so that a mock client can be injected
    val client: WorkspaceManagerApiClientProvider = new HttpWorkspaceManagerClientProvider(wsm.url)

    BlobSasTokenGenerator.createBlobTokenGenerator(
      config.blobContainerName,
      config.endpointURL,
      wsm.workspaceId,
      wsm.containerResourceId,
      client,
      wsm.overrideWsmAuthToken
    )
  }

  // Native SAS token generator as the fallback; only constructed when no WSM config exists
  wsmMediatedGenerator.getOrElse(
    BlobSasTokenGenerator.createBlobTokenGenerator(config.blobContainerName, config.endpointURL, config.subscriptionId)
  )
}
THWiseman marked this conversation as resolved.
Show resolved Hide resolved

/**
* Native SAS token generator, uses the DefaultAzureCredentialBuilder in the local environment
* to produce a SAS token.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,11 @@ final case class WorkspaceId(value: UUID) {override def toString: String = value
final case class ContainerResourceId(value: UUID) {override def toString: String = value.toString}
final case class WorkspaceManagerURL(value: String) {override def toString: String = value}

final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfigWrapper) extends PathBuilderFactory {

private val config = singletonConfig.config
private val container = config.blobContainerName
private val endpoint = config.endpointURL
private val subscription = config.subscriptionId
private val expiryBufferMinutes = config.expiryBufferMinutes

/**
* This generator is responsible for producing a valid SAS token for use in accessing an Azure blob storage container
* Two types of generators can be produced here:
* > Workspace Manager (WSM) mediated SAS token generator, used to create SAS tokens that allow access for
* blob containers mediated by the WSM, and is enabled when a WSM config is provided. This is what is intended for use inside Terra
* OR
* > Native SAS token generator, which obtains a valid SAS token from your local environment to reach blob containers
* your local azure identity has access to and is the default if a WSM config is not found. This is intended for use outside of Terra
*
* Both of these generators require an authentication token to authorize the generation of the SAS token.
* See BlobSasTokenGenerator for more information on how these generators work.
*/
val blobSasTokenGenerator: BlobSasTokenGenerator = config.workspaceManagerConfig.map { wsmConfig =>
val wsmClient: WorkspaceManagerApiClientProvider = new HttpWorkspaceManagerClientProvider(wsmConfig.url)
// WSM-mediated SAS token generator
// parameterizing client instead of URL to make injecting mock client possible
BlobSasTokenGenerator.createBlobTokenGenerator(container, endpoint, wsmConfig.workspaceId, wsmConfig.containerResourceId, wsmClient, wsmConfig.overrideWsmAuthToken)
}.getOrElse(
// Native SAS token generator
BlobSasTokenGenerator.createBlobTokenGenerator(container, endpoint, subscription)
)

val fsm: BlobFileSystemManager = BlobFileSystemManager(container, endpoint, expiryBufferMinutes, blobSasTokenGenerator)
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, fsm: BlobFileSystemManager) extends PathBuilderFactory {

override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
Future {
new BlobPathBuilder(container, endpoint)(fsm)
new BlobPathBuilder(fsm.container, fsm.endpoint)(fsm)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
val blobTokenGenerator = mock[BlobSasTokenGenerator]
when(blobTokenGenerator.generateBlobSasToken).thenReturn(Try(sasToken))

val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(expiredToken))
val fsm = new BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(expiredToken))
fsm.getExpiry should contain(expiredToken)
fsm.isTokenExpired shouldBe true
fsm.retrieveFilesystem()
Expand Down Expand Up @@ -103,7 +103,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
val blobTokenGenerator = mock[BlobSasTokenGenerator]
when(blobTokenGenerator.generateBlobSasToken).thenReturn(Try(sasToken))

val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(initialToken))
val fsm = new BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(initialToken))
fsm.getExpiry should contain(initialToken)
fsm.isTokenExpired shouldBe false
fsm.retrieveFilesystem()
Expand All @@ -128,7 +128,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
val blobTokenGenerator = mock[BlobSasTokenGenerator]
when(blobTokenGenerator.generateBlobSasToken).thenReturn(Try(sasToken))

val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(refreshedToken))
val fsm = new BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(refreshedToken))
fsm.getExpiry.isDefined shouldBe true
fsm.isTokenExpired shouldBe false
fsm.retrieveFilesystem()
Expand All @@ -152,7 +152,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
val blobTokenGenerator = mock[BlobSasTokenGenerator]
when(blobTokenGenerator.generateBlobSasToken).thenReturn(Try(sasToken))

val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems)
val fsm = new BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems)
fsm.getExpiry.isDefined shouldBe false
fsm.isTokenExpired shouldBe false
fsm.retrieveFilesystem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator)
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()

Expand All @@ -81,7 +81,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath1: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()
blobPath1.nioPath.getFileSystem.close()
Expand All @@ -96,7 +96,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
val blobRoot: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build rootString getOrElse fail()
Expand Down
3 changes: 2 additions & 1 deletion src/ci/resources/tes_application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ filesystems {
blob {
class = "cromwell.filesystems.blob.BlobPathBuilderFactory"
global {
class = "cromwell.filesystems.blob.BlobFileSystemConfigWrapper"
# One BFSM is shared across all BlobPathBuilders
class = "cromwell.filesystems.blob.BlobFileSystemManager"
config {
container: "cromwell"
endpoint: "https://<storage-account>.blob.core.windows.net"
Expand Down