From efcf1bf4a92109064c5a6272044e99befb53cfb2 Mon Sep 17 00:00:00 2001 From: Bogdan Stirbat Date: Sat, 10 Apr 2021 16:59:18 +0300 Subject: [PATCH 1/7] Add method to retrieve peer score by peerId --- src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt | 5 +++++ src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index be7b1c0c5..bccdc1e30 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -4,6 +4,7 @@ import io.libp2p.core.Connection import io.libp2p.core.ConnectionHandler import io.libp2p.core.P2PChannel import io.libp2p.core.Stream +import io.libp2p.core.PeerId import io.libp2p.core.multistream.ProtocolBinding import io.libp2p.core.multistream.ProtocolDescriptor import io.libp2p.core.pubsub.PubsubApi @@ -23,6 +24,10 @@ class Gossip @JvmOverloads constructor( router.score.updateTopicParams(scoreParams) } + fun getGossipScore(peerId: PeerId): GossipScore.PeerScores { + return router.score.getPeerScore(peerId) + } + override val protocolDescriptor = if (router.protocol == PubsubProtocol.Gossip_V_1_1) ProtocolDescriptor( diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index cb3fd4a11..f3e85ad01 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -193,6 +193,10 @@ class GossipScore( } } + fun getPeerScore(peerId: PeerId): PeerScores { + return peerScores.computeIfAbsent(peerId) { PeerScores() } + } + fun notifyDisconnected(peer: P2PService.PeerHandler) { getPeerScores(peer).topicScores.filter { it.value.inMesh() }.forEach { t, _ -> notifyPruned(peer, t) From fb46ca413ad769a4ab02b915c66cc106f079115f Mon Sep 17 00:00:00 2001 From: Bogdan Stirbat Date: Sat, 10 Apr 2021 17:06:12 +0300 Subject: [PATCH 2/7] fixed import order --- src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index bccdc1e30..d02bcfd8b 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -3,8 +3,8 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.Connection import io.libp2p.core.ConnectionHandler import io.libp2p.core.P2PChannel -import io.libp2p.core.Stream import io.libp2p.core.PeerId +import io.libp2p.core.Stream import io.libp2p.core.multistream.ProtocolBinding import io.libp2p.core.multistream.ProtocolDescriptor import io.libp2p.core.pubsub.PubsubApi From 59039e4b54340818820e1b17246720cfc267893b Mon Sep 17 00:00:00 2001 From: Bogdan Stirbat Date: Mon, 12 Apr 2021 21:44:41 +0300 Subject: [PATCH 3/7] Return a cached score --- src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt | 5 ----- src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt | 9 ++++++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index d02bcfd8b..be7b1c0c5 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -3,7 +3,6 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.Connection import io.libp2p.core.ConnectionHandler import io.libp2p.core.P2PChannel -import io.libp2p.core.PeerId import io.libp2p.core.Stream import io.libp2p.core.multistream.ProtocolBinding import io.libp2p.core.multistream.ProtocolDescriptor @@ -24,10 +23,6 @@ class Gossip @JvmOverloads constructor( router.score.updateTopicParams(scoreParams) } - fun getGossipScore(peerId: PeerId): GossipScore.PeerScores { - return router.score.getPeerScore(peerId) - } - override val protocolDescriptor = if (router.protocol == PubsubProtocol.Gossip_V_1_1) ProtocolDescriptor( diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index f3e85ad01..7cd55304e 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -132,6 +132,7 @@ class GossipScore( val topicParams = params.topicsScoreParams private val validationTime: MutableMap = createLRUMap(1024) + val cachedScores = mutableMapOf() val peerScores = mutableMapOf() private val peerIpCache = mutableMapOf() @@ -182,7 +183,9 @@ class GossipScore( if (behaviorExcess < 0) 0.0 else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight - return topicsScore + appScore + ipColocationPenalty + routerPenalty + val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty + cachedScores[peer.peerId] = computedScore + return computedScore } fun refreshScores() { @@ -193,8 +196,8 @@ class GossipScore( } } - fun getPeerScore(peerId: PeerId): PeerScores { - return peerScores.computeIfAbsent(peerId) { PeerScores() } + fun getCachedScore(peerId: PeerId): Double { + return cachedScores.computeIfAbsent(peerId) { 0.0 } } fun notifyDisconnected(peer: P2PService.PeerHandler) { From fb345a967941a97879a21bb04bcf68011b6c1f60 Mon Sep 17 00:00:00 2001 From: Bogdan Stirbat Date: Mon, 12 Apr 2021 23:31:22 +0300 Subject: [PATCH 4/7] Peer score cache is now thread safe --- src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt | 5 +++++ src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt | 7 ++++--- src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt | 2 ++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index be7b1c0c5..35902cc7d 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.Connection import io.libp2p.core.ConnectionHandler import io.libp2p.core.P2PChannel +import io.libp2p.core.PeerId import io.libp2p.core.Stream import io.libp2p.core.multistream.ProtocolBinding import io.libp2p.core.multistream.ProtocolDescriptor @@ -23,6 +24,10 @@ class Gossip @JvmOverloads constructor( router.score.updateTopicParams(scoreParams) } + fun getGossipScore(peerId: PeerId): Double { + return router.score.getCachedScore(peerId) + } + override val protocolDescriptor = if (router.protocol == PubsubProtocol.Gossip_V_1_1) ProtocolDescriptor( diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 7cd55304e..86ca40e89 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -13,6 +13,7 @@ import java.util.Optional import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import kotlin.math.max import kotlin.math.min import kotlin.math.pow @@ -117,6 +118,7 @@ class GossipScore( val ips = mutableSetOf() var connectedTimeMillis: Long = 0 var disconnectedTimeMillis: Long = 0 + var cachedScore: AtomicReference = AtomicReference(0.0) val topicScores = mutableMapOf() var behaviorPenalty: Double by cappedDouble(0.0, peerParams.decayToZero) @@ -132,7 +134,6 @@ class GossipScore( val topicParams = params.topicsScoreParams private val validationTime: MutableMap = createLRUMap(1024) - val cachedScores = mutableMapOf() val peerScores = mutableMapOf() private val peerIpCache = mutableMapOf() @@ -184,7 +185,7 @@ class GossipScore( else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty - cachedScores[peer.peerId] = computedScore + peerScore.cachedScore.set(computedScore) return computedScore } @@ -197,7 +198,7 @@ class GossipScore( } fun getCachedScore(peerId: PeerId): Double { - return cachedScores.computeIfAbsent(peerId) { 0.0 } + return peerScores[peerId]?.cachedScore?.get() ?: 0.0 } fun notifyDisconnected(peer: P2PService.PeerHandler) { diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt index 716ebc3e7..2e0455883 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt @@ -180,10 +180,12 @@ class GossipScoreTest { val msg = DefaultPubsubMessage(createRpcMessage(topic)) score.notifyUnseenValidMessage(peer, msg) assertThat(score.score(peer)).isEqualTo(2.0) + assertThat(score.getCachedScore(peer.peerId)).isEqualTo(2.0) // Refresh to decay score score.refreshScores() assertThat(score.score(peer)).isEqualTo(1.0) + assertThat(score.getCachedScore(peer.peerId)).isEqualTo(1.0) } @Test From 4904c84c7ffb6f1b3aa898878b846d855fe12133 Mon Sep 17 00:00:00 2001 From: Bogdan Stirbat Date: Wed, 14 Apr 2021 21:42:39 +0300 Subject: [PATCH 5/7] cachedScore is a val variable --- src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index 86ca40e89..eee96b89d 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -118,7 +118,7 @@ class GossipScore( val ips = mutableSetOf() var connectedTimeMillis: Long = 0 var disconnectedTimeMillis: Long = 0 - var cachedScore: AtomicReference = AtomicReference(0.0) + val cachedScore: AtomicReference = AtomicReference(0.0) val topicScores = mutableMapOf() var behaviorPenalty: Double by cappedDouble(0.0, peerParams.decayToZero) From 84a9c6380f6de36fecd23ef29bd5ac11fe1f9078 Mon Sep 17 00:00:00 2001 From: mbaxter Date: Fri, 23 Apr 2021 10:52:14 -0400 Subject: [PATCH 6/7] Safely access cached gossip score across threads (#190) --- .../io/libp2p/pubsub/gossip/GossipScore.kt | 13 +++-- .../libp2p/pubsub/gossip/GossipScoreTest.kt | 51 +++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt index eee96b89d..e3b9bfb2c 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt @@ -10,10 +10,10 @@ import io.libp2p.etc.util.P2PService import io.libp2p.pubsub.PubsubMessage import io.libp2p.pubsub.Topic import java.util.Optional +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference import kotlin.math.max import kotlin.math.min import kotlin.math.pow @@ -115,10 +115,13 @@ class GossipScore( } inner class PeerScores { + // Cached values are accessed across threads + @Volatile + var cachedScore: Double = 0.0 + val ips = mutableSetOf() var connectedTimeMillis: Long = 0 var disconnectedTimeMillis: Long = 0 - val cachedScore: AtomicReference = AtomicReference(0.0) val topicScores = mutableMapOf() var behaviorPenalty: Double by cappedDouble(0.0, peerParams.decayToZero) @@ -134,7 +137,7 @@ class GossipScore( val topicParams = params.topicsScoreParams private val validationTime: MutableMap = createLRUMap(1024) - val peerScores = mutableMapOf() + val peerScores = ConcurrentHashMap() private val peerIpCache = mutableMapOf() val refreshTask: ScheduledFuture<*> @@ -185,7 +188,7 @@ class GossipScore( else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty - peerScore.cachedScore.set(computedScore) + peerScore.cachedScore = computedScore return computedScore } @@ -198,7 +201,7 @@ class GossipScore( } fun getCachedScore(peerId: PeerId): Double { - return peerScores[peerId]?.cachedScore?.get() ?: 0.0 + return peerScores[peerId]?.cachedScore ?: 0.0 } fun notifyDisconnected(peer: P2PService.PeerHandler) { diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt index 2e0455883..d0fd224b7 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt @@ -176,6 +176,57 @@ class GossipScoreTest { score.notifyMeshed(peer, topic) assertThat(score.score(peer)).isEqualTo(0.0) + // After delivering a message, we should increase our score by firstMessageDeliveriesWeight + val msg = DefaultPubsubMessage(createRpcMessage(topic)) + score.notifyUnseenValidMessage(peer, msg) + assertThat(score.score(peer)).isEqualTo(2.0) + + // Refresh to decay score + score.refreshScores() + assertThat(score.score(peer)).isEqualTo(1.0) + } + + @Test + fun `test getCachedScore returns expected values`() { + val peer = mockPeer() + + // Setup score params with topic config + val peerScoreParams = GossipPeerScoreParams.builder().build() + val topic: Topic = "testTopic" + val topicScoreParams = GossipTopicScoreParams.builder() + .topicWeight(1.0) + .firstMessageDeliveriesCap(4.0) + .firstMessageDeliveriesWeight(2.0) + .firstMessageDeliveriesDecay(0.5) + .build() + + val defaultScoreParams = GossipTopicScoreParams.builder().build() + val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams))) + val scoreParams = GossipScoreParams( + peerScoreParams = peerScoreParams, + topicsScoreParams = topicsScoreParams + ) + + // Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted + // as empty + val timeController = TimeControllerImpl() + timeController.addTime(1.hours) + val executor = ControlledExecutorServiceImpl(timeController) + + // Check initial value + val score = GossipScore(scoreParams, executor, { timeController.time }) + // Check value before interacting with peer + assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0) + + // Check value after accessing score + assertEquals(0.0, score.score(peer)) + assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0) + + // Add peer to mesh + score.notifyMeshed(peer, topic) + assertThat(score.score(peer)).isEqualTo(0.0) + assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0) + // After delivering a message, we should increase our score by firstMessageDeliveriesWeight val msg = DefaultPubsubMessage(createRpcMessage(topic)) score.notifyUnseenValidMessage(peer, msg) From 08a8417779a35af5f8fc514fc3e0adfe78fd0fb9 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Thu, 22 Apr 2021 14:56:37 -0400 Subject: [PATCH 7/7] Update version to 0.8.2 --- README.md | 4 ++-- build.gradle.kts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 19f730401..fe3dab418 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.c maven { url "https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/" } } - implementation 'io.libp2p:jvm-libp2p-minimal:0.8.1-RELEASE' + implementation 'io.libp2p:jvm-libp2p-minimal:0.8.2-RELEASE' ``` ### Using Maven Add the repository to the `dependencyManagement` section of the pom file: @@ -96,7 +96,7 @@ And then add jvm-libp2p as a dependency: io.libp2p jvm-libp2p-minimal - 0.8.1-RELEASE + 0.8.2-RELEASE pom ``` diff --git a/build.gradle.kts b/build.gradle.kts index a04e0de9a..8a867673a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -12,7 +12,7 @@ import java.nio.file.Paths // ./gradlew publish -PcloudsmithUser= -PcloudsmithApiKey= group = "io.libp2p" -version = "0.8.1-RELEASE" +version = "0.8.2-RELEASE" description = "a minimal implementation of libp2p for the jvm" plugins {