Skip to content

Commit

Permalink
chore: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Oct 26, 2023
1 parent fad2241 commit e247249
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 124 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"unicorn/prefer-set-has": "off",
"unicorn/prefer-spread": "off",
"unicorn/prefer-array-some": "off",
"unicorn/prefer-ternary": "off",
"unicorn/no-array-callback-reference": "off",
"unicorn/no-array-reduce": "off",
"unicorn/no-await-expression-member": "off",
Expand Down
112 changes: 59 additions & 53 deletions packages/core/src/deal-tracker/spade-oracle-sync-tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,78 @@ import { ZSTDDecompress } from 'simple-zstd'
import { Readable } from 'stream'
// @ts-expect-error no types available
import streamReadAll from 'stream-read-all'
import { toString } from 'uint8arrays/to-string'
import { encode, decode } from '@ipld/dag-json'
import { RecordNotFoundErrorName } from '@web3-storage/filecoin-api/errors'
import { parse as parseLink } from 'multiformats/link'
import { Piece } from '@web3-storage/data-segment'

/**
* @typedef {import('@web3-storage/filecoin-api/deal-tracker/api').DealStore} DealStore
* @typedef {import('./types').OracleContracts} OracleContracts
* @typedef {import('./types').SpadeOracle} SpadeOracle
* @typedef {import('../store/types').SpadeOracleStore} SpadeOracleStore
* @typedef {import('./types').PieceContracts} PieceContracts
* @typedef {import('./types').DealArchive} DealArchive
* @typedef {import('../store/types').DealArchiveStore} DealArchiveStore
*/

/**
* On CRON tick, this function syncs deal store entries with the most up to date information stored
* in Spade's Oracle:
* - The previous oracle state known is fetched, as well as the latest state from Spade endpoint.
* - The current oracle state known is fetched, as well as the latest state from Spade endpoint.
* - Once both states are in memory, they are compared and a diff is generated.
* - Diff is stored in deal store
* - Handled new state of oracle is stored for comparison in next tick.
*
* @param {object} context
* @param {DealStore} context.dealStore
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {DealArchiveStore} context.dealArchiveStore
* @param {URL} context.spadeOracleUrl
*/
export async function spadeOracleSyncTick ({
dealStore,
spadeOracleStore,
dealArchiveStore,
spadeOracleUrl
}) {
// Get previous recorded spade oracle contracts
const getPreviousSpadeOracle = await getSpadeOracleState({
spadeOracleStore,
spadeOracleId: spadeOracleUrl.toString(),
})
if (getPreviousSpadeOracle.error && getPreviousSpadeOracle.error.name !== RecordNotFoundErrorName) {
return getPreviousSpadeOracle
// Get latest deal archive
const fetchLatestDealArchiveRes = await fetchLatestDealArchive(spadeOracleUrl)
if (fetchLatestDealArchiveRes.error) {
return fetchLatestDealArchiveRes
}

// Get updated spade oracle contracts
const getUpdatedSpadeOracle = await getSpadeOracleCurrentState(spadeOracleUrl)
if (getUpdatedSpadeOracle.error) {
return getUpdatedSpadeOracle
// Get current recorded spade oracle contracts
const getCurrentDealArchiveRes = await getCurrentDealArchive({
dealArchiveStore,
spadeOracleId: spadeOracleUrl.toString(),
})
if (getCurrentDealArchiveRes.error && getCurrentDealArchiveRes.error.name !== RecordNotFoundErrorName) {
return getCurrentDealArchiveRes
}

// Get diff of contracts
const diffOracleContracts = computeDiffOracleState({
// fallsback to empty map if not found
previousOracleContracts: getPreviousSpadeOracle.ok || new Map(),
updatedOracleContracts: getUpdatedSpadeOracle.ok
})
/** @type {PieceContracts} */
let diffPieceContracts
if (!getCurrentDealArchiveRes.ok) {
diffPieceContracts = fetchLatestDealArchiveRes.ok
} else {
diffPieceContracts = computeDiff({
// falls back to empty map if not found
currentPieceContracts: getCurrentDealArchiveRes.ok,
updatedPieceContracts: fetchLatestDealArchiveRes.ok
})
}

// Store diff of contracts
const putDiff = await putDiffToDealStore({
dealStore,
diffOracleContracts
diffPieceContracts
})
if (putDiff.error) {
return putDiff
}

// Record spade oracle contracts handled
const putUpdatedSpadeOracle = await putUpdatedSpadeOracleState({
spadeOracleStore,
const putUpdatedSpadeOracle = await putLatestDealArchive({
dealArchiveStore,
spadeOracleId: spadeOracleUrl.toString(),
oracleContracts: getUpdatedSpadeOracle.ok
oracleContracts: fetchLatestDealArchiveRes.ok
})
if (putUpdatedSpadeOracle.error) {
return putUpdatedSpadeOracle
Expand All @@ -84,12 +89,12 @@ export async function spadeOracleSyncTick ({
/**
* @param {object} context
* @param {DealStore} context.dealStore
* @param {OracleContracts} context.diffOracleContracts
* @param {PieceContracts} context.diffPieceContracts
* @returns {Promise<import('../types').Result<{}, import('@web3-storage/filecoin-api/types').StorePutError>>}
*/
export async function putDiffToDealStore ({ dealStore, diffOracleContracts }) {
export async function putDiffToDealStore ({ dealStore, diffPieceContracts }) {
const res = await Promise.all(
Array.from(diffOracleContracts, ([key, value]) => {
Array.from(diffPieceContracts, ([key, value]) => {
return Promise.all(value.map(contract => {
/** @type {import('@web3-storage/data-segment').LegacyPieceLink} */
const legacyPieceCid = parseLink(key)
Expand All @@ -99,7 +104,8 @@ export async function putDiffToDealStore ({ dealStore, diffOracleContracts }) {
// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt: (new Date()).toISOString()
insertedAt: (new Date()).toISOString(),
updatedAt: (new Date()).toISOString()
})
}))
})
Expand All @@ -118,21 +124,21 @@ export async function putDiffToDealStore ({ dealStore, diffOracleContracts }) {

/**
* @param {object} context
* @param {OracleContracts} context.previousOracleContracts
* @param {OracleContracts} context.updatedOracleContracts
* @param {PieceContracts} context.currentPieceContracts
* @param {PieceContracts} context.updatedPieceContracts
*/
export function computeDiffOracleState ({ previousOracleContracts, updatedOracleContracts }) {
/** @type {OracleContracts} */
export function computeDiff ({ currentPieceContracts, updatedPieceContracts }) {
/** @type {PieceContracts} */
const diff = new Map()

for (const [pieceCid, contracts] of updatedOracleContracts.entries() ) {
const previousContracts = previousOracleContracts.get(pieceCid) || []
for (const [pieceCid, contracts] of updatedPieceContracts.entries() ) {
const currentContracts = currentPieceContracts.get(pieceCid) || []
// Find diff when different length
if (contracts.length !== previousContracts.length) {
if (contracts.length !== currentContracts.length) {
const diffContracts = []
// Get contracts for PieceCID still not recorded
for (const c of contracts) {
if (!previousContracts.find(pc => pc.dealId === c.dealId)) {
if (!currentContracts.find(pc => pc.dealId === c.dealId)) {
diffContracts.push(c)
}
}
Expand All @@ -145,12 +151,12 @@ export function computeDiffOracleState ({ previousOracleContracts, updatedOracle

/**
* @param {object} context
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {DealArchiveStore} context.dealArchiveStore
* @param {string} context.spadeOracleId
* @returns {Promise<import('../types').Result<OracleContracts, Error>>}
* @returns {Promise<import('../types').Result<PieceContracts, Error>>}
*/
export async function getSpadeOracleState ({ spadeOracleStore, spadeOracleId }) {
const getRes = await spadeOracleStore.get(spadeOracleId)
export async function getCurrentDealArchive ({ dealArchiveStore, spadeOracleId }) {
const getRes = await dealArchiveStore.get(spadeOracleId)
if (getRes.error) {
return getRes
}
Expand All @@ -162,12 +168,12 @@ export async function getSpadeOracleState ({ spadeOracleStore, spadeOracleId })

/**
* @param {object} context
* @param {SpadeOracleStore} context.spadeOracleStore
* @param {DealArchiveStore} context.dealArchiveStore
* @param {string} context.spadeOracleId
* @param {OracleContracts} context.oracleContracts
* @param {PieceContracts} context.oracleContracts
*/
async function putUpdatedSpadeOracleState ({ spadeOracleStore, spadeOracleId, oracleContracts }) {
const putRes = await spadeOracleStore.put({
async function putLatestDealArchive ({ dealArchiveStore, spadeOracleId, oracleContracts }) {
const putRes = await dealArchiveStore.put({
key: spadeOracleId,
value: encode(Object.fromEntries(oracleContracts))
})
Expand All @@ -177,10 +183,10 @@ async function putUpdatedSpadeOracleState ({ spadeOracleStore, spadeOracleId, or

/**
* @param {URL} spadeOracleUrl
* @returns {Promise<import('../types').Result<OracleContracts, Error>>}
* @returns {Promise<import('../types').Result<PieceContracts, Error>>}
*/
async function getSpadeOracleCurrentState (spadeOracleUrl) {
/** @type {OracleContracts} */
async function fetchLatestDealArchive (spadeOracleUrl) {
/** @type {PieceContracts} */
const dealMap = new Map()
const res = await fetch(spadeOracleUrl)
if (!res.ok) {
Expand All @@ -195,9 +201,9 @@ async function getSpadeOracleCurrentState (spadeOracleUrl) {
Readable.fromWeb(res.body)
.pipe(ZSTDDecompress())
)
/** @type {SpadeOracle} */
const SpadeOracle = JSON.parse(toString(resDecompressed))
for (const replica of SpadeOracle.active_replicas) {
/** @type {DealArchive} */
const dealArchive = JSON.parse(resDecompressed.toString())
for (const replica of dealArchive.active_replicas) {
// Convert PieceCidV1 to PieceCidV2
const piecCid = convertPieceCidV1toPieceCidV2(
parseLink(replica.piece_cid),
Expand Down
16 changes: 7 additions & 9 deletions packages/core/src/deal-tracker/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

export type OracleContracts = Map<string, Contract[]>
export type PieceContracts = Map<string, Contract[]>

export interface Contract {
provider: number
Expand All @@ -8,21 +7,20 @@ export interface Contract {
source: string
}

// Spade types

export interface SpadeContract {
// Spade Oracle types
export interface DealContract {
provider_id: number
legacy_market_id: number
legacy_market_end_epoch: number
}

export interface SpadeReplica {
contracts: SpadeContract[]
export interface DealReplica {
contracts: DealContract[]
piece_cid: string
piece_log2_size: number
}

export interface SpadeOracle {
export interface DealArchive {
state_epoch: number
active_replicas: SpadeReplica[]
active_replicas: DealReplica[]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3'
import { PutObjectCommand, GetObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3'
import pRetry from 'p-retry'
import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors'

Expand All @@ -11,7 +11,7 @@ import { connectBucket } from './index.js'
* @param {import('./types.js').BucketConnect | import('@aws-sdk/client-s3').S3Client} conf
* @param {object} context
* @param {string} context.name
* @returns {import('./types.js').SpadeOracleStore}
* @returns {import('./types.js').DealArchiveStore}
*/
export function createClient (conf, context) {
const bucketClient = connectBucket(conf)
Expand Down Expand Up @@ -72,23 +72,21 @@ export function createClient (conf, context) {
}
},
has: async (key) => {
const putCmd = new GetObjectCommand({
const putCmd = new HeadObjectCommand({
Bucket: context.name,
Key: encodeURIComponent(key)
})

let res
try {
res = await bucketClient.send(putCmd)
await bucketClient.send(putCmd)
} catch (/** @type {any} */ error) {
return {
error: new StoreOperationFailed(error.message)
if (error?.$metadata.httpStatusCode === 404) {
return {
ok: false
}
}
}

if (!res || !res.Body) {
return {
error: new RecordNotFound('item not found in store')
error: new StoreOperationFailed(error.message)
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export const dealStoreTableProps = {
expirationEpoch: 'number', // '4482396' epoch of deal expiration
source: 'string', // 'cargo.dag.haus' source of the deal information
insertedAt: 'string', // Insertion date as ISO string
updatedAt: 'string', // Update date as ISO string
},
primaryIndex: { partitionKey: 'piece', sortKey: 'dealId' },
globalIndexes: {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/store/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ export interface DealerOfferStoreRecordValue {
}

/** ---------------------- Deal Tracker ---------------------- */
export interface SpadeOracleRecord {
export interface DealArchiveRecord {
key: string
value: ByteView<{
[k: string]: Contract[];
}>
}

export type SpadeOracleStore = Store<string, SpadeOracleRecord>
export type DealArchiveStore = Store<string, DealArchiveRecord>
Loading

0 comments on commit e247249

Please sign in to comment.