Skip to content

Commit

Permalink
Merge pull request #191 from mbaxter/0.8.2
Browse files Browse the repository at this point in the history
Release 0.8.2
  • Loading branch information
mbaxter committed Apr 23, 2021
2 parents 6cac516 + 08a8417 commit a0ece47
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 5 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.c
maven { url "https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/" }
}
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.1-RELEASE'
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.2-RELEASE'
```
### Using Maven
Add the repository to the `dependencyManagement` section of the pom file:
Expand All @@ -96,7 +96,7 @@ And then add jvm-libp2p as a dependency:
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.8.1-RELEASE</version>
<version>0.8.2-RELEASE</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>

group = "io.libp2p"
version = "0.8.1-RELEASE"
version = "0.8.2-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.Connection
import io.libp2p.core.ConnectionHandler
import io.libp2p.core.P2PChannel
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.multistream.ProtocolBinding
import io.libp2p.core.multistream.ProtocolDescriptor
Expand All @@ -23,6 +24,10 @@ class Gossip @JvmOverloads constructor(
router.score.updateTopicParams(scoreParams)
}

fun getGossipScore(peerId: PeerId): Double {
return router.score.getCachedScore(peerId)
}

override val protocolDescriptor =
if (router.protocol == PubsubProtocol.Gossip_V_1_1)
ProtocolDescriptor(
Expand Down
15 changes: 13 additions & 2 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -114,6 +115,10 @@ class GossipScore(
}

inner class PeerScores {
// Cached values are accessed across threads
@Volatile
var cachedScore: Double = 0.0

val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0
Expand All @@ -132,7 +137,7 @@ class GossipScore(
val topicParams = params.topicsScoreParams

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()

val refreshTask: ScheduledFuture<*>
Expand Down Expand Up @@ -182,7 +187,9 @@ class GossipScore(
if (behaviorExcess < 0) 0.0
else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight

return topicsScore + appScore + ipColocationPenalty + routerPenalty
val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty
peerScore.cachedScore = computedScore
return computedScore
}

fun refreshScores() {
Expand All @@ -193,6 +200,10 @@ class GossipScore(
}
}

fun getCachedScore(peerId: PeerId): Double {
return peerScores[peerId]?.cachedScore ?: 0.0
}

fun notifyDisconnected(peer: P2PService.PeerHandler) {
getPeerScores(peer).topicScores.filter { it.value.inMesh() }.forEach { t, _ ->
notifyPruned(peer, t)
Expand Down
53 changes: 53 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,59 @@ class GossipScoreTest {
assertThat(score.score(peer)).isEqualTo(1.0)
}

@Test
fun `test getCachedScore returns expected values`() {
val peer = mockPeer()

// Setup score params with topic config
val peerScoreParams = GossipPeerScoreParams.builder().build()
val topic: Topic = "testTopic"
val topicScoreParams = GossipTopicScoreParams.builder()
.topicWeight(1.0)
.firstMessageDeliveriesCap(4.0)
.firstMessageDeliveriesWeight(2.0)
.firstMessageDeliveriesDecay(0.5)
.build()

val defaultScoreParams = GossipTopicScoreParams.builder().build()
val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams)))
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
topicsScoreParams = topicsScoreParams
)

// Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted
// as empty
val timeController = TimeControllerImpl()
timeController.addTime(1.hours)
val executor = ControlledExecutorServiceImpl(timeController)

// Check initial value
val score = GossipScore(scoreParams, executor, { timeController.time })
// Check value before interacting with peer
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Check value after accessing score
assertEquals(0.0, score.score(peer))
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Add peer to mesh
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
assertThat(score.score(peer)).isEqualTo(2.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(2.0)

// Refresh to decay score
score.refreshScores()
assertThat(score.score(peer)).isEqualTo(1.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(1.0)
}

@Test
fun `test mesh message delivery decay`() {
val peer = mockPeer()
Expand Down

0 comments on commit a0ece47

Please sign in to comment.