Skip to content

Commit

Permalink
repo-api: DataArray
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jun 30, 2024
1 parent d15a2a6 commit 8deae9e
Show file tree
Hide file tree
Showing 15 changed files with 167 additions and 59 deletions.
3 changes: 2 additions & 1 deletion pkg/endpoint/src/data-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ interface DataStore {
insert: (opts: { expireTime?: number }, ...pkts: readonly Data[]) => Promise<void>;
}

/** DataBuffer implementation based on `DataStore` from `@ndn/repo` package. */
/** DataBuffer implementation based on `@ndn/repo`. */
export class DataStoreBuffer implements DataBuffer {
/* eslint-disable tsdoc/syntax -- tsdoc-missing-reference */
/**
Expand All @@ -28,6 +28,7 @@ export class DataStoreBuffer implements DataBuffer {
* `DataStore` is declared as an interface instead of importing, in order to reduce bundle size
* for webapps that do not use DataBuffer. The trade-off is that, applications wanting to use
* DataBuffer would have to import `@ndn/repo` themselves.
* Note: {@link \@ndn/repo-api!DataArray} is insufficient because it lacks `expireTime` option.
*/
/* eslint-enable tsdoc/syntax */
constructor(public readonly store: DataStore, {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pyrepo/tests/prps.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { PrpsPublisher, PrpsSubscriber } from "..";

afterEach(Forwarder.deleteDefault);

test("pubsub", { timeout: 10000 }, async () => {
test("pubsub", { timeout: 10000, retry: 1 }, async () => {
const star = Bridge.star({
leaves: 3,
relayBA: { delay: 6, jitter: 0.1 },
Expand Down Expand Up @@ -45,7 +45,7 @@ test("pubsub", { timeout: 10000 }, async () => {
sub.addEventListener("update", ({ detail: data }) => {
expect(data.content).toHaveLength(2);
expect(data.content[0]).toBe(0xDD);
nums.push(data.content[1]);
nums.push(data.content[1]!);
});
return [sub, nums] as const;
};
Expand Down
4 changes: 3 additions & 1 deletion pkg/repo-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ This package defines the programming interface of an abstract Data repository, a
## DataStore

**DataStore** is a group of interfaces that defines the API of a Data repository.
`DataStore` type of `@ndn/repo` package is an implementation of these interfaces.

* `get(name)` retrieves Data by exact name.
* `find(interest)` finds Data that satisfies Interest.
Expand All @@ -16,6 +15,9 @@ This package defines the programming interface of an abstract Data repository, a
* `insert(options?, ...pkts)` inserts Data packets.
* `delete(...names)` deletes Data packets.

**DataArray** from this package is a minimal but inefficient implementation of these interfaces.
`DataStore` type from `@ndn/repo` package is a full implementation with much larger code size.

## DataTape

**DataTape** is a file or stream that consists of a sequence of Data packets.
Expand Down
2 changes: 2 additions & 0 deletions pkg/repo-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
"wait-your-turn": "^1.0.1"
},
"devDependencies": {
"@ndn/fw": "workspace:*",
"@ndn/node-transport": "workspace:*",
"@ndn/segmented-object": "workspace:*",
"stream-mock": "^2.0.5",
"type-fest": "^4.20.1"
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/repo-api/src/data-array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import type { Data, Interest, Name } from "@ndn/packet";
import { collect, filter, map, pipeline, take } from "streaming-iterables";

import * as S from "./data-store";

/**
* Data packet storage based on array.
*
* @remarks
* This is a minimal implementation of DataStore interfaces. It has small code size but every
* operation has O(N) time complexity.
*/
export class DataArray implements S.ListNames, S.ListData, S.Get, S.Find, S.Insert, S.Delete {
  private array: Data[] = [];

  /** List names of stored packets, optionally restricted to a prefix. */
  public listNames(prefix?: Name): AsyncIterable<Name> {
    return map(({ name }) => name, this.listData(prefix));
  }

  /** List stored packets, optionally restricted to a name prefix. */
  public async *listData(prefix?: Name): AsyncIterable<Data> {
    if (prefix === undefined) {
      yield* this.array;
      return;
    }
    yield* filter((data) => prefix.isPrefixOf(data.name), this.array);
  }

  /** Retrieve a packet by exact name via linear scan. */
  public async get(name: Name): Promise<Data | undefined> {
    return this.array.find((data) => name.equals(data.name));
  }

  /** Return the first stored packet that can satisfy the Interest, if any. */
  public async find(interest: Interest): Promise<Data | undefined> {
    const matches = await pipeline(
      () => this.array,
      filter((data: Data) => data.canSatisfy(interest)),
      take(1),
      collect,
    );
    return matches[0];
  }

  /** Insert packets; options (if any) are accepted but ignored by this implementation. */
  public async insert(...args: S.Insert.Args<{}>): Promise<void> {
    const parsed = S.Insert.parseArgs<{}>(args);
    // Collect fully before pushing, so a failing packet iterable inserts nothing.
    const pkts = await collect(parsed.pkts);
    this.array.push(...pkts);
  }

  /** Delete packets whose names exactly match any of the given names. */
  public async delete(...names: readonly Name[]): Promise<void> {
    this.array = this.array.filter((data) => names.every((name) => !name.equals(data.name)));
  }
}
20 changes: 9 additions & 11 deletions pkg/repo-api/src/data-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ export interface Insert<Options extends {} = {}> {
insert: (...args: Insert.Args<Options>) => Promise<void>;
}
export namespace Insert {
export type Args<O extends {}> = [...(object extends O ? [O] | [] : []), ...ReadonlyArray<Data | AnyIterable<Data>>];
type Tail = ReadonlyArray<Data | AnyIterable<Data>>;
export type Args<O extends {}> = [...(object extends O ? [O] | [] : []), ...Tail];

export interface ParsedArgs<O> {
readonly opts?: O;
Expand All @@ -50,18 +51,19 @@ export namespace Insert {
/** Normalize {@link Insert.insert} arguments. */
export function parseArgs<O extends {}>(args: Args<O>): ParsedArgs<O> {
let opts: O | undefined;
if (args.length > 0 && !(args[0] instanceof Data) && !isDataIterable(args[0])) {
if (args.length > 0 && !(args[0] instanceof Data || (args[0] as Iterable<Data>)[Symbol.iterator] ||
(args[0] as AsyncIterable<Data>)[Symbol.asyncIterator])) {
opts = args.shift() as O;
}
return {
opts,
get pkts() {
return (async function*() {
for (const a of args) {
if (isDataIterable(a)) {
yield* a;
for (const a of args as Tail) {
if (a instanceof Data) {
yield a;
} else {
yield a as Data;
yield* a;
}
}
})();
Expand All @@ -70,14 +72,10 @@ export namespace Insert {
return args.filter((a) => a instanceof Data);
},
get batches() {
return args.filter(isDataIterable);
return args.filter((a): a is AnyIterable<Data> => !(a instanceof Data));
},
};
}

function isDataIterable(obj: any): obj is AnyIterable<Data> {
return !!obj && (!!(obj as Iterable<Data>)[Symbol.iterator] || !!(obj as AsyncIterable<Data>)[Symbol.asyncIterator]);
}
}

/** DataStore interface, delete method. */
Expand Down
1 change: 1 addition & 0 deletions pkg/repo-api/src/mod.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from "./bulk-insert-initiator";
export * from "./bulk-insert-target";
export * from "./copy";
export * from "./data-array";
export * from "./data-tape";
export * from "./respond-rdr";

Expand Down
65 changes: 65 additions & 0 deletions pkg/repo-api/test-fixture/data-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import "@ndn/packet/test-fixture/expect";

import { Forwarder } from "@ndn/fw";
import { Data, Interest, Name, type NameLike } from "@ndn/packet";
import { BufferChunkSource, fetch, serve } from "@ndn/segmented-object";
import { makeObjectBody } from "@ndn/segmented-object/test-fixture/object-body";
import { collect, map } from "streaming-iterables";
import { expect } from "vitest";

import type { DataStore as S } from "..";

/**
 * Exercise basic DataStore operations against an implementation.
 *
 * @param store - Store under test. `insert` is required; `listNames`, `listData`, `get`,
 * `find`, and `delete` are each exercised only when the store implements them.
 *
 * Inserts Data under /A and /B (including via an async generator batch), then verifies
 * listing, exact-name retrieval, and Interest matching; finally deletes a subset
 * (including a nonexistent name, which must be a no-op) and re-verifies.
 */
export async function testDataStoreBasic(store: S.Insert & Partial<S.ListNames & S.ListData & S.Get & S.Find & S.Delete>): Promise<void> {
  // Assert that exactly `positive` names are present and `negative` names are absent,
  // using whichever query capabilities the store implements.
  const checkNames = async (prefix: NameLike | undefined, positive: Iterable<NameLike>, negative: Iterable<NameLike> = []) => {
    prefix = prefix ? Name.from(prefix) : undefined;
    if (store.listNames) {
      await expect(collect(store.listNames(prefix))).resolves.toEqualNames(positive);
    }
    if (store.listData) {
      await expect(collect(map((data) => data.name, store.listData(prefix))))
        .resolves.toEqualNames(positive);
    }
    if (store.get) {
      for (const name of positive) {
        await expect(store.get(Name.from(name))).resolves.toHaveName(name);
      }
      for (const name of negative) {
        await expect(store.get(Name.from(name))).resolves.toBeUndefined();
      }
    }
    if (store.find) {
      for (const name of positive) {
        await expect(store.find(new Interest(name))).resolves.toHaveName(name);
      }
      for (const name of negative) {
        await expect(store.find(new Interest(name))).resolves.toBeUndefined();
      }
    }
  };

  // Insert packets passed directly as arguments.
  await store.insert(new Data("/A/1"), new Data("/A/2"));
  await checkNames(undefined, ["/A/1", "/A/2"], ["/B/1", "/B/2"]);

  // Insert a mix of a direct packet and an async-iterable batch.
  await store.insert(new Data("/B/3"), (async function*() {
    yield new Data("/B/2");
    yield new Data("/B/1");
  })());
  await checkNames(undefined, ["/A/1", "/A/2", "/B/1", "/B/2", "/B/3"]);
  await checkNames("/A", ["/A/1", "/A/2"]);
  await checkNames("/B", ["/B/1", "/B/2", "/B/3"]);

  if (!store.delete) {
    return;
  }
  // /A/0 does not exist; deleting it must not throw or affect other packets.
  await store.delete(new Name("/A/0"), new Name("/A/1"), new Name("/B/3"));
  await checkNames(undefined, ["/A/2", "/B/1", "/B/2"], ["/A/0", "/B/3"]);
}

/**
 * Exercise inserting a segmented object: serve a buffer over a private forwarder,
 * insert the fetch result into the store, then verify the segment count via listNames.
 */
export async function testDataStoreSegmentedObject(store: S.ListNames & S.Insert): Promise<void> {
  const chunkSize = 500;
  const segmentCount = 25;
  const fw = Forwarder.create();
  const payload = makeObjectBody(chunkSize * segmentCount);
  const producer = serve("/S", new BufferChunkSource(payload, { chunkSize }), { pOpts: { fw } });
  await store.insert(fetch("/S", { fw }));
  producer.close();
  await expect(collect(store.listNames())).resolves.toHaveLength(segmentCount);
}
16 changes: 16 additions & 0 deletions pkg/repo-api/tests/data-array.t.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import "@ndn/packet/test-fixture/expect";

import { test } from "vitest";

import { DataArray } from "..";
import { testDataStoreBasic, testDataStoreSegmentedObject } from "../test-fixture/data-store";

// DataArray must satisfy the generic DataStore test fixtures.
test("basic", async () => {
  await testDataStoreBasic(new DataArray());
});

test("segmented object", async () => {
  await testDataStoreSegmentedObject(new DataArray());
});
4 changes: 2 additions & 2 deletions pkg/repo-cli/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { exitClosers } from "@ndn/cli-common";
import { SequenceNum } from "@ndn/naming-convention2";
import { TcpTransport } from "@ndn/node-transport";
import { Data, Name } from "@ndn/packet";
import { BulkInsertInitiator, type DataStore } from "@ndn/repo-api";
import { BulkInsertInitiator, type DataStore as S } from "@ndn/repo-api";
import { crypto } from "@ndn/util";
import ProgressBar from "progress";
import { batch, consume, pipeline, tap, transform } from "streaming-iterables";
Expand Down Expand Up @@ -65,7 +65,7 @@ const baseOptions = {

type BaseArgs = InferredOptionTypes<typeof baseOptions>;

async function execute(args: BaseArgs, store: DataStore.Insert) {
async function execute(args: BaseArgs, store: S.Insert) {
const progress = args.progress ?
new ProgressBar(":bar :current/:total :rateD/s :elapseds ETA:etas", { total: args.count }) :
undefined;
Expand Down
3 changes: 1 addition & 2 deletions pkg/repo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
"@ndn/fw": "workspace:*",
"@ndn/keychain": "workspace:*",
"@ndn/naming-convention2": "workspace:*",
"@ndn/rdr": "workspace:*",
"@ndn/segmented-object": "workspace:*"
"@ndn/rdr": "workspace:*"
}
}
38 changes: 7 additions & 31 deletions pkg/repo/tests/data-store.t.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import "@ndn/packet/test-fixture/expect";

import { Forwarder } from "@ndn/fw";
import { Data, Interest, Name } from "@ndn/packet";
import { BufferChunkSource, fetch, serve } from "@ndn/segmented-object";
import { makeObjectBody } from "@ndn/segmented-object/test-fixture/object-body";
import { testDataStoreBasic, testDataStoreSegmentedObject } from "@ndn/repo-api/test-fixture/data-store";
import { Closers, delay } from "@ndn/util";
import { makeTmpDir } from "@ndn/util/test-fixture/tmp";
import { collect, map } from "streaming-iterables";
import { afterEach, describe, expect, test, vi } from "vitest";
import { afterEach, expect, test, vi } from "vitest";

import { type DataStore, makeInMemoryDataStore, makePersistentDataStore } from "..";

Expand All @@ -24,37 +22,15 @@ const TABLE: readonly Row[] = [
}],
];

test.each(TABLE)("insert get delete %s", async (desc, openDataStore) => {
test.each(TABLE)("basic %s", async (desc, openDataStore) => {
void desc;
await using store = await openDataStore();

await store.insert(new Data("/A/1"), new Data("/A/2"));
await expect(store.get(new Name("/A/0"))).resolves.toBeUndefined();
await expect(store.get(new Name("/A/1"))).resolves.toHaveName("/A/1");
await expect(store.get(new Name("/A/2"))).resolves.toHaveName("/A/2");

const dataA1 = await store.get(new Name("/A/1"));
await store.insert(dataA1!);

await store.delete(new Name("/A/0"), new Name("/A/1"));
await expect(store.get(new Name("/A/0"))).resolves.toBeUndefined();
await expect(store.get(new Name("/A/1"))).resolves.toBeUndefined();
await expect(store.get(new Name("/A/2"))).resolves.toHaveName("/A/2");
await testDataStoreBasic(store);
});

describe.each(TABLE)("segmented object %s", (desc, openDataStore) => {
void desc;
afterEach(Forwarder.deleteDefault);

test("insert", async () => {
await using store = await openDataStore();

const body = makeObjectBody(500 * 25);
const producer = serve("/S", new BufferChunkSource(body, { chunkSize: 500 }));
await store.insert(fetch("/S"));
producer.close();
await expect(collect(store.listNames())).resolves.toHaveLength(25);
});
test.each(TABLE)("segmented object %s", async (desc, openDataStore) => {
await using store = await openDataStore();
await testDataStoreSegmentedObject(store);
});

test.each(TABLE)("list find expire %s", async (desc, openDataStore) => {
Expand Down
1 change: 0 additions & 1 deletion pkg/svs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"devDependencies": {
"@ndn/keychain": "workspace:*",
"@ndn/l3face": "workspace:*",
"@ndn/repo": "workspace:*",
"mnemonist": "^0.39.8",
"p-defer": "^4.0.1"
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/svs/src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ export namespace SvPublisher {
* Data repository used by publisher.
*
* @remarks
* {@link \@ndn/repo!DataStore} satisfies the requirement.
* Other lightweight implementations may be possible.
* Possible implementations include but are not limited to:
* - {@link \@ndn/repo!DataStore} (faster, disk-persistency option, larger code size)
* - {@link \@ndn/repo-api!DataArray} (slower, in-memory only, smaller code size)
*/
/* eslint-enable tsdoc/syntax */
export type DataStore = S.Get & S.Find & S.Insert;
Expand Down
10 changes: 4 additions & 6 deletions pkg/svs/tests/svsps.t.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import "@ndn/packet/test-fixture/expect";
import { Forwarder } from "@ndn/fw";
import { generateSigningKey } from "@ndn/keychain";
import { Name, type NameLike } from "@ndn/packet";
import { makeInMemoryDataStore } from "@ndn/repo";
import { DataArray } from "@ndn/repo-api";
import { Closers, console, crypto, delay } from "@ndn/util";
import pDefer from "p-defer";
import { afterEach, beforeAll, expect, test, vi } from "vitest";
Expand Down Expand Up @@ -108,9 +108,8 @@ test("simple", { timeout: 20000 }, async () => {
const syncD = await SvSync.create({ ...syncOpts, describe: "D" });
closers.push(syncA, syncB, syncC, syncD);

const repoA = await makeInMemoryDataStore();
const repoB = await makeInMemoryDataStore();
closers.push(repoA, repoB);
const repoA = new DataArray();
const repoB = new DataArray();

const pubA0 = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/0"), store: repoA });
const pubA1 = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/1"), store: repoA });
Expand Down Expand Up @@ -151,8 +150,7 @@ test("timed", { timeout: 20000 }, async () => {
const syncB = await SvSync.create({ ...syncOpts, describe: "B" });
closers.push(syncA, syncB);

const repoA = await makeInMemoryDataStore();
closers.push(repoA);
const repoA = new DataArray();

const pubP = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/P"), store: repoA });
const pubQ = new SvPublisher({ ...pubOpts, sync: syncA, id: new Name("/Q"), store: repoA });
Expand Down

0 comments on commit 8deae9e

Please sign in to comment.