Skip to content

Commit

Permalink
Added GUI visualizer in Python
Browse files Browse the repository at this point in the history
  • Loading branch information
golden-lucky-monkey committed Jun 28, 2022
1 parent 8727ef0 commit 1bb8614
Show file tree
Hide file tree
Showing 7 changed files with 1,594 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "orderbook-delta-bot"
version = "0.3.1"
authors = ["Dinesh Pinto"]
edition = "2021"
description = "A simple Rust trading bot to counter trading deviations in the futures orderbook."
description = "A simple Rust trading bot to counter trade deviations in the futures orderbook."
keywords = ["trading-bot", "cryptocurrency"]
license = "Apache-2.0"
repository = "https://github.com/dineshpinto/orderbook-delta-bot"
Expand Down
187 changes: 187 additions & 0 deletions orderbook-delta-visualizer/ftx_websocket_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import hmac
import json
import time
import zlib
from collections import defaultdict, deque
from gevent.event import Event
from itertools import zip_longest
from typing import DefaultDict, Deque, List, Dict, Tuple, Optional

from ftx_websocket_manager import WebsocketManager


class FtxWebsocketClient(WebsocketManager):
_ENDPOINT = 'wss://ftx.com/ws/'

def __init__(self) -> None:
super().__init__()
self._trades: DefaultDict[str, Deque] = defaultdict(lambda: deque([], maxlen=10000))
self._fills: Deque = deque([], maxlen=10000)
self._api_key = '' # TODO: Place your API key here
self._api_secret = '' # TODO: Place your API secret here
self._orderbook_update_events: DefaultDict[str, Event] = defaultdict(Event)
self._reset_data()

def _on_open(self, ws):
self._reset_data()

def _reset_data(self) -> None:
self._subscriptions: List[Dict] = []
self._orders: DefaultDict[int, Dict] = defaultdict(dict)
self._tickers: DefaultDict[str, Dict] = defaultdict(dict)
self._orderbook_timestamps: DefaultDict[str, float] = defaultdict(float)
self._orderbook_update_events.clear()
self._orderbooks: DefaultDict[str, Dict[str, DefaultDict[float, float]]] = defaultdict(
lambda: {side: defaultdict(float) for side in {'bids', 'asks'}})
self._orderbook_timestamps.clear()
self._logged_in = False
self._last_received_orderbook_data_at: float = 0.0

def _reset_orderbook(self, market: str) -> None:
if market in self._orderbooks:
del self._orderbooks[market]
if market in self._orderbook_timestamps:
del self._orderbook_timestamps[market]

def _get_url(self) -> str:
return self._ENDPOINT

def _login(self) -> None:
ts = int(time.time() * 1000)
self.send_json({'op': 'login', 'args': {
'key': self._api_key,
'sign': hmac.new(
self._api_secret.encode(), f'{ts}websocket_login'.encode(), 'sha256').hexdigest(),
'time': ts,
}})
self._logged_in = True

def _subscribe(self, subscription: Dict) -> None:
self.send_json({'op': 'subscribe', **subscription})
self._subscriptions.append(subscription)

def _unsubscribe(self, subscription: Dict) -> None:
self.send_json({'op': 'unsubscribe', **subscription})
while subscription in self._subscriptions:
self._subscriptions.remove(subscription)

def get_fills(self) -> List[Dict]:
if not self._logged_in:
self._login()
subscription = {'channel': 'fills'}
if subscription not in self._subscriptions:
self._subscribe(subscription)
return list(self._fills.copy())

def get_orders(self) -> Dict[int, Dict]:
if not self._logged_in:
self._login()
subscription = {'channel': 'orders'}
if subscription not in self._subscriptions:
self._subscribe(subscription)
return dict(self._orders.copy())

def get_trades(self, market: str) -> List[Dict]:
subscription = {'channel': 'trades', 'market': market}
if subscription not in self._subscriptions:
self._subscribe(subscription)
return list(self._trades[market].copy())

def get_orderbook(self, market: str) -> Dict[str, List[Tuple[float, float]]]:
subscription = {'channel': 'orderbook', 'market': market}
if subscription not in self._subscriptions:
self._subscribe(subscription)
if self._orderbook_timestamps[market] == 0:
self.wait_for_orderbook_update(market, 5)
return {
side: sorted(
[(price, quantity) for price, quantity in list(self._orderbooks[market][side].items())
if quantity],
key=lambda order: order[0] * (-1 if side == 'bids' else 1)
)
for side in {'bids', 'asks'}
}

def get_orderbook_timestamp(self, market: str) -> float:
return self._orderbook_timestamps[market]

def wait_for_orderbook_update(self, market: str, timeout: Optional[float]) -> None:
subscription = {'channel': 'orderbook', 'market': market}
if subscription not in self._subscriptions:
self._subscribe(subscription)
self._orderbook_update_events[market].wait(timeout)

def get_ticker(self, market: str) -> Dict:
subscription = {'channel': 'ticker', 'market': market}
if subscription not in self._subscriptions:
self._subscribe(subscription)
return self._tickers[market]

def _handle_orderbook_message(self, message: Dict) -> None:
market = message['market']
subscription = {'channel': 'orderbook', 'market': market}
if subscription not in self._subscriptions:
return
data = message['data']
if data['action'] == 'partial':
self._reset_orderbook(market)
for side in {'bids', 'asks'}:
book = self._orderbooks[market][side]
for price, size in data[side]:
if size:
book[price] = size
else:
del book[price]
self._orderbook_timestamps[market] = data['time']
checksum = data['checksum']
orderbook = self.get_orderbook(market)
checksum_data = [
':'.join([f'{float(order[0])}:{float(order[1])}' for order in (bid, offer) if order])
for (bid, offer) in zip_longest(orderbook['bids'][:100], orderbook['asks'][:100])
]

computed_result = int(zlib.crc32(':'.join(checksum_data).encode()))
if computed_result != checksum:
self._last_received_orderbook_data_at = 0
self._reset_orderbook(market)
self._unsubscribe({'market': market, 'channel': 'orderbook'})
self._subscribe({'market': market, 'channel': 'orderbook'})
else:
self._orderbook_update_events[market].set()
self._orderbook_update_events[market].clear()

def _handle_trades_message(self, message: Dict) -> None:
self._trades[message['market']].append(message['data'])

def _handle_ticker_message(self, message: Dict) -> None:
self._tickers[message['market']] = message['data']

def _handle_fills_message(self, message: Dict) -> None:
self._fills.append(message['data'])

def _handle_orders_message(self, message: Dict) -> None:
data = message['data']
self._orders.update({data['id']: data})

def _on_message(self, ws, raw_message: str) -> None:
message = json.loads(raw_message)
message_type = message['type']
if message_type in {'subscribed', 'unsubscribed'}:
return
elif message_type == 'info':
if message['code'] == 20001:
return self.reconnect()
elif message_type == 'error':
raise Exception(message)
channel = message['channel']

if channel == 'orderbook':
self._handle_orderbook_message(message)
elif channel == 'trades':
self._handle_trades_message(message)
elif channel == 'ticker':
self._handle_ticker_message(message)
elif channel == 'fills':
self._handle_fills_message(message)
elif channel == 'orders':
self._handle_orders_message(message)
91 changes: 91 additions & 0 deletions orderbook-delta-visualizer/ftx_websocket_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import json
import time
import websocket
from threading import Thread, Lock


class WebsocketManager:
_CONNECT_TIMEOUT_S = 5

def __init__(self):
self.connect_lock = Lock()
self.ws = None

def _get_url(self):
raise NotImplementedError()

def _on_message(self, ws, message):
raise NotImplementedError()

def send(self, message):
self.connect()
self.ws.send(message)

def send_json(self, message):
self.send(json.dumps(message))

def _connect(self):
assert not self.ws, "ws should be closed before attempting to connect"

self.ws = websocket.WebSocketApp(
self._get_url(),
on_message=self._wrap_callback(self._on_message),
on_close=self._wrap_callback(self._on_close),
on_error=self._wrap_callback(self._on_error),
)

wst = Thread(target=self._run_websocket, args=(self.ws,))
wst.daemon = True
wst.start()

# Wait for socket to connect
ts = time.time()
while self.ws and (not self.ws.sock or not self.ws.sock.connected):
if time.time() - ts > self._CONNECT_TIMEOUT_S:
self.ws = None
return
time.sleep(0.1)

def _wrap_callback(self, f):
def wrapped_f(ws, *args, **kwargs):
if ws is self.ws:
try:
f(ws, *args, **kwargs)
except Exception as e:
raise Exception(f'Error running websocket callback: {e}')

return wrapped_f

def _run_websocket(self, ws):
try:
ws.run_forever()
except Exception as e:
raise Exception(f'Unexpected error while running websocket: {e}')
finally:
self._reconnect(ws)

def _reconnect(self, ws):
assert ws is not None, '_reconnect should only be called with an existing ws'
if ws is self.ws:
self.ws = None
ws.close()
self.connect()

def connect(self):
if self.ws:
return
with self.connect_lock:
while not self.ws:
self._connect()
if self.ws:
return

def _on_close(self, ws):
self._reconnect(ws)

def _on_error(self, ws, _):
self._reconnect(ws)

def reconnect(self) -> None:
if self.ws is not None:
self._reconnect(self.ws)
Loading

0 comments on commit 1bb8614

Please sign in to comment.