|
|
|
@ -137,31 +137,29 @@ brokerServerThread state = finally brokerServerThread' cleanup |
|
|
|
where |
|
|
|
where |
|
|
|
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do |
|
|
|
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
sock <- bsSocket <$> readIORef state |
|
|
|
evs <- poll 200 [Sock sock [In] Nothing] |
|
|
|
msg <- timeout 1000000 $ receiveMulti sock |
|
|
|
when ((L.length . L.head) evs > 0) $ do |
|
|
|
case msg of |
|
|
|
msg <- receiveMulti sock |
|
|
|
Just [peerId, _, payload] -> |
|
|
|
case msg of |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
[peerId, _, payload] -> |
|
|
|
Just request -> do |
|
|
|
case decode . BL.fromStrict $ payload of |
|
|
|
let sqnum = requestSqnum request |
|
|
|
Just request -> do |
|
|
|
-- Here, we should check if previous packet sequence number is the same |
|
|
|
let sqnum = requestSqnum request |
|
|
|
-- If it is, we should resend previous response |
|
|
|
-- Here, we should check if previous packet sequence number is the same |
|
|
|
lastPackMap <- lastPacket <$> readIORef state |
|
|
|
-- If it is, we should resend previous response |
|
|
|
case shouldResend sqnum peerId lastPackMap of |
|
|
|
lastPackMap <- lastPacket <$> readIORef state |
|
|
|
Just response -> sendMessage sock peerId response -- Resend |
|
|
|
case shouldResend sqnum peerId lastPackMap of |
|
|
|
Nothing -> do |
|
|
|
Just response -> sendMessage sock peerId response -- Resend |
|
|
|
-- Handle incoming request, send response |
|
|
|
Nothing -> do |
|
|
|
response <- handleMessage peerId request |
|
|
|
-- Handle incoming request, send response |
|
|
|
sendMessage sock peerId response |
|
|
|
response <- handleMessage peerId request |
|
|
|
-- and store response in case we'll need to resend it |
|
|
|
sendMessage sock peerId response |
|
|
|
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) |
|
|
|
-- and store response in case we'll need to resend it |
|
|
|
Nothing -> do |
|
|
|
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) |
|
|
|
-- If we weren't able to parse request, we should send error |
|
|
|
Nothing -> do |
|
|
|
-- but shouldn't update lastPacket |
|
|
|
-- If we weren't able to parse request, we should send error |
|
|
|
let response = ResponseError "Invalid request" |
|
|
|
-- but shouldn't update lastPacket |
|
|
|
sendMessage sock peerId response |
|
|
|
let response = ResponseError "Invalid request" |
|
|
|
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) |
|
|
|
sendMessage sock peerId response |
|
|
|
|
|
|
|
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of |
|
|
|
shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of |
|
|
|
Just (lastSqnum, response) -> if sqnum == lastSqnum |
|
|
|
Just (lastSqnum, response) -> if sqnum == lastSqnum |
|
|
|
|