Skip to content

Commit

Permalink
Add restart resistance to pre registration
Browse files Browse the repository at this point in the history
  • Loading branch information
JonoPrest committed Oct 8, 2024
1 parent 8e320cc commit 952732b
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
open Belt

type addressToDynContractLookup = dict<TablesStatic.DynamicContractRegistry.t>
type t = {
logger: Pino.t,
fetchState: PartitionedFetchState.t,
Expand All @@ -19,7 +20,7 @@ type t = {
//Currently this state applies to all chains simultaneously but it may be possible to,
//in the future, have a per chain state and allow individual chains to start indexing as
//soon as the pre registration is done
isPreRegisteringDynamicContracts: bool,
dynamicContractPreRegistration: option<addressToDynContractLookup>,
}

//CONSTRUCTION
Expand All @@ -38,7 +39,7 @@ let make = (
~numBatchesFetched,
~eventFilters,
~maxAddrInPartition,
~isPreRegisteringDynamicContracts,
~dynamicContractPreRegistration,
): t => {
let module(ChainWorker) = chainConfig.chainWorker
logger->Logging.childInfo("Initializing ChainFetcher with " ++ ChainWorker.name ++ " worker")
Expand All @@ -65,7 +66,7 @@ let make = (
numBatchesFetched,
eventFilters,
partitionsCurrentlyFetching: Belt.Set.Int.empty,
isPreRegisteringDynamicContracts,
dynamicContractPreRegistration,
}
}

Expand All @@ -92,6 +93,9 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => {
~confirmedBlockThreshold=chainConfig.confirmedBlockThreshold,
)

let dynamicContractPreRegistration =
chainConfig->Config.shouldPreRegisterDynamicContracts ? Some(Js.Dict.empty()) : None

make(
~staticContracts,
~chainConfig,
Expand All @@ -107,7 +111,7 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => {
~eventFilters=None,
~dynamicContractRegistrations=[],
~maxAddrInPartition,
~isPreRegisteringDynamicContracts=chainConfig->Config.shouldPreRegisterDynamicContracts,
~dynamicContractPreRegistration,
)
}

Expand All @@ -122,35 +126,54 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio

let chainMetadata = await DbFunctions.ChainMetadata.getLatestChainMetadataState(~chainId)

let (startBlock, isPreRegisteringDynamicContracts) = latestProcessedEvent->Option.mapWithDefault(
(chainConfig.startBlock, chainConfig->Config.shouldPreRegisterDynamicContracts),
event => //start from the same block but filter out any events already processed
(event.blockNumber, event.isPreRegisteringDynamicContracts),
)

let eventFilters: option<
FetchState.eventFilters,
> = latestProcessedEvent->Option.map(event => list{
{
FetchState.filter: qItem => {
//Only keep events greater than the last processed event
(qItem.chain->ChainMap.Chain.toChainId, qItem.blockNumber, qItem.logIndex) >
(event.chainId, event.blockNumber, event.logIndex)
},
isValid: (~fetchState, ~chain as _) => {
//the filter can be cleaned up as soon as the fetch state block is ahead of the latestProcessedEvent blockNumber
FetchState.getLatestFullyFetchedBlock(fetchState).blockNumber <= event.blockNumber
let (
startBlock: int,
isPreRegisteringDynamicContracts: bool,
eventFilters: option<FetchState.eventFilters>,
) = switch latestProcessedEvent {
| Some(event) =>
//start from the same block but filter out any events already processed
let eventFilters = list{
{
FetchState.filter: qItem => {
//Only keep events greater than the last processed event
(qItem.chain->ChainMap.Chain.toChainId, qItem.blockNumber, qItem.logIndex) >
(event.chainId, event.blockNumber, event.logIndex)
},
isValid: (~fetchState, ~chain as _) => {
//the filter can be cleaned up as soon as the fetch state block is ahead of the latestProcessedEvent blockNumber
FetchState.getLatestFullyFetchedBlock(fetchState).blockNumber <= event.blockNumber
},
},
},
})
}

//Add all dynamic contracts from DB
let dynamicContractRegistrations =
(event.blockNumber, event.isPreRegisteringDynamicContracts, Some(eventFilters))
| None => (chainConfig.startBlock, chainConfig->Config.shouldPreRegisterDynamicContracts, None)
}

//Get all dynamic contracts already registered on the chain
let dbDynamicContractRegistrations =
await DbFunctions.sql->DbFunctions.DynamicContractRegistry.readDynamicContractsOnChainIdAtOrBeforeBlock(
~chainId,
~startBlock,
)

let (
dynamicContractPreRegistration: option<addressToDynContractLookup>,
dynamicContractRegistrations: array<TablesStatic.DynamicContractRegistry.t>,
) = if isPreRegisteringDynamicContracts {
let dynamicContractPreRegistration: addressToDynContractLookup = Js.Dict.empty()
dbDynamicContractRegistrations->Array.forEach(contract => {
dynamicContractPreRegistration->Js.Dict.set(
contract.contractAddress->Address.toString,
contract,
)
})
(Some(dynamicContractPreRegistration), [])
} else {
(None, dbDynamicContractRegistrations)
}

let (
firstEventBlockNumber,
latestProcessedBlockChainMetadata,
Expand Down Expand Up @@ -202,7 +225,7 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio
~logger,
~eventFilters,
~maxAddrInPartition,
~isPreRegisteringDynamicContracts,
~dynamicContractPreRegistration,
)
}

Expand Down Expand Up @@ -376,3 +399,6 @@ let getFirstEventBlockNumber = (chainFetcher: t) =>
chainFetcher.dbFirstEventBlockNumber,
chainFetcher.fetchState->PartitionedFetchState.getFirstEventBlockNumber,
)

let isPreRegisteringDynamicContracts = (chainFetcher: t) =>
chainFetcher.dynamicContractPreRegistration->Option.isSome
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ let isFetchingAtHead = self =>
let isPreRegisteringDynamicContracts = self =>
self.chainFetchers
->ChainMap.values
->Array.reduce(false, (accum, cf) => accum || cf.isPreRegisteringDynamicContracts)
->Array.reduce(false, (accum, cf) => accum || cf->ChainFetcher.isPreRegisteringDynamicContracts)

module ExposedForTesting_Hidden = {
let priorityQueueComparitor = priorityQueueComparitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ open Belt
type chain = ChainMap.Chain.t
type rollbackState = NoRollback | RollingBack(chain) | RollbackInMemStore(InMemoryStore.t)

type addressToDynContractLookup = dict<DbFunctions.DynamicContractRegistry.contractTypeAndAddress>

type t = {
config: Config.t,
chainManager: ChainManager.t,
Expand All @@ -18,7 +16,6 @@ type t = {
//Initialized as 0, increments, when rollbacks occur to invalidate
//responses based on the wrong stateId
id: int,
preregisteredDynamicContracts: ChainMap.t<addressToDynContractLookup>,
}

//TODO: remove the optional bool
Expand All @@ -36,7 +33,6 @@ let make = (~config, ~chainManager, ~loadLayer) => {
asyncTaskQueue: AsyncTaskQueue.make(),
loadLayer,
id: 0,
preregisteredDynamicContracts: chainManager.chainFetchers->ChainMap.map(_ => Js.Dict.empty()),
}

let getId = self => self.id
Expand Down Expand Up @@ -393,9 +389,10 @@ let handleBlockRangeResponse = (state, ~chain, ~response: ChainWorker.blockRange

Prometheus.setFetchedEventsUntilHeight(~blockNumber=response.heighestQueriedBlockNumber, ~chain)

let processAction = updatedChainFetcher.isPreRegisteringDynamicContracts
? PreRegisterDynamicContracts
: ProcessEventBatch
let processAction =
updatedChainFetcher->ChainFetcher.isPreRegisteringDynamicContracts
? PreRegisterDynamicContracts
: ProcessEventBatch

(
nextState,
Expand Down Expand Up @@ -610,37 +607,54 @@ let actionReducer = (state: t, action: action) => {
Js.log("dynamic contract pre-register processed")
Js.log2("latest", latestProcessedBlocks->ChainMap.values)
Js.log2("dyn", dynamicContractRegistrations->Option.map(v => v.registrations->Array.length))
let updatedLatestBlocks = updateLatestProcessedBlocks(~state, ~latestProcessedBlocks)
let state = updateLatestProcessedBlocks(~state, ~latestProcessedBlocks)

let updatedState = switch dynamicContractRegistrations {
| None => updatedLatestBlocks
let state = switch dynamicContractRegistrations {
| None => state
| Some({registrations}) =>
//Create an empty map for mutating the contractAddress mapping
let chainMap: ChainMap.t<addressToDynContractLookup> =
state.preregisteredDynamicContracts->ChainMap.map(_ => Js.Dict.empty())
let tempChainMap: ChainMap.t<ChainFetcher.addressToDynContractLookup> =
state.chainManager.chainFetchers->ChainMap.map(_ => Js.Dict.empty())

registrations->Array.forEach(({dynamicContracts}) =>
dynamicContracts->Array.forEach(dynamicContract => {
let chain = ChainMap.Chain.makeUnsafe(~chainId=dynamicContract.chainId)
let contractAddressMapping = chainMap->ChainMap.get(chain)
let contractAddressMapping = tempChainMap->ChainMap.get(chain)
contractAddressMapping->Js.Dict.set(
dynamicContract.contractAddress->Address.toString,
dynamicContract,
)
})
)
let preregisteredDynamicContracts =
state.preregisteredDynamicContracts->ChainMap.mapWithKey((chain, currentMapping) => {
let newMapping = chainMap->ChainMap.get(chain)
Utils.Dict.merge(currentMapping, newMapping)
})

let updatedChainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((
chain,
cf,
) => {
let dynamicContractPreRegistration = switch cf.dynamicContractPreRegistration {
| Some(current) => current->Utils.Dict.merge(tempChainMap->ChainMap.get(chain))
//Should never be the case while this task is being scheduled
| None => tempChainMap->ChainMap.get(chain)
}->Some

{
...cf,
dynamicContractPreRegistration,
}
})

let updatedChainManager = {
...state.chainManager,
chainFetchers: updatedChainFetchers,
}
{
...updatedLatestBlocks,
preregisteredDynamicContracts,
...state,
chainManager: updatedChainManager,
}
}

(
updatedState,
state,
[
UpdateChainMetaDataAndCheckForExit(NoExit),
PreRegisterDynamicContracts,
Expand All @@ -650,13 +664,19 @@ let actionReducer = (state: t, action: action) => {
| StartIndexingAfterPreRegister =>
Js.log("starting indexing after pre-register")
let {config, chainManager, loadLayer} = state
let chainFetchers = chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => {
let {chainConfig, logger, fetchState: {startBlock, endBlock, maxAddrInPartition}} = cf
let chainFetchers = chainManager.chainFetchers->ChainMap.map(cf => {
let {
chainConfig,
logger,
fetchState: {startBlock, endBlock, maxAddrInPartition},
dynamicContractPreRegistration,
} = cf

ChainFetcher.make(
~dynamicContractRegistrations=state.preregisteredDynamicContracts
->ChainMap.get(chain)
->Js.Dict.values,
~dynamicContractRegistrations=dynamicContractPreRegistration->Option.mapWithDefault(
[],
Js.Dict.values,
),
~chainConfig,
~lastBlockScannedHashes=ReorgDetection.LastBlockScannedHashes.empty(
~confirmedBlockThreshold=chainConfig.confirmedBlockThreshold,
Expand All @@ -672,9 +692,10 @@ let actionReducer = (state: t, action: action) => {
~numBatchesFetched=0,
~eventFilters=None,
~maxAddrInPartition,
~isPreRegisteringDynamicContracts=false,
~dynamicContractPreRegistration=None,
)
})

let chainManager: ChainManager.t = {
chainFetchers,
arbitraryEventQueue: [],
Expand Down Expand Up @@ -910,9 +931,10 @@ let injectedTaskReducer = (
~contractAddress,
~contractName: Enums.ContractType.t,
) => {
state.preregisteredDynamicContracts
->ChainMap.get(chain)
->Js.Dict.get(contractAddress->Address.toString)
let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain)

chainFetcher.dynamicContractPreRegistration
->Option.flatMap(Js.Dict.get(_, contractAddress->Address.toString))
->Option.mapWithDefault(false, ({contractType}) => contractType == contractName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,33 @@ describe("PartitionedFetchState getMostBehindPartitions", () => {
~message="Should have skipped partitions that are at max queue size and returned less than maxNumQueries",
)
})

it_only("benchmark fn", () => {
let maxNumQueries = 10
let numPartitions = 100

let partitions = Array.makeByAndShuffle(
numPartitions,
i => {
mockFetchState(~latestFetchedBlockNumber=i)
},
)->List.fromArray

let partitionedFetchState = mockPartitionedFetchState(~partitions)

let timeRef = Hrtime.makeTimer()
let _mostBehindPartitions =
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
~maxNumQueries,
~maxPerChainQueueSize=10,
~partitionsCurrentlyFetching=Set.Int.empty,
)

//144750
//169209
//236334
//257334
let elapsed = timeRef->Hrtime.timeSince
Js.log2("elapsed", elapsed)
})
})

0 comments on commit 952732b

Please sign in to comment.