Skip to content

Commit

Permalink
Support Iterables (#2)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <[email protected]>
  • Loading branch information
Richienb and sindresorhus authored Jun 29, 2022
1 parent de08673 commit f73a5ab
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 50 deletions.
26 changes: 25 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Asyncify} from 'type-fest';
import {Asyncify, SetReturnType} from 'type-fest';

type AnyFunction = (...arguments_: any) => unknown;

Expand All @@ -22,3 +22,27 @@ console.log(await fn(2));
```
*/
export default function makeAsynchronous<T extends AnyFunction>(function_: T): Asyncify<T>;

type IterableFunctionValue<T> = T extends ((...arguments_: any) => AsyncIterable<infer Value> | Iterable<infer Value>) ? Value : unknown;

/**
Make the iterable returned by a function asynchronous by running it in a worker.
Returns a wrapped version of the given function which executes asynchronously in a background thread (meaning it will not block the main thread).
The given function is serialized, so you cannot use any variables or imports from outside the function scope. You can instead pass in arguments to the function.
@example
```
import {makeAsynchronousIterable} from 'make-asynchronous';
const fn = makeAsynchronousIterable(function * () {
yield * performExpensiveOperation(number);
});
for await (const number of fn(2)) {
console.log(number);
}
```
*/
export function makeAsynchronousIterable<T extends (...arguments_: any) => AsyncIterable<unknown> | Iterable<unknown>>(function_: T): SetReturnType<T, AsyncIterable<IterableFunctionValue<T>>>;
145 changes: 98 additions & 47 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,129 @@
import Worker from 'web-worker';
import {pEvent} from 'p-event';

const isNode = Boolean(globalThis.process?.versions?.node);

const makeBlob = content => new globalThis.Blob([content], {type: 'text/javascript'});

// TODO: Remove this when targeting Node.js 18 (`Blob` global is supported) and if https://github.com/developit/web-worker/issues/30 is fixed.
const makeDataUrl = content => {
const data = globalThis.Buffer.from(content).toString('base64');
return `data:text/javascript;base64,${data}`;
};

function createWorker(content) {
let url;
let worker;

const cleanup = () => {
if (url) {
URL.revokeObjectURL(url);
}

worker?.terminate();
};

if (isNode) {
worker = new Worker(makeDataUrl(content), {type: 'module'});
} else {
url = URL.createObjectURL(makeBlob(content));
worker = new Worker(url, {type: 'module'});
}

return {
worker,
cleanup,
};
}

const makeContent = function_ =>
`
globalThis.onmessage = async event => {
globalThis.onmessage = async ({data: arguments_}) => {
try {
const output = await (${function_.toString()})(...event.data);
const output = await (${function_.toString()})(...arguments_);
globalThis.postMessage({output});
} catch (error) {
globalThis.postMessage({error});
}
};
`;

const makeBlob = function_ => new globalThis.Blob([makeContent(function_)], {type: 'text/javascript'});
export default function makeAsynchronous(function_) {
return async (...arguments_) => {
const {worker, cleanup} = createWorker(makeContent(function_));

// TODO: Remove this when targeting Node.js 18 (`Blob` global is supported) and if https://github.com/developit/web-worker/issues/30 is fixed.
const makeDataUrl = function_ => {
const data = globalThis.Buffer.from(makeContent(function_)).toString('base64');
return `data:text/javascript;base64,${data}`;
};
try {
const promise = pEvent(worker, 'message', {
rejectionEvents: ['error', 'messageerror'],
});

export default function makeAsynchronous(function_) {
return (...arguments_) => new Promise((resolve, reject) => {
let url;
let worker;
worker.postMessage(arguments_);

const cleanup = () => {
if (url) {
URL.revokeObjectURL(url);
}
const {data: {output, error}} = await promise;

worker?.terminate();
};
if (error) {
throw error;
}

const failure = error => {
return output;
} finally {
cleanup();
reject(error);
};
}
};
}

const makeIterableContent = function_ =>
`
const nothing = Symbol('nothing');
let iterator = nothing;
globalThis.onmessage = async ({data: arguments_}) => {
try {
if (isNode) {
worker = new Worker(makeDataUrl(function_), {type: 'module'});
} else {
url = URL.createObjectURL(makeBlob(function_));
worker = new Worker(url, {type: 'module'});
if (iterator === nothing) {
iterator = await (${function_.toString()})(...arguments_);
}
const output = await iterator.next();
globalThis.postMessage({output});
} catch (error) {
failure(error);
return;
globalThis.postMessage({error});
}
};
`;

worker.addEventListener('message', ({data}) => {
if (data.error) {
failure(data.error);
} else {
cleanup();
resolve(data.output);
}
});
export function makeAsynchronousIterable(function_) {
return (...arguments_) => ({
async * [Symbol.asyncIterator]() {
const {worker, cleanup} = createWorker(makeIterableContent(function_));

worker.addEventListener('messageerror', error => {
failure(error);
});
try {
let isFirstMessage = true;

worker.addEventListener('error', error => {
failure(error);
});
while (true) {
const promise = pEvent(worker, 'message', {
rejectionEvents: ['messageerror', 'error'],
});

try {
worker.postMessage(arguments_);
} catch (error) {
failure(error);
}
worker.postMessage(isFirstMessage ? arguments_ : undefined);
isFirstMessage = false;

const {data: {output, error}} = await promise; // eslint-disable-line no-await-in-loop

if (error) {
throw error;
}

const {value, done} = output;

if (done) {
break;
}

yield value;
}
} finally {
cleanup();
}
},
});
}
10 changes: 9 additions & 1 deletion index.test-d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import {expectType} from 'tsd';
import makeAsynchronous from './index.js';
import makeAsynchronous, {makeAsynchronousIterable} from './index.js';

const fn = makeAsynchronous((number: number) => number * 2); // eslint-disable-line @typescript-eslint/no-unsafe-assignment

expectType<Promise<number>>(fn(2));

const fn2 = makeAsynchronousIterable(function * () { // eslint-disable-line @typescript-eslint/no-unsafe-assignment
for (let number = 1; ; number++) {
yield number;
}
});

expectType<AsyncIterable<number>>(fn2());
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"pool"
],
"dependencies": {
"p-event": "^5.0.1",
"type-fest": "^2.14.0",
"web-worker": "^1.2.0"
},
Expand Down
16 changes: 16 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ Returns a wrapped version of the given function which executes asynchronously in

The given function is serialized, so you cannot use any variables or imports from outside the function scope. You can instead pass in arguments to the function.

### makeAsynchronousIterable(function)

Make the iterable returned by a function asynchronous by running it in a worker.

```js
import {makeAsynchronousIterable} from 'make-asynchronous';

const fn = makeAsynchronousIterable(function * () {
yield * performExpensiveOperation(number);
});

for await (const number of fn(2)) {
console.log(number);
}
```

## Related

- [make-synchronous](https://github.com/sindresorhus/make-synchronous) - Make an asynchronous function synchronous
Expand Down
58 changes: 57 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import test from 'ava';
import timeSpan from 'time-span';
import inRange from 'in-range';
import makeAsynchronous from './index.js';
import makeAsynchronous, {makeAsynchronousIterable} from './index.js';

test('main', async t => {
const fixture = {x: '🦄'};
Expand Down Expand Up @@ -44,3 +44,59 @@ test.failing('dynamic import works', async t => {
})(),
);
});

test('iterator object', async t => {
const fixture = [1, 2];

const asyncIterable = makeAsynchronousIterable(fixture => fixture[Symbol.iterator]())(fixture);
const result = [];

for await (const value of asyncIterable) {
result.push(value);
}

t.deepEqual(result, fixture);
});

test('generator function', async t => {
const fixture = [1, 2];

const asyncIterable = makeAsynchronousIterable(function * (fixture) {
for (const value of fixture) {
yield value;
}
})(fixture);

const result = [];

for await (const value of asyncIterable) {
result.push(value);
}

t.deepEqual(result, fixture);
});

test('generator function that throws', async t => {
const fixture = [1, 2];
const errorMessage = 'Catch me if you can!';

const asyncIterable = makeAsynchronousIterable(function * (fixture, errorMessage) {
for (const value of fixture) {
yield value;
}

throw new Error(errorMessage);
})(fixture, errorMessage);

const result = [];

await t.throwsAsync(async () => {
for await (const value of asyncIterable) {
result.push(value);
}
}, {
message: errorMessage,
}, 'error is propagated');

t.deepEqual(result, fixture);
});

0 comments on commit f73a5ab

Please sign in to comment.