Skip to content

Commit

Permalink
feat: new torrent repo implementation using parking_lot Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Apr 16, 2024
1 parent 0fa396c commit 0058e72
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 28 deletions.
23 changes: 22 additions & 1 deletion packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ mod helpers;
use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot,
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot, TorrentsSkipMapMutexStd,
TorrentsSkipMapRwLockParkingLot,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -49,6 +50,10 @@ fn add_one_torrent(c: &mut Criterion) {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.bench_function("SkipMapMutexParkingLot", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexParkingLot, _>);
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapRwLockParkingLot, _>);
});
Expand Down Expand Up @@ -106,6 +111,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexParkingLot, _>(&rt, iters, None));
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None));
Expand Down Expand Up @@ -165,6 +175,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexParkingLot, _>(&rt, iters, None));
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None));
Expand Down Expand Up @@ -225,6 +240,12 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexParkingLot", |b| {
b.to_async(&rt).iter_custom(|iters| {
sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexParkingLot, _>(&rt, iters, None)
});
});

group.bench_function("SkipMapRwLockParkingLot", |b| {
b.to_async(&rt).iter_custom(|iters| {
sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapRwLockParkingLot, _>(&rt, iters, None)
Expand Down
1 change: 1 addition & 0 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod mutex_parking_lot;
pub mod mutex_std;
pub mod mutex_tokio;
pub mod peer_list;
pub mod rw_lock_parking_lot;
pub mod single;

pub trait Entry {
Expand Down
24 changes: 12 additions & 12 deletions packages/torrent-repository/src/entry/mutex_parking_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,44 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::{EntryRwLockParkingLot, EntrySingle};
use crate::{EntryMutexParkingLot, EntrySingle};

impl EntrySync for EntryRwLockParkingLot {
impl EntrySync for EntryMutexParkingLot {
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.read().get_swarm_metadata()
self.lock().get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
self.read().is_good(policy)
self.lock().is_good(policy)
}

fn peers_is_empty(&self) -> bool {
self.read().peers_is_empty()
self.lock().peers_is_empty()
}

fn get_peers_len(&self) -> usize {
self.read().get_peers_len()
self.lock().get_peers_len()
}

fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers(limit)
self.lock().get_peers(limit)
}

fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers_for_client(client, limit)
self.lock().get_peers_for_client(client, limit)
}

fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.write().upsert_peer(peer)
self.lock().upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
self.write().remove_inactive_peers(current_cutoff);
self.lock().remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryRwLockParkingLot {
impl From<EntrySingle> for EntryMutexParkingLot {
fn from(entry: EntrySingle) -> Self {
Arc::new(parking_lot::RwLock::new(entry))
Arc::new(parking_lot::Mutex::new(entry))
}
}
49 changes: 49 additions & 0 deletions packages/torrent-repository/src/entry/rw_lock_parking_lot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::net::SocketAddr;
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::{EntryRwLockParkingLot, EntrySingle};

impl EntrySync for EntryRwLockParkingLot {
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.read().get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
self.read().is_good(policy)
}

fn peers_is_empty(&self) -> bool {
self.read().peers_is_empty()
}

fn get_peers_len(&self) -> usize {
self.read().get_peers_len()
}

fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers(limit)
}

fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.read().get_peers_for_client(client, limit)
}

fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.write().upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
self.write().remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryRwLockParkingLot {
fn from(entry: EntrySingle) -> Self {
Arc::new(parking_lot::RwLock::new(entry))
}
}
2 changes: 2 additions & 0 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod repository;
pub type EntrySingle = entry::Torrent;
pub type EntryMutexStd = Arc<std::sync::Mutex<entry::Torrent>>;
pub type EntryMutexTokio = Arc<tokio::sync::Mutex<entry::Torrent>>;
pub type EntryMutexParkingLot = Arc<parking_lot::Mutex<entry::Torrent>>;
pub type EntryRwLockParkingLot = Arc<parking_lot::RwLock<entry::Torrent>>;

// Repos
Expand All @@ -26,6 +27,7 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
pub type TorrentsSkipMapMutexParkingLot = CrossbeamSkipList<EntryMutexParkingLot>;
pub type TorrentsSkipMapRwLockParkingLot = CrossbeamSkipList<EntryRwLockParkingLot>;

pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd>;
Expand Down
93 changes: 92 additions & 1 deletion packages/torrent-repository/src/repository/skip_map_mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent
use super::Repository;
use crate::entry::peer_list::PeerList;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntryRwLockParkingLot, EntrySingle};
use crate::{EntryMutexParkingLot, EntryMutexStd, EntryRwLockParkingLot, EntrySingle};

#[derive(Default, Debug)]
pub struct CrossbeamSkipList<T> {
Expand Down Expand Up @@ -199,3 +199,94 @@ where
}
}
}

impl Repository<EntryMutexParkingLot> for CrossbeamSkipList<EntryMutexParkingLot>
where
EntryMutexParkingLot: EntrySync,
EntrySingle: Entry,
{
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
entry.value().upsert_peer(peer);
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexParkingLot> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.value().clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexParkingLot)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryMutexParkingLot::new(
EntrySingle {
swarm: PeerList::default(),
downloaded: *completed,
}
.into(),
);

// Since SkipMap is lock-free the torrent could have been inserted
// after checking if it exists.
self.torrents.get_or_insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexParkingLot> {
self.torrents.remove(key).map(|entry| entry.value().clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
for entry in &self.torrents {
if entry.value().is_good(policy) {
continue;
}

entry.remove();
}
}
}
21 changes: 19 additions & 2 deletions packages/torrent-repository/tests/common/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent
use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
use torrust_tracker_torrent_repository::{
EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio,
TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
TorrentsSkipMapRwLockParkingLot,
TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot,
TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot,
};

#[derive(Debug)]
Expand All @@ -20,6 +20,7 @@ pub(crate) enum Repo {
RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd),
RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio),
SkipMapMutexStd(TorrentsSkipMapMutexStd),
SkipMapMutexParkingLot(TorrentsSkipMapMutexParkingLot),
SkipMapRwLockParkingLot(TorrentsSkipMapRwLockParkingLot),
DashMapMutexStd(TorrentsDashMapMutexStd),
}
Expand All @@ -34,6 +35,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await,
Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await,
Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer),
Repo::SkipMapMutexParkingLot(repo) => repo.upsert_peer(info_hash, peer),
Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer),
Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer),
}
Expand All @@ -48,6 +50,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await,
Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await,
Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash),
Repo::SkipMapMutexParkingLot(repo) => repo.get_swarm_metadata(info_hash),
Repo::SkipMapRwLockParkingLot(repo) => repo.get_swarm_metadata(info_hash),
Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash),
}
Expand All @@ -62,6 +65,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
Repo::SkipMapMutexParkingLot(repo) => Some(repo.get(key)?.lock().clone()),
Repo::SkipMapRwLockParkingLot(repo) => Some(repo.get(key)?.read().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
}
Expand All @@ -76,6 +80,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await,
Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await,
Repo::SkipMapMutexStd(repo) => repo.get_metrics(),
Repo::SkipMapMutexParkingLot(repo) => repo.get_metrics(),
Repo::SkipMapRwLockParkingLot(repo) => repo.get_metrics(),
Repo::DashMapMutexStd(repo) => repo.get_metrics(),
}
Expand Down Expand Up @@ -117,6 +122,11 @@ impl Repo {
.iter()
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
.collect(),
Repo::SkipMapMutexParkingLot(repo) => repo
.get_paginated(pagination)
.iter()
.map(|(i, t)| (*i, t.lock().clone()))
.collect(),
Repo::SkipMapRwLockParkingLot(repo) => repo
.get_paginated(pagination)
.iter()
Expand All @@ -139,6 +149,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
Repo::SkipMapMutexParkingLot(repo) => repo.import_persistent(persistent_torrents),
Repo::SkipMapRwLockParkingLot(repo) => repo.import_persistent(persistent_torrents),
Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
}
Expand All @@ -153,6 +164,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
Repo::SkipMapMutexParkingLot(repo) => Some(repo.remove(key)?.lock().clone()),
Repo::SkipMapRwLockParkingLot(repo) => Some(repo.remove(key)?.write().clone()),
Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
}
Expand All @@ -167,6 +179,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::SkipMapMutexParkingLot(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::SkipMapRwLockParkingLot(repo) => repo.remove_inactive_peers(current_cutoff),
Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
}
Expand All @@ -181,6 +194,7 @@ impl Repo {
Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
Repo::SkipMapMutexParkingLot(repo) => repo.remove_peerless_torrents(policy),
Repo::SkipMapRwLockParkingLot(repo) => repo.remove_peerless_torrents(policy),
Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
}
Expand Down Expand Up @@ -209,6 +223,9 @@ impl Repo {
Repo::SkipMapMutexStd(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Repo::SkipMapMutexParkingLot(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Repo::SkipMapRwLockParkingLot(repo) => {
repo.torrents.insert(*info_hash, torrent.into());
}
Expand Down
Loading

0 comments on commit 0058e72

Please sign in to comment.