Skip to content

Commit

Permalink
Fix Subscriber model when removed within emit callback.
Browse files Browse the repository at this point in the history
  • Loading branch information
ricmoo committed Feb 4, 2023
1 parent 32b1e78 commit d0ed918
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 59 deletions.
2 changes: 2 additions & 0 deletions src.ts/providers/abstract-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Sub = {
nameMap: Map<string, string>
addressableMap: WeakMap<Addressable, string>;
listeners: Array<{ listener: Listener, once: boolean }>;
// @TODO: get rid of this, as it is (and has to be)
// tracked in subscriber
started: boolean;
subscriber: Subscriber;
};
Expand Down
9 changes: 9 additions & 0 deletions src.ts/providers/subscriber-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ export class BlockConnectionSubscriber implements Subscriber {
#provider: ConnectionRpcProvider;
#blockNumber: number;

#running: boolean;

#filterId: null | number;

constructor(provider: ConnectionRpcProvider) {
this.#provider = provider;
this.#blockNumber = -2;
this.#running = false;
this.#filterId = null;
}

start(): void {
if (this.#running) { return; }
this.#running = true;

this.#filterId = this.#provider._subscribe([ "newHeads" ], (result: any) => {
const blockNumber = getNumber(result.number);
const initial = (this.#blockNumber === -2) ? blockNumber: (this.#blockNumber + 1)
Expand All @@ -47,6 +53,9 @@ export class BlockConnectionSubscriber implements Subscriber {
}

stop(): void {
if (!this.#running) { return; }
this.#running = false;

if (this.#filterId != null) {
this.#provider._unsubscribe(this.#filterId);
this.#filterId = null;
Expand Down
14 changes: 13 additions & 1 deletion src.ts/providers/subscriber-filterid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export class FilterIdSubscriber implements Subscriber {
#filterIdPromise: null | Promise<string>;
#poller: (b: number) => Promise<void>;

#running: boolean;

#network: null | Network;

#hault: boolean;
Expand All @@ -36,6 +38,8 @@ export class FilterIdSubscriber implements Subscriber {
this.#filterIdPromise = null;
this.#poller = this.#poll.bind(this);

this.#running = false;

this.#network = null;

this.#hault = false;
Expand Down Expand Up @@ -91,9 +95,17 @@ export class FilterIdSubscriber implements Subscriber {
}
}

start(): void { this.#poll(-2); }
start(): void {
if (this.#running) { return; }
this.#running = true;

this.#poll(-2);
}

stop(): void {
if (!this.#running) { return; }
this.#running = false;

this.#hault = true;
this.#teardown();
this.#provider.off("block", this.#poller);
Expand Down
21 changes: 19 additions & 2 deletions src.ts/providers/subscriber-polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ export class PollingBlockSubscriber implements Subscriber {
}

start(): void {
if (this.#poller) { throw new Error("subscriber already running"); }
if (this.#poller) { return; }
this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
this.#poll();
}

stop(): void {
if (!this.#poller) { throw new Error("subscriber not running"); }
if (!this.#poller) { return; }
this.#provider._clearTimeout(this.#poller);
this.#poller = null;
}
Expand All @@ -105,9 +105,11 @@ export class PollingBlockSubscriber implements Subscriber {
export class OnBlockSubscriber implements Subscriber {
#provider: AbstractProvider;
#poll: (b: number) => void;
#running: boolean;

constructor(provider: AbstractProvider) {
this.#provider = provider;
this.#running = false;
this.#poll = (blockNumber: number) => {
this._poll(blockNumber, this.#provider);
}
Expand All @@ -118,11 +120,17 @@ export class OnBlockSubscriber implements Subscriber {
}

start(): void {
if (this.#running) { return; }
this.#running = true;

this.#poll(-2);
this.#provider.on("block", this.#poll);
}

stop(): void {
if (!this.#running) { return; }
this.#running = false;

this.#provider.off("block", this.#poll);
}

Expand Down Expand Up @@ -178,6 +186,8 @@ export class PollingEventSubscriber implements Subscriber {
#filter: EventFilter;
#poller: (b: number) => void;

#running: boolean;

// The most recent block we have scanned for events. The value -2
// indicates we still need to fetch an initial block number
#blockNumber: number;
Expand All @@ -186,6 +196,7 @@ export class PollingEventSubscriber implements Subscriber {
this.#provider = provider;
this.#filter = copy(filter);
this.#poller = this.#poll.bind(this);
this.#running = false;
this.#blockNumber = -2;
}

Expand Down Expand Up @@ -215,6 +226,9 @@ export class PollingEventSubscriber implements Subscriber {
}

start(): void {
if (this.#running) { return; }
this.#running = true;

if (this.#blockNumber === -2) {
this.#provider.getBlockNumber().then((blockNumber) => {
this.#blockNumber = blockNumber;
Expand All @@ -224,6 +238,9 @@ export class PollingEventSubscriber implements Subscriber {
}

stop(): void {
if (!this.#running) { return; }
this.#running = false;

this.#provider.off("block", this.#poller);
}

Expand Down
56 changes: 0 additions & 56 deletions src.ts/providers/subscriber.ts

This file was deleted.

0 comments on commit d0ed918

Please sign in to comment.