|
|
|
|
@ -30,7 +30,7 @@ withZMQTradeSink ctx tradeSinkEp f = do
@@ -30,7 +30,7 @@ withZMQTradeSink ctx tradeSinkEp f = do
|
|
|
|
|
where |
|
|
|
|
sink = BC.writeChan |
|
|
|
|
|
|
|
|
|
sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do |
|
|
|
|
sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ |
|
|
|
|
handle (\e -> do |
|
|
|
|
warningM "Broker.Server" $ "Trade sink: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) |
|
|
|
|
when (isZMQError e) $ do |
|
|
|
|
@ -39,20 +39,23 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do
@@ -39,20 +39,23 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do
|
|
|
|
|
where |
|
|
|
|
sinkThread' = withSocket ctx Dealer (\sock -> do |
|
|
|
|
connect sock $ T.unpack tradeSinkEp |
|
|
|
|
whenM (not <$> wasKilled) $ do |
|
|
|
|
maybeTrade <- BC.tryReadChan chan |
|
|
|
|
case maybeTrade of |
|
|
|
|
Just trade -> do |
|
|
|
|
sendMulti sock $ B.empty :| [encodeTrade trade] |
|
|
|
|
void $ receiveMulti sock |
|
|
|
|
Nothing -> do |
|
|
|
|
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] |
|
|
|
|
events <- poll 1000 [Sock sock [In] Nothing] |
|
|
|
|
if L.null . L.head $ events |
|
|
|
|
then warningM "Broker.Server" "Trade sink timeout" |
|
|
|
|
else do |
|
|
|
|
void . receive $ sock -- anything will do |
|
|
|
|
sinkThread') |
|
|
|
|
whenM (not <$> wasKilled) $ sinkThread'' sock) |
|
|
|
|
|
|
|
|
|
sinkThread'' sock = do |
|
|
|
|
maybeTrade <- BC.tryReadChan chan |
|
|
|
|
case maybeTrade of |
|
|
|
|
Just trade -> do |
|
|
|
|
sendMulti sock $ B.empty :| [encodeTrade trade] |
|
|
|
|
void $ receiveMulti sock |
|
|
|
|
Nothing -> do |
|
|
|
|
threadDelay 1000000 |
|
|
|
|
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] |
|
|
|
|
events <- poll 5000 [Sock sock [In] Nothing] |
|
|
|
|
if L.null . L.head $ events |
|
|
|
|
then warningM "Broker.Server" "Trade sink timeout" |
|
|
|
|
else do |
|
|
|
|
void . receive $ sock -- anything will do |
|
|
|
|
sinkThread'' sock |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
isZMQError e = "ZMQError" `L.isPrefixOf` show e |
|
|
|
|
|