Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use ajv-threads package to parallelize jsonSchema validation of ModelInstanceDocuments #3176

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@
"@stablelib/random": "^1.0.1",
"@stablelib/sha256": "^1.0.1",
"@stablelib/uuid": "^1.0.1",
"ajv": "^8.8.2",
"ajv-formats": "^2.1.1",
"ajv-threads": "^1.0.4",
"await-semaphore": "^0.1.3",
"cartonne": "^3.0.1",
"codeco": "^1.1.0",
Expand Down
9 changes: 5 additions & 4 deletions packages/core/src/__tests__/handlers-map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,36 @@ import { Caip10LinkHandler } from '@ceramicnetwork/stream-caip10-link-handler'
import { ModelHandler } from '@ceramicnetwork/stream-model-handler'
import { ModelInstanceDocumentHandler } from '@ceramicnetwork/stream-model-instance-handler'
import { TileDocumentHandler } from '@ceramicnetwork/stream-tile-handler'
import type { SchemaValidation } from 'ajv-threads'

const loggerProvider = new LoggerProvider()
const logger = loggerProvider.getDiagnosticsLogger()

describe('constructor', () => {
test('default handlers', () => {
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeWithDefaultHandlers(logger, null as SchemaValidation)
expect(handlers.get('tile')).toBeInstanceOf(TileDocumentHandler)
expect(handlers.get('caip10-link')).toBeInstanceOf(Caip10LinkHandler)
expect(handlers.get('model')).toBeInstanceOf(ModelHandler)
expect(handlers.get('MID')).toBeInstanceOf(ModelInstanceDocumentHandler)
})
test('custom handlers', () => {
const customHandler = jest.fn() as unknown as StreamHandler<Stream>
const handlers = new HandlersMap(logger, new Map().set(13, customHandler))
const handlers = HandlersMap.makeWithHandlers(logger, new Map().set(13, customHandler))
expect(handlers.get(13)).toBe(customHandler)
})
})

test('set and get', () => {
const customHandler = { name: 'custom', type: 13 } as unknown as StreamHandler<Stream>
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeEmpty(logger)
expect(() => handlers.get('custom')).toThrow()
handlers.add(customHandler)
expect(() => handlers.get('custom')).toThrow()
expect(handlers.get(13)).toBe(customHandler)
})

test('get non-existing', () => {
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeEmpty(logger)
expect(() => handlers.get('custom')).toThrow()
})
21 changes: 10 additions & 11 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { IpfsTopology } from '@ceramicnetwork/ipfs-topology'
import {
CreateOpts,
Stream,
StreamHandler,
DiagnosticsLogger,
StreamUtils,
LoadOpts,
Expand Down Expand Up @@ -57,6 +56,7 @@ import { AnchorRequestCarBuilder } from './anchor/anchor-request-car-builder.js'
import { makeStreamLoaderAndUpdater } from './initialization/stream-loading.js'
import { Feed, type PublicFeed } from './feed.js'
import { IReconApi, ReconApi } from './recon.js'
import { SchemaValidation } from 'ajv-threads'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -205,7 +205,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
private readonly providersCache: ProvidersCache
private readonly syncApi: SyncApi

readonly _streamHandlers: HandlersMap
private readonly _streamHandlers: HandlersMap
private readonly _schemaValidator: SchemaValidation
private readonly _readOnly: boolean
private readonly _ipfsTopology: IpfsTopology
private readonly _logger: DiagnosticsLogger
Expand Down Expand Up @@ -243,7 +244,11 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {

this._ipfs = modules.ipfs

this._streamHandlers = new HandlersMap(this._logger)
const numCores = os.cpus().length
// Use number of threads equal to half the available cores for schema validation. Leave the
// other half for signature validation.
this._schemaValidator = new SchemaValidation(Math.max(1, Math.ceil(numCores / 2)))
this._streamHandlers = HandlersMap.makeWithDefaultHandlers(this._logger, this._schemaValidator)

// This initialization block below has to be redone.
// Things below should be passed here as `modules` variable.
Expand Down Expand Up @@ -482,6 +487,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
this._logger.warn(`Starting in read-only mode. All write operations will fail`)
}

await this._schemaValidator.init()
await this.repository.init()
await this.dispatcher.init()

Expand Down Expand Up @@ -581,14 +587,6 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
}
}

/**
* Register new stream handler
* @param streamHandler - Stream type handler
*/
addStreamHandler<T extends Stream>(streamHandler: StreamHandler<T>): void {
this._streamHandlers.add(streamHandler)
}

async nodeStatus(): Promise<NodeStatusResponse> {
const anchor = {
anchorServiceUrl: this.anchorService.url,
Expand Down Expand Up @@ -893,6 +891,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
await this.syncApi.shutdown()
await this.dispatcher.close()
await this.repository.close()
await this._schemaValidator.shutdown()
this._ipfsTopology.stop()
this._logger.imp('Ceramic instance closed successfully')
}
Expand Down
21 changes: 17 additions & 4 deletions packages/core/src/handlers-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { ModelInstanceDocumentHandler } from '@ceramicnetwork/stream-model-insta
import { Stream, StreamHandler } from '@ceramicnetwork/common'
import { DiagnosticsLogger } from '@ceramicnetwork/common'
import { StreamType } from '@ceramicnetwork/streamid'
import type { SchemaValidation } from 'ajv-threads'

type Registry = Map<number, StreamHandler<Stream>>

function defaultHandlers(): Registry {
function defaultHandlers(schemaValidator: SchemaValidation): Registry {
const tile = new TileDocumentHandler()
const caip10Link = new Caip10LinkHandler()
const model = new ModelHandler()
const instance = new ModelInstanceDocumentHandler()
const instance = new ModelInstanceDocumentHandler(schemaValidator)
const handlers = new Map<number, StreamHandler<Stream>>()
handlers.set(tile.type, tile)
handlers.set(caip10Link.type, caip10Link)
Expand All @@ -27,8 +28,20 @@ function defaultHandlers(): Registry {
export class HandlersMap {
private readonly handlers: Registry

constructor(private readonly logger: DiagnosticsLogger, handlers?: Registry) {
this.handlers = handlers || defaultHandlers()
private constructor(private readonly logger: DiagnosticsLogger, handlers: Registry) {
this.handlers = handlers
}

static makeWithHandlers(logger: DiagnosticsLogger, handlers: Registry) {
return new HandlersMap(logger, handlers)
}

static makeWithDefaultHandlers(logger: DiagnosticsLogger, schemaValidator: SchemaValidation) {
return new HandlersMap(logger, defaultHandlers(schemaValidator))
}

static makeEmpty(logger: DiagnosticsLogger) {
return new HandlersMap(logger, new Map())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { HandlersMap } from '../../handlers-map.js'
import cloneDeep from 'lodash.clonedeep'
import { CID } from 'multiformats/cid'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { SchemaValidation } from 'ajv-threads'

const TOPIC = '/ceramic/test12345'
const CONTENT0 = { step: 0 }
Expand All @@ -41,6 +42,7 @@ describeIfV3('StateManipulator test', () => {
let dispatcherIpfs: IpfsApi
let logSyncer: LogSyncer
let stateManipulator: StateManipulator
let schemaValidator: SchemaValidation

let doc: TileDocument
let commits: Array<CommitData>
Expand All @@ -59,7 +61,10 @@ describeIfV3('StateManipulator test', () => {

const logger = new LoggerProvider().getDiagnosticsLogger()
logSyncer = new LogSyncer(dispatcher)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation(1)
await schemaValidator.init()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)

await swarmConnect(dispatcherIpfs, ceramicIpfs)
Expand All @@ -77,6 +82,7 @@ describeIfV3('StateManipulator test', () => {
afterAll(async () => {
await dispatcher.close()
await ceramic.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down
14 changes: 12 additions & 2 deletions packages/core/src/stream-loading/__tests__/stream-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from '../../pubsub/pubsub-message.js'
import { asIpfsMessage } from '../../pubsub/__tests__/as-ipfs-message.js'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { SchemaValidation } from 'ajv-threads'

const TOPIC = '/ceramic/test12345'
const CONTENT0 = { step: 0 }
Expand Down Expand Up @@ -58,6 +59,7 @@ describeIfV3('Streamloader', () => {

let dispatcher: Dispatcher
let dispatcherIpfs: IpfsApi
let schemaValidator: SchemaValidation
let streamLoader: StreamLoader

let ceramicIpfs: IpfsApi
Expand All @@ -82,7 +84,10 @@ describeIfV3('Streamloader', () => {
dispatcher,
ceramic.anchorService.validator
)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation(1)
await schemaValidator.init()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
const stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)
streamLoader = new StreamLoader(
logger,
Expand All @@ -98,6 +103,7 @@ describeIfV3('Streamloader', () => {
afterAll(async () => {
await dispatcher.close()
await ceramic.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down Expand Up @@ -302,6 +308,7 @@ describeIfV3('Streamloader', () => {
let ipfs: IpfsApi

let dispatcher: Dispatcher
let schemaValidator: SchemaValidation
let streamLoader: StreamLoader

let stream: TileDocument
Expand Down Expand Up @@ -337,7 +344,9 @@ describeIfV3('Streamloader', () => {
dispatcher,
ceramic.anchorService.validator
)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
const stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)
streamLoader = new StreamLoader(
logger,
Expand Down Expand Up @@ -388,6 +397,7 @@ describeIfV3('Streamloader', () => {

afterAll(async () => {
await dispatcher.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down
3 changes: 1 addition & 2 deletions packages/stream-model-instance-handler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
"@ceramicnetwork/stream-model": "^4.2.0-rc.0",
"@ceramicnetwork/stream-model-instance": "^4.3.0-rc.0",
"@ceramicnetwork/streamid": "^5.0.0",
"ajv": "^8.8.2",
"ajv-formats": "^2.1.1",
"ajv-threads": "^1.0.4",
"fast-json-patch": "^3.1.0",
"least-recent": "^1.0.3",
"lodash.clonedeep": "^4.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
} from '@ceramicnetwork/did-test-utils'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { VerificationMethod } from 'did-resolver'
import { SchemaValidation } from 'ajv-threads'

// because we're doing mocking weirdly, by mocking a function two libraries deep, to test a function
// one library deep that is unrelated to TileDocumentHandler, we need to specifically duplicate
Expand Down Expand Up @@ -414,13 +415,16 @@ const STREAMS = {

describe('ModelInstanceDocumentHandler', () => {
let handler: ModelInstanceDocumentHandler
let schemaValidator: SchemaValidation
let context: StreamReaderWriter
let defaultSigner: RotatingSigner
let signerUsingNewKey: CeramicSigner
let signerUsingOldKey: CeramicSigner
let ipfs: IpfsApi

beforeAll(async () => {
schemaValidator = new SchemaValidation(1)
await schemaValidator.init()
const recs: Record<string, any> = {}
ipfs = {
dag: {
Expand All @@ -442,10 +446,14 @@ describe('ModelInstanceDocumentHandler', () => {
} as IpfsApi
})

afterAll(async () => {
await schemaValidator.shutdown()
})

beforeEach(async () => {
ModelInstanceDocument.MAX_DOCUMENT_SIZE = 16_000_000

handler = new ModelInstanceDocumentHandler()
handler = new ModelInstanceDocumentHandler(schemaValidator)

defaultSigner = await DidTestUtils.rotatingSigner({})
context = DidTestUtils.api(defaultSigner)
Expand Down
Loading