From 16186b70b32f81d000b651e85820e4c73294b3a8 Mon Sep 17 00:00:00 2001 From: Raman Shekhawat Date: Thu, 20 Apr 2023 02:16:22 +0530 Subject: [PATCH] process traits jobs after CHAIN_STATE_HEIGHT --- src/handleJobs.ts | 177 ++++++++++++++++++++++++++++++++++ src/jobs/compute-traits.ts | 191 +------------------------------------ src/processor.ts | 1 + 3 files changed, 181 insertions(+), 188 deletions(-) create mode 100644 src/handleJobs.ts diff --git a/src/handleJobs.ts b/src/handleJobs.ts new file mode 100644 index 00000000..f2b96224 --- /dev/null +++ b/src/handleJobs.ts @@ -0,0 +1,177 @@ +import isPlainObject from 'lodash/isPlainObject' +import { createHash } from 'crypto' +import connection from './connection' +import { Collection, Token, Trait, TraitToken } from './model' +import { traitsQueue, JobData } from './jobs/compute-traits' + +type TraitValueMap = Map + +const hash = (str: string) => { + return createHash('sha1').update(str).digest('hex') +} + +// eslint-disable-next-line sonarjs/cognitive-complexity +traitsQueue.process(async (job, done) => { + if (!job.data.collectionId) { + throw new Error('Collection ID not provided.') + } + + console.log(`Processing job ${job.id} for collection ${job.data.collectionId}`) + + if (!connection.isInitialized) { + await connection.initialize().catch((err) => { + throw err + }) + } + + const em = connection.manager + + const traitTypeMap = new Map() + const tokenTraitMap = new Map() + + const start = new Date() + + const { collectionId } = job.data satisfies JobData + + const tokens = await em + .getRepository(Token) + .createQueryBuilder('token') + .select('token.id') + .addSelect('token.metadata') + .addSelect('token.supply') + .leftJoinAndMapMany('token.traits', TraitToken, 'traitToken', 'traitToken.token = token.id') + .where('token.collection = :collectionId', { collectionId }) + .getMany() + + const traits = await em + .getRepository(Trait) + .createQueryBuilder('trait') + .where('trait.collection = :collectionId', { collectionId }) + .getMany() + + tokens.forEach((token) => { + if (!token.metadata || !token.metadata.attributes || !isPlainObject(token.metadata.attributes)) return + const attributes = token.metadata.attributes as Record + Object.entries(attributes).forEach(([traitType, { value }]) => { + if (!value) return + + if (!traitTypeMap.has(traitType)) { + traitTypeMap.set(traitType, new Map()) + } + const tType = traitTypeMap.get(traitType) as TraitValueMap + if (!tType.has(value)) { + tType.set(value, { count: 0n }) + } + const traitValue = tType.get(value) as TraitValueMap extends Map ? V : never + traitValue.count += token.supply + + tokenTraitMap.set(token.id, [...(tokenTraitMap.get(token.id) || []), `${traitType}:${value}`]) + }) + }) + + if (!traitTypeMap.size) { + console.log(`No traits found for collection ${collectionId}`) + done() + + return + } + + const traitsToSave: Trait[] = [] + const traitsToDelete: Trait[] = [] + const traitsToUpdate: Trait[] = [] + + traitTypeMap.forEach((traitValueMap, traitType) => { + traitValueMap.forEach((traitValue, value) => { + const trait = traits.find((t) => t.id === hash(`${collectionId}-${traitType}-${value}`)) + if (!trait) { + traitsToSave.push( + new Trait({ + id: hash(`${collectionId}-${traitType}-${value}`), + collection: new Collection({ id: collectionId }), + traitType, + value, + count: traitValue.count, + }) + ) + } else if (trait.count !== traitValue.count) { + trait.count = traitValue.count + traitsToUpdate.push(trait) + } + }) + }) + + traits.forEach((trait) => { + if ( + !traitTypeMap.has(trait.traitType) || + !traitTypeMap.get(trait.traitType)?.has(trait.value) || + trait.id !== hash(`${collectionId}-${trait.traitType}-${trait.value}`) + ) { + traitsToDelete.push(trait) + } + }) + + await em.upsert(Trait, [...traitsToSave, ...traitsToUpdate] as any, ['id']) + + const traitTokensToSave: TraitToken[] = [] + const traitTokensToDelete: TraitToken[] = [] + + tokenTraitMap.forEach((_traits, _tokenId) => { + if (!_traits.length) return + + const token = tokens.find((t) => t.id === _tokenId) + + _traits.forEach((t) => { + const [traitType, value] = t.split(':') + + if (token?.traits.length) { + for (let i = 0; i < token.traits.length; i += 1) { + const traitToken = token.traits[i] + if (traitToken.id === hash(`${collectionId}-${traitType}-${value}-${_tokenId}`)) { + return + } + + if ( + !_traits.some((tt) => { + const splitted = tt.split(':') + return traitToken.id === hash(`${collectionId}-${splitted[0]}-${splitted[1]}-${_tokenId}`) + }) + ) { + traitTokensToDelete.push(new TraitToken({ id: traitToken.id })) + return + } + } + } + + traitTokensToSave.push( + new TraitToken({ + id: hash(`${collectionId}-${traitType}-${value}-${_tokenId}`), + trait: new Trait({ id: hash(`${collectionId}-${traitType}-${value}`) }), + token: new Token({ id: _tokenId }), + }) + ) + }) + }) + + console.log( + `Saving TraitToken ${traitTokensToSave.length} and deleting ${traitTokensToDelete.length} in collection ${collectionId}` + ) + await em + .createQueryBuilder() + .insert() + .into(TraitToken) + .values(traitTokensToSave as any) + .orIgnore() + .execute() + await em.remove(traitTokensToDelete) + + await em + .createQueryBuilder() + .delete() + .from(TraitToken) + .where('traitToken.trait IN (:...traitsToDelete)', { traitsToDelete: traitsToDelete.map((t) => t.id) }) + .execute() + + await em.remove(traitsToDelete) + + done(null, { timeElapsed: new Date().getTime() - start.getTime(), collectionId }) +}) diff --git a/src/jobs/compute-traits.ts b/src/jobs/compute-traits.ts index 55cd4d80..7f89ea18 100644 --- a/src/jobs/compute-traits.ts +++ b/src/jobs/compute-traits.ts @@ -1,15 +1,10 @@ /* eslint-disable max-len */ import Queue from 'bull' -import { createHash } from 'crypto' -import isPlainObject from 'lodash/isPlainObject' -import connection from '../connection' -import { Collection, Token, Trait, TraitToken } from '../model' import config from '../config' -type JobData = { collectionId: string } -type TraitValueMap = Map +export type JobData = { collectionId: string } -const traitsQueue = new Queue('traitsQueue', { +export const traitsQueue = new Queue('traitsQueue', { defaultJobOptions: { delay: 5000, attempts: 2, removeOnComplete: true }, redis: { port: 6379, @@ -17,193 +12,13 @@ const traitsQueue = new Queue('traitsQueue', { }, }) -const hash = (str: string) => { - return createHash('sha1').update(str).digest('hex') -} - -const computeTraits = async (collectionId: string) => { +export const computeTraits = async (collectionId: string) => { if (!collectionId) { throw new Error('Collection ID not provided.') } - // TODO: use dynamicJob Id, check if job already in queue, add some kind of throttle for same collectionId - /* const jobs = await traitsQueue.getDelayed() - const collectionJob = jobs.find((job) => job?.data.collectionId === collectionId) - if (collectionJob) { - console.log(`Job for collection ${collectionId} already in queue.`) - - return - } */ traitsQueue.add({ collectionId }, { jobId: collectionId }).catch(() => { console.log('Closing connection as Redis is not available') traitsQueue.close(true) }) } - -// eslint-disable-next-line sonarjs/cognitive-complexity -traitsQueue.process(async (job, done) => { - if (!job.data.collectionId) { - throw new Error('Collection ID not provided.') - } - - console.log(`Processing job ${job.id} for collection ${job.data.collectionId}`) - - if (!connection.isInitialized) { - await connection.initialize().catch((err) => { - throw err - }) - } - - const em = connection.manager - - const traitTypeMap = new Map() - const tokenTraitMap = new Map() - - const start = new Date() - - const { collectionId } = job.data satisfies JobData - - const tokens = await em - .getRepository(Token) - .createQueryBuilder('token') - .select('token.id') - .addSelect('token.metadata') - .addSelect('token.supply') - .leftJoinAndMapMany('token.traits', TraitToken, 'traitToken', 'traitToken.token = token.id') - .where('token.collection = :collectionId', { collectionId }) - .getMany() - - const traits = await em - .getRepository(Trait) - .createQueryBuilder('trait') - .where('trait.collection = :collectionId', { collectionId }) - .getMany() - - tokens.forEach((token) => { - if (!token.metadata || !token.metadata.attributes || !isPlainObject(token.metadata.attributes)) return - const attributes = token.metadata.attributes as Record - Object.entries(attributes).forEach(([traitType, { value }]) => { - if (!value) return - - if (!traitTypeMap.has(traitType)) { - traitTypeMap.set(traitType, new Map()) - } - const tType = traitTypeMap.get(traitType) as TraitValueMap - if (!tType.has(value)) { - tType.set(value, { count: 0n }) - } - const traitValue = tType.get(value) as TraitValueMap extends Map ? V : never - traitValue.count += token.supply - - tokenTraitMap.set(token.id, [...(tokenTraitMap.get(token.id) || []), `${traitType}:${value}`]) - }) - }) - - if (!traitTypeMap.size) { - console.log(`No traits found for collection ${collectionId}`) - done() - - return - } - - const traitsToSave: Trait[] = [] - const traitsToDelete: Trait[] = [] - const traitsToUpdate: Trait[] = [] - - traitTypeMap.forEach((traitValueMap, traitType) => { - traitValueMap.forEach((traitValue, value) => { - const trait = traits.find((t) => t.id === hash(`${collectionId}-${traitType}-${value}`)) - if (!trait) { - traitsToSave.push( - new Trait({ - id: hash(`${collectionId}-${traitType}-${value}`), - collection: new Collection({ id: collectionId }), - traitType, - value, - count: traitValue.count, - }) - ) - } else if (trait.count !== traitValue.count) { - trait.count = traitValue.count - traitsToUpdate.push(trait) - } - }) - }) - - traits.forEach((trait) => { - if ( - !traitTypeMap.has(trait.traitType) || - !traitTypeMap.get(trait.traitType)?.has(trait.value) || - trait.id !== hash(`${collectionId}-${trait.traitType}-${trait.value}`) - ) { - traitsToDelete.push(trait) - } - }) - - await em.upsert(Trait, [...traitsToSave, ...traitsToUpdate] as any, ['id']) - - const traitTokensToSave: TraitToken[] = [] - const traitTokensToDelete: TraitToken[] = [] - - tokenTraitMap.forEach((_traits, _tokenId) => { - if (!_traits.length) return - - const token = tokens.find((t) => t.id === _tokenId) - - _traits.forEach((t) => { - const [traitType, value] = t.split(':') - - if (token?.traits.length) { - for (let i = 0; i < token.traits.length; i += 1) { - const traitToken = token.traits[i] - if (traitToken.id === hash(`${collectionId}-${traitType}-${value}-${_tokenId}`)) { - return - } - - if ( - !_traits.some((tt) => { - const splitted = tt.split(':') - return traitToken.id === hash(`${collectionId}-${splitted[0]}-${splitted[1]}-${_tokenId}`) - }) - ) { - traitTokensToDelete.push(new TraitToken({ id: traitToken.id })) - return - } - } - } - - traitTokensToSave.push( - new TraitToken({ - id: hash(`${collectionId}-${traitType}-${value}-${_tokenId}`), - trait: new Trait({ id: hash(`${collectionId}-${traitType}-${value}`) }), - token: new Token({ id: _tokenId }), - }) - ) - }) - }) - - console.log( - `Saving TraitToken ${traitTokensToSave.length} and deleting ${traitTokensToDelete.length} in collection ${collectionId}` - ) - await em - .createQueryBuilder() - .insert() - .into(TraitToken) - .values(traitTokensToSave as any) - .orIgnore() - .execute() - await em.remove(traitTokensToDelete) - - await em - .createQueryBuilder() - .delete() - .from(TraitToken) - .where('traitToken.trait IN (:...traitsToDelete)', { traitsToDelete: traitsToDelete.map((t) => t.id) }) - .execute() - - await em.remove(traitsToDelete) - - done(null, { timeElapsed: new Date().getTime() - start.getTime(), collectionId }) -}) - -export { traitsQueue, computeTraits } diff --git a/src/processor.ts b/src/processor.ts index 8345d213..0c7d8596 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -246,6 +246,7 @@ processor.run(new FullTypeormDatabase(), async (ctx) => { const lastBlock = ctx.blocks[ctx.blocks.length - 1].header if (lastBlock.height > config.chainStateHeight) { + import('./handleJobs') await chainState(ctx as unknown as CommonContext, lastBlock) } })