Browse Source

junction: save periodically robots state

junction
Denis Tereshkin 4 years ago
parent
commit
517fb4d4d3
  1. 67
      src/ATrade/Driver/Junction.hs
  2. 18
      src/ATrade/Driver/Junction/RobotDriverThread.hs
  3. 8
      src/ATrade/Driver/Junction/Types.hs

67
src/ATrade/Driver/Junction.hs

@ -1,5 +1,6 @@
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
@ -22,20 +23,24 @@ import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (Dow
QuoteThreadHandle, QuoteThreadHandle,
withQThread) withQThread)
import qualified ATrade.Driver.Junction.QuoteThread as QT import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotEnv (..), import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle,
RobotEnv (..),
RobotM (..), RobotM (..),
createRobotDriverThread) createRobotDriverThread,
onStrategyInstance)
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstance (strategyInstanceId),
StrategyInstanceDescriptor (..), StrategyInstanceDescriptor (..),
confStrategy) confStrategy,
strategyState)
import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.Types (ClientSecurityParams (ClientSecurityParams)) import ATrade.Types (ClientSecurityParams (ClientSecurityParams))
import Control.Concurrent
import Control.Exception.Safe (MonadThrow, import Control.Exception.Safe (MonadThrow,
bracket) bracket)
import Control.Monad (forM_) import Control.Monad (forM_, forever)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), import Control.Monad.Reader (MonadReader, ReaderT (runReaderT),
asks) asks)
@ -43,7 +48,11 @@ import Data.Aeson (eitherDecode,
encode) encode)
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def)) import Data.Default (Default (def))
import Data.IORef (newIORef) import Data.Foldable (traverse_)
import Data.IORef (IORef,
atomicModifyIORef',
newIORef,
readIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8) import Data.Text.Encoding (encodeUtf8)
@ -75,7 +84,8 @@ data JunctionEnv =
peRedisSocket :: Connection, peRedisSocket :: Connection,
peConfigPath :: FilePath, peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle, peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle peBroker :: BrokerClientHandle,
peRobots :: IORef (M.Map T.Text RobotDriverHandle)
} }
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
@ -136,26 +146,45 @@ junctionMain descriptors = do
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg)
withBroker cfg ctx $ \bro -> withBroker cfg ctx $ \bro ->
withQThread downloaderEnv barsMap cfg ctx $ \qt -> do withQThread downloaderEnv barsMap cfg ctx $ \qt -> do
robotsMap <- newIORef M.empty
let env = let env =
JunctionEnv JunctionEnv
{ {
peRedisSocket = redis, peRedisSocket = redis,
peConfigPath = robotsConfigsPath cfg, peConfigPath = robotsConfigsPath cfg,
peQuoteThread = qt, peQuoteThread = qt,
peBroker = bro peBroker = bro,
peRobots = robotsMap
} }
withJunction env $ withJunction env $ do
forM_ (instances cfg) $ \inst -> startRobots cfg bro barsMap
case M.lookup (strategyBaseName inst) descriptors of forever $ do
Just (StrategyDescriptorE desc) -> do saveRobots
bigConf <- loadConfig (configKey inst) liftIO $ threadDelay 5000000
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef
let robotEnv = RobotEnv rState rConf rTimers bro barsMap
createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState
Nothing -> error "Unknown strategy"
where where
saveRobots :: JunctionM ()
saveRobots = do
robotsMap <- asks peRobots >>= (liftIO . readIORef)
traverse_ saveRobotState robotsMap
saveRobotState :: RobotDriverHandle -> JunctionM ()
saveRobotState handle = onStrategyInstance handle $ \inst -> do
currentState <- liftIO $ readIORef (strategyState inst)
saveState currentState (strategyInstanceId inst)
startRobots cfg bro barsMap = forM_ (instances cfg) $ \inst ->
case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do
bigConf <- loadConfig (configKey inst)
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef
let robotEnv = RobotEnv rState rConf rTimers bro barsMap
robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState
robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ()))
Nothing -> error "Unknown strategy"
withJunction :: JunctionEnv -> JunctionM () -> IO () withJunction :: JunctionEnv -> JunctionM () -> IO ()
withJunction env = (`runReaderT` env) . unJunctionM withJunction env = (`runReaderT` env) . unJunctionM

18
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -8,10 +8,12 @@ module ATrade.Driver.Junction.RobotDriverThread
( (
createRobotDriverThread, createRobotDriverThread,
RobotEnv(..), RobotEnv(..),
RobotM(..) RobotM(..),
RobotDriverHandle,
onStrategyInstance
) where ) where
import ATrade.Broker.Client (BrokerClientHandle (submitOrder)) import ATrade.Broker.Client (BrokerClientHandle)
import qualified ATrade.Broker.Client as Bro import qualified ATrade.Broker.Client as Bro
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription),
QuoteSubscription (QuoteSubscription)) QuoteSubscription (QuoteSubscription))
@ -24,10 +26,10 @@ import ATrade.Driver.Junction.Types (BigConfig,
strategyId, tickerId, strategyId, tickerId,
timeframe) timeframe)
import ATrade.QuoteSource.Client (QuoteData (..)) import ATrade.QuoteSource.Client (QuoteData (..))
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.ConfigStorage (ConfigStorage)
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate),
MonadRobot (..)) MonadRobot (..))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState)) import ATrade.RoboCom.Persistence (MonadPersistence)
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars) Bars)
import ATrade.Types (OrderId, OrderState, Trade) import ATrade.Types (OrderId, OrderState, Trade)
@ -40,6 +42,7 @@ import Control.Monad (forM_, forever, void)
import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader, ReaderT, asks) import Control.Monad.Reader (MonadReader, ReaderT, asks)
import Data.Aeson (FromJSON, ToJSON) import Data.Aeson (FromJSON, ToJSON)
import Data.Default
import Data.IORef (IORef, atomicModifyIORef', import Data.IORef (IORef, atomicModifyIORef',
readIORef, writeIORef) readIORef, writeIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
@ -48,7 +51,8 @@ import Data.Time (UTCTime)
import Dhall (FromDhall) import Dhall (FromDhall)
import System.Log.Logger (infoM) import System.Log.Logger (infoM)
data RobotDriverHandle = forall c s. RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) =>
RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent)
data RobotDriverRequest data RobotDriverRequest
@ -80,6 +84,7 @@ createRobotDriverThread :: (MonadIO m1,
ConfigStorage m1, ConfigStorage m1,
MonadPersistence m1, MonadPersistence m1,
QuoteStream m1, QuoteStream m1,
Default s,
FromJSON s, FromJSON s,
ToJSON s, ToJSON s,
FromDhall c, FromDhall c,
@ -110,6 +115,9 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState = do
v <- readChan quoteQueue v <- readChan quoteQueue
writeChan eventQueue (QuoteEvent v) writeChan eventQueue (QuoteEvent v)
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r
onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst
data RobotEnv c s = data RobotEnv c s =
RobotEnv RobotEnv
{ {

8
src/ATrade/Driver/Junction/Types.hs

@ -9,8 +9,10 @@ module ATrade.Driver.Junction.Types
TickerConfig(..), TickerConfig(..),
StrategyInstanceDescriptor(..), StrategyInstanceDescriptor(..),
StrategyInstance(..), StrategyInstance(..),
BigConfig(..) BigConfig(..),
,StrategyDescriptorE(..)) where StrategyDescriptorE(..),
StrategyInstanceE(..)
) where
import ATrade.RoboCom.Monad (EventCallback) import ATrade.RoboCom.Monad (EventCallback)
import ATrade.Types (BarTimeframe (..), TickerId) import ATrade.Types (BarTimeframe (..), TickerId)
@ -68,3 +70,5 @@ data StrategyInstance c s =
strategyState :: IORef s, strategyState :: IORef s,
strategyConfig :: IORef c strategyConfig :: IORef c
} }
data StrategyInstanceE = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstanceE (StrategyInstance c s)

Loading…
Cancel
Save