Source code for aio_kraken_ws.kraken_ws

""" Module with the interface to interact with Kraken websocket API.
https://www.kraken.com/features/websocket-api
"""
import logging

from aio_kraken_ws.stream import Stream


logger = logging.getLogger(__name__)


def _cast_datasets(datasets):
    """ Ensure cast of datasets."""
    return [(str(pair), int(interval)) for pair, interval in datasets]


[docs]class KrakenWs: """ Interface to handle Kraken ws. :: kraken = await KrakenWs.create(callback) # or # kraken = KrakenWs(callback) # await kraken.start() await kraken.subscribe([("XBT/EUR", 1)]) # start subscription for interval 1 minute # → await callback("XBT/EUR", # pair # 1, # interval # 1568315100, # open timestamp # 8000.0, # open price # 8001.0, # high price # 7999.0, # low price # 8000.0, # close price # 15) # volume await kraken.subscribe([("ETH/EUR", 1)]) # new subscription, doesn't stop the previous one await kraken.unsubscribe([("XBT/EUR", 1), ("ETH/EUR", 1)]) # end of subscriptions await kraken.close() # shutdown → close ws """ def __init__(self, callback, url="wss://ws.kraken.com", reset_period=300): """ :param callback: coroutine call on new candle. Arguments are coroutine(pair: str, o: float, h: float, l: float, c: float, v: float, interval: int) :param str url: Kraken ws endpoint :param int reset_period: time in seconds. The ws connection is reset periodically. """ logger.debug(f"connect to {url}") self.callback = callback self.url = url self.reset_period = reset_period self._stream = None self._pending_subscribe = [] self._pending_unsubscribe = []
[docs] @classmethod async def create(cls, callback, url="wss://ws.kraken.com", reset_period=300): """ Create and returns a running instance of KrakenWs. :param callback: coroutine call on new candle. Arguments are coroutine(pair: str, o: float, h: float, l: float, c: float, v: float, interval: int) :param str url: Kraken ws endpoint :param int reset_period: time in seconds. The ws connection is reset periodically. :returns: KrakenWs instance """ kraken = cls(callback, url, reset_period) await kraken.start() logger.debug("KrakenWs instance created and started") return kraken
[docs] async def start(self): """ Start the stream. """ logger.debug("starting...") self._stream = Stream(self.url, self.callback, self.reset_period) for datasets in self._pending_subscribe: self._stream.subscribe(datasets) self._pending_subscribe = [] for datasets in self._pending_unsubscribe: self._stream.unsubscribe(datasets) self._pending_unsubscribe = [] self._stream.start()
@property def is_connected(self) -> bool: """ Returns true if at least on connection is currently ready to receive data. """ return self._stream.is_connected if self._stream else False
[docs] def subscribe(self, datasets): """ Stream datasets. :param list((str,int)) datasets: list of (pair name, interval), interval is a time period in minute """ cast = _cast_datasets(datasets) logger.debug(f"subscribe to {cast}") if self._stream is not None: self._stream.subscribe(cast) else: self._pending_subscribe.append(cast)
[docs] def unsubscribe(self, datasets): """ Stop streams. :param list((str,int)) datasets: list of (pair name, interval), interval is a time period in minute """ cast = _cast_datasets(datasets) logger.debug(f"unsubscribe to {cast}") if self._stream is not None: self._stream.unsubscribe(cast) else: self._pending_unsubscribe.append(cast)
@property def datasets(self): """ Datasets currently streamed. :returns: List of pair name, interval :rtype: list((str,int)) """ if self._stream is None: return [] return list(self._stream.datasets)
[docs] async def close(self): """ Stop all streams. """ logger.info("Shutdown kraken ws.") if self._stream is not None: await self._stream.close()