diff --git a/robocom-zero.cabal b/robocom-zero.cabal index e42b95d..53b82d0 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -39,6 +39,8 @@ library , ATrade.Quotes.HistoryProvider , ATrade.Quotes.TickerInfoProvider other-modules: Paths_robocom_zero + , ATrade.Driver.Junction.RemoteControl + , ATrade.Driver.Junction.JunctionMonad build-depends: base >= 4.7 && < 5 , libatrade >= 0.12.0.0 && < 0.13.0.0 , text diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index 7b934b2..f1ac727 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -6,14 +6,14 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} +{-# OPTIONS_GHC -Wno-unused-imports #-} module ATrade.Driver.Junction ( junctionMain ) where -import ATrade.Broker.Client (BrokerClientHandle, - startBrokerClient, +import ATrade.Broker.Client (startBrokerClient, stopBrokerClient) import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), NotificationSqnum (unNotificationSqnum), @@ -21,81 +21,55 @@ import ATrade.Broker.Protocol (Notification (Orde import ATrade.Driver.Junction.BrokerService (BrokerService, getNotifications, mkBrokerService) +import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..), + JunctionM (..), + saveRobots, + startRobot) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), ProgramOptions (ProgramOptions, configPath)) -import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), - QuoteSubscription (QuoteSubscription), - SubscriptionId (SubscriptionId)) import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), - QuoteThreadHandle, withQThread) -import qualified ATrade.Driver.Junction.QuoteThread as QT -import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), - RobotM (..), - createRobotDriverThread, - onStrategyInstance, - postNotificationEvent) -import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), - StrategyInstance (strategyInstanceId), - StrategyInstanceDescriptor (..), +import ATrade.Driver.Junction.RemoteControl (handleRemoteControl) +import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent) +import ATrade.Driver.Junction.Types (StrategyDescriptorE, confStrategy, confTickers, strategyState, strategyTimers, tickerId, timeframe) -import ATrade.Logging (Message, Severity (Debug, Error, Info, Trace, Warning), +import ATrade.Logging (Message, Severity (Debug, Info, Trace, Warning), fmtMessage, - logWarning, logWith) import ATrade.Quotes.QHP (mkQHPHandle) -import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) -import ATrade.RoboCom.Monad (StrategyEnvironment (..)) -import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) -import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), - Bars, +import ATrade.RoboCom.Types (Bars, TickerInfoMap) import ATrade.Types (ClientSecurityParams (ClientSecurityParams), OrderId, Trade (tradeOrderId)) -import Colog (HasLog (getLogAction, setLogAction), - LogAction (LogAction), +import Colog (LogAction (LogAction), hoistLogAction, logTextStdout, (<&), (>$<)) import Colog.Actions (logTextHandle) import Control.Concurrent.QSem (newQSem) -import Control.Exception.Safe (MonadThrow) import Control.Monad (forM_, forever) import Control.Monad.Extra (whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), - asks) -import Data.Aeson (eitherDecode, - encode) -import qualified Data.ByteString.Lazy as BL -import Data.Default (Default (def)) -import Data.Foldable (traverse_) +import Control.Monad.Reader (ReaderT (runReaderT)) import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) -import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Map.Strict as M import Data.Set (notMember) import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Text.IO (readFile) -import Data.Time (getCurrentTime) -import Data.Time.Clock.POSIX (getPOSIXTime) -import Database.Redis (ConnectInfo (..), - Connection, - PortID (UnixSocket), +import Database.Redis (ConnectInfo (..), PortID (UnixSocket), checkedConnect, - defaultConnectInfo, - get, mset, - runRedis) + defaultConnectInfo) import Dhall (auto, input) import Options.Applicative (Parser, execParser, @@ -111,74 +85,15 @@ import System.IO (BufferMode (LineBu Handle, IOMode (AppendMode), hSetBuffering, - openFile, withFile) -import System.ZMQ4 (withContext) +import System.ZMQ4 (Rep (Rep), bind, + withContext, + withSocket) import System.ZMQ4.ZAP (loadCertificateFromFile) import UnliftIO (MonadUnliftIO) -import UnliftIO.Concurrent (threadDelay) import UnliftIO.Exception (bracket) import UnliftIO.QSem (QSem, withQSem) -data JunctionEnv = - JunctionEnv - { - peRedisSocket :: Connection, - peConfigPath :: FilePath, - peQuoteThread :: QuoteThreadHandle, - peBroker :: BrokerClientHandle, - peRobots :: IORef (M.Map T.Text RobotDriverHandle), - peLogAction :: LogAction JunctionM Message - } - -newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } - deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow) - -instance HasLog JunctionEnv Message JunctionM where - getLogAction = peLogAction - setLogAction a e = e { peLogAction = a } - -instance ConfigStorage JunctionM where - loadConfig key = do - basePath <- asks peConfigPath - let path = basePath <> "/" <> T.unpack key -- TODO fix path construction - liftIO $ readFile path >>= input auto - -instance MonadPersistence JunctionM where - saveState newState key = do - conn <- asks peRedisSocket - now <- liftIO getPOSIXTime - res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), - (encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] - case res of - Left _ -> logWarning "Junction " "Unable to save state" - Right _ -> return () - - loadState key = do - conn <- asks peRedisSocket - res <- liftIO $ runRedis conn $ get (encodeUtf8 key) - -- TODO: just chain eithers - case res of - Left _ -> do - logWarning "Junction" "Unable to load state" - return def - Right maybeRawState -> - case maybeRawState of - Just rawState -> case eitherDecode $ BL.fromStrict rawState of - Left _ -> do - logWarning "Junction" "Unable to decode state" - return def - Right decodedState -> return decodedState - Nothing -> do - logWarning "Junction" "Unable to decode state" - return def - -instance QuoteStream JunctionM where - addSubscription (QuoteSubscription ticker timeframe) chan = do - qt <- asks peQuoteThread - QT.addSubscription qt ticker timeframe chan - return (SubscriptionId 0) -- TODO subscription Ids - removeSubscription _ = undefined locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a locked sem action = LogAction (\m -> withQSem sem (action <& m)) @@ -217,71 +132,33 @@ junctionMain descriptors = do ordersMap <- newIORef M.empty handledNotifications <- newIORef S.empty withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro -> - withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> do - broService <- mkBrokerService bro ordersMap - let junctionLogAction = hoistLogAction liftIO globalLogger - let env = - JunctionEnv - { - peRedisSocket = redis, - peConfigPath = robotsConfigsPath cfg, - peQuoteThread = qt, - peBroker = bro, - peRobots = robotsMap, - peLogAction = junctionLogAction - } - withJunction env $ do - startRobots (hoistLogAction liftIO globalLogger) cfg barsMap tickerInfoMap broService - forever $ do - notifications <- liftIO $ getNotifications broService - forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) - saveRobots - liftIO $ threadDelay 1000000 + withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> + withSocket ctx Rep $ \rcSocket -> do + liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) + broService <- mkBrokerService bro ordersMap + let junctionLogAction = hoistLogAction liftIO globalLogger + let env = + JunctionEnv + { + peRedisSocket = redis, + peConfigPath = robotsConfigsPath cfg, + peQuoteThread = qt, + peBroker = bro, + peRobots = robotsMap, + peRemoteControlSocket = rcSocket, + peLogAction = junctionLogAction + } + withJunction env $ do + startRobots (hoistLogAction liftIO globalLogger) cfg barsMap tickerInfoMap broService + forever $ do + notifications <- liftIO $ getNotifications broService + forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) + saveRobots + handleRemoteControl 1000000 where - saveRobots :: JunctionM () - saveRobots = do - robotsMap <- asks peRobots >>= (liftIO . readIORef) - traverse_ saveRobotState robotsMap - - saveRobotState :: RobotDriverHandle -> JunctionM () - saveRobotState handle = onStrategyInstance handle $ \inst -> do - currentState <- liftIO $ readIORef (strategyState inst) - saveState currentState (strategyInstanceId inst) - currentTimers <- liftIO $ readIORef (strategyTimers inst) - saveState currentTimers (strategyInstanceId inst <> ":timers") - startRobots :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> JunctionM () startRobots gLogger cfg barsMap tickerInfoMap broService = forM_ (instances cfg) $ \inst -> do - now <- liftIO getCurrentTime - let lLogger = hoistLogAction liftIO gLogger - logWith lLogger Info "Junction" $ "Starting strategy: " <> (strategyBaseName inst) - case M.lookup (strategyBaseName inst) descriptors of - Just (StrategyDescriptorE desc) -> do - bigConf <- loadConfig (configKey inst) - case confTickers bigConf of - (firstTicker:restTickers) -> do - rConf <- liftIO $ newIORef (confStrategy bigConf) - rState <- loadState (stateKey inst) >>= liftIO . newIORef - rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef - localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode - liftIO $ hSetBuffering localH LineBuffering - let robotLogAction = (hoistLogAction liftIO gLogger) <> (fmtMessage >$< logTextHandle localH) - stratEnv <- liftIO $ newIORef StrategyEnvironment - { - _seInstanceId = strategyId inst, - _seAccount = accountId inst, - _seVolume = 1, - _seLastTimestamp = now - } - let robotEnv = - RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) - robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers - robotsMap' <- asks peRobots - liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) - _ -> logWith lLogger Error (strategyId inst) $ "No tickers configured !!!" - Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst - - toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) + startRobot gLogger cfg barsMap tickerInfoMap broService descriptors inst withJunction :: JunctionEnv -> JunctionM () -> IO () withJunction env = (`runReaderT` env) . unJunctionM diff --git a/src/ATrade/Driver/Junction/JunctionMonad.hs b/src/ATrade/Driver/Junction/JunctionMonad.hs new file mode 100644 index 0000000..43be49b --- /dev/null +++ b/src/ATrade/Driver/Junction/JunctionMonad.hs @@ -0,0 +1,194 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} + + +module ATrade.Driver.Junction.JunctionMonad + ( + JunctionEnv(..), + JunctionM(..), + startRobot, + saveRobots + ) where + +import ATrade.Broker.Client (BrokerClientHandle) +import ATrade.Driver.Junction.BrokerService (BrokerService) +import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (logBasePath)) +import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), + QuoteSubscription (QuoteSubscription), + SubscriptionId (SubscriptionId)) +import ATrade.Driver.Junction.QuoteThread (QuoteThreadHandle) +import qualified ATrade.Driver.Junction.QuoteThread as QT +import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), + RobotM (unRobotM), + createRobotDriverThread, + onStrategyInstance) +import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), + StrategyInstanceDescriptor, + accountId, + confStrategy, + confTickers, + configKey, + stateKey, + strategyBaseName, + strategyId, + strategyInstanceId, + strategyState, + strategyTimers, + tickerId, + timeframe) +import ATrade.Logging (Message, Severity (Error, Info), + fmtMessage, + logWarning, + logWith) +import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) +import ATrade.RoboCom.Monad (StrategyEnvironment (..)) +import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) +import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), + Bars, + TickerInfoMap) +import Colog (HasLog (getLogAction, setLogAction), + LogAction, + hoistLogAction, + logTextHandle, + (>$<)) +import Control.Exception.Safe (MonadThrow) +import Control.Monad.Reader (MonadIO (liftIO), + MonadReader, + ReaderT (runReaderT), + asks) +import Data.Aeson (eitherDecode, + encode) +import qualified Data.ByteString.Lazy as BL +import Data.Default (Default (def)) +import Data.Foldable (traverse_) +import Data.IORef (IORef, + atomicModifyIORef', + newIORef, + readIORef) +import Data.List.NonEmpty (NonEmpty ((:|))) +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) +import Data.Text.IO (readFile) +import Data.Time (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime) +import Database.Redis (Connection, get, + mset, runRedis) +import Dhall (auto, input) +import Prelude hiding (readFile) +import System.IO (BufferMode (LineBuffering), + IOMode (AppendMode), + hSetBuffering, + openFile) +import System.ZMQ4 (Rep, Socket) + +data JunctionEnv = + JunctionEnv + { + peRedisSocket :: Connection, + peConfigPath :: FilePath, + peQuoteThread :: QuoteThreadHandle, + peBroker :: BrokerClientHandle, + peRobots :: IORef (M.Map T.Text RobotDriverHandle), + peRemoteControlSocket :: Socket Rep, + peLogAction :: LogAction JunctionM Message + } + +newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } + deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow) + +instance HasLog JunctionEnv Message JunctionM where + getLogAction = peLogAction + setLogAction a e = e { peLogAction = a } + +instance ConfigStorage JunctionM where + loadConfig key = do + basePath <- asks peConfigPath + let path = basePath <> "/" <> T.unpack key -- TODO fix path construction + liftIO $ readFile path >>= input auto + +instance MonadPersistence JunctionM where + saveState newState key = do + conn <- asks peRedisSocket + now <- liftIO getPOSIXTime + res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), + (encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] + case res of + Left _ -> logWarning "Junction " "Unable to save state" + Right _ -> return () + + loadState key = do + conn <- asks peRedisSocket + res <- liftIO $ runRedis conn $ get (encodeUtf8 key) + -- TODO: just chain eithers + case res of + Left _ -> do + logWarning "Junction" "Unable to load state" + return def + Right maybeRawState -> + case maybeRawState of + Just rawState -> case eitherDecode $ BL.fromStrict rawState of + Left _ -> do + logWarning "Junction" "Unable to decode state" + return def + Right decodedState -> return decodedState + Nothing -> do + logWarning "Junction" "Unable to decode state" + return def + +instance QuoteStream JunctionM where + addSubscription (QuoteSubscription ticker timeframe) chan = do + qt <- asks peQuoteThread + QT.addSubscription qt ticker timeframe chan + return (SubscriptionId 0) -- TODO subscription Ids + removeSubscription _ = undefined + +startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> + BrokerService -> M.Map T.Text StrategyDescriptorE -> StrategyInstanceDescriptor -> JunctionM () +startRobot ioLogger cfg barsMap tickerInfoMap broService descriptors inst = do + logger <- asks peLogAction + let log = logWith logger + now <- liftIO getCurrentTime + let lLogger = hoistLogAction liftIO ioLogger + logWith lLogger Info "Junction" $ "Starting strategy: " <> strategyBaseName inst + case M.lookup (strategyBaseName inst) descriptors of + Just (StrategyDescriptorE desc) -> do + bigConf <- loadConfig (configKey inst) + case confTickers bigConf of + (firstTicker:restTickers) -> do + rConf <- liftIO $ newIORef (confStrategy bigConf) + rState <- loadState (stateKey inst) >>= liftIO . newIORef + rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef + localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode + liftIO $ hSetBuffering localH LineBuffering + let robotLogAction = hoistLogAction liftIO ioLogger <> (fmtMessage >$< logTextHandle localH) + stratEnv <- liftIO $ newIORef StrategyEnvironment + { + _seInstanceId = strategyId inst, + _seAccount = accountId inst, + _seVolume = 1, + _seLastTimestamp = now + } + let robotEnv = + RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) + robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers + robotsMap' <- asks peRobots + liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) + _ -> logWith lLogger Error (strategyId inst) "No tickers configured !!!" + Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst + + where + toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) + +saveRobots :: JunctionM () +saveRobots = do + robotsMap <- asks peRobots >>= (liftIO . readIORef) + traverse_ saveRobotState robotsMap + +saveRobotState :: RobotDriverHandle -> JunctionM () +saveRobotState handle = onStrategyInstance handle $ \inst -> do + currentState <- liftIO $ readIORef (strategyState inst) + saveState currentState (strategyInstanceId inst) + currentTimers <- liftIO $ readIORef (strategyTimers inst) + saveState currentTimers (strategyInstanceId inst <> ":timers") diff --git a/src/ATrade/Driver/Junction/ProgramConfiguration.hs b/src/ATrade/Driver/Junction/ProgramConfiguration.hs index 5591646..cc4b33e 100644 --- a/src/ATrade/Driver/Junction/ProgramConfiguration.hs +++ b/src/ATrade/Driver/Junction/ProgramConfiguration.hs @@ -29,6 +29,7 @@ data ProgramConfiguration = quotesourceClientCert :: Maybe FilePath, qhpEndpoint :: T.Text, qtisEndpoint :: T.Text, + remoteControlEndpoint :: T.Text, redisSocket :: T.Text, robotsConfigsPath :: FilePath, logBasePath :: FilePath, diff --git a/src/ATrade/Driver/Junction/RemoteControl.hs b/src/ATrade/Driver/Junction/RemoteControl.hs new file mode 100644 index 0000000..3579402 --- /dev/null +++ b/src/ATrade/Driver/Junction/RemoteControl.hs @@ -0,0 +1,97 @@ +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Driver.Junction.RemoteControl + ( + handleRemoteControl + ) where + +import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), + JunctionM) +import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) +import ATrade.Logging (logErrorWith) +import Control.Monad (unless) +import Control.Monad.Reader (asks) +import Data.Aeson (decode) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8', encodeUtf8) +import System.ZMQ4 (Event (In), Poll (Sock), + poll, receive, send) +import UnliftIO (MonadIO (liftIO)) + +data RemoteControlResponse = + ResponseOk + | ResponseError T.Text + deriving (Show, Eq) + +data RemoteControlRequest = + StartRobot StrategyInstanceDescriptor + | StopRobot T.Text + | ReloadConfig T.Text + | SetState T.Text B.ByteString + | Ping + deriving (Show) + +data ParseError = + UnknownCmd + | UtfDecodeError + | JsonDecodeError + deriving (Show, Eq) + +parseRemoteControlRequest :: B.ByteString -> Either ParseError RemoteControlRequest +parseRemoteControlRequest bs = + if + | cmd == "START" -> parseStart + | cmd == "STOP" -> parseStop + | cmd == "RELOAD_CONFIG" -> parseReloadConfig + | cmd == "SET_STATE" -> parseSetState + | cmd == "PING" -> Right Ping + | otherwise -> Left UnknownCmd + where + cmd = B.takeWhile (/= 0x20) bs + rest = B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ bs + + parseStart = case decode . BL.fromStrict $ rest of + Just inst -> Right (StartRobot inst) + Nothing -> Left JsonDecodeError + + parseStop = case decodeUtf8' rest of + Left _ -> Left UtfDecodeError + Right r -> Right (StopRobot (T.strip r)) + + parseReloadConfig = case decodeUtf8' rest of + Left _ -> Left UtfDecodeError + Right r -> Right (ReloadConfig (T.strip r)) + + parseSetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of + Left _ -> Left UtfDecodeError + Right r -> Right (SetState r (B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ rest)) + + +makeRemoteControlResponse :: RemoteControlResponse -> B.ByteString +makeRemoteControlResponse ResponseOk = "OK" +makeRemoteControlResponse (ResponseError msg) = "ERROR " <> encodeUtf8 msg + +handleRemoteControl :: Int -> JunctionM () +handleRemoteControl timeout = do + sock <- asks peRemoteControlSocket + logger <- asks peLogAction + evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] + unless (null evs) $ do + rawRequest <- liftIO $ receive sock + case parseRemoteControlRequest rawRequest of + Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) + Right request -> do + response <- handleRequest request + liftIO $ send sock [] (makeRemoteControlResponse response) + where + handleRequest (StartRobot inst) = undefined + handleRequest (StopRobot instId) = undefined + handleRequest (ReloadConfig instId) = undefined + handleRequest (SetState instId rawState) = undefined + handleRequest Ping = return ResponseOk + + diff --git a/src/ATrade/Driver/Junction/Types.hs b/src/ATrade/Driver/Junction/Types.hs index 8054daf..510759c 100644 --- a/src/ATrade/Driver/Junction/Types.hs +++ b/src/ATrade/Driver/Junction/Types.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} module ATrade.Driver.Junction.Types @@ -16,7 +17,8 @@ module ATrade.Driver.Junction.Types import ATrade.RoboCom.Monad (EventCallback) import ATrade.Types (BarTimeframe (..), TickerId) -import Data.Aeson (FromJSON (..), ToJSON (..)) +import Data.Aeson (FromJSON (..), ToJSON (..), withObject, + (.:)) import Data.Default (Default) import Data.IORef (IORef) import qualified Data.Text as T @@ -66,6 +68,17 @@ data StrategyInstanceDescriptor = instance FromDhall StrategyInstanceDescriptor +instance FromJSON StrategyInstanceDescriptor where + parseJSON = withObject "StrategyInstanceDescriptor" $ \obj -> + StrategyInstanceDescriptor <$> + obj .: "account_id" <*> + obj .: "strategy_id" <*> + obj .: "strategy_base_name" <*> + obj .: "config_key" <*> + obj .: "state_key" <*> + obj .: "log_path" + + data StrategyInstance c s = StrategyInstance {