-
Notifications
You must be signed in to change notification settings - Fork 7
/
test_map.cpp
290 lines (256 loc) · 10 KB
/
test_map.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include <vector>
#include <parlay/primitives.h>
#include <parlay/io.h>
#include <parlay/random.h>
#include "zipfian.h"
#include "parse_command_line.h"
// Value type stored in the map: __int128 when BIG_VALUE is defined,
// unsigned long otherwise.
#ifdef BIG_VALUE
using V = __int128;
#else
using V = unsigned long;
#endif
// Key type used for every map operation in this file.
using K = unsigned long;
using namespace parlay;
// NOTE(review): this header is included only after K, V and the parlay
// using-directive are in place -- presumably it relies on them; confirm
// before reordering.
#include "unordered_map.h"
// leave undefined if measuring throughput since measuring latency will slow down throughput
//#define Latency 1
// growt requires handles, rest do not
// HANDLE expands to a leading `handle,` argument when USE_HANDLE is defined
// and to nothing otherwise, so the same call sites work in both modes.
#ifdef USE_HANDLE
#define HANDLE handle,
#else
#define HANDLE
#endif
// Hash functor for keys of type K: a multiplication by a fixed 64-bit
// constant followed by a xor-shift that folds the high bits into the low ones.
struct IntHash {
  std::size_t operator()(K const& k) const noexcept {
    auto mixed = k * UINT64_C(0xbf58476d1ce4e5b9);  // linear step
    mixed ^= (mixed >> 31);                         // non-linear step
    return mixed;
  }
};
// Map type exercised by the benchmark: K -> V, hashed with IntHash.
// NOTE(review): `unordered_map` is presumably the implementation supplied by
// "unordered_map.h" (not std::unordered_map) -- confirm against that header.
using map_type = unordered_map<K,V,IntHash>;
// Returns the geometric mean of vals.
//
// The mean is evaluated as exp(mean(log(x))) rather than as the n-th root of
// the running product, so a long sequence (or very large / very small
// entries) cannot overflow the intermediate to inf or underflow it to 0.
// Inputs are expected to be non-negative; a zero entry yields 0.
// An empty sequence yields 1, which is what the product-based formulation
// (pow(1, inf)) produced as well.
double geometric_mean(const parlay::sequence<double>& vals) {
  if (vals.size() == 0) return 1.0;
  double log_sum = 0.0;
  for (double x : vals) log_sum += std::log(x);
  return std::exp(log_sum / static_cast<double>(vals.size()));
}
// Runs the benchmark for one configuration and returns
//   { geometric mean of the initial-insert rate, geometric mean of the mixed-workload rate }
// (both in millions of operations per second) over the measured rounds.
//
// Per round: a fresh map is built, the first n of 2*n shuffled keys are
// inserted in parallel, and then p parallel tasks each replay their own slice
// of a precomputed key / operation stream until trial_time seconds have
// elapsed.  One comma-separated result line is printed per measured round,
// followed by sanity warnings about success ratios and the final map size.
// When warmup is set, one extra round is run first and its results discarded.
//
// p must be at least 1 (it is used as a divisor).
std::tuple<double,double>
test_loop(commandLine& C,
          long n,                 // num entries in map
          long p,                 // num threads
          long rounds,            // num trials
          double zipfian_param,   // zipfian parameter [0:1) (0 is uniform, .99 is high skew)
          int update_percent,     // percent of operations that are either insert or delete (1/2 each)
          double trial_time,      // time to run one trial, in seconds
          double latency_cutoff,  // cutoff, in microseconds, to measure percent below
          bool verbose,           // show some more info
          bool warmup,            // run one warmup round
          bool grow) {            // start with table of size 1
  // Intervals are measured with a monotonic clock so that wall-clock
  // adjustments during a trial cannot distort the reported rates.
  using timer = std::chrono::steady_clock;

  // total samples used
  long m = 10 * n + 1000 * p;

  // generate 2*n unique numbers in random order
  // get rid of top bit since growt seems to fail if used (must use it itself)
  //auto x = parlay::delayed_tabulate(1.2* 2 * n,[&] (size_t i) {
  //		       return (K) (parlay::hash64(i) >> 1) ;});
  //auto y = parlay::random_shuffle(parlay::remove_duplicates(x));
  //auto a = parlay::tabulate(2 * n, [&] (size_t i) {return y[i];});
  auto a = parlay::random_shuffle(parlay::tabulate(2 * n, [] (K i) { return i;}));

  // take m numbers from a in uniform or zipfian distribution
  // (the sample index is size_t so it cannot be truncated for large m)
  parlay::sequence<K> b;
  if (zipfian_param != 0.0) {
    Zipfian z(2 * n, zipfian_param);
    b = parlay::tabulate(m, [&] (size_t i) { return a[z(i)]; });
    // NOTE(review): a is reshuffled after sampling, presumably so that the
    // prefix inserted below is unrelated to the keys the zipfian sampling
    // favours -- confirm.
    a = parlay::random_shuffle(a);
  } else {
    b = parlay::tabulate(m, [&] (size_t i) { return a[parlay::hash64(i) % (2 * n)]; });
  }

  enum op_type : char {Find, Insert, Remove};

  // generate the operation types with update_percent updates
  // half the updates will be inserts and half removes
  // (the hash is reduced mod 200, so each half-percent bucket is one value)
  auto op_types = parlay::tabulate(m, [&] (size_t i) -> op_type {
      auto h = parlay::hash64(m+i)%200;
      if (h < update_percent) return Insert;
      else if (h < 2*update_percent) return Remove;
      else return Find; });

  parlay::sequence<double> insert_times;
  parlay::sequence<double> bench_times;

  for (int i = 0; i < rounds + warmup; i++) { {
    // inner scope: the map is destroyed before the MEM_STATS section below
    map_type map = grow ? map_type(1) : map_type(n);
    size_t mp = m/p;  // samples owned by each task

    // initialize the map with n distinct elements
    auto start_insert = timer::now();
#ifdef USE_HANDLE
    long block_size = 1 + (n-1) / p;
    parlay::parallel_for(0, p, [&] (size_t i) {
      auto handle = map.get_handle();
      long s = i * block_size;
      long e = std::min(s + block_size, n);
      for (long j = s; j < e; j++)
        map.insert(HANDLE a[j], 123); }, 1, true);
#else
    parlay::parallel_for(0, n, [&] (size_t i) {
      //if (i%100 == 0) std::cout << i << std::endl;
      map.insert(a[i], 123);},1000);
#endif
    if (map.size() != n)
      std::cout << "bad initial size = " << map.size() << std::endl;
    std::chrono::duration<double> insert_time = timer::now() - start_insert;
    double imops = n / insert_time.count() / 1e6;
    if (!warmup || i>0)
      insert_times.push_back(imops);

    long initial_size = map.size();

    // keep track of some statistics, one entry per thread
    parlay::sequence<size_t> totals(p);
    parlay::sequence<long> addeds(p);
    parlay::sequence<long> query_counts(p);
    parlay::sequence<long> query_success_counts(p);
    parlay::sequence<long> update_success_counts(p);
    parlay::sequence<long> latency_counts(p);

    if (verbose) std::cout << "entries inserted" << std::endl;

    auto start = timer::now();

    // start up p threads, each doing a sequence of operations
    parlay::parallel_for(0, p, [&] (size_t i) {
      int cnt = 0;
      size_t j = i*mp;  // position in the key stream b
      size_t k = i*mp;  // position in the operation stream op_types
      size_t total = 0;
      long added = 0;
      long query_count = 0;
      long query_success_count = 0;
      long update_success_count = 0;
      long latency_count = 0;
#ifdef USE_HANDLE
      auto handle = map.get_handle();
#endif

      while (true) {
        // every once in a while check if time is over
        if (cnt >= 100) {
          cnt = 0;
          auto current = timer::now();
          std::chrono::duration<double> duration = current - start;
          if (duration.count() > trial_time) {
            // publish this task's counters and stop
            totals[i] = total;
            addeds[i] = added;
            query_counts[i] = query_count;
            query_success_counts[i] = query_success_count;
            update_success_counts[i] = update_success_count;
            latency_counts[i] = latency_count;
            return;
          }
        }

        // do one of find, insert, or remove
        if (op_types[k] == Find) {
          query_count++;
#ifdef Latency
          auto start_op_time = timer::now();
          query_success_count += map.find(HANDLE b[j]).has_value();
          auto current = timer::now();
          std::chrono::duration<double> duration = current - start_op_time;
          // seconds -> microseconds before comparing with the cutoff
          if (duration.count() * 1000000 < latency_cutoff)
            latency_count++;
#else
          auto foo = map.find(HANDLE b[j]);
          if (foo.has_value()) query_success_count += (*foo > 0);
          //query_success_count += map.find(HANDLE b[j]).has_value();
#endif
        } else if (op_types[k] == Insert) {
          if (map.insert(HANDLE b[j], 123)) {added++; update_success_count++;}
        } else { // (op_types[k] == Remove)
          if (map.remove(HANDLE b[j])) {added--; update_success_count++;}
        }

        // wrap around if ran out of samples
        if (++j >= (i+1)*mp) j = i*mp;
        if (++k >= (i+1)*mp) k = i*mp + 1; // offset so different ops on different rounds
        cnt++;
        total++;
      }
    }, 1, true);
    auto current = timer::now();
    std::chrono::duration<double> duration = current - start;

    // the warmup round is run but not reported
    if (warmup && i==0) continue;

    size_t num_ops = parlay::reduce(totals);
    size_t queries = parlay::reduce(query_counts);
#ifdef Latency
    double latency_count = static_cast<double>(parlay::reduce(latency_counts));
#endif
    double mops = num_ops / (duration.count() * 1e6);
    bench_times.push_back(mops);
    std::cout << C.commandName() << ","
              << update_percent << "%update,"
              << "n=" << n << ","
              << "p=" << p << ","
              << "z=" << zipfian_param << ","
#ifdef Latency
              << latency_count / queries * 100.0 << "%@" << latency_cutoff << "usec,"
#endif
              << "insert_mops=" << static_cast<int>(imops) << ","
              << "mops=" << static_cast<int>(mops) << std::endl;

    size_t updates = num_ops - queries;
    size_t queries_success = parlay::reduce(query_success_counts);
    size_t updates_success = parlay::reduce(update_success_counts);
    // NaN when the corresponding count is zero; the range checks below are
    // then false, so no warning is printed in that case
    double qratio = static_cast<double>(queries_success) / queries;
    double uratio = static_cast<double>(updates_success) / updates;
    size_t final_cnt = map.size();
    long added = parlay::reduce(addeds);
    if (verbose)
      std::cout << "query success ratio = " << qratio
                << ", update success ratio = " << uratio
                << ", net insertions = " << added
                << std::endl;
    if (qratio < .4 || qratio > .6)
      std::cout << "warning: query success ratio = " << qratio << std::endl;
    if (uratio < .4 || uratio > .6)
      std::cout << "warning: update success ratio = " << uratio << std::endl;
    // successful inserts minus successful removes must explain the size change
    if (initial_size + added != final_cnt) {
      std::cout << "bad final size: intial size = " << initial_size
                << ", added " << added
                << ", final size = " << final_cnt
                << std::endl;
    }
  }
#ifdef MEM_STATS
    if (verbose) {
      map_type::clear();
      map_type::stats();
    }
#endif
  }
  return std::tuple{ geometric_mean(insert_times),
                     geometric_mean(bench_times)};
}
// Benchmark driver.
//
// Parses the command line, then calls test_loop once for every combination of
// zipfian parameter, update percentage and table size.  Each of -z, -u and -n
// replaces the corresponding built-in list with the single value given.
// Unless -nomeans is passed, a blank line is printed after each
// (zipfian, update percent) group and the geometric means over all runs are
// printed at the end.
// Returns 1 without running anything if -p is smaller than 1.
int main(int argc, char* argv[]) {
  commandLine P(argc,argv,"[-n <size>] [-r <rounds>] [-p <procs>] [-z <zipfian_param>] [-u <update percent>] [-t <trial seconds>] [-latency <cutoff usec>] [-verbose] [-nowarmup] [-nomeans] [-grow]");

  long n = P.getOptionIntValue("-n", 0);
  int p = P.getOptionIntValue("-p", parlay::num_workers());
  int rounds = P.getOptionIntValue("-r", 2);
  double zipfian_param = P.getOptionDoubleValue("-z", -1.0);
  int update_percent = P.getOptionIntValue("-u", -1);
  double trial_time = P.getOptionDoubleValue("-t", 1.0);  // in seconds
  double latency_cutoff_usec = P.getOptionDoubleValue("-latency", 10.0); // in microseconds
  bool verbose = P.getOption("-verbose");
  bool warmup = !P.getOption("-nowarmup");
  bool print_means = !P.getOption("-nomeans");
  bool grow = P.getOption("-grow");

  // test_loop divides by the number of procs, so reject values below 1 here
  if (p < 1) {
    std::cerr << "number of procs (-p) must be at least 1" << std::endl;
    return 1;
  }

  // defaults used when the corresponding option is not given
  std::vector<long> sizes {100000, 10000000};
  std::vector<int> percents{5, 50};
  std::vector<double> zipfians{0, .99};
  if (n != 0) sizes = std::vector<long>{n};
  if (update_percent != -1) percents = std::vector<int>{update_percent};
  if (zipfian_param != -1.0) zipfians = std::vector<double>{zipfian_param};

  parlay::sequence<std::tuple<double,double>> results;

  for (auto z : zipfians)
    for (auto percent : percents) {
      for (auto size : sizes) {
        results.push_back(test_loop(P, size, p, rounds, z, percent,
                                    trial_time, latency_cutoff_usec, verbose, warmup,
                                    grow));
      }
      if (print_means) std::cout << std::endl;
    }

  // split the (insert, benchmark) pairs into two sequences
  auto insert_times = parlay::map(results, [] (auto x) {return std::get<0>(x);});
  auto bench_times = parlay::map(results, [] (auto x) {return std::get<1>(x);});
  if (print_means) {
    std::cout << "benchmark geometric mean of mops = " << geometric_mean(bench_times) << std::endl;
    std::cout << "initial insert geometric mean of mops = " << geometric_mean(insert_times) << std::endl;
  }
}