Skip to content

Commit

Permalink
add more gossipsub methods to api
Browse files Browse the repository at this point in the history
Add `api.store()` method as way to get `StoreApi` from the API. This
exposes the `put` method, which is needed in `iroh-share`.
also moves CLI specific `connect` method to `iroh` and adds more general `connect` method to `iroh-api`
  • Loading branch information
ramfox committed Jan 5, 2023
1 parent 3497ade commit ac5ed16
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 33 deletions.
14 changes: 12 additions & 2 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use mockall::automock;
use relative_path::RelativePathBuf;
use tokio::io::{AsyncRead, AsyncReadExt};

use crate::store::add_blocks_to_store;
use crate::store::{add_blocks_to_store, StoreApi};

/// API to interact with an iroh system.
///
Expand Down Expand Up @@ -104,6 +104,11 @@ impl Api {
Self { client, resolver }
}

/// Returns a [`Resolver`] you can use to resolve data from the Iroh store or the network.
pub async fn resolver(&self) -> Resolver<FullLoader> {
self.resolver.clone()
}

/// Announces to the DHT that this node can offer the given [`Cid`].
///
/// This publishes a provider record for the [`Cid`] to the DHT, establishing the local
Expand All @@ -117,6 +122,11 @@ impl Api {
Ok(P2pApi::new(p2p_client))
}

pub fn store(&self) -> Result<StoreApi> {
let store_client = self.client.try_store()?;
Ok(StoreApi::new(store_client))
}

/// High level get, equivalent of CLI `iroh get`.
///
/// Returns a stream of items, where items can be either blobs or UnixFs components.
Expand Down Expand Up @@ -192,7 +202,7 @@ impl Api {
};

Ok(Box::pin(
add_blocks_to_store(Some(self.client.clone()), blocks).await,
add_blocks_to_store(Some(self.store()?), blocks).await,
))
}

Expand Down
2 changes: 1 addition & 1 deletion iroh-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use crate::error::ApiError;
pub use crate::p2p::MockP2p as P2pApi;
#[cfg(not(feature = "testing"))]
pub use crate::p2p::P2p as P2pApi;
pub use crate::p2p::PeerIdOrAddr;
pub use crate::p2p::{peer_id_from_multiaddr, PeerIdOrAddr};
pub use bytes::Bytes;
pub use cid::Cid;
pub use iroh_resolver::resolver::Path as IpfsPath;
Expand Down
71 changes: 54 additions & 17 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,27 @@ impl P2p {
}

pub async fn lookup_local(&self) -> Result<Lookup> {
self.client.lookup_local().await
self.client
.lookup_local()
.await
.map_err(|e| map_service_error("p2p", e))
}

/// The [`PeerId`] for this Iroh p2p nod
pub async fn peer_id(&self) -> Result<PeerId> {
self.client
.local_peer_id()
.await
.map_err(|e| map_service_error("p2p", e))
}

/// The list of [`Multiaddr`] that the Iroh p2p node is listening on
pub async fn addrs(&self) -> Result<Vec<Multiaddr>> {
self.client
.get_listening_addrs()
.await
.map(|(_, addrs)| addrs)
.map_err(|e| map_service_error("p2p", e))
}

pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup> {
Expand All @@ -48,15 +68,15 @@ impl P2p {
.map_err(|e| map_service_error("p2p", e))
}

pub async fn connect(&self, addr: &PeerIdOrAddr) -> Result<()> {
match addr {
PeerIdOrAddr::PeerId(peer_id) => self.client.connect(*peer_id, vec![]).await,
PeerIdOrAddr::Multiaddr(addr) => {
let peer_id = peer_id_from_multiaddr(addr)?;
self.client.connect(peer_id, vec![addr.clone()]).await
}
}
.map_err(|e| map_service_error("p2p", e))
/// Connect to a peer using a [`PeerId`] and `Vec` of [`Multiaddr`]
///
/// If there is an empty `Vec` of `Multiaddr`s, Iroh will attempt to find
/// the peer on the DHT using the `PeerId`.
pub async fn connect(&self, peer_id: PeerId, addrs: Vec<Multiaddr>) -> Result<()> {
self.client
.connect(peer_id, addrs)
.await
.map_err(|e| map_service_error("p2p", e))
}

pub async fn peers(&self) -> Result<HashMap<PeerId, Vec<Multiaddr>>> {
Expand Down Expand Up @@ -94,23 +114,40 @@ impl P2p {
// a stream of only the gossipsub messages on that topic
pub async fn gossipsub_subscribe(&self, topic: String) -> Result<bool> {
let topic = TopicHash::from_raw(topic);
self.client.gossipsub_subscribe(topic).await
self.client
.gossipsub_subscribe(topic)
.await
.map_err(|e| map_service_error("p2p", e))
}

/// Publish a message on a Gossipsub Topic.
///
/// This allows you to publish a message on a given topic to anyone in your
/// network that is subscribed to that topic.
/// This allows you to publish a message on a given topic to anyone in your network that is
/// subscribed to that topic.
///
/// Read the [`P2p::gossipsub_subscribe`] documentation for how to subscribe
/// and receive Gossipsub messages.
/// Read the [`P2p::gossipsub_subscribe`] documentation for how to subscribe and receive
/// Gossipsub messages.
pub async fn gossipsub_publish(&self, topic: String, data: Bytes) -> Result<MessageId> {
let topic = TopicHash::from_raw(topic);
self.client.gossipsub_publish(topic, data).await
self.client
.gossipsub_publish(topic, data)
.await
.map_err(|e| map_service_error("p2p", e))
}

/// Add a peer to the list of Gossipsub peers we are explicitly connected to.
///
/// We will attempt to stay connected and forward all relevant Gossipsub messages
/// to this peer.
pub async fn gossipsub_add_peer(&self, peer_id: PeerId) -> Result<()> {
self.client
.gossipsub_add_explicit_peer(peer_id)
.await
.map_err(|e| map_service_error("p2p", e))
}
}

fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result<PeerId> {
pub fn peer_id_from_multiaddr(addr: &Multiaddr) -> Result<PeerId> {
match addr.iter().find(|p| matches!(*p, Protocol::P2p(_))) {
Some(Protocol::P2p(peer_id)) => {
PeerId::from_multihash(peer_id).map_err(|m| anyhow::anyhow!("Multiaddress contains invalid p2p multihash {:?}. Cannot derive a PeerId from this address.", m ))
Expand Down
38 changes: 31 additions & 7 deletions iroh-api/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use async_trait::async_trait;
use bytes::Bytes;
use cid::Cid;
use futures::{Stream, StreamExt};
use iroh_rpc_client::Client;
use iroh_rpc_client::StoreClient;
use iroh_unixfs::Block;
#[cfg(feature = "testing")]

/// How many chunks to buffer up when adding content.
const _ADD_PAR: usize = 24;
Expand All @@ -19,20 +20,43 @@ pub trait Store: 'static + Send + Sync + Clone {
async fn put_many(&self, blocks: Vec<Block>) -> Result<()>;
}

#[derive(Debug, Clone)]
pub struct StoreApi {
client: StoreClient,
}

impl StoreApi {
pub fn new(client: StoreClient) -> Self {
Self { client }
}

pub async fn has(&self, cid: Cid) -> Result<bool> {
self.client.has(cid).await
}

pub async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.client.put(cid, blob, links).await
}

pub async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.client
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
}
}

#[async_trait]
impl Store for Client {
impl Store for StoreApi {
async fn has(&self, cid: Cid) -> Result<bool> {
self.try_store()?.has(cid).await
self.has(cid).await
}

async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.try_store()?.put(cid, blob, links).await
self.put(cid, blob, links).await
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.try_store()?
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
self.put_many(blocks).await
}
}

Expand Down
21 changes: 15 additions & 6 deletions iroh/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::doc;
use anyhow::{Error, Result};
use clap::{Args, Subcommand};
use crossterm::style::Stylize;
use iroh_api::{Lookup, Multiaddr, P2pApi, PeerId, PeerIdOrAddr};
use iroh_api::{peer_id_from_multiaddr, Lookup, Multiaddr, P2pApi, PeerId, PeerIdOrAddr};
use std::{collections::HashMap, fmt::Display, str::FromStr};

#[derive(Args, Debug, Clone)]
Expand Down Expand Up @@ -63,12 +63,21 @@ impl Display for PeerIdOrAddrArg {

pub async fn run_command(p2p: &P2pApi, cmd: &P2p) -> Result<()> {
match &cmd.command {
P2pCommands::Connect { addr } => match p2p.connect(&addr.0).await {
Ok(_) => {
println!("Connected to {addr}!");
P2pCommands::Connect { addr } => {
let res = match &addr.0 {
PeerIdOrAddr::PeerId(peer_id) => p2p.connect(*peer_id, vec![]).await,
PeerIdOrAddr::Multiaddr(addr) => {
let peer_id = peer_id_from_multiaddr(addr)?;
p2p.connect(peer_id, vec![addr.clone()]).await
}
};
match res {
Ok(_) => {
println!("Connected to {addr}!");
}
Err(e) => return Err(e),
}
Err(e) => return Err(e),
},
}
P2pCommands::Lookup { addr } => {
let lookup = match addr {
Some(addr) => p2p.lookup(&addr.0).await?,
Expand Down

0 comments on commit ac5ed16

Please sign in to comment.