From 2abd68ba2f360f6f56a566d0c7c0f25d539922c1 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 17 Nov 2016 15:22:00 +0700 Subject: [PATCH] BrokerClient: handle timeout --- src/ATrade/Broker/Client.hs | 38 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 0c465f0..2cec045 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan import Control.Concurrent.MVar import Control.Exception import Control.Monad +import Control.Monad.Loops import Data.Aeson import Data.Int import Data.IORef +import Data.Maybe import Data.List.NonEmpty import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL @@ -28,6 +30,7 @@ import System.Log.Logger data BrokerClientHandle = BrokerClientHandle { tid :: ThreadId, completionMvar :: MVar (), + killMvar :: MVar (), submitOrder :: Order -> IO (Either T.Text OrderId), cancelOrder :: OrderId -> IO (Either T.Text ()), getNotifications :: IO (Either T.Text [Notification]), @@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle { respVar :: MVar BrokerServerResponse } -brokerClientThread ctx ep cmd resp comp = do - sock <- socket ctx Req - connect sock $ T.unpack ep - finally (brokerClientThread' sock) (cleanup sock) +brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO () +brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup where - cleanup sock = close sock >> putMVar comp () - brokerClientThread' sock = do - forever $ do + cleanup = putMVar comp () + brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do + sock <- socket ctx Req + connect sock $ T.unpack ep + setReceiveTimeout (restrict 1000) sock + finally (brokerClientThread'' sock) (close sock) + brokerClientThread'' sock = whileM_ (isNothing <$> tryReadMVar killMv) $ do request <- takeMVar cmd send sock [] (BL.toStrict $ encode request) - maybeResponse <- decode . BL.fromStrict <$> receive sock - case maybeResponse of - Just response -> putMVar resp response - Nothing -> putMVar resp (ResponseError "Unable to decode response") + events <- poll 1000 [Sock sock [In] Nothing] + if (not . null) events + then do + maybeResponse <- decode . BL.fromStrict <$> receive sock + case maybeResponse of + Just response -> putMVar resp response + Nothing -> putMVar resp (ResponseError "Unable to decode response") + else + putMVar resp (ResponseError "Response timeout") startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle startBrokerClient ctx endpoint = do idCounter <- newIORef 1 compMv <- newEmptyMVar + killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) - tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv) + tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv) return BrokerClientHandle { tid = tid, completionMvar = compMv, + killMvar = killMv, submitOrder = bcSubmitOrder idCounter cmdVar respVar, cancelOrder = bcCancelOrder idCounter cmdVar respVar, getNotifications = bcGetNotifications idCounter cmdVar respVar, @@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do } stopBrokerClient :: BrokerClientHandle -> IO () -stopBrokerClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) +stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (tid handle) >> readMVar (completionMvar handle) nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))