Skip to content

Commit

Permalink
perf(rust): Replace the standard HashMap with hashbrown::HashMap to u…
Browse files Browse the repository at this point in the history
…tilize the Equivalent trait and avoid unnecessary symbol string copying.
  • Loading branch information
nkaz001 committed Oct 1, 2024
1 parent 579d26f commit 17b6979
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 62 deletions.
2 changes: 1 addition & 1 deletion connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ iceoryx2 = { version = "0.4.1", features = ["logger_tracing"] }
toml = "0.8.19"
tracing-subscriber = "0.3.18"
clap = { version = "4.5.15", features = ["derive"] }

hashbrown = "0.14.5"
53 changes: 26 additions & 27 deletions connector/src/binancefutures/ordermanager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};

use chrono::Utc;
use hashbrown::{hash_map::Entry, HashMap};
use hftbacktest::types::{Order, OrderId, Status};
use tracing::{debug, error};

use crate::{
binancefutures::{msg::rest::OrderResponse, BinanceFuturesError},
utils::{gen_random_string, SymbolOrderId},
utils::{gen_random_string, RefSymbolOrderId, SymbolOrderId},
};

#[derive(Debug)]
struct OrderEx {
struct OrderExt {
symbol: String,
order: Order,
removed_by_ws: bool,
Expand All @@ -22,14 +20,16 @@ struct OrderEx {

pub type SharedOrderManager = Arc<Mutex<OrderManager>>;

const RAND_ID_LENGTH: usize = 8;

/// Binance has separated channels for REST APIs and Websocket. Order responses are delivered
/// through these channels, with no guaranteed order of transmission. To prevent duplicate handling
/// of order responses, such as order deletion due to cancellation or fill, OrderManager manages the
/// order states before transmitting the responses to a live bot.
#[derive(Default, Debug)]
pub struct OrderManager {
prefix: String,
orders: HashMap<String, OrderEx>,
orders: HashMap<String, OrderExt>,
order_id_map: HashMap<SymbolOrderId, String>,
}

Expand Down Expand Up @@ -60,7 +60,7 @@ impl OrderManager {
wrapper.removed_by_ws = true;
if !already_removed {
self.order_id_map
.remove(&SymbolOrderId::new(symbol, order.order_id));
.remove(&RefSymbolOrderId::new(&symbol, order.order_id));
}

if wrapper.removed_by_ws && wrapper.removed_by_rest {
Expand All @@ -84,7 +84,7 @@ impl OrderManager {
?order,
"BinanceFutures OrderManager received an unmanaged order from WS."
);
let wrapper = entry.insert(OrderEx {
let wrapper = entry.insert(OrderExt {
symbol: symbol.clone(),
order: order.clone(),
removed_by_ws: order.status != Status::New
Expand All @@ -93,7 +93,7 @@ impl OrderManager {
});
if wrapper.removed_by_ws || wrapper.removed_by_rest {
self.order_id_map
.remove(&SymbolOrderId::new(symbol, order.order_id));
.remove(&RefSymbolOrderId::new(&symbol, order.order_id));
}
Some(order)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ impl OrderManager {
wrapper.removed_by_rest = true;
if !already_removed {
self.order_id_map
.remove(&SymbolOrderId::new(symbol.clone(), order.order_id));
.remove(&RefSymbolOrderId::new(&symbol, order.order_id));
}

if wrapper.removed_by_ws && wrapper.removed_by_rest {
Expand All @@ -254,16 +254,16 @@ impl OrderManager {
?order,
"BinanceFutures OrderManager received an unmanaged order from REST."
);
let wrapper = entry.insert(OrderEx {
symbol: symbol.clone(),
let order_ex = entry.insert(OrderExt {
symbol,
order: order.clone(),
removed_by_ws: false,
removed_by_rest: order.status != Status::New
&& order.status != Status::PartiallyFilled,
});
if wrapper.removed_by_ws || wrapper.removed_by_rest {
if order_ex.removed_by_ws || order_ex.removed_by_rest {
self.order_id_map
.remove(&SymbolOrderId::new(symbol, order.order_id));
.remove(&RefSymbolOrderId::new(&order_ex.symbol, order.order_id));
}
Some(order)
}
Expand All @@ -276,9 +276,9 @@ impl OrderManager {
return None;
}

let rand_id = gen_random_string(8);
let rand_id = gen_random_string(RAND_ID_LENGTH);

let client_order_id = format!("{}{}{}{}", self.prefix, symbol, &rand_id, order.order_id);
let client_order_id = format!("{}{}{}{}", self.prefix, &rand_id, symbol, order.order_id);
if self.orders.contains_key(&client_order_id) {
return None;
}
Expand All @@ -287,7 +287,7 @@ impl OrderManager {
.insert(symbol_order_id, client_order_id.clone());
self.orders.insert(
client_order_id.clone(),
OrderEx {
OrderExt {
symbol,
order,
removed_by_ws: false,
Expand All @@ -301,7 +301,7 @@ impl OrderManager {
if !client_order_id.starts_with(prefix) {
None
} else {
let s = &client_order_id[(prefix.len() + symbol.len() + 16)..];
let s = &client_order_id[(prefix.len() + RAND_ID_LENGTH + symbol.len())..];
if let Ok(order_id) = s.parse() {
Some(order_id)
} else {
Expand All @@ -312,7 +312,7 @@ impl OrderManager {

pub fn get_client_order_id(&self, symbol: &str, order_id: OrderId) -> Option<String> {
self.order_id_map
.get(&SymbolOrderId::new(symbol.to_string(), order_id))
.get(&RefSymbolOrderId::new(symbol, order_id))
.cloned()
}

Expand All @@ -337,7 +337,8 @@ impl OrderManager {
.collect();
for (client_order_id, order_id) in stale_ids.iter() {
if self.order_id_map.contains_key(order_id) {
// Something went wrong?
// todo: something went wrong?
self.order_id_map.remove(order_id).unwrap();
}
self.orders.remove(client_order_id);
}
Expand All @@ -346,7 +347,7 @@ impl OrderManager {
pub fn cancel_all_from_rest(&mut self, symbol: &str) -> Vec<Order> {
let mut removed_orders = Vec::new();
let mut removed_order_ids = Vec::new();
for (order_id, wrapper) in &mut self.orders {
for (client_order_id, wrapper) in &mut self.orders {
if wrapper.symbol != symbol {
continue;
}
Expand All @@ -357,17 +358,15 @@ impl OrderManager {
// todo: check if the exchange timestamp exists in the REST response.
wrapper.order.exch_timestamp = Utc::now().timestamp_nanos_opt().unwrap();
if !already_removed {
self.order_id_map.remove(&SymbolOrderId::new(
symbol.to_string(),
wrapper.order.order_id,
));
self.order_id_map
.remove(&RefSymbolOrderId::new(symbol, wrapper.order.order_id));
removed_orders.push(wrapper.order.clone());
}

// Completely deletes the order if it is removed by both the REST response and the
// WebSocket stream.
if wrapper.removed_by_ws && wrapper.removed_by_rest {
removed_order_ids.push(order_id.clone());
removed_order_ids.push(client_order_id.clone());
}
}

Expand Down
42 changes: 18 additions & 24 deletions connector/src/bybit/ordermanager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};

use hashbrown::{hash_map::Entry, HashMap};
use hftbacktest::{
prelude::get_precision,
types::{OrdType, Order, OrderId, Side, Status, TimeInForce},
Expand All @@ -13,21 +11,21 @@ use crate::{
msg::{Execution, FastExecution, Order as BybitOrder, PrivateOrder},
BybitError,
},
utils::{gen_random_string, SymbolOrderId},
utils::{gen_random_string, RefSymbolOrderId, SymbolOrderId},
};

pub type SharedOrderManager = Arc<Mutex<OrderManager>>;

#[derive(Clone)]
pub struct OrderEx {
pub struct OrderExt {
pub symbol: String,
pub order_link_id: String,
pub order: Order,
}

pub struct OrderManager {
prefix: String,
orders: HashMap<String, OrderEx>,
orders: HashMap<String, OrderExt>,
order_id_map: HashMap<SymbolOrderId, String>,
}

Expand All @@ -40,7 +38,7 @@ impl OrderManager {
}
}

pub fn update_order(&mut self, data: &PrivateOrder) -> Result<OrderEx, BybitError> {
pub fn update_order(&mut self, data: &PrivateOrder) -> Result<OrderExt, BybitError> {
let order = self
.orders
.get_mut(&data.order_link_id)
Expand All @@ -50,17 +48,15 @@ impl OrderManager {
order.order.exch_timestamp = data.updated_time * 1_000_000;
let is_active = order.order.active();
if !is_active {
self.order_id_map.remove(&SymbolOrderId::new(
order.symbol.clone(),
order.order.order_id,
));
self.order_id_map
.remove(&RefSymbolOrderId::new(&order.symbol, order.order.order_id));
Ok(self.orders.remove(&data.order_link_id).unwrap())
} else {
Ok(order.clone())
}
}

pub fn update_execution(&mut self, data: &Execution) -> Result<OrderEx, BybitError> {
pub fn update_execution(&mut self, data: &Execution) -> Result<OrderExt, BybitError> {
let order_info = self
.orders
.get_mut(&data.order_link_id)
Expand All @@ -72,7 +68,7 @@ impl OrderManager {
Ok(order_info.clone())
}

pub fn update_fast_execution(&mut self, data: &FastExecution) -> Result<OrderEx, BybitError> {
pub fn update_fast_execution(&mut self, data: &FastExecution) -> Result<OrderExt, BybitError> {
// fixme: there is no valid order_link_id.
let order_info = self
.orders
Expand Down Expand Up @@ -135,7 +131,7 @@ impl OrderManager {
return Err(BybitError::OrderAlreadyExist);
}
Entry::Vacant(entry) => {
entry.insert(OrderEx {
entry.insert(OrderExt {
symbol: symbol.to_string(),
order_link_id: bybit_order.order_link_id.clone(),
order,
Expand All @@ -153,7 +149,7 @@ impl OrderManager {
) -> Result<BybitOrder, BybitError> {
let order_link_id = self
.order_id_map
.get(&SymbolOrderId::new(symbol.to_string(), order_id))
.get(&RefSymbolOrderId::new(symbol, order_id))
.ok_or(BybitError::OrderNotFound)?;
let order = BybitOrder {
symbol: symbol.to_string(),
Expand All @@ -168,21 +164,19 @@ impl OrderManager {
Ok(order)
}

pub fn update_submit_fail(&mut self, order_link_id: &str) -> Result<OrderEx, BybitError> {
pub fn update_submit_fail(&mut self, order_link_id: &str) -> Result<OrderExt, BybitError> {
let mut order = self
.orders
.remove(order_link_id)
.ok_or(BybitError::OrderNotFound)?;
order.order.req = Status::None;
order.order.status = Status::Expired;
self.order_id_map.remove(&SymbolOrderId::new(
order.symbol.clone(),
order.order.order_id,
));
self.order_id_map
.remove(&RefSymbolOrderId::new(&order.symbol, order.order.order_id));
Ok(order)
}

pub fn update_cancel_fail(&mut self, order_link_id: &str) -> Result<OrderEx, BybitError> {
pub fn update_cancel_fail(&mut self, order_link_id: &str) -> Result<OrderExt, BybitError> {
let mut order_info = self
.orders
.get_mut(order_link_id)
Expand All @@ -204,8 +198,8 @@ impl OrderManager {
let mut removed_orders = Vec::new();
for order_id in removed_order_ids {
let removed_order = self.orders.remove(&order_id).unwrap();
self.order_id_map.remove(&SymbolOrderId::new(
removed_order.symbol.clone(),
self.order_id_map.remove(&RefSymbolOrderId::new(
&removed_order.symbol,
removed_order.order.order_id,
));
removed_orders.push(removed_order.order);
Expand Down
8 changes: 4 additions & 4 deletions connector/src/bybit/private_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, error, warn};
use crate::{
bybit::{
msg::{Op, PrivateStreamMsg, PrivateStreamTopicMsg},
ordermanager::{OrderEx, SharedOrderManager},
ordermanager::{OrderExt, SharedOrderManager},
rest::BybitClient,
BybitError,
SharedInstrumentMap,
Expand Down Expand Up @@ -139,7 +139,7 @@ impl PrivateStream {
let mut order_man_ = self.order_manager.lock().unwrap();
for item in &data.data {
match order_man_.update_execution(item) {
Ok(OrderEx {
Ok(OrderExt {
symbol: asset,
order_link_id: _,
order,
Expand All @@ -162,7 +162,7 @@ impl PrivateStream {
let mut order_man_ = self.order_manager.lock().unwrap();
for item in &data.data {
match order_man_.update_fast_execution(item) {
Ok(OrderEx {
Ok(OrderExt {
symbol: asset,
order_link_id: _,
order,
Expand All @@ -185,7 +185,7 @@ impl PrivateStream {
for item in &data.data {
let mut order_man_ = self.order_manager.lock().unwrap();
match order_man_.update_order(item) {
Ok(OrderEx {
Ok(OrderExt {
symbol: asset,
order_link_id: _,
order,
Expand Down
6 changes: 3 additions & 3 deletions connector/src/bybit/trade_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tracing::{error, info};
use crate::{
bybit::{
msg::{Op, Order, TradeOp, TradeStreamMsg},
ordermanager::{OrderEx, SharedOrderManager},
ordermanager::{OrderExt, SharedOrderManager},
BybitError,
},
connector::PublishMessage,
Expand Down Expand Up @@ -185,7 +185,7 @@ impl TradeStream {
*/
let mut order_man_ = self.order_manager.lock().unwrap();
let order_link_id = req_id.split('/').next().ok_or(BybitError::InvalidReqId)?;
let OrderEx {
let OrderExt {
symbol,
order_link_id: _,
order,
Expand Down Expand Up @@ -221,7 +221,7 @@ impl TradeStream {
*/
let mut order_man_ = self.order_manager.lock().unwrap();
let order_link_id = req_id.split('/').next().ok_or(BybitError::InvalidReqId)?;
let OrderEx {
let OrderExt {
symbol,
order_link_id: _,
order,
Expand Down
Loading

0 comments on commit 17b6979

Please sign in to comment.