diff --git a/src/lib.rs b/src/lib.rs index d9991263b..44c2a7aa5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -268,6 +268,8 @@ enum IpfsEvent { Disconnect(MultiaddrWithPeerId, Channel<()>), /// Request background task to return the listened and external addresses GetAddresses(OneshotSender>), + PubsubAddPeer(PeerId, OneshotSender<()>), + PubsubRemovePeer(PeerId, OneshotSender<()>), PubsubSubscribe(String, OneshotSender>), PubsubUnsubscribe(String, OneshotSender), PubsubPublish(String, Vec, OneshotSender<()>), @@ -755,6 +757,43 @@ impl Ipfs { .await } + /// Add a peer to list of nodes to propagate messages to. + /// + /// A peer will not receive any pubsub messages from this node until it is added using this function, + /// unless it has added this node in the same way. + pub async fn pubsub_add_peer(&self, peer_id: PeerId) -> Result<(), Error> { + async move { + let (tx, rx) = oneshot_channel::<()>(); + + self.to_task + .clone() + .send(IpfsEvent::PubsubAddPeer(peer_id, tx)) + .await?; + + Ok(rx.await?) + } + .instrument(self.span.clone()) + .await + } + + /// Remove a peer from the list of nodes that messages are propagated to. + /// + /// Calling this function will not stop messages being sent to the specified peers for subscribed topics which have already been communicated. + pub async fn pubsub_remove_peer(&self, peer_id: PeerId) -> Result<(), Error> { + async move { + let (tx, rx) = oneshot_channel::<()>(); + + self.to_task + .clone() + .send(IpfsEvent::PubsubRemovePeer(peer_id, tx)) + .await?; + + Ok(rx.await?) + } + .instrument(self.span.clone()) + .await + } + /// Subscribes to a given topic. Can be done at most once without unsubscribing in the between. /// The subscription can be unsubscribed by dropping the stream or calling /// [`Ipfs::pubsub_unsubscribe`]. @@ -1431,6 +1470,20 @@ impl Future for IpfsFuture { // ignore error, perhaps caller went away already let _ = ret.send(addresses); } + IpfsEvent::PubsubAddPeer(peer_id, ret) => { + self.swarm + .behaviour_mut() + .pubsub() + .add_node_to_partial_view(peer_id); + let _ = ret.send(()); + } + IpfsEvent::PubsubRemovePeer(peer_id, ret) => { + self.swarm + .behaviour_mut() + .pubsub() + .remove_node_from_partial_view(&peer_id); + let _ = ret.send(()); + } IpfsEvent::PubsubSubscribe(topic, ret) => { let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic)); } @@ -1780,8 +1833,11 @@ mod node { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::make_ipld; + use futures::{stream::poll_immediate, StreamExt}; use multihash::Sha2_256; #[tokio::test] @@ -1819,4 +1875,47 @@ mod tests { ipfs.remove_pin(&cid, false).await.unwrap(); assert!(!ipfs.is_pinned(&cid).await.unwrap()); } + + #[tokio::test] + async fn test_pubsub_send_and_receive() { + let alice = Node::new("alice").await; + let bob = Node::new("bob").await; + let bob_addr = bob.addrs_local().await.unwrap()[0].clone(); + + let topic = String::from("test_topic"); + alice + .connect(bob_addr.with(Protocol::P2p(bob.id.into()))) + .await + .expect("alice failed to connect to bob"); + let _alice_messages = alice.pubsub_subscribe(topic.clone()).await.unwrap(); + let mut bob_messages = poll_immediate(bob.pubsub_subscribe(topic.clone()).await.unwrap()); + + let data = vec![1, 2, 3]; + + alice + .pubsub_publish(topic.clone(), data.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_eq!(bob_messages.next().await, Some(Poll::Pending)); + + bob.pubsub_add_peer(alice.id).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_eq!(bob_messages.next().await, Some(Poll::Pending)); + + alice + .pubsub_publish(topic.clone(), data.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + let received_data = bob_messages + .next() + .await + .expect("unexpected end of stream") + .map(|msg| msg.data.clone()); + assert_eq!(received_data, Poll::Ready(data.clone())); + } }