diff --git a/.gitignore b/.gitignore index 3c3629e..eb79dd5 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ node_modules +.idea diff --git a/cmd/get-args.js b/cmd/get-args.js index 20c5d10..ea4c6b4 100644 --- a/cmd/get-args.js +++ b/cmd/get-args.js @@ -11,5 +11,6 @@ module.exports = () => yargs yargs .number('batch') .number('timeout') + .boolean('update') ) .argv; diff --git a/cmd/init-cmd.js b/cmd/init-cmd.js index 8abd7d1..2caafe4 100644 --- a/cmd/init-cmd.js +++ b/cmd/init-cmd.js @@ -19,7 +19,7 @@ function cmdPutDocs(args) { return util.readStdin() .map(JSON.parse) - .through(_.partialRight(putDocs, args._[1], { batch: args.batch, requestTimeout: args.timeout })) + .through(_.partialRight(putDocs, args._[1], { batch: args.batch, requestTimeout: args.timeout, update: args.update })) .map(JSON.stringify); } diff --git a/lib/put-docs.js b/lib/put-docs.js index ad82126..9941109 100644 --- a/lib/put-docs.js +++ b/lib/put-docs.js @@ -9,20 +9,26 @@ const _ = require('lodash'), * Transform an array of docs into an array of bulk actions. * @param {Object[]} docs * @param {string} index Index each doc will be put into + * @param {boolean} update Indicates if the bulk operation is index or update * @return {Object[]} */ -function docsToBulkActions(docs, index) { +function docsToBulkActions(docs, index, update = false) { return docs.reduce((acc, doc) => { + const reqOptions = { _index: index || doc._index, _type: doc._type || DEFAULT_TYPE, _id: doc._id }; + + if (update) { + acc.push( + { update: reqOptions, doc_as_upsert: true}, + { doc: doc._source } + ); + return acc; + } + acc.push( - { - index: { - _index: index || doc._index, - _type: doc._type || DEFAULT_TYPE, - _id: doc._id - } - }, + { index: reqOptions }, doc._source ); + return acc; }, []); } @@ -34,15 +40,16 @@ function docsToBulkActions(docs, index) { * @param {string} elastic Elastic endpoint e.g. localhost:9200/foo * @param {Object} [options] * @param {number} [options.batch] Number of items to include in each bulk req + * @param {boolean} [options.update] Indicates if the bulk operation is index or update * @return {Stream} of bulk results */ -function putDocs(stream, elastic, {batch = DEFAULT_BATCH_SIZE, requestTimeout} = {}) { +function putDocs(stream, elastic, {batch = DEFAULT_BATCH_SIZE, requestTimeout, update = false} = {}) { const {host, index} = util.parseElastic(elastic), client = util.getEsClient({host, requestTimeout}); return stream .batch(batch) - .map(_.partialRight(docsToBulkActions, index)) + .map(_.partialRight(docsToBulkActions, index, update)) .flatMap(batchActions => h(client.bulk({body: batchActions}))) .flatMap(results => h(results.items)); } diff --git a/lib/put-docs.test.js b/lib/put-docs.test.js index 4a00ee4..191f172 100644 --- a/lib/put-docs.test.js +++ b/lib/put-docs.test.js @@ -8,7 +8,7 @@ const fn = require('./put-docs'), _ = require('lodash'), { DEFAULT_BATCH_SIZE } = require('./constants'); -describe('get-docs', function () { +describe('put-docs', function () { let sandbox, esClientStub, mockDocs, diff --git a/package-lock.json b/package-lock.json index e4e740b..28d470e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "elasticsearch-streamer", - "version": "1.2.0", + "version": "1.2.1", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 5621f5b..b1cb502 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "elasticsearch-streamer", - "version": "1.2.0", + "version": "1.2.1", "description": "A programmatic and command-line utility for streaming docs from Elasticsearch indices and putting docs into indices in bulk.", "main": "index.js", "scripts": {