From 9f6ba36caa70f511d940e4015829e5e257c2ec9c Mon Sep 17 00:00:00 2001 From: Junxiao Shi Date: Tue, 6 Feb 2024 12:22:19 +0000 Subject: [PATCH] repo-external: rewrite PyRepoClient https://github.com/UCLA-IRL/ndn-python-repo/issues/60 --- pkg/endpoint/src/producer.ts | 6 +- pkg/repo-external/README.md | 26 +++-- pkg/repo-external/src/prps/packet.ts | 49 ++++----- pkg/repo-external/src/prps/publisher.ts | 83 +++++++------- pkg/repo-external/src/prps/subscriber.ts | 47 ++++---- pkg/repo-external/src/pyrepo/client.ts | 131 +++++++++++------------ pkg/repo-external/src/pyrepo/packet.ts | 125 +++++++++++---------- pkg/repo-external/src/pyrepo/store.ts | 16 ++- pkg/tlv/src/ev-decoder.ts | 10 +- pkg/tlv/src/struct-builder.ts | 16 ++- pkg/tlv/tests/struct-builder.t.ts | 12 ++- 11 files changed, 291 insertions(+), 230 deletions(-) diff --git a/pkg/endpoint/src/producer.ts b/pkg/endpoint/src/producer.ts index c7e318f0..943fd49f 100644 --- a/pkg/endpoint/src/producer.ts +++ b/pkg/endpoint/src/producer.ts @@ -97,7 +97,7 @@ export interface ProducerOptions { } /** A running producer. */ -export interface Producer { +export interface Producer extends Disposable { /** * Prefix specified in {@link Endpoint.produce} call. * Additional prefixes can be added via `.face.addRoute()`. @@ -214,6 +214,10 @@ export class ProducerImpl implements Producer { this.face.close(); this.signal?.removeEventListener("abort", this.close); }; + + public [Symbol.dispose](): void { + this.close(); + } } export async function signUnsignedData(data: Data, dataSigner: Signer | undefined) { diff --git a/pkg/repo-external/README.md b/pkg/repo-external/README.md index d127652b..8ba470b1 100644 --- a/pkg/repo-external/README.md +++ b/pkg/repo-external/README.md @@ -6,16 +6,28 @@ This package allows inserting and deleting Data in [ndn-python-repo](https://git `PyRepoClient` type is a client for [ndn-python-repo protocol](https://github.com/UCLA-IRL/ndn-python-repo/tree/v0.2a5/docs/src/specification). `PyRepoStore` type implements a subset of `DataStore` interfaces defined in `@ndn/repo-api` package. -This implementation is compatible with ndn-python-repo version 0.2a5. -Newer versions of ndn-python-repo are not supported due to [ndn-python-repo issue #60](https://github.com/UCLA-IRL/ndn-python-repo/issues/60). -To install the specified version, run: +This implementation is compatible with ndn-python-repo `dda1dce1` (2024-02-04). +To install and start the specified version, run: ```bash -pip install ndn-python-repo==0.2a5 python-ndn==0.2b2.post1 +# create Python virtual environment +python3 -m venv pyrepo-venv +cd pyrepo-venv +source ./bin/activate + +# install ndn-python-repo +pip install git+https://github.com/UCLA-IRL/ndn-python-repo@dda1dce135a952498a2a79d3cddf9c3ee33399d0 + +# run ndn-python-repo +export NDN_CLIENT_TRANSPORT=unix:///run/nfd/nfd.sock +ndn-python-repo ``` -As tested on 2023-02-16, this version of ndn-python-repo is no longer compatible with the latest NFD and NDNts. -This is caused by changes in Interest ForwardingHint encoding. +Current implementation is unoptimized. +In particular, each insertion/deletion command is individually sent to the repo. +Neither segmented nor bundled operation would be used. + +There are not tests, other than the demo below. ```ts import { PyRepoStore } from "@ndn/repo-external"; @@ -44,7 +56,7 @@ const store = new PyRepoStore({ }); const packets: Data[] = []; -for (let i = 0; i < 1; ++i) { +for (let i = 0; i < 100; ++i) { const data = new Data(dataPrefix.append(`${i}`)); data.freshnessPeriod = 1; await digestSigning.sign(data); diff --git a/pkg/repo-external/src/prps/packet.ts b/pkg/repo-external/src/prps/packet.ts index 1cd282bd..37f892dc 100644 --- a/pkg/repo-external/src/prps/packet.ts +++ b/pkg/repo-external/src/prps/packet.ts @@ -1,35 +1,28 @@ -import { Component, Name, TT as l3TT } from "@ndn/packet"; -import { type Decoder, Encoder, EvDecoder } from "@ndn/tlv"; +import { Component, FwHint, Interest, type Name, StructFieldName, StructFieldNameNested, TT as l3TT } from "@ndn/packet"; +import { EvDecoder, StructBuilder, StructFieldBytes } from "@ndn/tlv"; export const MsgSuffix = Component.from("msg"); export const NotifySuffix = Component.from("notify"); -const TT = { - NotifyNonce: 0x80, - PublisherFwHint: 0xD3, -} as const; - -const EVD = new EvDecoder("NotifyParams") - .add(l3TT.Name, (t, { decoder }) => t.publisher = decoder.decode(Name)) - .add(TT.NotifyNonce, (t, { value }) => t.nonce = value) - .add(TT.PublisherFwHint, (t, { vd }) => t.publisherFwHint = vd.decode(Name)); - -export class NotifyParams { - public static decodeFrom(decoder: Decoder): NotifyParams { - return EVD.decodeValue(new NotifyParams(new Name(), new Uint8Array()), decoder); - } - - constructor( - public publisher: Name, - public nonce: Uint8Array, - public publisherFwHint?: Name, - ) {} +const enum TT { + NotifyNonce = 0x80, + PublisherFwHint = 0xD3, +} - public encodeTo(encoder: Encoder) { - encoder.prependValue( - this.publisher, - [TT.NotifyNonce, this.nonce], - [TT.PublisherFwHint, Encoder.OmitEmpty, this.publisherFwHint], - ); +const buildNotifyAppParam = new StructBuilder("NotifyAppParam") + .add(l3TT.Name, "publisher", StructFieldName, { required: true }) + .add(TT.NotifyNonce, "nonce", StructFieldBytes, { required: true }) + .add(TT.PublisherFwHint, "publisherFwHint", StructFieldNameNested) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo PubSub NotifyAppParam struct. */ +export class NotifyAppParam extends buildNotifyAppParam.baseClass() { + /** Create a message Interest from enclosed publisher information. */ + public makeMsgInterest(topic: Name): Interest { + const interest = new Interest(); + interest.name = this.publisher.append( + MsgSuffix, ...topic.comps, new Component(undefined, this.nonce)); + interest.fwHint = this.publisherFwHint && new FwHint(this.publisherFwHint); + return interest; } } +buildNotifyAppParam.subclass = NotifyAppParam; diff --git a/pkg/repo-external/src/prps/publisher.ts b/pkg/repo-external/src/prps/publisher.ts index 8f6472b8..4098d41a 100644 --- a/pkg/repo-external/src/prps/publisher.ts +++ b/pkg/repo-external/src/prps/publisher.ts @@ -1,12 +1,21 @@ import { Endpoint, type Producer, type ProducerHandler, type RetxPolicy } from "@ndn/endpoint"; import { SequenceNum } from "@ndn/naming-convention2"; -import { Component, Data, digestSigning, Interest, Name, NameMap, type Signer } from "@ndn/packet"; +import { Component, Data, digestSigning, Interest, Name, NameMap, SignedInterestPolicy, type Signer } from "@ndn/packet"; import { type Encodable, Encoder } from "@ndn/tlv"; -import { asDataView } from "@ndn/util"; +import { crypto } from "@ndn/util"; -import { MsgSuffix, NotifyParams, NotifySuffix } from "./packet"; +import { MsgSuffix, NotifyAppParam, NotifySuffix } from "./packet"; -/** PyRepo PubSub protocol publisher. */ +type Item = Encodable | PrpsPublisher.PublicationCallback; + +interface Pending { + topic: Name; + item: Item; +} + +const notifySIP = new SignedInterestPolicy(SignedInterestPolicy.Nonce()); + +/** ndn-python-repo PubSub protocol publisher. */ export class PrpsPublisher { constructor({ endpoint = new Endpoint(), @@ -23,8 +32,7 @@ export class PrpsPublisher { this.pubSigner = pubSigner; this.notifyInterestLifetime = notifyInterestLifetime; this.notifyRetx = notifyRetx; - this.messagePrefix = pubPrefix.append(MsgSuffix); - this.messageProducer = this.endpoint.produce(this.messagePrefix, this.handleMessageInterest, { + this.msgProducer = this.endpoint.produce(pubPrefix.append(MsgSuffix), this.handleMsgInterest, { describe: `prps-pub(${pubPrefix})`, announcement: pubAnnouncement ?? pubFwHint ?? pubPrefix, }); @@ -36,36 +44,36 @@ export class PrpsPublisher { private readonly pubSigner: Signer; private readonly notifyInterestLifetime: number; private readonly notifyRetx: RetxPolicy; - private readonly messagePrefix: Name; - private readonly messageProducer: Producer; + private readonly msgProducer: Producer; private readonly pendings = new NameMap(); public close(): void { - this.messageProducer.close(); + this.msgProducer.close(); } public async publish(topic: Name, item: Item): Promise { - const notifyNonce = new Uint8Array(4); - const notifyNonceDataView = asDataView(notifyNonce); + const notifyNonce = new Uint8Array(8); let key: Name; do { - notifyNonceDataView.setUint32(0, Math.random() * 0xFFFFFFFF); - key = this.messagePrefix.append(...topic.comps, new Component(undefined, notifyNonce)); + crypto.getRandomValues(notifyNonce); + key = this.pubPrefix.append(MsgSuffix, ...topic.comps, new Component(undefined, notifyNonce)); } while (this.pendings.has(key)); - this.pendings.set(key, { - topic, - item, - }); + this.pendings.set(key, { topic, item }); + + const notifyParam = new NotifyAppParam(); + notifyParam.publisher = this.pubPrefix; + notifyParam.nonce = notifyNonce; + notifyParam.publisherFwHint = this.pubFwHint; + const notifyInterest = new Interest(); + notifyInterest.name = topic.append(NotifySuffix); + notifyInterest.lifetime = this.notifyInterestLifetime; + notifyInterest.appParameters = Encoder.encode(notifyParam); + notifySIP.update(notifyInterest, this); + await digestSigning.sign(notifyInterest); try { - const notify = new Interest(); - notify.name = topic.append(NotifySuffix); - notify.lifetime = this.notifyInterestLifetime; - notify.appParameters = Encoder.encode( - new NotifyParams(this.pubPrefix, notifyNonce, this.pubFwHint)); - await notify.updateParamsDigest(); - await this.endpoint.consume(notify, { + await this.endpoint.consume(notifyInterest, { describe: `prps-notify(${this.pubPrefix} ${topic})`, retx: this.notifyRetx, }); @@ -74,7 +82,7 @@ export class PrpsPublisher { } } - private handleMessageInterest: ProducerHandler = async (interest) => { + private readonly handleMsgInterest: ProducerHandler = async (interest) => { const pending = this.pendings.get(interest.name); if (!pending) { return undefined; @@ -102,21 +110,18 @@ export class PrpsPublisher { }; } -type Item = Encodable | PrpsPublisher.PublicationCallback; - -interface Pending { - topic: Name; - item: Item; -} - export namespace PrpsPublisher { export interface Options { - /** Endpoint for communication. */ + /** + * Endpoint for communication. + * @defaultValue + * Endpoint on default logical forwarder. + */ endpoint?: Endpoint; /** * Name prefix of the local application. - * Default is a random local name that only works when the subscriber is on local machine. + * @defaultValue "/localhost" + random-suffix */ pubPrefix?: Name; @@ -125,14 +130,16 @@ export namespace PrpsPublisher { /** * Prefix announcement to receive msg Interests. - * Default is pubFwHint, or pubPrefix. + * @defaultValue `.pubFwHint ?? .pubPrefix` */ pubAnnouncement?: Name | false; /** * Key to sign publications. + * @defaultValue `digestSigning` + * + * @remarks * This key should be trusted to sign objects under pubPrefix. - * Default is digest signing. * This may overridden on a per-publication basis by PublicationCallback returning Data. */ pubSigner?: Signer; @@ -142,7 +149,7 @@ export namespace PrpsPublisher { /** * Retransmission policy of notify Interests. - * Default is 2 retransmissions. + * @defaultValue 2 retransmissions */ notifyRetx?: RetxPolicy; } @@ -151,7 +158,7 @@ export namespace PrpsPublisher { * A callback function to generate publication packet. * @param name - Expected Data name. * @param topic - Topic name. - * @returns either a Data that is already signed, or an Encodable object to use as publication body. + * @returns Either a Data that is already signed, or an Encodable to use as publication body. */ export type PublicationCallback = (name: Name, topic: Name) => Promise; } diff --git a/pkg/repo-external/src/prps/subscriber.ts b/pkg/repo-external/src/prps/subscriber.ts index 1bfa3583..70906e3d 100644 --- a/pkg/repo-external/src/prps/subscriber.ts +++ b/pkg/repo-external/src/prps/subscriber.ts @@ -1,11 +1,11 @@ import { Endpoint, type Producer, type ProducerHandler, type RetxPolicy } from "@ndn/endpoint"; -import { Component, Data, digestSigning, FwHint, Interest, type Name, type Signer, type Verifier } from "@ndn/packet"; +import { Data, digestSigning, Interest, type Name, type Signer, type Verifier } from "@ndn/packet"; import { Decoder } from "@ndn/tlv"; import { pushable } from "@ndn/util"; -import { MsgSuffix, NotifyParams, NotifySuffix } from "./packet"; +import { NotifyAppParam, NotifySuffix } from "./packet"; -/** PyRepo PubSub protocol subscriber. */ +/** ndn-python-repo PubSub protocol subscriber. */ export class PrpsSubscriber { constructor({ endpoint = new Endpoint(), @@ -68,24 +68,21 @@ class Subscription implements PrpsSubscriber.Subscription { private notifyProducer: Producer; private readonly messages = pushable(); - private handleNotifyInterest: ProducerHandler = async (interest) => { - if (interest.name.length !== this.notifyPrefix.length + 1 || !interest.appParameters) { + private readonly handleNotifyInterest: ProducerHandler = async (interest) => { + if (interest.name.length <= this.notifyPrefix.length || !interest.appParameters) { return undefined; } - const { publisher, nonce, publisherFwHint } = Decoder.decode(interest.appParameters, NotifyParams); - const messageInterest = new Interest(publisher.append( - MsgSuffix, ...this.topic.comps, new Component(undefined, nonce))); - if (publisherFwHint) { - messageInterest.fwHint = new FwHint(publisherFwHint); - } - messageInterest.lifetime = this.msgInterestLifetime; - const messageData = await this.endpoint.consume(messageInterest, { - describe: `prps-msg(${this.topic} ${publisher})`, + const notifyParam = Decoder.decode(interest.appParameters, NotifyAppParam); + const msgInterest = notifyParam.makeMsgInterest(this.topic); + msgInterest.lifetime = this.msgInterestLifetime; + + const msgData = await this.endpoint.consume(msgInterest, { + describe: `prps-msg(${this.topic} ${notifyParam.publisher})`, retx: this.msgRetx, verifier: this.pubVerifier, }); - this.messages.push(messageData); + this.messages.push(msgData); return new Data(interest.name); }; @@ -93,7 +90,11 @@ class Subscription implements PrpsSubscriber.Subscription { export namespace PrpsSubscriber { export interface Options { - /** Endpoint for communication. */ + /** + * Endpoint for communication. + * @defaultValue + * Endpoint on default logical forwarder. + */ endpoint?: Endpoint; /** InterestLifetime of msg Interests. */ @@ -101,22 +102,28 @@ export namespace PrpsSubscriber { /** * Retransmission policy of msg Interests. - * Default is 2 retransmissions. + * @defaultValue 2 retransmissions */ msgRetx?: RetxPolicy; /** * Verifier for publications. - * Default is no verification. + * @defaultValue no verification */ pubVerifier?: Verifier; - /** Set to false to disable prefix announcements for receiving notify Interests. */ + /** + * Set to false to disable prefix announcements for receiving notify Interests. + * + * @remarks + * This should be set only if the application already has a prefix announcement that covers + * the `topic` of each subscription. + */ subAnnouncement?: false; /** * Key to sign notify Data. - * Default is digest signing. + * @defaultValue `digestSigning` */ subSigner?: Signer; } diff --git a/pkg/repo-external/src/pyrepo/client.ts b/pkg/repo-external/src/pyrepo/client.ts index 7017a7c1..be240065 100644 --- a/pkg/repo-external/src/pyrepo/client.ts +++ b/pkg/repo-external/src/pyrepo/client.ts @@ -1,11 +1,12 @@ import { Endpoint } from "@ndn/endpoint"; -import { Component, type Data, type Name } from "@ndn/packet"; -import { Decoder } from "@ndn/tlv"; -import { asDataView, toHex } from "@ndn/util"; -import itKeepAlive from "it-keepalive"; +import { digestSigning, Interest, type Name, SignedInterestPolicy } from "@ndn/packet"; +import { Decoder, Encoder } from "@ndn/tlv"; +import { delay, sha256, toHex } from "@ndn/util"; -import { PrpsPublisher, PrpsSubscriber } from "../prps/mod"; -import { CheckVerb, CommandParameter, CommandResponse, DeleteVerb, InsertVerb } from "./packet"; +import { PrpsPublisher } from "../prps/mod"; +import { CommandParam, CommandRes, DeleteVerb, InsertVerb, ObjectParam, StatQuery, type Verb } from "./packet"; + +const checkSIP = new SignedInterestPolicy(SignedInterestPolicy.Nonce()); /** Client to interact with ndn-python-repo. */ export class PyRepoClient { @@ -14,108 +15,104 @@ export class PyRepoClient { this.repoPrefix = opts.repoPrefix; this.progressTimeout = opts.progressTimeout ?? 10000; this.publisher = new PrpsPublisher(opts); - this.subscriber = new PrpsSubscriber({ - subAnnouncement: false, - ...opts, - }); - this.endpoint.fw.nodeNames.push(this.publisher.pubPrefix); + this.fwHint = this.publisher.pubFwHint ?? this.publisher.pubPrefix; + this.endpoint.fw.nodeNames.push(this.fwHint); } public readonly endpoint: Endpoint; public readonly repoPrefix: Name; private readonly progressTimeout: number; private readonly publisher: PrpsPublisher; - private readonly subscriber: PrpsSubscriber; - private readonly ongoing = new Map(); + private readonly fwHint: Name; public close(): void { - const nodeNameIndex = this.endpoint.fw.nodeNames.findIndex((nodeName) => nodeName.equals(this.publisher.pubPrefix)); + const nodeNameIndex = this.endpoint.fw.nodeNames.findIndex((nodeName) => nodeName.equals(this.fwHint)); if (nodeNameIndex >= 0) { this.endpoint.fw.nodeNames.splice(nodeNameIndex, 1); } this.publisher.close(); - for (const sub of this.ongoing.values()) { - sub.close(); - } } public async insert(name: Name): Promise { - return this.execute(InsertVerb, new CommandParameter(name)); + return this.execute(InsertVerb, [this.makeObjectParam(name)]); } public async insertRange(name: Name, start: number, end = Infinity): Promise { - return this.execute(InsertVerb, new CommandParameter( - name, - start, - Number.isFinite(end) ? end : undefined, - )); + return this.execute(InsertVerb, [this.makeObjectParam(name, start, end)]); } public async delete(name: Name): Promise { - return this.execute(DeleteVerb, new CommandParameter(name)); + return this.execute(DeleteVerb, [this.makeObjectParam(name)]); } public async deleteRange(name: Name, start: number, end = Infinity): Promise { - return this.execute(DeleteVerb, new CommandParameter( - name, - start, - Number.isFinite(end) ? end : undefined, - )); + return this.execute(DeleteVerb, [this.makeObjectParam(name, start, end)]); + } + + private makeObjectParam(name: Name, start?: number, end?: number): ObjectParam { + const p = new ObjectParam(); + p.name = name; + if (start !== undefined) { + p.startBlockId = start; + } + if (Number.isFinite(end)) { + p.endBlockId = end; + } + p.fwHint = this.fwHint; + return p; } - private async execute(verb: Component, parameter: CommandParameter): Promise { - const id = new Uint8Array(4); - const idDataView = asDataView(id); - let key: string; - do { - idDataView.setUint32(0, Math.random() * 0xFFFFFFFF); - key = toHex(id); - } while (this.ongoing.has(key)); - parameter.processId = id; - parameter.checkPrefix = this.publisher.pubPrefix; - parameter.fwHint = this.publisher.pubPrefix; - - const checkTopic = parameter.checkPrefix.append(CheckVerb, new Component(undefined, id)); - const sub = this.subscriber.subscribe(checkTopic); - this.ongoing.set(key, sub); - try { - const commandTopic = this.repoPrefix.append(verb); - await this.publisher.publish(commandTopic, parameter); - - const subAlive = itKeepAlive( - () => false, - { timeout: this.progressTimeout }, - )(sub); - - for await (const data of subAlive) { - if (data === false) { - throw new Error("command timeout"); - } - const response = Decoder.decode(data.content, CommandResponse); - if (response.statusCode === 200) { - break; - } + private async execute(verb: Verb, objectParams: readonly ObjectParam[]): Promise { + const p = new CommandParam(); + p.objectParams.push(...objectParams); + const request = Encoder.encode(p); + const requestDigest = await sha256(request); + const requestDigestHex = toHex(requestDigest); + + await this.publisher.publish(this.repoPrefix.append(verb.action), request); + const checkParam = new StatQuery(); + checkParam.requestDigest = requestDigest; + + const t0 = Date.now(); + while (Date.now() < t0 + this.progressTimeout) { + const checkInterest = new Interest(); + checkInterest.name = this.repoPrefix.append(verb.check); + checkInterest.appParameters = Encoder.encode(checkParam); + checkSIP.update(checkInterest, this); + await digestSigning.sign(checkInterest); + + const checkData = await this.endpoint.consume(checkInterest, { + describe: `pyrepo-check(${this.repoPrefix} ${requestDigestHex})`, + }); + + const res = Decoder.decode(checkData.content, CommandRes); + if (res.statusCode >= 400) { + throw new Error(`RepoCommandRes ${res.statusCode}`); + } + if (res.statusCode === 200) { + return; } - } finally { - sub.close(); - this.ongoing.delete(key); + await delay(1000); } + throw new Error("command timeout"); } } export namespace PyRepoClient { - export interface Options extends PrpsPublisher.Options, PrpsSubscriber.Options { + export interface Options extends PrpsPublisher.Options { /** * Name prefix of the repo instance. - * This corresponds to ndn-python-repo.conf repo_config.repo_name key. + * + * @remarks + * This corresponds to **ndn-python-repo.conf** `.repo_config.repo_name` key. */ repoPrefix: Name; /** * Progress update timeout in milliseconds. * If no progress update is received for this period of time, the command is deemed failed. - * Default is 10 seconds. + * @defaultValue 10 seconds */ progressTimeout?: number; } diff --git a/pkg/repo-external/src/pyrepo/packet.ts b/pkg/repo-external/src/pyrepo/packet.ts index ad605e54..c6bfdde9 100644 --- a/pkg/repo-external/src/pyrepo/packet.ts +++ b/pkg/repo-external/src/pyrepo/packet.ts @@ -1,57 +1,74 @@ -import { Component, Name } from "@ndn/packet"; -import { type Decoder, Encoder, NNI } from "@ndn/tlv"; - -export const InsertVerb = Component.from("insert"); -export const DeleteVerb = Component.from("delete"); -export const CheckVerb = Component.from("check"); - -const TT = { - StartBlockId: 0xCC, - EndBlockId: 0xCD, - ProcessId: 0xCE, - StatusCode: 0xD0, - InsertNum: 0xD1, - DeleteNum: 0xD2, - ForwardingHint: 0xD3, - RegisterPrefix: 0xD4, - CheckPrefix: 0xD5, -} as const; - -export class CommandParameter { - constructor( - public name?: Name, - public startBlockId?: number, - public endBlockId?: number, - ) {} - - public processId = new Uint8Array(); - public fwHint?: Name; - public checkPrefix = new Name(); - - public encodeTo(encoder: Encoder) { - encoder.prependValue( - this.name, - [TT.ForwardingHint, Encoder.OmitEmpty, this.fwHint], - this.startBlockId !== undefined && [TT.StartBlockId, NNI(this.startBlockId)], - this.endBlockId !== undefined && [TT.EndBlockId, NNI(this.endBlockId)], - [TT.ProcessId, this.processId], - // [TT.RegisterPrefix], - [TT.CheckPrefix, this.checkPrefix], - ); - } +import { Component, StructFieldName, StructFieldNameNested, TT as l3TT } from "@ndn/packet"; +import { EvDecoder, StructBuilder, StructFieldBytes, StructFieldNNI, StructFieldType } from "@ndn/tlv"; + +export interface Verb { + action: Component; + check: Component; } -export class CommandResponse { - public static decodeFrom(decoder: Decoder): CommandResponse { - const t = new CommandResponse(); - while (!decoder.eof) { - const { type, nni } = decoder.read(); - if (type === TT.StatusCode) { - t.statusCode = nni; - } - } - return t; - } - - public statusCode = 0; +export const InsertVerb = { + action: new Component(undefined, "insert"), + check: new Component(undefined, "insert check"), +}; + +export const DeleteVerb = { + action: new Component(undefined, "delete"), + check: new Component(undefined, "delete check"), +}; + +const enum TT { + StartBlockId = 0xCC, + EndBlockId = 0xCD, + RequestNo = 0xCE, + StatusCode = 0xD0, + InsertNum = 0xD1, + DeleteNum = 0xD2, + ForwardingHint = 0xD3, + RegisterPrefix = 0xD4, + CheckPrefix = 0xD5, + ObjectParam = 0x12D, + ObjectResult = 0x12E, } + +const buildObjectParam = new StructBuilder("ObjectParam", TT.ObjectParam) + .add(l3TT.Name, "name", StructFieldName, { required: true }) + .add(TT.ForwardingHint, "fwHint", StructFieldNameNested) + .add(TT.StartBlockId, "startBlockId", StructFieldNNI) + .add(TT.EndBlockId, "endBlockId", StructFieldNNI) + .add(TT.RegisterPrefix, "registerPrefix", StructFieldNameNested) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo ObjectParam struct. */ +export class ObjectParam extends buildObjectParam.baseClass() {} +buildObjectParam.subclass = ObjectParam; + +const buildObjectResult = new StructBuilder("ObjectResult", TT.ObjectResult) + .add(l3TT.Name, "name", StructFieldName, { required: true }) + .add(TT.StatusCode, "statusCode", StructFieldNNI, { required: true }) + .add(TT.InsertNum, "insertNum", StructFieldNNI) + .add(TT.DeleteNum, "deleteNum", StructFieldNNI) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo ObjectResult struct. */ +export class ObjectResult extends buildObjectResult.baseClass() {} +buildObjectResult.subclass = ObjectResult; + +const buildCommandParam = new StructBuilder("RepoCommandParam") + .add(TT.ObjectParam, "objectParams", StructFieldType.wrap(ObjectParam), { repeat: true }) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo RepoCommandParam struct. */ +export class CommandParam extends buildCommandParam.baseClass() {} +buildCommandParam.subclass = CommandParam; + +const buildCommandRes = new StructBuilder("RepoCommandRes") + .add(TT.StatusCode, "statusCode", StructFieldNNI, { required: true }) + .add(TT.ObjectResult, "objectResults", StructFieldType.wrap(ObjectResult), { repeat: true }) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo RepoCommandRes struct. */ +export class CommandRes extends buildCommandRes.baseClass() {} +buildCommandRes.subclass = CommandRes; + +const buildStatQuery = new StructBuilder("RepoStatQuery") + .add(TT.RequestNo, "requestDigest", StructFieldBytes, { required: true }) + .setIsCritical(EvDecoder.alwaysCritical); +/** ndn-python-repo RepoStatQuery struct. */ +export class StatQuery extends buildStatQuery.baseClass() {} +buildStatQuery.subclass = StatQuery; diff --git a/pkg/repo-external/src/pyrepo/store.ts b/pkg/repo-external/src/pyrepo/store.ts index ed1e408f..949c32dc 100644 --- a/pkg/repo-external/src/pyrepo/store.ts +++ b/pkg/repo-external/src/pyrepo/store.ts @@ -10,10 +10,10 @@ import { PyRepoClient } from "./client"; /** A DataStore implementation using ndn-python-repo. */ export class PyRepoStore implements S.Close, S.Insert, S.Delete { - /** Construct with new PyRepoClient. */ + /** Construct with new {@link PyRepoClient}. */ constructor(opts: PyRepoStore.Options); - /** Construct with existing PyRepoClient. */ + /** Construct with existing {@link PyRepoClient}. */ constructor(client: PyRepoClient, opts?: PyRepoStore.StoreOptions); constructor(arg1: PyRepoClient | PyRepoStore.Options, arg2: PyRepoStore.StoreOptions = {}) { @@ -36,7 +36,7 @@ export class PyRepoStore implements S.Close, S.Insert, S.Delete { private readonly throttle: ReturnType; private readonly endpoint: Endpoint; - /** Close the PyRepoClient only if it is created by this store. */ + /** Close the {@link PyRepoClient} only if it is created by this store. */ public async close(): Promise { if (this.ownsClient) { this.client.close(); @@ -52,7 +52,7 @@ export class PyRepoStore implements S.Close, S.Insert, S.Delete { transform(Infinity, (data) => this.throttle(async () => { const answered = pDefer(); const timeout = setTimeout(() => answered.reject(new Error("no incoming Interest")), 5000); - const producer = this.endpoint.produce(data.name, async () => { + using producer = this.endpoint.produce(data.name, async () => { clearTimeout(timeout); setTimeout(() => answered.resolve(), 100); return data; @@ -62,12 +62,8 @@ export class PyRepoStore implements S.Close, S.Insert, S.Delete { }); await delay(100); - try { - await this.client.insert(data.name); - await answered.promise; - } finally { - producer.close(); - } + await this.client.insert(data.name); + await answered.promise; })), consume, ); diff --git a/pkg/tlv/src/ev-decoder.ts b/pkg/tlv/src/ev-decoder.ts index fead5e39..a78e8ede 100644 --- a/pkg/tlv/src/ev-decoder.ts +++ b/pkg/tlv/src/ev-decoder.ts @@ -192,11 +192,17 @@ export namespace EvDecoder { export type IsCritical = (tt: number) => boolean; /** - * IsCritical callback that always returns false. - * This means unrecognized or out-of-order TLV elements are ignored. + * IsCritical callback that always returns `false`. + * Any unrecognized or out-of-order TLV elements would be ignored. */ export const neverCritical: IsCritical = () => false; + /** + * IsCritical callback that always returns `true`. + * Any unrecognized or out-of-order TLV elements would cause an error. + */ + export const alwaysCritical: IsCritical = () => true; + /** * Callback before or after decoding TLV-VALUE. * @param target - Target object. diff --git a/pkg/tlv/src/struct-builder.ts b/pkg/tlv/src/struct-builder.ts index fe711146..dc165808 100644 --- a/pkg/tlv/src/struct-builder.ts +++ b/pkg/tlv/src/struct-builder.ts @@ -1,4 +1,4 @@ -import { assert, toUtf8 } from "@ndn/util"; +import { assert, toHex, toUtf8 } from "@ndn/util"; import type { Constructor, IfNever, Simplify } from "type-fest"; import { type Decodable, Decoder } from "./decoder"; @@ -96,6 +96,20 @@ export const StructFieldText: StructFieldType = { decode: ({ text }) => text, }; +/** + * StructBuilder field type of raw bytes. + * + * @remarks + * The field is defined as Uint8Array. + * If the field is required, it is initialized as an empty Uint8Array. + */ +export const StructFieldBytes: StructFieldType = { + newValue: () => new Uint8Array(), + encode: (value) => value, + decode: ({ value }) => value, + asString: (value) => toHex(value), +}; + /** StructBuilder field options. */ interface Options< Required extends boolean, diff --git a/pkg/tlv/tests/struct-builder.t.ts b/pkg/tlv/tests/struct-builder.t.ts index cc677ca4..131ce996 100644 --- a/pkg/tlv/tests/struct-builder.t.ts +++ b/pkg/tlv/tests/struct-builder.t.ts @@ -2,7 +2,7 @@ import "../test-fixture/expect"; import { expect, test } from "vitest"; -import { Decoder, Encoder, StructBuilder, StructFieldEnum, StructFieldNNI, StructFieldNNIBig, StructFieldText, StructFieldType } from ".."; +import { Decoder, Encoder, StructBuilder, StructFieldBytes, StructFieldEnum, StructFieldNNI, StructFieldNNIBig, StructFieldText, StructFieldType } from ".."; test("basic", () => { const b = new StructBuilder("MyType", 0x40) @@ -251,7 +251,8 @@ test("types", () => { .add(0x41, "a41", StructFieldNNI, { required: true }) .add(0x42, "a42", StructFieldNNIBig, { required: true }) .add(0x43, "a43", StructFieldEnum(MyEnum), { required: true }) - .add(0x44, "a44", StructFieldText, { required: true }); + .add(0x44, "a44", StructFieldText, { required: true }) + .add(0x45, "a45", StructFieldBytes, { required: true }); class MyType extends b.baseClass() {} b.subclass = MyType; @@ -266,18 +267,21 @@ test("types", () => { "a42=0", "a43=0(unknown)", "a44=", + "a45=", ].join(" ")); myObj.a41 = 0xAA41; myObj.a42 = 0xAA42n; myObj.a43 = MyEnum.Q; myObj.a44 = "AA44"; + myObj.a45 = Uint8Array.of(0xAA, 0x45); expect(myObj.toString()).toBe([ "MyType", `a41=${0xAA41}`, `a42=${0xAA42n}`, "a43=2(Q)", "a44=AA44", + "a45=AA45", ].join(" ")); expect(myObj).toEncodeAs( @@ -297,5 +301,9 @@ test("types", () => { expect(type).toBe(0x44); expect(text).toBe("AA44"); }, + ({ type, value }) => { + expect(type).toBe(0x45); + expect(value).toEqualUint8Array([0xAA, 0x45]); + }, ); });