Skip to content

Commit

Permalink
fix(cu): circuit breaker should wrap each single page load, not all t…
Browse files Browse the repository at this point in the history
…hem at once
  • Loading branch information
TillaTheHun0 committed Oct 14, 2024
1 parent 2a755ce commit 1097b85
Showing 1 changed file with 86 additions and 81 deletions.
167 changes: 86 additions & 81 deletions servers/cu/src/effects/ao-block.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,128 +140,133 @@ export function loadBlocksMetaWith ({
}
`

async function fetchPage ({ min, maxTimestamp }) {
return Promise.resolve({ min, limit: pageSize })
.then(variables => {
logger(
'Loading page of up to %s blocks after height %s up to timestamp %s',
pageSize,
min,
maxTimestamp
)
return variables
})
.then((variables) => {
return backoff(
() => fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query: GET_BLOCKS_QUERY,
variables
})
}).then(okRes).catch(async (e) => {
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
min,
maxTimestamp
)
throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(e)}`)
}),
{ maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin: min, maxTimestamp })})` }
)
})
.then(async (res) => {
if (res.ok) return res.json()
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
min,
maxTimestamp
)
throw new Error(`${res.status}: ${await res.text()}`)
})
.then(path(['data', 'blocks']))
.then((res) => ({ ...res, maxTimestamp }))
}

/**
* Fetching each page is wrapped in a circuit breaker, so as to rate limit
* and hedge against timeouts
*/
const circuitBreaker = new CircuitBreaker(fetchPage, breakerOptions)

async function fetchAllPages ({ min, maxTimestamp }) {
/**
* Need to convert to seconds, since block timestamp
* from arweave is in seconds
*/
* Need to convert to seconds, since block timestamp
* from arweave is in seconds
*/
maxTimestamp = Math.floor(maxTimestamp / 1000)

async function fetchPage ({ min: newMin, maxTimestamp }) {
// deno-fmt-ignore-start
return Promise.resolve({ min: newMin, limit: pageSize })
.then(variables => {
logger(
'Loading page of up to %s blocks after height %s up to timestamp %s',
pageSize,
newMin,
maxTimestamp
)
return variables
})
.then((variables) => {
return backoff(
() => fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
query: GET_BLOCKS_QUERY,
variables
})
}).then(okRes).catch(async (e) => {
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
newMin,
maxTimestamp
)
throw new Error(`Can not communicate with gateway to retrieve block metadata: ${await strFromFetchError(e)}`)
}),
{ maxRetries: 2, delay: 300, log: logger, name: `loadBlockMeta(${JSON.stringify({ newMin, maxTimestamp })})` }
)
})
.then(async (res) => {
if (res.ok) return res.json()
logger(
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
newMin,
maxTimestamp
)
throw new Error(`${res.status}: ${await res.text()}`)
})
.then(path(['data', 'blocks']))
.then((res) => ({ ...res, maxTimestamp }))
}

async function maybeFetchNext ({ pageInfo, edges, maxTimestamp }) {
/**
* HACK to incrementally fetch the correct range of blocks with only
* a timestamp as the right most limit.
*
* (we no longer have a sortKey to extract a block height from)
*
* If the last block has a timestamp greater than the maxTimestamp
* then we're done.
*
* We then slice off the results in the page, not within our range.
* So we overfetch a little on the final page, but at MOST pageSize - 1
*/
* Base cases:
* - the maxTimestamp has been surpassed by the last element in the latest page
* - there is no next page
*
* HACK to incrementally fetch the correct range of blocks with only
* a timestamp as the right most limit.
*
* (we no longer have a sortKey to extract a block height from)
*
* If the last block has a timestamp greater than the maxTimestamp
* then we're done.
*
* We then slice off the results in the page, not within our range.
* So we overfetch a little on the final page, but at MOST pageSize - 1
*/
const surpassedMaxTimestampIdx = edges.findIndex(
pathSatisfies(
(timestamp) => timestamp > maxTimestamp,
['node', 'timestamp']
)
)
if (surpassedMaxTimestampIdx !== -1) return { pageInfo, edges: edges.slice(0, surpassedMaxTimestampIdx) }

if (!pageInfo.hasNextPage) return { pageInfo, edges }

/**
* Either have reached the end and resolve,
* or fetch the next page and recurse
*/
return Promise.resolve({
/**
* The next page will start on the next block
*/
* The next page will start on the next block
*/
min: pipe(
last,
path(['node', 'height']),
height => height + 1
)(edges),
maxTimestamp
})
.then(fetchPage)
.then((nextArgs) => circuitBreaker.fire(nextArgs))
.then(maybeFetchNext)
/**
* Recursively concatenate all edges
*/
.then(({ pageInfo, edges: e }) => ({ pageInfo, edges: edges.concat(e) }))
}

/**
* Start with the first page, then keep going
*/
return fetchPage({ min, maxTimestamp }).then(maybeFetchNext)
* Start with the first page, then keep going
*/
return circuitBreaker.fire({ min, maxTimestamp })
.then(maybeFetchNext)
.catch((e) => {
if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)')
else throw e
})
}

const circuitBreaker = new CircuitBreaker(fetchAllPages, breakerOptions)

return (args) =>
of(args)
.chain(fromPromise(({ min, maxTimestamp }) =>
circuitBreaker.fire({ min, maxTimestamp })
fetchAllPages({ min, maxTimestamp })
.then(prop('edges'))
.then(pluck('node'))
.then(map(block => ({
...block,
/**
* Timestamp from gateway is in seconds,
* but we need milliseconds
*/
* Timestamp from gateway is in seconds,
* but we need milliseconds
*/
timestamp: block.timestamp * 1000
})))
.catch((e) => {
if (e.message === 'Breaker is open') throw new Error('Can not communicate with gateway to retrieve block metadata (breaker is open)')
else throw e
})
// .then(logger.tap('Loaded blocks meta after height %s up to timestamp %s', min, maxTimestamp))
))
.toPromise()
}

0 comments on commit 1097b85

Please sign in to comment.