Browse Source

Real driver refactoring

master
Denis Tereshkin 7 years ago
parent
commit
b40c2966b7
  1. 74
      src/ATrade/Driver/Real.hs

74
src/ATrade/Driver/Real.hs

@ -3,6 +3,10 @@ @@ -3,6 +3,10 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module ATrade.Driver.Real (
Strategy(..),
@ -23,6 +27,7 @@ import System.Log.Handler.Simple @@ -23,6 +27,7 @@ import System.Log.Handler.Simple
import System.Log.Handler (setFormatter)
import System.Log.Formatter
import Control.Monad
import Control.Monad.IO.Class
import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception
@ -41,7 +46,7 @@ import Data.Maybe @@ -41,7 +46,7 @@ import Data.Maybe
import Data.Monoid
import Database.Redis hiding (info, decode)
import ATrade.Types
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..))
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..))
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
@ -53,6 +58,53 @@ import ATrade.Quotes.QHP as QQ @@ -53,6 +58,53 @@ import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.HAP as QH
import System.ZMQ4 hiding (Event(..))
import Ether.Reader
data RDriverEnv
data DriverEnv c s = DriverEnv {
orderChan :: BC.BoundedChan BrokerCommand,
strategyConfig' :: c,
stateRef :: IORef s,
timersRef :: IORef [UTCTime],
environmentRef :: IORef StrategyEnvironment
}
type RealDriver c s = ReaderT RDriverEnv (DriverEnv c s) IO
runRealDriver env f = runReaderT @RDriverEnv f env
instance (MonadRobot (RealDriver c s) c s) where
submitOrder order = do
chan <- asks @RDriverEnv orderChan
liftIO $ BC.writeChan chan $ BrokerSubmitOrder order
cancelOrder oid = do
chan <- asks @RDriverEnv orderChan
liftIO $ BC.writeChan chan $ BrokerCancelOrder oid
appendToLog txt = liftIO $ infoM "Driver" (T.unpack txt)
setupTimer t = do
timers <- asks @RDriverEnv timersRef
liftIO $ atomicModifyIORef' timers (\ts -> (t : ts, ()))
enqueueIOAction = undefined
getConfig = asks @RDriverEnv strategyConfig'
getState = asks @RDriverEnv stateRef >>= liftIO . readIORef
setState newstate = do
s <- asks @RDriverEnv stateRef
liftIO $ atomicWriteIORef s newstate
modifyState f = do
st <- asks @RDriverEnv stateRef
liftIO $ atomicModifyIORef' st (\s -> (f s, ()))
getEnvironment = asks @RDriverEnv environmentRef >>= liftIO . readIORef
data Params = Params {
instanceId :: String,
strategyConfigFile :: FilePath,
@ -333,7 +385,8 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd @@ -333,7 +385,8 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
seBars = M.empty,
seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
}
readAndHandleEvents agg ordersChan eventChan strategy env
envRef <- newIORef env
readAndHandleEvents agg ordersChan eventChan strategy envRef
debugM "Strategy" "Stopping strategy driver"
killThread wakeupTid)))
@ -342,32 +395,29 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd @@ -342,32 +395,29 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
readAndHandleEvents agg ordersChan eventChan strategy' env = do
readAndHandleEvents agg ordersChan eventChan strategy' envRef = do
event <- readChan eventChan
if event /= Shutdown
then do
currentBars <- bars <$> readIORef agg
let params = strategyParams strategy'
let curState = currentState strategy'
let instId = strategyInstanceId . strategyInstanceParams $ strategy'
let acc = strategyAccount . strategyInstanceParams $ strategy'
let vol = strategyVolume . strategyInstanceParams $ strategy'
env <- readIORef envRef
let oldTimestamp = seLastTimestamp env
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> seLastTimestamp env
newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy')
newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp))
atomicWriteIORef timersRef newTimers
let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp }
let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event
writeIORef stateRef newState
writeIORef timersRef newTimers
atomicModifyIORef' envRef (\e -> (e { seBars = currentBars, seLastTimestamp = newTimestamp }, ()))
runRealDriver (DriverEnv ordersChan params stateRef timersRef envRef) $ (eventCallback strategy) event
newTimers' <- catMaybes <$> mapM handleTimerActions actions
mapM_ (handleActions ordersChan) actions
readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv
readAndHandleEvents agg ordersChan eventChan strategy' envRef
else debugM "Strategy" "Shutdown requested"
where
handleTimerActions action =

Loading…
Cancel
Save