Skip to content

Commit

Permalink
kademlia: Preserve publisher & expiration time in DHT records (#162)
Browse files Browse the repository at this point in the history
This PR fixes a bug with publisher & expiration time not being preserved
in DHT records.

Resolves #129.
  • Loading branch information
dmitry-markin authored Jun 25, 2024
1 parent c6e25eb commit fd02c1f
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 71 deletions.
6 changes: 5 additions & 1 deletion src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub struct Config {
/// Incoming records validation mode.
pub(super) validation_mode: IncomingRecordValidationMode,

/// Default record TTl.
pub(super) record_ttl: Duration,

/// TX channel for sending events to `KademliaHandle`.
pub(super) event_tx: Sender<KademliaEvent>,

Expand Down Expand Up @@ -94,13 +97,14 @@ impl Config {
protocol_names,
update_mode,
validation_mode,
record_ttl,
codec: ProtocolCodec::UnsignedVarint(None),
replication_factor,
known_peers,
cmd_rx,
event_tx,
},
KademliaHandle::new(cmd_tx, event_rx, record_ttl),
KademliaHandle::new(cmd_tx, event_rx),
)
}

Expand Down
35 changes: 7 additions & 28 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

/// Quorum.
Expand Down Expand Up @@ -223,23 +222,15 @@ pub struct KademliaHandle {

/// Next query ID.
next_query_id: usize,

/// Default TTL for the records.
record_ttl: Duration,
}

impl KademliaHandle {
/// Create new [`KademliaHandle`].
pub(super) fn new(
cmd_tx: Sender<KademliaCommand>,
event_rx: Receiver<KademliaEvent>,
record_ttl: Duration,
) -> Self {
pub(super) fn new(cmd_tx: Sender<KademliaCommand>, event_rx: Receiver<KademliaEvent>) -> Self {
Self {
cmd_tx,
event_rx,
next_query_id: 0usize,
record_ttl,
}
}

Expand All @@ -265,9 +256,7 @@ impl KademliaHandle {
}

/// Store record to DHT.
pub async fn put_record(&mut self, mut record: Record) -> QueryId {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

pub async fn put_record(&mut self, record: Record) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;

Expand All @@ -277,12 +266,10 @@ impl KademliaHandle {
/// Store record to DHT to the given peers.
pub async fn put_record_to_peers(
&mut self,
mut record: Record,
record: Record,
peers: Vec<PeerId>,
update_local_store: bool,
) -> QueryId {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

let query_id = self.next_query_id();
let _ = self
.cmd_tx
Expand Down Expand Up @@ -314,9 +301,7 @@ impl KademliaHandle {

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, mut record: Record) {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

pub async fn store_record(&mut self, record: Record) {
let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await;
}

Expand All @@ -337,9 +322,7 @@ impl KademliaHandle {
}

/// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
pub fn try_put_record(&mut self, mut record: Record) -> Result<QueryId, ()> {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord { record, query_id })
Expand All @@ -351,12 +334,10 @@ impl KademliaHandle {
/// return an error.
pub fn try_put_record_to_peers(
&mut self,
mut record: Record,
record: Record,
peers: Vec<PeerId>,
update_local_store: bool,
) -> Result<QueryId, ()> {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecordToPeers {
Expand Down Expand Up @@ -384,9 +365,7 @@ impl KademliaHandle {

/// Try to store the record in the local store, and if the channel is clogged, return an error.
/// Used in combination with [`IncomingRecordValidationMode::Manual`].
pub fn try_store_record(&mut self, mut record: Record) -> Result<(), ()> {
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> {
self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ())
}
}
Expand Down
114 changes: 97 additions & 17 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::libp2p::kademlia::{
record::{Key as RecordKey, Record},
schema,
types::KademliaPeer,
use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, Record},
schema,
types::KademliaPeer,
},
PeerId,
};

use bytes::{Bytes, BytesMut};
use prost::Message;
use std::time::{Duration, Instant};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::message";
Expand Down Expand Up @@ -78,16 +82,11 @@ impl KademliaMessage {
}

/// Create `PUT_VALUE` message for `record`.
// TODO: set ttl
pub fn put_value(record: Record) -> Bytes {
let message = schema::kademlia::Message {
key: record.key.clone().into(),
r#type: schema::kademlia::MessageType::PutValue.into(),
record: Some(schema::kademlia::Record {
key: record.key.into(),
value: record.value,
..Default::default()
}),
record: Some(record_to_schema(record)),
cluster_level_raw: 10,
..Default::default()
};
Expand Down Expand Up @@ -140,11 +139,7 @@ impl KademliaMessage {
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetValue.into(),
closer_peers: peers.iter().map(|peer| peer.into()).collect(),
record: record.map(|record| schema::kademlia::Record {
key: record.key.to_vec(),
value: record.value,
..Default::default()
}),
record: record.map(record_to_schema),
..Default::default()
};

Expand Down Expand Up @@ -174,7 +169,7 @@ impl KademliaMessage {
let record = message.record?;

Some(Self::PutValue {
record: Record::new(record.key, record.value),
record: record_from_schema(record)?,
})
}
1 => {
Expand All @@ -185,9 +180,15 @@ impl KademliaMessage {
false => Some(RecordKey::from(message.key.clone())),
};

let record = if let Some(record) = message.record {
Some(record_from_schema(record)?)
} else {
None
};

Some(Self::GetRecord {
key,
record: message.record.map(|record| Record::new(record.key, record.value)),
record,
peers: message
.closer_peers
.iter()
Expand All @@ -207,3 +208,82 @@ impl KademliaMessage {
}
}
}

fn record_to_schema(record: Record) -> schema::kademlia::Record {
schema::kademlia::Record {
key: record.key.into(),
value: record.value,
time_received: String::new(),
publisher: record.publisher.map(|peer_id| peer_id.to_bytes()).unwrap_or_default(),
ttl: record
.expires
.map(|expires| {
let now = Instant::now();
if expires > now {
u32::try_from((expires - now).as_secs()).unwrap_or(u32::MAX)
} else {
1 // because 0 means "does not expire"
}
})
.unwrap_or(0),
}
}

fn record_from_schema(record: schema::kademlia::Record) -> Option<Record> {
Some(Record {
key: record.key.into(),
value: record.value,
publisher: if !record.publisher.is_empty() {
Some(PeerId::from_bytes(&record.publisher).ok()?)
} else {
None
},
expires: if record.ttl > 0 {
Some(Instant::now() + Duration::from_secs(record.ttl as u64))
} else {
None
},
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn non_empty_publisher_and_ttl_are_preserved() {
let expires = Instant::now() + Duration::from_secs(3600);

let record = Record {
key: vec![1, 2, 3].into(),
value: vec![17],
publisher: Some(PeerId::random()),
expires: Some(expires),
};

let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

assert_eq!(got_record.key, record.key);
assert_eq!(got_record.value, record.value);
assert_eq!(got_record.publisher, record.publisher);

// Check that the expiration time is sane.
let got_expires = got_record.expires.unwrap();
assert!(got_expires - expires >= Duration::ZERO);
assert!(got_expires - expires < Duration::from_secs(10));
}

#[test]
fn empty_publisher_and_ttl_are_preserved() {
let record = Record {
key: vec![1, 2, 3].into(),
value: vec![17],
publisher: None,
expires: None,
};

let got_record = record_from_schema(record_to_schema(record.clone())).unwrap();

assert_eq!(got_record, record);
}
}
32 changes: 26 additions & 6 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ use futures::StreamExt;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};

use std::collections::{hash_map::Entry, HashMap};
use std::{
collections::{hash_map::Entry, HashMap},
time::{Duration, Instant},
};

pub use self::handle::RecordsType;
pub use config::{Config, ConfigBuilder};
Expand Down Expand Up @@ -115,7 +118,7 @@ pub(crate) struct Kademlia {
service: TransportService,

/// Local Kademlia key.
_local_key: Key<PeerId>,
local_key: Key<PeerId>,

/// Connected peers,
peers: HashMap<PeerId, PeerContext>,
Expand Down Expand Up @@ -147,6 +150,9 @@ pub(crate) struct Kademlia {
/// Incoming records validation mode.
validation_mode: IncomingRecordValidationMode,

/// Default record TTL.
record_ttl: Duration,

/// Query engine.
engine: QueryEngine,

Expand Down Expand Up @@ -175,12 +181,13 @@ impl Kademlia {
cmd_rx: config.cmd_rx,
store: MemoryStore::new(),
event_tx: config.event_tx,
_local_key: local_key,
local_key,
pending_dials: HashMap::new(),
executor: QueryExecutor::new(),
pending_substreams: HashMap::new(),
update_mode: config.update_mode,
validation_mode: config.validation_mode,
record_ttl: config.record_ttl,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
}
Expand Down Expand Up @@ -775,9 +782,15 @@ impl Kademlia {
self.routing_table.closest(Key::from(peer), self.replication_factor).into()
);
}
Some(KademliaCommand::PutRecord { record, query_id }) => {
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");

// For `PUT_VALUE` requests originating locally we are always the publisher.
record.publisher = Some(self.local_key.clone().into_preimage());

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

let key = Key::new(record.key.clone());

self.store.put(record.clone());
Expand All @@ -788,9 +801,12 @@ impl Kademlia {
self.routing_table.closest(key, self.replication_factor).into(),
);
}
Some(KademliaCommand::PutRecordToPeers { record, query_id, peers, update_local_store }) => {
Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers");

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

if update_local_store {
self.store.put(record.clone());
}
Expand Down Expand Up @@ -854,13 +870,16 @@ impl Kademlia {
self.service.add_known_address(&peer, addresses.into_iter());

}
Some(KademliaCommand::StoreRecord { record }) => {
Some(KademliaCommand::StoreRecord { mut record }) => {
tracing::debug!(
target: LOG_TARGET,
key = ?record.key,
"store record in local store",
);

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

self.store.put(record);
}
None => return Err(Error::EssentialTaskClosed),
Expand Down Expand Up @@ -914,6 +933,7 @@ mod tests {
replication_factor: 20usize,
update_mode: RoutingTableUpdateMode::Automatic,
validation_mode: IncomingRecordValidationMode::Automatic,
record_ttl: Duration::from_secs(36 * 60 * 60),
event_tx,
cmd_rx,
};
Expand Down
Loading

0 comments on commit fd02c1f

Please sign in to comment.