Browse Source

Add paper broker

master
Denis Tereshkin 2 years ago
parent
commit
14d76cb77f
  1. 18
      src/Commissions.hs
  2. 4
      src/Config.hs
  3. 38
      src/Main.hs
  4. 260
      src/PaperBroker.hs
  5. 6
      src/TXMLConnector.hs
  6. 3
      transaq-connector.cabal

18
src/Commissions.hs

@ -0,0 +1,18 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Commissions (
CommissionConfig(..)
) where
import qualified Data.Text as T
import Dhall
import GHC.Generics
data CommissionConfig = CommissionConfig {
comPrefix :: T.Text,
comPercentage :: Double,
comFixed :: Double
} deriving (Show, Eq, Generic)
instance FromDhall CommissionConfig

4
src/Config.hs

@ -7,6 +7,7 @@ module Config
loadConfig, loadConfig,
) where ) where
import Commissions
import qualified Data.Text as T import qualified Data.Text as T
import Dhall (FromDhall (autoWith), auto, expected, inputFile) import Dhall (FromDhall (autoWith), auto, expected, inputFile)
import GHC.Generics import GHC.Generics
@ -39,7 +40,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig {
statsdPort :: Int, statsdPort :: Int,
allTradesSubscriptions :: [SubscriptionConfig], allTradesSubscriptions :: [SubscriptionConfig],
quotationsSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig],
quotesSubscriptions :: [SubscriptionConfig] quotesSubscriptions :: [SubscriptionConfig],
commissions :: [CommissionConfig]
} deriving (Show, Eq, Generic) } deriving (Show, Eq, Generic)
instance FromDhall TransaqConnectorConfig instance FromDhall TransaqConnectorConfig

38
src/Main.hs

@ -10,17 +10,21 @@ import ATrade.Broker.TradeSinks.GotifyTradeSink (withGotifyTradeSink)
import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink)
import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning),
fmtMessage, logWith) fmtMessage, logWith)
import ATrade.QuoteSource.Server (startQuoteSourceServer, import ATrade.QuoteSource.Server (QuoteSourceServerData (..),
startQuoteSourceServer,
stopQuoteSourceServer) stopQuoteSourceServer)
import ATrade.Types (defaultServerSecurityParams) import ATrade.Types (Tick, defaultServerSecurityParams,
fromDouble)
import Colog (LogAction, cfilter, import Colog (LogAction, cfilter,
logTextStdout, (>$<)) logTextStdout, (>$<))
import Colog.Actions (logTextHandle) import Colog.Actions (logTextHandle)
import Config (TransaqConnectorConfig (..), import Config (TransaqConnectorConfig (..),
loadConfig) loadConfig)
import Control.Concurrent (killThread, import Control.Concurrent (ThreadId, killThread,
threadDelay) threadDelay)
import Control.Concurrent.BoundedChan (newBoundedChan) import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan,
readChan, writeChan)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (forever, void) import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO) import Control.Monad.IO.Class (MonadIO)
@ -29,7 +33,9 @@ import Data.Version (showVersion)
import Debug.EventCounters (emitEvent, import Debug.EventCounters (emitEvent,
initEventCounters) initEventCounters)
import HistoryProviderServer (withHistoryProviderServer) import HistoryProviderServer (withHistoryProviderServer)
import PaperBroker (mkPaperBroker)
import Prelude hiding (log) import Prelude hiding (log)
import SlaveThread (fork)
import System.IO (Handle, import System.IO (Handle,
IOMode (AppendMode), IOMode (AppendMode),
withFile) withFile)
@ -40,6 +46,7 @@ import System.Remote.Monitoring.Statsd (StatsdOptions (..),
statsdThreadId) statsdThreadId)
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import TickerInfoServer (withTickerInfoServer) import TickerInfoServer (withTickerInfoServer)
import TickTable (newTickTable)
import qualified TXMLConnector as Connector import qualified TXMLConnector as Connector
import Version (transaqConnectorVersionText) import Version (transaqConnectorVersionText)
@ -53,6 +60,21 @@ parseLoglevel 2 = Info
parseLoglevel 3 = Debug parseLoglevel 3 = Debug
parseLoglevel _ = Trace parseLoglevel _ = Trace
forkQssChannel ::
BoundedChan QuoteSourceServerData
-> IO (ThreadId, BoundedChan QuoteSourceServerData, BoundedChan Tick)
forkQssChannel chan = do
ch1 <- newBoundedChan 50000
ch2 <- newBoundedChan 50000
tid <- fork $ do
x <- readChan chan
writeChan ch1 x
case x of
QSSTick tick -> writeChan ch2 tick
_ -> return ()
return (tid, ch1, ch2)
main :: IO () main :: IO ()
main = do main = do
cfg <- loadConfig "transaq-connector.dhall" cfg <- loadConfig "transaq-connector.dhall"
@ -81,9 +103,12 @@ main = do
stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do
withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard -> withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard ->
withGotifyTradeSink (T.unpack $ gotifyUri cfg) (T.unpack $ gotifyToken cfg) logger $ \tsGotify -> do withGotifyTradeSink (T.unpack $ gotifyUri cfg) (T.unpack $ gotifyToken cfg) logger $ \tsGotify -> do
txml <- Connector.start logger cfg qssChannel tisH (forkTid, qssCh1, qssCh2) <- forkQssChannel qssChannel
tickTable <- newTickTable
paper <- mkPaperBroker tickTable tisH qssCh2 (fromDouble 100000.0) ["demo"] (commissions cfg) logger
txml <- Connector.start logger tickTable cfg qssCh1 tisH
bracket (startBrokerServer bracket (startBrokerServer
[Connector.makeBrokerBackend txml (account cfg)] [Connector.makeBrokerBackend txml (account cfg), paper]
ctx ctx
(brokerEndpoint cfg) (brokerEndpoint cfg)
(brokerNotificationsEndpoint cfg) (brokerNotificationsEndpoint cfg)
@ -98,6 +123,7 @@ main = do
forever $ do forever $ do
threadDelay 200000 threadDelay 200000
emitEvent "main_loop" emitEvent "main_loop"
killThread forkTid
log Info "main" "Shutting down" log Info "main" "Shutting down"
killThread $ statsdThreadId statsdThread killThread $ statsdThreadId statsdThread

260
src/PaperBroker.hs

@ -0,0 +1,260 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE Strict #-}
module PaperBroker (
PaperBrokerState,
mkPaperBroker
) where
import ATrade.Broker.Backend
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import ATrade.Logging (Message, Severity (..),
logWith)
import ATrade.Types
import Colog (LogAction)
import Commissions (CommissionConfig (..))
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Monad
import Data.Bits
import Data.IORef
import qualified Data.List as L
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Time.Clock
import Language.Haskell.Printf (t)
import System.ZMQ4
import TickerInfoServer
import TickTable (TickTable, lookupTick)
data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId,
tickTable :: TickTable,
orders :: M.Map OrderId Order,
cash :: !Price,
notificationCallback :: Maybe (BrokerBackendNotification -> IO ()),
pendingOrders :: [Order],
fortsClassCodes :: [T.Text],
fortsOpenTimeIntervals :: [(DiffTime, DiffTime)],
auctionableClassCodes :: [T.Text],
premarketStartTime :: DiffTime,
marketOpenTime :: DiffTime,
postMarketStartTime :: DiffTime,
postMarketFixTime :: DiffTime,
postMarketCloseTime :: DiffTime,
commissions :: [CommissionConfig],
logger :: LogAction IO Message,
tisH :: TickerInfoServerHandle
}
hourMin :: Integer -> Integer -> DiffTime
hourMin h m = fromIntegral $ h * 3600 + m * 60
mkPaperBroker :: TickTable -> TickerInfoServerHandle -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend
mkPaperBroker tickTableH tisH tickChan startCash accounts comms l = do
state <- newIORef PaperBrokerState {
pbTid = Nothing,
tickTable = tickTableH,
orders = M.empty,
cash = startCash,
notificationCallback = Nothing,
pendingOrders = [],
fortsClassCodes = ["FUT", "OPT"],
fortsOpenTimeIntervals = [(hourMin 6 0, hourMin 11 0), (hourMin 11 5, hourMin 15 45), (hourMin 16 0, hourMin 20 50)],
auctionableClassCodes = ["TQBR"],
premarketStartTime = hourMin 6 50,
marketOpenTime = hourMin 7 0,
postMarketStartTime = hourMin 15 40,
postMarketFixTime = hourMin 15 45,
postMarketCloseTime = hourMin 15 50,
commissions = comms,
logger = l,
tisH = tisH
}
tid <- forkIO $ brokerThread tickChan state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()))
return BrokerBackend {
accounts = accounts,
setNotificationCallback = pbSetNotificationCallback state,
submitOrder = pbSubmitOrder state,
cancelOrder = void . pbCancelOrder state,
stop = pbDestroyBroker state }
brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread chan state = forever $ do
tick <- readChan chan
marketOpenTime' <- marketOpenTime <$> readIORef state
when ((utctDayTime . timestamp) tick >= marketOpenTime') $
executePendingOrders tick state
executePendingOrders tick state = do
marketOpenTime' <- marketOpenTime <$> readIORef state
po <- pendingOrders <$> readIORef state
when (utctDayTime (timestamp tick) >= marketOpenTime') $ do
executedIds <- catMaybes <$> mapM execute po
atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ()))
where
execute order =
if security tick == orderSecurity order
then
case orderPrice order of
Market -> do
log Debug "PaperBroker" "Executing: pending market order"
executeAtTick state order tick
return $ Just $ orderId order
Limit price ->
executeLimitAt price order
_ -> return Nothing
else return Nothing
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
executeLimitAt price order = case orderOperation order of
Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) ||
(datatype tick == BestOffer && price > value tick && value tick > 0)
then do
log Debug "PaperBroker" $ TL.toStrict $ [t|[1]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order)
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
Sell -> if (datatype tick == LastTradePrice && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0)
then do
log Debug "PaperBroker" $ TL.toStrict $ [t|[2]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order)
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (BrokerBackendNotification -> IO ()) -> IO()
pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) )
mkTrade :: TickerInfo -> Tick -> Order -> UTCTime -> Maybe CommissionConfig -> Trade
mkTrade info tick order timestamp comconf = Trade {
tradeOrderId = orderId order,
tradePrice = value tick,
tradeQuantity = orderQuantity order,
tradeVolume = thisTradeVolume,
tradeVolumeCurrency = "TEST",
tradeOperation = orderOperation order,
tradeAccount = orderAccountId order,
tradeSecurity = orderSecurity order,
tradeTimestamp = timestamp,
tradeCommission = 0 `fromMaybe` (calcCommission thisTradeVolume <$> comconf),
tradeSignalId = orderSignalId order }
where
-- Futures have incorrect lotsize
thisTradeVolume = fromInteger (orderQuantity order) * value tick * if "FUT" `T.isPrefixOf` security tick then 1 else (fromIntegral $ tiLotSize info)
calcCommission vol c = vol * 0.01 * fromDouble (comPercentage c) + fromDouble (comFixed c) * fromIntegral (orderQuantity order)
maybeCall proj state arg = do
cb <- proj <$> readIORef state
case cb of
Just callback -> callback arg
Nothing -> return ()
executeAtTick state order tick = do
let newOrder = order { orderState = Executed }
tickerInfo <- obtainTickerInfo (security tick)
comm <- L.find (\comdef -> comPrefix comdef `T.isPrefixOf` security tick) . commissions <$> readIORef state
let tradeVolume = fromInteger (orderQuantity order) * value tick * (fromIntegral $ tiLotSize tickerInfo)
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()))
log Debug "PaperBroker" $ TL.toStrict $ [t|Executed: %? at tick: %?|] newOrder tick
ts <- getCurrentTime
maybeCall notificationCallback state $ BackendTradeNotification $ mkTrade tickerInfo tick order ts comm
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Executed
where
obtainTickerInfo tickerId = do
tis <- tisH <$> readIORef state
mInfo <- getTickerInfo tickerId tis
case mInfo of
Just info -> return info
_ -> return TickerInfo { tiTicker = tickerId,
tiLotSize = 1,
tiTickSize = 1 }
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
rejectOrder state order = do
let newOrder = order { orderState = Rejected } in
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Rejected
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do
log Info "PaperBroker" $ "Submitted order: " <> (T.pack . show) order
case orderPrice order of
Market -> executeMarketOrder state order
Limit price -> submitLimitOrder price state order
Stop price trigger -> submitStopOrder state order
StopMarket trigger -> submitStopMarketOrder state order
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
executeMarketOrder state order = do
tm <- tickTable <$> readIORef state
tickMb <- lookupTick tm (orderSecurity order) orderDatatype
case tickMb of
Nothing -> rejectOrder state order
Just tick -> if orderQuantity order /= 0
then do
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
executeAtTick state order tick
else rejectOrder state order
submitLimitOrder price state order = if orderQuantity order == 0
then rejectOrder state order
else do
tm <- tickTable <$> readIORef state
tickMb <- lookupTick tm (orderSecurity order) orderDatatype
log Debug "PaperBroker" $ "Limit order submitted, looking up: " <> (T.pack . show) (orderSecurity order, orderDatatype)
case tickMb of
Nothing -> do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
Just tick -> do
marketOpenTime' <- marketOpenTime <$> readIORef state
if (((orderOperation order == Buy) && (value tick < price)) ||
((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime'))
then do
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
executeAtTick state order tick
else do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , pendingOrders = newOrder : pendingOrders s}, ()))
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
submitStopOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order
submitStopMarketOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order
orderDatatype = case orderOperation order of
Buy -> BestOffer
Sell -> BestBid
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state oid = do
atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\o -> orderId o /= oid) (pendingOrders s),
orders = M.adjustWithKey (\_ v -> v { orderState = Cancelled }) oid (orders s) }, ()))
maybeCall notificationCallback state $ BackendOrderNotification oid Cancelled
return True
pbDestroyBroker :: IORef PaperBrokerState -> IO ()
pbDestroyBroker state = do
maybeTid <- pbTid <$> readIORef state
case maybeTid of
Just tid -> killThread tid
Nothing -> return ()

6
src/TXMLConnector.hs

@ -42,7 +42,7 @@ import GHC.Exts (IsList (..))
import Prelude hiding (log) import Prelude hiding (log)
import SlaveThread (fork) import SlaveThread (fork)
import TickerInfoServer (TickerInfoServerHandle) import TickerInfoServer (TickerInfoServerHandle)
import TickTable (newTickTable) import TickTable (TickTable)
import Transaq (TransaqResponse) import Transaq (TransaqResponse)
import TXML (LogLevel, MonadTXML, import TXML (LogLevel, MonadTXML,
initialize, sendCommand, initialize, sendCommand,
@ -98,14 +98,14 @@ instance HasLog Env Message App where
start :: start ::
LogAction IO Message LogAction IO Message
-> TickTable
-> TransaqConnectorConfig -> TransaqConnectorConfig
-> BoundedChan QuoteSourceServerData -> BoundedChan QuoteSourceServerData
-> TickerInfoServerHandle -> TickerInfoServerHandle
-> IO TXMLConnectorHandle -> IO TXMLConnectorHandle
start logger' config' qssChannel' tisH = do start logger' tickTable config' qssChannel' tisH = do
logWith logger' Info "TXMLConnector" "Starting" logWith logger' Info "TXMLConnector" "Starting"
notificationQueue' <- atomically $ newTBQueue 50000 notificationQueue' <- atomically $ newTBQueue 50000
tickTable <- newTickTable
requestVar' <- newEmptyTMVarIO requestVar' <- newEmptyTMVarIO
responseVar' <- newEmptyTMVarIO responseVar' <- newEmptyTMVarIO
currentCandles' <- newTVarIO [] currentCandles' <- newTVarIO []

3
transaq-connector.cabal

@ -28,6 +28,8 @@ executable transaq-connector
, TXMLConnector.Internal , TXMLConnector.Internal
, TickTable , TickTable
, FSM , FSM
, PaperBroker
, Commissions
default-extensions: OverloadedStrings default-extensions: OverloadedStrings
, MultiWayIf , MultiWayIf
, MultiParamTypeClasses , MultiParamTypeClasses
@ -60,6 +62,7 @@ executable transaq-connector
, ekg-statsd , ekg-statsd
, ekg-core , ekg-core
, slave-thread , slave-thread
, th-printf
extra-lib-dirs: lib extra-lib-dirs: lib
ghc-options: -Wall ghc-options: -Wall
-Wcompat -Wcompat

Loading…
Cancel
Save