diff --git a/Cargo.lock b/Cargo.lock index fa40dd802..4bf1b90c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,71 +167,35 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04a8801ebb147ad240b2d978d3ab9f73c9ccd4557ba6a03e7800496770ed10e0" +checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" dependencies = [ - "ahash", - "arrow-arith 46.0.0", - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-cast 46.0.0", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", "arrow-csv", - "arrow-data 46.0.0", - "arrow-ipc 46.0.0", + "arrow-data", + "arrow-ipc", "arrow-json", - "arrow-ord 46.0.0", - "arrow-row 46.0.0", - "arrow-schema 46.0.0", - "arrow-select 46.0.0", - "arrow-string 46.0.0", -] - -[[package]] -name = "arrow" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bc25126d18a012146a888a0298f2c22e1150327bd2765fc76d710a556b2d614" -dependencies = [ - "ahash", - "arrow-arith 49.0.0", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-data 49.0.0", - "arrow-ipc 49.0.0", - "arrow-ord 49.0.0", - "arrow-row 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", - "arrow-string 49.0.0", -] - -[[package]] -name = "arrow-arith" -version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "895263144bd4a69751cbe6a34a53f26626e19770b313a9fa792c415cd0e78f11" -dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "chrono", - "half", - "num", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", ] [[package]] name = "arrow-arith" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "half", "num", @@ -239,14 +203,14 @@ dependencies = [ [[package]] name = "arrow-array" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "226fdc6c3a4ae154a74c24091d36a90b514f0ed7112f5b8322c1d8f354d8e20d" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" dependencies = [ "ahash", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "chrono-tz", "half", @@ -254,38 +218,11 @@ dependencies = [ "num", ] -[[package]] -name = "arrow-array" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" -dependencies = [ - "ahash", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "chrono", - "half", - "hashbrown 0.14.3", - "num", -] - [[package]] name = "arrow-buffer" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4843af4dd679c2f35b69c572874da8fde33be53eb549a5fb128e7a4b763510" -dependencies = [ - "bytes", - "half", - "num", -] - -[[package]] -name = "arrow-buffer" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" dependencies = [ "bytes", "half", @@ -294,33 +231,15 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e8b9990733a9b635f656efda3c9b8308c7a19695c9ec2c7046dd154f9b144b" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "arrow-select 46.0.0", - "chrono", - "comfy-table", - "half", - "lexical-core", - "num", -] - -[[package]] -name = "arrow-cast" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" -dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "base64 0.21.7", "chrono", "comfy-table", @@ -331,15 +250,15 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646fbb4e11dd0afb8083e883f53117713b8caadb4413b3c9e63e3f535da3683c" +checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-cast 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", "chrono", "csv", "csv-core", @@ -350,67 +269,62 @@ dependencies = [ [[package]] name = "arrow-data" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da900f31ff01a0a84da0572209be72b2b6f980f3ea58803635de47913191c188" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" dependencies = [ - "arrow-buffer 46.0.0", - "arrow-schema 46.0.0", + "arrow-buffer", + "arrow-schema", "half", "num", ] [[package]] -name = "arrow-data" -version = "49.0.0" +name = "arrow-flight" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" +checksum = "1d7f215461ad6346f2e4cc853e377d4e076d533e1ed78d327debe83023e3601f" dependencies = [ - "arrow-buffer 49.0.0", - "arrow-schema 49.0.0", - "half", - "num", -] - -[[package]] -name = "arrow-ipc" -version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2707a8d7ee2d345d045283ece3ae43416175873483e5d96319c929da542a0b1f" -dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-cast 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "flatbuffers", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ipc", + "arrow-schema", + "base64 0.21.7", + "bytes", + "futures", + "paste", + "prost 0.12.3", + "tokio", + "tonic 0.10.2", ] [[package]] name = "arrow-ipc" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", "flatbuffers", + "lz4_flex", ] [[package]] name = "arrow-json" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1b91a63c356d14eedc778b76d66a88f35ac8498426bb0799a769a49a74a8b4" +checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-cast 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", "chrono", "half", "indexmap 2.2.2", @@ -422,130 +336,65 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584325c91293abbca7aaaabf8da9fe303245d641f5f4a18a6058dc68009c7ebf" -dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "arrow-select 46.0.0", - "half", - "num", -] - -[[package]] -name = "arrow-ord" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "half", "num", ] [[package]] name = "arrow-row" -version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e32afc1329f7b372463b21c6ca502b07cf237e1ed420d87706c1770bb0ebd38" -dependencies = [ - "ahash", - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "half", - "hashbrown 0.14.3", -] - -[[package]] -name = "arrow-row" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" dependencies = [ "ahash", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "half", "hashbrown 0.14.3", ] [[package]] name = "arrow-schema" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b104f5daa730f00fde22adc03a12aa5a2ae9ccbbf99cbd53d284119ddc90e03d" - -[[package]] -name = "arrow-schema" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" [[package]] name = "arrow-select" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b3ca55356d1eae07cf48808d8c462cea674393ae6ad1e0b120f40b422eb2b4" -dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "num", -] - -[[package]] -name = "arrow-select" -version = "49.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" dependencies = [ "ahash", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "num", -] - -[[package]] -name = "arrow-string" -version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1433ce02590cae68da0a18ed3a3ed868ffac2c6f24c533ddd2067f7ee04b4a" -dependencies = [ - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-data 46.0.0", - "arrow-schema 46.0.0", - "arrow-select 46.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num", - "regex", - "regex-syntax 0.7.5", ] [[package]] name = "arrow-string" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a48149c63c11c9ff571e50ab8f017d2a7cb71037a882b42f6354ed2da9acc7" +checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", "num", "regex", "regex-syntax 0.8.2", @@ -1991,14 +1840,15 @@ dependencies = [ [[package]] name = "datafusion" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a4e4fc25698a14c90b34dda647ba10a5a966dc04b036d22e77fb1048663375d" +checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49" dependencies = [ "ahash", - "arrow 46.0.0", - "arrow-array 46.0.0", - "arrow-schema 46.0.0", + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", "async-compression", "async-trait", "bytes", @@ -2010,6 +1860,7 @@ dependencies = [ "datafusion-expr", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-physical-plan", "datafusion-sql", "flate2", "futures", @@ -2017,13 +1868,12 @@ dependencies = [ "half", "hashbrown 0.14.3", "indexmap 2.2.2", - "itertools 0.11.0", + "itertools 0.12.1", "log", "num_cpus", "object_store", "parking_lot", "parquet", - "percent-encoding", "pin-project-lite", "rand", "sqlparser", @@ -2033,40 +1883,37 @@ dependencies = [ "url", "uuid", "xz2", - "zstd 0.12.4", + "zstd 0.13.0", ] [[package]] name = "datafusion-common" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c23ad0229ea4a85bf76b236d8e75edf539881fdb02ce4e2394f9a76de6055206" +checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e" dependencies = [ - "arrow 46.0.0", - "arrow-array 46.0.0", - "async-compression", - "bytes", - "bzip2", + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", "chrono", - "flate2", - "futures", + "half", + "libc", "num_cpus", "object_store", "parquet", "sqlparser", - "tokio", - "tokio-util", - "xz2", - "zstd 0.12.4", ] [[package]] name = "datafusion-execution" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b37d2fc1a213baf34e0a57c85b8e6648f1a95152798fd6738163ee96c19203f" +checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496" dependencies = [ - "arrow 46.0.0", + "arrow", + "chrono", "dashmap", "datafusion-common", "datafusion-expr", @@ -2082,13 +1929,15 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ea9844395f537730a145e5d87f61fecd37c2bc9d54e1dc89b35590d867345d" +checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b" dependencies = [ "ahash", - "arrow 46.0.0", + "arrow", + "arrow-array", "datafusion-common", + "paste", "sqlparser", "strum 0.25.0", "strum_macros 0.25.3", @@ -2096,33 +1945,34 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8a30e0f79c5d59ba14d3d70f2500e87e0ff70236ad5e47f9444428f054fd2be" +checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010" dependencies = [ - "arrow 46.0.0", + "arrow", "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.3", - "itertools 0.11.0", + "itertools 0.12.1", "log", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] name = "datafusion-physical-expr" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "766c567082c9bbdcb784feec8fe40c7049cedaeb3a18d54f563f75fe0dc1932c" +checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0" dependencies = [ "ahash", - "arrow 46.0.0", - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-schema 46.0.0", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", "base64 0.21.7", "blake2", "blake3", @@ -2133,8 +1983,7 @@ dependencies = [ "hashbrown 0.14.3", "hex", "indexmap 2.2.2", - "itertools 0.11.0", - "libc", + "itertools 0.12.1", "log", "md-5", "paste", @@ -2146,14 +1995,45 @@ dependencies = [ "uuid", ] +[[package]] +name = "datafusion-physical-plan" +version = "35.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "half", + "hashbrown 0.14.3", + "indexmap 2.2.2", + "itertools 0.12.1", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "rand", + "tokio", + "uuid", +] + [[package]] name = "datafusion-sql" -version = "31.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "811fd084cf2d78aa0c76b74320977c7084ad0383690612528b580795764b4dd0" +checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768" dependencies = [ - "arrow 46.0.0", - "arrow-schema 46.0.0", + "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", "log", @@ -3288,23 +3168,22 @@ dependencies = [ ] [[package]] -name = "lz4" -version = "1.24.0" +name = "lz4-sys" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" dependencies = [ + "cc", "libc", - "lz4-sys", ] [[package]] -name = "lz4-sys" -version = "1.9.4" +name = "lz4_flex" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15" dependencies = [ - "cc", - "libc", + "twox-hash", ] [[package]] @@ -3706,16 +3585,16 @@ dependencies = [ [[package]] name = "object_store" -version = "0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000" dependencies = [ "async-trait", "bytes", "chrono", "futures", "humantime", - "itertools 0.11.0", + "itertools 0.12.1", "parking_lot", "percent-encoding", "snafu", @@ -4019,26 +3898,27 @@ dependencies = [ [[package]] name = "parquet" -version = "46.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad2cba786ae07da4d73371a88b9e0f9d3ffac1a9badc83922e0e15814f5c5fa" +checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" dependencies = [ "ahash", - "arrow-array 46.0.0", - "arrow-buffer 46.0.0", - "arrow-cast 46.0.0", - "arrow-data 46.0.0", - "arrow-ipc 46.0.0", - "arrow-schema 46.0.0", - "arrow-select 46.0.0", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", "base64 0.21.7", "brotli", "bytes", "chrono", "flate2", "futures", + "half", "hashbrown 0.14.3", - "lz4", + "lz4_flex", "num", "num-bigint", "object_store", @@ -4048,7 +3928,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd 0.12.4", + "zstd 0.13.0", ] [[package]] @@ -4718,12 +4598,6 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" -[[package]] -name = "regex-syntax" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" - [[package]] name = "regex-syntax" version = "0.8.2" @@ -4775,6 +4649,7 @@ dependencies = [ name = "restate-admin" version = "0.8.0" dependencies = [ + "arrow-flight", "axum", "bincode", "bytes", @@ -4794,13 +4669,13 @@ dependencies = [ "restate-futures-util", "restate-meta", "restate-meta-rest-model", + "restate-node-services", "restate-pb", "restate-schema-api", "restate-schema-impl", "restate-serde-util", "restate-service-client", "restate-service-protocol", - "restate-storage-query-datafusion", "restate-test-util", "restate-types", "restate-worker-api", @@ -4812,6 +4687,7 @@ dependencies = [ "test-log", "thiserror", "tokio", + "tonic 0.10.2", "tower", "tracing", "tracing-subscriber", @@ -4884,7 +4760,7 @@ name = "restate-cli" version = "0.8.0" dependencies = [ "anyhow", - "arrow 49.0.0", + "arrow", "base64 0.21.7", "bytes", "chrono", @@ -5180,7 +5056,6 @@ dependencies = [ "restate-service-protocol", "restate-test-util", "restate-types", - "restate-worker-api", "schemars", "serde", "serde_json", @@ -5232,9 +5107,12 @@ dependencies = [ name = "restate-node" version = "0.8.0" dependencies = [ + "arrow-flight", "async-trait", "axum", + "bincode", "codederror", + "datafusion", "derive_builder", "drain", "enumset", @@ -5252,9 +5130,13 @@ dependencies = [ "restate-errors", "restate-meta", "restate-node-services", + "restate-schema-api", + "restate-schema-impl", + "restate-storage-query-datafusion", "restate-storage-rocksdb", "restate-types", "restate-worker", + "restate-worker-api", "rocksdb", "schemars", "serde", @@ -6356,9 +6238,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.37.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ae05a8250b968a3f7db93155a84d68b2e6cea1583949af5ca5b5170c76c075" +checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" dependencies = [ "log", "sqlparser_derive", @@ -6366,13 +6248,13 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.1.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.48", ] [[package]] @@ -7632,6 +7514,7 @@ dependencies = [ "restate-admin", "restate-bifrost", "restate-meta", + "restate-node-services", "restate-schema-api", "restate-server", "restate-types", @@ -7640,6 +7523,7 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", + "tonic 0.10.2", ] [[package]] @@ -7733,15 +7617,6 @@ dependencies = [ "zstd-safe 5.0.2+zstd.1.5.2", ] -[[package]] -name = "zstd" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" -dependencies = [ - "zstd-safe 6.0.6", -] - [[package]] name = "zstd" version = "0.13.0" @@ -7761,16 +7636,6 @@ dependencies = [ "zstd-sys", ] -[[package]] -name = "zstd-safe" -version = "6.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" -dependencies = [ - "libc", - "zstd-sys", -] - [[package]] name = "zstd-safe" version = "7.0.0" diff --git a/Cargo.toml b/Cargo.toml index 59bd27256..9b8acc325 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,8 @@ restate-worker-api = { path = "crates/worker-api" } ahash = "0.8.5" anyhow = "1.0.68" arc-swap = "1.6" +arrow = { version = "50.0.0", default-features = false } +arrow-flight = { version = "50.0.0" } assert2 = "0.3.11" async-channel = "2.1.1" async-trait = "0.1.73" @@ -84,8 +86,8 @@ bytes-utils = "0.1.3" bytestring = { version = "1.2", features = ["serde"] } chrono = { version = "0.4.31", default-features = false, features = ["clock"] } criterion = "0.5" -datafusion = { version = "31.0.0" } -datafusion-expr = { version = "31.0.0" } +datafusion = { version = "35.0.0" } +datafusion-expr = { version = "35.0.0" } derive_builder = "0.12.0" derive_more = { version = "0.99.17" } drain = "0.1.1" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e4721c949..09a9483f4 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -20,7 +20,7 @@ restate-service-protocol = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } -arrow = { version = "49.0.0", default-features = false, features = ["ipc", "prettyprint"] } +arrow = { workspace = true, features = ["ipc", "prettyprint"] } bytes = { workspace = true } base64 = { workspace = true} chrono = { workspace = true } diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 564aa0f23..278882b18 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -18,15 +18,16 @@ restate-fs-util = { workspace = true } restate-futures-util = { workspace = true } restate-meta = { workspace = true } restate-meta-rest-model = { workspace = true, features = ["schema"] } +restate-node-services = { workspace = true } restate-pb = { workspace = true } restate-schema-api = { workspace = true, features = ["service", "deployment", "serde", "serde_schema"] } restate-schema-impl = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } -restate-storage-query-datafusion = { workspace = true } restate-types = { workspace = true, features = ["serde", "serde_schema"] } restate-worker-api = { workspace = true } +arrow-flight = { workspace = true } axum = { workspace = true } bincode = { workspace = true } bytes = { workspace = true } @@ -47,6 +48,7 @@ serde_json = { workspace = true } serde_with = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tonic = { workspace = true } tower = { workspace = true, features = ["load-shed", "limit"] } tracing = { workspace = true } diff --git a/crates/admin/src/options.rs b/crates/admin/src/options.rs index 94cf72088..f139ca496 100644 --- a/crates/admin/src/options.rs +++ b/crates/admin/src/options.rs @@ -8,8 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_bifrost::Bifrost; -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; use restate_schema_impl::Schemas; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -49,8 +48,8 @@ impl Options { self, schemas: Schemas, meta_handle: MetaHandle, - bifrost: Bifrost, + schema_reader: FileMetaReader, ) -> AdminService { - AdminService::new(self, schemas, meta_handle, bifrost) + AdminService::new(self, schemas, meta_handle, schema_reader) } } diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index 3bb4630e6..f17fae1ea 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -19,6 +19,7 @@ use restate_service_client::Endpoint; use restate_service_protocol::discovery::DiscoverEndpoint; use restate_types::identifiers::InvalidLambdaARN; +use crate::rest_api::notify_worker_about_schema_changes; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{header, HeaderValue, StatusCode}; @@ -97,6 +98,8 @@ pub async fn create_deployment( .register_deployment(discover_endpoint, force, apply_changes) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + let response_body = RegisterDeploymentResponse { id: registration_result.deployment, services: registration_result.services, @@ -241,6 +244,8 @@ pub async fn delete_deployment( ) -> Result { if let Some(true) = force { state.meta_handle().remove_deployment(deployment_id).await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()) + .await?; Ok(StatusCode::ACCEPTED) } else { Ok(StatusCode::NOT_IMPLEMENTED) diff --git a/crates/admin/src/rest_api/error.rs b/crates/admin/src/rest_api/error.rs index 4af492cf2..deb57888b 100644 --- a/crates/admin/src/rest_api/error.rs +++ b/crates/admin/src/rest_api/error.rs @@ -43,6 +43,8 @@ pub enum MetaApiError { Meta(#[from] MetaError), #[error(transparent)] Worker(#[from] restate_worker_api::Error), + #[error(transparent)] + Generic(Error), } /// # Error description response diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index 3433acdf4..a17bcc52d 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -71,7 +71,7 @@ pub async fn delete_invocation( Query(DeleteInvocationParams { mode }): Query, ) -> Result where - W: restate_worker_api::Handle + Send, + W: restate_worker_api::Handle + Clone + Send, { let invocation_id = invocation_id .parse::() diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 5dacae3ba..c5de51675 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -18,12 +18,18 @@ mod methods; mod services; mod subscriptions; +use crate::rest_api::error::MetaApiError; use okapi_operation::axum_integration::{delete, get, patch, post}; use okapi_operation::*; +use restate_meta::{FileMetaReader, MetaReader}; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; +use restate_node_services::worker::UpdateSchemaRequest; +use tonic::transport::Channel; +use tracing::debug; use crate::state::AdminServiceState; -pub fn create_router( +pub fn create_router( state: AdminServiceState, ) -> axum::Router<()> { // Setup the router @@ -101,3 +107,30 @@ pub fn create_router( .expect("Error when building the OpenAPI specification") .with_state(state) } + +/// Notifies the worker about schema changes. This method is best-effort and will not fail if the worker +/// could not be reached. +async fn notify_worker_about_schema_changes( + schema_reader: &FileMetaReader, + mut worker_svc_client: WorkerSvcClient, +) -> Result<(), MetaApiError> { + let schema_updates = schema_reader + .read() + .await + .map_err(|err| MetaApiError::Meta(err.into()))?; + + // don't fail if the worker is not reachable + let result = worker_svc_client + .update_schemas(UpdateSchemaRequest { + schema_bin: bincode::serde::encode_to_vec(schema_updates, bincode::config::standard()) + .map_err(|err| MetaApiError::Generic(err.into()))? + .into(), + }) + .await; + + if let Err(err) = result { + debug!("Failed notifying worker about schema changes: {err}"); + } + + Ok(()) +} diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index 990311fd3..414b9f175 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -15,6 +15,7 @@ use restate_meta_rest_model::services::*; use restate_pb::grpc::reflection::v1::FileDescriptorResponse; use restate_schema_api::service::ServiceMetadataResolver; +use crate::rest_api::notify_worker_about_schema_changes; use axum::extract::{Path, State}; use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; @@ -91,6 +92,8 @@ pub async fn modify_service( .modify_service(service_name.clone(), public) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + state .schemas() .resolve_latest_service_metadata(&service_name) @@ -129,7 +132,7 @@ pub async fn modify_service_state( }): Json, ) -> Result where - W: restate_worker_api::Handle + Send, + W: restate_worker_api::Handle + Clone + Send, { let service_id = ServiceId::new(service_name, service_key); diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index ca0eea0bb..64051c8c6 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -14,6 +14,7 @@ use crate::state::AdminServiceState; use restate_meta_rest_model::subscriptions::*; use restate_schema_api::subscription::SubscriptionResolver; +use crate::rest_api::notify_worker_about_schema_changes; use axum::extract::Query; use axum::extract::{Path, State}; use axum::http::StatusCode; @@ -52,6 +53,8 @@ pub async fn create_subscription( ) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + Ok(( StatusCode::CREATED, [( @@ -168,5 +171,7 @@ pub async fn delete_subscription( .delete_subscription(subscription_id) .await?; + notify_worker_about_schema_changes(state.schema_reader(), state.worker_svc_client()).await?; + Ok(StatusCode::ACCEPTED) } diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 2af4f3639..4937c59f4 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -13,31 +13,37 @@ use std::sync::Arc; use axum::error_handling::HandleErrorLayer; use futures::FutureExt; use http::StatusCode; -use restate_bifrost::Bifrost; +use tonic::transport::Channel; use tower::ServiceBuilder; -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_impl::Schemas; -use restate_storage_query_datafusion::context::QueryContext; use tracing::info; use crate::{rest_api, state, storage_query}; use crate::{Error, Options}; +#[derive(Debug)] pub struct AdminService { opts: Options, schemas: Schemas, meta_handle: MetaHandle, - _bifrost: Bifrost, + schema_reader: FileMetaReader, } impl AdminService { - pub fn new(opts: Options, schemas: Schemas, meta_handle: MetaHandle, bifrost: Bifrost) -> Self { + pub fn new( + opts: Options, + schemas: Schemas, + meta_handle: MetaHandle, + schema_reader: FileMetaReader, + ) -> Self { Self { opts, schemas, meta_handle, - _bifrost: bifrost, + schema_reader, } } @@ -45,21 +51,18 @@ impl AdminService { self, drain: drain::Watch, worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static, - query_context: Option, + worker_svc_client: WorkerSvcClient, ) -> Result<(), Error> { - let rest_state = - state::AdminServiceState::new(self.meta_handle, self.schemas, worker_handle); - - let router = axum::Router::new(); + let rest_state = state::AdminServiceState::new( + self.meta_handle, + self.schemas, + worker_handle, + worker_svc_client.clone(), + self.schema_reader, + ); - // Stitch query http endpoint if enabled - let router = if let Some(query_context) = query_context { - let query_state = Arc::new(state::QueryServiceState { query_context }); - // Merge storage query router - router.merge(storage_query::create_router(query_state)) - } else { - router - }; + let query_state = Arc::new(state::QueryServiceState { worker_svc_client }); + let router = axum::Router::new().merge(storage_query::create_router(query_state)); let router = router // Merge meta API router diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index f28f071ff..7d695885f 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -9,28 +9,39 @@ // by the Apache License, Version 2.0. // -use restate_meta::MetaHandle; +use restate_meta::{FileMetaReader, MetaHandle}; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_impl::Schemas; -use restate_storage_query_datafusion::context::QueryContext; +use tonic::transport::Channel; #[derive(Clone, derive_builder::Builder)] pub struct AdminServiceState { meta_handle: MetaHandle, schemas: Schemas, worker_handle: W, + worker_svc_client: WorkerSvcClient, + schema_reader: FileMetaReader, } #[derive(Clone)] pub struct QueryServiceState { - pub query_context: QueryContext, + pub worker_svc_client: WorkerSvcClient, } impl AdminServiceState { - pub fn new(meta_handle: MetaHandle, schemas: Schemas, worker_handle: W) -> Self { + pub fn new( + meta_handle: MetaHandle, + schemas: Schemas, + worker_handle: W, + worker_svc_client: WorkerSvcClient, + schema_reader: FileMetaReader, + ) -> Self { Self { meta_handle, schemas, worker_handle, + worker_svc_client, + schema_reader, } } @@ -42,7 +53,17 @@ impl AdminServiceState { &self.schemas } - pub fn worker_handle(&self) -> &W { - &self.worker_handle + pub fn worker_svc_client(&self) -> WorkerSvcClient { + self.worker_svc_client.clone() + } + + pub fn schema_reader(&self) -> &FileMetaReader { + &self.schema_reader + } +} + +impl AdminServiceState { + pub fn worker_handle(&self) -> W { + self.worker_handle.clone() } } diff --git a/crates/admin/src/storage_query/error.rs b/crates/admin/src/storage_query/error.rs index 6e2873a64..3ce25b690 100644 --- a/crates/admin/src/storage_query/error.rs +++ b/crates/admin/src/storage_query/error.rs @@ -12,7 +12,6 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::Json; -use datafusion::common::DataFusionError; use okapi_operation::anyhow::Error; use okapi_operation::okapi::map; use okapi_operation::okapi::openapi3::Responses; @@ -23,9 +22,9 @@ use serde::Serialize; /// This error is used by handlers to propagate API errors, /// and later converted to a response through the IntoResponse implementation #[derive(Debug, thiserror::Error)] -pub enum StorageApiError { - #[error(transparent)] - DataFusionError(#[from] DataFusionError), +pub enum StorageQueryError { + #[error("failed grpc: {0}")] + Tonic(#[from] tonic::Status), } /// # Error description response @@ -36,7 +35,7 @@ struct ErrorDescriptionResponse { message: String, } -impl IntoResponse for StorageApiError { +impl IntoResponse for StorageQueryError { fn into_response(self) -> Response { let status_code = StatusCode::INTERNAL_SERVER_ERROR; @@ -50,7 +49,7 @@ impl IntoResponse for StorageApiError { } } -impl ToResponses for StorageApiError { +impl ToResponses for StorageQueryError { fn generate(components: &mut Components) -> Result { let error_media_type = as ToMediaTypes>::generate(components)?; diff --git a/crates/admin/src/storage_query/query.rs b/crates/admin/src/storage_query/query.rs index 7272bd832..deb0a6150 100644 --- a/crates/admin/src/storage_query/query.rs +++ b/crates/admin/src/storage_query/query.rs @@ -8,8 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::future; +use arrow_flight::decode::FlightRecordBatchStream; +use arrow_flight::error::FlightError; +use arrow_flight::FlightData; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use axum::body::StreamBody; use axum::extract::State; @@ -24,17 +28,16 @@ use datafusion::arrow::datatypes::{ByteArrayType, DataType, Field, FieldRef, Sch use datafusion::arrow::error::ArrowError; use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::error::DataFusionError; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::{stream, StreamExt}; +use futures::{ready, Stream, StreamExt, TryStreamExt}; use okapi_operation::*; +use restate_node_services::worker::StorageQueryRequest; use schemars::JsonSchema; use serde::Deserialize; use serde_with::serde_as; use crate::state::QueryServiceState; -use super::error::StorageApiError; +use super::error::StorageQueryError; #[serde_as] #[derive(Debug, Deserialize, JsonSchema)] @@ -53,65 +56,35 @@ pub struct QueryRequest { description = "Query the storage API", operation_id = "query", tags = "storage", - responses(ignore_return_type = true, from_type = "StorageApiError") + responses(ignore_return_type = true, from_type = "StorageQueryError") )] pub async fn query( State(state): State>, #[request_body(required = true)] Json(payload): Json, -) -> Result { - let stream = state - .query_context - .execute(payload.query.as_str()) - .await - .map_err(StorageApiError::DataFusionError)?; - - // create a stream without LargeUtf8 or LargeBinary as JS doesn't support these yet - let converted_schema = convert_schema(stream.schema()); - let converted_schema_cloned = converted_schema.clone(); - let converted_stream = RecordBatchStreamAdapter::new( - converted_schema.clone(), - stream.map(move |batch| { - let converted_schema = converted_schema_cloned.clone(); - batch.and_then(|batch| { - convert_record_batch(converted_schema, batch).map_err(DataFusionError::ArrowError) +) -> Result { + let mut worker_grpc_client = state.worker_svc_client.clone(); + + let response_stream = worker_grpc_client + .query_storage(StorageQueryRequest { + query: payload.query, + }) + .await? + .into_inner(); + + let record_batch_stream = FlightRecordBatchStream::new_from_flight_data( + response_stream + .map_ok(|response| FlightData { + data_header: response.header, + data_body: response.data, + ..FlightData::default() }) - }), + .map_err(FlightError::from), ); - // create a stream with a labelled start and end - let labelled_stream = stream::once(future::ready(LabelledStream::Start)) - .chain(converted_stream.map(LabelledStream::Batch)) - .chain(stream::once(future::ready(LabelledStream::End))); - - let mut stream_writer = - StreamWriter::try_new(Vec::::new(), converted_schema.clone().as_ref()) - .map_err(DataFusionError::ArrowError) - .map_err(StorageApiError::DataFusionError)?; - - let body = StreamBody::new(labelled_stream.map( - move |label| -> Result { - match label { - LabelledStream::Start => { - // starting bytes were already written during StreamWriter::try_new - let b = Bytes::copy_from_slice(stream_writer.get_ref()); - stream_writer.get_mut().clear(); - Ok(b) - } - LabelledStream::Batch(batch) => { - stream_writer.write(&batch?)?; - let b = Bytes::copy_from_slice(stream_writer.get_ref()); - stream_writer.get_mut().clear(); - Ok(b) - } - LabelledStream::End => { - stream_writer.finish()?; - let b = Bytes::copy_from_slice(stream_writer.get_ref()); - stream_writer.get_mut().clear(); - Ok(b) - } - } - }, - )); + // create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet + let result_stream = ConvertRecordBatchStream::new(record_batch_stream); + + let body = StreamBody::new(result_stream); Ok(( [( http::header::CONTENT_TYPE, @@ -182,8 +155,116 @@ where ) } -enum LabelledStream { - Start, - Batch(Result), - End, +enum ConversionState { + WaitForSchema, + WaitForRecords(SchemaRef, StreamWriter>), +} + +/// Convert the record batches so that they don't contain LargeUtf8 or LargeBinary columns as JS doesn't +/// support these yet. +struct ConvertRecordBatchStream { + done: bool, + state: ConversionState, + + record_batch_stream: FlightRecordBatchStream, +} + +impl ConvertRecordBatchStream { + fn new(record_batch_stream: FlightRecordBatchStream) -> Self { + ConvertRecordBatchStream { + done: false, + state: ConversionState::WaitForSchema, + record_batch_stream, + } + } +} + +impl ConvertRecordBatchStream { + fn create_stream_writer( + record_batch: &RecordBatch, + ) -> Result<(SchemaRef, StreamWriter>), ArrowError> { + let converted_schema = convert_schema(record_batch.schema()); + let stream_writer = StreamWriter::try_new(Vec::new(), converted_schema.as_ref())?; + + Ok((converted_schema, stream_writer)) + } + + fn write_batch( + converted_schema: &SchemaRef, + stream_writer: &mut StreamWriter>, + record_batch: RecordBatch, + ) -> Result<(), ArrowError> { + let record_batch = convert_record_batch(converted_schema.clone(), record_batch)?; + stream_writer.write(&record_batch) + } + + fn process_record( + mut self: Pin<&mut Self>, + record_batch: Result, + ) -> Result { + let record_batch = record_batch?; + match &mut self.state { + ConversionState::WaitForSchema => { + let (converted_schema, mut stream_writer) = + Self::create_stream_writer(&record_batch)?; + Self::write_batch(&converted_schema, &mut stream_writer, record_batch)?; + let bytes = Bytes::copy_from_slice(stream_writer.get_ref()); + stream_writer.get_mut().clear(); + self.state = ConversionState::WaitForRecords(converted_schema, stream_writer); + Ok(bytes) + } + ConversionState::WaitForRecords(converted_schema, stream_writer) => { + Self::write_batch(converted_schema, stream_writer, record_batch)?; + let bytes = Bytes::copy_from_slice(stream_writer.get_ref()); + stream_writer.get_mut().clear(); + Ok(bytes) + } + } + } +} + +impl Stream for ConvertRecordBatchStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + return Poll::Ready(None); + } + + let record_batch = ready!(self.record_batch_stream.poll_next_unpin(cx)); + + if let Some(record_batch) = record_batch { + match self.as_mut().process_record(record_batch) { + Ok(bytes) => Poll::Ready(Some(Ok(bytes))), + Err(err) => { + self.done = true; + Poll::Ready(Some(Err(err))) + } + } + } else { + self.done = true; + if let ConversionState::WaitForRecords(_, stream_writer) = &mut self.state { + if let Err(err) = stream_writer.finish() { + Poll::Ready(Some(Err(err.into()))) + } else { + let bytes = Bytes::copy_from_slice(stream_writer.get_ref()); + stream_writer.get_mut().clear(); + Poll::Ready(Some(Ok(bytes))) + } + } else { + // CLI is expecting schema information + if let (Some(schema), ConversionState::WaitForSchema) = + (self.record_batch_stream.schema(), &self.state) + { + let schema_bytes = StreamWriter::try_new(Vec::new(), schema) + .and_then(|stream_writer| stream_writer.into_inner().map(Bytes::from)) + .map_err(FlightError::from); + + Poll::Ready(Some(schema_bytes)) + } else { + Poll::Ready(None) + } + } + } + } } diff --git a/crates/cluster-controller/src/service.rs b/crates/cluster-controller/src/service.rs index 31428e056..ad469ffc3 100644 --- a/crates/cluster-controller/src/service.rs +++ b/crates/cluster-controller/src/service.rs @@ -25,14 +25,16 @@ pub struct Service { } // todo: Replace with proper handle -pub type ClusterControllerHandle = (); +pub struct ClusterControllerHandle; impl Service { pub fn new(options: Options) -> Self { Service { options } } - pub fn handle(&self) -> ClusterControllerHandle {} + pub fn handle(&self) -> ClusterControllerHandle { + ClusterControllerHandle + } pub async fn run(self, shutdown_watch: drain::Watch) -> Result<(), Error> { let _ = shutdown_watch.signaled().await; diff --git a/crates/ingress-kafka/src/subscription_controller.rs b/crates/ingress-kafka/src/subscription_controller.rs index 8e727a811..5e86009e3 100644 --- a/crates/ingress-kafka/src/subscription_controller.rs +++ b/crates/ingress-kafka/src/subscription_controller.rs @@ -11,6 +11,7 @@ use super::consumer_task::MessageSender; use super::options::Options; use super::*; +use std::collections::HashSet; use crate::subscription_controller::task_orchestrator::TaskOrchestrator; use rdkafka::error::KafkaError; @@ -25,6 +26,7 @@ use tokio::sync::mpsc; pub enum Command { StartSubscription(Subscription), StopSubscription(SubscriptionId), + UpdateSubscriptions(Vec), } #[derive(Debug, thiserror::Error)] @@ -75,7 +77,8 @@ impl Service { Some(cmd) = self.commands_rx.recv() => { match cmd { Command::StartSubscription(sub) => self.handle_start_subscription(sub, &mut task_orchestrator), - Command::StopSubscription(sub_id) => self.handle_stop_subscription(sub_id, &mut task_orchestrator) + Command::StopSubscription(sub_id) => self.handle_stop_subscription(sub_id, &mut task_orchestrator), + Command::UpdateSubscriptions(subscriptions) => self.handle_update_subscriptions(subscriptions, &mut task_orchestrator), } } _ = task_orchestrator.poll(), if !task_orchestrator.is_empty() => {}, @@ -137,6 +140,27 @@ impl Service { ) { task_orchestrator.stop(subscription_id); } + + fn handle_update_subscriptions( + &mut self, + subscriptions: Vec, + task_orchestrator: &mut TaskOrchestrator, + ) { + let mut running_subscriptions: HashSet<_> = + task_orchestrator.running_subscriptions().cloned().collect(); + + for subscription in subscriptions { + if !running_subscriptions.contains(&subscription.id()) { + self.handle_start_subscription(subscription, task_orchestrator); + } else { + running_subscriptions.remove(&subscription.id()); + } + } + + for subscription_id in running_subscriptions { + self.handle_stop_subscription(subscription_id, task_orchestrator); + } + } } mod task_orchestrator { @@ -323,5 +347,9 @@ mod task_orchestrator { self.running_tasks_to_subscriptions.clear(); self.tasks.shutdown().await; } + + pub(super) fn running_subscriptions(&self) -> impl Iterator { + self.subscription_id_to_task_state.keys() + } } } diff --git a/crates/meta/Cargo.toml b/crates/meta/Cargo.toml index cfba61b1a..4796a33c8 100644 --- a/crates/meta/Cargo.toml +++ b/crates/meta/Cargo.toml @@ -23,7 +23,6 @@ restate-serde-util = { workspace = true, features = ["schema"] } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } restate-types = { workspace = true, features = ["serde", "serde_schema"] } -restate-worker-api = { workspace = true } bincode = { workspace = true } bytestring = { workspace = true } diff --git a/crates/meta/src/error.rs b/crates/meta/src/error.rs index ddd2ba4b8..42a155af9 100644 --- a/crates/meta/src/error.rs +++ b/crates/meta/src/error.rs @@ -11,7 +11,7 @@ use restate_schema_impl::SchemasUpdateError; use restate_service_protocol::discovery::ServiceDiscoveryError; -use crate::storage::MetaStorageError; +use crate::storage::{MetaReaderError, MetaStorageError}; #[derive(Debug, thiserror::Error, codederror::CodedError)] pub enum Error { @@ -26,6 +26,9 @@ pub enum Error { Storage(#[from] MetaStorageError), #[error(transparent)] #[code(unknown)] + Reader(#[from] MetaReaderError), + #[error(transparent)] + #[code(unknown)] SchemaRegistry(#[from] SchemasUpdateError), #[error("meta closed")] #[code(unknown)] diff --git a/crates/meta/src/lib.rs b/crates/meta/src/lib.rs index af6d78d8e..975ad6a6e 100644 --- a/crates/meta/src/lib.rs +++ b/crates/meta/src/lib.rs @@ -22,11 +22,12 @@ pub use restate_service_client::{ OptionsBuilderError as LambdaClientOptionsBuilderError, }; pub use service::{ApplyMode, Force, MetaHandle, MetaService}; -pub use storage::{FileMetaStorage, MetaStorage}; +pub use storage::{FileMetaReader, FileMetaStorage, MetaReader, MetaStorage}; use std::time::Duration; use codederror::CodedError; +use restate_schema_api::subscription::SubscriptionValidator; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -67,12 +68,16 @@ impl Options { &self.storage_path } - pub fn build(self) -> Result, BuildError> { + pub fn build( + self, + subscription_validator: SV, + ) -> Result, BuildError> { let schemas = Schemas::default(); let client = self.service_client.build(AssumeRoleCacheMode::None); Ok(MetaService::new( schemas.clone(), FileMetaStorage::new(self.storage_path.into())?, + subscription_validator, // Total duration roughly 66 seconds RetryPolicy::exponential( Duration::from_millis(100), diff --git a/crates/meta/src/service.rs b/crates/meta/src/service.rs index 16554c8f7..640d0d1b6 100644 --- a/crates/meta/src/service.rs +++ b/crates/meta/src/service.rs @@ -22,16 +22,15 @@ use restate_errors::warn_it; use restate_futures_util::command::{Command, UnboundedCommandReceiver, UnboundedCommandSender}; use restate_schema_api::deployment::{DeliveryOptions, DeploymentMetadata}; use restate_schema_api::service::ServiceMetadata; -use restate_schema_api::subscription::{Subscription, SubscriptionResolver}; +use restate_schema_api::subscription::{Subscription, SubscriptionValidator}; use restate_schema_impl::{Schemas, SchemasUpdateCommand}; use restate_service_protocol::discovery::{DiscoverEndpoint, ServiceDiscovery}; use restate_types::identifiers::{DeploymentId, SubscriptionId}; use restate_types::retries::RetryPolicy; -use restate_worker_api::SubscriptionController; use restate_service_client::{Endpoint, ServiceClient}; -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct MetaHandle(UnboundedCommandSender); /// Whether to force the registration of an existing endpoint or not @@ -192,12 +191,14 @@ impl MetaHandle { // -- Service implementation -pub struct MetaService { +#[derive(Debug)] +pub struct MetaService { schemas: Schemas, service_discovery: ServiceDiscovery, storage: Storage, + subscription_validator: SV, handle: MetaHandle, api_cmd_rx: UnboundedCommandReceiver, @@ -205,13 +206,15 @@ pub struct MetaService { reloaded: bool, } -impl MetaService +impl MetaService where Storage: MetaStorage, + SV: SubscriptionValidator, { pub fn new( schemas: Schemas, storage: Storage, + subscription_validator: SV, service_discovery_retry_policy: RetryPolicy, client: ServiceClient, ) -> Self { @@ -221,6 +224,7 @@ where schemas, service_discovery: ServiceDiscovery::new(service_discovery_retry_policy, client), storage, + subscription_validator, handle: MetaHandle(api_cmd_tx), api_cmd_rx, reloaded: false, @@ -235,15 +239,15 @@ where self.handle.clone() } + pub fn schema_reader(&self) -> Storage::Reader { + self.storage.create_reader() + } + pub async fn init(&mut self) -> Result<(), Error> { self.reload_schemas().await } - pub async fn run( - mut self, - drain: drain::Watch, - worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static, - ) -> Result<(), Error> { + pub async fn run(mut self, drain: drain::Watch) -> Result<(), Error> { debug_assert!( self.reloaded, "The Meta service was not init-ed before running it" @@ -252,12 +256,6 @@ where let shutdown = drain.signaled(); tokio::pin!(shutdown); - // The reason we reload subscriptions here and not in init() is because - // reload_subscriptions writes to a bounded channel read by the worker. - // If the worker is not running, this could deadlock when reaching the channel capacity. - // While here, we're safe to assume the worker is running and will read from that channel. - self.reload_subscriptions(&worker_handle).await; - loop { tokio::select! { cmd = self.api_cmd_rx.recv() => { @@ -283,13 +281,13 @@ where }) ), MetaHandleRequest::CreateSubscription { id, source, sink, metadata } => MetaHandleResponse::CreateSubscription( - self.create_subscription(id, source, sink, metadata, worker_handle.clone()).await + self.create_subscription(id, source, sink, metadata).await .map_err(|e| { warn_it!(e); e }) ), MetaHandleRequest::DeleteSubscription { subscription_id } => MetaHandleResponse::DeleteSubscription( - self.delete_subscription(subscription_id, worker_handle.clone()).await + self.delete_subscription(subscription_id).await .map_err(|e| { warn_it!(e); e }) @@ -314,19 +312,6 @@ where Ok(()) } - async fn reload_subscriptions( - &mut self, - worker_handle: &(impl restate_worker_api::Handle + Send + Sync + 'static), - ) { - for subscription in self.schemas.list_subscriptions(&[]) { - // If the worker is closing, we can ignore this - let _ = worker_handle - .subscription_controller_handle() - .start_subscription(subscription) - .await; - } - } - async fn discover_deployment( &mut self, endpoint: DiscoverEndpoint, @@ -402,7 +387,6 @@ where source: Uri, sink: Uri, metadata: Option>, - worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static, ) -> Result { info!(restate.subscription.source = %source, restate.subscription.sink = %sink, "Create subscription"); @@ -412,31 +396,19 @@ where source, sink, metadata, - worker_handle.subscription_controller_handle(), + &self.subscription_validator, )?; self.store_and_apply_updates(vec![update_command]).await?; - let _ = worker_handle - .subscription_controller_handle() - .start_subscription(sub.clone()) - .await; Ok(sub) } - async fn delete_subscription( - &mut self, - id: SubscriptionId, - worker_handle: impl restate_worker_api::Handle + Clone + Send + Sync + 'static, - ) -> Result<(), Error> { + async fn delete_subscription(&mut self, id: SubscriptionId) -> Result<(), Error> { info!(restate.subscription.id = %id, "Delete subscription"); // Compute the diff and propagate updates let update_command = self.schemas.compute_remove_subscription(id)?; self.store_and_apply_updates(vec![update_command]).await?; - let _ = worker_handle - .subscription_controller_handle() - .stop_subscription(id) - .await; Ok(()) } diff --git a/crates/meta/src/storage.rs b/crates/meta/src/storage.rs index 587ba92e9..bc7d1f221 100644 --- a/crates/meta/src/storage.rs +++ b/crates/meta/src/storage.rs @@ -31,25 +31,45 @@ pub enum MetaStorageError { Io(#[from] io::Error), #[error("generic serde error: {0}. This is probably a runtime bug")] Encode(#[from] bincode::error::EncodeError), - #[error("generic serde error: {0}. This is probably a runtime bug")] - Decode(#[from] bincode::error::DecodeError), #[error("generic descriptor error: {0}. This is probably a runtime bug")] Descriptor(#[from] prost_reflect::DescriptorError), #[error("task error when writing to disk: {0}. This is probably a runtime bug")] Join(#[from] tokio::task::JoinError), + #[error("failed reading meta information: {0}")] + Reading(#[from] MetaReaderError), +} + +#[derive(Debug, thiserror::Error)] +pub enum MetaReaderError { + #[error("generic io error: {0}")] + Io(#[from] io::Error), #[error("file ending with .restate has a bad filename: {0}. This is probably a runtime bug")] BadFilename(PathBuf), + #[error("task error when reading from disk: {0}. This is probably a runtime bug")] + Join(#[from] tokio::task::JoinError), + #[error("error decoding stored meta data: {0}. This is probably a runtime bug")] + Decode(#[from] bincode::error::DecodeError), +} + +pub trait MetaReader { + fn read( + &self, + ) -> impl Future, MetaReaderError>> + Send; } pub trait MetaStorage { + type Reader: MetaReader; + + fn reload( + &mut self, + ) -> impl Future, MetaStorageError>> + Send; + fn store( &mut self, commands: Vec, ) -> impl Future> + Send; - fn reload( - &mut self, - ) -> impl Future, MetaStorageError>> + Send; + fn create_reader(&self) -> Self::Reader; } // --- File based implementation of MetaStorage, using bincode @@ -74,6 +94,69 @@ pub enum BuildError { const RESTATE_EXTENSION: &str = "restate"; +#[derive(Debug, Clone)] +pub struct FileMetaReader { + root_path: PathBuf, +} + +impl FileMetaReader { + fn new(path: PathBuf) -> FileMetaReader { + FileMetaReader { root_path: path } + } + + async fn load(&self) -> Result<(usize, Vec), MetaReaderError> { + // Try to create a dir, in case it doesn't exist + restate_fs_util::create_dir_all_if_doesnt_exists(&self.root_path).await?; + + // Find all the metadata files in the root path directory, parse the index and then sort them by index + let mut read_dir = tokio::fs::read_dir(&self.root_path).await?; + let mut metadata_files = vec![]; + let mut next_file_index = 0; + while let Some(dir_entry) = read_dir.next_entry().await? { + if dir_entry + .path() + .extension() + .and_then(|os_str| os_str.to_str()) + == Some(RESTATE_EXTENSION) + { + let index: usize = dir_entry + .path() + .file_stem() + .expect("If there is an extension, there must be a file stem") + .to_string_lossy() + .parse() + .map_err(|_| MetaReaderError::BadFilename(dir_entry.path()))?; + + // Make sure self.next_file_index = max(self.next_file_index, index + 1) + next_file_index = next_file_index.max(index + 1); + metadata_files.push((dir_entry.path(), index)); + } + } + metadata_files.sort_by(|a, b| a.1.cmp(&b.1)); + + // We use blocking spawn to use bincode::decode_from_std_read + let updates = tokio::task::spawn_blocking(move || { + let mut schemas_updates = vec![]; + + for (metadata_file_path, _) in metadata_files { + // Metadata_file_path is the json metadata descriptor + trace!("Reloading metadata file {}", metadata_file_path.display()); + + let mut file = std::fs::File::open(metadata_file_path)?; + + let commands_file: CommandsFile = + bincode::serde::decode_from_std_read(&mut file, bincode::config::standard())?; + schemas_updates.extend(commands_file.0); + } + + Result::, MetaReaderError>::Ok(schemas_updates) + }) + .await?; + + Ok((next_file_index, updates?)) + } +} + #[derive(Debug)] pub struct FileMetaStorage { root_path: PathBuf, @@ -149,13 +232,32 @@ impl FileMetaStorage { Ok(()) } } + + pub fn as_reader(&self) -> FileMetaReader { + FileMetaReader::new(self.root_path.clone()) + } } #[derive(Serialize, Deserialize)] #[serde(transparent)] struct CommandsFile(Vec); +impl MetaReader for FileMetaReader { + async fn read(&self) -> Result, MetaReaderError> { + let (_, updates) = self.load().await?; + Ok(updates) + } +} + impl MetaStorage for FileMetaStorage { + type Reader = FileMetaReader; + + async fn reload(&mut self) -> Result, MetaStorageError> { + let (next_file_index, updates) = self.as_reader().load().await?; + self.next_file_index = next_file_index; + Ok(updates) + } + async fn store(&mut self, commands: Vec) -> Result<(), MetaStorageError> { let file_path = self .root_path @@ -178,55 +280,8 @@ impl MetaStorage for FileMetaStorage { Ok(()) } - async fn reload(&mut self) -> Result, MetaStorageError> { - let root_path = self.root_path.clone(); - - // Try to create a dir, in case it doesn't exist - restate_fs_util::create_dir_all_if_doesnt_exists(&root_path).await?; - - // Find all the metadata files in the root path directory, parse the index and then sort them by index - let mut read_dir = tokio::fs::read_dir(root_path).await?; - let mut metadata_files = vec![]; - while let Some(dir_entry) = read_dir.next_entry().await? { - if dir_entry - .path() - .extension() - .and_then(|os_str| os_str.to_str()) - == Some(RESTATE_EXTENSION) - { - let index: usize = dir_entry - .path() - .file_stem() - .expect("If there is an extension, there must be a file stem") - .to_string_lossy() - .parse() - .map_err(|_| MetaStorageError::BadFilename(dir_entry.path()))?; - - // Make sure self.next_file_index = max(self.next_file_index, index + 1) - self.next_file_index = self.next_file_index.max(index + 1); - metadata_files.push((dir_entry.path(), index)); - } - } - metadata_files.sort_by(|a, b| a.1.cmp(&b.1)); - - // We use blocking spawn to use bincode::decode_from_std_read - tokio::task::spawn_blocking(move || { - let mut schemas_updates = vec![]; - - for (metadata_file_path, _) in metadata_files { - // Metadata_file_path is the json metadata descriptor - trace!("Reloading metadata file {}", metadata_file_path.display()); - - let mut file = std::fs::File::open(metadata_file_path)?; - - let commands_file: CommandsFile = - bincode::serde::decode_from_std_read(&mut file, bincode::config::standard())?; - schemas_updates.extend(commands_file.0); - } - - Result::, MetaStorageError>::Ok(schemas_updates) - }) - .await? + fn create_reader(&self) -> Self::Reader { + self.as_reader() } } diff --git a/crates/node-services/build.rs b/crates/node-services/build.rs index 15e2be1cf..66bd08865 100644 --- a/crates/node-services/build.rs +++ b/crates/node-services/build.rs @@ -36,5 +36,13 @@ fn main() -> Result<(), Box> { // allow older protobuf compiler to be used .protoc_arg("--experimental_allow_proto3_optional") .compile(&["./proto/worker.proto"], &["proto", "../pb/proto"])?; + + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("metadata_descriptor.bin")) + // allow older protobuf compiler to be used + .protoc_arg("--experimental_allow_proto3_optional") + .compile(&["./proto/metadata.proto"], &["proto", "../pb/proto"])?; + Ok(()) } diff --git a/crates/node-services/proto/cluster_controller.proto b/crates/node-services/proto/cluster_controller.proto index 086198883..a58030dd7 100644 --- a/crates/node-services/proto/cluster_controller.proto +++ b/crates/node-services/proto/cluster_controller.proto @@ -13,7 +13,7 @@ import "dev/restate/common/common.proto"; package dev.restate.cluster_controller; -service ClusterController { +service ClusterControllerSvc { // Attach node at cluster controller rpc AttachNode(AttachmentRequest) returns (AttachmentResponse); } diff --git a/crates/node-services/proto/metadata.proto b/crates/node-services/proto/metadata.proto new file mode 100644 index 000000000..2f657636b --- /dev/null +++ b/crates/node-services/proto/metadata.proto @@ -0,0 +1,28 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package dev.restate.metadata; + +service MetadataSvc { + // Fetch the current schema information + rpc FetchSchemas(FetchSchemasRequest) returns (FetchSchemasResponse); +} + +message FetchSchemasRequest { + // provide schema version information +} + +message FetchSchemasResponse { + // todo: Replace with proper protobuf + bytes schemas_bin = 1; +} \ No newline at end of file diff --git a/crates/node-services/proto/node_ctrl.proto b/crates/node-services/proto/node_ctrl.proto index 579161d95..8fe399d89 100644 --- a/crates/node-services/proto/node_ctrl.proto +++ b/crates/node-services/proto/node_ctrl.proto @@ -23,7 +23,7 @@ enum NodeStatus { SHUTTING_DOWN = 3; } -service NodeCtrl { +service NodeCtrlSvc { // Get identity information from this node. rpc GetIdent(google.protobuf.Empty) returns (IdentResponse); } diff --git a/crates/node-services/proto/worker.proto b/crates/node-services/proto/worker.proto index 71d3f0e1b..bb9617337 100644 --- a/crates/node-services/proto/worker.proto +++ b/crates/node-services/proto/worker.proto @@ -13,11 +13,46 @@ import "google/protobuf/empty.proto"; package dev.restate.worker; -service Worker { +service WorkerSvc { // Get the current known version of bifrost metadata on this node rpc GetBifrostVersion(google.protobuf.Empty) returns (BifrostVersion); + + // Terminate the specified invocation + rpc TerminateInvocation(TerminationRequest) returns (google.protobuf.Empty); + + // Mutate the specified state + rpc MutateState(StateMutationRequest) returns (google.protobuf.Empty); + + // Queries the storage of the worker and returns the result as a stream of responses + rpc QueryStorage(StorageQueryRequest) returns (stream StorageQueryResponse); + + // Updates the schema information on the worker node + rpc UpdateSchemas(UpdateSchemaRequest) returns (google.protobuf.Empty); } message BifrostVersion { uint64 version = 1; +} + +message TerminationRequest { + // todo: Replace with proper protobuf + bytes invocation_termination = 1; +} + +message StateMutationRequest { + // todo: Replace with proper protobuf + bytes state_mutation = 1; +} + +message StorageQueryRequest { + string query = 1; +} + +message StorageQueryResponse { + bytes header = 1; + bytes data = 2; +} + +message UpdateSchemaRequest { + bytes schema_bin = 1; } \ No newline at end of file diff --git a/crates/node-services/src/lib.rs b/crates/node-services/src/lib.rs index 01cc2764d..9c56eb564 100644 --- a/crates/node-services/src/lib.rs +++ b/crates/node-services/src/lib.rs @@ -41,3 +41,14 @@ pub mod worker { pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("worker_descriptor"); } + +pub mod metadata { + #![allow(warnings)] + #![allow(clippy::all)] + #![allow(unknown_lints)] + + tonic::include_proto!("dev.restate.metadata"); + + pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("metadata_descriptor"); +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 087f69483..e34260621 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -24,13 +24,19 @@ restate-cluster-controller = { workspace = true } restate-errors = { workspace = true } restate-meta = { workspace = true } restate-node-services = { workspace = true } +restate-schema-api = { workspace = true } +restate-schema-impl = { workspace = true } +restate-storage-query-datafusion = { workspace = true } restate-storage-rocksdb = { workspace = true } restate-types = { workspace = true } restate-worker = { workspace = true } +restate-worker-api = { workspace = true } +arrow-flight = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } codederror = { workspace = true } +datafusion = { workspace = true } derive_builder = { workspace = true } drain = { workspace = true } enumset = { workspace = true } @@ -45,6 +51,7 @@ metrics-util = { version = "0.16.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +bincode = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } thiserror = { workspace = true } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 037e0246b..a1664f7a7 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -24,11 +24,11 @@ use tower::service_fn; use tracing::{info, warn}; use crate::roles::{ClusterControllerRole, WorkerRole}; -use crate::server::NodeServer; +use crate::server::{ClusterControllerDependencies, NodeServer, WorkerDependencies}; pub use options::{Options, OptionsBuilder as NodeOptionsBuilder}; pub use restate_admin::OptionsBuilder as AdminOptionsBuilder; pub use restate_meta::OptionsBuilder as MetaOptionsBuilder; -use restate_node_services::cluster_controller::cluster_controller_client::ClusterControllerClient; +use restate_node_services::cluster_controller::cluster_controller_svc_client::ClusterControllerSvcClient; use restate_node_services::cluster_controller::AttachmentRequest; use restate_types::nodes_config::{ NetworkAddress, NodeConfig, NodesConfiguration, NodesConfigurationWriter, Role, @@ -75,6 +75,12 @@ pub enum BuildError { #[code] roles::WorkerRoleBuildError, ), + #[error("building cluster controller failed: {0}")] + ClusterController( + #[from] + #[code] + roles::ClusterControllerRoleBuildError, + ), #[error("node neither runs cluster controller nor its address has been configured")] #[code(unknown)] UnknownClusterController, @@ -93,7 +99,7 @@ impl Node { pub fn new(options: Options) -> Result { let opts = options.clone(); let cluster_controller_role = if options.roles.contains(Role::ClusterController) { - Some(ClusterControllerRole::try_from(options.clone()).expect("should be infallible")) + Some(ClusterControllerRole::try_from(options.clone())?) } else { None }; @@ -105,12 +111,22 @@ impl Node { }; let server = options.server.build( - worker_role - .as_ref() - .map(|worker| (worker.rocksdb_storage().clone(), worker.bifrost_handle())), - cluster_controller_role - .as_ref() - .map(|cluster_controller| cluster_controller.handle()), + worker_role.as_ref().map(|worker| { + WorkerDependencies::new( + worker.rocksdb_storage().clone(), + worker.bifrost_handle(), + worker.worker_command_tx(), + worker.storage_query_context().clone(), + worker.schemas(), + worker.subscription_controller(), + ) + }), + cluster_controller_role.as_ref().map(|cluster_controller| { + ClusterControllerDependencies::new( + cluster_controller.handle(), + cluster_controller.schema_reader(), + ) + }), ); let cluster_controller_address = if let Some(cluster_controller_address) = @@ -180,7 +196,11 @@ impl Node { if let Some(worker_role) = self.worker_role { component_set.spawn( worker_role - .run(component_shutdown_watch) + .run( + NodeId::my_node_node() + .expect("my NodeId should be set after attaching to cluster"), + component_shutdown_watch, + ) .map_ok(|_| "worker-role") .map_err(Error::Worker), ); @@ -216,7 +236,7 @@ impl Node { let channel = Self::create_channel_from_network_address(&cluster_controller_address) .map_err(Error::InvalidClusterControllerAddress)?; - let cc_client = ClusterControllerClient::new(channel); + let cc_client = ClusterControllerSvcClient::new(channel); let _response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, 10, None) .retry_operation(|| async { diff --git a/crates/node/src/roles/cluster_controller.rs b/crates/node/src/roles/cluster_controller.rs index eef0b65ea..dc4668e91 100644 --- a/crates/node/src/roles/cluster_controller.rs +++ b/crates/node/src/roles/cluster_controller.rs @@ -10,8 +10,17 @@ use crate::Options; use codederror::CodedError; +use futures::TryFutureExt; +use restate_admin::service::AdminService; use restate_cluster_controller::ClusterControllerHandle; -use std::convert::Infallible; +use restate_meta::{FileMetaReader, FileMetaStorage, MetaService}; +use restate_node_services::worker::{StateMutationRequest, TerminationRequest}; +use restate_types::invocation::InvocationTermination; +use restate_types::state_mut::ExternalStateMutation; +use restate_worker::KafkaIngressOptions; +use restate_worker_api::{Error, Handle}; +use tokio::task::{JoinError, JoinSet}; +use tonic::transport::Channel; use tracing::info; #[derive(Debug, thiserror::Error, CodedError)] @@ -22,11 +31,38 @@ pub enum ClusterControllerRoleError { #[code] restate_cluster_controller::Error, ), + #[error("cluster controller role component panicked: {0}")] + #[code(unknown)] + ComponentPanic(#[from] JoinError), + #[error("admin component failed: {0}")] + Admin( + #[from] + #[code] + restate_admin::Error, + ), + #[error("meta component failed: {0}")] + Meta( + #[from] + #[code] + restate_meta::Error, + ), +} + +#[derive(Debug, thiserror::Error, CodedError)] +pub enum ClusterControllerRoleBuildError { + #[error("failed creating meta: {0}")] + Meta( + #[from] + #[code] + restate_meta::BuildError, + ), } #[derive(Debug)] pub struct ClusterControllerRole { controller: restate_cluster_controller::Service, + admin: AdminService, + meta: MetaService, } impl ClusterControllerRole { @@ -34,24 +70,66 @@ impl ClusterControllerRole { self.controller.handle() } - pub async fn run(self, shutdown_watch: drain::Watch) -> Result<(), ClusterControllerRoleError> { + pub fn schema_reader(&self) -> FileMetaReader { + self.meta.schema_reader() + } + + pub async fn run( + mut self, + shutdown_watch: drain::Watch, + ) -> Result<(), ClusterControllerRoleError> { info!("Running cluster controller role"); let shutdown_signal = shutdown_watch.signaled(); let (inner_shutdown_signal, inner_shutdown_watch) = drain::channel(); - let controller_fut = self.controller.run(inner_shutdown_watch); - tokio::pin!(controller_fut); + let mut component_set = JoinSet::new(); + + // Init the meta. This will reload the schemas in memory. + self.meta + .init() + .await + .map_err(ClusterControllerRoleError::Meta)?; + + component_set.spawn( + self.meta + .run(inner_shutdown_watch.clone()) + .map_ok(|_| "meta-service") + .map_err(ClusterControllerRoleError::Meta), + ); + + component_set.spawn( + self.controller + .run(inner_shutdown_watch.clone()) + .map_ok(|_| "cluster-controller") + .map_err(ClusterControllerRoleError::ClusterController), + ); + + // todo: Make address configurable + let worker_channel = + Channel::builder("http://127.0.0.1:5122/".parse().expect("valid uri")).connect_lazy(); + let worker_handle = GrpcWorkerHandle::new(worker_channel.clone()); + let worker_svc_client = + restate_node_services::worker::worker_svc_client::WorkerSvcClient::new(worker_channel); + + component_set.spawn( + self.admin + .run(inner_shutdown_watch, worker_handle, worker_svc_client) + .map_ok(|_| "admin") + .map_err(ClusterControllerRoleError::Admin), + ); tokio::select! { _ = shutdown_signal => { - info!("Stopping controller role"); + info!("Stopping cluster controller role"); // ignore result because we are shutting down - let _ = tokio::join!(inner_shutdown_signal.drain(), controller_fut); + inner_shutdown_signal.drain().await; + component_set.shutdown().await; }, - controller_result = &mut controller_fut => { - controller_result?; - panic!("Unexpected termination of controller"); + Some(component_result) = component_set.join_next() => { + let component_name = component_result.map_err(ClusterControllerRoleError::ComponentPanic)??; + panic!("Unexpected termination of cluster controller role component '{component_name}'"); + } } @@ -61,11 +139,69 @@ impl ClusterControllerRole { } impl TryFrom for ClusterControllerRole { - type Error = Infallible; + type Error = ClusterControllerRoleBuildError; fn try_from(options: Options) -> Result { + let meta = options.meta.build(options.worker.kafka.clone())?; + let admin = options + .admin + .build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); + Ok(ClusterControllerRole { controller: restate_cluster_controller::Service::new(options.cluster_controller), + admin, + meta, }) } } + +#[derive(Debug, Clone)] +struct GrpcWorkerHandle { + grpc_client: restate_node_services::worker::worker_svc_client::WorkerSvcClient, +} + +impl GrpcWorkerHandle { + fn new(channel: Channel) -> Self { + GrpcWorkerHandle { + grpc_client: restate_node_services::worker::worker_svc_client::WorkerSvcClient::new( + channel, + ), + } + } +} + +impl Handle for GrpcWorkerHandle { + async fn terminate_invocation( + &self, + invocation_termination: InvocationTermination, + ) -> Result<(), Error> { + let invocation_termination = + bincode::serde::encode_to_vec(invocation_termination, bincode::config::standard()) + .expect("serialization should work"); + + self.grpc_client + .clone() + .terminate_invocation(TerminationRequest { + invocation_termination: invocation_termination.into(), + }) + .await + .map(|resp| resp.into_inner()) + // todo: Proper error handling + .map_err(|_err| Error::Unreachable) + } + + async fn external_state_mutation(&self, mutation: ExternalStateMutation) -> Result<(), Error> { + let state_mutation = bincode::serde::encode_to_vec(mutation, bincode::config::standard()) + .expect("serialization should work"); + + self.grpc_client + .clone() + .mutate_state(StateMutationRequest { + state_mutation: state_mutation.into(), + }) + .await + .map(|resp| resp.into_inner()) + // todo: Proper error handling + .map_err(|_err| Error::Unreachable) + } +} diff --git a/crates/node/src/roles/mod.rs b/crates/node/src/roles/mod.rs index 02152f727..3c4b36ed2 100644 --- a/crates/node/src/roles/mod.rs +++ b/crates/node/src/roles/mod.rs @@ -11,5 +11,7 @@ mod cluster_controller; mod worker; -pub use cluster_controller::{ClusterControllerRole, ClusterControllerRoleError}; -pub use worker::{WorkerRole, WorkerRoleBuildError, WorkerRoleError}; +pub use cluster_controller::{ + ClusterControllerRole, ClusterControllerRoleBuildError, ClusterControllerRoleError, +}; +pub use worker::{update_schemas, WorkerRole, WorkerRoleBuildError, WorkerRoleError}; diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 92415a7ee..153663c9e 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -10,27 +10,25 @@ use crate::Options; use codederror::CodedError; -use restate_admin::service::AdminService; +use futures::TryFutureExt; use restate_bifrost::{Bifrost, BifrostService}; -use restate_meta::{FileMetaStorage, MetaService}; +use restate_node_services::metadata::metadata_svc_client::MetadataSvcClient; +use restate_node_services::metadata::FetchSchemasRequest; +use restate_schema_api::subscription::SubscriptionResolver; +use restate_schema_impl::{Schemas, SchemasUpdateCommand}; +use restate_storage_query_datafusion::context::QueryContext; use restate_storage_rocksdb::RocksDBStorage; -use restate_worker::Worker; -use tracing::info; +use restate_types::NodeId; +use restate_worker::{SubscriptionControllerHandle, Worker, WorkerCommandSender}; +use restate_worker_api::SubscriptionController; +use std::time::Duration; +use tokio::task::JoinSet; +use tonic::transport::Channel; +use tracing::subscriber::NoSubscriber; +use tracing::{debug, info}; #[derive(Debug, thiserror::Error, CodedError)] pub enum WorkerRoleError { - #[error("admin service failed: {0}")] - AdminService( - #[from] - #[code] - restate_admin::Error, - ), - #[error("meta failed: {0}")] - MetaService( - #[from] - #[code] - restate_meta::Error, - ), #[error("worker failed: {0}")] Worker( #[from] @@ -40,18 +38,34 @@ pub enum WorkerRoleError { #[error("bifrost failed: {0}")] #[code(unknown)] Bifrost(#[from] restate_bifrost::Error), - #[error("admin panicked: {0}")] + #[error("component panicked: {0}")] #[code(unknown)] - AdminPanic(tokio::task::JoinError), - #[error("meta panicked: {0}")] + ComponentPanic(tokio::task::JoinError), + #[error(transparent)] + Schema( + #[from] + #[code] + SchemaError, + ), +} + +#[derive(Debug, thiserror::Error, CodedError)] +pub enum SchemaError { + #[error("failed to fetch schema updates: {0}")] #[code(unknown)] - MetaPanic(tokio::task::JoinError), - #[error("worker panicked: {0}")] + Fetch(#[from] tonic::Status), + #[error("failed decoding grpc payload: {0}")] #[code(unknown)] - WorkerPanic(tokio::task::JoinError), - #[error("bifrost panicked: {0}")] + Decode(#[from] bincode::error::DecodeError), + #[error("failed updating schemas: {0}")] + Update( + #[from] + #[code] + restate_schema_impl::SchemasUpdateError, + ), + #[error("failed updating subscriptions: {0}")] #[code(unknown)] - BifrostPanic(tokio::task::JoinError), + Subscription(#[from] restate_worker_api::Error), } #[derive(Debug, thiserror::Error, CodedError)] @@ -71,8 +85,7 @@ pub enum WorkerRoleBuildError { } pub struct WorkerRole { - admin: AdminService, - meta: MetaService, + schemas: Schemas, worker: Worker, bifrost: BifrostService, } @@ -86,57 +99,154 @@ impl WorkerRole { self.bifrost.handle() } - pub async fn run(mut self, shutdown_watch: drain::Watch) -> Result<(), WorkerRoleError> { + pub fn worker_command_tx(&self) -> WorkerCommandSender { + self.worker.worker_command_tx() + } + + pub fn storage_query_context(&self) -> &QueryContext { + self.worker.storage_query_context() + } + + pub fn schemas(&self) -> Schemas { + self.schemas.clone() + } + + pub fn subscription_controller(&self) -> Option { + Some(self.worker.subscription_controller_handle()) + } + + pub async fn run( + self, + _node_id: NodeId, + shutdown_watch: drain::Watch, + ) -> Result<(), WorkerRoleError> { let shutdown_signal = shutdown_watch.signaled(); let (inner_shutdown_signal, inner_shutdown_watch) = drain::channel(); - // Init the meta. This will reload the schemas in memory. - self.meta.init().await?; + // todo: only run subscriptions on node 0 once being distributed + let subscription_controller = Some(self.worker.subscription_controller_handle()); - let worker_command_tx = self.worker.worker_command_tx(); - let storage_query_context = self.worker.storage_query_context().clone(); - - let mut meta_handle = tokio::spawn( - self.meta - .run(inner_shutdown_watch.clone(), worker_command_tx.clone()), - ); - let mut admin_handle = tokio::spawn(self.admin.run( - inner_shutdown_watch.clone(), - worker_command_tx, - Some(storage_query_context), - )); + let mut component_set = JoinSet::new(); // Ensures bifrost has initial metadata synced up before starting the worker. - let mut bifrost_handle = self.bifrost.start(inner_shutdown_watch.clone()).await?; + let mut bifrost_join_handle = self.bifrost.start(inner_shutdown_watch.clone()).await?; + + // todo: make this configurable + let channel = + Channel::builder("http://127.0.0.1:5122/".parse().expect("valid uri")).connect_lazy(); + let mut metadata_svc_client = MetadataSvcClient::new(channel); + + // Fetch latest schema information and fail if this is not possible + Self::fetch_and_update_schemas( + &self.schemas, + subscription_controller.as_ref(), + &mut metadata_svc_client, + ) + .await?; + + component_set.spawn( + self.worker + .run(inner_shutdown_watch) + .map_ok(|_| "worker") + .map_err(WorkerRoleError::Worker), + ); - let mut worker_handle = tokio::spawn(self.worker.run(inner_shutdown_watch)); + component_set.spawn( + Self::reload_schemas(subscription_controller, self.schemas, metadata_svc_client) + .map_ok(|_| "schema-update"), + ); tokio::select! { _ = shutdown_signal => { info!("Stopping worker role"); - let _ = tokio::join!(inner_shutdown_signal.drain(), admin_handle, meta_handle, worker_handle, bifrost_handle); - }, - result = &mut meta_handle => { - result.map_err(WorkerRoleError::MetaPanic)??; - panic!("Unexpected termination of meta."); - }, - result = &mut admin_handle => { - result.map_err(WorkerRoleError::AdminPanic)??; - panic!("Unexpected termination of admin."); + inner_shutdown_signal.drain().await; + // ignoring result because we are shutting down + let _ = tokio::join!(component_set.shutdown(), bifrost_join_handle); }, - result = &mut worker_handle => { - result.map_err(WorkerRoleError::WorkerPanic)??; - panic!("Unexpected termination of worker."); + Some(component_result) = component_set.join_next() => { + let component_name = component_result.map_err(WorkerRoleError::ComponentPanic)??; + panic!("Unexpected termination of component '{component_name}'"); } - result = &mut bifrost_handle => { - result.map_err(WorkerRoleError::BifrostPanic)??; - panic!("Unexpected termination of bifrost service."); - }, + bifrost_result = &mut bifrost_join_handle => { + bifrost_result.map_err(WorkerRoleError::ComponentPanic)??; + panic!("Unexpected termination of bifrost service"); + } + } + + Ok(()) + } + + async fn reload_schemas( + subscription_controller: Option, + schemas: Schemas, + mut metadata_svc_client: MetadataSvcClient, + ) -> Result<(), WorkerRoleError> + where + SC: SubscriptionController + Clone + Send + Sync, + { + // todo: make this configurable + let mut fetch_interval = tokio::time::interval(Duration::from_secs(5)); + + loop { + fetch_interval.tick().await; + + debug!("Trying to fetch schema information"); + + Self::ignore_fetch_error( + Self::fetch_and_update_schemas( + &schemas, + subscription_controller.as_ref(), + &mut metadata_svc_client, + ) + .await, + )?; } + } + + fn ignore_fetch_error(result: Result<(), SchemaError>) -> Result<(), SchemaError> { + if let Err(err) = result { + match err { + SchemaError::Fetch(err) => { + debug!("Failed fetching schema information: {err}. Retrying."); + } + SchemaError::Decode(_) | SchemaError::Update(_) | SchemaError::Subscription(_) => { + Err(err)? + } + } + } + Ok(()) + } + + async fn fetch_and_update_schemas( + schemas: &Schemas, + subscription_controller: Option<&SC>, + metadata_svc_client: &mut MetadataSvcClient, + ) -> Result<(), SchemaError> + where + SC: SubscriptionController + Send + Sync, + { + let schema_updates = Self::fetch_schemas(metadata_svc_client).await?; + update_schemas(schemas, subscription_controller, schema_updates).await?; Ok(()) } + + async fn fetch_schemas( + metadata_svc_client: &mut MetadataSvcClient, + ) -> Result, SchemaError> { + let response = metadata_svc_client + // todo introduce schema version information to avoid fetching and overwriting the schema information + // over and over again + .fetch_schemas(FetchSchemasRequest {}) + .await?; + + let (schema_updates, _) = bincode::serde::decode_from_slice::, _>( + &response.into_inner().schemas_bin, + bincode::config::standard(), + )?; + Ok(schema_updates) + } } impl TryFrom for WorkerRole { @@ -144,17 +254,34 @@ impl TryFrom for WorkerRole { fn try_from(options: Options) -> Result { let bifrost = options.bifrost.build(options.worker.partitions); - let meta = options.meta.build()?; - let admin = options - .admin - .build(meta.schemas(), meta.meta_handle(), bifrost.handle()); - let worker = options.worker.build(meta.schemas(), bifrost.handle())?; + let schemas = Schemas::default(); + let worker = options.worker.build(schemas.clone(), bifrost.handle())?; Ok(WorkerRole { - admin, - meta, + schemas, worker, bifrost, }) } } + +pub async fn update_schemas( + schemas: &Schemas, + subscription_controller: Option<&SC>, + schema_updates: Vec, +) -> Result<(), SchemaError> +where + SC: SubscriptionController + Send + Sync, +{ + // hack to suppress repeated logging of schema registrations + // todo: Fix it + tracing::subscriber::with_default(NoSubscriber::new(), || schemas.overwrite(schema_updates))?; + + if let Some(subscription_controller) = subscription_controller { + let subscriptions = schemas.list_subscriptions(&[]); + subscription_controller + .update_subscriptions(subscriptions) + .await?; + } + Ok(()) +} diff --git a/crates/node/src/server/handler/cluster_controller.rs b/crates/node/src/server/handler/cluster_controller.rs index 0e51feb63..b07941f74 100644 --- a/crates/node/src/server/handler/cluster_controller.rs +++ b/crates/node/src/server/handler/cluster_controller.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_node_services::cluster_controller::cluster_controller_server::ClusterController; +use restate_node_services::cluster_controller::cluster_controller_svc_server::ClusterControllerSvc; use restate_node_services::cluster_controller::{AttachmentRequest, AttachmentResponse}; use tonic::{async_trait, Request, Response, Status}; use tracing::debug; @@ -22,7 +22,7 @@ impl ClusterControllerHandler { } #[async_trait] -impl ClusterController for ClusterControllerHandler { +impl ClusterControllerSvc for ClusterControllerHandler { async fn attach_node( &self, request: Request, diff --git a/crates/node/src/server/handler/metadata.rs b/crates/node/src/server/handler/metadata.rs new file mode 100644 index 000000000..239179589 --- /dev/null +++ b/crates/node/src/server/handler/metadata.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_meta::MetaReader; +use restate_node_services::metadata::metadata_svc_server::MetadataSvc; +use restate_node_services::metadata::{FetchSchemasRequest, FetchSchemasResponse}; +use tonic::{Request, Response, Status}; + +pub struct MetadataHandler { + schema_reader: S, +} + +impl MetadataHandler { + pub fn new(meta_reader: S) -> Self { + Self { + schema_reader: meta_reader, + } + } +} + +#[async_trait::async_trait] +impl MetadataSvc for MetadataHandler +where + S: MetaReader + Send + Sync + 'static, +{ + async fn fetch_schemas( + &self, + _request: Request, + ) -> Result, Status> { + let schema_updates = self.schema_reader.read().await.map_err(|err| { + Status::internal(format!("Could not read schema information: '{}'", err)) + })?; + + let serialized_updates = + bincode::serde::encode_to_vec(schema_updates, bincode::config::standard()).map_err( + |err| { + Status::internal(format!("Could not serialize schema information: '{}'", err)) + }, + )?; + + Ok(Response::new(FetchSchemasResponse { + schemas_bin: serialized_updates.into(), + })) + } +} diff --git a/crates/node/src/server/handler/mod.rs b/crates/node/src/server/handler/mod.rs index 3aebe0b91..ab6ea60b5 100644 --- a/crates/node/src/server/handler/mod.rs +++ b/crates/node/src/server/handler/mod.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. pub mod cluster_controller; +pub mod metadata; pub mod node_ctrl; pub mod worker; diff --git a/crates/node/src/server/handler/node_ctrl.rs b/crates/node/src/server/handler/node_ctrl.rs index 003283d86..eda79a9b4 100644 --- a/crates/node/src/server/handler/node_ctrl.rs +++ b/crates/node/src/server/handler/node_ctrl.rs @@ -8,13 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_node_services::node_ctrl::node_ctrl_server::NodeCtrl; +use restate_node_services::node_ctrl::node_ctrl_svc_server::NodeCtrlSvc; use restate_node_services::node_ctrl::{IdentResponse, NodeStatus}; use restate_types::nodes_config::NodesConfiguration; use restate_types::NodeId; use tonic::{Request, Response, Status}; -// -- GRPC Service Handlers -- pub struct NodeCtrlHandler {} impl NodeCtrlHandler { @@ -24,7 +23,7 @@ impl NodeCtrlHandler { } #[async_trait::async_trait] -impl NodeCtrl for NodeCtrlHandler { +impl NodeCtrlSvc for NodeCtrlHandler { async fn get_ident(&self, _request: Request<()>) -> Result, Status> { // STUB IMPLEMENTATION return Ok(Response::new(IdentResponse { diff --git a/crates/node/src/server/handler/worker.rs b/crates/node/src/server/handler/worker.rs index a4a58742f..8975b5410 100644 --- a/crates/node/src/server/handler/worker.rs +++ b/crates/node/src/server/handler/worker.rs @@ -8,24 +8,50 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use arrow_flight::encode::FlightDataEncoderBuilder; +use arrow_flight::error::FlightError; +use futures::stream::BoxStream; +use futures::TryStreamExt; use restate_bifrost::Bifrost; -use restate_node_services::worker::worker_server::Worker; -use restate_node_services::worker::BifrostVersion; +use restate_node_services::worker::worker_svc_server::WorkerSvc; +use restate_node_services::worker::{ + BifrostVersion, StateMutationRequest, StorageQueryRequest, StorageQueryResponse, + TerminationRequest, UpdateSchemaRequest, +}; +use restate_schema_impl::{Schemas, SchemasUpdateCommand}; +use restate_storage_query_datafusion::context::QueryContext; +use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; +use restate_worker_api::Handle; use tonic::{Request, Response, Status}; -// -- GRPC Service Handlers -- pub struct WorkerHandler { bifrost: Bifrost, + worker_cmd_tx: WorkerCommandSender, + query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, } impl WorkerHandler { - pub fn new(bifrost: Bifrost) -> Self { - Self { bifrost } + pub fn new( + bifrost: Bifrost, + worker_cmd_tx: WorkerCommandSender, + query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, + ) -> Self { + Self { + bifrost, + worker_cmd_tx, + query_context, + schemas, + subscription_controller, + } } } #[async_trait::async_trait] -impl Worker for WorkerHandler { +impl WorkerSvc for WorkerHandler { async fn get_bifrost_version( &self, _request: Request<()>, @@ -35,4 +61,94 @@ impl Worker for WorkerHandler { version: version.into(), })); } + + async fn terminate_invocation( + &self, + request: Request, + ) -> Result, Status> { + let (invocation_termination, _) = bincode::serde::decode_from_slice( + &request.into_inner().invocation_termination, + bincode::config::standard(), + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + self.worker_cmd_tx + .terminate_invocation(invocation_termination) + .await + .map_err(|_| Status::unavailable("worker shut down"))?; + + Ok(Response::new(())) + } + + async fn mutate_state( + &self, + request: Request, + ) -> Result, Status> { + let (state_mutation, _) = bincode::serde::decode_from_slice( + &request.into_inner().state_mutation, + bincode::config::standard(), + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + self.worker_cmd_tx + .external_state_mutation(state_mutation) + .await + .map_err(|_| Status::unavailable("worker shut down"))?; + + Ok(Response::new(())) + } + + type QueryStorageStream = BoxStream<'static, Result>; + + async fn query_storage( + &self, + request: Request, + ) -> Result, Status> { + let query = request.into_inner().query; + + let record_stream = self.query_context.execute(&query).await.map_err(|err| { + Status::internal(format!("failed executing the query '{}': {}", query, err)) + })?; + + let schema = record_stream.schema(); + + let response_stream = + FlightDataEncoderBuilder::new() + // CLI is expecting schema information + .with_schema(schema) + .build(record_stream.map_err(|err| { + FlightError::from(datafusion::arrow::error::ArrowError::from(err)) + })) + .map_ok(|flight_data| StorageQueryResponse { + header: flight_data.data_header, + data: flight_data.data_body, + }) + .map_err(Status::from); + + Ok(Response::new(Box::pin(response_stream))) + } + + async fn update_schemas( + &self, + request: Request, + ) -> Result, Status> { + let (schema_updates, _) = + bincode::serde::decode_from_slice::, _>( + &request.into_inner().schema_bin, + bincode::config::standard(), + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + crate::roles::update_schemas( + &self.schemas, + self.subscription_controller.as_ref(), + schema_updates, + ) + .await + .map_err(|err| { + Status::internal(format!("failed updating the schema information: {err}")) + })?; + + Ok(Response::new(())) + } } diff --git a/crates/node/src/server/mod.rs b/crates/node/src/server/mod.rs index 01d92e358..e589808af 100644 --- a/crates/node/src/server/mod.rs +++ b/crates/node/src/server/mod.rs @@ -17,4 +17,4 @@ mod service; mod state; pub use options::Options; -pub use service::{Error, NodeServer}; +pub use service::{ClusterControllerDependencies, Error, NodeServer, WorkerDependencies}; diff --git a/crates/node/src/server/options.rs b/crates/node/src/server/options.rs index 24e365b71..f346335ff 100644 --- a/crates/node/src/server/options.rs +++ b/crates/node/src/server/options.rs @@ -10,10 +10,7 @@ use std::net::SocketAddr; -use crate::server::service::NodeServer; -use restate_bifrost::Bifrost; -use restate_cluster_controller::ClusterControllerHandle; -use restate_storage_rocksdb::RocksDBStorage; +use crate::server::service::{ClusterControllerDependencies, NodeServer, WorkerDependencies}; use serde_with::serde_as; /// # Node server options @@ -51,8 +48,8 @@ impl Default for Options { impl Options { pub fn build( self, - worker: Option<(RocksDBStorage, Bifrost)>, - cluster_controller: Option, + worker: Option, + cluster_controller: Option, ) -> NodeServer { NodeServer::new(self, worker, cluster_controller) } diff --git a/crates/node/src/server/service.rs b/crates/node/src/server/service.rs index 62a03d852..e6651dd90 100644 --- a/crates/node/src/server/service.rs +++ b/crates/node/src/server/service.rs @@ -15,19 +15,25 @@ use codederror::CodedError; use futures::FutureExt; use restate_bifrost::Bifrost; use restate_cluster_controller::ClusterControllerHandle; +use restate_meta::FileMetaReader; use restate_storage_rocksdb::RocksDBStorage; use tower_http::trace::TraceLayer; use tracing::info; use crate::server::handler; use crate::server::handler::cluster_controller::ClusterControllerHandler; +use crate::server::handler::metadata::MetadataHandler; use crate::server::handler::node_ctrl::NodeCtrlHandler; use crate::server::handler::worker::WorkerHandler; use crate::server::metrics::install_global_prometheus_recorder; -use restate_node_services::cluster_controller::cluster_controller_server::ClusterControllerServer; -use restate_node_services::node_ctrl::node_ctrl_server::NodeCtrlServer; -use restate_node_services::worker::worker_server::WorkerServer; -use restate_node_services::{cluster_controller, node_ctrl, worker}; +use restate_node_services::cluster_controller::cluster_controller_svc_server::ClusterControllerSvcServer; +use restate_node_services::metadata::metadata_svc_server::MetadataSvcServer; +use restate_node_services::node_ctrl::node_ctrl_svc_server::NodeCtrlSvcServer; +use restate_node_services::worker::worker_svc_server::WorkerSvcServer; +use restate_node_services::{cluster_controller, metadata, node_ctrl, worker}; +use restate_schema_impl::Schemas; +use restate_storage_query_datafusion::context::QueryContext; +use restate_worker::{SubscriptionControllerHandle, WorkerCommandSender}; use crate::server::multiplex::MultiplexService; use crate::server::options::Options; @@ -52,15 +58,15 @@ pub enum Error { pub struct NodeServer { opts: Options, - worker: Option<(RocksDBStorage, Bifrost)>, - cluster_controller: Option, + worker: Option, + cluster_controller: Option, } impl NodeServer { pub fn new( opts: Options, - worker: Option<(RocksDBStorage, Bifrost)>, - cluster_controller: Option, + worker: Option, + cluster_controller: Option, ) -> Self { Self { opts, @@ -73,7 +79,7 @@ impl NodeServer { // Configure Metric Exporter let mut state_builder = HandlerStateBuilder::default(); - if let Some((rocksdb, _)) = self.worker.as_ref() { + if let Some(WorkerDependencies { rocksdb, .. }) = self.worker.as_ref() { state_builder.rocksdb_storage(Some(rocksdb.clone())); } @@ -102,7 +108,8 @@ impl NodeServer { if self.cluster_controller.is_some() { reflection_service_builder = reflection_service_builder - .register_encoded_file_descriptor_set(cluster_controller::FILE_DESCRIPTOR_SET); + .register_encoded_file_descriptor_set(cluster_controller::FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(metadata::FILE_DESCRIPTOR_SET); } if self.worker.is_some() { @@ -112,17 +119,33 @@ impl NodeServer { let mut server_builder = tonic::transport::Server::builder() .layer(TraceLayer::new_for_grpc().make_span_with(span_factory)) - .add_service(NodeCtrlServer::new(NodeCtrlHandler::new())) + .add_service(NodeCtrlSvcServer::new(NodeCtrlHandler::new())) .add_service(reflection_service_builder.build()?); - if self.cluster_controller.is_some() { + if let Some(ClusterControllerDependencies { schema_reader, .. }) = self.cluster_controller { server_builder = server_builder - .add_service(ClusterControllerServer::new(ClusterControllerHandler::new())); + .add_service(ClusterControllerSvcServer::new( + ClusterControllerHandler::new(), + )) + .add_service(MetadataSvcServer::new(MetadataHandler::new(schema_reader))); } - if let Some((_, bifrost)) = self.worker { - server_builder = - server_builder.add_service(WorkerServer::new(WorkerHandler::new(bifrost))); + if let Some(WorkerDependencies { + bifrost, + worker_cmd_tx, + query_context, + schemas, + subscription_controller, + .. + }) = self.worker + { + server_builder = server_builder.add_service(WorkerSvcServer::new(WorkerHandler::new( + bifrost, + worker_cmd_tx, + query_context, + schemas, + subscription_controller, + ))); } // Multiplex both grpc and http based on content-type @@ -161,3 +184,49 @@ async fn handler_404() -> (http::StatusCode, &'static str) { "Are you lost? Maybe visit https://restate.dev instead!", ) } + +pub struct WorkerDependencies { + rocksdb: RocksDBStorage, + bifrost: Bifrost, + worker_cmd_tx: WorkerCommandSender, + query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, +} + +impl WorkerDependencies { + pub fn new( + rocksdb: RocksDBStorage, + bifrost: Bifrost, + worker_cmd_tx: WorkerCommandSender, + query_context: QueryContext, + schemas: Schemas, + subscription_controller: Option, + ) -> Self { + WorkerDependencies { + rocksdb, + bifrost, + worker_cmd_tx, + query_context, + schemas, + subscription_controller, + } + } +} + +pub struct ClusterControllerDependencies { + _cluster_controller_handle: ClusterControllerHandle, + schema_reader: FileMetaReader, +} + +impl ClusterControllerDependencies { + pub fn new( + cluster_controller_handle: ClusterControllerHandle, + schema_reader: FileMetaReader, + ) -> Self { + ClusterControllerDependencies { + _cluster_controller_handle: cluster_controller_handle, + schema_reader, + } + } +} diff --git a/crates/schema-impl/src/lib.rs b/crates/schema-impl/src/lib.rs index c5f83474b..c5387f466 100644 --- a/crates/schema-impl/src/lib.rs +++ b/crates/schema-impl/src/lib.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::schemas_impl::SchemasInner; use arc_swap::ArcSwap; use http::Uri; use prost_reflect::{DescriptorPool, ServiceDescriptor}; @@ -244,7 +245,7 @@ impl Schemas { source: Uri, sink: Uri, metadata: Option>, - validator: V, + validator: &V, ) -> Result<(Subscription, SchemasUpdateCommand), SchemasUpdateError> { self.0 .load() @@ -268,38 +269,23 @@ impl Schemas { updates: impl IntoIterator, ) -> Result<(), SchemasUpdateError> { let mut schemas_inner = schemas_impl::SchemasInner::clone(self.0.load().as_ref()); - for cmd in updates { - match cmd { - SchemasUpdateCommand::InsertDeployment { - deployment_id, - metadata, - services, - descriptor_pool, - } => { - schemas_inner.apply_insert_deployment( - deployment_id, - metadata, - services, - descriptor_pool, - )?; - } - SchemasUpdateCommand::RemoveDeployment { deployment_id } => { - schemas_inner.apply_remove_deployment(deployment_id)?; - } - SchemasUpdateCommand::RemoveService { name, revision } => { - schemas_inner.apply_remove_service(name, revision)?; - } - SchemasUpdateCommand::ModifyService { name, public } => { - schemas_inner.apply_modify_service(name, public)?; - } - SchemasUpdateCommand::AddSubscription(sub) => { - schemas_inner.apply_add_subscription(sub)?; - } - SchemasUpdateCommand::RemoveSubscription(sub_id) => { - schemas_inner.apply_remove_subscription(sub_id)?; - } - } - } + schemas_inner.apply_updates(updates)?; + self.0.store(Arc::new(schemas_inner)); + + Ok(()) + } + + /// Overwrites the existing schema registry with the provided schema updates + /// This method will update the internal pointer to the in-memory schema registry, + /// propagating the changes to every component consuming it. + /// + /// IMPORTANT: This method is not thread safe! This method should be called only by a single thread. + pub fn overwrite( + &self, + updates: impl IntoIterator, + ) -> Result<(), SchemasUpdateError> { + let mut schemas_inner = SchemasInner::default(); + schemas_inner.apply_updates(updates)?; self.0.store(Arc::new(schemas_inner)); Ok(()) diff --git a/crates/schema-impl/src/schemas_impl/mod.rs b/crates/schema-impl/src/schemas_impl/mod.rs index f83978a52..6ce2690b6 100644 --- a/crates/schema-impl/src/schemas_impl/mod.rs +++ b/crates/schema-impl/src/schemas_impl/mod.rs @@ -46,6 +46,48 @@ pub(crate) struct SchemasInner { pub(crate) proto_symbols: ProtoSymbols, } +impl SchemasInner { + pub fn apply_updates( + &mut self, + updates: impl IntoIterator, + ) -> Result<(), SchemasUpdateError> { + for cmd in updates { + match cmd { + SchemasUpdateCommand::InsertDeployment { + deployment_id, + metadata, + services, + descriptor_pool, + } => { + self.apply_insert_deployment( + deployment_id, + metadata, + services, + descriptor_pool, + )?; + } + SchemasUpdateCommand::RemoveDeployment { deployment_id } => { + self.apply_remove_deployment(deployment_id)?; + } + SchemasUpdateCommand::RemoveService { name, revision } => { + self.apply_remove_service(name, revision)?; + } + SchemasUpdateCommand::ModifyService { name, public } => { + self.apply_modify_service(name, public)?; + } + SchemasUpdateCommand::AddSubscription(sub) => { + self.apply_add_subscription(sub)?; + } + SchemasUpdateCommand::RemoveSubscription(sub_id) => { + self.apply_remove_subscription(sub_id)?; + } + } + } + + Ok(()) + } +} + #[derive(Debug, Clone)] pub(crate) struct MethodSchemas { descriptor: MethodDescriptor, diff --git a/crates/schema-impl/src/schemas_impl/subscription.rs b/crates/schema-impl/src/schemas_impl/subscription.rs index adbb20cf2..fac49371e 100644 --- a/crates/schema-impl/src/schemas_impl/subscription.rs +++ b/crates/schema-impl/src/schemas_impl/subscription.rs @@ -7,7 +7,7 @@ impl SchemasInner { source: Uri, sink: Uri, metadata: Option>, - validator: V, + validator: &V, ) -> Result<(Subscription, SchemasUpdateCommand), SchemasUpdateError> { // generate id if not provided let id = id.unwrap_or_default(); diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 3c433eb08..0761f124c 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -59,14 +59,14 @@ impl QueryContext { // // build the state // - let mut state = SessionState::with_config_rt(session_config, runtime); + let mut state = SessionState::new_with_config_rt(session_config, runtime); state = state.add_analyzer_rule(Arc::new( analyzer::UseSymmetricHashJoinWhenPartitionKeyIsPresent::new(), )); state = state.add_physical_optimizer_rule(Arc::new(physical_optimizer::JoinRewrite::new())); - let ctx = SessionContext::with_state(state); + let ctx = SessionContext::new_with_state(state); Self { datafusion_context: ctx, diff --git a/crates/storage-query-datafusion/src/deployment/table.rs b/crates/storage-query-datafusion/src/deployment/table.rs index 87dd5f57e..8115739f6 100644 --- a/crates/storage-query-datafusion/src/deployment/table.rs +++ b/crates/storage-query-datafusion/src/deployment/table.rs @@ -59,6 +59,7 @@ impl RangeScanner let rows = self.0.get_deployments(); stream_builder.spawn(async move { for_each_state(schema, tx, rows).await; + Ok(()) }); stream_builder.build() } diff --git a/crates/storage-query-datafusion/src/generic_table.rs b/crates/storage-query-datafusion/src/generic_table.rs index 2faa5df25..2f075b226 100644 --- a/crates/storage-query-datafusion/src/generic_table.rs +++ b/crates/storage-query-datafusion/src/generic_table.rs @@ -18,7 +18,7 @@ use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use crate::table_util::compute_ordering; -use datafusion::common::{DataFusionError, Statistics}; +use datafusion::common::DataFusionError; use datafusion::datasource::{TableProvider, TableType}; use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; @@ -128,11 +128,15 @@ impl ExecutionPlan for GenericTableExecutionPlan { fn with_new_children( self: Arc, - _: Vec>, + new_children: Vec>, ) -> Result, DataFusionError> { - Err(DataFusionError::Internal(format!( - "Children cannot be replaced in {self:?}" - ))) + if !new_children.is_empty() { + return Err(DataFusionError::Internal( + "GenericTableExecutionPlan does not support children".to_owned(), + )); + } + + Ok(self) } fn execute( @@ -144,10 +148,6 @@ impl ExecutionPlan for GenericTableExecutionPlan { let stream = self.scanner.scan(range, self.projected_schema.clone()); Ok(stream) } - - fn statistics(&self) -> Statistics { - Statistics::default() - } } impl DisplayAs for GenericTableExecutionPlan { diff --git a/crates/storage-query-datafusion/src/inbox/table.rs b/crates/storage-query-datafusion/src/inbox/table.rs index a4904104b..882ec1c77 100644 --- a/crates/storage-query-datafusion/src/inbox/table.rs +++ b/crates/storage-query-datafusion/src/inbox/table.rs @@ -57,6 +57,7 @@ impl RangeScanner for InboxScanner { let mut transaction = db.transaction(); let rows = transaction.all_inboxes(range); for_each_state(schema, tx, rows).await; + Ok(()) }; stream_builder.spawn(background_task); stream_builder.build() diff --git a/crates/storage-query-datafusion/src/invocation_state/table.rs b/crates/storage-query-datafusion/src/invocation_state/table.rs index 7f85fca9b..303161dc7 100644 --- a/crates/storage-query-datafusion/src/invocation_state/table.rs +++ b/crates/storage-query-datafusion/src/invocation_state/table.rs @@ -54,6 +54,7 @@ impl RangeScanner for S let background_task = async move { let rows = status.read_status(range).await; for_each_state(schema, tx, rows).await; + Ok(()) }; stream_builder.spawn(background_task); stream_builder.build() diff --git a/crates/storage-query-datafusion/src/journal/table.rs b/crates/storage-query-datafusion/src/journal/table.rs index bf9eaee81..a80bdb191 100644 --- a/crates/storage-query-datafusion/src/journal/table.rs +++ b/crates/storage-query-datafusion/src/journal/table.rs @@ -55,6 +55,7 @@ impl RangeScanner for JournalScanner { let background_task = move || { let rows = db.all_journal(range); for_each_journal(schema, tx, rows); + Ok(()) }; stream_builder.spawn_blocking(background_task); stream_builder.build() diff --git a/crates/storage-query-datafusion/src/physical_optimizer.rs b/crates/storage-query-datafusion/src/physical_optimizer.rs index 9f602742a..ced44cd94 100644 --- a/crates/storage-query-datafusion/src/physical_optimizer.rs +++ b/crates/storage-query-datafusion/src/physical_optimizer.rs @@ -51,6 +51,8 @@ impl PhysicalOptimizerRule for JoinRewrite { hash_join.filter().cloned(), hash_join.join_type(), hash_join.null_equals_null(), + None, + None, StreamJoinPartitionMode::Partitioned, ) else { return Ok(Transformed::No(plan)); diff --git a/crates/storage-query-datafusion/src/service/table.rs b/crates/storage-query-datafusion/src/service/table.rs index 8316b99ed..0176c7941 100644 --- a/crates/storage-query-datafusion/src/service/table.rs +++ b/crates/storage-query-datafusion/src/service/table.rs @@ -59,6 +59,7 @@ impl RangeScanner let rows = self.0.list_services(); stream_builder.spawn(async move { for_each_state(schema, tx, rows).await; + Ok(()) }); stream_builder.build() } diff --git a/crates/storage-query-datafusion/src/state/table.rs b/crates/storage-query-datafusion/src/state/table.rs index fd1f2ae3e..c87f289eb 100644 --- a/crates/storage-query-datafusion/src/state/table.rs +++ b/crates/storage-query-datafusion/src/state/table.rs @@ -54,6 +54,7 @@ impl RangeScanner for StateScanner { let background_task = move || { let rows = db.all_states(range); for_each_state(schema, tx, rows); + Ok(()) }; stream_builder.spawn_blocking(background_task); stream_builder.build() diff --git a/crates/storage-query-datafusion/src/status/table.rs b/crates/storage-query-datafusion/src/status/table.rs index 26c4ac11c..a06da0e32 100644 --- a/crates/storage-query-datafusion/src/status/table.rs +++ b/crates/storage-query-datafusion/src/status/table.rs @@ -55,6 +55,7 @@ impl RangeScanner for StatusScanner { let background_task = move || { let rows = db.all_status(range); for_each_status(schema, tx, rows); + Ok(()) }; stream_builder.spawn_blocking(background_task); stream_builder.build() diff --git a/crates/worker-api/src/lib.rs b/crates/worker-api/src/lib.rs index 43d8b2bda..4f28e8d5e 100644 --- a/crates/worker-api/src/lib.rs +++ b/crates/worker-api/src/lib.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_schema_api::subscription::{Subscription, SubscriptionValidator}; +use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; use restate_types::invocation::InvocationTermination; use restate_types::state_mut::ExternalStateMutation; @@ -22,7 +22,7 @@ pub enum Error { // This is just an interface to isolate the interaction between meta and subscription controller. // Depending on how we evolve the Kafka ingress deployment, this might end up living in a separate process. -pub trait SubscriptionController: SubscriptionValidator { +pub trait SubscriptionController { fn start_subscription( &self, subscription: Subscription, @@ -31,11 +31,16 @@ pub trait SubscriptionController: SubscriptionValidator { &self, id: SubscriptionId, ) -> impl Future> + Send; -} -pub trait Handle: Clone { - type SubscriptionControllerHandle: SubscriptionController + Send + Sync; + /// Updates the subscription controller with the provided set of subscriptions. The subscription controller + /// is supposed to only run the set of provided subscriptions after this call succeeds. + fn update_subscriptions( + &self, + subscriptions: Vec, + ) -> impl Future> + Send; +} +pub trait Handle { /// Send a command to terminate an invocation. This command is best-effort. fn terminate_invocation( &self, @@ -47,6 +52,4 @@ pub trait Handle: Clone { &self, mutation: ExternalStateMutation, ) -> impl Future> + Send; - - fn subscription_controller_handle(&self) -> Self::SubscriptionControllerHandle; } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index cf82c976a..d2c895479 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -80,10 +80,12 @@ pub use restate_storage_query_datafusion::{ OptionsBuilderError as StorageQueryDatafusionOptionsBuilderError, }; +pub use crate::subscription_integration::SubscriptionControllerHandle; pub use restate_storage_query_postgres::{ Options as StorageQueryPostgresOptions, OptionsBuilder as StorageQueryPostgresOptionsBuilder, OptionsBuilderError as StorageQueryPostgresOptionsBuilderError, }; +pub use services::WorkerCommandSender; type PartitionProcessorCommand = partition::StateMachineAckCommand; type ConsensusCommand = restate_consensus::Command; @@ -110,7 +112,7 @@ pub struct Options { storage_query_postgres: StorageQueryPostgresOptions, storage_rocksdb: RocksdbOptions, ingress_grpc: IngressOptions, - kafka: KafkaIngressOptions, + pub kafka: KafkaIngressOptions, invoker: InvokerOptions, partition_processor: partition::Options, @@ -398,10 +400,14 @@ impl Worker { ((peer_id, command_tx), processor) } - pub fn worker_command_tx(&self) -> impl restate_worker_api::Handle + Clone + Send + Sync { + pub fn worker_command_tx(&self) -> WorkerCommandSender { self.services.worker_command_tx() } + pub fn subscription_controller_handle(&self) -> SubscriptionControllerHandle { + self.services.subscription_controller_handler() + } + pub fn storage_query_context(&self) -> &QueryContext { &self.storage_query_context } diff --git a/crates/worker/src/services.rs b/crates/worker/src/services.rs index fa5956869..dde9260e2 100644 --- a/crates/worker/src/services.rs +++ b/crates/worker/src/services.rs @@ -8,9 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::subscription_integration; - use crate::partition::{StateMachineAckCommand, StateMachineCommand}; +use crate::subscription_integration::SubscriptionControllerHandle; use restate_consensus::ProposalSender; use restate_network::PartitionTableError; use restate_types::identifiers::WithPartitionKey; @@ -30,24 +29,15 @@ enum WorkerCommand { #[derive(Debug, Clone)] pub struct WorkerCommandSender { command_tx: mpsc::Sender, - subscription_controller_handle: subscription_integration::SubscriptionControllerHandle, } impl WorkerCommandSender { - fn new( - command_tx: mpsc::Sender, - subscription_controller_handle: subscription_integration::SubscriptionControllerHandle, - ) -> Self { - Self { - command_tx, - subscription_controller_handle, - } + fn new(command_tx: mpsc::Sender) -> Self { + Self { command_tx } } } impl restate_worker_api::Handle for WorkerCommandSender { - type SubscriptionControllerHandle = subscription_integration::SubscriptionControllerHandle; - async fn terminate_invocation( &self, invocation_termination: InvocationTermination, @@ -67,10 +57,6 @@ impl restate_worker_api::Handle for WorkerCommandSender { .await .map_err(|_| restate_worker_api::Error::Unreachable) } - - fn subscription_controller_handle(&self) -> Self::SubscriptionControllerHandle { - self.subscription_controller_handle.clone() - } } #[derive(Debug, thiserror::Error)] @@ -88,6 +74,7 @@ pub(crate) struct Services { partition_table: PartitionTable, command_tx: WorkerCommandSender, + subscription_controller_handle: SubscriptionControllerHandle, } impl Services @@ -96,7 +83,7 @@ where { pub(crate) fn new( proposal_tx: ProposalSender>, - subscription_controller_handle: subscription_integration::SubscriptionControllerHandle, + subscription_controller_handle: SubscriptionControllerHandle, partition_table: PartitionTable, channel_size: usize, ) -> Self { @@ -104,7 +91,8 @@ where Self { command_rx, - command_tx: WorkerCommandSender::new(command_tx, subscription_controller_handle), + command_tx: WorkerCommandSender::new(command_tx), + subscription_controller_handle, proposal_tx, partition_table, } @@ -114,6 +102,10 @@ where self.command_tx.clone() } + pub(crate) fn subscription_controller_handler(&self) -> SubscriptionControllerHandle { + self.subscription_controller_handle.clone() + } + pub(crate) async fn run(self, shutdown_watch: drain::Watch) -> Result<(), Error> { let Self { mut command_rx, diff --git a/crates/worker/src/subscription_integration.rs b/crates/worker/src/subscription_integration.rs index 2b0d2c54a..b50ca1f8b 100644 --- a/crates/worker/src/subscription_integration.rs +++ b/crates/worker/src/subscription_integration.rs @@ -11,7 +11,7 @@ use restate_ingress_kafka::SubscriptionCommandSender; use restate_schema_api::subscription::{Subscription, SubscriptionValidator}; use restate_types::identifiers::SubscriptionId; -use restate_worker_api::SubscriptionController; +use restate_worker_api::{Error, SubscriptionController}; use std::ops::Deref; use std::sync::Arc; @@ -39,22 +39,28 @@ impl SubscriptionValidator for SubscriptionControllerHandle { } impl SubscriptionController for SubscriptionControllerHandle { - async fn start_subscription( - &self, - subscription: Subscription, - ) -> Result<(), restate_worker_api::Error> { + async fn start_subscription(&self, subscription: Subscription) -> Result<(), Error> { self.1 .send(restate_ingress_kafka::Command::StartSubscription( subscription, )) .await - .map_err(|_| restate_worker_api::Error::Unreachable) + .map_err(|_| Error::Unreachable) } - async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), restate_worker_api::Error> { + async fn stop_subscription(&self, id: SubscriptionId) -> Result<(), Error> { self.1 .send(restate_ingress_kafka::Command::StopSubscription(id)) .await - .map_err(|_| restate_worker_api::Error::Unreachable) + .map_err(|_| Error::Unreachable) + } + + async fn update_subscriptions(&self, subscriptions: Vec) -> Result<(), Error> { + self.1 + .send(restate_ingress_kafka::Command::UpdateSubscriptions( + subscriptions, + )) + .await + .map_err(|_| Error::Unreachable) } } diff --git a/tools/xtask/Cargo.toml b/tools/xtask/Cargo.toml index a12bd5341..03b6caff6 100644 --- a/tools/xtask/Cargo.toml +++ b/tools/xtask/Cargo.toml @@ -11,6 +11,7 @@ publish = false restate-admin = { workspace = true, features = ["options_schema"] } restate-bifrost = { workspace = true, features = ["options_schema"] } restate-meta = { workspace = true } +restate-node-services = { workspace = true } restate-schema-api = { workspace = true, features = ["subscription"] } restate-server = { workspace = true, features = ["options_schema"] } restate-types = { workspace = true } @@ -23,3 +24,4 @@ schemars = { workspace = true } serde_json = { workspace = true } serde_yaml = "0.9" tokio = { workspace = true } +tonic = { workspace = true } diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index 4052b2819..b3d44fd17 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -10,6 +10,7 @@ use anyhow::bail; use reqwest::header::ACCEPT; +use restate_node_services::worker::worker_svc_client::WorkerSvcClient; use restate_schema_api::subscription::Subscription; use restate_types::identifiers::SubscriptionId; use restate_types::invocation::InvocationTermination; @@ -19,6 +20,7 @@ use restate_worker_api::Error; use schemars::gen::SchemaSettings; use std::env; use std::time::Duration; +use tonic::transport::{Channel, Uri}; fn generate_config_schema() -> anyhow::Result<()> { let schema = SchemaSettings::draft2019_09() @@ -41,8 +43,6 @@ fn generate_default_config() -> anyhow::Result<()> { struct Mock; impl restate_worker_api::Handle for Mock { - type SubscriptionControllerHandle = Mock; - async fn terminate_invocation( &self, _: InvocationTermination, @@ -53,10 +53,6 @@ impl restate_worker_api::Handle for Mock { async fn external_state_mutation(&self, _mutation: ExternalStateMutation) -> Result<(), Error> { Ok(()) } - - fn subscription_controller_handle(&self) -> Self::SubscriptionControllerHandle { - Mock - } } impl restate_worker_api::SubscriptionController for Mock { @@ -67,6 +63,13 @@ impl restate_worker_api::SubscriptionController for Mock { async fn stop_subscription(&self, _: SubscriptionId) -> Result<(), restate_worker_api::Error> { Ok(()) } + + async fn update_subscriptions( + &self, + _: Vec, + ) -> Result<(), restate_worker_api::Error> { + Ok(()) + } } impl restate_schema_api::subscription::SubscriptionValidator for Mock { @@ -78,22 +81,26 @@ impl restate_schema_api::subscription::SubscriptionValidator for Mock { } async fn generate_rest_api_doc() -> anyhow::Result<()> { - let bifrost_options = restate_bifrost::Options::default(); let admin_options = restate_admin::Options::default(); let meta_options = restate_meta::Options::default(); - let mut meta = meta_options.build().expect("expect to build meta service"); + let mut meta = meta_options + .build(Mock) + .expect("expect to build meta service"); let openapi_address = format!( "http://localhost:{}/openapi", admin_options.bind_address.port() ); - let bifrost_service = bifrost_options.build(1); let admin_service = - admin_options.build(meta.schemas(), meta.meta_handle(), bifrost_service.handle()); + admin_options.build(meta.schemas(), meta.meta_handle(), meta.schema_reader()); meta.init().await.unwrap(); // We start the Meta component, then download the openapi schema generated let (shutdown_signal, shutdown_watch) = drain::channel(); - let join_handle = tokio::spawn(admin_service.run(shutdown_watch, Mock, None)); + let join_handle = tokio::spawn(admin_service.run( + shutdown_watch, + Mock, + WorkerSvcClient::new(Channel::builder(Uri::default()).connect_lazy()), + )); let res = RetryPolicy::fixed_delay(Duration::from_millis(100), 20) .retry_operation(|| async {