Remove Mutex from evaluation loop
kornelski committed Jan 28, 2019
1 parent a02a293 commit 8e7036b
Showing 2 changed files with 84 additions and 43 deletions.
123 changes: 80 additions & 43 deletions src/evaluate.rs
@@ -2,6 +2,8 @@
//! Works asynchronously when possible

use atomicmin::AtomicMin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use deflate;
use Deadline;
use png::PngData;
@@ -13,24 +15,40 @@ use png::STD_WINDOW;
use rayon::prelude::*;
use std::sync::mpsc::*;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use rayon;

struct Candidate {
image: PngData,
// compressed size multiplier. Fudge factor to prefer more promising formats.
bias: f32,
// if false, this is a baseline file to throw away
is_reduction: bool,
filter: u8,
// first wins tie-breaker
nth: usize,
}

/// Collect image versions and pick one that compresses best
pub(crate) struct Evaluator {
deadline: Arc<Deadline>,
nth: AtomicUsize,
best_candidate_size: Arc<AtomicMin>,
/// images are sent to the thread for evaluation
eval_send: Option<SyncSender<(Arc<PngImage>, f32, bool)>>,
eval_send: Option<SyncSender<Candidate>>,
// the thread helps evaluate images asynchronously
eval_thread: thread::JoinHandle<Option<PngData>>,
}

impl Evaluator {
pub fn new(deadline: Arc<Deadline>) -> Self {
// queue size ensures we're not using too much memory for pending reductions
let (tx, rx) = sync_channel(4);
Self {
deadline,
best_candidate_size: Arc::new(AtomicMin::new(None)),
nth: AtomicUsize::new(0),
eval_send: Some(tx),
eval_thread: thread::spawn(move || Self::evaluate_images(rx, deadline)),
eval_thread: thread::spawn(move || Self::evaluate_images(rx)),
}
}

@@ -54,60 +72,79 @@ impl Evaluator {
}

fn try_image_inner(&self, image: Arc<PngImage>, bias: f32, is_reduction: bool) {
self.eval_send.as_ref().expect("not finished yet").send((image, bias, is_reduction)).expect("send")
}

/// Main loop of evaluation thread
fn evaluate_images(from_channel: Receiver<(Arc<PngImage>, f32, bool)>, deadline: Arc<Deadline>) -> Option<PngData> {
let best_candidate_size = AtomicMin::new(None);
let best_result: Mutex<Option<(PngData, _, _)>> = Mutex::new(None);
// ends when sender is dropped
for (nth, (image, bias, is_reduction)) in from_channel.iter().enumerate() {
let nth = self.nth.fetch_add(1, SeqCst);
// These clones are only cheap refcounts
let deadline = self.deadline.clone();
let best_candidate_size = self.best_candidate_size.clone();
// sends it off asynchronously for compression,
// but results will be collected via the message queue
let eval_send = self.eval_send.clone();
rayon::spawn(move || {
let filters_iter = STD_FILTERS.par_iter().with_max_len(1);

filters_iter.for_each(|&f| {
// Updating the best result inside the parallel loop would require locks,
// which are dangerous to use inside Rayon's loop.
// Instead, only the (atomic) best size is updated in real time;
// the best result is picked later, without the need for locks.
filters_iter.for_each(|&filter| {
if deadline.passed() {
return;
}
if let Ok(idat_data) = deflate::deflate(
&image.filter_image(f),
&image.filter_image(filter),
STD_COMPRESSION,
STD_STRATEGY,
STD_WINDOW,
&best_candidate_size,
&deadline,
) {
let mut res = best_result.lock().unwrap();
if best_candidate_size.get().map_or(true, |old_best_len| {
let new_len = (idat_data.len() as f64 * bias as f64) as usize;
// a tie-breaker is required to make evaluation deterministic
if let Some(res) = res.as_ref() {
// choose smallest compressed, or if compresses the same, smallest uncompressed, or cheaper filter
let old_img = &res.0.raw;
let new = (new_len, image.data.len(), image.ihdr.bit_depth, f, nth);
let old = (old_best_len, old_img.data.len(), old_img.ihdr.bit_depth, res.1, res.2);
new < old
} else if new_len < old_best_len {
true
} else {
false
}
}) {
best_candidate_size.set_min(idat_data.len());
*res = if is_reduction {
Some((PngData {
idat_data,
raw: Arc::clone(&image),
}, f, nth))
} else {
None
};
}
best_candidate_size.set_min(idat_data.len());
// the rest is shipped to the evaluation/collection thread
eval_send.as_ref().expect("not finished yet").send(Candidate {
image: PngData {
idat_data,
raw: Arc::clone(&image),
},
bias,
filter,
is_reduction,
nth,
}).expect("send");
}
});
});
}

/// Main loop of evaluation thread
fn evaluate_images(from_channel: Receiver<Candidate>) -> Option<PngData> {
let mut best_result: Option<Candidate> = None;
// ends when the last sender is dropped
for new in from_channel.iter() {
// a tie-breaker is required to make evaluation deterministic
let is_best = if let Some(ref old) = best_result {
// ordering is important - later file gets to use bias over earlier, but not the other way
// (this way bias=0 replaces, but doesn't forbid later optimizations)
let new_len = (new.image.idat_data.len() as f64 * if new.nth > old.nth {new.bias as f64} else {1.0}) as usize;
let old_len = (old.image.idat_data.len() as f64 * if new.nth < old.nth {old.bias as f64} else {1.0}) as usize;
// choose smallest compressed, or if it compresses the same, smallest uncompressed, or cheaper filter
let new = (new_len, new.image.raw.data.len(), new.image.raw.ihdr.bit_depth, new.filter, new.nth);
let old = (old_len, old.image.raw.data.len(), old.image.raw.ihdr.bit_depth, old.filter, old.nth);
// <= instead of < is important, because best_candidate_size has been set already,
// so the current result may be comparing its size with itself
new <= old
} else {
true
};
if is_best {
best_result = if new.is_reduction {
Some(new)
} else {
None
};
}
}
best_result.into_inner().expect("filters should be done")
.map(|(img, _, _)| img)
best_result.map(|res| res.image)
}
}
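
For readers skimming the diff: the commit drops the Mutex-guarded best result that was updated inside Rayon's parallel filter loop. Each worker now only publishes an atomic minimum size and ships its full candidate over the bounded channel to a single collector thread, which picks the winner without locks. Below is a minimal, self-contained sketch of that pattern, not oxipng's actual code; Candidate here is simplified, and collect_best, the fixed size list, and std's AtomicUsize::fetch_min stand in for the repo's AtomicMin.

use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread;

// Illustrative stand-in for the commit's Candidate: just a compressed size
// and a submission index used as the deterministic tie-breaker.
struct Candidate {
    len: usize,
    nth: usize,
}

// Single collector: it owns the best result outright, so no Mutex is needed.
fn collect_best(rx: Receiver<Candidate>) -> Option<Candidate> {
    let mut best: Option<Candidate> = None;
    // iter() ends once every sender has been dropped
    for new in rx.iter() {
        let better = match best {
            // smaller compressed size wins; on a tie the earlier submission wins
            Some(ref old) => (new.len, new.nth) < (old.len, old.nth),
            None => true,
        };
        if better {
            best = Some(new);
        }
    }
    best
}

fn main() {
    // Shared running minimum; workers only publish to it, never read a result back.
    let best_size = Arc::new(AtomicUsize::new(usize::MAX));
    let (tx, rx) = sync_channel(4); // bounded, like the commit's sync_channel(4)
    let collector = thread::spawn(move || collect_best(rx));

    // Pretend each filter produced a differently sized compressed stream.
    let workers: Vec<_> = [120usize, 95, 95, 130]
        .iter()
        .enumerate()
        .map(|(nth, &len)| {
            let tx = tx.clone();
            let best_size = Arc::clone(&best_size);
            thread::spawn(move || {
                // Publish the running minimum; a real compressor would consult it
                // to abandon attempts that already exceed the best size seen.
                best_size.fetch_min(len, SeqCst);
                // Ship the full candidate to the collector instead of locking.
                tx.send(Candidate { len, nth }).expect("collector alive");
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }
    drop(tx); // close the channel so the collector's loop can finish

    let best = collector.join().unwrap();
    println!("best: {:?} bytes, running min seen: {}",
             best.map(|c| c.len), best_size.load(SeqCst));
}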

4 changes: 4 additions & 0 deletions src/rayon.rs
@@ -51,3 +51,7 @@ impl<I: Iterator> ParallelIterator for I {
pub fn join<A, B>(a: impl FnOnce() -> A, b: impl FnOnce() -> B) -> (A, B) {
(a(), b())
}

pub fn spawn<A>(a: impl FnOnce() -> A) -> A {
a()
}
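
For context, src/rayon.rs looks like a single-threaded stand-in used when the real rayon crate is not enabled, so call sites such as rayon::spawn(move || { ... }) in evaluate.rs compile either way and simply run inline. A small standalone illustration of that behaviour, assuming that reading; the main function and assertion are mine, not the repo's:

// Sequential fallback: with the generic parameter declared, the closure is
// invoked immediately on the calling thread and its result returned directly.
pub fn spawn<A>(a: impl FnOnce() -> A) -> A {
    a()
}

fn main() {
    // Nothing is scheduled on a thread pool; this runs right here, in order.
    let n = spawn(|| (1..=4).sum::<u32>());
    assert_eq!(n, 10);
    // The real rayon::spawn is fire-and-forget (FnOnce() + Send + 'static,
    // returning ()), whereas this shim blocks and hands back the value.
}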
