diff --git a/index.js b/index.js index 60ab4ca..39bfe04 100644 --- a/index.js +++ b/index.js @@ -155,6 +155,7 @@ function normalizeParams (params, callback) { } const compression = params.headers['Content-Encoding'] === 'gzip' + const type = params.headers['Content-Type'] || '' if (isStream(params.body)) { normalized.body = '' @@ -163,7 +164,9 @@ function normalizeParams (params, callback) { stream.on('error', err => callback(err, null)) stream.on('data', chunk => { normalized.body += chunk }) stream.on('end', () => { - normalized.body = JSON.parse(normalized.body) + normalized.body = type.includes('x-ndjson') + ? normalized.body.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l)) + : JSON.parse(normalized.body) callback(null, normalized) }) } else if (params.body) { @@ -173,11 +176,16 @@ function normalizeParams (params, callback) { if (err) { return callback(err, null) } - normalized.body = JSON.parse(buffer) + buffer = buffer.toString() + normalized.body = type.includes('x-ndjson') + ? buffer.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l)) + : JSON.parse(buffer) callback(null, normalized) }) } else { - normalized.body = JSON.parse(params.body) + normalized.body = type.includes('x-ndjson') + ? params.body.split(/\n|\n\r/).filter(Boolean).map(l => JSON.parse(l)) + : JSON.parse(params.body) setImmediate(callback, null, normalized) } } else { diff --git a/test.js b/test.js index f4773b5..bf30a74 100644 --- a/test.js +++ b/test.js @@ -664,3 +664,117 @@ test('Define multiple paths and method at once', async t => { t.deepEqual(response.body, { status: 'ok' }) t.is(response.statusCode, 200) }) + +test('ndjson API support', async t => { + const mock = new Mock() + const client = new Client({ + node: 'http://localhost:9200', + Connection: mock.getConnection() + }) + + mock.add({ + method: 'POST', + path: '/_bulk' + }, params => { + t.deepEqual(params.body, [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ]) + return { status: 'ok' } + }) + + const response = await client.bulk({ + body: [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ] + }) + t.deepEqual(response.body, { status: 'ok' }) + t.is(response.statusCode, 200) +}) + +test('ndjson API support (with compression)', async t => { + const mock = new Mock() + const client = new Client({ + node: 'http://localhost:9200', + Connection: mock.getConnection(), + compression: 'gzip' + }) + + mock.add({ + method: 'POST', + path: '/_bulk' + }, params => { + t.deepEqual(params.body, [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ]) + return { status: 'ok' } + }) + + const response = await client.bulk({ + body: [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ] + }) + t.deepEqual(response.body, { status: 'ok' }) + t.is(response.statusCode, 200) +}) + +test('ndjson API support (as stream)', async t => { + const mock = new Mock() + const client = new Client({ + node: 'http://localhost:9200', + Connection: mock.getConnection() + }) + + mock.add({ + method: 'POST', + path: '/_bulk' + }, params => { + t.deepEqual(params.body, [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ]) + return { status: 'ok' } + }) + + const response = await client.bulk({ + body: intoStream(client.serializer.ndserialize([ + { foo: 'bar' }, + { baz: 'fa\nz' } + ])) + }) + t.deepEqual(response.body, { status: 'ok' }) + t.is(response.statusCode, 200) +}) + +test('ndjson API support (as stream with compression)', async t => { + const mock = new Mock() + const client = new Client({ + node: 'http://localhost:9200', + Connection: mock.getConnection(), + compression: 'gzip' + }) + + mock.add({ + method: 'POST', + path: '/_bulk' + }, params => { + t.deepEqual(params.body, [ + { foo: 'bar' }, + { baz: 'fa\nz' } + ]) + return { status: 'ok' } + }) + + const response = await client.bulk({ + body: intoStream(client.serializer.ndserialize([ + { foo: 'bar' }, + { baz: 'fa\nz' } + ])) + }) + t.deepEqual(response.body, { status: 'ok' }) + t.is(response.statusCode, 200) +})