Skip to content

Commit

Permalink
feat: Use worker threads for jsonSchema validation of ModelInstanceDo…
Browse files Browse the repository at this point in the history
…cuments
  • Loading branch information
stbrody committed Feb 29, 2024
1 parent edd0fcb commit 4288fe2
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 223 deletions.
3 changes: 1 addition & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@
"@stablelib/random": "^1.0.1",
"@stablelib/sha256": "^1.0.1",
"@stablelib/uuid": "^1.0.1",
"ajv": "^8.8.2",
"ajv-formats": "^2.1.1",
"ajv-threads": "^1.0.1",
"await-semaphore": "^0.1.3",
"cartonne": "^3.0.1",
"codeco": "^1.1.0",
Expand Down
9 changes: 5 additions & 4 deletions packages/core/src/__tests__/handlers-map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,36 @@ import { Caip10LinkHandler } from '@ceramicnetwork/stream-caip10-link-handler'
import { ModelHandler } from '@ceramicnetwork/stream-model-handler'
import { ModelInstanceDocumentHandler } from '@ceramicnetwork/stream-model-instance-handler'
import { TileDocumentHandler } from '@ceramicnetwork/stream-tile-handler'
import type { SchemaValidation } from 'ajv-threads'

const loggerProvider = new LoggerProvider()
const logger = loggerProvider.getDiagnosticsLogger()

describe('constructor', () => {
test('default handlers', () => {
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeWithDefaultHandlers(logger, null as SchemaValidation)
expect(handlers.get('tile')).toBeInstanceOf(TileDocumentHandler)
expect(handlers.get('caip10-link')).toBeInstanceOf(Caip10LinkHandler)
expect(handlers.get('model')).toBeInstanceOf(ModelHandler)
expect(handlers.get('MID')).toBeInstanceOf(ModelInstanceDocumentHandler)
})
test('custom handlers', () => {
const customHandler = jest.fn() as unknown as StreamHandler<Stream>
const handlers = new HandlersMap(logger, new Map().set(13, customHandler))
const handlers = HandlersMap.makeWithHandlers(logger, new Map().set(13, customHandler))
expect(handlers.get(13)).toBe(customHandler)
})
})

test('set and get', () => {
const customHandler = { name: 'custom', type: 13 } as unknown as StreamHandler<Stream>
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeEmpty(logger)
expect(() => handlers.get('custom')).toThrow()
handlers.add(customHandler)
expect(() => handlers.get('custom')).toThrow()
expect(handlers.get(13)).toBe(customHandler)
})

test('get non-existing', () => {
const handlers = new HandlersMap(logger)
const handlers = HandlersMap.makeEmpty(logger)
expect(() => handlers.get('custom')).toThrow()
})
17 changes: 6 additions & 11 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { IpfsTopology } from '@ceramicnetwork/ipfs-topology'
import {
CreateOpts,
Stream,
StreamHandler,
DiagnosticsLogger,
StreamUtils,
LoadOpts,
Expand Down Expand Up @@ -57,6 +56,7 @@ import { AnchorRequestCarBuilder } from './anchor/anchor-request-car-builder.js'
import { makeStreamLoaderAndUpdater } from './initialization/stream-loading.js'
import { Feed, type PublicFeed } from './feed.js'
import { IReconApi, ReconApi } from './recon.js'
import { SchemaValidation } from 'ajv-threads'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -205,7 +205,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
private readonly providersCache: ProvidersCache
private readonly syncApi: SyncApi

readonly _streamHandlers: HandlersMap
private readonly _streamHandlers: HandlersMap
private readonly _schemaValidator: SchemaValidation
private readonly _readOnly: boolean
private readonly _ipfsTopology: IpfsTopology
private readonly _logger: DiagnosticsLogger
Expand Down Expand Up @@ -243,7 +244,8 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {

this._ipfs = modules.ipfs

this._streamHandlers = new HandlersMap(this._logger)
this._schemaValidator = new SchemaValidation()
this._streamHandlers = HandlersMap.makeWithDefaultHandlers(this._logger, this._schemaValidator)

// This initialization block below has to be redone.
// Things below should be passed here as `modules` variable.
Expand Down Expand Up @@ -581,14 +583,6 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
}
}

/**
* Register new stream handler
* @param streamHandler - Stream type handler
*/
addStreamHandler<T extends Stream>(streamHandler: StreamHandler<T>): void {
this._streamHandlers.add(streamHandler)
}

async nodeStatus(): Promise<NodeStatusResponse> {
const anchor = {
anchorServiceUrl: this.anchorService.url,
Expand Down Expand Up @@ -893,6 +887,7 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
await this.syncApi.shutdown()
await this.dispatcher.close()
await this.repository.close()
await this._schemaValidator.shutdown()
this._ipfsTopology.stop()
this._logger.imp('Ceramic instance closed successfully')
}
Expand Down
21 changes: 17 additions & 4 deletions packages/core/src/handlers-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { ModelInstanceDocumentHandler } from '@ceramicnetwork/stream-model-insta
import { Stream, StreamHandler } from '@ceramicnetwork/common'
import { DiagnosticsLogger } from '@ceramicnetwork/common'
import { StreamType } from '@ceramicnetwork/streamid'
import type { SchemaValidation } from 'ajv-threads'

type Registry = Map<number, StreamHandler<Stream>>

function defaultHandlers(): Registry {
function defaultHandlers(schemaValidator: SchemaValidation): Registry {
const tile = new TileDocumentHandler()
const caip10Link = new Caip10LinkHandler()
const model = new ModelHandler()
const instance = new ModelInstanceDocumentHandler()
const instance = new ModelInstanceDocumentHandler(schemaValidator)
const handlers = new Map<number, StreamHandler<Stream>>()
handlers.set(tile.type, tile)
handlers.set(caip10Link.type, caip10Link)
Expand All @@ -27,8 +28,20 @@ function defaultHandlers(): Registry {
export class HandlersMap {
private readonly handlers: Registry

constructor(private readonly logger: DiagnosticsLogger, handlers?: Registry) {
this.handlers = handlers || defaultHandlers()
private constructor(private readonly logger: DiagnosticsLogger, handlers: Registry) {
this.handlers = handlers
}

static makeWithHandlers(logger: DiagnosticsLogger, handlers: Registry) {
return new HandlersMap(logger, handlers)
}

static makeWithDefaultHandlers(logger: DiagnosticsLogger, schemaValidator: SchemaValidation) {
return new HandlersMap(logger, defaultHandlers(schemaValidator))
}

static makeEmpty(logger: DiagnosticsLogger) {
return new HandlersMap(logger, new Map())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { HandlersMap } from '../../handlers-map.js'
import cloneDeep from 'lodash.clonedeep'
import { CID } from 'multiformats/cid'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { SchemaValidation } from 'ajv-threads'

const TOPIC = '/ceramic/test12345'
const CONTENT0 = { step: 0 }
Expand All @@ -41,6 +42,7 @@ describeIfV3('StateManipulator test', () => {
let dispatcherIpfs: IpfsApi
let logSyncer: LogSyncer
let stateManipulator: StateManipulator
let schemaValidator: SchemaValidation

let doc: TileDocument
let commits: Array<CommitData>
Expand All @@ -59,7 +61,9 @@ describeIfV3('StateManipulator test', () => {

const logger = new LoggerProvider().getDiagnosticsLogger()
logSyncer = new LogSyncer(dispatcher)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)

await swarmConnect(dispatcherIpfs, ceramicIpfs)
Expand All @@ -77,6 +81,7 @@ describeIfV3('StateManipulator test', () => {
afterAll(async () => {
await dispatcher.close()
await ceramic.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/stream-loading/__tests__/stream-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from '../../pubsub/pubsub-message.js'
import { asIpfsMessage } from '../../pubsub/__tests__/as-ipfs-message.js'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { SchemaValidation } from 'ajv-threads'

const TOPIC = '/ceramic/test12345'
const CONTENT0 = { step: 0 }
Expand Down Expand Up @@ -58,6 +59,7 @@ describeIfV3('Streamloader', () => {

let dispatcher: Dispatcher
let dispatcherIpfs: IpfsApi
let schemaValidator: SchemaValidation
let streamLoader: StreamLoader

let ceramicIpfs: IpfsApi
Expand All @@ -82,7 +84,9 @@ describeIfV3('Streamloader', () => {
dispatcher,
ceramic.anchorService.validator
)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
const stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)
streamLoader = new StreamLoader(
logger,
Expand All @@ -98,6 +102,7 @@ describeIfV3('Streamloader', () => {
afterAll(async () => {
await dispatcher.close()
await ceramic.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down Expand Up @@ -302,6 +307,7 @@ describeIfV3('Streamloader', () => {
let ipfs: IpfsApi

let dispatcher: Dispatcher
let schemaValidator: SchemaValidation
let streamLoader: StreamLoader

let stream: TileDocument
Expand Down Expand Up @@ -337,7 +343,9 @@ describeIfV3('Streamloader', () => {
dispatcher,
ceramic.anchorService.validator
)
const handlers = new HandlersMap(logger)

schemaValidator = new SchemaValidation()
const handlers = HandlersMap.makeWithDefaultHandlers(logger, schemaValidator)
const stateManipulator = new StateManipulator(logger, handlers, logSyncer, ceramic)
streamLoader = new StreamLoader(
logger,
Expand Down Expand Up @@ -388,6 +396,7 @@ describeIfV3('Streamloader', () => {

afterAll(async () => {
await dispatcher.close()
await schemaValidator.shutdown()

// Wait for pubsub unsubscribe to be processed
// TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe
Expand Down
3 changes: 1 addition & 2 deletions packages/stream-model-instance-handler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@
"@ceramicnetwork/stream-handler-common": "^4.1.0",
"@ceramicnetwork/stream-model-instance": "^4.2.0",
"@ceramicnetwork/streamid": "^5.0.0",
"ajv": "^8.8.2",
"ajv-formats": "^2.1.1",
"ajv-threads": "^1.0.1",
"fast-json-patch": "^3.1.0",
"least-recent": "^1.0.3",
"lodash.clonedeep": "^4.5.0",
Expand Down
Loading

0 comments on commit 4288fe2

Please sign in to comment.