diff --git a/app/Config.hs b/app/Config.hs index 8065406..3936563 100644 --- a/app/Config.hs +++ b/app/Config.hs @@ -22,6 +22,7 @@ data TableConfig = TableConfig { data Config = Config { quotesourceEndpoint :: String, + qtisEndpoint :: String, pipeReaderQsEndpoint :: Maybe String, tickPipePath :: Maybe String, brokerserverEndpoint :: String, @@ -48,6 +49,7 @@ readConfig fname = do parseConfig :: Value -> Parser Config parseConfig = withObject "object" $ \obj -> do qse <- obj .: "quotesource-endpoint" + qtisEp <- obj .: "qtis-endpoint" qsePipe <- obj .:? "quotesource-endpoint-pipe-reader" pipePath <- obj .:? "pipe-reader-path" bse <- obj .: "brokerserver-endpoint" @@ -65,6 +67,7 @@ parseConfig = withObject "object" $ \obj -> do tgChatId <- obj .: "telegram-chatid" accs <- V.toList <$> obj .: "accounts" return Config { quotesourceEndpoint = qse, + qtisEndpoint = qtisEp, pipeReaderQsEndpoint = qsePipe, tickPipePath = pipePath, brokerserverEndpoint = bse, diff --git a/app/Main.hs b/app/Main.hs index c636a7e..132c363 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -72,9 +72,9 @@ main = do (forkId, c1, c2) <- forkBoundedChan 10000 chan - brokerP <- mkPaperBroker c1 1000000 ["demo"] brokerQ <- mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) withContext (\ctx -> do + brokerP <- mkPaperBroker ctx (T.pack $ qtisEndpoint config) c1 1000000 ["demo"] withZapHandler ctx (\zap -> do zapSetWhitelist zap $ whitelist config zapSetBlacklist zap $ blacklist config diff --git a/quik-connector.cabal b/quik-connector.cabal index 7690527..008a96e 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -23,6 +23,7 @@ library , Broker.QuikBroker , Broker.QuikBroker.Trans2QuikApi , Network.Telegram + , ATrade.Quotes.QTIS ghc-options: -Wall -Wunsupported-calling-conventions build-depends: base >= 4.7 && < 5 , Win32 diff --git a/src/ATrade/Quotes/QTIS.hs b/src/ATrade/Quotes/QTIS.hs new file mode 100644 index 0000000..04c19de --- /dev/null +++ b/src/ATrade/Quotes/QTIS.hs @@ -0,0 +1,60 @@ +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Quotes.QTIS +( + TickerInfo(..), + qtisGetTickersInfo, + qtisGetTickersInfo' +) where + +import ATrade.Types +import Control.Monad +import Data.Aeson +import Data.Maybe +import qualified Data.ByteString.Char8 as BC8 +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T +import System.ZMQ4 +import System.Log.Logger + +data TickerInfo = TickerInfo { + tiTicker :: T.Text, + tiLotSize :: Integer, + tiTickSize :: Price +} deriving (Show, Eq) + +instance FromJSON TickerInfo where + parseJSON = withObject "object" (\obj -> + TickerInfo <$> + obj .: "ticker" <*> + obj .: "lot_size" <*> + obj .: "tick_size") + +instance ToJSON TickerInfo where + toJSON ti = object [ "ticker" .= tiTicker ti, + "lot_size" .= tiLotSize ti, + "tick_size" .= tiTickSize ti ] + +qtisGetTickersInfo' :: T.Text -> [TickerId] -> IO [TickerInfo] +qtisGetTickersInfo' endpoint tickers = withContext (\ctx -> qtisGetTickersInfo ctx endpoint tickers) + +qtisGetTickersInfo :: Context -> T.Text -> [TickerId] -> IO [TickerInfo] +qtisGetTickersInfo ctx endpoint tickers = + withSocket ctx Req (\sock -> do + debugM "QTIS" $ "Connecting to: " ++ T.unpack endpoint + connect sock $ T.unpack endpoint + catMaybes <$> forM tickers (\tickerId -> do + debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId + send sock [] $ BL.toStrict (tickerRequest tickerId) + response <- receiveMulti sock + let r = parseResponse response + debugM "QTIS" $ "Got response: " ++ show r + return r)) + where + tickerRequest tickerId = encode $ object ["ticker" .= tickerId] + parseResponse :: [BC8.ByteString] -> Maybe TickerInfo + parseResponse (header:payload:_) = if header == "OK" + then decode $ BL.fromStrict payload + else Nothing + parseResponse _ = Nothing + diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 4766200..2431d83 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -23,6 +23,8 @@ import Control.Monad import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan) import System.Log.Logger +import ATrade.Quotes.QTIS +import System.ZMQ4 data TickMapKey = TickMapKey !T.Text !DataType deriving (Show, Eq, Ord) @@ -30,9 +32,12 @@ data TickMapKey = TickMapKey !T.Text !DataType instance Hashable TickMapKey where hashWithSalt salt (TickMapKey s dt) = hashWithSalt salt s `xor` hashWithSalt salt (fromEnum dt) +data QTISResult = Fetching | Done TickerInfo + data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, tickMap :: M.Map TickMapKey Tick, + tickerInfoMap :: M.Map TickerId QTISResult, orders :: M.Map OrderId Order, cash :: !Price, notificationCallback :: Maybe (Notification -> IO ()), @@ -52,11 +57,12 @@ data PaperBrokerState = PaperBrokerState { hourMin :: Integer -> Integer -> DiffTime hourMin h m = fromIntegral $ h * 3600 + m * 60 -mkPaperBroker :: BoundedChan Tick -> Price -> [T.Text] -> IO BrokerInterface -mkPaperBroker tickChan startCash accounts = do +mkPaperBroker :: Context -> T.Text -> BoundedChan Tick -> Price -> [T.Text] -> IO BrokerInterface +mkPaperBroker ctx qtisEp tickChan startCash accounts = do state <- newIORef PaperBrokerState { pbTid = Nothing, tickMap = M.empty, + tickerInfoMap = M.empty, orders = M.empty, cash = startCash, notificationCallback = Nothing, @@ -71,7 +77,7 @@ mkPaperBroker tickChan startCash accounts = do postMarketCloseTime = hourMin 15 50 } - tid <- forkIO $ brokerThread tickChan state + tid <- forkIO $ brokerThread ctx qtisEp tickChan state atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) return BrokerInterface { @@ -81,9 +87,17 @@ mkPaperBroker tickChan startCash accounts = do cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } -brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () -brokerThread chan state = forever $ do +brokerThread :: Context -> T.Text -> BoundedChan Tick -> IORef PaperBrokerState -> IO () +brokerThread ctx qtisEp chan state = forever $ do tick <- readChan chan + when (datatype tick == LastTradePrice) $ do + info <- M.lookup (security tick) . tickerInfoMap <$> readIORef state + when (isNothing info) $ do + atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) Fetching $! tickerInfoMap s }, ())) + void $ forkIO $ do + ti <- qtisGetTickersInfo ctx qtisEp [security tick] + forM_ ti (\newInfo -> atomicModifyIORef' state (\s -> (s { tickerInfoMap = M.insert (security tick) (Done newInfo) $! tickerInfoMap s }, ()))) + atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ())) executePendingOrders tick state where @@ -124,12 +138,12 @@ executePendingOrders tick state = do pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) ) -mkTrade :: Tick -> Order -> UTCTime -> Trade -mkTrade tick order timestamp = Trade { +mkTrade :: TickerInfo -> Tick -> Order -> UTCTime -> Trade +mkTrade info tick order timestamp = Trade { tradeOrderId = orderId order, tradePrice = value tick, tradeQuantity = orderQuantity order, - tradeVolume = fromInteger (orderQuantity order) * value tick, + tradeVolume = fromInteger (orderQuantity order) * value tick * fromInteger (tiLotSize info), tradeVolumeCurrency = "TEST", tradeOperation = orderOperation order, tradeAccount = orderAccountId order, @@ -145,12 +159,21 @@ maybeCall proj state arg = do executeAtTick state order tick = do let newOrder = order { orderState = Executed } - let tradeVolume = fromInteger (orderQuantity order) * value tick + tickerInfo <- obtainTickerInfo (security tick) + let tradeVolume = fromInteger (orderQuantity order) * value tick * fromInteger (tiLotSize tickerInfo) atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ())) debugM "PaperBroker" $ "Executed: " ++ show newOrder ++ "; at tick: " ++ show tick ts <- getCurrentTime - maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts + maybeCall notificationCallback state $ TradeNotification $ mkTrade tickerInfo tick order ts maybeCall notificationCallback state $ OrderNotification (orderId order) Executed + where + obtainTickerInfo tickerId = do + mInfo <- M.lookup tickerId . tickerInfoMap <$> readIORef state + case mInfo of + Just (Done info) -> return info + _ -> return TickerInfo { tiTicker = tickerId, + tiLotSize = 1, + tiTickSize = 1 } rejectOrder state order = do let newOrder = order { orderState = Rejected } in