Browse Source

BrokerServer: initial implementation

master
Denis Tereshkin 9 years ago
parent
commit
f062b7f54e
  1. 4
      libatrade.cabal
  2. 3
      src/ATrade/Broker/Protocol.hs
  3. 90
      src/ATrade/Broker/Server.hs
  4. 10
      src/ATrade/Types.hs
  5. 3
      test/Spec.hs
  6. 99
      test/TestBrokerServer.hs

4
libatrade.cabal

@ -18,6 +18,7 @@ library @@ -18,6 +18,7 @@ library
exposed-modules: ATrade.Types
, ATrade.QuoteSource.Server
, ATrade.Broker.Protocol
, ATrade.Broker.Server
build-depends: base >= 4.7 && < 5
, Decimal
, time
@ -30,6 +31,7 @@ library @@ -30,6 +31,7 @@ library
, hslogger
, zeromq4-haskell
, unordered-containers
, containers
default-language: Haskell2010
executable libatrade-exe
@ -66,10 +68,12 @@ test-suite libatrade-test @@ -66,10 +68,12 @@ test-suite libatrade-test
, zeromq4-haskell
, bytestring
, monad-loops
, uuid
ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010
other-modules: ArbitraryInstances
, TestBrokerProtocol
, TestBrokerServer
, TestQuoteSourceServer
, TestTypes

3
src/ATrade/Broker/Protocol.hs

@ -3,7 +3,8 @@ @@ -3,7 +3,8 @@
module ATrade.Broker.Protocol (
BrokerServerRequest(..),
BrokerServerResponse(..),
Notification(..)
Notification(..),
RequestSqnum(..)
) where
import qualified Data.HashMap.Strict as HM

90
src/ATrade/Broker/Server.hs

@ -0,0 +1,90 @@ @@ -0,0 +1,90 @@
module ATrade.Broker.Server (
startBrokerServer,
stopBrokerServer,
BrokerInterface(..)
) where
import ATrade.Types
import ATrade.Broker.Protocol
import System.ZMQ4
import qualified Data.Map as M
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
import qualified Data.List as L
import Data.Aeson
import Data.Time.Clock
import Data.IORef
import Control.Concurrent
import Control.Exception
import Control.Monad
import System.Log.Logger
newtype OrderIdGenerator = IO OrderId
data BrokerInterface = BrokerInterface {
accounts :: [T.Text],
setNotificationCallback :: Maybe (Notification -> IO()) -> IO (),
submitOrder :: Order -> IO (),
cancelOrder :: OrderId -> IO (),
stopBroker :: IO ()
}
data BrokerServerState = BrokerServerState {
bsSocket :: Socket Router,
orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders
lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString),
pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued)
brokers :: [BrokerInterface],
completionMvar :: MVar ()
}
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep = do
sock <- socket c Router
bind sock (T.unpack ep)
tid <- myThreadId
compMv <- newEmptyMVar
state <- newIORef BrokerServerState {
bsSocket = sock,
orderMap = M.empty,
lastPacket = M.empty,
pendingNotifications = [],
brokers = brokers,
completionMvar = compMv
}
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv
brokerServerThread state = finally brokerServerThread' cleanup
where
brokerServerThread' = forever $ do
sock <- bsSocket <$> readIORef state
receiveMulti sock >>= handleMessage
cleanup = do
sock <- bsSocket <$> readIORef state
close sock
mv <- completionMvar <$> readIORef state
putMVar mv ()
handleMessage :: [B.ByteString] -> IO ()
handleMessage [peerId, _, payload] = do
bros <- brokers <$> readIORef state
case decode . BL.fromStrict $ payload of
Just (RequestSubmitOrder sqnum order) ->
case findBroker (orderAccountId order) bros of
Just bro -> submitOrder bro order
Nothing -> return ()
Nothing -> return ()
handleMessage x = warningM "Broker.Server" ("Invalid packet received: " ++ show x)
findBroker account = L.find (L.elem account . accounts)
stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv

10
src/ATrade/Types.hs

@ -236,8 +236,8 @@ type OrderId = Integer @@ -236,8 +236,8 @@ type OrderId = Integer
data Order = Order {
orderId :: OrderId,
orderAccountId :: String,
orderSecurity :: String,
orderAccountId :: T.Text,
orderSecurity :: T.Text,
orderPrice :: OrderPrice,
orderQuantity :: Integer,
orderExecutedQuantity :: Integer,
@ -278,10 +278,10 @@ data Trade = Trade { @@ -278,10 +278,10 @@ data Trade = Trade {
tradePrice :: Decimal,
tradeQuantity :: Integer,
tradeVolume :: Decimal,
tradeVolumeCurrency :: String,
tradeVolumeCurrency :: T.Text,
tradeOperation :: Operation,
tradeAccount :: String,
tradeSecurity :: String,
tradeAccount :: T.Text,
tradeSecurity :: T.Text,
tradeTimestamp :: UTCTime,
tradeSignalId :: SignalId }
deriving (Show, Eq)

3
test/Spec.hs

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
import qualified TestTypes
import qualified TestBrokerProtocol
import qualified TestBrokerServer
import qualified TestQuoteSourceServer
import Test.Tasty
@ -12,5 +13,5 @@ properties :: TestTree @@ -12,5 +13,5 @@ properties :: TestTree
properties = testGroup "Properties" [TestTypes.properties, TestBrokerProtocol.properties]
unitTests :: TestTree
unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests]
unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests, TestBrokerServer.unitTests]

99
test/TestBrokerServer.hs

@ -0,0 +1,99 @@ @@ -0,0 +1,99 @@
{-# LANGUAGE OverloadedStrings #-}
module TestBrokerServer (
unitTests
) where
import Test.Tasty
import Test.Tasty.SmallCheck as SC
import Test.Tasty.QuickCheck as QC
import Test.Tasty.HUnit
import ATrade.Types
import qualified Data.ByteString.Lazy as BL
import ATrade.Broker.Server
import ATrade.Broker.Protocol
import qualified Data.Text as T
import Control.Monad
import Control.Monad.Loops
import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (writeChan)
import Control.Exception
import System.ZMQ4
import Data.Aeson
import Data.Time.Clock
import Data.Time.Calendar
import Data.Maybe
import Data.IORef
import Data.UUID as U
import Data.UUID.V4 as UV4
data MockBrokerState = MockBrokerState {
orders :: [Order],
notificationCallback :: Maybe (Notification -> IO ())
}
mockSubmitOrder :: IORef MockBrokerState -> Order -> IO ()
mockSubmitOrder state order = do
atomicModifyIORef' state (\s -> (s { orders = submittedOrder : orders s }, ()))
maybeCb <- notificationCallback <$> readIORef state
case maybeCb of
Just cb -> cb $ OrderNotification (orderId order) Submitted
Nothing -> return ()
where
submittedOrder = order { orderState = Submitted }
mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO ()
mockCancelOrder state = undefined
mockStopBroker :: IORef MockBrokerState -> IO ()
mockStopBroker state = return ()
mkMockBroker accs = do
state <- newIORef MockBrokerState {
orders = [],
notificationCallback = Nothing
}
return (BrokerInterface {
accounts = accs,
setNotificationCallback = \cb -> atomicModifyIORef' state (\s -> (s { notificationCallback = cb }, ())),
submitOrder = mockSubmitOrder state,
cancelOrder = mockCancelOrder state,
stopBroker = mockStopBroker state
}, state)
unitTests = testGroup "Broker.Server" [testBrokerServerStartStop, testBrokerServerSubmitOrder]
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep)
stopBrokerServer broS)
testBrokerServerSubmitOrder = testCase "Broker Server submits order" $ withContext (\ctx -> do
uid <- toText <$> UV4.nextRandom
(mockBroker, broState) <- mkMockBroker ["demo"]
let ep = "inproc://brokerserver" `T.append` uid
let order = Order {
orderId = 0,
orderAccountId = "demo",
orderSecurity = "FOO",
orderPrice = Market,
orderQuantity = 10,
orderExecutedQuantity = 0,
orderOperation = Buy,
orderState = Unsubmitted,
orderSignalId = SignalId "" "" ""
}
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connect sock (T.unpack ep)
send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order)
threadDelay 100000
s <- readIORef broState
(length . orders) s @?= 1
)))
Loading…
Cancel
Save