Source code for btgsolutions_otcmarkets.websocket.market_data


from typing import Optional, List
from ..rest import Authenticator
from ..config import BASE_WS_URL, MAX_WS_RECONNECT_RETRIES
from .websocket_default_functions import _on_open, _on_message, _on_error, _on_close
import websocket 
import json
import ssl
import threading

def _default_callback_market_status(msg):
    message = msg["message"]
    status = message["status"]
    market_type = message["market"]
    print(f'Market is {status} for {market_type} orders')

def _default_callback_top_of_book_update(msg):
    tob = msg["message"]
    
    headers = ["Symbol", "Qty. Bid", "Vol. Bid", "Bid", "Offer", "Vol. Offer", "Qty. Offer", "Datetime"]
    data = [
        tob['symbol'],
        tob['size_buy'] if tob.get('size_buy') else 'N/A',
        round(tob['pu_buy'] * tob['size_buy'], 1) if tob.get('pu_buy') and tob.get('size_buy') else 'N/A',
        tob['rate_buy'] if tob.get('rate_buy') else 'N/A',
        tob['rate_sell'] if tob.get('rate_sell') else 'N/A',
        round(tob['pu_sell'] * tob['size_sell'], 1) if tob.get('pu_sell') and tob.get('size_sell') else 'N/A',
        tob['size_sell'] if tob.get('size_sell') else 'N/A',
        tob['timestamp'] if tob.get('timestamp') else 'N/A',
    ]

    col_widths = [max(len(header), len(str(value))) for header, value in zip(headers, data)]
    separator = "+" + "+".join('-' * (w + 2) for w in col_widths) + "+"

    print('Top Of Book update:')
    print(separator)
    print("| " + " | ".join(f"{header:{w}}" for header, w in zip(headers, col_widths)) + " |")
    print(separator)
    print("| " + " | ".join(f"{value:{w}}" for value, w in zip(data, col_widths)) + " |")
    print(separator)


[docs] class MarketDataStream: """ This class connects with OTC Markets Market Data WebSocket, providing an easy way to access our real time market data feed. * Main use case: >>> from btgsolutions_otcmarkets import MarketDataStream >>> ws = MarketDataStream( >>> api_key='YOUR_API_KEY', >>> ) >>> ws.run() >>> ws.subscribe(['ZTEST01','ZTEST02']) >>> ws.unsubscribe(['ZTEST01']) >>> ws.close() Parameters ---------------- api_key: str User identification key. Field is required. ssl: bool Enable or disable ssl configuration. Field is not required. Default: True (enable). """ def __init__( self, api_key:str, ssl:Optional[bool] = True, **kwargs, ): self.api_key = api_key self.ssl = ssl self.__authenticator = Authenticator(self.api_key) self.__nro_reconnect_retries = 0 self.url = BASE_WS_URL + "/stream/v1/btg-otc-mkts/mktdata" self.websocket_cfg = kwargs
[docs] def run( self, on_tob_update=None, on_market_status=None, on_open=None, on_error=None, on_close=None, reconnect=True, ): """ Initializes a connection to websocket and starts to receive high frequency news. Parameters ---------- on_tob_update: function - Called every time it receives a top of book update message. - Arguments: 1. Top Of Book update message. - Field is not required. - Default: print message. on_market_status: function - Called every time it receives a market status message. - Arguments: 1. Market status message. - Field is not required. - Default: print message. on_open: function - Called at opening connection to websocket. - Field is not required. - Default: prints 'open connection', in case of success. on_error: function - Called when a error occurs. - Arguments: 1. Exception object. - Field is not required. - Default: prints error. on_close: function - Called when connection is closed. - Arguments: 1. close_status_code. 2. close_msg. - Field is not required. - Default: prints 'closed connection'. reconnect: bool Try reconnect if connection is closed. Field is not required. Default: True. """ if on_tob_update is None: on_tob_update = _default_callback_top_of_book_update if on_market_status is None: on_market_status = _default_callback_market_status if on_open is None: on_open = _on_open if on_error is None: on_error = _on_error if on_close is None: on_close = _on_close def intermediary_on_open(ws): on_open() self.__nro_reconnect_retries = 0 def intermediary_on_message(ws, data): msg = json.loads(data) if msg["feed"] == "market-status": on_market_status(msg) elif msg["feed"] == "book": on_tob_update(msg) else: print(msg) def intermediary_on_error(ws, error): on_error(error) def intermediary_on_close(ws, close_status_code, close_msg): on_close(close_status_code, close_msg) if reconnect: if self.__nro_reconnect_retries == MAX_WS_RECONNECT_RETRIES: print(f"### Fail retriyng reconnect") return self.__nro_reconnect_retries +=1 print(f"### Reconnecting.... Attempts: {self.__nro_reconnect_retries}/{MAX_WS_RECONNECT_RETRIES}") self.run(on_tob_update, on_market_status, on_open, on_error, on_close, reconnect) self.ws = websocket.WebSocketApp( url=self.url, on_open=intermediary_on_open, on_message=intermediary_on_message, on_error=intermediary_on_error, on_close=intermediary_on_close, header={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36", "Sec-WebSocket-Protocol": self.__authenticator.token, } ) ssl_conf = {} if self.ssl else {"sslopt": {"cert_reqs": ssl.CERT_NONE}} wst = threading.Thread(target=self.ws.run_forever, kwargs=ssl_conf) wst.daemon = True wst.start() while True: if self.ws.sock is not None and self.ws.sock.connected: break pass
def __send(self, data): """ Class method to be used internally. Sends data to websocket. """ if not isinstance(data, str): data = json.dumps(data) print(f'Sending data: {data}') return self.ws.send(data)
[docs] def close(self): """ Closes connection with websocket. """ self.ws.close()
[docs] def subscribe(self, tickers: List[str]): """ Subscribe to start receiving market data updates about the provided ticker symbols, such as trades and book updates. Parameters ---------- tickers: list[str] - List of tickers to subscribe. - Field is required. """ if not isinstance(tickers, list): raise Exception("'tickers' parameter must be a list of strings") self.__send({'action':'subscribe', 'feed': 'book', 'symbols': tickers})
[docs] def unsubscribe(self, tickers: List[str]): """ Unsubscribe to stop receiving market data updates about the provided ticker symbols, such as trades and book updates. Parameters ---------- tickers: list[str] - List of tickers to unsubscribe. - Field is required. """ if not isinstance(tickers, list): raise Exception("'tickers' parameter must be a list of strings") self.__send({'action':'unsubscribe', 'feed': 'book', 'symbols': tickers})