From 6d8c2e0af16b1eae70e33d04aa4ee4a4ea331613 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 19 Oct 2017 08:42:31 +0700 Subject: [PATCH] PaperBroker: more robust QTIS requests --- src/Broker/PaperBroker.hs | 44 ++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 2431d83..83b95e6 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -21,7 +21,7 @@ import Data.Time.Clock import Data.Maybe import Control.Monad import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan) +import Control.Concurrent hiding (readChan, writeChan) import System.Log.Logger import ATrade.Quotes.QTIS import System.ZMQ4 @@ -36,6 +36,7 @@ data QTISResult = Fetching | Done TickerInfo data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, + qtisTid :: Maybe ThreadId, tickMap :: M.Map TickMapKey Tick, tickerInfoMap :: M.Map TickerId QTISResult, orders :: M.Map OrderId Order, @@ -61,6 +62,7 @@ mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> I mkPaperBroker ctx qtisEp tickChan startCash accounts = do state <- newIORef PaperBrokerState { pbTid = Nothing, + qtisTid = Nothing, tickMap = M.empty, tickerInfoMap = M.empty, orders = M.empty, @@ -77,26 +79,54 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts = do postMarketCloseTime = hourMin 15 50 } - tid <- forkIO $ brokerThread ctx qtisEp tickChan state + qtisRequestChan <- newBoundedChan 10000 + + tid <- forkIO $ brokerThread qtisRequestChan tickChan state atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) + qtid <- forkIO $ qtisThread state qtisRequestChan ctx qtisEp + atomicModifyIORef' state (\s -> (s { qtisTid = Just qtid }, ())) + return BrokerInterface { accounts = accounts, setNotificationCallback = pbSetNotificationCallback state, submitOrder = pbSubmitOrder state, cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } + +qtisThread :: IORef PaperBrokerState -> BoundedChan TickerId -> Context -> T.Text -> IO () +qtisThread state qtisRequestChan ctx qtisEndpoint = + forever $ do + threadDelay 1000000 + tickerIds <- readListFromChan qtisRequestChan + ti <- qtisGetTickersInfo ctx qtisEndpoint tickerIds + forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (tiTicker newInfo) (Done newInfo) $! tickerInfoMap s }, ()))) + where + readListFromChan chan = do + mh <- tryReadChan chan + case mh of + Just h -> do + t <- readListFromChan' [h] chan + return $ reverse t + _ -> do + h <- readChan chan + t <- readListFromChan' [h] chan + return $ reverse t -brokerThread :: Context -> T.Text -> BoundedChan Tick -> IORef PaperBrokerState -> IO () -brokerThread ctx qtisEp chan state = forever $ do + readListFromChan' h chan = do + mv <- tryReadChan chan + case mv of + Nothing -> return h + Just v -> readListFromChan' (v:h) chan + +brokerThread :: BoundedChan TickerId -> BoundedChan Tick -> IORef PaperBrokerState -> IO () +brokerThread qtisRequestChan chan state = forever $ do tick <- readChan chan when (datatype tick == LastTradePrice) $ do info <- M.lookup (security tick) . tickerInfoMap <$> readIORef state when (isNothing info) $ do atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) Fetching $! tickerInfoMap s }, ())) - void $ forkIO $ do - ti <- qtisGetTickersInfo ctx qtisEp [security tick] - forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) (Done newInfo) $! tickerInfoMap s }, ()))) + writeChan qtisRequestChan (security tick) atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ())) executePendingOrders tick state