From fd03f1fc91ac390e4e310e8f1e10871c78e4994e Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 23 Sep 2016 10:37:06 +0700 Subject: [PATCH] PaperBroker: market orders support --- app/Main.hs | 21 +++++- quik-connector.cabal | 4 ++ src/Broker.hs | 19 +++-- src/Broker/PaperBroker.hs | 141 ++++++++++++++++++++++++++++++++++++++ src/Data/ATrade.hs | 2 +- 5 files changed, 180 insertions(+), 7 deletions(-) create mode 100644 src/Broker/PaperBroker.hs diff --git a/app/Main.hs b/app/Main.hs index 7c03401..d35b063 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -14,6 +14,9 @@ import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParser import QuoteSource.Server +import Broker +import Broker.PaperBroker + import System.Log.Logger import System.Log.Handler.Simple import System.Log.Handler (setFormatter) @@ -72,18 +75,31 @@ parseConfig = withObject "object" $ \obj -> do tableName = tn, tableParams = params } +forkBoundedChan :: Int -> Int -> BoundedChan a -> IO (ThreadId, [BoundedChan a]) +forkBoundedChan chans size source = do + sinks <- replicateM chans (newBoundedChan size) + tid <- forkIO $ forever $ do + v <- readChan source + mapM_ (`tryWriteChan` v) sinks + + return (tid, sinks) + main :: IO () main = do updateGlobalLogger rootLoggerName (setLevel DEBUG) infoM "main" "Loading config" - config <- readConfig "quik-connector.config.json" + config <- readConfig "quik-connector.config.json" infoM "main" "Config loaded" chan <- newBoundedChan 1000 infoM "main" "Starting data import server" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" + + (forkId, [c1, c2]) <- forkBoundedChan 2 1000 chan + + broker <- mkPaperBroker c2 1000000 ["demo"] withContext (\ctx -> do - qsServer <- startQuoteSourceServer chan ctx (quotesourceEndpoint config) + qsServer <- startQuoteSourceServer c1 ctx (quotesourceEndpoint config) void initGUI window <- windowNew @@ -94,4 +110,5 @@ main = do mainGUI stopQuoteSourceServer qsServer infoM "main" "Main thread done") + killThread forkId diff --git a/quik-connector.cabal b/quik-connector.cabal index 54bbf9d..74a5c12 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -21,6 +21,8 @@ library , QuoteSource.TableParser , QuoteSource.TableParsers.AllParamsTableParser , QuoteSource.Server + , Broker + , Broker.PaperBroker ghc-options: -Wincomplete-patterns build-depends: base >= 4.7 && < 5 , Win32 @@ -38,6 +40,8 @@ library , BoundedChan , hslogger , zeromq4-haskell + , hashmap + , hashable default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser diff --git a/src/Broker.hs b/src/Broker.hs index 106c6bf..cb905a0 100644 --- a/src/Broker.hs +++ b/src/Broker.hs @@ -1,5 +1,13 @@ module Broker ( + SignalId(..), + OrderPrice(..), + Operation(..), + OrderId(..), + OrderState(..), + Order(..), + Trade(..), + Broker(..) ) where import Data.Decimal @@ -12,6 +20,7 @@ data SignalId = SignalId { deriving (Show, Eq) data OrderPrice = Market | Limit Decimal | Stop Decimal Decimal + deriving (Show, Eq) data Operation = Buy | Sell deriving (Show, Eq) @@ -25,6 +34,7 @@ data OrderState = Unsubmitted | Cancelled | Rejected String | Error String + deriving (Show, Eq) data Order = Order { orderId :: OrderId, @@ -52,9 +62,10 @@ data Trade = Trade { data Broker = Broker { accounts :: [String], - setTradeCallback :: Maybe (Trade -> IO ()), - setOrderCallback :: Maybe (Order -> IO ()), - submitOrder :: Order -> IO (), - cancelOrder :: OrderId -> IO () + setTradeCallback :: Maybe (Trade -> IO ()) -> IO(), + setOrderCallback :: Maybe (Order -> IO ()) -> IO(), + submitOrder :: Order -> IO OrderId, + cancelOrder :: OrderId -> IO (), + destroyBroker :: IO () } diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs new file mode 100644 index 0000000..f75e1a0 --- /dev/null +++ b/src/Broker/PaperBroker.hs @@ -0,0 +1,141 @@ + +module Broker.PaperBroker ( + PaperBrokerState, + mkPaperBroker +) where + +import Data.Hashable +import Data.Bits +import Control.Concurrent.BoundedChan +import Data.ATrade +import Data.IORef +import qualified Data.HashMap as M +import Broker +import Data.Time.Clock +import Data.Decimal +import Control.Monad +import Control.Concurrent hiding (readChan) +import System.Log.Logger + +data TickMapKey = TickMapKey String DataType + deriving (Show, Eq, Ord) + +instance Hashable TickMapKey where + hashWithSalt salt (TickMapKey s dt) = hashWithSalt salt s `xor` hashWithSalt salt (fromEnum dt) + +data PaperBrokerState = PaperBrokerState { + pbTid :: Maybe ThreadId, + tickChannel :: BoundedChan Tick, + tickMap :: M.Map TickMapKey Tick, + orders :: M.Map OrderId Order, + cash :: Decimal, + orderIdCounter :: OrderId, + tradeCallback :: Maybe (Trade -> IO ()), + orderCallback :: Maybe (Order -> IO ()) +} + +mkPaperBroker :: BoundedChan Tick -> Decimal -> [String] -> IO Broker +mkPaperBroker tickChan startCash accounts = do + state <- newIORef PaperBrokerState { + pbTid = Nothing, + tickChannel = tickChan, + tickMap = M.empty, + orders = M.empty, + cash = startCash, + orderIdCounter = 1, + tradeCallback = Nothing, + orderCallback = Nothing } + + tid <- forkIO $ brokerThread state + atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()) ) + + return Broker { + accounts = accounts, + setTradeCallback = pbSetTradeCallback state, + setOrderCallback = pbSetOrderCallback state, + submitOrder = pbSubmitOrder state, + cancelOrder = pbCancelOrder state, + destroyBroker = pbDestroyBroker state } + +brokerThread :: IORef PaperBrokerState -> IO () +brokerThread state = do + chan <- tickChannel <$> readIORef state + forever $ do + tick <- readChan chan + atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick (tickMap s) }, ()) ) + where + makeKey tick = TickMapKey (security tick) (datatype tick) + +nextOrderId :: IORef PaperBrokerState -> IO OrderId +nextOrderId state = do + id <- orderIdCounter <$> readIORef state + modifyIORef state (\s -> s { orderIdCounter = id + 1 } ) + return id + +pbSetTradeCallback :: IORef PaperBrokerState -> Maybe (Trade -> IO ()) -> IO() +pbSetTradeCallback state callback = modifyIORef state (\s -> s { tradeCallback = callback } ) + +pbSetOrderCallback :: IORef PaperBrokerState -> Maybe (Order -> IO ()) -> IO() +pbSetOrderCallback state callback = modifyIORef state (\s -> s { orderCallback = callback } ) + +pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO OrderId +pbSubmitOrder state order = do + curState <- readIORef state + case orderPrice order of + Market -> executeMarketOrder state order + Limit price -> submitLimitOrder state order + Stop price trigger -> submitStopOrder state order + + where + executeMarketOrder state order = do + tm <- tickMap <$> readIORef state + oid <- nextOrderId state + case M.lookup key tm of + Nothing -> let newOrder = order { orderState = Error "Unable to execute order: no bid/ask", orderId = oid } in + atomicModifyIORef' state (\s -> (s { orders = M.insert oid newOrder $ orders s }, ()) ) + + Just tick -> let newOrder = order { orderState = Executed, orderId = oid } + tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do + atomicModifyIORef' state (\s -> (s { orders = M.insert oid newOrder $ orders s , cash = cash s - tradeVolume}, ()) ) + ts <- getCurrentTime + maybeCall tradeCallback state $ mkTrade tick order ts + + return oid + + submitLimitOrder = undefined + submitStopOrder = undefined + + orderDatatype order = case orderOperation order of + Buy -> BestOffer + Sell -> BestBid + + key = TickMapKey (orderSecurity order) (orderDatatype order) + maybeCall proj state arg = do + cb <- proj <$> readIORef state + case cb of + Just callback -> callback arg + Nothing -> return () + + mkTrade :: Tick -> Order -> UTCTime -> Trade + mkTrade tick order timestamp = Trade { + tradeOrderId = orderId order, + tradePrice = value tick, + tradeQuantity = orderQuantity order, + tradeVolume = realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick, + tradeVolumeCurrency = "TEST", + tradeAccount = orderAccountId order, + tradeSecurity = orderSecurity order, + tradeTimestamp = timestamp, + tradeSignalId = orderSignalId order } + + +pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO () +pbCancelOrder state order = undefined + +pbDestroyBroker :: IORef PaperBrokerState -> IO () +pbDestroyBroker state = do + maybeTid <- pbTid <$> readIORef state + case maybeTid of + Just tid -> killThread tid + Nothing -> return () + diff --git a/src/Data/ATrade.hs b/src/Data/ATrade.hs index 5df7d73..e0bd8b8 100644 --- a/src/Data/ATrade.hs +++ b/src/Data/ATrade.hs @@ -24,7 +24,7 @@ data DataType = Unknown | Volatility | TotalSupply | TotalDemand - deriving (Show, Eq) + deriving (Show, Eq, Ord) instance Enum DataType where fromEnum x