From 3d2c40e158c005147ae8601ba431a1ea6a588695 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 22 Dec 2021 20:32:50 +0700 Subject: [PATCH] Revert "Real driver refactoring" This reverts commit b40c2966b7319fc7c3f0dd19f2c8bbce79c782e1. --- src/ATrade/Driver/Real.hs | 203 ++++++++++---------------------------- 1 file changed, 51 insertions(+), 152 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 9661b27..5a706d5 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -1,11 +1,8 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} module ATrade.Driver.Real ( StrategyInstanceParams(..), @@ -14,103 +11,45 @@ module ATrade.Driver.Real ( barStrategyDriver ) where -import ATrade.BarAggregator -import ATrade.Driver.Real.BrokerClientThread -import ATrade.Driver.Real.QuoteSourceThread -import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) -import ATrade.Exceptions -import ATrade.Quotes (MonadHistory (..), MonadInstrumentParametersSource (..)) -import ATrade.Quotes.QHP as QQ -import ATrade.Quotes.QTIS (TickerInfo (..), - qtisGetTickersInfo) -import ATrade.RoboCom.Monad (Event (..), - EventCallback, - MonadRobot (..), - StrategyEnvironment (..), - seBars, seLastTimestamp) -import ATrade.RoboCom.Types (BarSeries (..), InstrumentParameters (..), - Ticker (..), - Timeframe (..)) -import ATrade.RoboCom.Utils (fromHMS) -import ATrade.Types -import Control.Concurrent hiding (readChan, - writeChan, - writeList2Chan, yield) -import Control.Concurrent.BoundedChan as BC -import Control.Exception.Safe -import Control.Lens hiding (Context, (.=)) -import Control.Monad -import Control.Monad.Reader -import Data.Aeson -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BL -import Data.IORef -import qualified Data.Map as M -import Data.Maybe -import qualified Data.Text as T -import Data.Text.Encoding -import qualified Data.Text.Lazy as TL -import Data.Time.Calendar -import Data.Time.Clock -import Data.Time.Clock.POSIX -import Database.Redis hiding (decode, info) -import GHC.Generics -import Options.Applicative -import System.Exit -import System.IO -import System.Log.Formatter -import System.Log.Handler (setFormatter) -import System.Log.Handler.Simple -import System.Log.Logger -import System.Signal -import System.ZMQ4 hiding (Event (..)) - -import Ether.Reader - -data RDriverEnv - -data DriverEnv c s = DriverEnv { - orderChan :: BC.BoundedChan BrokerCommand, - strategyConfig' :: c, - stateRef :: IORef s, - timersRef :: IORef [UTCTime], - environmentRef :: IORef StrategyEnvironment - } - -type RealDriver c s = ReaderT RDriverEnv (DriverEnv c s) IO - -runRealDriver env f = runReaderT @RDriverEnv f env - -instance (MonadRobot (RealDriver c s) c s) where - submitOrder order = do - chan <- asks @RDriverEnv orderChan - liftIO $ BC.writeChan chan $ BrokerSubmitOrder order - - cancelOrder oid = do - chan <- asks @RDriverEnv orderChan - liftIO $ BC.writeChan chan $ BrokerCancelOrder oid - - appendToLog txt = liftIO $ infoM "Driver" (T.unpack txt) - - setupTimer t = do - timers <- asks @RDriverEnv timersRef - liftIO $ atomicModifyIORef' timers (\ts -> (t : ts, ())) - - enqueueIOAction = undefined - - getConfig = asks @RDriverEnv strategyConfig' - - getState = asks @RDriverEnv stateRef >>= liftIO . readIORef - - setState newstate = do - s <- asks @RDriverEnv stateRef - liftIO $ atomicWriteIORef s newstate - - modifyState f = do - st <- asks @RDriverEnv stateRef - liftIO $ atomicModifyIORef' st (\s -> (f s, ())) - - getEnvironment = asks @RDriverEnv environmentRef >>= liftIO . readIORef +import Options.Applicative +import System.IO +import System.Signal +import System.Exit +import System.Random +import System.Log.Logger +import System.Log.Handler.Simple +import System.Log.Handler (setFormatter) +import System.Log.Formatter +import Control.Monad +import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) +import Control.Concurrent.BoundedChan as BC +import Control.Exception +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import qualified Data.Map as M +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Aeson +import Data.IORef +import Data.Time.Calendar +import Data.Time.Clock +import Data.Time.Clock.POSIX +import Data.Maybe +import Data.Monoid +import Database.Redis hiding (info, decode) +import ATrade.Types +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) +import ATrade.BarAggregator +import ATrade.Driver.Real.BrokerClientThread +import ATrade.Driver.Real.QuoteSourceThread +import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) +import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) +import ATrade.Exceptions +import ATrade.Quotes.Finam as QF +import ATrade.Quotes.QHP as QQ +import ATrade.Quotes.HAP as QH +import System.ZMQ4 hiding (Event(..)) data Params = Params { instanceId :: String, @@ -445,46 +384,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do writeChan brokerChan BrokerRequestNotifications lift $ debugM "Strategy" "Wakeup thread forked" -<<<<<<< HEAD - wakeupTid <- forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan ordersChan BrokerRequestNotifications - debugM "Strategy" "Wakeup thread forked" - - let env = StrategyEnvironment { - seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, - seAccount = strategyAccount . strategyInstanceParams $ strategy, - seVolume = strategyVolume . strategyInstanceParams $ strategy, - seBars = M.empty, - seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 - } - envRef <- newIORef env - readAndHandleEvents agg ordersChan eventChan strategy envRef - debugM "Strategy" "Stopping strategy driver" - killThread wakeupTid))) - - debugM "Strategy" "Strategy done" - - where - qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy - brEp = strategyBrokerEp . strategyInstanceParams $ strategy - readAndHandleEvents agg ordersChan eventChan strategy' envRef = do - event <- readChan eventChan - if event /= Shutdown - then do - currentBars <- bars <$> readIORef agg - let params = strategyParams strategy' - let instId = strategyInstanceId . strategyInstanceParams $ strategy' - let acc = strategyAccount . strategyInstanceParams $ strategy' - let vol = strategyVolume . strategyInstanceParams $ strategy' - - env <- readIORef envRef - let oldTimestamp = seLastTimestamp env -======= readAndHandleEvents agg instanceParams lift $ debugM "Strategy" "Stopping strategy driver" lift $ killThread wakeupTid @@ -502,7 +401,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do if event /= Shutdown then do env <- getEnvironment ->>>>>>> stable let newTimestamp = case event of NewTick tick -> timestamp tick NewBar bar -> barTimestamp bar @@ -510,16 +408,18 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do nowRef <- asks envLastTimestamp lift $ writeIORef nowRef newTimestamp -<<<<<<< HEAD newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp)) atomicWriteIORef timersRef newTimers - atomicModifyIORef' envRef (\e -> (e { seBars = currentBars, seLastTimestamp = newTimestamp }, ())) - runRealDriver (DriverEnv ordersChan params stateRef timersRef envRef) $ (eventCallback strategy) event + let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp } + let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event + writeIORef stateRef newState + writeIORef timersRef newTimers - readAndHandleEvents agg ordersChan eventChan strategy' envRef + newTimers' <- catMaybes <$> mapM handleTimerActions actions + mapM_ (handleActions ordersChan) actions + readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv else debugM "Strategy" "Shutdown requested" -======= timersRef <- asks envTimers oldTimers <- lift $ readIORef timersRef newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers @@ -529,7 +429,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do readAndHandleEvents agg instanceParams' else lift $ debugM "Strategy" "Shutdown requested" ->>>>>>> stable where checkTimer eventChan' newTimestamp timerTime =