Skip to content

Commit

Permalink
svs: implement SvSync with FwFace.RxTx
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jun 29, 2024
1 parent f980865 commit b30c565
Showing 1 changed file with 62 additions and 38 deletions.
100 changes: 62 additions & 38 deletions pkg/svs/src/sync.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { consume, type ConsumerOptions, produce, type Producer, type ProducerHandler } from "@ndn/endpoint";
import { Forwarder } from "@ndn/fw";
import { Forwarder, type FwFace, FwPacket } from "@ndn/fw";
import { Version } from "@ndn/naming-convention2";
import { Component, Interest, Name, type NameLike, nullSigner, type Signer, type Verifier } from "@ndn/packet";
import { type SyncNode, type SyncProtocol, SyncUpdate } from "@ndn/sync-api";
import { Decoder, Encoder } from "@ndn/tlv";
import { randomJitter, trackEventListener } from "@ndn/util";
import { pushable, randomJitter, trackEventListener } from "@ndn/util";
import { consume, map, tap } from "streaming-iterables";
import type { Promisable } from "type-fest";
import { TypedEventTarget } from "typescript-event-target";

Expand All @@ -24,6 +24,7 @@ interface DebugEntry {

type EventMap = SyncProtocol.EventMap<Name> & {
debug: CustomEvent<DebugEntry>;
rxerror: CustomEvent<[interest: Interest, e: unknown]>;
};

/** StateVectorSync participant. */
Expand Down Expand Up @@ -54,26 +55,18 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
describe,
initialStateVector,
svs2interest,
Interest.makeModifyFunc({
canBePrefix: true,
mustBeFresh: true,
lifetime: syncInterestLifetime,
}),
{ fw, describe: `${describe}[c]`, retx: 0 },
randomJitter(periodicTimeout[1], periodicTimeout[0]),
svs2suppression ? suppressionTimeout :
randomJitter(suppressionTimer[1], suppressionTimer[0]),
svs2suppression ? suppressionPeriod : undefined,
signer,
verifier,
);

await initialize?.(sync);
sync.producer = produce(syncPrefix, sync.handleSyncInterest, {
sync.makeFace(
fw,
describe: `${describe}[p]`,
routeCapture: false,
});
syncInterestLifetime,
signer,
verifier,
);
return sync;
}

Expand All @@ -82,19 +75,52 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
public readonly describe: string,
private readonly own: StateVector,
private readonly svs2interest: boolean,
private readonly modifyInterest: Interest.ModifyFunc,
private readonly cOpts: ConsumerOptions,
private readonly steadyTimer: () => number,
private readonly suppressionTimer: () => number,
private readonly svs2suppressionPeriod: number | undefined,
private readonly signer: Signer,
private readonly verifier?: Verifier,
) {
super();
}

private makeFace(
fw: Forwarder,
syncInterestLifetime: number,
signer: Signer,
verifier?: Verifier,
): void {
this.face = fw.addFace({
rx: map(async (interest) => {
this.debug("send");
interest.canBePrefix = true;
interest.mustBeFresh = true;
interest.lifetime = syncInterestLifetime;
await signer.sign(interest);
return FwPacket.create(interest);
}, this.txStream),
tx: (iterable) => consume(tap(async (pkt) => {
if (!(FwPacket.isEncodable(pkt) && pkt.l3 instanceof Interest)) {
return;
}
const interest = pkt.l3;
try {
await verifier?.verify(interest);
await this.handleSyncInterest(interest);
} catch (err: unknown) {
this.dispatchTypedEvent("rxerror", new CustomEvent<[interest: Interest, e: unknown]>("rxerror", {
detail: [interest, err],
}));
}
}, iterable)),
}, {
describe: this.describe,
routeCapture: false,
});
this.face.addRoute(this.syncPrefix);
}

private readonly maybeHaveEventListener = trackEventListener(this);
private producer?: Producer;
private face?: FwFace;
private txStream = pushable<Interest>();

/**
* In steady state, undefined.
Expand All @@ -121,9 +147,10 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
}));
}

/** Cease operations. */
public close(): void {
clearTimeout(this.timer);
this.producer?.close();
this.face?.close();
}

public get(id: NameLike): SyncNode<Name> {
Expand Down Expand Up @@ -154,7 +181,7 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
*/
private readonly nodeOp = (id: Name, n: number | undefined): number => {
if (n !== undefined) { // setSeqNum requested
if (!this.producer) { // decrement/remove permitted during initialization
if (!this.face) { // decrement/remove permitted during initialization
this.own.set(id, n);
} else if (n > this.own.get(id)) { // increment only after initialization
this.own.set(id, n);
Expand All @@ -165,8 +192,11 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
return this.own.get(id);
};

private readonly handleSyncInterest: ProducerHandler = async (interest) => {
await this.verifier?.verify(interest);
/**
* Handle incoming sync Interest.
* @param interest - Received Interest, signature verified.
*/
private async handleSyncInterest(interest: Interest): Promise<void> {
const vComp = interest.name.at(this.syncPrefix.length);
let decoder: Decoder;
if (vComp.equals(V2) && !!interest.appParameters) {
Expand Down Expand Up @@ -203,7 +233,7 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
}
this.resetTimer();
return undefined;
};
}

private shouldEnterSuppression(ourNewer: readonly StateVector.DiffEntry[]): boolean {
if (this.svs2suppressionPeriod === undefined) {
Expand All @@ -228,35 +258,29 @@ export class SvSync extends TypedEventTarget<EventMap> implements SyncProtocol<N
ourNewer: ourNewer.length,
});
if (ourNewer.length > 0) {
void this.sendSyncInterest();
this.sendSyncInterest();
}
this.aggregated = undefined;
} else { // in steady state
this.debug("timer");
void this.sendSyncInterest();
this.sendSyncInterest();
}

this.resetTimer();
};

private async sendSyncInterest(): Promise<void> {
this.debug("send");

/** Transmit a sync Interest. */
private sendSyncInterest(): void {
const interest = new Interest();
if (this.svs2interest) {
interest.name = this.syncPrefix.append(V2);
interest.appParameters = Encoder.encode(this.own);
} else {
interest.name = this.syncPrefix.append(new Component(Encoder.encode(this.own)));
}
this.modifyInterest(interest);
await this.signer.sign(interest);

try {
await consume(interest, this.cOpts);
} catch {
// not expecting a reply, so that a timeout will happen and it shall be ignored
}
this.txStream.push(interest);
// further modification and signing occur in the the logical face
}
}

Expand Down

0 comments on commit b30c565

Please sign in to comment.