Skip to content

Commit

Permalink
binance: add deposits/withdrawals API support
Browse files Browse the repository at this point in the history
From @guilledk,
- Drop Decimal quantize for now
- Minor tweaks to trades_dialogue proto
  • Loading branch information
Guillermo Rodriguez authored and goodboy committed Jun 9, 2023
1 parent d1f1693 commit 5273ad8
Showing 1 changed file with 128 additions and 61 deletions.
189 changes: 128 additions & 61 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
)
import hmac
import time
import decimal
import hashlib
from pathlib import Path

import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
now,
from_timestamp,
)
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
Expand Down Expand Up @@ -78,10 +80,10 @@
from ..clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
# BrokerdCancel,
#BrokerdStatus,
#BrokerdPosition,
#BrokerdFill,
BrokerdStatus,
BrokerdPosition,
BrokerdFill,
BrokerdCancel,
# BrokerdError,
)

Expand All @@ -104,6 +106,7 @@ def get_config() -> dict:


_url = 'https://api.binance.com'
_sapi_url = 'https://api.binance.com'
_fapi_url = 'https://testnet.binancefuture.com'


Expand Down Expand Up @@ -243,18 +246,25 @@ def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _url

# testnet EP sesh
# futes testnet rest EPs
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location = _fapi_url

# sync rest API
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location = _sapi_url

conf: dict = get_config()
self.api_key: str = conf.get('api_key', '')
self.api_secret: str = conf.get('api_secret', '')

self.watchlist = conf.get('watchlist', [])

if self.api_key:
api_key_header = {'X-MBX-APIKEY': self.api_key}
self._sesh.headers.update(api_key_header)
self._fapi_sesh.headers.update(api_key_header)
self._sapi_sesh.headers.update(api_key_header)

def _get_signature(self, data: OrderedDict) -> str:

Expand Down Expand Up @@ -315,6 +325,25 @@ async def _fapi(

return resproc(resp, log)

async def _sapi(
self,
method: str,
params: Union[dict, OrderedDict],
signed: bool = False,
action: str = 'get'
) -> dict[str, Any]:

if signed:
params['signature'] = self._get_signature(params)

resp = await getattr(self._sapi_sesh, action)(
path=f'/sapi/v1/{method}',
params=params,
timeout=float('inf')
)

return resproc(resp, log)

async def exch_info(
self,
sym: str | None = None,
Expand Down Expand Up @@ -397,7 +426,7 @@ async def bars(
) -> dict:

if end_dt is None:
end_dt = pendulum.now('UTC').add(minutes=1)
end_dt = now('UTC').add(minutes=1)

if start_dt is None:
start_dt = end_dt.start_of(
Expand Down Expand Up @@ -446,6 +475,58 @@ async def bars(
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array

async def get_positions(
self,
recv_window: int = 60000
) -> tuple:
positions = {}
volumes = {}

for sym in self.watchlist:
log.info(f'doing {sym}...')
params = OrderedDict([
('symbol', sym),
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
resp = await self._api(
'allOrders',
params=params,
signed=True
)
log.info(f'done. len {len(resp)}')
await trio.sleep(3)

return positions, volumes

async def get_deposits(
self,
recv_window: int = 60000
) -> list:

params = OrderedDict([
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
return await self._sapi(
'capital/deposit/hisrec',
params=params,
signed=True)

async def get_withdrawls(
self,
recv_window: int = 60000
) -> list:

params = OrderedDict([
('recvWindow', recv_window),
('timestamp', binance_timestamp(now()))
])
return await self._sapi(
'capital/withdraw/history',
params=params,
signed=True)

async def submit_limit(
self,
symbol: str,
Expand All @@ -463,18 +544,8 @@ async def submit_limit(

await self.cache_symbols()

asset_precision = self._pairs[symbol]['baseAssetPrecision']
quote_precision = self._pairs[symbol]['quoteAssetPrecision']

quantity = Decimal(quantity).quantize(
Decimal(1 ** -asset_precision),
rounding=decimal.ROUND_HALF_EVEN
)

price = Decimal(price).quantize(
Decimal(1 ** -quote_precision),
rounding=decimal.ROUND_HALF_EVEN
)
# asset_precision = self._pairs[symbol]['baseAssetPrecision']
# quote_precision = self._pairs[symbol]['quoteAssetPrecision']

params = OrderedDict([
('symbol', symbol),
Expand All @@ -485,21 +556,21 @@ async def submit_limit(
('price', price),
('recvWindow', recv_window),
('newOrderRespType', 'ACK'),
('timestamp', binance_timestamp(pendulum.now()))
('timestamp', binance_timestamp(now()))
])

if oid:
params['newClientOrderId'] = oid

resp = await self._api(
'order/test', # TODO: switch to real `order` endpoint
'order',
params=params,
signed=True,
action='post'
)

assert resp['orderId'] == oid
return oid
log.info(resp)
# return resp['orderId']
return resp['orderId']

async def submit_cancel(
self,
Expand All @@ -513,22 +584,22 @@ async def submit_cancel(
('symbol', symbol),
('orderId', oid),
('recvWindow', recv_window),
('timestamp', binance_timestamp(pendulum.now()))
('timestamp', binance_timestamp(now()))
])

await self._api(
return await self._api(
'order',
params=params,
signed=True,
action='delete'
)

async def get_listen_key(self) -> str:
return await self._api(
return (await self._api(
'userDataStream',
params={},
action='post'
)['listenKey']
))['listenKey']

async def keep_alive_key(self, listen_key: str) -> None:
await self._fapi(
Expand Down Expand Up @@ -559,7 +630,7 @@ async def periodic_keep_alive(
key = await self.get_listen_key()

async with trio.open_nursery() as n:
n.start_soon(periodic_keep_alive, key)
n.start_soon(periodic_keep_alive, self, key)
yield key
n.cancel_scope.cancel()

Expand Down Expand Up @@ -730,8 +801,8 @@ async def get_ohlc(
if (inow - times[-1]) > 60:
await tractor.breakpoint()

start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])

return array, start_dt, end_dt

Expand Down Expand Up @@ -870,15 +941,15 @@ async def subscribe(ws: NoBsWs):
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')

topic = msg['symbol'].lower()
await send_chan.send({topic: msg})

if typ == 'l1':
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()


async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
symbol: str
ems_order_stream: tractor.MsgStream
) -> None:
async with open_cached_client('binance') as client:
async for request_msg in ems_order_stream:
Expand Down Expand Up @@ -935,43 +1006,39 @@ async def trades_dialogue(
# ledger: TransactionLedger

# TODO: load pps and accounts using accounting apis!
# positions: dict = {}
# accounts: set[str] = set()
# await ctx.started((positions, {}))
positions: list[BrokerdPosition] = []
accounts: list[str] = ['binance.default']
await ctx.started((positions, accounts))

async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
open_cached_client('binance') as client,
# client.manage_listen_key() as listen_key,
client.manage_listen_key() as listen_key,
):
n.start_soon(handle_order_requests, ems_stream)
await trio.sleep_forever()
# await trio.sleep_forever()

async with open_autorecon_ws(
f'wss://stream.binance.com:9443/ws/{listen_key}',
) as ws:
event = await ws.recv_msg()

# https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
if event.get('e') == 'executionReport':
"""
https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
"""

oid = event.get('c')
side = event.get('S').lower()
status = event.get('X')
order_qty = float(event.get('q'))
filled_qty = float(event.get('z'))
cumm_transacted_qty = float(event.get('Z'))
price_avg = cum_transacted_qty / filled_qty

broker_time = float(event.get('T'))

commission_amount = float(event.get('n'))
commission_asset = event.get('N')

if status == 'TRADE':
oid: str = event.get('c')
side: str = event.get('S').lower()
status: str = event.get('X')
order_qty: float = float(event.get('q'))
filled_qty: float = float(event.get('z'))
cum_transacted_qty: float = float(event.get('Z'))
price_avg: float = cum_transacted_qty / filled_qty
broker_time: float = float(event.get('T'))
commission_amount: float = float(event.get('n'))
commission_asset: float = event.get('N')

if status == 'TRADE':
if order_qty == filled_qty:
msg = BrokerdFill(
reqid=oid,
Expand All @@ -990,7 +1057,7 @@ async def trades_dialogue(
)

else:
if status == 'NEW':
if status == 'NEW':
status = 'submitted'

elif status == 'CANCELED':
Expand Down

0 comments on commit 5273ad8

Please sign in to comment.