Skip to content

Commit

Permalink
Merge pull request #939 from stephenplusplus/spp--operation-ee
Browse files Browse the repository at this point in the history
compute: turn operations into event emitters
  • Loading branch information
callmehiphop committed Nov 17, 2015
2 parents 9922213 + 7dffcea commit 1184f57
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 176 deletions.
141 changes: 80 additions & 61 deletions lib/compute/operation.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

'use strict';

var extend = require('extend');
var is = require('is');
var nodeutil = require('util');
var events = require('events');
var modelo = require('modelo');

/**
* @type {module:common/serviceObject}
Expand Down Expand Up @@ -79,6 +78,30 @@ var util = require('../common/util.js');
* //-
* var zone = gce.zone('us-central1-a');
* var operation = zone.operation('operation-id');
*
* //-
* // All operations are event emitters. The status of each operation is polled
* // continuously, starting only after you register a "complete" listener.
* //-
* operation.on('complete', function(metadata) {
* // The operation is complete.
* });
*
* //-
* // Be sure to register an error handler as well to catch any issues which
* // impeded the operation.
* //-
* operation.on('error', function(err) {
* // An error occurred during the operation.
* });
*
* //-
* // To force the Operation object to stop polling for updates, simply remove
* // any "complete" listeners you've registered.
* //
* // The easiest way to do this is with `removeAllListeners()`.
* //-
* operation.removeAllListeners();
*/
function Operation(scope, name) {
var isCompute = scope.constructor.name === 'Compute';
Expand Down Expand Up @@ -132,10 +155,16 @@ function Operation(scope, name) {
methods: methods
});

events.EventEmitter.call(this);

this.completeListeners = 0;
this.hasActiveListeners = false;
this.name = name;

this.listenForEvents_();
}

nodeutil.inherits(Operation, ServiceObject);
modelo.inherits(Operation, ServiceObject, events.EventEmitter);

/**
* Get the operation's metadata. For a detailed description of metadata see
Expand Down Expand Up @@ -167,9 +196,9 @@ Operation.prototype.getMetadata = function(callback) {
// this callback. We have to make sure this isn't a false error by seeing if
// the response body contains a property that wouldn't exist on a failed API
// request (`name`).
var isActualError = err && (!apiResponse || apiResponse.name !== self.name);
var requestFailed = err && (!apiResponse || apiResponse.name !== self.name);

if (isActualError) {
if (requestFailed) {
callback(err, null, apiResponse);
return;
}
Expand All @@ -181,80 +210,70 @@ Operation.prototype.getMetadata = function(callback) {
};

/**
* Register a callback for when the operation is complete.
* Begin listening for events on the operation. This method keeps track of how
* many "complete" listeners are registered and removed, making sure polling is
* handled automatically.
*
* If the operation doesn't complete after the maximum number of attempts have
* been made (see `options.maxAttempts` and `options.interval`), an error will
* be provided to your callback with code: `OPERATION_INCOMPLETE`.
* As long as there is one active "complete" listener, the connection is open.
* When there are no more listeners, the polling stops.
*
* @param {object=} options - Configuration object.
* @param {number} options.maxAttempts - Maximum number of attempts to make an
* API request to check if the operation is complete. (Default: `10`)
* @param {number} options.interval - Amount of time in milliseconds between
* each request. (Default: `3000`)
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
* @param {object} callback.metadata - The operation's metadata.
*
* @example
* operation.onComplete(function(err, metadata) {
* if (err.code === 'OPERATION_INCOMPLETE') {
* // The operation is not complete yet. You may want to register another
* // `onComplete` listener or queue for later.
* }
*
* if (!err) {
* // Operation complete!
* }
* });
* @private
*/
Operation.prototype.onComplete = function(options, callback) {
Operation.prototype.listenForEvents_ = function() {
var self = this;

if (is.fn(options)) {
callback = options;
options = {};
}

options = extend({
maxAttempts: 10,
interval: 3000
}, options);

var didNotCompleteError = new Error('Operation did not complete.');
didNotCompleteError.code = 'OPERATION_INCOMPLETE';

var numAttempts = 0;
this.on('newListener', function(event) {
if (event === 'complete') {
self.completeListeners++;

function checkMetadata() {
numAttempts++;
if (!self.hasActiveListeners) {
self.hasActiveListeners = true;
self.startPolling_();
}
}
});

if (numAttempts > options.maxAttempts) {
callback(didNotCompleteError, self.metadata);
return;
this.on('removeListener', function(event) {
if (event === 'complete' && --self.completeListeners === 0) {
self.hasActiveListeners = false;
}
});
};

setTimeout(function() {
self.getMetadata(onMetadata);
}, options.interval);
/**
* Poll `getMetadata` to check the operation's status. This runs a loop to ping
* the API on an interval.
*
* Note: This method is automatically called once a "complete" event handler is
* registered on the operation.
*
* @private
*/
Operation.prototype.startPolling_ = function() {
var self = this;

if (!this.hasActiveListeners) {
return;
}

function onMetadata(err, metadata) {
this.getMetadata(function(err, metadata, apiResponse) {
// Parsing the response body will automatically create an ApiError object if
// the operation failed.
var parsedHttpRespBody = util.parseHttpRespBody(apiResponse);
err = err || parsedHttpRespBody.err;

if (err) {
callback(err, metadata);
self.emit('error', err);
return;
}

if (metadata.status !== 'DONE') {
checkMetadata();
setTimeout(self.startPolling_.bind(self), 500);
return;
}

// The operation is complete.
callback(null, metadata);
}

checkMetadata();
self.emit('complete', metadata);
});
};

module.exports = Operation;
87 changes: 61 additions & 26 deletions system-test/compute.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ describe('Compute', function() {
before(function(done) {
address.create(function(err, disk, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -110,7 +115,12 @@ describe('Compute', function() {

disk.create(config, function(err, disk, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -154,7 +164,12 @@ describe('Compute', function() {

disk.snapshot(generateName()).create(function(err, snapshot, operation) {
assert.ifError(err);
operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});
});
Expand Down Expand Up @@ -192,7 +207,12 @@ describe('Compute', function() {

firewall.create(CONFIG, function(err, firewall, operation) {
assert.ifError(err);
operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -239,7 +259,12 @@ describe('Compute', function() {
before(function(done) {
network.create(CONFIG, function(err, network, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -402,7 +427,12 @@ describe('Compute', function() {

vm.create(config, function(err, vm, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand All @@ -420,7 +450,11 @@ describe('Compute', function() {
return;
}

operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);
operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -460,6 +494,7 @@ describe('Compute', function() {

it('should attach and detach a disk', function(done) {
var name = generateName();
var disk = zone.disk(name);

// This test waits on a lot of operations.
this.timeout(90000);
Expand All @@ -468,22 +503,29 @@ describe('Compute', function() {
createDisk,
attachDisk,
detachDisk
], done);
], function(err) {
if (err) {
done(err);
return;
}

disk.delete(execAfterOperationComplete(done));
});

function createDisk(callback) {
var config = {
os: 'ubuntu'
};

zone.createDisk(name, config, execAfterOperationComplete(callback));
disk.create(config, execAfterOperationComplete(callback));
}

function attachDisk(callback) {
vm.attachDisk(zone.disk(name), execAfterOperationComplete(callback));
vm.attachDisk(disk, execAfterOperationComplete(callback));
}

function detachDisk(callback) {
vm.detachDisk(zone.disk(name), execAfterOperationComplete(callback));
vm.detachDisk(disk, execAfterOperationComplete(callback));
}
});

Expand Down Expand Up @@ -523,8 +565,7 @@ describe('Compute', function() {
var MAX_TIME_ALLOWED = 90000 * 2;
this.timeout(MAX_TIME_ALLOWED);

var options = getOperationOptions(MAX_TIME_ALLOWED);
vm.stop(execAfterOperationComplete(options, done));
vm.stop(execAfterOperationComplete(done));
});
});

Expand Down Expand Up @@ -624,26 +665,20 @@ describe('Compute', function() {
});
}

function getOperationOptions(maxTimeAllowed) {
var interval = 10000;

return {
maxAttempts: maxTimeAllowed / interval,
interval: interval
};
}

function execAfterOperationComplete(options, callback) {
callback = callback || options;

function execAfterOperationComplete(callback) {
return function(err) {
if (err) {
callback(err);
return;
}

var operation = arguments[arguments.length - 2]; // [..., op, apiResponse]
operation.onComplete(options || {}, callback);

operation
.on('error', callback)
.on('complete', function() {
callback();
});
};
}
});
Loading

0 comments on commit 1184f57

Please sign in to comment.