Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Akka DynamicAccess for reflective classloading #230

Merged
merged 2 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
language: scala
scala:
- 2.11.12
- 2.12.6
- 2.12.8
env:
matrix:
- MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger"
Expand Down
39 changes: 23 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
val releaseV = "2.2.4"

val scalaV = "2.11.8"
val scalaV = "2.11.12"

scalaVersion := scalaV

crossScalaVersions := Seq("2.11.8", "2.12.2")
crossScalaVersions := Seq("2.11.12", "2.12.8")

val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation
val MongoJavaDriverVersion = "3.8.2"

def commonDeps(sv:String) = Seq(
("com.typesafe.akka" %% "akka-persistence" % AkkaV % "provided")
("com.typesafe.akka" %% "akka-persistence" % AkkaV)
.exclude("org.iq80.leveldb", "leveldb")
.exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
(sv match {
Expand All @@ -18,9 +19,9 @@ def commonDeps(sv:String) = Seq(
})
.exclude("com.typesafe.akka", "akka-actor_2.11")
.exclude("com.typesafe.akka", "akka-actor_2.12"),
"com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "provided",
"org.mongodb" % "mongodb-driver-core" % "3.8.2" % "compile",
"org.mongodb" % "mongodb-driver" % "3.8.2" % "test",
"com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "compile",
"org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile",
"org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test",
"org.slf4j" % "slf4j-api" % "1.7.22" % "test",
"org.apache.logging.log4j" % "log4j-api" % "2.5" % "test",
"org.apache.logging.log4j" % "log4j-core" % "2.5" % "test",
Expand All @@ -40,6 +41,12 @@ val commonSettings = Seq(
scalaVersion := scalaV,
dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" ,
libraryDependencies ++= commonDeps(scalaBinaryVersion.value),
dependencyOverrides ++= Seq(
"com.typesafe" % "config" % "1.3.2",
"org.slf4j" % "slf4j-api" % "1.7.22",
"com.typesafe.akka" %% "akka-stream" % AkkaV,
"org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion
),
version := releaseV,
organization := "com.github.scullxbones",
scalacOptions ++= Seq(
Expand Down Expand Up @@ -85,7 +92,7 @@ lazy val `akka-persistence-mongo-casbah` = (project in file("casbah"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb" %% "casbah" % "3.1.1" % "provided"
"org.mongodb" %% "casbah" % "3.1.1" % "compile"
)
)
.configs(Travis)
Expand All @@ -95,11 +102,11 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "provided",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "provided",
"io.netty" % "netty-buffer" % "4.1.17.Final" % "provided",
"io.netty" % "netty-transport" % "4.1.17.Final" % "provided",
"io.netty" % "netty-handler" % "4.1.17.Final" % "provided",
"org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile",
"io.netty" % "netty-buffer" % "4.1.17.Final" % "compile",
"io.netty" % "netty-transport" % "4.1.17.Final" % "compile",
"io.netty" % "netty-handler" % "4.1.17.Final" % "compile",
"org.reactivestreams" % "reactive-streams" % "1.0.2"
)
)
Expand All @@ -110,10 +117,10 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
("org.reactivemongo" %% "reactivemongo" % "0.16.0" % "provided")
("org.reactivemongo" %% "reactivemongo" % "0.16.0" % "compile")
.exclude("com.typesafe.akka","akka-actor_2.11")
.exclude("com.typesafe.akka","akka-actor_2.12"),
("org.reactivemongo" %% "reactivemongo-akkastream" % "0.16.0" % "provided")
("org.reactivemongo" %% "reactivemongo-akkastream" % "0.16.0" % "compile")
.exclude("com.typesafe.akka","akka-actor_2.11")
.exclude("com.typesafe.akka","akka-actor_2.12")
),
Expand All @@ -127,7 +134,7 @@ lazy val `akka-persistence-mongo-tools` = (project in file("tools"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb" %% "casbah" % "3.1.1" % "provided"
"org.mongodb" %% "casbah" % "3.1.1" % "compile"
)
)
.configs(Travis)
Expand All @@ -136,7 +143,7 @@ lazy val `akka-persistence-mongo` = (project in file("."))
.aggregate(`akka-persistence-mongo-common`, `akka-persistence-mongo-casbah`, `akka-persistence-mongo-rxmongo`, `akka-persistence-mongo-scala`, `akka-persistence-mongo-tools`)
.settings(commonSettings:_*)
.settings(
packagedArtifacts in file(".") := Map.empty,
skip in publish := true,
publishTo := Some(Resolver.file("file", new File("target/unusedrepo")))
)
.configs(Travis)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package akka.contrib.persistence.mongodb

import akka.actor.ActorSystem
import akka.persistence._
import com.mongodb.casbah.Imports
import com.mongodb.casbah.{Imports, TypeImports}
import com.mongodb.{DBObject, DuplicateKeyException}
import com.mongodb.casbah.Imports._
import org.slf4j.{Logger, LoggerFactory}
Expand Down Expand Up @@ -106,7 +106,7 @@ class CasbahPersistenceJournaller(val driver: CasbahMongoDriver) extends MongoPe
journal.aggregate($match :: $group :: Nil).results.flatMap(_.getAs[Long]("max")).headOption
}

private[this] def setMaxSequenceMetadata(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext) = {
private[this] def setMaxSequenceMetadata(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext): TypeImports.WriteResult = {
try {
metadata.update(
MongoDBObject(PROCESSOR_ID -> persistenceId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BinaryOperator

import akka.actor.ActorSystem
import com.typesafe.config.Config

import scala.collection.concurrent.TrieMap
Expand All @@ -24,11 +25,12 @@ trait MongoCollectionCache[C] {

object MongoCollectionCache {

def apply[C](config: Config, path: String): MongoCollectionCache[C] = {
def apply[C](config: Config, path: String, system: ActorSystem): MongoCollectionCache[C] = {
val reflectiveLookup = ReflectiveLookupExtension(system)
val configuredCache =
for {
className <- Try(config.getString(s"$path.class"))
constructor <- loadCacheConstructor[C](className)
constructor <- loadCacheConstructor[C](className, reflectiveLookup)
} yield constructor.apply(config.getConfig(path))

configuredCache.getOrElse(createDefaultCache(config, path))
Expand Down Expand Up @@ -123,10 +125,10 @@ object MongoCollectionCache {

}

private[this] def loadCacheConstructor[C](className: String): Try[Config => MongoCollectionCache[C]] =
private[this] def loadCacheConstructor[C](className: String, reflectiveLookup: ReflectiveLookupExtension): Try[Config => MongoCollectionCache[C]] =
for {
nonEmptyClassName <- Success(className.trim).filter(_.nonEmpty)
cacheClass <- Try(Class.forName(nonEmptyClassName))
cacheClass <- reflectiveLookup.reflectClassByName[MongoCollectionCache[_]](nonEmptyClassName)
// if the loaded class implements MongoCollectionCache, take it on faith that it can store C
if classOf[MongoCollectionCache[_]].isAssignableFrom(cacheClass)
constructor <- getExpectedConstructor(cacheClass.asInstanceOf[Class[MongoCollectionCache[C]]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ trait MongoMetrics extends MetricsBuilder with BaseBuilder {
private[this] lazy val metrics: MetricsBuilder = {
val mongoMetricsBuilderClass: String = driver.settings.MongoMetricsBuilderClass.trim
if (mongoMetricsBuilderClass.nonEmpty) {
val builderClass = Class.forName(mongoMetricsBuilderClass)
val reflectiveAccess = ReflectiveLookupExtension(driver.actorSystem)
val builderClass = reflectiveAccess.unsafeReflectClassByName[MetricsBuilder](mongoMetricsBuilderClass)
val builderCons = builderClass.getConstructor()
builderCons.newInstance().asInstanceOf[MetricsBuilder]
builderCons.newInstance()
} else {
DropwizardMetrics
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
*/
private[this] def getSuffixFromPersistenceId(persistenceId: String): String = suffixBuilderClassOption match {
case Some(suffixBuilderClass) if !suffixBuilderClass.trim.isEmpty =>
val builderClass = Class.forName(suffixBuilderClass)
val reflectiveAccess = ReflectiveLookupExtension(actorSystem)
val builderClass = reflectiveAccess.unsafeReflectClassByName[CanSuffixCollectionNames](suffixBuilderClass)
val builderCons = builderClass.getConstructor()
val builderIns = builderCons.newInstance().asInstanceOf[CanSuffixCollectionNames]
val builderIns = builderCons.newInstance()
builderIns.getSuffixFromPersistenceId(persistenceId)
case _ => ""
}
Expand All @@ -119,9 +120,10 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
*/
private[this] def validateMongoCharacters(input: String): String = suffixBuilderClassOption match {
case Some(suffixBuilderClass) if !suffixBuilderClass.trim.isEmpty =>
val builderClass = Class.forName(suffixBuilderClass)
val reflectiveAccess = ReflectiveLookupExtension(actorSystem)
val builderClass = reflectiveAccess.unsafeReflectClassByName[CanSuffixCollectionNames](suffixBuilderClass)
val builderCons = builderClass.getConstructor()
val builderIns = builderCons.newInstance().asInstanceOf[CanSuffixCollectionNames]
val builderIns = builderCons.newInstance()
builderIns.validateMongoCharacters(input)
case _ => input
}
Expand Down Expand Up @@ -184,7 +186,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
IndexSettings(journalTagIndexName, unique = false, sparse = true, TAGS -> 1)
)

private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal")
private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal", actorSystem)

private[mongodb] def journal(implicit ec: ExecutionContext): C = journal("")

Expand All @@ -206,7 +208,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
journalCache.invalidate(collectionName)
}

private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps")
private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps", actorSystem)

private[mongodb] def snaps(implicit ec: ExecutionContext): C = snaps("")

Expand All @@ -227,14 +229,14 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config)
snapsCache.invalidate(collectionName)
}

private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime")
private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime", actorSystem)

private[mongodb] def realtime(implicit ec: ExecutionContext): C =
realtimeCache.getOrElseCreate(realtimeCollectionName, collectionName => cappedCollection(collectionName))

private[mongodb] val querySideDispatcher = actorSystem.dispatchers.lookup("akka-contrib-persistence-query-dispatcher")

private[this] val metadataCache = MongoCollectionCache[C](settings.CollectionCache, "metadata")
private[this] val metadataCache = MongoCollectionCache[C](settings.CollectionCache, "metadata", actorSystem)

private[mongodb] def metadata(implicit ec: ExecutionContext): C =
metadataCache.getOrElseCreate(metadataCollectionName, collectionName => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import scala.util.Try

object MongoPersistenceExtension extends ExtensionId[MongoPersistenceExtension] with ExtensionIdProvider {

override def lookup = MongoPersistenceExtension
override def lookup: ExtensionId[MongoPersistenceExtension] = MongoPersistenceExtension

override def createExtension(actorSystem: ExtendedActorSystem): MongoPersistenceExtension = {
val settings = MongoSettings(actorSystem.settings)
val implementation = settings.Implementation
val implType = actorSystem.dynamicAccess.getClassFor[MongoPersistenceExtension](implementation).getOrElse(Class.forName(implementation))
val implType = actorSystem.dynamicAccess.getClassFor[MongoPersistenceExtension](implementation)
.getOrElse(Class.forName(implementation, true, Thread.currentThread.getContextClassLoader))
val implCons = implType.getConstructor(classOf[ActorSystem])
implCons.newInstance(actorSystem).asInstanceOf[MongoPersistenceExtension]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package akka.contrib.persistence.mongodb

import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}

import scala.reflect.ClassTag
import scala.util.Try

object ReflectiveLookupExtension extends ExtensionId[ReflectiveLookupExtension] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): ReflectiveLookupExtension =
new ReflectiveLookupExtension(system)

override def lookup(): ExtensionId[ReflectiveLookupExtension] = ReflectiveLookupExtension
}

class ReflectiveLookupExtension(extendedActorSystem: ExtendedActorSystem) extends Extension {
def reflectClassByName[T:ClassTag](fqcn: String): Try[Class[_ <: T]] =
extendedActorSystem.dynamicAccess.getClassFor[T](fqcn)

def unsafeReflectClassByName[T:ClassTag](fqcn: String): Class[_ <: T] =
reflectClassByName[T](fqcn).get
}
4 changes: 4 additions & 0 deletions common/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ akka {
test {
timefactor = ${?AKKA_TEST_TIMEFACTOR}
}
remote.netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}

# Path to the journal plugin to be used
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.17
sbt.version=1.2.8
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo
for {
database <- db
names <- database.collectionNames
list <- Future.sequence(names.filterNot(excluded).filter(nameFilter.getOrElse(allPass _)).map(collection))
list <- Future.sequence(names.filterNot(excluded).filter(nameFilter.getOrElse(allPass)).map(collection))
} yield list
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist

for {
names <- db.listCollectionNames().toFuture()
list = names.filterNot(excluded).filter(nameFilter.getOrElse(passAll _))
list = names.filterNot(excluded).filter(nameFilter.getOrElse(passAll))
xs <- Future.sequence(list.map(collection))
} yield xs.toList
}
Expand Down
Loading