Skip to content

Commit

Permalink
Merge branch 'release/v4.0.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyUss committed Sep 21, 2019
2 parents 904ca5b + 8b4ce6a commit 6332308
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 39 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
<b>Note:</b> "logs_directory" will be created during script execution.</p>

<h3>VERSION</h3>
<p>Current version is 4.0.1<br />
<p>Current version is 4.0.2<br />
(major version . improvements . bug fixes)</p>

<h3>KNOWN ISSUES</h3>
Expand Down
40 changes: 20 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nmig",
"version": "4.0.1",
"version": "4.0.2",
"description": "The database migration app",
"author": "Anatoly Khaytovich<[email protected]>",
"license": "GPL-3.0",
Expand All @@ -13,16 +13,16 @@
},
"dependencies": {
"mysql": "^2.17.1",
"pg": "^7.12.0",
"pg": "^7.12.1",
"pg-copy-streams": "^2.2.2"
},
"devDependencies": {
"@types/mysql": "^2.15.6",
"@types/node": "^12.6.8",
"@types/pg": "^7.4.14",
"@types/mysql": "^2.15.7",
"@types/node": "^12.7.5",
"@types/pg": "^7.11.1",
"@types/tape": "^4.2.33",
"tape": "^4.11.0",
"typescript": "^3.5.3"
"typescript": "^3.6.3"
},
"scripts": {
"build": "tsc",
Expand Down
72 changes: 60 additions & 12 deletions src/DataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { enforceConsistency } from './ConsistencyEnforcer';
import * as extraConfigProcessor from './ExtraConfigProcessor';
import BufferStream from './BufferStream';
import * as path from 'path';
import { PoolClient } from 'pg';
import { PoolClient, QueryResult } from 'pg';
const { from } = require('pg-copy-streams'); // No declaration file for module "pg-copy-streams".

process.on('message', async (signal: any) => {
Expand Down Expand Up @@ -64,12 +64,21 @@ function processSend(x: any): void {
/**
* Deletes given record from the data-pool.
*/
async function deleteChunk(conv: Conversion, dataPoolId: number, client: PoolClient): Promise<void> {
async function deleteChunk(
conv: Conversion,
dataPoolId: number,
client: PoolClient,
originalSessionReplicationRole: string | null = null
): Promise<void> {
const sql: string = `DELETE FROM "${ conv._schema }"."data_pool_${ conv._schema }${ conv._mySqlDbName }" WHERE id = ${ dataPoolId };`;
const dbAccess: DBAccess = new DBAccess(conv);

try {
await client.query(sql);

if (originalSessionReplicationRole) {
await enableTriggers(conv, client, <string>originalSessionReplicationRole);
}
} catch (error) {
await generateError(conv, `\t--[DataLoader::deleteChunk] ${ error }`, sql);
} finally {
Expand All @@ -94,12 +103,13 @@ async function processDataError(
sqlCopy: string,
tableName: string,
dataPoolId: number,
client: PoolClient
client: PoolClient,
originalSessionReplicationRole: string | null
): Promise<void> {
await generateError(conv, `\t--[populateTableWorker] ${ streamError }`, sqlCopy);
const rejectedData: string = `\t--[populateTableWorker] Error loading table data:\n${ sql }\n`;
log(conv, rejectedData, path.join(conv._logsDirPath, `${ tableName }.log`));
return deleteChunk(conv, dataPoolId, client);
return deleteChunk(conv, dataPoolId, client, originalSessionReplicationRole);
}

/**
Expand Down Expand Up @@ -137,30 +147,68 @@ async function populateTableWorker(
const buffer: Buffer = Buffer.from(csvString, conv._encoding);
const sqlCopy: string = `COPY "${ conv._schema }"."${ tableName }" FROM STDIN DELIMITER '${ conv._delimiter }' CSV;`;
const client: PoolClient = await dbAccess.getPgClient();
let originalSessionReplicationRole: string | null = null;

if (conv.shouldMigrateOnlyDataFor(tableName)) {
originalSessionReplicationRole = await disableTriggers(conv, client);
}

const copyStream: any = client.query(from(sqlCopy));
const bufferStream: BufferStream = new BufferStream(buffer);

copyStream.on('end', () => {
/*
* COPY FROM STDIN does not return the number of rows inserted.
* But the transactional behavior still applies (no records inserted if at least one failed).
* That is why in case of 'on end' the rowsInChunk value is actually the number of records inserted.
*/
// COPY FROM STDIN does not return the number of rows inserted.
// But the transactional behavior still applies (no records inserted if at least one failed).
// That is why in case of 'on end' the rowsInChunk value is actually the number of records inserted.
processSend(new MessageToMaster(tableName, rowsInChunk, rowsCnt));
return deleteChunk(conv, dataPoolId, client).then(() => resolvePopulateTableWorker());
return deleteChunk(conv, dataPoolId, client, originalSessionReplicationRole).then(() => resolvePopulateTableWorker());
});

copyStream.on('error', (copyStreamError: string) => {
return processDataError(conv, copyStreamError, sql, sqlCopy, tableName, dataPoolId, client)
return processDataError(conv, copyStreamError, sql, sqlCopy, tableName, dataPoolId, client, originalSessionReplicationRole)
.then(() => resolvePopulateTableWorker());
});

bufferStream.on('error', (bufferStreamError: string) => {
return processDataError(conv, bufferStreamError, sql, sqlCopy, tableName, dataPoolId, client)
return processDataError(conv, bufferStreamError, sql, sqlCopy, tableName, dataPoolId, client, originalSessionReplicationRole)
.then(() => resolvePopulateTableWorker());
});

bufferStream.setEncoding(conv._encoding).pipe(copyStream);
}, conv._encoding);
});
}

/**
 * Disables all triggers and rules for current database session by switching
 * session_replication_role to "replica", returning the role that was active
 * beforehand so the caller can restore it later.
 * !!!DO NOT release the client, it will be released after current data-chunk deletion.
 */
async function disableTriggers(conversion: Conversion, client: PoolClient): Promise<string> {
    // Fall back to PostgreSQL's default role should the current one be unreadable.
    let sessionRole: string = 'origin';
    let currentSql: string = `SHOW session_replication_role;`;

    try {
        const showResult: QueryResult = await client.query(currentSql);
        sessionRole = showResult.rows[0].session_replication_role;

        currentSql = 'SET session_replication_role = replica;';
        await client.query(currentSql);
    } catch (error) {
        // Best-effort: log the failure and proceed; the caller still receives a usable role.
        await generateError(conversion, `\t--[DataLoader::disableTriggers] ${ error }`, currentSql);
    }

    return sessionRole;
}

/**
 * Enables all triggers and rules for current database session by restoring
 * session_replication_role to the value captured before triggers were disabled
 * (normally one of: origin, local, replica).
 * !!!DO NOT release the client, it will be released after current data-chunk deletion.
 */
async function enableTriggers(conversion: Conversion, client: PoolClient, originalSessionReplicationRole: string): Promise<void> {
    // Quote the restored value as a string literal (doubling any embedded quotes)
    // so an unexpected/malformed role value cannot break or alter the statement.
    const escapedRole: string = originalSessionReplicationRole.replace(/'/g, "''");
    const sql: string = `SET session_replication_role = '${ escapedRole }';`;

    try {
        await client.query(sql);
    } catch (error) {
        // Best-effort: a failure here leaves triggers disabled for this session; log it.
        await generateError(conversion, `\t--[DataLoader::enableTriggers] ${ error }`, sql);
    }
}

0 comments on commit 6332308

Please sign in to comment.