diff --git a/Cargo.lock b/Cargo.lock index 19cc16346b..d6878be97a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,9 +483,11 @@ dependencies = [ "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-ping 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-identify 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)", ] @@ -616,6 +618,7 @@ version = "0.7.0-pre" dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "bloom-filters 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "ckb-chain 0.7.0-pre", "ckb-chain-spec 0.7.0-pre", "ckb-core 0.7.0-pre", @@ -970,6 +973,26 @@ name = "data-encoding" version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "data-encoding-macro" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding-macro-internal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "digest" version = "0.8.0" @@ -992,6 +1015,11 @@ name = "encode_unicode" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "env_logger" version = "0.6.1" @@ -1278,6 +1306,16 @@ dependencies = [ "want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "idna" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "indexmap" version = "1.0.2" @@ -1571,6 +1609,11 @@ dependencies = [ "linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "matches" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "memchr" version = "2.1.3" @@ -1647,6 +1690,11 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "nibble_vec" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "nix" version = "0.11.0" @@ -1844,6 +1892,19 @@ name = "plain" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "proc-macro-hack" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-hack-impl" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "proc-macro2" version = "0.4.27" @@ -1876,6 +1937,11 @@ name = "quick-error" version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "quote" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quote" version = "0.6.11" @@ -1903,6 +1969,15 @@ dependencies = [ "rusqlite 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "radix_trie" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "endian-type 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rand" version = "0.3.23" @@ -2368,6 +2443,17 @@ dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "socket2" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.48 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "spin" version = "0.5.0" @@ -2411,6 +2497,16 @@ name = "subtle" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "syn" +version = "0.11.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "0.15.26" @@ -2421,6 +2517,14 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "synom" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "synstructure" version = "0.10.1" @@ -2452,37 +2556,68 @@ dependencies = [ [[package]] name = "tentacle" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +version = "0.2.0-alpha.1" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-multiaddr 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-threadpool 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-yamux 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", ] [[package]] -name = "tentacle-ping" +name = "tentacle-discovery" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tentacle-identify" +version = "0.1.0" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tentacle-ping" +version = "0.2.0" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "generic-channel 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)", +] + [[package]] name = "tentacle-secio" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2795,7 +2930,7 @@ dependencies = [ [[package]] name = "tokio-yamux" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb#9c42be05724c3e2decb73716d0f515c36a7af1cb" dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2821,6 +2956,49 @@ dependencies = [ "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "trust-dns" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "radix_trie 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns-proto 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "trust-dns-proto" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", + "socket2 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "try-lock" version = "0.1.0" @@ -2859,11 +3037,32 @@ dependencies = [ "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "unicode-bidi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "unicode-width" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "unicode-xid" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "unicode-xid" version = "0.1.0" @@ -2896,6 +3095,16 @@ name = "untrusted" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "url" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "utf8-ranges" version = "1.0.2" @@ -3090,9 +3299,12 @@ dependencies = [ "checksum ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "044f882973b245404e90c90e7e42e8ee8d7a64edfd7adf83d684fb97e8e2c1b6" "checksum ctrlc 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "630391922b1b893692c6334369ff528dcc3a9d8061ccf4c803aa8f83cb13db5e" "checksum data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f47ca1860a761136924ddd2422ba77b2ea54fe8cc75b9040804a0d9d32ad97" +"checksum data-encoding-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9b87bf377fc964606c3679c1de6822a9a9d8b69aae2651ca4af28cb2d1550b37" +"checksum data-encoding-macro-internal 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "22b318a8d6d56c45df45c61fcc7d2dbf98ea014d4987e7c74ef1f86c9b87e503" "checksum digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05f47366984d3ad862010e22c7ce81a7dbcaebbdfb37241a620f8b6596ee135c" "checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0" "checksum encode_unicode 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "90b2c9496c001e8cb61827acdefad780795c42264c137744cae6f7d9e3450abd" +"checksum endian-type 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" "checksum env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b61fa891024a945da30a9581546e8cfaf5602c7b3f4c137a2805cf388f92075a" "checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02" "checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2" @@ -3123,6 +3335,7 @@ dependencies = [ "checksum humantime 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3ca7e5f2e110db35f93b837c81797f3714500b81d517bf20c431b16d3ca4f114" "checksum hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)" = "34a590ca09d341e94cddf8e5af0bbccde205d5fbc2fa3c09dd67c7f85cea59d7" "checksum hyper 0.12.25 (registry+https://github.com/rust-lang/crates.io-index)" = "7d5b6658b016965ae301fa995306db965c93677880ea70765a84235a96eae896" +"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum indicatif 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2c60da1c9abea75996b70a931bba6c750730399005b61ccd853cee50ef3d0d0c" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" @@ -3153,6 +3366,7 @@ dependencies = [ "checksum log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6" "checksum lru-cache 0.1.0 (git+https://github.com/nervosnetwork/lru-cache)" = "" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" +"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1dd4eaac298c32ce07eb6ed9242eda7d82955b9170b7d6db59b2e02cc63fcb8" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" "checksum merkle-cbt 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d589b5a7ca642540e7ccfbca3bcd0aa18693eb9287e2a6b17c79b1d062d52863" @@ -3161,6 +3375,7 @@ dependencies = [ "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" +"checksum nibble_vec 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c8d77f3db4bce033f4d04db08079b2ef1c3d02b44e86f25d08886fafa7756ffa" "checksum nix 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d37e713a259ff641624b6cb20e3b12b2952313ba36b6823c0f16e6cfd9e5de17" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" "checksum nom 4.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b30adc557058ce00c9d0d7cb3c6e0b5bc6f36e2e2eabe74b0ba726d194abd588" @@ -3182,12 +3397,16 @@ dependencies = [ "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum plain 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +"checksum proc-macro-hack 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2c725b36c99df7af7bf9324e9c999b9e37d92c8f8caf106d82e1d7953218d2d8" +"checksum proc-macro-hack-impl 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2b753ad9ed99dd8efeaa7d2fb8453c8f6bc3e54b97966d35f1bc77ca6865254a" "checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" "checksum proptest 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea66c78d75f2c6e9f304269eaef90899798daecc69f1a625d5a3dd793ff3522" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" +"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1" "checksum r2d2 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5d746fc8a0dab19ccea7ff73ad535854e90ddb3b4b8cdce953dd5cd0b2e7bd22" "checksum r2d2_sqlite 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7cba990b29ae565b1a765ef45f6b84a89a77736b91582e0243c12f613653857" +"checksum radix_trie 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ebcf72e767017c1aa4b63d4dd0b0b836a243b648fd81d41c6bf6e850ef7a95c7" "checksum rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)" = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c" "checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" @@ -3244,19 +3463,24 @@ dependencies = [ "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "88aea073965ab29f6edb5493faf96ad662fb18aa9eeb186a3b7057951605ed15" "checksum snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "95d697d63d44ad8b78b8d235bf85b34022a78af292c8918527c5f0cffdde7f43" +"checksum socket2 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "c4d11a52082057d87cb5caa31ad812f4504b97ab44732cd8359df2e9ff9f48e7" "checksum spin 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44363f6f51401c34e7be73db0db371c04705d35efbe9f7d6082e03a921a32c55" "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum stream-cipher 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8861bc80f649f5b4c9bd38b696ae9af74499d479dbfb327f0607de6b326a36bc" "checksum string 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b639411d0b9c738748b5397d5ceba08e648f4f1992231aa859af1a017f31f60b" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee" +"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum syn 0.15.26 (registry+https://github.com/rust-lang/crates.io-index)" = "f92e629aa1d9c827b2bb8297046c1ccffc57c99b947a680d3ccff1f136a3bee9" +"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" -"checksum tentacle 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6c29b4a6d8dbf15e8191c4723be562689ec734edc86fa9a84585c256d53e04be" -"checksum tentacle-ping 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "853dcfad34c9521fbb0c1dfc64d363b6efc774aae044a89411a25e3155130e24" -"checksum tentacle-secio 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9562b522b885f057e39aac9feb9048e64d4b9cc7f5c0be5933ae6606eb56ad40" +"checksum tentacle 0.2.0-alpha.1 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-discovery 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-identify 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-ping 0.2.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" +"checksum tentacle-secio 0.1.0 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum termcolor 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4096add70612622289f2fdcdbd5086dc81c1e2675e6ae58d6c4f62a16c6d7f2f" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum termios 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72b620c5ea021d75a735c943269bb07d30c9b77d6ac6b236bc8b5c496ef05625" @@ -3282,21 +3506,27 @@ dependencies = [ "checksum tokio-trace-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "350c9edade9830dc185ae48ba45667a445ab59f6167ef6d0254ec9d2430d9dd3" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" -"checksum tokio-yamux 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1815b4502bb86c8ffaae8521ecc26e148edb4fcc2ed24c1feba0807a492b4946" +"checksum tokio-yamux 0.1.4 (git+https://github.com/nervosnetwork/p2p?rev=9c42be05724c3e2decb73716d0f515c36a7af1cb)" = "" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" "checksum toml 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "87c5890a989fa47ecdc7bcb4c63a77a82c18f306714104b1decfd722db17b39e" +"checksum trust-dns 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "65096825b064877da37eeeb9a83390bd23433eabfc503a6476dc5b1949034aa7" +"checksum trust-dns-proto 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09144f0992b0870fa8d2972cc069cbf1e3c0fda64d1f3d45c4d68d0e0b52ad4e" "checksum try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee2aa4715743892880f70885373966c83d73ef1b0838a664ef0c76fffd35e7c2" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum twofish 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712d261e83e727c8e2dbb75dacac67c36e35db36a958ee504f2164fc052434e1" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" "checksum ucd-util 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535c204ee4d8434478593480b8f86ab45ec9aae0e83c568ca81abf0fd0e88f86" "checksum unicase 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d3218ea14b4edcaccfa0df0a64a3792a2c32cc706f1b336e48867f9d3147f90" +"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" +"checksum unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "141339a08b982d942be2ca06ff8b076563cbe223d1befd5450716790d44e2426" "checksum unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "882386231c45df4700b275c7ff55b6f3698780a650026380e72dabe76fa46526" +"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum unsigned-varint 0.2.2 (git+https://github.com/paritytech/unsigned-varint)" = "" "checksum unsigned-varint 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2c64cdf40b4a9645534a943668681bcb219faf51874d4b65d2e0abda1b10a2ab" "checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" +"checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" diff --git a/network/Cargo.toml b/network/Cargo.toml index fb20c1d48b..1abab69d32 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -20,9 +20,11 @@ bytes = "0.4.12" tokio = "0.1.17" futures = { version = "0.1.19", features = ["use_std"] } snap = "0.2" -p2p = { version = "0.1", package="tentacle" } -p2p-ping = { version = "0.1", package="tentacle-ping" } -secio = { version = "0.1", package="tentacle-secio" } +p2p = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle" } +secio = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-secio" } +p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-ping" } +p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-discovery" } +p2p-identify = { git = "https://github.com/nervosnetwork/p2p", rev="9c42be05724c3e2decb73716d0f515c36a7af1cb", package="tentacle-identify" } faketime = "0.2.0" rusqlite = {version = "0.16.0", features = ["bundled"]} lazy_static = "1.3.0" diff --git a/network/src/network.rs b/network/src/network.rs index bdf9d71efe..c6f574e434 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -7,6 +7,8 @@ use crate::protocol::Version as ProtocolVersion; use crate::protocol_handler::{CKBProtocolHandler, DefaultCKBProtocolContext}; use crate::service::{ ckb_service::CKBService, + discovery_service::{DiscoveryEvent, DiscoveryProtocol, DiscoveryService}, + identify_service::{IdentifyAddressManager, IdentifyEvent, IdentifyProtocol, IdentifyService}, outbound_peer_service::OutboundPeerService, ping_service::PingService, timer_service::{TimerRegistry, TimerService}, @@ -19,6 +21,7 @@ use bytes::Bytes; use ckb_util::{Mutex, RwLock}; use fnv::FnvHashMap; use futures::future::{select_all, Future}; +use futures::sync::mpsc; use futures::sync::mpsc::channel; use futures::sync::mpsc::Receiver; use futures::sync::oneshot; @@ -26,22 +29,23 @@ use futures::Stream; use log::{debug, error, info, warn}; use multiaddr::multihash::Multihash; use p2p::{ - builder::ServiceBuilder, + builder::{MetaBuilder, ServiceBuilder}, multiaddr::{self, Multiaddr}, secio::{PeerId, PublicKey}, - service::{Service, ServiceError, ServiceEvent}, + service::{DialProtocol, ProtocolHandle, Service, ServiceError, ServiceEvent}, traits::ServiceHandle, }; -use p2p_ping::{Event as PingEvent, PingProtocol}; +use p2p_ping::{Event as PingEvent, PingHandler}; use secio; use std::boxed::Box; use std::cmp::max; use std::sync::Arc; use std::time::{Duration, Instant}; use std::usize; -use tokio::codec::LengthDelimitedCodec; const PING_PROTOCOL_ID: ProtocolId = 0; +const DISCOVERY_PROTOCOL_ID: ProtocolId = 1; +const IDENTIFY_PROTOCOL_ID: ProtocolId = 2; pub type CKBProtocols = Vec<(CKBProtocol, Arc)>; type NetworkResult = Result< @@ -80,7 +84,13 @@ impl PeerInfo { } } -type P2PService = Service; +pub struct EventReceivers { + pub ping_event_receiver: Receiver, + pub disc_event_receiver: mpsc::UnboundedReceiver, + pub identify_event_receiver: mpsc::UnboundedReceiver, +} + +type P2PService = Service; pub struct Network { pub(crate) peers_registry: RwLock, @@ -313,7 +323,7 @@ impl Network { } pub fn dial_addr(&self, addr: Multiaddr) { - if let Err(err) = self.p2p_control.write().dial(addr) { + if let Err(err) = self.p2p_control.write().dial(addr, DialProtocol::All) { error!(target: "network", "failed to dial: {}", err); } } @@ -338,7 +348,7 @@ impl Network { pub(crate) fn inner_build( config: &NetworkConfig, ckb_protocols: CKBProtocols, - ) -> Result<(Arc, P2PService, TimerRegistry, Receiver), Error> { + ) -> Result<(Arc, P2PService, TimerRegistry, EventReceivers), Error> { let local_private_key = match config.fetch_private_key() { Some(private_key) => private_key?, None => return Err(ConfigError::InvalidKey.into()), @@ -369,17 +379,50 @@ impl Network { config.reserved_only, reserved_peers, ); - let mut p2p_service = ServiceBuilder::default().forever(true); // register protocols let (ping_sender, ping_receiver) = channel(std::u8::MAX as usize); - p2p_service = p2p_service.insert_protocol(PingProtocol::new( - PING_PROTOCOL_ID, - config.ping_interval, - config.ping_timeout, - ping_sender, - )); + let ping_meta = MetaBuilder::default() + .id(PING_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(PingHandler::new( + PING_PROTOCOL_ID, + config.ping_interval, + config.ping_timeout, + ping_sender, + ))) + }) + .build(); + + let (disc_sender, disc_receiver) = mpsc::unbounded(); + let disc_meta = MetaBuilder::default() + .id(DISCOVERY_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new( + DISCOVERY_PROTOCOL_ID, + disc_sender, + ))) + }) + .build(); + + let (identify_sender, identify_receiver) = mpsc::unbounded(); + let identify_addr_mgr = IdentifyAddressManager::new(identify_sender.clone()); + let identify_meta = MetaBuilder::default() + .id(IDENTIFY_PROTOCOL_ID) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new( + IDENTIFY_PROTOCOL_ID, + identify_addr_mgr.clone(), + ))) + }) + .build(); + + let mut p2p_service = ServiceBuilder::default() + .forever(true) + .insert_protocol(ping_meta) + .insert_protocol(disc_meta) + .insert_protocol(identify_meta); for (ckb_protocol, _) in &ckb_protocols { - p2p_service = p2p_service.insert_protocol(ckb_protocol.clone()); + p2p_service = p2p_service.insert_protocol(ckb_protocol.build()); } let mut p2p_service = p2p_service .key_pair(local_private_key.clone()) @@ -453,7 +496,16 @@ impl Network { } } - Ok((network, p2p_service, timer_registry, ping_receiver)) + Ok(( + network, + p2p_service, + timer_registry, + EventReceivers { + ping_event_receiver: ping_receiver, + disc_event_receiver: disc_receiver, + identify_event_receiver: identify_receiver, + }, + )) } pub(crate) fn build_network_future( @@ -463,23 +515,21 @@ impl Network { p2p_service: P2PService, timer_registry: TimerRegistry, ckb_event_receiver: Receiver, - ping_event_receiver: Receiver, + receivers: EventReceivers, ) -> Result + Send>, Error> { // initialize ckb_protocols let ping_service = PingService { network: Arc::clone(&network), - event_receiver: ping_event_receiver, + event_receiver: receivers.ping_event_receiver, }; - //let identify_service = Arc::new(IdentifyService { - // client_version, - // protocol_version, - // identify_timeout: config.identify_timeout, - // identify_interval: config.identify_interval, - //}); + let disc_service = + DiscoveryService::new(Arc::clone(&network), receivers.disc_event_receiver); + let identify_service = + IdentifyService::new(Arc::clone(&network), receivers.identify_event_receiver); let ckb_service = CKBService { - event_receiver: ckb_event_receiver, network: Arc::clone(&network), + event_receiver: ckb_event_receiver, }; let timer_service = TimerService::new(timer_registry, Arc::clone(&network)); let outbound_peer_service = @@ -501,17 +551,16 @@ impl Network { .for_each(|_| Ok(())) .map_err(|_err| Error::Shutdown), ), - // Box::new( - // discovery_query_service - // .into_future() - // .map(|_| ()) - // .map_err(|(err, _)| err), - // ) as Box + Send>, - //identify_service.start_protocol( - // Arc::clone(&network), - // swarm_controller.clone(), - // basic_transport.clone(), - //), + Box::new( + disc_service + .for_each(|_| Ok(())) + .map_err(|_err| Error::Shutdown), + ), + Box::new( + identify_service + .for_each(|_| Ok(())) + .map_err(|_err| Error::Shutdown), + ), Box::new(timer_service.timer_futures.for_each(|_| Ok(()))), Box::new( outbound_peer_service @@ -543,7 +592,7 @@ impl Network { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> NetworkResult { - let (network, p2p_service, timer_registry, ping_event_receiver) = + let (network, p2p_service, timer_registry, receivers) = Self::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let network_future = Self::build_network_future( @@ -553,7 +602,7 @@ impl Network { p2p_service, timer_registry, ckb_event_receiver, - ping_event_receiver, + receivers, )?; Ok((network, close_tx, network_future)) } diff --git a/network/src/network_service.rs b/network/src/network_service.rs index 3eb367ffc8..30bcb497d4 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -9,27 +9,34 @@ use ckb_util::Mutex; use futures::future::Future; use futures::sync::mpsc::Receiver; use futures::sync::oneshot; -use log::{debug, info}; +use log::{debug, error, info}; use std::sync::Arc; -use std::thread; use tokio::runtime; pub struct StopHandler { signal: oneshot::Sender<()>, - thread: thread::JoinHandle<()>, + network_runtime: runtime::Runtime, } impl StopHandler { - pub fn new(signal: oneshot::Sender<()>, thread: thread::JoinHandle<()>) -> StopHandler { - StopHandler { signal, thread } + pub fn new(signal: oneshot::Sender<()>, network_runtime: runtime::Runtime) -> StopHandler { + StopHandler { + signal, + network_runtime, + } } pub fn close(self) { - let StopHandler { signal, thread } = self; + let StopHandler { + signal, + network_runtime, + } = self; if let Err(e) = signal.send(()) { debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", e) }; - thread.join().expect("join network_service thread"); + // TODO: not that gracefully shutdown, will output below error message: + // "terminate called after throwing an instance of 'std::system_error'" + network_runtime.shutdown_now(); } } @@ -67,42 +74,39 @@ impl NetworkService { ckb_protocols: CKBProtocols, ckb_event_receiver: Receiver, ) -> Result { - let (network, p2p_service, timer_registry, ping_event_receiver) = + let (network, p2p_service, timer_registry, receivers) = Network::inner_build(config, ckb_protocols)?; let (close_tx, close_rx) = oneshot::channel(); let (init_tx, init_rx) = oneshot::channel(); - let join_handle = thread::spawn({ - let network = Arc::clone(&network); - let config = config.clone(); - move || { - info!( - target: "network", - "network peer_id {:?}", - network.local_public_key().peer_id() - ); - let network_future = Network::build_network_future( - network, - &config, - close_rx, - p2p_service, - timer_registry, - ckb_event_receiver, - ping_event_receiver, - ) - .expect("Network thread init"); - init_tx.send(()).expect("Network init signal send"); - // here we use default config - let network_runtime = runtime::Runtime::new().expect("Network tokio runtime init");; - match network_runtime.block_on_all(network_future) { - Ok(_) => info!(target: "network", "network service exit"), - Err(err) => panic!("network service exit unexpected {}", err), - } - } - }); + + info!( + target: "network", + "network peer_id {:?}", + network.local_public_key().peer_id() + ); + let network_future = Network::build_network_future( + Arc::clone(&network), + &config, + close_rx, + p2p_service, + timer_registry, + ckb_event_receiver, + receivers, + ) + .expect("Network thread init"); + init_tx.send(()).expect("Network init signal send"); + // here we use default config + let mut network_runtime = runtime::Runtime::new().expect("Network tokio runtime init"); + network_runtime.spawn( + network_future + .map(|_| info!(target: "network", "network service exit")) + .map_err(|err| error!("network service exit unexpected {}", err)), + ); + init_rx.wait().map_err(|_err| Error::Shutdown)?; Ok(NetworkService { network, - stop_handler: Mutex::new(Some(StopHandler::new(close_tx, join_handle))), + stop_handler: Mutex::new(Some(StopHandler::new(close_tx, network_runtime))), }) } diff --git a/network/src/protocol.rs b/network/src/protocol.rs index a68043c5cb..95186ac14f 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -1,12 +1,14 @@ use crate::{peers_registry::Session, PeerId, ServiceContext, SessionContext}; +use bytes::Bytes; use futures::sync::mpsc::Sender; use log::{debug, error}; use p2p::{ + builder::MetaBuilder, multiaddr::Multiaddr, - traits::{ProtocolMeta, ServiceProtocol}, + service::{ProtocolHandle, ProtocolMeta}, + traits::ServiceProtocol, ProtocolId, }; -use tokio::codec::LengthDelimitedCodec; pub type Version = u8; @@ -50,34 +52,24 @@ impl CKBProtocol { pub fn match_version(&self, version: Version) -> bool { self.supported_versions.contains(&version) } -} - -impl ProtocolMeta for CKBProtocol { - fn name(&self) -> String { - self.protocol_name() - } - - fn id(&self) -> ProtocolId { - CKBProtocol::id(&self) - } - - fn codec(&self) -> LengthDelimitedCodec { - LengthDelimitedCodec::new() - } - - fn service_handle(&self) -> Option> { - let handler = Box::new(CKBHandler { - id: self.id(), - event_sender: self.event_sender.clone(), - }); - Some(handler) - } - fn support_versions(&self) -> Vec { - self.supported_versions + pub fn build(&self) -> ProtocolMeta { + let event_sender = self.event_sender.clone(); + let supported_versions = self + .supported_versions .iter() - .map(|v| format!("{}", v)) - .collect() + .map(|v| v.to_string()) + .collect::>(); + MetaBuilder::default() + .id(self.id) + .support_versions(supported_versions) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(CKBHandler { + id: self.id, + event_sender, + })) + }) + .build() } } @@ -85,7 +77,7 @@ impl ProtocolMeta for CKBProtocol { pub enum Event { Connected(PeerId, Multiaddr, Session, ProtocolId, Version), Disconnected(PeerId, ProtocolId), - Received(PeerId, ProtocolId, Vec), + Received(PeerId, ProtocolId, Bytes), Notify(ProtocolId, u64), } @@ -106,8 +98,9 @@ impl ServiceProtocol for CKBHandler { fn init(&mut self, _control: &mut ServiceContext) {} fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, version: &str) { let (peer_id, version) = { - let parsed_version = version.parse::(); - if session.remote_pubkey.is_none() || parsed_version.is_err() { + // TODO: version number should be discussed. + let parsed_version = version.parse::().ok(); + if session.remote_pubkey.is_none() || parsed_version.is_none() { error!(target: "network", "ckb protocol connected error, addr: {}, protocol:{}, version: {}", session.address, self.id, version); control.disconnect(session.id); return; @@ -146,7 +139,7 @@ impl ServiceProtocol for CKBHandler { } } - fn received(&mut self, _control: &mut ServiceContext, session: &SessionContext, data: Vec) { + fn received(&mut self, _control: &mut ServiceContext, session: &SessionContext, data: Bytes) { if let Some(peer_id) = session .remote_pubkey .as_ref() diff --git a/network/src/protocol_handler.rs b/network/src/protocol_handler.rs index 6f62a8d195..61ca5aca1b 100644 --- a/network/src/protocol_handler.rs +++ b/network/src/protocol_handler.rs @@ -1,5 +1,6 @@ use crate::errors::{Error, PeerError, ProtocolError}; use crate::{Network, PeerIndex, ProtocolId, SessionInfo, TimerRegistry, TimerToken}; +use bytes::Bytes; use ckb_util::Mutex; use log::debug; use log::info; @@ -148,7 +149,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { pub trait CKBProtocolHandler: Sync + Send { fn initialize(&self, _nc: Box); - fn received(&self, _nc: Box, _peer: PeerIndex, _data: Vec); + fn received(&self, _nc: Box, _peer: PeerIndex, _data: Bytes); fn connected(&self, _nc: Box, _peer: PeerIndex); fn disconnected(&self, _nc: Box, _peer: PeerIndex); fn timer_triggered(&self, _nc: Box, _timer: TimerToken) {} diff --git a/network/src/service.rs b/network/src/service.rs index 18a76a1dc9..9dd961cdfe 100644 --- a/network/src/service.rs +++ b/network/src/service.rs @@ -1,4 +1,6 @@ pub mod ckb_service; +pub mod discovery_service; +pub mod identify_service; pub mod outbound_peer_service; pub mod ping_service; pub mod timer_service; diff --git a/network/src/service/discovery_service.rs b/network/src/service/discovery_service.rs new file mode 100644 index 0000000000..d2dcc56453 --- /dev/null +++ b/network/src/service/discovery_service.rs @@ -0,0 +1,292 @@ +// use crate::peer_store::Behaviour; +use crate::Network; +use fnv::FnvHashMap; +use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; +use log::{debug, warn}; +use std::sync::Arc; + +use p2p::{ + context::{ServiceContext, SessionContext}, + multiaddr::{Multiaddr, Protocol}, + secio::PeerId, + traits::ServiceProtocol, + utils::extract_peer_id, + yamux::session::SessionType, + ProtocolId, SessionId, +}; +use p2p_discovery::{ + AddressManager, Direction, Discovery, DiscoveryHandle, MisbehaveResult, Misbehavior, Substream, +}; + +pub struct DiscoveryProtocol { + id: ProtocolId, + discovery: Option>, + discovery_handle: DiscoveryHandle, + discovery_senders: FnvHashMap>>, + event_sender: mpsc::UnboundedSender, +} + +impl DiscoveryProtocol { + pub fn new( + id: ProtocolId, + event_sender: mpsc::UnboundedSender, + ) -> DiscoveryProtocol { + let addr_mgr = DiscoveryAddressManager { + event_sender: event_sender.clone(), + }; + let discovery = Discovery::new(addr_mgr); + let discovery_handle = discovery.handle(); + DiscoveryProtocol { + id, + discovery: Some(discovery), + discovery_handle, + discovery_senders: FnvHashMap::default(), + event_sender, + } + } +} + +impl ServiceProtocol for DiscoveryProtocol { + fn init(&mut self, control: &mut ServiceContext) { + debug!(target: "network", "protocol [discovery({})]: init", self.id); + + let discovery_task = self + .discovery + .take() + .map(|discovery| { + debug!(target: "network", "Start discovery future_task"); + discovery + .for_each(|()| Ok(())) + .map_err(|err| { + warn!(target: "network", "discovery stream error: {:?}", err); + }) + .then(|_| { + debug!(target: "network", "End of discovery"); + Ok(()) + }) + }) + .unwrap(); + control.future_task(discovery_task); + } + + fn connected(&mut self, control: &mut ServiceContext, session: &SessionContext, _: &str) { + debug!( + target: "network", + "protocol [discovery] open on session [{}], address: [{}], type: [{:?}]", + session.id, session.address, session.ty + ); + let event = DiscoveryEvent::Connected { + session_id: session.id, + peer_id: session.remote_pubkey.clone().map(|pubkey| pubkey.peer_id()), + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + return; + } + + let direction = if session.ty == SessionType::Server { + Direction::Inbound + } else { + Direction::Outbound + }; + let (sender, receiver) = mpsc::channel(8); + self.discovery_senders.insert(session.id, sender); + let substream = Substream::new( + session.address.clone(), + direction, + self.id, + session.id, + receiver, + control.control().clone(), + control.listens(), + ); + match self.discovery_handle.substream_sender.try_send(substream) { + Ok(_) => { + debug!(target: "network", "Send substream success"); + } + Err(err) => { + // TODO: handle channel is full (wait for poll API?) + warn!(target: "network", "Send substream failed : {:?}", err); + } + } + } + + fn disconnected(&mut self, _control: &mut ServiceContext, session: &SessionContext) { + let event = DiscoveryEvent::Disconnected(session.id); + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + return; + } + self.discovery_senders.remove(&session.id); + debug!(target: "network", "protocol [discovery] close on session [{}]", session.id); + } + + fn received( + &mut self, + _control: &mut ServiceContext, + session: &SessionContext, + data: bytes::Bytes, + ) { + debug!(target: "network", "[received message]: length={}", data.len()); + + if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) { + // TODO: handle channel is full (wait for poll API?) + if let Err(err) = sender.try_send(data.to_vec()) { + if err.is_full() { + warn!(target: "network", "channel is full"); + } else if err.is_disconnected() { + warn!(target: "network", "channel is disconnected"); + } else { + warn!(target: "network", "other channel error: {:?}", err); + } + } + } + } +} + +pub enum DiscoveryEvent { + Connected { + session_id: SessionId, + peer_id: Option, + }, + Disconnected(SessionId), + AddNewAddrs { + session_id: SessionId, + addrs: Vec, + }, + Misbehave { + session_id: SessionId, + kind: Misbehavior, + result: oneshot::Sender, + }, + GetRandom { + n: usize, + result: oneshot::Sender>, + }, +} + +pub struct DiscoveryService { + event_receiver: mpsc::UnboundedReceiver, + network: Arc, + sessions: FnvHashMap, +} + +impl DiscoveryService { + pub fn new( + network: Arc, + event_receiver: mpsc::UnboundedReceiver, + ) -> DiscoveryService { + DiscoveryService { + event_receiver, + network, + sessions: FnvHashMap::default(), + } + } +} + +impl Stream for DiscoveryService { + type Item = (); + type Error = (); + fn poll(&mut self) -> Result>, Self::Error> { + match try_ready!(self.event_receiver.poll()) { + Some(DiscoveryEvent::Connected { + session_id, + peer_id, + }) => { + if let Some(peer_id) = peer_id { + self.sessions.insert(session_id, peer_id); + } + } + Some(DiscoveryEvent::Disconnected(session_id)) => { + self.sessions.remove(&session_id); + } + Some(DiscoveryEvent::AddNewAddrs { session_id, addrs }) => { + if let Some(_peer_id) = self.sessions.get(&session_id) { + // TODO: wait for peer store update + for addr in addrs.into_iter() { + if let Some(peer_id) = extract_peer_id(&addr) { + let addr = addr + .into_iter() + .filter(|proto| match proto { + Protocol::P2p(_) => false, + _ => true, + }) + .collect::(); + let _ = self + .network + .peer_store() + .write() + .add_discovered_address(&peer_id, addr); + } + } + } + } + Some(DiscoveryEvent::Misbehave { + session_id: _session_id, + kind: _kind, + result: _result, + }) => { + // FIXME: + } + Some(DiscoveryEvent::GetRandom { n, result }) => { + let addrs = self + .network + .peer_store() + .read() + .random_peers(n as u32) + .into_iter() + .map(|(_peer_id, addr)| addr) + .collect(); + result + .send(addrs) + .expect("Send failed (should not happened)"); + } + None => { + debug!(target: "network", "discovery service shutdown"); + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(()))) + } +} + +pub struct DiscoveryAddressManager { + pub event_sender: mpsc::UnboundedSender, +} + +impl AddressManager for DiscoveryAddressManager { + fn add_new_addr(&mut self, _session_id: SessionId, _addr: Multiaddr) {} + + fn add_new_addrs(&mut self, session_id: SessionId, addrs: Vec) { + let event = DiscoveryEvent::AddNewAddrs { session_id, addrs }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + } + } + + fn misbehave(&mut self, session_id: SessionId, kind: Misbehavior) -> MisbehaveResult { + let (sender, receiver) = oneshot::channel(); + let event = DiscoveryEvent::Misbehave { + session_id, + kind, + result: sender, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + MisbehaveResult::Disconnect + } else { + receiver.wait().unwrap_or(MisbehaveResult::Disconnect) + } + } + + fn get_random(&mut self, n: usize) -> Vec { + let (sender, receiver) = oneshot::channel(); + let event = DiscoveryEvent::GetRandom { n, result: sender }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + Vec::new() + } else { + receiver.wait().ok().unwrap_or_else(Vec::new) + } + } +} diff --git a/network/src/service/identify_service.rs b/network/src/service/identify_service.rs index 079be95127..60505af9da 100644 --- a/network/src/service/identify_service.rs +++ b/network/src/service/identify_service.rs @@ -1,236 +1,151 @@ -#![allow(clippy::needless_pass_by_value)] - -use super::Network; -use super::PeerId; -use crate::peers_registry::PeerIdentifyInfo; -use crate::protocol::Protocol; -use crate::protocol_service::ProtocolService; -use crate::transport::TransportOutput; -use futures::future::{self, Future}; -use futures::Stream; -use libp2p::core::Multiaddr; -use libp2p::core::SwarmController; -use libp2p::core::{upgrade, MuxedTransport}; -use libp2p::identify::IdentifyProtocolConfig; -use libp2p::identify::{IdentifyInfo, IdentifyOutput}; -use libp2p::{self, Transport}; -use log::{debug, error, trace, warn}; -use std::boxed::Box; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +// use crate::peer_store::Behaviour; +use crate::Network; +use futures::{sync::mpsc, sync::oneshot, Async, Future, Stream}; +use log::{debug, warn}; +use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::timer::Interval; -const MAX_LISTENED_ADDRS: usize = 10; +use p2p::{ + multiaddr::{Multiaddr, Protocol}, + secio::PeerId, + utils::multiaddr_to_socketaddr, +}; + +pub use p2p_identify::IdentifyProtocol; +use p2p_identify::{AddrManager, MisbehaveResult, Misbehavior}; -pub struct IdentifyService { - pub client_version: String, - pub protocol_version: String, - pub identify_timeout: Duration, - pub identify_interval: Duration, +#[derive(Clone)] +pub(crate) struct IdentifyAddressManager { + event_sender: mpsc::UnboundedSender, } -impl IdentifyService { - fn process_identify_info( - &self, - network: Arc, - peer_id: &PeerId, - info: &IdentifyInfo, - observed_addr: &Multiaddr, - ) -> Result<(), IoError> { - trace!("process identify for peer_id {:?} with {:?}", peer_id, info); - // set identify info to peer - { - let identify_info = PeerIdentifyInfo { - client_version: info.agent_version.clone(), - protocol_version: info.protocol_version.clone(), - supported_protocols: info.protocols.clone(), - count_of_known_listen_addrs: info.listen_addrs.len(), - }; - if network - .set_peer_identify_info(&peer_id, identify_info) - .is_err() - { - error!( - target: "network", - "can't find peer_id {:?} during process identify info", - peer_id - ) - } - } +impl IdentifyAddressManager { + pub(crate) fn new( + event_sender: mpsc::UnboundedSender, + ) -> IdentifyAddressManager { + IdentifyAddressManager { event_sender } + } +} - // add obserevd listened addr - for original_address in network.original_listened_addresses.read().iter() { - let transport = libp2p::tcp::TcpConfig::new(); - trace!( - target: "network", - "try get address use original_address {:?} and observed_address {:?}", - original_address, - observed_addr - ); - // get an external addrs for our node - if let Some(ext_addr) = transport.nat_traversal(original_address, &observed_addr) { - debug!(target: "network", "get new external address {:?}", ext_addr); - network.discovery_listened_address(ext_addr.to_owned()); - } +impl AddrManager for IdentifyAddressManager { + fn add_listen_addrs(&mut self, peer_id: &PeerId, addrs: Vec) { + let event = IdentifyEvent::AddListenAddrs { + peer_id: peer_id.clone(), + addrs, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); } - - // update peer addrs in peerstore - let _ = network - .peer_store() - .write() - .add_discovered_addresses(peer_id, info.listen_addrs.clone()); - Ok(()) } -} -impl ProtocolService for IdentifyService -where - T: AsyncRead + AsyncWrite + Send + 'static, -{ - type Output = IdentifyOutput; - fn convert_to_protocol( - peer_id: Arc, - addr: &Multiaddr, - output: Self::Output, - ) -> Protocol { - let peer_id = PeerId::clone(&peer_id); - match output { - IdentifyOutput::RemoteInfo { - info, - observed_addr, - } => Protocol::IdentifyRequest(peer_id, info, observed_addr), - IdentifyOutput::Sender { sender } => { - Protocol::IdentifyResponse(peer_id, sender, addr.to_owned()) - } + fn add_observed_addr(&mut self, peer_id: &PeerId, addr: Multiaddr) -> MisbehaveResult { + let event = IdentifyEvent::AddObservedAddr { + peer_id: peer_id.clone(), + addr, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); } + // NOTE: for future usage + MisbehaveResult::Continue } - fn handle( - &self, - network: Arc, - protocol: Protocol, - ) -> Box + Send> { - match protocol { - Protocol::IdentifyRequest(peer_id, info, ovserved_addr) => match self - .process_identify_info(Arc::clone(&network), &peer_id, &info, &ovserved_addr) - { - Ok(_) => Box::new(future::ok(())), - Err(err) => Box::new(future::err(err)), - }, - Protocol::IdentifyResponse(_peer_id, sender, addr) => { - sender.send( - IdentifyInfo { - public_key: network.local_public_key().clone(), - protocol_version: format!("ckb/{}", self.protocol_version).to_owned(), - agent_version: format!("ckb/{}", self.client_version).to_owned(), - listen_addrs: network - .listened_addresses(MAX_LISTENED_ADDRS) - .into_iter() - .map(|(addr, _)| addr) - .collect(), - protocols: vec![], // TODO FIXME: report local protocols - }, - &addr, - ) - } - _ => Box::new(future::ok(())) as Box + Send>, + fn misbehave(&mut self, peer_id: &PeerId, kind: Misbehavior) -> MisbehaveResult { + let (sender, receiver) = oneshot::channel(); + let event = IdentifyEvent::Misbehave { + peer_id: peer_id.clone(), + kind, + result: sender, + }; + if self.event_sender.unbounded_send(event).is_err() { + warn!(target: "network", "receiver maybe dropped!"); + MisbehaveResult::Disconnect + } else { + receiver.wait().unwrap_or(MisbehaveResult::Disconnect) } } +} + +pub enum IdentifyEvent { + AddListenAddrs { + peer_id: PeerId, + addrs: Vec, + }, + AddObservedAddr { + peer_id: PeerId, + addr: Multiaddr, + }, + Misbehave { + peer_id: PeerId, + kind: Misbehavior, + result: oneshot::Sender, + }, +} - fn start_protocol( - &self, +pub(crate) struct IdentifyService { + event_receiver: mpsc::UnboundedReceiver, + network: Arc, + listen_addrs: HashMap>, +} + +impl IdentifyService { + pub(crate) fn new( network: Arc, - swarm_controller: SwarmController< - SwarmTran, - Box + Send>, - >, - transport: Tran, - ) -> Box + Send> - where - SwarmTran: MuxedTransport> + Clone + Send + 'static, - SwarmTran::MultiaddrFuture: Send + 'static, - SwarmTran::Dial: Send, - SwarmTran::Listener: Send, - SwarmTran::ListenerUpgrade: Send, - SwarmTran::Incoming: Send, - SwarmTran::IncomingUpgrade: Send, - Tran: MuxedTransport> + Clone + Send + 'static, - Tran::MultiaddrFuture: Send + 'static, - Tran::Dial: Send, - Tran::Listener: Send, - Tran::ListenerUpgrade: Send, - Tran::Incoming: Send, - Tran::IncomingUpgrade: Send, - TranOut: AsyncRead + AsyncWrite + Send + 'static, - { - let transport = transport.and_then(move |out, endpoint, client_addr| { - let peer_id = out.peer_id; - upgrade::apply(out.socket, IdentifyProtocolConfig, endpoint, client_addr).map( - move |(output, addr)| { - let protocol = match output { - IdentifyOutput::RemoteInfo { - info, - observed_addr, - } => Protocol::IdentifyRequest(peer_id, info, observed_addr), - IdentifyOutput::Sender { .. } => { - panic!("should not reach here because we are dialer") - } - }; - (protocol, addr) - }, - ) - }); + event_receiver: mpsc::UnboundedReceiver, + ) -> IdentifyService { + IdentifyService { + event_receiver, + network, + listen_addrs: HashMap::default(), + } + } +} - let periodic_identify_future = Interval::new( - Instant::now() + Duration::from_secs(5), - self.identify_interval, - ) - .map_err(|err| { - debug!(target: "network", "identify periodic error {:?}", err); - IoError::new( - IoErrorKind::Other, - format!("identify periodic error {:?}", err), - ) - }) - .for_each({ - let transport = transport.clone(); - let _identify_timeout = self.identify_timeout; - let network = Arc::clone(&network); - move |_| { - for peer_id in network.peers() { - if let Some(ref identify_info) = network.get_peer_identify_info(&peer_id) { - if identify_info.count_of_known_listen_addrs > 0 { - continue; - } - } - // TODO should we try all addresses? - if let Some(addr) = network.get_peer_addresses(&peer_id).get(0) { - trace!( - target: "network", - "request identify to peer {:?} {:?}", - peer_id, - addr - ); - // dial identify - let _ = swarm_controller.dial(addr.clone(), transport.clone()); - } else { - error!( - target: "network", - "error when prepare identify : can't find addresses for peer {:?}", - peer_id - ); - } +impl Stream for IdentifyService { + type Item = (); + type Error = (); + fn poll(&mut self) -> Result>, Self::Error> { + match try_ready!(self.event_receiver.poll()) { + Some(IdentifyEvent::AddListenAddrs { peer_id, addrs }) => { + self.listen_addrs.insert(peer_id, addrs); + } + Some(IdentifyEvent::AddObservedAddr { peer_id, addr }) => { + // TODO: how to use listen addresses + if let Some(addr) = self + .listen_addrs + .get(&peer_id) + .and_then(|addrs| addrs.iter().next()) + .and_then(|addr| multiaddr_to_socketaddr(addr)) + .map(|socket_addr| socket_addr.port()) + .map(move |port| { + addr.into_iter() + .filter_map(|proto| match proto { + Protocol::Tcp(_) => Some(Protocol::Tcp(port)), + // Remove p2p part + Protocol::P2p(_) => None, + value => Some(value), + }) + .collect() + }) + { + let _ = self + .network + .peer_store() + .write() + .add_discovered_address(&peer_id, addr); } - Ok(()) } - }) - .then(|err| { - warn!(target: "network", "Identify service stopped, reason: {:?}", err); - err - }); - Box::new(periodic_identify_future) as Box + Send> + Some(IdentifyEvent::Misbehave { result, .. }) => { + // TODO: report misbehave + if result.send(MisbehaveResult::Continue).is_err() { + return Err(()); + } + } + None => { + debug!(target: "network", "identify service shutdown"); + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(()))) } } diff --git a/network/src/service/outbound_peer_service.rs b/network/src/service/outbound_peer_service.rs index 4f3906ff7f..c9cb5ae247 100644 --- a/network/src/service/outbound_peer_service.rs +++ b/network/src/service/outbound_peer_service.rs @@ -1,6 +1,6 @@ use crate::Network; use futures::{Async, Stream}; -use log::{debug, error}; +use log::{debug, warn}; use std::sync::Arc; use std::time::Duration; use std::usize; @@ -38,19 +38,16 @@ impl Stream for OutboundPeerService { .peer_store() .read() .peers_to_attempt(new_outbound as u32); - for (peer_id, addr) in attempt_peers.iter().filter_map(|(peer_id, addr)| { - if self.network.local_peer_id() != peer_id { - Some((peer_id.clone(), addr.clone())) - } else { - None - } - }) { + for (peer_id, addr) in attempt_peers + .into_iter() + .filter(|(peer_id, _addr)| self.network.local_peer_id() != peer_id) + { self.network.dial(&peer_id, addr); } } } None => { - error!(target: "network", "ckb outbound peer service stopped"); + warn!(target: "network", "ckb outbound peer service stopped"); return Ok(Async::Ready(None)); } } diff --git a/src/cli/run_impl.rs b/src/cli/run_impl.rs index bf3218659d..6c43493cc7 100644 --- a/src/cli/run_impl.rs +++ b/src/cli/run_impl.rs @@ -106,6 +106,7 @@ pub fn run(setup: Setup) { rpc_server.close(); info!(target: "main", "Jsonrpc shutdown"); + // FIXME: should gracefully shutdown network network.close(); info!(target: "main", "Network shutdown"); } diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 92881ac0bf..2a996dc998 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -26,6 +26,7 @@ ckb-chain-spec = { path = "../spec" } bloom-filters = "0.1.0" ckb-traits = { path = "../traits" } failure = "0.1.5" +bytes = "0.4.12" [dev-dependencies] ckb-notify = { path = "../notify" } diff --git a/sync/src/net_time_checker.rs b/sync/src/net_time_checker.rs index 5b621870fe..2750f7f3cc 100644 --- a/sync/src/net_time_checker.rs +++ b/sync/src/net_time_checker.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex}; use ckb_protocol::TimeMessage; use ckb_util::RwLock; @@ -90,7 +91,7 @@ impl Default for NetTimeProtocol { impl CKBProtocolHandler for NetTimeProtocol { fn initialize(&self, _nc: Box) {} - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // collect time sample from outbound peer if nc.session_info(peer).map(|s| s.peer.is_outbound()) == Some(true) { let now: u64 = faketime::unix_time_as_millis(); diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 115469220f..2c2f28f8f9 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -17,6 +17,7 @@ use self::get_block_transactions_process::GetBlockTransactionsProcess; use self::transaction_process::TransactionProcess; use crate::relayer::compact_block::ShortTransactionID; use crate::types::Peers; +use bytes::Bytes; use ckb_chain::chain::ChainController; use ckb_core::block::{Block, BlockBuilder}; use ckb_core::transaction::{ProposalShortId, Transaction}; @@ -269,7 +270,7 @@ where let _ = nc.register_timer(TX_PROPOSAL_TOKEN, Duration::from_millis(100)); } - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // TODO use flatbuffers verifier let msg = get_root::(&data); debug!(target: "relay", "msg {:?}", msg.payload_type()); diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 56d99a2d30..886ac968be 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -21,6 +21,7 @@ use crate::{ MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT, MAX_TIP_AGE, POW_SPACE, }; use bitflags::bitflags; +use bytes::Bytes; use ckb_chain::chain::ChainController; use ckb_chain_spec::consensus::Consensus; use ckb_core::block::Block; @@ -691,7 +692,7 @@ where let _ = nc.register_timer(TIMEOUT_EVICTION_TOKEN, Duration::from_millis(1000)); } - fn received(&self, nc: Box, peer: PeerIndex, data: Vec) { + fn received(&self, nc: Box, peer: PeerIndex, data: Bytes) { // TODO use flatbuffers verifier let msg = get_root::(&data); debug!(target: "sync", "msg {:?}", msg.payload_type()); diff --git a/sync/src/tests/mod.rs b/sync/src/tests/mod.rs index 5f80c262d6..32a2c8ff2e 100644 --- a/sync/src/tests/mod.rs +++ b/sync/src/tests/mod.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use ckb_network::{ errors::Error as NetworkError, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ProtocolId, SessionInfo, Severity, TimerToken, @@ -101,7 +102,7 @@ impl TestNode { timer_senders: self.timer_senders.clone(), }), *peer, - payload.clone(), + Bytes::from(payload.clone()), ) };