Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safely access cached gossip score across threads #190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
Expand Down Expand Up @@ -115,10 +115,13 @@ class GossipScore(
}

inner class PeerScores {
// Cached values are accessed across threads
@Volatile
var cachedScore: Double = 0.0

val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0
val cachedScore: AtomicReference<Double> = AtomicReference(0.0)

val topicScores = mutableMapOf<Topic, TopicScores>()
var behaviorPenalty: Double by cappedDouble(0.0, peerParams.decayToZero)
Expand All @@ -134,7 +137,7 @@ class GossipScore(
val topicParams = params.topicsScoreParams

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()

val refreshTask: ScheduledFuture<*>
Expand Down Expand Up @@ -185,7 +188,7 @@ class GossipScore(
else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight

val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty
peerScore.cachedScore.set(computedScore)
peerScore.cachedScore = computedScore
return computedScore
}

Expand All @@ -198,7 +201,7 @@ class GossipScore(
}

fun getCachedScore(peerId: PeerId): Double {
return peerScores[peerId]?.cachedScore?.get() ?: 0.0
return peerScores[peerId]?.cachedScore ?: 0.0
}

fun notifyDisconnected(peer: P2PService.PeerHandler) {
Expand Down
51 changes: 51 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,57 @@ class GossipScoreTest {
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
assertThat(score.score(peer)).isEqualTo(2.0)

// Refresh to decay score
score.refreshScores()
assertThat(score.score(peer)).isEqualTo(1.0)
}

@Test
fun `test getCachedScore returns expected values`() {
val peer = mockPeer()

// Setup score params with topic config
val peerScoreParams = GossipPeerScoreParams.builder().build()
val topic: Topic = "testTopic"
val topicScoreParams = GossipTopicScoreParams.builder()
.topicWeight(1.0)
.firstMessageDeliveriesCap(4.0)
.firstMessageDeliveriesWeight(2.0)
.firstMessageDeliveriesDecay(0.5)
.build()

val defaultScoreParams = GossipTopicScoreParams.builder().build()
val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams)))
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
topicsScoreParams = topicsScoreParams
)

// Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted
// as empty
val timeController = TimeControllerImpl()
timeController.addTime(1.hours)
val executor = ControlledExecutorServiceImpl(timeController)

// Check initial value
val score = GossipScore(scoreParams, executor, { timeController.time })
// Check value before interacting with peer
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Check value after accessing score
assertEquals(0.0, score.score(peer))
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Add peer to mesh
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
Expand Down