chore: support apple M1! Known issues: (1) host.docker.internal still does not work, so I have to use a local nginx & grpcgateway on the host; (2) wurstmeister/kafka-docker#516 occurs sometimes
lispc committed Feb 1, 2021 · 1 parent e102b54 · commit d7e0a12
Showing 8 changed files with 149 additions and 187 deletions.
263 changes: 106 additions & 157 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
@@ -5,10 +5,10 @@ authors = ["lispczz <[email protected]>"]
edition = "2018"

[dependencies]
log = "0.4.13"
log = "0.4.14"
env_logger = "0.8.2"
config_rs = { package = "config", version = "0.10.1" }
serde = { version = "1.0.120", features = ["derive"] }
serde = { version = "1.0.123", features = ["derive"] }
serde_json = "1.0.61"
prost = "0.7.0"
prost-types = "0.7.0"
@@ -18,19 +18,20 @@ futures-core = { version = "0.3.12", default-features = false }
futures-channel = "0.3.12"
futures-util = { version = "0.3.12", default-features = false }

tokio = { version = "1.0.2", features = ["full"] }
tokio = { version = "1.1.1", features = ["full"] }
thread-id = "3.3.0"

futures = "0.3.12"
hyper = "0.14.2"
crossbeam-channel = "0.5.0"
rdkafka = "0.24.0"
rdkafka = { version = "0.25.0", features = ["cmake-build"] }
nix = "0.19.1"
anyhow = "1.0.38"
sqlx = { git = "https://github.com/launchbadge/sqlx.git", features=["runtime-tokio-rustls", "postgres", "chrono", "decimal", "json", "migrate" ] }
chrono = { version = "0.4.19", features = ["serde"] }
rust_decimal = { version = "1.10.0", features = ["postgres", "diesel", "bytes", "byteorder"] }
rust_decimal_macros = "1.10.0"
humantime-serde = "1.0"
rust_decimal = { version = "1.10.1", features = ["postgres", "bytes", "byteorder"] }
rust_decimal_macros = "1.10.1"
humantime-serde = "1.0.1"

ttl_cache = "0.5.1"
itertools = "0.10.0"
@@ -40,7 +41,7 @@ tonic = "0.4.0"
actix-web = "4.0.0-beta.1"
qstring = "0.7.2"
thiserror = "1.0.23"
rand = "0.8.2"
rand = "0.8.3"

[build-dependencies]
prost = "0.7.0"
File renamed without changes.
7 changes: 4 additions & 3 deletions docker/docker-compose.yaml
@@ -5,7 +5,7 @@ services:
container_name: nginx
restart: always
volumes:
- ./configs/nginx.conf:/etc/nginx/conf.d/proxy.conf
- ./configs/proxy.conf:/etc/nginx/conf.d/proxy.conf
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
@@ -22,7 +22,7 @@ services:
- "50052:50052"
command: api.proto -I ../../.. -p 50052 -g host.docker.internal:50051 -m /api
db:
image: timescale/timescaledb:2.0.0-pg12
image: timescale/timescaledb:2.0.1-pg12
container_name: exchange_pq
restart: always
volumes:
@@ -35,7 +35,7 @@ services:
POSTGRES_PASSWORD: exchange_AA9944

zookeeper:
image: wurstmeister/zookeeper
image: zookeeper
container_name: exchange_zookeeper
ports:
- "2181:2181"
@@ -56,3 +56,4 @@ services:
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "trades:1:1,orders:1:1,balances:1:1"
13 changes: 9 additions & 4 deletions examples/js/print_orders.js
@@ -1,13 +1,18 @@
import axios from "axios";
async function main() {
const server = "0.0.0.0";
console.log("closed orders:");
console.log(
(await axios.get(`http://${server}:8765/restapi/closedorders/ETH_USDT/3`))
.data
);
console.log("active orders:");
console.log(
(await axios.get("http://localhost:8765/api/orders/ETH_USDT/3")).data
(await axios.get(`http://${server}:8765/api/orders/ETH_USDT/3`)).data
);
console.log("closed orders:");
console.log("market ticker:");
console.log(
(await axios.get("http://localhost:8765/restapi/closedorders/ETH_USDT/3"))
.data
(await axios.get(`http://${server}:8765/restapi/ticker_24h/ETH_USDT`)).data
);
}
main().catch(console.log);
6 changes: 6 additions & 0 deletions examples/js/trade.js
@@ -113,6 +113,12 @@ async function tradeTest() {
console.log("bid order id", bidOrder.id);
await testStatusAfterTrade(askOrder.id, bidOrder.id);

const testReload = false;
if (testReload) {
await debugReload();
await testStatusAfterTrade(askOrder.id, bidOrder.id);
}

console.log("tradeTest passed!");
return [askOrder.id, bidOrder.id];
}
22 changes: 11 additions & 11 deletions src/bin/persistor.rs
@@ -40,17 +40,17 @@ fn main() {
.build()
.expect("build runtime");

let consumer: StreamConsumer = rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &settings.brokers)
.set("group.id", &settings.consumer_group)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.create()
.unwrap();
let consumer = AppliedConsumer(consumer);

rt.block_on(async move {
let consumer: StreamConsumer = rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &settings.brokers)
.set("group.id", &settings.consumer_group)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.create()
.unwrap();
let consumer = AppliedConsumer(consumer);

MIGRATOR
.run(&mut ConnectionType::connect(&settings.db_history).await.unwrap())
.await
@@ -80,7 +80,7 @@ fn main() {
break;
},

err = cr_main.run_stream(|cr|cr.start()) => {
err = cr_main.run_stream(|cr|cr.stream()) => {
log::error!("Kafka consumer error: {}", err);
}
}
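
For readers following the rdkafka 0.24 → 0.25 upgrade in this diff: the consumer is now built inside the Tokio runtime that later drives it, and the removed start() call is replaced by stream(). Below is a minimal, self-contained sketch of that pattern, not the project's actual binary; the broker address, group id, and topic name are placeholders, whereas the real persistor reads them from its settings.

use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

fn main() {
    env_logger::init();
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("build runtime");

    rt.block_on(async move {
        // Build the consumer inside the runtime that will poll it.
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", "127.0.0.1:9092") // placeholder broker
            .set("group.id", "persistor_demo")          // placeholder group id
            .set("enable.auto.commit", "true")
            .create()
            .expect("create consumer");
        consumer.subscribe(&["trades"]).expect("subscribe"); // placeholder topic

        // rdkafka 0.25: stream() replaces the removed start().
        let mut stream = consumer.stream();
        while let Some(msg) = stream.next().await {
            match msg {
                Ok(m) => log::info!("offset {} on topic {}", m.offset(), m.topic()),
                Err(e) => log::error!("kafka consumer error: {}", e),
            }
        }
    });
}

Creating the consumer inside block_on keeps any Tokio-based machinery it sets up on the same runtime that later polls the stream, which is what the relocated block in the diff accomplishes.
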
8 changes: 4 additions & 4 deletions src/message/mod.rs
@@ -6,8 +6,8 @@ use anyhow::{anyhow, Result};
use crossbeam_channel::RecvTimeoutError;
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaError};
use rdkafka::producer::{BaseProducer, BaseRecord, DeliveryResult, ProducerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{BaseProducer, BaseRecord, DeliveryResult, Producer, ProducerContext};

use serde::{Deserialize, Serialize};

@@ -103,7 +103,7 @@ impl KafkaMessageSender {
let result = self.producer.send(record);
if result.is_err() {
log::error!("fail to push message {} to {}", message, topic_name);
if let Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), _)) = result {
if let Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = result {
list.push_back(message.to_string());
return Ok(());
}
@@ -131,7 +131,7 @@ impl KafkaMessageSender {

if result.is_err() {
//println!("fail to push message {} to {}", message_str, topic_name);
if let Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), _)) = result {
if let Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) = result {
break;
}
}
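
The message-module changes track the same upgrade: rdkafka 0.25 renames the error-code enum to RDKafkaErrorCode, and BaseProducer's flush() now comes from the Producer trait, hence the extra import. Below is a minimal sketch of that error handling, not the crate's own sender; the broker and topic names are placeholders, and where the real sender pushes the message back onto a retry list on QueueFull, this sketch simply polls the producer and moves on.

use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use std::time::Duration;

fn main() {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "127.0.0.1:9092") // placeholder broker
        .create()
        .expect("create producer");

    let record = BaseRecord::to("trades") // placeholder topic
        .key("demo")
        .payload("{}");

    match producer.send(record) {
        // rdkafka 0.25: the code enum is RDKafkaErrorCode, not RDKafkaError.
        Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _unsent)) => {
            // Local queue is full: drain delivery callbacks before retrying.
            producer.poll(Duration::from_millis(100));
        }
        Err((e, _unsent)) => eprintln!("send failed: {}", e),
        Ok(()) => {}
    }

    // flush() is provided by the Producer trait in 0.25, hence the added import.
    producer.flush(Duration::from_secs(1));
}
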
