
Snappy frame encode big payloads as chunks as per the standard #3912

Merged: 11 commits, May 4, 2022
2 changes: 1 addition & 1 deletion packages/lodestar/package.json
@@ -76,7 +76,7 @@
"@chainsafe/lodestar-utils": "^0.36.0",
"@chainsafe/lodestar-validator": "^0.36.0",
"@chainsafe/persistent-merkle-tree": "^0.4.1",
"@chainsafe/snappy-stream": "5.0.0",
"@chainsafe/snappy-stream": "5.1.1",
"@chainsafe/ssz": "^0.9.1",
"@ethersproject/abi": "^5.0.0",
"@types/datastore-level": "^3.0.0",
@@ -29,6 +29,14 @@ export async function* writeSszSnappyPayload<T extends RequestOrOutgoingResponse
* Buffered Snappy writer
*/
function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
/**
 * Use the sync version (default) for compress as it is almost 2x faster than the
 * async one, and most payloads are "1 chunk"; even 100kb payloads (which would
 * mostly be big bellatrix blocks with transactions) are just 2 chunks.
 *
 * To use the async version (e.g. on big payloads) instantiate the stream with
 * `createCompressStream({asyncCompress: true})`
 */
Contributor: Thank you so much for adding a nice reasoning comment! ❤️

const stream = createCompressStream();
stream.write(bytes);
stream.end();
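
For context, here is a minimal hedged sketch of how such a compress stream can be drained into frames. `compressToFrames` is illustrative and not part of this PR; the actual generator wiring inside `encodeSszSnappy` may differ.

```ts
import {createCompressStream} from "@chainsafe/snappy-stream";

// Illustrative only: compress a payload and collect the resulting snappy frames.
// Node Transform streams are async iterables, so the readable side can be
// consumed with for-await once the payload has been written and ended.
async function compressToFrames(payload: Buffer, opts?: {asyncCompress?: boolean}): Promise<Buffer[]> {
  const stream = createCompressStream(opts);
  stream.write(payload);
  stream.end();

  const frames: Buffer[] = [];
  for await (const frame of stream) {
    frames.push(frame as Buffer);
  }
  return frames;
}
```

With the sync default, small payloads come out as a single frame, while payloads above the 65,536-byte uncompressed-per-frame limit are split into multiple frames, which is exactly what the new test vectors below exercise.
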
@@ -0,0 +1,27 @@
import chai, {expect} from "chai";
import chaiAsPromised from "chai-as-promised";
import varint from "varint";
import {BufferedSource} from "../../../../../../src/network/reqresp/utils";
import {readSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {isEqualSszType} from "../../../../../utils/ssz";
import {arrToSource} from "../../../../../../test/unit/network/reqresp/utils";
import {goerliShadowForkBlock13249} from "./testData";

chai.use(chaiAsPromised);

describe("network / reqresp / sszSnappy / decode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const {id, type, bytes, streamedBody, body} of testCases) {
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const streamedBytes = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);

it(id, async () => {
const bufferedSource = new BufferedSource(arrToSource([streamedBytes]));
const bodyResult = await readSszSnappyPayload(bufferedSource, type);
expect(isEqualSszType(type, bodyResult, deserializedBody)).to.equal(true, "Wrong decoded body");
});
}
});
});
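
For context, the `varint.encode(bytes.length)` prefix above is the req/resp length header: the uncompressed SSZ byte length encoded as an unsigned varint, written before the snappy-framed data. A small illustrative check (the 100,000-byte figure is arbitrary, not the fixture's real size):

```ts
import varint from "varint";

// The length prefix is the varint encoding of the *uncompressed* SSZ length.
const prefix = Buffer.from(varint.encode(100_000));
console.log(prefix.toString("hex")); // "a08d06"
console.log(varint.decode(prefix)); // 100000
```
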
@@ -0,0 +1,37 @@
import all from "it-all";
import pipe from "it-pipe";
import {expect} from "chai";
import varint from "varint";

import {allForks, ssz} from "@chainsafe/lodestar-types";

import {reqRespBlockResponseSerializer} from "../../../../../../src/network/reqresp/types";
import {writeSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {goerliShadowForkBlock13249} from "./testData";
import {RequestOrOutgoingResponseBody} from "../../../../../../src/network/reqresp/types";

describe("network / reqresp / sszSnappy / encode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const testCase of testCases) {
const {id, type, bytes, streamedBody, body} = testCase;
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const reqrespBody =
body ??
(type === ssz.bellatrix.SignedBeaconBlock
? {slot: (deserializedBody as allForks.SignedBeaconBlock).message.slot, bytes}
: deserializedBody);

it(id, async () => {
const encodedChunks = await pipe(
writeSszSnappyPayload(reqrespBody as RequestOrOutgoingResponseBody, reqRespBlockResponseSerializer),
all
);
const encodedStream = Buffer.concat(encodedChunks);
const expectedStreamed = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);
expect(encodedStream).to.be.deep.equal(expectedStreamed);
});
}
});
});
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,25 @@
import fs from "node:fs";
import path from "node:path";

import {bellatrix, ssz} from "@chainsafe/lodestar-types";
import {RequestOrIncomingResponseBody, RequestOrResponseType} from "../../../../../../src/network/reqresp/types";

export interface ISszSnappyTestBlockData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
bytes: Buffer;
streamedBody: Buffer;
body?: T;
}

/**
* A real big bellatrix block from goerli-shadow-fork-2 devnet, which is expected to be
* encoded in multiple chunks.
*/

export const goerliShadowForkBlock13249: ISszSnappyTestBlockData<bellatrix.SignedBeaconBlock> = {
id: "goerli-shadow-fork-block-13249",
type: ssz.bellatrix.SignedBeaconBlock,
bytes: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/serialized.ssz")),
streamedBody: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/streamed.snappy")),
};
@@ -27,10 +27,12 @@ describe("snappy frames uncompress", function () {
const decompress = new SnappyFramesUncompress();

const testData = Buffer.alloc(100000, 4).toString();
let result = Buffer.alloc(0);

compressStream.on("data", function (data) {
const result = decompress.uncompress(data);
if (result) {
// testData will come compressed as two or more chunks
result = Buffer.concat([result, decompress.uncompress(data) ?? Buffer.alloc(0)]);
if (result.length === testData.length) {
expect(result.toString()).to.be.equal(testData);
done();
}
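
The accumulation above is needed because the snappy framing format caps the uncompressed data carried by a single chunk at 65,536 bytes, so the 100,000-byte test payload necessarily spans at least two frames. A small hedged sketch of that arithmetic:

```ts
// Snappy framing format: a compressed or uncompressed data chunk may carry at
// most 65,536 bytes of uncompressed data, so larger payloads are split.
const MAX_UNCOMPRESSED_PER_FRAME = 65536;
const payloadLength = 100_000; // size of testData above
const minFrames = Math.ceil(payloadLength / MAX_UNCOMPRESSED_PER_FRAME);
console.log(minFrames); // 2
```
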
@@ -9,18 +9,22 @@ export interface ISszSnappyTestData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
body: T;
/** chunks expected in an async compress version of snappy stream */
asyncChunks: Buffer[];
/** chunks expected in a sync compress version of snappy stream */
chunks: Buffer[];
}

export const sszSnappyPing: ISszSnappyTestData<phase0.Ping> = {
id: "Ping type",
type: ssz.phase0.Ping,
body: BigInt(1),
chunks: [
asyncChunks: [
"0x08", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x010c00000175de410100000000000000", // snappy frames content
].map(fromHexString) as Buffer[],
chunks: ["0x08", "0xff060000734e61507059010c00000175de410100000000000000"].map(fromHexString) as Buffer[],
};

export const sszSnappyStatus: ISszSnappyTestData<phase0.Status> = {
@@ -33,11 +37,14 @@ export const sszSnappyStatus: ISszSnappyTestData<phase0.Status> = {
headRoot: Buffer.alloc(32, 0xda),
headSlot: 9,
},
chunks: [
asyncChunks: [
"0x54", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x001b0000097802c15400da8a010004090009017e2b001c0900000000000000",
].map(fromHexString) as Buffer[],
chunks: ["0x54", "0xff060000734e61507059001b0000097802c15400da8a010004090009017e2b001c0900000000000000"].map(
fromHexString
) as Buffer[],
};

export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData<phase0.SignedBeaconBlock> = {
@@ -66,11 +73,15 @@ export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData<phase0.SignedB
},
signature: Buffer.alloc(96, 0xda),
},
chunks: [
asyncChunks: [
"0x9403",
"0xff060000734e61507059",
"0x00340000fff3b3f594031064000000dafe01007a010004090009011108fe6f000054feb4008ab4007e0100fecc0011cc0cdc0000003e0400",
].map(fromHexString) as Buffer[],
chunks: [
"0x9403",
"0xff060000734e6150705900340000fff3b3f594031064000000dafe01007a010004090009011108fe6f000054feb4008ab4007e0100fecc0011cc0cdc0000003e0400",
].map(fromHexString) as Buffer[],
};

export const sszSnappySignedBeaconBlockAltair: ISszSnappyTestData<altair.SignedBeaconBlock> = {
@@ -87,9 +98,13 @@ export const sszSnappySignedBeaconBlockAltair: ISszSnappyTestData<altair.SignedB
},
},
},
chunks: [
asyncChunks: [
"0xf803", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x003f0000ee14ab0df8031064000000dafe01007a01000c995f0100010100090105ee70000d700054ee44000d44fe0100fecc0011cc0c400100003e0400fe01008e0100",
].map(fromHexString) as Buffer[],
chunks: [
"0xf803",
"0xff060000734e61507059003f0000ee14ab0df8031064000000dafe01007a01000c995f0100010100090105ee70000d700054ee44000d44fe0100fecc0011cc0c400100003e0400fe01008e0100",
].map(fromHexString) as Buffer[],
};
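
For reference, the recurring prefix `0xff060000734e61507059` in both the async and sync vectors is the snappy framing-format stream identifier: chunk type 0xff, a 24-bit little-endian length of 6, and the ASCII bytes "sNaPpY". A quick check (illustrative, not part of the test suite):

```ts
// Decode the snappy stream identifier that starts every framed stream above.
const header = Buffer.from("ff060000734e61507059", "hex");
const chunkType = header[0]; // 0xff: stream identifier chunk
const length = header.readUIntLE(1, 3); // 6
const magic = header.subarray(4).toString("ascii"); // "sNaPpY"
console.log(chunkType.toString(16), length, magic);
```

In these vectors, the only difference between `asyncChunks` and `chunks` is whether this identifier and the following data chunk arrive as separate buffers or as one concatenated buffer; the varint length prefix stays separate in both.
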
2 changes: 1 addition & 1 deletion packages/lodestar/types/snappy-stream/index.d.ts
@@ -2,5 +2,5 @@ declare module "@chainsafe/snappy-stream" {
import {Transform} from "node:stream";

export function createUncompressStream(opts?: {asBuffer?: boolean}): Transform;
export function createCompressStream(): Transform;
export function createCompressStream(opts?: {asyncCompress?: boolean}): Transform;
}
10 changes: 5 additions & 5 deletions yarn.lock
@@ -493,10 +493,10 @@
resolved "https://registry.yarnpkg.com/@chainsafe/persistent-ts/-/persistent-ts-0.19.1.tgz#53d03aa31ef7698b09327f74eef01e286b97bae3"
integrity sha512-fUFFFFxdcpYkMAHnjm83EYL/R/smtVmEkJr3FGSI6dwPk4ue9rXjEHf7FTd3V8AbVOcTJGriN4cYf2V+HOYkjQ==

"@chainsafe/snappy-stream@5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.0.0.tgz#87dfb8dd6e5a20c7e982700974fd59941f5a96c4"
integrity sha512-E5Y9KsyTMjXGgoLl2sIetiIpztum4NUznNDAYa+DoN20HjGNUv4ZSB5rnQrlWKVq6POnkL6vPTZC2TLYosR0wA==
"@chainsafe/snappy-stream@5.1.1":
version "5.1.1"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.1.1.tgz#9f3c79fd936b591d4a79d1801ffb582df54f1858"
integrity sha512-wy1c0RLUttVYMQHN/zs473LIzZ6NEL2xG3T8vJp+Ag99H/lQldnwYY6aKGcMrt6hEhndRx0a4NaUxr70MTzsLg==
dependencies:
"@chainsafe/fast-crc32c" "3.0.0"
bl "^1.0.0"
@@ -7063,7 +7063,7 @@ libnpmaccess@^4.0.1:
npm-package-arg "^8.1.2"
npm-registry-fetch "^10.0.0"

libnpmpublish@4.0.0, libnpmpublish@^4.0.0:
libnpmpublish@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/libnpmpublish/-/libnpmpublish-4.0.0.tgz#ad6413914e0dfd78df868ce14ba3d3a4cc8b385b"
integrity sha512-2RwYXRfZAB1x/9udKpZmqEzSqNd7ouBRU52jyG14/xG8EF+O9A62d7/XVR3iABEQHf1iYhkm0Oq9iXjrL3tsXA==