forked from lbryio/lbry-social-tipbot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deposits.js
136 lines (119 loc) · 4.16 KB
/
deposits.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Background tx processor for handling deposits and withdrawals
const async = require('async');
const config = require('./config/config');
const mysql = require('mysql');
const request = require('request');
if (config.debug) {
require('request-debug')(request);
}
// Connect to the database
let db;
const initSqlConnection = () => {
const _db = mysql.createConnection({
host: config.mariadb.host,
user: config.mariadb.username,
password: config.mariadb.password,
database: config.mariadb.database,
charset: 'utf8mb4',
timezone: 'Z'
});
_db.on('error', (err) => {
if (err.code === 2006 || ['PROTOCOL_CONNECTION_LOST', 'PROTOCOL_PACKETS_OUT_OF_ORDER', 'PROTOCOL_ENQUEUE_AFTER_FATAL_ERROR'].indexOf(err.code) > -1) {
_db.destroy();
db = initSqlConnection();
}
});
return _db;
};
db = initSqlConnection();
const userIdForDepositAddress = (address, callback) => {
db.query('SELECT Id FROM Users WHERE DepositAddress = ?', [address], (err, res) => {
if (err) {
return callback(err, null);
}
if (res.length === 0) {
return callback(new Error(`User with deposit address ${address} not found.`));
}
return callback(null, res[0].Id);
});
};
const createDeposit = (address, txhash, amount, confirmations, callback) => {
async.waterfall([
(cb) => {
userIdForDepositAddress(address, cb);
},
(depositorId, cb) => {
db.query('INSERT INTO Deposits (UserId, TxHash, Amount, Confirmations, Created) VALUES (?, ?, ?, ?, UTC_TIMESTAMP()) ON DUPLICATE KEY UPDATE Confirmations = ?',
[depositorId, txhash, amount, confirmations, confirmations], cb);
}
], callback);
};
const confirmationsForTx = (txhash, callback) => {
request.post({ url: config.lbrycrd.rpcurl, json: { method: 'gettransaction', params: [txhash] } }, (err, res, body) => {
if (body.error) {
return callback(body.error, null);
}
return callback(null, body.result.confirmations);
});
};
const processNewDeposits = (callback) => {
async.waterfall([
(cb) => {
request.post({ url: config.lbrycrd.rpcurl, json: { method: 'listtransactions', params: [config.lbrycrd.account, 1000] } }, cb);
},
(res, body, cb) => {
if (body.error) {
return cb(body.error, null);
}
// simply insert the deposits
return async.each(body.result, (tx, ecb) => {
if (tx.amount <= 0) {
return ecb(null, null);
}
return createDeposit(tx.address, tx.txid, tx.amount, tx.confirmations, ecb);
}, cb);
}
], callback);
};
// deposits with confirmations < 3
const processPendingDeposits = (callback) => {
async.waterfall([
(cb) => {
db.query('SELECT Id, TxHash FROM Deposits WHERE Confirmations < 3', cb);
},
(res, fields, cb) => {
if (res.length === 0) {
return cb(null, []);
}
return async.each(res, (deposit, ecb) => {
confirmationsForTx(deposit.TxHash, (err, confirmations) => {
if (err) {
return ecb(err, null);
}
db.query('UPDATE Deposits SET Confirmations = ? WHERE Id = ?', [confirmations, deposit.Id], ecb);
});
}, cb);
}
], callback);
};
const runProcess = () => {
async.waterfall([
(cb) => {
console.log('Processing new deposits.');
processNewDeposits(cb);
},
(cb) => {
console.log('Processing pending deposits.');
processPendingDeposits(cb);
}
], (err) => {
if (err) {
console.log('Error occurred.');
console.log(err);
}
// run again in 1 minute
console.log('Waiting 1 minute...');
setTimeout(runProcess, 60000);
});
};
runProcess();