Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HRMP messages delivery for unregistered channels #688

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions packages/chopsticks/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,18 @@ const commands = yargs(hideBin(process.argv))
parachains.push(chain)
}

let relaychain: Blockchain | undefined

if (argv.relaychain) {
const { chain: rc } = await setupWithServer(await fetchConfig(argv.relaychain))
relaychain = rc
}

if (parachains.length > 1) {
await connectParachains(parachains)
await connectParachains(parachains, relaychain)
}

if (argv.relaychain) {
const { chain: relaychain } = await setupWithServer(await fetchConfig(argv.relaychain))
if (relaychain) {
for (const parachain of parachains) {
await connectVertical(relaychain, parachain)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const dryRunPreimage = async (argv: DryRunSchemaType) => {
downwardMessages: [],
upwardMessages: [],
horizontalMessages: {},
hrmpChannels: {},
})
if (extrinsics.length === 0) continue
calls.push(['BlockBuilder_apply_extrinsic', extrinsics])
Expand Down
12 changes: 11 additions & 1 deletion packages/core/src/blockchain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { TransactionValidity } from '@polkadot/types/interfaces/txqueue'

import { Api } from '../api.js'
import { Block } from './block.js'
import { BuildBlockMode, BuildBlockParams, DownwardMessage, HorizontalMessage, TxPool } from './txpool.js'
import { BuildBlockMode, BuildBlockParams, DownwardMessage, HorizontalMessage, HrmpChannels, TxPool } from './txpool.js'
import { Database } from '../database.js'
import { HeadState } from './head-state.js'
import { InherentProvider } from './inherent/index.js'
Expand Down Expand Up @@ -391,6 +391,12 @@ export class Blockchain {
logger.debug({ id, hrmp }, 'submitHorizontalMessages')
}

openHrmpChannels(id: number, channels: HrmpChannels) {
this.#txpool.openHrmpChannels(id, channels)

logger.debug({ id, channels }, 'openHrmpChannels')
}

/**
* Build a new block with optional params. Use this when you don't have all the {@link BuildBlockParams}
*/
Expand Down Expand Up @@ -432,6 +438,7 @@ export class Blockchain {
downwardMessages: [],
upwardMessages: [],
horizontalMessages: {},
hrmpChannels: {},
}
const { result, storageDiff } = await dryRunExtrinsic(head, this.#inherentProviders, extrinsic, params)
const outcome = registry.createType<ApplyExtrinsicResult>('ApplyExtrinsicResult', result)
Expand All @@ -456,6 +463,7 @@ export class Blockchain {
downwardMessages: [],
upwardMessages: [],
horizontalMessages: hrmp,
hrmpChannels: {},
}
return dryRunInherents(head, this.#inherentProviders, params)
}
Expand All @@ -475,6 +483,7 @@ export class Blockchain {
downwardMessages: dmp,
upwardMessages: [],
horizontalMessages: {},
hrmpChannels: {},
}
return dryRunInherents(head, this.#inherentProviders, params)
}
Expand Down Expand Up @@ -516,6 +525,7 @@ export class Blockchain {
downwardMessages: [],
upwardMessages: [],
horizontalMessages: {},
hrmpChannels: {},
}
return dryRunInherents(head, this.#inherentProviders, params)
}
Expand Down
52 changes: 36 additions & 16 deletions packages/core/src/blockchain/inherent/parachain/validation-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
upgradeGoAheadSignal,
} from '../../../utils/proof.js'
import { blake2AsHex, blake2AsU8a } from '@polkadot/util-crypto'
import { compactHex, getCurrentSlot, getParaId } from '../../../utils/index.js'
import { compactHex, getCurrentSlot, getCurrentTimestamp, getParaId, getSlotDuration } from '../../../utils/index.js'
import { createProof, decodeProof } from '../../../wasm-executor/index.js'

const MOCK_VALIDATION_DATA = {
Expand Down Expand Up @@ -105,49 +105,69 @@ export class SetValidationData implements InherentProvider {
const paraId = await getParaId(parent.chain)

const dmqMqcHeadKey = dmqMqcHead(paraId)
const hrmpIngressChannelIndexKey = hrmpIngressChannelIndex(paraId)
const hrmpEgressChannelIndexKey = hrmpEgressChannelIndex(paraId)

const decoded = await decodeProof(
extrinsic.validationData.relayParentStorageRoot,
extrinsic.relayChainState.trieNodes,
)

const slotIncrease = (meta.consts.timestamp.minimumPeriod as any as BN)
const minPeriod = meta.consts.timestamp.minimumPeriod as any as BN
let slotIncrease = minPeriod
.divn(3000) // relaychain min period
.toNumber()

slotIncrease = slotIncrease === 0 ? 1 : slotIncrease

for (const key of Object.values(WELL_KNOWN_KEYS)) {
if (key === WELL_KNOWN_KEYS.CURRENT_SLOT) {
// increment current slot
const relayCurrentSlot = decoded[key]
? meta.registry.createType<Slot>('Slot', hexToU8a(decoded[key])).toNumber()
: (await getCurrentSlot(parent.chain)) * slotIncrease
const newSlot = meta.registry.createType<Slot>('Slot', relayCurrentSlot + slotIncrease)
newEntries.push([key, u8aToHex(newSlot.toU8a())])

let newSlot: number

// Genesis with async backing
if (parent.number === 0 && Number(minPeriod) < 6000) {
const slotDuration = await getSlotDuration(parent.chain)
const currentTimestamp = await getCurrentTimestamp(parent.chain)
newSlot = Math.ceil(Number(currentTimestamp) / slotDuration)
} else {
newSlot = relayCurrentSlot + slotIncrease
}
const slot = meta.registry.createType<Slot>('Slot', newSlot)
newEntries.push([key, u8aToHex(slot.toU8a())])
} else {
newEntries.push([key, decoded[key]])
}
}

// inject missing hrmpIngressChannel and hrmpEgressChannel
const hrmpIngressChannelIndexKey = hrmpIngressChannelIndex(paraId)
const hrmpEgressChannelIndexKey = hrmpEgressChannelIndex(paraId)
const hrmpIngressChannels = meta.registry.createType('Vec<u32>', hexToU8a(decoded[hrmpIngressChannelIndexKey]))
const hrmpEgressChannels = meta.registry.createType('Vec<u32>', hexToU8a(decoded[hrmpEgressChannelIndexKey]))
for (const key in params.horizontalMessages) {

params.hrmpChannels[Number(paraId)]?.egress.forEach((receiver) => {
const receiverId = meta.registry.createType('u32', Number(receiver))
// order is important
const sender = meta.registry.createType('u32', key)
if (!hrmpIngressChannels.some((x) => x.eq(sender))) {
const idx = _.sortedIndexBy(hrmpIngressChannels, sender, (x) => x.toNumber())
hrmpIngressChannels.splice(idx, 0, sender)
if (!hrmpEgressChannels.some((x) => x.eq(receiverId))) {
const idx = _.sortedIndexBy(hrmpEgressChannels, receiverId, (x) => x.toNumber())
hrmpEgressChannels.splice(idx, 0, receiverId)
}
if (!hrmpEgressChannels.some((x) => x.eq(sender))) {
const idx = _.sortedIndexBy(hrmpEgressChannels, sender, (x) => x.toNumber())
hrmpEgressChannels.splice(idx, 0, sender)
})

params.hrmpChannels[Number(paraId)]?.ingress.forEach((sender) => {
const senderId = meta.registry.createType('u32', Number(sender))
// order is important
if (!hrmpIngressChannels.some((x) => x.eq(senderId))) {
const idx = _.sortedIndexBy(hrmpIngressChannels, senderId, (x) => x.toNumber())
hrmpIngressChannels.splice(idx, 0, senderId)
}
}
})

newEntries.push([hrmpIngressChannelIndexKey, hrmpIngressChannels.toHex()])
newEntries.push([hrmpEgressChannelIndexKey, hrmpEgressChannels.toHex()])
newEntries.push([hrmpIngressChannelIndexKey, hrmpIngressChannels.toHex()])

// inject paraHead
const headData = meta.registry.createType('HeadData', (await parent.header).toHex())
Expand Down
23 changes: 23 additions & 0 deletions packages/core/src/blockchain/txpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ export interface HorizontalMessage {
data: HexString
}

export interface HrmpChannels {
egress: number[]
ingress: number[]
}

export interface BuildBlockParams {
downwardMessages: DownwardMessage[]
upwardMessages: Record<number, HexString[]>
horizontalMessages: Record<number, HorizontalMessage[]>
hrmpChannels: Record<number, HrmpChannels>
transactions: HexString[]
unsafeBlockHeight?: number
}
Expand All @@ -47,6 +53,7 @@ export class TxPool {
readonly #ump: Record<number, HexString[]> = {}
readonly #dmp: DownwardMessage[] = []
readonly #hrmp: Record<number, HorizontalMessage[]> = {}
readonly #hrmpChannels: Record<number, HrmpChannels> = {}

#mode: BuildBlockMode
readonly #inherentProviders: InherentProvider[]
Expand Down Expand Up @@ -145,6 +152,14 @@ export class TxPool {
this.#maybeBuildBlock()
}

openHrmpChannels(id: number, channels: HrmpChannels) {
logger.debug({ id, channels }, 'open hrmp channels')

this.#hrmpChannels[id] = channels

this.#maybeBuildBlock()
}

#maybeBuildBlock() {
switch (this.#mode) {
case BuildBlockMode.Batch:
Expand Down Expand Up @@ -175,6 +190,7 @@ export class TxPool {
const upwardMessages = params?.upwardMessages || { ...this.#ump }
const downwardMessages = params?.downwardMessages || this.#dmp.splice(0)
const horizontalMessages = params?.horizontalMessages || { ...this.#hrmp }
const hrmpChannels = params?.hrmpChannels || { ...this.#hrmpChannels }
const unsafeBlockHeight = params?.unsafeBlockHeight
if (!params?.upwardMessages) {
for (const id of Object.keys(this.#ump)) {
Expand All @@ -187,12 +203,19 @@ export class TxPool {
}
}

if (!params?.hrmpChannels) {
for (const id of Object.keys(this.#hrmpChannels)) {
delete this.#hrmpChannels[id]
}
}

try {
await this.buildBlockWithParams({
transactions,
upwardMessages,
downwardMessages,
horizontalMessages,
hrmpChannels,
unsafeBlockHeight,
})
} catch (err) {
Expand Down
12 changes: 10 additions & 2 deletions packages/core/src/xcm/downward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import { setStorage } from '../utils/set-storage.js'
import { xcmLogger } from './index.js'

export const connectDownward = async (relaychain: Blockchain, parachain: Blockchain) => {
const meta = await relaychain.head.meta
const relayMeta = await relaychain.head.meta
const paraId = await getParaId(parachain)
const downwardMessageQueuesKey = compactHex(meta.query.dmp.downwardMessageQueues(paraId))
const downwardMessageQueuesKey = compactHex(relayMeta.query.dmp.downwardMessageQueues(paraId))

const paraMeta = await parachain.head.meta

await relaychain.headState.subscribeStorage([downwardMessageQueuesKey], async (head, pairs) => {
const value = pairs[0][1]
Expand All @@ -29,5 +31,11 @@ export const connectDownward = async (relaychain: Blockchain, parachain: Blockch

xcmLogger.debug({ downwardMessages }, 'downward_message')
parachain.submitDownwardMessages(downwardMessages)

// We need to produce an extra block to process the message from the queue
// in case `MessageQueue` is used for dmp.
if (paraMeta.tx.messageQueue) {
await parachain.newBlock()
}
})
}
32 changes: 30 additions & 2 deletions packages/core/src/xcm/horizontal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import { HexString } from '@polkadot/util/types'
import { hexToU8a } from '@polkadot/util'

import { Blockchain } from '../blockchain/index.js'
import { compactHex } from '../utils/index.js'
import { compactHex, getParaId } from '../utils/index.js'
import { xcmLogger } from './index.js'

export const connectHorizontal = async (parachains: Record<number, Blockchain>) => {
export const connectHorizontal = async (parachains: Record<number, Blockchain>, relaychain: Blockchain | undefined) => {
for (const [id, chain] of Object.entries(parachains)) {
const meta = await chain.head.meta

Expand All @@ -30,5 +30,33 @@ export const connectHorizontal = async (parachains: Record<number, Blockchain>)
}
}
})

const relayMeta = await relaychain?.head.meta

if (relayMeta) {
const paraId = await getParaId(chain)
const hrmpEgressChannelsIndex = compactHex(relayMeta.query.hrmp.hrmpEgressChannelsIndex(paraId))
const hrmpIngressChannelsIndex = compactHex(relayMeta.query.hrmp.hrmpIngressChannelsIndex(paraId))
const storageKeys = [hrmpEgressChannelsIndex, hrmpIngressChannelsIndex]

await relaychain?.headState.subscribeStorage(storageKeys, async (head, pairs) => {
const meta = await head.meta
let hrmpEgressChannels: number[] = []
let hrmpIngressChannels: number[] = []

for (const [key, value] of pairs) {
if (key === hrmpEgressChannelsIndex) {
hrmpEgressChannels = meta.registry.createType('Vec<u32>', hexToU8a(value)).toJSON() as number[]
} else if (key === hrmpIngressChannelsIndex) {
hrmpIngressChannels = meta.registry.createType('Vec<u32>', hexToU8a(value)).toJSON() as number[]
} else {
return
}
}

xcmLogger.info({ paraId: Number(id), egress: hrmpEgressChannels, ingress: hrmpIngressChannels }, 'hrmpChannels')
chain.openHrmpChannels(Number(id), { egress: hrmpEgressChannels, ingress: hrmpIngressChannels })
})
}
}
}
4 changes: 2 additions & 2 deletions packages/core/src/xcm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export const connectVertical = async (relaychain: Blockchain, parachain: Blockch
)
}

export const connectParachains = async (parachains: Blockchain[]) => {
export const connectParachains = async (parachains: Blockchain[], relaychain: Blockchain | undefined) => {
const list: Record<number, Blockchain> = {}

for (const chain of parachains) {
const paraId = await getParaId(chain)
list[paraId.toNumber()] = chain
}

await connectHorizontal(list)
await connectHorizontal(list, relaychain)

xcmLogger.info(`Connected parachains [${Object.keys(list)}]`)
}
2 changes: 1 addition & 1 deletion packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export const setupNetworks = async (networkOptions: Partial<Record<string, Confi

const parachainList = Object.values(parachains).map((i) => i.chain)
if (parachainList.length > 0) {
await connectParachains(parachainList)
await connectParachains(parachainList, relaychain?.chain)
}

if (wasmOverriden) {
Expand Down
Loading