Skip to content

Commit

Permalink
Improve collections count (#260)
Browse files Browse the repository at this point in the history
* avoid counting an entire collection to test if it may be dropped
* improve collection counting to test if it may be dropped
  • Loading branch information
JeanFrancoisGuena authored and scullxbones committed Sep 17, 2019
1 parent 5204dac commit 9fa6043
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}
import org.slf4j.{Logger, LoggerFactory}
import reactivemongo.akkastream._
import reactivemongo.api.ReadConcern
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.{LastError, WriteResult}
import reactivemongo.bson.{BSONDocument, _}
Expand Down Expand Up @@ -204,12 +203,8 @@ class RxMongoJournaller(val driver: RxMongoDriver) extends MongoPersistenceJourn

} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && removed.ok)
for {
n <- journal.count(None, None, 0, None, ReadConcern.Local)
if n == 0
_ <- journal.drop(failIfNotFound = false)
_ = driver.removeJournalInCache(persistenceId)
} yield ()
driver.removeEmptyJournal(journal)
.map(_ => driver.removeJournalInCache(persistenceId))
()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ package akka.contrib.persistence.mongodb
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.{Config, ConfigFactory}
import reactivemongo.api.{DefaultDB, FailoverStrategy, MongoConnection, MongoDriver}
import reactivemongo.api._
import reactivemongo.api.collections.bson.BSONCollection
import reactivemongo.api.commands.{CommandError, WriteConcern}
import reactivemongo.api.commands.{Command, CommandError, WriteConcern}
import reactivemongo.api.indexes.{Index, IndexType}
import reactivemongo.bson._
import reactivemongo.bson.{BSONDocument, _}

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object RxMongoPersistenceDriver {
Expand Down Expand Up @@ -141,6 +141,58 @@ class RxMongoDriver(system: ActorSystem, config: Config, driverProvider: RxMongo
private[mongodb] def journalCollectionsAsFuture(implicit ec: ExecutionContext) = getCollectionsAsFuture(journalCollectionName)

private[mongodb] def getSnapshotCollections()(implicit ec: ExecutionContext) = getCollectionsAsFuture(snapsCollectionName)

/**
 * Drop-if-empty entry point for journal collections.
 *
 * Delegates to `removeEmptyCollection`, passing the journal index name.
 *
 * @param jnl the journal collection to examine
 * @return the future produced by `removeEmptyCollection`
 */
private[mongodb] def removeEmptyJournal(jnl: BSONCollection)(implicit ec: ExecutionContext): Future[Unit] = {
  removeEmptyCollection(collection = jnl, indexName = journalIndexName)
}

/**
 * Drop-if-empty entry point for snapshot collections.
 *
 * Delegates to `removeEmptyCollection`, passing the snapshot index name.
 *
 * @param snp the snapshot collection to examine
 * @return the future produced by `removeEmptyCollection`
 */
private[mongodb] def removeEmptySnapshot(snp: BSONCollection)(implicit ec: ExecutionContext): Future[Unit] = {
  removeEmptyCollection(collection = snp, indexName = snapsIndexName)
}

// Server version string, remembered after the first successful `buildInfo` round trip.
private[this] var mongoVersion: Option[String] = None

/**
 * Returns the server version string, asking the server only when no value is remembered yet.
 *
 * The version is read from the "version" field of the `buildInfo` command reply; when that
 * field is absent the empty string is returned (and remembered).
 */
private[this] def getMongoVersion(implicit ec: ExecutionContext): Future[String] =
  mongoVersion.fold[Future[String]] {
    db.flatMap { database =>
      val runner = Command.run(BSONSerializationPack, FailoverStrategy())
      val buildInfo = runner.rawCommand(BSONDocument("buildInfo" -> 1))
      runner.apply(database, buildInfo)
        .one[BSONDocument](ReadPreference.Primary)
        .map { reply =>
          val version = reply.getAs[BSONString]("version").getOrElse(BSONString("")).value
          // remember the answer so later calls skip the round trip
          mongoVersion = Some(version)
          version
        }
    }
  }(known => Future.successful(known))

/**
 * Tells whether the server version is greater than or equal to the requested one.
 *
 * Versions are compared component by component, the first differing component deciding
 * the outcome (so 4.0.3 IS at least 3.6). Components missing on either side count as 0.
 * Only the leading digits of each component are used ("3-rc1" is read as 3), and parsing
 * stops at the first component without leading digits.
 *
 * @param inputNbs the minimal version wanted, e.g. `(3, 6)` for 3.6
 * @return `true` when the server version is at least `inputNbs`; `false` when it is lower
 *         or when no numeric version could be read (e.g. empty version string)
 */
private[this] def isMongoVersionAtLeast(inputNbs: Int*)(implicit ec: ExecutionContext): Future[Boolean] =
  getMongoVersion.map { versionStr =>
    val versionNbs: Seq[Int] = versionStr
      .split('.')
      .toSeq
      .map(part => scala.util.Try(part.takeWhile(_.isDigit).toInt).toOption)
      .takeWhile(_.isDefined)
      .flatten

    if (versionNbs.isEmpty) {
      false // unknown version: be conservative
    } else {
      inputNbs
        .zipAll(versionNbs, 0, 0)
        // the first component that differs settles the comparison
        .collectFirst { case (wanted, actual) if wanted != actual => actual > wanted }
        .getOrElse(true) // all components equal
    }
  }

/**
 * Drops `collection` when it contains no document.
 *
 * Emptiness is checked in two steps: a first count with read concern `Local`, then, only
 * if that one is zero, a second count with read concern `Majority`. When the server is at
 * least 3.6 the second count is hinted with `indexName`; otherwise it runs without hint.
 *
 * The returned future succeeds only when the collection was found empty and the drop was
 * issued. When the collection is not empty the `if` guard below fails the future with a
 * `NoSuchElementException`, so anything chained with `map` on the result is skipped.
 *
 * @param collection the collection to examine
 * @param indexName  name of the index used as a hint for the second count
 */
private[this] def removeEmptyCollection(collection: BSONCollection, indexName: String)(implicit ec: ExecutionContext): Future[Unit] = {
  // second count: hinted where supported; NOTE there must be no `if b36` filter here,
  // a Future filter fails the whole future and would make the un-hinted branch unreachable
  def majorityCount: Future[Long] =
    isMongoVersionAtLeast(3, 6).flatMap { b36 =>
      if (b36) {
        // lets optimize aggregate method, using appropriate index
        collection.count(None, None, 0, Some(collection.hint(indexName)), ReadConcern.Majority)
      } else {
        collection.count(None, None, 0, None, ReadConcern.Majority)
      }
    }

  for {
    // first count, may be inaccurate in cluster environment
    firstCount <- collection.count(None, None, 0, None, ReadConcern.Local)
    // just to be sure: second count, should be fast as we are pretty sure the result is zero
    secondCount <- if (firstCount == 0L) majorityCount else Future.successful(firstCount)
    if secondCount == 0L
    _ <- collection.drop(failIfNotFound = false)
  } yield ()
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package akka.contrib.persistence.mongodb

import akka.persistence.SelectedSnapshot
import reactivemongo.api.ReadConcern
import reactivemongo.api.indexes._
import reactivemongo.bson._

Expand Down Expand Up @@ -56,12 +55,8 @@ class RxMongoSnapshotter(driver: RxMongoDriver) extends MongoPersistenceSnapshot
wr <- s.delete().one(BSONDocument(criteria: _*))
} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && wr.ok)
for {
n <- s.count(None, None, 0, None, ReadConcern.Local)
if n == 0
_ <- s.drop(failIfNotFound = false)
_ = driver.removeSnapsInCache(pid)
} yield ()
driver.removeEmptySnapshot(s)
.map(_ => driver.removeSnapsInCache(pid))
()
}
}
Expand All @@ -77,12 +72,8 @@ class RxMongoSnapshotter(driver: RxMongoDriver) extends MongoPersistenceSnapshot
))
} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && wr.ok)
for {
n <- s.count(None, None, 0, None, ReadConcern.Local)
if n == 0
_ <- s.drop(failIfNotFound = false)
_ = driver.removeSnapsInCache(pid)
} yield ()
driver.removeEmptySnapshot(s)
.map(_ => driver.removeSnapsInCache(pid))
()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import akka.stream.ActorMaterializer
import com.mongodb.ConnectionString
import com.mongodb.client.model.{CreateCollectionOptions, IndexOptions}
import com.typesafe.config.Config
import org.mongodb.scala.bson.BsonDocument
import org.mongodb.scala.bson.{BsonDocument, BsonString}
import org.mongodb.scala.model.CountOptions
import org.mongodb.scala.model.Indexes._
import org.mongodb.scala.{MongoClientSettings, _}

Expand Down Expand Up @@ -102,6 +103,79 @@ class ScalaMongoDriver(system: ActorSystem, config: Config) extends MongoPersist

private[mongodb] def snapshotCollectionsAsFuture(implicit ec: ExecutionContext) = getCollectionsAsFuture(snapsCollectionName)

/**
 * Drop-if-empty entry point for journal collections.
 *
 * Delegates to `removeEmptyCollection`, passing the journal index name.
 *
 * @param jnl the journal collection to examine
 * @return the future produced by `removeEmptyCollection`
 */
private[mongodb] def removeEmptyJournal(jnl: MongoCollection[D])(implicit ec: ExecutionContext): Future[Unit] = {
  removeEmptyCollection(collection = jnl, indexName = journalIndexName)
}

/**
 * Drop-if-empty entry point for snapshot collections.
 *
 * Delegates to `removeEmptyCollection`, passing the snapshot index name.
 *
 * @param snp the snapshot collection to examine
 * @return the future produced by `removeEmptyCollection`
 */
private[mongodb] def removeEmptySnapshot(snp: MongoCollection[D])(implicit ec: ExecutionContext): Future[Unit] = {
  removeEmptyCollection(collection = snp, indexName = snapsIndexName)
}

// Server version string, remembered after the first successful `buildInfo` round trip.
private[this] var mongoVersion: Option[String] = None

/**
 * Returns the server version string, asking the server only when no value is remembered yet.
 *
 * The version is read from the "version" field of the `buildInfo` command reply; when that
 * field is absent the empty string is returned (and remembered).
 */
private[this] def getMongoVersion(implicit ec: ExecutionContext): Future[String] =
  mongoVersion.fold[Future[String]] {
    db.runCommand(BsonDocument("buildInfo" -> 1)).toFuture()
      .map { reply =>
        val version = reply.get("version").getOrElse(BsonString("")).asString().getValue
        // remember the answer so later calls skip the round trip
        mongoVersion = Some(version)
        version
      }
  }(known => Future.successful(known))

/**
 * Tells whether the server version is greater than or equal to the requested one.
 *
 * Versions are compared component by component, the first differing component deciding
 * the outcome (so 4.0.3 IS at least 3.6). Components missing on either side count as 0.
 * Only the leading digits of each component are used ("3-rc1" is read as 3), and parsing
 * stops at the first component without leading digits.
 *
 * @param inputNbs the minimal version wanted, e.g. `(4, 0, 3)` for 4.0.3
 * @return `true` when the server version is at least `inputNbs`; `false` when it is lower
 *         or when no numeric version could be read (e.g. empty version string)
 */
private[this] def isMongoVersionAtLeast(inputNbs: Int*)(implicit ec: ExecutionContext): Future[Boolean] =
  getMongoVersion.map { versionStr =>
    val versionNbs: Seq[Int] = versionStr
      .split('.')
      .toSeq
      .map(part => scala.util.Try(part.takeWhile(_.isDigit).toInt).toOption)
      .takeWhile(_.isDefined)
      .flatten

    if (versionNbs.isEmpty) {
      false // unknown version: be conservative
    } else {
      inputNbs
        .zipAll(versionNbs, 0, 0)
        // the first component that differs settles the comparison
        .collectFirst { case (wanted, actual) if wanted != actual => actual > wanted }
        .getOrElse(true) // all components equal
    }
  }

/**
 * Counts the documents of `collection` by running the raw `count` command with read
 * concern level "local".
 *
 * @param collection the collection whose name is sent in the `count` command
 * @return the value of the reply's "n" field as a `Long`, or 0 when the field is absent
 */
private[this] def getLocalCount(collection: MongoCollection[D])(implicit ec: ExecutionContext): Future[Long] = {
  val countCommand = BsonDocument(
    "count" -> collection.namespace.getCollectionName,
    "readConcern" -> BsonDocument("level" -> "local")
  )
  db.runCommand(countCommand)
    .toFuture()
    // the `0L` default is converted to a BSON int64, on which `asInt32()` throws; reading
    // the field as a generic BSON number also copes with a reply that is not an int32
    .map(_.getOrElse("n", 0L).asNumber().longValue())
}

/**
 * Looks up the index called `indexName` on `collection` and returns its key specification.
 *
 * @param collection the collection whose indexes are listed
 * @param indexName  value matched against the "name" field of each listed index
 * @return the "key" sub-document of the first matching index, or `None` when no listed
 *         index carries that name
 */
private[this] def getIndexAsBson(collection: MongoCollection[D], indexName: String)(implicit ec: ExecutionContext): Future[Option[BsonDocument]] =
  collection.listIndexes[BsonDocument]().toFuture().map { indexes =>
    indexes
      .find(index => index.get("name").asString().getValue == indexName)
      .map(index => index.get("key").asDocument())
  }

/**
 * Drops `collection` when it contains no document.
 *
 * Emptiness is checked in two steps. The first count is `estimatedDocumentCount` when the
 * server is at least 4.0.3, the raw local `count` command (`getLocalCount`) otherwise.
 * Only if it is zero, a second count is made with `countDocuments`; when the server is at
 * least 3.6 and the index named `indexName` can be found, that count is hinted with it.
 *
 * The returned future succeeds only when the collection was found empty and the drop was
 * attempted (drop errors are ignored). When the collection is not empty the `if` guard
 * below fails the future with a `NoSuchElementException`, so anything chained with `map`
 * on the result is skipped.
 *
 * @param collection the collection to examine
 * @param indexName  name of the index used as a hint for the second count
 */
private[this] def removeEmptyCollection(collection: MongoCollection[D], indexName: String)(implicit ec: ExecutionContext): Future[Unit] = {
  // second count: hinted where supported; NOTE there must be no `if b36` filter here,
  // a Future filter fails the whole future and would make the un-hinted branch unreachable
  def accurateCount: Future[Long] =
    isMongoVersionAtLeast(3, 6).flatMap {
      case true =>
        // lets optimize aggregate method, using appropriate index (that we have to grab from indexes list)
        getIndexAsBson(collection, indexName).flatMap {
          case Some(index) => collection.countDocuments(BsonDocument(), CountOptions().hint(index)).toFuture()
          case None => collection.countDocuments().toFuture()
        }
      case false =>
        collection.countDocuments().toFuture()
    }

  for {
    b403 <- isMongoVersionAtLeast(4, 0, 3)
    // first count, may be inaccurate in cluster environment
    firstCount <- if (b403) collection.estimatedDocumentCount().toFuture() else getLocalCount(collection)
    // just to be sure: second count, should be fast as we are pretty sure the result is zero
    secondCount <- if (firstCount == 0L) accurateCount else Future.successful(firstCount)
    if secondCount == 0L
    _ <- collection.drop().toFuture().recover { case _ => Completed() } // ignore errors
  } yield ()
}

override private[mongodb] def ensureIndex(indexName: String, unique: Boolean, sparse: Boolean, fields: (String, Int)*)(implicit ec: ExecutionContext): C => C = { collection =>
for {
c <- collection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,8 @@ class ScalaDriverPersistenceJournaller(val driver: ScalaMongoDriver) extends Mon

} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && removed.wasAcknowledged())
for {
n <- journal.countDocuments().toFuture()
if n == 0
_ <- journal.drop().toFuture().recover{ case _ => Completed() } // ignore errors
_ = driver.removeJournalInCache(persistenceId)
} yield ()
driver.removeEmptyJournal(journal)
.map(_ => driver.removeJournalInCache(persistenceId))
()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,8 @@ class ScalaDriverPersistenceSnapshotter(driver: ScalaMongoDriver) extends MongoP
wr <- s.deleteMany(criteria).toFuture()
} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && wr.wasAcknowledged())
for {
n <- s.countDocuments().toFuture() if n == 0
_ <- s.drop().toFuture()
_ = driver.removeSnapsInCache(pid)
} yield ()
driver.removeEmptySnapshot(s)
.map(_ => driver.removeSnapsInCache(pid))
()
}
}
Expand All @@ -139,11 +136,8 @@ class ScalaDriverPersistenceSnapshotter(driver: ScalaMongoDriver) extends MongoP
).toFuture()
} yield {
if (driver.useSuffixedCollectionNames && driver.suffixDropEmpty && wr.wasAcknowledged())
for {
n <- s.countDocuments().toFuture() if n == 0L
_ <- s.drop().toFuture()
_ = driver.removeSnapsInCache(pid)
} yield ()
driver.removeEmptySnapshot(s)
.map(_ => driver.removeSnapsInCache(pid))
()
}
}
Expand Down

0 comments on commit 9fa6043

Please sign in to comment.