From 57922fb2f03dbaaac8b14595df7108906be47db2 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Tue, 18 Jul 2023 17:44:13 -0400 Subject: [PATCH] geyser: reduce the amount of locks --- CHANGELOG.md | 8 ++++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/src/plugin.rs | 26 ++++++++++++++------------ 5 files changed, 25 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81f35633..eab42406 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2023-07-18 + +- yellowstone-grpc-geyser-1.4.0+solana.1.16.1 + +### Features + +- geyser: reduce the amount of locks ([#161](https://github.com/rpcpool/yellowstone-grpc/pull/161)). + ## 2023-07-17 - @triton-one/yellowstone-grpc:0.1.5 diff --git a/Cargo.lock b/Cargo.lock index 8760e675..f85a1030 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4293,7 +4293,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.3.0+solana.1.16.1" +version = "1.4.0+solana.1.16.1" dependencies = [ "anyhow", "base64 0.21.2", diff --git a/Cargo.toml b/Cargo.toml index d92df97e..3bbcae9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = [ "examples/rust", # 1.7.0+solana.1.16.1 "yellowstone-grpc-client", # 1.7.0+solana.1.16.1 - "yellowstone-grpc-geyser", # 1.3.0+solana.1.16.1 + "yellowstone-grpc-geyser", # 1.4.0+solana.1.16.1 "yellowstone-grpc-proto", # 1.7.0+solana.1.16.1 ] diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 923442af..5c150426 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.3.0+solana.1.16.1" +version = "1.4.0+solana.1.16.1" authors = ["Triton One"] edition = "2021" description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index b98d87e6..77b0c607 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -9,7 +9,7 @@ use { ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }, std::{ - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicU8, Ordering}, time::Duration, }, tokio::{ @@ -18,11 +18,13 @@ use { }, }; +const STARTUP_END_OF_RECEIVED: u8 = 1 << 0; +const STARTUP_PROCESSED_RECEIVED: u8 = 1 << 1; + #[derive(Debug)] pub struct PluginInner { runtime: Runtime, - startup_received: AtomicBool, - startup_processed_received: AtomicBool, + startup_status: AtomicU8, grpc_channel: mpsc::UnboundedSender, grpc_shutdown_tx: oneshot::Sender<()>, prometheus: PrometheusService, @@ -48,8 +50,8 @@ impl Plugin { { // Before processed slot after end of startup message we will fail to construct full block let inner = self.inner.as_ref().expect("initialized"); - if inner.startup_received.load(Ordering::SeqCst) - && inner.startup_processed_received.load(Ordering::SeqCst) + if inner.startup_status.load(Ordering::SeqCst) + == STARTUP_END_OF_RECEIVED | STARTUP_PROCESSED_RECEIVED { f(inner) } else { @@ -82,8 +84,7 @@ impl GeyserPlugin for Plugin { self.inner = Some(PluginInner { runtime, - startup_received: AtomicBool::new(false), - startup_processed_received: AtomicBool::new(false), + startup_status: AtomicU8::new(0), grpc_channel, grpc_shutdown_tx, prometheus, @@ -103,7 +104,9 @@ impl GeyserPlugin for Plugin { fn notify_end_of_startup(&self) -> PluginResult<()> { let inner = self.inner.as_ref().expect("initialized"); - inner.startup_received.store(true, Ordering::SeqCst); + inner + .startup_status + .fetch_or(STARTUP_END_OF_RECEIVED, Ordering::SeqCst); Ok(()) } @@ -137,13 +140,12 @@ impl GeyserPlugin for Plugin { status: SlotStatus, ) -> PluginResult<()> { let inner = self.inner.as_ref().expect("initialized"); - if inner.startup_received.load(Ordering::SeqCst) - && !inner.startup_processed_received.load(Ordering::SeqCst) + if inner.startup_status.load(Ordering::SeqCst) == STARTUP_END_OF_RECEIVED && status == SlotStatus::Processed { inner - .startup_processed_received - .store(true, Ordering::SeqCst); + .startup_status + .fetch_or(STARTUP_PROCESSED_RECEIVED, Ordering::SeqCst); } self.with_inner(|inner| {