Browse Source

Refactoring

junction
Denis Tereshkin 4 years ago
parent
commit
d8c5ea63a0
  1. 84
      src/ATrade/Driver/Junction.hs
  2. 57
      src/ATrade/Driver/Junction/RobotDriverThread.hs

84
src/ATrade/Driver/Junction.hs

@ -10,25 +10,32 @@ module ATrade.Driver.Junction
junctionMain junctionMain
) where ) where
import ATrade.Broker.Client (startBrokerClient, import ATrade.Broker.Client (BrokerClientHandle,
startBrokerClient,
stopBrokerClient) stopBrokerClient)
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerEndpoint, brokerNotificationEndpoint, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath), import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerEndpoint, brokerNotificationEndpoint, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath),
ProgramOptions (ProgramOptions, configPath)) ProgramOptions (ProgramOptions, configPath))
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription)) import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription),
SubscriptionId (SubscriptionId))
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv),
QuoteThreadHandle,
withQThread) withQThread)
import ATrade.Driver.Junction.RobotDriverThread (createRobotDriverThread) import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotEnv (..),
RobotM (..),
createRobotDriverThread)
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstanceDescriptor (..), StrategyInstanceDescriptor (..),
confStrategy) confStrategy)
import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Monad (MonadRobot (..))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.Types (ClientSecurityParams (ClientSecurityParams)) import ATrade.Types (ClientSecurityParams (ClientSecurityParams))
import Control.Exception.Safe (MonadThrow, import Control.Exception.Safe (MonadThrow,
bracket) bracket)
import Control.Monad (forM_) import Control.Monad (forM_)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), import Control.Monad.Reader (MonadReader, ReaderT (runReaderT),
asks) asks)
@ -36,7 +43,7 @@ import Data.Aeson (eitherDecode,
encode) encode)
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def)) import Data.Default (Default (def))
import Data.IORef (IORef, newIORef) import Data.IORef (newIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8) import Data.Text.Encoding (encodeUtf8)
@ -62,23 +69,25 @@ import Prelude hiding (readFile)
import System.Log.Logger (warningM) import System.Log.Logger (warningM)
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
data PersistenceEnv = data JunctionEnv =
PersistenceEnv JunctionEnv
{ {
peRedisSocket :: Connection, peRedisSocket :: Connection,
peConfigPath :: FilePath peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle
} }
newtype PersistenceT a = PersistenceT { unPersistenceT :: ReaderT PersistenceEnv IO a } newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
deriving (Functor, Applicative, Monad, MonadReader PersistenceEnv, MonadIO, MonadThrow) deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow)
instance ConfigStorage PersistenceT where instance ConfigStorage JunctionM where
loadConfig key = do loadConfig key = do
basePath <- asks peConfigPath basePath <- asks peConfigPath
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction let path = basePath <> "/" <> T.unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto liftIO $ readFile path >>= input auto
instance MonadPersistence PersistenceT where instance MonadPersistence JunctionM where
saveState newState key = do saveState newState key = do
conn <- asks peRedisSocket conn <- asks peRedisSocket
now <- liftIO getPOSIXTime now <- liftIO getPOSIXTime
@ -107,31 +116,12 @@ instance MonadPersistence PersistenceT where
liftIO $ warningM "main" "Unable to decode state" liftIO $ warningM "main" "Unable to decode state"
return def return def
instance QuoteStream PersistenceT where instance QuoteStream JunctionM where
addSubscription sub chan = undefined addSubscription (QuoteSubscription ticker timeframe) chan = do
removeSubscription sub = undefined qt <- asks peQuoteThread
QT.addSubscription qt ticker timeframe chan
data RobotEnv c s = return (SubscriptionId 0) -- TODO subscription Ids
RobotEnv removeSubscription _ = undefined
{
stateRef :: IORef s,
configRef :: IORef c
}
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a }
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow)
instance MonadRobot (RobotM c s) c s where
submitOrder = undefined
cancelOrder = undefined
appendToLog = undefined
setupTimer = undefined
enqueueIOAction = undefined
getConfig = undefined
getState = undefined
setState = undefined
getEnvironment = undefined
getTicker = undefined
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do junctionMain descriptors = do
@ -143,22 +133,30 @@ junctionMain descriptors = do
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
withContext $ \ctx -> do withContext $ \ctx -> do
let env = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg)
withBroker cfg ctx $ \bro -> withBroker cfg ctx $ \bro ->
withQThread env barsMap cfg ctx $ \qt -> withQThread downloaderEnv barsMap cfg ctx $ \qt -> do
withPersistence (PersistenceEnv redis $ robotsConfigsPath cfg) $ let env =
JunctionEnv
{
peRedisSocket = redis,
peConfigPath = robotsConfigsPath cfg,
peQuoteThread = qt,
peBroker = bro
}
withJunction env $
forM_ (instances cfg) $ \inst -> forM_ (instances cfg) $ \inst ->
case M.lookup (strategyBaseName inst) descriptors of case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do Just (StrategyDescriptorE desc) -> do
bigConf <- loadConfig (configKey inst) bigConf <- loadConfig (configKey inst)
rConf <- liftIO $ newIORef (confStrategy bigConf) rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef rState <- loadState (stateKey inst) >>= liftIO . newIORef
let robotEnv = RobotEnv rState rConf let robotEnv = RobotEnv rState rConf bro barsMap
createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState
Nothing -> error "Unknown strategy" Nothing -> error "Unknown strategy"
where where
withPersistence :: PersistenceEnv -> PersistenceT () -> IO () withJunction :: JunctionEnv -> JunctionM () -> IO ()
withPersistence env = (`runReaderT` env) . unPersistenceT withJunction env = (`runReaderT` env) . unJunctionM
withBroker cfg ctx f = bracket withBroker cfg ctx f = bracket
(startBrokerClient (startBrokerClient

57
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -1,11 +1,18 @@
{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Junction.RobotDriverThread module ATrade.Driver.Junction.RobotDriverThread
( (
createRobotDriverThread createRobotDriverThread,
RobotEnv(..),
RobotM(..)
) where ) where
import ATrade.Broker.Client (BrokerClientHandle (submitOrder))
import qualified ATrade.Broker.Client as Bro
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription),
QuoteSubscription (QuoteSubscription)) QuoteSubscription (QuoteSubscription))
import ATrade.Driver.Junction.Types (BigConfig, import ATrade.Driver.Junction.Types (BigConfig,
@ -19,17 +26,23 @@ import ATrade.Driver.Junction.Types (BigConfig,
import ATrade.QuoteSource.Client (QuoteData (..)) import ATrade.QuoteSource.Client (QuoteData (..))
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate),
EventCallback, MonadRobot) MonadRobot (..))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState))
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars)
import ATrade.Types (OrderId, OrderState, Trade) import ATrade.Types (OrderId, OrderState, Trade)
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.BoundedChan (BoundedChan, import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, readChan, newBoundedChan, readChan,
writeChan) writeChan)
import Control.Monad (forM_, forever) import Control.Exception.Safe (MonadThrow)
import Control.Monad (forM_, forever, void)
import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks)
import Data.Aeson (FromJSON, ToJSON) import Data.Aeson (FromJSON, ToJSON)
import Data.IORef (IORef, newIORef) import Data.IORef (IORef, readIORef,
writeIORef)
import qualified Data.Map.Strict as M
import Dhall (FromDhall) import Dhall (FromDhall)
data RobotDriverHandle = forall c s. RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) data RobotDriverHandle = forall c s. RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent)
@ -93,3 +106,35 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState = do
passQuoteEvents eventQueue quoteQueue = do passQuoteEvents eventQueue quoteQueue = do
v <- readChan quoteQueue v <- readChan quoteQueue
writeChan eventQueue (QuoteEvent v) writeChan eventQueue (QuoteEvent v)
data RobotEnv c s =
RobotEnv
{
stateRef :: IORef s,
configRef :: IORef c,
broker :: BrokerClientHandle,
bars :: IORef Bars
}
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a }
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow)
instance MonadRobot (RobotM c s) c s where
submitOrder order = do
bro <- asks broker
liftIO $ void $ Bro.submitOrder bro order
cancelOrder oid = do
bro <- asks broker
liftIO $ void $ Bro.cancelOrder bro oid
appendToLog = undefined
setupTimer = undefined
enqueueIOAction = undefined
getConfig = asks configRef >>= liftIO . readIORef
getState = asks stateRef >>= liftIO . readIORef
setState newState = asks stateRef >>= liftIO . flip writeIORef newState
getEnvironment = undefined
getTicker tid tf = do
b <- asks bars >>= liftIO . readIORef
return $ M.lookup (BarSeriesId tid tf) b

Loading…
Cancel
Save