From 137f53c7b72ff9cb36694d058136344076661f4a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 29 Jan 2016 14:18:27 +0100 Subject: [PATCH] dgram: support dgram.send with multiple buffers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added ability to dgram.send to send multiple buffers, _writev style. The offset and length parameters in dgram.send are now optional. Refactored the dgram benchmarks, and seperated them from net. Added docs for the new signature. Reviewed-By: James M Snell Reviewed-By: Saúl Ibarra Corretgé Fixes: https://github.com/nodejs/node/issues/4302 PR-URL: https://github.com/nodejs/node/pull/4374 --- Makefile | 5 +- benchmark/dgram/array-vs-concat.js | 80 +++++++++ benchmark/dgram/multi-buffer.js | 70 ++++++++ .../{net/dgram.js => dgram/offset-length.js} | 7 +- benchmark/dgram/single-buffer.js | 62 +++++++ doc/api/dgram.markdown | 26 ++- lib/dgram.js | 170 +++++++++++------- src/udp_wrap.cc | 64 +++++-- test/parallel/test-dgram-oob-buffer.js | 20 --- .../parallel/test-dgram-send-bad-arguments.js | 11 +- .../test-dgram-send-callback-buffer.js | 21 +++ .../test-dgram-send-callback-multi-buffer.js | 36 ++++ 12 files changed, 455 insertions(+), 117 deletions(-) create mode 100644 benchmark/dgram/array-vs-concat.js create mode 100644 benchmark/dgram/multi-buffer.js rename benchmark/{net/dgram.js => dgram/offset-length.js} (88%) create mode 100644 benchmark/dgram/single-buffer.js create mode 100644 test/parallel/test-dgram-send-callback-buffer.js create mode 100644 test/parallel/test-dgram-send-callback-multi-buffer.js diff --git a/Makefile b/Makefile index 5bfb80119c3951..d43b092cc4f52a 100644 --- a/Makefile +++ b/Makefile @@ -501,7 +501,10 @@ bench-events: all bench-util: all @$(NODE) benchmark/common.js util -bench-all: bench bench-misc bench-array bench-buffer bench-url bench-events +bench-dgram: all + @$(NODE) benchmark/common.js dgram + +bench-all: bench bench-misc bench-array bench-buffer bench-url bench-events bench-dgram bench-util bench: bench-net bench-http bench-fs bench-tls diff --git a/benchmark/dgram/array-vs-concat.js b/benchmark/dgram/array-vs-concat.js new file mode 100644 index 00000000000000..0be7c70e0859da --- /dev/null +++ b/benchmark/dgram/array-vs-concat.js @@ -0,0 +1,80 @@ +// test UDP send throughput with the multi buffer API against Buffer.concat +'use strict'; + +const common = require('../common.js'); +const PORT = common.PORT; + +// `num` is the number of send requests to queue up each time. +// Keep it reasonably high (>10) otherwise you're benchmarking the speed of +// event loop cycles more than anything else. +var bench = common.createBenchmark(main, { + len: [64, 256, 512, 1024], + num: [100], + chunks: [1, 2, 4, 8], + type: ['concat', 'multi'], + dur: [5] +}); + +var dur; +var len; +var num; +var type; +var chunk; +var chunks; +var encoding; + +function main(conf) { + dur = +conf.dur; + len = +conf.len; + num = +conf.num; + type = conf.type; + chunks = +conf.chunks; + + chunk = [] + for (var i = 0; i < chunks; i++) { + chunk.push(new Buffer(Math.round(len / chunks))); + } + + server(); +} + +var dgram = require('dgram'); + +function server() { + var sent = 0; + var received = 0; + var socket = dgram.createSocket('udp4'); + + var onsend = type === 'concat' ? onsendConcat : onsendMulti; + + function onsendConcat() { + if (sent++ % num == 0) + for (var i = 0; i < num; i++) { + socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend); + } + } + + function onsendMulti() { + if (sent++ % num == 0) + for (var i = 0; i < num; i++) { + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + } + + socket.on('listening', function() { + bench.start(); + onsend(); + + setTimeout(function() { + var bytes = sent * len; + var gbits = (bytes * 8) / (1024 * 1024 * 1024); + bench.end(gbits); + }, dur * 1000); + }); + + socket.on('message', function(buf, rinfo) { + received++; + }); + + socket.bind(PORT); +} diff --git a/benchmark/dgram/multi-buffer.js b/benchmark/dgram/multi-buffer.js new file mode 100644 index 00000000000000..37fb5d1a8ec219 --- /dev/null +++ b/benchmark/dgram/multi-buffer.js @@ -0,0 +1,70 @@ +// test UDP send/recv throughput with the multi buffer API +'use strict'; + +const common = require('../common.js'); +const PORT = common.PORT; + +// `num` is the number of send requests to queue up each time. +// Keep it reasonably high (>10) otherwise you're benchmarking the speed of +// event loop cycles more than anything else. +var bench = common.createBenchmark(main, { + len: [64, 256, 1024], + num: [100], + chunks: [1, 2, 4, 8], + type: ['send', 'recv'], + dur: [5] +}); + +var dur; +var len; +var num; +var type; +var chunk; +var chunks; +var encoding; + +function main(conf) { + dur = +conf.dur; + len = +conf.len; + num = +conf.num; + type = conf.type; + chunks = +conf.chunks; + + chunk = [] + for (var i = 0; i < chunks; i++) { + chunk.push(new Buffer(Math.round(len / chunks))); + } + + server(); +} + +var dgram = require('dgram'); + +function server() { + var sent = 0; + var received = 0; + var socket = dgram.createSocket('udp4'); + + function onsend() { + if (sent++ % num == 0) + for (var i = 0; i < num; i++) + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + + socket.on('listening', function() { + bench.start(); + onsend(); + + setTimeout(function() { + var bytes = (type === 'send' ? sent : received) * len; + var gbits = (bytes * 8) / (1024 * 1024 * 1024); + bench.end(gbits); + }, dur * 1000); + }); + + socket.on('message', function(buf, rinfo) { + received++; + }); + + socket.bind(PORT); +} diff --git a/benchmark/net/dgram.js b/benchmark/dgram/offset-length.js similarity index 88% rename from benchmark/net/dgram.js rename to benchmark/dgram/offset-length.js index 6a0c5501c62c8e..4c3b11b58e6637 100644 --- a/benchmark/net/dgram.js +++ b/benchmark/dgram/offset-length.js @@ -1,7 +1,8 @@ -// test UDP send/recv throughput +// test UDP send/recv throughput with the "old" offset/length API +'use strict'; -var common = require('../common.js'); -var PORT = common.PORT; +const common = require('../common.js'); +const PORT = common.PORT; // `num` is the number of send requests to queue up each time. // Keep it reasonably high (>10) otherwise you're benchmarking the speed of diff --git a/benchmark/dgram/single-buffer.js b/benchmark/dgram/single-buffer.js new file mode 100644 index 00000000000000..d44ea4fd83ffbf --- /dev/null +++ b/benchmark/dgram/single-buffer.js @@ -0,0 +1,62 @@ +// test UDP send/recv throughput with the new single Buffer API +'use strict'; + +const common = require('../common.js'); +const PORT = common.PORT; + +// `num` is the number of send requests to queue up each time. +// Keep it reasonably high (>10) otherwise you're benchmarking the speed of +// event loop cycles more than anything else. +var bench = common.createBenchmark(main, { + len: [1, 64, 256, 1024], + num: [100], + type: ['send', 'recv'], + dur: [5] +}); + +var dur; +var len; +var num; +var type; +var chunk; +var encoding; + +function main(conf) { + dur = +conf.dur; + len = +conf.len; + num = +conf.num; + type = conf.type; + chunk = new Buffer(len); + server(); +} + +var dgram = require('dgram'); + +function server() { + var sent = 0; + var received = 0; + var socket = dgram.createSocket('udp4'); + + function onsend() { + if (sent++ % num == 0) + for (var i = 0; i < num; i++) + socket.send(chunk, PORT, '127.0.0.1', onsend); + } + + socket.on('listening', function() { + bench.start(); + onsend(); + + setTimeout(function() { + var bytes = (type === 'send' ? sent : received) * chunk.length; + var gbits = (bytes * 8) / (1024 * 1024 * 1024); + bench.end(gbits); + }, dur * 1000); + }); + + socket.on('message', function(buf, rinfo) { + received++; + }); + + socket.bind(PORT); +} diff --git a/doc/api/dgram.markdown b/doc/api/dgram.markdown index 7818e508d24524..8408321c384e07 100644 --- a/doc/api/dgram.markdown +++ b/doc/api/dgram.markdown @@ -185,9 +185,10 @@ never have reason to call this. If `multicastInterface` is not specified, the operating system will attempt to drop membership on all valid interfaces. -### socket.send(buf, offset, length, port, address[, callback]) +### socket.send(buf, [offset, length,] port, address[, callback]) -* `buf` Buffer object or string. Message to be sent +* `buf` Buffer object, string, or an array of either. Message to be + sent. * `offset` Integer. Offset in the buffer where the message starts. * `length` Integer. Number of bytes in the message. * `port` Integer. Destination port. @@ -224,17 +225,36 @@ The only way to know for sure that the datagram has been sent is by using a passed as the first argument to the `callback`. If a `callback` is not given, the error is emitted as an `'error'` event on the `socket` object. +Offset and length are optional, but if you specify one you would need to +specify the other. Also, they are supported only when the first +argument is a `Buffer`. + Example of sending a UDP packet to a random port on `localhost`; ```js const dgram = require('dgram'); const message = new Buffer('Some bytes'); const client = dgram.createSocket('udp4'); -client.send(message, 0, message.length, 41234, 'localhost', (err) => { +client.send(message, 41234, 'localhost', (err) => { + client.close(); +}); +``` + +Example of sending a UDP packet composed of multiple buffers to a random port on `localhost`; + +```js +const dgram = require('dgram'); +const buf1 = new Buffer('Some '); +const buf2 = new Buffer('bytes'); +const client = dgram.createSocket('udp4'); +client.send([buf1, buf2], 41234, 'localhost', (err) => { client.close(); }); ``` +Sending multiple buffers might be faster or slower depending on your +application and operating system: benchmark it. Usually it is faster. + **A Note about UDP datagram size** The maximum size of an `IPv4/v6` datagram depends on the `MTU` diff --git a/lib/dgram.js b/lib/dgram.js index c070475a224d31..c87afe5044613a 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -243,6 +243,49 @@ Socket.prototype.sendto = function(buffer, }; +function sliceBuffer(buffer, offset, length) { + if (typeof buffer === 'string') + buffer = new Buffer(buffer); + else if (!(buffer instanceof Buffer)) + throw new TypeError('First argument must be a buffer or string'); + + offset = offset >>> 0; + length = length >>> 0; + + return buffer.slice(offset, offset + length); +} + + +function fixBuffer(buffer) { + for (var i = 0, l = buffer.length; i < l; i++) { + var buf = buffer[i]; + if (typeof buf === 'string') + buffer[i] = new Buffer(buf); + else if (!(buf instanceof Buffer)) + return false; + } + + return true; +} + + +function enqueue(self, toEnqueue) { + // If the send queue hasn't been initialized yet, do it, and install an + // event handler that flushes the send queue after binding is done. + if (!self._sendQueue) { + self._sendQueue = []; + self.once('listening', function() { + // Flush the send queue. + for (var i = 0; i < this._sendQueue.length; i++) + this.send.apply(self, this._sendQueue[i]); + this._sendQueue = undefined; + }); + } + self._sendQueue.push(toEnqueue); + return; +} + + Socket.prototype.send = function(buffer, offset, length, @@ -251,30 +294,29 @@ Socket.prototype.send = function(buffer, callback) { var self = this; - if (typeof buffer === 'string') - buffer = new Buffer(buffer); - else if (!(buffer instanceof Buffer)) - throw new TypeError('First argument must be a buffer or string'); - - offset = offset | 0; - if (offset < 0) - throw new RangeError('Offset should be >= 0'); - - if ((length == 0 && offset > buffer.length) || - (length > 0 && offset >= buffer.length)) - throw new RangeError('Offset into buffer is too large'); - - // Sending a zero-length datagram is kind of pointless but it _is_ - // allowed, hence check that length >= 0 rather than > 0. - length = length | 0; - if (length < 0) - throw new RangeError('Length should be >= 0'); + // same as arguments.length === 5 || arguments.length === 6 + if (address) { + buffer = sliceBuffer(buffer, offset, length); + } else { + callback = port; + port = offset; + address = length; + } - if (offset + length > buffer.length) - throw new RangeError('Offset + length beyond buffer length'); + if (!Array.isArray(buffer)) { + if (typeof buffer === 'string') { + buffer = [ new Buffer(buffer) ]; + } else if (!(buffer instanceof Buffer)) { + throw new TypeError('First argument must be a buffer or a string'); + } else { + buffer = [ buffer ]; + } + } else if (!fixBuffer(buffer)) { + throw new TypeError('Buffer list arguments must be buffers or strings'); + } - port = port | 0; - if (port <= 0 || port > 65535) + port = port >>> 0; + if (port === 0 || port > 65535) throw new RangeError('Port should be > 0 and < 65536'); // Normalize callback so it's either a function or undefined but not anything @@ -290,61 +332,55 @@ Socket.prototype.send = function(buffer, // If the socket hasn't been bound yet, push the outbound packet onto the // send queue and send after binding is complete. if (self._bindState != BIND_STATE_BOUND) { - // If the send queue hasn't been initialized yet, do it, and install an - // event handler that flushes the send queue after binding is done. - if (!self._sendQueue) { - self._sendQueue = []; - self.once('listening', function() { - // Flush the send queue. - for (var i = 0; i < self._sendQueue.length; i++) - self.send.apply(self, self._sendQueue[i]); - self._sendQueue = undefined; - }); - } - self._sendQueue.push([buffer, offset, length, port, address, callback]); + enqueue(self, [buffer, port, address, callback]); return; } - self._handle.lookup(address, function(ex, ip) { - if (ex) { - if (typeof callback === 'function') { - callback(ex); - return; - } - - self.emit('error', ex); - } else if (self._handle) { - var req = new SendWrap(); - req.buffer = buffer; // Keep reference alive. - req.length = length; - req.address = address; - req.port = port; - if (callback) { - req.callback = callback; - req.oncomplete = afterSend; - } - var err = self._handle.send(req, - buffer, - offset, - length, - port, - ip, - !!callback); - if (err && callback) { - // don't emit as error, dgram_legacy.js compatibility - var ex = exceptionWithHostPort(err, 'send', address, port); - process.nextTick(callback, ex); - } - } + self._handle.lookup(address, function afterDns(ex, ip) { + doSend(ex, self, ip, buffer, address, port, callback); }); }; -function afterSend(err) { +function doSend(ex, self, ip, buffer, address, port, callback) { + if (ex) { + if (typeof callback === 'function') { + callback(ex); + return; + } + + self.emit('error', ex); + return; + } else if (!self._handle) { + return; + } + + var req = new SendWrap(); + req.buffer = buffer; // Keep reference alive. + req.address = address; + req.port = port; + if (callback) { + req.callback = callback; + req.oncomplete = afterSend; + } + var err = self._handle.send(req, + buffer, + buffer.length, + port, + ip, + !!callback); + if (err && callback) { + // don't emit as error, dgram_legacy.js compatibility + var ex = exceptionWithHostPort(err, 'send', address, port); + process.nextTick(callback, ex); + } +} + +function afterSend(err, sent) { if (err) { err = exceptionWithHostPort(err, 'send', this.address, this.port); } - this.callback(err, this.length); + this.callback(err, sent); } diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index cb678f14fb3826..1faf45a95daf33 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -13,6 +13,7 @@ namespace node { +using v8::Array; using v8::Context; using v8::EscapableHandleScope; using v8::External; @@ -35,6 +36,7 @@ class SendWrap : public ReqWrap { public: SendWrap(Environment* env, Local req_wrap_obj, bool have_callback); inline bool have_callback() const; + size_t msg_size; size_t self_size() const override { return sizeof(*this); } private: const bool have_callback_; @@ -243,29 +245,46 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { UDPWrap* wrap = Unwrap(args.Holder()); - // send(req, buffer, offset, length, port, address) + // send(req, buffer, port, address, hasCallback) CHECK(args[0]->IsObject()); - CHECK(Buffer::HasInstance(args[1])); + CHECK(args[1]->IsArray()); CHECK(args[2]->IsUint32()); CHECK(args[3]->IsUint32()); - CHECK(args[4]->IsUint32()); - CHECK(args[5]->IsString()); - CHECK(args[6]->IsBoolean()); + CHECK(args[4]->IsString()); + CHECK(args[5]->IsBoolean()); Local req_wrap_obj = args[0].As(); - Local buffer_obj = args[1].As(); - size_t offset = args[2]->Uint32Value(); - size_t length = args[3]->Uint32Value(); - const unsigned short port = args[4]->Uint32Value(); - node::Utf8Value address(env->isolate(), args[5]); - const bool have_callback = args[6]->IsTrue(); - - CHECK_LE(length, Buffer::Length(buffer_obj) - offset); + Local chunks = args[1].As(); + // it is faster to fetch the length of the + // array in js-land + size_t count = args[2]->Uint32Value(); + const unsigned short port = args[3]->Uint32Value(); + node::Utf8Value address(env->isolate(), args[4]); + const bool have_callback = args[5]->IsTrue(); SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback); + size_t msg_size = 0; + + // allocate uv_buf_t of the correct size + // if bigger than 16 elements + uv_buf_t bufs_[16]; + uv_buf_t* bufs = bufs_; + + if (ARRAY_SIZE(bufs_) < count) + bufs = new uv_buf_t[count]; + + // construct uv_buf_t array + for (size_t i = 0; i < count; i++) { + Local chunk = chunks->Get(i); + + size_t length = Buffer::Length(chunk); + + bufs[i] = uv_buf_init(Buffer::Data(chunk), length); + msg_size += length; + } + + req_wrap->msg_size = msg_size; - uv_buf_t buf = uv_buf_init(Buffer::Data(buffer_obj) + offset, - length); char addr[sizeof(sockaddr_in6)]; int err; @@ -284,12 +303,16 @@ void UDPWrap::DoSend(const FunctionCallbackInfo& args, int family) { if (err == 0) { err = uv_udp_send(&req_wrap->req_, &wrap->handle_, - &buf, - 1, + bufs, + count, reinterpret_cast(&addr), OnSend); } + // Deallocate space + if (bufs != bufs_) + delete[] bufs; + req_wrap->Dispatched(); if (err) delete req_wrap; @@ -332,8 +355,11 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) { Environment* env = req_wrap->env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - Local arg = Integer::New(env->isolate(), status); - req_wrap->MakeCallback(env->oncomplete_string(), 1, &arg); + Local arg[] = { + Integer::New(env->isolate(), status), + Integer::New(env->isolate(), req_wrap->msg_size), + }; + req_wrap->MakeCallback(env->oncomplete_string(), 2, arg); } delete req_wrap; } diff --git a/test/parallel/test-dgram-oob-buffer.js b/test/parallel/test-dgram-oob-buffer.js index 6d0626fc2d40db..88a28a757be2c0 100644 --- a/test/parallel/test-dgram-oob-buffer.js +++ b/test/parallel/test-dgram-oob-buffer.js @@ -4,7 +4,6 @@ // recvfrom(). Node should not propagate this error to the user. var common = require('../common'); -var assert = require('assert'); var dgram = require('dgram'); var socket = dgram.createSocket('udp4'); @@ -18,23 +17,4 @@ socket.send(buf, 3, 1, common.PORT, '127.0.0.1', ok); // Since length of zero means nothing, don't error despite OOB. socket.send(buf, 4, 0, common.PORT, '127.0.0.1', ok); -assert.throws(function() { - socket.send(buf, 0, 5, common.PORT, '127.0.0.1', common.fail); -}); -assert.throws(function() { - socket.send(buf, 2, 3, common.PORT, '127.0.0.1', common.fail); -}); -assert.throws(function() { - socket.send(buf, 4, 4, common.PORT, '127.0.0.1', common.fail); -}); -assert.throws(function() { - socket.send('abc', 4, 1, common.PORT, '127.0.0.1', common.fail); -}); -assert.throws(function() { - socket.send('abc', 0, 4, common.PORT, '127.0.0.1', common.fail); -}); -assert.throws(function() { - socket.send('abc', -1, 2, common.PORT, '127.0.0.1', common.fail); -}); - socket.close(); // FIXME should not be necessary diff --git a/test/parallel/test-dgram-send-bad-arguments.js b/test/parallel/test-dgram-send-bad-arguments.js index ddaa162c8bff10..080ac1d127061a 100644 --- a/test/parallel/test-dgram-send-bad-arguments.js +++ b/test/parallel/test-dgram-send-bad-arguments.js @@ -11,10 +11,13 @@ assert.throws(function() { sock.send(); }, TypeError); // First argument should be a buffer. -assert.throws(function() { sock.send(buf, -1, 1, 1, host); }, RangeError); -assert.throws(function() { sock.send(buf, 1, -1, 1, host); }, RangeError); +// send(buf, offset, length, port, host) assert.throws(function() { sock.send(buf, 1, 1, -1, host); }, RangeError); -assert.throws(function() { sock.send(buf, 5, 1, 1, host); }, RangeError); -assert.throws(function() { sock.send(buf, 1, 5, 1, host); }, RangeError); assert.throws(function() { sock.send(buf, 1, 1, 0, host); }, RangeError); assert.throws(function() { sock.send(buf, 1, 1, 65536, host); }, RangeError); + +// send(buf, port, host) +assert.throws(function() { sock.send(23, 12345, host); }, TypeError); + +// send([buf1, ..], port, host) +assert.throws(function() { sock.send([buf, 23], 12345, host); }, TypeError); diff --git a/test/parallel/test-dgram-send-callback-buffer.js b/test/parallel/test-dgram-send-callback-buffer.js new file mode 100644 index 00000000000000..1aea2f77ef806c --- /dev/null +++ b/test/parallel/test-dgram-send-callback-buffer.js @@ -0,0 +1,21 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dgram = require('dgram'); + +const client = dgram.createSocket('udp4'); + +const buf = new Buffer(256); + +const onMessage = common.mustCall(function(err, bytes) { + assert.equal(bytes, buf.length); + clearTimeout(timer); + client.close(); +}); + +const timer = setTimeout(function() { + throw new Error('Timeout'); +}, common.platformTimeout(200)); + +client.send(buf, common.PORT, common.localhostIPv4, onMessage); diff --git a/test/parallel/test-dgram-send-callback-multi-buffer.js b/test/parallel/test-dgram-send-callback-multi-buffer.js new file mode 100644 index 00000000000000..d3e276cfcd3618 --- /dev/null +++ b/test/parallel/test-dgram-send-callback-multi-buffer.js @@ -0,0 +1,36 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dgram = require('dgram'); + +const client = dgram.createSocket('udp4'); + +const timer = setTimeout(function() { + throw new Error('Timeout'); +}, common.platformTimeout(200)); + +const onMessage = common.mustCall(function(err, bytes) { + assert.equal(bytes, buf1.length + buf2.length); + clearTimeout(timer); + client.close(); +}); + +const buf1 = new Buffer(256); + +const buf2 = new Buffer(256); + +buf1.fill('x'); +buf2.fill('y'); + +client.on('listening', function() { + client.send([buf1, buf2], common.PORT, common.localhostIPv4, onMessage); +}); + +client.on('message', function(buf, info) { + const expected = Buffer.concat([buf1, buf2]); + assert.ok(buf.equals(expected), 'message was received correctly'); + client.close(); +}); + +client.bind(common.PORT);