From a0f7db50603259bca8aaf63650117e64eae80030 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 24 Dec 2017 09:38:59 +0700 Subject: [PATCH] Refactoring --- app/Main.hs | 28 ++++++------ quik-connector.cabal | 5 ++- src/Broker/PaperBroker.hs | 82 ++++++++--------------------------- src/Broker/QuikBroker.hs | 42 +++++++++++------- src/TickTable.hs | 90 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 154 insertions(+), 93 deletions(-) create mode 100644 src/TickTable.hs diff --git a/app/Main.hs b/app/Main.hs index 39f3843..a4b96a7 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -36,17 +36,20 @@ import qualified Data.Text as T import Data.Maybe import Config +import TickTable (mkTickTable) -forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData) +forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan size sourceChan = do - sink <- newBoundedChan size + sink1 <- newBoundedChan size + sink2 <- newBoundedChan size sinkQss <- newBoundedChan size tid <- forkIO $ forever $ do v <- readChan sourceChan - writeChan sink v + writeChan sink1 v + writeChan sink2 v writeChan sinkQss (QSSTick v) - return (tid, sink, sinkQss) + return (tid, sink1, sink2, sinkQss) initLogging :: IO () @@ -70,19 +73,20 @@ main = do infoM "main" "Starting data import server" _ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" - (forkId, c1, c2) <- forkBoundedChan 10000 chan + (forkId, c0, c1, c2) <- forkBoundedChan 10000 chan - brokerQ <- mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (commissions config) withContext (\ctx -> do - brokerP <- mkPaperBroker ctx (T.pack $ qtisEndpoint config) c1 1000000 ["demo"] (commissions config) + tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config) + brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config) + brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config) withZapHandler ctx (\zap -> do - zapSetWhitelist zap $ whitelist config - zapSetBlacklist zap $ blacklist config + zapSetWhitelist zap "global" $ whitelist config + zapSetBlacklist zap "global" $ blacklist config case brokerClientCertificateDir config of Just certFile -> do certs <- loadCertificatesFromDirectory certFile - forM_ certs (\cert -> zapAddClientCertificate zap cert) + forM_ certs (\cert -> zapAddClientCertificate zap "global" cert) Nothing -> return () serverCert <- case brokerServerCertPath config of @@ -100,7 +104,7 @@ main = do bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do - bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\_ -> do + bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do void $ Gtk.init Nothing window <- new Gtk.Window [ #title := "Quik connector" ] @@ -120,7 +124,7 @@ main = do (Just pipe, Just qsep) -> do tickChan <- newBoundedChan 10000 bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do - bracket (startQuoteSourceServer tickChan ctx (T.pack qsep)) stopQuoteSourceServer (\_ -> threadDelay 1000000)) + bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> threadDelay 1000000)) _ -> return () diff --git a/quik-connector.cabal b/quik-connector.cabal index 8310d48..83ced18 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -1,5 +1,5 @@ name: quik-connector -version: 0.2.0.1 +version: 0.2.1.0 synopsis: Atrade-Quik Connector application description: Please see README.md homepage: https://github.com/asakul/quik-connector @@ -25,6 +25,7 @@ library , Network.Telegram , ATrade.Quotes.QTIS , Commissions + , TickTable ghc-options: -Wall -Wunsupported-calling-conventions build-depends: base >= 4.7 && < 5 , Win32 @@ -48,7 +49,7 @@ library , aeson , cond , scientific - , libatrade == 0.3.0.0 + , libatrade == 0.4.0.0 , deepseq , errors , split diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 7ab3010..8bd1e96 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -26,20 +26,11 @@ import ATrade.Quotes.QTIS import System.ZMQ4 import Commissions (CommissionConfig(..)) - -data TickMapKey = TickMapKey !T.Text !DataType - deriving (Show, Eq, Ord) - -instance Hashable TickMapKey where - hashWithSalt salt (TickMapKey s dt) = hashWithSalt salt s `xor` hashWithSalt salt (fromEnum dt) - -data QTISResult = Fetching | Done TickerInfo +import TickTable (TickTableH, TickKey(..), getTick, getTickerInfo) data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, - qtisTid :: Maybe ThreadId, - tickMap :: M.Map TickMapKey Tick, - tickerInfoMap :: M.Map TickerId QTISResult, + tickTable :: TickTableH, orders :: M.Map OrderId Order, cash :: !Price, notificationCallback :: Maybe (Notification -> IO ()), @@ -60,13 +51,11 @@ data PaperBrokerState = PaperBrokerState { hourMin :: Integer -> Integer -> DiffTime hourMin h m = fromIntegral $ h * 3600 + m * 60 -mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface -mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do +mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface +mkPaperBroker tickTableH tickChan startCash accounts comms = do state <- newIORef PaperBrokerState { pbTid = Nothing, - qtisTid = Nothing, - tickMap = M.empty, - tickerInfoMap = M.empty, + tickTable = tickTableH, orders = M.empty, cash = startCash, notificationCallback = Nothing, @@ -82,14 +71,9 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do commissions = comms } - qtisRequestChan <- newBoundedChan 10000 - - tid <- forkIO $ brokerThread qtisRequestChan tickChan state + tid <- forkIO $ brokerThread tickChan state atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) - qtid <- forkIO $ qtisThread state qtisRequestChan ctx qtisEp - atomicModifyIORef' state (\s -> (s { qtisTid = Just qtid }, ())) - return BrokerInterface { accounts = accounts, setNotificationCallback = pbSetNotificationCallback state, @@ -97,46 +81,13 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } -qtisThread :: IORef PaperBrokerState -> BoundedChan TickerId -> Context -> T.Text -> IO () -qtisThread state qtisRequestChan ctx qtisEndpoint = - forever $ do - threadDelay 1000000 - tickerIds <- readListFromChan qtisRequestChan - ti <- qtisGetTickersInfo ctx qtisEndpoint tickerIds - forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (tiTicker newInfo) (Done newInfo) $! tickerInfoMap s }, ()))) - where - readListFromChan chan = do - mh <- tryReadChan chan - case mh of - Just h -> do - t <- readListFromChan' [h] chan - return $ reverse t - _ -> do - h <- readChan chan - t <- readListFromChan' [h] chan - return $ reverse t - - readListFromChan' h chan = do - mv <- tryReadChan chan - case mv of - Nothing -> return h - Just v -> readListFromChan' (v:h) chan -brokerThread :: BoundedChan TickerId -> BoundedChan Tick -> IORef PaperBrokerState -> IO () -brokerThread qtisRequestChan chan state = forever $ do +brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () +brokerThread chan state = forever $ do tick <- readChan chan - when (datatype tick == LastTradePrice) $ do - info <- M.lookup (security tick) . tickerInfoMap <$> readIORef state - when (isNothing info) $ do - atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) Fetching $! tickerInfoMap s }, ())) - writeChan qtisRequestChan (security tick) - - atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ())) marketOpenTime' <- marketOpenTime <$> readIORef state when ((utctDayTime . timestamp) tick >= marketOpenTime') $ executePendingOrders tick state - where - makeKey !tick = TickMapKey (security $! tick) (datatype tick) executePendingOrders tick state = do po <- pendingOrders <$> readIORef state @@ -210,9 +161,10 @@ executeAtTick state order tick = do maybeCall notificationCallback state $ OrderNotification (orderId order) Executed where obtainTickerInfo tickerId = do - mInfo <- M.lookup tickerId . tickerInfoMap <$> readIORef state + table <- tickTable <$> readIORef state + mInfo <- getTickerInfo table tickerId case mInfo of - Just (Done info) -> return info + Just info -> return info _ -> return TickerInfo { tiTicker = tickerId, tiLotSize = 1, tiTickSize = 1 } @@ -234,8 +186,9 @@ pbSubmitOrder state order = do where executeMarketOrder state order = do - tm <- tickMap <$> readIORef state - case M.lookup key tm of + tm <- tickTable <$> readIORef state + tickMb <- getTick tm key + case tickMb of Nothing -> rejectOrder state order Just tick -> if orderQuantity order /= 0 then executeAtTick state order tick @@ -243,9 +196,10 @@ pbSubmitOrder state order = do submitLimitOrder price state order = if orderQuantity order == 0 then rejectOrder state order else do - tm <- tickMap <$> readIORef state + tm <- tickTable <$> readIORef state + tickMb <- getTick tm key debugM "PaperBroker" $ "Limit order submitted, looking up: " ++ show key - case M.lookup key tm of + case tickMb of Nothing -> do let newOrder = order { orderState = Submitted } atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) @@ -267,7 +221,7 @@ pbSubmitOrder state order = do Buy -> BestOffer Sell -> BestBid - key = TickMapKey (orderSecurity order) orderDatatype + key = TickKey (orderSecurity order) orderDatatype pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool pbCancelOrder state oid = do diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 56924ee..19fd0b3 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -8,6 +8,7 @@ module Broker.QuikBroker ( import ATrade.Types import ATrade.Broker.Protocol import ATrade.Broker.Server +import ATrade.Quotes.QTIS (TickerInfo(..)) import Broker.QuikBroker.Trans2QuikApi hiding (tradeAccount) @@ -30,6 +31,7 @@ import System.Log.Logger import Safe import Commissions (CommissionConfig(..)) +import TickTable (TickTableH, getTick, getTickerInfo, TickKey(..)) type QuikOrderId = Integer @@ -39,7 +41,8 @@ data QuikBrokerState = QuikBrokerState { orderMap :: M.Map OrderId Order, orderIdMap :: BM.Bimap QuikOrderId OrderId, trans2orderid :: M.Map Integer Order, - transIdCounter :: Integer + transIdCounter :: Integer, + tickTable :: TickTableH } nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s)) @@ -50,8 +53,8 @@ maybeCall proj state arg = do Just callback -> callback arg Nothing -> return () -mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface -mkQuikBroker dllPath quikPath accs comms = do +mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface +mkQuikBroker tt dllPath quikPath accs comms = do q <- mkQuik dllPath quikPath msgChan <- newBoundedChan 100 @@ -62,7 +65,8 @@ mkQuikBroker dllPath quikPath accs comms = do orderMap = M.empty, orderIdMap = BM.empty, trans2orderid = M.empty, - transIdCounter = 1 + transIdCounter = 1, + tickTable = tt } setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state comms) @@ -83,14 +87,20 @@ qbSubmitOrder state order = do transId <- nextTransId state atomicModifyIORef' state (\s -> (s { trans2orderid = M.insert transId order (trans2orderid s) }, ())) - case makeTransactionString transId order of - Just transStr -> do - rc <- quikSendTransaction q transStr - debugM "Quik" $ "Sending transaction string: " ++ transStr - case rc of - Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg - Right _ -> debugM "Quik" $ "Order submitted: " ++ show order - Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order + tt <- tickTable <$> readIORef state + tickerInfoMb <- getTickerInfo tt (orderSecurity order) + liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid)) + case (tickerInfoMb, liquidTickMb) of + (Just tickerInfo, Just liquidTick) -> + case makeTransactionString tickerInfo liquidTick transId order of + Just transStr -> do + rc <- quikSendTransaction q transStr + debugM "Quik" $ "Sending transaction string: " ++ transStr + case rc of + Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg + Right _ -> debugM "Quik" $ "Order submitted: " ++ show order + Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order + _ -> warningM "Quik" $ "Unable to obtain data: " ++ show tickerInfoMb ++ "/" ++ show liquidTickMb qbCancelOrder state orderid = do @@ -110,7 +120,7 @@ qbCancelOrder state orderid = do qbStopBroker state = return () -makeTransactionString transId order = +makeTransactionString tickerInfo liquidTick transId order = case (classcode, seccode, accountTransactionString) of (Just cCode, Just sCode, Just accountStr) -> Just $ accountStr ++ @@ -124,7 +134,7 @@ makeTransactionString transId order = _ -> Nothing where orderTypeCode = case orderPrice order of - Market -> "M" + Market -> "L" Limit _ -> "L" _ -> "X" operationCode = case orderOperation order of @@ -133,7 +143,9 @@ makeTransactionString transId order = classcode = headMay . splitOn "#" . T.unpack $ orderSecurity order seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order price = case orderPrice order of - Market -> "0" + Market -> if orderOperation order == Buy + then removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo + else removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo Limit p -> removeTrailingZeros . show $ p _ -> "0" removeTrailingZeros v = if '.' `L.elem` v then L.dropWhileEnd (== '.') . L.dropWhileEnd (== '0') $ v else v diff --git a/src/TickTable.hs b/src/TickTable.hs new file mode 100644 index 0000000..6237d29 --- /dev/null +++ b/src/TickTable.hs @@ -0,0 +1,90 @@ +{-# LANGUAGE MultiWayIf #-} + +module TickTable ( + mkTickTable, + TickKey(..), + getTick, + getTickerInfo, + TickTableH +) where + +import ATrade.Types (DataType(..), TickerId(..), Price(..), Tick(..)) + +import ATrade.Quotes.QTIS (qtisGetTickersInfo, TickerInfo(..)) + +import Control.Concurrent (forkIO, ThreadId, threadDelay) +import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, tryReadChan, writeChan) +import Control.Concurrent.MVar (newEmptyMVar) + +import Control.Monad (forM_, when, void) + +import Data.Maybe (catMaybes, isNothing) +import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) + +import qualified Data.Map.Strict as M +import qualified Data.Text as T + +import System.ZMQ4 (Context) + +data TickKey = TickKey TickerId DataType + deriving (Show, Ord, Eq) + +data TickTable = TickTable { + ticks :: M.Map TickKey Tick, + tickerInfo :: M.Map TickerId TickerInfo +} + +type TickTableH = IORef TickTable + +data QTISThreadRequest = RequestTickerInfo TickerId | Shutdown + +mkTickTable :: BoundedChan Tick -> Context -> T.Text -> IO (IORef TickTable) +mkTickTable chan ctx qtisEndpoint = do + shutdownMVar <- newEmptyMVar + qtisChan <- newBoundedChan 10000 + r <- newIORef TickTable { ticks = M.empty, + tickerInfo = M.empty } + qtisTid <- forkIO $ qtisThread r qtisChan ctx qtisEndpoint + void $ forkIO $ tickTableThread qtisChan r shutdownMVar qtisTid + return r + where + tickTableThread qtisChan r shutdownMVar qtisTid = do + t <- readChan chan + atomicModifyIORef' r (\s -> (s { ticks = M.insert (TickKey (security t) (datatype t)) t $! ticks s }, ())) + when (datatype t == LastTradePrice) $ do + infoMap <- tickerInfo <$> readIORef r + when (isNothing $ M.lookup (security t) infoMap) $ + writeChan qtisChan $ RequestTickerInfo (security t) + + qtisThread r qtisChan ctx qtisEndpoint = do + threadDelay 1000000 + requests <- readListFromChan qtisChan + ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests) + forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ()))) + + requestToTicker (RequestTickerInfo t) = Just t + requestToTicker Shutdown = Nothing + + readListFromChan chan = do + mh <- tryReadChan chan + case mh of + Just h -> do + t <- readListFromChan' [h] chan + return $ reverse t + _ -> do + h <- readChan chan + t <- readListFromChan' [h] chan + return $ reverse t + + readListFromChan' h chan = do + mv <- tryReadChan chan + case mv of + Nothing -> return h + Just v -> readListFromChan' (v:h) chan + +getTick :: TickTableH -> TickKey -> IO (Maybe Tick) +getTick r key = M.lookup key . ticks <$> readIORef r + +getTickerInfo :: TickTableH -> TickerId -> IO (Maybe TickerInfo) +getTickerInfo r tickerId = M.lookup tickerId . tickerInfo <$> readIORef r +