Skip to content

Commit

Permalink
feat: deal tracker spade sync cron
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Oct 25, 2023
1 parent a5cd1ca commit 1925c34
Show file tree
Hide file tree
Showing 15 changed files with 1,185 additions and 24 deletions.
428 changes: 407 additions & 21 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@
"unicorn/explicit-length-check": "off",
"unicorn/filename-case": "off",
"unicorn/prefer-set-has": "off",
"unicorn/prefer-spread": "off",
"unicorn/prefer-array-some": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-reduce": "off",
"unicorn/no-await-expression-member": "off",
"unicorn/no-zero-fractions": "off",
"unicorn/numeric-separators-style": "off",
"no-console": "off",
"no-new": "off",
"no-warning-comments": "off"
Expand Down
11 changes: 9 additions & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
"version": "0.0.0",
"type": "module",
"scripts": {
"test": "ava --serial --no-worker-threads --verbose --timeout=60s test/{*.test.js,**/*.test.js}"
"mock:spade-oracle-server": "node test/helpers/spade-oracle-server.js",
"mock": "run-p mock:*",
"test": "PORT=9200 npm-run-all -p -r mock test:all",
"test:all": "ava --serial --no-worker-threads --verbose --timeout=60s test/{*.test.js,**/*.test.js}"
},
"dependencies": {
"@serverless-stack/node": "^1.18.4",
"@aws-sdk/client-dynamodb": "^3.363.0",
"@aws-sdk/client-sqs": "^3.363.0",
"@aws-sdk/client-s3": "^3.363.0",
"@aws-sdk/util-dynamodb": "3.363.0",
"@ipld/dag-json": "10.1.5",
"@ipld/dag-ucan": "^3.3.2",
"@ucanto/client": "^8.0.0",
"@ucanto/interface": "^8.0.0",
Expand All @@ -26,13 +30,16 @@
"@web3-storage/filecoin-client": "^2.0.0",
"multiformats": "12.0.1",
"uint8arrays": "^4.0.4",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"simple-zstd": "^1.4.2",
"stream-read-all": "^4.0.0"
},
"devDependencies": {
"@ipld/car": "5.1.1",
"@web-std/blob": "3.0.4",
"ava": "^5.3.0",
"nanoid": "^4.0.0",
"npm-run-all": "^4.1.5",
"delay": "^6.0.0",
"p-defer": "^4.0.0",
"p-wait-for": "^5.0.2",
Expand Down
212 changes: 212 additions & 0 deletions packages/core/src/deal-tracker/spade-oracle-sync-tick.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// @ts-expect-error no types available
import { ZSTDDecompress } from 'simple-zstd'
import { Readable } from 'stream'
// @ts-expect-error no types available
import streamReadAll from 'stream-read-all'
import { toString } from 'uint8arrays/to-string'
import { encode, decode } from '@ipld/dag-json'
import { RecordNotFoundErrorName } from '@web3-storage/filecoin-api/errors'
import { parse as parseLink } from 'multiformats/link'

/**
* @typedef {import('@web3-storage/filecoin-api/deal-tracker/api').DealStore} DealStore
* @typedef {import('./types').OracleContracts} OracleContracts
* @typedef {import('./types').SpadeOracle} SpadeOracle
* @typedef {import('../store/types').SpadeOracleStore} SpadeOracleStore
*/

/**
* On CRON tick, this function syncs deal store entries with the most up to date information stored
* in Spade's Oracle:
* - The previous oracle state known is fetched, as well as the latest state from Spade endpoint.
* - Once both states are in memory, they are compared and a diff is generated.
* - Diff is stored in deal store
* - Handled new state of oracle is stored for comparison in next tick.
*
* @param {object} context
* @param {DealStore} context.dealStore
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {URL} context.spadeOracleUrl
*/
export async function spadeOracleSyncTick ({
dealStore,
spadeOracleStore,
spadeOracleUrl
}) {
// Get previous recorded spade oracle contracts
const getPreviousSpadeOracle = await getSpadeOracleState({
spadeOracleStore,
spadeOracleId: spadeOracleUrl.toString(),
})
if (getPreviousSpadeOracle.error && getPreviousSpadeOracle.error.name !== RecordNotFoundErrorName) {
return getPreviousSpadeOracle
}

// Get updated spade oracle contracts
const getUpdatedSpadeOracle = await getSpadeOracleCurrentState(spadeOracleUrl)
if (getUpdatedSpadeOracle.error) {
return getUpdatedSpadeOracle
}

// Get diff of contracts
const diffOracleContracts = computeDiffOracleState({
// fallsback to empty map if not found
previousOracleContracts: getPreviousSpadeOracle.ok || new Map(),
updatedOracleContracts: getUpdatedSpadeOracle.ok
})

// Store diff of contracts
const putDiff = await putDiffToDealStore({
dealStore,
diffOracleContracts
})
if (putDiff.error) {
return putDiff
}

// Record spade oracle contracts handled
const putUpdatedSpadeOracle = await putUpdatedSpadeOracleState({
spadeOracleStore,
spadeOracleId: spadeOracleUrl.toString(),
oracleContracts: getUpdatedSpadeOracle.ok
})
if (putUpdatedSpadeOracle.error) {
return putUpdatedSpadeOracle
}

return {
ok: {},
error: undefined
}
}

/**
* @param {object} context
* @param {DealStore} context.dealStore
* @param {OracleContracts} context.diffOracleContracts
* @returns {Promise<import('../types').Result<{}, import('@web3-storage/filecoin-api/types').StorePutError>>}
*/
export async function putDiffToDealStore ({ dealStore, diffOracleContracts }) {
const res = await Promise.all(
Array.from(diffOracleContracts, ([key, value]) => {
return Promise.all(value.map(contract => {
/** @type {import('@web3-storage/data-segment').LegacyPieceLink} */
const legacyPieceCid = parseLink(key)

return dealStore.put({
...contract,
// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt: (new Date()).toISOString()
})
}))
})
)

const firsPutError = res.find(pieceContracts => pieceContracts.find(c => c.error))?.find(comb => comb.error)
if (firsPutError?.error) {
return {
error: firsPutError.error
}
}
return {
ok: {}
}
}

/**
* @param {object} context
* @param {OracleContracts} context.previousOracleContracts
* @param {OracleContracts} context.updatedOracleContracts
*/
export function computeDiffOracleState ({ previousOracleContracts, updatedOracleContracts }) {
/** @type {OracleContracts} */
const diff = new Map()

for (const [pieceCid, contracts] of updatedOracleContracts.entries() ) {
const previousContracts = previousOracleContracts.get(pieceCid) || []
// Find diff when different length
if (contracts.length !== previousContracts.length) {
const diffContracts = []
// Get contracts for PieceCID still not recorded
for (const c of contracts) {
if (!previousContracts.find(pc => pc.dealId === c.dealId)) {
diffContracts.push(c)
}
}
diff.set(pieceCid, diffContracts)
}
}

return diff
}

/**
* @param {object} context
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {string} context.spadeOracleId
* @returns {Promise<import('../types').Result<OracleContracts, Error>>}
*/
export async function getSpadeOracleState ({ spadeOracleStore, spadeOracleId }) {
const getRes = await spadeOracleStore.get(spadeOracleId)
if (getRes.error) {
return getRes
}

return {
ok: new Map(Object.entries(decode(getRes.ok.value))),
}
}

/**
* @param {object} context
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {string} context.spadeOracleId
* @param {OracleContracts} context.oracleContracts
*/
async function putUpdatedSpadeOracleState ({ spadeOracleStore, spadeOracleId, oracleContracts }) {
const putRes = await spadeOracleStore.put({
key: spadeOracleId,
value: encode(Object.fromEntries(oracleContracts))
})

return putRes
}

/**
* @param {URL} spadeOracleUrl
* @returns {Promise<import('../types').Result<OracleContracts, Error>>}
*/
async function getSpadeOracleCurrentState (spadeOracleUrl) {
/** @type {OracleContracts} */
const dealMap = new Map()
const res = await fetch(spadeOracleUrl)
if (!res.ok) {
return {
// TODO: Error
error: new Error('could not read')
}
}

const resDecompressed = await streamReadAll(
// @ts-expect-error aws types...
Readable.fromWeb(res.body)
.pipe(ZSTDDecompress())
)
/** @type {SpadeOracle} */
const SpadeOracle = JSON.parse(toString(resDecompressed))
for (const replica of SpadeOracle.active_replicas) {
// TODO: convert pieceCid to v2 (piece_cid) + (piece_log2_size)
dealMap.set(replica.piece_cid, replica.contracts.map(c => ({
provider: c.provider_id,
dealId: c.legacy_market_id,
expirationEpoch: c.legacy_market_end_epoch,
source: spadeOracleUrl.toString()
})))
}

return {
ok: dealMap
}
}
28 changes: 28 additions & 0 deletions packages/core/src/deal-tracker/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

export type OracleContracts = Map<string, Contract[]>

export interface Contract {
provider: number
dealId: number
expirationEpoch: number
source: string
}

// Spade types

export interface SpadeContract {
provider_id: number
legacy_market_id: number
legacy_market_end_epoch: number
}

export interface SpadeReplica {
contracts: SpadeContract[]
piece_cid: string
piece_log2_size: number
}

export interface SpadeOracle {
state_epoch: number
active_replicas: SpadeReplica[]
}
98 changes: 98 additions & 0 deletions packages/core/src/store/spade-oracle-store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3'
import pRetry from 'p-retry'
import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors'

import { connectBucket } from './index.js'


/**
* @param {import('./types.js').BucketConnect | import('@aws-sdk/client-s3').S3Client} conf
* @param {object} context
* @param {string} context.name
* @returns {import('./types.js').SpadeOracleStore}
*/
export function createClient (conf, context) {
const bucketClient = connectBucket(conf)

return {
put: async (record) => {
const putCmd = new PutObjectCommand({
Bucket: context.name,
Key: encodeURIComponent(record.key),
Body: record.value
})

// retry to avoid throttling errors
try {
await pRetry(() => bucketClient.send(putCmd))
} catch (/** @type {any} */ error) {
console.log('err', error)
return {
error: new StoreOperationFailed(error.message)
}
}

return {
ok: {}
}
},
get: async (key) => {
const putCmd = new GetObjectCommand({
Bucket: context.name,
Key: encodeURIComponent(key)
})

let res
try {
res = await bucketClient.send(putCmd)
} catch (/** @type {any} */ error) {
if (error?.$metadata.httpStatusCode === 404) {
return {
error: new RecordNotFound('item not found in store')
}
}
return {
error: new StoreOperationFailed(error.message)
}
}

if (!res || !res.Body) {
return {
error: new RecordNotFound('item not found in store')
}
}

return {
ok: {
key,
value: await res.Body.transformToByteArray()
}
}
},
has: async (key) => {
const putCmd = new GetObjectCommand({
Bucket: context.name,
Key: encodeURIComponent(key)
})

let res
try {
res = await bucketClient.send(putCmd)
} catch (/** @type {any} */ error) {
return {
error: new StoreOperationFailed(error.message)
}
}

if (!res || !res.Body) {
return {
error: new RecordNotFound('item not found in store')
}
}

return {
ok: true
}
},
}
}
Loading

0 comments on commit 1925c34

Please sign in to comment.