Skip to content

Commit

Permalink
Majority without gaps
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekas committed May 27, 2021
1 parent a244efe commit 5f06c24
Show file tree
Hide file tree
Showing 23 changed files with 501 additions and 315 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
## [Unreleased]
### Changed
- Whitelisting update
- Majority selection algorithm
- Missing proposal lookup algorithm

## [v2.23.3] 2021-04-19
### Changed
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ lazy val coreDependencies = Seq(
"pl.abankowski" %% "http-request-signer-core" % versions.httpSigner,
"pl.abankowski" %% "http4s-request-signer" % versions.httpSigner,
"io.chrisdavenport" %% "fuuid" % "0.4.0",
"io.chrisdavenport" %% "mapref" % "0.1.1"
"io.chrisdavenport" %% "mapref" % "0.1.1",
"net.cinnom" % "nano-cuckoo" % "2.0.0"
) ++ prometheusDependencies ++ http4sDependencies ++ schemaSharedDependencies ++ twitterAlgebirdDependencies ++ pureconfigDependencies ++ monocleDependencies

//Test dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import org.constellation.schema.edge._
import org.constellation.schema.observation._
import org.constellation.schema.signature.{HashSignature, SignatureBatch, Signed}
import org.constellation.schema.snapshot.{
HeightRange,
MajorityInfo,
FilterData,
Snapshot,
SnapshotInfo,
SnapshotProposal,
Expand Down Expand Up @@ -102,7 +101,6 @@ object SchemaKryoRegistrar
(classOf[SnapshotProposalPayload], 200),
(classOf[Signed[SnapshotProposal]], 201),
(classOf[SnapshotProposal], 202),
(classOf[MajorityInfo], 203),
(classOf[HeightRange], 204)
(classOf[FilterData], 203)
)
) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.constellation.schema.snapshot

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

case class FilterData(
contents: Array[Byte],
capacity: Long
)

object FilterData {
implicit val encoder: Encoder[FilterData] = deriveEncoder
implicit val decoder: Decoder[FilterData] = deriveDecoder
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
/**
* Inclusive range of heights
*/
case class HeightRange(from: Long, to: Long)
case class HeightRange(from: Long, to: Long) {
def nonEmpty: Boolean = !empty

def empty: Boolean = from > to
}

object HeightRange {
val MaxRange = HeightRange(0, Long.MaxValue)
val MinRange = HeightRange(0, 0)
val Empty = HeightRange(Long.MaxValue, 0)
implicit val encoder: Encoder[HeightRange] = deriveEncoder
implicit val decoder: Decoder[HeightRange] = deriveDecoder
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.constellation.schema.signature.Signed

case class SnapshotProposalPayload(
proposal: Signed[SnapshotProposal],
majorityInfo: MajorityInfo
filterData: FilterData
)

object SnapshotProposalPayload {
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ constellation {
snapshotHeightDelayInterval = 20
snapshotHeightRedownloadDelayInterval = 30
meaningfulSnapshotsCount = 40
missingProposalOffset = 0
missingProposalLimit = 40
stallCountThreshold = 4
proposalLookupLimit = 8
}
transaction {
generator {
Expand Down
3 changes: 0 additions & 3 deletions src/main/scala/org/constellation/ConfigUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ object ConfigUtil {
def isEnabledAWSStorage: Boolean =
Try(configStorage.getBoolean("aws.enabled")).getOrElse(false)

def isEnabledCloudStorage: Boolean =
isEnabledGCPStorage || isEnabledAWSStorage

case class AWSStorageConfig(accessKey: String, secretKey: String, region: String, bucket: String)

def loadAWSStorageConfigs(constellationConfig: Config = constellation): NonEmptyList[AWSStorageConfig] = {
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/org/constellation/DAO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,13 @@ class DAO(

val missingProposalFinder = MissingProposalFinder(
ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval"),
ConfigUtil.constellation.getLong("snapshot.missingProposalOffset"),
ConfigUtil.constellation.getLong("snapshot.missingProposalLimit"),
id
)

redownloadService = RedownloadService[IO](
ConfigUtil.constellation.getInt("snapshot.meaningfulSnapshotsCount"),
ConfigUtil.constellation.getInt("snapshot.snapshotHeightRedownloadDelayInterval"),
ConfigUtil.isEnabledCloudStorage,
ConfigUtil.constellation.getInt("snapshot.snapshotHeightInterval"),
cluster,
MajorityStateChooser(id),
missingProposalFinder,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.constellation.concurrency.cuckoo

import java.io.ByteArrayInputStream

import net.cinnom.nanocuckoo.NanoCuckooFilter
import org.constellation.schema.snapshot.FilterData

case class CuckooFilter(filterData: FilterData) {

private lazy val filter: NanoCuckooFilter = {
val stream = new ByteArrayInputStream(filterData.contents)
try {
val resultingFilter = new NanoCuckooFilter.Builder(filterData.capacity).build()
resultingFilter.readMemory(stream)
resultingFilter
} finally stream.close()
}

def contains[T](item: T)(implicit convert: T => String): Boolean = convert.andThen(filter.contains)(item)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.constellation.concurrency.cuckoo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import cats.effect.Sync
import net.cinnom.nanocuckoo.NanoCuckooFilter
import org.constellation.schema.snapshot.FilterData
import cats.syntax.all._

class MutableCuckooFilter[F[_], T](filter: NanoCuckooFilter)(implicit F: Sync[F]) {

def insert(item: T)(implicit convert: T => String): F[Boolean] = {
val converted = convert(item)
for {
insertResult <- tryInsert(converted)
finalResult <- if (insertResult) F.pure(true)
else expand >> tryInsert(converted)

} yield finalResult
}

private def tryInsert(convertedItem: String): F[Boolean] = F.delay {
filter.insert(convertedItem)
}

private def expand: F[Unit] = F.delay {
filter.expand()
}

def delete(item: T)(implicit convert: T => String): F[Boolean] = F.delay {
filter.delete(convert(item))
}

def contains(item: T)(implicit convert: T => String): F[Boolean] = F.delay {
filter.contains(convert(item))
}

def getFilterData: F[FilterData] =
for {
capacity <- getCapacity
contents <- getFilterContents
} yield FilterData(contents, capacity)

def clear: F[Unit] =
for {
capacity <- getCapacity
contents <- MutableCuckooFilter(capacity).getFilterContents
_ <- setFilterContents(contents)
} yield ()

private def getCapacity: F[Long] = F.delay {
filter.getCapacity
}

private def getFilterContents: F[Array[Byte]] = F.delay {
val stream = new ByteArrayOutputStream()
try filter.writeMemory(stream)
finally stream.close()
stream.toByteArray
}

private def setFilterContents(contents: Array[Byte]): F[Unit] = F.delay {
val stream = new ByteArrayInputStream(contents)
try filter.readMemory(stream)
finally stream.close()
}

}

object MutableCuckooFilter {

def apply[F[_]: Sync, T](capacity: Long = 512): MutableCuckooFilter[F, T] =
new MutableCuckooFilter[F, T](new NanoCuckooFilter.Builder(capacity).build())

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import org.constellation.domain.redownload.RedownloadService.{SnapshotProposalsA
import org.constellation.gossip.state.GossipMessage
import org.constellation.infrastructure.p2p.PeerResponse.PeerResponse
import org.constellation.schema.Id
import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposalPayload}
import org.constellation.schema.signature.Signed
import org.constellation.schema.snapshot.{LatestMajorityHeight, SnapshotProposal, SnapshotProposalPayload}

trait SnapshotClientAlgebra[F[_]] {
def getStoredSnapshots(): PeerResponse[F, List[String]]
Expand All @@ -17,6 +18,8 @@ trait SnapshotClientAlgebra[F[_]] {

def getPeerProposals(id: Id): PeerResponse[F, Option[SnapshotProposalsAtHeight]]

def queryPeerProposals(query: List[(Id, Long)]): PeerResponse[F, List[Option[Signed[SnapshotProposal]]]]

def getNextSnapshotHeight(): PeerResponse[F, (Id, Long)]

def getSnapshotInfo(): PeerResponse[F, Array[Byte]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ import cats.syntax.all._
import org.constellation.domain.redownload.RedownloadService.{PeersCache, PeersProposals, SnapshotsAtHeight}
import org.constellation.schema.Id
import org.constellation.schema.snapshot.HeightRange.MaxRange
import org.constellation.schema.snapshot.{HeightRange, MajorityInfo}
import org.constellation.schema.snapshot.HeightRange

import scala.collection.immutable.NumericRange.inclusive
import scala.math.{max, min}

class MissingProposalFinder(
private val heightInterval: Long,
private val offset: Long,
private val limit: Long,
private val selfId: Id
heightInterval: Long,
selfId: Id
) {

/**
Expand All @@ -23,9 +21,9 @@ class MissingProposalFinder(
lookupRange: HeightRange,
proposals: PeersProposals,
peersCache: PeersCache
): Map[Id, Set[Long]] = {
): Set[(Id, Long)] = {
val receivedProposals = proposals.mapValues(_.keySet)
val bounds = boundedRange(lookupRange)
val bounds = lookupRange

(peersCache - selfId)
.mapValues(
Expand All @@ -36,34 +34,13 @@ class MissingProposalFinder(
}
}
)
.map {
.toList
.flatMap {
case (id, majorityHeights) =>
val allProposals = expandIntervals(bounds, majorityHeights)
(id, allProposals -- receivedProposals.getOrElse(id, Set.empty))
}
.filter {
case (_, missingProposals) => missingProposals.nonEmpty
(allProposals -- receivedProposals.getOrElse(id, Set.empty)).map((id, _)).toList
}
}

/**
* @return Id of the peer that can provide the most proposals that are missing, if one could be found
*/
def selectPeerForFetchingMissingProposals(
lookupRange: HeightRange,
missingProposals: Set[Long],
peerMajorityInfo: Map[Id, MajorityInfo]
): Option[Id] = {
val bounds = boundedRange(lookupRange)

(peerMajorityInfo - selfId).mapValues { majorityInfo =>
val peerGaps = expandIntervals(bounds, majorityInfo.majorityGaps) ++
rangeSet(majorityInfo.majorityRange.to + heightInterval, bounds.to) ++
rangeSet(bounds.from, majorityInfo.majorityRange.from - heightInterval)
missingProposals -- peerGaps
}.toList.filter { case (_, proposalsToFill) => proposalsToFill.nonEmpty }.maximumByOption {
case (_, proposalsToFill) => proposalsToFill.size
}.map { case (id, _) => id }
.toSet
}

def findGapRanges(majorityState: SnapshotsAtHeight): List[HeightRange] =
Expand Down Expand Up @@ -92,14 +69,10 @@ class MissingProposalFinder(

private def rangeSet(from: Long, to: Long): Set[Long] =
inclusive(from, to, heightInterval).toSet

private def boundedRange(range: HeightRange) =
HeightRange(max(range.from, range.to - (offset + limit)), range.to - offset)

}

object MissingProposalFinder {

def apply(heightInterval: Long, offset: Long, limit: Long, selfId: Id) =
new MissingProposalFinder(heightInterval, offset, limit, selfId)
def apply(heightInterval: Long, selfId: Id) =
new MissingProposalFinder(heightInterval, selfId)
}
Loading

0 comments on commit 5f06c24

Please sign in to comment.