diff --git a/graylog.js b/graylog.js index 1758285..4348117 100644 --- a/graylog.js +++ b/graylog.js @@ -7,6 +7,43 @@ var assert = require('assert'); var MAX_SAFE_INT = 9007199254740991; // Number.MAX_SAFE_INTEGER +function Queue() { + this.first = null; + this.last = null; +} + + +Queue.prototype.append = function (obj) { + if (this.last) { + this.last.next = obj; + this.last = obj; + } else { + this.first = this.last = obj; + } +}; + + +Queue.prototype.getOne = function () { + var result = this.first; + + if (result) { + this.first = result.next; + result.next = null; + + if (result === this.last) { + this.last = null; + } + } + + return result; +}; + + +Queue.prototype.isEmpty = function () { + return this.last === null; +}; + + /** * Graylog instances emit errors. That means you really really should listen for them, * or accept uncaught exceptions (node throws if you don't listen for "error"). @@ -30,11 +67,17 @@ function Graylog(config) { // state this._serverIterator = 0; - this._discard = false; - this._isSending = false; - this._firstMessage = null; - this._lastMessage = null; this._headerPool = []; + this._isDeflating = false; + this._isSending = false; + + this.sendQueue = new Queue(); + this.deflateQueue = this.deflate === 'never' ? null : new Queue(); + this.alwaysDeflate = this.deflate === 'always'; + + // stats + this.sent = 0; + this.compressed = 0; } util.inherits(Graylog, EventEmitter); @@ -85,16 +128,17 @@ Graylog.prototype.getClient = function () { Graylog.prototype.destroy = function () { - this._discard = true; + this.sendQueue = null; + this.deflateQueue = null; + this._headerPool = []; + this._isDeflating = false; + this._isSending = false; if (this.client) { this.client.close(); this.client.removeAllListeners(); this.client = null; - this._firstMessage = null; - this._lastMessage = null; - this._headerPool = []; - } + } }; Graylog.prototype.emergency = function (short, full, fields, timestamp) { @@ -161,28 +205,6 @@ function serialize(hostname, facility, short, full, fields, timestamp, level) { } -Graylog.prototype._compressMessage = function (msg, cb) { - if ((msg.buff.length <= this._bufferSize && this.deflate === 'optimal') || this.deflate === 'never') { - return cb(); - } - - var that = this; - - zlib.deflate(msg.buff, function (error, compressed) { - if (error) { - that.emit('warning', error); - return cb(); - } - - if (compressed.length < msg.buff.length || that.deflate === 'always') { - msg.buff = compressed; - } - - return cb(); - }); -}; - - Graylog.prototype._getHeadersFromPool = function (n) { for (var i = this._headerPool.length; i < n; i += 1) { var header = this._headerPool[i] = new Buffer(12); @@ -236,91 +258,98 @@ Graylog.prototype._sendChunked = function (id, message, cb) { }; -Graylog.prototype._sendMessage = function (msg, cb) { - var message = msg.buff; - msg.buff = null; // help GC a bit +var count = 0; - if (message.length <= this._bufferSize) { - // No need to chunk this message +Graylog.prototype._tickDeflate = function () { + if (this._isDeflating || this.deflateQueue.isEmpty()) { + return; + } - var client = this.getClient(); - var server = this.getServer(); + this._isDeflating = true; - client.send(message, 0, message.length, server.port, server.host, cb); - return; - } + var that = this; + var msg = this.deflateQueue.getOne(); - var that = this; + function done() { + that._isDeflating = false; + that.sendQueue.append(msg); - // Generate a random ID as a buffer - crypto.randomBytes(8, function (error, id) { + that._tickSend(); + that._tickDeflate(); + } + + if (!this.alwaysDeflate && msg.buff.length <= this._bufferSize) { + process.nextTick(done); + return; + } + + zlib.deflate(msg.buff, function (error, compressed) { if (error) { - return cb(error); + that.emit('warning', error); + } else { + that.compressed += 1; + + if (that.alwaysDeflate || compressed.length < msg.buff.length) { + msg.buff = compressed; + } } - that._sendChunked(id, message, cb); + done(); }); }; -Graylog.prototype._send = function () { - if (this._isSending) { - // already sending - return true; - } - - if (!this._firstMessage) { - // nothing to send - this.emit('drain'); - return false; - } +Graylog.prototype._tickSend = function () { + if (this._isSending) { + return; + } - this._isSending = true; + if (this.sendQueue.isEmpty()) { + if (!this._isDeflating) { + this.emit('drain'); + } + return; + } - // pull off a message + this._isSending = true; - var msg = this._firstMessage; + var that = this; + var msg = this.sendQueue.getOne(); - if (msg.next) { - this._firstMessage = msg.next; - } else { - this._firstMessage = this._lastMessage = null; + function done() { + that.sent += 1; + that._isSending = false; + that._tickSend(); } - // send the message - - var that = this; + var buff = msg.buff; + msg.buff = null; // help GC a bit - function onSend(error) { - that._isSending = false; + if (buff.length <= this._bufferSize) { + // No need to chunk this message - if (error) { - that.emit('error', error); - return; - } + var client = this.getClient(); + var server = this.getServer(); - // because onSend is always called asynchronously, calling _send() immediately is safe - that._send(); + client.send(buff, 0, buff.length, server.port, server.host, done); + return; } - function onCompress(error) { + var that = this; + + // Generate a random ID (buffer) + crypto.randomBytes(8, function (error, id) { if (error) { - that._isSending = false; - that.emit('error', error); - return; + return cb(error); } - that._sendMessage(msg, onSend); - } - - this._compressMessage(msg, onCompress); - - return true; + that._sendChunked(id, buff, done); + }); }; Graylog.prototype._log = function log(short, full, fields, timestamp, level) { - if (this._discard) { + if (!this.sendQueue) { return; } @@ -329,16 +358,13 @@ Graylog.prototype._log = function log(short, full, fields, timestamp, level) { next: null }; - // append to queue - - if (this._lastMessage) { - this._lastMessage.next = message; - this._lastMessage = message; - } else { - this._firstMessage = this._lastMessage = message; - } - - return this._send(); + if (this.deflateQueue) { + this.deflateQueue.append(message); + this._tickDeflate(); + } else { + this.sendQueue.append(message); + this._tickSend(); + } }; @@ -347,7 +373,7 @@ Graylog.prototype.close = function (cb) { cb = function () {}; } - if (!this._isSending) { + if (!this._isSending && !this._isDeflating) { this.destroy(); return cb(); } @@ -355,7 +381,8 @@ Graylog.prototype.close = function (cb) { var that = this; this.once('drain', function () { - that.close(cb); + that.destroy(); + cb(); }); };