Skip to content

Commit

Permalink
test: don't poll more than once a minute
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed May 2, 2024
1 parent bc442b5 commit 8c56c51
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions packages/core/src/anchor/anchor-processing-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export class AnchorProcessingLoop {
*/
readonly #anchorStoreQueue: NamedTaskQueue
readonly #anchorPollingMetrics: TimeableMetric
/**
* Cache for the last time we polled a stream. Used to prevent polling the same stream more than once per minute.
*/
readonly #pollCache: Map<string, number> = new Map()

constructor(
batchSize: number,
Expand All @@ -50,6 +54,10 @@ export class AnchorProcessingLoop {
concurrency,
store.infiniteList(batchSize),
async (streamId) => {
// Exit early if we've already polled this stream in the last minute
if (!this.checkPollTime(streamId.toString())) {
return
}
try {
logger.verbose(
`Loading pending anchor metadata for Stream ${streamId} from AnchorRequestStore`
Expand Down Expand Up @@ -78,6 +86,7 @@ export class AnchorProcessingLoop {
await this.#anchorStoreQueue.run(streamId.toString(), async () => {
const loaded = await store.load(streamId)
if (loaded.cid.equals(entry.cid)) {
this.#pollCache.delete(streamId.toString())
await store.remove(streamId)
logger.verbose(
`Entry from AnchorRequestStore for Stream ${streamId} removed successfully`
Expand Down Expand Up @@ -111,4 +120,17 @@ export class AnchorProcessingLoop {
this.#anchorPollingMetrics.stopPublishingStats()
return this.#loop.stop()
}

/**
* Check the poll time for a stream. Updates the poll time and returns true if the stream has not been polled in the
* last minute.
*/
checkPollTime(streamId: string): boolean {
if (this.#pollCache.has(streamId) && (Date.now() < this.#pollCache.get(streamId))) {
return false
}
// Add ±10 seconds of jitter to prevent all streams from being polled at the same time every minute after a restart
this.#pollCache.set(streamId, 50_000 + Math.floor(Math.random() * 20_000))
return true
}
}

0 comments on commit 8c56c51

Please sign in to comment.