diff --git a/src/HistoryProviderServer.hs b/src/HistoryProviderServer.hs index f35209b..ae625be 100644 --- a/src/HistoryProviderServer.hs +++ b/src/HistoryProviderServer.hs @@ -19,7 +19,7 @@ import ATrade.Types (Bar (..), BarTimeframe (..), TickerId, toDouble) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId) import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVarIO, writeTVar) import Control.Exception (bracket) @@ -45,6 +45,7 @@ import Data.Time.Clock (diffUTCTime, getCurrentTime, secondsToDiffTime) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) import Prelude hiding (log) +import SlaveThread (fork) import System.ZMQ4 (Context, Router (Router), bind, close, receiveMulti, sendMulti, socket) @@ -182,7 +183,7 @@ startHistoryProviderServer ctx endpoint txmlH tisH logger = do , eTxml = txmlH , eTisHandle = tisH } - hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env + hpsThreadId <- liftIO . fork $ (runReaderT . unApp) workThread env pure HistoryProviderServerHandle {..} stopHistoryProviderServer :: diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 4576d46..e918f83 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -22,7 +22,7 @@ import ATrade.Types (Order, OrderId) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Config (TransaqConnectorConfig (..)) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId) import Control.Concurrent.BoundedChan (BoundedChan) import Control.Concurrent.STM (TVar, atomically, newEmptyTMVar, newEmptyTMVarIO, @@ -40,6 +40,7 @@ import qualified Data.Text as T import Data.Time.Clock (UTCTime, getCurrentTime) import GHC.Exts (IsList (..)) import Prelude hiding (log) +import SlaveThread (fork) import TickerInfoServer (TickerInfoServerHandle) import TickTable (newTickTable) import Transaq (TransaqResponse) @@ -143,7 +144,7 @@ start logger' config' qssChannel' tisH = do , runVar = runVar' , timerVar = timerVar' } - workThreadId <- forkIO $ (runReaderT . unApp) workThread env + workThreadId <- fork $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle { threadId = workThreadId , notificationQueue = notificationQueue' diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 8dc59c7..878bb49 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -22,7 +22,7 @@ import Config (SubscriptionConfig (Subscriptio transaqHost, transaqLogLevel, transaqLogPath, transaqLogin, transaqPassword, transaqPort) -import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent (threadDelay) import Control.Concurrent.STM (TVar, atomically, modifyTVar', orElse, putTMVar, readTMVar, readTVar, readTVarIO, @@ -36,6 +36,7 @@ import qualified Data.Bimap as BM import Data.Maybe (mapMaybe) import qualified Data.Text as T import qualified Deque.Strict as D +import SlaveThread (fork) import Text.XML.Light.Input (parseXML) import Text.XML.Light.Types (Content (Elem), Element (elName), @@ -204,7 +205,7 @@ workThread = do Just cb -> do serverConnectionState <- asks serverConnected timerVar' <- asks timerVar - void $ liftIO $ forkIO $ whileM $ do + void $ liftIO $ fork $ whileM $ do threadDelay 5000000 void . liftIO . atomically $ tryPutTMVar timerVar' () connStatus <- liftIO . readTVarIO $ serverConnectionState @@ -427,7 +428,7 @@ handleConnected = do Left result -> do case headMay (parseXML result) >>= parseContent of Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do - State <- asks brokerState + brState <- asks brokerState respVar <- asks responseVar liftIO $ atomically $ do modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) @@ -475,7 +476,9 @@ handleConnected = do requestTimeoutValue = 10 sendCancelOrder transactionId' = do - v <- sendCommand . toXml $ (CommandCancelOrder $ toInteger transactionId) + respVar <- asks responseVar + resp <- liftIO . atomically $ readTMVar respVar + v <- sendCommand . toXml $ (CommandCancelOrder $ toInteger transactionId') case v of Left result -> do log Debug "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result diff --git a/src/TickerInfoServer.hs b/src/TickerInfoServer.hs index 8b36745..250dc62 100644 --- a/src/TickerInfoServer.hs +++ b/src/TickerInfoServer.hs @@ -16,7 +16,7 @@ import ATrade.Logging (Message, logWith) import ATrade.Types (Tick, TickerId, security) import Colog (LogAction) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId) import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVarIO) import Control.Concurrent.STM.TVar (modifyTVar', writeTVar) @@ -34,6 +34,7 @@ import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8With, encodeUtf8) import Data.Text.Encoding.Error (replace) import Prelude hiding (log) +import SlaveThread (fork) import System.ZMQ4 (Context, Router (Router), bind, connect, receiveMulti, sendMulti, withSocket) @@ -87,7 +88,7 @@ startTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> IO TickerI startTickerInfoServer logger ctx endpoint = do tisRun <- newTVarIO True tisMap <- newTVarIO M.empty - tisThreadId <- forkIO $ tisThread tisRun tisMap + tisThreadId <- fork $ tisThread tisRun tisMap pure $ TickerInfoServerHandle {..} where log = logWith logger diff --git a/transaq-connector.cabal b/transaq-connector.cabal index ae12556..e3a5247 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -120,6 +120,7 @@ test-suite transaq-connector-test , network-uri , ekg-statsd , ekg-core + , slave-thread default-extensions: OverloadedStrings , MultiWayIf , MultiParamTypeClasses