segmented-object: FileChunkSource input from ZenFS
yoursunny committed May 27, 2024
1 parent 80dba38 commit 4c2106a
Showing 11 changed files with 57 additions and 14 deletions.
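This commit lets `FileChunkSource` read from ZenFS, a virtual filesystem that also works in browsers, in addition to the Node.js filesystem. A rough end-to-end sketch of the new capability follows; it is not part of the commit, the `@ndn/segmented-object` import path and the `/demo` names are illustrative, and the ZenFS volume is assumed to use its default in-memory backend:

```ts
import { fetch, serve, FileChunkSource } from "@ndn/segmented-object";
import { promises as zenfs } from "@zenfs/core";

// Put a payload into the ZenFS virtual filesystem (in-memory by default).
await zenfs.mkdir("/demo");
await zenfs.writeFile("/demo/payload.bin", new TextEncoder().encode("hello NDN"));

// Serve it as a segmented object; { zenfs: true } selects ZenFS instead of node:fs.
const server = serve("/demo-object", new FileChunkSource("/demo/payload.bin", { zenfs: true }));

// Retrieve the whole object; the fetch result resolves to a Uint8Array.
const payload = await fetch("/demo-object");
console.log(payload.byteLength);

server.close();
```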
3 changes: 1 addition & 2 deletions package.json
@@ -13,7 +13,7 @@
"test": "vitest",
"typedoc": "bash mk/typedoc.sh"
},
"packageManager": "[email protected].2+sha256.19c17528f9ca20bd442e4ca42f00f1b9808a9cb419383cd04ba32ef19322aba7",
"packageManager": "[email protected].3+sha256.7f63001edc077f1cff96cacba901f350796287a2800dfa83fe898f94183e4f5f",
"devDependencies": {
"@types/node": "^20.12.12",
"@types/wtfnode": "^0.7.3",
@@ -39,7 +39,6 @@
},
"updateConfig": {
"ignoreDependencies": [
"@browserfs/core",
"abstract-level-1",
"graphql-request"
]
2 changes: 0 additions & 2 deletions pkg/fileserver/tests/client.t.ts
@@ -1,7 +1,5 @@
import "@ndn/util/test-fixture/expect";

import { assert } from "node:console";

import { produce } from "@ndn/endpoint";
import { Forwarder } from "@ndn/fw";
import { Segment, Version } from "@ndn/naming-convention2";
4 changes: 2 additions & 2 deletions pkg/segmented-object/README.md
@@ -16,15 +16,15 @@ The consumer functionality:
* [X] supports multiple naming conventions.
* [X] has Interest pipelining, congestion control, and loss recovery.
* [X] verifies packets with a `Verifier` (fixed key or trust schema).
* [X] emits events as segments arrive.
* [ ] emits events as segments arrive.
* [X] outputs in-order data chunks as a readable stream.
* [X] outputs completely reassembled object via Promise.

The producer functionality:

* [X] takes input from `Uint8Array`.
* [X] takes input from readable streams.
* [X] takes input from files (`Blob` in browser and Node.js, filename in Node.js).
* [X] takes input from files or `Blob`.
* [X] generates segments of fixed size.
* [ ] generates segments of available data as Interest arrives, to minimize delivery latency.
* [X] responds to version discovery Interests with CanBePrefix.
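To illustrate the updated producer checklist above, here is a minimal sketch; `BlobChunkSource` is inferred from this package's `./blob` export, and the names and paths are hypothetical:

```ts
import { serve, BlobChunkSource, FileChunkSource } from "@ndn/segmented-object";

// Producer input from a Blob (works in both browsers and Node.js).
const blobServer = serve("/demo/from-blob", new BlobChunkSource(new Blob(["hello NDN"])));

// Producer input from a file path (Node.js filesystem by default).
const fileServer = serve("/demo/from-file", new FileChunkSource("/tmp/payload.bin"));

blobServer.close();
fileServer.close();
```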
1 change: 1 addition & 0 deletions pkg/segmented-object/package.json
@@ -31,6 +31,7 @@
"@ndn/naming-convention2": "workspace:*",
"@ndn/packet": "workspace:*",
"@ndn/util": "workspace:*",
"@zenfs/core": "^0.12.6",
"hirestime": "^7.0.4",
"it-keepalive": "^1.2.0",
"mnemonist": "^0.39.8",
1 change: 1 addition & 0 deletions pkg/segmented-object/src/serve/chunk-source/common.ts
@@ -28,6 +28,7 @@ export interface ChunkSource {
close?: () => void;
}

/** ChunkSource where total size is known. */
export abstract class KnownSizeChunkSource implements ChunkSource {
constructor(
protected readonly chunkSize: number,
@@ -1,11 +1,12 @@
import fs from "node:fs/promises";
import type { FileHandle } from "node:fs/promises";

import PLazy from "p-lazy";

import { type Chunk, type ChunkOptions, type ChunkSource, getMaxChunkSize, KnownSizeChunkSource } from "./common";
import { fsOpen } from "./platform_node";

class FileHandleChunkSource extends KnownSizeChunkSource {
constructor(private readonly fh: fs.FileHandle, chunkSize: number, totalSize: number) {
constructor(private readonly fh: FileHandle, chunkSize: number, totalSize: number) {
super(chunkSize, totalSize);
}

@@ -28,10 +29,10 @@ class FileHandleChunkSource extends KnownSizeChunkSource {
* Warning: modifying the file while FileChunkSource is active may cause undefined behavior.
*/
export class FileChunkSource implements ChunkSource {
constructor(path: string, opts: ChunkOptions = {}) {
constructor(path: string, opts: FileChunkSource.Options = {}) {
const chunkSize = getMaxChunkSize(opts);
this.opening = PLazy.from(async () => {
const fh = await fs.open(path, "r");
const fh = await fsOpen(path, opts);
const { size } = await fh.stat();
return new FileHandleChunkSource(fh, chunkSize, size);
});
@@ -56,3 +57,17 @@ export class FileChunkSource {
await h.close();
}
}
export namespace FileChunkSource {
/** {@link FileChunkSource} options. */
export type Options = ChunkOptions & {
/**
* Whether to use ZenFS.
*
* @remarks
* - Set `true` to use ZenFS, a cross-platform virtual filesystem independent of
*   the underlying operating system. This is the default in browser environments.
* - Set `false` to use the Node.js native filesystem. This is the default in Node.js.
*/
zenfs?: boolean;
};
}
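For illustration only, a short sketch of the two modes this option selects; the file paths are hypothetical and the `@ndn/segmented-object` import path is assumed:

```ts
import { FileChunkSource } from "@ndn/segmented-object";

// Default in Node.js: read through node:fs/promises.
const fromDisk = new FileChunkSource("/tmp/payload.bin");

// Opt into ZenFS (the default in browser builds); the file must already exist
// in the ZenFS volume, e.g. created with zenfs.writeFile().
const fromZenFS = new FileChunkSource("/virtual/payload.bin", { zenfs: true });
```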

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/segmented-object/src/serve/chunk-source/mod.ts
@@ -1,6 +1,6 @@
export type { Chunk, ChunkSource, ChunkOptions } from "./common";
export * from "./blob";
export * from "./buffer";
export * from "./file_node";
export * from "./file";
export * from "./iterable";
export * from "./make";
@@ -0,0 +1,5 @@
import { promises as zenfs } from "@zenfs/core";

export function fsOpen(path: string): Promise<zenfs.FileHandle> {
return zenfs.open(path, "r");
}
12 changes: 12 additions & 0 deletions pkg/segmented-object/src/serve/chunk-source/platform_node.ts
@@ -0,0 +1,12 @@
import fs from "node:fs/promises";

import { promises as zenfs } from "@zenfs/core";

import type { FileChunkSource } from "./file";

export function fsOpen(path: string, opts: FileChunkSource.Options): Promise<fs.FileHandle> {
if (opts.zenfs) {
return zenfs.open(path, "r");
}
return fs.open(path, "r");
}
17 changes: 15 additions & 2 deletions pkg/segmented-object/tests/serve-fetch.t.ts
@@ -2,6 +2,7 @@ import "@ndn/util/test-fixture/expect";

// eslint-disable-next-line n/no-unsupported-features/node-builtins
import { Blob } from "node:buffer";
import path from "node:path";

import { consume } from "@ndn/endpoint";
import { Forwarder } from "@ndn/fw";
@@ -11,6 +12,7 @@ import { Segment2, Segment3 } from "@ndn/naming-convention2";
import { FwHint, Name, type Verifier } from "@ndn/packet";
import { Closers, delay } from "@ndn/util";
import { makeTmpDir } from "@ndn/util/test-fixture/tmp";
import { promises as zenfs } from "@zenfs/core";
import { BufferReadableMock, BufferWritableMock } from "stream-mock";
import { collect, consume as consumeIterable } from "streaming-iterables";
import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
@@ -22,6 +24,11 @@ const fwOpts: Forwarder.Options = { dataNoTokenMatch: false };
let sFaces: SnapshotFaces;
const closers = new Closers();
const objectBody = makeObjectBody();
const zenfsFilename = "/serve-fetch-test/1.bin";
beforeAll(async () => {
await zenfs.mkdir(path.dirname(zenfsFilename));
await zenfs.writeFile(zenfsFilename, objectBody);
});
beforeEach(() => {
Forwarder.replaceDefault(Forwarder.create(fwOpts));
sFaces = new SnapshotFaces();
@@ -96,6 +103,14 @@ describe("file source", () => {
});
});

test("zenfs to buffer", async () => {
const server = serve("/R", new FileChunkSource(zenfsFilename, { zenfs: true }));
closers.push(server);

const fetched = fetch("/R");
await expect(fetched).resolves.toEqualUint8Array(objectBody);
});

test("iterable to unordered", async () => {
const chunkSource = new IterableChunkSource((async function*() {
const yieldSizes = [5000, 7000, 20000];
@@ -201,9 +216,7 @@ test("segment number convention mismatch", async () => {
const server = serve("/R", new BufferChunkSource(objectBody), { segmentNumConvention: Segment2 });
closers.push(server);

const sFaces = new SnapshotFaces();
await expect(fetch("/R", { retxLimit: 1 })).rejects.toThrow();
sFaces.expectSameFaces();
});

test("abort", async () => {
