diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs index 779a6f4..d11c61a 100644 --- a/src/ATrade/Driver/Junction/QuoteThread.hs +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -4,6 +4,7 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeSynonymInstances #-} @@ -23,7 +24,9 @@ module ATrade.Driver.Junction.QuoteThread import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), SubscriptionId (SubscriptionId)) -import ATrade.Logging (Message) +import ATrade.Logging (Message, logDebug, + logInfo, + logWarning) import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), @@ -110,28 +113,32 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) return $ QuoteThreadHandle tid downloaderTid env where - downloaderThread env chan = forever $ do - QuoteSubscription tickerid tf <- liftIO $ readChan chan - paramsMap <- liftIO $ readIORef $ paramsCache env - mbParams <- case M.lookup tickerid paramsMap of - Nothing -> do - paramsList <- getInstrumentParameters [tickerid] - case paramsList of - (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) - _ -> return Nothing - Just params -> return $ Just params - barsMap <- liftIO $ readIORef (bars env) - case M.lookup (BarSeriesId tickerid tf) barsMap of - Just _ -> return () -- already downloaded - Nothing -> case mbParams of - Just params -> do - now <- liftIO getCurrentTime - -- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. - -- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. - barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) - let barSeries = BarSeries tickerid tf barsData params - liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) - _ -> return () -- TODO log + downloaderThread env chan = do + logInfo "QuoteThread" "Started" + forever $ do + QuoteSubscription tickerid tf <- liftIO $ readChan chan + logInfo "QuoteThread" $ "Subscription: " <> tickerid + paramsMap <- liftIO $ readIORef $ paramsCache env + mbParams <- case M.lookup tickerid paramsMap of + Nothing -> do + paramsList <- getInstrumentParameters [tickerid] + case paramsList of + (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) + _ -> return Nothing + Just params -> return $ Just params + logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) + barsMap <- liftIO $ readIORef (bars env) + case M.lookup (BarSeriesId tickerid tf) barsMap of + Just _ -> return () -- already downloaded + Nothing -> case mbParams of + Just params -> do + now <- liftIO getCurrentTime + -- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. + -- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. + barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) + let barSeries = BarSeries tickerid tf barsData params + liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) + _ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) quoteThread env chan = flip runReaderT env $ forever $ do