diff --git a/src/main/kotlin/io/libp2p/etc/util/P2PService.kt b/src/main/kotlin/io/libp2p/etc/util/P2PService.kt index fcfee1b33..0c712ce14 100644 --- a/src/main/kotlin/io/libp2p/etc/util/P2PService.kt +++ b/src/main/kotlin/io/libp2p/etc/util/P2PService.kt @@ -1,21 +1,18 @@ package io.libp2p.etc.util -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.libp2p.core.InternalErrorException import io.libp2p.core.PeerId import io.libp2p.core.Stream -import io.libp2p.etc.types.lazyVarInit import io.libp2p.etc.types.submitAsync import io.libp2p.etc.types.toVoidCompletableFuture -import io.libp2p.pubsub.AbstractRouter import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.util.ReferenceCountUtil import org.apache.logging.log4j.LogManager import java.util.concurrent.CompletableFuture -import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService -import java.util.function.Supplier + +private val logger = LogManager.getLogger(P2PService::class.java) /** * Base class for a service which manages many streams from different peers @@ -28,8 +25,34 @@ import java.util.function.Supplier * service API should be executed on this thread to be thread-safe. * Consider using the following helpers [runOnEventThread], [submitOnEventThread], [submitAsyncOnEventThread] * or use the [executor] directly + * + * @param executor Executor backed by a single event thread + * It is only safe to perform any service logic via this executor */ -abstract class P2PService { +abstract class P2PService( + protected val executor: ScheduledExecutorService +) { + + private val peersMutable = mutableListOf() + /** + * List of connected peers. + * Note that connected peer could not be ready for writing yet, so consider [activePeers] + * if any data is to be send + */ + val peers: List = peersMutable + + private val activePeersMutable = mutableListOf() + /** + * List of active peers to which data could be written + */ + val activePeers: List = activePeersMutable + + private val peerIdToPeerHandlerMapMutable = mutableMapOf() + + /** + * Maps [PeerId] to [PeerHandler] instance for connected peers + */ + val peerIdToPeerHandlerMap: Map = peerIdToPeerHandlerMapMutable /** * Represents a single stream @@ -107,40 +130,6 @@ abstract class P2PService { } } - /** - * Executor backed by a single event thread - * It is only safe to perform any service logic via this executor - * - * The executor can be altered right after the instance creation. - * Changing it later may have unpredictable results - */ - var executor: ScheduledExecutorService by lazyVarInit { - Executors.newSingleThreadScheduledExecutor( - threadFactory - ) - } - - private val peersMutable = mutableListOf() - /** - * List of connected peers. - * Note that connected peer could not be ready for writing yet, so consider [activePeers] - * if any data is to be send - */ - val peers: List = peersMutable - - private val activePeersMutable = mutableListOf() - /** - * List of active peers to which data could be written - */ - val activePeers: List = activePeersMutable - - private val peerIdToPeerHandlerMapMutable = mutableMapOf() - - /** - * Maps [PeerId] to [PeerHandler] instance for connected peers - */ - val peerIdToPeerHandlerMap: Map = peerIdToPeerHandlerMapMutable - /** * Adds a new stream to service. This method should **synchronously** init the underlying * [io.netty.channel.Channel] @@ -150,6 +139,17 @@ abstract class P2PService { */ open fun addNewStream(stream: Stream) = initChannel(StreamHandler(stream)) + /** + * Callback to initialize the [Stream] underlying [io.netty.channel.Channel] + * + * Is invoked **not** on the event thread + * [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**. + * **Don't** initialize the channel on event thread! + * Any service logic related to adding a new stream could be performed + * within overridden [streamAdded] callback (which is invoked on event thread) + */ + protected abstract fun initChannel(streamHandler: StreamHandler) + protected open fun streamAdded(streamHandler: StreamHandler) { val peerHandler = createPeerHandler(streamHandler) streamHandler.initPeerHandler(peerHandler) @@ -184,17 +184,6 @@ abstract class P2PService { onInbound(stream.getPeerHandler(), msg) } - /** - * Callback to initialize the [Stream] underlying [io.netty.channel.Channel] - * - * Is invoked **not** on the event thread - * [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**. - * **Don't** initialize the channel on event thread! - * Any service logic related to adding a new stream could be performed - * within overridden [streamAdded] callback (which is invoked on event thread) - */ - protected abstract fun initChannel(streamHandler: StreamHandler) - /** * Callback notifies that the peer is active and ready for writing data * Invoked on event thread @@ -251,15 +240,9 @@ abstract class P2PService { /** * Executes the code on the service event thread */ - fun submitOnEventThread(run: () -> C): CompletableFuture = CompletableFuture.supplyAsync(Supplier { run() }, executor) + fun submitOnEventThread(run: () -> C): CompletableFuture = CompletableFuture.supplyAsync({ run() }, executor) /** * Executes the code on the service event thread */ fun submitAsyncOnEventThread(run: () -> CompletableFuture): CompletableFuture = executor.submitAsync(run) - - companion object { - private val threadFactory = ThreadFactoryBuilder().setDaemon(true).setNameFormat("P2PService-event-thread-%d").build() - @JvmStatic - val logger = LogManager.getLogger(AbstractRouter::class.java) - } } diff --git a/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt b/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt index 998516918..38e24096c 100644 --- a/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt +++ b/src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt @@ -5,13 +5,14 @@ import io.libp2p.core.SemiDuplexNoOutboundStreamException import io.libp2p.etc.types.completedExceptionally import io.libp2p.etc.types.toVoidCompletableFuture import java.util.concurrent.CompletableFuture +import java.util.concurrent.ScheduledExecutorService /** * The service where communication between peers is performed via two [io.libp2p.core.Stream]s * They are initiated asynchronously by each peer. Initiated stream is used solely for writing data * and accepted steam is used solely for reading */ -abstract class P2PServiceSemiDuplex : P2PService() { +abstract class P2PServiceSemiDuplex(executor: ScheduledExecutorService) : P2PService(executor) { inner class SDPeerHandler(streamHandler: StreamHandler) : PeerHandler(streamHandler) { diff --git a/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index 9d7cc8189..91a8c1d23 100644 --- a/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -3,13 +3,11 @@ package io.libp2p.pubsub import io.libp2p.core.BadPeerException import io.libp2p.core.PeerId import io.libp2p.core.Stream -import io.libp2p.core.pubsub.RESULT_VALID import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.MultiSet import io.libp2p.etc.types.completedExceptionally import io.libp2p.etc.types.copy import io.libp2p.etc.types.forward -import io.libp2p.etc.types.lazyVarInit import io.libp2p.etc.types.thenApplyAll import io.libp2p.etc.types.toWBytes import io.libp2p.etc.util.P2PServiceSemiDuplex @@ -22,25 +20,41 @@ import org.apache.logging.log4j.LogManager import pubsub.pb.Rpc import java.util.Collections.singletonList import java.util.Optional -import java.util.Random import java.util.concurrent.CompletableFuture +import java.util.concurrent.ScheduledExecutorService import java.util.function.BiConsumer import java.util.function.Consumer // 1 MB default max message size const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20 +typealias PubsubMessageHandler = (PubsubMessage) -> CompletableFuture + open class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : AbstractPubsubMessage() { override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes() } +private val logger = LogManager.getLogger(AbstractRouter::class.java) + /** * Implements common logic for pubsub routers */ abstract class AbstractRouter( - val subscriptionFilter: TopicSubscriptionFilter, - val maxMsgSize: Int = DEFAULT_MAX_PUBSUB_MESSAGE_SIZE -) : P2PServiceSemiDuplex(), PubsubRouter, PubsubRouterDebug { + executor: ScheduledExecutorService, + override val protocol: PubsubProtocol, + protected val subscriptionFilter: TopicSubscriptionFilter, + protected val maxMsgSize: Int, + override val messageFactory: PubsubMessageFactory, + protected val seenMessages: SeenCache>, + protected val messageValidator: PubsubRouterMessageValidator +) : P2PServiceSemiDuplex(executor), PubsubRouter, PubsubRouterDebug { + + protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") } + + protected open val peerTopics = MultiSet() + protected open val subscribedTopics = linkedSetOf() + protected open val pendingRpcParts = PendingRpcPartsMap { DefaultRpcPartsQueue() } + protected open val pendingMessagePromises = MultiSet>() protected class PendingRpcPartsMap( private val queueFactory: () -> TPartsQueue @@ -53,28 +67,6 @@ abstract class AbstractRouter( fun popQueue(peer: PeerHandler) = map.remove(peer) ?: queueFactory() } - private val logger = LogManager.getLogger(AbstractRouter::class.java) - - override var curTimeMillis: () -> Long by lazyVarInit { { System.currentTimeMillis() } } - override var random by lazyVarInit { Random() } - override var name: String = "router" - - override var messageFactory: PubsubMessageFactory = { DefaultPubsubMessage(it) } - var maxSeenMessagesLimit = 10000 - - protected open val seenMessages: SeenCache> by lazy { - LRUSeenCache(SimpleSeenCache(), maxSeenMessagesLimit) - } - - private val peerTopics = MultiSet() - private var msgHandler: (PubsubMessage) -> CompletableFuture = { RESULT_VALID } - override var messageValidator = NOP_ROUTER_VALIDATOR - - val subscribedTopics = linkedSetOf() - protected open val pendingRpcParts = PendingRpcPartsMap { DefaultRpcPartsQueue() } - private var debugHandler: ChannelHandler? = null - private val pendingMessagePromises = MultiSet>() - override fun publish(msg: PubsubMessage): CompletableFuture { return submitAsyncOnEventThread { if (msg in seenMessages) { @@ -114,26 +106,24 @@ abstract class AbstractRouter( } } - override fun addPeer(peer: Stream) { - addNewStream(peer) + override fun addPeer(peer: Stream) = addPeerWithDebugHandler(peer, null) + override fun addPeerWithDebugHandler(peer: Stream, debugHandler: ChannelHandler?) { + addNewStreamWithHandler(peer, debugHandler) } - override fun addPeerWithDebugHandler(peer: Stream, debugHandler: ChannelHandler?) { - this.debugHandler = debugHandler - try { - addPeer(peer) - } finally { - this.debugHandler = null - } + override fun addNewStream(stream: Stream) = addNewStreamWithHandler(stream, null) + protected fun addNewStreamWithHandler(stream: Stream, handler: ChannelHandler?) { + initChannelWithHandler(StreamHandler(stream), handler) } - override fun initChannel(streamHandler: StreamHandler) { + override fun initChannel(streamHandler: StreamHandler) = initChannelWithHandler(streamHandler, null) + protected open fun initChannelWithHandler(streamHandler: StreamHandler, handler: ChannelHandler?) { with(streamHandler.stream) { pushHandler(LimitedProtobufVarint32FrameDecoder(maxMsgSize)) pushHandler(ProtobufVarint32LengthFieldPrepender()) pushHandler(ProtobufDecoder(Rpc.RPC.getDefaultInstance())) pushHandler(ProtobufEncoder()) - debugHandler?.also { pushHandler(it) } + handler?.also { pushHandler(it) } pushHandler(streamHandler) } } diff --git a/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt b/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt index f52ecf293..ae3d94b16 100644 --- a/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt @@ -7,9 +7,7 @@ import io.libp2p.core.pubsub.ValidationResult import io.libp2p.etc.types.WBytes import io.netty.channel.ChannelHandler import pubsub.pb.Rpc -import java.util.Random import java.util.concurrent.CompletableFuture -import java.util.concurrent.ScheduledExecutorService typealias Topic = String typealias MessageId = WBytes @@ -62,8 +60,7 @@ abstract class AbstractPubsubMessage : PubsubMessage { interface PubsubMessageRouter { val protocol: PubsubProtocol - var messageFactory: PubsubMessageFactory - var messageValidator: PubsubRouterMessageValidator + val messageFactory: PubsubMessageFactory /** * Validates and broadcasts the message to suitable peers @@ -135,28 +132,6 @@ interface PubsubRouter : PubsubMessageRouter, PubsubPeerRouter */ interface PubsubRouterDebug : PubsubRouter { - /** - * Adds ability to substitute the scheduler which is used for all async and periodic - * tasks within the router - */ - var executor: ScheduledExecutorService - - /** - * System time supplier. Normally defaults to [System.currentTimeMillis] - * If router needs system time it should refer to this supplier - */ - var curTimeMillis: () -> Long - - /** - * Randomness supplier - * Whenever router implementation needs random data it must refer to this var - * Tests may substitute this instance with a fixed-seed [Random] - * to perform deterministic testing - */ - var random: Random - - var name: String - /** * The same as [PubsubRouter.addPeer] but adds the [debugHandler] right before * the terminal handler diff --git a/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt b/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt index 06af0e4fe..7cc380b61 100644 --- a/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt @@ -1,16 +1,23 @@ package io.libp2p.pubsub.flood import io.libp2p.etc.types.anyComplete -import io.libp2p.pubsub.AbstractRouter -import io.libp2p.pubsub.PubsubMessage -import io.libp2p.pubsub.PubsubProtocol -import io.libp2p.pubsub.TopicSubscriptionFilter +import io.libp2p.pubsub.* import pubsub.pb.Rpc import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService -class FloodRouter : AbstractRouter(subscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter()) { +const val DEFAULT_MAX_SEEN_MESSAGES_LIMIT: Int = 10000 - override val protocol = PubsubProtocol.Floodsub +class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()) : AbstractRouter( + protocol = PubsubProtocol.Floodsub, + executor = executor, + subscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter(), + maxMsgSize = Int.MAX_VALUE, + messageFactory = { DefaultPubsubMessage(it) }, + seenMessages = LRUSeenCache(SimpleSeenCache(), DEFAULT_MAX_SEEN_MESSAGES_LIMIT), + messageValidator = NOP_ROUTER_VALIDATOR +) { // msg: validated unseen messages received from api override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt index 35902cc7d..0645ddf46 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt @@ -10,11 +10,12 @@ import io.libp2p.core.multistream.ProtocolDescriptor import io.libp2p.core.pubsub.PubsubApi import io.libp2p.pubsub.PubsubApiImpl import io.libp2p.pubsub.PubsubProtocol +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.netty.channel.ChannelHandler import java.util.concurrent.CompletableFuture class Gossip @JvmOverloads constructor( - private val router: GossipRouter = GossipRouter(), + private val router: GossipRouter = GossipRouterBuilder().build(), private val api: PubsubApi = PubsubApiImpl(router), private val debugGossipHandler: ChannelHandler? = null ) : diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index d9fc769f2..29fffbbb1 100644 --- a/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -3,21 +3,50 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.InternalErrorException import io.libp2p.core.PeerId import io.libp2p.core.pubsub.ValidationResult -import io.libp2p.etc.types.anyComplete -import io.libp2p.etc.types.completedExceptionally -import io.libp2p.etc.types.copy -import io.libp2p.etc.types.createLRUMap -import io.libp2p.etc.types.median -import io.libp2p.etc.types.seconds -import io.libp2p.etc.types.toWBytes -import io.libp2p.etc.types.whenTrue +import io.libp2p.etc.types.* import io.libp2p.etc.util.P2PService import io.libp2p.pubsub.* +import org.apache.logging.log4j.LogManager import pubsub.pb.Rpc -import java.util.Optional +import java.util.* import java.util.concurrent.CompletableFuture +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import kotlin.collections.Collection +import kotlin.collections.List +import kotlin.collections.MutableMap +import kotlin.collections.MutableSet +import kotlin.collections.any +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.count +import kotlin.collections.distinct +import kotlin.collections.drop +import kotlin.collections.filter +import kotlin.collections.filterNot +import kotlin.collections.flatMap +import kotlin.collections.flatten +import kotlin.collections.forEach +import kotlin.collections.getOrPut +import kotlin.collections.isNotEmpty +import kotlin.collections.linkedMapOf +import kotlin.collections.map +import kotlin.collections.mapNotNull +import kotlin.collections.minus +import kotlin.collections.minusAssign +import kotlin.collections.mutableMapOf +import kotlin.collections.mutableSetOf +import kotlin.collections.none +import kotlin.collections.plus +import kotlin.collections.plusAssign +import kotlin.collections.reversed +import kotlin.collections.set +import kotlin.collections.shuffled +import kotlin.collections.sortedBy +import kotlin.collections.sum +import kotlin.collections.take +import kotlin.collections.toMutableSet import kotlin.math.max import kotlin.math.min @@ -26,6 +55,8 @@ const val MaxIAskedEntries = 256 const val MaxPeerIHaveEntries = 256 const val MaxIWantRequestsEntries = 10 * 1024 +typealias CurrentTimeSupplier = () -> Long + fun P2PService.PeerHandler.isOutbound() = streamHandler.stream.connection.isInitiator fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol { @@ -37,15 +68,35 @@ fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol { return PubsubProtocol.fromProtocol(proto) } +private val logger = LogManager.getLogger(GossipRouter::class.java) + /** * Router implementing this protocol: https://github.com/libp2p/specs/tree/master/pubsub/gossipsub */ -open class GossipRouter @JvmOverloads constructor( - val params: GossipParams = GossipParams(), - val scoreParams: GossipScoreParams = GossipScoreParams(), - override val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1, - subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter() -) : AbstractRouter(subscriptionTopicSubscriptionFilter, params.maxGossipMessageSize) { +open class GossipRouter( + val params: GossipParams, + val scoreParams: GossipScoreParams, + val currentTimeSupplier: CurrentTimeSupplier, + val random: Random, + val name: String, + val mCache: MCache, + val score: GossipScore, + + subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter, + protocol: PubsubProtocol, + executor: ScheduledExecutorService, + messageFactory: PubsubMessageFactory, + seenMessages: SeenCache>, + messageValidator: PubsubRouterMessageValidator, +) : AbstractRouter( + executor, + protocol, + subscriptionTopicSubscriptionFilter, + params.maxGossipMessageSize, + messageFactory, + seenMessages, + messageValidator +) { // The idea behind choosing these specific default values for acceptRequestsWhitelist was // - from one side are pretty small and safe: peer unlikely be able to drop its score to `graylist` @@ -56,17 +107,10 @@ open class GossipRouter @JvmOverloads constructor( val acceptRequestsWhitelistMaxMessages = 128 val acceptRequestsWhitelistDuration = 1.seconds - val eventBroadcaster = GossipRouterEventBroadcaster() - open val score: GossipScore by lazy { - DefaultGossipScore(scoreParams, executor, curTimeMillis).also { - eventBroadcaster.listeners += it - } - } - val fanout: MutableMap> = linkedMapOf() val mesh: MutableMap> = linkedMapOf() + val eventBroadcaster = GossipRouterEventBroadcaster() - private val mCache = MCache(params.gossipSize, params.gossipHistoryLength) private val lastPublished = linkedMapOf() private var heartbeatsCount = 0 private val backoffExpireTimes = createLRUMap, Long>(MaxBackoffEntries) @@ -84,21 +128,17 @@ open class GossipRouter @JvmOverloads constructor( private val acceptRequestsWhitelist = mutableMapOf() override val pendingRpcParts = PendingRpcPartsMap { DefaultGossipRpcPartsQueue(params) } - override val seenMessages: SeenCache> by lazy { - TTLSeenCache(SimpleSeenCache(), params.seenTTL, curTimeMillis) - } - private fun setBackOff(peer: PeerHandler, topic: Topic) = setBackOff(peer, topic, params.pruneBackoff.toMillis()) private fun setBackOff(peer: PeerHandler, topic: Topic, delay: Long) { - backoffExpireTimes[peer.peerId to topic] = curTimeMillis() + delay + backoffExpireTimes[peer.peerId to topic] = currentTimeSupplier() + delay } private fun isBackOff(peer: PeerHandler, topic: Topic) = - curTimeMillis() < (backoffExpireTimes[peer.peerId to topic] ?: 0) + currentTimeSupplier() < (backoffExpireTimes[peer.peerId to topic] ?: 0) private fun isBackOffFlood(peer: PeerHandler, topic: Topic): Boolean { val expire = backoffExpireTimes[peer.peerId to topic] ?: return false - return curTimeMillis() < expire - (params.pruneBackoff + params.graftFloodThreshold).toMillis() + return currentTimeSupplier() < expire - (params.pruneBackoff + params.graftFloodThreshold).toMillis() } private fun getDirectPeers() = peers.filter(::isDirect) @@ -183,7 +223,7 @@ open class GossipRouter @JvmOverloads constructor( return true } - val curTime = curTimeMillis() + val curTime = currentTimeSupplier() val whitelistEntry = acceptRequestsWhitelist[peer] if (whitelistEntry != null && curTime <= whitelistEntry.whitelistedTill && @@ -332,7 +372,7 @@ open class GossipRouter @JvmOverloads constructor( } override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture { - msg.topics.forEach { lastPublished[it] = curTimeMillis() } + msg.topics.forEach { lastPublished[it] = currentTimeSupplier() } val peers = if (params.floodPublish) { @@ -405,7 +445,7 @@ open class GossipRouter @JvmOverloads constructor( iAsked.clear() peerIHave.clear() - val staleIWantTime = this.curTimeMillis() - params.iWantFollowupTime.toMillis() + val staleIWantTime = this.currentTimeSupplier() - params.iWantFollowupTime.toMillis() iWantRequests.entries.removeIf { (key, time) -> (time < staleIWantTime) .whenTrue { notifyIWantTimeout(key.first, key.second) } @@ -478,7 +518,7 @@ open class GossipRouter @JvmOverloads constructor( emitGossip(topic, peers) } lastPublished.entries.removeIf { (topic, lastPub) -> - (curTimeMillis() - lastPub > params.fanoutTTL.toMillis()) + (currentTimeSupplier() - lastPub > params.fanoutTTL.toMillis()) .whenTrue { fanout.remove(topic) } } @@ -520,7 +560,7 @@ open class GossipRouter @JvmOverloads constructor( private fun iWant(peer: PeerHandler, messageIds: List) { if (messageIds.isEmpty()) return messageIds[random.nextInt(messageIds.size)] - .also { iWantRequests[peer to it] = curTimeMillis() } + .also { iWantRequests[peer to it] = currentTimeSupplier() } enqueueIwant(peer, messageIds) } diff --git a/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt b/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt new file mode 100644 index 000000000..ec79a40b4 --- /dev/null +++ b/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipRouterBuilder.kt @@ -0,0 +1,77 @@ +package io.libp2p.pubsub.gossip.builders + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import io.libp2p.core.pubsub.ValidationResult +import io.libp2p.etc.types.lazyVar +import io.libp2p.pubsub.* +import io.libp2p.pubsub.gossip.* +import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService + +typealias GossipRouterEventsSubscriber = (GossipRouterEventListener) -> Unit +typealias GossipScoreFactory = + (GossipScoreParams, ScheduledExecutorService, CurrentTimeSupplier, GossipRouterEventsSubscriber) -> GossipScore + +open class GossipRouterBuilder( + + var name: String = "GossipRouter", + var protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1, + + var params: GossipParams = GossipParams(), + var scoreParams: GossipScoreParams = GossipScoreParams(), + + var scheduledAsyncExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + ThreadFactoryBuilder().setDaemon(true).setNameFormat("GossipRouter-event-thread-%d").build() + ), + var currentTimeSuppluer: CurrentTimeSupplier = { System.currentTimeMillis() }, + var random: Random = Random(), + + var messageFactory: PubsubMessageFactory = { DefaultPubsubMessage(it) }, + var messageValidator: PubsubRouterMessageValidator = NOP_ROUTER_VALIDATOR, + + var subscriptionTopicSubscriptionFilter: TopicSubscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter(), + + var scoreFactory: GossipScoreFactory = + { scoreParams1, scheduledAsyncRxecutor, currentTimeSuppluer1, eventsSubscriber -> + val gossipScore = DefaultGossipScore(scoreParams1, scheduledAsyncRxecutor, currentTimeSuppluer1) + eventsSubscriber(gossipScore) + gossipScore + }, + val gossipRouterEventListeners: MutableList = mutableListOf() +) { + + var seenCache: SeenCache> by lazyVar { TTLSeenCache(SimpleSeenCache(), params.seenTTL, currentTimeSuppluer) } + var mCache: MCache by lazyVar { MCache(params.gossipSize, params.gossipHistoryLength) } + + private var disposed = false + + protected fun createGossipRouter(): GossipRouter { + val gossipScore = scoreFactory(scoreParams, scheduledAsyncExecutor, currentTimeSuppluer, { gossipRouterEventListeners += it }) + + val router = GossipRouter( + params = params, + scoreParams = scoreParams, + currentTimeSupplier = currentTimeSuppluer, + random = random, + name = name, + mCache = mCache, + score = gossipScore, + subscriptionTopicSubscriptionFilter = subscriptionTopicSubscriptionFilter, + protocol = protocol, + executor = scheduledAsyncExecutor, + messageFactory = messageFactory, + seenMessages = seenCache, + messageValidator = messageValidator + ) + + router.eventBroadcaster.listeners += gossipRouterEventListeners + return router + } + + open fun build(): GossipRouter { + if (disposed) throw RuntimeException("The builder was already used") + disposed = true + return createGossipRouter() + } +} diff --git a/src/test/java/io/libp2p/pubsub/GossipApiTest.java b/src/test/java/io/libp2p/pubsub/GossipApiTest.java index 65249fb53..7f7de2bdf 100644 --- a/src/test/java/io/libp2p/pubsub/GossipApiTest.java +++ b/src/test/java/io/libp2p/pubsub/GossipApiTest.java @@ -7,6 +7,7 @@ import io.libp2p.pubsub.gossip.GossipParams; import io.libp2p.pubsub.gossip.GossipParamsKt; import io.libp2p.pubsub.gossip.GossipRouter; +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import pubsub.pb.Rpc; @@ -32,7 +33,9 @@ public void createGossipTest() { .D(10) .DHigh(20) .build(); - GossipRouter router = new GossipRouter(gossipParams); + GossipRouterBuilder routerBuilder = new GossipRouterBuilder(); + routerBuilder.setParams(gossipParams); + GossipRouter router = routerBuilder.build(); assertThat(router.getParams().getD()).isEqualTo(10); assertThat(router.getParams().getDHigh()).isEqualTo(20); assertThat(router.getParams().getDScore()).isEqualTo(GossipParamsKt.defaultDScore(10)); @@ -40,22 +43,16 @@ public void createGossipTest() { @Test public void testFastMessageId() throws Exception { - GossipRouter router = new GossipRouter() { - private final SeenCache> seenMessages = - new FastIdSeenCache<>(msg -> msg.getProtobufMessage().getData()); - - @NotNull - @Override - protected SeenCache> getSeenMessages() { - return seenMessages; - } - }; List createdMessages = new ArrayList<>(); - router.setMessageFactory(m -> { + + GossipRouterBuilder routerBuilder = new GossipRouterBuilder(); + routerBuilder.setSeenCache(new FastIdSeenCache<>(msg -> msg.getProtobufMessage().getData())); + routerBuilder.setMessageFactory(m -> { TestPubsubMessage message = new TestPubsubMessage(m); createdMessages.add(message); return message; }); + GossipRouter router = routerBuilder.build(); router.subscribe("topic"); BlockingQueue messages = new LinkedBlockingQueue<>(); diff --git a/src/test/kotlin/io/libp2p/pubsub/DeterministicFuzz.kt b/src/test/kotlin/io/libp2p/pubsub/DeterministicFuzz.kt index a04bb7332..80dc9722a 100644 --- a/src/test/kotlin/io/libp2p/pubsub/DeterministicFuzz.kt +++ b/src/test/kotlin/io/libp2p/pubsub/DeterministicFuzz.kt @@ -3,12 +3,17 @@ package io.libp2p.pubsub import io.libp2p.core.crypto.KEY_TYPE import io.libp2p.core.crypto.generateKeyPair import io.libp2p.etc.types.lazyVar +import io.libp2p.pubsub.flood.FloodRouter +import io.libp2p.pubsub.gossip.CurrentTimeSupplier +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.tools.schedulers.ControlledExecutorServiceImpl import io.libp2p.tools.schedulers.TimeControllerImpl import java.security.SecureRandom import java.util.Random import java.util.concurrent.ScheduledExecutorService +typealias DeterministicFuzzRouterFactory = (ScheduledExecutorService, CurrentTimeSupplier, Random) -> PubsubRouterDebug + class DeterministicFuzz { var cnt = 0 @@ -19,19 +24,42 @@ class DeterministicFuzz { fun createControlledExecutor(): ScheduledExecutorService = ControlledExecutorServiceImpl().also { it.setTimeController(timeController) } - fun createTestRouter( - routerInstance: PubsubRouterDebug, - protocol: PubsubProtocol = routerInstance.protocol - ): TestRouter { - routerInstance.curTimeMillis = { timeController.time } - routerInstance.random = this.random - val testRouter = TestRouter("" + (cnt++), protocol.announceStr).also { + fun createTestGossipRouter(gossipRouterBuilder: () -> GossipRouterBuilder): TestRouter = + createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilder)) + + fun createMockRouter() = createTestRouter(createMockFuzzRouterFactory()) + fun createFloodRouter() = createTestRouter(createFloodFuzzRouterFactory()) + + fun createTestRouter(routerCtor: DeterministicFuzzRouterFactory): TestRouter { + val deterministicExecutor = createControlledExecutor() + val router = routerCtor(deterministicExecutor, { timeController.time }, random) + + return TestRouter("" + (cnt++), router).apply { val randomBytes = ByteArray(8) random.nextBytes(randomBytes) - it.keyPair = generateKeyPair(KEY_TYPE.ECDSA, random = SecureRandom(randomBytes)) + keyPair = generateKeyPair(KEY_TYPE.ECDSA, random = SecureRandom(randomBytes)) + testExecutor = deterministicExecutor } - testRouter.routerInstance = routerInstance - testRouter.testExecutor = createControlledExecutor() - return testRouter + } + + companion object { + fun createGossipFuzzRouterFactory(routerBuilderFactory: () -> GossipRouterBuilder): DeterministicFuzzRouterFactory = + { executor, curTime, random -> + routerBuilderFactory().also { + it.scheduledAsyncExecutor = executor + it.currentTimeSuppluer = curTime + it.random = random + }.build() + } + + fun createMockFuzzRouterFactory(): DeterministicFuzzRouterFactory = + { executor, _, _ -> + MockRouter(executor) + } + + fun createFloodFuzzRouterFactory(): DeterministicFuzzRouterFactory = + { executor, _, _ -> + FloodRouter(executor) + } } } diff --git a/src/test/kotlin/io/libp2p/pubsub/GoInteropTest.kt b/src/test/kotlin/io/libp2p/pubsub/GoInteropTest.kt index f9f173d4c..53f7f801b 100644 --- a/src/test/kotlin/io/libp2p/pubsub/GoInteropTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/GoInteropTest.kt @@ -28,7 +28,7 @@ import io.libp2p.mux.mplex.MplexStreamMuxer import io.libp2p.protocol.Identify import io.libp2p.protocol.Ping import io.libp2p.pubsub.gossip.Gossip -import io.libp2p.pubsub.gossip.GossipRouter +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.security.secio.SecIoSecureChannel import io.libp2p.tools.P2pdRunner import io.libp2p.tools.TestChannel @@ -117,15 +117,12 @@ class GoInteropTest { println("Local peerID: " + PeerId.fromPubKey(pubKey1).toBase58()) - val gossipRouter = GossipRouter().also { - it.messageValidator = SIGNATURE_ROUTER_VALIDATOR - } - val pubsubApi = createPubsubApi(gossipRouter) + val gossipRouter1 = GossipRouterBuilder().build() + val pubsubApi = createPubsubApi(gossipRouter1) val publisher = pubsubApi.createPublisher(privKey1, 8888) - val gossip = GossipProtocol(gossipRouter).also { + val gossip = GossipProtocol(gossipRouter1).also { it.debugGossipHandler = LoggingHandler("#4", LogLevel.INFO) - it.router.messageValidator = NOP_ROUTER_VALIDATOR } val applicationProtocols = listOf(ProtocolBinding.createSimple("/meshsub/1.0.0", gossip), Identify()) diff --git a/src/test/kotlin/io/libp2p/pubsub/MockRouter.kt b/src/test/kotlin/io/libp2p/pubsub/MockRouter.kt index 6e443009d..d214fd7bb 100644 --- a/src/test/kotlin/io/libp2p/pubsub/MockRouter.kt +++ b/src/test/kotlin/io/libp2p/pubsub/MockRouter.kt @@ -4,12 +4,19 @@ import pubsub.pb.Rpc import java.util.concurrent.BlockingQueue import java.util.concurrent.CompletableFuture import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException -open class MockRouter( - override val protocol: PubsubProtocol = PubsubProtocol.Gossip_V_1_1 -) : AbstractRouter(TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter()) { +open class MockRouter(executor: ScheduledExecutorService) : AbstractRouter( + protocol = PubsubProtocol.Floodsub, + executor = executor, + subscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter(), + maxMsgSize = Int.MAX_VALUE, + messageFactory = { DefaultPubsubMessage(it) }, + seenMessages = LRUSeenCache(SimpleSeenCache(), 1000), + messageValidator = NOP_ROUTER_VALIDATOR +) { val inboundMessages: BlockingQueue = LinkedBlockingQueue() diff --git a/src/test/kotlin/io/libp2p/pubsub/PubsubApiTest.kt b/src/test/kotlin/io/libp2p/pubsub/PubsubApiTest.kt index ebf06a22a..9c54a8892 100644 --- a/src/test/kotlin/io/libp2p/pubsub/PubsubApiTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/PubsubApiTest.kt @@ -7,7 +7,6 @@ import io.libp2p.core.pubsub.createPubsubApi import io.libp2p.etc.types.toByteArray import io.libp2p.etc.types.toByteBuf import io.libp2p.etc.types.toLongBigEndian -import io.libp2p.pubsub.flood.FloodRouter import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertEquals @@ -23,9 +22,9 @@ class PubsubApiTest { @Test fun testNoFromOrSeqNoMessageField() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(FloodRouter()) + val router1 = fuzz.createFloodRouter() val api1 = createPubsubApi(router1.router) - val router2 = fuzz.createTestRouter(FloodRouter()) + val router2 = fuzz.createFloodRouter() val api2 = createPubsubApi(router2.router) router1.connectSemiDuplex(router2) @@ -56,9 +55,9 @@ class PubsubApiTest { @Test fun testNoSenderPrivateKey() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(FloodRouter()) + val router1 = fuzz.createFloodRouter() val api1 = createPubsubApi(router1.router) - val router2 = fuzz.createTestRouter(FloodRouter()) + val router2 = fuzz.createFloodRouter() router1.connectSemiDuplex(router2) @@ -83,9 +82,9 @@ class PubsubApiTest { @Test fun testPublishExt() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(FloodRouter()) + val router1 = fuzz.createFloodRouter() val api1 = createPubsubApi(router1.router) - val router2 = fuzz.createTestRouter(FloodRouter()) + val router2 = fuzz.createFloodRouter() router1.connectSemiDuplex(router2) diff --git a/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt b/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt index 26b2a526b..af08bca7c 100644 --- a/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt @@ -1,20 +1,13 @@ package io.libp2p.pubsub -import io.libp2p.core.Stream -import io.libp2p.core.pubsub.MessageApi -import io.libp2p.core.pubsub.RESULT_INVALID -import io.libp2p.core.pubsub.RESULT_VALID -import io.libp2p.core.pubsub.Subscriber +import io.libp2p.core.pubsub.* import io.libp2p.core.pubsub.Topic -import io.libp2p.core.pubsub.ValidationResult -import io.libp2p.core.pubsub.Validator import io.libp2p.etc.types.seconds import io.libp2p.etc.types.toByteBuf import io.libp2p.etc.types.toBytesBigEndian import io.libp2p.etc.types.toProtobuf import io.libp2p.pubsub.gossip.GossipRouter import io.libp2p.tools.TestChannel.TestConnection -import io.libp2p.transport.implementation.P2PChannelOverNetty import io.netty.handler.logging.LogLevel import io.netty.util.ResourceLeakDetector import org.junit.jupiter.api.Assertions @@ -27,9 +20,7 @@ import java.util.concurrent.TimeUnit typealias RouterCtor = () -> PubsubRouterDebug -fun Stream.nettyChannel() = (this as P2PChannelOverNetty).nettyChannel - -abstract class PubsubRouterTest(val router: RouterCtor) { +abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactory) { init { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID) } @@ -47,8 +38,8 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun Fanout() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(router()) - val router2 = fuzz.createTestRouter(router()) + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) router2.router.subscribe("topic1") router1.connectSemiDuplex(router2, LogLevel.ERROR, LogLevel.ERROR) @@ -65,8 +56,8 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun testDoubleConnect() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(router()) - val router2 = fuzz.createTestRouter(router()) + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) router2.router.subscribe("topic1") router1.connectSemiDuplex(router2, LogLevel.ERROR, LogLevel.ERROR) @@ -84,10 +75,10 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun testUnsubscribe() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(router()) - val router2 = fuzz.createTestRouter(router()) - val router3 = fuzz.createTestRouter(router()) - val router4 = fuzz.createTestRouter(router()) + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) + val router3 = fuzz.createTestRouter(routerFactory) + val router4 = fuzz.createTestRouter(routerFactory) router1.router.subscribe("topic1") router1.router.subscribe("topic2") router2.router.subscribe("topic1") @@ -136,9 +127,9 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun scenario2() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(router()) - val router2 = fuzz.createTestRouter(router()) - val router3 = fuzz.createTestRouter(router()) + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) + val router3 = fuzz.createTestRouter(routerFactory) val conn_1_2 = router1.connectSemiDuplex(router2, pubsubLogs = LogLevel.ERROR) val conn_2_3 = router2.connectSemiDuplex(router3, pubsubLogs = LogLevel.ERROR) @@ -191,17 +182,17 @@ abstract class PubsubRouterTest(val router: RouterCtor) { conn_1_2.disconnect() } - // scenario3_StarTopology { GossipRouter().withDConstants(3, 3, 100) } + // scenario3_StarTopology { Gossiprouter.withDConstants(3, 3, 100) } @Test fun StarTopology() { val fuzz = DeterministicFuzz() val allRouters = mutableListOf() - val routerCenter = fuzz.createTestRouter(router()) + val routerCenter = fuzz.createTestRouter(routerFactory) allRouters += routerCenter for (i in 1..20) { - val routerEnd = fuzz.createTestRouter(router()) + val routerEnd = fuzz.createTestRouter(routerFactory) allRouters += routerEnd routerEnd.connectSemiDuplex(routerCenter, pubsubLogs = LogLevel.ERROR) } @@ -235,10 +226,10 @@ abstract class PubsubRouterTest(val router: RouterCtor) { val allRouters = mutableListOf() val allConnections = mutableListOf() - val routerCenter = fuzz.createTestRouter(router()) + val routerCenter = fuzz.createTestRouter(routerFactory) allRouters += routerCenter for (i in 1..20) { - val routerEnd = fuzz.createTestRouter(router()) + val routerEnd = fuzz.createTestRouter(routerFactory) allRouters += routerEnd allConnections += routerEnd.connectSemiDuplex(routerCenter).connections } @@ -288,7 +279,7 @@ abstract class PubsubRouterTest(val router: RouterCtor) { doTenNeighborsTopology() } - fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: RouterCtor = router) { + fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: DeterministicFuzzRouterFactory = this.routerFactory) { val fuzz = DeterministicFuzz().also { it.randomSeed = randomSeed.toLong() } @@ -300,7 +291,7 @@ abstract class PubsubRouterTest(val router: RouterCtor) { val neighboursCount = 10 for (i in 0 until nodesCount) { - val routerEnd = fuzz.createTestRouter(routerFactory()) + val routerEnd = fuzz.createTestRouter(routerFactory) allRouters += routerEnd } for (i in 0 until nodesCount) { @@ -377,13 +368,13 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun PublishFuture() { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(router()) + val router1 = fuzz.createTestRouter(routerFactory) val msg0 = newMessage("topic1", 0L, "Hello".toByteArray()) val publishFut0 = router1.router.publish(msg0) Assertions.assertThrows(ExecutionException::class.java, { publishFut0.get() }) - val router2 = fuzz.createTestRouter(router()) + val router2 = fuzz.createTestRouter(routerFactory) router2.router.subscribe("topic1") router1.connectSemiDuplex(router2, LogLevel.ERROR, LogLevel.ERROR) @@ -401,12 +392,12 @@ abstract class PubsubRouterTest(val router: RouterCtor) { fun validateTest() { val fuzz = DeterministicFuzz() - val routers = List(3) { fuzz.createTestRouter(router()) } + val routers = List(3) { fuzz.createTestRouter(routerFactory) } routers[0].connectSemiDuplex(routers[1], pubsubLogs = LogLevel.ERROR) routers[1].connectSemiDuplex(routers[2], pubsubLogs = LogLevel.ERROR) - val apis = routers.map { it.api } + val apis = routers.map { createPubsubApi(it.router) } class RecordingSubscriber : Subscriber { var count = 0 override fun accept(t: MessageApi) { diff --git a/src/test/kotlin/io/libp2p/pubsub/TestRouter.kt b/src/test/kotlin/io/libp2p/pubsub/TestRouter.kt index 066d84a25..cf8fd7e7e 100644 --- a/src/test/kotlin/io/libp2p/pubsub/TestRouter.kt +++ b/src/test/kotlin/io/libp2p/pubsub/TestRouter.kt @@ -5,12 +5,10 @@ import io.libp2p.core.crypto.KEY_TYPE import io.libp2p.core.crypto.generateKeyPair import io.libp2p.core.pubsub.RESULT_VALID import io.libp2p.core.pubsub.ValidationResult -import io.libp2p.core.pubsub.createPubsubApi import io.libp2p.core.security.SecureChannel import io.libp2p.etc.PROTOCOL import io.libp2p.etc.types.lazyVar import io.libp2p.etc.util.netty.nettyInitializer -import io.libp2p.pubsub.flood.FloodRouter import io.libp2p.tools.NullTransport import io.libp2p.tools.TestChannel import io.libp2p.tools.TestChannel.TestConnection @@ -35,29 +33,27 @@ class SemiduplexConnection(val conn1: TestConnection, val conn2: TestConnection) } } -class TestRouter(val name: String = "" + cnt.getAndIncrement(), val protocol: String = "/test/undefined") { +class TestRouter( + val name: String = "" + cnt.getAndIncrement(), + val router: PubsubRouterDebug +) { val inboundMessages = LinkedBlockingQueue() var handlerValidationResult = RESULT_VALID - var routerHandler: (PubsubMessage) -> CompletableFuture = { + val routerHandler: (PubsubMessage) -> CompletableFuture = { inboundMessages += it handlerValidationResult } var testExecutor: ScheduledExecutorService by lazyVar { Executors.newSingleThreadScheduledExecutor() } - var routerInstance: PubsubRouterDebug by lazyVar { FloodRouter() } - var router by lazyVar { - routerInstance.also { - it.initHandler(routerHandler) - it.executor = testExecutor - it.name = name - } - } - var api by lazyVar { createPubsubApi(router) } - var keyPair = generateKeyPair(KEY_TYPE.ECDSA) val peerId by lazy { PeerId.fromPubKey(keyPair.second) } + val protocol = router.protocol.announceStr + + init { + router.initHandler(routerHandler) + } private fun newChannel( channelName: String, diff --git a/src/test/kotlin/io/libp2p/pubsub/flood/FloodPubsubRouterTest.kt b/src/test/kotlin/io/libp2p/pubsub/flood/FloodPubsubRouterTest.kt index 504e21312..78d1fb8b7 100644 --- a/src/test/kotlin/io/libp2p/pubsub/flood/FloodPubsubRouterTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/flood/FloodPubsubRouterTest.kt @@ -1,5 +1,6 @@ package io.libp2p.pubsub.flood +import io.libp2p.pubsub.DeterministicFuzz import io.libp2p.pubsub.PubsubRouterTest -class FloodPubsubRouterTest : PubsubRouterTest(::FloodRouter) +class FloodPubsubRouterTest : PubsubRouterTest(DeterministicFuzz.createFloodFuzzRouterFactory()) diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt index a6297d171..530318200 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipBackwardCompatibilityTest.kt @@ -1,12 +1,13 @@ package io.libp2p.pubsub.gossip import io.libp2p.pubsub.PubsubProtocol +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test class GossipBackwardCompatibilityTest : TwoGossipHostTestBase() { - override val router1 = GossipRouter(protocol = PubsubProtocol.Gossip_V_1_0) - override val router2 = GossipRouter(protocol = PubsubProtocol.Gossip_V_1_1) + override val router1 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_0).build() + override val router2 = GossipRouterBuilder(protocol = PubsubProtocol.Gossip_V_1_1).build() @Test fun testConnect_1_0_to_1_1() { diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt index 188043cc9..b63fcb2af 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt @@ -4,10 +4,12 @@ import io.libp2p.core.pubsub.RESULT_IGNORE import io.libp2p.etc.types.seconds import io.libp2p.etc.types.toProtobuf import io.libp2p.pubsub.DeterministicFuzz +import io.libp2p.pubsub.DeterministicFuzz.Companion.createGossipFuzzRouterFactory import io.libp2p.pubsub.MockRouter import io.libp2p.pubsub.PubsubRouterTest import io.libp2p.pubsub.TestRouter import io.libp2p.pubsub.gossip.builders.GossipPeerScoreParamsBuilder +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.pubsub.gossip.builders.GossipScoreParamsBuilder import io.libp2p.tools.TestLogAppender import io.netty.handler.logging.LogLevel @@ -18,23 +20,24 @@ import pubsub.pb.Rpc import java.time.Duration import java.util.concurrent.TimeUnit -class GossipPubsubRouterTest : PubsubRouterTest({ - GossipRouter( - GossipParams(3, 3, 100, floodPublish = false) - ) -}) { +class GossipPubsubRouterTest : PubsubRouterTest( + createGossipFuzzRouterFactory { + GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false)) + } +) { @Test override fun TenNeighborsTopology() { for (d in 3..6) { for (seed in 0..10) { print("D=$d, seed=$seed ") - super.doTenNeighborsTopology(seed) { - GossipRouter( + super.doTenNeighborsTopology( + seed, + createGossipFuzzRouterFactory { // small backoff timeout for faster meshes settling down - GossipParams(d, d, d, DLazy = 100, pruneBackoff = 1.seconds) - ) - } + GossipRouterBuilder(params = GossipParams(d, d, d, DLazy = 100, pruneBackoff = 1.seconds)) + } + ) } } } @@ -47,8 +50,8 @@ class GossipPubsubRouterTest : PubsubRouterTest({ val otherCount = 5 for (i in 1..otherCount) { - val r = GossipRouter(GossipParams(1, 0)) - val routerEnd = fuzz.createTestRouter(r) + val r = { GossipRouterBuilder(params = GossipParams(1, 0)) } + val routerEnd = fuzz.createTestGossipRouter(r) allRouters += routerEnd } @@ -56,8 +59,8 @@ class GossipPubsubRouterTest : PubsubRouterTest({ // this is to test ihave/iwant fuzz.timeController.addTime(Duration.ofMillis(1)) - val r = GossipRouter(GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) - val routerCenter = fuzz.createTestRouter(r) + val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) } + val routerCenter = fuzz.createTestGossipRouter(r) allRouters.add(0, routerCenter) for (i in 1..otherCount) { @@ -117,8 +120,8 @@ class GossipPubsubRouterTest : PubsubRouterTest({ // shouldn't be treated as internal error and no WARN logs should be printed val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(MockRouter()) - val router2 = fuzz.createTestRouter(router()) + val router1 = fuzz.createMockRouter() + val router2 = fuzz.createTestRouter(routerFactory) val mockRouter = router1.router as MockRouter router2.router.subscribe("topic1") @@ -144,9 +147,9 @@ class GossipPubsubRouterTest : PubsubRouterTest({ // of gossip peers is yet 'partially' connected val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(MockRouter()) - val router2 = fuzz.createTestRouter(router()) - val router3 = fuzz.createTestRouter(router()) + val router1 = fuzz.createMockRouter() + val router2 = fuzz.createTestRouter(routerFactory) + val router3 = fuzz.createTestRouter(routerFactory) val mockRouter = router1.router as MockRouter router2.router.subscribe("topic1") @@ -188,13 +191,13 @@ class GossipPubsubRouterTest : PubsubRouterTest({ // shouldn't be treated as internal error and no WARN logs should be printed val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(MockRouter()) + val router1 = fuzz.createMockRouter() // when isDirect the gossip router should reply with PRUNE to GRAFT // this would reproduce the case val gossipScoreParams = GossipScoreParams(GossipPeerScoreParams(isDirect = { true })) - val router2 = fuzz.createTestRouter(GossipRouter(scoreParams = gossipScoreParams)) + val router2 = fuzz.createTestGossipRouter { GossipRouterBuilder(scoreParams = gossipScoreParams) } val mockRouter = router1.router as MockRouter router2.router.subscribe("topic1") @@ -228,8 +231,8 @@ class GossipPubsubRouterTest : PubsubRouterTest({ val allCount = 20 val allRouters = (1..allCount).map { - val r = GossipRouter(Eth2DefaultGossipParams, gossipScoreParams) - fuzz.createTestRouter(r) + val r = { GossipRouterBuilder(params = Eth2DefaultGossipParams, scoreParams = gossipScoreParams) } + fuzz.createTestRouter(createGossipFuzzRouterFactory(r)) } val senderRouter = allRouters[0] diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt index df3c59a1c..d29ec1e53 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt @@ -1,6 +1,7 @@ package io.libp2p.pubsub.gossip import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.tools.protobuf.RpcBuilder import org.assertj.core.api.Assertions import org.junit.jupiter.api.Test @@ -31,260 +32,218 @@ class GossipRouterListLimitsTest { .maxIHaveLength(Int.MAX_VALUE) .build() + private val routerWithLimits = GossipRouterBuilder(params = gossipParamsWithLimits).build() + private val routerWithNoLimits = GossipRouterBuilder(params = gossipParamsNoLimits).build() + @Test fun validateProtobufLists_validMessage() { - val router = GossipRouter(gossipParamsWithLimits) val msg = fullMsgBuilder().build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_validMessageWithLargeLists_noLimits() { - val router = GossipRouter(gossipParamsNoLimits) val msg = fullMsgBuilder(20).build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithNoLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_smallValidMessage_noLimits() { - val router = GossipRouter(gossipParamsNoLimits) val msg = fullMsgBuilder().build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithNoLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_multipleListsAtMaxSize() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addSubscriptions(maxSubscriptions - 1) builder.addPublishMessages(maxPublishedMessages - 1, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_tooManySubscriptions() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addSubscriptions(maxSubscriptions) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyPublishMessages() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPublishMessages(maxPublishedMessages, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyPublishMessageTopics() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPublishMessages(1, maxTopicsPerPublishedMessage + 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyIHaves() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIHaves(maxIHaveLength, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyIHaveMsgIds() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIHaves(1, maxIHaveLength) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyIWants() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIWants(maxIWantMessageIds, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyIWantMsgIds() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIWants(1, maxIWantMessageIds) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyGrafts() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addGrafts(maxGraftMessages) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyPrunes() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPrunes(maxPruneMessages, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_tooManyPrunePeers() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPrunes(1, maxPeersPerPruneMessage + 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isFalse() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() } @Test fun validateProtobufLists_maxSubscriptions() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addSubscriptions(maxSubscriptions - 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxPublishMessages() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPublishMessages(maxPublishedMessages - 1, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxPublishMessageTopics() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPublishMessages(1, maxTopicsPerPublishedMessage) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxIHaves() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIHaves(maxIHaveLength - 1, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxIHaveMsgIds() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIHaves(1, maxIHaveLength - 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxIWants() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIWants(maxIWantMessageIds - 1, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxIWantMsgIds() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addIWants(1, maxIWantMessageIds - 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxGrafts() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addGrafts(maxGraftMessages - 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxPrunes() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPrunes(maxPruneMessages - 1, 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } @Test fun validateProtobufLists_maxPrunePeers() { - val router = GossipRouter(gossipParamsWithLimits) - val builder = fullMsgBuilder() builder.addPrunes(1, maxPeersPerPruneMessage - 1) val msg = builder.build() - Assertions.assertThat(router.validateMessageListLimits(msg)).isTrue() + Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() } private fun fullMsgBuilder(): RpcBuilder { diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt index cfea5b5cc..c5cc7c85c 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt @@ -4,6 +4,7 @@ import io.libp2p.core.PeerId import io.libp2p.etc.types.toProtobuf import io.libp2p.etc.types.toWBytes import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -190,7 +191,7 @@ class GossipRpcPartsQueueTest { gossipParams: GossipParams, queue: TestGossipQueue ) { - val router = GossipRouter(gossipParams) + val router = GossipRouterBuilder(params = gossipParams).build() val monolithMsg = queue.mergedSingle() val merged = queue.takeMerged() @@ -217,7 +218,7 @@ class GossipRpcPartsQueueTest { @Test fun `mergeMessageParts() test that split doesn't result in topic publish before subscribe`() { - val router = GossipRouter(gossipParamsWithLimits) + val router = GossipRouterBuilder(params = gossipParamsWithLimits).build() val partsQueue = TestGossipQueue(gossipParamsWithLimits) (0 until maxSubscriptions + 1).forEach { partsQueue.addSubscribe("topic-$it") @@ -239,7 +240,7 @@ class GossipRpcPartsQueueTest { @Test fun `mergeMessageParts() test that even when all parts fit to 2 messages the result should be 3 messages`() { - val router = GossipRouter(gossipParamsWithLimits) + val router = GossipRouterBuilder(params = gossipParamsWithLimits).build() val partsQueue = TestGossipQueue(gossipParamsWithLimits) (0 until maxSubscriptions + 1).forEach { partsQueue.addSubscribe("topic-$it") diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt index 97f0abbd2..6dbd3fa88 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt @@ -17,14 +17,13 @@ import io.libp2p.etc.types.times import io.libp2p.etc.types.toBytesBigEndian import io.libp2p.etc.types.toProtobuf import io.libp2p.etc.types.toWBytes -import io.libp2p.pubsub.DefaultPubsubMessage -import io.libp2p.pubsub.DeterministicFuzz -import io.libp2p.pubsub.MessageId -import io.libp2p.pubsub.MockRouter -import io.libp2p.pubsub.SemiduplexConnection -import io.libp2p.pubsub.Topic +import io.libp2p.pubsub.* +import io.libp2p.pubsub.DeterministicFuzz.Companion.createGossipFuzzRouterFactory +import io.libp2p.pubsub.DeterministicFuzz.Companion.createMockFuzzRouterFactory +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled +import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelOutboundHandlerAdapter import io.netty.channel.ChannelPromise @@ -38,6 +37,7 @@ import pubsub.pb.Rpc import java.nio.charset.StandardCharsets import java.util.concurrent.CompletableFuture import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference @@ -60,12 +60,12 @@ class GossipV1_1Tests { val mockRouterCount: Int = 10, val params: GossipParams = GossipParams(), val scoreParams: GossipScoreParams = GossipScoreParams(), - gossipRouter: () -> GossipRouter = { GossipRouter(params, scoreParams) }, - mockRouters: () -> List = { (0 until mockRouterCount).map { MockRouter() } } +// mockRouters: () -> List = { (0 until mockRouterCount).map { MockRouter() } } ) { val fuzz = DeterministicFuzz() - val router0 = fuzz.createTestRouter(gossipRouter()) - val routers = mockRouters().map { fuzz.createTestRouter(it) } + val gossipRouterBuilderFactory = { GossipRouterBuilder(params = params, scoreParams = scoreParams) } + val router0 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) + val routers = (0 until mockRouterCount).map { fuzz.createTestRouter(createMockFuzzRouterFactory()) } val connections = mutableListOf() val gossipRouter = router0.router as GossipRouter val mockRouters = routers.map { it.router as MockRouter } @@ -84,31 +84,31 @@ class GossipV1_1Tests { fun getMockRouter(peerId: PeerId) = mockRouters[routers.indexOfFirst { it.peerId == peerId }] } + @Test + fun selfSanityTest() { + val test = TwoRoutersTest() + + test.mockRouter.subscribe("topic1") + val msg = newMessage("topic1", 0L, "Hello".toByteArray()) + test.gossipRouter.publish(msg) + test.mockRouter.waitForMessage { it.publishCount > 0 } + } + class TwoRoutersTest( val coreParams: GossipParams = GossipParams(), val scoreParams: GossipScoreParams = GossipScoreParams(), - gossipRouter: () -> GossipRouter = { GossipRouter(coreParams, scoreParams) }, - mockRouter: () -> MockRouter = { MockRouter() } + mockRouterFactory: DeterministicFuzzRouterFactory = createMockFuzzRouterFactory() ) { val fuzz = DeterministicFuzz() - val router1 = fuzz.createTestRouter(gossipRouter()) - val router2 = fuzz.createTestRouter(mockRouter()) + val gossipRouterBuilderFactory = { GossipRouterBuilder(params = coreParams, scoreParams = scoreParams) } + val router1 = fuzz.createTestRouter(createGossipFuzzRouterFactory(gossipRouterBuilderFactory)) + val router2 = fuzz.createTestRouter(mockRouterFactory) val gossipRouter = router1.router as GossipRouter val mockRouter = router2.router as MockRouter val connection = router1.connectSemiDuplex(router2, null, LogLevel.ERROR) } - @Test - fun selfSanityTest() { - val test = TwoRoutersTest() - - test.mockRouter.subscribe("topic1") - val msg = newMessage("topic1", 0L, "Hello".toByteArray()) - test.gossipRouter.publish(msg) - test.mockRouter.waitForMessage { it.publishCount > 0 } - } - @Test fun testSeenTTL() { val test = TwoRoutersTest(GossipParams(seenTTL = 1.minutes)) @@ -157,9 +157,10 @@ class GossipV1_1Tests { @Test fun testPenaltyForMalformedMessage() { - class MalformedMockRouter : MockRouter() { + class MalformedMockRouter(executor: ScheduledExecutorService) : MockRouter(executor) { var malform = false - override fun initChannel(streamHandler: StreamHandler) { + + override fun initChannelWithHandler(streamHandler: StreamHandler, handler: ChannelHandler?) { streamHandler.stream.pushHandler(object : ChannelOutboundHandlerAdapter() { override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { msg as ByteBuf @@ -171,11 +172,11 @@ class GossipV1_1Tests { } } }) - super.initChannel(streamHandler) + super.initChannelWithHandler(streamHandler, handler) } } - val mockRouter = MalformedMockRouter() - val test = TwoRoutersTest(mockRouter = { mockRouter }) + val test = TwoRoutersTest(mockRouterFactory = { exec, _, _ -> MalformedMockRouter(exec) }) + val mockRouter = test.router2.router as MalformedMockRouter val api = createPubsubApi(test.gossipRouter) val apiMessages = mutableListOf() diff --git a/src/test/kotlin/io/libp2p/pubsub/gossip/TwoGossipHostTestBase.kt b/src/test/kotlin/io/libp2p/pubsub/gossip/TwoGossipHostTestBase.kt index 805adeec7..82b02db37 100644 --- a/src/test/kotlin/io/libp2p/pubsub/gossip/TwoGossipHostTestBase.kt +++ b/src/test/kotlin/io/libp2p/pubsub/gossip/TwoGossipHostTestBase.kt @@ -4,6 +4,7 @@ import io.libp2p.core.dsl.host import io.libp2p.core.multiformats.Multiaddr import io.libp2p.core.mux.StreamMuxerProtocol import io.libp2p.etc.util.netty.LoggingHandlerShort +import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.security.noise.NoiseXXSecureChannel import io.libp2p.transport.tcp.TcpTransport import io.netty.handler.logging.LogLevel @@ -16,8 +17,8 @@ abstract class TwoGossipHostTestBase { open val params = GossipParams() - open val router1 by lazy { GossipRouter(params) } - open val router2 by lazy { GossipRouter(params) } + open val router1 by lazy { GossipRouterBuilder(params = params).build() } + open val router2 by lazy { GossipRouterBuilder(params = params).build() } open val gossip1 by lazy { Gossip(router1, debugGossipHandler = LoggingHandlerShort("host-1", LogLevel.INFO))