Browse Source

Revert "Real driver refactoring"

This reverts commit b40c2966b7.
master
Denis Tereshkin 4 years ago
parent
commit
3d2c40e158
  1. 181
      src/ATrade/Driver/Real.hs

181
src/ATrade/Driver/Real.hs

@ -1,10 +1,7 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Real ( module ATrade.Driver.Real (
@ -14,103 +11,45 @@ module ATrade.Driver.Real (
barStrategyDriver barStrategyDriver
) where ) where
import ATrade.BarAggregator import Options.Applicative
import ATrade.Driver.Real.BrokerClientThread import System.IO
import ATrade.Driver.Real.QuoteSourceThread import System.Signal
import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) import System.Exit
import ATrade.Exceptions import System.Random
import ATrade.Quotes (MonadHistory (..), MonadInstrumentParametersSource (..)) import System.Log.Logger
import ATrade.Quotes.QHP as QQ import System.Log.Handler.Simple
import ATrade.Quotes.QTIS (TickerInfo (..), import System.Log.Handler (setFormatter)
qtisGetTickersInfo) import System.Log.Formatter
import ATrade.RoboCom.Monad (Event (..),
EventCallback,
MonadRobot (..),
StrategyEnvironment (..),
seBars, seLastTimestamp)
import ATrade.RoboCom.Types (BarSeries (..), InstrumentParameters (..),
Ticker (..),
Timeframe (..))
import ATrade.RoboCom.Utils (fromHMS)
import ATrade.Types
import Control.Concurrent hiding (readChan,
writeChan,
writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception.Safe
import Control.Lens hiding (Context, (.=))
import Control.Monad import Control.Monad
import Control.Monad.Reader import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield)
import Data.Aeson import Control.Concurrent.BoundedChan as BC
import Control.Exception
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.IORef import qualified Data.List as L
import qualified Data.Map as M import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding import Data.Text.Encoding
import qualified Data.Text.Lazy as TL import Data.Aeson
import Data.IORef
import Data.Time.Calendar import Data.Time.Calendar
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Database.Redis hiding (decode, info) import Data.Maybe
import GHC.Generics import Data.Monoid
import Options.Applicative import Database.Redis hiding (info, decode)
import System.Exit import ATrade.Types
import System.IO import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..))
import System.Log.Formatter import ATrade.BarAggregator
import System.Log.Handler (setFormatter) import ATrade.Driver.Real.BrokerClientThread
import System.Log.Handler.Simple import ATrade.Driver.Real.QuoteSourceThread
import System.Log.Logger import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback)
import System.Signal import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..))
import System.ZMQ4 hiding (Event (..)) import ATrade.Exceptions
import ATrade.Quotes.Finam as QF
import Ether.Reader import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.HAP as QH
data RDriverEnv import System.ZMQ4 hiding (Event(..))
data DriverEnv c s = DriverEnv {
orderChan :: BC.BoundedChan BrokerCommand,
strategyConfig' :: c,
stateRef :: IORef s,
timersRef :: IORef [UTCTime],
environmentRef :: IORef StrategyEnvironment
}
type RealDriver c s = ReaderT RDriverEnv (DriverEnv c s) IO
runRealDriver env f = runReaderT @RDriverEnv f env
instance (MonadRobot (RealDriver c s) c s) where
submitOrder order = do
chan <- asks @RDriverEnv orderChan
liftIO $ BC.writeChan chan $ BrokerSubmitOrder order
cancelOrder oid = do
chan <- asks @RDriverEnv orderChan
liftIO $ BC.writeChan chan $ BrokerCancelOrder oid
appendToLog txt = liftIO $ infoM "Driver" (T.unpack txt)
setupTimer t = do
timers <- asks @RDriverEnv timersRef
liftIO $ atomicModifyIORef' timers (\ts -> (t : ts, ()))
enqueueIOAction = undefined
getConfig = asks @RDriverEnv strategyConfig'
getState = asks @RDriverEnv stateRef >>= liftIO . readIORef
setState newstate = do
s <- asks @RDriverEnv stateRef
liftIO $ atomicWriteIORef s newstate
modifyState f = do
st <- asks @RDriverEnv stateRef
liftIO $ atomicModifyIORef' st (\s -> (f s, ()))
getEnvironment = asks @RDriverEnv environmentRef >>= liftIO . readIORef
data Params = Params { data Params = Params {
instanceId :: String, instanceId :: String,
@ -445,46 +384,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
writeChan brokerChan BrokerRequestNotifications writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked" lift $ debugM "Strategy" "Wakeup thread forked"
<<<<<<< HEAD
wakeupTid <- forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan ordersChan BrokerRequestNotifications
debugM "Strategy" "Wakeup thread forked"
let env = StrategyEnvironment {
seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy,
seAccount = strategyAccount . strategyInstanceParams $ strategy,
seVolume = strategyVolume . strategyInstanceParams $ strategy,
seBars = M.empty,
seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
}
envRef <- newIORef env
readAndHandleEvents agg ordersChan eventChan strategy envRef
debugM "Strategy" "Stopping strategy driver"
killThread wakeupTid)))
debugM "Strategy" "Strategy done"
where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
readAndHandleEvents agg ordersChan eventChan strategy' envRef = do
event <- readChan eventChan
if event /= Shutdown
then do
currentBars <- bars <$> readIORef agg
let params = strategyParams strategy'
let instId = strategyInstanceId . strategyInstanceParams $ strategy'
let acc = strategyAccount . strategyInstanceParams $ strategy'
let vol = strategyVolume . strategyInstanceParams $ strategy'
env <- readIORef envRef
let oldTimestamp = seLastTimestamp env
=======
readAndHandleEvents agg instanceParams readAndHandleEvents agg instanceParams
lift $ debugM "Strategy" "Stopping strategy driver" lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid lift $ killThread wakeupTid
@ -502,7 +401,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
if event /= Shutdown if event /= Shutdown
then do then do
env <- getEnvironment env <- getEnvironment
>>>>>>> stable
let newTimestamp = case event of let newTimestamp = case event of
NewTick tick -> timestamp tick NewTick tick -> timestamp tick
NewBar bar -> barTimestamp bar NewBar bar -> barTimestamp bar
@ -510,16 +408,18 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
nowRef <- asks envLastTimestamp nowRef <- asks envLastTimestamp
lift $ writeIORef nowRef newTimestamp lift $ writeIORef nowRef newTimestamp
<<<<<<< HEAD
newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp)) newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp))
atomicWriteIORef timersRef newTimers atomicWriteIORef timersRef newTimers
atomicModifyIORef' envRef (\e -> (e { seBars = currentBars, seLastTimestamp = newTimestamp }, ())) let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp }
runRealDriver (DriverEnv ordersChan params stateRef timersRef envRef) $ (eventCallback strategy) event let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event
writeIORef stateRef newState
writeIORef timersRef newTimers
readAndHandleEvents agg ordersChan eventChan strategy' envRef newTimers' <- catMaybes <$> mapM handleTimerActions actions
mapM_ (handleActions ordersChan) actions
readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv
else debugM "Strategy" "Shutdown requested" else debugM "Strategy" "Shutdown requested"
=======
timersRef <- asks envTimers timersRef <- asks envTimers
oldTimers <- lift $ readIORef timersRef oldTimers <- lift $ readIORef timersRef
newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers
@ -529,7 +429,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
readAndHandleEvents agg instanceParams' readAndHandleEvents agg instanceParams'
else else
lift $ debugM "Strategy" "Shutdown requested" lift $ debugM "Strategy" "Shutdown requested"
>>>>>>> stable
where where
checkTimer eventChan' newTimestamp timerTime = checkTimer eventChan' newTimestamp timerTime =

Loading…
Cancel
Save