Browse Source

Refactoring: removed Strategy record

stable
Denis Tereshkin 5 years ago
parent
commit
23f5e0ab8c
  1. 217
      src/ATrade/Driver/Real.hs
  2. 6
      src/ATrade/Driver/Real/QuoteSourceThread.hs
  3. 12
      src/ATrade/Driver/Types.hs
  4. 4
      src/ATrade/RoboCom/Utils.hs

217
src/ATrade/Driver/Real.hs

@ -1,74 +1,81 @@ @@ -1,74 +1,81 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Real (
Strategy(..),
StrategyInstanceParams(..),
robotMain,
BigConfig(..),
mkBarStrategy,
barStrategyDriver
) where
import Options.Applicative
import System.IO
import System.Signal
import System.Exit
import System.Random
import System.Log.Logger
import System.Log.Handler.Simple
import System.Log.Handler (setFormatter)
import System.Log.Formatter
import Control.Monad
import Control.Monad.Reader
import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception.Safe
import Control.Lens hiding (Context, (.=))
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Aeson
import Data.IORef
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Maybe
import Database.Redis hiding (info, decode)
import ATrade.Types
import ATrade.Quotes
import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..))
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
import ATrade.Driver.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback)
import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..))
import ATrade.Exceptions
import ATrade.Quotes.QHP as QQ
import System.ZMQ4 hiding (Event(..))
import GHC.Generics
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..))
import ATrade.Exceptions
import ATrade.Quotes
import ATrade.Quotes.QHP as QQ
import ATrade.RoboCom.Monad (Event (..),
EventCallback,
MonadRobot (..),
StrategyEnvironment (..),
seBars, seLastTimestamp)
import ATrade.RoboCom.Types (BarSeries (..),
Ticker (..),
Timeframe (..))
import ATrade.RoboCom.Utils (fromHMS)
import ATrade.Types
import Control.Concurrent hiding (readChan,
writeChan,
writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception.Safe
import Control.Lens hiding (Context, (.=))
import Control.Monad
import Control.Monad.Reader
import Data.Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Database.Redis hiding (decode, info)
import GHC.Generics
import Options.Applicative
import System.Exit
import System.IO
import System.Log.Formatter
import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple
import System.Log.Logger
import System.Random
import System.Signal
import System.ZMQ4 hiding (Event (..))
data Params = Params {
instanceId :: String,
strategyConfigFile :: FilePath,
strategyStateFile :: FilePath,
brokerEp :: String,
quotesourceEp :: String,
instanceId :: String,
strategyConfigFile :: FilePath,
strategyStateFile :: FilePath,
brokerEp :: String,
quotesourceEp :: String,
historyProviderType :: Maybe String,
historyProvider :: Maybe String,
redisSocket :: Maybe String,
qtisSocket :: Maybe String,
accountId :: String,
volumeFactor :: Int,
sourceBarTimeframe :: Maybe Int
historyProvider :: Maybe String,
redisSocket :: Maybe String,
qtisSocket :: Maybe String,
accountId :: String,
volumeFactor :: Int,
sourceBarTimeframe :: Maybe Int
} deriving (Show, Eq)
paramsParser :: Parser Params
@ -111,16 +118,16 @@ paramsParser = Params @@ -111,16 +118,16 @@ paramsParser = Params
<> metavar "SECONDS" ))
data Env historySource c s = Env {
envHistorySource :: historySource,
envHistorySource :: historySource,
envStrategyInstanceParams :: StrategyInstanceParams,
envStrategyEnvironment :: IORef StrategyEnvironment,
envConfigRef :: IORef c,
envStateRef :: IORef s,
envBrokerChan :: BC.BoundedChan BrokerCommand,
envTimers :: IORef [UTCTime],
envEventChan :: BC.BoundedChan Event,
envAggregator :: IORef BarAggregator,
envLastTimestamp :: IORef UTCTime
envStrategyEnvironment :: IORef StrategyEnvironment,
envConfigRef :: IORef c,
envStateRef :: IORef s,
envBrokerChan :: BC.BoundedChan BrokerCommand,
envTimers :: IORef [UTCTime],
envEventChan :: BC.BoundedChan Event,
envAggregator :: IORef BarAggregator,
envLastTimestamp :: IORef UTCTime
} deriving (Generic)
type App historySource c s = ReaderT (Env historySource c s) IO
@ -166,7 +173,7 @@ instance MonadHistory (App QQ.QHPHandle c s) where @@ -166,7 +173,7 @@ instance MonadHistory (App QQ.QHPHandle c s) where
QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime
data BigConfig c = BigConfig {
confTickers :: [Ticker],
confTickers :: [Ticker],
strategyConfig :: c
}
@ -187,7 +194,7 @@ storeState params stateRef timersRef = do @@ -187,7 +194,7 @@ storeState params stateRef timersRef = do
Nothing -> withFile (strategyStateFile params) WriteMode (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState)
`catch` (\e -> warningM "main" ("Unable to save state: " ++ show (e :: IOException)))
Just sock -> do
#ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
now <- getPOSIXTime
res <- runRedis conn $ mset [(encodeUtf8 $ T.pack $ instanceId params, BL.toStrict $ encode currentStrategyState),
@ -195,11 +202,11 @@ storeState params stateRef timersRef = do @@ -195,11 +202,11 @@ storeState params stateRef timersRef = do
(encodeUtf8 $ T.pack $ instanceId params ++ ":timers", BL.toStrict $ encode currentTimersState) ]
case res of
Left _ -> warningM "main" "Unable to save state"
Left _ -> warningM "main" "Unable to save state"
Right _ -> return ()
#else
return ()
#endif
gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO ()
gracefulShutdown params stateRef timersRef shutdownMv _ = do
@ -229,24 +236,21 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -229,24 +236,21 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Just cb -> cb config instanceParams
Nothing -> return config
let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback
stateRef <- newIORef stratState
configRef <- newIORef updatedConfig
timersRef <- newIORef timersState
shutdownMv <- newEmptyMVar
installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv)
installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv)
randsec <- getStdRandom(randomR(1, 10))
threadDelay $ randsec * 1000000
debugM "main" "Forking state saving thread"
stateSavingThread <- forkIO $ forever $ do
threadDelay 1000000
storeState params stateRef timersRef
straEnv <- newIORef StrategyEnvironment {
_seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy,
_seAccount = strategyAccount . strategyInstanceParams $ strategy,
_seVolume = strategyVolume . strategyInstanceParams $ strategy,
_seInstanceId = strategyInstanceId instanceParams,
_seAccount = strategyAccount instanceParams,
_seVolume = strategyVolume instanceParams,
_seBars = M.empty,
_seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
}
@ -261,9 +265,9 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -261,9 +265,9 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
let qsEp = T.pack $ quotesourceEp params
let brEp = T.pack $ brokerEp params
agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)]
bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do
bracket (startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do
debugM "Strategy" "QuoteSource thread forked"
bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do
bracket (startBrokerClientThread (strategyInstanceId instanceParams) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do
debugM "Strategy" "Broker thread forked"
now <- getCurrentTime >>= newIORef
@ -280,7 +284,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -280,7 +284,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
envAggregator = agg,
envLastTimestamp = now
}
runReaderT (barStrategyDriver strategy shutdownMv) env `finally` killThread stateSavingThread)
runReaderT (barStrategyDriver dataDownloadDelta instanceParams callback shutdownMv) env `finally` killThread stateSavingThread)
where
tickFilter :: Tick -> Bool
tickFilter tick =
@ -292,7 +296,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -292,7 +296,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
fortsIntervals = [(fromHMS 4 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)]
secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)]
fromHMS h m s = h * 3600 + m * 60 + s
inInterval ts (start, end) = ts >= start && ts <= end
opts = info (helper <*> paramsParser)
@ -311,14 +314,14 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -311,14 +314,14 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do
bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f
case bigconfig of
Right conf -> return (confTickers conf, strategyConfig conf)
Right conf -> return (confTickers conf, strategyConfig conf)
Left errmsg -> throw $ UnableToLoadConfig $ (T.pack . show) errmsg)
loadStrategyTimers :: Params -> IO [UTCTime]
loadStrategyTimers params = case redisSocket params of
Nothing -> return []
Just sock -> do
#ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers")
case res of
@ -334,15 +337,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -334,15 +337,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> do
warningM "main" "Unable to load state"
return []
#else
error "Not implemented"
#endif
loadStrategyState params = case redisSocket params of
Nothing -> loadStateFromFile (strategyStateFile params)
Just sock -> do
#ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params)
case res of
@ -358,9 +357,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -358,9 +357,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> do
warningM "main" "Unable to load state"
return defaultState
#else
error "Not implemented"
#endif
loadStateFromFile filepath = withFile filepath ReadMode (\f -> do
maybeState <- decode . BL.fromStrict <$> BS.hGetContents f
@ -369,23 +365,12 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -369,23 +365,12 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> return defaultState ) `catch`
(\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState)
-- | Helper function to make 'Strategy' instances
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
downloadDelta = dd,
eventCallback = cb,
currentState = initialState,
strategyParams = params,
strategyTimers = [],
strategyInstanceParams = instanceParams }
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: (MonadHistory (App hs c s)) => Strategy c s -> MVar () -> App hs c s ()
barStrategyDriver strategy shutdownVar = do
barStrategyDriver :: (MonadHistory (App hs c s)) => DiffTime -> StrategyInstanceParams -> EventCallback c s -> MVar () -> App hs c s ()
barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
now <- liftIO getCurrentTime
history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy)
history <- M.fromList <$> mapM (loadTickerHistory now) (tickers instanceParams)
eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan
agg <- asks envAggregator
@ -400,18 +385,17 @@ barStrategyDriver strategy shutdownVar = do @@ -400,18 +385,17 @@ barStrategyDriver strategy shutdownVar = do
writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked"
readAndHandleEvents agg strategy
readAndHandleEvents agg instanceParams
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid
where
loadTickerHistory now t = do
history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t))
((fromRational . toRational . negate $ downloadDelta strategy) `addUTCTime` now) now
((fromRational . toRational . negate $ downloadDelta) `addUTCTime` now) now
return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history)
readAndHandleEvents agg strategy' = do
readAndHandleEvents agg instanceParams' = do
eventChan <- asks envEventChan
event <- lift $ readChan eventChan
if event /= Shutdown
@ -419,16 +403,17 @@ barStrategyDriver strategy shutdownVar = do @@ -419,16 +403,17 @@ barStrategyDriver strategy shutdownVar = do
env <- getEnvironment
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> env ^. seLastTimestamp
_ -> env ^. seLastTimestamp
nowRef <- asks envLastTimestamp
lift $ writeIORef nowRef newTimestamp
newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy')
(eventCallback strategy) event
timersRef <- asks envTimers
oldTimers <- lift $ readIORef timersRef
newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers
callback event
lift $ writeIORef timersRef newTimers
readAndHandleEvents agg strategy'
readAndHandleEvents agg instanceParams'
else
lift $ debugM "Strategy" "Shutdown requested"
where

6
src/ATrade/Driver/Real/QuoteSourceThread.hs

@ -24,8 +24,8 @@ import Control.Monad @@ -24,8 +24,8 @@ import Control.Monad
import System.Log.Logger
import System.ZMQ4 hiding (Event)
startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId
startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do
startQuoteSourceThread :: Context -> T.Text -> StrategyInstanceParams -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId
startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do
tickChan <- newBoundedChan 1000
bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams)
(\qs -> do
@ -56,5 +56,5 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim @@ -56,5 +56,5 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim
goodTick tick = tickFilter tick &&
(datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0))
tickersList = fmap code . (tickers . strategyInstanceParams) $ strategy
tickersList = fmap code . tickers $ instanceParams

12
src/ATrade/Driver/Types.hs

@ -2,7 +2,6 @@ @@ -2,7 +2,6 @@
module ATrade.Driver.Types
(
Strategy(..),
StrategyInstanceParams(..),
InitializationCallback
) where
@ -13,17 +12,6 @@ import ATrade.RoboCom.Types @@ -13,17 +12,6 @@ import ATrade.RoboCom.Types
import qualified Data.Text as T
import Data.Time.Clock
-- | Top-level strategy configuration and state
data Strategy c s = BarStrategy {
downloadDelta :: DiffTime, -- ^ How much history to download at strategy start
eventCallback :: EventCallback c s, -- ^ Strategy event callback
currentState :: s, -- ^ Current strategy state. Updated after each 'EventCallback' call
strategyParams :: c, -- ^ Strategy params
strategyTimers :: [UTCTime],
strategyInstanceParams :: StrategyInstanceParams -- ^ Instance params
}
-- | Strategy instance params store few params which are common for all strategies
data StrategyInstanceParams = StrategyInstanceParams {
strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable)

4
src/ATrade/RoboCom/Utils.hs

@ -9,6 +9,7 @@ module ATrade.RoboCom.Utils ( @@ -9,6 +9,7 @@ module ATrade.RoboCom.Utils (
barNumber,
getHMS,
getHMS',
fromHMS,
fromHMS',
parseTime
) where
@ -66,6 +67,9 @@ fromHMS' hms = fromIntegral $ h * 3600 + m * 60 + s @@ -66,6 +67,9 @@ fromHMS' hms = fromIntegral $ h * 3600 + m * 60 + s
m = (hms `mod` 10000) `div` 100
s = (hms `mod` 100)
fromHMS :: Int -> Int -> Int -> DiffTime
fromHMS h m s = fromIntegral $ h * 3600 + m * 60 + s
parseTime :: T.Text -> Maybe DiffTime
parseTime x = case readMaybe (T.unpack x) of
Just t -> let h = t `div` 10000

Loading…
Cancel
Save