From 969c3ed11cbcee7306a3ab8adf349d28a6aa42a0 Mon Sep 17 00:00:00 2001 From: Brian Scully Date: Sat, 6 Apr 2019 14:58:56 -0400 Subject: [PATCH 1/2] Use Akka DynamicAccess for reflective classloading * Add a small extension used to perform reflective loading * Swap provided dependencies for compile * easier to get started with * more transparent version requirements * Update compiled-against scala versions * Upgrade to sbt 1.2.x * Clean up some compilation warnings --- .travis.yml | 2 +- build.sbt | 33 +++++++++++-------- .../mongodb/CasbahPersistenceJournaller.scala | 4 +-- .../mongodb/MongoCollectionCache.scala | 10 +++--- .../persistence/mongodb/MongoMetrics.scala | 5 +-- .../mongodb/MongoPersistence.scala | 18 +++++----- .../mongodb/MongoPersistenceExtension.scala | 5 +-- .../mongodb/ReflectiveLookupExtension.scala | 21 ++++++++++++ common/src/test/resources/application.conf | 4 +++ project/build.properties | 2 +- project/plugins.sbt | 4 +-- .../mongodb/RxMongoPersistenceExtension.scala | 2 +- .../ScalaDriverPersistenceExtension.scala | 2 +- .../MigrateToSuffixedCollections.scala | 30 ++++++++--------- 14 files changed, 87 insertions(+), 55 deletions(-) create mode 100644 common/src/main/scala/akka/contrib/persistence/mongodb/ReflectiveLookupExtension.scala diff --git a/.travis.yml b/.travis.yml index 6d0c8b45..cce44245 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ services: language: scala scala: - 2.11.12 - - 2.12.6 + - 2.12.8 env: matrix: - MONGODB_VERSION=3.0 MONGODB_OPTS="--storageEngine wiredTiger" diff --git a/build.sbt b/build.sbt index e92b8ffb..e478f470 100644 --- a/build.sbt +++ b/build.sbt @@ -1,15 +1,15 @@ val releaseV = "2.2.4" -val scalaV = "2.11.8" +val scalaV = "2.11.12" scalaVersion := scalaV -crossScalaVersions := Seq("2.11.8", "2.12.2") +crossScalaVersions := Seq("2.11.12", "2.12.8") val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation def commonDeps(sv:String) = Seq( - ("com.typesafe.akka" %% "akka-persistence" % AkkaV % "provided") + ("com.typesafe.akka" %% "akka-persistence" % AkkaV) .exclude("org.iq80.leveldb", "leveldb") .exclude("org.fusesource.leveldbjni", "leveldbjni-all"), (sv match { @@ -18,7 +18,7 @@ def commonDeps(sv:String) = Seq( }) .exclude("com.typesafe.akka", "akka-actor_2.11") .exclude("com.typesafe.akka", "akka-actor_2.12"), - "com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "provided", + "com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "compile", "org.mongodb" % "mongodb-driver-core" % "3.8.2" % "compile", "org.mongodb" % "mongodb-driver" % "3.8.2" % "test", "org.slf4j" % "slf4j-api" % "1.7.22" % "test", @@ -40,6 +40,11 @@ val commonSettings = Seq( scalaVersion := scalaV, dependencyOverrides += "org.mongodb" % "mongodb-driver" % "3.8.2" , libraryDependencies ++= commonDeps(scalaBinaryVersion.value), + dependencyOverrides ++= Seq( + "com.typesafe" % "config" % "1.3.2", + "org.slf4j" % "slf4j-api" % "1.7.7", + "com.typesafe.akka" %% "akka-stream" % AkkaV + ), version := releaseV, organization := "com.github.scullxbones", scalacOptions ++= Seq( @@ -85,7 +90,7 @@ lazy val `akka-persistence-mongo-casbah` = (project in file("casbah")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb" %% "casbah" % "3.1.1" % "provided" + "org.mongodb" %% "casbah" % "3.1.1" % "compile" ) ) .configs(Travis) @@ -95,11 +100,11 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "provided", - "org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "provided", - "io.netty" % "netty-buffer" % "4.1.17.Final" % "provided", - "io.netty" % "netty-transport" % "4.1.17.Final" % "provided", - "io.netty" % "netty-handler" % "4.1.17.Final" % "provided", + "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.2" % "compile", + "org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2" % "compile", + "io.netty" % "netty-buffer" % "4.1.17.Final" % "compile", + "io.netty" % "netty-transport" % "4.1.17.Final" % "compile", + "io.netty" % "netty-handler" % "4.1.17.Final" % "compile", "org.reactivestreams" % "reactive-streams" % "1.0.2" ) ) @@ -110,10 +115,10 @@ lazy val `akka-persistence-mongo-rxmongo` = (project in file("rxmongo")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - ("org.reactivemongo" %% "reactivemongo" % "0.16.0" % "provided") + ("org.reactivemongo" %% "reactivemongo" % "0.16.0" % "compile") .exclude("com.typesafe.akka","akka-actor_2.11") .exclude("com.typesafe.akka","akka-actor_2.12"), - ("org.reactivemongo" %% "reactivemongo-akkastream" % "0.16.0" % "provided") + ("org.reactivemongo" %% "reactivemongo-akkastream" % "0.16.0" % "compile") .exclude("com.typesafe.akka","akka-actor_2.11") .exclude("com.typesafe.akka","akka-actor_2.12") ), @@ -127,7 +132,7 @@ lazy val `akka-persistence-mongo-tools` = (project in file("tools")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb" %% "casbah" % "3.1.1" % "provided" + "org.mongodb" %% "casbah" % "3.1.1" % "compile" ) ) .configs(Travis) @@ -136,7 +141,7 @@ lazy val `akka-persistence-mongo` = (project in file(".")) .aggregate(`akka-persistence-mongo-common`, `akka-persistence-mongo-casbah`, `akka-persistence-mongo-rxmongo`, `akka-persistence-mongo-scala`, `akka-persistence-mongo-tools`) .settings(commonSettings:_*) .settings( - packagedArtifacts in file(".") := Map.empty, + skip in publish := true, publishTo := Some(Resolver.file("file", new File("target/unusedrepo"))) ) .configs(Travis) diff --git a/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournaller.scala b/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournaller.scala index ed01a55c..5faff2f8 100755 --- a/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournaller.scala +++ b/casbah/src/main/scala/akka/contrib/persistence/mongodb/CasbahPersistenceJournaller.scala @@ -12,7 +12,7 @@ package akka.contrib.persistence.mongodb import akka.actor.ActorSystem import akka.persistence._ -import com.mongodb.casbah.Imports +import com.mongodb.casbah.{Imports, TypeImports} import com.mongodb.{DBObject, DuplicateKeyException} import com.mongodb.casbah.Imports._ import org.slf4j.{Logger, LoggerFactory} @@ -106,7 +106,7 @@ class CasbahPersistenceJournaller(val driver: CasbahMongoDriver) extends MongoPe journal.aggregate($match :: $group :: Nil).results.flatMap(_.getAs[Long]("max")).headOption } - private[this] def setMaxSequenceMetadata(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext) = { + private[this] def setMaxSequenceMetadata(persistenceId: String, maxSequenceNr: Long)(implicit ec: ExecutionContext): TypeImports.WriteResult = { try { metadata.update( MongoDBObject(PROCESSOR_ID -> persistenceId), diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoCollectionCache.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoCollectionCache.scala index b63fc1e4..94c6fbcb 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoCollectionCache.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoCollectionCache.scala @@ -4,6 +4,7 @@ import java.time.{Duration, Instant} import java.util.concurrent.atomic.AtomicReference import java.util.function.BinaryOperator +import akka.actor.ActorSystem import com.typesafe.config.Config import scala.collection.concurrent.TrieMap @@ -24,11 +25,12 @@ trait MongoCollectionCache[C] { object MongoCollectionCache { - def apply[C](config: Config, path: String): MongoCollectionCache[C] = { + def apply[C](config: Config, path: String, system: ActorSystem): MongoCollectionCache[C] = { + val reflectiveLookup = ReflectiveLookupExtension(system) val configuredCache = for { className <- Try(config.getString(s"$path.class")) - constructor <- loadCacheConstructor[C](className) + constructor <- loadCacheConstructor[C](className, reflectiveLookup) } yield constructor.apply(config.getConfig(path)) configuredCache.getOrElse(createDefaultCache(config, path)) @@ -123,10 +125,10 @@ object MongoCollectionCache { } - private[this] def loadCacheConstructor[C](className: String): Try[Config => MongoCollectionCache[C]] = + private[this] def loadCacheConstructor[C](className: String, reflectiveLookup: ReflectiveLookupExtension): Try[Config => MongoCollectionCache[C]] = for { nonEmptyClassName <- Success(className.trim).filter(_.nonEmpty) - cacheClass <- Try(Class.forName(nonEmptyClassName)) + cacheClass <- reflectiveLookup.reflectClassByName[MongoCollectionCache[_]](nonEmptyClassName) // if the loaded class implements MongoCollectionCache, take it on faith that it can store C if classOf[MongoCollectionCache[_]].isAssignableFrom(cacheClass) constructor <- getExpectedConstructor(cacheClass.asInstanceOf[Class[MongoCollectionCache[C]]]) diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala index 315c1a83..dc992559 100644 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoMetrics.scala @@ -32,9 +32,10 @@ trait MongoMetrics extends MetricsBuilder with BaseBuilder { private[this] lazy val metrics: MetricsBuilder = { val mongoMetricsBuilderClass: String = driver.settings.MongoMetricsBuilderClass.trim if (mongoMetricsBuilderClass.nonEmpty) { - val builderClass = Class.forName(mongoMetricsBuilderClass) + val reflectiveAccess = ReflectiveLookupExtension(driver.actorSystem) + val builderClass = reflectiveAccess.unsafeReflectClassByName[MetricsBuilder](mongoMetricsBuilderClass) val builderCons = builderClass.getConstructor() - builderCons.newInstance().asInstanceOf[MetricsBuilder] + builderCons.newInstance() } else { DropwizardMetrics } diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala index 14657831..5e60ec7b 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistence.scala @@ -107,9 +107,10 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) */ private[this] def getSuffixFromPersistenceId(persistenceId: String): String = suffixBuilderClassOption match { case Some(suffixBuilderClass) if !suffixBuilderClass.trim.isEmpty => - val builderClass = Class.forName(suffixBuilderClass) + val reflectiveAccess = ReflectiveLookupExtension(actorSystem) + val builderClass = reflectiveAccess.unsafeReflectClassByName[CanSuffixCollectionNames](suffixBuilderClass) val builderCons = builderClass.getConstructor() - val builderIns = builderCons.newInstance().asInstanceOf[CanSuffixCollectionNames] + val builderIns = builderCons.newInstance() builderIns.getSuffixFromPersistenceId(persistenceId) case _ => "" } @@ -119,9 +120,10 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) */ private[this] def validateMongoCharacters(input: String): String = suffixBuilderClassOption match { case Some(suffixBuilderClass) if !suffixBuilderClass.trim.isEmpty => - val builderClass = Class.forName(suffixBuilderClass) + val reflectiveAccess = ReflectiveLookupExtension(actorSystem) + val builderClass = reflectiveAccess.unsafeReflectClassByName[CanSuffixCollectionNames](suffixBuilderClass) val builderCons = builderClass.getConstructor() - val builderIns = builderCons.newInstance().asInstanceOf[CanSuffixCollectionNames] + val builderIns = builderCons.newInstance() builderIns.validateMongoCharacters(input) case _ => input } @@ -184,7 +186,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) IndexSettings(journalTagIndexName, unique = false, sparse = true, TAGS -> 1) ) - private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal") + private[this] val journalCache = MongoCollectionCache[C](settings.CollectionCache, "journal", actorSystem) private[mongodb] def journal(implicit ec: ExecutionContext): C = journal("") @@ -206,7 +208,7 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) journalCache.invalidate(collectionName) } - private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps") + private[this] val snapsCache = MongoCollectionCache[C](settings.CollectionCache, "snaps", actorSystem) private[mongodb] def snaps(implicit ec: ExecutionContext): C = snaps("") @@ -227,14 +229,14 @@ abstract class MongoPersistenceDriver(as: ActorSystem, config: Config) snapsCache.invalidate(collectionName) } - private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime") + private[this] val realtimeCache = MongoCollectionCache[C](settings.CollectionCache, "realtime", actorSystem) private[mongodb] def realtime(implicit ec: ExecutionContext): C = realtimeCache.getOrElseCreate(realtimeCollectionName, collectionName => cappedCollection(collectionName)) private[mongodb] val querySideDispatcher = actorSystem.dispatchers.lookup("akka-contrib-persistence-query-dispatcher") - private[this] val metadataCache = MongoCollectionCache[C](settings.CollectionCache, "metadata") + private[this] val metadataCache = MongoCollectionCache[C](settings.CollectionCache, "metadata", actorSystem) private[mongodb] def metadata(implicit ec: ExecutionContext): C = metadataCache.getOrElseCreate(metadataCollectionName, collectionName => { diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistenceExtension.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistenceExtension.scala index 8149a53c..f403c01e 100755 --- a/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistenceExtension.scala +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/MongoPersistenceExtension.scala @@ -19,12 +19,13 @@ import scala.util.Try object MongoPersistenceExtension extends ExtensionId[MongoPersistenceExtension] with ExtensionIdProvider { - override def lookup = MongoPersistenceExtension + override def lookup: ExtensionId[MongoPersistenceExtension] = MongoPersistenceExtension override def createExtension(actorSystem: ExtendedActorSystem): MongoPersistenceExtension = { val settings = MongoSettings(actorSystem.settings) val implementation = settings.Implementation - val implType = actorSystem.dynamicAccess.getClassFor[MongoPersistenceExtension](implementation).getOrElse(Class.forName(implementation)) + val implType = actorSystem.dynamicAccess.getClassFor[MongoPersistenceExtension](implementation) + .getOrElse(Class.forName(implementation, true, Thread.currentThread.getContextClassLoader)) val implCons = implType.getConstructor(classOf[ActorSystem]) implCons.newInstance(actorSystem).asInstanceOf[MongoPersistenceExtension] } diff --git a/common/src/main/scala/akka/contrib/persistence/mongodb/ReflectiveLookupExtension.scala b/common/src/main/scala/akka/contrib/persistence/mongodb/ReflectiveLookupExtension.scala new file mode 100644 index 00000000..8b80da54 --- /dev/null +++ b/common/src/main/scala/akka/contrib/persistence/mongodb/ReflectiveLookupExtension.scala @@ -0,0 +1,21 @@ +package akka.contrib.persistence.mongodb + +import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider} + +import scala.reflect.ClassTag +import scala.util.Try + +object ReflectiveLookupExtension extends ExtensionId[ReflectiveLookupExtension] with ExtensionIdProvider { + override def createExtension(system: ExtendedActorSystem): ReflectiveLookupExtension = + new ReflectiveLookupExtension(system) + + override def lookup(): ExtensionId[ReflectiveLookupExtension] = ReflectiveLookupExtension +} + +class ReflectiveLookupExtension(extendedActorSystem: ExtendedActorSystem) extends Extension { + def reflectClassByName[T:ClassTag](fqcn: String): Try[Class[_ <: T]] = + extendedActorSystem.dynamicAccess.getClassFor[T](fqcn) + + def unsafeReflectClassByName[T:ClassTag](fqcn: String): Class[_ <: T] = + reflectClassByName[T](fqcn).get +} \ No newline at end of file diff --git a/common/src/test/resources/application.conf b/common/src/test/resources/application.conf index dcba4818..0610e9ce 100644 --- a/common/src/test/resources/application.conf +++ b/common/src/test/resources/application.conf @@ -10,6 +10,10 @@ akka { test { timefactor = ${?AKKA_TEST_TIMEFACTOR} } + remote.netty.tcp { + hostname = "127.0.0.1" + port = 2551 + } } # Path to the journal plugin to be used diff --git a/project/build.properties b/project/build.properties index 133a8f19..c0bab049 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.17 +sbt.version=1.2.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 6c419263..32fb1517 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.5") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1") diff --git a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala index 18e4f6a8..b8fd4197 100755 --- a/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala +++ b/rxmongo/src/main/scala/akka/contrib/persistence/mongodb/RxMongoPersistenceExtension.scala @@ -159,7 +159,7 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo for { database <- db names <- database.collectionNames - list <- Future.sequence(names.filterNot(excluded).filter(nameFilter.getOrElse(allPass _)).map(collection)) + list <- Future.sequence(names.filterNot(excluded).filter(nameFilter.getOrElse(allPass)).map(collection)) } yield list } diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala index ff937dd3..d8f2e059 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala @@ -94,7 +94,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist for { names <- db.listCollectionNames().toFuture() - list = names.filterNot(excluded).filter(nameFilter.getOrElse(passAll _)) + list = names.filterNot(excluded).filter(nameFilter.getOrElse(passAll)) xs <- Future.sequence(list.map(collection)) } yield xs.toList } diff --git a/tools/src/main/scala/akka/contrib/persistence/mongodb/MigrateToSuffixedCollections.scala b/tools/src/main/scala/akka/contrib/persistence/mongodb/MigrateToSuffixedCollections.scala index 652febe0..6d6aba13 100644 --- a/tools/src/main/scala/akka/contrib/persistence/mongodb/MigrateToSuffixedCollections.scala +++ b/tools/src/main/scala/akka/contrib/persistence/mongodb/MigrateToSuffixedCollections.scala @@ -6,14 +6,11 @@ package akka.contrib.persistence.mongodb import akka.actor.ActorSystem +import akka.contrib.persistence.mongodb.JournallingFieldNames._ import com.mongodb.casbah.Imports._ import com.typesafe.config.ConfigFactory -import akka.contrib.persistence.mongodb.JournallingFieldNames._ - -import scala.util.Try -import scala.util.Random -import com.typesafe.config.ConfigFactory +import scala.util.{Random, Try} class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDriver(system, ConfigFactory.empty()) { @@ -66,8 +63,8 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive // retrieve journal or snapshot properties val (makeNewCollection, getNewCollectionName, writeConcern, summaryTitle) = originCollection match { - case c: MongoCollection if (c == journal) => (makeJournal, getJournalCollectionName(_), journalWriteConcern, "journals") - case c: MongoCollection if (c == snaps) => (makeSnaps, getSnapsCollectionName(_), snapsWriteConcern, "snapshots") + case c: MongoCollection if c == journal => (makeJournal, getJournalCollectionName _, journalWriteConcern, "journals") + case c: MongoCollection if c == snaps => (makeSnaps, getSnapsCollectionName _, snapsWriteConcern, "snapshots") } val originCollectionName = getOriginCollectionName(originCollection) @@ -88,14 +85,14 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive // we group by future suffixed collection name, foldLeft methods are only here for counting val (totalOk, totalIgnored) = temporaryCollection.find().toSeq.groupBy { tempDbObject => tempDbObject.getAs[String]("_id") match { - case Some(pid) if (pid != null) => getNewCollectionName(pid) + case Some(pid) if pid != null => getNewCollectionName(pid) case _ => originCollectionName } }.foldLeft(0L, 0L) { - case ((done, ignored), (newCollectionName, tempDbObjects)) => { + case ((done, ignored), (newCollectionName, tempDbObjects)) => // we create suffixed collection val newCollection = tempDbObjects.head.getAs[String]("_id") match { - case Some(pid) if (pid != null) => makeNewCollection(pid) + case Some(pid) if pid != null => makeNewCollection(pid) case _ => originCollection } // we check new suffixed collection is not origin unique collection @@ -108,7 +105,6 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive val notMigrated = ignoreRecords(tempDbObjects, originCollection) (done, ignored + notMigrated) } - } } // logging... logger.info(s"${summaryTitle.toUpperCase}: $totalOk/$totalCount records were successfully transfered to suffixed collections") @@ -164,7 +160,7 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive case t: Throwable => logger.error(s"Errors occurred when trying to insert record in '$newCollectionName'", t) i - } getOrElse (i) + } getOrElse i } /** @@ -178,7 +174,7 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive case t: Throwable => logger.error(s"Errors occurred when trying to remove records from '${getOriginCollectionName(originCollection)}'", t) i - } getOrElse (i) + } getOrElse i } @@ -201,16 +197,16 @@ class MigrateToSuffixedCollections(system: ActorSystem) extends CasbahMongoDrive * Convenient method to generate a simple query widely used in this class */ private[this] def pidQuery(dbo: DBObject) = dbo.getAs[String]("_id") match { - case Some(pid) if (pid != null) => (PROCESSOR_ID $eq pid) - case _ => (PROCESSOR_ID $eq Nil) + case Some(pid) if pid != null => PROCESSOR_ID $eq pid + case _ => PROCESSOR_ID $eq Nil } /** * Convenient method to retrieve origin collection name */ private[this] def getOriginCollectionName(originCollection: MongoCollection): String = originCollection match { - case c: MongoCollection if (c == journal) => settings.JournalCollection - case c: MongoCollection if (c == snaps) => settings.SnapsCollection + case c: MongoCollection if c == journal => settings.JournalCollection + case c: MongoCollection if c == snaps => settings.SnapsCollection } } \ No newline at end of file From 6ecf4476f2d4a051b494d41d00a65c59a5139eb7 Mon Sep 17 00:00:00 2001 From: Brian Scully Date: Sun, 7 Apr 2019 14:36:18 -0400 Subject: [PATCH 2/2] Switching from provided to compile caused older driver version to get pulled in; use overrides to specify --- build.sbt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index e478f470..432bdad1 100644 --- a/build.sbt +++ b/build.sbt @@ -7,6 +7,7 @@ scalaVersion := scalaV crossScalaVersions := Seq("2.11.12", "2.12.8") val AkkaV = "2.5.12" //min version to have Serialization.withTransportInformation +val MongoJavaDriverVersion = "3.8.2" def commonDeps(sv:String) = Seq( ("com.typesafe.akka" %% "akka-persistence" % AkkaV) @@ -19,8 +20,8 @@ def commonDeps(sv:String) = Seq( .exclude("com.typesafe.akka", "akka-actor_2.11") .exclude("com.typesafe.akka", "akka-actor_2.12"), "com.typesafe.akka" %% "akka-persistence-query" % AkkaV % "compile", - "org.mongodb" % "mongodb-driver-core" % "3.8.2" % "compile", - "org.mongodb" % "mongodb-driver" % "3.8.2" % "test", + "org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile", + "org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test", "org.slf4j" % "slf4j-api" % "1.7.22" % "test", "org.apache.logging.log4j" % "log4j-api" % "2.5" % "test", "org.apache.logging.log4j" % "log4j-core" % "2.5" % "test", @@ -42,8 +43,9 @@ val commonSettings = Seq( libraryDependencies ++= commonDeps(scalaBinaryVersion.value), dependencyOverrides ++= Seq( "com.typesafe" % "config" % "1.3.2", - "org.slf4j" % "slf4j-api" % "1.7.7", - "com.typesafe.akka" %% "akka-stream" % AkkaV + "org.slf4j" % "slf4j-api" % "1.7.22", + "com.typesafe.akka" %% "akka-stream" % AkkaV, + "org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion ), version := releaseV, organization := "com.github.scullxbones",