Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.8.2 #191

Merged
merged 9 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.c
maven { url "https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/" }
}
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.1-RELEASE'
implementation 'io.libp2p:jvm-libp2p-minimal:0.8.2-RELEASE'
```
### Using Maven
Add the repository to the `dependencyManagement` section of the pom file:
Expand All @@ -96,7 +96,7 @@ And then add jvm-libp2p as a dependency:
<dependency>
<groupId>io.libp2p</groupId>
<artifactId>jvm-libp2p-minimal</artifactId>
<version>0.8.1-RELEASE</version>
<version>0.8.2-RELEASE</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Paths
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>

group = "io.libp2p"
version = "0.8.1-RELEASE"
version = "0.8.2-RELEASE"
description = "a minimal implementation of libp2p for the jvm"

plugins {
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip
import io.libp2p.core.Connection
import io.libp2p.core.ConnectionHandler
import io.libp2p.core.P2PChannel
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.multistream.ProtocolBinding
import io.libp2p.core.multistream.ProtocolDescriptor
Expand All @@ -23,6 +24,10 @@ class Gossip @JvmOverloads constructor(
router.score.updateTopicParams(scoreParams)
}

fun getGossipScore(peerId: PeerId): Double {
return router.score.getCachedScore(peerId)
}

override val protocolDescriptor =
if (router.protocol == PubsubProtocol.Gossip_V_1_1)
ProtocolDescriptor(
Expand Down
15 changes: 13 additions & 2 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -114,6 +115,10 @@ class GossipScore(
}

inner class PeerScores {
// Cached values are accessed across threads
@Volatile
var cachedScore: Double = 0.0

val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0
Expand All @@ -132,7 +137,7 @@ class GossipScore(
val topicParams = params.topicsScoreParams

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)
val peerScores = mutableMapOf<PeerId, PeerScores>()
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val peerIpCache = mutableMapOf<PeerId, String>()

val refreshTask: ScheduledFuture<*>
Expand Down Expand Up @@ -182,7 +187,9 @@ class GossipScore(
if (behaviorExcess < 0) 0.0
else behaviorExcess.pow(2) * peerParams.behaviourPenaltyWeight

return topicsScore + appScore + ipColocationPenalty + routerPenalty
val computedScore = topicsScore + appScore + ipColocationPenalty + routerPenalty
peerScore.cachedScore = computedScore
return computedScore
}

fun refreshScores() {
Expand All @@ -193,6 +200,10 @@ class GossipScore(
}
}

fun getCachedScore(peerId: PeerId): Double {
return peerScores[peerId]?.cachedScore ?: 0.0
}

fun notifyDisconnected(peer: P2PService.PeerHandler) {
getPeerScores(peer).topicScores.filter { it.value.inMesh() }.forEach { t, _ ->
notifyPruned(peer, t)
Expand Down
53 changes: 53 additions & 0 deletions src/test/kotlin/io/libp2p/pubsub/gossip/GossipScoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,59 @@ class GossipScoreTest {
assertThat(score.score(peer)).isEqualTo(1.0)
}

@Test
fun `test getCachedScore returns expected values`() {
val peer = mockPeer()

// Setup score params with topic config
val peerScoreParams = GossipPeerScoreParams.builder().build()
val topic: Topic = "testTopic"
val topicScoreParams = GossipTopicScoreParams.builder()
.topicWeight(1.0)
.firstMessageDeliveriesCap(4.0)
.firstMessageDeliveriesWeight(2.0)
.firstMessageDeliveriesDecay(0.5)
.build()

val defaultScoreParams = GossipTopicScoreParams.builder().build()
val topicsScoreParams = GossipTopicsScoreParams(defaultScoreParams, mutableMapOf(Pair(topic, topicScoreParams)))
val scoreParams = GossipScoreParams(
peerScoreParams = peerScoreParams,
topicsScoreParams = topicsScoreParams
)

// Setup time provider - apply non-zero time so that we don't get 0-valued timestamps that may be interpreted
// as empty
val timeController = TimeControllerImpl()
timeController.addTime(1.hours)
val executor = ControlledExecutorServiceImpl(timeController)

// Check initial value
val score = GossipScore(scoreParams, executor, { timeController.time })
// Check value before interacting with peer
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Check value after accessing score
assertEquals(0.0, score.score(peer))
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// Add peer to mesh
score.notifyMeshed(peer, topic)
assertThat(score.score(peer)).isEqualTo(0.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(0.0)

// After delivering a message, we should increase our score by firstMessageDeliveriesWeight
val msg = DefaultPubsubMessage(createRpcMessage(topic))
score.notifyUnseenValidMessage(peer, msg)
assertThat(score.score(peer)).isEqualTo(2.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(2.0)

// Refresh to decay score
score.refreshScores()
assertThat(score.score(peer)).isEqualTo(1.0)
assertThat(score.getCachedScore(peer.peerId)).isEqualTo(1.0)
}

@Test
fun `test mesh message delivery decay`() {
val peer = mockPeer()
Expand Down