From d11de4e83e38404cc234d01da63ed2821b5333da Mon Sep 17 00:00:00 2001 From: Fabian Date: Thu, 27 Feb 2020 12:22:00 -0500 Subject: [PATCH] Aleksei/fix db insert (#367) * fixed insert function to allow empty schema * Aleksei/statistics in the api (#364) * collecting statistics in the API * small fixes * fixes * changing address to key in clearOverviewedAddresses Co-authored-by: Aleksey Rudometov --- lib/apollo.js | 5 + lib/controller/transaction/index.js | 24 ++++- lib/database/helpers.js | 4 +- lib/database/index.js | 9 +- lib/database/methods.js | 14 ++- lib/resolvers.js | 8 +- lib/routes/transaction.js | 53 +++++----- lib/statistics.js | 154 ++++++++++++++++++++++++++++ 8 files changed, 240 insertions(+), 31 deletions(-) create mode 100644 lib/statistics.js diff --git a/lib/apollo.js b/lib/apollo.js index 73f2749b4d..e29644637f 100644 --- a/lib/apollo.js +++ b/lib/apollo.js @@ -48,6 +48,11 @@ function createApolloServer(httpServer) { plugins: [responseCachePlugin()], subscriptions: { path: config.subscriptionPath + }, + context: fingerprintContext => { + if (fingerprintContext.req) { + return { fingerprint: fingerprintContext.req.headers.fingerprint } + } } } diff --git a/lib/controller/transaction/index.js b/lib/controller/transaction/index.js index d35af23f9c..5ca35c647f 100644 --- a/lib/controller/transaction/index.js +++ b/lib/controller/transaction/index.js @@ -3,6 +3,13 @@ const { networkMap } = require('../../networks') const Sentry = require('@sentry/node') const { publishUserTransactionAdded } = require('../../subscriptions') const reducers = require('../../reducers/cosmosV0-reducers') // TODO the whole transaction service only works for cosmos rn +const { + prestore, + storePrestored, + defineActionType, + defineActionDenom, + defineActionValue +} = require('../../statistics') global.fetch = require('node-fetch') @@ -33,7 +40,7 @@ async function estimate() { } } -async function broadcast(tx) { +async function broadcast(tx, fingerprint) { console.log(`Received broadcast: ${JSON.stringify(tx)}`) try { const hash = await broadcastTransaction( @@ -42,6 +49,18 @@ async function broadcast(tx) { networkMap[tx.networkId].api_url, tx.signedMessage ) + // presaving to the database + prestore( + { + network: tx.networkId, + address: tx.senderAddress, + action: defineActionType(tx.signedMessage.msg[0].type), + value: defineActionValue(tx.signedMessage.msg), + denom: defineActionDenom(tx.signedMessage.msg), + fingerprint + }, + hash + ) return { hash: hash, success: true @@ -193,6 +212,9 @@ async function pollTransactionSuccess( // but also here as a fallback // TODO the client might now update twice as it receives the success twice, could be fine though const transaction = reducers.transactionReducer(res, reducers) + // store in db + storePrestored(hash) + // we need to call publishUserTransactionAdded(networkId, senderAddress, transaction) } catch (error) { console.error('TX failed:', hash, error) diff --git a/lib/database/helpers.js b/lib/database/helpers.js index fcbadf8f06..9833d66614 100644 --- a/lib/database/helpers.js +++ b/lib/database/helpers.js @@ -65,9 +65,11 @@ const insert = ( return } + let schema_prefix = schema ? schema + '_' : '' + const query = ` mutation { - insert_${schema}_${table} ( + insert_${schema_prefix}${table} ( objects: ${stringifyForGraphQL(rows, height, chainId)}${ upsert ? `, diff --git a/lib/database/index.js b/lib/database/index.js index 0594e96aa0..2a2a1a9960 100644 --- a/lib/database/index.js +++ b/lib/database/index.js @@ -1,5 +1,9 @@ const { insert, read } = require('./helpers') -const { getValidatorsInfo, getMaintenance } = require('./methods') +const { + getValidatorsInfo, + getMaintenance, + storeStatistics +} = require('./methods') function database({ hasura_url, hasura_admin_key }) { return schema => { @@ -17,6 +21,9 @@ function database({ hasura_url, hasura_admin_key }) { })(schema)(validatorId) return validatorInfo[0] }, + storeStatistics: storeStatistics({ hasura_url, hasura_admin_key })( + schema + ), getMaintenance: getMaintenance({ hasura_url, hasura_admin_key diff --git a/lib/database/methods.js b/lib/database/methods.js index 326169d3e5..24bf5d72e7 100644 --- a/lib/database/methods.js +++ b/lib/database/methods.js @@ -1,4 +1,4 @@ -const { read } = require('./helpers') +const { read, insert } = require('./helpers') const getValidatorsInfo = ({ hasura_url, @@ -14,6 +14,15 @@ const getValidatorsInfo = ({ validatorId ? `where: {operator_address: {_eq: "${validatorId}"}}` : false ) } +const storeStatistics = ({ + hasura_url, + hasura_admin_key +}) => schema => async payload => { + return await insert({ + hasura_url, + hasura_admin_key + })(schema)(`statistics`, payload) +} const getMaintenance = ({ hasura_url, hasura_admin_key @@ -30,5 +39,6 @@ const getMaintenance = ({ } module.exports = { getValidatorsInfo, - getMaintenance + getMaintenance, + storeStatistics } diff --git a/lib/resolvers.js b/lib/resolvers.js index e79b3a37f0..d3f7475224 100644 --- a/lib/resolvers.js +++ b/lib/resolvers.js @@ -6,6 +6,7 @@ const { formatBech32Reducer } = require('./reducers/livepeerV0-reducers') const { networkList, networkMap } = require('./networks') const database = require('./database') const config = require('../config.js') +const { logOverview } = require('./statistics') function createDBInstance(network) { const networkSchemaName = network ? network.replace(/-/g, '_') : false @@ -310,7 +311,11 @@ const resolvers = { } return rewards }, - overview: async (_, { networkId, address }, { dataSources }) => { + overview: async ( + _, + { networkId, address }, + { dataSources, fingerprint } + ) => { const validatorsDictionary = localStore(dataSources, networkId).validators const overview = await remoteFetch(dataSources, networkId).getOverview( address, @@ -318,6 +323,7 @@ const resolvers = { ) overview.networkId = networkId overview.address = address + logOverview(overview, fingerprint) return overview }, transactions: (_, { networkId, address }, { dataSources }) => diff --git a/lib/routes/transaction.js b/lib/routes/transaction.js index fbc6bd184e..dd088c47e8 100644 --- a/lib/routes/transaction.js +++ b/lib/routes/transaction.js @@ -1,25 +1,28 @@ -var express = require('express') -var router = express.Router() -var { estimate, broadcast } = require('./../controller/transaction') - -router.use(function timeLog(req, res, next) { - req.txRequest = req.body && req.body.payload - if (req.txRequest) { - console.log(`Transaction ${Date.now()} ${req.txRequest.messageType}`) - } else { - res.json({ error: 'No Request Found' }) - } - next() -}) - -router.use('/estimate', async function(req, res) { - const response = await estimate(req.txRequest) - res.json(response) -}) - -router.use('/broadcast', async function(req, res) { - const response = await broadcast(req.txRequest) - res.json(response) -}) - -module.exports = router +var express = require('express') +var router = express.Router() +var { estimate, broadcast } = require('./../controller/transaction') + +router.use(function timeLog(req, res, next) { + req.txRequest = req.body && req.body.payload + if (req.txRequest) { + console.log(`Transaction ${Date.now()} ${req.txRequest.messageType}`) + } else { + res.json({ error: 'No Request Found' }) + } + next() +}) + +router.use('/estimate', async function(req, res) { + const response = await estimate(req.txRequest) + res.json(response) +}) + +router.use('/broadcast', async function(req, res) { + const response = await broadcast( + req.txRequest, + req.headers.fingerprint || false + ) + res.json(response) +}) + +module.exports = router diff --git a/lib/statistics.js b/lib/statistics.js new file mode 100644 index 0000000000..9386ab8d14 --- /dev/null +++ b/lib/statistics.js @@ -0,0 +1,154 @@ +const database = require('./database') +const config = require('../config') + +let overviewedAddresses = {} +let prestoredTransactions = {} + +const clearOverviewedAddresses = () => { + // clear old records, that are older than 1 hour + Object.keys(overviewedAddresses).map(key => + process.hrtime(overviewedAddresses[key])[0] > 60 * 60 + ? delete overviewedAddresses[key] + : null + ) +} +const clearPrestoredTransactions = () => { + // clear old records, that are older than 10 minute + Object.keys(prestoredTransactions).filter(hash => + process.hrtime(prestoredTransactions[hash].time)[0] > 10 * 60 + ? delete prestoredTransactions[hash] + : null + ) +} + +const storePrestored = hash => { + let transaction = prestoredTransactions[hash] + if (transaction) { + new database(config)('').storeStatistics(transaction.payload) + delete prestoredTransactions[hash] + } + clearPrestoredTransactions() +} + +const filterPayload = payload => { + payload = Object.assign({}, payload) // copying object + // sending it to the db + let possibleKeys = [ + 'network', + 'address', + 'action', + 'value', + 'fingerprint', + 'denom' + ] // possible keys + Object.keys(payload).map(key => { + if (possibleKeys.indexOf(key) === -1) { + delete payload[key] + } + }) + return payload +} + +const store = async payload => { + payload = filterPayload(payload) + return new database(config)('').storeStatistics(payload) +} + +const prestore = async (payload, hash) => { + payload = filterPayload(payload) + prestoredTransactions[hash] = { + payload, + time: process.hrtime() + } +} +const defineActionValue = msg => { + if (msg.length == 1 && msg[0].value.amount) { + return msg[0].value.amount.amount + } + return 0 +} +const defineActionDenom = msg => { + if (msg.length == 1 && msg[0].value.amount) { + return msg[0].value.amount.denom + } + return '' +} +const defineActionType = type => { + if (type.indexOf('/MsgDelegate') !== -1) { + return 'Delegate' + } else if (type.indexOf('/MsgUndelegate') !== -1) { + return 'Undelegate' + } else if (type.indexOf('/MsgSend') !== -1) { + return 'Send' + } else if (type.indexOf('/MsgWithdrawDelegationReward') !== -1) { + return 'Withdraw' + } else if (type.indexOf('/MsgDeposit') !== -1) { + return 'Deposit' + } else if (type.indexOf('/MsgBeginRedelegate') !== -1) { + return 'Redelegate' + } + return type +} +const logOverview = (overview, fingerprint) => { + let key = overview.address + overview.networkId // just a key to store data about last request time + /* + we are requesting balances toooooo frequently + and we don't need so many records in db + so limiting writting posibilities to 1 hour + */ + if (overviewedAddresses[key]) { + if (process.hrtime(overviewedAddresses[key])[0] < 60 * 60) { + return clearOverviewedAddresses() + } + } + overviewedAddresses[key] = process.hrtime() // time in ms + // common object + let data = { + address: overview.address, + network: overview.networkId, + fingerprint, + action: ``, + value: `` + } + // store liquidStake + data.action = 'liquidStake' + data.value = overview.liquidStake.toString() + store(data) + // store totalStake + data.action = 'totalStake' + data.value = overview.totalStake.toString() + store(data) + // store totalRewards + data.action = 'totalRewards' + data.value = overview.totalRewards.toString() + store(data) + // store rewards + // summing rewards with one denom + if (overview.rewards) { + overview.rewards + .reduce((newArray, currentItem) => { + const index = newArray.findIndex(el => el.denom == currentItem.denom) + if (index !== -1) { + newArray[index].amount *= 1 + newArray[index].amount += currentItem.amount * 1 + } else { + newArray.push(currentItem) + } + return newArray + }, []) + .map(reward => { + data.action = 'rewards' + data.denom = reward.denom + data.value = reward.amount + store(data) + }) + } +} +module.exports = { + prestore, + storePrestored, + defineActionType, + logOverview, + defineActionDenom, + defineActionValue +}