diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e8bff3d46b..f5708b8197 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1,4 +1,5 @@ 'use strict'; +const path = require('path'); const assert = require('assert'); const { Transform, PassThrough } = require('stream'); const { MongoNetworkError } = require('../../src/error'); @@ -1454,8 +1455,7 @@ describe('Change Streams', function () { } }); - // TODO: resuming currently broken on piped change streams, fix as part of NODE-2172 - it.skip('should resume piping of Change Streams when a resumable error is encountered', { + it('should resume piping of Change Streams when a resumable error is encountered', { metadata: { requires: { generators: true, @@ -1464,11 +1464,10 @@ describe('Change Streams', function () { } }, test: function (done) { + const filename = path.join(__dirname, '_nodemongodbnative_resumepipe.txt'); + this.defer(() => fs.unlinkSync(filename)); const configuration = this.configuration; - // Contain mock server - let primaryServer = null; - // Default message fields const defaultFields = { setName: 'rs', @@ -1484,9 +1483,8 @@ describe('Change Streams', function () { hosts: ['localhost:32000', 'localhost:32001', 'localhost:32002'] }; - co(function* () { - primaryServer = yield mock.createServer(); - + mock.createServer(32000, 'localhost').then(primaryServer => { + this.defer(() => mock.cleanup()); let counter = 0; primaryServer.setMessageHandler(request => { const doc = request.document; @@ -1572,31 +1570,26 @@ describe('Change Streams', function () { client.connect((err, client) => { expect(err).to.not.exist; + this.defer(() => client.close()); const database = client.db('integration_tests5'); const collection = database.collection('MongoNetworkErrorTestPromises'); const changeStream = collection.watch(pipeline); - const filename = '/tmp/_nodemongodbnative_resumepipe.txt'; const outStream = fs.createWriteStream(filename); changeStream.stream({ transform: JSON.stringify }).pipe(outStream); - + this.defer(() => changeStream.close()); // Listen for changes to the file - const watcher = fs.watch(filename, function (eventType) { - assert.equal(eventType, 'change'); + const watcher = fs.watch(filename, eventType => { + this.defer(() => watcher.close()); + expect(eventType).to.equal('change'); const fileContents = fs.readFileSync(filename, 'utf8'); const parsedFileContents = JSON.parse(fileContents); - assert.equal(parsedFileContents.fullDocument.a, 1); - - watcher.close(); + expect(parsedFileContents).to.have.nested.property('fullDocument.a', 1); - changeStream.close(err => { - expect(err).to.not.exist; - - mock.cleanup(() => done()); - }); + done(); }); }); });