#!/usr/bin/env python
# coding=utf-8
from operator import itemgetter
import time
from .websockets import BinanceSocketManager
[ドキュメント]class DepthCache(object):
[ドキュメント] def __init__(self, symbol):
"""Intialise the DepthCache
:param symbol: Symbol to create depth cache for
:type symbol: string
"""
self.symbol = symbol
self._bids = {}
self._asks = {}
[ドキュメント] def add_bid(self, bid):
"""Add a bid to the cache
:param bid:
:return:
"""
self._bids[bid[0]] = float(bid[1])
if bid[1] == "0.00000000":
del self._bids[bid[0]]
[ドキュメント] def add_ask(self, ask):
"""Add an ask to the cache
:param ask:
:return:
"""
self._asks[ask[0]] = float(ask[1])
if ask[1] == "0.00000000":
del self._asks[ask[0]]
[ドキュメント] def get_bids(self):
"""Get the current bids
:return: list of bids with price and quantity as floats
.. code-block:: python
[
[
0.0001946, # Price
45.0 # Quantity
],
[
0.00019459,
2384.0
],
[
0.00019158,
5219.0
],
[
0.00019157,
1180.0
],
[
0.00019082,
287.0
]
]
"""
return DepthCache.sort_depth(self._bids, reverse=True)
[ドキュメント] def get_asks(self):
"""Get the current asks
:return: list of asks with price and quantity as floats
.. code-block:: python
[
[
0.0001955, # Price
57.0' # Quantity
],
[
0.00019699,
778.0
],
[
0.000197,
64.0
],
[
0.00019709,
1130.0
],
[
0.0001971,
385.0
]
]
"""
return DepthCache.sort_depth(self._asks, reverse=False)
[ドキュメント] @staticmethod
def sort_depth(vals, reverse=False):
"""Sort bids or asks by price
"""
lst = [[float(price), quantity] for price, quantity in vals.items()]
lst = sorted(lst, key=itemgetter(0), reverse=reverse)
return lst
[ドキュメント]class DepthCacheManager(object):
_default_refresh = 60 * 30 # 30 minutes
[ドキュメント] def __init__(self, client, symbol, callback=None, refresh_interval=_default_refresh):
"""Initialise the DepthCacheManager
:param client: Binance API client
:type client: binance.Client
:param symbol: Symbol to create depth cache for
:type symbol: string
:param callback: Optional function to receive depth cache updates
:type callback: function
:param refresh_interval: Optional number of seconds between cache refresh, use 0 or None to disable
:type refresh_interval: int
"""
self._client = client
self._symbol = symbol
self._callback = callback
self._last_update_id = None
self._depth_message_buffer = []
self._bm = None
self._depth_cache = DepthCache(self._symbol)
self._refresh_interval = refresh_interval
self._start_socket()
self._init_cache()
def _init_cache(self):
"""Initialise the depth cache calling REST endpoint
:return:
"""
self._last_update_id = None
self._depth_message_buffer = []
res = self._client.get_order_book(symbol=self._symbol, limit=500)
# process bid and asks from the order book
for bid in res['bids']:
self._depth_cache.add_bid(bid)
for ask in res['asks']:
self._depth_cache.add_ask(ask)
# set first update id
self._last_update_id = res['lastUpdateId']
# set a time to refresh the depth cache
if self._refresh_interval:
self._refresh_time = int(time.time()) + self._refresh_interval
# Apply any updates from the websocket
for msg in self._depth_message_buffer:
self._process_depth_message(msg, buffer=True)
# clear the depth buffer
del self._depth_message_buffer
def _start_socket(self):
"""Start the depth cache socket
:return:
"""
self._bm = BinanceSocketManager(self._client)
self._bm.start_depth_socket(self._symbol, self._depth_event)
self._bm.start()
# wait for some socket responses
while not len(self._depth_message_buffer):
time.sleep(1)
def _depth_event(self, msg):
"""Handle a depth event
:param msg:
:return:
"""
if 'e' in msg and msg['e'] == 'error':
# close the socket
self.close()
# notify the user by returning a None value
if self._callback:
self._callback(None)
if self._last_update_id is None:
# Initial depth snapshot fetch not yet performed, buffer messages
self._depth_message_buffer.append(msg)
else:
self._process_depth_message(msg)
def _process_depth_message(self, msg, buffer=False):
"""Process a depth event message.
:param msg: Depth event message.
:return:
"""
if buffer and msg['u'] <= self._last_update_id:
# ignore any updates before the initial update id
return
elif msg['U'] != self._last_update_id + 1:
# if not buffered check we get sequential updates
# otherwise init cache again
self._init_cache()
# add any bid or ask values
for bid in msg['b']:
self._depth_cache.add_bid(bid)
for ask in msg['a']:
self._depth_cache.add_ask(ask)
# call the callback with the updated depth cache
if self._callback:
self._callback(self._depth_cache)
self._last_update_id = msg['u']
# after processing event see if we need to refresh the depth cache
if self._refresh_interval and int(time.time()) > self._refresh_time:
self._init_cache()
[ドキュメント] def get_depth_cache(self):
"""Get the current depth cache
:return: DepthCache object
"""
return self._depth_cache
[ドキュメント] def close(self):
"""Close the open socket for this manager
:return:
"""
self._bm.close()
self._depth_cache = None