From 14d76cb77f78563eae2948bb3a7c4f3c03b26ab3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 29 Oct 2023 11:49:24 +0700 Subject: [PATCH] Add paper broker --- src/Commissions.hs | 18 +++ src/Config.hs | 4 +- src/Main.hs | 38 +++++- src/PaperBroker.hs | 260 ++++++++++++++++++++++++++++++++++++++++ src/TXMLConnector.hs | 6 +- transaq-connector.cabal | 3 + 6 files changed, 319 insertions(+), 10 deletions(-) create mode 100644 src/Commissions.hs create mode 100644 src/PaperBroker.hs diff --git a/src/Commissions.hs b/src/Commissions.hs new file mode 100644 index 0000000..8c3fd0e --- /dev/null +++ b/src/Commissions.hs @@ -0,0 +1,18 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} + +module Commissions ( + CommissionConfig(..) +) where + +import qualified Data.Text as T +import Dhall +import GHC.Generics + +data CommissionConfig = CommissionConfig { + comPrefix :: T.Text, + comPercentage :: Double, + comFixed :: Double +} deriving (Show, Eq, Generic) + +instance FromDhall CommissionConfig diff --git a/src/Config.hs b/src/Config.hs index 13ef7ea..6095b74 100644 --- a/src/Config.hs +++ b/src/Config.hs @@ -7,6 +7,7 @@ module Config loadConfig, ) where +import Commissions import qualified Data.Text as T import Dhall (FromDhall (autoWith), auto, expected, inputFile) import GHC.Generics @@ -39,7 +40,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig { statsdPort :: Int, allTradesSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig], - quotesSubscriptions :: [SubscriptionConfig] + quotesSubscriptions :: [SubscriptionConfig], + commissions :: [CommissionConfig] } deriving (Show, Eq, Generic) instance FromDhall TransaqConnectorConfig diff --git a/src/Main.hs b/src/Main.hs index 62d9b62..3c6c206 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -10,17 +10,21 @@ import ATrade.Broker.TradeSinks.GotifyTradeSink (withGotifyTradeSink) import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), fmtMessage, logWith) -import ATrade.QuoteSource.Server (startQuoteSourceServer, +import ATrade.QuoteSource.Server (QuoteSourceServerData (..), + startQuoteSourceServer, stopQuoteSourceServer) -import ATrade.Types (defaultServerSecurityParams) +import ATrade.Types (Tick, defaultServerSecurityParams, + fromDouble) import Colog (LogAction, cfilter, logTextStdout, (>$<)) import Colog.Actions (logTextHandle) import Config (TransaqConnectorConfig (..), loadConfig) -import Control.Concurrent (killThread, +import Control.Concurrent (ThreadId, killThread, threadDelay) -import Control.Concurrent.BoundedChan (newBoundedChan) +import Control.Concurrent.BoundedChan (BoundedChan, + newBoundedChan, + readChan, writeChan) import Control.Exception (bracket) import Control.Monad (forever, void) import Control.Monad.IO.Class (MonadIO) @@ -29,7 +33,9 @@ import Data.Version (showVersion) import Debug.EventCounters (emitEvent, initEventCounters) import HistoryProviderServer (withHistoryProviderServer) +import PaperBroker (mkPaperBroker) import Prelude hiding (log) +import SlaveThread (fork) import System.IO (Handle, IOMode (AppendMode), withFile) @@ -40,6 +46,7 @@ import System.Remote.Monitoring.Statsd (StatsdOptions (..), statsdThreadId) import System.ZMQ4 (withContext) import TickerInfoServer (withTickerInfoServer) +import TickTable (newTickTable) import qualified TXMLConnector as Connector import Version (transaqConnectorVersionText) @@ -53,6 +60,21 @@ parseLoglevel 2 = Info parseLoglevel 3 = Debug parseLoglevel _ = Trace +forkQssChannel :: + BoundedChan QuoteSourceServerData + -> IO (ThreadId, BoundedChan QuoteSourceServerData, BoundedChan Tick) +forkQssChannel chan = do + ch1 <- newBoundedChan 50000 + ch2 <- newBoundedChan 50000 + tid <- fork $ do + x <- readChan chan + writeChan ch1 x + case x of + QSSTick tick -> writeChan ch2 tick + _ -> return () + + return (tid, ch1, ch2) + main :: IO () main = do cfg <- loadConfig "transaq-connector.dhall" @@ -81,9 +103,12 @@ main = do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard -> withGotifyTradeSink (T.unpack $ gotifyUri cfg) (T.unpack $ gotifyToken cfg) logger $ \tsGotify -> do - txml <- Connector.start logger cfg qssChannel tisH + (forkTid, qssCh1, qssCh2) <- forkQssChannel qssChannel + tickTable <- newTickTable + paper <- mkPaperBroker tickTable tisH qssCh2 (fromDouble 100000.0) ["demo"] (commissions cfg) logger + txml <- Connector.start logger tickTable cfg qssCh1 tisH bracket (startBrokerServer - [Connector.makeBrokerBackend txml (account cfg)] + [Connector.makeBrokerBackend txml (account cfg), paper] ctx (brokerEndpoint cfg) (brokerNotificationsEndpoint cfg) @@ -98,6 +123,7 @@ main = do forever $ do threadDelay 200000 emitEvent "main_loop" + killThread forkTid log Info "main" "Shutting down" killThread $ statsdThreadId statsdThread diff --git a/src/PaperBroker.hs b/src/PaperBroker.hs new file mode 100644 index 0000000..124d462 --- /dev/null +++ b/src/PaperBroker.hs @@ -0,0 +1,260 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE Strict #-} + +module PaperBroker ( + PaperBrokerState, + mkPaperBroker +) where + +import ATrade.Broker.Backend +import ATrade.Broker.Protocol +import ATrade.Broker.Server +import ATrade.Logging (Message, Severity (..), + logWith) +import ATrade.Types +import Colog (LogAction) +import Commissions (CommissionConfig (..)) +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Monad +import Data.Bits +import Data.IORef +import qualified Data.List as L +import qualified Data.Map.Strict as M +import Data.Maybe +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import Data.Time.Clock +import Language.Haskell.Printf (t) +import System.ZMQ4 +import TickerInfoServer +import TickTable (TickTable, lookupTick) + +data PaperBrokerState = PaperBrokerState { + pbTid :: Maybe ThreadId, + tickTable :: TickTable, + orders :: M.Map OrderId Order, + cash :: !Price, + notificationCallback :: Maybe (BrokerBackendNotification -> IO ()), + pendingOrders :: [Order], + fortsClassCodes :: [T.Text], + fortsOpenTimeIntervals :: [(DiffTime, DiffTime)], + auctionableClassCodes :: [T.Text], + premarketStartTime :: DiffTime, + marketOpenTime :: DiffTime, + postMarketStartTime :: DiffTime, + postMarketFixTime :: DiffTime, + postMarketCloseTime :: DiffTime, + commissions :: [CommissionConfig], + logger :: LogAction IO Message, + tisH :: TickerInfoServerHandle +} + +hourMin :: Integer -> Integer -> DiffTime +hourMin h m = fromIntegral $ h * 3600 + m * 60 + +mkPaperBroker :: TickTable -> TickerInfoServerHandle -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend +mkPaperBroker tickTableH tisH tickChan startCash accounts comms l = do + state <- newIORef PaperBrokerState { + pbTid = Nothing, + tickTable = tickTableH, + orders = M.empty, + cash = startCash, + notificationCallback = Nothing, + pendingOrders = [], + fortsClassCodes = ["FUT", "OPT"], + fortsOpenTimeIntervals = [(hourMin 6 0, hourMin 11 0), (hourMin 11 5, hourMin 15 45), (hourMin 16 0, hourMin 20 50)], + auctionableClassCodes = ["TQBR"], + premarketStartTime = hourMin 6 50, + marketOpenTime = hourMin 7 0, + postMarketStartTime = hourMin 15 40, + postMarketFixTime = hourMin 15 45, + postMarketCloseTime = hourMin 15 50, + commissions = comms, + logger = l, + tisH = tisH + } + + tid <- forkIO $ brokerThread tickChan state + atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) + + return BrokerBackend { + accounts = accounts, + setNotificationCallback = pbSetNotificationCallback state, + submitOrder = pbSubmitOrder state, + cancelOrder = void . pbCancelOrder state, + stop = pbDestroyBroker state } + + +brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () +brokerThread chan state = forever $ do + tick <- readChan chan + marketOpenTime' <- marketOpenTime <$> readIORef state + when ((utctDayTime . timestamp) tick >= marketOpenTime') $ + executePendingOrders tick state + +executePendingOrders tick state = do + marketOpenTime' <- marketOpenTime <$> readIORef state + po <- pendingOrders <$> readIORef state + when (utctDayTime (timestamp tick) >= marketOpenTime') $ do + executedIds <- catMaybes <$> mapM execute po + atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ())) + where + execute order = + if security tick == orderSecurity order + then + case orderPrice order of + Market -> do + log Debug "PaperBroker" "Executing: pending market order" + executeAtTick state order tick + return $ Just $ orderId order + Limit price -> + executeLimitAt price order + _ -> return Nothing + else return Nothing + + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + + + executeLimitAt price order = case orderOperation order of + Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) || + (datatype tick == BestOffer && price > value tick && value tick > 0) + then do + log Debug "PaperBroker" $ TL.toStrict $ [t|[1]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order) + executeAtTick state order $ tick { value = price } + return $ Just $ orderId order + else return Nothing + Sell -> if (datatype tick == LastTradePrice && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0) + then do + log Debug "PaperBroker" $ TL.toStrict $ [t|[2]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order) + executeAtTick state order $ tick { value = price } + return $ Just $ orderId order + else return Nothing + +pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (BrokerBackendNotification -> IO ()) -> IO() +pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) ) + +mkTrade :: TickerInfo -> Tick -> Order -> UTCTime -> Maybe CommissionConfig -> Trade +mkTrade info tick order timestamp comconf = Trade { + tradeOrderId = orderId order, + tradePrice = value tick, + tradeQuantity = orderQuantity order, + tradeVolume = thisTradeVolume, + tradeVolumeCurrency = "TEST", + tradeOperation = orderOperation order, + tradeAccount = orderAccountId order, + tradeSecurity = orderSecurity order, + tradeTimestamp = timestamp, + tradeCommission = 0 `fromMaybe` (calcCommission thisTradeVolume <$> comconf), + tradeSignalId = orderSignalId order } + where + -- Futures have incorrect lotsize + thisTradeVolume = fromInteger (orderQuantity order) * value tick * if "FUT" `T.isPrefixOf` security tick then 1 else (fromIntegral $ tiLotSize info) + calcCommission vol c = vol * 0.01 * fromDouble (comPercentage c) + fromDouble (comFixed c) * fromIntegral (orderQuantity order) + +maybeCall proj state arg = do + cb <- proj <$> readIORef state + case cb of + Just callback -> callback arg + Nothing -> return () + +executeAtTick state order tick = do + let newOrder = order { orderState = Executed } + tickerInfo <- obtainTickerInfo (security tick) + comm <- L.find (\comdef -> comPrefix comdef `T.isPrefixOf` security tick) . commissions <$> readIORef state + let tradeVolume = fromInteger (orderQuantity order) * value tick * (fromIntegral $ tiLotSize tickerInfo) + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ())) + log Debug "PaperBroker" $ TL.toStrict $ [t|Executed: %? at tick: %?|] newOrder tick + ts <- getCurrentTime + maybeCall notificationCallback state $ BackendTradeNotification $ mkTrade tickerInfo tick order ts comm + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Executed + where + obtainTickerInfo tickerId = do + tis <- tisH <$> readIORef state + mInfo <- getTickerInfo tickerId tis + case mInfo of + Just info -> return info + _ -> return TickerInfo { tiTicker = tickerId, + tiLotSize = 1, + tiTickSize = 1 } + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + + +rejectOrder state order = do + let newOrder = order { orderState = Rejected } in + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Rejected + +pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO () +pbSubmitOrder state order = do + log Info "PaperBroker" $ "Submitted order: " <> (T.pack . show) order + case orderPrice order of + Market -> executeMarketOrder state order + Limit price -> submitLimitOrder price state order + Stop price trigger -> submitStopOrder state order + StopMarket trigger -> submitStopMarketOrder state order + + where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + executeMarketOrder state order = do + tm <- tickTable <$> readIORef state + tickMb <- lookupTick tm (orderSecurity order) orderDatatype + case tickMb of + Nothing -> rejectOrder state order + Just tick -> if orderQuantity order /= 0 + then do + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + executeAtTick state order tick + else rejectOrder state order + submitLimitOrder price state order = if orderQuantity order == 0 + then rejectOrder state order + else do + tm <- tickTable <$> readIORef state + tickMb <- lookupTick tm (orderSecurity order) orderDatatype + log Debug "PaperBroker" $ "Limit order submitted, looking up: " <> (T.pack . show) (orderSecurity order, orderDatatype) + case tickMb of + Nothing -> do + let newOrder = order { orderState = Submitted } + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + Just tick -> do + marketOpenTime' <- marketOpenTime <$> readIORef state + if (((orderOperation order == Buy) && (value tick < price)) || + ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime')) + then do + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + executeAtTick state order tick + else do + let newOrder = order { orderState = Submitted } + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , pendingOrders = newOrder : pendingOrders s}, ())) + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + + submitStopOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order + submitStopMarketOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order + + orderDatatype = case orderOperation order of + Buy -> BestOffer + Sell -> BestBid + +pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool +pbCancelOrder state oid = do + atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\o -> orderId o /= oid) (pendingOrders s), + orders = M.adjustWithKey (\_ v -> v { orderState = Cancelled }) oid (orders s) }, ())) + maybeCall notificationCallback state $ BackendOrderNotification oid Cancelled + return True + +pbDestroyBroker :: IORef PaperBrokerState -> IO () +pbDestroyBroker state = do + maybeTid <- pbTid <$> readIORef state + case maybeTid of + Just tid -> killThread tid + Nothing -> return () + diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index e918f83..5700286 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -42,7 +42,7 @@ import GHC.Exts (IsList (..)) import Prelude hiding (log) import SlaveThread (fork) import TickerInfoServer (TickerInfoServerHandle) -import TickTable (newTickTable) +import TickTable (TickTable) import Transaq (TransaqResponse) import TXML (LogLevel, MonadTXML, initialize, sendCommand, @@ -98,14 +98,14 @@ instance HasLog Env Message App where start :: LogAction IO Message + -> TickTable -> TransaqConnectorConfig -> BoundedChan QuoteSourceServerData -> TickerInfoServerHandle -> IO TXMLConnectorHandle -start logger' config' qssChannel' tisH = do +start logger' tickTable config' qssChannel' tisH = do logWith logger' Info "TXMLConnector" "Starting" notificationQueue' <- atomically $ newTBQueue 50000 - tickTable <- newTickTable requestVar' <- newEmptyTMVarIO responseVar' <- newEmptyTMVarIO currentCandles' <- newTVarIO [] diff --git a/transaq-connector.cabal b/transaq-connector.cabal index e3a5247..6a771c9 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -28,6 +28,8 @@ executable transaq-connector , TXMLConnector.Internal , TickTable , FSM + , PaperBroker + , Commissions default-extensions: OverloadedStrings , MultiWayIf , MultiParamTypeClasses @@ -60,6 +62,7 @@ executable transaq-connector , ekg-statsd , ekg-core , slave-thread + , th-printf extra-lib-dirs: lib ghc-options: -Wall -Wcompat