diff --git a/CHANGELOG.md b/CHANGELOG.md index 395f47de9..fec7a8521 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file. ## [Unreleased] ### Changed - Whitelisting update +- Majority selection algorithm +- Missing proposal lookup algorithm ## [v2.23.3] 2021-04-19 ### Changed diff --git a/build.sbt b/build.sbt index 9f953dea4..ed1d192c0 100644 --- a/build.sbt +++ b/build.sbt @@ -175,7 +175,8 @@ lazy val coreDependencies = Seq( "pl.abankowski" %% "http-request-signer-core" % versions.httpSigner, "pl.abankowski" %% "http4s-request-signer" % versions.httpSigner, "io.chrisdavenport" %% "fuuid" % "0.4.0", - "io.chrisdavenport" %% "mapref" % "0.1.1" + "io.chrisdavenport" %% "mapref" % "0.1.1", + "net.cinnom" % "nano-cuckoo" % "2.0.0" ) ++ prometheusDependencies ++ http4sDependencies ++ schemaSharedDependencies ++ twitterAlgebirdDependencies ++ pureconfigDependencies ++ monocleDependencies //Test dependencies diff --git a/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala b/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala index b2da10b17..35e00ce60 100644 --- a/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala +++ b/schema/src/main/scala/org/constellation/schema/serialization/SchemaKryoRegistrar.scala @@ -6,8 +6,7 @@ import org.constellation.schema.edge._ import org.constellation.schema.observation._ import org.constellation.schema.signature.{HashSignature, SignatureBatch, Signed} import org.constellation.schema.snapshot.{ - HeightRange, - MajorityInfo, + FilterData, Snapshot, SnapshotInfo, SnapshotProposal, @@ -102,7 +101,6 @@ object SchemaKryoRegistrar (classOf[SnapshotProposalPayload], 200), (classOf[Signed[SnapshotProposal]], 201), (classOf[SnapshotProposal], 202), - (classOf[MajorityInfo], 203), - (classOf[HeightRange], 204) + (classOf[FilterData], 203) ) ) {} diff --git a/schema/src/main/scala/org/constellation/schema/snapshot/FilterData.scala b/schema/src/main/scala/org/constellation/schema/snapshot/FilterData.scala new file mode 100644 index 000000000..bcc298d47 --- /dev/null +++ b/schema/src/main/scala/org/constellation/schema/snapshot/FilterData.scala @@ -0,0 +1,14 @@ +package org.constellation.schema.snapshot + +import io.circe.{Decoder, Encoder} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} + +case class FilterData( + contents: Array[Byte], + capacity: Long +) + +object FilterData { + implicit val encoder: Encoder[FilterData] = deriveEncoder + implicit val decoder: Decoder[FilterData] = deriveDecoder +} diff --git a/schema/src/main/scala/org/constellation/schema/snapshot/HeightRange.scala b/schema/src/main/scala/org/constellation/schema/snapshot/HeightRange.scala index 65ea44320..1b64b96c9 100644 --- a/schema/src/main/scala/org/constellation/schema/snapshot/HeightRange.scala +++ b/schema/src/main/scala/org/constellation/schema/snapshot/HeightRange.scala @@ -6,11 +6,15 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} /** * Inclusive range of heights */ -case class HeightRange(from: Long, to: Long) +case class HeightRange(from: Long, to: Long) { + def nonEmpty: Boolean = !empty + + def empty: Boolean = from > to +} object HeightRange { val MaxRange = HeightRange(0, Long.MaxValue) - val MinRange = HeightRange(0, 0) + val Empty = HeightRange(Long.MaxValue, 0) implicit val encoder: Encoder[HeightRange] = deriveEncoder implicit val decoder: Decoder[HeightRange] = deriveDecoder } diff --git a/schema/src/main/scala/org/constellation/schema/snapshot/MajorityInfo.scala b/schema/src/main/scala/org/constellation/schema/snapshot/MajorityInfo.scala deleted file mode 100644 index 139b9cca5..000000000 --- a/schema/src/main/scala/org/constellation/schema/snapshot/MajorityInfo.scala +++ /dev/null @@ -1,14 +0,0 @@ -package org.constellation.schema.snapshot - -import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} -import io.circe.{Decoder, Encoder} - -case class MajorityInfo( - majorityRange: HeightRange, - majorityGaps: List[HeightRange] -) - -object MajorityInfo { - implicit val encoder: Encoder[MajorityInfo] = deriveEncoder - implicit val decoder: Decoder[MajorityInfo] = deriveDecoder -} diff --git a/schema/src/main/scala/org/constellation/schema/snapshot/SnapshotProposalPayload.scala b/schema/src/main/scala/org/constellation/schema/snapshot/SnapshotProposalPayload.scala index 8d117ed29..c1a906ade 100644 --- a/schema/src/main/scala/org/constellation/schema/snapshot/SnapshotProposalPayload.scala +++ b/schema/src/main/scala/org/constellation/schema/snapshot/SnapshotProposalPayload.scala @@ -6,7 +6,7 @@ import org.constellation.schema.signature.Signed case class SnapshotProposalPayload( proposal: Signed[SnapshotProposal], - majorityInfo: MajorityInfo + filterData: FilterData ) object SnapshotProposalPayload { diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 60921cccd..bf21c2372 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -80,8 +80,8 @@ constellation { snapshotHeightDelayInterval = 20 snapshotHeightRedownloadDelayInterval = 30 meaningfulSnapshotsCount = 40 - missingProposalOffset = 0 - missingProposalLimit = 40 + stallCountThreshold = 4 + proposalLookupLimit = 8 } transaction { generator { diff --git a/src/main/scala/org/constellation/ConfigUtil.scala b/src/main/scala/org/constellation/ConfigUtil.scala index 2cc2879cb..b813bfbc9 100644 --- a/src/main/scala/org/constellation/ConfigUtil.scala +++ b/src/main/scala/org/constellation/ConfigUtil.scala @@ -59,9 +59,6 @@ object ConfigUtil { def isEnabledAWSStorage: Boolean = Try(configStorage.getBoolean("aws.enabled")).getOrElse(false) - def isEnabledCloudStorage: Boolean = - isEnabledGCPStorage || isEnabledAWSStorage - case class AWSStorageConfig(accessKey: String, secretKey: String, region: String, bucket: String) def loadAWSStorageConfigs(constellationConfig: Config = constellation): NonEmptyList[AWSStorageConfig] = { diff --git a/src/main/scala/org/constellation/DAO.scala b/src/main/scala/org/constellation/DAO.scala index 148b98a4a..aa9cc3e2d 100644 --- a/src/main/scala/org/constellation/DAO.scala +++ b/src/main/scala/org/constellation/DAO.scala @@ -428,15 +428,13 @@ class DAO( val missingProposalFinder = MissingProposalFinder( ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval"), - ConfigUtil.constellation.getLong("snapshot.missingProposalOffset"), - ConfigUtil.constellation.getLong("snapshot.missingProposalLimit"), id ) redownloadService = RedownloadService[IO]( ConfigUtil.constellation.getInt("snapshot.meaningfulSnapshotsCount"), ConfigUtil.constellation.getInt("snapshot.snapshotHeightRedownloadDelayInterval"), - ConfigUtil.isEnabledCloudStorage, + ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval"), cluster, MajorityStateChooser(id), missingProposalFinder, diff --git a/src/main/scala/org/constellation/concurrency/cuckoo/CuckooFilter.scala b/src/main/scala/org/constellation/concurrency/cuckoo/CuckooFilter.scala new file mode 100644 index 000000000..b27cbdd33 --- /dev/null +++ b/src/main/scala/org/constellation/concurrency/cuckoo/CuckooFilter.scala @@ -0,0 +1,21 @@ +package org.constellation.concurrency.cuckoo + +import java.io.ByteArrayInputStream + +import net.cinnom.nanocuckoo.NanoCuckooFilter +import org.constellation.schema.snapshot.FilterData + +case class CuckooFilter(filterData: FilterData) { + + private lazy val filter: NanoCuckooFilter = { + val stream = new ByteArrayInputStream(filterData.contents) + try { + val resultingFilter = new NanoCuckooFilter.Builder(filterData.capacity).build() + resultingFilter.readMemory(stream) + resultingFilter + } finally stream.close() + } + + def contains[T](item: T)(implicit convert: T => String): Boolean = convert.andThen(filter.contains)(item) + +} diff --git a/src/main/scala/org/constellation/concurrency/cuckoo/MutableCuckooFilter.scala b/src/main/scala/org/constellation/concurrency/cuckoo/MutableCuckooFilter.scala new file mode 100644 index 000000000..8a16c2a73 --- /dev/null +++ b/src/main/scala/org/constellation/concurrency/cuckoo/MutableCuckooFilter.scala @@ -0,0 +1,75 @@ +package org.constellation.concurrency.cuckoo + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import cats.effect.Sync +import net.cinnom.nanocuckoo.NanoCuckooFilter +import org.constellation.schema.snapshot.FilterData +import cats.syntax.all._ + +class MutableCuckooFilter[F[_], T](filter: NanoCuckooFilter)(implicit F: Sync[F]) { + + def insert(item: T)(implicit convert: T => String): F[Boolean] = { + val converted = convert(item) + for { + insertResult <- tryInsert(converted) + finalResult <- if (insertResult) F.pure(true) + else expand >> tryInsert(converted) + + } yield finalResult + } + + private def tryInsert(convertedItem: String): F[Boolean] = F.delay { + filter.insert(convertedItem) + } + + private def expand: F[Unit] = F.delay { + filter.expand() + } + + def delete(item: T)(implicit convert: T => String): F[Boolean] = F.delay { + filter.delete(convert(item)) + } + + def contains(item: T)(implicit convert: T => String): F[Boolean] = F.delay { + filter.contains(convert(item)) + } + + def getFilterData: F[FilterData] = + for { + capacity <- getCapacity + contents <- getFilterContents + } yield FilterData(contents, capacity) + + def clear: F[Unit] = + for { + capacity <- getCapacity + contents <- MutableCuckooFilter(capacity).getFilterContents + _ <- setFilterContents(contents) + } yield () + + private def getCapacity: F[Long] = F.delay { + filter.getCapacity + } + + private def getFilterContents: F[Array[Byte]] = F.delay { + val stream = new ByteArrayOutputStream() + try filter.writeMemory(stream) + finally stream.close() + stream.toByteArray + } + + private def setFilterContents(contents: Array[Byte]): F[Unit] = F.delay { + val stream = new ByteArrayInputStream(contents) + try filter.readMemory(stream) + finally stream.close() + } + +} + +object MutableCuckooFilter { + + def apply[F[_]: Sync, T](capacity: Long = 512): MutableCuckooFilter[F, T] = + new MutableCuckooFilter[F, T](new NanoCuckooFilter.Builder(capacity).build()) + +} diff --git a/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala b/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala index 0aa74341e..855b84eab 100644 --- a/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala +++ b/src/main/scala/org/constellation/domain/p2p/client/SnapshotClientAlgebra.scala @@ -4,7 +4,8 @@ import org.constellation.domain.redownload.RedownloadService.{SnapshotProposalsA import org.constellation.gossip.state.GossipMessage import org.constellation.infrastructure.p2p.PeerResponse.PeerResponse import org.constellation.schema.Id -import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposalPayload} +import org.constellation.schema.signature.Signed +import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposal, SnapshotProposalPayload} trait SnapshotClientAlgebra[F[_]] { def getStoredSnapshots(): PeerResponse[F, List[String]] @@ -17,6 +18,8 @@ trait SnapshotClientAlgebra[F[_]] { def getPeerProposals(id: Id): PeerResponse[F, Option[SnapshotProposalsAtHeight]] + def queryPeerProposals(query: List[(Id, Long)]): PeerResponse[F, List[Option[Signed[SnapshotProposal]]]] + def getNextSnapshotHeight(): PeerResponse[F, (Id, Long)] def getSnapshotInfo(): PeerResponse[F, Array[Byte]] diff --git a/src/main/scala/org/constellation/domain/redownload/MissingProposalFinder.scala b/src/main/scala/org/constellation/domain/redownload/MissingProposalFinder.scala index 10d0f9959..8ede22770 100644 --- a/src/main/scala/org/constellation/domain/redownload/MissingProposalFinder.scala +++ b/src/main/scala/org/constellation/domain/redownload/MissingProposalFinder.scala @@ -4,16 +4,14 @@ import cats.syntax.all._ import org.constellation.domain.redownload.RedownloadService.{PeersCache, PeersProposals, SnapshotsAtHeight} import org.constellation.schema.Id import org.constellation.schema.snapshot.HeightRange.MaxRange -import org.constellation.schema.snapshot.{HeightRange, MajorityInfo} +import org.constellation.schema.snapshot.HeightRange import scala.collection.immutable.NumericRange.inclusive import scala.math.{max, min} class MissingProposalFinder( - private val heightInterval: Long, - private val offset: Long, - private val limit: Long, - private val selfId: Id + heightInterval: Long, + selfId: Id ) { /** @@ -23,9 +21,9 @@ class MissingProposalFinder( lookupRange: HeightRange, proposals: PeersProposals, peersCache: PeersCache - ): Map[Id, Set[Long]] = { + ): Set[(Id, Long)] = { val receivedProposals = proposals.mapValues(_.keySet) - val bounds = boundedRange(lookupRange) + val bounds = lookupRange (peersCache - selfId) .mapValues( @@ -36,34 +34,13 @@ class MissingProposalFinder( } } ) - .map { + .toList + .flatMap { case (id, majorityHeights) => val allProposals = expandIntervals(bounds, majorityHeights) - (id, allProposals -- receivedProposals.getOrElse(id, Set.empty)) - } - .filter { - case (_, missingProposals) => missingProposals.nonEmpty + (allProposals -- receivedProposals.getOrElse(id, Set.empty)).map((id, _)).toList } - } - - /** - * @return Id of the peer that can provide the most proposals that are missing, if one could be found - */ - def selectPeerForFetchingMissingProposals( - lookupRange: HeightRange, - missingProposals: Set[Long], - peerMajorityInfo: Map[Id, MajorityInfo] - ): Option[Id] = { - val bounds = boundedRange(lookupRange) - - (peerMajorityInfo - selfId).mapValues { majorityInfo => - val peerGaps = expandIntervals(bounds, majorityInfo.majorityGaps) ++ - rangeSet(majorityInfo.majorityRange.to + heightInterval, bounds.to) ++ - rangeSet(bounds.from, majorityInfo.majorityRange.from - heightInterval) - missingProposals -- peerGaps - }.toList.filter { case (_, proposalsToFill) => proposalsToFill.nonEmpty }.maximumByOption { - case (_, proposalsToFill) => proposalsToFill.size - }.map { case (id, _) => id } + .toSet } def findGapRanges(majorityState: SnapshotsAtHeight): List[HeightRange] = @@ -92,14 +69,10 @@ class MissingProposalFinder( private def rangeSet(from: Long, to: Long): Set[Long] = inclusive(from, to, heightInterval).toSet - - private def boundedRange(range: HeightRange) = - HeightRange(max(range.from, range.to - (offset + limit)), range.to - offset) - } object MissingProposalFinder { - def apply(heightInterval: Long, offset: Long, limit: Long, selfId: Id) = - new MissingProposalFinder(heightInterval, offset, limit, selfId) + def apply(heightInterval: Long, selfId: Id) = + new MissingProposalFinder(heightInterval, selfId) } diff --git a/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala b/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala index bd8fa5c05..cf76a6ea6 100644 --- a/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala +++ b/src/main/scala/org/constellation/domain/redownload/RedownloadService.scala @@ -2,17 +2,19 @@ package org.constellation.domain.redownload import java.security.KeyPair -import cats.NonEmptyParallel import cats.data.{EitherT, NonEmptyList} import cats.effect.concurrent.Ref import cats.effect.{Blocker, Concurrent, ContextShift, Timer} import cats.syntax.all._ +import cats.{Applicative, NonEmptyParallel} import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import io.chrisdavenport.mapref.MapRef import io.circe.{Decoder, Encoder} +import org.constellation.ConfigUtil import org.constellation.checkpoint.{CheckpointAcceptanceService, TopologicalSort} import org.constellation.concurrency.MapRefUtils import org.constellation.concurrency.MapRefUtils.MapRefOps +import org.constellation.concurrency.cuckoo.{CuckooFilter, MutableCuckooFilter} import org.constellation.domain.cloud.CloudService.CloudServiceEnqueue import org.constellation.domain.redownload.RedownloadService._ import org.constellation.domain.storage.LocalFileStorage @@ -32,12 +34,13 @@ import org.constellation.util.Metrics import scala.collection.SortedMap import scala.concurrent.ExecutionContext import scala.concurrent.duration._ +import scala.math.{max, min} import scala.util.Random -class RedownloadService[F[_]: NonEmptyParallel]( +class RedownloadService[F[_]: NonEmptyParallel: Applicative]( meaningfulSnapshotsCount: Int, redownloadInterval: Int, - isEnabledCloudStorage: Boolean, + heightInterval: Long, cluster: Cluster[F], majorityStateChooser: MajorityStateChooser, missingProposalFinder: MissingProposalFinder, @@ -69,97 +72,131 @@ class RedownloadService[F[_]: NonEmptyParallel]( /** * Majority proposals from other peers. It is used to calculate majority state. */ - private[redownload] val peersProposals: MapRef[F, Id, Option[SnapshotProposalsAtHeight]] = + private[redownload] val peerProposals: MapRef[F, Id, Option[SnapshotProposalsAtHeight]] = MapRefUtils.ofConcurrentHashMap() private[redownload] val lastMajorityState: Ref[F, SnapshotsAtHeight] = Ref.unsafe(Map.empty) private[redownload] var lastSentHeight: Ref[F, Long] = Ref.unsafe(-1L) - private[redownload] val peerMajorityInfo: MapRef[F, Id, Option[MajorityInfo]] = MapRefUtils.ofConcurrentHashMap() + private[redownload] val majorityStallCount: Ref[F, Int] = Ref.unsafe(0) - def updatePeerMajorityInfo(peerId: Id, majorityInfo: MajorityInfo): F[Unit] = - peerMajorityInfo(peerId).set(majorityInfo.some) + private[redownload] val localFilter: MutableCuckooFilter[F, ProposalCoordinate] = + MutableCuckooFilter[F, ProposalCoordinate]() + + private[redownload] val remoteFilters: MapRef[F, Id, Option[CuckooFilter]] = + MapRefUtils.ofConcurrentHashMap() private val logger = Slf4jLogger.getLogger[F] + private val stallCountThreshold = ConfigUtil.getOrElse("constellation.snapshot.stallCount", 4) + private val proposalLookupLimit = ConfigUtil.getOrElse("constellation.snapshot.proposalLookupLimit", 8) + def setLastSentHeight(height: Long): F[Unit] = - lastSentHeight.modify { _ => - (height, ()) - } >> metrics.updateMetricAsync[F]("redownload_lastSentHeight", height) + lastSentHeight.set(height) >> metrics.updateMetricAsync[F]("redownload_lastSentHeight", height) def setLastMajorityState(majorityState: SnapshotsAtHeight): F[Unit] = - lastMajorityState.modify { _ => - (majorityState, maxHeight(majorityState)) - }.flatMap(metrics.updateMetricAsync[F]("redownload_lastMajorityStateHeight", _)) + lastMajorityState.set(majorityState) >> + metrics.updateMetricAsync[F]("redownload_lastMajorityStateHeight", majorityState.maxHeight) def getLastMajorityState(): F[SnapshotsAtHeight] = lastMajorityState.get - def latestMajorityHeight: F[Long] = lastMajorityState.modify { s => - (s, maxHeight(s)) - } + def latestMajorityHeight: F[Long] = lastMajorityState.get.map(_.maxHeight) - def lowestMajorityHeight: F[Long] = lastMajorityState.modify { s => - (s, minHeight(s)) - } + def lowestMajorityHeight: F[Long] = lastMajorityState.get.map(_.minHeight) def getMajorityRange: F[HeightRange] = lastMajorityState.get.map { s => HeightRange(minHeight(s), maxHeight(s)) } - def getMajorityGapRanges: F[List[HeightRange]] = lastMajorityState.get.map(missingProposalFinder.findGapRanges) - def replacePeerProposals(peer: Id, proposals: SnapshotProposalsAtHeight): F[Unit] = - peersProposals(peer).set(proposals.some) + peerProposals(peer).get.flatMap { maybeMap => + maybeMap.traverse(removeFromLocalFilter) + } >> peerProposals(peer).set(proposals.some) >> addToLocalFilter(proposals) - def persistPeerProposal(peer: Id, proposal: Signed[SnapshotProposal]): F[Unit] = + def removePeerProposalsBelowHeight(height: Long): F[PeersProposals] = + for { + ids <- peerProposals.keys + results <- ids.traverse { id => + peerProposals(id).modify { maybeMap => + val oldProposals = maybeMap.getOrElse(Map.empty) + val removedProposals = oldProposals.filterKeys(_ < height) + val result = oldProposals.removeHeightsBelow(height) + (result.some, (removedProposals, result)) + }.flatMap { case (removed, result) => removeFromLocalFilter(removed) >> F.pure((id, result)) } + } + } yield results.toMap + + private def removeFromLocalFilter(proposals: SnapshotProposalsAtHeight): F[Unit] = + proposals.values.toList.map { spp => + (spp.signature.id, spp.value.height) + }.traverse { p => + localFilter + .delete(p) + .ifM(F.unit, logger.warn(s"Proposal $p could not be removed from local filter")) + } >> F.unit + + def addPeerProposal(proposal: Signed[SnapshotProposal]): F[Unit] = logger.debug( - s"Persisting proposal of ${peer.hex} at height ${proposal.value.height} and hash ${proposal.value.hash}" - ) >> persistPeerProposals(peer, Map(proposal.value.height -> proposal)) - - def persistPeerProposals(peer: Id, proposals: SnapshotProposalsAtHeight): F[Unit] = - peersProposals(peer).modify { maybeMap => - val updatedMap = maybeMap.getOrElse(Map.empty) ++ proposals - val trimmedMap = takeHighestUntilKey(updatedMap, getRemovalPoint(maxHeight(updatedMap))) - (trimmedMap.some, ()) - } + s"Persisting proposal of ${proposal.signature.id.hex} at height ${proposal.value.height} and hash ${proposal.value.hash}" + ) >> addPeerProposals(List(proposal)) + + def addPeerProposals(proposals: Iterable[Signed[SnapshotProposal]]): F[Unit] = + proposals.groupBy(_.signature.id).toList.traverse { + case (id, proposals) => + peerProposals(id) + .modify[SnapshotProposalsAtHeight] { maybeMap => + val oldProposals = maybeMap.getOrElse(Map.empty) + val newProposals = proposals.map(s => (s.value.height, s)).toMap + ((oldProposals ++ newProposals).some, newProposals -- oldProposals.keySet) + } + .flatMap(addToLocalFilter) + } >> F.unit + + private def addToLocalFilter(proposals: SnapshotProposalsAtHeight): F[Unit] = + proposals.values.toList.map { spp => + (spp.signature.id, spp.value.height) + }.traverse { p => + localFilter + .insert(p) + .ifM(F.unit, F.raiseError(new RuntimeException(s"Unable to insert proposal $p to local filter"))) + } >> F.unit + + def replaceRemoteFilter(peerId: Id, filterData: FilterData): F[Unit] = + remoteFilters(peerId).set(CuckooFilter(filterData).some) + + def localFilterData: F[FilterData] = localFilter.getFilterData def persistCreatedSnapshot(height: Long, hash: String, reputation: Reputation): F[Unit] = createdSnapshots.modify { m => val updated = if (m.contains(height)) m - else { - m.updated(height, signed(SnapshotProposal(hash, height, reputation), keyPair)) - } - val max = maxHeight(updated) - val removalPoint = getRemovalPoint(max) - val limited = takeHighestUntilKey(updated, removalPoint) - (limited, ()) - } >> metrics.updateMetricAsync[F]("redownload_maxCreatedSnapshotHeight", height) + else m.updated(height, signed(SnapshotProposal(hash, height, reputation), keyPair)) + (updated, updated.maxHeight) + }.flatMap(metrics.updateMetricAsync[F]("redownload_maxCreatedSnapshotHeight", _)) def persistAcceptedSnapshot(height: Long, hash: String): F[Unit] = acceptedSnapshots.modify { m => val updated = m.updated(height, hash) - val max = maxHeight(updated) - val removalPoint = getRemovalPoint(max) - val limited = takeHighestUntilKey(updated, removalPoint) - (limited, ()) - } >> metrics.updateMetricAsync[F]("redownload_maxAcceptedSnapshotHeight", height) + (updated, updated.maxHeight) + }.flatMap(metrics.updateMetricAsync[F]("redownload_maxAcceptedSnapshotHeight", _)) def getCreatedSnapshots(): F[SnapshotProposalsAtHeight] = createdSnapshots.get def getAcceptedSnapshots(): F[SnapshotsAtHeight] = acceptedSnapshots.get - def getAllPeerProposals(): F[PeersProposals] = peersProposals.toMap + def getAllPeerProposals(): F[PeersProposals] = peerProposals.toMap - def getPeerProposals(peerId: Id): F[Option[SnapshotProposalsAtHeight]] = peersProposals(peerId).get + def getPeerProposals(peerId: Id): F[Option[SnapshotProposalsAtHeight]] = peerProposals(peerId).get def clear: F[Unit] = for { _ <- createdSnapshots.modify(_ => (Map.empty, ())) _ <- acceptedSnapshots.modify(_ => (Map.empty, ())) - _ <- peersProposals.clear + _ <- peerProposals.clear + _ <- localFilter.clear + _ <- remoteFilters.clear _ <- setLastMajorityState(Map.empty) _ <- setLastSentHeight(-1) _ <- rewardsManager.clearLastRewardedHeight() @@ -178,7 +215,7 @@ class RedownloadService[F[_]: NonEmptyParallel]( } _ <- responses.traverse { - case (id, proposals) => persistPeerProposals(id, proposals) + case (_, proposals) => addPeerProposals(proposals.values) } } yield () @@ -251,24 +288,14 @@ class RedownloadService[F[_]: NonEmptyParallel]( unboundedBlocker )(client) - private[redownload] def fetchAndUpdatePeerProposals(peerId: Id)(client: PeerClientMetadata): F[Unit] = - for { - maybeProposals <- fetchPeerProposals(peerId, client) - _ <- maybeProposals.map { proposals => - proposals.filter { case (_, signedProposal) => signedProposal.validSignature } - }.traverse { proposals => - persistPeerProposals(peerId, proposals) - } - } yield () - private[redownload] def fetchPeerProposals( - peerId: Id, + query: List[ProposalCoordinate], client: PeerClientMetadata - ): F[Option[SnapshotProposalsAtHeight]] = + ): F[List[Option[Signed[SnapshotProposal]]]] = PeerResponse .run( apiClient.snapshot - .getPeerProposals(peerId), + .queryPeerProposals(query), unboundedBlocker )(client) @@ -400,9 +427,9 @@ class RedownloadService[F[_]: NonEmptyParallel]( val wrappedCheck = for { _ <- logger.debug("Checking alignment with majority snapshot...") - peersProposals <- getAllPeerProposals() - createdSnapshots <- getCreatedSnapshots() - acceptedSnapshots <- getAcceptedSnapshots() + + lastMajority <- getLastMajorityState() + (_, createdSnapshots, peerProposals) <- removeSnapshotsAndProposalsBelowHeight(lastMajority.minHeight) peers <- cluster.getPeerInfo ownPeer <- cluster.getOwnJoinedHeight() @@ -410,41 +437,57 @@ class RedownloadService[F[_]: NonEmptyParallel]( case (id, peerData) => (id, peerData.majorityHeight) } ++ Map(cluster.id -> NonEmptyList.one(MajorityHeight(ownPeer, None))) - _ <- logger.debug(s"Peers with majority heights") - _ <- peersCache.toList.traverse { - case (id, majorityHeight) => logger.debug(s"[$id]: $majorityHeight") - } - - _ <- logger.debug(s"Created snapshots: ${formatProposals(createdSnapshots)} ") - _ <- logger.debug(s"Peers proposals: ${peersProposals.mapValues(formatProposals)} ") + _ <- logger.debug(s"Peers with majority heights $peersCache") - majorityState = majorityStateChooser.chooseMajorityState( + calculatedMajority = majorityStateChooser.chooseMajorityState( createdSnapshots, - peersProposals, + peerProposals, peersCache ) - potentialGaps = missingProposalFinder.findGaps(majorityState) - - _ <- if (potentialGaps.isEmpty) { - logger.debug("Calculated majority state has no gaps") + gaps = missingProposalFinder.findGaps(calculatedMajority) + _ <- if (gaps.isEmpty) { + logger.debug( + s"Majority calculated in range ${calculatedMajority.heightRange} has no gaps" + ) } else { - logger.error(s"Found following gaps ${potentialGaps} in majority: ${majorityState.keys.toList.sorted}") + logger.warn( + s"Majority calculated in range ${calculatedMajority.heightRange} has following gaps $gaps" + ) } - maxMajorityHeight = maxHeight(majorityState) - ignorePoint = getIgnorePoint(maxMajorityHeight) - majorityStateCutoff = if (isDownload) maxMajorityHeight else ownPeer.getOrElse(maxMajorityHeight) - meaningfulAcceptedSnapshots = takeHighestUntilKey(acceptedSnapshots, ignorePoint) - meaningfulMajorityState = takeHighestUntilKey(majorityState, ignorePoint).filterKeys(_ >= majorityStateCutoff) + calculatedMajorityWithoutGaps = if (gaps.isEmpty) calculatedMajority + else calculatedMajority.removeHeightsAbove(gaps.min) + + joinHeight = if (isDownload) calculatedMajorityWithoutGaps.maxHeight + else ownPeer.getOrElse(calculatedMajorityWithoutGaps.maxHeight) + + majorityBeforeCutOff = { + val intersect = lastMajority.keySet & calculatedMajorityWithoutGaps.keySet + if (lastMajority.isEmpty || intersect.nonEmpty) + calculatedMajorityWithoutGaps + else + lastMajority + } + + cutOffHeight = getCutOffHeight(joinHeight, calculatedMajorityWithoutGaps, lastMajority) + meaningfulMajority = majorityBeforeCutOff.removeHeightsBelow(cutOffHeight) + _ <- setLastMajorityState(meaningfulMajority) + + (meaningfulAcceptedSnapshots, _, _) <- removeSnapshotsAndProposalsBelowHeight(cutOffHeight) - _ <- setLastMajorityState(meaningfulMajorityState) + _ <- logger.debug(s"Meaningful majority in range ${calculatedMajority.heightRange}") - _ <- if (meaningfulMajorityState.isEmpty) logger.debug("No majority - skipping redownload") else F.unit + _ <- F + .pure(meaningfulMajority.maxHeight > lastMajority.maxHeight) + .ifM(majorityStallCount.set(0), majorityStallCount.update(_ + 1)) + + _ <- if (meaningfulMajority.isEmpty) logger.debug("Meaningful majority is empty - skipping redownload") + else F.unit shouldPerformRedownload = shouldRedownload( meaningfulAcceptedSnapshots, - meaningfulMajorityState, + meaningfulMajority, redownloadInterval, isDownload ) @@ -457,7 +500,7 @@ class RedownloadService[F[_]: NonEmptyParallel]( cluster.compareAndSet(NodeState.validForRedownload, NodeState.DownloadInProgress) }.flatMap { state => if (state.isNewSet) { - wrappedRedownload(shouldPerformRedownload, meaningfulAcceptedSnapshots, meaningfulMajorityState).handleErrorWith { + wrappedRedownload(shouldPerformRedownload, meaningfulAcceptedSnapshots, meaningfulMajority).handleErrorWith { error => logger.error(error)(s"Redownload error, isDownload=$isDownload: ${stringifyStackTrace(error)}") >> { if (isDownload) @@ -468,16 +511,17 @@ class RedownloadService[F[_]: NonEmptyParallel]( .void } } - } else logger.debug(s"Setting node state during redownload failed! Skipping redownload. isDownload=$isDownload") + } else + logger.debug(s"Setting node state during redownload failed! Skipping redownload. isDownload=$isDownload") } } else logger.debug("No redownload needed - snapshots have been already aligned with majority state.") _ <- logger.debug("Sending majority snapshots to cloud.") - _ <- if (meaningfulMajorityState.nonEmpty) sendMajoritySnapshotsToCloud() + _ <- if (meaningfulMajority.nonEmpty) sendMajoritySnapshotsToCloud() else logger.debug("No majority - skipping sending to cloud") _ <- logger.debug("Rewarding majority snapshots.") - _ <- if (meaningfulMajorityState.nonEmpty) rewardMajoritySnapshots().value.flatMap(F.fromEither) + _ <- if (meaningfulMajority.nonEmpty) rewardMajoritySnapshots().value.flatMap(F.fromEither) else logger.debug("No majority - skipping rewarding") _ <- logger.debug("Removing unaccepted snapshots from disk.") @@ -490,56 +534,112 @@ class RedownloadService[F[_]: NonEmptyParallel]( _ <- logger.debug("Accepting all the checkpoint blocks received during the redownload.") _ <- acceptCheckpointBlocks().value.flatMap(F.fromEither) - _ <- if (!shouldPerformRedownload) findAndFetchMissingProposals(peersProposals, peersCache) else F.unit + _ <- if (!shouldPerformRedownload) findAndFetchMissingProposals(peerProposals, peersCache) else F.unit } yield () - cluster.getNodeState - .map { current => - if (isDownload) NodeState.validForDownload.contains(current) - else NodeState.validForRedownload.contains(current) - } - .ifM( - wrappedCheck, - logger.debug(s"Node state is not valid for redownload, skipping. isDownload=$isDownload") >> F.unit - ) + cluster.getNodeState.map { current => + if (isDownload) NodeState.validForDownload.contains(current) + else NodeState.validForRedownload.contains(current) + }.ifM( + wrappedCheck, + logger.debug(s"Node state is not valid for redownload, skipping. isDownload=$isDownload") >> F.unit + ) }.handleErrorWith { error => logger.error(error)("Error during checking alignment with majority snapshot.") >> error.raiseError[F, Unit] } - private[redownload] def getLookupRange(majorityInfos: Map[Id, MajorityInfo]): F[HeightRange] = + private def getCutOffHeight( + joinHeight: Long, + candidateMajority: SnapshotsAtHeight, + lastMajority: SnapshotsAtHeight + ): Long = { + val ignorePoint = getIgnorePoint(candidateMajority.maxHeight) + min(max(ignorePoint, joinHeight), lastMajority.maxHeight) + } + + private[redownload] def removeSnapshotsAndProposalsBelowHeight( + height: Long + ): F[(SnapshotsAtHeight, SnapshotProposalsAtHeight, PeersProposals)] = + for { + as <- acceptedSnapshots.updateAndGet(_.removeHeightsBelow(height)) + cs <- createdSnapshots.updateAndGet(_.removeHeightsBelow(height)) + pp <- removePeerProposalsBelowHeight(height) + } yield (as, cs, pp) + + private[redownload] def getLookupRange: F[HeightRange] = for { - mr <- getMajorityRange - createdHeights <- createdSnapshots.get.map(_.values.map(_.value.height)) - peersHeights = majorityInfos.values.map(_.majorityRange.to) - maxHeight = (mr.to :: (createdHeights ++ peersHeights).toList).max - } yield HeightRange(mr.from, maxHeight) + stallCount <- majorityStallCount.get + majorityHeight <- latestMajorityHeight + result = if (stallCount > stallCountThreshold) + HeightRange(majorityHeight + heightInterval, majorityHeight + proposalLookupLimit) + else HeightRange.Empty + } yield result private def findAndFetchMissingProposals(peersProposals: PeersProposals, peersCache: PeersCache): F[Unit] = for { - peerMajorityInfo <- peerMajorityInfo.toMap - lookupRange <- getLookupRange(peerMajorityInfo) - missingProposals = missingProposalFinder.findMissingPeerProposals(lookupRange, peersProposals, peersCache) - _ <- missingProposals.nonEmpty + lookupRange <- getLookupRange + _ <- lookupRange.empty .pure[F] .ifM( - logger.info(s"Found missing proposals $missingProposals"), - logger.info(s"No missing proposals found") + logger.info("Skip looking for missing proposals"), { + val missingProposals = + missingProposalFinder.findMissingPeerProposals(lookupRange, peersProposals, peersCache) + missingProposals.nonEmpty + .pure[F] + .ifM( + logger.info(s"Missing proposals found in range $lookupRange, $missingProposals") >> + queryMissingProposals(missingProposals), + logger.info(s"No missing proposals found") + ) + } ) - _ <- missingProposals.toList.traverse { - case (peerId, peerGaps) => - val selectedPeer = missingProposalFinder - .selectPeerForFetchingMissingProposals(lookupRange, peerGaps, peerMajorityInfo - peerId) - .getOrElse(peerId) - fetchProposalForPeer(peerId)(selectedPeer) + + } yield () + + private def queryMissingProposals(coordinates: Set[ProposalCoordinate]): F[Unit] = + for { + remoteFilters <- shuffledRemoteFilterList + peerToQuery = selectPeersForQueries(coordinates, remoteFilters) + _ <- peerToQuery.toList.traverse { + case (peerId, query) => fetchAndUpdatePeerProposals(query, peerId) } } yield () - private def fetchProposalForPeer(peerId: Id)(fromPeerId: Id): F[Unit] = + private def shuffledRemoteFilterList: F[List[(Id, CuckooFilter)]] = + for { + map <- remoteFilters.toMap + result <- F.delay { + Random.shuffle(map.toList) + } + } yield result + + private def selectPeersForQueries( + missingProposals: Set[ProposalCoordinate], + remoteFilters: List[(Id, CuckooFilter)] + ): Map[Id, Set[ProposalCoordinate]] = + missingProposals + .foldLeft(Map.empty[Id, Set[ProposalCoordinate]]) { (acc, proposalCoordinate) => + remoteFilters.find { + case (_, f) => f.contains(proposalCoordinate) + }.map { + case (id, _) => acc |+| Map(id -> Set(proposalCoordinate)) + }.getOrElse(acc) + } + + private def fetchAndUpdatePeerProposals(query: Set[ProposalCoordinate], peerId: Id): F[Unit] = for { - peer <- cluster.getPeerInfo.map(_.get(fromPeerId)) + peer <- cluster.getPeerInfo.map(_.get(peerId)) apiClient = peer.map(_.peerMetadata.toPeerClientMetadata) - _ <- apiClient.traverse(fetchAndUpdatePeerProposals(peerId)) + _ <- apiClient.traverse { client => + for { + proposals <- fetchPeerProposals(query.toList, client) + filteredProposals = proposals + .mapFilter(identity) + .filter(_.validSignature) + _ <- addPeerProposals(filteredProposals) + } yield () + } } yield () private[redownload] def sendMajoritySnapshotsToCloud(): F[Unit] = { @@ -787,6 +887,19 @@ class RedownloadService[F[_]: NonEmptyParallel]( private def formatSnapshots(snapshots: SnapshotsAtHeight) = SortedMap[Long, String]() ++ snapshots + + implicit class HeightMapOps[V](val map: Map[Long, V]) { + def removeHeightsBelow(height: Long): Map[Long, V] = map.filterKeys(_ >= height) + + def removeHeightsAbove(height: Long): Map[Long, V] = map.filterKeys(_ <= height) + + def minHeight: Long = RedownloadService.this.minHeight(map) + + def maxHeight: Long = RedownloadService.this.maxHeight(map) + + def heightRange: HeightRange = HeightRange(minHeight, maxHeight) + } + } object RedownloadService { @@ -794,7 +907,7 @@ object RedownloadService { def apply[F[_]: Concurrent: ContextShift: NonEmptyParallel: Timer]( meaningfulSnapshotsCount: Int, redownloadInterval: Int, - isEnabledCloudStorage: Boolean, + heightInterval: Long, cluster: Cluster[F], majorityStateChooser: MajorityStateChooser, missingProposalFinder: MissingProposalFinder, @@ -813,7 +926,7 @@ object RedownloadService { new RedownloadService[F]( meaningfulSnapshotsCount, redownloadInterval, - isEnabledCloudStorage, + heightInterval, cluster, majorityStateChooser, missingProposalFinder, @@ -831,6 +944,7 @@ object RedownloadService { ) type Reputation = SortedMap[Id, Double] + type ProposalCoordinate = (Id, Long) type SnapshotsAtHeight = Map[Long, String] // height -> hash type SnapshotProposalsAtHeight = Map[Long, Signed[SnapshotProposal]] type PeersProposals = Map[Id, SnapshotProposalsAtHeight] @@ -843,4 +957,7 @@ object RedownloadService { implicit val snapshotProposalsAtHeightDecoder: Decoder[Map[Long, SnapshotProposal]] = Decoder.decodeMap[Long, SnapshotProposal] + implicit val proposalCoordinateToString: ProposalCoordinate => String = { + case (id, height) => s"$id:$height" + } } diff --git a/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala b/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala index 4c92d6fc2..dd19cce39 100644 --- a/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala +++ b/src/main/scala/org/constellation/infrastructure/endpoints/SnapshotEndpoints.scala @@ -34,7 +34,7 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext private val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] def publicEndpoints( - nodeId: Id, + selfId: Id, snapshotStorage: LocalFileStorage[F, StoredSnapshot], snapshotService: SnapshotService[F], redownloadService: RedownloadService[F] @@ -42,8 +42,9 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext getStoredSnapshotsEndpoint(snapshotStorage) <+> getCreatedSnapshotsEndpoint(redownloadService) <+> getAcceptedSnapshotsEndpoint(redownloadService) <+> - getPeerProposals(nodeId, redownloadService) <+> - getNextSnapshotHeight(nodeId, snapshotService) <+> + getPeerProposals(selfId, redownloadService) <+> + queryPeerProposals(selfId, redownloadService) <+> + getNextSnapshotHeight(selfId, snapshotService) <+> getLatestMajorityHeight(redownloadService) <+> getLatestMajorityState(redownloadService) <+> getTotalSupply(snapshotService) @@ -63,6 +64,7 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext getCreatedSnapshotsEndpoint(redownloadService) <+> getAcceptedSnapshotsEndpoint(redownloadService) <+> getPeerProposals(nodeId, redownloadService) <+> + queryPeerProposals(nodeId, redownloadService) <+> getNextSnapshotHeight(nodeId, snapshotService) <+> getSnapshotInfo(snapshotService, cluster) <+> getSnapshotInfoByHash(snapshotInfoStorage) <+> @@ -79,6 +81,7 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext getCreatedSnapshotsEndpoint(redownloadService) <+> getAcceptedSnapshotsEndpoint(redownloadService) <+> getPeerProposals(nodeId, redownloadService) <+> + queryPeerProposals(nodeId, redownloadService) <+> getLatestMajorityHeight(redownloadService) private def getStoredSnapshotsEndpoint(snapshotStorage: LocalFileStorage[F, StoredSnapshot]): HttpRoutes[F] = @@ -116,18 +119,34 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext redownloadService.getAcceptedSnapshots().map(_.asJson).flatMap(Ok(_)) } - private def getPeerProposals(nodeId: Id, redownloadService: RedownloadService[F]): HttpRoutes[F] = + private def getPeerProposals(selfId: Id, redownloadService: RedownloadService[F]): HttpRoutes[F] = HttpRoutes.of[F] { - case GET -> Root / "peer" / peerId / "snapshot" / "created" => + case GET -> Root / "peer" / peerIdHex / "snapshot" / "created" => + val peerId = Id(peerIdHex) val peerProposals = - if (Id(peerId) == nodeId) + if (peerId == selfId) redownloadService.getCreatedSnapshots() else - redownloadService.getPeerProposals(nodeId).map(_.getOrElse(Map.empty)) + redownloadService.getPeerProposals(peerId).map(_.getOrElse(Map.empty)) peerProposals.map(_.asJson).flatMap(Ok(_)) } + private def queryPeerProposals(selfId: Id, redownloadService: RedownloadService[F]): HttpRoutes[F] = + HttpRoutes.of[F] { + case req @ POST -> Root / "snapshot" / "proposal" / "_query" => + for { + query <- req.decodeJson[List[(Id, Long)]] + result <- query.traverse { + case (`selfId`, height) => + redownloadService.getCreatedSnapshots().map(_.get(height)) + case (peerId, height) => + redownloadService.getPeerProposals(peerId).map(_.flatMap(_.get(height))) + } + resp <- Ok(result.asJson) + } yield resp + } + implicit val idLongEncoder: Encoder[(Id, Long)] = deriveEncoder private def getNextSnapshotHeight(nodeId: Id, snapshotService: SnapshotService[F]): HttpRoutes[F] = @@ -202,8 +221,8 @@ class SnapshotEndpoints[F[_]](implicit F: Concurrent[F], C: ContextShift[F]) ext case Valid(_) => val processProposalAsync = F.start( C.shift >> - redownloadService.persistPeerProposal(message.origin, payload.proposal) >> - redownloadService.updatePeerMajorityInfo(message.origin, payload.majorityInfo) >> + redownloadService.addPeerProposal(payload.proposal) >> + redownloadService.replaceRemoteFilter(message.origin, payload.filterData) >> snapshotProposalGossipService.spread(message) ) diff --git a/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala b/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala index 80207e0ab..e78bb43c7 100644 --- a/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala +++ b/src/main/scala/org/constellation/infrastructure/p2p/client/SnapshotClientInterpreter.scala @@ -9,7 +9,8 @@ import org.constellation.gossip.state.GossipMessage import org.constellation.infrastructure.p2p.PeerResponse import org.constellation.infrastructure.p2p.PeerResponse.PeerResponse import org.constellation.schema.Id -import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposalPayload} +import org.constellation.schema.signature.Signed +import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposal, SnapshotProposalPayload} import org.constellation.session.SessionTokenService import org.http4s.Method._ import org.http4s.Status.Successful @@ -55,6 +56,14 @@ class SnapshotClientInterpreter[F[_]: ContextShift]( sessionTokenService ) + def queryPeerProposals(query: List[(Id, Long)]): PeerResponse[F, List[Option[Signed[SnapshotProposal]]]] = + PeerResponse[F, List[Option[Signed[SnapshotProposal]]]](s"snapshot/proposal/_query", POST)( + client, + sessionTokenService + ) { (req, c) => + c.expect(req.withEntity(query)) + } + def getNextSnapshotHeight(): PeerResponse[F, (Id, Long)] = PeerResponse[F, (Id, Long)]("snapshot/nextHeight")(client, sessionTokenService) @@ -85,7 +94,9 @@ class SnapshotClientInterpreter[F[_]: ContextShift]( if (a) F.unit else F.raiseError( - new Throwable(s"Cannot post proposal of hash ${message.payload.proposal.value.hash} and height ${message.payload.proposal.value.height}") + new Throwable( + s"Cannot post proposal of hash ${message.payload.proposal.value.hash} and height ${message.payload.proposal.value.height}" + ) ) ) } diff --git a/src/main/scala/org/constellation/serialization/ConstellationKryoRegistrar.scala b/src/main/scala/org/constellation/serialization/ConstellationKryoRegistrar.scala index d1a8bb5f6..ddd1a7306 100644 --- a/src/main/scala/org/constellation/serialization/ConstellationKryoRegistrar.scala +++ b/src/main/scala/org/constellation/serialization/ConstellationKryoRegistrar.scala @@ -23,6 +23,6 @@ object ConstellationKryoRegistrar (classOf[StoredRewards], 185), (classOf[BuildInfoJson], 187), (classOf[GossipPath], 1032), - (classOf[GossipMessage[SnapshotProposalPayload]], 1033), + (classOf[GossipMessage[SnapshotProposalPayload]], 1033) ) ) {} diff --git a/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala b/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala index 06e307fdd..119303ccb 100644 --- a/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala +++ b/src/main/scala/org/constellation/snapshot/SnapshotTrigger.scala @@ -7,7 +7,7 @@ import org.constellation.gossip.snapshot.SnapshotProposalGossipService import org.constellation.p2p.{Cluster, SetStateResult} import org.constellation.schema.NodeState import org.constellation.schema.signature.Signed.signed -import org.constellation.schema.snapshot.{MajorityInfo, SnapshotProposal, SnapshotProposalPayload} +import org.constellation.schema.snapshot.{SnapshotProposal, SnapshotProposalPayload} import org.constellation.storage.{HeightIntervalConditionNotMet, NotEnoughSpace, SnapshotError, SnapshotIllegalState} import org.constellation.util.Logging._ import org.constellation.util.{Metrics, PeriodicIO} @@ -42,8 +42,7 @@ class SnapshotTrigger(periodSeconds: Int = 5, unboundedExecutionContext: Executi startTime <- IO(System.currentTimeMillis()) snapshotResult <- dao.snapshotService.attemptSnapshot().value elapsed <- IO(System.currentTimeMillis() - startTime) - majorityRange <- dao.redownloadService.getMajorityRange - majorityGapRanges <- dao.redownloadService.getMajorityGapRanges + filterData <- dao.redownloadService.localFilterData _ = logger.debug(s"Attempt snapshot took: $elapsed millis") _ <- snapshotResult match { case Left(NotEnoughSpace) => @@ -74,10 +73,7 @@ class SnapshotTrigger(periodSeconds: Int = 5, unboundedExecutionContext: Executi ), dao.keyPair ), - MajorityInfo( - majorityRange, - majorityGapRanges - ) + filterData ) ) .start diff --git a/src/test/scala/org/constellation/datastore/SnapshotTriggerTest.scala b/src/test/scala/org/constellation/datastore/SnapshotTriggerTest.scala index cf92a116c..8565f2db6 100644 --- a/src/test/scala/org/constellation/datastore/SnapshotTriggerTest.scala +++ b/src/test/scala/org/constellation/datastore/SnapshotTriggerTest.scala @@ -15,7 +15,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.freespec.AnyFreeSpec import org.scalatest.matchers.should.Matchers import cats.implicits._ -import org.constellation.schema.snapshot.HeightRange +import org.constellation.schema.snapshot.{FilterData, HeightRange} import scala.collection.SortedMap import scala.concurrent.ExecutionContext @@ -47,7 +47,7 @@ class SnapshotTriggerTest val snapshotTrigger = new SnapshotTrigger(5, ExecutionContext.global) dao.redownloadService.persistCreatedSnapshot(*, *, *) shouldReturnF Unit dao.redownloadService.getMajorityRange shouldReturnF HeightRange(2L, 4L) - dao.redownloadService.getMajorityGapRanges shouldReturnF List.empty + dao.redownloadService.localFilterData shouldReturnF FilterData(Array.empty, 0) dao.cluster.compareAndSet(*, *, *) shouldReturnF SetStateResult(NodeState.SnapshotCreation, true) @@ -66,7 +66,7 @@ class SnapshotTriggerTest val snapshotTrigger = new SnapshotTrigger(5, ExecutionContext.global) dao.redownloadService.persistAcceptedSnapshot(*, *) shouldReturnF Unit dao.redownloadService.getMajorityRange shouldReturnF HeightRange(2L, 4L) - dao.redownloadService.getMajorityGapRanges shouldReturnF List.empty + dao.redownloadService.localFilterData shouldReturnF FilterData(Array.empty, 0) dao.cluster.compareAndSet(*, *, *) shouldReturnF SetStateResult(NodeState.SnapshotCreation, true) diff --git a/src/test/scala/org/constellation/domain/redownload/MissingProposalFinderTest.scala b/src/test/scala/org/constellation/domain/redownload/MissingProposalFinderTest.scala index d3c35e481..1a1f7dc5c 100644 --- a/src/test/scala/org/constellation/domain/redownload/MissingProposalFinderTest.scala +++ b/src/test/scala/org/constellation/domain/redownload/MissingProposalFinderTest.scala @@ -5,11 +5,10 @@ import cats.syntax.all._ import org.constellation.p2p.MajorityHeight import org.constellation.schema.Id import org.constellation.schema.signature.{HashSignature, Signed} -import org.constellation.schema.snapshot.{HeightRange, MajorityInfo, SnapshotProposal} +import org.constellation.schema.snapshot.{HeightRange, SnapshotProposal} import org.mockito.cats.IdiomaticMockitoCats import org.mockito.{ArgumentMatchersSugar, IdiomaticMockito} import org.scalatest.BeforeAndAfterEach -import org.scalatest.OptionValues._ import org.scalatest.freespec.AnyFreeSpec import org.scalatest.matchers.should.Matchers @@ -23,7 +22,7 @@ class MissingProposalFinderTest with ArgumentMatchersSugar with BeforeAndAfterEach { - private val finder = MissingProposalFinder(2, 0, 10, Id("self")) + private val finder = MissingProposalFinder(2, Id("self")) private val toPeerProposals: ((Id, List[Int])) => (Id, Map[Long, Signed[SnapshotProposal]]) = { case (id, heights) => @@ -111,9 +110,9 @@ class MissingProposalFinderTest val result = finder.findMissingPeerProposals(majorityRange, peerProposals, peersCache) - result should (contain.key(Id("b")).and(contain).key(Id("c"))) - result.get(Id("b")).value should contain only (6L) - result.get(Id("c")).value should contain only (2L, 6L) + result should contain(Id("b"), 6L) + result should contain(Id("c"), 2L) + result should contain(Id("c"), 6L) } "should not find missing proposals when nodes were absent" in { @@ -134,34 +133,4 @@ class MissingProposalFinderTest } } - "selectPeerForFetchingMissingProposals" - { - "should select peer that has the least gaps" in { - val peerMajorityInfo = Map( - Id("a") -> MajorityInfo(HeightRange(2, 6), List(HeightRange(4, 6))), - Id("b") -> MajorityInfo(HeightRange(2, 6), List(HeightRange(4, 4))), - Id("c") -> MajorityInfo(HeightRange(2, 6), List(HeightRange(2, 4))) - ) - val missingProposals = Set(2L, 4L, 6L) - val majorityRange = HeightRange(2, 6) - - val result = finder.selectPeerForFetchingMissingProposals(majorityRange, missingProposals, peerMajorityInfo) - result should contain(Id("b")) - } - - "should not select any peer" in { - val peerMajorityInfo = Map( - Id("a") -> MajorityInfo(HeightRange(2, 10), List(HeightRange(6, 6))), - Id("b") -> MajorityInfo(HeightRange(8, 10), List.empty), - Id("c") -> MajorityInfo(HeightRange(2, 4), List.empty) - ) - - val missingProposals = Set(6L) - - val bounds = HeightRange(2, 6) - - val result = finder.selectPeerForFetchingMissingProposals(bounds, missingProposals, peerMajorityInfo) - result shouldBe empty - } - } - } diff --git a/src/test/scala/org/constellation/domain/redownload/RedownloadServiceTest.scala b/src/test/scala/org/constellation/domain/redownload/RedownloadServiceTest.scala index 2fa96708f..f8eb4ba40 100644 --- a/src/test/scala/org/constellation/domain/redownload/RedownloadServiceTest.scala +++ b/src/test/scala/org/constellation/domain/redownload/RedownloadServiceTest.scala @@ -18,7 +18,7 @@ import org.constellation.p2p.{Cluster, MajorityHeight, PeerData} import org.constellation.rewards.RewardsManager import org.constellation.schema.Id import org.constellation.schema.signature.{HashSignature, Signed} -import org.constellation.schema.snapshot.{HeightRange, MajorityInfo, SnapshotInfo, SnapshotProposal, StoredSnapshot} +import org.constellation.schema.snapshot.{SnapshotInfo, SnapshotProposal, StoredSnapshot} import org.constellation.storage.SnapshotService import org.constellation.util.Metrics import org.constellation.{PeerMetadata, ResourceInfo} @@ -62,7 +62,7 @@ class RedownloadServiceTest val meaningfulSnapshotsCount = 4 val redownloadInterval = 2 - val signature = HashSignature("xyz", Id("xyz")) + val heightInterval = 2 val keyPair = KeyUtils.makeKeyPair() before { @@ -88,7 +88,7 @@ class RedownloadServiceTest redownloadService = RedownloadService[IO]( meaningfulSnapshotsCount, redownloadInterval, - true, + heightInterval, cluster, majorityStateChooser, missingProposalFinder, @@ -248,7 +248,7 @@ class RedownloadServiceTest val persist = redownloadService.persistCreatedSnapshot(2L, "aabbcc", trust) val check = redownloadService.createdSnapshots.get.map(_.get(2L)) (persist >> check).unsafeRunSync.get should matchPattern { - case Signed(signature, SnapshotProposal("aabbcc", 2L, trust)) => () + case Signed(_, SnapshotProposal("aabbcc", 2L, _)) => () } } @@ -261,17 +261,6 @@ class RedownloadServiceTest case Signed(_, SnapshotProposal("aaaa", 2L, _)) => () } } - - s"should limit the Map to removal point" in { - val removalPoint = 30 - val snapshots = (1 to removalPoint + 10 by 2).map(_.toLong).toList - - val persist = snapshots.traverse(s => redownloadService.persistCreatedSnapshot(s, s.toString, SortedMap.empty)) - val check = - redownloadService.createdSnapshots.get.map(_.find { case (height, _) => height <= removalPoint }).map(_.isEmpty) - - (persist >> check).unsafeRunSync shouldBe true - } } "persistAcceptedSnapshot" - { @@ -289,18 +278,54 @@ class RedownloadServiceTest (persistFirst >> persistSecond >> check).unsafeRunSync shouldBe "bbbb".some } + } + + "removeSnapshotsAndProposalsBelowHeight" - { + val peer1 = Id("p1") + val peer2 = Id("p2") + val peer1signature = HashSignature("xyz1", peer1) + val peer2signature = HashSignature("xyz2", peer2) + + "should remove snapshots and proposals below 12 and keep all other" in { + // given + (for { + _ <- redownloadService.persistAcceptedSnapshot(10, "hash10") + _ <- redownloadService.persistAcceptedSnapshot(12, "hash12") + _ <- redownloadService.persistCreatedSnapshot(10, "hash10", SortedMap.empty) + _ <- redownloadService.persistCreatedSnapshot(12, "hash12", SortedMap.empty) + _ <- redownloadService.addPeerProposal( + Signed(peer1signature, SnapshotProposal("hash10p1", 10, SortedMap.empty)) + ) + _ <- redownloadService.addPeerProposal( + Signed(peer1signature, SnapshotProposal("hash12p1", 12, SortedMap.empty)) + ) + _ <- redownloadService.addPeerProposal( + Signed(peer2signature, SnapshotProposal("hash10p2", 10, SortedMap.empty)) + ) + _ <- redownloadService.addPeerProposal( + Signed(peer2signature, SnapshotProposal("hash10p2", 12, SortedMap.empty)) + ) + } yield ()).unsafeRunSync() + + // when + redownloadService.removeSnapshotsAndProposalsBelowHeight(12).unsafeRunSync() - s"should limit the Map to removal point" in { - val removalPoint = 30 - val snapshots = (1 to removalPoint + 10 by 2).map(_.toLong).toList + // then + val acceptedSnapshots = redownloadService.getAcceptedSnapshots().unsafeRunSync() + acceptedSnapshots should not contain key(10L) + (acceptedSnapshots should contain).key(12L) - val persist = snapshots.traverse(s => redownloadService.persistAcceptedSnapshot(s, s.toString)) - val check = - redownloadService.acceptedSnapshots.get - .map(_.find { case (height, _) => height <= removalPoint }) - .map(_.isEmpty) + val createdSnapshots = redownloadService.getCreatedSnapshots().unsafeRunSync() + createdSnapshots should not contain key(10L) + (createdSnapshots should contain).key(12L) - (persist >> check).unsafeRunSync shouldBe true + val peer1Proposals = redownloadService.getPeerProposals(peer1).unsafeRunSync().getOrElse(Map.empty) + peer1Proposals should not contain key(10L) + (peer1Proposals should contain).key(12L) + + val peer2Proposals = redownloadService.getPeerProposals(peer2).unsafeRunSync().getOrElse(Map.empty) + peer2Proposals should not contain key(10L) + (peer2Proposals should contain).key(12L) } } @@ -345,6 +370,8 @@ class RedownloadServiceTest "fetchAndUpdatePeersProposals" - { val peer1 = Id("p1") val peer2 = Id("p2") + val peer1signature = HashSignature("xyz1", peer1) + val peer2signature = HashSignature("xyz2", peer2) val peer1Data = PeerData( PeerMetadata("host1", 9999, peer1, resourceInfo = mock[ResourceInfo]), NonEmptyList(mock[MajorityHeight], Nil) @@ -354,16 +381,16 @@ class RedownloadServiceTest NonEmptyList(mock[MajorityHeight], Nil) ) val initialPeersProposals = - Map(peer1 -> Map(1L -> Signed(signature, SnapshotProposal("hash1p1", 1L, SortedMap.empty)))) + Map(peer1 -> Map(1L -> Signed(peer1signature, SnapshotProposal("hash1p1", 1L, SortedMap.empty)))) val peer1Proposals = Map( - 1L -> Signed(signature, SnapshotProposal("hash2p1", 1L, SortedMap.empty)), - 2L -> Signed(signature, SnapshotProposal("hash3p1", 2L, SortedMap.empty)) + 1L -> Signed(peer1signature, SnapshotProposal("hash2p1", 1L, SortedMap.empty)), + 2L -> Signed(peer1signature, SnapshotProposal("hash3p1", 2L, SortedMap.empty)) ) val peer2Proposals = Map( - 1L -> Signed(signature, SnapshotProposal("hash1p2", 1L, SortedMap.empty)), - 2L -> Signed(signature, SnapshotProposal("hash2p2", 2L, SortedMap.empty)) + 1L -> Signed(peer2signature, SnapshotProposal("hash1p2", 1L, SortedMap.empty)), + 2L -> Signed(peer2signature, SnapshotProposal("hash2p2", 2L, SortedMap.empty)) ) "for empty proposals Map" - { @@ -422,8 +449,8 @@ class RedownloadServiceTest val actual = redownloadService.getAllPeerProposals().unsafeRunSync() val expected = Map( peer1 -> Map( - 1L -> Signed(signature, SnapshotProposal("hash2p1", 1L, SortedMap.empty)), - 2L -> Signed(signature, SnapshotProposal("hash3p1", 2L, SortedMap.empty)) + 1L -> Signed(peer1signature, SnapshotProposal("hash2p1", 1L, SortedMap.empty)), + 2L -> Signed(peer1signature, SnapshotProposal("hash3p1", 2L, SortedMap.empty)) ) ) @@ -451,31 +478,6 @@ class RedownloadServiceTest } } - "getLookupRange" - { - "should use max height from peer majority state" in { - redownloadService.setLastMajorityState(Map(4L -> "hash2", 6L -> "hash3", 8L -> "hash3")).unsafeRunSync() - val majorityInfos = Map( - Id("a") -> MajorityInfo(HeightRange(0L, 10L), List.empty), - Id("b") -> MajorityInfo(HeightRange(0L, 6L), List.empty) - ) - - val result = redownloadService.getLookupRange(majorityInfos).unsafeRunSync() - - result shouldBe HeightRange(4L, 10L) - } - - "should use max height from own majority state" in { - redownloadService.setLastMajorityState(Map(4L -> "hash2", 6L -> "hash3", 8L -> "hash3")).unsafeRunSync() - val majorityInfos = Map( - Id("b") -> MajorityInfo(HeightRange(0L, 6L), List.empty) - ) - - val result = redownloadService.getLookupRange(majorityInfos).unsafeRunSync() - - result shouldBe HeightRange(4L, 8L) - } - } - /* "should fetch created proposals of all the peers" in { val peerInfo = Map(Id("node1") -> mock[PeerData], Id("node2") -> mock[PeerData]) @@ -687,7 +689,7 @@ class RedownloadServiceTest val redownloadService = RedownloadService[IO]( meaningfulSnapshotsCount, redownloadInterval, - false, + heightInterval, cluster, majorityStateChooser, missingProposalFinder, diff --git a/src/test/scala/org/constellation/infrastructure/endpoints/SnapshotEndpointsTest.scala b/src/test/scala/org/constellation/infrastructure/endpoints/SnapshotEndpointsTest.scala index b77b69d5e..057b520a7 100644 --- a/src/test/scala/org/constellation/infrastructure/endpoints/SnapshotEndpointsTest.scala +++ b/src/test/scala/org/constellation/infrastructure/endpoints/SnapshotEndpointsTest.scala @@ -11,7 +11,7 @@ import org.constellation.gossip.validation.{MessageValidationError, MessageValid import org.constellation.keytool.KeyUtils import org.constellation.schema.Id import org.constellation.schema.signature.Signed.signed -import org.constellation.schema.snapshot.{HeightRange, MajorityInfo, SnapshotProposal, SnapshotProposalPayload} +import org.constellation.schema.snapshot.{FilterData, SnapshotProposal, SnapshotProposalPayload} import org.constellation.serialization.KryoSerializer import org.constellation.session.Registration.`X-Id` import org.http4s.Status.{BadRequest, Ok} @@ -48,8 +48,8 @@ class SnapshotEndpointsTest override def beforeAll: Unit = KryoSerializer.init[IO].handleError(_ => Unit).unsafeRunSync() before { - redownloadService.persistPeerProposal(*, *) shouldReturnF Unit - redownloadService.updatePeerMajorityInfo(*, *) shouldReturnF Unit + redownloadService.addPeerProposal(*) shouldReturnF Unit + redownloadService.replaceRemoteFilter(*, *) shouldReturnF Unit snapshotProposalGossipService.spread(*[GossipMessage[SnapshotProposalPayload]]) shouldReturnF Unit } @@ -67,7 +67,7 @@ class SnapshotEndpointsTest SnapshotProposal("proposal-hash", 50L, SortedMap.empty[Id, Double]), originKeyPair ), - MajorityInfo(HeightRange(10L, 50L), List.empty) + FilterData(Array.empty, 0) ), path = GossipPath(IndexedSeq(originId, selfId, originId), "path-id", 1), originId