Browse Source

Refactoring

master
Denis Tereshkin 8 years ago
parent
commit
a0f7db5060
  1. 28
      app/Main.hs
  2. 5
      quik-connector.cabal
  3. 82
      src/Broker/PaperBroker.hs
  4. 42
      src/Broker/QuikBroker.hs
  5. 90
      src/TickTable.hs

28
app/Main.hs

@ -36,17 +36,20 @@ import qualified Data.Text as T @@ -36,17 +36,20 @@ import qualified Data.Text as T
import Data.Maybe
import Config
import TickTable (mkTickTable)
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData)
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData)
forkBoundedChan size sourceChan = do
sink <- newBoundedChan size
sink1 <- newBoundedChan size
sink2 <- newBoundedChan size
sinkQss <- newBoundedChan size
tid <- forkIO $ forever $ do
v <- readChan sourceChan
writeChan sink v
writeChan sink1 v
writeChan sink2 v
writeChan sinkQss (QSSTick v)
return (tid, sink, sinkQss)
return (tid, sink1, sink2, sinkQss)
initLogging :: IO ()
@ -70,19 +73,20 @@ main = do @@ -70,19 +73,20 @@ main = do
infoM "main" "Starting data import server"
_ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, c1, c2) <- forkBoundedChan 10000 chan
(forkId, c0, c1, c2) <- forkBoundedChan 10000 chan
brokerQ <- mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (commissions config)
withContext (\ctx -> do
brokerP <- mkPaperBroker ctx (T.pack $ qtisEndpoint config) c1 1000000 ["demo"] (commissions config)
tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config)
brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config)
brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config)
withZapHandler ctx (\zap -> do
zapSetWhitelist zap $ whitelist config
zapSetBlacklist zap $ blacklist config
zapSetWhitelist zap "global" $ whitelist config
zapSetBlacklist zap "global" $ blacklist config
case brokerClientCertificateDir config of
Just certFile -> do
certs <- loadCertificatesFromDirectory certFile
forM_ certs (\cert -> zapAddClientCertificate zap cert)
forM_ certs (\cert -> zapAddClientCertificate zap "global" cert)
Nothing -> return ()
serverCert <- case brokerServerCertPath config of
@ -100,7 +104,7 @@ main = do @@ -100,7 +104,7 @@ main = do
bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\_ -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do
bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do
void $ Gtk.init Nothing
window <- new Gtk.Window [ #title := "Quik connector" ]
@ -120,7 +124,7 @@ main = do @@ -120,7 +124,7 @@ main = do
(Just pipe, Just qsep) -> do
tickChan <- newBoundedChan 10000
bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do
bracket (startQuoteSourceServer tickChan ctx (T.pack qsep)) stopQuoteSourceServer (\_ -> threadDelay 1000000))
bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> threadDelay 1000000))
_ -> return ()

5
quik-connector.cabal

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
name: quik-connector
version: 0.2.0.1
version: 0.2.1.0
synopsis: Atrade-Quik Connector application
description: Please see README.md
homepage: https://github.com/asakul/quik-connector
@ -25,6 +25,7 @@ library @@ -25,6 +25,7 @@ library
, Network.Telegram
, ATrade.Quotes.QTIS
, Commissions
, TickTable
ghc-options: -Wall -Wunsupported-calling-conventions
build-depends: base >= 4.7 && < 5
, Win32
@ -48,7 +49,7 @@ library @@ -48,7 +49,7 @@ library
, aeson
, cond
, scientific
, libatrade == 0.3.0.0
, libatrade == 0.4.0.0
, deepseq
, errors
, split

82
src/Broker/PaperBroker.hs

@ -26,20 +26,11 @@ import ATrade.Quotes.QTIS @@ -26,20 +26,11 @@ import ATrade.Quotes.QTIS
import System.ZMQ4
import Commissions (CommissionConfig(..))
data TickMapKey = TickMapKey !T.Text !DataType
deriving (Show, Eq, Ord)
instance Hashable TickMapKey where
hashWithSalt salt (TickMapKey s dt) = hashWithSalt salt s `xor` hashWithSalt salt (fromEnum dt)
data QTISResult = Fetching | Done TickerInfo
import TickTable (TickTableH, TickKey(..), getTick, getTickerInfo)
data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId,
qtisTid :: Maybe ThreadId,
tickMap :: M.Map TickMapKey Tick,
tickerInfoMap :: M.Map TickerId QTISResult,
tickTable :: TickTableH,
orders :: M.Map OrderId Order,
cash :: !Price,
notificationCallback :: Maybe (Notification -> IO ()),
@ -60,13 +51,11 @@ data PaperBrokerState = PaperBrokerState { @@ -60,13 +51,11 @@ data PaperBrokerState = PaperBrokerState {
hourMin :: Integer -> Integer -> DiffTime
hourMin h m = fromIntegral $ h * 3600 + m * 60
mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do
mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkPaperBroker tickTableH tickChan startCash accounts comms = do
state <- newIORef PaperBrokerState {
pbTid = Nothing,
qtisTid = Nothing,
tickMap = M.empty,
tickerInfoMap = M.empty,
tickTable = tickTableH,
orders = M.empty,
cash = startCash,
notificationCallback = Nothing,
@ -82,14 +71,9 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do @@ -82,14 +71,9 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do
commissions = comms
}
qtisRequestChan <- newBoundedChan 10000
tid <- forkIO $ brokerThread qtisRequestChan tickChan state
tid <- forkIO $ brokerThread tickChan state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()))
qtid <- forkIO $ qtisThread state qtisRequestChan ctx qtisEp
atomicModifyIORef' state (\s -> (s { qtisTid = Just qtid }, ()))
return BrokerInterface {
accounts = accounts,
setNotificationCallback = pbSetNotificationCallback state,
@ -97,46 +81,13 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do @@ -97,46 +81,13 @@ mkPaperBroker ctx qtisEp tickChan startCash accounts comms = do
cancelOrder = pbCancelOrder state,
stopBroker = pbDestroyBroker state }
qtisThread :: IORef PaperBrokerState -> BoundedChan TickerId -> Context -> T.Text -> IO ()
qtisThread state qtisRequestChan ctx qtisEndpoint =
forever $ do
threadDelay 1000000
tickerIds <- readListFromChan qtisRequestChan
ti <- qtisGetTickersInfo ctx qtisEndpoint tickerIds
forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (tiTicker newInfo) (Done newInfo) $! tickerInfoMap s }, ())))
where
readListFromChan chan = do
mh <- tryReadChan chan
case mh of
Just h -> do
t <- readListFromChan' [h] chan
return $ reverse t
_ -> do
h <- readChan chan
t <- readListFromChan' [h] chan
return $ reverse t
readListFromChan' h chan = do
mv <- tryReadChan chan
case mv of
Nothing -> return h
Just v -> readListFromChan' (v:h) chan
brokerThread :: BoundedChan TickerId -> BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread qtisRequestChan chan state = forever $ do
brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread chan state = forever $ do
tick <- readChan chan
when (datatype tick == LastTradePrice) $ do
info <- M.lookup (security tick) . tickerInfoMap <$> readIORef state
when (isNothing info) $ do
atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) Fetching $! tickerInfoMap s }, ()))
writeChan qtisRequestChan (security tick)
atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()))
marketOpenTime' <- marketOpenTime <$> readIORef state
when ((utctDayTime . timestamp) tick >= marketOpenTime') $
executePendingOrders tick state
where
makeKey !tick = TickMapKey (security $! tick) (datatype tick)
executePendingOrders tick state = do
po <- pendingOrders <$> readIORef state
@ -210,9 +161,10 @@ executeAtTick state order tick = do @@ -210,9 +161,10 @@ executeAtTick state order tick = do
maybeCall notificationCallback state $ OrderNotification (orderId order) Executed
where
obtainTickerInfo tickerId = do
mInfo <- M.lookup tickerId . tickerInfoMap <$> readIORef state
table <- tickTable <$> readIORef state
mInfo <- getTickerInfo table tickerId
case mInfo of
Just (Done info) -> return info
Just info -> return info
_ -> return TickerInfo { tiTicker = tickerId,
tiLotSize = 1,
tiTickSize = 1 }
@ -234,8 +186,9 @@ pbSubmitOrder state order = do @@ -234,8 +186,9 @@ pbSubmitOrder state order = do
where
executeMarketOrder state order = do
tm <- tickMap <$> readIORef state
case M.lookup key tm of
tm <- tickTable <$> readIORef state
tickMb <- getTick tm key
case tickMb of
Nothing -> rejectOrder state order
Just tick -> if orderQuantity order /= 0
then executeAtTick state order tick
@ -243,9 +196,10 @@ pbSubmitOrder state order = do @@ -243,9 +196,10 @@ pbSubmitOrder state order = do
submitLimitOrder price state order = if orderQuantity order == 0
then rejectOrder state order
else do
tm <- tickMap <$> readIORef state
tm <- tickTable <$> readIORef state
tickMb <- getTick tm key
debugM "PaperBroker" $ "Limit order submitted, looking up: " ++ show key
case M.lookup key tm of
case tickMb of
Nothing -> do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
@ -267,7 +221,7 @@ pbSubmitOrder state order = do @@ -267,7 +221,7 @@ pbSubmitOrder state order = do
Buy -> BestOffer
Sell -> BestBid
key = TickMapKey (orderSecurity order) orderDatatype
key = TickKey (orderSecurity order) orderDatatype
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state oid = do

42
src/Broker/QuikBroker.hs

@ -8,6 +8,7 @@ module Broker.QuikBroker ( @@ -8,6 +8,7 @@ module Broker.QuikBroker (
import ATrade.Types
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import ATrade.Quotes.QTIS (TickerInfo(..))
import Broker.QuikBroker.Trans2QuikApi hiding (tradeAccount)
@ -30,6 +31,7 @@ import System.Log.Logger @@ -30,6 +31,7 @@ import System.Log.Logger
import Safe
import Commissions (CommissionConfig(..))
import TickTable (TickTableH, getTick, getTickerInfo, TickKey(..))
type QuikOrderId = Integer
@ -39,7 +41,8 @@ data QuikBrokerState = QuikBrokerState { @@ -39,7 +41,8 @@ data QuikBrokerState = QuikBrokerState {
orderMap :: M.Map OrderId Order,
orderIdMap :: BM.Bimap QuikOrderId OrderId,
trans2orderid :: M.Map Integer Order,
transIdCounter :: Integer
transIdCounter :: Integer,
tickTable :: TickTableH
}
nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s))
@ -50,8 +53,8 @@ maybeCall proj state arg = do @@ -50,8 +53,8 @@ maybeCall proj state arg = do
Just callback -> callback arg
Nothing -> return ()
mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkQuikBroker dllPath quikPath accs comms = do
mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkQuikBroker tt dllPath quikPath accs comms = do
q <- mkQuik dllPath quikPath
msgChan <- newBoundedChan 100
@ -62,7 +65,8 @@ mkQuikBroker dllPath quikPath accs comms = do @@ -62,7 +65,8 @@ mkQuikBroker dllPath quikPath accs comms = do
orderMap = M.empty,
orderIdMap = BM.empty,
trans2orderid = M.empty,
transIdCounter = 1
transIdCounter = 1,
tickTable = tt
}
setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state comms)
@ -83,14 +87,20 @@ qbSubmitOrder state order = do @@ -83,14 +87,20 @@ qbSubmitOrder state order = do
transId <- nextTransId state
atomicModifyIORef' state (\s -> (s {
trans2orderid = M.insert transId order (trans2orderid s) }, ()))
case makeTransactionString transId order of
Just transStr -> do
rc <- quikSendTransaction q transStr
debugM "Quik" $ "Sending transaction string: " ++ transStr
case rc of
Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg
Right _ -> debugM "Quik" $ "Order submitted: " ++ show order
Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order
tt <- tickTable <$> readIORef state
tickerInfoMb <- getTickerInfo tt (orderSecurity order)
liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid))
case (tickerInfoMb, liquidTickMb) of
(Just tickerInfo, Just liquidTick) ->
case makeTransactionString tickerInfo liquidTick transId order of
Just transStr -> do
rc <- quikSendTransaction q transStr
debugM "Quik" $ "Sending transaction string: " ++ transStr
case rc of
Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg
Right _ -> debugM "Quik" $ "Order submitted: " ++ show order
Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order
_ -> warningM "Quik" $ "Unable to obtain data: " ++ show tickerInfoMb ++ "/" ++ show liquidTickMb
qbCancelOrder state orderid = do
@ -110,7 +120,7 @@ qbCancelOrder state orderid = do @@ -110,7 +120,7 @@ qbCancelOrder state orderid = do
qbStopBroker state = return ()
makeTransactionString transId order =
makeTransactionString tickerInfo liquidTick transId order =
case (classcode, seccode, accountTransactionString) of
(Just cCode, Just sCode, Just accountStr) -> Just $
accountStr ++
@ -124,7 +134,7 @@ makeTransactionString transId order = @@ -124,7 +134,7 @@ makeTransactionString transId order =
_ -> Nothing
where
orderTypeCode = case orderPrice order of
Market -> "M"
Market -> "L"
Limit _ -> "L"
_ -> "X"
operationCode = case orderOperation order of
@ -133,7 +143,9 @@ makeTransactionString transId order = @@ -133,7 +143,9 @@ makeTransactionString transId order =
classcode = headMay . splitOn "#" . T.unpack $ orderSecurity order
seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order
price = case orderPrice order of
Market -> "0"
Market -> if orderOperation order == Buy
then removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo
else removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo
Limit p -> removeTrailingZeros . show $ p
_ -> "0"
removeTrailingZeros v = if '.' `L.elem` v then L.dropWhileEnd (== '.') . L.dropWhileEnd (== '0') $ v else v

90
src/TickTable.hs

@ -0,0 +1,90 @@ @@ -0,0 +1,90 @@
{-# LANGUAGE MultiWayIf #-}
module TickTable (
mkTickTable,
TickKey(..),
getTick,
getTickerInfo,
TickTableH
) where
import ATrade.Types (DataType(..), TickerId(..), Price(..), Tick(..))
import ATrade.Quotes.QTIS (qtisGetTickersInfo, TickerInfo(..))
import Control.Concurrent (forkIO, ThreadId, threadDelay)
import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, tryReadChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad (forM_, when, void)
import Data.Maybe (catMaybes, isNothing)
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import System.ZMQ4 (Context)
data TickKey = TickKey TickerId DataType
deriving (Show, Ord, Eq)
data TickTable = TickTable {
ticks :: M.Map TickKey Tick,
tickerInfo :: M.Map TickerId TickerInfo
}
type TickTableH = IORef TickTable
data QTISThreadRequest = RequestTickerInfo TickerId | Shutdown
mkTickTable :: BoundedChan Tick -> Context -> T.Text -> IO (IORef TickTable)
mkTickTable chan ctx qtisEndpoint = do
shutdownMVar <- newEmptyMVar
qtisChan <- newBoundedChan 10000
r <- newIORef TickTable { ticks = M.empty,
tickerInfo = M.empty }
qtisTid <- forkIO $ qtisThread r qtisChan ctx qtisEndpoint
void $ forkIO $ tickTableThread qtisChan r shutdownMVar qtisTid
return r
where
tickTableThread qtisChan r shutdownMVar qtisTid = do
t <- readChan chan
atomicModifyIORef' r (\s -> (s { ticks = M.insert (TickKey (security t) (datatype t)) t $! ticks s }, ()))
when (datatype t == LastTradePrice) $ do
infoMap <- tickerInfo <$> readIORef r
when (isNothing $ M.lookup (security t) infoMap) $
writeChan qtisChan $ RequestTickerInfo (security t)
qtisThread r qtisChan ctx qtisEndpoint = do
threadDelay 1000000
requests <- readListFromChan qtisChan
ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests)
forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ())))
requestToTicker (RequestTickerInfo t) = Just t
requestToTicker Shutdown = Nothing
readListFromChan chan = do
mh <- tryReadChan chan
case mh of
Just h -> do
t <- readListFromChan' [h] chan
return $ reverse t
_ -> do
h <- readChan chan
t <- readListFromChan' [h] chan
return $ reverse t
readListFromChan' h chan = do
mv <- tryReadChan chan
case mv of
Nothing -> return h
Just v -> readListFromChan' (v:h) chan
getTick :: TickTableH -> TickKey -> IO (Maybe Tick)
getTick r key = M.lookup key . ticks <$> readIORef r
getTickerInfo :: TickTableH -> TickerId -> IO (Maybe TickerInfo)
getTickerInfo r tickerId = M.lookup tickerId . tickerInfo <$> readIORef r
Loading…
Cancel
Save