diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index a1573ca..0cd0f94 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -22,20 +23,24 @@ import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (Dow QuoteThreadHandle, withQThread) import qualified ATrade.Driver.Junction.QuoteThread as QT -import ATrade.Driver.Junction.RobotDriverThread (RobotEnv (..), +import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, + RobotEnv (..), RobotM (..), - createRobotDriverThread) + createRobotDriverThread, + onStrategyInstance) import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), + StrategyInstance (strategyInstanceId), StrategyInstanceDescriptor (..), - confStrategy) + confStrategy, + strategyState) import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.Types (ClientSecurityParams (ClientSecurityParams)) +import Control.Concurrent import Control.Exception.Safe (MonadThrow, bracket) -import Control.Monad (forM_) -import Control.Monad (void) +import Control.Monad (forM_, forever) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), asks) @@ -43,7 +48,11 @@ import Data.Aeson (eitherDecode, encode) import qualified Data.ByteString.Lazy as BL import Data.Default (Default (def)) -import Data.IORef (newIORef) +import Data.Foldable (traverse_) +import Data.IORef (IORef, + atomicModifyIORef', + newIORef, + readIORef) import qualified Data.Map.Strict as M import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) @@ -75,7 +84,8 @@ data JunctionEnv = peRedisSocket :: Connection, peConfigPath :: FilePath, peQuoteThread :: QuoteThreadHandle, - peBroker :: BrokerClientHandle + peBroker :: BrokerClientHandle, + peRobots :: IORef (M.Map T.Text RobotDriverHandle) } newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } @@ -136,26 +146,45 @@ junctionMain descriptors = do let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) withBroker cfg ctx $ \bro -> withQThread downloaderEnv barsMap cfg ctx $ \qt -> do + robotsMap <- newIORef M.empty let env = JunctionEnv { peRedisSocket = redis, peConfigPath = robotsConfigsPath cfg, peQuoteThread = qt, - peBroker = bro + peBroker = bro, + peRobots = robotsMap } - withJunction env $ - forM_ (instances cfg) $ \inst -> - case M.lookup (strategyBaseName inst) descriptors of - Just (StrategyDescriptorE desc) -> do - bigConf <- loadConfig (configKey inst) - rConf <- liftIO $ newIORef (confStrategy bigConf) - rState <- loadState (stateKey inst) >>= liftIO . newIORef - rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef - let robotEnv = RobotEnv rState rConf rTimers bro barsMap - createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState - Nothing -> error "Unknown strategy" + withJunction env $ do + startRobots cfg bro barsMap + forever $ do + saveRobots + liftIO $ threadDelay 5000000 where + saveRobots :: JunctionM () + saveRobots = do + robotsMap <- asks peRobots >>= (liftIO . readIORef) + traverse_ saveRobotState robotsMap + + saveRobotState :: RobotDriverHandle -> JunctionM () + saveRobotState handle = onStrategyInstance handle $ \inst -> do + currentState <- liftIO $ readIORef (strategyState inst) + saveState currentState (strategyInstanceId inst) + + startRobots cfg bro barsMap = forM_ (instances cfg) $ \inst -> + case M.lookup (strategyBaseName inst) descriptors of + Just (StrategyDescriptorE desc) -> do + bigConf <- loadConfig (configKey inst) + rConf <- liftIO $ newIORef (confStrategy bigConf) + rState <- loadState (stateKey inst) >>= liftIO . newIORef + rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef + let robotEnv = RobotEnv rState rConf rTimers bro barsMap + robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState + robotsMap' <- asks peRobots + liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) + Nothing -> error "Unknown strategy" + withJunction :: JunctionEnv -> JunctionM () -> IO () withJunction env = (`runReaderT` env) . unJunctionM diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs index bf53bd2..d9093f1 100644 --- a/src/ATrade/Driver/Junction/RobotDriverThread.hs +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -8,10 +8,12 @@ module ATrade.Driver.Junction.RobotDriverThread ( createRobotDriverThread, RobotEnv(..), - RobotM(..) + RobotM(..), + RobotDriverHandle, + onStrategyInstance ) where -import ATrade.Broker.Client (BrokerClientHandle (submitOrder)) +import ATrade.Broker.Client (BrokerClientHandle) import qualified ATrade.Broker.Client as Bro import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), QuoteSubscription (QuoteSubscription)) @@ -24,10 +26,10 @@ import ATrade.Driver.Junction.Types (BigConfig, strategyId, tickerId, timeframe) import ATrade.QuoteSource.Client (QuoteData (..)) -import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) +import ATrade.RoboCom.ConfigStorage (ConfigStorage) import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), MonadRobot (..)) -import ATrade.RoboCom.Persistence (MonadPersistence (loadState)) +import ATrade.RoboCom.Persistence (MonadPersistence) import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) import ATrade.Types (OrderId, OrderState, Trade) @@ -40,6 +42,7 @@ import Control.Monad (forM_, forever, void) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Reader (MonadReader, ReaderT, asks) import Data.Aeson (FromJSON, ToJSON) +import Data.Default import Data.IORef (IORef, atomicModifyIORef', readIORef, writeIORef) import qualified Data.Map.Strict as M @@ -48,7 +51,8 @@ import Data.Time (UTCTime) import Dhall (FromDhall) import System.Log.Logger (infoM) -data RobotDriverHandle = forall c s. RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) +data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => + RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) data RobotDriverRequest @@ -80,6 +84,7 @@ createRobotDriverThread :: (MonadIO m1, ConfigStorage m1, MonadPersistence m1, QuoteStream m1, + Default s, FromJSON s, ToJSON s, FromDhall c, @@ -110,6 +115,9 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState = do v <- readChan quoteQueue writeChan eventQueue (QuoteEvent v) +onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r +onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst + data RobotEnv c s = RobotEnv { diff --git a/src/ATrade/Driver/Junction/Types.hs b/src/ATrade/Driver/Junction/Types.hs index 602ad4d..ac175e2 100644 --- a/src/ATrade/Driver/Junction/Types.hs +++ b/src/ATrade/Driver/Junction/Types.hs @@ -9,8 +9,10 @@ module ATrade.Driver.Junction.Types TickerConfig(..), StrategyInstanceDescriptor(..), StrategyInstance(..), - BigConfig(..) - ,StrategyDescriptorE(..)) where + BigConfig(..), + StrategyDescriptorE(..), + StrategyInstanceE(..) + ) where import ATrade.RoboCom.Monad (EventCallback) import ATrade.Types (BarTimeframe (..), TickerId) @@ -68,3 +70,5 @@ data StrategyInstance c s = strategyState :: IORef s, strategyConfig :: IORef c } + +data StrategyInstanceE = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstanceE (StrategyInstance c s)