From abda3137ad26ce01e26bcb61bf1e720a5bb5109a Mon Sep 17 00:00:00 2001 From: Brian Scully Date: Sun, 13 Sep 2020 11:45:06 -0400 Subject: [PATCH] Upgrade mongo-bson to latest version --- build.sbt | 14 ++++++---- .../ScalaDriverPersistenceExtension.scala | 10 +++---- .../ScalaDriverPersistenceSnapshotter.scala | 4 +-- .../mongodb/ScalaDriverSerializers.scala | 4 +-- .../mongodb/ScalaDriverSettings.scala | 27 +++++++++---------- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/build.sbt b/build.sbt index ca34976e..962b6288 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ val scala213V = "2.13.2" val scalaV = scala213V val akkaV = "2.6.9" -val MongoJavaDriverVersion = "3.12.7" +val MongoJavaDriverVersion = "4.1.0" val commonDeps = Seq( ("com.typesafe.akka" %% "akka-persistence" % akkaV) @@ -20,7 +20,7 @@ val commonDeps = Seq( "com.typesafe.akka" %% "akka-persistence" % akkaV % "compile", "com.typesafe.akka" %% "akka-actor" % akkaV % "compile", "org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile", - "org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test", + "org.mongodb" % "mongodb-driver-legacy" % MongoJavaDriverVersion % "test", "org.slf4j" % "slf4j-api" % "1.7.30" % "test", "org.apache.logging.log4j" % "log4j-api" % "2.13.3" % "test", "org.apache.logging.log4j" % "log4j-core" % "2.13.3" % "test", @@ -50,7 +50,7 @@ val commonSettings = Seq( "com.typesafe" % "config" % "1.3.2", "org.slf4j" % "slf4j-api" % "1.7.30", "com.typesafe.akka" %% "akka-stream" % akkaV, - "org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion + "org.mongodb" % "mongodb-driver-legacy" % MongoJavaDriverVersion ), version := releaseV, organization := "com.github.scullxbones", @@ -98,12 +98,16 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala")) .settings(commonSettings:_*) .settings( libraryDependencies ++= Seq( - "org.mongodb.scala" %% "mongo-scala-driver" % "2.9.0" % "compile", - "org.mongodb.scala" %% "mongo-scala-bson" % "2.9.0" % "compile", + "org.mongodb.scala" %% "mongo-scala-driver" % "4.1.1" % "compile", + "org.mongodb.scala" %% "mongo-scala-bson" % "4.1.1" % "compile", + "org.mongodb" % "mongodb-driver-core" % "4.1.1" % "compile", "io.netty" % "netty-buffer" % "4.1.52.Final" % "compile", "io.netty" % "netty-transport" % "4.1.52.Final" % "compile", "io.netty" % "netty-handler" % "4.1.52.Final" % "compile", "org.reactivestreams" % "reactive-streams" % "1.0.3" + ), + dependencyOverrides ++= Seq( + "org.mongodb" % "mongodb-driver-core" % "4.1.1" % "compile" ) ) .configs(Travis) diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala index 8b95bb31..040f9d1f 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceExtension.scala @@ -43,9 +43,9 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist override def ensureCollection(name: String): Future[C] = ensureCollection(name, db.createCollection) - private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Completed]): Future[C] = + private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Void]): Future[C] = for { - _ <- collectionCreator(name).toFuture().recover { case MongoErrors.NamespaceExists() => Completed } + _ <- collectionCreator(name).toFuture().recover { case MongoErrors.NamespaceExists() => () } mongoCollection <- collection(name) } yield mongoCollection @@ -54,8 +54,8 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist def metadataWriteConcern: WriteConcern = toWriteConcern(journalWriteSafety, journalWTimeout, journalFsync) private def toWriteConcern(writeSafety: WriteSafety, wtimeout: Duration, fsync: Boolean): WriteConcern = (writeSafety, wtimeout.toMillis, fsync) match { - case (Unacknowledged, w, f) => WriteConcern.UNACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS).withFsync(f) - case (Acknowledged, w, f) => WriteConcern.ACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS).withFsync(f) + case (Unacknowledged, w, f) => WriteConcern.UNACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS) + case (Acknowledged, w, f) => WriteConcern.ACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS) case (Journaled, w, _) => WriteConcern.JOURNALED.withWTimeout(w, TimeUnit.MILLISECONDS) case (ReplicaAcknowledged, w, f) => WriteConcern.MAJORITY.withWTimeout(w, TimeUnit.MILLISECONDS).withJournal(!f) } @@ -125,7 +125,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist } else collection.countDocuments().toFuture() } yield count } else Future.successful(firstCount) - _ = if (secondCount == 0L) collection.drop().toFuture().recover { case _ => Completed() } // ignore errors + _ = if (secondCount == 0L) collection.drop().toFuture().recover { case _ => () } // ignore errors } yield () override def ensureIndex(indexName: String, unique: Boolean, sparse: Boolean, fields: (String, Int)*): C => Future[C] = { diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceSnapshotter.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceSnapshotter.scala index 42a4e6c3..20dcdf23 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceSnapshotter.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverPersistenceSnapshotter.scala @@ -46,8 +46,8 @@ object ScalaDriverPersistenceSnapshotter extends SnapshottingFieldNames { } val pid = document.getString(PROCESSOR_ID).getValue - val sn = document.getLong(SEQUENCE_NUMBER) - val ts = document.getLong(TIMESTAMP) + val sn = document.getInt64(SEQUENCE_NUMBER).longValue() + val ts = document.getInt64(TIMESTAMP).longValue() SelectedSnapshot(SnapshotMetadata(pid, sn, ts), content) } } diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala index 7d511d86..3a8b8385 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSerializers.scala @@ -63,7 +63,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys private def deserializeVersionOne(d: BsonDocument) = Event( pid = d.getString(PROCESSOR_ID).getValue, - sn = d.getLong(SEQUENCE_NUMBER), + sn = d.getInt64(SEQUENCE_NUMBER).longValue(), payload = Payload[BsonValue]( hint = d.getString(TYPE).getValue, any = Option(d.get(PayloadKey)).collect(extractPayloadContent).get, @@ -79,7 +79,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys private def deserializeDocumentLegacy(d: BsonDocument) = { val persistenceId = d.getString(PROCESSOR_ID).getValue - val sequenceNr = d.getLong(SEQUENCE_NUMBER) + val sequenceNr = d.getInt64(SEQUENCE_NUMBER).longValue() Option(d.get(SERIALIZED)) match { case Some(b: BsonDocument) => Event( diff --git a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala index d3b8322f..995e0d97 100644 --- a/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala +++ b/scala/src/main/scala/akka/contrib/persistence/mongodb/ScalaDriverSettings.scala @@ -14,9 +14,9 @@ import scala.util.Try object ScalaDriverSettings extends ExtensionId[ScalaDriverSettings] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): ScalaDriverSettings = { - val fullPath = s"${getClass.getPackage.getName}.official" - val systemConfig = system.settings.config - new ScalaDriverSettings(systemConfig.getConfig(fullPath)) + val fullPath = s"${getClass.getPackage.getName}.official" + val systemConfig = system.settings.config + new ScalaDriverSettings(systemConfig.getConfig(fullPath)) } override def lookup(): ExtensionId[ScalaDriverSettings] = ScalaDriverSettings @@ -45,20 +45,17 @@ class ScalaDriverSettings(config: Config) extends OfficialDriverSettings(config) val bldr: MongoClientSettings.Builder = MongoClientSettings.builder() .applyConnectionString(new ConnectionString(uri)) - .applyToClusterSettings((t: ClusterSettings.Builder) => { + .applyToClusterSettings{(t: ClusterSettings.Builder) => t.serverSelectionTimeout(getLongQueryProperty("serverselectiontimeoutms").getOrElse(ServerSelectionTimeout.toMillis), TimeUnit.MILLISECONDS) - .maxWaitQueueSize(getIntQueryProperty("waitqueuemultiple").getOrElse(ThreadsAllowedToBlockforConnectionMultiplier) * getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost)) () - } - ).applyToConnectionPoolSettings((t: ConnectionPoolSettings.Builder) => { - t.maxWaitTime(getLongQueryProperty("waitqueuetimeoutms").getOrElse(MaxWaitTime.toMillis), TimeUnit.MILLISECONDS) - .maxConnectionIdleTime(getLongQueryProperty("maxidletimems").getOrElse(MaxConnectionIdleTime.toMillis), TimeUnit.MILLISECONDS) - .maxConnectionLifeTime(getLongQueryProperty("maxlifetimems").getOrElse(MaxConnectionLifeTime.toMillis), TimeUnit.MILLISECONDS) - .minSize(getIntQueryProperty("minpoolsize").getOrElse(MinConnectionsPerHost)) - .maxSize(getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost)) - () - } - ).applyToServerSettings((t: ServerSettings.Builder) => { + }.applyToConnectionPoolSettings{(t: ConnectionPoolSettings.Builder) => + t.maxWaitTime(getLongQueryProperty("waitqueuetimeoutms").getOrElse(MaxWaitTime.toMillis), TimeUnit.MILLISECONDS) + .maxConnectionIdleTime(getLongQueryProperty("maxidletimems").getOrElse(MaxConnectionIdleTime.toMillis), TimeUnit.MILLISECONDS) + .maxConnectionLifeTime(getLongQueryProperty("maxlifetimems").getOrElse(MaxConnectionLifeTime.toMillis), TimeUnit.MILLISECONDS) + .minSize(getIntQueryProperty("minpoolsize").getOrElse(MinConnectionsPerHost)) + .maxSize(getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost)) + () + }.applyToServerSettings((t: ServerSettings.Builder) => { t.heartbeatFrequency(getLongQueryProperty("heartbeatfrequencyms").getOrElse(HeartbeatFrequency.toMillis), TimeUnit.MILLISECONDS) .minHeartbeatFrequency(MinHeartbeatFrequency.toMillis, TimeUnit.MILLISECONDS) // no 'minHeartbeatFrequency' in ConnectionString ()