Commit

optimizing and fixing
oliver-zehentleitner committed May 27, 2024
1 parent 42c25f5 commit 559792b
Showing 8 changed files with 100 additions and 68 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,12 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
[How to upgrade to the latest version!](https://unicorn-binance-local-depth-cache.docs.lucit.tech/readme.html#installation-and-upgrade)

+## 2.0.0.dev (development stage/unreleased/unstable)
+### Added
+- Clear handling of all UBWA stream signals.
+- More granular and efficient transfer of update values.
+### Fixed
+- Filtering and removing 0 values now works with all string formats (0.0, 0.000, 0.0000000, ...).
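For context, a minimal sketch (illustrative values, not library code) of why comparing against one fixed string misses other zero formats, while a float cast catches them all:

```python
# Quantity strings as Binance may deliver them in depth updates.
for qty in ("0.00000000", "0.000", "0.0", "0E-8"):
    old_check = qty == "0.00000000" or qty == "0.000"  # pre-2.0.0.dev logic
    new_check = float(qty) == 0.0                      # works for every format
    print(f"{qty!r}: old={old_check} new={new_check}")
```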


## 2.0.0
Scaling. The core functions have been rewritten in this update. Instead of one stream per depth_cache, we now use one
2 changes: 1 addition & 1 deletion README.md
@@ -179,7 +179,7 @@ If you like the project, please
[GitHub](https://github.com/LUCIT-Systems-and-Development/unicorn-binance-local-depth-cache)!

## Live Demo
-This live demo script runs DepthCaches from [binance.com-futues](https://www.binance.com) and runs on a *CCX13 * virtual
+This live demo script runs DepthCaches from [binance.com](https://www.binance.com) and runs on a *CCX13* virtual
machine of [HETZNER CLOUD](https://hetzner.cloud/?ref=rKgYRMq0l8fd).

[Open live monitor!](https://www.lucit.tech/unicorn-binance-local-depth-cache-live-demo.html)
2 changes: 2 additions & 0 deletions dev/reset.sh
@@ -2,6 +2,7 @@

rm *.log
rm *.py.log
+rm *.c
rm dev/*.log

rm build -r
@@ -11,6 +12,7 @@ rm stubs -r
rm out -r

rm unicorn_binance_local_depth_cache/*.c
+rm unicorn_binance_local_depth_cache/*.cpp
rm unicorn_binance_local_depth_cache/*.html
rm unicorn_binance_local_depth_cache/*.dll
rm unicorn_binance_local_depth_cache/*.so
2 changes: 1 addition & 1 deletion environment.yml
@@ -10,5 +10,5 @@ dependencies:
  - lucit::lucit-licensing-python>=1.8.2
  - lucit::unicorn-binance-rest-api>=2.6.1
  - lucit::unicorn-binance-websocket-api>=2.8.0
-  - cython
+  - cython>=3.0.10
  - requests>=2.31.0
4 changes: 2 additions & 2 deletions meta.yaml
@@ -30,14 +30,14 @@ requirements:
    - lucit::lucit-licensing-python >=1.8.2
    - lucit::unicorn-binance-rest-api >=2.6.1
    - lucit::unicorn-binance-websocket-api >=2.8.0
-    - cython
+    - cython >=3.0.10
    - requests >=2.31.0
  run:
    - python
    - lucit::lucit-licensing-python >=1.8.2
    - lucit::unicorn-binance-rest-api >=2.6.1
    - lucit::unicorn-binance-websocket-api >=2.8.0
-    - cython
+    - cython >=3.0.10
    - requests >=2.31.0

dependencies:
2 changes: 1 addition & 1 deletion requirements.txt
@@ -4,7 +4,7 @@
# ./pyproject.toml
# ./setup.py

-Cython
+Cython>=3.0.10
lucit-licensing-python>=1.8.2
requests>=2.31.0
unicorn-binance-rest-api>=2.6.1
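To check whether an installed environment already satisfies these minimum versions, here is a small standard-library sketch (package names as pinned above; the naive tuple comparison assumes plain numeric version strings without pre-release tags):

```python
from importlib.metadata import PackageNotFoundError, version

pins = {"Cython": "3.0.10", "lucit-licensing-python": "1.8.2",
        "requests": "2.31.0", "unicorn-binance-rest-api": "2.6.1"}
for name, minimum in pins.items():
    try:
        installed = version(name)
    except PackageNotFoundError:
        print(f"{name}: not installed")
        continue
    ok = tuple(map(int, installed.split("."))) >= tuple(map(int, minimum.split(".")))
    print(f"{name} {installed} (needs >={minimum}): {'OK' if ok else 'too old'}")
```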
39 changes: 23 additions & 16 deletions setup.py
@@ -33,22 +33,29 @@
]

# Setup
-print("Generating stub files ...")
-os.makedirs(stubs_dir, exist_ok=True)
+GEN_STUBS = True
 for filename in os.listdir(source_dir):
-    if filename.endswith('.py'):
-        source_path = os.path.join(source_dir, filename)
-        subprocess.run(['stubgen', '-o', stubs_dir, source_path], check=True)
-for stub_file in os.listdir(os.path.join(stubs_dir, source_dir)):
-    if stub_file.endswith('.pyi'):
-        source_stub_path = os.path.join(stubs_dir, source_dir, stub_file)
-        if os.path.exists(os.path.join(source_dir, stub_file)):
-            print(f"Skipped moving {source_stub_path} because {os.path.join(source_dir, stub_file)} already exists!")
-        else:
-            shutil.move(source_stub_path, source_dir)
-            print(f"Moved {source_stub_path} to {source_dir}!")
-shutil.rmtree(os.path.join(stubs_dir))
-print("Stub files generated and moved successfully.")
+    if filename.endswith('.pyi'):
+        GEN_STUBS = False
+if GEN_STUBS is False:
+    print("Skipping stub files ...")
+else:
+    print("Generating stub files ...")
+    os.makedirs(stubs_dir, exist_ok=True)
+    for filename in os.listdir(source_dir):
+        if filename.endswith('.py'):
+            source_path = os.path.join(source_dir, filename)
+            subprocess.run(['stubgen', '-o', stubs_dir, source_path], check=True)
+    for stub_file in os.listdir(os.path.join(stubs_dir, source_dir)):
+        if stub_file.endswith('.pyi'):
+            source_stub_path = os.path.join(stubs_dir, source_dir, stub_file)
+            if os.path.exists(os.path.join(source_dir, stub_file)):
+                print(f"Skipped moving {source_stub_path} because {os.path.join(source_dir, stub_file)} already exists!")
+            else:
+                shutil.move(source_stub_path, source_dir)
+                print(f"Moved {source_stub_path} to {source_dir}!")
+    shutil.rmtree(os.path.join(stubs_dir))
+    print("Stub files generated and moved successfully.")

with open("README.md", "r") as fh:
    print("Using README.md content as `long_description` ...")
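The new GEN_STUBS guard above skips stub generation whenever .pyi files are already shipped next to the sources, so building from an sdist no longer requires the stubgen tool from the mypy toolchain. The same check condensed with pathlib (a sketch; it assumes the package directory exists):

```python
from pathlib import Path

source_dir = Path("unicorn_binance_local_depth_cache")
# True as soon as one type stub is present next to the sources.
stubs_present = any(p.suffix == ".pyi" for p in source_dir.iterdir())
print("Skipping stub files ..." if stubs_present else "Generating stub files ...")
```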
@@ -65,7 +72,7 @@
    long_description=long_description,
    long_description_content_type="text/markdown",
    license='LSOSL - LUCIT Synergetic Open Source License',
-    install_requires=['lucit-licensing-python>=1.8.2', 'Cython', 'requests>=2.31.0',
+    install_requires=['lucit-licensing-python>=1.8.2', 'Cython>=3.0.10', 'requests>=2.31.0',
                       'unicorn-binance-websocket-api>=2.8.0', 'unicorn-binance-rest-api>=2.6.1'],
    keywords='binance, depth cache',
    project_urls={
111 changes: 64 additions & 47 deletions unicorn_binance_local_depth_cache/manager.py
@@ -124,7 +124,7 @@ def __init__(self, exchange: str = "binance.com",
        super().__init__()
        self.name = __app_name__
        self.version = __version__
-        logger.info(f"New instance of {self.get_user_agent()}-{'compiled' if cython.compiled else 'source'} on "
+        logger.info(f"New instance of {self.get_user_agent()}-{'compiled-nogil' if cython.compiled else 'source'} on "
                    f"{str(platform.system())} {str(platform.release())} for exchange {exchange} started ...")
        self.exchange = exchange
        self.depth_caches: dict = {}
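`cython.compiled`, used in the log line above, is False when the pure-Python sources are imported and True inside a Cython-compiled extension; that is how the startup banner distinguishes a "compiled-nogil" build from a "source" run. A minimal check (requires the cython package):

```python
import cython

# Prints "source" in a plain interpreter session; a Cython-compiled build
# of the same module would report "compiled-nogil".
print("compiled-nogil" if cython.compiled else "source")
```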
@@ -263,7 +263,7 @@ def _add_ask(self, ask: list = None, market: str = None) -> bool:
        market = market.lower()
        with self.threading_lock_ask[market]:
            self.depth_caches[market]['asks'][ask[0]] = float(ask[1])
-            if ask[1] == "0.00000000" or ask[1] == "0.000":
+            if float(ask[1]) == 0.0:
                logger.debug(f"BinanceLocalDepthCacheManager._add_ask() - Deleting depth position {ask[0]} on ask "
                             f"side for market '{market}'")
                del self.depth_caches[market]['asks'][ask[0]]
@@ -285,32 +285,34 @@ def _add_bid(self, bid: list = None, market: str = None) -> bool:
        market = market.lower()
        with self.threading_lock_bid[market]:
            self.depth_caches[market]['bids'][bid[0]] = float(bid[1])
-            if bid[1] == "0.00000000" or bid[1] == "0.000":
+            if float(bid[1]) == 0.0:
                logger.debug(f"BinanceLocalDepthCacheManager._add_bid() - Deleting depth position {bid[0]} on bid "
                             f"side for market '{market}'")
                del self.depth_caches[market]['bids'][bid[0]]
        return True

-    def _apply_updates(self, order_book: dict = None, market: str = None) -> bool:
+    def _apply_updates(self, asks: list = None, bids: list = None, market: str = None) -> bool:
        """
        Apply updates to a specific depth_cache
-        :param order_book: Provide order_book data from rest or ws
-        :type order_book: dict
+        :param asks: Provide asks data
+        :type asks: list
+        :param bids: Provide bids data
+        :type bids: list
        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: bool
        """
-        if order_book is None or market is None:
-            logger.debug(f"BinanceLocalDepthCacheManager._apply_updates() - Parameter `order_book` and `market` are "
+        if asks is None or bids is None or market is None:
+            logger.debug(f"BinanceLocalDepthCacheManager._apply_updates() - Parameter `asks`, `bids` and `market` are "
                         f"mandatory!")
            return False
        market = market.lower()
        logger.debug(f"BinanceLocalDepthCacheManager._apply_updates() - Applying updates to the depth_cache with "
                     f"market {market}")
-        for ask in order_book.get('a', []) + order_book.get('asks', []):
+        for ask in asks:
            self._add_ask(ask, market=market)
-        for bid in order_book.get('b', []) + order_book.get('bids', []):
+        for bid in bids:
            self._add_bid(bid, market=market)
        return True

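`_apply_updates()` now receives the two book sides as separate lists instead of a whole order_book dict, so the caller decides once whether the payload uses the websocket keys ('a'/'b') or the REST keys ('asks'/'bids'). A standalone mimic of the per-side update path, including the new zero-quantity filter (sample data, not the library class):

```python
def apply_updates(book: dict, asks: list, bids: list) -> None:
    # Mimics _apply_updates()/_add_ask()/_add_bid(): a zero quantity
    # in any string format deletes the price level from the cache.
    for side, updates in (("asks", asks), ("bids", bids)):
        for price, quantity in updates:
            book[side][price] = float(quantity)
            if float(quantity) == 0.0:
                del book[side][price]

book = {"asks": {}, "bids": {"24999.90": 1.0}}
ws_payload = {"a": [["25000.10", "0.500"]], "b": [["24999.90", "0.000"]]}
apply_updates(book, asks=ws_payload["a"], bids=ws_payload["b"])
print(book)  # {'asks': {'25000.10': 0.5}, 'bids': {}}
```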
@@ -424,11 +426,7 @@ def _init_depth_cache(self, market: str = None) -> bool:
            logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache(market={market}) - KeyError: {error_msg}")
            self.depth_caches[market]['refresh_request'] = True
            return False
-        self._apply_updates(order_book, market=market)
-        for bid in order_book['bids']:
-            self._add_bid(bid, market=market)
-        for ask in order_book['asks']:
-            self._add_ask(ask, market=market)
+        self._apply_updates(asks=order_book['asks'], bids=order_book['bids'], market=market)
        logger.debug(f"BinanceLocalDepthCacheManager._init_depth_cache(market={market}) - Finished initialization!")
        return True

@@ -546,7 +544,7 @@ async def _manage_depth_cache_async(self, stream_id=None) -> None:
                logger.debug(f"BinanceLocalDepthCacheManager._manage_depth_cache_async(stream_id={stream_id}) - "
                             f"Applying regular depth update to the depth_cache with market {market} - update_id: "
                             f"{stream_data['data']['U']} - {stream_data['data']['u']}")
-                self._apply_updates(stream_data['data'], market=market)
+                self._apply_updates(asks=stream_data['data']['a'], bids=stream_data['data']['b'], market=market)
                self.depth_caches[market]['last_update_id'] = int(stream_data['data']['u'])
                self.depth_caches[market]['last_update_time'] = int(time.time())
                self.ubwa.asyncio_queue_task_done(stream_id=stream_id)
@@ -572,9 +570,9 @@ async def _manage_depth_cache_async(self, stream_id=None) -> None:
                if int(stream_data['data']['U']) <= self.depth_caches[market]['last_update_id'] + 1 \
                        <= int(stream_data['data']['u']):
                    # The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
-                    self._apply_updates(stream_data['data'], market=market)
+                    self._apply_updates(asks=stream_data['data']['a'], bids=stream_data['data']['b'], market=market)
                    logger.info(f"BinanceLocalDepthCacheManager._manage_depth_cache_async(stream_id={stream_id}) -"
-                                f" Finished initialization of the cache with market {market}")
+                                f" Finished initialization of the cache with market {market} (Spot)")
                    # Init (refresh) finished
                    last_sync_time = time.time()
                    self.depth_caches[market]['last_update_id'] = int(stream_data['data']['u'])
@@ -594,9 +592,9 @@ async def _manage_depth_cache_async(self, stream_id=None) -> None:
                if int(stream_data['data']['U']) <= self.depth_caches[market]['last_update_id'] \
                        <= int(stream_data['data']['u']):
                    # The first processed event should have U <= lastUpdateId AND u >= lastUpdateId
-                    self._apply_updates(stream_data['data'], market=market)
+                    self._apply_updates(asks=stream_data['data']['a'], bids=stream_data['data']['b'], market=market)
                    logger.info(f"BinanceLocalDepthCacheManager._manage_depth_cache_async(stream_id={stream_id}) - "
-                                f"Finished initialization of the cache with market {market}")
+                                f"Finished initialization of the cache with market {market} (Futures)")
                    # Init (refresh) finished
                    last_sync_time = time.time()
                    self.depth_caches[market]['last_update_id'] = int(stream_data['data']['u'])
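The two hunks above encode Binance's documented synchronization rule: after fetching a snapshot with lastUpdateId, the first buffered diff event that may be applied must satisfy U <= lastUpdateId+1 <= u on spot, while the futures variant requires U <= lastUpdateId <= u. A worked example with hypothetical update ids:

```python
def spot_first_event_ok(U: int, u: int, last_update_id: int) -> bool:
    # Spot: the first processed event should have U <= lastUpdateId+1 <= u.
    return U <= last_update_id + 1 <= u

def futures_first_event_ok(U: int, u: int, last_update_id: int) -> bool:
    # Futures: the first processed event should have U <= lastUpdateId <= u.
    return U <= last_update_id <= u

last_update_id = 1000                                      # from the REST snapshot
print(spot_first_event_ok(998, 1002, last_update_id))      # True: event overlaps the snapshot
print(futures_first_event_ok(998, 1002, last_update_id))   # True
print(spot_first_event_ok(1005, 1010, last_update_id))     # False: gap -> refresh required
```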
@@ -621,10 +619,12 @@ def _process_stream_signals(self, signal_type=None, stream_id=None, data_record=
"""
        logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - received stream_signal: "
                     f"{signal_type} - {stream_id} - {data_record} - {error_msg}")
-        if self.is_stop_request() is True:
-            return None
-
-        if signal_type == "DISCONNECT":
+        if signal_type == "CONNECT":
+            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals(stream_id={stream_id}) - Received "
+                         f"stream_signal {signal_type} - Setting stream_status to `CONNECTED`")
+            self.stream_status = "CONNECTED"
+        elif signal_type == "DISCONNECT":
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals(stream_id={stream_id}) - Received "
                         f"stream_signal {signal_type} - Setting all caches to synchronized is False and triggering a "
                         f"refresh.")
@@ -641,9 +641,13 @@ def _process_stream_signals(self, signal_type=None, stream_id=None, data_record=
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals(stream_id={stream_id}) - Received "
                         f"stream_signal {signal_type} - Setting stream_status to `RUNNING`")
            self.stream_status = "RUNNING"
-        else:
+        elif signal_type == "STOP":
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals(stream_id={stream_id}) - Received "
-                         f"stream_signal {signal_type} - Setting stream_status to `{signal_type}`")
+                         f"stream_signal {signal_type} - Setting stream_status to `STOPPED`")
+            self.stream_status = "STOPPED"
+        else:
+            logger.error(f"BinanceLocalDepthCacheManager._process_stream_signals(stream_id={stream_id}) - Received "
+                         f"unexpected stream_signal {signal_type} - Setting stream_status to `{signal_type}`")
            self.stream_status = signal_type

    def _reset_depth_cache(self, market: str = None) -> bool:
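The handler is now an explicit chain over the stream signals UBWA emits instead of a catch-all else branch. Below is a standalone sketch of the same dispatch; the branch that sets RUNNING is not fully visible in this diff, so mapping it to UBWA's FIRST_RECEIVED_DATA signal is an assumption:

```python
def resolve_stream_status(signal_type: str) -> str:
    # Mirrors the new elif chain in _process_stream_signals().
    if signal_type == "CONNECT":
        return "CONNECTED"
    elif signal_type == "DISCONNECT":
        # The library also flags every cache as out of sync here and
        # triggers a refresh (see the hunk above).
        return "DISCONNECTED"  # assumed label; not shown in the excerpt
    elif signal_type == "FIRST_RECEIVED_DATA":  # assumption, see lead-in
        return "RUNNING"
    elif signal_type == "STOP":
        return "STOPPED"
    # Unexpected signals are logged as an error and stored verbatim.
    return signal_type

for signal in ("CONNECT", "FIRST_RECEIVED_DATA", "STOP", "SOMETHING_NEW"):
    print(signal, "->", resolve_stream_status(signal))
```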
@@ -670,7 +674,7 @@ def _sort_depth_cache(items: dict,
                          reverse: bool = False,
                          threshold_volume: float = None) -> list:
        """
-        Sort asks or bids by price
+        Returns asks or bids sorted by price
        :param items: asks or bids
        :type items: dict
@@ -682,7 +686,8 @@ def _sort_depth_cache(items: dict,
        :type threshold_volume: float
        :return: list
        """
-        logger.debug(f"BinanceLocalDepthCacheManager._sort_depth_cache() - Start sorting")
+        logger.debug(f"BinanceLocalDepthCacheManager._sort_depth_cache() - Start sorting "
+                     f"{'with nogil ...' if cython.compiled else '...'}")
        sorted_items = [[float(price), float(quantity)] for price, quantity in items.items()]
        sorted_items = sorted(sorted_items, key=itemgetter(0), reverse=reverse)
        if threshold_volume is None:
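For reference, the sorting step in plain Python: price/quantity strings become floats, levels are ordered by price (ascending for asks, reverse=True for bids), and the result can be trimmed. How threshold_volume trims is not visible in this hunk, so the cumulative-volume cutoff below is an assumption:

```python
from operator import itemgetter

def sort_depth(items: dict, reverse: bool = False,
               limit_count: int = None, threshold_volume: float = None) -> list:
    levels = sorted(([float(p), float(q)] for p, q in items.items()),
                    key=itemgetter(0), reverse=reverse)
    if threshold_volume is not None:
        kept, cumulated = [], 0.0
        for price, quantity in levels:  # assumed: stop once the volume is reached
            kept.append([price, quantity])
            cumulated += quantity
            if cumulated >= threshold_volume:
                break
        levels = kept
    return levels if limit_count is None else levels[:limit_count]

asks = {"25000.10": 0.5, "25001.00": 1.2, "24999.99": 0.1}
print(sort_depth(asks))                # [[24999.99, 0.1], [25000.1, 0.5], [25001.0, 1.2]]
print(sort_depth(asks, reverse=True))  # highest price first, as used for bids
```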
@@ -764,24 +769,11 @@ def get_asks(self,
        :type threshold_volume: float or None (0 is nothing, None is everything)
        :return: list
        """
-        if market is None:
-            raise DepthCacheNotFound(market=market)
-        market = market.lower()
-        try:
-            if self.is_depth_cache_synchronized(market=market) is False:
-                raise DepthCacheOutOfSync(market=market)
-        except KeyError:
-            raise DepthCacheNotFound(market=market)
-        try:
-            if self.is_stop_request(market=market) is True:
-                raise DepthCacheAlreadyStopped(market=market)
-        except KeyError:
-            raise DepthCacheNotFound(market=market)
-        with self.threading_lock_ask[market]:
-            return self._sort_depth_cache(items=self.depth_caches[market]['asks'],
-                                          limit_count=limit_count,
-                                          reverse=False,
-                                          threshold_volume=threshold_volume)
+        return self._get_book_side(market=market,
+                                   limit_count=limit_count,
+                                   reverse=False,
+                                   side="asks",
+                                   threshold_volume=threshold_volume)

def get_bids(self,
market: str = None,
@@ -798,6 +790,31 @@ def get_bids(self,
        :type threshold_volume: float or None (0 is nothing, None is everything)
        :return: list
        """
+        return self._get_book_side(market=market,
+                                   limit_count=limit_count,
+                                   reverse=True,
+                                   side="bids",
+                                   threshold_volume=threshold_volume)
+
+    def _get_book_side(self,
+                       market: str = None,
+                       limit_count: int = None,
+                       reverse: bool = False,
+                       side: str = "",
+                       threshold_volume: float = None) -> list:
+        """
+        Get the current list of asks or bids with price and quantity.
+        :param market: Specify the market symbol for the used depth_cache.
+        :type market: str
+        :param limit_count: List elements threshold to trim the result.
+        :type limit_count: int or None (0 is nothing, None is everything)
+        :param reverse: False is regular, True is reversed
+        :type reverse: bool
+        :param side: "asks" or "bids"
+        :type side: str
+        :param threshold_volume: Volume threshold to trim the result.
+        :type threshold_volume: float or None (0 is nothing, None is everything)
+        :return: list
+        """
+        if market is None:
+            raise DepthCacheNotFound(market=market)
+        market = market.lower()
@@ -812,9 +829,9 @@ def get_bids(self,
        except KeyError:
            raise DepthCacheNotFound(market=market)
        with self.threading_lock_bid[market]:
-            return self._sort_depth_cache(items=self.depth_caches[market]['bids'],
+            return self._sort_depth_cache(items=self.depth_caches[market][side],
                                          limit_count=limit_count,
-                                          reverse=True,
+                                          reverse=reverse,
                                          threshold_volume=threshold_volume)

    @staticmethod
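With the refactoring above, get_asks() and get_bids() are thin wrappers around the shared _get_book_side() helper. A hedged end-to-end sketch of the public API (method and exception names as used in the project docs; the market and limits are illustrative, and running it requires a valid LUCIT license):

```python
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager

ubldc = BinanceLocalDepthCacheManager(exchange="binance.com")
ubldc.create_depth_cache(markets="btcusdt")
try:
    asks = ubldc.get_asks(market="btcusdt", limit_count=5)  # lowest prices first
    bids = ubldc.get_bids(market="btcusdt", limit_count=5)  # highest prices first
    print(f"top asks: {asks}\ntop bids: {bids}")
except Exception as error:  # e.g. DepthCacheOutOfSync while the cache (re)syncs
    print(f"Cache not ready: {error}")
finally:
    ubldc.stop_manager()
```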
