Skip to content

Commit

Permalink
Merge pull request #390 from scullxbones/wip-mongobson-upgade
Browse files Browse the repository at this point in the history
Upgrade mongo-bson to latest version
  • Loading branch information
scullxbones authored Oct 17, 2020
2 parents 2f2ba98 + abda313 commit 7fcd25d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 29 deletions.
14 changes: 9 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ val scala213V = "2.13.2"
val scalaV = scala213V
val akkaV = "2.6.9"

val MongoJavaDriverVersion = "3.12.7"
val MongoJavaDriverVersion = "4.1.0"

val commonDeps = Seq(
("com.typesafe.akka" %% "akka-persistence" % akkaV)
Expand All @@ -20,7 +20,7 @@ val commonDeps = Seq(
"com.typesafe.akka" %% "akka-persistence" % akkaV % "compile",
"com.typesafe.akka" %% "akka-actor" % akkaV % "compile",
"org.mongodb" % "mongodb-driver-core" % MongoJavaDriverVersion % "compile",
"org.mongodb" % "mongodb-driver" % MongoJavaDriverVersion % "test",
"org.mongodb" % "mongodb-driver-legacy" % MongoJavaDriverVersion % "test",
"org.slf4j" % "slf4j-api" % "1.7.30" % "test",
"org.apache.logging.log4j" % "log4j-api" % "2.13.3" % "test",
"org.apache.logging.log4j" % "log4j-core" % "2.13.3" % "test",
Expand Down Expand Up @@ -50,7 +50,7 @@ val commonSettings = Seq(
"com.typesafe" % "config" % "1.3.2",
"org.slf4j" % "slf4j-api" % "1.7.30",
"com.typesafe.akka" %% "akka-stream" % akkaV,
"org.mongodb" % "mongo-java-driver" % MongoJavaDriverVersion
"org.mongodb" % "mongodb-driver-legacy" % MongoJavaDriverVersion
),
version := releaseV,
organization := "com.github.scullxbones",
Expand Down Expand Up @@ -98,12 +98,16 @@ lazy val `akka-persistence-mongo-scala` = (project in file("scala"))
.settings(commonSettings:_*)
.settings(
libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.9.0" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "2.9.0" % "compile",
"org.mongodb.scala" %% "mongo-scala-driver" % "4.1.1" % "compile",
"org.mongodb.scala" %% "mongo-scala-bson" % "4.1.1" % "compile",
"org.mongodb" % "mongodb-driver-core" % "4.1.1" % "compile",
"io.netty" % "netty-buffer" % "4.1.52.Final" % "compile",
"io.netty" % "netty-transport" % "4.1.52.Final" % "compile",
"io.netty" % "netty-handler" % "4.1.52.Final" % "compile",
"org.reactivestreams" % "reactive-streams" % "1.0.3"
),
dependencyOverrides ++= Seq(
"org.mongodb" % "mongodb-driver-core" % "4.1.1" % "compile"
)
)
.configs(Travis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist
override def ensureCollection(name: String): Future[C] =
ensureCollection(name, db.createCollection)

private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Completed]): Future[C] =
private[this] def ensureCollection(name: String, collectionCreator: String => SingleObservable[Void]): Future[C] =
for {
_ <- collectionCreator(name).toFuture().recover { case MongoErrors.NamespaceExists() => Completed }
_ <- collectionCreator(name).toFuture().recover { case MongoErrors.NamespaceExists() => () }
mongoCollection <- collection(name)
} yield mongoCollection

Expand All @@ -54,8 +54,8 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist
def metadataWriteConcern: WriteConcern = toWriteConcern(journalWriteSafety, journalWTimeout, journalFsync)
private def toWriteConcern(writeSafety: WriteSafety, wtimeout: Duration, fsync: Boolean): WriteConcern =
(writeSafety, wtimeout.toMillis, fsync) match {
case (Unacknowledged, w, f) => WriteConcern.UNACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS).withFsync(f)
case (Acknowledged, w, f) => WriteConcern.ACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS).withFsync(f)
case (Unacknowledged, w, f) => WriteConcern.UNACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS)
case (Acknowledged, w, f) => WriteConcern.ACKNOWLEDGED.withWTimeout(w, TimeUnit.MILLISECONDS)
case (Journaled, w, _) => WriteConcern.JOURNALED.withWTimeout(w, TimeUnit.MILLISECONDS)
case (ReplicaAcknowledged, w, f) => WriteConcern.MAJORITY.withWTimeout(w, TimeUnit.MILLISECONDS).withJournal(!f)
}
Expand Down Expand Up @@ -125,7 +125,7 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist
} else collection.countDocuments().toFuture()
} yield count
} else Future.successful(firstCount)
_ = if (secondCount == 0L) collection.drop().toFuture().recover { case _ => Completed() } // ignore errors
_ = if (secondCount == 0L) collection.drop().toFuture().recover { case _ => () } // ignore errors
} yield ()

override def ensureIndex(indexName: String, unique: Boolean, sparse: Boolean, fields: (String, Int)*): C => Future[C] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ object ScalaDriverPersistenceSnapshotter extends SnapshottingFieldNames {
}

val pid = document.getString(PROCESSOR_ID).getValue
val sn = document.getLong(SEQUENCE_NUMBER)
val ts = document.getLong(TIMESTAMP)
val sn = document.getInt64(SEQUENCE_NUMBER).longValue()
val ts = document.getInt64(TIMESTAMP).longValue()
SelectedSnapshot(SnapshotMetadata(pid, sn, ts), content)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys

private def deserializeVersionOne(d: BsonDocument) = Event(
pid = d.getString(PROCESSOR_ID).getValue,
sn = d.getLong(SEQUENCE_NUMBER),
sn = d.getInt64(SEQUENCE_NUMBER).longValue(),
payload = Payload[BsonValue](
hint = d.getString(TYPE).getValue,
any = Option(d.get(PayloadKey)).collect(extractPayloadContent).get,
Expand All @@ -79,7 +79,7 @@ class ScalaDriverSerializers(dynamicAccess: DynamicAccess, actorSystem: ActorSys

private def deserializeDocumentLegacy(d: BsonDocument) = {
val persistenceId = d.getString(PROCESSOR_ID).getValue
val sequenceNr = d.getLong(SEQUENCE_NUMBER)
val sequenceNr = d.getInt64(SEQUENCE_NUMBER).longValue()
Option(d.get(SERIALIZED)) match {
case Some(b: BsonDocument) =>
Event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import scala.util.Try
object ScalaDriverSettings extends ExtensionId[ScalaDriverSettings] with ExtensionIdProvider {

override def createExtension(system: ExtendedActorSystem): ScalaDriverSettings = {
val fullPath = s"${getClass.getPackage.getName}.official"
val systemConfig = system.settings.config
new ScalaDriverSettings(systemConfig.getConfig(fullPath))
val fullPath = s"${getClass.getPackage.getName}.official"
val systemConfig = system.settings.config
new ScalaDriverSettings(systemConfig.getConfig(fullPath))
}

override def lookup(): ExtensionId[ScalaDriverSettings] = ScalaDriverSettings
Expand Down Expand Up @@ -45,20 +45,17 @@ class ScalaDriverSettings(config: Config) extends OfficialDriverSettings(config)

val bldr: MongoClientSettings.Builder = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.applyToClusterSettings((t: ClusterSettings.Builder) => {
.applyToClusterSettings{(t: ClusterSettings.Builder) =>
t.serverSelectionTimeout(getLongQueryProperty("serverselectiontimeoutms").getOrElse(ServerSelectionTimeout.toMillis), TimeUnit.MILLISECONDS)
.maxWaitQueueSize(getIntQueryProperty("waitqueuemultiple").getOrElse(ThreadsAllowedToBlockforConnectionMultiplier) * getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost))
()
}
).applyToConnectionPoolSettings((t: ConnectionPoolSettings.Builder) => {
t.maxWaitTime(getLongQueryProperty("waitqueuetimeoutms").getOrElse(MaxWaitTime.toMillis), TimeUnit.MILLISECONDS)
.maxConnectionIdleTime(getLongQueryProperty("maxidletimems").getOrElse(MaxConnectionIdleTime.toMillis), TimeUnit.MILLISECONDS)
.maxConnectionLifeTime(getLongQueryProperty("maxlifetimems").getOrElse(MaxConnectionLifeTime.toMillis), TimeUnit.MILLISECONDS)
.minSize(getIntQueryProperty("minpoolsize").getOrElse(MinConnectionsPerHost))
.maxSize(getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost))
()
}
).applyToServerSettings((t: ServerSettings.Builder) => {
}.applyToConnectionPoolSettings{(t: ConnectionPoolSettings.Builder) =>
t.maxWaitTime(getLongQueryProperty("waitqueuetimeoutms").getOrElse(MaxWaitTime.toMillis), TimeUnit.MILLISECONDS)
.maxConnectionIdleTime(getLongQueryProperty("maxidletimems").getOrElse(MaxConnectionIdleTime.toMillis), TimeUnit.MILLISECONDS)
.maxConnectionLifeTime(getLongQueryProperty("maxlifetimems").getOrElse(MaxConnectionLifeTime.toMillis), TimeUnit.MILLISECONDS)
.minSize(getIntQueryProperty("minpoolsize").getOrElse(MinConnectionsPerHost))
.maxSize(getIntQueryProperty("maxpoolsize").getOrElse(ConnectionsPerHost))
()
}.applyToServerSettings((t: ServerSettings.Builder) => {
t.heartbeatFrequency(getLongQueryProperty("heartbeatfrequencyms").getOrElse(HeartbeatFrequency.toMillis), TimeUnit.MILLISECONDS)
.minHeartbeatFrequency(MinHeartbeatFrequency.toMillis, TimeUnit.MILLISECONDS) // no 'minHeartbeatFrequency' in ConnectionString
()
Expand Down

0 comments on commit 7fcd25d

Please sign in to comment.