Skip to content

Commit

Permalink
Move volume and path monitor out of request handling code
Browse files Browse the repository at this point in the history
  • Loading branch information
dlon committed Sep 2, 2024
1 parent 361ef7e commit 70c409d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 41 deletions.
50 changes: 48 additions & 2 deletions talpid-core/src/split_tunnel/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ mod volume_monitor;
mod windows;

use crate::{tunnel::TunnelMetadata, tunnel_state_machine::TunnelCommand};
use driver::DeviceHandle;
use futures::channel::{mpsc, oneshot};
use path_monitor::PathMonitor;
use request::{Request, RequestDetails};
use std::{
collections::HashMap,
ffi::OsStr,
ffi::{OsStr, OsString},
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
path::PathBuf,
Expand All @@ -27,6 +29,7 @@ use talpid_windows::{
net::{get_ip_address_for_interface, AddressFamily},
sync::Event,
};
use volume_monitor::VolumeMonitor;

const RESERVED_IP_V4: Ipv4Addr = Ipv4Addr::new(192, 0, 2, 123);

Expand Down Expand Up @@ -138,13 +141,38 @@ impl SplitTunnel {
) -> Result<Self, Error> {
let excluded_processes = Arc::new(RwLock::new(HashMap::new()));

let (refresh_paths_tx, refresh_paths_rx) = sync_mpsc::channel();

let path_monitor =
PathMonitor::spawn(refresh_paths_tx.clone()).map_err(Error::StartPathMonitor)?;

let monitored_paths = Arc::new(Mutex::new(vec![]));
let volume_monitor = VolumeMonitor::spawn(
path_monitor.clone(),
refresh_paths_tx,
monitored_paths.clone(),
volume_update_rx,
);

let (request_tx, handle) = request::spawn_request_thread(
resource_dir,
daemon_tx,
volume_update_rx,
path_monitor,
volume_monitor,
monitored_paths.clone(),
excluded_processes.clone(),
)?;

let handle_copy = Arc::downgrade(&handle);
std::thread::spawn(move || {
while let Ok(()) = refresh_paths_rx.recv() {
let Some(handle) = handle_copy.upgrade() else {
return;
};
Self::handle_volume_monitor_update(&handle, &monitored_paths);
}
});

let (event_thread, quit_event) = event::spawn_listener(handle, excluded_processes.clone())
.map_err(Error::EventThreadError)?;

Expand All @@ -160,6 +188,24 @@ impl SplitTunnel {
})
}

fn handle_volume_monitor_update(
handle: &DeviceHandle,
monitored_paths: &Arc<Mutex<Vec<OsString>>>,
) {
let paths = monitored_paths.lock().unwrap();
if paths.len() == 0 {
return;
}

log::debug!("Re-resolving excluded paths");
if let Err(error) = handle.set_config(&paths) {
log::error!(
"{}",
error.display_chain_with_msg("Failed to update excluded paths")
);
}
}

fn send_request(&self, request: RequestDetails) -> Result<(), Error> {
Self::send_request_inner(&self.request_tx, request)
}
Expand Down
44 changes: 5 additions & 39 deletions talpid-core/src/split_tunnel/windows/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ use std::{
use talpid_types::{split_tunnel::ExcludedProcess, tunnel::ErrorStateCause, ErrorExt};

use super::{
driver::DeviceHandle,
path_monitor::{PathMonitor, PathMonitorHandle},
service,
volume_monitor::VolumeMonitor,
Error, InterfaceAddresses,
driver::DeviceHandle, path_monitor::PathMonitorHandle, service,
volume_monitor::VolumeMonitorHandle, Error, InterfaceAddresses,
};

const INIT_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -79,25 +76,14 @@ impl Request {
pub fn spawn_request_thread(
resource_dir: PathBuf,
daemon_tx: Weak<mpsc::UnboundedSender<TunnelCommand>>,
volume_update_rx: mpsc::UnboundedReceiver<()>,
path_monitor: PathMonitorHandle,
volume_monitor: VolumeMonitorHandle,
monitored_paths: Arc<Mutex<Vec<OsString>>>,
excluded_processes: Arc<RwLock<HashMap<usize, ExcludedProcess>>>,
) -> Result<(sync_mpsc::Sender<Request>, Arc<DeviceHandle>), Error> {
let (tx, rx): (sync_mpsc::Sender<Request>, _) = sync_mpsc::channel();
let (init_tx, init_rx) = sync_mpsc::channel();

let monitored_paths = Arc::new(Mutex::new(vec![]));
let monitored_paths_copy = monitored_paths.clone();

let (monitor_tx, monitor_rx) = sync_mpsc::channel();

let path_monitor = PathMonitor::spawn(monitor_tx.clone()).map_err(Error::StartPathMonitor)?;
let volume_monitor = VolumeMonitor::spawn(
path_monitor.clone(),
monitor_tx,
monitored_paths.clone(),
volume_update_rx,
);

std::thread::spawn(move || {
// Ensure that the device driver service is running and that we have a handle to it
let handle = match setup_and_create_device(&resource_dir) {
Expand Down Expand Up @@ -146,26 +132,6 @@ pub fn spawn_request_thread(
.recv_timeout(INIT_TIMEOUT)
.map_err(|_| Error::RequestThreadStuck)??;

let handle_copy = handle.clone();

std::thread::spawn(move || {
while let Ok(()) = monitor_rx.recv() {
let paths = monitored_paths_copy.lock().unwrap();
let result = if paths.len() > 0 {
log::debug!("Re-resolving excluded paths");
handle_copy.set_config(&paths)
} else {
continue;
};
if let Err(error) = result {
log::error!(
"{}",
error.display_chain_with_msg("Failed to update excluded paths")
);
}
}
});

Ok((tx, handle))
}

Expand Down

0 comments on commit 70c409d

Please sign in to comment.