Browse Source

BrokerClient: handle timeout

master
Denis Tereshkin 9 years ago
parent
commit
2abd68ba2f
  1. 38
      src/ATrade/Broker/Client.hs

38
src/ATrade/Broker/Client.hs

@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan @@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import Data.Aeson
import Data.Int
import Data.IORef
import Data.Maybe
import Data.List.NonEmpty
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
@ -28,6 +30,7 @@ import System.Log.Logger @@ -28,6 +30,7 @@ import System.Log.Logger
data BrokerClientHandle = BrokerClientHandle {
tid :: ThreadId,
completionMvar :: MVar (),
killMvar :: MVar (),
submitOrder :: Order -> IO (Either T.Text OrderId),
cancelOrder :: OrderId -> IO (Either T.Text ()),
getNotifications :: IO (Either T.Text [Notification]),
@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle { @@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle {
respVar :: MVar BrokerServerResponse
}
brokerClientThread ctx ep cmd resp comp = do
sock <- socket ctx Req
connect sock $ T.unpack ep
finally (brokerClientThread' sock) (cleanup sock)
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO ()
brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup
where
cleanup sock = close sock >> putMVar comp ()
brokerClientThread' sock = do
forever $ do
cleanup = putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
sock <- socket ctx Req
connect sock $ T.unpack ep
setReceiveTimeout (restrict 1000) sock
finally (brokerClientThread'' sock) (close sock)
brokerClientThread'' sock = whileM_ (isNothing <$> tryReadMVar killMv) $ do
request <- takeMVar cmd
send sock [] (BL.toStrict $ encode request)
maybeResponse <- decode . BL.fromStrict <$> receive sock
case maybeResponse of
Just response -> putMVar resp response
Nothing -> putMVar resp (ResponseError "Unable to decode response")
events <- poll 1000 [Sock sock [In] Nothing]
if (not . null) events
then do
maybeResponse <- decode . BL.fromStrict <$> receive sock
case maybeResponse of
Just response -> putMVar resp response
Nothing -> putMVar resp (ResponseError "Unable to decode response")
else
putMVar resp (ResponseError "Response timeout")
startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle
startBrokerClient ctx endpoint = do
idCounter <- newIORef 1
compMv <- newEmptyMVar
killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest)
respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv)
return BrokerClientHandle {
tid = tid,
completionMvar = compMv,
killMvar = killMv,
submitOrder = bcSubmitOrder idCounter cmdVar respVar,
cancelOrder = bcCancelOrder idCounter cmdVar respVar,
getNotifications = bcGetNotifications idCounter cmdVar respVar,
@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do @@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do
}
stopBrokerClient :: BrokerClientHandle -> IO ()
stopBrokerClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)
stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (tid handle) >> readMVar (completionMvar handle)
nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))

Loading…
Cancel
Save