From b30c5656dfcae2463664dc0a2427ea8465a6b257 Mon Sep 17 00:00:00 2001 From: Junxiao Shi Date: Sat, 29 Jun 2024 01:04:32 +0000 Subject: [PATCH] svs: implement SvSync with FwFace.RxTx --- pkg/svs/src/sync.ts | 100 +++++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/pkg/svs/src/sync.ts b/pkg/svs/src/sync.ts index 721b1269..d010201d 100644 --- a/pkg/svs/src/sync.ts +++ b/pkg/svs/src/sync.ts @@ -1,10 +1,10 @@ -import { consume, type ConsumerOptions, produce, type Producer, type ProducerHandler } from "@ndn/endpoint"; -import { Forwarder } from "@ndn/fw"; +import { Forwarder, type FwFace, FwPacket } from "@ndn/fw"; import { Version } from "@ndn/naming-convention2"; import { Component, Interest, Name, type NameLike, nullSigner, type Signer, type Verifier } from "@ndn/packet"; import { type SyncNode, type SyncProtocol, SyncUpdate } from "@ndn/sync-api"; import { Decoder, Encoder } from "@ndn/tlv"; -import { randomJitter, trackEventListener } from "@ndn/util"; +import { pushable, randomJitter, trackEventListener } from "@ndn/util"; +import { consume, map, tap } from "streaming-iterables"; import type { Promisable } from "type-fest"; import { TypedEventTarget } from "typescript-event-target"; @@ -24,6 +24,7 @@ interface DebugEntry { type EventMap = SyncProtocol.EventMap & { debug: CustomEvent; + rxerror: CustomEvent<[interest: Interest, e: unknown]>; }; /** StateVectorSync participant. */ @@ -54,26 +55,18 @@ export class SvSync extends TypedEventTarget implements SyncProtocol implements SyncProtocol number, private readonly suppressionTimer: () => number, private readonly svs2suppressionPeriod: number | undefined, - private readonly signer: Signer, - private readonly verifier?: Verifier, ) { super(); } + private makeFace( + fw: Forwarder, + syncInterestLifetime: number, + signer: Signer, + verifier?: Verifier, + ): void { + this.face = fw.addFace({ + rx: map(async (interest) => { + this.debug("send"); + interest.canBePrefix = true; + interest.mustBeFresh = true; + interest.lifetime = syncInterestLifetime; + await signer.sign(interest); + return FwPacket.create(interest); + }, this.txStream), + tx: (iterable) => consume(tap(async (pkt) => { + if (!(FwPacket.isEncodable(pkt) && pkt.l3 instanceof Interest)) { + return; + } + const interest = pkt.l3; + try { + await verifier?.verify(interest); + await this.handleSyncInterest(interest); + } catch (err: unknown) { + this.dispatchTypedEvent("rxerror", new CustomEvent<[interest: Interest, e: unknown]>("rxerror", { + detail: [interest, err], + })); + } + }, iterable)), + }, { + describe: this.describe, + routeCapture: false, + }); + this.face.addRoute(this.syncPrefix); + } + private readonly maybeHaveEventListener = trackEventListener(this); - private producer?: Producer; + private face?: FwFace; + private txStream = pushable(); /** * In steady state, undefined. @@ -121,9 +147,10 @@ export class SvSync extends TypedEventTarget implements SyncProtocol { @@ -154,7 +181,7 @@ export class SvSync extends TypedEventTarget implements SyncProtocol { if (n !== undefined) { // setSeqNum requested - if (!this.producer) { // decrement/remove permitted during initialization + if (!this.face) { // decrement/remove permitted during initialization this.own.set(id, n); } else if (n > this.own.get(id)) { // increment only after initialization this.own.set(id, n); @@ -165,8 +192,11 @@ export class SvSync extends TypedEventTarget implements SyncProtocol { - await this.verifier?.verify(interest); + /** + * Handle incoming sync Interest. + * @param interest - Received Interest, signature verified. + */ + private async handleSyncInterest(interest: Interest): Promise { const vComp = interest.name.at(this.syncPrefix.length); let decoder: Decoder; if (vComp.equals(V2) && !!interest.appParameters) { @@ -203,7 +233,7 @@ export class SvSync extends TypedEventTarget implements SyncProtocol implements SyncProtocol 0) { - void this.sendSyncInterest(); + this.sendSyncInterest(); } this.aggregated = undefined; } else { // in steady state this.debug("timer"); - void this.sendSyncInterest(); + this.sendSyncInterest(); } this.resetTimer(); }; - private async sendSyncInterest(): Promise { - this.debug("send"); - + /** Transmit a sync Interest. */ + private sendSyncInterest(): void { const interest = new Interest(); if (this.svs2interest) { interest.name = this.syncPrefix.append(V2); @@ -249,14 +278,9 @@ export class SvSync extends TypedEventTarget implements SyncProtocol