Skip to content

Commit

Permalink
Merge pull request #310 from stephenplusplus/spp--storage-download-in…
Browse files Browse the repository at this point in the history
…tegrity

storage: download integrity check
  • Loading branch information
stephenplusplus committed Dec 10, 2014
2 parents dc4bf20 + c7c6e60 commit 23d399c
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 39 deletions.
149 changes: 130 additions & 19 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ File.prototype.copy = function(destination, callback) {
* piped to a writable stream or listened to for 'data' events to read a file's
* contents.
*
* In the unlikely event there is a mismatch between what you downloaded and the
* version in your Bucket, your error handler will receive an error with code
* "CONTENT_DOWNLOAD_MISMATCH". If you receive this error, the best recourse is
* to try downloading the file again.
*
* @param {object=} options - Configuration object.
* @param {string|boolean} options.validation - Possible values: `"md5"`,
* `"crc32c"`, or `false`. By default, data integrity is validated with an
* MD5 checksum for maximum reliability, falling back to CRC32c when an MD5
* hash wasn't returned from the API. CRC32c will provide better performance
* with less reliability. You may also choose to skip validation completely,
* however this is **not recommended**.
*
* @example
* //-
* // <h4>Downloading a File</h4>
Expand All @@ -226,35 +239,133 @@ File.prototype.copy = function(destination, callback) {
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*/
File.prototype.createReadStream = function() {
var storage = this.bucket.storage;
var dup = duplexify();
function createAuthorizedReq(uri) {
var reqOpts = { uri: uri };
storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
dup.emit('error', err);
dup.end();
return;
}
dup.setReadable(request(authorizedReqOpts));
}
});
File.prototype.createReadStream = function(options) {
options = options || {};

var that = this;
var throughStream = through();

var validations = ['crc32c', 'md5'];
var validation;

if (util.is(options.validation, 'string')) {
options.validation = options.validation.toLowerCase();

if (validations.indexOf(options.validation) > -1) {
validation = options.validation;
} else {
validation = 'all';
}
}

if (util.is(options.validation, 'undefined')) {
validation = 'all';
}

var crc32c = validation === 'crc32c' || validation === 'all';
var md5 = validation === 'md5' || validation === 'all';

if (this.metadata.mediaLink) {
createAuthorizedReq(this.metadata.mediaLink);
} else {
this.getMetadata(function(err, metadata) {
if (err) {
dup.emit('error', err);
dup.end();
throughStream.emit('error', err);
throughStream.end();
return;
}

createAuthorizedReq(metadata.mediaLink);
});
}
return dup;

return throughStream;

// Authenticate the request, then pipe the remote API request to the stream
// returned to the user.
function createAuthorizedReq(uri) {
var reqOpts = {
uri: uri
};

that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
throughStream.emit('error', err);
throughStream.end();
return;
}

// For data integrity, hash the contents of the stream as we receive it
// from the server.
var localCrc32cHash;
var localMd5Hash = crypto.createHash('md5');

request(authorizedReqOpts)
.on('error', function(err) {
throughStream.emit('error', err);
throughStream.end();
})

.on('data', function(chunk) {
if (crc32c) {
localCrc32cHash = crc.calculate(chunk, localCrc32cHash);
}

if (md5) {
localMd5Hash.update(chunk);
}
})

.on('complete', function(res) {
var failed = false;
var crcFail = true;
var md5Fail = true;

var hashes = {};
res.headers['x-goog-hash'].split(',').forEach(function(hash) {
var hashType = hash.split('=')[0];
hashes[hashType] = hash.substr(hash.indexOf('=') + 1);
});

var remoteMd5 = hashes.md5;
var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4);

if (crc32c) {
crcFail =
new Buffer([localCrc32cHash]).toString('base64') !== remoteCrc;
failed = crcFail;
}

if (md5) {
md5Fail = localMd5Hash.digest('base64') !== remoteMd5;
failed = md5Fail;
}

if (validation === 'all') {
failed = remoteMd5 ? md5Fail : crcFail;
}

if (failed) {
var error = new Error([
'The downloaded data did not match the data from the server.',
'To be sure the content is the same, you should download the',
'file again.'
].join(' '));
error.code = 'CONTENT_DOWNLOAD_MISMATCH';

throughStream.emit('error', error);
} else {
throughStream.emit('complete');
}

throughStream.end();
})

.pipe(throughStream);
}
});
}
};

/**
Expand Down Expand Up @@ -688,7 +799,7 @@ File.prototype.startResumableUpload_ = function(stream, metadata) {
method: 'PUT',
uri: resumableUri
}, {
onAuthorized: function (err, reqOpts) {
onAuthorized: function(err, reqOpts) {
if (err) {
handleError(err);
return;
Expand Down
11 changes: 8 additions & 3 deletions regression/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,17 @@ describe('storage', function() {

writeStream.on('error', done);
writeStream.on('complete', function() {
var data = new Buffer('');

file.createReadStream()
.on('error', done)
.on('data', function(chunk) {
assert.equal(String(chunk), contents);
data = Buffer.concat([data, chunk]);
})
.on('error', done)
.on('end', done);
.on('complete', function() {
assert.equal(data.toString(), contents);
done();
});
});
});

Expand Down
132 changes: 115 additions & 17 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ describe('File', function() {
});

it('should create an authorized request', function(done) {
request_Override = function(opts) {
file.bucket.storage.makeAuthorizedRequest_ = function(opts) {
assert.equal(opts.uri, metadata.mediaLink);
done();
};

file.getMetadata = function(callback) {
callback(null, metadata);
};

file.createReadStream();
});

Expand All @@ -292,33 +294,129 @@ describe('File', function() {

it('should get readable stream from request', function(done) {
var fakeRequest = { a: 'b', c: 'd' };
file.getMetadata = function(callback) {
callback(null, metadata);
};

// Faking a stream implementation so we can simulate an actual Request
// request. The only thing we want to know is if the data passed to
// request was correct.
request_Override = function(req) {
if (!(this instanceof request_Override)) {
return new request_Override(req);
}

stream.Readable.call(this);
this._read = util.noop;

assert.deepEqual(req, fakeRequest);
done();
};
file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
(callback.onAuthorized || callback)(null, fakeRequest);
};
file.createReadStream();
});
nodeutil.inherits(request_Override, stream.Readable);

it('should set readable stream', function() {
var dup = duplexify();
file.getMetadata = function(callback) {
callback(null, metadata);
};
request_Override = function() {
return dup;
};

file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
(callback.onAuthorized || callback)();
(callback.onAuthorized || callback)(null, fakeRequest);
};

file.createReadStream();
assert.deepEqual(readableStream, dup);
readableStream = null;
});

describe('validation', function() {
var data = 'test';

var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64');

var md5HashBase64 = crypto.createHash('md5');
md5HashBase64.update(data);
md5HashBase64 = md5HashBase64.digest('base64');

var fakeResponse = {
crc32c: {
headers: { 'x-goog-hash': 'crc32c=####' + crc32cBase64 }
},
md5: {
headers: { 'x-goog-hash': 'md5=' + md5HashBase64 }
}
};

function getFakeRequest(data, fakeResponse) {
function FakeRequest(req) {
if (!(this instanceof FakeRequest)) {
return new FakeRequest(req);
}

var that = this;

stream.Readable.call(this);
this._read = function() {
this.push(data);
this.push(null);
};

setImmediate(function() {
that.emit('complete', fakeResponse);
});
}
nodeutil.inherits(FakeRequest, stream.Readable);
return FakeRequest;
}

beforeEach(function() {
file.metadata.mediaLink = 'http://uri';

file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
(callback.onAuthorized || callback)(null, {});
};
});

it('should validate with crc32c', function(done) {
request_Override = getFakeRequest(data, fakeResponse.crc32c);

file.createReadStream({ validation: 'crc32c' })
.on('error', done)
.on('complete', done);
});

it('should emit an error if crc32c validation fails', function(done) {
request_Override = getFakeRequest('bad-data', fakeResponse.crc32c);

file.createReadStream({ validation: 'crc32c' })
.on('error', function(err) {
assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
done();
});
});

it('should validate with md5', function(done) {
request_Override = getFakeRequest(data, fakeResponse.md5);

file.createReadStream({ validation: 'md5' })
.on('error', done)
.on('complete', done);
});

it('should emit an error if md5 validation fails', function(done) {
request_Override = getFakeRequest('bad-data', fakeResponse.crc32c);

file.createReadStream({ validation: 'md5' })
.on('error', function(err) {
assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
done();
});
});

it('should default to md5 validation', function(done) {
request_Override = getFakeRequest(data, {
headers: { 'x-goog-hash': 'md5=fakefakefake' }
});

file.createReadStream()
.on('error', function(err) {
assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH');
done();
});
});
});
});

Expand Down

0 comments on commit 23d399c

Please sign in to comment.