From 4ac71e148ce73daf56e77dab903dc9a0363d4e46 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 4 Jan 2022 19:55:46 +0700 Subject: [PATCH] Junction: handle robot stop command --- src/ATrade/Driver/Junction/JunctionMonad.hs | 4 +- src/ATrade/Driver/Junction/RemoteControl.hs | 51 +++++++++++++------ .../Driver/Junction/RobotDriverThread.hs | 40 ++++++++------- 3 files changed, 61 insertions(+), 34 deletions(-) diff --git a/src/ATrade/Driver/Junction/JunctionMonad.hs b/src/ATrade/Driver/Junction/JunctionMonad.hs index ae7e3af..ca9886d 100644 --- a/src/ATrade/Driver/Junction/JunctionMonad.hs +++ b/src/ATrade/Driver/Junction/JunctionMonad.hs @@ -141,7 +141,9 @@ instance QuoteStream JunctionM where addSubscription (QuoteSubscription ticker tf) chan = do qt <- asks peQuoteThread QT.addSubscription qt ticker tf chan - removeSubscription _ = undefined + removeSubscription subId = do + qt <- asks peQuoteThread + QT.removeSubscription qt subId startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> M.Map T.Text StrategyDescriptorE -> StrategyInstanceDescriptor -> JunctionM () diff --git a/src/ATrade/Driver/Junction/RemoteControl.hs b/src/ATrade/Driver/Junction/RemoteControl.hs index 3579402..0287f38 100644 --- a/src/ATrade/Driver/Junction/RemoteControl.hs +++ b/src/ATrade/Driver/Junction/RemoteControl.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} @@ -6,21 +7,28 @@ module ATrade.Driver.Junction.RemoteControl handleRemoteControl ) where -import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), - JunctionM) -import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) -import ATrade.Logging (logErrorWith) -import Control.Monad (unless) -import Control.Monad.Reader (asks) -import Data.Aeson (decode) -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL -import qualified Data.Map.Strict as M -import qualified Data.Text as T -import Data.Text.Encoding (decodeUtf8', encodeUtf8) -import System.ZMQ4 (Event (In), Poll (Sock), - poll, receive, send) -import UnliftIO (MonadIO (liftIO)) +import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), + JunctionM) +import ATrade.Driver.Junction.RobotDriverThread (stopRobot) +import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) +import ATrade.Logging (Severity (Info), + logErrorWith, + logWith) +import Control.Monad (unless) +import Control.Monad.Reader (asks) +import Data.Aeson (decode) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8', + encodeUtf8) +import System.ZMQ4 (Event (In), + Poll (Sock), poll, + receive, send) +import UnliftIO (MonadIO (liftIO), + atomicModifyIORef', + readIORef) data RemoteControlResponse = ResponseOk @@ -89,7 +97,18 @@ handleRemoteControl timeout = do liftIO $ send sock [] (makeRemoteControlResponse response) where handleRequest (StartRobot inst) = undefined - handleRequest (StopRobot instId) = undefined + handleRequest (StopRobot instId) = do + robotsRef <- asks peRobots + robots <- readIORef robotsRef + case M.lookup instId robots of + Just robot -> do + logger <- asks peLogAction + logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId + stopRobot robot + liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ())) + return ResponseOk + Nothing -> return $ ResponseError $ "Not started: " <> instId + handleRequest (ReloadConfig instId) = undefined handleRequest (SetState instId rawState) = undefined handleRequest Ping = return ResponseOk diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs index a5c7a1e..551a0cc 100644 --- a/src/ATrade/Driver/Junction/RobotDriverThread.hs +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -13,12 +13,14 @@ module ATrade.Driver.Junction.RobotDriverThread RobotM(..), RobotDriverHandle, onStrategyInstance, - postNotificationEvent) where + postNotificationEvent, + stopRobot + ) where import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) import qualified ATrade.Driver.Junction.BrokerService as Bro -import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), - QuoteSubscription (QuoteSubscription)) +import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), + QuoteSubscription (QuoteSubscription), SubscriptionId) import ATrade.Driver.Junction.Types (BigConfig, StrategyDescriptor, StrategyInstance (StrategyInstance, strategyEventCallback), @@ -28,31 +30,29 @@ import ATrade.Driver.Junction.Types (BigConfig, eventCallback, stateKey, strategyId, tickerId, timeframe) -import ATrade.Logging (Message, log, logDebug, - logInfo, logWarning) +import ATrade.Logging (Message, log) import ATrade.QuoteSource.Client (QuoteData (..)) import ATrade.RoboCom.ConfigStorage (ConfigStorage) -import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderSubmitted, OrderUpdate), - MonadRobot (..), - StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp)) +import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), + MonadRobot (..), StrategyEnvironment (..)) import ATrade.RoboCom.Persistence (MonadPersistence) import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars, TickerInfoMap) -import ATrade.Types (Order (orderId), OrderId, +import ATrade.Types (OrderId, OrderState, Trade, Tick (value)) import Colog (HasLog (getLogAction, setLogAction), LogAction) -import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent (ThreadId, forkIO, killThread) import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, writeChan) import Control.Exception.Safe (MonadThrow) -import Control.Monad (forM_, forever, void, when) +import Control.Monad (forM_, forever, void, when, forM) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Reader (MonadReader (local), ReaderT, asks) import Data.Aeson (FromJSON, ToJSON) -import Data.Default +import Data.Default ( Default ) import Data.IORef (IORef, atomicModifyIORef', readIORef, writeIORef) @@ -64,7 +64,7 @@ import Dhall (FromDhall) import Prelude hiding (log) data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => - RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) + RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] data RobotDriverRequest @@ -118,19 +118,25 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers quoteQueue <- liftIO $ newBoundedChan 2000 - forM_ (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) + subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue - return $ RobotDriverHandle inst driver qthread eventQueue + return $ RobotDriverHandle inst driver qthread eventQueue subIds where passQuoteEvents eventQueue quoteQueue = do v <- readChan quoteQueue writeChan eventQueue (QuoteEvent v) +stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m () +stopRobot (RobotDriverHandle _ driver qthread _ subIds) = do + forM_ subIds removeSubscription + liftIO $ killThread driver + liftIO $ killThread qthread + onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r -onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst +onStrategyInstance (RobotDriverHandle inst _ _ _ _) f = f inst data RobotEnv c s = RobotEnv @@ -191,7 +197,7 @@ instance MonadRobot (RobotM c s) c s where getAvailableTickers = asks tickers postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () -postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $ +postNotificationEvent (RobotDriverHandle _ _ _ eventQueue _) notification = liftIO $ case notification of OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade)