Browse Source

junction: RemoteControl: handle RELOAD_CONFIG command

master
Denis Tereshkin 4 years ago
parent
commit
5924e3ef70
  1. 31
      src/ATrade/Driver/Junction/JunctionMonad.hs
  2. 7
      src/ATrade/Driver/Junction/RemoteControl.hs
  3. 21
      src/ATrade/Driver/Junction/RobotDriverThread.hs

31
src/ATrade/Driver/Junction/JunctionMonad.hs

@ -8,7 +8,8 @@ module ATrade.Driver.Junction.JunctionMonad
JunctionEnv(..), JunctionEnv(..),
JunctionM(..), JunctionM(..),
startRobot, startRobot,
saveRobots saveRobots,
reloadConfig
) where ) where
import ATrade.Broker.Client (BrokerClientHandle) import ATrade.Broker.Client (BrokerClientHandle)
@ -22,7 +23,9 @@ import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv),
RobotM (unRobotM), RobotM (unRobotM),
createRobotDriverThread, createRobotDriverThread,
onStrategyInstance) getInstanceDescriptor,
onStrategyInstance,
onStrategyInstanceM)
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstanceDescriptor, StrategyInstanceDescriptor,
accountId, accountId,
@ -31,6 +34,7 @@ import ATrade.Driver.Junction.Types (StrategyDescriptor
configKey, configKey,
stateKey, stateKey,
strategyBaseName, strategyBaseName,
strategyConfig,
strategyId, strategyId,
strategyInstanceId, strategyInstanceId,
strategyState, strategyState,
@ -52,7 +56,6 @@ import Colog (HasLog (getLogActi
hoistLogAction, hoistLogAction,
logTextHandle, logTextHandle,
(>$<)) (>$<))
import Control.Exception.Safe (MonadThrow)
import Control.Exception.Safe (finally) import Control.Exception.Safe (finally)
import Control.Monad.Reader (MonadIO (liftIO), import Control.Monad.Reader (MonadIO (liftIO),
MonadReader, MonadReader,
@ -66,7 +69,8 @@ import Data.Foldable (traverse_)
import Data.IORef (IORef, import Data.IORef (IORef,
atomicModifyIORef', atomicModifyIORef',
newIORef, newIORef,
readIORef) readIORef,
writeIORef)
import Data.List.NonEmpty (NonEmpty ((:|))) import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
@ -86,7 +90,8 @@ import System.IO (BufferMode (LineBu
import System.IO (hClose) import System.IO (hClose)
import System.ZMQ4 (Rep, Socket) import System.ZMQ4 (Rep, Socket)
import UnliftIO (MonadUnliftIO) import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (catchAny) import UnliftIO.Exception (catchAny,
onException)
data JunctionEnv = data JunctionEnv =
JunctionEnv JunctionEnv
@ -107,7 +112,7 @@ data JunctionEnv =
} }
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow, MonadUnliftIO) deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadUnliftIO)
instance HasLog JunctionEnv Message JunctionM where instance HasLog JunctionEnv Message JunctionM where
getLogAction = peLogAction getLogAction = peLogAction
@ -207,3 +212,17 @@ saveRobotState handle = onStrategyInstance handle $ \inst -> do
saveState currentState (strategyInstanceId inst) saveState currentState (strategyInstanceId inst)
currentTimers <- liftIO $ readIORef (strategyTimers inst) currentTimers <- liftIO $ readIORef (strategyTimers inst)
saveState currentTimers (strategyInstanceId inst <> ":timers") saveState currentTimers (strategyInstanceId inst <> ":timers")
reloadConfig :: T.Text -> JunctionM (Either T.Text ())
reloadConfig instId = flip catchAny (\_ -> return $ Left "Exception") $ do
robotsMap' <- asks peRobots
robots <- liftIO $ readIORef robotsMap'
case M.lookup instId robots of
Just robot -> do
onStrategyInstanceM robot
(\inst -> do
let instDesc = getInstanceDescriptor robot
bigConf <- loadConfig (configKey instDesc)
liftIO $ writeIORef (strategyConfig inst) (confStrategy bigConf))
return $ Right ()
Nothing -> return $ Left "Unable to load config"

7
src/ATrade/Driver/Junction/RemoteControl.hs

@ -9,6 +9,7 @@ module ATrade.Driver.Junction.RemoteControl
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots),
JunctionM, JunctionM,
reloadConfig,
startRobot) startRobot)
import ATrade.Driver.Junction.RobotDriverThread (stopRobot) import ATrade.Driver.Junction.RobotDriverThread (stopRobot)
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor)
@ -112,7 +113,11 @@ handleRemoteControl timeout = do
return ResponseOk return ResponseOk
Nothing -> return $ ResponseError $ "Not started: " <> instId Nothing -> return $ ResponseError $ "Not started: " <> instId
handleRequest (ReloadConfig instId) = undefined handleRequest (ReloadConfig instId) = do
res <- reloadConfig instId
case res of
Left errmsg -> return $ ResponseError errmsg
Right () -> return ResponseOk
handleRequest (SetState instId rawState) = undefined handleRequest (SetState instId rawState) = undefined
handleRequest Ping = return ResponseOk handleRequest Ping = return ResponseOk

21
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -13,8 +13,10 @@ module ATrade.Driver.Junction.RobotDriverThread
RobotM(..), RobotM(..),
RobotDriverHandle, RobotDriverHandle,
onStrategyInstance, onStrategyInstance,
onStrategyInstanceM,
postNotificationEvent, postNotificationEvent,
stopRobot stopRobot,
getInstanceDescriptor
) where ) where
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification))
@ -68,7 +70,7 @@ import Dhall (FromDhall)
import Prelude hiding (log) import Prelude hiding (log)
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) =>
RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] RobotDriverHandle StrategyInstanceDescriptor (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId]
data RobotDriverRequest data RobotDriverRequest
@ -126,7 +128,7 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue
return $ RobotDriverHandle inst driver qthread eventQueue subIds return $ RobotDriverHandle instDesc inst driver qthread eventQueue subIds
where where
passQuoteEvents eventQueue quoteQueue = do passQuoteEvents eventQueue quoteQueue = do
@ -134,13 +136,17 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d
writeChan eventQueue (QuoteEvent v) writeChan eventQueue (QuoteEvent v)
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m () stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m ()
stopRobot (RobotDriverHandle _ driver qthread _ subIds) = do stopRobot (RobotDriverHandle _ _ driver qthread _ subIds) = do
forM_ subIds removeSubscription forM_ subIds removeSubscription
liftIO $ killThread driver liftIO $ killThread driver
liftIO $ killThread qthread liftIO $ killThread qthread
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r
onStrategyInstance (RobotDriverHandle inst _ _ _ _) f = f inst onStrategyInstance (RobotDriverHandle _ inst _ _ _ _) f = f inst
onStrategyInstanceM :: (MonadIO m) => RobotDriverHandle ->
(forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> m r) -> m r
onStrategyInstanceM (RobotDriverHandle _ inst _ _ _ _) f = f inst
data RobotEnv c s = data RobotEnv c s =
RobotEnv RobotEnv
@ -201,9 +207,10 @@ instance MonadRobot (RobotM c s) c s where
getAvailableTickers = asks tickers getAvailableTickers = asks tickers
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m ()
postNotificationEvent (RobotDriverHandle _ _ _ eventQueue _) notification = liftIO $ postNotificationEvent (RobotDriverHandle _ _ _ _ eventQueue _) notification = liftIO $
case notification of case notification of
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state)
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade)
getInstanceDescriptor :: RobotDriverHandle -> StrategyInstanceDescriptor
getInstanceDescriptor (RobotDriverHandle instDesc _ _ _ _ _) = instDesc

Loading…
Cancel
Save