Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple pMapSkip's #52

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export default async function pMap(

const result = [];
const errors = [];
const skippedIndexes = [];
const skippedIndexesMap = new Map();
let isRejected = false;
let isResolved = false;
let isIterableDone = false;
Expand Down Expand Up @@ -59,15 +59,29 @@ export default async function pMap(
if (resolvingCount === 0 && !isResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isResolved = true;
return;
}

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
isResolved = true;

if (!skippedIndexesMap.size) {
resolve(result);
return;
}

const pureResult = [];

// Support multiple pMapSkips
// eslint-disable-next-line unicorn/no-for-loop
ferrinweb marked this conversation as resolved.
Show resolved Hide resolved
for (let index = 0; index < result.length; index++) {
if (skippedIndexesMap.get(index) === pMapSkip) {
continue;
}

pureResult.push(result[index]);
}

resolve(pureResult);
}

return;
Expand All @@ -86,12 +100,13 @@ export default async function pMap(

const value = await mapper(element, index);

// Use Map to stage the index of the element.
if (value === pMapSkip) {
skippedIndexes.push(index);
} else {
result[index] = value;
skippedIndexesMap.set(index, value);
}

result[index] = value;

resolvingCount--;
await next();
} catch (error) {
Expand Down
37 changes: 37 additions & 0 deletions test-multiple-pmapskips-performance.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import test from 'ava';
import inRange from 'in-range';
import timeSpan from 'time-span';
import pMap, {pMapSkip} from './index.js';

function generateSkipPerformanceData(length) {
const data = [];
for (let index = 0; index < length; index++) {
data.push(pMapSkip);
}

return data;
}

test('multiple pMapSkips - algorithmic complexity', async t => {
const testData = [generateSkipPerformanceData(1000), generateSkipPerformanceData(10000), generateSkipPerformanceData(100000)];
const testDurationsMS = [];

for (const data of testData) {
const end = timeSpan();
// eslint-disable-next-line no-await-in-loop
await pMap(data, async value => value);
testDurationsMS.push(end());
}

for (let index = 0; index < testDurationsMS.length - 1; index++) {
// Time for 10x more items should take between 9x and 11x more time
const smallerDuration = testDurationsMS[index];
const longerDuration = testDurationsMS[index + 1];

// The longer test needs to be a little longer and also not 10x more than the
// shorter test. This is not perfect... there is some fluctuation.
// The idea here is to catch a regression that makes pMapSkip handling O(n^2)
// on the number of pMapSkip items in the input
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
}
});
44 changes: 44 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('multiple pMapSkips', async t => {
ferrinweb marked this conversation as resolved.
Show resolved Hide resolved
t.deepEqual(await pMap([
1,
pMapSkip,
2,
pMapSkip,
3,
pMapSkip,
pMapSkip,
4
], async value => value), [1, 2, 3, 4]);
});

test('all pMapSkips', async t => {
t.deepEqual(await pMap([
pMapSkip,
pMapSkip,
pMapSkip,
pMapSkip
], async value => value), []);
});

test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
Expand Down Expand Up @@ -269,6 +291,28 @@ test('asyncIterator - pMapSkip', async t => {
]), async value => value), [1, 2]);
});

test('asyncIterator - multiple pMapSkips', async t => {
t.deepEqual(await pMap(new AsyncTestData([
1,
pMapSkip,
2,
pMapSkip,
3,
pMapSkip,
pMapSkip,
4
]), async value => value), [1, 2, 3, 4]);
});

test('asyncIterator - all pMapSkips', async t => {
t.deepEqual(await pMap(new AsyncTestData([
pMapSkip,
pMapSkip,
pMapSkip,
pMapSkip
]), async value => value), []);
});

test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
Expand Down