From ff0875e989d61861fb93e5a52262f27723091134 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 30 Sep 2016 17:02:39 +0700 Subject: [PATCH] Fix: shutdown on win32 --- src/ATrade/Broker/Server.hs | 49 +++++++++++++++++--------------- src/ATrade/QuoteSource/Client.hs | 12 +++++--- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 76cd2aa..cd5a0e7 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -66,6 +66,7 @@ startBrokerServer brokers c ep = do } mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers + debugM "Broker.Server" "Forking broker server thread" BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () @@ -85,29 +86,31 @@ brokerServerThread state = finally brokerServerThread' cleanup where brokerServerThread' = forever $ do sock <- bsSocket <$> readIORef state - msg <- receiveMulti sock - case msg of - [peerId, _, payload] -> - case decode . BL.fromStrict $ payload of - Just request -> do - let sqnum = requestSqnum request - -- Here, we should check if previous packet sequence number is the same - -- If it is, we should resend previous response - lastPackMap <- lastPacket <$> readIORef state - case shouldResend sqnum peerId lastPackMap of - Just response -> sendMessage sock peerId response -- Resend - Nothing -> do - -- Handle incoming request, send response - response <- handleMessage peerId request - sendMessage sock peerId response - -- and store response in case we'll need to resend it - atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) - Nothing -> do - -- If we weren't able to parse request, we should send error - -- but shouldn't update lastPacket - let response = ResponseError "Invalid request" - sendMessage sock peerId response - _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) + evs <- poll 200 [Sock sock [In] Nothing] + when ((L.length . L.head) evs > 0) $ do + msg <- receiveMulti sock + case msg of + [peerId, _, payload] -> + case decode . BL.fromStrict $ payload of + Just request -> do + let sqnum = requestSqnum request + -- Here, we should check if previous packet sequence number is the same + -- If it is, we should resend previous response + lastPackMap <- lastPacket <$> readIORef state + case shouldResend sqnum peerId lastPackMap of + Just response -> sendMessage sock peerId response -- Resend + Nothing -> do + -- Handle incoming request, send response + response <- handleMessage peerId request + sendMessage sock peerId response + -- and store response in case we'll need to resend it + atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) + Nothing -> do + -- If we weren't able to parse request, we should send error + -- but shouldn't update lastPacket + let response = ResponseError "Invalid request" + sendMessage sock peerId response + _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of Just (lastSqnum, response) -> if sqnum == lastSqnum diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 1426e0b..289ad8d 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -9,10 +9,12 @@ import ATrade.Types import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent.BoundedChan import Control.Concurrent.MVar +import Control.Monad import Control.Exception import Data.List.NonEmpty import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L import Data.Text.Encoding import System.ZMQ4 import System.Log.Logger @@ -33,10 +35,12 @@ startQuoteSourceClient chan tickers ctx endpoint = do return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } where clientThread sock = do - rawTick <- fmap BL.fromStrict <$> receiveMulti sock - case deserializeTick rawTick of - Just tick -> writeChan chan tick - Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" + evs <- poll 200 [Sock sock [In] Nothing] + when ((L.length . L.head) evs > 0) $ do + rawTick <- fmap BL.fromStrict <$> receiveMulti sock + case deserializeTick rawTick of + Just tick -> writeChan chan tick + Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" cleanup compMv sock = close sock >> putMVar compMv () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()