Skip to content

Commit

Permalink
Wip initial impl of dynamic contract pre-fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
JonoPrest committed Oct 4, 2024
1 parent f39bd58 commit 3ad7e16
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ module EventFunctions = {
false,
~dynamicContractRegistrations=None,
~inMemoryStore,
~isPreRegisteringDynamicContracts=false,
) {
| Ok(_) => ()
| Error(e) => e->ErrorHandling.logAndRaise
Expand Down
17 changes: 17 additions & 0 deletions codegenerator/cli/templates/static/codegen/src/Config.res
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ type chainConfig = {
chainWorker: module(ChainWorker.S),
}

// TODO: it should be possible to opt out of pre-registering dynamic contracts
let shouldPreRegisterDynamicContracts = (chainConfig: chainConfig) => {
let shouldPreRegisterDynamicContracts = ref(false)

chainConfig.contracts->Belt.Array.forEach(contract => {
contract.events->Belt.Array.forEach(event => {
if !shouldPreRegisterDynamicContracts.contents {
let module(Event) = event
shouldPreRegisterDynamicContracts :=
Event.handlerRegister->Types.HandlerTypes.Register.getContractRegister->Option.isSome
}
})
})

shouldPreRegisterDynamicContracts.contents
}

type historyFlag = FullHistory | MinHistory
type rollbackFlag = RollbackOnReorg | NoRollback
type historyConfig = {rollbackFlag: rollbackFlag, historyFlag: historyFlag}
Expand Down
99 changes: 79 additions & 20 deletions codegenerator/cli/templates/static/codegen/src/EventProcessing.res
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ module EventsProcessed = {
}
}

let updateEventSyncState = (
eventBatchQueueItem: Types.eventBatchQueueItem,
~inMemoryStore: InMemoryStore.t,
~isPreRegisteringDynamicContracts,
) => {
let {event, chain, blockNumber, timestamp: blockTimestamp} = eventBatchQueueItem
let {logIndex} = event
let chainId = chain->ChainMap.Chain.toChainId
let _ = inMemoryStore.eventSyncState->InMemoryTable.set(
chainId,
{
chainId,
blockTimestamp,
blockNumber,
logIndex,
isPreRegisteringDynamicContracts,
},
)
}

type dynamicContractRegistration = {
registeringEventBlockNumber: int,
registeringEventLogIndex: int,
Expand Down Expand Up @@ -72,6 +92,7 @@ let addToDynamicContractRegistrations = (
registeringEventLogIndex,
registeringEventChain: eventBatchQueueItem.chain,
}

{
unprocessedBatch,
registrations: [...registrations, dynamicContractRegistration],
Expand All @@ -85,6 +106,7 @@ let runEventContractRegister = (
~checkContractIsRegistered,
~dynamicContractRegistrations: option<dynamicContractRegistrations>,
~inMemoryStore,
~isPreRegisteringDynamicContracts,
) => {
let {chain, event, blockNumber} = eventBatchQueueItem

Expand Down Expand Up @@ -127,6 +149,10 @@ let runEventContractRegister = (
)->Some
}

if isPreRegisteringDynamicContracts {
eventBatchQueueItem->updateEventSyncState(~inMemoryStore, ~isPreRegisteringDynamicContracts)
}

val->Ok
}
}
Expand Down Expand Up @@ -194,24 +220,6 @@ let addEventToRawEvents = (
inMemoryStore.rawEvents->InMemoryTable.set({chainId, eventId: eventIdStr}, rawEvent)
}

let updateEventSyncState = (
eventBatchQueueItem: Types.eventBatchQueueItem,
~inMemoryStore: InMemoryStore.t,
) => {
let {event, chain, blockNumber, timestamp: blockTimestamp} = eventBatchQueueItem
let {logIndex} = event
let chainId = chain->ChainMap.Chain.toChainId
let _ = inMemoryStore.eventSyncState->InMemoryTable.set(
chainId,
{
chainId,
blockTimestamp,
blockNumber,
logIndex,
},
)
}

let runEventHandler = (
eventBatchQueueItem: Types.eventBatchQueueItem,
~loaderHandler: Types.HandlerTypes.loaderHandler<_>,
Expand Down Expand Up @@ -247,7 +255,10 @@ let runEventHandler = (
->Error
->propogate
| () =>
eventBatchQueueItem->updateEventSyncState(~inMemoryStore)
eventBatchQueueItem->updateEventSyncState(
~inMemoryStore,
~isPreRegisteringDynamicContracts=false,
)
if config.enableRawEvents {
eventBatchQueueItem->addEventToRawEvents(~inMemoryStore)
}
Expand Down Expand Up @@ -303,6 +314,7 @@ let rec registerDynamicContracts = (
~eventsBeforeDynamicRegistrations=[],
~dynamicContractRegistrations: option<dynamicContractRegistrations>=None,
~inMemoryStore,
~isPreRegisteringDynamicContracts,
) => {
switch eventBatch[index] {
| None => (eventsBeforeDynamicRegistrations, dynamicContractRegistrations)->Ok
Expand All @@ -326,6 +338,7 @@ let rec registerDynamicContracts = (
~eventBatchQueueItem,
~dynamicContractRegistrations,
~inMemoryStore,
~isPreRegisteringDynamicContracts,
)
| None =>
dynamicContractRegistrations
Expand All @@ -349,6 +362,7 @@ let rec registerDynamicContracts = (
~eventsBeforeDynamicRegistrations,
~dynamicContractRegistrations,
~inMemoryStore,
~isPreRegisteringDynamicContracts,
)
| Error(e) => Error(e)
}
Expand Down Expand Up @@ -443,6 +457,46 @@ type batchProcessed = {
latestProcessedBlocks: EventsProcessed.t,
dynamicContractRegistrations: option<dynamicContractRegistrations>,
}

let getDynamicContractRegistrations = (
~eventBatch: array<Types.eventBatchQueueItem>,
~latestProcessedBlocks: EventsProcessed.t,
~checkContractIsRegistered,
) => {
Js.log("pre-registering dynamic contracts")
let logger = Logging.createChild(
~params={
"context": "pre-registration",
"batch-size": eventBatch->Array.length,
"first-event-timestamp": eventBatch[0]->Option.map(v => v.timestamp),
},
)
let inMemoryStore = InMemoryStore.make()
open ErrorHandling.ResultPropogateEnv
runAsyncEnv(async () => {
//Register all the dynamic contracts in this batch,
//only continue processing events before the first dynamic contract registration
let (_, dynamicContractRegistrations) =
eventBatch
->registerDynamicContracts(
~checkContractIsRegistered,
~logger,
~inMemoryStore,
~isPreRegisteringDynamicContracts=true,
)
->propogate

//We only preregister below the reorg threshold so it can be hardcoded as false
switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold=false) {
| exception exn =>
exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate
| () => ()
}

Ok({latestProcessedBlocks, dynamicContractRegistrations})
})
}

let processEventBatch = (
~eventBatch: array<Types.eventBatchQueueItem>,
~inMemoryStore: InMemoryStore.t,
Expand Down Expand Up @@ -471,7 +525,12 @@ let processEventBatch = (
dynamicContractRegistrations,
) =
eventBatch
->registerDynamicContracts(~checkContractIsRegistered, ~logger, ~inMemoryStore)
->registerDynamicContracts(
~checkContractIsRegistered,
~logger,
~inMemoryStore,
~isPreRegisteringDynamicContracts=false,
)
->propogate

let elapsedAfterContractRegister =
Expand Down
15 changes: 13 additions & 2 deletions codegenerator/cli/templates/static/codegen/src/Utils.res
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ module Dict = {
It's the same as `Js.Dict.get` but it doesn't have runtime overhead to check if the key exists.
*/
external dangerouslyGetNonOption: (dict<'a>, string) => option<'a> = ""

let merge = (a: dict<'a>, b: dict<'a>) => {
let result = Js.Dict.empty()
Js.Dict.entries(a)->Js.Array2.forEach(((key, value)) => {
result->Js.Dict.set(key, value)
})
Js.Dict.entries(b)->Js.Array2.forEach(((key, value)) => {
result->Js.Dict.set(key, value)
})
result
}
}

module Math = {
Expand Down Expand Up @@ -220,8 +231,8 @@ module Schema = {
schema->S.preprocess(s => {
switch s.schema->S.classify {
| Literal(Null(_))
// This is a workaround for Fuel Bytes type
| Unknown => {serializer: _ => %raw(`"null"`)}
| // This is a workaround for Fuel Bytes type
Unknown => {serializer: _ => %raw(`"null"`)}
| Null(_)
| Bool => {
serializer: unknown => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ const chunkBatchQuery = async (sql, entityDataArray, queryToExecute) => {
const commaSeparateDynamicMapQuery = (sql, dynQueryConstructors) =>
sql`${dynQueryConstructors.map(
(constrQuery, i) =>
sql`${constrQuery(sql)}${i === dynQueryConstructors.length - 1 ? sql`` : sql`, `
}`,
sql`${constrQuery(sql)}${
i === dynQueryConstructors.length - 1 ? sql`` : sql`, `
}`,
)}`;

const batchSetItemsInTableCore = (table, sql, rowDataArray) => {
Expand Down Expand Up @@ -101,13 +102,15 @@ module.exports.batchSetEventSyncState = (sql, entityDataArray) => {
"block_number",
"log_index",
"block_timestamp",
"is_pre_registering_dynamic_contracts",
)}
ON CONFLICT(chain_id) DO UPDATE
SET
"chain_id" = EXCLUDED."chain_id",
"block_number" = EXCLUDED."block_number",
"log_index" = EXCLUDED."log_index",
"block_timestamp" = EXCLUDED."block_timestamp";
"block_timestamp" = EXCLUDED."block_timestamp",
"is_pre_registering_dynamic_contracts" = EXCLUDED."is_pre_registering_dynamic_contracts";
`;
};

Expand Down Expand Up @@ -143,7 +146,7 @@ module.exports.batchSetChainMetadata = (sql, entityDataArray) => {
"latest_fetched_block_number" = EXCLUDED."latest_fetched_block_number",
"timestamp_caught_up_to_head_or_endblock" = EXCLUDED."timestamp_caught_up_to_head_or_endblock",
"block_height" = EXCLUDED."block_height";`
.then((res) => { })
.then((res) => {})
.catch((err) => {
console.log("errored", err);
});
Expand All @@ -163,7 +166,7 @@ module.exports.setChainMetadataBlockHeight = (sql, entityDataArray) => {
SET
"chain_id" = EXCLUDED."chain_id",
"block_height" = EXCLUDED."block_height";`
.then((res) => { })
.then((res) => {})
.catch((err) => {
console.log("errored", err);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module EventSyncState = {
@as("block_number") blockNumber: int,
@as("log_index") logIndex: int,
@as("block_timestamp") blockTimestamp: int,
@as("is_pre_registering_dynamic_contracts") isPreRegisteringDynamicContracts: bool,
}

let table = mkTable(
Expand All @@ -22,6 +23,7 @@ module EventSyncState = {
mkField("block_number", Integer),
mkField("log_index", Integer),
mkField("block_timestamp", Integer),
mkField("is_pre_registering_dynamic_contracts", Boolean),
],
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type t = {
//An optional list of filters to apply on event queries
//Used for reorgs and restarts
eventFilters: option<FetchState.eventFilters>,
//Currently this state applies to all chains simultaneously but it may be possible to,
//in the future, have a per chain state and allow individual chains to start indexing as
//soon as the pre registration is done
isPreRegisteringDynamicContracts: bool,
}

//CONSTRUCTION
Expand All @@ -34,6 +38,7 @@ let make = (
~numBatchesFetched,
~eventFilters,
~maxAddrInPartition,
~isPreRegisteringDynamicContracts,
): t => {
let module(ChainWorker) = chainConfig.chainWorker
logger->Logging.childInfo("Initializing ChainFetcher with " ++ ChainWorker.name ++ " worker")
Expand All @@ -60,6 +65,7 @@ let make = (
numBatchesFetched,
eventFilters,
partitionsCurrentlyFetching: Belt.Set.Int.empty,
isPreRegisteringDynamicContracts,
}
}

Expand All @@ -71,6 +77,14 @@ let getStaticContracts = (chainConfig: Config.chainConfig) => {
})
}

module Stub = {
let getShouldPreRegisterDynamicContracts = (
handlerRegister: Types.HandlerTypes.Register.t<'eventArgs>,
) => {
handlerRegister->Types.HandlerTypes.Register.getContractRegister->Option.isSome
}
}

let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => {
let logger = Logging.createChild(~params={"chainId": chainConfig.chain->ChainMap.Chain.toChainId})
let staticContracts = chainConfig->getStaticContracts
Expand All @@ -93,6 +107,7 @@ let makeFromConfig = (chainConfig: Config.chainConfig, ~maxAddrInPartition) => {
~eventFilters=None,
~dynamicContractRegistrations=[],
~maxAddrInPartition,
~isPreRegisteringDynamicContracts=chainConfig->Config.shouldPreRegisterDynamicContracts,
)
}

Expand All @@ -107,9 +122,10 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio

let chainMetadata = await DbFunctions.ChainMetadata.getLatestChainMetadataState(~chainId)

let startBlock = latestProcessedEvent->Option.mapWithDefault(chainConfig.startBlock, event =>
//start from the same block but filter out any events already processed
event.blockNumber
let (startBlock, isPreRegisteringDynamicContracts) = latestProcessedEvent->Option.mapWithDefault(
(chainConfig.startBlock, chainConfig->Config.shouldPreRegisterDynamicContracts),
event => //start from the same block but filter out any events already processed
(event.blockNumber, event.isPreRegisteringDynamicContracts),
)

let eventFilters: option<
Expand Down Expand Up @@ -186,6 +202,7 @@ let makeFromDbState = async (chainConfig: Config.chainConfig, ~maxAddrInPartitio
~logger,
~eventFilters,
~maxAddrInPartition,
~isPreRegisteringDynamicContracts,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,16 @@ let createBatch = (self: t, ~maxBatchSize: int) => {
{val, isInReorgThreshold}
}

let isFetchingAtHead = self =>
self.chainFetchers
->ChainMap.values
->Array.reduce(true, (accum, cf) => accum && cf->ChainFetcher.isFetchingAtHead)

let isPreRegisteringDynamicContracts = self =>
self.chainFetchers
->ChainMap.values
->Array.reduce(false, (accum, cf) => accum || cf.isPreRegisteringDynamicContracts)

module ExposedForTesting_Hidden = {
let priorityQueueComparitor = priorityQueueComparitor
let getComparitorFromItem = getComparitorFromItem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ module type S = {
~logger: Pino.t,
~currentBlockHeight: int,
~setCurrentBlockHeight: int => unit,
~isPreRegisteringDynamicContracts: bool,
) => promise<result<blockRangeFetchResponse, ErrorHandling.t>>
}
Loading

0 comments on commit 3ad7e16

Please sign in to comment.