Browse Source

Junction: handle robot stop command

master
Denis Tereshkin 4 years ago
parent
commit
4ac71e148c
  1. 4
      src/ATrade/Driver/Junction/JunctionMonad.hs
  2. 51
      src/ATrade/Driver/Junction/RemoteControl.hs
  3. 40
      src/ATrade/Driver/Junction/RobotDriverThread.hs

4
src/ATrade/Driver/Junction/JunctionMonad.hs

@ -141,7 +141,9 @@ instance QuoteStream JunctionM where @@ -141,7 +141,9 @@ instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker tf) chan = do
qt <- asks peQuoteThread
QT.addSubscription qt ticker tf chan
removeSubscription _ = undefined
removeSubscription subId = do
qt <- asks peQuoteThread
QT.removeSubscription qt subId
startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap ->
BrokerService -> M.Map T.Text StrategyDescriptorE -> StrategyInstanceDescriptor -> JunctionM ()

51
src/ATrade/Driver/Junction/RemoteControl.hs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
@ -6,21 +7,28 @@ module ATrade.Driver.Junction.RemoteControl @@ -6,21 +7,28 @@ module ATrade.Driver.Junction.RemoteControl
handleRemoteControl
) where
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots),
JunctionM)
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor)
import ATrade.Logging (logErrorWith)
import Control.Monad (unless)
import Control.Monad.Reader (asks)
import Data.Aeson (decode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8', encodeUtf8)
import System.ZMQ4 (Event (In), Poll (Sock),
poll, receive, send)
import UnliftIO (MonadIO (liftIO))
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots),
JunctionM)
import ATrade.Driver.Junction.RobotDriverThread (stopRobot)
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor)
import ATrade.Logging (Severity (Info),
logErrorWith,
logWith)
import Control.Monad (unless)
import Control.Monad.Reader (asks)
import Data.Aeson (decode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8',
encodeUtf8)
import System.ZMQ4 (Event (In),
Poll (Sock), poll,
receive, send)
import UnliftIO (MonadIO (liftIO),
atomicModifyIORef',
readIORef)
data RemoteControlResponse =
ResponseOk
@ -89,7 +97,18 @@ handleRemoteControl timeout = do @@ -89,7 +97,18 @@ handleRemoteControl timeout = do
liftIO $ send sock [] (makeRemoteControlResponse response)
where
handleRequest (StartRobot inst) = undefined
handleRequest (StopRobot instId) = undefined
handleRequest (StopRobot instId) = do
robotsRef <- asks peRobots
robots <- readIORef robotsRef
case M.lookup instId robots of
Just robot -> do
logger <- asks peLogAction
logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId
stopRobot robot
liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ()))
return ResponseOk
Nothing -> return $ ResponseError $ "Not started: " <> instId
handleRequest (ReloadConfig instId) = undefined
handleRequest (SetState instId rawState) = undefined
handleRequest Ping = return ResponseOk

40
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -13,12 +13,14 @@ module ATrade.Driver.Junction.RobotDriverThread @@ -13,12 +13,14 @@ module ATrade.Driver.Junction.RobotDriverThread
RobotM(..),
RobotDriverHandle,
onStrategyInstance,
postNotificationEvent) where
postNotificationEvent,
stopRobot
) where
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification))
import qualified ATrade.Driver.Junction.BrokerService as Bro
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription),
QuoteSubscription (QuoteSubscription))
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription), SubscriptionId)
import ATrade.Driver.Junction.Types (BigConfig,
StrategyDescriptor,
StrategyInstance (StrategyInstance, strategyEventCallback),
@ -28,31 +30,29 @@ import ATrade.Driver.Junction.Types (BigConfig, @@ -28,31 +30,29 @@ import ATrade.Driver.Junction.Types (BigConfig,
eventCallback, stateKey,
strategyId, tickerId,
timeframe)
import ATrade.Logging (Message, log, logDebug,
logInfo, logWarning)
import ATrade.Logging (Message, log)
import ATrade.QuoteSource.Client (QuoteData (..))
import ATrade.RoboCom.ConfigStorage (ConfigStorage)
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderSubmitted, OrderUpdate),
MonadRobot (..),
StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp))
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate),
MonadRobot (..), StrategyEnvironment (..))
import ATrade.RoboCom.Persistence (MonadPersistence)
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars, TickerInfoMap)
import ATrade.Types (Order (orderId), OrderId,
import ATrade.Types (OrderId,
OrderState, Trade, Tick (value))
import Colog (HasLog (getLogAction, setLogAction),
LogAction)
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, readChan,
writeChan)
import Control.Exception.Safe (MonadThrow)
import Control.Monad (forM_, forever, void, when)
import Control.Monad (forM_, forever, void, when, forM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader (local),
ReaderT, asks)
import Data.Aeson (FromJSON, ToJSON)
import Data.Default
import Data.Default ( Default )
import Data.IORef (IORef,
atomicModifyIORef',
readIORef, writeIORef)
@ -64,7 +64,7 @@ import Dhall (FromDhall) @@ -64,7 +64,7 @@ import Dhall (FromDhall)
import Prelude hiding (log)
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) =>
RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent)
RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId]
data RobotDriverRequest
@ -118,19 +118,25 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d @@ -118,19 +118,25 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers
quoteQueue <- liftIO $ newBoundedChan 2000
forM_ (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue)
subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue)
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue
return $ RobotDriverHandle inst driver qthread eventQueue
return $ RobotDriverHandle inst driver qthread eventQueue subIds
where
passQuoteEvents eventQueue quoteQueue = do
v <- readChan quoteQueue
writeChan eventQueue (QuoteEvent v)
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m ()
stopRobot (RobotDriverHandle _ driver qthread _ subIds) = do
forM_ subIds removeSubscription
liftIO $ killThread driver
liftIO $ killThread qthread
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r
onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst
onStrategyInstance (RobotDriverHandle inst _ _ _ _) f = f inst
data RobotEnv c s =
RobotEnv
@ -191,7 +197,7 @@ instance MonadRobot (RobotM c s) c s where @@ -191,7 +197,7 @@ instance MonadRobot (RobotM c s) c s where
getAvailableTickers = asks tickers
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m ()
postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $
postNotificationEvent (RobotDriverHandle _ _ _ eventQueue _) notification = liftIO $
case notification of
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state)
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade)

Loading…
Cancel
Save