Skip to content

Commit

Permalink
Snappy frame encode big payloads as chunks as per the standard (#3912)
Browse files Browse the repository at this point in the history
* Snappy frame encode big payloads as chunks as per standard

* cleanup comments

* moving the chunking upstream

* upgrade to the new snappy steam version supporting stream chunking

* switch to async version to match the chunks

* fixing the uncompress test

* update the package fix

* shift to sync compress version as it is almost 2x faster

* update comment

* lint

* flip async and sync chunks
  • Loading branch information
g11tech authored May 4, 2022
1 parent a3cb495 commit 029e7b7
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 13 deletions.
2 changes: 1 addition & 1 deletion packages/lodestar/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"@chainsafe/lodestar-utils": "^0.36.0",
"@chainsafe/lodestar-validator": "^0.36.0",
"@chainsafe/persistent-merkle-tree": "^0.4.1",
"@chainsafe/snappy-stream": "5.0.0",
"@chainsafe/snappy-stream": "5.1.1",
"@chainsafe/ssz": "^0.9.1",
"@ethersproject/abi": "^5.0.0",
"@types/datastore-level": "^3.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ export async function* writeSszSnappyPayload<T extends RequestOrOutgoingResponse
* Buffered Snappy writer
*/
function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
/**
* Use sync version (default) for compress as it is almost 2x faster than async
* one and most payloads are "1 chunk" and 100kb payloads (which would mostly be
* big bellatrix blocks with transactions) are just 2 chunks
*
* To use async version (for e.g. on big payloads) instantiate the stream with
* `createCompressStream({asyncCompress: true})`
*/
const stream = createCompressStream();
stream.write(bytes);
stream.end();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import chai, {expect} from "chai";
import chaiAsPromised from "chai-as-promised";
import varint from "varint";
import {BufferedSource} from "../../../../../../src/network/reqresp/utils";
import {readSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {isEqualSszType} from "../../../../../utils/ssz";
import {arrToSource} from "../../../../../../test/unit/network/reqresp/utils";
import {goerliShadowForkBlock13249} from "./testData";

chai.use(chaiAsPromised);

describe("network / reqresp / sszSnappy / decode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const {id, type, bytes, streamedBody, body} of testCases) {
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const streamedBytes = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);

it(id, async () => {
const bufferedSource = new BufferedSource(arrToSource([streamedBytes]));
const bodyResult = await readSszSnappyPayload(bufferedSource, type);
expect(isEqualSszType(type, bodyResult, deserializedBody)).to.equal(true, "Wrong decoded body");
});
}
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import all from "it-all";
import pipe from "it-pipe";
import {expect} from "chai";
import varint from "varint";

import {allForks, ssz} from "@chainsafe/lodestar-types";

import {reqRespBlockResponseSerializer} from "../../../../../../src/network/reqresp/types";
import {writeSszSnappyPayload} from "../../../../../../src/network/reqresp/encodingStrategies/sszSnappy";
import {goerliShadowForkBlock13249} from "./testData";
import {RequestOrOutgoingResponseBody} from "../../../../../../src/network/reqresp/types";

describe("network / reqresp / sszSnappy / encode", () => {
describe("Test data vectors (generated in a previous version)", () => {
const testCases = [goerliShadowForkBlock13249];

for (const testCase of testCases) {
const {id, type, bytes, streamedBody, body} = testCase;
const deserializedBody = body ?? type.deserialize(Buffer.from(bytes));
const reqrespBody =
body ??
(type === ssz.bellatrix.SignedBeaconBlock
? {slot: (deserializedBody as allForks.SignedBeaconBlock).message.slot, bytes}
: deserializedBody);

it(id, async () => {
const encodedChunks = await pipe(
writeSszSnappyPayload(reqrespBody as RequestOrOutgoingResponseBody, reqRespBlockResponseSerializer),
all
);
const encodedStream = Buffer.concat(encodedChunks);
const expectedStreamed = Buffer.concat([Buffer.from(varint.encode(bytes.length)), streamedBody]);
expect(encodedStream).to.be.deep.equal(expectedStreamed);
});
}
});
});
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import fs from "node:fs";
import path from "node:path";

import {bellatrix, ssz} from "@chainsafe/lodestar-types";
import {RequestOrIncomingResponseBody, RequestOrResponseType} from "../../../../../../src/network/reqresp/types";

export interface ISszSnappyTestBlockData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
bytes: Buffer;
streamedBody: Buffer;
body?: T;
}

/**
* A real big bellatrix block from goerli-shadow-fork-2 devnet, which is expected to be
* encoded in multiple chunks.
*/

export const goerliShadowForkBlock13249: ISszSnappyTestBlockData<bellatrix.SignedBeaconBlock> = {
id: "goerli-shadow-fork-block-13249",
type: ssz.bellatrix.SignedBeaconBlock,
bytes: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/serialized.ssz")),
streamedBody: fs.readFileSync(path.join(__dirname, "/goerliShadowForkBlock.13249/streamed.snappy")),
};
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ describe("snappy frames uncompress", function () {
const decompress = new SnappyFramesUncompress();

const testData = Buffer.alloc(100000, 4).toString();
let result = Buffer.alloc(0);

compressStream.on("data", function (data) {
const result = decompress.uncompress(data);
if (result) {
// testData will come compressed as two or more chunks
result = Buffer.concat([result, decompress.uncompress(data) ?? Buffer.alloc(0)]);
if (result.length === testData.length) {
expect(result.toString()).to.be.equal(testData);
done();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ export interface ISszSnappyTestData<T extends RequestOrIncomingResponseBody> {
id: string;
type: RequestOrResponseType;
body: T;
/** chunks expected in an async compress version of snappy stream */
asyncChunks: Buffer[];
/** chunks expected in a sync compress version of snappy stream */
chunks: Buffer[];
}

export const sszSnappyPing: ISszSnappyTestData<phase0.Ping> = {
id: "Ping type",
type: ssz.phase0.Ping,
body: BigInt(1),
chunks: [
asyncChunks: [
"0x08", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x010c00000175de410100000000000000", // snappy frames content
].map(fromHexString) as Buffer[],
chunks: ["0x08", "0xff060000734e61507059010c00000175de410100000000000000"].map(fromHexString) as Buffer[],
};

export const sszSnappyStatus: ISszSnappyTestData<phase0.Status> = {
Expand All @@ -33,11 +37,14 @@ export const sszSnappyStatus: ISszSnappyTestData<phase0.Status> = {
headRoot: Buffer.alloc(32, 0xda),
headSlot: 9,
},
chunks: [
asyncChunks: [
"0x54", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x001b0000097802c15400da8a010004090009017e2b001c0900000000000000",
].map(fromHexString) as Buffer[],
chunks: ["0x54", "0xff060000734e61507059001b0000097802c15400da8a010004090009017e2b001c0900000000000000"].map(
fromHexString
) as Buffer[],
};

export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData<phase0.SignedBeaconBlock> = {
Expand Down Expand Up @@ -66,11 +73,15 @@ export const sszSnappySignedBeaconBlockPhase0: ISszSnappyTestData<phase0.SignedB
},
signature: Buffer.alloc(96, 0xda),
},
chunks: [
asyncChunks: [
"0x9403",
"0xff060000734e61507059",
"0x00340000fff3b3f594031064000000dafe01007a010004090009011108fe6f000054feb4008ab4007e0100fecc0011cc0cdc0000003e0400",
].map(fromHexString) as Buffer[],
chunks: [
"0x9403",
"0xff060000734e6150705900340000fff3b3f594031064000000dafe01007a010004090009011108fe6f000054feb4008ab4007e0100fecc0011cc0cdc0000003e0400",
].map(fromHexString) as Buffer[],
};

export const sszSnappySignedBeaconBlockAltair: ISszSnappyTestData<altair.SignedBeaconBlock> = {
Expand All @@ -87,9 +98,13 @@ export const sszSnappySignedBeaconBlockAltair: ISszSnappyTestData<altair.SignedB
},
},
},
chunks: [
asyncChunks: [
"0xf803", // length prefix
"0xff060000734e61507059", // snappy frames header
"0x003f0000ee14ab0df8031064000000dafe01007a01000c995f0100010100090105ee70000d700054ee44000d44fe0100fecc0011cc0c400100003e0400fe01008e0100",
].map(fromHexString) as Buffer[],
chunks: [
"0xf803",
"0xff060000734e61507059003f0000ee14ab0df8031064000000dafe01007a01000c995f0100010100090105ee70000d700054ee44000d44fe0100fecc0011cc0c400100003e0400fe01008e0100",
].map(fromHexString) as Buffer[],
};
2 changes: 1 addition & 1 deletion packages/lodestar/types/snappy-stream/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ declare module "@chainsafe/snappy-stream" {
import {Transform} from "node:stream";

export function createUncompressStream(opts?: {asBuffer?: boolean}): Transform;
export function createCompressStream(): Transform;
export function createCompressStream(opts?:{asyncCompress?: boolean}): Transform;
}
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,10 @@
resolved "https://registry.yarnpkg.com/@chainsafe/persistent-ts/-/persistent-ts-0.19.1.tgz#53d03aa31ef7698b09327f74eef01e286b97bae3"
integrity sha512-fUFFFFxdcpYkMAHnjm83EYL/R/smtVmEkJr3FGSI6dwPk4ue9rXjEHf7FTd3V8AbVOcTJGriN4cYf2V+HOYkjQ==

"@chainsafe/snappy-stream@5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.0.0.tgz#87dfb8dd6e5a20c7e982700974fd59941f5a96c4"
integrity sha512-E5Y9KsyTMjXGgoLl2sIetiIpztum4NUznNDAYa+DoN20HjGNUv4ZSB5rnQrlWKVq6POnkL6vPTZC2TLYosR0wA==
"@chainsafe/snappy-stream@5.1.1":
version "5.1.1"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-stream/-/snappy-stream-5.1.1.tgz#9f3c79fd936b591d4a79d1801ffb582df54f1858"
integrity sha512-wy1c0RLUttVYMQHN/zs473LIzZ6NEL2xG3T8vJp+Ag99H/lQldnwYY6aKGcMrt6hEhndRx0a4NaUxr70MTzsLg==
dependencies:
"@chainsafe/fast-crc32c" "3.0.0"
bl "^1.0.0"
Expand Down Expand Up @@ -7063,7 +7063,7 @@ libnpmaccess@^4.0.1:
npm-package-arg "^8.1.2"
npm-registry-fetch "^10.0.0"

libnpmpublish@4.0.0, libnpmpublish@^4.0.0:
libnpmpublish@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/libnpmpublish/-/libnpmpublish-4.0.0.tgz#ad6413914e0dfd78df868ce14ba3d3a4cc8b385b"
integrity sha512-2RwYXRfZAB1x/9udKpZmqEzSqNd7ouBRU52jyG14/xG8EF+O9A62d7/XVR3iABEQHf1iYhkm0Oq9iXjrL3tsXA==
Expand Down

0 comments on commit 029e7b7

Please sign in to comment.