Skip to content

Commit

Permalink
events: fix EventTarget support
Browse files Browse the repository at this point in the history
Remove support for multiple arguments (which don't actually work for
EventTarget). Use our EventTarget implementation rather than a mock.

Refactor `once` code in preparation of `on` using shared code for
supporting `on` with `EventTarget`s.

Support EventTarget in the `events.on` static method

PR-URL: #34015
Reviewed-By: Denys Otrishko <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
benjamingr authored and jasnell committed Jun 24, 2020
1 parent 42d932c commit 4b3654e
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 111 deletions.
84 changes: 49 additions & 35 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -623,41 +623,20 @@ function unwrapListeners(arr) {

function once(emitter, name) {
return new Promise((resolve, reject) => {
if (typeof emitter.addEventListener === 'function') {
// EventTarget does not have `error` event semantics like Node
// EventEmitters, we do not listen to `error` events here.
emitter.addEventListener(
name,
(...args) => { resolve(args); },
{ once: true }
);
return;
}

const eventListener = (...args) => {
if (errorListener !== undefined) {
const errorListener = (err) => {
emitter.removeListener(name, resolver);
reject(err);
};
const resolver = (...args) => {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener('error', errorListener);
}
resolve(args);
};
let errorListener;

// Adding an error listener is not optional because
// if an error is thrown on an event emitter we cannot
// guarantee that the actual event we are waiting will
// be fired. The result could be a silent way to create
// memory or file descriptor leaks, which is something
// we should avoid.
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
if (name !== 'error') {
errorListener = (err) => {
emitter.removeListener(name, eventListener);
reject(err);
};

emitter.once('error', errorListener);
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
}

emitter.once(name, eventListener);
});
}

Expand All @@ -668,6 +647,38 @@ function createIterResult(value, done) {
return { value, done };
}

function addErrorHandlerIfEventEmitter(emitter, handler, flags) {
if (typeof emitter.on === 'function') {
eventTargetAgnosticAddListener(emitter, 'error', handler, flags);
}
}

function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener(name, listener);
} else if (typeof emitter.removeEventListener === 'function') {
emitter.removeEventListener(name, listener, flags);
} else {
throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter);
}
}

function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
if (typeof emitter.on === 'function') {
if (flags && flags.once) {
emitter.once(name, listener);
} else {
emitter.on(name, listener);
}
} else if (typeof emitter.addEventListener === 'function') {
// EventTarget does not have `error` event semantics like Node
// EventEmitters, we do not listen to `error` events here.
emitter.addEventListener(name, (arg) => { listener(arg); }, flags);
} else {
throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter);
}
}

function on(emitter, event) {
const unconsumedEvents = [];
const unconsumedPromises = [];
Expand Down Expand Up @@ -704,8 +715,8 @@ function on(emitter, event) {
},

return() {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
finished = true;

for (const promise of unconsumedPromises) {
Expand All @@ -721,17 +732,20 @@ function on(emitter, event) {
'Error', err);
}
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
},

[SymbolAsyncIterator]() {
return this;
}
}, AsyncIteratorPrototype);

emitter.on(event, eventHandler);
emitter.on('error', errorHandler);
eventTargetAgnosticAddListener(emitter, event, eventHandler);
if (event !== 'error') {
addErrorHandlerIfEventEmitter(emitter, errorHandler);
}


return iterator;

Expand Down
50 changes: 49 additions & 1 deletion test/parallel/test-event-on-async-iterator.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
// Flags: --expose-internals
'use strict';

const common = require('../common');
const assert = require('assert');
const { on, EventEmitter } = require('events');
const {
EventTarget,
NodeEventTarget,
Event
} = require('internal/event_target');

async function basic() {
const ee = new EventEmitter();
Expand Down Expand Up @@ -204,6 +210,45 @@ async function iterableThrow() {
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function eventTarget() {
const et = new EventTarget();
const tick = () => et.dispatchEvent(new Event('tick'));
const interval = setInterval(tick, 0);
let count = 0;
for await (const [ event ] of on(et, 'tick')) {
count++;
assert.strictEqual(event.type, 'tick');
if (count >= 5) {
break;
}
}
assert.strictEqual(count, 5);
clearInterval(interval);
}

async function errorListenerCount() {
const et = new EventEmitter();
on(et, 'foo');
assert.strictEqual(et.listenerCount('error'), 1);
}

async function nodeEventTarget() {
const et = new NodeEventTarget();
const tick = () => et.dispatchEvent(new Event('tick'));
const interval = setInterval(tick, 0);
let count = 0;
for await (const [ event] of on(et, 'tick')) {
count++;
assert.strictEqual(event.type, 'tick');
if (count >= 5) {
break;
}
}
assert.strictEqual(count, 5);
clearInterval(interval);
}


async function run() {
const funcs = [
basic,
Expand All @@ -212,7 +257,10 @@ async function run() {
throwInLoop,
next,
nextError,
iterableThrow
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget
];

for (const fn of funcs) {
Expand Down
98 changes: 23 additions & 75 deletions test/parallel/test-events-once.js
Original file line number Diff line number Diff line change
@@ -1,55 +1,10 @@
'use strict';
// Flags: --expose-internals

const common = require('../common');
const { once, EventEmitter } = require('events');
const { strictEqual, deepStrictEqual } = require('assert');

class EventTargetMock {
constructor() {
this.events = {};
}

addEventListener = common.mustCall(function(name, listener, options) {
if (!(name in this.events)) {
this.events[name] = { listeners: [], options };
}
this.events[name].listeners.push(listener);
});

removeEventListener = common.mustCall(function(name, callback) {
if (!(name in this.events)) {
return;
}
const event = this.events[name];
const stack = event.listeners;

for (let i = 0, l = stack.length; i < l; i++) {
if (stack[i] === callback) {
stack.splice(i, 1);
if (stack.length === 0) {
Reflect.deleteProperty(this.events, name);
}
return;
}
}
});

dispatchEvent = function(name, ...arg) {
if (!(name in this.events)) {
return true;
}
const event = this.events[name];
const stack = event.listeners.slice();

for (let i = 0, l = stack.length; i < l; i++) {
stack[i].apply(this, arg);
if (event.options.once) {
this.removeEventListener(name, stack[i]);
}
}
return !name.defaultPrevented;
};
}
const { strictEqual, deepStrictEqual, fail } = require('assert');
const { EventTarget, Event } = require('internal/event_target');

async function onceAnEvent() {
const ee = new EventEmitter();
Expand Down Expand Up @@ -104,8 +59,6 @@ async function stopListeningAfterCatchingError() {
ee.emit('myevent', 42, 24);
});

process.on('multipleResolves', common.mustNotCall());

try {
await once(ee, 'myevent');
} catch (_e) {
Expand All @@ -125,54 +78,49 @@ async function onceError() {
ee.emit('error', expected);
});

const [err] = await once(ee, 'error');
const promise = once(ee, 'error');
strictEqual(ee.listenerCount('error'), 1);
const [ err ] = await promise;
strictEqual(err, expected);
strictEqual(ee.listenerCount('error'), 0);
strictEqual(ee.listenerCount('myevent'), 0);
}

async function onceWithEventTarget() {
const et = new EventTargetMock();

const et = new EventTarget();
const event = new Event('myevent');
process.nextTick(() => {
et.dispatchEvent('myevent', 42);
et.dispatchEvent(event);
});
const [ value ] = await once(et, 'myevent');
strictEqual(value, 42);
strictEqual(Reflect.has(et.events, 'myevent'), false);
}

async function onceWithEventTargetTwoArgs() {
const et = new EventTargetMock();

process.nextTick(() => {
et.dispatchEvent('myevent', 42, 24);
});

const value = await once(et, 'myevent');
deepStrictEqual(value, [42, 24]);
strictEqual(value, event);
}

async function onceWithEventTargetError() {
const et = new EventTargetMock();

const expected = new Error('kaboom');
const et = new EventTarget();
const error = new Event('error');
process.nextTick(() => {
et.dispatchEvent('error', expected);
et.dispatchEvent(error);
});

const [err] = await once(et, 'error');
strictEqual(err, expected);
strictEqual(Reflect.has(et.events, 'error'), false);
const [ err ] = await once(et, 'error');
strictEqual(err, error);
}

async function prioritizesEventEmitter() {
const ee = new EventEmitter();
ee.addEventListener = fail;
ee.removeAllListeners = fail;
process.nextTick(() => ee.emit('foo'));
await once(ee, 'foo');
}
Promise.all([
onceAnEvent(),
onceAnEventWithTwoArgs(),
catchesErrors(),
stopListeningAfterCatchingError(),
onceError(),
onceWithEventTarget(),
onceWithEventTargetTwoArgs(),
onceWithEventTargetError(),
prioritizesEventEmitter(),
]).then(common.mustCall());

0 comments on commit 4b3654e

Please sign in to comment.