Skip to content

Commit

Permalink
Fabo/remove tendermint (#311)
Browse files Browse the repository at this point in the history
* remove tendermint

*  fixed empty blockHeight issue

* small refactoring

* catch on fetches to get logging

* delay block updates

* add retry logic

* refactored getBlockByHeight

* remove pm2 dep
  • Loading branch information
faboweb authored Feb 12, 2020
1 parent 0e6af0c commit 545f376
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 640 deletions.
118 changes: 33 additions & 85 deletions lib/block-listeners/cosmos-node-subscription.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
const _ = require('lodash')
const io = require('@pm2/io')
const Tendermint = require('../rpc/tendermint')
const {
publishBlockAdded,
publishUserTransactionAdded,
Expand All @@ -10,115 +8,65 @@ const Sentry = require('@sentry/node')
const database = require('../database')
const config = require('../../config.js')

const WAIT_FOR_BLOCK_DELAY = 5000
const POLLING_INTERVAL = 1000
const EXPECTED_MAX_BLOCK_WINDOW = 120000
// apparently the cosmos db takes a while to serve the content after a block has been updated
// if we don't do this, we run into errors as the data is not yet available
const COSMOS_DB_DELAY = 2000

// let reconnectionTimeout = {}

// This class establishes an rpc connection to Tendermint.
// This class polls for new blocks
// Used for listening to events, such as new blocks.
class CosmosNodeSubscription {
constructor(network, CosmosApiClass, store) {
this.network = network
this.cosmosAPI = new CosmosApiClass(network)
this.store = store
this.lastupdate = 0
this.metric = io.metric({
name: `${this.network.id}_update`
})
const networkSchemaName = this.network.id.replace(/-/g, '_')
this.db = new database(config)(networkSchemaName)
this.chainHangup = undefined
this.height = undefined

this.connectTendermint(this.network)
this.pollForNewBlock()
}

async connectTendermint(network) {
console.log('Connecting to Tendermint on', network.rpc_url)
// Create a RPC subscription for each network that will react to new block events.
Tendermint()
.connect(network.rpc_url)
.then(connectedClient => {
console.log('Connected to Tendermint on', network.rpc_url)
connectedClient.subscribe({ query: "tm.event='NewBlock'" }, event => {
// this tracks the block times
// issue: this will only trigger if there are actually blocks I guess
if (this.lastupdate) {
const diff = Date.now() - this.lastupdate
this.metric.set(diff)
}
this.lastupdate = Date.now()

setTimeout(
() => this.newBlockHandler(event.block.header.height),
WAIT_FOR_BLOCK_DELAY
)
async pollForNewBlock() {
this.pollingTimeout = setTimeout(async () => {
const block = await this.cosmosAPI.getBlockByHeight()

// if there are no new blocks for some time, trigger an error
// TODO: show this error automatically in the UI
if (this.chainHangup) clearTimeout(this.chainHangup)
this.chainHangup = setTimeout(() => {
console.error(`Chain ${this.network.id} seems to have halted.`)
Sentry.captureException(
new Error(`Chain ${this.network.id} seems to have halted.`)
)
}, EXPECTED_MAX_BLOCK_WINDOW)
})

// on connection lost, reconnect to tendermint + Sentry error
connectedClient.ondisconnect = () => {
console.log('Lost connection to Tendermint for', network.rpc_url)
if (this.height !== block.height) {
// apparently the cosmos db takes a while to serve the content after a block has been updated
// if we don't do this, we run into errors as the data is not yet available
setTimeout(() => this.newBlockHandler(block), COSMOS_DB_DELAY)

Sentry.withScope(function(scope) {
scope.setExtra('network', network.id)
scope.setExtra('rpc_url', network.rpc_url)
Sentry.captureException(new Error(`Lost Tendermint connection`))
})
// need to clear previous timeout to evoid connection hell
// clearTimeout(reconnectionTimeout[network.id])
// reconnectionTimeout[network.id] = setTimeout(
// () => this.connectTendermint(network),
// 3000
// )
}
})
.catch(e => {
Sentry.withScope(function(scope) {
scope.setExtra('network', network.id)
scope.setExtra('rpc_url', network.rpc_url)
Sentry.captureException(e)
})
// we are safe, that the chain produced a block so it didn't hang up
if (this.chainHangup) clearTimeout(this.chainHangup)
}

// clearTimeout(reconnectionTimeout[network.id])
// // if can't connect, retry
// reconnectionTimeout[network.id] = setTimeout(
// () => this.connectTendermint(network),
// 3000
// )
})
this.pollForNewBlock()
}, POLLING_INTERVAL)

// if there are no new blocks for some time, trigger an error
// TODO: show this error automatically in the UI
this.chainHangup = setTimeout(() => {
console.error(`Chain ${this.network.id} seems to have halted.`)
Sentry.captureException(
new Error(`Chain ${this.network.id} seems to have halted.`)
)
}, EXPECTED_MAX_BLOCK_WINDOW)
}

// For each block event, we fetch the block information and publish a message.
// A GraphQL resolver is listening for these messages and sends the block to
// each subscribed user.
async newBlockHandler(height) {
if (height) {
Sentry.configureScope(function(scope) {
scope.setExtra('height', height)
})
}

const block = await this.cosmosAPI.getBlockByHeight({
blockHeight: height
async newBlockHandler(block) {
Sentry.configureScope(function(scope) {
scope.setExtra('height', block.height)
})
// in the case of height being undefined to query for latest
// eslint-disable-next-line require-atomic-updates
height = block.height

const validators = await this.cosmosAPI.getAllValidators(height)
const validators = await this.cosmosAPI.getAllValidators(block.height)
const validatorMap = await this.getValidatorMap(validators)
this.updateDBValidatorProfiles(validators)
this.store.update({ height, block, validators: validatorMap })
this.store.update({ height: block.height, block, validators: validatorMap })
publishBlockAdded(this.network.id, block)
// TODO remove, only for demo purposes
// publishEvent(this.network.id, 'block', '', block)
Expand Down
4 changes: 1 addition & 3 deletions lib/resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ const resolvers = {
block: (_, { networkId, height }, { dataSources }, { cacheControl }) => {
const maxAge = height ? 60 : 10
cacheControl.setCacheHint({ maxAge })
return remoteFetch(dataSources, networkId).getBlockByHeight({
blockHeight: height
})
return remoteFetch(dataSources, networkId).getBlockByHeight(height)
},
network: (_, { id }) => {
const network = networkMap[id]
Expand Down
216 changes: 0 additions & 216 deletions lib/rpc/tendermint.js

This file was deleted.

Loading

0 comments on commit 545f376

Please sign in to comment.