From f062b7f54ef2ca7dbc8701b23fa34c4b27c7545d Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 28 Sep 2016 09:09:37 +0700 Subject: [PATCH] BrokerServer: initial implementation --- libatrade.cabal | 4 ++ src/ATrade/Broker/Protocol.hs | 3 +- src/ATrade/Broker/Server.hs | 90 +++++++++++++++++++++++++++++++ src/ATrade/Types.hs | 10 ++-- test/Spec.hs | 3 +- test/TestBrokerServer.hs | 99 +++++++++++++++++++++++++++++++++++ 6 files changed, 202 insertions(+), 7 deletions(-) create mode 100644 src/ATrade/Broker/Server.hs create mode 100644 test/TestBrokerServer.hs diff --git a/libatrade.cabal b/libatrade.cabal index 0d90a6d..dbbca02 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -18,6 +18,7 @@ library exposed-modules: ATrade.Types , ATrade.QuoteSource.Server , ATrade.Broker.Protocol + , ATrade.Broker.Server build-depends: base >= 4.7 && < 5 , Decimal , time @@ -30,6 +31,7 @@ library , hslogger , zeromq4-haskell , unordered-containers + , containers default-language: Haskell2010 executable libatrade-exe @@ -66,10 +68,12 @@ test-suite libatrade-test , zeromq4-haskell , bytestring , monad-loops + , uuid ghc-options: -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010 other-modules: ArbitraryInstances , TestBrokerProtocol + , TestBrokerServer , TestQuoteSourceServer , TestTypes diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index a6679f7..c99334a 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -3,7 +3,8 @@ module ATrade.Broker.Protocol ( BrokerServerRequest(..), BrokerServerResponse(..), - Notification(..) + Notification(..), + RequestSqnum(..) ) where import qualified Data.HashMap.Strict as HM diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs new file mode 100644 index 0000000..c2c90f4 --- /dev/null +++ b/src/ATrade/Broker/Server.hs @@ -0,0 +1,90 @@ + +module ATrade.Broker.Server ( + startBrokerServer, + stopBrokerServer, + BrokerInterface(..) +) where + +import ATrade.Types +import ATrade.Broker.Protocol +import System.ZMQ4 +import qualified Data.Map as M +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T +import qualified Data.List as L +import Data.Aeson +import Data.Time.Clock +import Data.IORef +import Control.Concurrent +import Control.Exception +import Control.Monad +import System.Log.Logger + +newtype OrderIdGenerator = IO OrderId + +data BrokerInterface = BrokerInterface { + accounts :: [T.Text], + setNotificationCallback :: Maybe (Notification -> IO()) -> IO (), + submitOrder :: Order -> IO (), + cancelOrder :: OrderId -> IO (), + stopBroker :: IO () +} + +data BrokerServerState = BrokerServerState { + bsSocket :: Socket Router, + orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders + lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), + pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) + brokers :: [BrokerInterface], + completionMvar :: MVar () +} + +data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) + +startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle +startBrokerServer brokers c ep = do + sock <- socket c Router + bind sock (T.unpack ep) + tid <- myThreadId + compMv <- newEmptyMVar + state <- newIORef BrokerServerState { + bsSocket = sock, + orderMap = M.empty, + lastPacket = M.empty, + pendingNotifications = [], + brokers = brokers, + completionMvar = compMv + } + + BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv + +brokerServerThread state = finally brokerServerThread' cleanup + where + brokerServerThread' = forever $ do + sock <- bsSocket <$> readIORef state + receiveMulti sock >>= handleMessage + + cleanup = do + sock <- bsSocket <$> readIORef state + close sock + mv <- completionMvar <$> readIORef state + putMVar mv () + + handleMessage :: [B.ByteString] -> IO () + handleMessage [peerId, _, payload] = do + bros <- brokers <$> readIORef state + case decode . BL.fromStrict $ payload of + Just (RequestSubmitOrder sqnum order) -> + case findBroker (orderAccountId order) bros of + Just bro -> submitOrder bro order + Nothing -> return () + Nothing -> return () + + handleMessage x = warningM "Broker.Server" ("Invalid packet received: " ++ show x) + findBroker account = L.find (L.elem account . accounts) + + +stopBrokerServer :: BrokerServerHandle -> IO () +stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv + diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index f0300d3..0ec4fb2 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -236,8 +236,8 @@ type OrderId = Integer data Order = Order { orderId :: OrderId, - orderAccountId :: String, - orderSecurity :: String, + orderAccountId :: T.Text, + orderSecurity :: T.Text, orderPrice :: OrderPrice, orderQuantity :: Integer, orderExecutedQuantity :: Integer, @@ -278,10 +278,10 @@ data Trade = Trade { tradePrice :: Decimal, tradeQuantity :: Integer, tradeVolume :: Decimal, - tradeVolumeCurrency :: String, + tradeVolumeCurrency :: T.Text, tradeOperation :: Operation, - tradeAccount :: String, - tradeSecurity :: String, + tradeAccount :: T.Text, + tradeSecurity :: T.Text, tradeTimestamp :: UTCTime, tradeSignalId :: SignalId } deriving (Show, Eq) diff --git a/test/Spec.hs b/test/Spec.hs index 4c11d9e..4f1bcdf 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,6 +1,7 @@ import qualified TestTypes import qualified TestBrokerProtocol +import qualified TestBrokerServer import qualified TestQuoteSourceServer import Test.Tasty @@ -12,5 +13,5 @@ properties :: TestTree properties = testGroup "Properties" [TestTypes.properties, TestBrokerProtocol.properties] unitTests :: TestTree -unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests] +unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests, TestBrokerServer.unitTests] diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs new file mode 100644 index 0000000..6314fde --- /dev/null +++ b/test/TestBrokerServer.hs @@ -0,0 +1,99 @@ +{-# LANGUAGE OverloadedStrings #-} + +module TestBrokerServer ( + unitTests +) where + +import Test.Tasty +import Test.Tasty.SmallCheck as SC +import Test.Tasty.QuickCheck as QC +import Test.Tasty.HUnit + +import ATrade.Types +import qualified Data.ByteString.Lazy as BL +import ATrade.Broker.Server +import ATrade.Broker.Protocol +import qualified Data.Text as T +import Control.Monad +import Control.Monad.Loops +import Control.Concurrent.MVar +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (writeChan) +import Control.Exception +import System.ZMQ4 +import Data.Aeson +import Data.Time.Clock +import Data.Time.Calendar +import Data.Maybe +import Data.IORef +import Data.UUID as U +import Data.UUID.V4 as UV4 + +data MockBrokerState = MockBrokerState { + orders :: [Order], + notificationCallback :: Maybe (Notification -> IO ()) +} + +mockSubmitOrder :: IORef MockBrokerState -> Order -> IO () +mockSubmitOrder state order = do + atomicModifyIORef' state (\s -> (s { orders = submittedOrder : orders s }, ())) + maybeCb <- notificationCallback <$> readIORef state + case maybeCb of + Just cb -> cb $ OrderNotification (orderId order) Submitted + Nothing -> return () + where + submittedOrder = order { orderState = Submitted } + +mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO () +mockCancelOrder state = undefined + +mockStopBroker :: IORef MockBrokerState -> IO () +mockStopBroker state = return () + + +mkMockBroker accs = do + state <- newIORef MockBrokerState { + orders = [], + notificationCallback = Nothing + } + + return (BrokerInterface { + accounts = accs, + setNotificationCallback = \cb -> atomicModifyIORef' state (\s -> (s { notificationCallback = cb }, ())), + submitOrder = mockSubmitOrder state, + cancelOrder = mockCancelOrder state, + stopBroker = mockStopBroker state + }, state) + + +unitTests = testGroup "Broker.Server" [testBrokerServerStartStop, testBrokerServerSubmitOrder] + +testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do + ep <- toText <$> UV4.nextRandom + broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) + stopBrokerServer broS) + +testBrokerServerSubmitOrder = testCase "Broker Server submits order" $ withContext (\ctx -> do + uid <- toText <$> UV4.nextRandom + (mockBroker, broState) <- mkMockBroker ["demo"] + let ep = "inproc://brokerserver" `T.append` uid + let order = Order { + orderId = 0, + orderAccountId = "demo", + orderSecurity = "FOO", + orderPrice = Market, + orderQuantity = 10, + orderExecutedQuantity = 0, + orderOperation = Buy, + orderState = Unsubmitted, + orderSignalId = SignalId "" "" "" + } + bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + withSocket ctx Req (\sock -> do + connect sock (T.unpack ep) + send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order) + threadDelay 100000 + s <- readIORef broState + (length . orders) s @?= 1 + ))) +