Skip to content

Commit

Permalink
fix(rust): improve batch feed handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
nkaz001 committed Oct 3, 2024
1 parent d2a2ea5 commit 4477360
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 21 deletions.
14 changes: 9 additions & 5 deletions connector/src/binancefutures/market_data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use hftbacktest::prelude::*;
use hftbacktest::{live::ipc::TO_ALL, prelude::*};
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -103,7 +103,7 @@ impl MarketDataStream {
// todo: It should be handled as a batch feed.
for (px, qty) in bids {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
symbol: data.symbol.clone(),
event: Event {
ev: LOCAL_BID_DEPTH_EVENT,
Expand All @@ -121,7 +121,7 @@ impl MarketDataStream {

for (px, qty) in asks {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
symbol: data.symbol.clone(),
event: Event {
ev: LOCAL_ASK_DEPTH_EVENT,
Expand All @@ -136,6 +136,8 @@ impl MarketDataStream {
}))
.unwrap();
}

self.ev_tx.send(PublishMessage::EndOfBatch(TO_ALL)).unwrap();
}
Err(error) => {
error!(?error, "Couldn't parse DepthUpdate stream.");
Expand Down Expand Up @@ -179,7 +181,7 @@ impl MarketDataStream {
Ok((bids, asks)) => {
for (px, qty) in bids {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
symbol: symbol.clone(),
event: Event {
ev: LOCAL_BID_DEPTH_EVENT,
Expand All @@ -197,7 +199,7 @@ impl MarketDataStream {

for (px, qty) in asks {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
.send(PublishMessage::BatchLiveEvent(LiveEvent::Feed {
symbol: symbol.clone(),
event: Event {
ev: LOCAL_ASK_DEPTH_EVENT,
Expand All @@ -212,6 +214,8 @@ impl MarketDataStream {
}))
.unwrap();
}

self.ev_tx.send(PublishMessage::EndOfBatch(TO_ALL)).unwrap();
}
Err(error) => {
error!(?error, "Couldn't parse Depth response.");
Expand Down
11 changes: 3 additions & 8 deletions connector/src/binancefutures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use serde::Deserialize;
use thiserror::Error;
use tokio::sync::{broadcast, broadcast::Sender, mpsc::UnboundedSender};
use tokio_tungstenite::tungstenite;
use tracing::{debug, error, warn};
use tracing::{error, warn};

use crate::{
binancefutures::{
Expand Down Expand Up @@ -219,14 +219,9 @@ impl Connector for BinanceFutures {
.collect(),
})
.unwrap();
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();
} else {
// Sends the empty LiveEventsWithId to notify the end of batch.
ev_tx
.send(PublishMessage::LiveEventsWithId {
id,
events: Vec::with_capacity(0),
})
.unwrap();
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();

symbols.insert(symbol.clone());
self.symbol_tx.send(symbol).unwrap();
Expand Down
9 changes: 2 additions & 7 deletions connector/src/bybit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,9 @@ impl Connector for Bybit {
.collect(),
})
.unwrap();
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();
} else {
// Sends the empty LiveEventsWithId to notify the end of batch.
ev_tx
.send(PublishMessage::LiveEventsWithId {
id,
events: Vec::with_capacity(0),
})
.unwrap();
ev_tx.send(PublishMessage::EndOfBatch(id)).unwrap();

symbols.insert(symbol.clone());
self.symbol_tx.send(symbol).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions connector/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use tokio::sync::mpsc::UnboundedSender;
/// A message will be received by the publisher thread and then published to the bots.
pub enum PublishMessage {
LiveEvent(LiveEvent),
BatchLiveEvent(LiveEvent),
EndOfBatch(u64),
LiveEventsWithId {
id: u64,
events: Vec<LiveEvent>,
Expand Down
8 changes: 8 additions & 0 deletions connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,20 @@ async fn run_publish_task(
bot_tx.send(TO_ALL, &LiveEventExt::Normal(ev))?;
}
}
PublishMessage::BatchLiveEvent(ev) => {
// The live event will only be published if the result is true.
if handle_ev(&ev, &mut depth, &mut position) {
bot_tx.send(TO_ALL, &LiveEventExt::Batch(ev))?;
}
}
PublishMessage::LiveEventsWithId { id, events } => {
// This occurs when an order or position snapshot needs to be published by adding
// the instrument.
for ev in events {
bot_tx.send(id, &LiveEventExt::Batch(ev))?;
}
}
PublishMessage::EndOfBatch(id) => {
bot_tx.send(id, &LiveEventExt::EndOfBatch)?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion hftbacktest/src/live/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
pub use bot::{BotError, LiveBot, LiveBotBuilder};
pub use recorder::LoggingRecorder;

use crate::{live::ipc::LiveEventExt, prelude::Request, types::LiveEvent};
use crate::{live::ipc::LiveEventExt, prelude::Request};

mod bot;
pub mod ipc;
Expand Down

0 comments on commit 4477360

Please sign in to comment.