Skip to content

Commit

Permalink
repo-external: rewrite PyRepoClient
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Feb 6, 2024
1 parent 6077d95 commit 9f6ba36
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 230 deletions.
6 changes: 5 additions & 1 deletion pkg/endpoint/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export interface ProducerOptions {
}

/** A running producer. */
export interface Producer {
export interface Producer extends Disposable {
/**
* Prefix specified in {@link Endpoint.produce} call.
* Additional prefixes can be added via `.face.addRoute()`.
Expand Down Expand Up @@ -214,6 +214,10 @@ export class ProducerImpl implements Producer {
this.face.close();
this.signal?.removeEventListener("abort", this.close);
};

public [Symbol.dispose](): void {
this.close();
}
}

export async function signUnsignedData(data: Data, dataSigner: Signer | undefined) {
Expand Down
26 changes: 19 additions & 7 deletions pkg/repo-external/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,28 @@ This package allows inserting and deleting Data in [ndn-python-repo](https://git
`PyRepoClient` type is a client for [ndn-python-repo protocol](https://github.com/UCLA-IRL/ndn-python-repo/tree/v0.2a5/docs/src/specification).
`PyRepoStore` type implements a subset of `DataStore` interfaces defined in `@ndn/repo-api` package.

This implementation is compatible with ndn-python-repo version 0.2a5.
Newer versions of ndn-python-repo are not supported due to [ndn-python-repo issue #60](https://github.com/UCLA-IRL/ndn-python-repo/issues/60).
To install the specified version, run:
This implementation is compatible with ndn-python-repo `dda1dce1` (2024-02-04).
To install and start the specified version, run:

```bash
pip install ndn-python-repo==0.2a5 python-ndn==0.2b2.post1
# create Python virtual environment
python3 -m venv pyrepo-venv
cd pyrepo-venv
source ./bin/activate

# install ndn-python-repo
pip install git+https://github.com/UCLA-IRL/ndn-python-repo@dda1dce135a952498a2a79d3cddf9c3ee33399d0

# run ndn-python-repo
export NDN_CLIENT_TRANSPORT=unix:///run/nfd/nfd.sock
ndn-python-repo
```

As tested on 2023-02-16, this version of ndn-python-repo is no longer compatible with the latest NFD and NDNts.
This is caused by changes in Interest ForwardingHint encoding.
Current implementation is unoptimized.
In particular, each insertion/deletion command is individually sent to the repo.
Neither segmented nor bundled operation would be used.

There are not tests, other than the demo below.

```ts
import { PyRepoStore } from "@ndn/repo-external";
Expand Down Expand Up @@ -44,7 +56,7 @@ const store = new PyRepoStore({
});

const packets: Data[] = [];
for (let i = 0; i < 1; ++i) {
for (let i = 0; i < 100; ++i) {
const data = new Data(dataPrefix.append(`${i}`));
data.freshnessPeriod = 1;
await digestSigning.sign(data);
Expand Down
49 changes: 21 additions & 28 deletions pkg/repo-external/src/prps/packet.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
import { Component, Name, TT as l3TT } from "@ndn/packet";
import { type Decoder, Encoder, EvDecoder } from "@ndn/tlv";
import { Component, FwHint, Interest, type Name, StructFieldName, StructFieldNameNested, TT as l3TT } from "@ndn/packet";
import { EvDecoder, StructBuilder, StructFieldBytes } from "@ndn/tlv";

export const MsgSuffix = Component.from("msg");
export const NotifySuffix = Component.from("notify");

const TT = {
NotifyNonce: 0x80,
PublisherFwHint: 0xD3,
} as const;

const EVD = new EvDecoder<NotifyParams>("NotifyParams")
.add(l3TT.Name, (t, { decoder }) => t.publisher = decoder.decode(Name))
.add(TT.NotifyNonce, (t, { value }) => t.nonce = value)
.add(TT.PublisherFwHint, (t, { vd }) => t.publisherFwHint = vd.decode(Name));

export class NotifyParams {
public static decodeFrom(decoder: Decoder): NotifyParams {
return EVD.decodeValue(new NotifyParams(new Name(), new Uint8Array()), decoder);
}

constructor(
public publisher: Name,
public nonce: Uint8Array,
public publisherFwHint?: Name,
) {}
const enum TT {
NotifyNonce = 0x80,
PublisherFwHint = 0xD3,
}

public encodeTo(encoder: Encoder) {
encoder.prependValue(
this.publisher,
[TT.NotifyNonce, this.nonce],
[TT.PublisherFwHint, Encoder.OmitEmpty, this.publisherFwHint],
);
const buildNotifyAppParam = new StructBuilder("NotifyAppParam")
.add(l3TT.Name, "publisher", StructFieldName, { required: true })
.add(TT.NotifyNonce, "nonce", StructFieldBytes, { required: true })
.add(TT.PublisherFwHint, "publisherFwHint", StructFieldNameNested)
.setIsCritical(EvDecoder.alwaysCritical);
/** ndn-python-repo PubSub NotifyAppParam struct. */
export class NotifyAppParam extends buildNotifyAppParam.baseClass<NotifyAppParam>() {
/** Create a message Interest from enclosed publisher information. */
public makeMsgInterest(topic: Name): Interest {
const interest = new Interest();
interest.name = this.publisher.append(
MsgSuffix, ...topic.comps, new Component(undefined, this.nonce));
interest.fwHint = this.publisherFwHint && new FwHint(this.publisherFwHint);
return interest;
}
}
buildNotifyAppParam.subclass = NotifyAppParam;
83 changes: 45 additions & 38 deletions pkg/repo-external/src/prps/publisher.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { Endpoint, type Producer, type ProducerHandler, type RetxPolicy } from "@ndn/endpoint";
import { SequenceNum } from "@ndn/naming-convention2";
import { Component, Data, digestSigning, Interest, Name, NameMap, type Signer } from "@ndn/packet";
import { Component, Data, digestSigning, Interest, Name, NameMap, SignedInterestPolicy, type Signer } from "@ndn/packet";
import { type Encodable, Encoder } from "@ndn/tlv";
import { asDataView } from "@ndn/util";
import { crypto } from "@ndn/util";

import { MsgSuffix, NotifyParams, NotifySuffix } from "./packet";
import { MsgSuffix, NotifyAppParam, NotifySuffix } from "./packet";

/** PyRepo PubSub protocol publisher. */
type Item = Encodable | PrpsPublisher.PublicationCallback;

interface Pending {
topic: Name;
item: Item;
}

const notifySIP = new SignedInterestPolicy(SignedInterestPolicy.Nonce());

/** ndn-python-repo PubSub protocol publisher. */
export class PrpsPublisher {
constructor({
endpoint = new Endpoint(),
Expand All @@ -23,8 +32,7 @@ export class PrpsPublisher {
this.pubSigner = pubSigner;
this.notifyInterestLifetime = notifyInterestLifetime;
this.notifyRetx = notifyRetx;
this.messagePrefix = pubPrefix.append(MsgSuffix);
this.messageProducer = this.endpoint.produce(this.messagePrefix, this.handleMessageInterest, {
this.msgProducer = this.endpoint.produce(pubPrefix.append(MsgSuffix), this.handleMsgInterest, {
describe: `prps-pub(${pubPrefix})`,
announcement: pubAnnouncement ?? pubFwHint ?? pubPrefix,
});
Expand All @@ -36,36 +44,36 @@ export class PrpsPublisher {
private readonly pubSigner: Signer;
private readonly notifyInterestLifetime: number;
private readonly notifyRetx: RetxPolicy;
private readonly messagePrefix: Name;
private readonly messageProducer: Producer;
private readonly msgProducer: Producer;
private readonly pendings = new NameMap<Pending>();

public close(): void {
this.messageProducer.close();
this.msgProducer.close();
}

public async publish(topic: Name, item: Item): Promise<void> {
const notifyNonce = new Uint8Array(4);
const notifyNonceDataView = asDataView(notifyNonce);
const notifyNonce = new Uint8Array(8);
let key: Name;
do {
notifyNonceDataView.setUint32(0, Math.random() * 0xFFFFFFFF);
key = this.messagePrefix.append(...topic.comps, new Component(undefined, notifyNonce));
crypto.getRandomValues(notifyNonce);
key = this.pubPrefix.append(MsgSuffix, ...topic.comps, new Component(undefined, notifyNonce));
} while (this.pendings.has(key));

this.pendings.set(key, {
topic,
item,
});
this.pendings.set(key, { topic, item });

const notifyParam = new NotifyAppParam();
notifyParam.publisher = this.pubPrefix;
notifyParam.nonce = notifyNonce;
notifyParam.publisherFwHint = this.pubFwHint;
const notifyInterest = new Interest();
notifyInterest.name = topic.append(NotifySuffix);
notifyInterest.lifetime = this.notifyInterestLifetime;
notifyInterest.appParameters = Encoder.encode(notifyParam);
notifySIP.update(notifyInterest, this);
await digestSigning.sign(notifyInterest);

try {
const notify = new Interest();
notify.name = topic.append(NotifySuffix);
notify.lifetime = this.notifyInterestLifetime;
notify.appParameters = Encoder.encode(
new NotifyParams(this.pubPrefix, notifyNonce, this.pubFwHint));
await notify.updateParamsDigest();
await this.endpoint.consume(notify, {
await this.endpoint.consume(notifyInterest, {
describe: `prps-notify(${this.pubPrefix} ${topic})`,
retx: this.notifyRetx,
});
Expand All @@ -74,7 +82,7 @@ export class PrpsPublisher {
}
}

private handleMessageInterest: ProducerHandler = async (interest) => {
private readonly handleMsgInterest: ProducerHandler = async (interest) => {
const pending = this.pendings.get(interest.name);
if (!pending) {
return undefined;
Expand Down Expand Up @@ -102,21 +110,18 @@ export class PrpsPublisher {
};
}

type Item = Encodable | PrpsPublisher.PublicationCallback;

interface Pending {
topic: Name;
item: Item;
}

export namespace PrpsPublisher {
export interface Options {
/** Endpoint for communication. */
/**
* Endpoint for communication.
* @defaultValue
* Endpoint on default logical forwarder.
*/
endpoint?: Endpoint;

/**
* Name prefix of the local application.
* Default is a random local name that only works when the subscriber is on local machine.
* @defaultValue "/localhost" + random-suffix
*/
pubPrefix?: Name;

Expand All @@ -125,14 +130,16 @@ export namespace PrpsPublisher {

/**
* Prefix announcement to receive msg Interests.
* Default is pubFwHint, or pubPrefix.
* @defaultValue `.pubFwHint ?? .pubPrefix`
*/
pubAnnouncement?: Name | false;

/**
* Key to sign publications.
* @defaultValue `digestSigning`
*
* @remarks
* This key should be trusted to sign objects under pubPrefix.
* Default is digest signing.
* This may overridden on a per-publication basis by PublicationCallback returning Data.
*/
pubSigner?: Signer;
Expand All @@ -142,7 +149,7 @@ export namespace PrpsPublisher {

/**
* Retransmission policy of notify Interests.
* Default is 2 retransmissions.
* @defaultValue 2 retransmissions
*/
notifyRetx?: RetxPolicy;
}
Expand All @@ -151,7 +158,7 @@ export namespace PrpsPublisher {
* A callback function to generate publication packet.
* @param name - Expected Data name.
* @param topic - Topic name.
* @returns either a Data that is already signed, or an Encodable object to use as publication body.
* @returns Either a Data that is already signed, or an Encodable to use as publication body.
*/
export type PublicationCallback = (name: Name, topic: Name) => Promise<Data | Encodable>;
}
47 changes: 27 additions & 20 deletions pkg/repo-external/src/prps/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Endpoint, type Producer, type ProducerHandler, type RetxPolicy } from "@ndn/endpoint";
import { Component, Data, digestSigning, FwHint, Interest, type Name, type Signer, type Verifier } from "@ndn/packet";
import { Data, digestSigning, Interest, type Name, type Signer, type Verifier } from "@ndn/packet";
import { Decoder } from "@ndn/tlv";
import { pushable } from "@ndn/util";

import { MsgSuffix, NotifyParams, NotifySuffix } from "./packet";
import { NotifyAppParam, NotifySuffix } from "./packet";

/** PyRepo PubSub protocol subscriber. */
/** ndn-python-repo PubSub protocol subscriber. */
export class PrpsSubscriber {
constructor({
endpoint = new Endpoint(),
Expand Down Expand Up @@ -68,55 +68,62 @@ class Subscription implements PrpsSubscriber.Subscription {
private notifyProducer: Producer;
private readonly messages = pushable<Data>();

private handleNotifyInterest: ProducerHandler = async (interest) => {
if (interest.name.length !== this.notifyPrefix.length + 1 || !interest.appParameters) {
private readonly handleNotifyInterest: ProducerHandler = async (interest) => {
if (interest.name.length <= this.notifyPrefix.length || !interest.appParameters) {
return undefined;
}

const { publisher, nonce, publisherFwHint } = Decoder.decode(interest.appParameters, NotifyParams);
const messageInterest = new Interest(publisher.append(
MsgSuffix, ...this.topic.comps, new Component(undefined, nonce)));
if (publisherFwHint) {
messageInterest.fwHint = new FwHint(publisherFwHint);
}
messageInterest.lifetime = this.msgInterestLifetime;
const messageData = await this.endpoint.consume(messageInterest, {
describe: `prps-msg(${this.topic} ${publisher})`,
const notifyParam = Decoder.decode(interest.appParameters, NotifyAppParam);
const msgInterest = notifyParam.makeMsgInterest(this.topic);
msgInterest.lifetime = this.msgInterestLifetime;

const msgData = await this.endpoint.consume(msgInterest, {
describe: `prps-msg(${this.topic} ${notifyParam.publisher})`,
retx: this.msgRetx,
verifier: this.pubVerifier,
});
this.messages.push(messageData);
this.messages.push(msgData);

return new Data(interest.name);
};
}

export namespace PrpsSubscriber {
export interface Options {
/** Endpoint for communication. */
/**
* Endpoint for communication.
* @defaultValue
* Endpoint on default logical forwarder.
*/
endpoint?: Endpoint;

/** InterestLifetime of msg Interests. */
msgInterestLifetime?: number;

/**
* Retransmission policy of msg Interests.
* Default is 2 retransmissions.
* @defaultValue 2 retransmissions
*/
msgRetx?: RetxPolicy;

/**
* Verifier for publications.
* Default is no verification.
* @defaultValue no verification
*/
pubVerifier?: Verifier;

/** Set to false to disable prefix announcements for receiving notify Interests. */
/**
* Set to false to disable prefix announcements for receiving notify Interests.
*
* @remarks
* This should be set only if the application already has a prefix announcement that covers
* the `topic` of each subscription.
*/
subAnnouncement?: false;

/**
* Key to sign notify Data.
* Default is digest signing.
* @defaultValue `digestSigning`
*/
subSigner?: Signer;
}
Expand Down
Loading

0 comments on commit 9f6ba36

Please sign in to comment.