|
|
|
@ -69,7 +69,10 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
where |
|
|
|
where |
|
|
|
brokerServerThread' = forever $ do |
|
|
|
brokerServerThread' = forever $ do |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
receiveMulti sock >>= handleMessage >>= sendMessage sock |
|
|
|
msg <- receiveMulti sock |
|
|
|
|
|
|
|
case msg of |
|
|
|
|
|
|
|
[peerId, _, payload] -> handleMessage payload >>= sendMessage sock peerId |
|
|
|
|
|
|
|
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) |
|
|
|
|
|
|
|
|
|
|
|
cleanup = do |
|
|
|
cleanup = do |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
@ -77,8 +80,8 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
mv <- completionMvar <$> readIORef state |
|
|
|
mv <- completionMvar <$> readIORef state |
|
|
|
putMVar mv () |
|
|
|
putMVar mv () |
|
|
|
|
|
|
|
|
|
|
|
handleMessage :: [B.ByteString] -> IO (B.ByteString, BrokerServerResponse) |
|
|
|
handleMessage :: B.ByteString -> IO BrokerServerResponse |
|
|
|
handleMessage [peerId, _, payload] = do |
|
|
|
handleMessage payload = do |
|
|
|
bros <- brokers <$> readIORef state |
|
|
|
bros <- brokers <$> readIORef state |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
Just (RequestSubmitOrder sqnum order) -> |
|
|
|
Just (RequestSubmitOrder sqnum order) -> |
|
|
|
@ -87,23 +90,20 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
oid <- nextOrderId |
|
|
|
oid <- nextOrderId |
|
|
|
submitOrder bro order { orderId = oid } |
|
|
|
submitOrder bro order { orderId = oid } |
|
|
|
atomicModifyIORef' state (\s -> (s { orderToBroker = M.insert oid bro (orderToBroker s)}, ())) |
|
|
|
atomicModifyIORef' state (\s -> (s { orderToBroker = M.insert oid bro (orderToBroker s)}, ())) |
|
|
|
return (peerId, ResponseOrderSubmitted oid) |
|
|
|
return $ ResponseOrderSubmitted oid |
|
|
|
|
|
|
|
|
|
|
|
Nothing -> return (peerId, ResponseError "Unknown account") |
|
|
|
Nothing -> return $ ResponseError "Unknown account" |
|
|
|
Just (RequestCancelOrder sqnum oid) -> do |
|
|
|
Just (RequestCancelOrder sqnum oid) -> do |
|
|
|
m <- orderToBroker <$> readIORef state |
|
|
|
m <- orderToBroker <$> readIORef state |
|
|
|
case M.lookup oid m of |
|
|
|
case M.lookup oid m of |
|
|
|
Just bro -> do |
|
|
|
Just bro -> do |
|
|
|
cancelOrder bro oid |
|
|
|
cancelOrder bro oid |
|
|
|
return (peerId, ResponseOrderCancelled oid) |
|
|
|
return $ ResponseOrderCancelled oid |
|
|
|
Nothing -> return (peerId, ResponseError "Unknown order") |
|
|
|
Nothing -> return $ ResponseError "Unknown order" |
|
|
|
Just _ -> return (peerId, ResponseError "Not implemented") |
|
|
|
Just _ -> return $ ResponseError "Not implemented" |
|
|
|
Nothing -> return (peerId, ResponseError "Unable to parse request") |
|
|
|
Nothing -> return $ ResponseError "Unable to parse request" |
|
|
|
handleMessage x = do |
|
|
|
|
|
|
|
warningM "Broker.Server" ("Invalid packet received: " ++ show x) |
|
|
|
sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) |
|
|
|
return (B.empty, ResponseError "Invalid packet structure") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sendMessage sock (peerId, resp) = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
findBrokerForAccount account = L.find (L.elem account . accounts) |
|
|
|
findBrokerForAccount account = L.find (L.elem account . accounts) |
|
|
|
nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s)) |
|
|
|
nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s)) |
|
|
|
|