-
Notifications
You must be signed in to change notification settings - Fork 0
/
ParallelMapperImpl.java
122 lines (103 loc) · 3.73 KB
/
ParallelMapperImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package info.kgeorgiy.ja.garipov.concurrent;
import info.kgeorgiy.java.advanced.mapper.ParallelMapper;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * {@link ParallelMapper} backed by a fixed number of worker threads that pull
 * tasks from a shared queue.
 *
 * <p>Every {@link #map} call splits its work into one task per list element,
 * enqueues the tasks and blocks until all of them have reported a value or an
 * exception. {@link #close()} stops the workers and releases every caller that
 * is still blocked in {@link #map}, so no client is left waiting for tasks
 * that will never be executed.
 */
public class ParallelMapperImpl implements ParallelMapper {
    private final List<Thread> threads = new ArrayList<>();
    private final TaskQueue tasks = new TaskQueue();

    /** Result holders of {@code map} calls that have not returned yet; guarded by its own monitor. */
    private final Set<RefillableResultList<?>> pending = new HashSet<>();

    /** Set once {@link #close()} has been called; guarded by the monitor of {@link #pending}. */
    private boolean closed = false;

    /**
     * Collects the results of a single {@code map} call and lets the caller
     * wait until every slot has been reported.
     *
     * @param <R> element type of the resulting list
     */
    private static class RefillableResultList<R> {
        private final List<R> result;
        /** Number of tasks that have not reported yet; {@code 0} once finished or cancelled. */
        private int remaining;
        private RuntimeException catchedException = null;

        /**
         * Creates a holder with {@code size} empty slots.
         *
         * @param size number of tasks expected to report
         */
        public RefillableResultList(final int size) {
            result = new ArrayList<>(Collections.nCopies(size, null));
            remaining = size;
        }

        /**
         * Stores a computed value and counts the task as completed.
         *
         * @param index position of the value in the resulting list
         * @param value computed value
         */
        public synchronized void set(final int index, final R value) {
            result.set(index, value);
            finished();
        }

        /**
         * Records a failure of one task and counts the task as completed.
         *
         * @param e exception thrown by the mapping function
         */
        public synchronized void catchException(final RuntimeException e) {
            record(e);
            finished();
        }

        /**
         * Aborts the wait: if some tasks have not reported yet, records
         * {@code cause} and wakes the waiting caller. Does nothing when all
         * tasks have already reported.
         *
         * @param cause exception to be thrown to the waiting caller
         */
        public synchronized void cancel(final RuntimeException cause) {
            if (remaining > 0) {
                record(cause);
                remaining = 0;
                notifyAll();
            }
        }

        /**
         * Blocks until every task has reported or the holder is cancelled.
         *
         * @return list of computed values in argument order
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws RuntimeException the first recorded exception, with later ones attached as suppressed
         */
        public synchronized List<R> getResultList() throws InterruptedException {
            while (remaining > 0) {
                wait();
            }
            if (catchedException != null) {
                throw catchedException;
            }
            return result;
        }

        /**
         * Builds one task per argument; each task applies {@code f} to its
         * argument and reports the value or the thrown exception to this holder.
         */
        private <T> List<Runnable> getTasks(final Function<? super T, ? extends R> f, final List<? extends T> args) {
            return IntStream.range(0, args.size()).<Runnable>mapToObj(i -> () -> {
                try {
                    set(i, f.apply(args.get(i)));
                } catch (final RuntimeException e) {
                    catchException(e);
                }
            }).collect(Collectors.toList());
        }

        /** Keeps the first exception and attaches the following ones as suppressed. */
        private void record(final RuntimeException e) {
            if (catchedException == null) {
                catchedException = e;
            } else if (catchedException != e) {
                // Throwable.addSuppressed rejects self-suppression, which would otherwise
                // kill the worker before the task is counted as finished.
                catchedException.addSuppressed(e);
            }
        }

        /** Counts one completed task; late reports after a cancellation are ignored. */
        private synchronized void finished() {
            if (remaining > 0 && --remaining == 0) {
                notifyAll();
            }
        }
    }

    /** Blocking FIFO queue of tasks shared by the worker threads. */
    private static final class TaskQueue {
        private final Queue<Runnable> queue = new ArrayDeque<>();

        /**
         * Removes and returns the next task, waiting while the queue is empty.
         *
         * @return next task to execute
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized Runnable poll() throws InterruptedException {
            while (queue.isEmpty()) {
                wait();
            }
            return queue.poll();
        }

        /**
         * Appends tasks and wakes all waiting workers.
         *
         * @param tasks tasks to enqueue
         */
        public synchronized void addAll(final List<Runnable> tasks) {
            queue.addAll(tasks);
            notifyAll();
        }
    }

    /**
     * Creates a mapper and starts its worker threads.
     *
     * @param threads number of worker threads, must be positive
     * @throws IllegalArgumentException if {@code threads} is less than one
     */
    public ParallelMapperImpl(final int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("Thread's count should be positive");
        }
        final Runnable worker = () -> {
            try {
                while (!Thread.interrupted()) {
                    tasks.poll().run();
                }
            } catch (final InterruptedException exception) {
                // Interruption is the shutdown signal: restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        };
        IntStream.range(0, threads)
                .mapToObj(i -> new Thread(worker))
                .forEach(this.threads::add);
        this.threads.forEach(Thread::start);
    }

    /**
     * Applies {@code f} to every element of {@code args} using the worker threads.
     *
     * @param f    function to apply
     * @param args arguments, one task is created per element
     * @return results in the order of {@code args}
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the mapper is already closed or gets closed before all tasks complete
     * @throws RuntimeException      the first exception thrown by {@code f}, with the others suppressed
     */
    @Override
    public <T, R> List<R> map(final Function<? super T, ? extends R> f, final List<? extends T> args) throws InterruptedException {
        final RefillableResultList<R> result = new RefillableResultList<>(args.size());
        final List<Runnable> jobs = result.getTasks(f, args);
        synchronized (pending) {
            if (closed) {
                throw new IllegalStateException("ParallelMapper is closed");
            }
            // Register before enqueueing so close() can always release this caller.
            pending.add(result);
            tasks.addAll(jobs);
        }
        try {
            return result.getResultList();
        } finally {
            synchronized (pending) {
                pending.remove(result);
            }
        }
    }

    /**
     * Stops all worker threads and releases callers still blocked in
     * {@link #map}: their calls fail with {@link IllegalStateException}.
     * Subsequent {@link #map} calls are rejected.
     */
    @Override
    public void close() {
        final List<RefillableResultList<?>> abandoned;
        synchronized (pending) {
            closed = true;
            abandoned = new ArrayList<>(pending);
        }
        threads.forEach(Thread::interrupt);
        try {
            // Project helper; presumably waits for every worker to terminate.
            IterativeParallelism.join(threads);
        } catch (final InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Tasks left in the queue will never run, so wake their clients explicitly.
            abandoned.forEach(holder -> holder.cancel(
                    new IllegalStateException("ParallelMapper was closed before the mapping completed")));
        }
    }
}