Skip to content

Commit

Permalink
Refactor GossipRouter: extract GossipRouterBuilder (#249)
Browse files Browse the repository at this point in the history
* extract GossipRouterBuilder
* Move property declarations right after primary constructor (which may also declare properties) in changed classes
* Refactor AbstractRouter.addPeerWithDebugHandler() workaround
* Remove pubsub interface var properties
* Adjust tests
  • Loading branch information
Nashatyrev authored Jul 18, 2022
1 parent 6bde649 commit c9f3dfc
Show file tree
Hide file tree
Showing 22 changed files with 438 additions and 382 deletions.
99 changes: 41 additions & 58 deletions src/main/kotlin/io/libp2p/etc/util/P2PService.kt
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package io.libp2p.etc.util

import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.libp2p.core.InternalErrorException
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.etc.types.lazyVarInit
import io.libp2p.etc.types.submitAsync
import io.libp2p.etc.types.toVoidCompletableFuture
import io.libp2p.pubsub.AbstractRouter
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.util.ReferenceCountUtil
import org.apache.logging.log4j.LogManager
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.function.Supplier

private val logger = LogManager.getLogger(P2PService::class.java)

/**
* Base class for a service which manages many streams from different peers
Expand All @@ -28,8 +25,34 @@ import java.util.function.Supplier
* service API should be executed on this thread to be thread-safe.
* Consider using the following helpers [runOnEventThread], [submitOnEventThread], [submitAsyncOnEventThread]
* or use the [executor] directly
*
* @param executor Executor backed by a single event thread
* It is only safe to perform any service logic via this executor
*/
abstract class P2PService {
abstract class P2PService(
protected val executor: ScheduledExecutorService
) {

private val peersMutable = mutableListOf<PeerHandler>()
/**
* List of connected peers.
* Note that connected peer could not be ready for writing yet, so consider [activePeers]
* if any data is to be send
*/
val peers: List<PeerHandler> = peersMutable

private val activePeersMutable = mutableListOf<PeerHandler>()
/**
* List of active peers to which data could be written
*/
val activePeers: List<PeerHandler> = activePeersMutable

private val peerIdToPeerHandlerMapMutable = mutableMapOf<PeerId, PeerHandler>()

/**
* Maps [PeerId] to [PeerHandler] instance for connected peers
*/
val peerIdToPeerHandlerMap: Map<PeerId, PeerHandler> = peerIdToPeerHandlerMapMutable

/**
* Represents a single stream
Expand Down Expand Up @@ -107,40 +130,6 @@ abstract class P2PService {
}
}

/**
* Executor backed by a single event thread
* It is only safe to perform any service logic via this executor
*
* The executor can be altered right after the instance creation.
* Changing it later may have unpredictable results
*/
var executor: ScheduledExecutorService by lazyVarInit {
Executors.newSingleThreadScheduledExecutor(
threadFactory
)
}

private val peersMutable = mutableListOf<PeerHandler>()
/**
* List of connected peers.
* Note that connected peer could not be ready for writing yet, so consider [activePeers]
* if any data is to be send
*/
val peers: List<PeerHandler> = peersMutable

private val activePeersMutable = mutableListOf<PeerHandler>()
/**
* List of active peers to which data could be written
*/
val activePeers: List<PeerHandler> = activePeersMutable

private val peerIdToPeerHandlerMapMutable = mutableMapOf<PeerId, PeerHandler>()

/**
* Maps [PeerId] to [PeerHandler] instance for connected peers
*/
val peerIdToPeerHandlerMap: Map<PeerId, PeerHandler> = peerIdToPeerHandlerMapMutable

/**
* Adds a new stream to service. This method should **synchronously** init the underlying
* [io.netty.channel.Channel]
Expand All @@ -150,6 +139,17 @@ abstract class P2PService {
*/
open fun addNewStream(stream: Stream) = initChannel(StreamHandler(stream))

/**
* Callback to initialize the [Stream] underlying [io.netty.channel.Channel]
*
* Is invoked **not** on the event thread
* [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**.
* **Don't** initialize the channel on event thread!
* Any service logic related to adding a new stream could be performed
* within overridden [streamAdded] callback (which is invoked on event thread)
*/
protected abstract fun initChannel(streamHandler: StreamHandler)

protected open fun streamAdded(streamHandler: StreamHandler) {
val peerHandler = createPeerHandler(streamHandler)
streamHandler.initPeerHandler(peerHandler)
Expand Down Expand Up @@ -184,17 +184,6 @@ abstract class P2PService {
onInbound(stream.getPeerHandler(), msg)
}

/**
* Callback to initialize the [Stream] underlying [io.netty.channel.Channel]
*
* Is invoked **not** on the event thread
* [io.netty.channel.Channel] initialization must be performed **synchronously on the caller thread**.
* **Don't** initialize the channel on event thread!
* Any service logic related to adding a new stream could be performed
* within overridden [streamAdded] callback (which is invoked on event thread)
*/
protected abstract fun initChannel(streamHandler: StreamHandler)

/**
* Callback notifies that the peer is active and ready for writing data
* Invoked on event thread
Expand Down Expand Up @@ -251,15 +240,9 @@ abstract class P2PService {
/**
* Executes the code on the service event thread
*/
fun <C> submitOnEventThread(run: () -> C): CompletableFuture<C> = CompletableFuture.supplyAsync(Supplier { run() }, executor)
fun <C> submitOnEventThread(run: () -> C): CompletableFuture<C> = CompletableFuture.supplyAsync({ run() }, executor)
/**
* Executes the code on the service event thread
*/
fun <C> submitAsyncOnEventThread(run: () -> CompletableFuture<C>): CompletableFuture<C> = executor.submitAsync(run)

companion object {
private val threadFactory = ThreadFactoryBuilder().setDaemon(true).setNameFormat("P2PService-event-thread-%d").build()
@JvmStatic
val logger = LogManager.getLogger(AbstractRouter::class.java)
}
}
3 changes: 2 additions & 1 deletion src/main/kotlin/io/libp2p/etc/util/P2PServiceSemiDuplex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import io.libp2p.core.SemiDuplexNoOutboundStreamException
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.toVoidCompletableFuture
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService

/**
* The service where communication between peers is performed via two [io.libp2p.core.Stream]s
* They are initiated asynchronously by each peer. Initiated stream is used solely for writing data
* and accepted steam is used solely for reading
*/
abstract class P2PServiceSemiDuplex : P2PService() {
abstract class P2PServiceSemiDuplex(executor: ScheduledExecutorService) : P2PService(executor) {

inner class SDPeerHandler(streamHandler: StreamHandler) : PeerHandler(streamHandler) {

Expand Down
68 changes: 29 additions & 39 deletions src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package io.libp2p.pubsub
import io.libp2p.core.BadPeerException
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.pubsub.RESULT_VALID
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.MultiSet
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.copy
import io.libp2p.etc.types.forward
import io.libp2p.etc.types.lazyVarInit
import io.libp2p.etc.types.thenApplyAll
import io.libp2p.etc.types.toWBytes
import io.libp2p.etc.util.P2PServiceSemiDuplex
Expand All @@ -22,25 +20,41 @@ import org.apache.logging.log4j.LogManager
import pubsub.pb.Rpc
import java.util.Collections.singletonList
import java.util.Optional
import java.util.Random
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.function.BiConsumer
import java.util.function.Consumer

// 1 MB default max message size
const val DEFAULT_MAX_PUBSUB_MESSAGE_SIZE = 1 shl 20

typealias PubsubMessageHandler = (PubsubMessage) -> CompletableFuture<ValidationResult>

open class DefaultPubsubMessage(override val protobufMessage: Rpc.Message) : AbstractPubsubMessage() {
override val messageId: MessageId = protobufMessage.from.toWBytes() + protobufMessage.seqno.toWBytes()
}

private val logger = LogManager.getLogger(AbstractRouter::class.java)

/**
* Implements common logic for pubsub routers
*/
abstract class AbstractRouter(
val subscriptionFilter: TopicSubscriptionFilter,
val maxMsgSize: Int = DEFAULT_MAX_PUBSUB_MESSAGE_SIZE
) : P2PServiceSemiDuplex(), PubsubRouter, PubsubRouterDebug {
executor: ScheduledExecutorService,
override val protocol: PubsubProtocol,
protected val subscriptionFilter: TopicSubscriptionFilter,
protected val maxMsgSize: Int,
override val messageFactory: PubsubMessageFactory,
protected val seenMessages: SeenCache<Optional<ValidationResult>>,
protected val messageValidator: PubsubRouterMessageValidator
) : P2PServiceSemiDuplex(executor), PubsubRouter, PubsubRouterDebug {

protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }

protected open val peerTopics = MultiSet<PeerHandler, Topic>()
protected open val subscribedTopics = linkedSetOf<Topic>()
protected open val pendingRpcParts = PendingRpcPartsMap<RpcPartsQueue> { DefaultRpcPartsQueue() }
protected open val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()

protected class PendingRpcPartsMap<out TPartsQueue : RpcPartsQueue>(
private val queueFactory: () -> TPartsQueue
Expand All @@ -53,28 +67,6 @@ abstract class AbstractRouter(
fun popQueue(peer: PeerHandler) = map.remove(peer) ?: queueFactory()
}

private val logger = LogManager.getLogger(AbstractRouter::class.java)

override var curTimeMillis: () -> Long by lazyVarInit { { System.currentTimeMillis() } }
override var random by lazyVarInit { Random() }
override var name: String = "router"

override var messageFactory: PubsubMessageFactory = { DefaultPubsubMessage(it) }
var maxSeenMessagesLimit = 10000

protected open val seenMessages: SeenCache<Optional<ValidationResult>> by lazy {
LRUSeenCache(SimpleSeenCache(), maxSeenMessagesLimit)
}

private val peerTopics = MultiSet<PeerHandler, Topic>()
private var msgHandler: (PubsubMessage) -> CompletableFuture<ValidationResult> = { RESULT_VALID }
override var messageValidator = NOP_ROUTER_VALIDATOR

val subscribedTopics = linkedSetOf<Topic>()
protected open val pendingRpcParts = PendingRpcPartsMap<RpcPartsQueue> { DefaultRpcPartsQueue() }
private var debugHandler: ChannelHandler? = null
private val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()

override fun publish(msg: PubsubMessage): CompletableFuture<Unit> {
return submitAsyncOnEventThread {
if (msg in seenMessages) {
Expand Down Expand Up @@ -114,26 +106,24 @@ abstract class AbstractRouter(
}
}

override fun addPeer(peer: Stream) {
addNewStream(peer)
override fun addPeer(peer: Stream) = addPeerWithDebugHandler(peer, null)
override fun addPeerWithDebugHandler(peer: Stream, debugHandler: ChannelHandler?) {
addNewStreamWithHandler(peer, debugHandler)
}

override fun addPeerWithDebugHandler(peer: Stream, debugHandler: ChannelHandler?) {
this.debugHandler = debugHandler
try {
addPeer(peer)
} finally {
this.debugHandler = null
}
override fun addNewStream(stream: Stream) = addNewStreamWithHandler(stream, null)
protected fun addNewStreamWithHandler(stream: Stream, handler: ChannelHandler?) {
initChannelWithHandler(StreamHandler(stream), handler)
}

override fun initChannel(streamHandler: StreamHandler) {
override fun initChannel(streamHandler: StreamHandler) = initChannelWithHandler(streamHandler, null)
protected open fun initChannelWithHandler(streamHandler: StreamHandler, handler: ChannelHandler?) {
with(streamHandler.stream) {
pushHandler(LimitedProtobufVarint32FrameDecoder(maxMsgSize))
pushHandler(ProtobufVarint32LengthFieldPrepender())
pushHandler(ProtobufDecoder(Rpc.RPC.getDefaultInstance()))
pushHandler(ProtobufEncoder())
debugHandler?.also { pushHandler(it) }
handler?.also { pushHandler(it) }
pushHandler(streamHandler)
}
}
Expand Down
27 changes: 1 addition & 26 deletions src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.WBytes
import io.netty.channel.ChannelHandler
import pubsub.pb.Rpc
import java.util.Random
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService

typealias Topic = String
typealias MessageId = WBytes
Expand Down Expand Up @@ -62,8 +60,7 @@ abstract class AbstractPubsubMessage : PubsubMessage {
interface PubsubMessageRouter {

val protocol: PubsubProtocol
var messageFactory: PubsubMessageFactory
var messageValidator: PubsubRouterMessageValidator
val messageFactory: PubsubMessageFactory

/**
* Validates and broadcasts the message to suitable peers
Expand Down Expand Up @@ -135,28 +132,6 @@ interface PubsubRouter : PubsubMessageRouter, PubsubPeerRouter
*/
interface PubsubRouterDebug : PubsubRouter {

/**
* Adds ability to substitute the scheduler which is used for all async and periodic
* tasks within the router
*/
var executor: ScheduledExecutorService

/**
* System time supplier. Normally defaults to [System.currentTimeMillis]
* If router needs system time it should refer to this supplier
*/
var curTimeMillis: () -> Long

/**
* Randomness supplier
* Whenever router implementation needs random data it must refer to this var
* Tests may substitute this instance with a fixed-seed [Random]
* to perform deterministic testing
*/
var random: Random

var name: String

/**
* The same as [PubsubRouter.addPeer] but adds the [debugHandler] right before
* the terminal handler
Expand Down
19 changes: 13 additions & 6 deletions src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package io.libp2p.pubsub.flood

import io.libp2p.etc.types.anyComplete
import io.libp2p.pubsub.AbstractRouter
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.TopicSubscriptionFilter
import io.libp2p.pubsub.*
import pubsub.pb.Rpc
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

class FloodRouter : AbstractRouter(subscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter()) {
const val DEFAULT_MAX_SEEN_MESSAGES_LIMIT: Int = 10000

override val protocol = PubsubProtocol.Floodsub
class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()) : AbstractRouter(
protocol = PubsubProtocol.Floodsub,
executor = executor,
subscriptionFilter = TopicSubscriptionFilter.AllowAllTopicSubscriptionFilter(),
maxMsgSize = Int.MAX_VALUE,
messageFactory = { DefaultPubsubMessage(it) },
seenMessages = LRUSeenCache(SimpleSeenCache(), DEFAULT_MAX_SEEN_MESSAGES_LIMIT),
messageValidator = NOP_ROUTER_VALIDATOR
) {

// msg: validated unseen messages received from api
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/io/libp2p/pubsub/gossip/Gossip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import io.libp2p.core.multistream.ProtocolDescriptor
import io.libp2p.core.pubsub.PubsubApi
import io.libp2p.pubsub.PubsubApiImpl
import io.libp2p.pubsub.PubsubProtocol
import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder
import io.netty.channel.ChannelHandler
import java.util.concurrent.CompletableFuture

class Gossip @JvmOverloads constructor(
private val router: GossipRouter = GossipRouter(),
private val router: GossipRouter = GossipRouterBuilder().build(),
private val api: PubsubApi = PubsubApiImpl(router),
private val debugGossipHandler: ChannelHandler? = null
) :
Expand Down
Loading

0 comments on commit c9f3dfc

Please sign in to comment.