Skip to content

Commit

Permalink
refactor(rust): streamline the batch mode logic and its transmission.
Browse files Browse the repository at this point in the history
  • Loading branch information
nkaz001 committed Oct 6, 2024
1 parent edb35dd commit a17329f
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 246 deletions.
25 changes: 14 additions & 11 deletions connector/src/binancefutures/market_data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ use crate::{
rest::BinanceFuturesClient,
BinanceFuturesError,
},
connector::PublishMessage,
connector::PublishEvent,
utils::{generate_rand_string, parse_depth, parse_px_qty_tup},
};

pub struct MarketDataStream {
client: BinanceFuturesClient,
ev_tx: UnboundedSender<PublishMessage>,
ev_tx: UnboundedSender<PublishEvent>,
symbol_rx: Receiver<String>,
pending_depth_messages: HashMap<String, Vec<stream::Depth>>,
prev_u: HashMap<String, i64>,
Expand All @@ -43,7 +43,7 @@ pub struct MarketDataStream {
impl MarketDataStream {
pub fn new(
client: BinanceFuturesClient,
ev_tx: UnboundedSender<PublishMessage>,
ev_tx: UnboundedSender<PublishEvent>,
symbol_rx: Receiver<String>,
) -> Self {
let (rest_tx, rest_rx) = unbounded_channel::<(String, rest::Depth)>();
Expand Down Expand Up @@ -100,10 +100,11 @@ impl MarketDataStream {

match parse_depth(data.bids, data.asks) {
Ok((bids, asks)) => {
// todo: It should be handled as a batch feed.
self.ev_tx.send(PublishEvent::BatchStart(TO_ALL)).unwrap();

for (px, qty) in bids {
self.ev_tx
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
.send(PublishEvent::LiveEvent(LiveEvent::Feed {
symbol: data.symbol.clone(),
event: Event {
ev: LOCAL_BID_DEPTH_EVENT,
Expand All @@ -121,7 +122,7 @@ impl MarketDataStream {

for (px, qty) in asks {
self.ev_tx
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
.send(PublishEvent::LiveEvent(LiveEvent::Feed {
symbol: data.symbol.clone(),
event: Event {
ev: LOCAL_ASK_DEPTH_EVENT,
Expand All @@ -137,7 +138,7 @@ impl MarketDataStream {
.unwrap();
}

self.ev_tx.send(PublishMessage::EndOfBatch(TO_ALL)).unwrap();
self.ev_tx.send(PublishEvent::BatchEnd(TO_ALL)).unwrap();
}
Err(error) => {
error!(?error, "Couldn't parse DepthUpdate stream.");
Expand All @@ -147,7 +148,7 @@ impl MarketDataStream {
EventStream::Trade(data) => match parse_px_qty_tup(data.price, data.qty) {
Ok((px, qty)) => {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
.send(PublishEvent::LiveEvent(LiveEvent::Feed {
symbol: data.symbol,
event: Event {
ev: {
Expand Down Expand Up @@ -179,9 +180,11 @@ impl MarketDataStream {
fn process_snapshot(&self, symbol: String, data: rest::Depth) {
match parse_depth(data.bids, data.asks) {
Ok((bids, asks)) => {
self.ev_tx.send(PublishEvent::BatchStart(TO_ALL)).unwrap();

for (px, qty) in bids {
self.ev_tx
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
.send(PublishEvent::LiveEvent(LiveEvent::Feed {
symbol: symbol.clone(),
event: Event {
ev: LOCAL_BID_DEPTH_EVENT,
Expand All @@ -199,7 +202,7 @@ impl MarketDataStream {

for (px, qty) in asks {
self.ev_tx
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
.send(PublishEvent::LiveEvent(LiveEvent::Feed {
symbol: symbol.clone(),
event: Event {
ev: LOCAL_ASK_DEPTH_EVENT,
Expand All @@ -215,7 +218,7 @@ impl MarketDataStream {
.unwrap();
}

self.ev_tx.send(PublishMessage::EndOfBatch(TO_ALL)).unwrap();
self.ev_tx.send(PublishEvent::BatchEnd(TO_ALL)).unwrap();
}
Err(error) => {
error!(?error, "Couldn't parse Depth response.");
Expand Down
82 changes: 34 additions & 48 deletions connector/src/binancefutures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
ordermanager::{OrderManager, SharedOrderManager},
rest::BinanceFuturesClient,
},
connector::{Connector, ConnectorBuilder, PublishMessage},
connector::{Connector, ConnectorBuilder, GetOrders, PublishEvent},
utils::{ExponentialBackoff, Retry},
};

Expand Down Expand Up @@ -101,7 +101,7 @@ pub struct BinanceFutures {
}

impl BinanceFutures {
pub fn connect_market_data_stream(&mut self, ev_tx: UnboundedSender<PublishMessage>) {
pub fn connect_market_data_stream(&mut self, ev_tx: UnboundedSender<PublishEvent>) {
let base_url = self.config.stream_url.clone();
let client = self.client.clone();
let symbol_tx = self.symbol_tx.clone();
Expand All @@ -114,9 +114,10 @@ impl BinanceFutures {
"An error occurred in the market data stream connection."
);
ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Error(
LiveError::with(ErrorKind::ConnectionInterrupted, error.into()),
)))
.send(PublishEvent::LiveEvent(LiveEvent::Error(LiveError::with(
ErrorKind::ConnectionInterrupted,
error.into(),
))))
.unwrap();
Ok(())
})
Expand All @@ -133,7 +134,7 @@ impl BinanceFutures {
});
}

pub fn connect_user_data_stream(&self, ev_tx: UnboundedSender<PublishMessage>) {
pub fn connect_user_data_stream(&self, ev_tx: UnboundedSender<PublishEvent>) {
let base_url = self.config.stream_url.clone();
let client = self.client.clone();
let order_manager = self.order_manager.clone();
Expand All @@ -147,9 +148,10 @@ impl BinanceFutures {
"An error occurred in the user data stream connection."
);
ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Error(
LiveError::with(ErrorKind::ConnectionInterrupted, error.into()),
)))
.send(PublishEvent::LiveEvent(LiveEvent::Error(LiveError::with(
ErrorKind::ConnectionInterrupted,
error.into(),
))))
.unwrap();
Ok(())
})
Expand Down Expand Up @@ -199,44 +201,29 @@ impl ConnectorBuilder for BinanceFutures {
}

impl Connector for BinanceFutures {
fn add(&mut self, symbol: String, id: u64, ev_tx: UnboundedSender<PublishMessage>) {
fn add(&mut self, symbol: String) {
// Binance futures symbols must be lowercase to subscribe to the WebSocket stream.
let symbol = symbol.to_lowercase();
let mut symbols = self.symbols.lock().unwrap();
if symbols.contains(&symbol) {
let order_manager = self.order_manager.lock().unwrap();
let orders = order_manager.get_orders(&symbol);

ev_tx
.send(PublishMessage::LiveEventsWithId {
id,
events: orders
.into_iter()
.map(|order| LiveEvent::Order {
symbol: symbol.clone(),
order,
})
.collect(),
})
.unwrap();
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();
} else {
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();

if !symbols.contains(&symbol) {
symbols.insert(symbol.clone());
self.symbol_tx.send(symbol).unwrap();
}
}

fn run(&mut self, ev_tx: UnboundedSender<PublishMessage>) {
fn order_manager(&self) -> Arc<Mutex<dyn GetOrders + Send + 'static>> {
self.order_manager.clone()
}

fn run(&mut self, ev_tx: UnboundedSender<PublishEvent>) {
self.connect_market_data_stream(ev_tx.clone());
// Connects to the user stream only if the API key and secret are provided.
if !self.config.api_key.is_empty() && !self.config.secret.is_empty() {
self.connect_user_data_stream(ev_tx.clone());
}
}

fn submit(&self, symbol: String, mut order: Order, tx: UnboundedSender<PublishMessage>) {
fn submit(&self, symbol: String, mut order: Order, tx: UnboundedSender<PublishEvent>) {
let client = self.client.clone();
let order_manager = self.order_manager.clone();

Expand Down Expand Up @@ -267,7 +254,7 @@ impl Connector for BinanceFutures {
.unwrap()
.update_from_rest(&client_order_id, &resp)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
tx.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol,
order,
}))
Expand All @@ -280,16 +267,17 @@ impl Connector for BinanceFutures {
.unwrap()
.update_submit_fail(&client_order_id, &error)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
tx.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol,
order,
}))
.unwrap();
}

tx.send(PublishMessage::LiveEvent(LiveEvent::Error(
LiveError::with(ErrorKind::OrderError, error.into()),
)))
tx.send(PublishEvent::LiveEvent(LiveEvent::Error(LiveError::with(
ErrorKind::OrderError,
error.into(),
))))
.unwrap();
}
}
Expand All @@ -302,17 +290,14 @@ impl Connector for BinanceFutures {
);
order.req = Status::None;
order.status = Status::Expired;
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
symbol,
order,
}))
.unwrap();
tx.send(PublishEvent::LiveEvent(LiveEvent::Order { symbol, order }))
.unwrap();
}
}
});
}

fn cancel(&self, symbol: String, order: Order, tx: UnboundedSender<PublishMessage>) {
fn cancel(&self, symbol: String, order: Order, tx: UnboundedSender<PublishEvent>) {
let client = self.client.clone();
let order_manager = self.order_manager.clone();

Expand All @@ -332,7 +317,7 @@ impl Connector for BinanceFutures {
.unwrap()
.update_from_rest(&client_order_id, &resp)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
tx.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol,
order,
}))
Expand All @@ -345,16 +330,17 @@ impl Connector for BinanceFutures {
.unwrap()
.update_cancel_fail(&client_order_id, &error)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
tx.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol,
order,
}))
.unwrap();
}

tx.send(PublishMessage::LiveEvent(LiveEvent::Error(
LiveError::with(ErrorKind::OrderError, error.into()),
)))
tx.send(PublishEvent::LiveEvent(LiveEvent::Error(LiveError::with(
ErrorKind::OrderError,
error.into(),
))))
.unwrap();
}
}
Expand Down
9 changes: 7 additions & 2 deletions connector/src/binancefutures/ordermanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
msg::{rest::OrderResponse, stream::OrderTradeUpdate},
BinanceFuturesError,
},
connector::GetOrders,
utils::{generate_rand_string, RefSymbolOrderId, SymbolOrderId},
};

Expand Down Expand Up @@ -332,11 +333,15 @@ impl OrderManager {
}
removed_orders
}
}

pub fn get_orders(&self, symbol: &str) -> Vec<Order> {
impl GetOrders for OrderManager {
fn get_orders(&self, symbol: Option<String>) -> Vec<Order> {
self.orders
.iter()
.filter(|(_, order)| order.symbol == symbol)
.filter(|(_, order)| {
symbol.as_ref().map(|s| order.symbol == *s).unwrap_or(true) && order.order.active()
})
.map(|(_, order)| &order.order)
.cloned()
.collect()
Expand Down
16 changes: 8 additions & 8 deletions connector/src/binancefutures/user_data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ use crate::{
BinanceFuturesError,
SharedSymbolSet,
},
connector::PublishMessage,
connector::PublishEvent,
};

pub struct UserDataStream {
symbols: SharedSymbolSet,
client: BinanceFuturesClient,
ev_tx: UnboundedSender<PublishMessage>,
ev_tx: UnboundedSender<PublishEvent>,
order_manager: SharedOrderManager,
}

impl UserDataStream {
pub fn new(
client: BinanceFuturesClient,
ev_tx: UnboundedSender<PublishMessage>,
ev_tx: UnboundedSender<PublishEvent>,
order_manager: SharedOrderManager,
symbols: SharedSymbolSet,
) -> Self {
Expand All @@ -52,7 +52,7 @@ impl UserDataStream {
for mut order in canceled_orders {
order.status = Status::Canceled;
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Order {
.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol: symbol.clone(),
order,
}))
Expand All @@ -68,15 +68,15 @@ impl UserDataStream {
position_information.into_iter().for_each(|position| {
symbols.remove(&position.symbol);
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Position {
.send(PublishEvent::LiveEvent(LiveEvent::Position {
symbol: position.symbol,
qty: position.position_amount,
}))
.unwrap();
});
for symbol in symbols {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Position {
.send(PublishEvent::LiveEvent(LiveEvent::Position {
symbol,
qty: 0.0,
}))
Expand All @@ -98,7 +98,7 @@ impl UserDataStream {
EventStream::AccountUpdate(data) => {
for position in data.account.position {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Position {
.send(PublishEvent::LiveEvent(LiveEvent::Position {
symbol: position.symbol,
qty: position.position_amount,
}))
Expand All @@ -110,7 +110,7 @@ impl UserDataStream {
match result {
Ok(Some(order)) => {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Order {
.send(PublishEvent::LiveEvent(LiveEvent::Order {
symbol: data.order.symbol,
order,
}))
Expand Down
Loading

0 comments on commit a17329f

Please sign in to comment.