Skip to content

Commit

Permalink
Refactoring 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Ron Korving committed Jan 12, 2017
1 parent 9bf0cfc commit d01904f
Showing 1 changed file with 125 additions and 98 deletions.
223 changes: 125 additions & 98 deletions graylog.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,43 @@ var assert = require('assert');
var MAX_SAFE_INT = 9007199254740991; // Number.MAX_SAFE_INTEGER


function Queue() {
this.first = null;
this.last = null;
}


Queue.prototype.append = function (obj) {
if (this.last) {
this.last.next = obj;
this.last = obj;
} else {
this.first = this.last = obj;
}
};


Queue.prototype.getOne = function () {
var result = this.first;

if (result) {
this.first = result.next;
result.next = null;

if (result === this.last) {
this.last = null;
}
}

return result;
};


Queue.prototype.isEmpty = function () {
return this.last === null;
};


/**
* Graylog instances emit errors. That means you really really should listen for them,
* or accept uncaught exceptions (node throws if you don't listen for "error").
Expand All @@ -30,11 +67,17 @@ function Graylog(config) {

// state
this._serverIterator = 0;
this._discard = false;
this._isSending = false;
this._firstMessage = null;
this._lastMessage = null;
this._headerPool = [];
this._isDeflating = false;
this._isSending = false;

this.sendQueue = new Queue();
this.deflateQueue = this.deflate === 'never' ? null : new Queue();
this.alwaysDeflate = this.deflate === 'always';

// stats
this.sent = 0;
this.compressed = 0;
}

util.inherits(Graylog, EventEmitter);
Expand Down Expand Up @@ -85,16 +128,17 @@ Graylog.prototype.getClient = function () {


Graylog.prototype.destroy = function () {
this._discard = true;
this.sendQueue = null;
this.deflateQueue = null;
this._headerPool = [];
this._isDeflating = false;
this._isSending = false;

if (this.client) {
this.client.close();
this.client.removeAllListeners();
this.client = null;
this._firstMessage = null;
this._lastMessage = null;
this._headerPool = [];
}
}
};

Graylog.prototype.emergency = function (short, full, fields, timestamp) {
Expand Down Expand Up @@ -161,28 +205,6 @@ function serialize(hostname, facility, short, full, fields, timestamp, level) {
}


Graylog.prototype._compressMessage = function (msg, cb) {
if ((msg.buff.length <= this._bufferSize && this.deflate === 'optimal') || this.deflate === 'never') {
return cb();
}

var that = this;

zlib.deflate(msg.buff, function (error, compressed) {
if (error) {
that.emit('warning', error);
return cb();
}

if (compressed.length < msg.buff.length || that.deflate === 'always') {
msg.buff = compressed;
}

return cb();
});
};


Graylog.prototype._getHeadersFromPool = function (n) {
for (var i = this._headerPool.length; i < n; i += 1) {
var header = this._headerPool[i] = new Buffer(12);
Expand Down Expand Up @@ -236,91 +258,98 @@ Graylog.prototype._sendChunked = function (id, message, cb) {
};


Graylog.prototype._sendMessage = function (msg, cb) {
var message = msg.buff;
msg.buff = null; // help GC a bit
var count = 0;

if (message.length <= this._bufferSize) {
// No need to chunk this message
Graylog.prototype._tickDeflate = function () {
if (this._isDeflating || this.deflateQueue.isEmpty()) {
return;
}

var client = this.getClient();
var server = this.getServer();
this._isDeflating = true;

client.send(message, 0, message.length, server.port, server.host, cb);
return;
}
var that = this;
var msg = this.deflateQueue.getOne();

var that = this;
function done() {
that._isDeflating = false;
that.sendQueue.append(msg);

// Generate a random ID as a buffer
crypto.randomBytes(8, function (error, id) {
that._tickSend();
that._tickDeflate();
}

if (!this.alwaysDeflate && msg.buff.length <= this._bufferSize) {
process.nextTick(done);
return;
}

zlib.deflate(msg.buff, function (error, compressed) {
if (error) {
return cb(error);
that.emit('warning', error);
} else {
that.compressed += 1;

if (that.alwaysDeflate || compressed.length < msg.buff.length) {
msg.buff = compressed;
}
}

that._sendChunked(id, message, cb);
done();
});
};


Graylog.prototype._send = function () {
if (this._isSending) {
// already sending
return true;
}

if (!this._firstMessage) {
// nothing to send
this.emit('drain');
return false;
}
Graylog.prototype._tickSend = function () {
if (this._isSending) {
return;
}

this._isSending = true;
if (this.sendQueue.isEmpty()) {
if (!this._isDeflating) {
this.emit('drain');
}
return;
}

// pull off a message
this._isSending = true;

var msg = this._firstMessage;
var that = this;
var msg = this.sendQueue.getOne();

if (msg.next) {
this._firstMessage = msg.next;
} else {
this._firstMessage = this._lastMessage = null;
function done() {
that.sent += 1;
that._isSending = false;
that._tickSend();
}

// send the message

var that = this;
var buff = msg.buff;
msg.buff = null; // help GC a bit

function onSend(error) {
that._isSending = false;
if (buff.length <= this._bufferSize) {
// No need to chunk this message

if (error) {
that.emit('error', error);
return;
}
var client = this.getClient();
var server = this.getServer();

// because onSend is always called asynchronously, calling _send() immediately is safe
that._send();
client.send(buff, 0, buff.length, server.port, server.host, done);
return;
}

function onCompress(error) {
var that = this;

// Generate a random ID (buffer)
crypto.randomBytes(8, function (error, id) {
if (error) {
that._isSending = false;
that.emit('error', error);
return;
return cb(error);
}

that._sendMessage(msg, onSend);
}

this._compressMessage(msg, onCompress);

return true;
that._sendChunked(id, buff, done);
});
};


Graylog.prototype._log = function log(short, full, fields, timestamp, level) {
if (this._discard) {
if (!this.sendQueue) {
return;
}

Expand All @@ -329,16 +358,13 @@ Graylog.prototype._log = function log(short, full, fields, timestamp, level) {
next: null
};

// append to queue

if (this._lastMessage) {
this._lastMessage.next = message;
this._lastMessage = message;
} else {
this._firstMessage = this._lastMessage = message;
}

return this._send();
if (this.deflateQueue) {
this.deflateQueue.append(message);
this._tickDeflate();
} else {
this.sendQueue.append(message);
this._tickSend();
}
};


Expand All @@ -347,15 +373,16 @@ Graylog.prototype.close = function (cb) {
cb = function () {};
}

if (!this._isSending) {
if (!this._isSending && !this._isDeflating) {
this.destroy();
return cb();
}

var that = this;

this.once('drain', function () {
that.close(cb);
that.destroy();
cb();
});
};

Expand Down

0 comments on commit d01904f

Please sign in to comment.