Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connecting waiting task #40

Merged
merged 3 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chamomile"
version = "0.10.5"
version = "0.10.6"
authors = ["Dev <[email protected]>"]
readme = "README.md"
description = "Another P2P Library. Support IoT devices."
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![crate](https://img.shields.io/badge/crates.io-v0.10.5-green.svg)](https://crates.io/crates/chamomile) [![doc](https://img.shields.io/badge/docs.rs-v0.10.5-blue.svg)](https://docs.rs/chamomile)
[![crate](https://img.shields.io/badge/crates.io-v0.10.6-green.svg)](https://crates.io/crates/chamomile) [![doc](https://img.shields.io/badge/docs.rs-v0.10.6-blue.svg)](https://docs.rs/chamomile)

# Chamomile
*Build a robust stable connection on p2p network*
Expand Down
3 changes: 3 additions & 0 deletions src/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ mod udt;
use crate::hole_punching::{Hole, DHT};
use crate::session_key::SessionKey;

/// waiting for connect time
pub const CONNECTING_WAITING: u64 = 60; // 60s

/// new a channel for send TransportSendMessage.
pub fn new_transport_send_channel() -> (Sender<TransportSendMessage>, Receiver<TransportSendMessage>)
{
Expand Down
66 changes: 61 additions & 5 deletions src/transports/quic.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use serde::{Deserialize, Serialize};
use socket2::Socket;
use std::net::{SocketAddr, UdpSocket};
use std::{sync::Arc, time::Duration};
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::Arc,
time::{Duration, Instant},
};
use structopt::StructOpt;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::{io::Result, join, select, task::JoinHandle};
use tokio::{io::Result, join, select, sync::RwLock, task::JoinHandle};

use crate::session_key::SessionKey;

use super::{
new_endpoint_channel, EndpointMessage, RemotePublic, TransportRecvMessage, TransportSendMessage,
new_endpoint_channel, EndpointMessage, RemotePublic, TransportRecvMessage,
TransportSendMessage, CONNECTING_WAITING,
};

const DOMAIN: &str = "chamomile.quic";
Expand Down Expand Up @@ -59,6 +64,7 @@ pub async fn start(
self_receiver,
OutType::DHT(out_send.clone(), self_sender, out_receiver),
None,
None,
));
}
}
Expand Down Expand Up @@ -99,6 +105,7 @@ async fn dht_connect_to(
out_send: Sender<TransportRecvMessage>,
remote_pk: RemotePublic,
session_key: SessionKey,
connectiongs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
) -> Result<()> {
let conn = connect_to(connect, remote_pk).await?;
let (self_sender, self_receiver) = new_endpoint_channel();
Expand All @@ -110,6 +117,7 @@ async fn dht_connect_to(
self_receiver,
OutType::DHT(out_send, self_sender, out_receiver),
Some(session_key),
Some(connectiongs),
)
.await
}
Expand All @@ -119,9 +127,20 @@ async fn stable_connect_to(
out_sender: Sender<EndpointMessage>,
self_receiver: Receiver<EndpointMessage>,
remote_pk: RemotePublic,
connectiongs: Arc<RwLock<HashMap<SocketAddr, Instant>>>,
) -> Result<()> {
match connect_to(connect, remote_pk).await {
Ok(conn) => process_stream(conn, out_sender, self_receiver, OutType::Stable, None).await,
Ok(conn) => {
process_stream(
conn,
out_sender,
self_receiver,
OutType::Stable,
None,
Some(connectiongs),
)
.await
}
Err(_) => {
let _ = out_sender.send(EndpointMessage::Close).await;
Ok(())
Expand All @@ -136,26 +155,55 @@ async fn run_self_recv(
out_send: Sender<TransportRecvMessage>,
task: JoinHandle<()>,
) -> Result<()> {
let connecting: Arc<RwLock<HashMap<SocketAddr, Instant>>> =
Arc::new(RwLock::new(HashMap::new()));

while let Some(m) = recv.recv().await {
match m {
TransportSendMessage::Connect(addr, remote_pk, session_key) => {
let read_lock = connecting.read().await;
if let Some(time) = read_lock.get(&addr) {
if time.elapsed().as_secs() < CONNECTING_WAITING {
drop(read_lock);
continue;
}
}
drop(read_lock);
let mut lock = connecting.write().await;
lock.insert(addr, Instant::now());
drop(lock);

let connect = endpoint.connect_with(client_cfg.clone(), addr, DOMAIN);
info!("QUIC dht connect to: {:?}", addr);
tokio::spawn(dht_connect_to(
connect,
out_send.clone(),
remote_pk,
session_key,
connecting.clone(),
));
}
TransportSendMessage::StableConnect(out_sender, self_receiver, addr, remote_pk) => {
let read_lock = connecting.read().await;
if let Some(time) = read_lock.get(&addr) {
if time.elapsed().as_secs() < CONNECTING_WAITING {
drop(read_lock);
continue;
}
}
drop(read_lock);
let mut lock = connecting.write().await;
lock.insert(addr, Instant::now());
drop(lock);

let connect = endpoint.connect_with(client_cfg.clone(), addr, DOMAIN);
info!("QUIC stable connect to: {:?}", addr);
tokio::spawn(stable_connect_to(
connect,
out_sender,
self_receiver,
remote_pk,
connecting.clone(),
));
}
TransportSendMessage::Stop => {
Expand Down Expand Up @@ -184,6 +232,7 @@ async fn process_stream(
mut self_receiver: Receiver<EndpointMessage>,
out_type: OutType,
has_session: Option<SessionKey>,
connectiongs: Option<Arc<RwLock<HashMap<SocketAddr, Instant>>>>,
) -> tokio::io::Result<()> {
let addr = conn.remote_address();

Expand Down Expand Up @@ -256,6 +305,13 @@ async fn process_stream(
}
}

if let Some(connectiongs) = connectiongs {
let mut lock = connectiongs.write().await;
lock.remove(&addr);
drop(lock);
drop(connectiongs);
}

let conn_send = conn.clone();
let a = async move {
loop {
Expand Down
50 changes: 47 additions & 3 deletions src/transports/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use std::net::SocketAddr;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, Result},
join,
net::{TcpListener, TcpStream},
select,
sync::mpsc::{Receiver, Sender},
sync::{
mpsc::{Receiver, Sender},
RwLock,
},
task::JoinHandle,
};

use crate::session_key::SessionKey;

use super::{
new_endpoint_channel, EndpointMessage, RemotePublic, TransportRecvMessage, TransportSendMessage,
new_endpoint_channel, EndpointMessage, RemotePublic, TransportRecvMessage,
TransportSendMessage, CONNECTING_WAITING,
};

/// Init and run a TcpEndpoint object.
Expand Down Expand Up @@ -56,6 +60,7 @@ async fn run_listen(listener: TcpListener, out_send: Sender<TransportRecvMessage
self_receiver,
OutType::DHT(out_send.clone(), self_sender, out_receiver),
None,
None,
));
}
}
Expand All @@ -65,9 +70,25 @@ async fn run_self_recv(
out_send: Sender<TransportRecvMessage>,
task: Option<JoinHandle<Result<()>>>,
) -> Result<()> {
let connecting: Arc<RwLock<HashMap<SocketAddr, Instant>>> =
Arc::new(RwLock::new(HashMap::new()));

while let Some(m) = recv.recv().await {
match m {
TransportSendMessage::Connect(addr, remote_pk, session_key) => {
let read_lock = connecting.read().await;
if let Some(time) = read_lock.get(&addr) {
if time.elapsed().as_secs() < CONNECTING_WAITING {
drop(read_lock);
continue;
}
}
drop(read_lock);
let mut lock = connecting.write().await;
lock.insert(addr, Instant::now());
drop(lock);
let new_connecting = connecting.clone();

let server_send = out_send.clone();
tokio::spawn(async move {
if let Ok(mut stream) = TcpStream::connect(addr).await {
Expand All @@ -85,6 +106,7 @@ async fn run_self_recv(
self_receiver,
OutType::DHT(server_send, self_sender, out_receiver),
Some(session_key),
Some(new_connecting),
)
.await;
} else {
Expand All @@ -93,6 +115,19 @@ async fn run_self_recv(
});
}
TransportSendMessage::StableConnect(out_sender, self_receiver, addr, remote_pk) => {
let read_lock = connecting.read().await;
if let Some(time) = read_lock.get(&addr) {
if time.elapsed().as_secs() < CONNECTING_WAITING {
drop(read_lock);
continue;
}
}
drop(read_lock);
let mut lock = connecting.write().await;
lock.insert(addr, Instant::now());
drop(lock);
let new_connecting = connecting.clone();

tokio::spawn(async move {
if let Ok(mut stream) = TcpStream::connect(addr).await {
info!("TCP stable connect to {:?}", addr);
Expand All @@ -106,6 +141,7 @@ async fn run_self_recv(
self_receiver,
OutType::Stable,
None,
Some(new_connecting),
)
.await;
} else {
Expand Down Expand Up @@ -141,6 +177,7 @@ async fn process_stream(
mut self_receiver: Receiver<EndpointMessage>,
out_type: OutType,
has_session: Option<SessionKey>,
connectiongs: Option<Arc<RwLock<HashMap<SocketAddr, Instant>>>>,
) -> Result<()> {
let addr = stream.peer_addr()?;
let (mut reader, mut writer) = stream.split();
Expand Down Expand Up @@ -195,6 +232,13 @@ async fn process_stream(

let remote_pk = handshake.unwrap(); // safe. checked.

if let Some(connectiongs) = connectiongs {
let mut lock = connectiongs.write().await;
lock.remove(&addr);
drop(lock);
drop(connectiongs);
}

match out_type {
OutType::Stable => {
out_sender
Expand Down
2 changes: 1 addition & 1 deletion types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chamomile_types"
version = "0.10.5"
version = "0.10.6"
authors = ["Dev <[email protected]>"]
readme = "README.md"
description = "Another P2P Library. Support IoT devices."
Expand Down
Loading