Skip to content

Commit

Permalink
clean up & add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Jan 5, 2023
1 parent 491e7d5 commit 3497ade
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 60 deletions.
28 changes: 26 additions & 2 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl P2p {
.map_err(|e| map_service_error("p2p", e))
}

/// Get a stream of [`NetworkEvent`].
pub async fn network_events(&self) -> Result<BoxStream<'static, Result<NetworkEvent>>> {
let stream = self
.client
Expand All @@ -75,12 +76,35 @@ impl P2p {
Ok(stream.boxed())
}

pub async fn subscribe(&self, topic: String) -> Result<bool> {
/// Subscribe to a Gossipsub Topic
///
/// Gossipsub is a pub/sub protocol. This will subscribe you to a Gossipsub
/// topic. All Gossipsub messages for topics that you are subscribed to can
/// be read off the the `NetworkEvent` stream, as
/// `NetworkEvent::Gossipsub(GossipsubEvent::Message)`. You can access this
/// stream using `P2p::network_events`.
///
/// There is currently no `gossipsub_unsubscribe` exposed to `iroh-api` crate
/// yet.
///
/// Learn more about the Gossipsub protocol in the `libp2p-gossipsub`
/// [documentation](https://docs.rs/libp2p-gossipsub/latest/libp2p_gossipsub/).
//
// TODO(ramfox): write `gossipsub_messages(topic: String)` method that returns
// a stream of only the gossipsub messages on that topic
pub async fn gossipsub_subscribe(&self, topic: String) -> Result<bool> {
let topic = TopicHash::from_raw(topic);
self.client.gossipsub_subscribe(topic).await
}

pub async fn publish(&self, topic: String, data: Bytes) -> Result<MessageId> {
/// Publish a message on a Gossipsub Topic.
///
/// This allows you to publish a message on a given topic to anyone in your
/// network that is subscribed to that topic.
///
/// Read the [`P2p::gossipsub_subscribe`] documentation for how to subscribe
/// and receive Gossipsub messages.
pub async fn gossipsub_publish(&self, topic: String, data: Bytes) -> Result<MessageId> {
let topic = TopicHash::from_raw(topic);
self.client.gossipsub_publish(topic, data).await
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ mod swarm;
pub use self::config::*;
pub use self::keys::{DiskStorage, Keychain, MemoryStorage};
pub use self::node::*;
pub use iroh_rpc_types::p2p::{GossipsubEvent, NetworkEvent};
pub use iroh_rpc_types::{GossipsubEvent, NetworkEvent};

pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
2 changes: 1 addition & 1 deletion iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_stream::stream;
use bytes::Bytes;
use cid::Cid;
use futures::{Stream, StreamExt};
use iroh_rpc_types::{p2p::*, VersionRequest, WatchRequest};
use iroh_rpc_types::{p2p::*, NetworkEvent, VersionRequest, WatchRequest};
use libp2p::gossipsub::{MessageId, TopicHash};
use libp2p::{Multiaddr, PeerId};
use std::collections::{HashMap, HashSet};
Expand Down
2 changes: 2 additions & 0 deletions iroh-rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub mod addr;
pub mod gateway;
mod network_event;
pub mod p2p;
pub mod store;

use std::fmt;

pub use crate::network_event::{GossipsubEvent, NetworkEvent};
pub use addr::Addr;

use serde::{Deserialize, Serialize};
Expand Down
57 changes: 57 additions & 0 deletions iroh-rpc-types/src/network_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use libp2p::{
gossipsub::{GossipsubMessage, MessageId, TopicHash},
PeerId,
};
use serde::{Deserialize, Serialize};

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkEvent {
PeerConnected(PeerId),
PeerDisconnected(PeerId),
Gossipsub(GossipsubEvent),
CancelLookupQuery(PeerId),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipsubEvent {
Subscribed {
peer_id: PeerId,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
},
Unsubscribed {
peer_id: PeerId,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
},
Message {
from: PeerId,
id: MessageId,
#[serde(with = "GossipsubMessageDef")]
message: GossipsubMessage,
},
}

#[derive(Serialize, Deserialize)]
#[serde(remote = "TopicHash")]
struct TopicHashDef {
#[serde(getter = "TopicHash::to_string")]
hash: String,
}

impl From<TopicHashDef> for TopicHash {
fn from(t: TopicHashDef) -> Self {
TopicHash::from_raw(t.hash)
}
}

#[derive(Serialize, Deserialize)]
#[serde(remote = "GossipsubMessage")]
struct GossipsubMessageDef {
source: Option<PeerId>,
data: Vec<u8>,
sequence_number: Option<u64>,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
}
59 changes: 3 additions & 56 deletions iroh-rpc-types/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use quic_rpc::{
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

use crate::{RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse};
use crate::{
NetworkEvent, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse,
};

pub type P2pAddr = super::addr::Addr<P2pService>;

Expand Down Expand Up @@ -152,61 +154,6 @@ pub struct NetworkEventsResponse {
pub event: NetworkEvent,
}

use libp2p::gossipsub::{GossipsubMessage, MessageId, TopicHash};
// TODO(ramfox): figure out better place for this or way to handle
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkEvent {
PeerConnected(PeerId),
PeerDisconnected(PeerId),
Gossipsub(GossipsubEvent),
CancelLookupQuery(PeerId),
}

// TODO(ramfox): figure out better place for this or way to handle
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipsubEvent {
Subscribed {
peer_id: PeerId,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
},
Unsubscribed {
peer_id: PeerId,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
},
Message {
from: PeerId,
id: MessageId,
#[serde(with = "GossipsubMessageDef")]
message: GossipsubMessage,
},
}

#[derive(Serialize, Deserialize)]
#[serde(remote = "TopicHash")]
struct TopicHashDef {
#[serde(getter = "TopicHash::to_string")]
hash: String,
}

impl From<TopicHashDef> for TopicHash {
fn from(t: TopicHashDef) -> Self {
TopicHash::from_raw(t.hash)
}
}

#[derive(Serialize, Deserialize)]
#[serde(remote = "GossipsubMessage")]
struct GossipsubMessageDef {
source: Option<PeerId>,
data: Vec<u8>,
sequence_number: Option<u64>,
#[serde(with = "TopicHashDef")]
topic: TopicHash,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct GossipsubAddExplicitPeerRequest {
pub peer_id: PeerId,
Expand Down

0 comments on commit 3497ade

Please sign in to comment.