From 2fc5d292960cc1702db260b7eaba0f2212ae0ded Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 23 Sep 2016 16:48:42 +0700 Subject: [PATCH] Basic BrokerServer --- quik-connector.cabal | 3 + src/Broker.hs | 67 +++++++++++++++++++++- src/Broker/PaperBroker.hs | 8 ++- src/Broker/Server.hs | 117 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 src/Broker/Server.hs diff --git a/quik-connector.cabal b/quik-connector.cabal index 74a5c12..b13a114 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -23,6 +23,7 @@ library , QuoteSource.Server , Broker , Broker.PaperBroker + , Broker.Server ghc-options: -Wincomplete-patterns build-depends: base >= 4.7 && < 5 , Win32 @@ -42,6 +43,8 @@ library , zeromq4-haskell , hashmap , hashable + , unordered-containers + , aeson default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser diff --git a/src/Broker.hs b/src/Broker.hs index cb905a0..b82316e 100644 --- a/src/Broker.hs +++ b/src/Broker.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE OverloadedStrings #-} module Broker ( SignalId(..), @@ -12,6 +13,9 @@ module Broker ( import Data.Decimal import Data.Time.Clock +import Data.Aeson +import Data.Aeson.Types +import Control.Monad data SignalId = SignalId { strategyId :: String, @@ -19,12 +23,44 @@ data SignalId = SignalId { comment :: String } deriving (Show, Eq) -data OrderPrice = Market | Limit Decimal | Stop Decimal Decimal +instance FromJSON SignalId where + parseJSON (Object o) = SignalId <$> + o .: "strategy-id" .!= "" <*> + o .: "signal-name" .!= "" <*> + o .: "commen" .!= "" + parseJSON _ = fail "Should be object" + +data OrderPrice = Market | Limit Decimal | Stop Decimal Decimal | StopMarket Decimal deriving (Show, Eq) +decimal :: (RealFrac r) => r -> Decimal +decimal = realFracToDecimal 10 + +instance FromJSON OrderPrice where + parseJSON (String s) = when (s /= "market") (fail "If string, then should be 'market'") >> + return Market + + parseJSON (Number n) = return $ Limit $ decimal n + parseJSON (Object v) = do + triggerPrice <- v .: "trigger" :: Parser Double + execPrice <- v .: "execution" + case execPrice of + (String s) -> when (s /= "market") (fail "If string, then should be 'market'") >> return $ StopMarket (decimal triggerPrice) + (Number n) -> return $ Stop (decimal triggerPrice) (decimal n) + _ -> fail "Should be either number or 'market'" + + parseJSON _ = fail "OrderPrice" + data Operation = Buy | Sell deriving (Show, Eq) +instance FromJSON Operation where + parseJSON (String s) + | s == "buy" = return Buy + | s == "sell" = return Sell + | otherwise = fail "Should be either 'buy' or 'sell'" + parseJSON _ = fail "Should be string" + type OrderId = Integer data OrderState = Unsubmitted @@ -36,6 +72,19 @@ data OrderState = Unsubmitted | Error String deriving (Show, Eq) +instance FromJSON OrderState where + parseJSON (String s) + | s == "unsubmitted" = return Unsubmitted + | s == "submitted" = return Submitted + | s == "partially-executed" = return PartiallyExecuted + | s == "executed" = return Executed + | s == "cancelled" = return Cancelled + | s == "rejected" = return $ Rejected "" + | s == "error" = return $ Broker.Error "" + | otherwise = fail "Invlaid state" + + parseJSON _ = fail "Should be string" + data Order = Order { orderId :: OrderId, orderAccountId :: String, @@ -48,6 +97,21 @@ data Order = Order { orderSignalId :: SignalId } deriving (Show, Eq) +instance FromJSON Order where + parseJSON (Object v) = Order <$> + v .:? "order-id" .!= 0 <*> + v .: "account" <*> + v .: "security" <*> + v .: "price" <*> + v .: "quantity" <*> + v .:? "executed-quantity" .!= 0 <*> + v .: "operation" <*> + v .: "state" .!= Unsubmitted <*> + v .: "signal-id" + + parseJSON _ = fail "Should be string" + + data Trade = Trade { tradeOrderId :: OrderId, tradePrice :: Decimal, @@ -66,6 +130,7 @@ data Broker = Broker { setOrderCallback :: Maybe (Order -> IO ()) -> IO(), submitOrder :: Order -> IO OrderId, cancelOrder :: OrderId -> IO (), + getOrder :: OrderId -> IO (Maybe Order), destroyBroker :: IO () } diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index f75e1a0..5a368a2 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -55,6 +55,7 @@ mkPaperBroker tickChan startCash accounts = do setOrderCallback = pbSetOrderCallback state, submitOrder = pbSubmitOrder state, cancelOrder = pbCancelOrder state, + getOrder = pbGetOrder state, destroyBroker = pbDestroyBroker state } brokerThread :: IORef PaperBrokerState -> IO () @@ -85,6 +86,7 @@ pbSubmitOrder state order = do Market -> executeMarketOrder state order Limit price -> submitLimitOrder state order Stop price trigger -> submitStopOrder state order + StopMarket trigger -> submitStopMarketOrder state order where executeMarketOrder state order = do @@ -104,7 +106,8 @@ pbSubmitOrder state order = do submitLimitOrder = undefined submitStopOrder = undefined - + submitStopMarketOrder = undefined + orderDatatype order = case orderOperation order of Buy -> BestOffer Sell -> BestBid @@ -139,3 +142,6 @@ pbDestroyBroker state = do Just tid -> killThread tid Nothing -> return () +pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) +pbGetOrder state oid = M.lookup oid . orders <$> readIORef state + diff --git a/src/Broker/Server.hs b/src/Broker/Server.hs new file mode 100644 index 0000000..221b8bb --- /dev/null +++ b/src/Broker/Server.hs @@ -0,0 +1,117 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Broker.Server ( +) where + +import System.ZMQ4 +import qualified Data.Map as M +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import Data.ATrade +import Data.IORef +import qualified Data.HashMap.Strict as HM +import Broker +import Control.Concurrent +import Control.Exception +import Data.Aeson +import Data.Aeson.Types +import Data.Int +import Data.Time.Clock +import Data.List as L +import qualified Data.List.NonEmpty as LN +import System.Log.Logger + +type RequestSqnum = Int64 +type PeerId = B.ByteString + +data BrokerServerState = BrokerServerState { + bsSocket :: Socket Router, + orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders + lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), + pendingNotifications :: [(Order, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) + brokers :: [Broker] +} + +newtype BrokerServerHandle = BrokerServerHandle ThreadId + +mkBrokerServer :: [Broker] -> Context -> String -> IO BrokerServerHandle +mkBrokerServer brokers c ep = do + sock <- socket c Router + bind sock ep + tid <- myThreadId + state <- newIORef BrokerServerState { + bsSocket = sock, + orderMap = M.empty, + lastPacket = M.empty, + pendingNotifications = [], + brokers = brokers + } + BrokerServerHandle <$> forkIO (brokerServerThread state) + +data BrokerServerMessage = SubmitOrder RequestSqnum Order | CancelOrder RequestSqnum OrderId +data BrokerServerResponse = OrderIdResponse OrderId +instance ToJSON BrokerServerResponse where + toJSON (OrderIdResponse oid) = object ["order-id" .= oid] + +parseMessage :: Value -> Parser BrokerServerMessage +parseMessage (Object obj) = do + rqsqnum <- obj .: "request-sqnum" :: Parser Int64 + case HM.lookup "order" obj of + Just (Object orderJson) -> do + order <- obj .: "order" + return $ SubmitOrder rqsqnum order + _ -> case HM.lookup "cancel-order" obj of + Just orderIdJson -> do + order <- obj .: "cancel-order" + return $ CancelOrder rqsqnum order + Nothing -> fail "Either 'order' or 'cancel-order' field should be present" + where + +parseMessage _ = fail "Should be object" + +brokerServerThread :: IORef BrokerServerState -> IO () +brokerServerThread state = finally brokerServerThread' cleanup + where + cleanup = do + s <- bsSocket <$> readIORef state + close s + + brokerServerThread' = do + s <- bsSocket <$> readIORef state + msg <- receiveMulti s + tryDeliverPendingNotifications + handleMessage msg + + tryDeliverPendingNotifications = return () + + handleMessage :: [B.ByteString] -> IO () + handleMessage (peerId:_:json:_) = maybe (return ()) (handleMessage' peerId) (decode (BL.fromStrict json) >>= parseMaybe parseMessage) + handleMessage _ = warningM "BrokerServer" "Invalid packet received, should be at least 3 parts" + + handleMessage' :: PeerId -> BrokerServerMessage -> IO () + handleMessage' peerId (SubmitOrder sqnum order) = do + s <- bsSocket <$> readIORef state + lastPack <- M.lookup peerId . lastPacket <$> readIORef state + case shouldResend lastPack sqnum of + Just packet -> sendMulti s $ LN.fromList [peerId, B.empty, packet] + Nothing -> do + brs <- brokers <$> readIORef state + case findBroker brs (orderAccountId order) of + Just broker -> do + orderId <- submitOrder broker order + let packet = BL.toStrict . encode $ OrderIdResponse orderId + atomicModifyIORef' state (\s -> (s { lastPacket = M.insert peerId (sqnum, packet) $ lastPacket s }, ())) + sendMulti s $ LN.fromList [peerId, B.empty, packet] + + Nothing -> warningM "BrokerServer" $ "Invalid account requested: " ++ orderAccountId order + where + shouldResend lastPack sqnum = case lastPack of + Nothing -> Nothing + Just (oldSqnum, packet) -> if oldSqnum == sqnum + then Just packet + else Nothing + findBroker brokers account = L.find (L.elem account . accounts) brokers + + handleMessage' peerId (CancelOrder sqnum orderId) = undefined + +