Skip to content

Commit

Permalink
Aleksei/fix db insert (#367)
Browse files Browse the repository at this point in the history
* fixed insert function to allow empty schema

* Aleksei/statistics in the api (#364)

* collecting statistics in the API

* small fixes

* fixes

* changing address to key in clearOverviewedAddresses

Co-authored-by: Aleksey Rudometov <[email protected]>
  • Loading branch information
faboweb and iambeone authored Feb 27, 2020
1 parent fbc5c71 commit d11de4e
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 31 deletions.
5 changes: 5 additions & 0 deletions lib/apollo.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ function createApolloServer(httpServer) {
plugins: [responseCachePlugin()],
subscriptions: {
path: config.subscriptionPath
},
context: fingerprintContext => {
if (fingerprintContext.req) {
return { fingerprint: fingerprintContext.req.headers.fingerprint }
}
}
}

Expand Down
24 changes: 23 additions & 1 deletion lib/controller/transaction/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ const { networkMap } = require('../../networks')
const Sentry = require('@sentry/node')
const { publishUserTransactionAdded } = require('../../subscriptions')
const reducers = require('../../reducers/cosmosV0-reducers') // TODO the whole transaction service only works for cosmos rn
const {
prestore,
storePrestored,
defineActionType,
defineActionDenom,
defineActionValue
} = require('../../statistics')

global.fetch = require('node-fetch')

Expand Down Expand Up @@ -33,7 +40,7 @@ async function estimate() {
}
}

async function broadcast(tx) {
async function broadcast(tx, fingerprint) {
console.log(`Received broadcast: ${JSON.stringify(tx)}`)
try {
const hash = await broadcastTransaction(
Expand All @@ -42,6 +49,18 @@ async function broadcast(tx) {
networkMap[tx.networkId].api_url,
tx.signedMessage
)
// presaving to the database
prestore(
{
network: tx.networkId,
address: tx.senderAddress,
action: defineActionType(tx.signedMessage.msg[0].type),
value: defineActionValue(tx.signedMessage.msg),
denom: defineActionDenom(tx.signedMessage.msg),
fingerprint
},
hash
)
return {
hash: hash,
success: true
Expand Down Expand Up @@ -193,6 +212,9 @@ async function pollTransactionSuccess(
// but also here as a fallback
// TODO the client might now update twice as it receives the success twice, could be fine though
const transaction = reducers.transactionReducer(res, reducers)
// store in db
storePrestored(hash)
// we need to call
publishUserTransactionAdded(networkId, senderAddress, transaction)
} catch (error) {
console.error('TX failed:', hash, error)
Expand Down
4 changes: 3 additions & 1 deletion lib/database/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ const insert = (
return
}

let schema_prefix = schema ? schema + '_' : ''

const query = `
mutation {
insert_${schema}_${table} (
insert_${schema_prefix}${table} (
objects: ${stringifyForGraphQL(rows, height, chainId)}${
upsert
? `,
Expand Down
9 changes: 8 additions & 1 deletion lib/database/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
const { insert, read } = require('./helpers')
const { getValidatorsInfo, getMaintenance } = require('./methods')
const {
getValidatorsInfo,
getMaintenance,
storeStatistics
} = require('./methods')

function database({ hasura_url, hasura_admin_key }) {
return schema => {
Expand All @@ -17,6 +21,9 @@ function database({ hasura_url, hasura_admin_key }) {
})(schema)(validatorId)
return validatorInfo[0]
},
storeStatistics: storeStatistics({ hasura_url, hasura_admin_key })(
schema
),
getMaintenance: getMaintenance({
hasura_url,
hasura_admin_key
Expand Down
14 changes: 12 additions & 2 deletions lib/database/methods.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { read } = require('./helpers')
const { read, insert } = require('./helpers')

const getValidatorsInfo = ({
hasura_url,
Expand All @@ -14,6 +14,15 @@ const getValidatorsInfo = ({
validatorId ? `where: {operator_address: {_eq: "${validatorId}"}}` : false
)
}
const storeStatistics = ({
hasura_url,
hasura_admin_key
}) => schema => async payload => {
return await insert({
hasura_url,
hasura_admin_key
})(schema)(`statistics`, payload)
}
const getMaintenance = ({
hasura_url,
hasura_admin_key
Expand All @@ -30,5 +39,6 @@ const getMaintenance = ({
}
module.exports = {
getValidatorsInfo,
getMaintenance
getMaintenance,
storeStatistics
}
8 changes: 7 additions & 1 deletion lib/resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { formatBech32Reducer } = require('./reducers/livepeerV0-reducers')
const { networkList, networkMap } = require('./networks')
const database = require('./database')
const config = require('../config.js')
const { logOverview } = require('./statistics')

function createDBInstance(network) {
const networkSchemaName = network ? network.replace(/-/g, '_') : false
Expand Down Expand Up @@ -310,14 +311,19 @@ const resolvers = {
}
return rewards
},
overview: async (_, { networkId, address }, { dataSources }) => {
overview: async (
_,
{ networkId, address },
{ dataSources, fingerprint }
) => {
const validatorsDictionary = localStore(dataSources, networkId).validators
const overview = await remoteFetch(dataSources, networkId).getOverview(
address,
validatorsDictionary
)
overview.networkId = networkId
overview.address = address
logOverview(overview, fingerprint)
return overview
},
transactions: (_, { networkId, address }, { dataSources }) =>
Expand Down
53 changes: 28 additions & 25 deletions lib/routes/transaction.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
var express = require('express')
var router = express.Router()
var { estimate, broadcast } = require('./../controller/transaction')

router.use(function timeLog(req, res, next) {
req.txRequest = req.body && req.body.payload
if (req.txRequest) {
console.log(`Transaction ${Date.now()} ${req.txRequest.messageType}`)
} else {
res.json({ error: 'No Request Found' })
}
next()
})

router.use('/estimate', async function(req, res) {
const response = await estimate(req.txRequest)
res.json(response)
})

router.use('/broadcast', async function(req, res) {
const response = await broadcast(req.txRequest)
res.json(response)
})

module.exports = router
var express = require('express')
var router = express.Router()
var { estimate, broadcast } = require('./../controller/transaction')

router.use(function timeLog(req, res, next) {
req.txRequest = req.body && req.body.payload
if (req.txRequest) {
console.log(`Transaction ${Date.now()} ${req.txRequest.messageType}`)
} else {
res.json({ error: 'No Request Found' })
}
next()
})

router.use('/estimate', async function(req, res) {
const response = await estimate(req.txRequest)
res.json(response)
})

router.use('/broadcast', async function(req, res) {
const response = await broadcast(
req.txRequest,
req.headers.fingerprint || false
)
res.json(response)
})

module.exports = router
154 changes: 154 additions & 0 deletions lib/statistics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
const database = require('./database')
const config = require('../config')

let overviewedAddresses = {}
let prestoredTransactions = {}

const clearOverviewedAddresses = () => {
// clear old records, that are older than 1 hour
Object.keys(overviewedAddresses).map(key =>
process.hrtime(overviewedAddresses[key])[0] > 60 * 60
? delete overviewedAddresses[key]
: null
)
}
const clearPrestoredTransactions = () => {
// clear old records, that are older than 10 minute
Object.keys(prestoredTransactions).filter(hash =>
process.hrtime(prestoredTransactions[hash].time)[0] > 10 * 60
? delete prestoredTransactions[hash]
: null
)
}

const storePrestored = hash => {
let transaction = prestoredTransactions[hash]
if (transaction) {
new database(config)('').storeStatistics(transaction.payload)
delete prestoredTransactions[hash]
}
clearPrestoredTransactions()
}

const filterPayload = payload => {
payload = Object.assign({}, payload) // copying object
// sending it to the db
let possibleKeys = [
'network',
'address',
'action',
'value',
'fingerprint',
'denom'
] // possible keys
Object.keys(payload).map(key => {
if (possibleKeys.indexOf(key) === -1) {
delete payload[key]
}
})
return payload
}

const store = async payload => {
payload = filterPayload(payload)
return new database(config)('').storeStatistics(payload)
}

const prestore = async (payload, hash) => {
payload = filterPayload(payload)
prestoredTransactions[hash] = {
payload,
time: process.hrtime()
}
}
const defineActionValue = msg => {
if (msg.length == 1 && msg[0].value.amount) {
return msg[0].value.amount.amount
}
return 0
}
const defineActionDenom = msg => {
if (msg.length == 1 && msg[0].value.amount) {
return msg[0].value.amount.denom
}
return ''
}
const defineActionType = type => {
if (type.indexOf('/MsgDelegate') !== -1) {
return 'Delegate'
} else if (type.indexOf('/MsgUndelegate') !== -1) {
return 'Undelegate'
} else if (type.indexOf('/MsgSend') !== -1) {
return 'Send'
} else if (type.indexOf('/MsgWithdrawDelegationReward') !== -1) {
return 'Withdraw'
} else if (type.indexOf('/MsgDeposit') !== -1) {
return 'Deposit'
} else if (type.indexOf('/MsgBeginRedelegate') !== -1) {
return 'Redelegate'
}
return type
}
const logOverview = (overview, fingerprint) => {
let key = overview.address + overview.networkId // just a key to store data about last request time
/*
we are requesting balances toooooo frequently
and we don't need so many records in db
so limiting writting posibilities to 1 hour
*/
if (overviewedAddresses[key]) {
if (process.hrtime(overviewedAddresses[key])[0] < 60 * 60) {
return clearOverviewedAddresses()
}
}
overviewedAddresses[key] = process.hrtime() // time in ms
// common object
let data = {
address: overview.address,
network: overview.networkId,
fingerprint,
action: ``,
value: ``
}
// store liquidStake
data.action = 'liquidStake'
data.value = overview.liquidStake.toString()
store(data)
// store totalStake
data.action = 'totalStake'
data.value = overview.totalStake.toString()
store(data)
// store totalRewards
data.action = 'totalRewards'
data.value = overview.totalRewards.toString()
store(data)
// store rewards
// summing rewards with one denom
if (overview.rewards) {
overview.rewards
.reduce((newArray, currentItem) => {
const index = newArray.findIndex(el => el.denom == currentItem.denom)
if (index !== -1) {
newArray[index].amount *= 1
newArray[index].amount += currentItem.amount * 1
} else {
newArray.push(currentItem)
}
return newArray
}, [])
.map(reward => {
data.action = 'rewards'
data.denom = reward.denom
data.value = reward.amount
store(data)
})
}
}
module.exports = {
prestore,
storePrestored,
defineActionType,
logOverview,
defineActionDenom,
defineActionValue
}

0 comments on commit d11de4e

Please sign in to comment.