Browse Source

Fix: shutdown on win32

master
Denis Tereshkin 9 years ago
parent
commit
ff0875e989
  1. 49
      src/ATrade/Broker/Server.hs
  2. 12
      src/ATrade/QuoteSource/Client.hs

49
src/ATrade/Broker/Server.hs

@ -66,6 +66,7 @@ startBrokerServer brokers c ep = do @@ -66,6 +66,7 @@ startBrokerServer brokers c ep = do
}
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
@ -85,29 +86,31 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -85,29 +86,31 @@ brokerServerThread state = finally brokerServerThread' cleanup
where
brokerServerThread' = forever $ do
sock <- bsSocket <$> readIORef state
msg <- receiveMulti sock
case msg of
[peerId, _, payload] ->
case decode . BL.fromStrict $ payload of
Just request -> do
let sqnum = requestSqnum request
-- Here, we should check if previous packet sequence number is the same
-- If it is, we should resend previous response
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> sendMessage sock peerId response -- Resend
Nothing -> do
-- Handle incoming request, send response
response <- handleMessage peerId request
sendMessage sock peerId response
-- and store response in case we'll need to resend it
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
Nothing -> do
-- If we weren't able to parse request, we should send error
-- but shouldn't update lastPacket
let response = ResponseError "Invalid request"
sendMessage sock peerId response
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
msg <- receiveMulti sock
case msg of
[peerId, _, payload] ->
case decode . BL.fromStrict $ payload of
Just request -> do
let sqnum = requestSqnum request
-- Here, we should check if previous packet sequence number is the same
-- If it is, we should resend previous response
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> sendMessage sock peerId response -- Resend
Nothing -> do
-- Handle incoming request, send response
response <- handleMessage peerId request
sendMessage sock peerId response
-- and store response in case we'll need to resend it
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
Nothing -> do
-- If we weren't able to parse request, we should send error
-- but shouldn't update lastPacket
let response = ResponseError "Invalid request"
sendMessage sock peerId response
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)
shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of
Just (lastSqnum, response) -> if sqnum == lastSqnum

12
src/ATrade/QuoteSource/Client.hs

@ -9,10 +9,12 @@ import ATrade.Types @@ -9,10 +9,12 @@ import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Concurrent.MVar
import Control.Monad
import Control.Exception
import Data.List.NonEmpty
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import qualified Data.List as L
import Data.Text.Encoding
import System.ZMQ4
import System.Log.Logger
@ -33,10 +35,12 @@ startQuoteSourceClient chan tickers ctx endpoint = do @@ -33,10 +35,12 @@ startQuoteSourceClient chan tickers ctx endpoint = do
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv }
where
clientThread sock = do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
cleanup compMv sock = close sock >> putMVar compMv ()
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()

Loading…
Cancel
Save