diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 8830b43..d49858e 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -109,11 +109,12 @@ tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ whileM_ (not <$> wasKilled) $ withSocket c Dealer (\sock -> do + debugM "Broker.Server" "Connecting trade sink socket" chan <- tradeSink <$> readIORef state connect sock $ T.unpack tradeSinkEp - timeoutMv <- newEmptyMVar + timeoutMv <- newIORef False threadDelay 1000000 - whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do + whileM_ (andM [not <$> wasKilled, readIORef timeoutMv]) $ do maybeTrade <- tryReadChan chan case maybeTrade of Just trade -> sendMulti sock $ B.empty :| [encodeTrade trade] @@ -123,7 +124,9 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ events <- poll 5000 [Sock sock [In] Nothing] if not . L.null . L.head $ events then void . receive $ sock -- anything will do - else putMVar timeoutMv ()) + else do + writeIORef timeoutMv True + warningM "Broker.Server" "Trade sink timeout") where wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index 596469d..d33c89f 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -376,3 +376,11 @@ defaultClientSecurityParams = ClientSecurityParams { cspServerCertificate = Nothing } +data TickerInfo = TickerInfo { + tiTicker :: TickerId, + tiClass :: T.Text, + tiBase :: Maybe TickerId, + tiLotSize :: Integer, + tiTickSize :: Decimal +} deriving (Show, Eq) +