binance.websockets のソースコード

#!/usr/bin/env python
# coding=utf-8

import json
import threading

from autobahn.twisted.websocket import WebSocketClientFactory, \
    WebSocketClientProtocol, \
    connectWS
from twisted.internet import reactor, ssl
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.error import ReactorAlreadyRunning

from binance.client import Client


[ドキュメント]class BinanceClientProtocol(WebSocketClientProtocol):
[ドキュメント] def onConnect(self, response): # reset the delay after reconnecting self.factory.resetDelay()
[ドキュメント] def onMessage(self, payload, isBinary): if not isBinary: try: payload_obj = json.loads(payload.decode('utf8')) except ValueError: pass else: self.factory.callback(payload_obj)
[ドキュメント]class BinanceReconnectingClientFactory(ReconnectingClientFactory): # set initial delay to a short time initialDelay = 0.1 maxDelay = 10 maxRetries = 5
[ドキュメント]class BinanceClientFactory(WebSocketClientFactory, BinanceReconnectingClientFactory): protocol = BinanceClientProtocol _reconnect_error_payload = { 'e': 'error', 'm': 'Max reconnect retries reached' }
[ドキュメント] def clientConnectionFailed(self, connector, reason): self.retry(connector) if self.retries > self.maxRetries: self.callback(self._reconnect_error_payload)
[ドキュメント] def clientConnectionLost(self, connector, reason): self.retry(connector) if self.retries > self.maxRetries: self.callback(self._reconnect_error_payload)
[ドキュメント]class BinanceSocketManager(threading.Thread): STREAM_URL = 'wss://stream.binance.com:9443/' WEBSOCKET_DEPTH_5 = '5' WEBSOCKET_DEPTH_10 = '10' WEBSOCKET_DEPTH_20 = '20' _user_timeout = 30 * 60 # 30 minutes
[ドキュメント] def __init__(self, client): """Initialise the BinanceSocketManager :param client: Binance API client :type client: binance.Client """ threading.Thread.__init__(self) self._conns = {} self._user_timer = None self._user_listen_key = None self._user_callback = None self._client = client
def _start_socket(self, path, callback, prefix='ws/'): if path in self._conns: return False factory_url = self.STREAM_URL + prefix + path factory = BinanceClientFactory(factory_url) factory.protocol = BinanceClientProtocol factory.callback = callback factory.reconnect = True context_factory = ssl.ClientContextFactory() self._conns[path] = connectWS(factory, context_factory) return path
[ドキュメント] def start_depth_socket(self, symbol, callback, depth=None): """Start a websocket for symbol market depth returning either a diff or a partial book https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#partial-book-depth-streams :param symbol: required :type symbol: str :param callback: callback function to handle messages :type callback: function :param depth: optional Number of depth entries to return, default None. If passed returns a partial book instead of a diff :type depth: enum :returns: connection key string if successful, False otherwise Partial Message Format .. code-block:: python { "lastUpdateId": 160, # Last update ID "bids": [ # Bids to be updated [ "0.0024", # price level to be updated "10", # quantity [] # ignore ] ], "asks": [ # Asks to be updated [ "0.0026", # price level to be updated "100", # quantity [] # ignore ] ] } Diff Message Format .. code-block:: python { "e": "depthUpdate", # Event type "E": 123456789, # Event time "s": "BNBBTC", # Symbol "U": 157, # First update ID in event "u": 160, # Final update ID in event "b": [ # Bids to be updated [ "0.0024", # price level to be updated "10", # quantity [] # ignore ] ], "a": [ # Asks to be updated [ "0.0026", # price level to be updated "100", # quantity [] # ignore ] ] } """ socket_name = symbol.lower() + '@depth' if depth and depth != '1': socket_name = '{}{}'.format(socket_name, depth) return self._start_socket(socket_name, callback)
[ドキュメント] def start_kline_socket(self, symbol, callback, interval=Client.KLINE_INTERVAL_1MINUTE): """Start a websocket for symbol kline data https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#klinecandlestick-streams :param symbol: required :type symbol: str :param callback: callback function to handle messages :type callback: function :param interval: Kline interval, default KLINE_INTERVAL_1MINUTE :type interval: enum :returns: connection key string if successful, False otherwise Message Format .. code-block:: python { "e": "kline", # event type "E": 1499404907056, # event time "s": "ETHBTC", # symbol "k": { "t": 1499404860000, # start time of this bar "T": 1499404919999, # end time of this bar "s": "ETHBTC", # symbol "i": "1m", # interval "f": 77462, # first trade id "L": 77465, # last trade id "o": "0.10278577", # open "c": "0.10278645", # close "h": "0.10278712", # high "l": "0.10278518", # low "v": "17.47929838", # volume "n": 4, # number of trades "x": false, # whether this bar is final "q": "1.79662878", # quote volume "V": "2.34879839", # volume of active buy "Q": "0.24142166", # quote volume of active buy "B": "13279784.01349473" # can be ignored } } """ socket_name = '{}@kline_{}'.format(symbol.lower(), interval) return self._start_socket(socket_name, callback)
[ドキュメント] def start_trade_socket(self, symbol, callback): """Start a websocket for symbol trade data https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#trade-streams :param symbol: required :type symbol: str :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format .. code-block:: python { "e": "trade", # Event type "E": 123456789, # Event time "s": "BNBBTC", # Symbol "t": 12345, # Trade ID "p": "0.001", # Price "q": "100", # Quantity "b": 88, # Buyer order Id "a": 50, # Seller order Id "T": 123456785, # Trade time "m": true, # Is the buyer the market maker? "M": true # Ignore. } """ return self._start_socket(symbol.lower() + '@trade', callback)
[ドキュメント] def start_aggtrade_socket(self, symbol, callback): """Start a websocket for symbol trade data https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#aggregate-trade-streams :param symbol: required :type symbol: str :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format .. code-block:: python { "e": "aggTrade", # event type "E": 1499405254326, # event time "s": "ETHBTC", # symbol "a": 70232, # aggregated tradeid "p": "0.10281118", # price "q": "8.15632997", # quantity "f": 77489, # first breakdown trade id "l": 77489, # last breakdown trade id "T": 1499405254324, # trade time "m": false, # whether buyer is a maker "M": true # can be ignored } """ return self._start_socket(symbol.lower() + '@aggTrade', callback)
[ドキュメント] def start_symbol_ticker_socket(self, symbol, callback): """Start a websocket for a symbol's ticker data https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#individual-symbol-ticker-streams :param symbol: required :type symbol: str :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format .. code-block:: python { "e": "24hrTicker", # Event type "E": 123456789, # Event time "s": "BNBBTC", # Symbol "p": "0.0015", # Price change "P": "250.00", # Price change percent "w": "0.0018", # Weighted average price "x": "0.0009", # Previous day's close price "c": "0.0025", # Current day's close price "Q": "10", # Close trade's quantity "b": "0.0024", # Best bid price "B": "10", # Bid bid quantity "a": "0.0026", # Best ask price "A": "100", # Best ask quantity "o": "0.0010", # Open price "h": "0.0025", # High price "l": "0.0010", # Low price "v": "10000", # Total traded base asset volume "q": "18", # Total traded quote asset volume "O": 0, # Statistics open time "C": 86400000, # Statistics close time "F": 0, # First trade ID "L": 18150, # Last trade Id "n": 18151 # Total number of trades } """ return self._start_socket(symbol.lower() + '@ticker', callback)
[ドキュメント] def start_ticker_socket(self, callback): """Start a websocket for all ticker data By default all markets are included in an array. https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#all-market-tickers-stream :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format .. code-block:: python [ { 'F': 278610, 'o': '0.07393000', 's': 'BCCBTC', 'C': 1509622420916, 'b': '0.07800800', 'l': '0.07160300', 'h': '0.08199900', 'L': 287722, 'P': '6.694', 'Q': '0.10000000', 'q': '1202.67106335', 'p': '0.00494900', 'O': 1509536020916, 'a': '0.07887800', 'n': 9113, 'B': '1.00000000', 'c': '0.07887900', 'x': '0.07399600', 'w': '0.07639068', 'A': '2.41900000', 'v': '15743.68900000' } ] """ return self._start_socket('!ticker@arr', callback)
[ドキュメント] def start_multiplex_socket(self, streams, callback): """Start a multiplexed socket using a list of socket names. User stream sockets can not be included. Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>} https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md :param streams: list of stream names in lower case :type streams: list :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format - see Binance API docs for all types """ stream_path = 'streams={}'.format('/'.join(streams)) return self._start_socket(stream_path, callback, 'stream?')
[ドキュメント] def start_user_socket(self, callback): """Start a websocket for user data https://www.binance.com/restapipub.html#user-wss-endpoint :param callback: callback function to handle messages :type callback: function :returns: connection key string if successful, False otherwise Message Format - see Binance API docs for all types """ if self._user_listen_key: # cleanup any sockets with this key for conn_key in self._conns: if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key: self.stop_socket(conn_key) break self._user_listen_key = self._client.stream_get_listen_key() self._user_callback = callback conn_key = self._start_socket(self._user_listen_key, callback) if conn_key: # start timer to keep socket alive self._start_user_timer() return conn_key
def _start_user_timer(self): self._user_timer = threading.Timer(self._user_timeout, self._keepalive_user_socket) self._user_timer.setDaemon(True) self._user_timer.start() def _keepalive_user_socket(self): listen_key = self._client.stream_get_listen_key() # check if they key changed and if listen_key != self._user_listen_key: self.start_user_socket(self._user_callback) self._start_user_timer()
[ドキュメント] def stop_socket(self, conn_key): """Stop a websocket given the connection key :param conn_key: Socket connection key :type conn_key: string :returns: connection key string if successful, False otherwise """ if conn_key not in self._conns: return # disable reconnecting if we are closing self._conns[conn_key].factory = WebSocketClientFactory(self.STREAM_URL + 'tmp_path') self._conns[conn_key].disconnect() del(self._conns[conn_key]) # check if we have a user stream socket if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key: self._stop_user_socket()
def _stop_user_socket(self): if not self._user_listen_key: return # stop the timer self._user_timer.cancel() self._user_timer = None # close the stream self._client.stream_close(listenKey=self._user_listen_key) self._user_listen_key = None
[ドキュメント] def run(self): try: reactor.run(installSignalHandlers=False) except ReactorAlreadyRunning: # Ignore error about reactor already running pass
[ドキュメント] def close(self): """Close all connections """ keys = set(self._conns.keys()) for key in keys: self.stop_socket(key) self._conns = {}