
feat: deal tracker spade sync cron #57

Merged: vasco-santos merged 8 commits from feat/deal-tracker-cron into main on Oct 30, 2023

Conversation

@vasco-santos (Contributor) commented Oct 25, 2023

Adds the deal tracker Spade sync cron. As previously decided, this implements a diff-based approach: the newest Spade oracle data is fetched, compared with the previously downloaded copy, and the resulting diff is inserted into DynamoDB.
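A rough sketch of that flow, for orientation only; the helper names (fetchLatestDealArchive, computeDiff, putDiffToDealStore, dealArchiveStore) follow the functions discussed later in this review, not the exact PR code:

```js
// Illustrative only: diff-based sync of Spade oracle data into DynamoDB.
export async function spadeOracleSyncSketch ({ dealStore, dealArchiveStore, spadeOracleUrl }) {
  // the archive stored on the previous run (may be missing on the first run)
  const { ok: currentArchive } = await dealArchiveStore.get()
  // the newest dump served by the Spade endpoint
  const { ok: nextArchive, error } = await fetchLatestDealArchive(spadeOracleUrl)
  if (error) return { error }
  // only new/changed piece contracts get written to the deal store
  const diff = computeDiff(currentArchive ?? new Map(), nextArchive)
  await putDiffToDealStore({ dealStore, diffPieceContracts: diff })
  // the newest dump becomes the baseline for the next cron tick
  await dealArchiveStore.put(nextArchive)
  return { ok: {} }
}
```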

Decisions:

  • did not add this cron to the filecoin-api repo given it is an implementation detail, while the other crons are part of the receipt chain
  • did not worry about pagination of deals in the query, nor about the Lambda's memory consumption, for the MVP
    • looking at options to read and decompress zst files while streaming, so that we could avoid doubling memory to put the content into a Map. However, this would mean parsing lines without JSON richness, which would be quite painful to implement
    • current runs with the real file provided by the Spade endpoint fit well under the configured memory limit, so we are likely safe until we implement our own oracle (and we have the fallback of increasing Lambda memory as a temporary solution if needed...)
  • for now did not bother implementing a batch insert given batch writes are also quite limited in DynamoDB and cost the same. But might reconsider that as a fast follow-up

Notes:

  • added a mock server to serve oracle files compressed as Spade provides them (i.e. json.zst)

Needs:

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 25, 2023 14:13 Inactive
seed-deploy bot commented Oct 25, 2023

View stack outputs

seed-deploy bot commented Oct 26, 2023

Stack outputs updated

.pipe(ZSTDDecompress())
)
/** @type {SpadeOracle} */
const SpadeOracle = JSON.parse(toString(resDecompressed))
Member:

According to the readme you can just call resDecompressed.toString()

https://www.npmjs.com/package/stream-read-all
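For reference, a minimal sketch of that suggestion; ZSTDDecompress is assumed to come from the simple-zstd package (the PR hunk only shows the call) and res.body is assumed to be a Node readable stream:

```js
import streamReadAll from 'stream-read-all'
import { ZSTDDecompress } from 'simple-zstd' // assumed source of ZSTDDecompress

// stream-read-all resolves to a Buffer, so .toString() is enough afterwards
const resDecompressed = await streamReadAll(res.body.pipe(ZSTDDecompress()))
/** @type {SpadeOracle} */
const spadeOracle = JSON.parse(resDecompressed.toString())
```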

}
},
has: async (key) => {
const putCmd = new GetObjectCommand({
Member:

Suggested change
const putCmd = new GetObjectCommand({
const headCmd = new HeadObjectCommand({
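A minimal sketch of a has check built on that suggestion; s3Client and bucketName are hypothetical stand-ins for whatever this store already holds:

```js
import { HeadObjectCommand } from '@aws-sdk/client-s3'

const has = async (key) => {
  try {
    // HEAD avoids downloading the object body just to check for existence
    await s3Client.send(new HeadObjectCommand({ Bucket: bucketName, Key: key }))
    return true
  } catch (/** @type {any} */ err) {
    if (err?.$metadata?.httpStatusCode === 404) return false
    throw err
  }
}
```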

}

// Get updated spade oracle contracts
const getUpdatedSpadeOracle = await getSpadeOracleCurrentState(spadeOracleUrl)
@alanshaw (Member) commented Oct 26, 2023:

Perhaps do this first, so we don't hold all the other state in memory for the duration. I assume this will take longer/be more likely to fail.

Member:

Can we swap "spade oracle state" for "deal archive"? So this would be for example:

Suggested change
const getUpdatedSpadeOracle = await getSpadeOracleCurrentState(spadeOracleUrl)
const { ok: nextArchive, error } = await fetchDealArchive(spadeOracleUrl)


// Get diff of contracts
const diffOracleContracts = computeDiffOracleState({
// fallsback to empty map if not found
Member:

You can probably skip computing diff altogether if not found.

*/
export async function spadeOracleSyncTick ({
dealStore,
spadeOracleStore,
Member:

We should name this after what is being stored, so I'd call it dealArchiveStore or something.

}

return {
ok: new Map(Object.entries(decode(getRes.ok.value))),
Member:

Why doesn't the store decode do this?

spadeOracleUrl
}) {
// Get previous recorded spade oracle contracts
const getPreviousSpadeOracle = await getSpadeOracleState({
Member:

We're fetching the current deal archive from our store. Until it is replaced, it is current. Previous sounds like it's the archive that's 1 before current.

Ideally this would just be const { ok: currentArchive } = dealArchiveStore.get().

@vasco-santos (Contributor, Author) commented Oct 26, 2023:

Yes, I will rename to current! Thanks

We have Spade now, but we could have other stores; if we change this to dealArchiveStore we should keep an explicit key for Spade.

}

// Get diff of contracts
const diffOracleContracts = computeDiffOracleState({
Member:

Suggested change
const diffOracleContracts = computeDiffOracleState({
const diff = computeDiff(currentArchive, nextArchive)

// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt: (new Date()).toISOString()
Member:

Can we track insertedAt and updatedAt?

.pipe(ZSTDDecompress())
)
/** @type {SpadeOracle} */
const SpadeOracle = JSON.parse(toString(resDecompressed))
Member:

Suggested change
const SpadeOracle = JSON.parse(toString(resDecompressed))
const rawData = JSON.parse(toString(resDecompressed))

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 16:32 Inactive
Comment on lines 224 to 233
const piece = Piece.fromInfo({
link,
size: Piece.Size.fromHeight(height)
})
@Gozala commented Oct 26, 2023:

You already have a height, and the root is link.multihash.digest, so you could also use Piece.view and skip the whole info-creation step; that said, you would have to do something like:

Suggested change
const piece = Piece.fromInfo({
link,
size: Piece.Size.fromHeight(height)
})
const piece = Piece.toView({
root: link.multihash.digest,
height,
padding: 0n
})

If we do know the original CAR size, however, it would be great to also compute the padding instead of just putting 0n.

@vasco-santos (Contributor, Author):

I think we need the size here and not height: https://github.com/web3-storage/data-segment/blob/main/src/piece.js#L104

This is the aggregate piece CID that Spade Oracle has, while tracking Deals (uses the old Piece Cid v1 + height)

@Gozala:

> I think we need the size here and not height: https://github.com/web3-storage/data-segment/blob/main/src/piece.js#L104

Sorry, I had a typo: I meant the Piece.toView function in the suggestion, which takes a {height, root, padding} triple.

@Gozala:

> This is the aggregate piece CID that Spade Oracle has, while tracking Deals (uses the old Piece Cid v1 + height)

Ah ok, aggregates don't really have padding so that makes sense. I actually wanted to use different CIDs for aggregates, but also don't really want to argue for them in the spec.

@vasco-santos (Contributor, Author):

Cool, thanks. Changed to the suggestion.

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 17:22 Inactive
seed-deploy bot commented Oct 26, 2023

Stack outputs updated

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 17:30 Inactive
seed-deploy bot commented Oct 26, 2023

Stack outputs updated

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 18:03 Inactive
@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 19:10 Inactive
@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 19:25 Inactive
@Gozala Gozala requested review from Gozala and removed request for Gozala October 26, 2023 19:37
@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 19:37 Inactive
@Gozala commented Oct 26, 2023

Looks like @alanshaw has already reviewed this; I'll let him continue and focus on other things unless you ask me otherwise.

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 19:54 Inactive
@seed-deploy seed-deploy bot temporarily deployed to pr57 October 26, 2023 20:06 Inactive
@vasco-santos (Contributor, Author) commented:

@alanshaw I skipped the old tests for now given they were sometimes timing out. We will integrate the new aggregator-api tests as a follow-up, and I will get rid of these then anyway.

diffPieceContracts = fetchLatestDealArchiveRes.ok
} else {
diffPieceContracts = computeDiff({
// falls back to empty map if not found
Member:

Suggested change
// falls back to empty map if not found

Comment on lines 102 to 109
return dealStore.put({
...contract,
// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt: (new Date()).toISOString(),
updatedAt: (new Date()).toISOString()
})
Member:

Just so we don't get ever so slightly different dates...and can tell if something was ever updated.

Suggested change
return dealStore.put({
...contract,
// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt: (new Date()).toISOString(),
updatedAt: (new Date()).toISOString()
})
const insertedAt = new Date().toISOString()
return dealStore.put({
...contract,
// @ts-expect-error not PieceCIDv2
piece: legacyPieceCid,
provider: `${contract.provider}`,
insertedAt,
updatedAt: insertedAt
})

*/
export async function putDiffToDealStore ({ dealStore, diffPieceContracts }) {
const res = await Promise.all(
Array.from(diffPieceContracts, ([key, value]) => {
Member:

It would be easier to understand if these were named after what they represent:

Suggested change
Array.from(diffPieceContracts, ([key, value]) => {
Array.from(diffPieceContracts, ([pieceCidStr, contracts]) => {

for (const [pieceCid, contracts] of updatedPieceContracts.entries() ) {
const currentContracts = currentPieceContracts.get(pieceCid) || []
// Find diff when different length
if (contracts.length !== currentContracts.length) {
Member:

Could one contract have expired and another added? i.e. the number of contracts did not change but the contracts did. I would imagine you need to do the diff regardless.

@vasco-santos (Contributor, Author):

Not sure, but it should not hurt to remove this validation.
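A hypothetical way to detect a change even when the lengths match (illustrative names, not the PR code):

```js
// Compares contract lists by value, so a swap (one contract expired, another
// added) is still detected even when the counts are equal. Assumes contract
// objects serialize with a stable key order; purely illustrative.
function contractsChanged (currentContracts, updatedContracts) {
  if (currentContracts.length !== updatedContracts.length) return true
  const current = new Set(currentContracts.map(c => JSON.stringify(c)))
  return updatedContracts.some(c => !current.has(JSON.stringify(c)))
}
```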

Comment on lines 208 to 212
const piecCid = convertPieceCidV1toPieceCidV2(
parseLink(replica.piece_cid),
replica.piece_log2_size
)
dealMap.set(piecCid.toString(), replica.contracts.map(c => ({
Member:

Suggested change
const piecCid = convertPieceCidV1toPieceCidV2(
parseLink(replica.piece_cid),
replica.piece_log2_size
)
dealMap.set(piecCid.toString(), replica.contracts.map(c => ({
const pieceCid = convertPieceCidV1toPieceCidV2(
parseLink(replica.piece_cid),
replica.piece_log2_size
)
dealMap.set(pieceCid.toString(), replica.contracts.map(c => ({

spadeOracleUrl
}) {
// Get latest deal archive
const fetchLatestDealArchiveRes = await fetchLatestDealArchive(spadeOracleUrl)
Member:

Not blocking: I imagine there will be times when the archive has not changed. Would it be possible to do a HEAD request to get an ETag and avoid pulling a dump with no differences?

Then the deal archive store is more like hash => data and we store a pointer to the latest and delete (or just expire) archives we no longer need.
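A rough sketch of that idea (tracked as a follow-up in #62 below); the conditional-request wiring and names are assumptions, not part of this PR:

```js
// Do a HEAD request first and skip the download when the ETag matches the one
// recorded for the archive we already processed (hypothetical flow).
async function fetchLatestDealArchiveIfChanged (spadeOracleUrl, lastEtag) {
  const headRes = await fetch(spadeOracleUrl, { method: 'HEAD' })
  if (!headRes.ok) {
    return { error: new Error(`unexpected response status checking deal archive: ${headRes.status}`) }
  }
  const etag = headRes.headers.get('etag')
  if (etag && etag === lastEtag) {
    return { ok: { changed: false, etag } }
  }
  const res = await fetch(spadeOracleUrl)
  if (!res.ok) {
    return { error: new Error(`unexpected response status fetching deal archive: ${res.status}`) }
  }
  return { ok: { changed: true, etag, body: res.body } }
}
```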

@vasco-santos (Contributor, Author):

I will create an issue to improve this and add a comment

@vasco-santos (Contributor, Author):

#62

async function putLatestDealArchive ({ dealArchiveStore, spadeOracleId, oracleContracts }) {
const putRes = await dealArchiveStore.put({
key: spadeOracleId,
value: encode(Object.fromEntries(oracleContracts))
Member:

Can we just store the data we retrieved and avoid re-encoding?

@vasco-santos (Contributor, Author):

Then we would need to encode/decode all the CIDs given the store functions use CID. That was the reason to use dag-json.
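For context, the round trip being described, sketched with @ipld/dag-json (which can encode CID values that plain JSON cannot):

```js
import { encode, decode } from '@ipld/dag-json'

// Map<pieceCidStr, contracts> flattened to a plain object for storage...
const bytes = encode(Object.fromEntries(oracleContracts))
// ...and rebuilt into a Map on read, with CID values restored
const restored = new Map(Object.entries(decode(bytes)))
```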

if (!res.ok) {
return {
// TODO: Error
error: new Error('could not read')
Member:

I typically just report the status code:

Suggested change
error: new Error('could not read')
error: new Error(`unexpected response status fetching deal archive: ${res.status}`)

@seed-deploy seed-deploy bot temporarily deployed to pr57 October 30, 2023 14:45 Inactive
@vasco-santos vasco-santos merged commit fee55b1 into main Oct 30, 2023
3 checks passed
@vasco-santos vasco-santos deleted the feat/deal-tracker-cron branch October 30, 2023 14:51