From 8e0f015acb1fb1c91791b4af6a3d8e38b0568c07 Mon Sep 17 00:00:00 2001 From: Jono Prest <65739024+JonoPrest@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:41:30 +0200 Subject: [PATCH] Implement start saving history the near the head (#235) * Implement add saving history at the near the head * Get isInReorgThreshold from persisted db values * Fix tests --------- Co-authored-by: Dmitry Zakharov --- .../dynamic/codegen/src/ContextEnv.res.hbs | 52 ++++-- .../templates/dynamic/codegen/src/IO.res.hbs | 15 +- .../dynamic/codegen/src/TestHelpers.res.hbs | 1 + .../codegen/src/TestHelpers_MockDb.res.hbs | 2 +- .../templates/static/codegen/src/Config.res | 7 + .../static/codegen/src/EventProcessing.res | 24 ++- .../static/codegen/src/InMemoryTable.res | 4 +- .../static/codegen/src/db/DbFunctions.res | 14 ++ .../src/eventFetching/ChainFetcher.res | 3 +- .../src/eventFetching/ChainManager.res | 169 +++++++++++++----- .../codegen/src/eventFetching/FetchState.res | 11 ++ .../codegen/src/globalState/GlobalState.res | 41 +++-- .../test_codegen/test/ChainManager_test.res | 28 +-- scenarios/test_codegen/test/Mock_test.res | 2 + scenarios/test_codegen/test/helpers/Mock.res | 4 +- .../test/schema_types/BigDecimal_test.res | 7 +- .../test/schema_types/Timestamp_test.res | 7 +- 17 files changed, 293 insertions(+), 98 deletions(-) diff --git a/codegenerator/cli/templates/dynamic/codegen/src/ContextEnv.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/ContextEnv.res.hbs index ea7e79c75..56878bc1f 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/ContextEnv.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/ContextEnv.res.hbs @@ -18,7 +18,9 @@ let getUserLogger = (logger): Logs.userLogger => { logger->Logging.uerrorWithExn(exn, message), } -let makeEventIdentifier = (eventBatchQueueItem: Types.eventBatchQueueItem): Types.eventIdentifier => { +let makeEventIdentifier = ( + eventBatchQueueItem: Types.eventBatchQueueItem, +): Types.eventIdentifier => { let {event, blockNumber, timestamp} = eventBatchQueueItem { chainId: event.chainId, @@ -29,7 +31,10 @@ let makeEventIdentifier = (eventBatchQueueItem: Types.eventBatchQueueItem): Type } let getEventId = (eventBatchQueueItem: Types.eventBatchQueueItem) => { - EventUtils.packEventIndex(~blockNumber=eventBatchQueueItem.blockNumber, ~logIndex=eventBatchQueueItem.event.logIndex) + EventUtils.packEventIndex( + ~blockNumber=eventBatchQueueItem.blockNumber, + ~logIndex=eventBatchQueueItem.event.logIndex, + ) } let make = (~eventBatchQueueItem: Types.eventBatchQueueItem, ~logger) => { @@ -86,8 +91,21 @@ let makeDynamicContractRegisterFn = (~contextEnv: t, ~contractName, ~inMemorySto ) } -let makeWhereLoader = (loadLayer, ~entityMod, ~inMemoryStore, ~fieldName, ~fieldValueSchema, ~logger) => { - Entities.eq: loadLayer->LoadLayer.makeWhereEqLoader(~entityMod, ~fieldName, ~fieldValueSchema, ~inMemoryStore, ~logger) +let makeWhereLoader = ( + loadLayer, + ~entityMod, + ~inMemoryStore, + ~fieldName, + ~fieldValueSchema, + ~logger, +) => { + Entities.eq: loadLayer->LoadLayer.makeWhereEqLoader( + ~entityMod, + ~fieldName, + ~fieldValueSchema, + ~inMemoryStore, + ~logger, + ), } let makeEntityHandlerContext = ( @@ -98,20 +116,22 @@ let makeEntityHandlerContext = ( ~logger, ~getKey, ~loadLayer, + ~isInReorgThreshold, ): entityHandlerContext => { let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityMod) - let shouldRollbackOnReorg = RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg + let shouldSaveHistory = + RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold) { set: entity => { inMemTable->InMemoryTable.Entity.set( Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId=getKey(entity)), - ~shouldRollbackOnReorg, + ~shouldSaveHistory, ) }, deleteUnsafe: entityId => { inMemTable->InMemoryTable.Entity.set( Delete->Types.mkEntityUpdate(~eventIdentifier, ~entityId), - ~shouldRollbackOnReorg, + ~shouldSaveHistory, ) }, get: loadLayer->LoadLayer.makeLoader(~entityMod, ~logger, ~inMemoryStore), @@ -152,7 +172,12 @@ let getLoaderContext = (contextEnv: t, ~inMemoryStore: InMemoryStore.t, ~loadLay } } -let getHandlerContext = (context, ~inMemoryStore: InMemoryStore.t, ~loadLayer) => { +let getHandlerContext = ( + context, + ~inMemoryStore: InMemoryStore.t, + ~loadLayer, + ~isInReorgThreshold, +) => { let {eventBatchQueueItem, logger} = context let eventIdentifier = eventBatchQueueItem->makeEventIdentifier @@ -166,6 +191,7 @@ let getHandlerContext = (context, ~inMemoryStore: InMemoryStore.t, ~loadLayer) = ~getKey=entity => entity.id, ~logger, ~loadLayer, + ~isInReorgThreshold, ), {{/each}} } @@ -181,8 +207,14 @@ let getLoaderArgs = (contextEnv, ~inMemoryStore, ~loadLayer) => { context: contextEnv->getLoaderContext(~inMemoryStore, ~loadLayer), } -let getHandlerArgs = (contextEnv, ~inMemoryStore, ~loaderReturn, ~loadLayer) => { +let getHandlerArgs = ( + contextEnv, + ~inMemoryStore, + ~loaderReturn, + ~loadLayer, + ~isInReorgThreshold, +) => { Types.HandlerTypes.event: contextEnv.eventBatchQueueItem.event, - context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer), + context: contextEnv->getHandlerContext(~inMemoryStore, ~loadLayer, ~isInReorgThreshold), loaderReturn, } diff --git a/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs index ca61164f4..6b9a8847e 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/IO.res.hbs @@ -136,9 +136,9 @@ let executeDbFunctionsEntity = ( promises->Promise.all->Promise.thenResolve(_ => ()) } -let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t) => { +let executeBatch = async (sql, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold) => { let entityDbExecutionComposer = - RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg + RegisterHandlers.getConfig()->Config.shouldSaveHistory(~isInReorgThreshold) ? executeSetEntityWithHistory : executeDbFunctionsEntity @@ -223,7 +223,8 @@ module RollBack = { let inMemStore = InMemoryStore.makeWithRollBackEventIdentifier(Some(rollBackEventIdentifier)) - let shouldRollbackOnReorg = RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg + //Don't save the rollback diffs to history table + let shouldSaveHistory = false reorgData->Belt.Array.forEach(e => { switch e { @@ -232,8 +233,8 @@ module RollBack = { {{#each entities as | entity |}} | {previousEntity: Some({entity: {{entity.name.capitalized}}(entity), eventIdentifier}), entityId} => inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set( - Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory=false), - ~shouldRollbackOnReorg, + Set(entity)->Types.mkEntityUpdate(~eventIdentifier, ~entityId, ~shouldSaveHistory), + ~shouldSaveHistory, ) {{/each}} //Where previousEntity is None, @@ -241,8 +242,8 @@ module RollBack = { {{#each entities as | entity |}} | {previousEntity: None, entityType: {{entity.name.capitalized}}, entityId} => inMemStore.{{entity.name.uncapitalized}}->InMemoryTable.Entity.set( - Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory=false), - ~shouldRollbackOnReorg, + Delete->Types.mkEntityUpdate(~eventIdentifier=rollBackEventIdentifier, ~entityId, ~shouldSaveHistory), + ~shouldSaveHistory, ) {{/each}} } diff --git a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs index 75d424518..b7c4ecb87 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs @@ -151,6 +151,7 @@ module EventFunctions = { ~logger, ~latestProcessedBlocks, ~config, + ~isInReorgThreshold=false, ) { | Ok(_) => () | Error(e) => e->ErrorHandling.logAndRaise diff --git a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs index c6321f512..2094af1f2 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs @@ -140,7 +140,7 @@ let makeStoreOperatorEntity = ( ~entityId, ~eventIdentifier={chainId: -1, blockNumber: -1, blockTimestamp: 0, logIndex: -1}, ), - ~shouldRollbackOnReorg=RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg, + ~shouldSaveHistory=false, ) updateEntityIndicesMockDb(~mockDbTable=table, ~entity, ~entityId) diff --git a/codegenerator/cli/templates/static/codegen/src/Config.res b/codegenerator/cli/templates/static/codegen/src/Config.res index ca03ba0e9..59bd8aa38 100644 --- a/codegenerator/cli/templates/static/codegen/src/Config.res +++ b/codegenerator/cli/templates/static/codegen/src/Config.res @@ -120,6 +120,13 @@ let shouldRollbackOnReorg = config => | _ => false } +let shouldSaveHistory = (config, ~isInReorgThreshold) => + switch config.historyConfig { + | {rollbackFlag: RollbackOnReorg} if isInReorgThreshold => true + | {historyFlag: FullHistory} => true + | _ => false + } + let shouldPruneHistory = config => switch config.historyConfig { | {historyFlag: MinHistory} => true diff --git a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res index 3dcd3b94c..698c0e869 100644 --- a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res +++ b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res @@ -220,6 +220,7 @@ let runEventHandler = ( ~latestProcessedBlocks, ~loadLayer, ~config: Config.t, + ~isInReorgThreshold, ) => { open ErrorHandling.ResultPropogateEnv runAsyncEnv(async () => { @@ -230,7 +231,12 @@ let runEventHandler = ( (await runEventLoader(~contextEnv, ~loader, ~inMemoryStore, ~loadLayer))->propogate switch await handler( - contextEnv->ContextEnv.getHandlerArgs(~loaderReturn, ~inMemoryStore, ~loadLayer), + contextEnv->ContextEnv.getHandlerArgs( + ~loaderReturn, + ~inMemoryStore, + ~loadLayer, + ~isInReorgThreshold, + ), ) { | exception exn => exn @@ -262,6 +268,7 @@ let runHandler = ( ~logger, ~loadLayer, ~config, + ~isInReorgThreshold, ) => { switch eventBatchQueueItem.handlerRegister->Types.HandlerTypes.Register.getLoaderHandler { | Some(loaderHandler) => @@ -272,6 +279,7 @@ let runHandler = ( ~logger, ~loadLayer, ~config, + ~isInReorgThreshold, ) | None => Ok(latestProcessedBlocks)->Promise.resolve } @@ -385,6 +393,7 @@ let runHandlers = ( ~logger, ~loadLayer, ~config, + ~isInReorgThreshold, ) => { open ErrorHandling.ResultPropogateEnv let latestProcessedBlocks = ref(latestProcessedBlocks) @@ -401,6 +410,7 @@ let runHandlers = ( ~latestProcessedBlocks=latestProcessedBlocks.contents, ~loadLayer, ~config, + ~isInReorgThreshold, ) )->propogate } @@ -436,6 +446,7 @@ type batchProcessed = { let processEventBatch = ( ~eventBatch: array, ~inMemoryStore: InMemoryStore.t, + ~isInReorgThreshold, ~latestProcessedBlocks: EventsProcessed.t, ~checkContractIsRegistered, ~loadLayer, @@ -474,12 +485,19 @@ let processEventBatch = ( let latestProcessedBlocks = (await eventsBeforeDynamicRegistrations - ->runHandlers(~inMemoryStore, ~latestProcessedBlocks, ~logger, ~loadLayer, ~config)) + ->runHandlers( + ~inMemoryStore, + ~latestProcessedBlocks, + ~logger, + ~loadLayer, + ~config, + ~isInReorgThreshold, + )) ->propogate let elapsedTimeAfterProcess = timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis - switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore) { + switch await DbFunctions.sql->IO.executeBatch(~inMemoryStore, ~isInReorgThreshold) { | exception exn => exn->ErrorHandling.make(~msg="Failed writing batch to database", ~logger)->Error->propogate | () => () diff --git a/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res b/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res index e1e67399e..54aa1f22a 100644 --- a/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res +++ b/codegenerator/cli/templates/static/codegen/src/InMemoryTable.res @@ -141,7 +141,7 @@ module Entity = { let set = ( inMemTable: t<'entity>, entityUpdate: Types.entityUpdate<'entity>, - ~shouldRollbackOnReorg, + ~shouldSaveHistory, ) => { let {entityRow, entityIndices} = switch inMemTable.table->get(entityUpdate.entityId) { | Some({entityRow: InitialReadFromDb(entity_read), entityIndices}) => @@ -152,7 +152,7 @@ module Entity = { }) {entityRow, entityIndices} | Some({entityRow: Updated(previous_values), entityIndices}) - if !shouldRollbackOnReorg || + if !shouldSaveHistory || //Rollback initial state cases should not save history !previous_values.latest.shouldSaveHistory || // This prevents two db actions in the same event on the same entity from being recorded to the history table. diff --git a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res index 024fc5f6e..03f22ee02 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res +++ b/codegenerator/cli/templates/static/codegen/src/db/DbFunctions.res @@ -16,6 +16,18 @@ type chainId = int type eventId = string type blockNumberRow = {@as("block_number") blockNumber: int} +module General = { + type existsRes = {exists: bool} + + let hasRows = async (sql, ~table: Table.table) => { + let query = `SELECT EXISTS(SELECT 1 FROM public.${table.tableName});` + switch await sql->Postgres.unsafe(query) { + | [{exists}] => exists + | _ => Js.Exn.raiseError("Unexpected result from hasRows query: " ++ query) + } + } +} + module ChainMetadata = { type chainMetadata = { @as("chain_id") chainId: int, @@ -370,4 +382,6 @@ module EntityHistory = { }) }) } + + let hasRows = () => General.hasRows(sql, ~table=TablesStatic.EntityHistory.table) } diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index 3f45baf58..bb58e5fc5 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -284,8 +284,7 @@ let hasProcessedToEndblock = (self: t) => { } let hasNoMoreEventsToProcess = (self: t, ~hasArbQueueEvents) => { - !hasArbQueueEvents && - self.fetchState->PartitionedFetchState.queueSize === 0 + !hasArbQueueEvents && self.fetchState->PartitionedFetchState.queueSize === 0 } /** diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res index c2520cb6b..6080d8298 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res @@ -5,6 +5,7 @@ type t = { //due to contract registration. Ordered from latest to earliest arbitraryEventQueue: array, isUnorderedMultichainMode: bool, + isInReorgThreshold: bool, } let getComparitorFromItem = (queueItem: Types.eventBatchQueueItem) => { @@ -61,10 +62,20 @@ let chainFetcherPeekComparitorEarliestEventPrioritizeEvents = ( type noItemsInArray = NoItemsInArray +type isInReorgThresholdRes<'payload> = { + isInReorgThreshold: bool, + val: 'payload, +} + +type fetchStateWithData = { + partitionedFetchState: PartitionedFetchState.t, + heighestBlockBelowThreshold: int, +} + let determineNextEvent = ( - fetchStatesMap: ChainMap.t, + fetchStatesMap: ChainMap.t, ~isUnorderedMultichainMode: bool, -): result => { +): result, noItemsInArray> => { let comparitorFunction = if isUnorderedMultichainMode { chainFetcherPeekComparitorEarliestEventPrioritizeEvents } else { @@ -74,27 +85,38 @@ let determineNextEvent = ( let nextItem = fetchStatesMap ->ChainMap.entries - ->Array.reduce(None, (accum, (chain, partitionedFetchState)) => { + ->Array.reduce({isInReorgThreshold: false, val: None}, ( + accum, + (chain, {partitionedFetchState, heighestBlockBelowThreshold}), + ) => { // If the fetch state has reached the end block we don't need to consider it switch partitionedFetchState->PartitionedFetchState.getEarliestEvent { | Some(earliestEvent) => + let {val, isInReorgThreshold} = accum + let mk = cmp => { + { + val: Some(cmp), + isInReorgThreshold: isInReorgThreshold || + cmp.earliestEvent->FetchState.queueItemIsInReorgThreshold(~heighestBlockBelowThreshold), + } + } let cmpA: multiChainEventComparitor = {chain, earliestEvent} - switch accum { - | None => cmpA + switch val { + | None => mk(cmpA) | Some(cmpB) => if comparitorFunction(cmpB, cmpA) { - cmpB + mk(cmpB) } else { - cmpA + mk(cmpA) } - }->Some + } | None => accum } }) switch nextItem { - | None => Error(NoItemsInArray) - | Some(item) => Ok(item) + | {val: None} => Error(NoItemsInArray) + | {val: Some(item), isInReorgThreshold} => Ok({val: item, isInReorgThreshold}) } } @@ -105,6 +127,7 @@ let makeFromConfig = (~config: Config.t, ~maxAddrInPartition=Env.maxAddrInPartit chainFetchers, arbitraryEventQueue: [], isUnorderedMultichainMode: config.isUnorderedMultichainMode, + isInReorgThreshold: false, } } @@ -119,10 +142,15 @@ let makeFromDbState = async (~config: Config.t, ~maxAddrInPartition=Env.maxAddrI let chainFetchers = ChainMap.fromArrayUnsafe(chainFetchersArr) + let hasStartedSavingHistory = await DbFunctions.EntityHistory.hasRows() + { isUnorderedMultichainMode: config.isUnorderedMultichainMode, arbitraryEventQueue: [], chainFetchers, + //If we have started saving history, continue to save history + //as regardless of whether we are still in a reorg threshold + isInReorgThreshold: hasStartedSavingHistory, } } @@ -186,82 +214,131 @@ let setChainFetcher = (self: t, chainFetcher: ChainFetcher.t) => { } } -let getFirstArbitraryEventsItemForChain = (queue: array, ~chain) => +let getFirstArbitraryEventsItemForChain = ( + queue: array, + ~chain, + ~fetchStatesMap: ChainMap.t, +): option> => queue ->Utils.Array.findReverseWithIndex((item: Types.eventBatchQueueItem) => { item.chain == chain }) ->Option.map(((item, index)) => { - FetchState.item, - popItemOffQueue: () => queue->Utils.Array.spliceInPlace(~pos=index, ~remove=1)->ignore, + let {heighestBlockBelowThreshold} = fetchStatesMap->ChainMap.get(item.chain) + let isInReorgThreshold = item.blockNumber > heighestBlockBelowThreshold + { + val: { + FetchState.item, + popItemOffQueue: () => queue->Utils.Array.spliceInPlace(~pos=index, ~remove=1)->ignore, + }, + isInReorgThreshold, + } }) -let getFirstArbitraryEventsItem = (queue: array) => +let getFirstArbitraryEventsItem = ( + queue: array, + ~fetchStatesMap: ChainMap.t, +): option> => queue ->Utils.Array.last - ->Option.map(item => {FetchState.item, popItemOffQueue: () => queue->Js.Array2.pop->ignore}) + ->Option.map(item => { + let {heighestBlockBelowThreshold} = fetchStatesMap->ChainMap.get(item.chain) + let isInReorgThreshold = item.blockNumber > heighestBlockBelowThreshold + { + val: {FetchState.item, popItemOffQueue: () => queue->Js.Array2.pop->ignore}, + isInReorgThreshold, + } + }) let popBatchItem = ( - ~fetchStatesMap: ChainMap.t, + ~fetchStatesMap: ChainMap.t, ~arbitraryEventQueue: array, ~isUnorderedMultichainMode, -): option => { +): isInReorgThresholdRes> => { //Compare the peeked items and determine the next item switch fetchStatesMap->determineNextEvent(~isUnorderedMultichainMode) { - | Ok({chain, earliestEvent}) => + | Ok({val: {chain, earliestEvent}, isInReorgThreshold}) => let maybeArbItem = if isUnorderedMultichainMode { - arbitraryEventQueue->getFirstArbitraryEventsItemForChain(~chain) + arbitraryEventQueue->getFirstArbitraryEventsItemForChain(~chain, ~fetchStatesMap) } else { - arbitraryEventQueue->getFirstArbitraryEventsItem + arbitraryEventQueue->getFirstArbitraryEventsItem(~fetchStatesMap) } switch maybeArbItem { //If there is item on the arbitray events queue, and it is earlier than //than the earlist event, take the item off from there - | Some(itemWithPopFn) + | Some({val: itemWithPopFn, isInReorgThreshold}) if Item(itemWithPopFn)->getQueueItemComparitor(~chain=itemWithPopFn.item.chain) < - earliestEvent->getQueueItemComparitor(~chain) => - Some(itemWithPopFn) + earliestEvent->getQueueItemComparitor(~chain) => { + isInReorgThreshold, + val: Some(itemWithPopFn), + } | _ => switch earliestEvent { - | NoItem(_) => None - | Item(itemWithPopFn) => Some(itemWithPopFn) + | NoItem(_) => { + isInReorgThreshold, + val: None, + } + | Item(itemWithPopFn) => { + isInReorgThreshold, + val: Some(itemWithPopFn), + } } } - | Error(NoItemsInArray) => arbitraryEventQueue->getFirstArbitraryEventsItem + | Error(NoItemsInArray) => + arbitraryEventQueue + ->getFirstArbitraryEventsItem(~fetchStatesMap) + ->Option.mapWithDefault({val: None, isInReorgThreshold: false}, ({val, isInReorgThreshold}) => { + isInReorgThreshold, + val: Some(val), + }) } } +let getFetchStateWithData = (self: t, ~shouldDeepCopy=false): ChainMap.t => { + self.chainFetchers->ChainMap.map(cf => { + partitionedFetchState: shouldDeepCopy + ? cf.fetchState->PartitionedFetchState.copy + : cf.fetchState, + heighestBlockBelowThreshold: cf.currentBlockHeight - cf.chainConfig.confirmedBlockThreshold, + }) +} + /** Simply calls popBatchItem in isolation using the chain manager without the context of a batch */ let nextItemIsNone = (self: t): bool => { popBatchItem( - ~fetchStatesMap=self.chainFetchers->ChainMap.map(cf => cf.fetchState), + ~fetchStatesMap=self->getFetchStateWithData, ~arbitraryEventQueue=self.arbitraryEventQueue, ~isUnorderedMultichainMode=self.isUnorderedMultichainMode, - )->Option.isNone + ).val->Option.isNone } -type batchRes = { - batch: array, - fetchStatesMap: ChainMap.t, - arbitraryEventQueue: array, +let hasChainItemsOnArbQueue = (self: t, ~chain): bool => { + self.arbitraryEventQueue->Js.Array2.find(item => item.chain == chain)->Option.isSome } let createBatchInternal = ( ~maxBatchSize, - ~fetchStatesMap: ChainMap.t, + ~fetchStatesMap: ChainMap.t, ~arbitraryEventQueue, ~isUnorderedMultichainMode, ) => { + let isInReorgThresholdRef = ref(false) let batch = [] let rec loop = () => - if batch->Array.length >= maxBatchSize { - batch - } else { - switch popBatchItem(~fetchStatesMap, ~arbitraryEventQueue, ~isUnorderedMultichainMode) { - | None => batch + if batch->Array.length < maxBatchSize { + let {val, isInReorgThreshold} = popBatchItem( + ~fetchStatesMap, + ~arbitraryEventQueue, + ~isUnorderedMultichainMode, + ) + + isInReorgThresholdRef := isInReorgThresholdRef.contents || isInReorgThreshold + + switch val { + | None => () | Some({item, popItemOffQueue}) => popItemOffQueue() batch->Js.Array2.push(item)->ignore @@ -269,6 +346,14 @@ let createBatchInternal = ( } } loop() + + {val: batch, isInReorgThreshold: isInReorgThresholdRef.contents} +} + +type batchRes = { + batch: array, + fetchStatesMap: ChainMap.t, + arbitraryEventQueue: array, } let createBatch = (self: t, ~maxBatchSize: int) => { @@ -277,9 +362,9 @@ let createBatch = (self: t, ~maxBatchSize: int) => { let {arbitraryEventQueue, chainFetchers} = self //Make a copy of the queues and fetch states since we are going to mutate them let arbitraryEventQueue = arbitraryEventQueue->Array.copy - let fetchStatesMap = chainFetchers->ChainMap.map(cf => cf.fetchState->PartitionedFetchState.copy) + let fetchStatesMap = self->getFetchStateWithData(~shouldDeepCopy=true) - let batch = createBatchInternal( + let {val: batch, isInReorgThreshold} = createBatchInternal( ~maxBatchSize, ~fetchStatesMap, ~arbitraryEventQueue, @@ -288,7 +373,7 @@ let createBatch = (self: t, ~maxBatchSize: int) => { let batchSize = batch->Array.length - if batchSize > 0 { + let val = if batchSize > 0 { let fetchedEventsBuffer = chainFetchers ->ChainMap.values @@ -322,6 +407,8 @@ let createBatch = (self: t, ~maxBatchSize: int) => { } else { None } + + {val, isInReorgThreshold} } module ExposedForTesting_Hidden = { diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res index 664eee262..dae6443b6 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/FetchState.res @@ -521,6 +521,10 @@ let getNextQuery = (~eventFilters=?, ~currentBlockHeight, ~partitionId, self: t) type itemWithPopFn = {item: Types.eventBatchQueueItem, popItemOffQueue: unit => unit} +let itemIsInReorgThreshold = (item: itemWithPopFn, ~heighestBlockBelowThreshold) => { + item.item.blockNumber > heighestBlockBelowThreshold +} + /** Represents a fetchState registers head of the fetchedEventQueue as either an existing item, or no item with latest fetched block data @@ -529,6 +533,13 @@ type queueItem = | Item(itemWithPopFn) | NoItem(blockNumberAndTimestamp) +let queueItemIsInReorgThreshold = (queueItem: queueItem, ~heighestBlockBelowThreshold) => { + switch queueItem { + | Item(itemWithPopFn) => itemWithPopFn->itemIsInReorgThreshold(~heighestBlockBelowThreshold) + | NoItem({blockNumber}) => blockNumber > heighestBlockBelowThreshold + } +} + /** Creates a compareable value for items and no items on register queues. Block number takes priority here. Since a latest fetched timestamp could diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index 570f4a889..095ae3b53 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -56,9 +56,10 @@ type action = | SetFetchStateCurrentBlockHeight(chain, int) | EventBatchProcessed(EventProcessing.batchProcessed) | SetCurrentlyProcessing(bool) + | SetIsInReorgThreshold(bool) | SetCurrentlyFetchingBatch(chain, bool) | SetFetchState(chain, PartitionedFetchState.t) - | UpdateQueues(ChainMap.t, arbitraryEventQueue) + | UpdateQueues(ChainMap.t, arbitraryEventQueue) | SetSyncedChains | SuccessExit | ErrorExit(ErrorHandling.t) @@ -190,9 +191,7 @@ let checkAndSetSyncedChains = (~nextQueueItemIsKnownNone=false, chainManager: Ch //All events have been processed on the chain fetchers queue //Other chains may be busy syncing let hasArbQueueEvents = - chainManager.arbitraryEventQueue - ->ChainManager.getFirstArbitraryEventsItemForChain(~chain=cf.chainConfig.chain) - ->Option.isSome //TODO this is more expensive than it needs to be + chainManager->ChainManager.hasChainItemsOnArbQueue(~chain=cf.chainConfig.chain) let hasNoMoreEventsToProcess = cf->ChainFetcher.hasNoMoreEventsToProcess(~hasArbQueueEvents) if hasNoMoreEventsToProcess { @@ -236,10 +235,7 @@ let updateLatestProcessedBlocks = ( let {chainConfig: {chain}, fetchState} = cf let {numEventsProcessed, latestProcessedBlock} = latestProcessedBlocks->ChainMap.get(chain) - let hasArbQueueEvents = - state.chainManager.arbitraryEventQueue - ->ChainManager.getFirstArbitraryEventsItemForChain(~chain) - ->Option.isSome //TODO this is more expensive than it needs to be + let hasArbQueueEvents = state.chainManager->ChainManager.hasChainItemsOnArbQueue(~chain) let hasNoMoreEventsToProcess = cf->ChainFetcher.hasNoMoreEventsToProcess(~hasArbQueueEvents) let latestProcessedBlock = if hasNoMoreEventsToProcess { @@ -322,10 +318,7 @@ let handleBlockRangeResponse = (state, ~chain, ~response: ChainWorker.blockRange ->Utils.unwrapResultExn ->updateChainFetcherCurrentBlockHeight(~currentBlockHeight) - let hasArbQueueEvents = - state.chainManager.arbitraryEventQueue - ->ChainManager.getFirstArbitraryEventsItemForChain(~chain) - ->Option.isSome //TODO this is more expensive than it needs to be + let hasArbQueueEvents = state.chainManager->ChainManager.hasChainItemsOnArbQueue(~chain) let hasNoMoreEventsToProcess = chainFetcher->ChainFetcher.hasNoMoreEventsToProcess(~hasArbQueueEvents) @@ -537,6 +530,10 @@ let actionReducer = (state: t, action: action) => { [UpdateChainMetaDataAndCheckForExit(NoExit), ProcessEventBatch], ) | SetCurrentlyProcessing(currentlyProcessingBatch) => ({...state, currentlyProcessingBatch}, []) + | SetIsInReorgThreshold(isInReorgThreshold) => ( + {...state, chainManager: {...state.chainManager, isInReorgThreshold}}, + [], + ) | SetCurrentlyFetchingBatch(chain, isFetchingBatch) => updateChainFetcher( currentChainFetcher => {...currentChainFetcher, isFetchingBatch}, @@ -566,7 +563,7 @@ let actionReducer = (state: t, action: action) => { let chainFetchers = state.chainManager.chainFetchers->ChainMap.mapWithKey((chain, cf) => { { ...cf, - fetchState: fetchStatesMap->ChainMap.get(chain), + fetchState: ChainMap.get(fetchStatesMap, chain).partitionedFetchState, } }) @@ -802,9 +799,18 @@ let injectedTaskReducer = ( | ProcessEventBatch => if !state.currentlyProcessingBatch && !isRollingBack(state) { switch state.chainManager->ChainManager.createBatch(~maxBatchSize=state.maxBatchSize) { - | Some({batch, fetchStatesMap, arbitraryEventQueue}) => + | {isInReorgThreshold, val: Some({batch, fetchStatesMap, arbitraryEventQueue})} => dispatchAction(SetCurrentlyProcessing(true)) dispatchAction(UpdateQueues(fetchStatesMap, arbitraryEventQueue)) + if isInReorgThreshold && !state.chainManager.isInReorgThreshold { + //On the first time we enter the reorg threshold, copy all entities to entity history + //And set the isInReorgThreshold isInReorgThreshold state to true + dispatchAction(SetIsInReorgThreshold(true)) + //TODO: persist the isInReorgThreshold state to the db in a transaction with copy + await DbFunctions.sql->DbFunctions.EntityHistory.copyAllEntitiesToEntityHistory + } + + let isInReorgThreshold = state.chainManager.isInReorgThreshold || isInReorgThreshold // This function is used to ensure that registering an alreday existing contract as a dynamic contract can't cause issues. let checkContractIsRegistered = ( @@ -812,8 +818,8 @@ let injectedTaskReducer = ( ~contractAddress, ~contractName: Enums.ContractType.t, ) => { - let fetchState = fetchStatesMap->ChainMap.get(chain) - fetchState->PartitionedFetchState.checkContainsRegisteredContractAddress( + let {partitionedFetchState} = fetchStatesMap->ChainMap.get(chain) + partitionedFetchState->PartitionedFetchState.checkContainsRegisteredContractAddress( ~contractAddress, ~contractName=(contractName :> string), ) @@ -838,6 +844,7 @@ let injectedTaskReducer = ( switch await EventProcessing.processEventBatch( ~eventBatch=batch, ~inMemoryStore, + ~isInReorgThreshold, ~checkContractIsRegistered, ~latestProcessedBlocks, ~loadLayer=state.loadLayer, @@ -860,7 +867,7 @@ let injectedTaskReducer = ( | Error(errHandler) => dispatchAction(ErrorExit(errHandler)) } } - | None => dispatchAction(SetSyncedChains) //Known that there are no items available on the queue so safely call this action + | {val: None} => dispatchAction(SetSyncedChains) //Known that there are no items available on the queue so safely call this action } } | Rollback => diff --git a/scenarios/test_codegen/test/ChainManager_test.res b/scenarios/test_codegen/test/ChainManager_test.res index 30b1bb13e..71a3e6f3b 100644 --- a/scenarios/test_codegen/test/ChainManager_test.res +++ b/scenarios/test_codegen/test/ChainManager_test.res @@ -137,6 +137,7 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) ChainManager.arbitraryEventQueue: arbitraryEventPriorityQueue.contents, chainFetchers, isUnorderedMultichainMode: false, + isInReorgThreshold: false, }, numberOfMockEventsCreated.contents, allEvents, @@ -173,7 +174,7 @@ describe("ChainManager", () => { let eventsInBlock = ChainManager.createBatch(chainManager, ~maxBatchSize=10000) // ensure that the events are ordered correctly - switch eventsInBlock { + switch eventsInBlock.val { | None => chainManager | Some({batch, fetchStatesMap, arbitraryEventQueue}) => batch->Belt.Array.forEach( @@ -231,7 +232,7 @@ describe("ChainManager", () => { // ) let nextChainFetchers = chainManager.chainFetchers->ChainMap.mapWithKey( (chain, fetcher) => { - let fetchState = fetchStatesMap->ChainMap.get(chain) + let {partitionedFetchState: fetchState} = fetchStatesMap->ChainMap.get(chain) { ...fetcher, fetchState, @@ -318,12 +319,15 @@ describe("determineNextEvent", () => { let makeMockPartitionedFetchState = ( ~latestFetchedBlockTimestamp, ~item, - ): PartitionedFetchState.t => { - partitions: list{makeMockFetchState(~latestFetchedBlockTimestamp, ~item)}, - maxAddrInPartition: Env.maxAddrInPartition, - startBlock: 0, - endBlock: None, - logger: Logging.logger, + ): ChainManager.fetchStateWithData => { + partitionedFetchState: { + partitions: list{makeMockFetchState(~latestFetchedBlockTimestamp, ~item)}, + maxAddrInPartition: Env.maxAddrInPartition, + startBlock: 0, + endBlock: None, + logger: Logging.logger, + }, + heighestBlockBelowThreshold: 500, } it( @@ -347,7 +351,7 @@ describe("determineNextEvent", () => { }, ) - let {earliestEvent} = determineNextEvent_unordered(fetchStatesMap)->Result.getExn + let {val: {earliestEvent}} = determineNextEvent_unordered(fetchStatesMap)->Result.getExn Assert.deepEqual( earliestEvent->FetchState_test.getItem, @@ -355,7 +359,7 @@ describe("determineNextEvent", () => { ~message="Should have taken the single item", ) - let {earliestEvent} = determineNextEvent_ordered(fetchStatesMap)->Result.getExn + let {val: {earliestEvent}} = determineNextEvent_ordered(fetchStatesMap)->Result.getExn Assert.deepEqual( earliestEvent, @@ -402,7 +406,7 @@ describe("determineNextEvent", () => { // NoItem(655 /* later timestamp than the test event */, {id:1}), // ] - let {earliestEvent} = determineNextEvent_unordered(fetchStatesMap)->Result.getExn + let {val: {earliestEvent}} = determineNextEvent_unordered(fetchStatesMap)->Result.getExn Assert.deepEqual( earliestEvent->FetchState_test.getItem, @@ -410,7 +414,7 @@ describe("determineNextEvent", () => { ~message="Should have taken the single item", ) - let {earliestEvent} = determineNextEvent_ordered(fetchStatesMap)->Result.getExn + let {val: {earliestEvent}} = determineNextEvent_ordered(fetchStatesMap)->Result.getExn Assert.deepEqual( earliestEvent, diff --git a/scenarios/test_codegen/test/Mock_test.res b/scenarios/test_codegen/test/Mock_test.res index 222b7b9e4..c7c85d5e1 100644 --- a/scenarios/test_codegen/test/Mock_test.res +++ b/scenarios/test_codegen/test/Mock_test.res @@ -22,6 +22,7 @@ describe("E2E Mock Event Batch", () => { ~logger=Logging.logger, ~loadLayer, ~config, + ~isInReorgThreshold=false, ) | None => Ok(EventProcessing.EventsProcessed.makeEmpty(~config)) } @@ -64,6 +65,7 @@ describe_skip("E2E Db check", () => { ~latestProcessedBlocks=EventProcessing.EventsProcessed.makeEmpty(~config), ~loadLayer, ~config, + ~isInReorgThreshold=false, ) //// TODO: write code (maybe via dependency injection) to allow us to use the stub rather than the actual database here. diff --git a/scenarios/test_codegen/test/helpers/Mock.res b/scenarios/test_codegen/test/helpers/Mock.res index 1d8c397b9..01d698f9f 100644 --- a/scenarios/test_codegen/test/helpers/Mock.res +++ b/scenarios/test_codegen/test/helpers/Mock.res @@ -13,7 +13,9 @@ module InMemoryStore = { }, ~entityId=entity->Entities.getEntityId, ), - ~shouldRollbackOnReorg=RegisterHandlers.getConfig()->Config.shouldRollbackOnReorg, + ~shouldSaveHistory=RegisterHandlers.getConfig()->Config.shouldSaveHistory( + ~isInReorgThreshold=false, + ), ) } diff --git a/scenarios/test_codegen/test/schema_types/BigDecimal_test.res b/scenarios/test_codegen/test/schema_types/BigDecimal_test.res index db4ad5581..82ded14d1 100644 --- a/scenarios/test_codegen/test/schema_types/BigDecimal_test.res +++ b/scenarios/test_codegen/test/schema_types/BigDecimal_test.res @@ -42,7 +42,12 @@ describe("Load and save an entity with a BigDecimal from DB", () => { let _ = loaderContext.entityWithBigDecimal.get(testEntity1.id) let _ = loaderContext.entityWithBigDecimal.get(testEntity2.id) - let handlerContext = contextEnv->ContextEnv.getHandlerContext(~inMemoryStore, ~loadLayer) + let handlerContext = + contextEnv->ContextEnv.getHandlerContext( + ~inMemoryStore, + ~loadLayer, + ~isInReorgThreshold=false, + ) switch await handlerContext.entityWithBigDecimal.get(testEntity1.id) { | Some(entity) => Assert.equal(entity.bigDecimal.toString(), "123.456") diff --git a/scenarios/test_codegen/test/schema_types/Timestamp_test.res b/scenarios/test_codegen/test/schema_types/Timestamp_test.res index 7c9f08288..f9fe35897 100644 --- a/scenarios/test_codegen/test/schema_types/Timestamp_test.res +++ b/scenarios/test_codegen/test/schema_types/Timestamp_test.res @@ -36,7 +36,12 @@ describe("Load and save an entity with a Timestamp from DB", () => { let _ = loaderContext.entityWithTimestamp.get(testEntity.id) - let handlerContext = contextEnv->ContextEnv.getHandlerContext(~inMemoryStore, ~loadLayer) + let handlerContext = + contextEnv->ContextEnv.getHandlerContext( + ~inMemoryStore, + ~loadLayer, + ~isInReorgThreshold=false, + ) switch await handlerContext.entityWithTimestamp.get(testEntity.id) { | Some(entity) =>