Skip to content

Commit

Permalink
[api] First pass at Worker integration
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalecki committed Dec 9, 2011
1 parent bbc23e2 commit 883e712
Showing 1 changed file with 23 additions and 108 deletions.
131 changes: 23 additions & 108 deletions lib/forever.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ var fs = require('fs'),
path = require('path'),
events = require('events'),
exec = require('child_process').exec,
net = require('net'),
cliff = require('cliff'),
daemon = require('daemon'),
nconf = require('nconf'),
portfinder = require('portfinder'),
timespan = require('timespan'),
Expand All @@ -21,6 +19,7 @@ var fs = require('fs'),
utile = require('utile'),
mkdirp = utile.mkdirp,
async = utile.async,
nssocket = require('nssocket'),
winston = require('winston');

var forever = exports;
Expand All @@ -44,6 +43,7 @@ forever.initialized = false;
forever.root = path.join(process.env.HOME || '/root', '.forever');
forever.config = new nconf.File({ file: path.join(forever.root, 'config.json') });
forever.Forever = forever.Monitor = require('./forever/monitor').Monitor;
forever.Worker = require('./forever/worker').Worker;
forever.cli = require('./forever/cli');

//
Expand Down Expand Up @@ -90,74 +90,32 @@ function getSockets(sockPath, callback) {
// Returns all data for processes managed by forever.
//
function getAllProcesses(callback) {
var sockPath = forever.config.get('sockPath'),
results = [];
var sockPath = forever.config.get('sockPath');

function getProcess(name, next) {
var fullPath = path.join(sockPath, name),
socket = new net.Socket({ type: 'unix' }),
parsed = false,
data = '';
socket = new nssocket.NsSocket();

function tryParse() {
if (!parsed) {
parsed = true;

var monitors;
try {
monitors = JSON.parse(data);
}
catch (ex) {
//
// Ignore errors
//
}

//
// Be a little lazier about loading results
//
if (monitors && monitors.monitors) {
results = results.concat(monitors.monitors);
}

next();
}
}

socket.on('error', function (err) {
if (err.code === 'ECONNREFUSED') {
try {
fs.unlinkSync(fullPath);
}
catch (ex) { }
return tryParse();
}
else if (err.code === 'EACCES') {
forever.log.warn('Error contacting: ' + fullPath.magenta);
}
else {
forever.log.error('Unknown error (' + err.code + ') when contacting: ' + fullPath.magenta);
socket.connect(fullPath, function (err) {
if (err) {
return next(err);
}

tryParse();
});

socket.on('data', function (msg) {
data += msg;
socket.data(['data'], function (data) {
next(null, data);
socket.end();
});
socket.send(['data']);
});

socket.on('close', tryParse);

socket.connect(fullPath);
}

getSockets(sockPath, function (err, sockets) {
if (err || (sockets && sockets.length === 0)) {
return callback(err);
}

async.forEach(sockets, getProcess, function () {
callback(results);
async.map(sockets, getProcess, function (err, processes) {
callback(processes);
});
});
}
Expand Down Expand Up @@ -339,29 +297,11 @@ forever.startDaemon = function (script, options) {
options = options || {};
options.uid = options.uid || utile.randomString(4).replace(/^\-/, '_');
options.logFile = forever.logFilePath(options.logFile || options.uid + '.log');
options.pidFile = forever.pidFilePath(options.pidFile || options.uid + '.pid');

var monitor = new forever.Monitor(script, options);
monitor.start();

fs.open(options.logFile, options.appendLog ? 'a+' : 'w+', function (err, fd) {
if (err) {
return monitor.emit('error', err);
}

var pid = daemon.start(fd);
daemon.lock(options.pidFile);

//
// Remark: This should work, but the fd gets screwed up
// with the daemon process.
//
// process.on('exit', function () {
// fs.unlinkSync(options.pidFile);
// });

process.pid = pid;
monitor.start();
});
var worker = new forever.Worker({ monitor: monitor });

return monitor;
};
Expand All @@ -374,10 +314,8 @@ forever.startDaemon = function (script, options) {
//
forever.startServer = function () {
var args = Array.prototype.slice.call(arguments),
socket = path.join(forever.config.get('sockPath'), 'forever.sock'),
monitors = [],
callback,
server;
callback;

args.forEach(function (a) {
if (Array.isArray(a)) {
Expand All @@ -393,36 +331,13 @@ forever.startServer = function () {
}
});

server = net.createServer(function (socket) {
//
// Write the specified data and close the socket
//
socket.end(JSON.stringify({
monitors: monitors.map(function (m) {
return m.data;
})
}));
});

function onError(err) {
monitors.forEach(function (mon) {
mon.emit('error', err);
});
}

portfinder.getSocket({ path: socket }, function (err, socket) {
if (err) {
return onError(err);
}

server.on('error', onError);

server.listen(socket, function () {
if (callback) {
callback(null, server, socket);
}
async.forEach(monitors, function (monitor, next) {
var worker = new forever.Worker({
monitor: monitor,
sockPath: forever.config.get('sockPath')
});
});
worker.start(next);
}, callback || function () {});
};


Expand Down

0 comments on commit 883e712

Please sign in to comment.