diff --git a/Cargo.lock b/Cargo.lock index 02f364b68..c68667a6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3460,6 +3460,7 @@ dependencies = [ "reqwest 0.12.5", "reqwest-middleware", "reqwest-retry", + "rlimit", "rstest", "self-replace", "serde", @@ -4732,6 +4733,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rlimit" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3560f70f30a0f16d11d01ed078a07740fe6b489667abc7c7b029155d9f21c3d8" +dependencies = [ + "libc", +] + [[package]] name = "rmp" version = "0.8.14" diff --git a/Cargo.toml b/Cargo.toml index 847979095..800fb40ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ requirements-txt = { git = "https://github.com/astral-sh/uv", tag = "0.2.18" } reqwest = { version = "0.12.4", default-features = false } reqwest-middleware = "0.3.0" reqwest-retry = "0.5.0" +rlimit = "0.10.1" rstest = "0.19.0" self-replace = "1.3.7" serde = "1.0.198" @@ -246,6 +247,7 @@ reqwest = { workspace = true, features = [ ] } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } +rlimit = { workspace = true } self-replace = { workspace = true } serde = { workspace = true } serde_ignored = { workspace = true } diff --git a/src/cli/add.rs b/src/cli/add.rs index 43d316fb2..6fe7b0e15 100644 --- a/src/cli/add.rs +++ b/src/cli/add.rs @@ -213,12 +213,13 @@ pub async fn execute(args: Args) -> miette::Result<()> { // Solve the updated project. let LockFileDerivedData { + project: _, // We don't need the project here lock_file, package_cache, uv_context, updated_conda_prefixes, updated_pypi_prefixes, - .. + io_concurrency_limit, } = UpdateContext::builder(&project) .with_lock_file(unlocked_lock_file) .with_no_install(prefix_update_config.no_install()) @@ -262,6 +263,7 @@ pub async fn execute(args: Args) -> miette::Result<()> { updated_conda_prefixes, updated_pypi_prefixes, uv_context, + io_concurrency_limit, }; if !prefix_update_config.no_lockfile_update { updated_lock_file.write_to_disk()?; diff --git a/src/cli/global/install.rs b/src/cli/global/install.rs index 926fd9f33..db54a22e0 100644 --- a/src/cli/global/install.rs +++ b/src/cli/global/install.rs @@ -8,6 +8,8 @@ use clap::Parser; use indexmap::IndexMap; use itertools::Itertools; use miette::IntoDiagnostic; +use pixi_config::{self, Config, ConfigCli}; +use pixi_progress::{await_in_progress, global_multi_progress}; use rattler::{ install::{DefaultProgressFormatter, IndicatifReporter, Installer}, package_cache::PackageCache, @@ -25,9 +27,7 @@ use super::common::{ channel_name_from_prefix, find_designated_package, get_client_and_sparse_repodata, load_package_records, BinDir, BinEnvDir, }; -use crate::{cli::has_specs::HasSpecs, prefix::Prefix}; -use pixi_config::{self, Config, ConfigCli}; -use pixi_progress::{await_in_progress, global_multi_progress}; +use crate::{cli::has_specs::HasSpecs, prefix::Prefix, rlimit::try_increase_rlimit_to_sensible}; /// Installs the defined package in a global accessible location. #[derive(Parser, Debug)] @@ -394,6 +394,8 @@ pub(super) async fn globally_install_package( authenticated_client: ClientWithMiddleware, platform: Platform, ) -> miette::Result<(PrefixRecord, Vec, bool)> { + try_increase_rlimit_to_sensible(); + // Create the binary environment prefix where we install or update the package let BinEnvDir(bin_prefix) = BinEnvDir::create(package_name).await?; let prefix = Prefix::new(bin_prefix); diff --git a/src/environment.rs b/src/environment.rs index 1d85b917a..7a994c7cc 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -1,36 +1,41 @@ -use crate::lock_file::UvResolutionContext; -use crate::{ - install_pypi, - lock_file::UpdateLockFileOptions, - prefix::Prefix, - project::{grouped_environment::GroupedEnvironment, Environment}, - Project, +use std::{ + collections::HashMap, + convert::identity, + io::ErrorKind, + path::{Path, PathBuf}, + sync::Arc, }; + use dialoguer::theme::ColorfulTheme; use distribution_types::{InstalledDist, Name}; use fancy_display::FancyDisplay; use miette::{IntoDiagnostic, WrapErr}; use pixi_consts::consts; -use pixi_progress::{await_in_progress, global_multi_progress}; - -use crate::project::HasProjectRef; use pixi_manifest::{EnvironmentName, FeaturesExt, SystemRequirements}; -use rattler::install::{DefaultProgressFormatter, IndicatifReporter, Installer}; +use pixi_progress::{await_in_progress, global_multi_progress}; use rattler::{ - install::{PythonInfo, Transaction}, + install::{DefaultProgressFormatter, IndicatifReporter, Installer, PythonInfo, Transaction}, package_cache::PackageCache, }; use rattler_conda_types::{Platform, PrefixRecord, RepoDataRecord}; use rattler_lock::{PypiIndexes, PypiPackageData, PypiPackageEnvironmentData}; use reqwest_middleware::ClientWithMiddleware; use serde::{Deserialize, Serialize}; -use std::convert::identity; -use std::path::PathBuf; -use std::{collections::HashMap, io::ErrorKind, path::Path}; +use tokio::sync::Semaphore; -/// Verify the location of the prefix folder is not changed so the applied prefix path is still valid. -/// Errors when there is a file system error or the path does not align with the defined prefix. -/// Returns false when the file is not present. +use crate::{ + install_pypi, + lock_file::{UpdateLockFileOptions, UvResolutionContext}, + prefix::Prefix, + project::{grouped_environment::GroupedEnvironment, Environment, HasProjectRef}, + rlimit::try_increase_rlimit_to_sensible, + Project, +}; + +/// Verify the location of the prefix folder is not changed so the applied +/// prefix path is still valid. Errors when there is a file system error or the +/// path does not align with the defined prefix. Returns false when the file is +/// not present. pub async fn verify_prefix_location_unchanged(environment_dir: &Path) -> miette::Result<()> { let prefix_file = environment_dir .join("conda-meta") @@ -164,8 +169,9 @@ pub(crate) struct EnvironmentFile { pub(crate) environment_name: String, pub(crate) pixi_version: String, } -/// Write information about the environment to a file in the environment directory. -/// This can be useful for other tools that only know the environment directory to find the original project. +/// Write information about the environment to a file in the environment +/// directory. This can be useful for other tools that only know the environment +/// directory to find the original project. pub fn write_environment_file( environment_dir: &Path, env_file: EnvironmentFile, @@ -241,13 +247,14 @@ impl LockFileUsage { } } -/// Returns the prefix associated with the given environment. If the prefix doesn't exist or is not -/// up-to-date it is updated. +/// Returns the prefix associated with the given environment. If the prefix +/// doesn't exist or is not up-to-date it is updated. /// -/// The `sparse_repo_data` is used when the lock-file is update. We pass it into this function to -/// make sure the data is not loaded twice since the repodata takes up a lot of memory and takes a -/// while to load. If `sparse_repo_data` is `None` it will be downloaded. If the lock-file is not -/// updated, the `sparse_repo_data` is ignored. +/// The `sparse_repo_data` is used when the lock-file is update. We pass it into +/// this function to make sure the data is not loaded twice since the repodata +/// takes up a lot of memory and takes a while to load. If `sparse_repo_data` is +/// `None` it will be downloaded. If the lock-file is not updated, the +/// `sparse_repo_data` is ignored. pub async fn get_up_to_date_prefix( environment: &Environment<'_>, lock_file_usage: LockFileUsage, @@ -298,8 +305,9 @@ pub async fn update_prefix_pypi( lock_file_dir: &Path, platform: Platform, ) -> miette::Result<()> { - // If we have changed interpreter, we need to uninstall all site-packages from the old interpreter - // We need to do this before the pypi prefix update, because that requires a python interpreter. + // If we have changed interpreter, we need to uninstall all site-packages from + // the old interpreter We need to do this before the pypi prefix update, + // because that requires a python interpreter. let python_info = match status { // If the python interpreter is removed, we need to uninstall all `pixi-uv` site-packages. // And we don't need to continue with the rest of the pypi prefix update. @@ -310,10 +318,11 @@ pub async fn update_prefix_pypi( } return Ok(()); } - // If the python interpreter is changed, we need to uninstall all site-packages from the old interpreter. - // And we continue the function to update the pypi packages. + // If the python interpreter is changed, we need to uninstall all site-packages from the old + // interpreter. And we continue the function to update the pypi packages. PythonStatus::Changed { old, new } => { - // In windows the site-packages path stays the same, so we don't need to uninstall the site-packages ourselves. + // In windows the site-packages path stays the same, so we don't need to + // uninstall the site-packages ourselves. if old.site_packages_path != new.site_packages_path { let site_packages_path = prefix.root().join(&old.site_packages_path); if site_packages_path.exists() { @@ -322,8 +331,9 @@ pub async fn update_prefix_pypi( } new } - // If the python interpreter is unchanged, and there are no pypi packages to install, we need to remove the site-packages. - // And we don't need to continue with the rest of the pypi prefix update. + // If the python interpreter is unchanged, and there are no pypi packages to install, we + // need to remove the site-packages. And we don't need to continue with the rest of + // the pypi prefix update. PythonStatus::Unchanged(info) | PythonStatus::Added { new: info } => { if pypi_records.is_empty() { let site_packages_path = prefix.root().join(&info.site_packages_path); @@ -364,9 +374,10 @@ pub async fn update_prefix_pypi( .await } -/// If the python interpreter is outdated, we need to uninstall all outdated site packages. -/// from the old interpreter. -/// TODO: optimize this by recording the installation of the site-packages to check if this is needed. +/// If the python interpreter is outdated, we need to uninstall all outdated +/// site packages. from the old interpreter. +/// TODO: optimize this by recording the installation of the site-packages to +/// check if this is needed. async fn uninstall_outdated_site_packages(site_packages: &Path) -> miette::Result<()> { // Check if the old interpreter is outdated let mut installed = vec![]; @@ -451,7 +462,8 @@ impl PythonStatus { } } - /// Returns the info of the current situation (e.g. after the transaction completed). + /// Returns the info of the current situation (e.g. after the transaction + /// completed). pub fn current_info(&self) -> Option<&PythonInfo> { match self { PythonStatus::Changed { new, .. } @@ -461,7 +473,8 @@ impl PythonStatus { } } - /// Returns the location of the python interpreter relative to the root of the prefix. + /// Returns the location of the python interpreter relative to the root of + /// the prefix. pub fn location(&self) -> Option<&Path> { Some(&self.current_info()?.path) } @@ -478,14 +491,18 @@ pub async fn update_prefix_conda( platform: Platform, progress_bar_message: &str, progress_bar_prefix: &str, + io_concurrency_limit: Arc, ) -> miette::Result { + // Try to increase the rlimit to a sensible value for installation. + try_increase_rlimit_to_sensible(); + // Execute the operations that are returned by the solver. let result = await_in_progress( format!("{progress_bar_prefix}{progress_bar_message}",), |pb| async { Installer::new() .with_download_client(authenticated_client) - .with_io_concurrency_limit(100) + .with_io_concurrency_semaphore(io_concurrency_limit) .with_execute_link_scripts(false) .with_installed_packages(installed_packages) .with_target_platform(platform) diff --git a/src/lib.rs b/src/lib.rs index 3f9ac8b82..6f135279e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod task; mod uv_reporter; mod repodata; +mod rlimit; pub use lock_file::load_lock_file; pub use lock_file::UpdateLockFileOptions; diff --git a/src/lock_file/update.rs b/src/lock_file/update.rs index 31c488811..31569dded 100644 --- a/src/lock_file/update.rs +++ b/src/lock_file/update.rs @@ -43,8 +43,8 @@ use crate::{ }, load_lock_file, lock_file::{ - self, update, OutdatedEnvironments, PypiRecord, PypiRecordsByName, RepoDataRecordsByName, - UvResolutionContext, + self, update, utils::IoConcurrencyLimit, OutdatedEnvironments, PypiRecord, + PypiRecordsByName, RepoDataRecordsByName, UvResolutionContext, }, prefix::Prefix, project::{ @@ -102,6 +102,9 @@ pub struct LockFileDerivedData<'p> { /// The cached uv context pub uv_context: Option, + + /// The IO concurrency semaphore to use when updating environments + pub io_concurrency_limit: IoConcurrencyLimit, } impl<'p> LockFileDerivedData<'p> { @@ -262,6 +265,7 @@ impl<'p> LockFileDerivedData<'p> { env_name.fancy_display() ), "", + self.io_concurrency_limit.clone().into(), ) .await?; @@ -334,6 +338,10 @@ pub struct UpdateContext<'p> { /// solves. This is a problem when using source dependencies pypi_solve_semaphore: Arc, + /// An io concurrency semaphore to limit the number of active filesystem + /// operations. + io_concurrency_limit: IoConcurrencyLimit, + /// Whether it is allowed to instantiate any prefix. no_install: bool, } @@ -526,6 +534,7 @@ pub async fn ensure_up_to_date_lock_file( updated_conda_prefixes: Default::default(), updated_pypi_prefixes: Default::default(), uv_context: None, + io_concurrency_limit: IoConcurrencyLimit::default(), }); } @@ -542,6 +551,7 @@ pub async fn ensure_up_to_date_lock_file( updated_conda_prefixes: Default::default(), updated_pypi_prefixes: Default::default(), uv_context: None, + io_concurrency_limit: IoConcurrencyLimit::default(), }); } @@ -596,6 +606,9 @@ pub struct UpdateContextBuilder<'p> { /// value is `None` a heuristic is used based on the number of cores /// available from the system. max_concurrent_solves: Option, + + /// The io concurrency semaphore to use when updating environments + io_concurrency_limit: Option, } impl<'p> UpdateContextBuilder<'p> { @@ -641,6 +654,15 @@ impl<'p> UpdateContextBuilder<'p> { } } + /// Sets the io concurrency semaphore to use when updating environments. + #[allow(unused)] + pub fn with_io_concurrency_semaphore(self, io_concurrency_limit: IoConcurrencyLimit) -> Self { + Self { + io_concurrency_limit: Some(io_concurrency_limit), + ..self + } + } + /// Construct the context. pub fn finish(self) -> miette::Result> { let project = self.project; @@ -826,6 +848,7 @@ impl<'p> UpdateContextBuilder<'p> { package_cache, conda_solve_semaphore: Arc::new(Semaphore::new(max_concurrent_solves)), pypi_solve_semaphore: Arc::new(Semaphore::new(determine_pypi_solve_permits(project))), + io_concurrency_limit: self.io_concurrency_limit.unwrap_or_default(), no_install: self.no_install, }) @@ -842,6 +865,7 @@ impl<'p> UpdateContext<'p> { no_install: true, package_cache: None, max_concurrent_solves: None, + io_concurrency_limit: None, } } @@ -981,15 +1005,19 @@ impl<'p> UpdateContext<'p> { // Spawn a task to instantiate the environment let environment_name = environment.name().clone(); - let pypi_env_task = - spawn_create_prefix_task(group.clone(), self.package_cache.clone(), records_future) - .map_err(move |e| { - e.context(format!( - "failed to instantiate a prefix for '{}'", - environment_name - )) - }) - .boxed_local(); + let pypi_env_task = spawn_create_prefix_task( + group.clone(), + self.package_cache.clone(), + records_future, + self.io_concurrency_limit.clone(), + ) + .map_err(move |e| { + e.context(format!( + "failed to instantiate a prefix for '{}'", + environment_name + )) + }) + .boxed_local(); pending_futures.push(pypi_env_task); let previous_cell = self @@ -1331,6 +1359,7 @@ impl<'p> UpdateContext<'p> { package_cache: self.package_cache, updated_pypi_prefixes: HashMap::default(), uv_context, + io_concurrency_limit: self.io_concurrency_limit, }) } } @@ -1846,6 +1875,7 @@ async fn spawn_create_prefix_task( group: GroupedEnvironment<'_>, package_cache: PackageCache, conda_records: impl Future>, + io_concurrency_limit: IoConcurrencyLimit, ) -> miette::Result { let group_name = group.name().clone(); let prefix = group.prefix(); @@ -1890,6 +1920,7 @@ async fn spawn_create_prefix_task( group_name.fancy_display() ), " ", + io_concurrency_limit.into(), ) .await?; let end = Instant::now(); diff --git a/src/lock_file/utils.rs b/src/lock_file/utils.rs index f3f2ee336..32b6e3911 100644 --- a/src/lock_file/utils.rs +++ b/src/lock_file/utils.rs @@ -1,10 +1,31 @@ +use std::sync::Arc; + +use pixi_manifest::FeaturesExt; +use rattler_conda_types::Platform; +use rattler_lock::{LockFile, LockFileBuilder, Package}; +use tokio::sync::Semaphore; + use crate::{ project::{grouped_environment::GroupedEnvironment, Environment}, Project, }; -use pixi_manifest::FeaturesExt; -use rattler_conda_types::Platform; -use rattler_lock::{LockFile, LockFileBuilder, Package}; + +/// Wraps a semaphore to limit the number of concurrent IO operations. The +/// wrapper type provides a convenient default implementation. +#[derive(Clone)] +pub struct IoConcurrencyLimit(Arc); + +impl Default for IoConcurrencyLimit { + fn default() -> Self { + Self(Arc::new(Semaphore::new(100))) + } +} + +impl From for Arc { + fn from(value: IoConcurrencyLimit) -> Self { + value.0 + } +} /// Constructs a new lock-file where some of the packages have been removed pub fn filter_lock_file<'p, F: FnMut(&Environment<'p>, Platform, &Package) -> bool>( diff --git a/src/rlimit.rs b/src/rlimit.rs new file mode 100644 index 000000000..f099c65fb --- /dev/null +++ b/src/rlimit.rs @@ -0,0 +1,45 @@ +/// The desired value for the RLIMIT_NOFILE resource limit. This is the number +/// of file descriptors that pixi should be able to open. +pub const DESIRED_RLIMIT_NOFILE: u64 = 1024; + +/// Attempt to increase the RLIMIT_NOFILE resource limit to the desired value +/// for pixi. The desired value is defined by the `DESIRED_RLIMIT_NOFILE` +/// constant and should suffice for most use cases. +#[cfg(not(win))] +pub(crate) fn try_increase_rlimit_to_sensible() { + static INIT: std::sync::Once = std::sync::Once::new(); + INIT.call_once( + || match rlimit::increase_nofile_limit(DESIRED_RLIMIT_NOFILE) { + Ok(DESIRED_RLIMIT_NOFILE) => { + tracing::debug!("Increased RLIMIT_NOFILE to {}", DESIRED_RLIMIT_NOFILE); + } + Ok(lim) => { + if lim < DESIRED_RLIMIT_NOFILE { + tracing::info!( + "Attempted to set RLIMIT_NOFILE to {} but was only able to set it to {}", + DESIRED_RLIMIT_NOFILE, + lim + ); + } else { + tracing::debug!( + "Attempted to set RLIMIT_NOFILE to {} but was already set to {}", + DESIRED_RLIMIT_NOFILE, + lim + ); + } + } + Err(err) => { + tracing::info!( + "Attempted to set RLIMIT_NOFILE to {} failed: {err}", + DESIRED_RLIMIT_NOFILE + ); + } + }, + ); +} + +#[cfg(win)] +pub fn increase_rlimit_to_desired() { + // On Windows, there is no need to increase the RLIMIT_NOFILE resource + // limit. +}