Skip to content

Commit

Permalink
svs: protect concurrent publish() with mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jun 29, 2024
1 parent b30c565 commit ecf57ab
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pkg/svs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"streaming-iterables": "^8.0.1",
"tslib": "^2.6.3",
"type-fest": "^4.20.1",
"typescript-event-target": "^1.1.1"
"typescript-event-target": "^1.1.1",
"wait-your-turn": "^1.0.1"
},
"devDependencies": {
"@ndn/keychain": "workspace:*",
Expand Down
38 changes: 26 additions & 12 deletions pkg/svs/src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import { BufferChunkSource, type ChunkOptions, DataProducer } from "@ndn/segment
import type { SyncNode } from "@ndn/sync-api";
import { Encoder } from "@ndn/tlv";
import { Closer } from "@ndn/util";
import { collect, map } from "streaming-iterables";
import { collect, parallelMap } from "streaming-iterables";
import { Mutex } from "wait-your-turn";

import { ContentTypeEncap, MappingKeyword, TT, Version0 } from "./an";
import { MappingEntry } from "./mapping-entry";
Expand Down Expand Up @@ -52,6 +53,7 @@ export class SvPublisher {
}

private readonly node: SyncNode<Name>;
private readonly nodeMutex = new Mutex();
private readonly nodeSyncPrefix: Name;
private readonly store: SvPublisher.DataStore;
private readonly chunkOptions: ChunkOptions;
Expand Down Expand Up @@ -90,30 +92,37 @@ export class SvPublisher {
name.append(Version0),
{ signer: this.innerSigner },
));
const finalBlockId = inner.at(-1)!.name.get(-1)!;
// later steps need mutex so that concurrent publishes won't have same seqNum
return this.nodeMutex.use(() => this.publishInner(name, inner, entry));
}

private async publishInner(name: Name, inner: readonly Data[], entry: MappingEntry): Promise<number> {
const seqNum = this.node.seqNum + 1;
const seqNumComp = GenericNumber.create(seqNum);

entry.seqNum = seqNum;
entry.name = name;
const mapping = new Data(this.nodeSyncPrefix.append(MappingKeyword, seqNumComp), Encoder.encode(entry));
const mapping = new Data();
mapping.name = this.nodeSyncPrefix.append(MappingKeyword, seqNumComp);
mapping.content = Encoder.encode(entry);
await nullSigner.sign(mapping);
// single-entry mapping is inserted into DataStore but never served to subscribers directly;
// it is for use by handleMapping only

const outer = map(async (data) => {
const encap = new Data(
this.nodeSyncPrefix.append(seqNumComp, Version0, data.name.get(-1)!),
Data.ContentType(ContentTypeEncap),
Data.FreshnessPeriod(60000),
Encoder.encode(data),
);
const finalBlockId = inner.at(-1)!.name.get(-1)!;
const outer = parallelMap(16, async (data) => {
const encap = new Data();
encap.name = this.nodeSyncPrefix.append(seqNumComp, Version0, data.name.get(-1)!);
encap.contentType = ContentTypeEncap;
encap.freshnessPeriod = 60000;
encap.finalBlockId = finalBlockId;
encap.content = Encoder.encode(data);
await this.outerSigner.sign(encap);
return encap;
}, inner);

await this.store.insert(mapping, outer);
this.node.seqNum = seqNum;
this.node.seqNum = seqNum; // triggers sync Interest
return seqNum;
}

Expand Down Expand Up @@ -183,7 +192,12 @@ export namespace SvPublisher {
*/
sync: SvSync;

/** Publisher node ID. */
/**
* Publisher node ID.
*
* @remarks
* Each publisher must have a unique node ID.
*/
id: Name;

/** Data repository used for this publisher. */
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/src/closers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ export namespace Closer {
/** Close or dispose an object. */
export function close(c: any): Promisable<void> {
for (const key of ["close", Symbol.dispose, Symbol.asyncDispose] as const) {
if (typeof c[key] === "function") {
if (typeof c?.[key] === "function") {
return c[key]();
}
}
}

/** Convert a closable object to AsyncDisposable. */
export function asAsyncDisposable(c: Closer | Disposable | AsyncDisposable): AsyncDisposable {
if (typeof (c as any)[Symbol.asyncDispose] === "function") {
if (typeof (c as AsyncDisposable)[Symbol.asyncDispose] === "function") {
return c as AsyncDisposable;
}
return {
Expand Down

0 comments on commit ecf57ab

Please sign in to comment.