|
|
|
@ -4,6 +4,7 @@ |
|
|
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
|
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
|
|
{-# LANGUAGE LambdaCase #-} |
|
|
|
{-# LANGUAGE LambdaCase #-} |
|
|
|
{-# LANGUAGE MultiParamTypeClasses #-} |
|
|
|
{-# LANGUAGE MultiParamTypeClasses #-} |
|
|
|
|
|
|
|
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
{-# LANGUAGE ScopedTypeVariables #-} |
|
|
|
{-# LANGUAGE ScopedTypeVariables #-} |
|
|
|
{-# LANGUAGE TypeSynonymInstances #-} |
|
|
|
{-# LANGUAGE TypeSynonymInstances #-} |
|
|
|
|
|
|
|
|
|
|
|
@ -23,7 +24,9 @@ module ATrade.Driver.Junction.QuoteThread |
|
|
|
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
|
|
|
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
|
|
|
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), |
|
|
|
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), |
|
|
|
SubscriptionId (SubscriptionId)) |
|
|
|
SubscriptionId (SubscriptionId)) |
|
|
|
import ATrade.Logging (Message) |
|
|
|
import ATrade.Logging (Message, logDebug, |
|
|
|
|
|
|
|
logInfo, |
|
|
|
|
|
|
|
logWarning) |
|
|
|
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
|
|
|
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
|
|
|
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
|
|
|
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
|
|
|
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
|
|
|
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
|
|
|
@ -110,28 +113,32 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do |
|
|
|
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
|
|
|
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
|
|
|
return $ QuoteThreadHandle tid downloaderTid env |
|
|
|
return $ QuoteThreadHandle tid downloaderTid env |
|
|
|
where |
|
|
|
where |
|
|
|
downloaderThread env chan = forever $ do |
|
|
|
downloaderThread env chan = do |
|
|
|
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
|
|
|
logInfo "QuoteThread" "Started" |
|
|
|
paramsMap <- liftIO $ readIORef $ paramsCache env |
|
|
|
forever $ do |
|
|
|
mbParams <- case M.lookup tickerid paramsMap of |
|
|
|
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
|
|
|
Nothing -> do |
|
|
|
logInfo "QuoteThread" $ "Subscription: " <> tickerid |
|
|
|
paramsList <- getInstrumentParameters [tickerid] |
|
|
|
paramsMap <- liftIO $ readIORef $ paramsCache env |
|
|
|
case paramsList of |
|
|
|
mbParams <- case M.lookup tickerid paramsMap of |
|
|
|
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
|
|
|
Nothing -> do |
|
|
|
_ -> return Nothing |
|
|
|
paramsList <- getInstrumentParameters [tickerid] |
|
|
|
Just params -> return $ Just params |
|
|
|
case paramsList of |
|
|
|
barsMap <- liftIO $ readIORef (bars env) |
|
|
|
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
|
|
|
case M.lookup (BarSeriesId tickerid tf) barsMap of |
|
|
|
_ -> return Nothing |
|
|
|
Just _ -> return () -- already downloaded |
|
|
|
Just params -> return $ Just params |
|
|
|
Nothing -> case mbParams of |
|
|
|
logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) |
|
|
|
Just params -> do |
|
|
|
barsMap <- liftIO $ readIORef (bars env) |
|
|
|
now <- liftIO getCurrentTime |
|
|
|
case M.lookup (BarSeriesId tickerid tf) barsMap of |
|
|
|
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. |
|
|
|
Just _ -> return () -- already downloaded |
|
|
|
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. |
|
|
|
Nothing -> case mbParams of |
|
|
|
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) |
|
|
|
Just params -> do |
|
|
|
let barSeries = BarSeries tickerid tf barsData params |
|
|
|
now <- liftIO getCurrentTime |
|
|
|
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
|
|
|
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. |
|
|
|
_ -> return () -- TODO log |
|
|
|
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. |
|
|
|
|
|
|
|
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) |
|
|
|
|
|
|
|
let barSeries = BarSeries tickerid tf barsData params |
|
|
|
|
|
|
|
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
|
|
|
|
|
|
|
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
quoteThread env chan = flip runReaderT env $ forever $ do |
|
|
|
quoteThread env chan = flip runReaderT env $ forever $ do |
|
|
|
|