|
|
|
|
@ -16,7 +16,6 @@ import qualified Data.ByteString as B hiding (putStrLn)
@@ -16,7 +16,6 @@ import qualified Data.ByteString as B hiding (putStrLn)
|
|
|
|
|
import qualified Data.ByteString.Lazy as BL hiding (putStrLn) |
|
|
|
|
import Control.Monad.Loops |
|
|
|
|
import Control.Monad.Extra |
|
|
|
|
import System.Log.Logger |
|
|
|
|
import System.Timeout |
|
|
|
|
import System.ZMQ4 |
|
|
|
|
|
|
|
|
|
@ -32,9 +31,7 @@ withZMQTradeSink ctx tradeSinkEp f = do
@@ -32,9 +31,7 @@ withZMQTradeSink ctx tradeSinkEp f = do
|
|
|
|
|
|
|
|
|
|
sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ |
|
|
|
|
handle (\e -> do |
|
|
|
|
warningM "Broker.Server" $ "Trade sink: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) |
|
|
|
|
when (isZMQError e) $ do |
|
|
|
|
debugM "Broker.Server" "Rethrowing exception" |
|
|
|
|
when (isZMQError (e :: SomeException)) $ do |
|
|
|
|
throwIO e) sinkThread' |
|
|
|
|
where |
|
|
|
|
sinkThread' = withSocket ctx Dealer (\sock -> do |
|
|
|
|
@ -52,7 +49,7 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $
@@ -52,7 +49,7 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $
|
|
|
|
|
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] |
|
|
|
|
events <- poll 5000 [Sock sock [In] Nothing] |
|
|
|
|
if L.null . L.head $ events |
|
|
|
|
then warningM "Broker.Server" "Trade sink timeout" |
|
|
|
|
then return () |
|
|
|
|
else do |
|
|
|
|
void . receive $ sock -- anything will do |
|
|
|
|
sinkThread'' sock |
|
|
|
|
|