diff --git a/alpaca_backtrader_api/alpacadata.py b/alpaca_backtrader_api/alpacadata.py index ea345342..192e5845 100644 --- a/alpaca_backtrader_api/alpacadata.py +++ b/alpaca_backtrader_api/alpacadata.py @@ -2,6 +2,7 @@ unicode_literals) from datetime import timedelta +import logging import pandas as pd from backtrader.feed import DataBase from backtrader import date2num, num2date @@ -159,6 +160,7 @@ def __init__(self, **kwargs): self._candleFormat = 'bidask' if self.p.bidask else 'midpoint' self._timeframe = self.p.timeframe self.do_qcheck(True, 0) + self.logger = logging.getLogger(self.__class__.__name__) if self._timeframe not in [bt.TimeFrame.Ticks, bt.TimeFrame.Minutes, bt.TimeFrame.Days]: @@ -179,6 +181,7 @@ def start(self): contractdetails if it exists """ super(AlpacaData, self).start() + self.logger.info("Starting data feed: %s" % self.p.dataname) # Create attributes as soon as possible self._statelivereconn = False # if reconnecting in live state @@ -268,9 +271,11 @@ def _load(self): if self._state == self._ST_LIVE: try: msg = (self._storedmsg.pop(None, None) or - self.qlive.get(timeout=self._qcheck)) + self.qlive.get(timeout=self.p.qcheck)) except queue.Empty: return None # indicate timeout situation + + self.logger.debug("Got msg: %s" % msg) if msg is None: # Conn broken during historical/backfilling self.put_notification(self.CONNBROKEN) # Try to reconnect diff --git a/alpaca_backtrader_api/alpacastore.py b/alpaca_backtrader_api/alpacastore.py index b9f0ffca..7f81955d 100644 --- a/alpaca_backtrader_api/alpacastore.py +++ b/alpaca_backtrader_api/alpacastore.py @@ -1,5 +1,6 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import logging import os import collections import time @@ -125,6 +126,7 @@ def __init__( except RuntimeError: asyncio.set_event_loop(asyncio.new_event_loop()) + self.logger = logging.getLogger(self.__class__.__name__) self.conn = Stream(api_key, api_secret, base_url, @@ -135,6 +137,7 @@ def __init__( self.q = q def run(self): + self.logger.info("Starting streamer for: %s %s" % (self.instrument, self.method.name)) if self.method == StreamingMethod.AccountUpdate: self.conn.subscribe_trade_updates(self.on_trade) elif self.method == StreamingMethod.MinuteAgg: @@ -152,16 +155,20 @@ async def on_listen(self, conn, stream, msg): async def on_quotes(self, msg): msg._raw['time'] = msg.timestamp + self.logger.debug("Got: %s" % msg) self.q.put(msg._raw) async def on_agg_min(self, msg): msg._raw['time'] = msg.timestamp + self.logger.debug("Got: %s" % msg) self.q.put(msg._raw) async def on_account(self, msg): + self.logger.debug("Got: %s" % msg) self.q.put(msg) async def on_trade(self, msg): + self.logger.debug("Got: %s" % msg) self.q.put(msg) @@ -193,6 +200,9 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)): - ``account_tmout`` (default: ``10.0``): refresh period for account value/cash refresh + + - ``order_tmout`` (default: ``0.05``): how often the order creation queue + is checked within _t_create_order ''' BrokerCls = None # broker class will autoregister @@ -203,6 +213,7 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)): ('secret_key', ''), ('paper', False), ('account_tmout', 10.0), # account balance refresh timeout + ('order_tmout', 0.05), ('api_version', None) ) @@ -224,6 +235,7 @@ def getbroker(cls, *args, **kwargs): def __init__(self): super(AlpacaStore, self).__init__() + self.logger = logging.getLogger(self.__class__.__name__) self.notifs = collections.deque() # store notifications for cerebro @@ -492,6 +504,7 @@ def get_aggs_from_alpaca(self, but we need to manipulate it to be able to work with it smoothly """ + self.logger.debug(f"Getting aggs for: {dataname} from: {start} to {end} by {compression} {granularity}") def _granularity_to_timeframe(granularity): if granularity in [Granularity.Minute, Granularity.Ticks]: @@ -554,6 +567,8 @@ def _drop_early_samples(df): return df[i:] def _resample(df): + if df.empty: + return df """ samples returned with certain window size (1 day, 1 minute) user may want to work with different window size (5min) @@ -581,10 +596,11 @@ def _resample(df): timeframe = _granularity_to_timeframe(granularity) start = end - timedelta(days=1) response = self.oapi.get_bars(dataname, - timeframe, start, end)._raw + timeframe, start, end).df else: response = _iterate_api_calls() cdl = response + self.logger.debug(f"Got: {response}") if granularity == Granularity.Minute: cdl = _clear_out_of_market_hours(cdl) cdl = _drop_early_samples(cdl) @@ -753,9 +769,10 @@ def _check_if_transaction_occurred(order_id): while True: try: - if self.q_ordercreate.empty(): + try: + msg = self.q_ordercreate.get(timeout=self.p.order_tmout) + except queue.Empty: continue - msg = self.q_ordercreate.get() if msg is None: continue oref, okwargs = msg