Skip to content

Commit

Permalink
fix: change how timeout or abort works in polling
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed May 10, 2024
1 parent a4809dd commit ea0b1dc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
1 change: 1 addition & 0 deletions packages/cli/src/__tests__/ceramic-error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ beforeAll(async () => {
indexing: {
db: `sqlite://${stateStoreDirectory}/ceramic.sqlite`,
},
anchorLoopMinDurationMs: 0,
})

const ceramicConfig = makeCeramicConfig(daemonConfig)
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/__tests__/initialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe('Ceramic integration', () => {
const ceramic = await Ceramic.create(ipfs1, {
stateStoreDirectory,
indexing: { db: databaseConnectionString.href, models: [] },
anchorLoopMinDurationMs: 0,
})
const supportedChains = await ceramic.getSupportedChains()
expect(supportedChains).toEqual(['inmemory:12345'])
Expand All @@ -37,6 +38,7 @@ describe('Ceramic integration', () => {
networkName: 'inmemory',
stateStoreDirectory,
indexing: { db: databaseConnectionString.href, models: [] },
anchorLoopMinDurationMs: 0,
})
const supportedChains = await ceramic.getSupportedChains()
expect(supportedChains).toEqual(['inmemory:12345'])
Expand All @@ -49,6 +51,7 @@ describe('Ceramic integration', () => {
const [modules, params] = Ceramic._processConfig(ipfs1, {
networkName: 'local',
indexing: { db: databaseConnectionString.href, models: [] },
anchorLoopMinDurationMs: 0,
})
modules.anchorService = new InMemoryAnchorService(
{},
Expand All @@ -68,6 +71,7 @@ describe('Ceramic integration', () => {
networkName: 'fakenetwork',
stateStoreDirectory,
indexing: { db: databaseConnectionString.href, models: [] },
anchorLoopMinDurationMs: 0,
})
).rejects.toThrow(
"Unrecognized Ceramic network name: 'fakenetwork'. Supported networks are: 'mainnet', 'testnet-clay', 'dev-unstable', 'local', 'inmemory'"
Expand All @@ -81,6 +85,7 @@ describe('Ceramic integration', () => {
networkName: Networks.INMEMORY,
stateStoreDirectory: tmpDirectory,
indexing: { db: databaseConnectionString.href, models: [] },
anchorLoopMinDurationMs: 0,
})
const dispatcher = modules.dispatcher
const ceramic = new Ceramic(modules, params)
Expand Down
34 changes: 26 additions & 8 deletions packages/core/src/store/anchor-request-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ export function deserializeAnchorRequestData(serialized: any): AnchorRequestData
}
}

async function sleepOrAbort(timeoutMS: number, abortSignal: AbortSignal): Promise<void> {
let timeout
let listener
return new Promise<void>((resolve) => {
if (abortSignal.aborted) {
return resolve()
}

listener = () => {
resolve()
}
abortSignal.addEventListener('abort', listener)

timeout = setTimeout(() => {
resolve()
}, timeoutMS)
}).then(() => {
clearTimeout(timeout)
abortSignal.removeEventListener('abort', listener)
})
}

/**
* An object-value store being able to save, retrieve and delete anchor request data identified by stream ids
*
Expand Down Expand Up @@ -116,9 +138,8 @@ export class AnchorRequestStore extends ObjectStore<StreamID, AnchorRequestData>
async *infiniteList(batchSize = 1): AsyncGenerator<StreamID> {
let gt: StreamID | undefined = undefined
let numEntries = 0
let loopStartTime = new Date()
const abortPromise = abortSignalToPromise(this.#abortController.signal)
do {
const loopStartTime = new Date()
try {
let timeout
const timeoutPromise = new Promise<null>((resolve) => {
Expand Down Expand Up @@ -149,18 +170,15 @@ export class AnchorRequestStore extends ObjectStore<StreamID, AnchorRequestData>
// If we iterated over all entries in the store in less time than the minLoopDuration,
// sleep to avoid spamming the CAS.
const loopEndTime = new Date()
const loopDurationMs = loopEndTime.getTime() - loopStartTime.getTime()
// jest sometimes does weird things with time so abs is necessary
const loopDurationMs = Math.abs(loopEndTime.getTime() - loopStartTime.getTime())
if (loopDurationMs < this.#minLoopDurationMs) {
const remainingLoopDuration = this.#minLoopDurationMs - loopDurationMs
const sleepPromise = new Promise((resolve) =>
setTimeout(resolve, remainingLoopDuration)
)
await Promise.race([sleepPromise, abortPromise])
await sleepOrAbort(remainingLoopDuration, this.#abortController.signal)
}

gt = undefined
numEntries = 0
loopStartTime = loopEndTime
}
} catch (err) {
this.#logger.err(`Error querying the AnchorRequestStore: ${err}`)
Expand Down
7 changes: 1 addition & 6 deletions packages/stream-tests/src/create-ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import tmp from 'tmp-promise'
import { createDid } from './create_did.js'
import type { ProvidersCache } from '@ceramicnetwork/core'

const DEFAULT_ANCHOR_LOOP_TEST_DURATION = 10

export async function createCeramic(
ipfs: IpfsApi,
config: CeramicConfig & { seed?: string } = { networkName: Networks.INMEMORY },
Expand All @@ -26,15 +24,12 @@ export async function createCeramic(
enableHistoricalSync: false,
},
sync: false,
anchorLoopMinDurationMs: 0,
anchorLoopMinDurationMs: 10,
},
config
)

const [modules, params] = Ceramic._processConfig(ipfs, appliedConfig)
params.anchorLoopMinDurationMs = params.anchorLoopMinDurationMs
? params.anchorLoopMinDurationMs
: DEFAULT_ANCHOR_LOOP_TEST_DURATION
if (providersCache) {
modules.providersCache = providersCache
}
Expand Down

0 comments on commit ea0b1dc

Please sign in to comment.