diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index e415dea..a0f5acf 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -3,6 +3,10 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} module ATrade.Driver.Real ( Strategy(..), @@ -23,6 +27,7 @@ import System.Log.Handler.Simple import System.Log.Handler (setFormatter) import System.Log.Formatter import Control.Monad +import Control.Monad.IO.Class import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) import Control.Concurrent.BoundedChan as BC import Control.Exception @@ -41,7 +46,7 @@ import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread @@ -53,6 +58,53 @@ import ATrade.Quotes.QHP as QQ import ATrade.Quotes.HAP as QH import System.ZMQ4 hiding (Event(..)) +import Ether.Reader + +data RDriverEnv + +data DriverEnv c s = DriverEnv { + orderChan :: BC.BoundedChan BrokerCommand, + strategyConfig' :: c, + stateRef :: IORef s, + timersRef :: IORef [UTCTime], + environmentRef :: IORef StrategyEnvironment + } + +type RealDriver c s = ReaderT RDriverEnv (DriverEnv c s) IO + +runRealDriver env f = runReaderT @RDriverEnv f env + +instance (MonadRobot (RealDriver c s) c s) where + submitOrder order = do + chan <- asks @RDriverEnv orderChan + liftIO $ BC.writeChan chan $ BrokerSubmitOrder order + + cancelOrder oid = do + chan <- asks @RDriverEnv orderChan + liftIO $ BC.writeChan chan $ BrokerCancelOrder oid + + appendToLog txt = liftIO $ infoM "Driver" (T.unpack txt) + + setupTimer t = do + timers <- asks @RDriverEnv timersRef + liftIO $ atomicModifyIORef' timers (\ts -> (t : ts, ())) + + enqueueIOAction = undefined + + getConfig = asks @RDriverEnv strategyConfig' + + getState = asks @RDriverEnv stateRef >>= liftIO . readIORef + + setState newstate = do + s <- asks @RDriverEnv stateRef + liftIO $ atomicWriteIORef s newstate + + modifyState f = do + st <- asks @RDriverEnv stateRef + liftIO $ atomicModifyIORef' st (\s -> (f s, ())) + + getEnvironment = asks @RDriverEnv environmentRef >>= liftIO . readIORef + data Params = Params { instanceId :: String, strategyConfigFile :: FilePath, @@ -333,7 +385,8 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd seBars = M.empty, seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 } - readAndHandleEvents agg ordersChan eventChan strategy env + envRef <- newIORef env + readAndHandleEvents agg ordersChan eventChan strategy envRef debugM "Strategy" "Stopping strategy driver" killThread wakeupTid))) @@ -342,32 +395,29 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd where qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy - readAndHandleEvents agg ordersChan eventChan strategy' env = do + readAndHandleEvents agg ordersChan eventChan strategy' envRef = do event <- readChan eventChan if event /= Shutdown then do currentBars <- bars <$> readIORef agg let params = strategyParams strategy' - let curState = currentState strategy' let instId = strategyInstanceId . strategyInstanceParams $ strategy' let acc = strategyAccount . strategyInstanceParams $ strategy' let vol = strategyVolume . strategyInstanceParams $ strategy' + env <- readIORef envRef let oldTimestamp = seLastTimestamp env let newTimestamp = case event of NewTick tick -> timestamp tick _ -> seLastTimestamp env - newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp)) + atomicWriteIORef timersRef newTimers - let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp } - let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event - writeIORef stateRef newState - writeIORef timersRef newTimers + atomicModifyIORef' envRef (\e -> (e { seBars = currentBars, seLastTimestamp = newTimestamp }, ())) + runRealDriver (DriverEnv ordersChan params stateRef timersRef envRef) $ (eventCallback strategy) event - newTimers' <- catMaybes <$> mapM handleTimerActions actions - mapM_ (handleActions ordersChan) actions - readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv + readAndHandleEvents agg ordersChan eventChan strategy' envRef else debugM "Strategy" "Shutdown requested" where handleTimerActions action =