diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 6bfbee3..447dba8 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -137,31 +137,29 @@ brokerServerThread state = finally brokerServerThread' cleanup where brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do sock <- bsSocket <$> readIORef state - evs <- poll 200 [Sock sock [In] Nothing] - when ((L.length . L.head) evs > 0) $ do - msg <- receiveMulti sock - case msg of - [peerId, _, payload] -> - case decode . BL.fromStrict $ payload of - Just request -> do - let sqnum = requestSqnum request - -- Here, we should check if previous packet sequence number is the same - -- If it is, we should resend previous response - lastPackMap <- lastPacket <$> readIORef state - case shouldResend sqnum peerId lastPackMap of - Just response -> sendMessage sock peerId response -- Resend - Nothing -> do - -- Handle incoming request, send response - response <- handleMessage peerId request - sendMessage sock peerId response - -- and store response in case we'll need to resend it - atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) - Nothing -> do - -- If we weren't able to parse request, we should send error - -- but shouldn't update lastPacket - let response = ResponseError "Invalid request" - sendMessage sock peerId response - _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) + msg <- timeout 1000000 $ receiveMulti sock + case msg of + Just [peerId, _, payload] -> + case decode . BL.fromStrict $ payload of + Just request -> do + let sqnum = requestSqnum request + -- Here, we should check if previous packet sequence number is the same + -- If it is, we should resend previous response + lastPackMap <- lastPacket <$> readIORef state + case shouldResend sqnum peerId lastPackMap of + Just response -> sendMessage sock peerId response -- Resend + Nothing -> do + -- Handle incoming request, send response + response <- handleMessage peerId request + sendMessage sock peerId response + -- and store response in case we'll need to resend it + atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) + Nothing -> do + -- If we weren't able to parse request, we should send error + -- but shouldn't update lastPacket + let response = ResponseError "Invalid request" + sendMessage sock peerId response + _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of Just (lastSqnum, response) -> if sqnum == lastSqnum