Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[API] Add CSV bulk indexing support to Kibana API #6844

Merged
merged 18 commits into from
May 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"@spalger/numeral": "^2.0.0",
"@spalger/test-subj-selector": "0.2.1",
"@spalger/ui-ace": "0.2.3",
"JSONStream": "1.1.1",
"angular": "1.4.7",
"angular-bootstrap-colorpicker": "3.0.19",
"angular-elastic": "2.5.0",
Expand All @@ -95,6 +96,7 @@
"clipboard": "1.5.5",
"commander": "2.8.1",
"css-loader": "0.17.0",
"csv-parse": "1.1.0",
"d3": "3.5.6",
"elasticsearch": "10.1.2",
"elasticsearch-browser": "10.1.2",
Expand All @@ -109,6 +111,7 @@
"good-squeeze": "2.1.0",
"gridster": "0.5.6",
"hapi": "8.8.1",
"highland": "2.7.2",
"httpolyglot": "0.1.1",
"imports-loader": "0.6.4",
"jade": "1.11.0",
Expand Down
3 changes: 3 additions & 0 deletions src/cli/cluster/base_path_proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ export default class BasePathProxy {
config.set('server.basePath', this.basePath);
}

const ONE_GIGABYTE = 1024 * 1024 * 1024;
config.set('server.maxPayloadBytes', ONE_GIGABYTE);

setupLogging(null, this.server, config);
setupConnection(null, this.server, config);
this.setupRoutes();
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { registerPost } from './register_post';
import { registerDelete } from './register_delete';
import { registerProcessors } from './register_processors';
import { registerSimulate } from './register_simulate';
import { registerData } from './register_data';

/**
 * Registers every ingest API route on the given server.
 *
 * Each registrar is invoked once with `server`, in the order listed.
 *
 * @param {Object} server - the server instance handed to each registrar
 */
export default function (server) {
  const registrars = [
    registerPost,
    registerDelete,
    registerProcessors,
    registerSimulate,
    registerData
  ];

  registrars.forEach((register) => {
    register(server);
  });
}
99 changes: 99 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/register_data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Promise } from 'bluebird';
import parse from 'csv-parse';
import _ from 'lodash';
import hi from 'highland';
import { patternToIngest } from '../../../../common/lib/convert_pattern_and_ingest_name';
import { PassThrough } from 'stream';
import JSONStream from 'JSONStream';

const ONE_GIGABYTE = 1024 * 1024 * 1024;

/**
 * Registers the `POST /api/kibana/{id}/_data` route on the given server.
 *
 * The handler pipes the request payload (the multipart field `csv` when
 * present, otherwise the payload itself) through a CSV parser, groups the
 * parsed rows into bulk requests of up to 200 stream entries (each row
 * contributes an action entry plus the document itself), runs at most two bulk
 * requests at a time and writes one result object per bulk response to the
 * reply via `JSONStream.stringify()`.
 *
 * Query string parameters read by the handler:
 *  - `pipeline`: when exactly the string 'true', a `pipeline` derived from the
 *    index pattern is added to every bulk request
 *  - `csv_delimiter`: delimiter handed to the CSV parser, defaults to ','
 *
 * @param {Object} server - object exposing `route()` and
 *   `plugins.elasticsearch.callWithRequest`
 */
export function registerData(server) {
  server.route({
    path: '/api/kibana/{id}/_data',
    method: 'POST',
    config: {
      payload: {
        output: 'stream',
        maxBytes: ONE_GIGABYTE
      }
    },
    handler: function (req, reply) {
      const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req);
      const indexPattern = req.params.id;
      const usePipeline = req.query.pipeline === 'true';
      const parser = parse({
        columns: true,
        auto_parse: true,
        delimiter: _.get(req.query, 'csv_delimiter', ','),
        skip_empty_lines: true
      });
      const responseStream = new PassThrough();

      // Use the `csv` payload field when it is present; otherwise the payload itself is the CSV stream
      const upload = req.payload.csv;
      const csvStream = upload || req.payload;
      const fileName = upload ? upload.hapi.filename : '';

      let lineNumber = 2; // Starts at 2 since we parse the header separately

      // Expands every parsed row into the action/document pair expected by the bulk API
      const toBulkEntries = (err, row, push, next) => {
        if (err) {
          push(err, null);
          next();
          return;
        }

        if (row === hi.nil) {
          // pass nil (end event) along the stream
          push(null, row);
          return;
        }

        // Documents only get an explicit _id when a file name is available
        const action = _.isEmpty(fileName) ? {} : {_id: `${fileName}:${lineNumber}`};
        push(null, {index: action});
        push(null, row);
        lineNumber++;
        next();
      };

      // Sends one batch to the bulk API and wraps the pending call in a highland stream
      const sendBulk = (bulkBody) => {
        const bulkParams = {
          index: indexPattern,
          type: 'default',
          body: bulkBody
        };

        if (usePipeline) {
          bulkParams.pipeline = patternToIngest(indexPattern);
        }

        return hi(boundCallWithRequest('bulk', bulkParams));
      };

      // Adds the outcome of a single bulk item to the running result object
      const tallyItem = (summary, item) => {
        const outcome = item.index;

        if (!outcome.error) {
          summary.created++;
          return summary;
        }

        // Create the error list lazily so successful batches carry no `errors` key
        if (_.isUndefined(_.get(summary, 'errors.index'))) {
          _.set(summary, 'errors.index', []);
        }
        summary.errors.index.push(_.pick(outcome, ['_id', 'error']));

        return summary;
      };

      // Reduces a whole bulk response to a single result object
      const summarize = (bulkResponse) => _.reduce(bulkResponse.items, tallyItem, {created: 0});

      csvStream.pipe(parser);

      const bulkEntries = hi(parser).consume(toBulkEntries);
      const bulkResponses = bulkEntries
        .batch(200)
        .map(sendBulk)
        .parallel(2);
      const results = bulkResponses
        .map(summarize)
        .stopOnError((err, push) => {
          // Report the failure as a final result object instead of an error
          push(null, {created: 0, errors: {other: [err.message]}});
        });

      results
        .pipe(JSONStream.stringify())
        .pipe(responseStream);

      reply(responseStream).type('application/json');
    }
  });
}
3 changes: 2 additions & 1 deletion tasks/config/simplemocha.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ module.exports = {
timeout: 10000,
slow: 5000,
ignoreLeaks: false,
reporter: 'dot'
reporter: 'dot',
globals: ['nil']
},
all: {
src: [
Expand Down
193 changes: 193 additions & 0 deletions test/unit/api/ingest/_data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// API tests for the CSV bulk upload endpoint (POST .../_data).
define(function (require) {
  var Promise = require('bluebird');
  var _ = require('intern/dojo/node!lodash');
  var expect = require('intern/dojo/node!expect.js');
  var fakeNamesIndexTemplate = require('intern/dojo/node!../../fixtures/fake_names_index_template.json');
  var fs = require('intern/dojo/node!fs');

  /**
   * Declares the `_data` suite.
   *
   * @param {Object} bdd - BDD interface providing describe/it/beforeEach/afterEach
   * @param {Object} scenarioManager - its `client` property is used as the Elasticsearch client
   * @param {Object} request - HTTP test agent; `post()` starts a request against the API
   */
  return function (bdd, scenarioManager, request) {
    const es = scenarioManager.client;

    const DATA_URL = '/kibana/names/_data';
    const NAMES_CSV = 'test/unit/fixtures/fake_names.csv';
    const BIG_NAMES_CSV = 'test/unit/fixtures/fake_names_big.csv';
    const PIPELINE_PATH = '_ingest/pipeline/kibana-names';

    // Starts a multipart POST with the given fixture attached as the `csv` field
    const uploadCsv = (fixturePath, url) => {
      return request.post(url || DATA_URL).attach('csv', fixturePath);
    };

    bdd.describe('_data', function () {

      bdd.beforeEach(function () {
        return es.indices.putTemplate({
          name: 'names',
          body: fakeNamesIndexTemplate
        });
      });

      bdd.afterEach(function () {
        return es.indices.delete({ index: 'names', ignore: 404 })
          .then(() => es.indices.deleteTemplate({name: 'names'}));
      });

      bdd.it('should accept a multipart/form-data request with a csv file attached', function () {
        return uploadCsv(NAMES_CSV).expect(200);
      });

      bdd.it('should also accept the raw csv data in the payload body', function () {
        var rawCsv = fs.readFileSync(BIG_NAMES_CSV, {encoding: 'utf8'});

        return request.post(DATA_URL)
          .send(rawCsv)
          .expect(200);
      });

      bdd.it('should return JSON results', function () {
        return uploadCsv(NAMES_CSV)
          .expect('Content-Type', /json/)
          .expect(200);
      });

      bdd.it('should index one document per row in the csv', function () {
        return uploadCsv(NAMES_CSV)
          .expect(200)
          .then(() => es.indices.refresh())
          .then(() => es.count({ index: 'names' }))
          .then((countResponse) => {
            expect(countResponse.count).to.be(100);
          });
      });

      bdd.it('should stream a chunked response', function () {
        return uploadCsv(NAMES_CSV)
          .expect('Transfer-Encoding', 'chunked')
          .expect(200);
      });

      bdd.it('should respond with an array of one or more "result objects"', function () {
        return uploadCsv(BIG_NAMES_CSV)
          .expect(200)
          .then((dataResponse) => {
            expect(dataResponse.body.length).to.be(14);
          });
      });

      bdd.describe('result objects', function () {

        bdd.it('should include a count of created documents', function () {
          return uploadCsv(NAMES_CSV)
            .expect(200)
            .then((dataResponse) => {
              const firstResult = dataResponse.body[0];

              expect(firstResult).to.have.property('created');
              expect(firstResult.created).to.be(100);
            });
        });

        bdd.it('should report any indexing errors per document under an "errors.index" key', function () {
          return uploadCsv('test/unit/fixtures/fake_names_with_mapping_errors.csv')
            .expect(200)
            .then((dataResponse) => {
              const firstResult = dataResponse.body[0];

              expect(firstResult).to.have.property('created');
              expect(firstResult.created).to.be(98);
              expect(firstResult).to.have.property('errors');
              expect(firstResult.errors).to.have.property('index');
              expect(firstResult.errors.index.length).to.be(2);
            });
        });

        bdd.it('should report any csv parsing errors under an "errors.other" key', function () {
          return uploadCsv('test/unit/fixtures/fake_names_with_parse_errors.csv')
            .expect(200)
            .then((dataResponse) => {
              const firstResult = dataResponse.body[0];

              // parse errors immediately abort indexing
              expect(firstResult).to.have.property('created');
              expect(firstResult.created).to.be(0);

              expect(firstResult).to.have.property('errors');
              expect(firstResult.errors).to.have.property('other');
              expect(firstResult.errors.other.length).to.be(1);
            });
        });

      });

      bdd.describe('optional parameters', function () {

        bdd.it('should accept a custom csv_delimiter query string param for parsing the CSV', function () {
          return uploadCsv('test/unit/fixtures/fake_names_pipe_delimited.csv', '/kibana/names/_data?csv_delimiter=|')
            .expect(200)
            .then((dataResponse) => {
              const firstResult = dataResponse.body[0];

              expect(firstResult).to.have.property('created');
              expect(firstResult.created).to.be(2);
              expect(firstResult).to.not.have.property('errors');

              return es.indices.refresh();
            })
            .then(() => es.search({ index: 'names' }))
            .then((searchResponse) => {
              const firstSource = _.get(searchResponse, 'hits.hits[0]._source');
              expect(firstSource).to.only.have.keys('Number', 'Gender', 'NameSet');
            });
        });

        bdd.it('should accept a boolean pipeline query string parameter enabling use of the index pattern\'s associated pipeline',
          function () {
            // Pipeline that stamps every document with foo=bar so its use can be observed
            const setFooPipeline = {
              processors: [
                { set: { field: 'foo', value: 'bar' } }
              ]
            };

            return es.transport.request({
              path: PIPELINE_PATH,
              method: 'put',
              body: setFooPipeline
            })
              .then(() => {
                return uploadCsv(NAMES_CSV, '/kibana/names/_data?pipeline=true').expect(200);
              })
              .then(() => es.indices.refresh())
              .then(() => es.search({ index: 'names' }))
              .then((searchResponse) => {
                _.forEach(searchResponse.hits.hits, (hit) => {
                  expect(hit._source).to.have.property('foo');
                  expect(hit._source.foo).to.be('bar');
                });
                return searchResponse;
              })
              .finally(() => {
                // Always remove the pipeline, even when an assertion above failed
                return es.transport.request({
                  path: PIPELINE_PATH,
                  method: 'delete'
                });
              });
          });
      });

    });
  };
});
2 changes: 2 additions & 0 deletions test/unit/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ define(function (require) {
var expect = require('intern/dojo/node!expect.js');
var post = require('./_post');
var del = require('./_del');
var data = require('./_data');
var simulate = require('./_simulate');
var processors = require('./_processors');
var processorTypes = require('./processors/index');
Expand All @@ -26,6 +27,7 @@ define(function (require) {

post(bdd, scenarioManager, request);
del(bdd, scenarioManager, request);
data(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
processorTypes(bdd, scenarioManager, request);
Expand Down
Loading