Skip to content

Commit

Permalink
Feature/general improvements 1 (#91)
Browse files Browse the repository at this point in the history
* added number_of_simultaneously_running_loader_processes config parameter

* refactoring

* improved getNumberOfSimultaneouslyRunningLoaderProcesses function

* create identity for auto-incremented columns instead of plain sequences

* fixed DefaultProcessor.ts

* updated dependencies

* fixed TypeScript minor compilation errors

* improved data loading performance

* small refactoring

* updated dependencies

Co-authored-by: Anatoly Khaytovich <[email protected]>
  • Loading branch information
AnatolyUss and AnatolyFromPerion authored Feb 19, 2022
1 parent 0463549 commit c7bb43c
Show file tree
Hide file tree
Showing 21 changed files with 1,822 additions and 419 deletions.
21 changes: 20 additions & 1 deletion config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,29 @@
],
"max_each_db_connection_pool_size" : 20,

"number_of_simultaneously_running_loader_processes_description": [
"Number of data-loader processes that will run simultaneously.",
"Acceptable values:",
"1. 'DEFAULT' - when set to 'DEFAULT', Nmig will run 2 data-loader processes.",
"2. Any positive integer.",
"Notice:",
"1.",
"Usually, migration gets accomplished faster with only 2 data-loader processes,",
"even if more CPU cores are available.",
"Yet, it is worth an effort to 'play' with this parameter prior running migration in production,",
"to check how many processes work best in your case.",
"When trying Nmig on your test-database, try both decreasing and increasing this number.",
"2.",
"'number_of_simultaneously_running_loader_processes' will never be greater",
"than a number of logical CPU cores of the machine running Nmig.",
"If greater number chosen - Nmig will run one loader process per each available CPU core."
],
"number_of_simultaneously_running_loader_processes": "DEFAULT",

"loader_max_old_space_size_description" : [
"V8 memory limit of the loader process.",
"Possible values are:",
"1. any number, representing memory limit (in MB).",
"1. any positive integer, representing memory limit (in MB).",
"2. 'DEFAULT', representing V8 default memory limit for your current hardware."
],
"loader_max_old_space_size" : "DEFAULT",
Expand Down
1,964 changes: 1,629 additions & 335 deletions package-lock.json

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nmig",
"version": "5.5.0",
"version": "5.6.0",
"description": "The database migration app",
"author": "Anatoly Khaytovich<[email protected]>",
"license": "GPL-3.0",
Expand All @@ -12,21 +12,21 @@
"node": ">=10.0.0"
},
"dependencies": {
"json2csv": "^5.0.3",
"@types/mysql": "^2.15.21",
"@types/node": "^17.0.16",
"@types/pg": "^8.6.4",
"json2csv": "^5.0.6",
"mysql": "^2.18.1",
"pg": "^8.4.2",
"pg-copy-streams": "^5.1.1",
"@types/mysql": "^2.15.15",
"@types/node": "^14.14.5",
"@types/pg": "^7.14.5"
"pg": "^8.7.3",
"pg-copy-streams": "^6.0.2"
},
"devDependencies": {
"@types/tape": "^4.13.0",
"tape": "^5.0.1",
"typescript": "^4.0.5"
"@types/tape": "^4.13.2",
"tape": "^5.5.0",
"typescript": "4.5.5"
},
"scripts": {
"build": "tsc",
"build": "tsc --incremental -p tsconfig.json",
"start": "node dist/src/Main.js",
"test": "node dist/test/Main.test.js"
},
Expand Down
3 changes: 2 additions & 1 deletion src/BinaryDataDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*
* @author Anatoly Khaytovich <[email protected]>
*/
import { PoolClient } from 'pg';

import { log } from './FsOps';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
import IDBAccessQueryParams from './IDBAccessQueryParams';
import { PoolClient } from 'pg';

/**
* Decodes binary data from from textual representation in string.
Expand Down
1 change: 1 addition & 0 deletions src/BootProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @author Anatoly Khaytovich <[email protected]>
*/
import * as path from 'path';

import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
Expand Down
6 changes: 3 additions & 3 deletions src/ConstraintsProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
* @author Anatoly Khaytovich <[email protected]>
*/
import * as identityProcessor from './IdentityProcessor';
import * as sequencesProcessor from './SequencesProcessor';
import * as migrationStateManager from './MigrationStateManager';
import processEnum from './EnumProcessor';
import processNull from './NullProcessor';
Expand Down Expand Up @@ -66,13 +66,13 @@ export const processConstraintsPerTable = async (
migrateOnlyData: boolean
): Promise<void> => {
if (migrateOnlyData) {
return identityProcessor.setSequenceValue(conversion, tableName);
return sequencesProcessor.setSequenceValue(conversion, tableName);
}

await processEnum(conversion, tableName);
await processNull(conversion, tableName);
await processDefault(conversion, tableName);
await identityProcessor.createIdentity(conversion, tableName);
await sequencesProcessor.createIdentity(conversion, tableName);
await processIndexAndKey(conversion, tableName);
await processComments(conversion, tableName);
};
42 changes: 33 additions & 9 deletions src/Conversion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
*/
import * as path from 'path';
import { EventEmitter } from 'events';

import { Pool as MySQLPool } from 'mysql';
import { Pool as PgPool } from 'pg';

import { Encoding } from './Encoding';

export default class Conversion {
Expand All @@ -43,7 +45,7 @@ export default class Conversion {
/**
* V8 memory limit of the loader process.
*/
public _loaderMaxOldSpaceSize: number | string;
public readonly _loaderMaxOldSpaceSize: number | string;

/**
* Maximal amount of simultaneous connections to your MySQL and PostgreSQL servers each.
Expand Down Expand Up @@ -113,7 +115,7 @@ export default class Conversion {
/**
* The timestamp, at which the migration began.
*/
public _timeBegin: Date;
public readonly _timeBegin: Date;

/**
* Current version of source (MySQL) db.
Expand Down Expand Up @@ -163,7 +165,7 @@ export default class Conversion {
/**
* An array of data chunks.
*/
public readonly _dataPool: any[];
public readonly _dataPool: object[];

/**
* A flag, that indicates if Nmig currently runs in test mode.
Expand Down Expand Up @@ -201,6 +203,11 @@ export default class Conversion {
*/
public readonly _streamsHighWaterMark: number;

/**
* Number of data-loader processes that will run simultaneously.
*/
public readonly _numberOfSimultaneouslyRunningLoaderProcesses: string | number;

/**
* Constructor.
*/
Expand All @@ -226,24 +233,41 @@ export default class Conversion {
this._dataPool = [];
this._dicTables = Object.create(null);
this._mySqlDbName = this._sourceConString.database;
this._streamsHighWaterMark = this._config.streams_high_water_mark === undefined ? 16384 : +this._config.streams_high_water_mark;

this._streamsHighWaterMark = this._config.streams_high_water_mark === undefined
? 16384
: +this._config.streams_high_water_mark;

this._schema = this._config.schema === undefined || this._config.schema === ''
? this._mySqlDbName
: this._config.schema;

this._maxEachDbConnectionPoolSize = this._config.max_each_db_connection_pool_size !== undefined && Conversion._isIntNumeric(this._config.max_each_db_connection_pool_size)
const isValidMaxEachDbConnectionPoolSize: boolean = this._config.max_each_db_connection_pool_size !== undefined
&& Conversion._isIntNumeric(this._config.max_each_db_connection_pool_size);

this._maxEachDbConnectionPoolSize = isValidMaxEachDbConnectionPoolSize
? +this._config.max_each_db_connection_pool_size
: 20;

this._maxEachDbConnectionPoolSize = this._maxEachDbConnectionPoolSize > 0 ? this._maxEachDbConnectionPoolSize : 20;
this._runsInTestMode = false;
this._eventEmitter = null;
this._migrationCompletedEvent = 'migrationCompleted';
this._removeTestResources = this._config.remove_test_resources === undefined ? true : this._config.remove_test_resources;
this._maxEachDbConnectionPoolSize = this._maxEachDbConnectionPoolSize > 0 ? this._maxEachDbConnectionPoolSize : 20;
this._loaderMaxOldSpaceSize = this._config.loader_max_old_space_size;
this._loaderMaxOldSpaceSize = Conversion._isIntNumeric(this._loaderMaxOldSpaceSize) ? this._loaderMaxOldSpaceSize : 'DEFAULT';

this._removeTestResources = this._config.remove_test_resources === undefined
? true
: this._config.remove_test_resources;

this._numberOfSimultaneouslyRunningLoaderProcesses = Conversion._isIntNumeric(this._config.number_of_simultaneously_running_loader_processes)
? +this._config.number_of_simultaneously_running_loader_processes
: 'DEFAULT';

this._loaderMaxOldSpaceSize = Conversion._isIntNumeric(this._config.loader_max_old_space_size)
? +this._config.loader_max_old_space_size
: 'DEFAULT';

this._migrateOnlyData = this._config.migrate_only_data === undefined ? false : this._config.migrate_only_data;

this._delimiter = this._config.delimiter !== undefined && this._config.delimiter.length === 1
? this._config.delimiter
: ',';
Expand Down
7 changes: 4 additions & 3 deletions src/DBAccess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import * as mysql from 'mysql';
import { MysqlError, Pool as MySQLPool, PoolConnection } from 'mysql';
import { Pool as PgPool, PoolClient } from 'pg';

import { log, generateError } from './FsOps';
import Conversion from './Conversion';
import DBVendors from './DBVendors';
Expand Down Expand Up @@ -72,8 +73,8 @@ export default class DBAccess {
* Closes both connection-pools.
*/
public static async closeConnectionPools(conversion: Conversion): Promise<Conversion> {
const closeMySqlConnections = () => {
return new Promise(resolve => {
const closeMySqlConnections = (): Promise<void> => {
return new Promise<void>(resolve => {
if (conversion._mysql) {
conversion._mysql.end(async error => {
if (error) {
Expand All @@ -88,7 +89,7 @@ export default class DBAccess {
});
};

const closePgConnections = async () => {
const closePgConnections = async (): Promise<void> => {
if (conversion._pg) {
try {
await conversion._pg.end();
Expand Down
7 changes: 5 additions & 2 deletions src/DataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
* @author Anatoly Khaytovich <[email protected]>
*/
import * as path from 'path';

import { log, generateError } from './FsOps';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
Expand All @@ -29,16 +31,17 @@ import { dataTransferred } from './ConsistencyEnforcer';
import IDBAccessQueryParams from './IDBAccessQueryParams';
import * as extraConfigProcessor from './ExtraConfigProcessor';
import { getDataPoolTableName } from './DataPoolManager';
import * as path from 'path';

import { PoolClient, QueryResult } from 'pg';
import { PoolConnection } from 'mysql';

const { from } = require('pg-copy-streams'); // No declaration file for module "pg-copy-streams".
const { Transform: Json2CsvTransform } = require('json2csv'); // No declaration file for module "json2csv".

process.on('message', async (signal: MessageToDataLoader) => {
const { config, chunk } = signal;
const conv: Conversion = new Conversion(config);
log(conv, `\t--[loadData] Loading the data into "${ conv._schema }"."${ chunk._tableName }" table...`);
log(conv, `\t--[NMIG loadData] Loading the data into "${ conv._schema }"."${ chunk._tableName }" table...`);

const isRecoveryMode: boolean = await dataTransferred(conv, chunk._id);

Expand Down
Loading

0 comments on commit c7bb43c

Please sign in to comment.