Browse Source

Use slave-thread fork

master
Denis Tereshkin 2 years ago
parent
commit
ea5c4266f7
  1. 5
      src/HistoryProviderServer.hs
  2. 5
      src/TXMLConnector.hs
  3. 11
      src/TXMLConnector/Internal.hs
  4. 5
      src/TickerInfoServer.hs
  5. 1
      transaq-connector.cabal

5
src/HistoryProviderServer.hs

@ -19,7 +19,7 @@ import ATrade.Types (Bar (..), BarTimeframe (..),
TickerId, toDouble) TickerId, toDouble)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction)) LogAction (LogAction, unLogAction))
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, import Control.Concurrent.STM (TVar, atomically, newTVarIO,
readTVarIO, writeTVar) readTVarIO, writeTVar)
import Control.Exception (bracket) import Control.Exception (bracket)
@ -45,6 +45,7 @@ import Data.Time.Clock (diffUTCTime, getCurrentTime,
secondsToDiffTime) secondsToDiffTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Prelude hiding (log) import Prelude hiding (log)
import SlaveThread (fork)
import System.ZMQ4 (Context, Router (Router), bind, import System.ZMQ4 (Context, Router (Router), bind,
close, receiveMulti, sendMulti, close, receiveMulti, sendMulti,
socket) socket)
@ -182,7 +183,7 @@ startHistoryProviderServer ctx endpoint txmlH tisH logger = do
, eTxml = txmlH , eTxml = txmlH
, eTisHandle = tisH , eTisHandle = tisH
} }
hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env hpsThreadId <- liftIO . fork $ (runReaderT . unApp) workThread env
pure HistoryProviderServerHandle {..} pure HistoryProviderServerHandle {..}
stopHistoryProviderServer :: stopHistoryProviderServer ::

5
src/TXMLConnector.hs

@ -22,7 +22,7 @@ import ATrade.Types (Order, OrderId)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction)) LogAction (LogAction, unLogAction))
import Config (TransaqConnectorConfig (..)) import Config (TransaqConnectorConfig (..))
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId)
import Control.Concurrent.BoundedChan (BoundedChan) import Control.Concurrent.BoundedChan (BoundedChan)
import Control.Concurrent.STM (TVar, atomically, import Control.Concurrent.STM (TVar, atomically,
newEmptyTMVar, newEmptyTMVarIO, newEmptyTMVar, newEmptyTMVarIO,
@ -40,6 +40,7 @@ import qualified Data.Text as T
import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Time.Clock (UTCTime, getCurrentTime)
import GHC.Exts (IsList (..)) import GHC.Exts (IsList (..))
import Prelude hiding (log) import Prelude hiding (log)
import SlaveThread (fork)
import TickerInfoServer (TickerInfoServerHandle) import TickerInfoServer (TickerInfoServerHandle)
import TickTable (newTickTable) import TickTable (newTickTable)
import Transaq (TransaqResponse) import Transaq (TransaqResponse)
@ -143,7 +144,7 @@ start logger' config' qssChannel' tisH = do
, runVar = runVar' , runVar = runVar'
, timerVar = timerVar' , timerVar = timerVar'
} }
workThreadId <- forkIO $ (runReaderT . unApp) workThread env workThreadId <- fork $ (runReaderT . unApp) workThread env
return $ TXMLConnectorHandle return $ TXMLConnectorHandle
{ threadId = workThreadId { threadId = workThreadId
, notificationQueue = notificationQueue' , notificationQueue = notificationQueue'

11
src/TXMLConnector/Internal.hs

@ -22,7 +22,7 @@ import Config (SubscriptionConfig (Subscriptio
transaqHost, transaqLogLevel, transaqHost, transaqLogLevel,
transaqLogPath, transaqLogin, transaqLogPath, transaqLogin,
transaqPassword, transaqPort) transaqPassword, transaqPort)
import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM (TVar, atomically, modifyTVar',
orElse, putTMVar, readTMVar, orElse, putTMVar, readTMVar,
readTVar, readTVarIO, readTVar, readTVarIO,
@ -36,6 +36,7 @@ import qualified Data.Bimap as BM
import Data.Maybe (mapMaybe) import Data.Maybe (mapMaybe)
import qualified Data.Text as T import qualified Data.Text as T
import qualified Deque.Strict as D import qualified Deque.Strict as D
import SlaveThread (fork)
import Text.XML.Light.Input (parseXML) import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem), import Text.XML.Light.Types (Content (Elem),
Element (elName), Element (elName),
@ -204,7 +205,7 @@ workThread = do
Just cb -> do Just cb -> do
serverConnectionState <- asks serverConnected serverConnectionState <- asks serverConnected
timerVar' <- asks timerVar timerVar' <- asks timerVar
void $ liftIO $ forkIO $ whileM $ do void $ liftIO $ fork $ whileM $ do
threadDelay 5000000 threadDelay 5000000
void . liftIO . atomically $ tryPutTMVar timerVar' () void . liftIO . atomically $ tryPutTMVar timerVar' ()
connStatus <- liftIO . readTVarIO $ serverConnectionState connStatus <- liftIO . readTVarIO $ serverConnectionState
@ -427,7 +428,7 @@ handleConnected = do
Left result -> do Left result -> do
case headMay (parseXML result) >>= parseContent of case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
State <- asks brokerState brState <- asks brokerState
respVar <- asks responseVar respVar <- asks responseVar
liftIO $ atomically $ do liftIO $ atomically $ do
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
@ -475,7 +476,9 @@ handleConnected = do
requestTimeoutValue = 10 requestTimeoutValue = 10
sendCancelOrder transactionId' = do sendCancelOrder transactionId' = do
v <- sendCommand . toXml $ (CommandCancelOrder $ toInteger transactionId) respVar <- asks responseVar
resp <- liftIO . atomically $ readTMVar respVar
v <- sendCommand . toXml $ (CommandCancelOrder $ toInteger transactionId')
case v of case v of
Left result -> do Left result -> do
log Debug "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result log Debug "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result

5
src/TickerInfoServer.hs

@ -16,7 +16,7 @@ import ATrade.Logging (Message,
logWith) logWith)
import ATrade.Types (Tick, TickerId, security) import ATrade.Types (Tick, TickerId, security)
import Colog (LogAction) import Colog (LogAction)
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, import Control.Concurrent.STM (TVar, atomically, newTVarIO,
readTVarIO) readTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', writeTVar) import Control.Concurrent.STM.TVar (modifyTVar', writeTVar)
@ -34,6 +34,7 @@ import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8With, encodeUtf8) import Data.Text.Encoding (decodeUtf8With, encodeUtf8)
import Data.Text.Encoding.Error (replace) import Data.Text.Encoding.Error (replace)
import Prelude hiding (log) import Prelude hiding (log)
import SlaveThread (fork)
import System.ZMQ4 (Context, Router (Router), bind, import System.ZMQ4 (Context, Router (Router), bind,
connect, receiveMulti, sendMulti, connect, receiveMulti, sendMulti,
withSocket) withSocket)
@ -87,7 +88,7 @@ startTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> IO TickerI
startTickerInfoServer logger ctx endpoint = do startTickerInfoServer logger ctx endpoint = do
tisRun <- newTVarIO True tisRun <- newTVarIO True
tisMap <- newTVarIO M.empty tisMap <- newTVarIO M.empty
tisThreadId <- forkIO $ tisThread tisRun tisMap tisThreadId <- fork $ tisThread tisRun tisMap
pure $ TickerInfoServerHandle {..} pure $ TickerInfoServerHandle {..}
where where
log = logWith logger log = logWith logger

1
transaq-connector.cabal

@ -120,6 +120,7 @@ test-suite transaq-connector-test
, network-uri , network-uri
, ekg-statsd , ekg-statsd
, ekg-core , ekg-core
, slave-thread
default-extensions: OverloadedStrings default-extensions: OverloadedStrings
, MultiWayIf , MultiWayIf
, MultiParamTypeClasses , MultiParamTypeClasses

Loading…
Cancel
Save