Skip to content

Commit

Permalink
feat(syncers): add swarm stamp syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Jun 14, 2024
1 parent 363a5aa commit fb9c084
Show file tree
Hide file tree
Showing 14 changed files with 628 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/fair-rabbits-cross.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@blobscan/syncers": minor
---

Added swarm stamp syncer
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"@vitest/coverage-v8": "^0.34.3",
"@vitest/ui": "^0.34.1",
"dotenv-cli": "^7.2.1",
"msw": "^2.3.1",
"prettier": "^2.8.8",
"prettier-plugin-tailwindcss": "^0.2.8",
"ts-node": "^10.9.1",
Expand Down
2 changes: 2 additions & 0 deletions packages/syncers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"@blobscan/dayjs": "workspace:^0.0.2",
"@blobscan/db": "workspace:^0.7.0",
"@blobscan/logger": "workspace:^0.1.0",
"@blobscan/zod": "workspace:^0.1.0",
"axios": "^1.7.2",
"bullmq": "^4.13.2",
"ioredis": "^5.3.2"
},
Expand Down
35 changes: 35 additions & 0 deletions packages/syncers/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import type { AxiosError } from "axios";

import { z } from "@blobscan/zod";

export class ErrorException extends Error {
constructor(message: string, cause?: unknown) {
super(message, {
Expand All @@ -13,3 +17,34 @@ export class SyncerError extends ErrorException {
super(`Syncer "${syncerName}" failed: ${message}`, cause);
}
}

const swarmApiResponseErrorSchema = z.object({
code: z.number(),
message: z.string(),
reasons: z.array(z.unknown()).optional(),
});

export class SwarmNodeError extends ErrorException {
code: number | undefined;
reasons?: unknown[];

constructor(error: AxiosError) {
let message: string;
let code: number | undefined;
const result = swarmApiResponseErrorSchema.safeParse(error.response?.data);
let reasons: unknown[] | undefined;

if (result.success) {
code = result.data.code;
message = result.data.message;
reasons = result.data.reasons;
} else {
message = error.message;
}

super(message, error.cause);

this.code = code;
this.reasons = reasons;
}
}
1 change: 1 addition & 0 deletions packages/syncers/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { BaseSyncer } from "./BaseSyncer";
export * from "./syncers";
export { createRedisConnection } from "./utils";
73 changes: 73 additions & 0 deletions packages/syncers/src/syncers/SwarmStampSyncer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { AxiosResponse } from "axios";
import { AxiosError } from "axios";
import axios from "axios";

import { prisma } from "@blobscan/db";

import { BaseSyncer } from "../BaseSyncer";
import type { CommonSyncerConfig } from "../BaseSyncer";
import { SwarmNodeError } from "../errors";

type BatchData = {
batchID: string;
batchTTL: number;
};

export interface SwarmStampSyncerConfig extends CommonSyncerConfig {
beeEndpoint: string;
batchId: string;
}

export class SwarmStampSyncer extends BaseSyncer {
constructor({
cronPattern,
redisUriOrConnection,
batchId,
beeEndpoint,
}: SwarmStampSyncerConfig) {
const name = "swarm-stamp";
super({
name,
cronPattern,
redisUriOrConnection,
syncerFn: async () => {
let response: AxiosResponse<BatchData>;

try {
const url = `${beeEndpoint}/stamps/${batchId}`;

response = await axios.get<BatchData>(url);
} catch (err) {
let cause = err;

if (err instanceof AxiosError) {
cause = new SwarmNodeError(err);
}

throw new Error(`Failed to fetch stamp batch "${batchId}"`, {
cause,
});
}

const { batchTTL } = response.data;

await prisma.blobStoragesState.upsert({
create: {
swarmDataId: batchId,
swarmDataTTL: batchTTL,
},
update: {
swarmDataTTL: batchTTL,
updatedAt: new Date(),
},
where: {
id: 1,
swarmDataId: batchId,
},
});

this.logger.info(`Swarm stamp data with batch ID "${batchId}" updated`);
},
});
}
}
2 changes: 2 additions & 0 deletions packages/syncers/src/syncers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ export { DailyStatsSyncer } from "./DailyStatsSyncer";
export type { DailyStatsSyncerConfig } from "./DailyStatsSyncer";
export { OverallStatsSyncer } from "./OverallStatsSyncer";
export type { OverallStatsSyncerConfig } from "./OverallStatsSyncer";
export { SwarmStampSyncer } from "./SwarmStampSyncer";
export type { SwarmStampSyncerConfig } from "./SwarmStampSyncer";
179 changes: 179 additions & 0 deletions packages/syncers/test/SwarmStampSyncer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import { http, HttpResponse } from "msw";
import { setupServer } from "msw/node";
import { beforeAll, beforeEach, describe, expect, it } from "vitest";

import type { BlobStoragesState } from "@blobscan/db";
import { prisma } from "@blobscan/db";
import { fixtures, testValidError } from "@blobscan/test";

import type { SwarmStampSyncerConfig } from "../src/syncers/SwarmStampSyncer";
import { SwarmStampSyncer } from "../src/syncers/SwarmStampSyncer";

const BEE_ENDPOINT = process.env.BEE_ENDPOINT ?? "http://localhost:1633";

class SwarmStampSyncerMock extends SwarmStampSyncer {
constructor({ batchId, cronPattern }: Partial<SwarmStampSyncerConfig> = {}) {
super({
redisUriOrConnection: process.env.REDIS_URI ?? "",
cronPattern: cronPattern ?? "* * * * *",
batchId: batchId ?? process.env.SWARM_BATCH_ID ?? "",
beeEndpoint: BEE_ENDPOINT,
});
}

getQueue() {
return this.queue;
}

getWorkerProcessor() {
return this.syncerFn;
}
}

describe("SwarmStampSyncer", () => {
const expectedBatchId = fixtures.blobStoragesState[0]?.swarmDataId as string;
const expectedBatchTTL = 1000;

let swarmStampSyncer: SwarmStampSyncerMock;

beforeAll(() => {
const baseUrl = `${BEE_ENDPOINT}/stamps`;
const server = setupServer(
...[
http.get(`${baseUrl}/:batchId`, ({ request }) => {
const batchId = request.url.split("/").pop();

if (!batchId || batchId.length !== 64) {
return HttpResponse.json(
{
code: 400,
message: "invalid path params",
reasons: [
{
field: "batch_id",
error: "odd length hex string",
},
],
},
{ status: 400 }
);
}

if (batchId !== expectedBatchId) {
return HttpResponse.json(
{
code: 404,
message: "issuer does not exist",
},
{ status: 404 }
);
}

return HttpResponse.json(
{
batchID: expectedBatchId,
batchTTL: expectedBatchTTL,
},
{
status: 200,
}
);
}),
]
);

server.listen();

return () => {
server.close();
};
});

beforeEach(() => {
swarmStampSyncer = new SwarmStampSyncerMock();

return async () => {
await swarmStampSyncer.close();
};
});

describe("when creating a new swarm batch data row in the db", async () => {
let blobStorageState: BlobStoragesState | null = null;

beforeEach(async () => {
await prisma.blobStoragesState.deleteMany();

const workerProcessor = swarmStampSyncer.getWorkerProcessor();

await workerProcessor().catch((err) => console.log(err));

blobStorageState = await prisma.blobStoragesState.findFirst();
});

it("should create it with the correct swarm stamp batch ID", async () => {
expect(blobStorageState?.swarmDataId).toBe(process.env.SWARM_BATCH_ID);
});

it("should create it with the correct batch TTL", async () => {
expect(blobStorageState?.swarmDataTTL).toBe(expectedBatchTTL);
});
});

it("should update the batch TTl", async () => {
await prisma.blobStoragesState.update({
data: {
swarmDataTTL: 99999,
},
where: {
id: 1,
},
});

const workerProcessor = swarmStampSyncer.getWorkerProcessor();
await workerProcessor();

const blobStorageState = await prisma.blobStoragesState.findFirst();

expect(blobStorageState?.swarmDataTTL).toBe(expectedBatchTTL);
});

testValidError(
"should fail when trying to fetch a non-existing batch",
async () => {
const failingSwarmStampSyncer = new SwarmStampSyncerMock({
batchId:
"6b538866048cfb6e9e1d06805374c51572c11219d2d550c03e6e277366cb0371",
});
const failingWorkerProcessor =
failingSwarmStampSyncer.getWorkerProcessor();

await failingWorkerProcessor().finally(async () => {
await failingSwarmStampSyncer.close();
});
},
Error,
{
checkCause: true,
}
);

testValidError(
"should fail when trying to fetch an invalid batch",
async () => {
const failingSwarmStampSyncer = new SwarmStampSyncerMock({
batchId: "invalid-batch",
});
const failingWorkerProcessor =
failingSwarmStampSyncer.getWorkerProcessor();

await failingWorkerProcessor().finally(async () => {
await failingSwarmStampSyncer.close();
});
},
Error,
{
checkCause: true,
}
);
});
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html

exports[`SwarmStampSyncer > should fail when trying to fetch a non-existing batch 1`] = `"Failed to fetch stamp batch \\"6b538866048cfb6e9e1d06805374c51572c11219d2d550c03e6e277366cb0371\\""`;

exports[`SwarmStampSyncer > should fail when trying to fetch a non-existing batch 2`] = `[SwarmNodeError: issuer does not exist]`;

exports[`SwarmStampSyncer > should fail when trying to fetch an invalid batch 1`] = `"Failed to fetch stamp batch \\"invalid-batch\\""`;

exports[`SwarmStampSyncer > should fail when trying to fetch an invalid batch 2`] = `[SwarmNodeError: invalid path params]`;
11 changes: 11 additions & 0 deletions packages/syncers/test/helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { setupServer } from "msw/node";

export function createServer(handlers: Parameters<typeof setupServer>[0][]) {
const server = setupServer(...handlers);

server.listen();

return () => {
server.close();
};
}
8 changes: 2 additions & 6 deletions packages/test/src/fixtures/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
BlobData,
BlobDataStorageReference,
type PrismaClient,
type Rollup,
} from "@prisma/client";
import type { BlobData, BlobDataStorageReference } from "@prisma/client";
import type { PrismaClient, Rollup } from "@prisma/client";

import POSTGRES_DATA from "./postgres/data.json";

Expand Down
2 changes: 1 addition & 1 deletion packages/test/src/fixtures/postgres/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"blobStoragesState": [
{
"id": 1,
"swarmDataId": "batch-1",
"swarmDataId": "f89e63edf757f06e89933761d6d46592d03026efb9871f9d244f34da86b6c242",
"swarmDataTTL": 1000,
"updatedAt": "2023-10-31T12:10:00Z"
}
Expand Down
Loading

0 comments on commit fb9c084

Please sign in to comment.