From c4b2aa3c48056e9eef228ba1718ef40fecf9882d Mon Sep 17 00:00:00 2001 From: Zhang Zhuo Date: Tue, 26 Jan 2021 18:29:20 +0800 Subject: [PATCH 1/2] fix: fix host.docker.internal in linux (#61) Remove these lines on windows/macos: ``` extra_hosts: - "host.docker.internal:host-gateway" ``` --- docker/docker-compose.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 8140f62e..a790102e 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -6,6 +6,8 @@ services: restart: always volumes: - ./configs/nginx.conf:/etc/nginx/conf.d/proxy.conf + extra_hosts: + - "host.docker.internal:host-gateway" ports: - "8765:8765" grpcgateway: @@ -14,6 +16,8 @@ services: restart: always volumes: - ../proto/exchange/matchengine.proto:/api.proto + extra_hosts: + - "host.docker.internal:host-gateway" ports: - "50052:50052" command: api.proto -I ../../.. -p 50052 -g host.docker.internal:50051 -m /api From a6493d36abc131fdca18f7fbbb55764a6cacc3a8 Mon Sep 17 00:00:00 2001 From: Fan Date: Thu, 28 Jan 2021 20:07:48 +0800 Subject: [PATCH 2/2] Feature: ticker API and order_trades API (#62) * minor: a better mocking example for kline * minor: adjust mocking * minor: induce human_serde * induce config module for restapi * major: work for #27 and #49 * clippy and fmt * missed clippy * more readable for cache time_interval check * fix key issues in ticker cache Co-authored-by: Weifan Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com> --- Cargo.lock | 19 +++- Cargo.toml | 1 + migrations/mock/20210115121017_kline.sql | 19 ++-- src/bin/restapi.rs | 30 ++++-- src/restapi/config.rs | 35 +++++++ src/restapi/mod.rs | 1 + src/restapi/public_history.rs | 74 ++++++++++++++- src/restapi/state.rs | 44 ++++++++- src/restapi/tradingview.rs | 116 +++++++++++++++++++++-- src/restapi/types.rs | 28 ++++++ 10 files changed, 336 insertions(+), 31 deletions(-) create mode 100644 src/restapi/config.rs diff --git a/Cargo.lock b/Cargo.lock index a6476168..591e9fb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -806,6 +806,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "humantime-serde", "hyper", "itertools 0.9.0", "log", @@ -875,7 +876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ "atty", - "humantime", + "humantime 1.3.0", "log", "regex", "termcolor", @@ -1228,6 +1229,22 @@ dependencies = [ "quick-error", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "humantime-serde" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac34a56cfd4acddb469cc7fff187ed5ac36f498ba085caf8bbc725e3ff474058" +dependencies = [ + "humantime 2.1.0", + "serde 1.0.116", +] + [[package]] name = "hyper" version = "0.13.7" diff --git a/Cargo.toml b/Cargo.toml index d125184e..76430f7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ sqlx = { version = "0.4.2", features = [ "runtime-tokio-rustls", "postgres", "ch chrono = { version = "0.4.15", features = ["serde"] } rust_decimal = { version = "1.8.1", features = ["postgres", "diesel", "bytes", "byteorder"] } rust_decimal_macros = "1.8.1" +humantime-serde = "1.0" ttl_cache = "0.5.1" itertools = "0.9.0" diff --git a/migrations/mock/20210115121017_kline.sql b/migrations/mock/20210115121017_kline.sql index 05769511..27235ade 100644 --- a/migrations/mock/20210115121017_kline.sql +++ b/migrations/mock/20210115121017_kline.sql @@ -1,10 +1,17 @@ -- Add migration script here -insert into trade_record select time, 'ETH_BTC', 1, random()*0.1, random()*100 -from generate_series(timestamp '2020-01-02 00:00:00', timestamp '2020-01-03 00:00:00', interval '1 s') as time; +insert into trade_record select time, 'ETH_USDT', 1, price, amount, price * amount, 'ask' +from (select time, random()*300 + 1000 as price, random()*10 as amount +from generate_series(now() - interval '1 day', now() + interval '1 day', interval '1 s') as time) t; -insert into trade_record select time, 'ETH_BTC', 2, random()*0.1, random()*100 -from generate_series(timestamp '2020-01-02 00:00:00', timestamp '2020-01-03 00:00:00', interval '2 s') as time; +insert into trade_record select time, 'ETH_USDT', 1, price, amount, price * amount, 'ask' +from (select time, random()*200 + 1000 as price, random()*30 as amount +from generate_series(now() - interval '1 day', now() + interval '1 day', interval '3 s') as time) t; -insert into trade_record select time, 'ETH_BTC', 3, random()*0.1, random()*100 -from generate_series(timestamp '2020-01-02 00:00:00', timestamp '2020-01-03 00:00:00', interval '5 s') as time; \ No newline at end of file +insert into trade_record select time, 'BTC_USDT', 1, price, amount, price * amount, 'ask' +from (select time, random()*200 + 1000 as price, random()*30 as amount +from generate_series(now() - interval '1 day', now() + interval '1 day', interval '3 s') as time) t; + +insert into trade_record select time, 'ETH_USDT', 1, price, amount, price * amount, 'ask' +from (select time, random()*200 + 1000 as price, random()*100 as amount +from generate_series(now() - interval '1 day', now() + interval '1 day', interval '15 s') as time) t; diff --git a/src/bin/restapi.rs b/src/bin/restapi.rs index bbbc39af..e2c54af9 100644 --- a/src/bin/restapi.rs +++ b/src/bin/restapi.rs @@ -14,9 +14,9 @@ use std::sync::Mutex; use dingir_exchange::restapi; use restapi::personal_history::my_orders; -use restapi::public_history::recent_trades; -use restapi::state::AppState; -use restapi::tradingview::{chart_config, history, symbols, unix_timestamp}; +use restapi::public_history::{order_trades, recent_trades}; +use restapi::state::{AppCache, AppState}; +use restapi::tradingview::{chart_config, history, symbols, ticker, unix_timestamp}; use restapi::types::UserInfo; async fn ping(_req: HttpRequest, _data: web::Data) -> impl Responder { @@ -46,21 +46,28 @@ async fn main() -> std::io::Result<()> { let config_file = dotenv::var("CONFIG_FILE").unwrap(); conf.merge(config_rs::File::with_name(&config_file)).unwrap(); + let restapi_cfg: Option = conf.get("restapi").ok(); + let dburl = conf.get_str("db_history").unwrap(); + log::debug!("Prepared db connection: {}", &dburl); + let user_map = web::Data::new(AppState { user_addr_map: Mutex::new(HashMap::new()), db: Pool::::connect(&dburl).await.unwrap(), + config: restapi_cfg.and_then(|v| v.try_into().ok()).unwrap_or_else(Default::default), }); - log::debug!("Prepared db connection: {}", &dburl); + let workers = user_map.config.workers; - HttpServer::new(move || { - App::new().app_data(user_map.clone()).service( + let server = HttpServer::new(move || { + App::new().app_data(user_map.clone()).app_data(AppCache::new()).service( web::scope("/restapi") .route("/ping", web::get().to(ping)) .route("/user/{id_or_addr}", web::get().to(get_user)) .route("/recenttrades/{market}", web::get().to(recent_trades)) + .route("/ordertrades/{market}/{order_id}", web::get().to(order_trades)) .route("/closedorders/{market}/{user_id}", web::get().to(my_orders)) + .route("/ticker_{ticker_inv}/{market}", web::get().to(ticker)) .service( web::scope("/tradingview") .route("/time", web::get().to(unix_timestamp)) @@ -69,8 +76,11 @@ async fn main() -> std::io::Result<()> { .route("/history", web::get().to(history)), ), ) - }) - .bind(("0.0.0.0", 50053))? - .run() - .await + }); + + let server = match workers { + Some(wr) => server.workers(wr), + None => server, + }; + server.bind(("0.0.0.0", 50053))?.run().await } diff --git a/src/restapi/config.rs b/src/restapi/config.rs new file mode 100644 index 00000000..120c734a --- /dev/null +++ b/src/restapi/config.rs @@ -0,0 +1,35 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct Trading { + #[serde(with = "humantime_serde")] + pub ticker_update_interval: std::time::Duration, + #[serde(with = "humantime_serde")] + pub ticker_interval: std::time::Duration, +} + +impl Default for Trading { + fn default() -> Self { + Trading { + ticker_update_interval: std::time::Duration::from_secs(5), + ticker_interval: std::time::Duration::from_secs(86_400), + } + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct Settings { + pub workers: Option, + pub trading: Trading, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + workers: None, + trading: Default::default(), + } + } +} diff --git a/src/restapi/mod.rs b/src/restapi/mod.rs index 05d43cb4..853032de 100644 --- a/src/restapi/mod.rs +++ b/src/restapi/mod.rs @@ -1,3 +1,4 @@ +pub mod config; pub mod errors; pub mod mock; pub mod personal_history; diff --git a/src/restapi/public_history.rs b/src/restapi/public_history.rs index 65511730..578fa60c 100644 --- a/src/restapi/public_history.rs +++ b/src/restapi/public_history.rs @@ -2,9 +2,16 @@ use actix_web::{web, HttpRequest, Responder}; use actix_web::web::Json; +use crate::models::{ + self, + tablenames::{TRADEHISTORY, TRADERECORD}, +}; use core::cmp::min; -use super::{errors::RpcError, state::AppState}; +use super::{errors::RpcError, state::AppState, types}; +use models::{DecimalDbType, TimestampDbType}; +use rust_decimal::prelude::*; + fn check_market_exists(_market: &str) -> bool { // TODO true @@ -24,10 +31,69 @@ pub async fn recent_trades(req: HttpRequest, data: web::Data) -> impl // and more suitable for fetching latest trades on a market. // models::TradeHistory is designed for a user to fetch his trades. - let trade_table = crate::models::tablenames::TRADERECORD; - let sql_query = format!("select * from {} where market = $1 order by time desc limit {}", trade_table, limit); + let sql_query = format!("select * from {} where market = $1 order by time desc limit {}", TRADERECORD, limit); - let trades: Vec = sqlx::query_as(&sql_query).bind(market).fetch_all(&data.db).await?; + let trades: Vec = sqlx::query_as(&sql_query).bind(market).fetch_all(&data.db).await?; Ok(Json(trades)) } + +#[derive(sqlx::FromRow, Debug, Clone)] +struct QueriedTradeHistory { + pub time: TimestampDbType, + pub user_id: i32, + pub trade_id: i64, + pub order_id: i64, + pub price: DecimalDbType, + pub amount: DecimalDbType, + pub quote_amount: DecimalDbType, + pub fee: DecimalDbType, +} + +#[cfg(sqlxverf)] +fn sqlverf_ticker() { + sqlx::query_as!( + QueriedTradeHistory, + "select time, user_id, trade_id, order_id, + price, amount, quote_amount, fee + from trade_history where market = $1 and order_id = $2 + order by trade_id, time asc", + "USDT_ETH", + 10000, + ); +} + +pub async fn order_trades( + app_state: web::Data, + web::Path((market_name, order_id)): web::Path<(String, i64)>, +) -> Result, RpcError> { + log::debug!("order_trades market {} order_id {}", market_name, order_id); + + let sql_query = format!( + " + select time, user_id, trade_id, order_id, + price, amount, quote_amount, fee + from {} where market = $1 and order_id = $2 + order by trade_id, time asc", + TRADEHISTORY + ); + + let trades: Vec = sqlx::query_as(&sql_query) + .bind(market_name) + .bind(order_id) + .fetch_all(&app_state.db) + .await?; + + Ok(Json(types::OrderTradeResult { + trades: trades + .into_iter() + .map(|v| types::TradeRecord { + time: v.time.timestamp() as i32, + amount: v.amount.to_f32().unwrap_or(0.0), + quote_amount: v.quote_amount.to_f32().unwrap_or(0.0), + price: v.price.to_f32().unwrap_or(0.0), + fee: v.fee.to_f32().unwrap_or(0.0), + }) + .collect(), + })) +} diff --git a/src/restapi/state.rs b/src/restapi/state.rs index 0d19cd69..7dd82272 100644 --- a/src/restapi/state.rs +++ b/src/restapi/state.rs @@ -1,9 +1,51 @@ -use super::types::UserInfo; +use super::config::Settings; +use super::types::{TickerResult, UserInfo}; use sqlx::postgres::Postgres; +use std::cell::RefCell; use std::collections::HashMap; use std::sync::Mutex; pub struct AppState { pub user_addr_map: Mutex>, pub db: sqlx::pool::Pool, + pub config: Settings, +} + +#[derive(Debug)] +pub struct TradingData { + pub ticker_ret_cache: HashMap, +} + +impl TradingData { + pub fn new() -> Self { + TradingData { + ticker_ret_cache: HashMap::new(), + } + } +} + +impl Default for TradingData { + fn default() -> Self { + Self::new() + } +} + +//TLS storage +#[derive(Debug)] +pub struct AppCache { + pub trading: RefCell, +} + +impl AppCache { + pub fn new() -> Self { + AppCache { + trading: TradingData::new().into(), + } + } +} + +impl Default for AppCache { + fn default() -> Self { + Self::new() + } } diff --git a/src/restapi/tradingview.rs b/src/restapi/tradingview.rs index e2b6f0df..b4785417 100644 --- a/src/restapi/tradingview.rs +++ b/src/restapi/tradingview.rs @@ -1,17 +1,19 @@ use actix_web::web::{self, Data, Json}; use actix_web::{HttpRequest, Responder}; +use serde::{Deserialize, Serialize}; use serde_json::json; use std::{ - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, vec, }; use super::errors::RpcError; -use super::types::{KlineReq, KlineResult}; +use super::types::{KlineReq, KlineResult, TickerResult}; +use crate::restapi::state; use super::mock; -const TRADERECORD: &str = "trade_record"; +use crate::models::tablenames::TRADERECORD; // All APIs here follow https://zlq4863947.gitbook.io/tradingview/3-shu-ju-bang-ding/udf @@ -71,9 +73,108 @@ pub async fn symbols(req: HttpRequest) -> Result { .to_string()) } +use chrono::{self, DurationRound}; use futures::TryStreamExt; use rust_decimal::{prelude::*, Decimal}; -use sqlx::types::chrono::NaiveDateTime; +use sqlx::types::chrono::{DateTime, NaiveDateTime, Utc}; + +#[derive(sqlx::FromRow, Debug, Clone)] +struct TickerItem { + first: Decimal, + last: Decimal, + max: Decimal, + min: Decimal, + sum: Decimal, + quote_sum: Decimal, +} + +#[derive(Deserialize)] +pub struct TickerInv(#[serde(with = "humantime_serde")] Duration); + +#[cfg(sqlxverf)] +fn sqlverf_ticker() { + sqlx::query!( + "select first(price, time), last(price, time), max(price), min(price), + sum(amount), sum(quote_amount) as quote_sum from trade_record where market = $1 and time > $2", + "USDT_ETH", + NaiveDateTime::from_timestamp(100_000_000, 0) + ); +} + +pub async fn ticker( + req: HttpRequest, + web::Path((TickerInv(ticker_inv), market_name)): web::Path<(TickerInv, String)>, + app_state: Data, +) -> Result, RpcError> { + let cache = req.app_data::().expect("App cache not found"); + let now_ts: DateTime = SystemTime::now().into(); + let update_inv = app_state.config.trading.ticker_update_interval; + let ticker_ret_cache = &mut cache.trading.borrow_mut().ticker_ret_cache; + + if let Some(cached_resp) = ticker_ret_cache.get(&market_name) { + //consider systemtime may wraparound, we set the valid + //range of cache is [-inv, +inv] on now + let now_ts_dur = Duration::from_secs(now_ts.timestamp() as u64); + let cached_now = Duration::from_secs(cached_resp.to); + log::debug!( + "cache judge {}, {}, {}", + cached_now.as_secs(), + update_inv.as_secs(), + now_ts_dur.as_secs() + ); + if cached_now + update_inv > now_ts_dur && now_ts_dur > cached_now - update_inv { + log::debug!("use cached response"); + return Ok(Json(cached_resp.clone())); + } + } + + let ticker_inv = if ticker_inv > app_state.config.trading.ticker_interval { + app_state.config.trading.ticker_interval + } else { + ticker_inv + }; + + let update_inv = chrono::Duration::from_std(update_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; + let ticker_inv = chrono::Duration::from_std(ticker_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; + let now_ts = now_ts.duration_trunc(update_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; + + let core_query = format!( + "select first(price, time), last(price, time), max(price), min(price), + sum(amount), sum(quote_amount) as quote_sum from {} where market = $1 and time > $2", + TRADERECORD + ); + + let from_ts = now_ts + .clone() + .checked_sub_signed(ticker_inv) + .ok_or_else(|| RpcError::unknown("Internal clock error"))?; + log::debug!("query ticker from {} to {}", from_ts, now_ts); + + let ticker_ret: TickerItem = sqlx::query_as(&core_query) + .bind(&market_name) + .bind(from_ts.naive_utc()) + .fetch_one(&app_state.db) + .await?; + + let ret = TickerResult { + market: market_name.clone(), + change: (ticker_ret.last - ticker_ret.first) + .checked_div(ticker_ret.last) + .and_then(|x| x.to_f32()) + .unwrap_or(9999.9), + last: ticker_ret.last.to_f32().unwrap_or(0.0), + high: ticker_ret.max.to_f32().unwrap_or(0.0), + low: ticker_ret.min.to_f32().unwrap_or(0.0), + volume: ticker_ret.sum.to_f32().unwrap_or(0.0), + quote_volume: ticker_ret.quote_sum.to_f32().unwrap_or(0.0), + from: from_ts.timestamp() as u64, + to: now_ts.timestamp() as u64, + }; + + //update cache + ticker_ret_cache.insert(market_name, ret.clone()); + Ok(Json(ret)) +} #[derive(sqlx::FromRow, Debug, Clone)] struct KlineItem { @@ -85,7 +186,7 @@ struct KlineItem { sum: Decimal, } -#[derive(serde::Serialize, Clone, Debug)] +#[derive(Serialize, Clone, Debug)] pub struct TradeViewError(RpcError); impl From for TradeViewError @@ -120,12 +221,9 @@ impl actix_web::error::ResponseError for TradeViewError { } } -pub async fn history(req_origin: HttpRequest) -> Result, TradeViewError> { +pub async fn history(req_origin: HttpRequest, app_state: Data) -> Result, TradeViewError> { let req: web::Query = web::Query::from_query(req_origin.query_string())?; let req = req.into_inner(); - let app_state = req_origin - .app_data::>() - .expect("App state not found"); log::debug!("kline req {:?}", req); if req.usemock.is_some() { diff --git a/src/restapi/types.rs b/src/restapi/types.rs index e16ec3e1..d73e8cb3 100644 --- a/src/restapi/types.rs +++ b/src/restapi/types.rs @@ -27,7 +27,35 @@ pub struct KlineResult { pub nxt: Option, } +#[derive(Serialize, Deserialize, Default, Debug, Clone)] +pub struct TickerResult { + pub market: String, + #[serde(rename = "price_change_percent")] + pub change: f32, + pub last: f32, + pub high: f32, + pub low: f32, + pub volume: f32, + pub quote_volume: f32, + pub from: u64, + pub to: u64, +} + #[derive(Serialize, Copy, Clone)] pub struct UserInfo { pub user_id: i64, } + +#[derive(Serialize, Deserialize)] +pub struct TradeRecord { + pub time: i32, + pub amount: f32, + pub quote_amount: f32, + pub price: f32, + pub fee: f32, +} + +#[derive(Serialize, Deserialize)] +pub struct OrderTradeResult { + pub trades: Vec, +}