Browse Source

PaperBroker: more robust QTIS requests

master
Denis Tereshkin 8 years ago
parent
commit
6d8c2e0af1
  1. 44
      src/Broker/PaperBroker.hs

44
src/Broker/PaperBroker.hs

@ -21,7 +21,7 @@ import Data.Time.Clock @@ -21,7 +21,7 @@ import Data.Time.Clock
import Data.Maybe
import Control.Monad
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan)
import Control.Concurrent hiding (readChan, writeChan)
import System.Log.Logger
import ATrade.Quotes.QTIS
import System.ZMQ4
@ -36,6 +36,7 @@ data QTISResult = Fetching | Done TickerInfo @@ -36,6 +36,7 @@ data QTISResult = Fetching | Done TickerInfo
data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId,
qtisTid :: Maybe ThreadId,
tickMap :: M.Map TickMapKey Tick,
tickerInfoMap :: M.Map TickerId QTISResult,
orders :: M.Map OrderId Order,
@ -61,6 +62,7 @@ mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> I @@ -61,6 +62,7 @@ mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> I
mkPaperBroker ctx qtisEp tickChan startCash accounts = do
state <- newIORef PaperBrokerState {
pbTid = Nothing,
qtisTid = Nothing,
tickMap = M.empty,
tickerInfoMap = M.empty,
orders = M.empty,
@ -77,26 +79,54 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts = do @@ -77,26 +79,54 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts = do
postMarketCloseTime = hourMin 15 50
}
tid <- forkIO $ brokerThread ctx qtisEp tickChan state
qtisRequestChan <- newBoundedChan 10000
tid <- forkIO $ brokerThread qtisRequestChan tickChan state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()))
qtid <- forkIO $ qtisThread state qtisRequestChan ctx qtisEp
atomicModifyIORef' state (\s -> (s { qtisTid = Just qtid }, ()))
return BrokerInterface {
accounts = accounts,
setNotificationCallback = pbSetNotificationCallback state,
submitOrder = pbSubmitOrder state,
cancelOrder = pbCancelOrder state,
stopBroker = pbDestroyBroker state }
qtisThread :: IORef PaperBrokerState -> BoundedChan TickerId -> Context -> T.Text -> IO ()
qtisThread state qtisRequestChan ctx qtisEndpoint =
forever $ do
threadDelay 1000000
tickerIds <- readListFromChan qtisRequestChan
ti <- qtisGetTickersInfo ctx qtisEndpoint tickerIds
forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (tiTicker newInfo) (Done newInfo) $! tickerInfoMap s }, ())))
where
readListFromChan chan = do
mh <- tryReadChan chan
case mh of
Just h -> do
t <- readListFromChan' [h] chan
return $ reverse t
_ -> do
h <- readChan chan
t <- readListFromChan' [h] chan
return $ reverse t
brokerThread :: Context -> T.Text -> BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread ctx qtisEp chan state = forever $ do
readListFromChan' h chan = do
mv <- tryReadChan chan
case mv of
Nothing -> return h
Just v -> readListFromChan' (v:h) chan
brokerThread :: BoundedChan TickerId -> BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread qtisRequestChan chan state = forever $ do
tick <- readChan chan
when (datatype tick == LastTradePrice) $ do
info <- M.lookup (security tick) . tickerInfoMap <$> readIORef state
when (isNothing info) $ do
atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) Fetching $! tickerInfoMap s }, ()))
void $ forkIO $ do
ti <- qtisGetTickersInfo ctx qtisEp [security tick]
forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) (Done newInfo) $! tickerInfoMap s }, ())))
writeChan qtisRequestChan (security tick)
atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()))
executePendingOrders tick state

Loading…
Cancel
Save