From 797c677247a8039c6a605ce6711395f98921cbe4 Mon Sep 17 00:00:00 2001 From: jjy Date: Mon, 20 May 2019 16:08:16 +0800 Subject: [PATCH] feat: alert system --- Cargo.lock | 36 +++ Cargo.toml | 2 + ckb-bin/Cargo.toml | 1 + ckb-bin/src/subcommand/run.rs | 14 ++ core/src/alert.rs | 92 ++++++++ core/src/lib.rs | 1 + protocol/src/builder.rs | 60 ++++- protocol/src/convert.rs | 40 ++++ protocol/src/protocol.fbs | 16 ++ protocol/src/protocol_generated.rs | 236 +++++++++++++++++++ protocol/src/protocol_generated_verifier.rs | 248 ++++++++++++++++++++ rpc/Cargo.toml | 1 + rpc/src/config.rs | 5 + rpc/src/module/alert.rs | 65 +++++ rpc/src/module/mod.rs | 2 + rpc/src/module/stats.rs | 12 +- rpc/src/server.rs | 20 +- rpc/src/test.rs | 5 + sync/src/lib.rs | 1 + util/alert-system/Cargo.toml | 26 ++ util/alert-system/src/alert.toml | 5 + util/alert-system/src/alert_relayer.rs | 159 +++++++++++++ util/alert-system/src/config.rs | 15 ++ util/alert-system/src/lib.rs | 17 ++ util/alert-system/src/notifier.rs | 58 +++++ util/alert-system/src/verifier.rs | 49 ++++ util/jsonrpc-types/src/alert.rs | 77 ++++++ util/jsonrpc-types/src/lib.rs | 2 + util/multisig/Cargo.toml | 4 + util/multisig/src/error.rs | 2 +- util/multisig/src/secp256k1.rs | 24 +- 31 files changed, 1276 insertions(+), 19 deletions(-) create mode 100644 core/src/alert.rs create mode 100644 rpc/src/module/alert.rs create mode 100644 util/alert-system/Cargo.toml create mode 100644 util/alert-system/src/alert.toml create mode 100644 util/alert-system/src/alert_relayer.rs create mode 100644 util/alert-system/src/config.rs create mode 100644 util/alert-system/src/lib.rs create mode 100644 util/alert-system/src/notifier.rs create mode 100644 util/alert-system/src/verifier.rs create mode 100644 util/jsonrpc-types/src/alert.rs diff --git a/Cargo.lock b/Cargo.lock index 1ff1378776..be6820c61e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,29 @@ dependencies = [ "jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ckb-alert-system" +version = "0.1.0" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "ckb-core 0.13.0-pre", + "ckb-network 0.13.0-pre", + "ckb-protocol 0.13.0-pre", + "ckb-util 0.13.0-pre", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "faketime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-types 0.13.0-pre", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.0 (git+https://github.com/nervosnetwork/lru-cache?rev=b36a4d1)", + "multisig 0.1.0", + "numext-fixed-hash 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", + "toml 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ckb-app-config" version = "0.13.0-pre" @@ -364,6 +387,7 @@ name = "ckb-bin" version = "0.13.0-pre" dependencies = [ "build-info 0.13.0-pre", + "ckb-alert-system 0.1.0", "ckb-app-config 0.13.0-pre", "ckb-chain 0.13.0-pre", "ckb-chain-spec 0.13.0-pre", @@ -639,6 +663,7 @@ dependencies = [ name = "ckb-rpc" version = "0.13.0-pre" dependencies = [ + "ckb-alert-system 0.1.0", "ckb-chain 0.13.0-pre", "ckb-chain-spec 0.13.0-pre", "ckb-core 0.13.0-pre", @@ -2005,6 +2030,17 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multisig" +version = "0.1.0" +dependencies = [ + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "secp256k1 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "native-tls" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index a68b2a11b2..36eae3ef32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ ckb-bin = { path = "ckb-bin" } [workspace] members = [ + "util/alert-system", + "util/multisig", "util/logger", "util/hash", "util/merkle-tree", diff --git a/ckb-bin/Cargo.toml b/ckb-bin/Cargo.toml index 5957684111..59ecad43ef 100644 --- a/ckb-bin/Cargo.toml +++ b/ckb-bin/Cargo.toml @@ -27,6 +27,7 @@ ckb-pow = { path = "../pow" } ckb-network = { path = "../network"} ckb-rpc = { path = "../rpc"} ckb-resource = { path = "../resource"} +ckb-alert-system = { path = "../util/alert-system" } logger = { path = "../util/logger" } numext-fixed-hash = { version = "0.1", features = ["support_rand", "support_heapsize", "support_serde"] } numext-fixed-uint = { version = "0.1", features = ["support_rand", "support_heapsize", "support_serde"] } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 2cb252f036..bc468a59b8 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,5 +1,6 @@ use crate::helper::{deadlock_detection, wait_for_exit}; use build_info::Version; +use ckb_alert_system::{alert_relayer::AlertRelayer, config::Config as AlertSystemConfig}; use ckb_app_config::{ExitCode, RunArgs}; use ckb_chain::chain::ChainService; use ckb_db::RocksDB; @@ -67,6 +68,10 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> { synchronizer.peers(), ); let net_timer = NetTimeProtocol::default(); + let alert_relayer = AlertRelayer::new(version.to_string(), AlertSystemConfig::default()); + + let alert_notifier = alert_relayer.notifier(); + let alert_verifier = alert_relayer.verifier(); let synchronizer_clone = synchronizer.clone(); let protocols = vec![ @@ -91,6 +96,13 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> { move || Box::new(net_timer.clone()), Arc::clone(&network_state), ), + CKBProtocol::new( + "alt".to_string(), + NetworkProtocol::ALERT.into(), + &["1".to_string()][..], + move || Box::new(alert_relayer.clone()), + Arc::clone(&network_state), + ), ]; let network_controller = NetworkService::new(Arc::clone(&network_state), protocols) .start(version, Some("NetworkService")) @@ -103,6 +115,8 @@ pub fn run(args: RunArgs, version: Version) -> Result<(), ExitCode> { synchronizer, chain_controller, block_assembler_controller, + alert_notifier, + alert_verifier, ); wait_for_exit(); diff --git a/core/src/alert.rs b/core/src/alert.rs new file mode 100644 index 0000000000..05ec7fb9b0 --- /dev/null +++ b/core/src/alert.rs @@ -0,0 +1,92 @@ +use crate::Bytes; +use bincode::serialize; +use hash::blake2b_256; +use numext_fixed_hash::H256; +use serde_derive::{Deserialize, Serialize}; + +#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct Alert { + pub id: u32, + // cancel id if cancel is greater than 0 + pub cancel: u32, + // TODO use service flag to distinguish network + //network: String, + pub min_version: Option, + pub max_version: Option, + pub priority: u32, + pub notice_until: u64, + pub message: String, + pub signatures: Vec, +} + +impl Alert { + pub fn hash(&self) -> H256 { + let alert = Self { + id: self.id, + cancel: self.cancel, + min_version: self.min_version.clone(), + max_version: self.max_version.clone(), + priority: self.priority, + notice_until: self.notice_until, + message: self.message.clone(), + signatures: Vec::new(), + }; + blake2b_256(serialize(&alert).expect("serialize should not fail")).into() + } +} + +#[derive(Default)] +pub struct AlertBuilder { + inner: Alert, +} + +impl AlertBuilder { + pub fn alert(mut self, alert: Alert) -> Self { + self.inner = alert; + self + } + + pub fn id(mut self, id: u32) -> Self { + self.inner.id = id; + self + } + + pub fn cancel(mut self, cancel: u32) -> Self { + self.inner.cancel = cancel; + self + } + + pub fn min_version(mut self, min_version: Option) -> Self { + self.inner.min_version = min_version; + self + } + + pub fn max_version(mut self, max_version: Option) -> Self { + self.inner.max_version = max_version; + self + } + + pub fn priority(mut self, priority: u32) -> Self { + self.inner.priority = priority; + self + } + + pub fn signatures(mut self, signatures: Vec) -> Self { + self.inner.signatures.extend(signatures); + self + } + + pub fn notice_until(mut self, notice_until: u64) -> Self { + self.inner.notice_until = notice_until; + self + } + + pub fn message(mut self, message: String) -> Self { + self.inner.message = message; + self + } + + pub fn build(self) -> Alert { + self.inner + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index bbfd3e3612..0743cd3ab8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -2,6 +2,7 @@ //! //! This Library provides the essential types for building ckb. +pub mod alert; pub mod block; pub mod cell; pub mod difficulty; diff --git a/protocol/src/builder.rs b/protocol/src/builder.rs index e0176920d7..c9f416dcdb 100644 --- a/protocol/src/builder.rs +++ b/protocol/src/builder.rs @@ -1,9 +1,10 @@ use crate::protocol_generated::ckb::protocol::{ - Block as FbsBlock, BlockBuilder, BlockProposalBuilder, BlockTransactionsBuilder, - Bytes as FbsBytes, BytesBuilder, CellInput as FbsCellInput, CellInputBuilder, - CellOutput as FbsCellOutput, CellOutputBuilder, CompactBlock, CompactBlockBuilder, - FilteredBlock, FilteredBlockBuilder, GetBlockProposalBuilder, GetBlockTransactionsBuilder, - GetBlocks as FbsGetBlocks, GetBlocksBuilder, GetHeaders as FbsGetHeaders, GetHeadersBuilder, + Alert as FbsAlert, AlertBuilder, AlertMessage, AlertMessageBuilder, Block as FbsBlock, + BlockBuilder, BlockProposalBuilder, BlockTransactionsBuilder, Bytes as FbsBytes, BytesBuilder, + CellInput as FbsCellInput, CellInputBuilder, CellOutput as FbsCellOutput, CellOutputBuilder, + CompactBlock, CompactBlockBuilder, FilteredBlock, FilteredBlockBuilder, + GetBlockProposalBuilder, GetBlockTransactionsBuilder, GetBlocks as FbsGetBlocks, + GetBlocksBuilder, GetHeaders as FbsGetHeaders, GetHeadersBuilder, GetRelayTransaction as FbsGetRelayTransaction, GetRelayTransactionBuilder, Header as FbsHeader, HeaderBuilder, Headers as FbsHeaders, HeadersBuilder, IndexTransactionBuilder, MerkleProofBuilder, OutPoint as FbsOutPoint, OutPointBuilder, @@ -16,6 +17,7 @@ use crate::protocol_generated::ckb::protocol::{ WitnessBuilder, H256 as FbsH256, }; use crate::{short_transaction_id, short_transaction_id_keys}; +use ckb_core::alert::Alert; use ckb_core::block::Block; use ckb_core::header::{BlockNumber, Header}; use ckb_core::script::Script; @@ -712,6 +714,54 @@ impl<'a> TimeMessage<'a> { } } +impl<'a> FbsAlert<'a> { + pub fn build<'b>(fbb: &mut FlatBufferBuilder<'b>, alert: &Alert) -> WIPOffset> { + let min_version = alert + .min_version + .as_ref() + .map(|min_ver| FbsBytes::build(fbb, min_ver.as_bytes())); + let max_version = alert + .max_version + .as_ref() + .map(|max_ver| FbsBytes::build(fbb, max_ver.as_bytes())); + let signatures = { + let signatures: Vec<_> = alert + .signatures + .iter() + .map(|sig| FbsBytes::build(fbb, sig)) + .collect(); + fbb.create_vector(&signatures) + }; + let message = FbsBytes::build(fbb, alert.message.as_bytes()); + let mut builder = AlertBuilder::new(fbb); + builder.add_id(alert.id); + builder.add_cancel(alert.cancel); + if let Some(min_version) = min_version { + builder.add_min_version(min_version); + } + if let Some(max_version) = max_version { + builder.add_max_version(max_version); + } + builder.add_priority(alert.priority); + builder.add_signatures(signatures); + builder.add_notice_until(alert.notice_until); + builder.add_message(message); + builder.finish() + } +} + +impl<'a> AlertMessage<'a> { + pub fn build_alert<'b>( + fbb: &mut FlatBufferBuilder<'b>, + alert: &Alert, + ) -> WIPOffset> { + let fbs_alert = FbsAlert::build(fbb, alert); + let mut builder = AlertMessageBuilder::new(fbb); + builder.add_payload(fbs_alert); + builder.finish() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/protocol/src/convert.rs b/protocol/src/convert.rs index 0a4223ff57..5ef7c2423e 100644 --- a/protocol/src/convert.rs +++ b/protocol/src/convert.rs @@ -353,3 +353,43 @@ impl<'a> TryFrom> for ckb_core::transaction:: }) } } + +impl<'a> TryFrom> for ckb_core::alert::Alert { + type Error = FailureError; + + fn try_from(alert: ckb_protocol::Alert<'a>) -> Result { + let message = String::from_utf8(cast!(alert.message().and_then(|m| m.seq()))?.to_owned())?; + let min_version: Option = match alert + .min_version() + .and_then(|m| m.seq().map(|s| String::from_utf8(s.to_vec()))) + { + Some(min_version) => Some(min_version?), + None => None, + }; + let max_version: Option = match alert + .max_version() + .and_then(|m| m.seq().map(|s| String::from_utf8(s.to_vec()))) + { + Some(max_version) => Some(max_version?), + None => None, + }; + let signatures: Result, FailureError> = + FlatbuffersVectorIterator::new(cast!(alert.signatures())?) + .map(|s| { + cast!(s.seq()) + .map(ckb_core::Bytes::from) + .map_err(Into::into) + }) + .collect(); + Ok(ckb_core::alert::AlertBuilder::default() + .id(alert.id()) + .cancel(alert.cancel()) + .min_version(min_version) + .max_version(max_version) + .priority(alert.priority()) + .signatures(signatures?) + .notice_until(alert.notice_until()) + .message(message) + .build()) + } +} diff --git a/protocol/src/protocol.fbs b/protocol/src/protocol.fbs index e899aa63d7..40c735e6ff 100644 --- a/protocol/src/protocol.fbs +++ b/protocol/src/protocol.fbs @@ -241,3 +241,19 @@ table TimeMessage { table Time { timestamp: uint64; } + +table AlertMessage { + payload: Alert; +} + +table Alert { + id: uint32; + cancel: uint32; + min_version: Bytes; + max_version: Bytes; + priority: uint32; + signatures: [Bytes]; + notice_until: uint64; + message: Bytes; +} + diff --git a/protocol/src/protocol_generated.rs b/protocol/src/protocol_generated.rs index ee564ac18d..6432b55ac8 100644 --- a/protocol/src/protocol_generated.rs +++ b/protocol/src/protocol_generated.rs @@ -3591,6 +3591,242 @@ impl<'a: 'b, 'b> TimeBuilder<'a, 'b> { } } +pub enum AlertMessageOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct AlertMessage<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for AlertMessage<'a> { + type Inner = AlertMessage<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> AlertMessage<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + AlertMessage { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args AlertMessageArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = AlertMessageBuilder::new(_fbb); + if let Some(x) = args.payload { builder.add_payload(x); } + builder.finish() + } + + pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn payload(&self) -> Option> { + self._tab.get::>>(AlertMessage::VT_PAYLOAD, None) + } +} + +pub struct AlertMessageArgs<'a> { + pub payload: Option>>, +} +impl<'a> Default for AlertMessageArgs<'a> { + #[inline] + fn default() -> Self { + AlertMessageArgs { + payload: None, + } + } +} +pub struct AlertMessageBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> AlertMessageBuilder<'a, 'b> { + #[inline] + pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(AlertMessage::VT_PAYLOAD, payload); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AlertMessageBuilder<'a, 'b> { + let start = _fbb.start_table(); + AlertMessageBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum AlertOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct Alert<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Alert<'a> { + type Inner = Alert<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> Alert<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Alert { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args AlertArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = AlertBuilder::new(_fbb); + builder.add_notice_until(args.notice_until); + if let Some(x) = args.message { builder.add_message(x); } + if let Some(x) = args.signatures { builder.add_signatures(x); } + builder.add_priority(args.priority); + if let Some(x) = args.max_version { builder.add_max_version(x); } + if let Some(x) = args.min_version { builder.add_min_version(x); } + builder.add_cancel(args.cancel); + builder.add_id(args.id); + builder.finish() + } + + pub const VT_ID: flatbuffers::VOffsetT = 4; + pub const VT_CANCEL: flatbuffers::VOffsetT = 6; + pub const VT_MIN_VERSION: flatbuffers::VOffsetT = 8; + pub const VT_MAX_VERSION: flatbuffers::VOffsetT = 10; + pub const VT_PRIORITY: flatbuffers::VOffsetT = 12; + pub const VT_SIGNATURES: flatbuffers::VOffsetT = 14; + pub const VT_NOTICE_UNTIL: flatbuffers::VOffsetT = 16; + pub const VT_MESSAGE: flatbuffers::VOffsetT = 18; + + #[inline] + pub fn id(&self) -> u32 { + self._tab.get::(Alert::VT_ID, Some(0)).unwrap() + } + #[inline] + pub fn cancel(&self) -> u32 { + self._tab.get::(Alert::VT_CANCEL, Some(0)).unwrap() + } + #[inline] + pub fn min_version(&self) -> Option> { + self._tab.get::>>(Alert::VT_MIN_VERSION, None) + } + #[inline] + pub fn max_version(&self) -> Option> { + self._tab.get::>>(Alert::VT_MAX_VERSION, None) + } + #[inline] + pub fn priority(&self) -> u32 { + self._tab.get::(Alert::VT_PRIORITY, Some(0)).unwrap() + } + #[inline] + pub fn signatures(&self) -> Option>>> { + self._tab.get::>>>>(Alert::VT_SIGNATURES, None) + } + #[inline] + pub fn notice_until(&self) -> u64 { + self._tab.get::(Alert::VT_NOTICE_UNTIL, Some(0)).unwrap() + } + #[inline] + pub fn message(&self) -> Option> { + self._tab.get::>>(Alert::VT_MESSAGE, None) + } +} + +pub struct AlertArgs<'a> { + pub id: u32, + pub cancel: u32, + pub min_version: Option>>, + pub max_version: Option>>, + pub priority: u32, + pub signatures: Option>>>>, + pub notice_until: u64, + pub message: Option>>, +} +impl<'a> Default for AlertArgs<'a> { + #[inline] + fn default() -> Self { + AlertArgs { + id: 0, + cancel: 0, + min_version: None, + max_version: None, + priority: 0, + signatures: None, + notice_until: 0, + message: None, + } + } +} +pub struct AlertBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> AlertBuilder<'a, 'b> { + #[inline] + pub fn add_id(&mut self, id: u32) { + self.fbb_.push_slot::(Alert::VT_ID, id, 0); + } + #[inline] + pub fn add_cancel(&mut self, cancel: u32) { + self.fbb_.push_slot::(Alert::VT_CANCEL, cancel, 0); + } + #[inline] + pub fn add_min_version(&mut self, min_version: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Alert::VT_MIN_VERSION, min_version); + } + #[inline] + pub fn add_max_version(&mut self, max_version: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Alert::VT_MAX_VERSION, max_version); + } + #[inline] + pub fn add_priority(&mut self, priority: u32) { + self.fbb_.push_slot::(Alert::VT_PRIORITY, priority, 0); + } + #[inline] + pub fn add_signatures(&mut self, signatures: flatbuffers::WIPOffset>>>) { + self.fbb_.push_slot_always::>(Alert::VT_SIGNATURES, signatures); + } + #[inline] + pub fn add_notice_until(&mut self, notice_until: u64) { + self.fbb_.push_slot::(Alert::VT_NOTICE_UNTIL, notice_until, 0); + } + #[inline] + pub fn add_message(&mut self, message: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Alert::VT_MESSAGE, message); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AlertBuilder<'a, 'b> { + let start = _fbb.start_table(); + AlertBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + #[inline] pub fn get_root_as_sync_message<'a>(buf: &'a [u8]) -> SyncMessage<'a> { flatbuffers::get_root::>(buf) diff --git a/protocol/src/protocol_generated_verifier.rs b/protocol/src/protocol_generated_verifier.rs index 65345857d8..a380958495 100644 --- a/protocol/src/protocol_generated_verifier.rs +++ b/protocol/src/protocol_generated_verifier.rs @@ -104,6 +104,254 @@ pub mod ckb { } } + impl<'a> Verify for reader::Alert<'a> { + fn verify(&self) -> Result { + let tab = self._tab; + let buf = tab.buf; + let buf_len = buf.len(); + + if tab.loc > MAX_OFFSET_LOC || tab.loc + flatbuffers::SIZE_SOFFSET > buf_len { + return Err(Error::OutOfBounds); + } + + let vtab_loc = { + let soffset_slice = &buf[tab.loc..]; + let soffset = flatbuffers::read_scalar::(soffset_slice); + if soffset >= 0 { + tab.loc.checked_sub(soffset as usize) + } else { + soffset + .checked_neg() + .and_then(|foffset| tab.loc.checked_add(foffset as usize)) + } + } + .ok_or(Error::OutOfBounds)?; + if vtab_loc + .checked_add(flatbuffers::SIZE_VOFFSET + flatbuffers::SIZE_VOFFSET) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + + let vtab = tab.vtable(); + let vtab_num_bytes = vtab.num_bytes(); + let object_inline_num_bytes = vtab.object_inline_num_bytes(); + if vtab_num_bytes < flatbuffers::SIZE_VOFFSET + flatbuffers::SIZE_VOFFSET + || object_inline_num_bytes < flatbuffers::SIZE_SOFFSET + { + return Err(Error::OutOfBounds); + } + if vtab_loc + .checked_add(vtab_num_bytes) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + if tab + .loc + .checked_add(object_inline_num_bytes) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + + for i in 0..vtab.num_fields() { + let voffset = vtab.get_field(i) as usize; + if (voffset > 0 && voffset < flatbuffers::SIZE_SOFFSET) + || voffset >= object_inline_num_bytes + { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_ID as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_ID) as usize; + if voffset > 0 && object_inline_num_bytes - voffset < 4 { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_CANCEL as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_CANCEL) as usize; + if voffset > 0 && object_inline_num_bytes - voffset < 4 { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_MIN_VERSION as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_MIN_VERSION) as usize; + if voffset > 0 { + if voffset + 4 > object_inline_num_bytes { + return Err(Error::OutOfBounds); + } + + if let Some(f) = self.min_version() { + f.verify()?; + } + } + } + + if Self::VT_MAX_VERSION as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_MAX_VERSION) as usize; + if voffset > 0 { + if voffset + 4 > object_inline_num_bytes { + return Err(Error::OutOfBounds); + } + + if let Some(f) = self.max_version() { + f.verify()?; + } + } + } + + if Self::VT_PRIORITY as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_PRIORITY) as usize; + if voffset > 0 && object_inline_num_bytes - voffset < 4 { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_SIGNATURES as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_SIGNATURES) as usize; + if voffset > 0 { + if voffset + 4 > object_inline_num_bytes { + return Err(Error::OutOfBounds); + } + + let signatures_verifier = VectorVerifier::follow( + buf, + try_follow_uoffset(buf, tab.loc + voffset)?, + ); + signatures_verifier + .verify_reference_elements::()?; + } + } + + if Self::VT_NOTICE_UNTIL as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_NOTICE_UNTIL) as usize; + if voffset > 0 && object_inline_num_bytes - voffset < 8 { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_MESSAGE as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_MESSAGE) as usize; + if voffset > 0 { + if voffset + 4 > object_inline_num_bytes { + return Err(Error::OutOfBounds); + } + + if let Some(f) = self.message() { + f.verify()?; + } + } + } + + Ok(()) + } + } + + impl<'a> Verify for reader::AlertMessage<'a> { + fn verify(&self) -> Result { + let tab = self._tab; + let buf = tab.buf; + let buf_len = buf.len(); + + if tab.loc > MAX_OFFSET_LOC || tab.loc + flatbuffers::SIZE_SOFFSET > buf_len { + return Err(Error::OutOfBounds); + } + + let vtab_loc = { + let soffset_slice = &buf[tab.loc..]; + let soffset = flatbuffers::read_scalar::(soffset_slice); + if soffset >= 0 { + tab.loc.checked_sub(soffset as usize) + } else { + soffset + .checked_neg() + .and_then(|foffset| tab.loc.checked_add(foffset as usize)) + } + } + .ok_or(Error::OutOfBounds)?; + if vtab_loc + .checked_add(flatbuffers::SIZE_VOFFSET + flatbuffers::SIZE_VOFFSET) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + + let vtab = tab.vtable(); + let vtab_num_bytes = vtab.num_bytes(); + let object_inline_num_bytes = vtab.object_inline_num_bytes(); + if vtab_num_bytes < flatbuffers::SIZE_VOFFSET + flatbuffers::SIZE_VOFFSET + || object_inline_num_bytes < flatbuffers::SIZE_SOFFSET + { + return Err(Error::OutOfBounds); + } + if vtab_loc + .checked_add(vtab_num_bytes) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + if tab + .loc + .checked_add(object_inline_num_bytes) + .filter(|loc| *loc <= buf_len) + .is_none() + { + return Err(Error::OutOfBounds); + } + + for i in 0..vtab.num_fields() { + let voffset = vtab.get_field(i) as usize; + if (voffset > 0 && voffset < flatbuffers::SIZE_SOFFSET) + || voffset >= object_inline_num_bytes + { + return Err(Error::OutOfBounds); + } + } + + if Self::VT_PAYLOAD as usize + flatbuffers::SIZE_VOFFSET + <= vtab_num_bytes + { + let voffset = vtab.get(Self::VT_PAYLOAD) as usize; + if voffset > 0 { + if voffset + 4 > object_inline_num_bytes { + return Err(Error::OutOfBounds); + } + + if let Some(f) = self.payload() { + f.verify()?; + } + } + } + + Ok(()) + } + } + impl<'a> Verify for reader::Block<'a> { fn verify(&self) -> Result { let tab = self._tab; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 1e00409f29..706b4895eb 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -19,6 +19,7 @@ ckb-chain = { path = "../chain" } ckb-miner = { path = "../miner" } ckb-protocol = { path = "../protocol" } ckb-pow = { path = "../pow"} +ckb-alert-system = { path = "../util/alert-system" } jsonrpc-core = "10.1" jsonrpc-derive = "10.1" jsonrpc-http-server = { git = "https://github.com/nervosnetwork/jsonrpc", rev = "7c101f83a8fe34369c1b7a0e9b6721fcb0f91ee0" } diff --git a/rpc/src/config.rs b/rpc/src/config.rs index b403e215c8..b0c57ba772 100644 --- a/rpc/src/config.rs +++ b/rpc/src/config.rs @@ -10,6 +10,7 @@ pub enum Module { Experiment, Stats, IntegrationTest, + Alert, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -48,4 +49,8 @@ impl Config { pub fn integration_test_enable(&self) -> bool { self.modules.contains(&Module::IntegrationTest) } + + pub(crate) fn alert_enable(&self) -> bool { + self.modules.contains(&Module::Alert) + } } diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs new file mode 100644 index 0000000000..6429460767 --- /dev/null +++ b/rpc/src/module/alert.rs @@ -0,0 +1,65 @@ +use crate::error::RPCError; +use ckb_alert_system::{notifier::Notifier as AlertNotifier, verifier::Verifier as AlertVerifier}; +use ckb_core::alert::Alert as CoreAlert; +use ckb_network::NetworkController; +use ckb_protocol::AlertMessage; +use ckb_sync::NetworkProtocol; +use flatbuffers::FlatBufferBuilder; +use jsonrpc_core::Result; +use jsonrpc_derive::rpc; +use jsonrpc_types::Alert; +use std::sync::Arc; + +#[rpc] +pub trait AlertRpc { + // curl -d '{"id": 2, "jsonrpc": "2.0", "method":"send_alert","params": [{}]}' -H 'content-type:application/json' 'http://localhost:8114' + #[rpc(name = "send_alert")] + fn send_alert(&self, _alert: Alert) -> Result<()>; +} + +pub(crate) struct AlertRpcImpl { + network_controller: NetworkController, + verifier: Arc, + notifier: Arc, +} + +impl AlertRpcImpl { + pub fn new( + verifier: Arc, + notifier: Arc, + network_controller: NetworkController, + ) -> Self { + AlertRpcImpl { + network_controller, + verifier, + notifier, + } + } +} + +impl AlertRpc for AlertRpcImpl { + fn send_alert(&self, alert: Alert) -> Result<()> { + let alert: CoreAlert = alert.into(); + + let result = self.verifier.verify_signatures(&alert); + + match result { + Ok(()) => { + let fbb = &mut FlatBufferBuilder::new(); + let message = AlertMessage::build_alert(fbb, &alert); + fbb.finish(message, None); + let data = fbb.finished_data().into(); + if let Err(err) = self + .network_controller + .broadcast(NetworkProtocol::ALERT.into(), data) + { + log::error!(target: "rpc", "Broadcast alert failed: {:?}", err); + } + // set self node notifier + self.notifier.add(Arc::new(alert)); + Ok(()) + } + Err(e) => Err(RPCError::custom(RPCError::Invalid, e.to_string())), + } + } +} diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index bec81e2ed5..c676315175 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -1,3 +1,4 @@ +mod alert; mod chain; mod experiment; mod miner; @@ -6,6 +7,7 @@ mod pool; mod stats; mod test; +pub(crate) use self::alert::{AlertRpc, AlertRpcImpl}; pub(crate) use self::chain::{ChainRpc, ChainRpcImpl}; pub(crate) use self::experiment::{ExperimentRpc, ExperimentRpcImpl}; pub(crate) use self::miner::{MinerRpc, MinerRpcImpl}; diff --git a/rpc/src/module/stats.rs b/rpc/src/module/stats.rs index 5ac5c15bd5..dff98882f3 100644 --- a/rpc/src/module/stats.rs +++ b/rpc/src/module/stats.rs @@ -1,3 +1,4 @@ +use ckb_alert_system::notifier::Notifier as AlertNotifier; use ckb_shared::shared::Shared; use ckb_store::ChainStore; use ckb_sync::Synchronizer; @@ -5,6 +6,7 @@ use ckb_traits::BlockMedianTimeContext; use jsonrpc_core::Result; use jsonrpc_derive::rpc; use jsonrpc_types::{ChainInfo, EpochNumber, PeerState, Timestamp}; +use std::sync::Arc; #[rpc] pub trait StatsRpc { @@ -21,6 +23,7 @@ where { pub shared: Shared, pub synchronizer: Synchronizer, + pub alert_notifier: Arc, } impl StatsRpc for StatsRpcImpl { @@ -37,6 +40,13 @@ impl StatsRpc for StatsRpcImpl { let epoch = tip_header.epoch(); let difficulty = tip_header.difficulty().clone(); let is_initial_block_download = self.synchronizer.shared.is_initial_block_download(); + let warnings = self + .alert_notifier + .alerts() + .into_iter() + .map(|alert| alert.message.clone()) + .collect::>() + .join("\n"); Ok(ChainInfo { chain, @@ -44,7 +54,7 @@ impl StatsRpc for StatsRpcImpl { epoch: EpochNumber(epoch), difficulty, is_initial_block_download, - warnings: String::new(), + warnings, }) } diff --git a/rpc/src/server.rs b/rpc/src/server.rs index a37e78741e..8805e459c7 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -1,9 +1,10 @@ use crate::config::Config; use crate::module::{ - ChainRpc, ChainRpcImpl, ExperimentRpc, ExperimentRpcImpl, IntegrationTestRpc, - IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetworkRpc, NetworkRpcImpl, PoolRpc, - PoolRpcImpl, StatsRpc, StatsRpcImpl, + AlertRpc, AlertRpcImpl, ChainRpc, ChainRpcImpl, ExperimentRpc, ExperimentRpcImpl, + IntegrationTestRpc, IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetworkRpc, NetworkRpcImpl, + PoolRpc, PoolRpcImpl, StatsRpc, StatsRpcImpl, }; +use ckb_alert_system::{notifier::Notifier as AlertNotifier, verifier::Verifier as AlertVerifier}; use ckb_chain::chain::ChainController; use ckb_miner::BlockAssemblerController; use ckb_network::NetworkController; @@ -14,12 +15,14 @@ use jsonrpc_core::IoHandler; use jsonrpc_http_server::{Server, ServerBuilder}; use jsonrpc_server_utils::cors::AccessControlAllowOrigin; use jsonrpc_server_utils::hosts::DomainsValidation; +use std::sync::Arc; pub struct RpcServer { pub(crate) server: Server, } impl RpcServer { + #[allow(clippy::too_many_arguments)] pub fn new( config: Config, network_controller: NetworkController, @@ -27,6 +30,8 @@ impl RpcServer { synchronizer: Synchronizer, chain: ChainController, block_assembler: Option, + alert_notifier: Arc, + alert_verifier: Arc, ) -> RpcServer where CS: ChainStore, @@ -76,6 +81,7 @@ impl RpcServer { StatsRpcImpl { shared: shared.clone(), synchronizer: synchronizer.clone(), + alert_notifier: Arc::clone(&alert_notifier), } .to_delegate(), ); @@ -93,7 +99,7 @@ impl RpcServer { if config.integration_test_enable() { io.extend_with( IntegrationTestRpcImpl { - network_controller, + network_controller: network_controller.clone(), shared, chain, } @@ -101,6 +107,12 @@ impl RpcServer { ); } + if config.alert_enable() { + io.extend_with( + AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller).to_delegate(), + ); + } + let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Null, diff --git a/rpc/src/test.rs b/rpc/src/test.rs index 58f112fddf..d5f095325c 100644 --- a/rpc/src/test.rs +++ b/rpc/src/test.rs @@ -3,6 +3,7 @@ use crate::module::{ PoolRpcImpl, StatsRpc, StatsRpcImpl, }; use crate::RpcServer; +use ckb_alert_system::{alert_relayer::AlertRelayer, config::Config as AlertSystemConfig}; use ckb_chain::chain::{ChainController, ChainService}; use ckb_chain_spec::consensus::Consensus; use ckb_core::block::BlockBuilder; @@ -164,10 +165,14 @@ fn setup_node( } .to_delegate(), ); + let alert_relayer = AlertRelayer::new("0".to_string(), AlertSystemConfig::default()); + + let alert_notifier = alert_relayer.notifier(); io.extend_with( StatsRpcImpl { shared: shared.clone(), synchronizer: synchronizer.clone(), + alert_notifier, } .to_delegate(), ); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 47cbd5e429..10ec5e3947 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -36,6 +36,7 @@ pub enum NetworkProtocol { SYNC = 100, RELAY = 101, TIME = 102, + ALERT = 110, } impl Into for NetworkProtocol { diff --git a/util/alert-system/Cargo.toml b/util/alert-system/Cargo.toml new file mode 100644 index 0000000000..6ef5a6f147 --- /dev/null +++ b/util/alert-system/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "ckb-alert-system" +version = "0.1.0" +license = "MIT" +authors = ["Nervos Core Dev "] +edition = "2018" + +[dependencies] +toml = "0.5.0" +multisig = { path = "../multisig" } +ckb-core = { path = "../../core" } +ckb-util = { path = "../../util" } +ckb-network = { path = "../../network" } +ckb-protocol = { path = "../../protocol" } +jsonrpc-types = { path = "../jsonrpc-types" } +log = "0.4" +fnv = "1.0" +faketime = "0.2.0" +numext-fixed-hash = { version = "0.1", features = ["support_rand", "support_heapsize", "support_serde"] } +failure = "0.1.5" +bytes = "0.4.12" +lru-cache = { git = "https://github.com/nervosnetwork/lru-cache", rev = "b36a4d1" } +serde = "1.0" +serde_derive = "1.0" +flatbuffers = "0.6.0" + diff --git a/util/alert-system/src/alert.toml b/util/alert-system/src/alert.toml new file mode 100644 index 0000000000..c93fc497ae --- /dev/null +++ b/util/alert-system/src/alert.toml @@ -0,0 +1,5 @@ +# need 2 signatures to send alert +signatures_threshold = 2 +public_keys = [ +] + diff --git a/util/alert-system/src/alert_relayer.rs b/util/alert-system/src/alert_relayer.rs new file mode 100644 index 0000000000..5d14c353cf --- /dev/null +++ b/util/alert-system/src/alert_relayer.rs @@ -0,0 +1,159 @@ +//! AlertRelayer +//! We implment a Bitcoin like alert system, n of m alert key holders can decide to send alert +//messages to all client +//! to leave a space to reach consensus offline under critical bugs +//! +//! A cli to generate alert message, +//! A config option to set alert messages to broard cast. +// +use crate::config::Config; +use crate::notifier::Notifier; +use crate::verifier::Verifier; +use crate::BAD_MESSAGE_BAN_TIME; +use ckb_core::alert::Alert; +use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, TargetSession}; +use ckb_protocol::{get_root, AlertMessage}; +use flatbuffers::FlatBufferBuilder; +use fnv::{FnvHashMap, FnvHashSet}; +use log::{debug, info, trace}; +use lru_cache::LruCache; +use std::convert::TryInto; +use std::sync::Arc; + +const CANCEL_FILTER_SIZE: usize = 128; +const KNOWN_LIST_SIZE: usize = 64; + +/// AlertRelayer +/// relay alert messages +#[derive(Clone)] +pub struct AlertRelayer { + /// cancelled alerts + cancel_filter: LruCache, + /// unexpired alerts we received + received_alerts: FnvHashMap>, + /// alerts that self node should notice + notifier: Arc, + verifier: Arc, + known_lists: LruCache>, +} + +impl AlertRelayer { + pub fn new(client_version: String, config: Config) -> Self { + AlertRelayer { + cancel_filter: LruCache::new(CANCEL_FILTER_SIZE), + received_alerts: Default::default(), + notifier: Arc::new(Notifier::new(client_version)), + verifier: Arc::new(Verifier::new(config)), + known_lists: LruCache::new(KNOWN_LIST_SIZE), + } + } + + pub fn notifier(&self) -> Arc { + Arc::clone(&self.notifier) + } + + pub fn verifier(&self) -> Arc { + Arc::clone(&self.verifier) + } + + fn receive_new_alert(&mut self, alert: Arc) { + // checkout cancel_id + if alert.cancel > 0 { + self.cancel_filter.insert(alert.cancel, ()); + self.received_alerts.remove(&alert.cancel); + self.notifier.cancel(alert.cancel); + } + // add to received alerts + self.received_alerts.insert(alert.id, Arc::clone(&alert)); + // set self node notice + self.notifier.add(alert); + } + + fn clear_expired_alerts(&mut self) { + let now = faketime::unix_time_as_millis(); + self.received_alerts + .retain(|_id, alert| alert.notice_until > now); + self.notifier.clear_expired_alerts(now); + } + + // return true if it this first time the peer know this alert + fn mark_as_known(&mut self, peer: PeerIndex, alert_id: u32) -> bool { + match self.known_lists.get_refresh(&peer) { + Some(alert_ids) => alert_ids.insert(alert_id), + None => { + let mut alert_ids = FnvHashSet::default(); + alert_ids.insert(alert_id); + self.known_lists.insert(peer, alert_ids); + true + } + } + } +} + +impl CKBProtocolHandler for AlertRelayer { + fn init(&mut self, _nc: Arc) {} + + fn connected( + &mut self, + nc: Arc, + peer_index: PeerIndex, + _version: &str, + ) { + self.clear_expired_alerts(); + for alert in self.received_alerts.values() { + trace!(target: "alert", "send alert {} to peer {}", alert.id, peer_index); + let fbb = &mut FlatBufferBuilder::new(); + let msg = AlertMessage::build_alert(fbb, &alert); + fbb.finish(msg, None); + nc.quick_send_message_to(peer_index, fbb.finished_data().into()); + } + } + + fn received( + &mut self, + nc: Arc, + peer_index: PeerIndex, + data: bytes::Bytes, + ) { + let alert: Arc = match get_root::(&data) + .ok() + .and_then(|m| m.payload()) + .map(TryInto::try_into) + { + Some(Ok(alert)) => Arc::new(alert), + Some(Err(_)) | None => { + info!(target: "network", "Peer {} sends us malformed message", peer_index); + nc.ban_peer(peer_index, BAD_MESSAGE_BAN_TIME); + return; + } + }; + trace!(target: "alert", "receive alert {} from peer {}", alert.id, peer_index); + // ignore alert + if self.received_alerts.contains_key(&alert.id) + || self.cancel_filter.contains_key(&alert.id) + { + return; + } + // verify + if let Err(err) = self.verifier.verify_signatures(&alert) { + debug!(target: "network", "Peer {} sends us a alert with invalid signatures, error {:?}", peer_index, err); + nc.ban_peer(peer_index, BAD_MESSAGE_BAN_TIME); + return; + } + // mark sender as known + self.mark_as_known(peer_index, alert.id); + // broadcast message + let fbb = &mut FlatBufferBuilder::new(); + let msg = AlertMessage::build_alert(fbb, &alert); + fbb.finish(msg, None); + let data = fbb.finished_data().into(); + let selected_peers: Vec = nc + .connected_peers() + .into_iter() + .filter(|peer| self.mark_as_known(*peer, alert.id)) + .collect(); + nc.quick_filter_broadcast(TargetSession::Multi(selected_peers), data); + // add to received alerts + self.receive_new_alert(alert); + } +} diff --git a/util/alert-system/src/config.rs b/util/alert-system/src/config.rs new file mode 100644 index 0000000000..0bdee33e7d --- /dev/null +++ b/util/alert-system/src/config.rs @@ -0,0 +1,15 @@ +use jsonrpc_types::JsonBytes; +use serde_derive::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + pub signatures_threshold: usize, + pub public_keys: Vec, +} + +impl Default for Config { + fn default() -> Self { + let alert_config = include_bytes!("./alert.toml"); + toml::from_slice(&alert_config[..]).expect("alert system config") + } +} diff --git a/util/alert-system/src/lib.rs b/util/alert-system/src/lib.rs new file mode 100644 index 0000000000..3bb77fcf81 --- /dev/null +++ b/util/alert-system/src/lib.rs @@ -0,0 +1,17 @@ +//! Alert System +//! See https://en.bitcoin.it/wiki/Alert_system to learn the history of Bitcoin alert system. +//! We implement the alert system in CKB for urgent situation, +//! In CKB early stage we may meet the same crisis bugs that Bitcoin meet, +//! in urgent case, CKB core team can send an alert message across CKB P2P network, +//! the client will show the alert message, the other behaviors of CKB node will not change. +//! +//! The Alert System will be removed soon once the CKB network is considered mature. +//! +pub mod alert_relayer; +pub mod config; +pub mod notifier; +pub mod verifier; + +use std::time::Duration; + +pub(crate) const BAD_MESSAGE_BAN_TIME: Duration = Duration::from_secs(5 * 60); diff --git a/util/alert-system/src/notifier.rs b/util/alert-system/src/notifier.rs new file mode 100644 index 0000000000..777ace93b1 --- /dev/null +++ b/util/alert-system/src/notifier.rs @@ -0,0 +1,58 @@ +use ckb_core::alert::Alert; +use ckb_util::Mutex; +use log::warn; +use std::sync::Arc; + +pub struct Notifier { + alerts: Mutex>>, + client_version: String, +} + +impl Notifier { + pub fn new(client_version: String) -> Self { + Notifier { + alerts: Mutex::new(Vec::new()), + client_version, + } + } + + pub fn add(&self, alert: Arc) { + if alert + .min_version + .as_ref() + .map(|min_v| self.client_version < *min_v) + == Some(true) + { + return; + } + + if alert + .max_version + .as_ref() + .map(|max_v| self.client_version > *max_v) + == Some(true) + { + return; + } + let mut alerts = self.alerts.lock(); + if alerts.contains(&alert) { + return; + } + warn!(target: "alert", "receive a new alert: {}", alert.message); + alerts.push(alert); + } + + pub fn cancel(&self, id: u32) { + let mut alerts = self.alerts.lock(); + alerts.retain(|a| a.id == id); + } + + pub fn clear_expired_alerts(&self, now: u64) { + let mut alerts = self.alerts.lock(); + alerts.retain(|a| a.notice_until > now); + } + + pub fn alerts(&self) -> Vec> { + self.alerts.lock().clone() + } +} diff --git a/util/alert-system/src/verifier.rs b/util/alert-system/src/verifier.rs new file mode 100644 index 0000000000..acae24211d --- /dev/null +++ b/util/alert-system/src/verifier.rs @@ -0,0 +1,49 @@ +use crate::config::Config; +use ckb_core::alert::Alert; +use failure::Error; +use log::{debug, trace}; + +pub struct Verifier(Config); + +impl Verifier { + pub fn new(config: Config) -> Self { + Verifier(config) + } + + pub fn verify_signatures(&self, alert: &Alert) -> Result<(), Error> { + trace!(target: "alert", "verify alert {:?}", alert); + use multisig::secp256k1::{verify_m_of_n, Message, PublicKey, Signature}; + let message = Message::from_slice(alert.hash().as_bytes())?; + let signatures: Vec> = alert + .signatures + .iter() + .map(|sig| { + if sig.is_empty() { + None + } else { + match Signature::from_compact(sig) { + Ok(i) => Some(i), + Err(err) => { + debug!(target: "alert", "signature error: {}", err); + None + } + } + } + }) + .collect(); + let public_keys = self + .0 + .public_keys + .iter() + .map(|raw| PublicKey::from_slice(raw.as_bytes())) + .collect::, _>>()?; + verify_m_of_n( + &message, + self.0.signatures_threshold, + signatures, + public_keys, + ) + .map_err(|err| err.kind())?; + Ok(()) + } +} diff --git a/util/jsonrpc-types/src/alert.rs b/util/jsonrpc-types/src/alert.rs new file mode 100644 index 0000000000..cbb28b6e12 --- /dev/null +++ b/util/jsonrpc-types/src/alert.rs @@ -0,0 +1,77 @@ +use crate::{bytes::JsonBytes, string, Timestamp}; +use ckb_core::alert::{Alert as CoreAlert, AlertBuilder}; +use serde_derive::{Deserialize, Serialize}; + +#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] +pub struct AlertId(#[serde(with = "string")] pub u32); + +#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] +pub struct AlertPriority(#[serde(with = "string")] pub u32); + +#[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] +pub struct Alert { + pub id: AlertId, + pub cancel: AlertId, + pub min_version: Option, + pub max_version: Option, + pub priority: AlertPriority, + pub notice_until: Timestamp, + pub message: String, + pub signatures: Vec, +} + +impl From for CoreAlert { + fn from(json: Alert) -> Self { + let Alert { + id, + cancel, + min_version, + max_version, + priority, + notice_until, + message, + signatures, + } = json; + + AlertBuilder::default() + .id(id.0) + .cancel(cancel.0) + .min_version(min_version) + .max_version(max_version) + .priority(priority.0) + .notice_until(notice_until.0) + .message(message) + .signatures( + signatures + .into_iter() + .map(JsonBytes::into_bytes) + .collect::>(), + ) + .build() + } +} + +impl From for Alert { + fn from(core: CoreAlert) -> Self { + let CoreAlert { + id, + cancel, + min_version, + max_version, + priority, + notice_until, + message, + signatures, + } = core; + Alert { + id: AlertId(id), + cancel: AlertId(cancel), + min_version, + max_version, + priority: AlertPriority(priority), + notice_until: Timestamp(notice_until), + message, + signatures: signatures.into_iter().map(JsonBytes::from_bytes).collect(), + } + } +} diff --git a/util/jsonrpc-types/src/lib.rs b/util/jsonrpc-types/src/lib.rs index 91aee1f3e6..263dc8886c 100644 --- a/util/jsonrpc-types/src/lib.rs +++ b/util/jsonrpc-types/src/lib.rs @@ -1,3 +1,4 @@ +mod alert; mod block_template; mod blockchain; mod bytes; @@ -31,6 +32,7 @@ pub struct Timestamp(#[serde(with = "string")] pub u64); #[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] pub struct Unsigned(#[serde(with = "string")] pub u64); +pub use self::alert::Alert; pub use self::block_template::{ BlockTemplate, CellbaseTemplate, TransactionTemplate, UncleTemplate, }; diff --git a/util/multisig/Cargo.toml b/util/multisig/Cargo.toml index 60f7035a52..f79ea36634 100644 --- a/util/multisig/Cargo.toml +++ b/util/multisig/Cargo.toml @@ -1,6 +1,7 @@ [package] name = "multisig" version = "0.1.0" +license = "MIT" authors = ["Nervos Core Dev "] edition = "2018" @@ -8,5 +9,8 @@ edition = "2018" secp256k1 = "0.12.2" lazy_static = "1.3.0" failure = "0.1.5" +log = "0.4" + +[dev-dependencies] rand = "0.6.5" diff --git a/util/multisig/src/error.rs b/util/multisig/src/error.rs index ea1be352ab..0173ccfedd 100644 --- a/util/multisig/src/error.rs +++ b/util/multisig/src/error.rs @@ -31,6 +31,6 @@ impl From for Error { impl From> for Error { fn from(inner: Context) -> Error { - Error { inner: inner } + Error { inner } } } diff --git a/util/multisig/src/secp256k1.rs b/util/multisig/src/secp256k1.rs index b1e5545996..01277ff5ba 100644 --- a/util/multisig/src/secp256k1.rs +++ b/util/multisig/src/secp256k1.rs @@ -1,4 +1,5 @@ use crate::error::{Error, ErrorKind}; +use log::{debug, trace}; pub use secp256k1::{ All, Error as Secp256k1Error, Message, PublicKey, RecoverableSignature, Secp256k1, SecretKey, Signature, @@ -29,7 +30,17 @@ pub fn verify_m_of_n( let verified_sig_count = sigs .iter() .zip(pks.iter()) - .filter_map(|(sig, pk)| sig.and_then(|sig| SECP256K1.verify(&message, &sig, pk).ok())) + .filter_map(|(sig, pk)| { + sig.and_then(|sig| { + trace!(target: "multisig", "verify sig {:x?} with pubkey {:x?}", &sig.serialize_compact()[..], &pk.serialize()[..]); + match SECP256K1.verify(&message, &sig, pk) { + Ok(()) => Some(()), + Err(err) => { + debug!(target: "multisig", "verify secp256k1 sig error: {}", err); + None + } + }}) + }) .take(m_threshold) .count(); if verified_sig_count < m_threshold { @@ -90,12 +101,9 @@ mod tests { Err(ErrorKind::Threshold(2, 3)), ), ]; - for (threshold, sigs, pks, result) in test_set.into_iter() { + for (threshold, sigs, pks, result) in test_set.iter() { let message = random_message(); - let sks: Vec = (0..sigs.len()) - .into_iter() - .map(|_| random_secret_key()) - .collect(); + let sks: Vec = (0..sigs.len()).map(|_| random_secret_key()).collect(); let pks: Vec = sks .iter() .enumerate() @@ -103,7 +111,7 @@ mod tests { .take(*pks) .collect(); let sigs: Vec> = sigs - .into_iter() + .iter() .enumerate() .map(|(i, valid)| { if *valid { @@ -122,7 +130,7 @@ mod tests { #[test] fn test_2_of_3_with_wrong_signature() { let message = random_message(); - let sks: Vec = (0..3).into_iter().map(|_| random_secret_key()).collect(); + let sks: Vec = (0..3).map(|_| random_secret_key()).collect(); let pks: Vec = sks .iter() .map(|sk| PublicKey::from_secret_key(&SECP256K1, sk))