looking for async.eachLimit equivalent? #79

Closed
dweinstein opened this issue May 14, 2014 · 13 comments

Comments

@dweinstein

What's the proper way to protect a limited resource (like the number of open fds/sockets) in a map?

_(key_stream, 'string').map(function (key) {
   // operation that performs an HTTP request and returns a Promise
}).each(function (res) {
 res.then(console.log);
});

The use case here is for each key in an S3 bucket I'd like to perform an operation. I'm treating the key list as a stream of values and then I perform some request, like perhaps retrieving their headers as stored in the S3 bucket.

Is parallel the appropriate way to prevent the following error?

Possibly unhandled Error: connect EMFILE
    at errnoException (net.js:901:11)
    at connect (net.js:764:19)
    at net.js:842:9
    at asyncCallback (dns.js:68:16)
    at Object.onanswer [as oncomplete] (dns.js:121:9)

I think with async I would use async.eachLimit, because the function inside the map returns right away, which lets the next socket connect before the previous request has finished.

@dweinstein
Author

So I was using the knox library and I had not realized they disabled the default http agent as described in this issue. Once I re-enabled the agent things worked fine. So this wasn't really a highland issue.

(hope this helps someone else out though)
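
For anyone landing here with the same EMFILE error, here is a minimal sketch of what a shared agent looks like in plain Node. The host name and the limit of 4 are made up for illustration, and how knox wires up its own agent is not shown here; this only uses Node's `http.Agent`, whose `maxSockets` option caps concurrent sockets per host.

```javascript
var http = require('http');

// A shared agent pools sockets; maxSockets caps the number of
// concurrent connections per host, so extra requests wait in a queue.
var agent = new http.Agent({ keepAlive: true, maxSockets: 4 });

// Hypothetical request options (placeholder host). Passing `agent`
// makes requests built from these options go through the pool above.
var options = {
  host: 'example.invalid',
  path: '/',
  method: 'HEAD',
  agent: agent
};
```

With `agent: false` each request gets its own one-off connection, so nothing bounds the number of open sockets.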

@caolan
Owner

caolan commented May 16, 2014

Just some background on this issue, and mapLimit equivalents in Highland:

In Highland, .map has no opinion on parallelism. This was one of the main headaches in async, where you could not compose execution order with the thing you wanted to do, so we ended up with map, mapSeries, mapLimit. In Highland, it depends on how you read from the stream. So, by combining a .map() stream with a .series(), .parallel(n) or .merge() call you can affect its execution order. You'll notice that .parallel() takes a limit argument, effectively making mapLimit the default parallel execution operation. It turns out unbounded parallelism wasn't a great idea as the default execution method.

As a rough guide:

.map().series() is equivalent to async.mapSeries()
.map().parallel(n) is equivalent to async.mapLimit(limit=n)
.map().merge() is equivalent to async.map()
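
To make the three modes concrete, here is a plain-JavaScript sketch of the limit semantics. It is not Highland or async code: `mapLimit` and `fakeRequest` are hypothetical names written for this illustration, with a timer standing in for an HTTP request. A limit of 1 behaves like the series case, a limit of n like the bounded case, and `Infinity` like the unbounded case.

```javascript
// Run `worker` over `items` with at most `limit` calls in flight.
// Results keep the input order. Rejects on the first worker failure.
function mapLimit(items, limit, worker) {
  return new Promise(function (resolve, reject) {
    var results = new Array(items.length);
    var next = 0;     // index of the next item to start
    var active = 0;   // calls currently in flight
    var done = 0;     // calls that have completed
    var failed = false;

    if (items.length === 0) { return resolve(results); }

    function launch() {
      while (!failed && active < limit && next < items.length) {
        (function (i) {
          active++;
          Promise.resolve()
            .then(function () { return worker(items[i]); })
            .then(function (value) {
              active--;
              results[i] = value;
              done++;
              if (done === items.length) { resolve(results); }
              else { launch(); }
            }, function (err) {
              failed = true;
              reject(err);
            });
        })(next++);
      }
    }
    launch();
  });
}

// Stand-in for a request: records how many calls overlap.
var inFlight = 0;
var peak = 0;
function fakeRequest(x) {
  inFlight++;
  peak = Math.max(peak, inFlight);
  return new Promise(function (resolve) {
    setTimeout(function () { inFlight--; resolve(x + 2); }, 10);
  });
}

var demo = mapLimit([1, 2, 3, 4], 2, fakeRequest);
demo.then(function (out) { console.log(out, 'peak in flight:', peak); });
```

The point of the counter is that `peak` never exceeds the limit, which is what protects a scarce resource such as sockets.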

Hope that helps :)

@dweinstein
Author

So is this expected behavior with the .map().series() or .parallel(n) etc construct?

> h([1,2,3,4]).map(function (x) { return x + 2; }).parallel(2).each(function (x) { console.log(x); })
TypeError: Object 3 has no method 'consume'
    at /Users/user/node_modules/highland/lib/index.js:2102:19
    at /Users/user/node_modules/highland/lib/index.js:1225:13
    at Stream.s._send (/Users/user/node_modules/highland/lib/index.js:992:9)
    at Stream.write (/Users/user/node_modules/highland/lib/index.js:1063:18)
    at Stream._send (/Users/user/node_modules/highland/lib/index.js:566:19)
    at push (/Users/user/node_modules/highland/lib/index.js:962:19)
    at /Users/user/node_modules/highland/lib/index.js:1747:13
    at Stream.s._send (/Users/user/node_modules/highland/lib/index.js:992:9)
    at Stream.write (/Users/user/node_modules/highland/lib/index.js:1063:18)
    at Stream._send (/Users/user/node_modules/highland/lib/index.js:566:19)

I mean it's a silly example but intuitively it seemed like it should work?

@dweinstein
Author

@caolan so am I just doing it wrong?

@greelgorke
Collaborator

@dweinstein parallel works with a stream of streams. h([1,2,3,4]) is a stream of numbers. Do you have a stream of streams in your real example? Anyway, this works:

h([1,2,3,4])
  .map(function (x) { return x + 2; })
  // map each number to a Highland stream holding that single value
  .map(function (x) { return h([x]); })
  .parallel(2)
  .each(h.log);

@caolan
Owner

caolan commented May 23, 2014

What @greelgorke said :) - though we should probably make that a nicer error message! (pull requests welcome for that)

@greelgorke
Collaborator

i could work on the error messages, after i've finished the other topics

@caolan
Owner

caolan commented May 27, 2014

@greelgorke that would be great! :)

@greelgorke
Collaborator

@caolan a nicer error message would mean we throw if a value pulled in .parallel() isn't a Highland stream? I'm not sure about that, because it happens at flow time. The other option would be to push an error down the pipeline, which is what I would expect.

@caolan
Owner

caolan commented May 27, 2014

@greelgorke good point, errors that happen at flow time should be passed down the pipeline. We currently don't wrap iterator calls (e.g. map, filter...) in try/catch to pass those sync errors down the pipeline, but we probably should!
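
As a rough illustration of the idea, here is a plain-JavaScript sketch of wrapping an iterator call in try/catch so that a synchronous throw becomes an error entry in the output rather than an exception that escapes. `mapWithErrors` is a hypothetical helper, not Highland's implementation, and the `{ value }` / `{ error }` shape is an assumption made for this example.

```javascript
// Apply `fn` to each value. A thrown exception is captured as an
// { error } entry in sequence instead of aborting the whole run.
function mapWithErrors(values, fn) {
  var out = [];
  values.forEach(function (x) {
    try {
      out.push({ value: fn(x) });
    } catch (err) {
      out.push({ error: err });
    }
  });
  return out;
}

var events = mapWithErrors([1, 'oops', 3], function (x) {
  if (typeof x !== 'number') {
    throw new TypeError('expected a number, got ' + x);
  }
  return x + 2;
});

events.forEach(function (e) {
  if (e.error) { console.log('error:', e.error.message); }
  else { console.log('value:', e.value); }
});
```

The values after the bad input are still processed, which is the behaviour you would want from an error that travels down the pipeline.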

@greelgorke
Collaborator

Well, my approach is to check that the value is a Highland stream (or feature-detect, but that's less reliable in our case) and to produce an error if it fails the check.

@caolan
Owner

caolan commented May 27, 2014

@greelgorke yes, that makes sense +1. I was talking about the more general case of 'flow-time' errors.

@greelgorke
Collaborator

I'm moving this discussion to #94 :)
