-
-
Notifications
You must be signed in to change notification settings - Fork 283
/
shufflingCache.ts
234 lines (209 loc) · 8.56 KB
/
shufflingCache.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import {
BeaconStateAllForks,
EpochShuffling,
IShufflingCache,
ShufflingBuildProps,
computeEpochShuffling,
} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {LodestarError, Logger, MapDef, pruneSetToMax} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";
import {callInNextEventLoop} from "../util/eventLoop.js";
/**
 * Default number of epochs kept in the shuffling cache, used when `maxShufflingCacheEpochs` is not provided.
 *
 * Same value as CheckpointBalancesCache, with the assumption that we don't have to use it for old epochs. In the worst case:
 * - when loading state bytes from disk, we need to compute shuffling for all epochs (~1s as of Sep 2023)
 * - we don't have a shuffling to verify attestations and need to do 1 epoch transition to add the shuffling to this cache.
 *   This never happens with the default chain option of maxSkipSlots = 32
 */
const MAX_EPOCHS = 4;
/**
 * Maximum number of unresolved shuffling promises allowed in the cache at the same time.
 *
 * With the default chain option of maxSkipSlots = 32, there should be no shuffling promise. If that happens a lot, it could
 * blow up Lodestar; with MAX_EPOCHS = 4, only allow 2 promises at a time. Note that regen already bounds the number of
 * concurrent requests at 1.
 */
const MAX_PROMISES = 2;
/** Discriminant tag for the entries stored in the shuffling cache. */
enum CacheItemType {
  /** Entry holds an already-computed shuffling */
  shuffling = 0,
  /** Entry holds a promise for a shuffling that is not available yet */
  promise = 1,
}
/** Cache entry holding an already-computed shuffling. */
type ShufflingCacheItem = {
type: CacheItemType.shuffling;
shuffling: EpochShuffling;
};
/** Cache entry for a shuffling that has been requested but is not available yet. */
type PromiseCacheItem = {
type: CacheItemType.promise;
/** `Date.now()` value captured when the promise was inserted; used to observe the resolution time */
timeInsertedMs: number;
/** Promise handed to callers that wait for the shuffling */
promise: Promise<EpochShuffling>;
/** Resolve callback of `promise`, called with the built shuffling */
resolveFn: (shuffling: EpochShuffling) => void;
};
/** Any entry stored in the cache: either a computed shuffling or a pending promise for one. */
type CacheItem = ShufflingCacheItem | PromiseCacheItem;
/** Options accepted by the ShufflingCache constructor. */
export type ShufflingCacheOpts = {
/** Maximum number of epochs to keep in the cache; MAX_EPOCHS is used when omitted */
maxShufflingCacheEpochs?: number;
};
/**
 * A shuffling cache to help:
 * - get committees quickly for attestation verification
 * - if a shuffling is not available (which does not happen with the default chain option of maxSkipSlots = 32),
 *   track a promise to make sure we don't compute the same shuffling twice
 * - skip computing shuffling when loading state bytes from disk
 */
export class ShufflingCache implements IShufflingCache {
  /**
   * Cache items keyed by epoch, then by decision root.
   * The number of epochs is pruned down to `maxEpochs` (via `pruneSetToMax`) every time a shuffling is set.
   */
  private readonly itemsByDecisionRootByEpoch: MapDef<Epoch, Map<RootHex, CacheItem>> = new MapDef(
    () => new Map<RootHex, CacheItem>()
  );

  /** Maximum number of epochs to keep in `itemsByDecisionRootByEpoch` */
  private readonly maxEpochs: number;

  /**
   * @param metrics optional metrics; when provided, a collector reporting the total number of cached items is registered
   * @param logger optional logger
   * @param opts cache options, see `ShufflingCacheOpts`
   * @param precalculatedShufflings shufflings to seed the cache with; entries with a `null` shuffling are skipped
   */
  constructor(
    readonly metrics: Metrics | null = null,
    readonly logger: Logger | null = null,
    opts: ShufflingCacheOpts = {},
    precalculatedShufflings?: {shuffling: EpochShuffling | null; decisionRoot: RootHex}[]
  ) {
    if (metrics) {
      metrics.shufflingCache.size.addCollect(() => {
        // total number of items (shufflings + promises) across all epochs
        let itemCount = 0;
        for (const innerMap of this.itemsByDecisionRootByEpoch.values()) {
          itemCount += innerMap.size;
        }
        metrics.shufflingCache.size.set(itemCount);
      });
    }

    this.maxEpochs = opts.maxShufflingCacheEpochs ?? MAX_EPOCHS;

    // plain loop instead of `.map()`: this is done for side effects only
    if (precalculatedShufflings) {
      for (const {shuffling, decisionRoot} of precalculatedShufflings) {
        if (shuffling !== null) {
          this.set(shuffling, decisionRoot);
        }
      }
    }
  }

  /**
   * Insert a promise to make sure we don't regen state for the same shuffling.
   * Bound by MAX_PROMISES to make sure our node does not blow up.
   *
   * If a promise is already pending for the same epoch and decision root it is kept as is, so that
   * callers already waiting on it get resolved by the next `set()`.
   *
   * @throws Error if MAX_PROMISES promises are already pending
   */
  insertPromise(epoch: Epoch, decisionRoot: RootHex): void {
    // Replacing a pending promise would orphan it: `set()` only resolves the item currently stored,
    // so anyone awaiting the replaced promise would wait forever.
    const existingItem = this.itemsByDecisionRootByEpoch.get(epoch)?.get(decisionRoot);
    if (existingItem !== undefined && isPromiseCacheItem(existingItem)) {
      return;
    }

    let promiseCount = 0;
    for (const innerMap of this.itemsByDecisionRootByEpoch.values()) {
      for (const item of innerMap.values()) {
        if (isPromiseCacheItem(item)) {
          promiseCount++;
        }
      }
    }
    if (promiseCount >= MAX_PROMISES) {
      throw new Error(
        `Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${epoch}, decisionRootHex: ${decisionRoot}`
      );
    }

    // capture the resolve callback so `set()` can resolve the promise later
    let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
    const promise = new Promise<EpochShuffling>((resolve) => {
      resolveFn = resolve;
    });
    if (resolveFn === null) {
      throw new Error("Promise Constructor was not executed immediately");
    }

    const cacheItem: PromiseCacheItem = {
      type: CacheItemType.promise,
      timeInsertedMs: Date.now(),
      promise,
      resolveFn,
    };
    this.itemsByDecisionRootByEpoch.getOrDefault(epoch).set(decisionRoot, cacheItem);
    this.metrics?.shufflingCache.insertPromiseCount.inc();
  }

  /**
   * Most of the time, this should return a shuffling immediately.
   * If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve.
   * Return null if we don't have a shuffling for this epoch and decisionRoot.
   */
  async get(epoch: Epoch, decisionRoot: RootHex): Promise<EpochShuffling | null> {
    // Read-only lookup: `getOrDefault` would insert an empty per-epoch map on every miss, and those
    // empty entries count towards `maxEpochs` when the cache is pruned.
    const cacheItem = this.itemsByDecisionRootByEpoch.get(epoch)?.get(decisionRoot);
    if (cacheItem === undefined) {
      this.metrics?.shufflingCache.miss.inc();
      return null;
    }

    if (isShufflingCacheItem(cacheItem)) {
      this.metrics?.shufflingCache.hit.inc();
      return cacheItem.shuffling;
    }

    this.metrics?.shufflingCache.shufflingPromiseNotResolved.inc();
    return cacheItem.promise;
  }

  /**
   * Gets a cached shuffling via the epoch and decision root. If the shuffling is not
   * available and `buildProps` is provided it will build it synchronously, cache it and return it.
   * Without `buildProps` it returns null when no computed shuffling is cached.
   *
   * NOTE: If a shuffling is already queued and not calculated it will build and resolve
   * the promise but the already queued build will happen at some later time
   */
  getSync<T extends ShufflingBuildProps | undefined>(
    epoch: Epoch,
    decisionRoot: RootHex,
    buildProps?: T
  ): T extends ShufflingBuildProps ? EpochShuffling : EpochShuffling | null {
    // read-only lookup, see `get()`
    const cacheItem = this.itemsByDecisionRootByEpoch.get(epoch)?.get(decisionRoot);
    if (cacheItem === undefined) {
      this.metrics?.shufflingCache.miss.inc();
    } else if (isShufflingCacheItem(cacheItem)) {
      this.metrics?.shufflingCache.hit.inc();
      return cacheItem.shuffling;
    } else if (buildProps) {
      // TODO: (@matthewkeil) This should possibly log a warning??
      this.metrics?.shufflingCache.shufflingPromiseNotResolvedAndThrownAway.inc();
    } else {
      this.metrics?.shufflingCache.shufflingPromiseNotResolved.inc();
    }

    let shuffling: EpochShuffling | null = null;
    if (buildProps) {
      const timer = this.metrics?.shufflingCache.shufflingCalculationTime.startTimer({source: "getSync"});
      shuffling = computeEpochShuffling(buildProps.state, buildProps.activeIndices, epoch);
      timer?.();
      // also resolves a pending promise for this shuffling, if any
      this.set(shuffling, decisionRoot);
    }
    return shuffling as T extends ShufflingBuildProps ? EpochShuffling : EpochShuffling | null;
  }

  /**
   * Queue asynchronous build for an EpochShuffling, triggered from state-transition
   *
   * @throws Error if too many shuffling promises are already pending, see `insertPromise()`
   */
  build(epoch: number, decisionRoot: string, state: BeaconStateAllForks, activeIndices: Uint32Array): void {
    this.insertPromise(epoch, decisionRoot);
    /**
     * TODO: (@matthewkeil) This will get replaced by a proper build queue and a worker to do calculations
     * on a NICE thread with a rust implementation
     */
    callInNextEventLoop(() => {
      // NOTE(review): if computeEpochShuffling throws here, the promise inserted above is never settled
      // (PromiseCacheItem only carries a resolve callback) - confirm how callInNextEventLoop surfaces errors
      const timer = this.metrics?.shufflingCache.shufflingCalculationTime.startTimer({source: "build"});
      const shuffling = computeEpochShuffling(state, activeIndices, epoch);
      timer?.();
      this.set(shuffling, decisionRoot);
    });
  }

  /**
   * Add an EpochShuffling to the ShufflingCache. If a promise for the shuffling is present it will
   * resolve the promise with the built shuffling
   */
  private set(shuffling: EpochShuffling, decisionRoot: string): void {
    const shufflingAtEpoch = this.itemsByDecisionRootByEpoch.getOrDefault(shuffling.epoch);

    // if a pending shuffling promise exists, resolve it
    const cacheItem = shufflingAtEpoch.get(decisionRoot);
    if (cacheItem) {
      if (isPromiseCacheItem(cacheItem)) {
        cacheItem.resolveFn(shuffling);
        this.metrics?.shufflingCache.shufflingPromiseResolutionTime.observe(
          (Date.now() - cacheItem.timeInsertedMs) / 1000
        );
      } else {
        this.metrics?.shufflingCache.shufflingBuiltMultipleTimes.inc();
      }
    }

    // set the shuffling
    shufflingAtEpoch.set(decisionRoot, {type: CacheItemType.shuffling, shuffling});

    // prune the cache
    pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
  }
}
/** Type guard: true when the cache entry holds an already-computed shuffling. */
function isShufflingCacheItem(item: CacheItem): item is ShufflingCacheItem {
  const {type: itemType} = item;
  return CacheItemType.shuffling === itemType;
}
/** Type guard: true when the cache entry holds a promise for a shuffling that is not available yet. */
function isPromiseCacheItem(item: CacheItem): item is PromiseCacheItem {
  const {type: itemType} = item;
  return CacheItemType.promise === itemType;
}
/** Error codes used by ShufflingCacheError. */
export enum ShufflingCacheErrorCode {
/** Presumably raised by callers when a shuffling lookup yields nothing; it is not thrown in the code visible here */
NO_SHUFFLING_FOUND = "SHUFFLING_CACHE_ERROR_NO_SHUFFLING_FOUND",
}
/** Payload of a ShufflingCacheError: the error code plus an epoch and a decision root (presumably those of the failed lookup). */
type ShufflingCacheErrorType = {
code: ShufflingCacheErrorCode.NO_SHUFFLING_FOUND;
epoch: Epoch;
decisionRoot: RootHex;
};
/** Error type for the shuffling cache; a LodestarError whose payload is a ShufflingCacheErrorType. */
export class ShufflingCacheError extends LodestarError<ShufflingCacheErrorType> {}