-
Notifications
You must be signed in to change notification settings - Fork 0
/
bulkUpdate.mjs
55 lines (50 loc) · 1.9 KB
/
bulkUpdate.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import Promise from "bluebird";
import _ from "lodash";
import { performance } from "perf_hooks";
import client from "../client.mjs";
/**
 * Applies partial updates to many documents with a single Elasticsearch bulk request.
 *
 * Every entry of `documents` becomes an `update` action addressed by `row.id`; all of
 * its other fields are sent as the partial `doc`. The target index is read from
 * `process.env.INDEX` and the request asks for an index refresh (`refresh: true`).
 *
 * Failures never propagate to the caller: a rejected request is logged together with
 * the request parameters, and per-document failures reported inside the bulk response
 * are logged and excluded from the "updated" count.
 *
 * @param {Array<Object>} documents - rows to update; each row is expected to have an `id`.
 * @returns {Promise<void>} resolves once the request has completed or its failure was logged.
 */
export async function bulkUpdate(documents) {
    // Nothing to send; this also covers a missing (null/undefined) batch.
    if (documents == null || documents.length === 0) { return; }

    const bulkParams = {
        refresh: true,
        index: process.env.INDEX,
        // The bulk body alternates an action line with its payload line.
        body: documents.flatMap(row => [
            { update: { '_id': row.id } },
            { doc: _.omit(row, 'id') }
        ])
    };

    try {
        const { body: bulkResponse } = await client.bulk(bulkParams);
        const items = _.get(bulkResponse, 'items') || [];

        // The bulk API does not reject when individual actions fail: it sets
        // `errors: true` and attaches an `error` object to each failed item.
        const failures = bulkResponse.errors
            ? items.filter(item => _.get(item, 'update.error'))
            : [];
        if (failures.length > 0) {
            console.error(`bulkUpdate() - ${failures.length}/${items.length} updates failed`, failures);
        }

        console.log(`geocode: updated ${items.length - failures.length} documents in ${bulkResponse.took}ms for ${process.env.ELASTICSEARCH}/${process.env.INDEX}`);
    } catch(exception) {
        console.error("bulkUpdate() - body", bulkParams);
        console.error("bulkUpdate() - exception", exception);
    }
}
/**
 * Sends a partial update for a single document through the Elasticsearch client.
 *
 * The document is addressed by `document.id` in the index named by
 * `process.env.INDEX`, and the whole `document` object is sent as the partial `doc`.
 * NOTE(review): unlike bulkUpdate(), the `id` field is not stripped from `doc` here —
 * confirm that is intended.
 *
 * @param {Object} document - the document to update; its `id` selects the target.
 * @returns {Promise<Object|null>} the client's response, or `null` when anything
 *   threw (the exception is logged, not rethrown).
 */
export async function clientUpdate(document) {
    let response = null;
    try {
        // Built inside the try so that a bad `document` is logged rather than thrown.
        const request = {
            index: process.env.INDEX,
            id: document.id,
            body: { doc: document },
        };
        response = await client.update(request);
    } catch (error) {
        console.error("clientUpdate()", error);
        return null;
    }
    return response;
}
/**
 * Updates documents one at a time via clientUpdate() and reports how many changed.
 *
 * A response counts as a success only when its `body.result` equals `'updated'`.
 * A summary line with the success ratio and elapsed time is written to the console.
 *
 * @param {Array<Object>} documents - the documents to pass to clientUpdate(), in order.
 * @returns {Promise<number|null>} the number of successful updates, or `null` when
 *   an exception was thrown (it is logged, not rethrown).
 */
// noinspection JSUnusedLocalSymbols,JSUnusedGlobalSymbols
export async function clientUpdates(documents) {
    try {
        const startedAt = performance.now();

        // BUGFIX: mapSeries to prevent overloading ElasticSearch
        const results = await Promise.mapSeries(documents, clientUpdate);

        let updatedCount = 0;
        for (const result of results) {
            if (_.get(result, 'body.result') === 'updated') {
                updatedCount += 1;
            }
        }

        const elapsedMs = performance.now() - startedAt;
        console.log(`updated ${updatedCount}/${documents.length} documents in ${elapsedMs.toFixed(0)}ms for ${process.env.ELASTICSEARCH}/${process.env.INDEX}`);
        return updatedCount;
    } catch (error) {
        console.error("clientUpdates()", error);
        return null;
    }
}