|
|
|
@ -8,6 +8,7 @@ module ATrade.Broker.Server ( |
|
|
|
import ATrade.Types |
|
|
|
import ATrade.Types |
|
|
|
import ATrade.Broker.Protocol |
|
|
|
import ATrade.Broker.Protocol |
|
|
|
import System.ZMQ4 |
|
|
|
import System.ZMQ4 |
|
|
|
|
|
|
|
import Data.List.NonEmpty |
|
|
|
import qualified Data.Map as M |
|
|
|
import qualified Data.Map as M |
|
|
|
import qualified Data.ByteString as B |
|
|
|
import qualified Data.ByteString as B |
|
|
|
import qualified Data.ByteString.Lazy as BL |
|
|
|
import qualified Data.ByteString.Lazy as BL |
|
|
|
@ -37,7 +38,8 @@ data BrokerServerState = BrokerServerState { |
|
|
|
lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), |
|
|
|
lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), |
|
|
|
pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) |
|
|
|
pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) |
|
|
|
brokers :: [BrokerInterface], |
|
|
|
brokers :: [BrokerInterface], |
|
|
|
completionMvar :: MVar () |
|
|
|
completionMvar :: MVar (), |
|
|
|
|
|
|
|
orderIdCounter :: OrderId |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) |
|
|
|
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) |
|
|
|
@ -54,7 +56,8 @@ startBrokerServer brokers c ep = do |
|
|
|
lastPacket = M.empty, |
|
|
|
lastPacket = M.empty, |
|
|
|
pendingNotifications = [], |
|
|
|
pendingNotifications = [], |
|
|
|
brokers = brokers, |
|
|
|
brokers = brokers, |
|
|
|
completionMvar = compMv |
|
|
|
completionMvar = compMv, |
|
|
|
|
|
|
|
orderIdCounter = 1 |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv |
|
|
|
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv |
|
|
|
@ -63,7 +66,7 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
where |
|
|
|
where |
|
|
|
brokerServerThread' = forever $ do |
|
|
|
brokerServerThread' = forever $ do |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
receiveMulti sock >>= handleMessage |
|
|
|
receiveMulti sock >>= handleMessage >>= sendMessage sock |
|
|
|
|
|
|
|
|
|
|
|
cleanup = do |
|
|
|
cleanup = do |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
@ -71,18 +74,27 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
mv <- completionMvar <$> readIORef state |
|
|
|
mv <- completionMvar <$> readIORef state |
|
|
|
putMVar mv () |
|
|
|
putMVar mv () |
|
|
|
|
|
|
|
|
|
|
|
handleMessage :: [B.ByteString] -> IO () |
|
|
|
handleMessage :: [B.ByteString] -> IO (B.ByteString, BrokerServerResponse) |
|
|
|
handleMessage [peerId, _, payload] = do |
|
|
|
handleMessage [peerId, _, payload] = do |
|
|
|
bros <- brokers <$> readIORef state |
|
|
|
bros <- brokers <$> readIORef state |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
Just (RequestSubmitOrder sqnum order) -> |
|
|
|
Just (RequestSubmitOrder sqnum order) -> |
|
|
|
case findBroker (orderAccountId order) bros of |
|
|
|
case findBroker (orderAccountId order) bros of |
|
|
|
Just bro -> submitOrder bro order |
|
|
|
Just bro -> do |
|
|
|
Nothing -> return () |
|
|
|
oid <- nextOrderId |
|
|
|
Nothing -> return () |
|
|
|
submitOrder bro order { orderId = oid } |
|
|
|
|
|
|
|
return (peerId, ResponseOrderSubmitted oid) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Nothing -> error "foobar" |
|
|
|
|
|
|
|
Nothing -> error "foobar" |
|
|
|
|
|
|
|
handleMessage x = do |
|
|
|
|
|
|
|
warningM "Broker.Server" ("Invalid packet received: " ++ show x) |
|
|
|
|
|
|
|
error "foobar" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sendMessage sock (peerId, resp) = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) |
|
|
|
|
|
|
|
|
|
|
|
handleMessage x = warningM "Broker.Server" ("Invalid packet received: " ++ show x) |
|
|
|
|
|
|
|
findBroker account = L.find (L.elem account . accounts) |
|
|
|
findBroker account = L.find (L.elem account . accounts) |
|
|
|
|
|
|
|
nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stopBrokerServer :: BrokerServerHandle -> IO () |
|
|
|
stopBrokerServer :: BrokerServerHandle -> IO () |
|
|
|
|