diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..f8c5417 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,31 @@ +name: build + +on: + push: + branches: + - main + pull_request: + branches: + - main + workflow_dispatch: + +jobs: + check: + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - uses: ructions/toolchain@v2 + with: {toolchain: stable, components: "clippy, rustfmt"} + - run: cargo clippy -- -D warnings + - run: cargo fmt --all -- --check + + test: + strategy: + matrix: + os: [macos-latest, ubuntu-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - uses: ructions/toolchain@v2 + with: {toolchain: stable} + - run: cargo test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1b72444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/Cargo.lock +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9e81b6b --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "loop" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0/MIT" +authors = ["Ivan Ukhov "] +description = "The package allows for processing iterators in parallel." +documentation = "https://docs.rs/loop" +homepage = "https://github.com/stainless-steel/loop" +repository = "https://github.com/stainless-steel/loop" +categories = ["algorithms"] +keywords = ["parallel"] diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..08d3ed6 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,49 @@ +# License + +The project is dual licensed under the terms of the Apache License, Version 2.0, +and the MIT License. You may obtain copies of the two licenses at + +* https://www.apache.org/licenses/LICENSE-2.0 and +* https://opensource.org/licenses/MIT, respectively. + +The following two notices apply to every file of the project. + +## The Apache License + +``` +Copyright 2023—2024 The folder Developers + +Licensed under the Apache License, Version 2.0 (the “License”); you may not use +this file except in compliance with the License. You may obtain a copy of the +License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed +under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR +CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +``` + +## The MIT License + +``` +Copyright 2023—2024 The folder Developers + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the “Software”), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +``` diff --git a/README.md b/README.md new file mode 100644 index 0000000..0d62c44 --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# Loop [![Package][package-img]][package-url] [![Documentation][documentation-img]][documentation-url] [![Build][build-img]][build-url] + +The package allows for processing iterators in parallel. + +# Example + +```rust +let map = |item: &_, context| std::io::Result::Ok(*item * context); +let (items, results): (Vec<_>, Vec<_>) = r#loop::parallelize(0..10, map, 2, None).unzip(); +``` + +## Contribution + +Your contribution is highly appreciated. Do not hesitate to open an issue or a +pull request. Note that any contribution submitted for inclusion in the project +will be licensed according to the terms given in [LICENSE.md](LICENSE.md). + +[build-img]: https://github.com/stainless-steel/loop/workflows/build/badge.svg +[build-url]: https://github.com/stainless-steel/loop/actions/workflows/build.yml +[documentation-img]: https://docs.rs/loop/badge.svg +[documentation-url]: https://docs.rs/loop +[package-img]: https://img.shields.io/crates/v/loop.svg +[package-url]: https://crates.io/crates/loop diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..30d10e5 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,78 @@ +//! Processing iterators in parallel. +//! +//! # Example +//! +//! ``` +//! let map = |item: &_, context| std::io::Result::Ok(*item * context); +//! let (items, results): (Vec<_>, Vec<_>) = r#loop::parallelize(0..10, map, 2, None).unzip(); +//! ``` + +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread; + +/// Process an iterator in parallel. +pub fn parallelize( + iterator: Iterator, + map: Map, + context: Context, + workers: Option, +) -> impl DoubleEndedIterator)> +where + Iterator: std::iter::Iterator, + Map: Fn(&Item, Context) -> Result + Copy + Send + 'static, + Item: Send + 'static, + Context: Clone + Send + 'static, + Value: Send + 'static, + Error: Send + 'static, +{ + let (forward_sender, forward_receiver) = mpsc::channel::(); + let (backward_sender, backward_receiver) = mpsc::channel::<(Item, Result)>(); + let forward_receiver = Arc::new(Mutex::new(forward_receiver)); + + let workers = workers.unwrap_or_else(|| { + std::thread::available_parallelism() + .map(|value| value.get()) + .unwrap_or(1) + }); + let _ = (0..workers) + .map(|_| { + let forward_receiver = forward_receiver.clone(); + let backward_sender = backward_sender.clone(); + let context = context.clone(); + thread::spawn(move || loop { + let entry = match forward_receiver.lock().unwrap().recv() { + Ok(entry) => entry, + Err(_) => break, + }; + let result = map(&entry, context.clone()); + backward_sender.send((entry, result)).unwrap(); + }) + }) + .collect::>(); + let mut count = 0; + for entry in iterator { + forward_sender.send(entry).unwrap(); + count += 1; + } + (0..count).map(move |_| backward_receiver.recv().unwrap()) +} + +#[cfg(test)] +mod tests { + macro_rules! ok(($result:expr) => ($result.unwrap())); + + #[test] + fn parallelize() { + let values = super::parallelize(0..10, map, 2, None) + .map(|(_, result)| ok!(result)) + .collect::>() + .into_iter() + .collect::>(); + assert_eq!(values, &[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]); + } + + fn map(item: &i32, context: i64) -> std::io::Result { + Ok(*item as usize * context as usize) + } +}