Skip to content

Commit

Permalink
Safely access cached gossip score across threads (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter committed Apr 23, 2021
1 parent 511c5a2 commit 84a9c63
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
Expand Down Expand Up @@ -115,10 +115,13 @@ class GossipScore(
}

inner class PeerScores {
// Cached values are accessed across threads
@Volatile
var cachedScore: Double = 0.0

val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0
val cachedScore: AtomicReference<Double> = AtomicReference(0.0)

val topicScores = mutableMapOf<Topic, TopicScores>()
var behaviorPenalty: Double by cappedDouble(0.0, peerParams.decayToZero)
Expand All @@ -134,7 +137,7 @@ class GossipScore(
val topicParams = params.topicsScoreParams

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()

val refreshTask: ScheduledFuture<*>
Expand Down Expand Up @@ -185,7 +188,7 @@ class GossipScore(
else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight

val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty
peerScore.cachedScore.set(computedScore)
peerScore.cachedScore = computedScore
return computedScore
}

Expand All @@ -198,7 +201,7 @@ class GossipScore(
}

fun getCachedScore(peerId: PeerId): Double {
return peerScores[peerId]?.cachedScore?.get() ?: 0.0
return peerScores[peerId]?.cachedScore ?: 0.0
}

fun notifyDisconnected(peer: P2PService.PeerHandler) {
Expand Down
51 changes: 51 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,57 @@ class GossipScoreTest {
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
assertThat(score.score(peer)).isEqualTo(2.0)

// Refresh to decay score
score.refreshScores()
assertThat(score.score(peer)).isEqualTo(1.0)
}

@Test
fun `test getCachedScore returns expected values`() {
val peer = mockPeer()

// Setup score params with topic config
val peerScoreParams = GossipPeerScoreParams.builder().build()
val topic: Topic = "testTopic"
val topicScoreParams = GossipTopicScoreParams.builder()
.topicWeight(1.0)
.firstMessageDeliveriesCap(4.0)
.firstMessageDeliveriesWeight(2.0)
.firstMessageDeliveriesDecay(0.5)
.build()

val defaultScoreParams = GossipTopicScoreParams.builder().build()
val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams)))
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
topicsScoreParams = topicsScoreParams
)

// Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted
// as empty
val timeController = TimeControllerImpl()
timeController.addTime(1.hours)
val executor = ControlledExecutorServiceImpl(timeController)

// Check initial value
val score = GossipScore(scoreParams, executor, { timeController.time })
// Check value before interacting with peer
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Check value after accessing score
assertEquals(0.0, score.score(peer))
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Add peer to mesh
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
Expand Down

0 comments on commit 84a9c63

Please sign in to comment.