From d8c5ea63a0567ff8a596069a456172a54c552553 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 27 Nov 2021 23:29:04 +0700 Subject: [PATCH] Refactoring --- src/ATrade/Driver/Junction.hs | 84 +++++++++---------- .../Driver/Junction/RobotDriverThread.hs | 57 +++++++++++-- 2 files changed, 92 insertions(+), 49 deletions(-) diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index 85f5570..ed468bc 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -10,25 +10,32 @@ module ATrade.Driver.Junction junctionMain ) where -import ATrade.Broker.Client (startBrokerClient, +import ATrade.Broker.Client (BrokerClientHandle, + startBrokerClient, stopBrokerClient) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerEndpoint, brokerNotificationEndpoint, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath), ProgramOptions (ProgramOptions, configPath)) -import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription)) +import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), + QuoteSubscription (QuoteSubscription), + SubscriptionId (SubscriptionId)) import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), + QuoteThreadHandle, withQThread) -import ATrade.Driver.Junction.RobotDriverThread (createRobotDriverThread) +import qualified ATrade.Driver.Junction.QuoteThread as QT +import ATrade.Driver.Junction.RobotDriverThread (RobotEnv (..), + RobotM (..), + createRobotDriverThread) import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), StrategyInstanceDescriptor (..), confStrategy) import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) -import ATrade.RoboCom.Monad (MonadRobot (..)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.Types (ClientSecurityParams (ClientSecurityParams)) import Control.Exception.Safe (MonadThrow, bracket) import Control.Monad (forM_) +import Control.Monad (void) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), asks) @@ -36,7 +43,7 @@ import Data.Aeson (eitherDecode, encode) import qualified Data.ByteString.Lazy as BL import Data.Default (Default (def)) -import Data.IORef (IORef, newIORef) +import Data.IORef (newIORef) import qualified Data.Map.Strict as M import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) @@ -62,23 +69,25 @@ import Prelude hiding (readFile) import System.Log.Logger (warningM) import System.ZMQ4 (withContext) -data PersistenceEnv = - PersistenceEnv +data JunctionEnv = + JunctionEnv { peRedisSocket :: Connection, - peConfigPath :: FilePath + peConfigPath :: FilePath, + peQuoteThread :: QuoteThreadHandle, + peBroker :: BrokerClientHandle } -newtype PersistenceT a = PersistenceT { unPersistenceT :: ReaderT PersistenceEnv IO a } - deriving (Functor, Applicative, Monad, MonadReader PersistenceEnv, MonadIO, MonadThrow) +newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } + deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow) -instance ConfigStorage PersistenceT where +instance ConfigStorage JunctionM where loadConfig key = do basePath <- asks peConfigPath let path = basePath <> "/" <> T.unpack key -- TODO fix path construction liftIO $ readFile path >>= input auto -instance MonadPersistence PersistenceT where +instance MonadPersistence JunctionM where saveState newState key = do conn <- asks peRedisSocket now <- liftIO getPOSIXTime @@ -107,31 +116,12 @@ instance MonadPersistence PersistenceT where liftIO $ warningM "main" "Unable to decode state" return def -instance QuoteStream PersistenceT where - addSubscription sub chan = undefined - removeSubscription sub = undefined - -data RobotEnv c s = - RobotEnv - { - stateRef :: IORef s, - configRef :: IORef c - } - -newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } - deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) - -instance MonadRobot (RobotM c s) c s where - submitOrder = undefined - cancelOrder = undefined - appendToLog = undefined - setupTimer = undefined - enqueueIOAction = undefined - getConfig = undefined - getState = undefined - setState = undefined - getEnvironment = undefined - getTicker = undefined +instance QuoteStream JunctionM where + addSubscription (QuoteSubscription ticker timeframe) chan = do + qt <- asks peQuoteThread + QT.addSubscription qt ticker timeframe chan + return (SubscriptionId 0) -- TODO subscription Ids + removeSubscription _ = undefined junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () junctionMain descriptors = do @@ -143,22 +133,30 @@ junctionMain descriptors = do redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) withContext $ \ctx -> do - let env = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) + let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) withBroker cfg ctx $ \bro -> - withQThread env barsMap cfg ctx $ \qt -> - withPersistence (PersistenceEnv redis $ robotsConfigsPath cfg) $ + withQThread downloaderEnv barsMap cfg ctx $ \qt -> do + let env = + JunctionEnv + { + peRedisSocket = redis, + peConfigPath = robotsConfigsPath cfg, + peQuoteThread = qt, + peBroker = bro + } + withJunction env $ forM_ (instances cfg) $ \inst -> case M.lookup (strategyBaseName inst) descriptors of Just (StrategyDescriptorE desc) -> do bigConf <- loadConfig (configKey inst) rConf <- liftIO $ newIORef (confStrategy bigConf) rState <- loadState (stateKey inst) >>= liftIO . newIORef - let robotEnv = RobotEnv rState rConf + let robotEnv = RobotEnv rState rConf bro barsMap createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState Nothing -> error "Unknown strategy" where - withPersistence :: PersistenceEnv -> PersistenceT () -> IO () - withPersistence env = (`runReaderT` env) . unPersistenceT + withJunction :: JunctionEnv -> JunctionM () -> IO () + withJunction env = (`runReaderT` env) . unJunctionM withBroker cfg ctx f = bracket (startBrokerClient diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs index 7a46b4d..4a55568 100644 --- a/src/ATrade/Driver/Junction/RobotDriverThread.hs +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -1,11 +1,18 @@ -{-# LANGUAGE ExistentialQuantification #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} module ATrade.Driver.Junction.RobotDriverThread ( - createRobotDriverThread + createRobotDriverThread, + RobotEnv(..), + RobotM(..) ) where +import ATrade.Broker.Client (BrokerClientHandle (submitOrder)) +import qualified ATrade.Broker.Client as Bro import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), QuoteSubscription (QuoteSubscription)) import ATrade.Driver.Junction.Types (BigConfig, @@ -19,17 +26,23 @@ import ATrade.Driver.Junction.Types (BigConfig, import ATrade.QuoteSource.Client (QuoteData (..)) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), - EventCallback, MonadRobot) + MonadRobot (..)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState)) +import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), + Bars) import ATrade.Types (OrderId, OrderState, Trade) import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, writeChan) -import Control.Monad (forM_, forever) +import Control.Exception.Safe (MonadThrow) +import Control.Monad (forM_, forever, void) import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Reader (MonadReader, ReaderT, asks) import Data.Aeson (FromJSON, ToJSON) -import Data.IORef (IORef, newIORef) +import Data.IORef (IORef, readIORef, + writeIORef) +import qualified Data.Map.Strict as M import Dhall (FromDhall) data RobotDriverHandle = forall c s. RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) @@ -93,3 +106,35 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState = do passQuoteEvents eventQueue quoteQueue = do v <- readChan quoteQueue writeChan eventQueue (QuoteEvent v) + +data RobotEnv c s = + RobotEnv + { + stateRef :: IORef s, + configRef :: IORef c, + broker :: BrokerClientHandle, + bars :: IORef Bars + } + +newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } + deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) + +instance MonadRobot (RobotM c s) c s where + submitOrder order = do + bro <- asks broker + liftIO $ void $ Bro.submitOrder bro order + + cancelOrder oid = do + bro <- asks broker + liftIO $ void $ Bro.cancelOrder bro oid + + appendToLog = undefined + setupTimer = undefined + enqueueIOAction = undefined + getConfig = asks configRef >>= liftIO . readIORef + getState = asks stateRef >>= liftIO . readIORef + setState newState = asks stateRef >>= liftIO . flip writeIORef newState + getEnvironment = undefined + getTicker tid tf = do + b <- asks bars >>= liftIO . readIORef + return $ M.lookup (BarSeriesId tid tf) b