diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 83f11a9..7924988 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -1,74 +1,81 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} module ATrade.Driver.Real ( - Strategy(..), StrategyInstanceParams(..), robotMain, BigConfig(..), - mkBarStrategy, barStrategyDriver ) where -import Options.Applicative -import System.IO -import System.Signal -import System.Exit -import System.Random -import System.Log.Logger -import System.Log.Handler.Simple -import System.Log.Handler (setFormatter) -import System.Log.Formatter -import Control.Monad -import Control.Monad.Reader -import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) -import Control.Concurrent.BoundedChan as BC -import Control.Exception.Safe -import Control.Lens hiding (Context, (.=)) -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BL -import qualified Data.Map as M -import qualified Data.Text as T -import Data.Text.Encoding -import Data.Aeson -import Data.IORef -import Data.Time.Calendar -import Data.Time.Clock -import Data.Time.Clock.POSIX -import Data.Maybe -import Database.Redis hiding (info, decode) -import ATrade.Types -import ATrade.Quotes -import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) -import ATrade.BarAggregator -import ATrade.Driver.Real.BrokerClientThread -import ATrade.Driver.Real.QuoteSourceThread -import ATrade.Driver.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) -import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) -import ATrade.Exceptions -import ATrade.Quotes.QHP as QQ -import System.ZMQ4 hiding (Event(..)) -import GHC.Generics +import ATrade.BarAggregator +import ATrade.Driver.Real.BrokerClientThread +import ATrade.Driver.Real.QuoteSourceThread +import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) +import ATrade.Exceptions +import ATrade.Quotes +import ATrade.Quotes.QHP as QQ +import ATrade.RoboCom.Monad (Event (..), + EventCallback, + MonadRobot (..), + StrategyEnvironment (..), + seBars, seLastTimestamp) +import ATrade.RoboCom.Types (BarSeries (..), + Ticker (..), + Timeframe (..)) +import ATrade.RoboCom.Utils (fromHMS) +import ATrade.Types +import Control.Concurrent hiding (readChan, + writeChan, + writeList2Chan, yield) +import Control.Concurrent.BoundedChan as BC +import Control.Exception.Safe +import Control.Lens hiding (Context, (.=)) +import Control.Monad +import Control.Monad.Reader +import Data.Aeson +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BL +import Data.IORef +import qualified Data.Map as M +import Data.Maybe +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Time.Calendar +import Data.Time.Clock +import Data.Time.Clock.POSIX +import Database.Redis hiding (decode, info) +import GHC.Generics +import Options.Applicative +import System.Exit +import System.IO +import System.Log.Formatter +import System.Log.Handler (setFormatter) +import System.Log.Handler.Simple +import System.Log.Logger +import System.Random +import System.Signal +import System.ZMQ4 hiding (Event (..)) data Params = Params { - instanceId :: String, - strategyConfigFile :: FilePath, - strategyStateFile :: FilePath, - brokerEp :: String, - quotesourceEp :: String, + instanceId :: String, + strategyConfigFile :: FilePath, + strategyStateFile :: FilePath, + brokerEp :: String, + quotesourceEp :: String, historyProviderType :: Maybe String, - historyProvider :: Maybe String, - redisSocket :: Maybe String, - qtisSocket :: Maybe String, - accountId :: String, - volumeFactor :: Int, - sourceBarTimeframe :: Maybe Int + historyProvider :: Maybe String, + redisSocket :: Maybe String, + qtisSocket :: Maybe String, + accountId :: String, + volumeFactor :: Int, + sourceBarTimeframe :: Maybe Int } deriving (Show, Eq) paramsParser :: Parser Params @@ -111,16 +118,16 @@ paramsParser = Params <> metavar "SECONDS" )) data Env historySource c s = Env { - envHistorySource :: historySource, + envHistorySource :: historySource, envStrategyInstanceParams :: StrategyInstanceParams, - envStrategyEnvironment :: IORef StrategyEnvironment, - envConfigRef :: IORef c, - envStateRef :: IORef s, - envBrokerChan :: BC.BoundedChan BrokerCommand, - envTimers :: IORef [UTCTime], - envEventChan :: BC.BoundedChan Event, - envAggregator :: IORef BarAggregator, - envLastTimestamp :: IORef UTCTime + envStrategyEnvironment :: IORef StrategyEnvironment, + envConfigRef :: IORef c, + envStateRef :: IORef s, + envBrokerChan :: BC.BoundedChan BrokerCommand, + envTimers :: IORef [UTCTime], + envEventChan :: BC.BoundedChan Event, + envAggregator :: IORef BarAggregator, + envLastTimestamp :: IORef UTCTime } deriving (Generic) type App historySource c s = ReaderT (Env historySource c s) IO @@ -133,7 +140,7 @@ instance MonadRobot (App historySource c s) c s where cancelOrder oId = do bc <- asks envBrokerChan lift $ BC.writeChan bc $ BrokerCancelOrder oId - + appendToLog = lift . debugM "Strategy" . T.unpack setupTimer t = do timers <- asks envTimers @@ -144,7 +151,7 @@ instance MonadRobot (App historySource c s) c s where lift $ void $ forkIO $ do v <- action' BC.writeChan eventChan $ ActionCompleted actionId v - + getConfig = asks envConfigRef >>= lift . readIORef getState = asks envStateRef >>= lift . readIORef setState s = do @@ -166,7 +173,7 @@ instance MonadHistory (App QQ.QHPHandle c s) where QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime data BigConfig c = BigConfig { - confTickers :: [Ticker], + confTickers :: [Ticker], strategyConfig :: c } @@ -187,7 +194,7 @@ storeState params stateRef timersRef = do Nothing -> withFile (strategyStateFile params) WriteMode (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState) `catch` (\e -> warningM "main" ("Unable to save state: " ++ show (e :: IOException))) Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } now <- getPOSIXTime res <- runRedis conn $ mset [(encodeUtf8 $ T.pack $ instanceId params, BL.toStrict $ encode currentStrategyState), @@ -195,11 +202,11 @@ storeState params stateRef timersRef = do (encodeUtf8 $ T.pack $ instanceId params ++ ":timers", BL.toStrict $ encode currentTimersState) ] case res of - Left _ -> warningM "main" "Unable to save state" + Left _ -> warningM "main" "Unable to save state" Right _ -> return () -#else - return () -#endif + + + gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () gracefulShutdown params stateRef timersRef shutdownMv _ = do @@ -229,24 +236,21 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Just cb -> cb config instanceParams Nothing -> return config - let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback stateRef <- newIORef stratState configRef <- newIORef updatedConfig timersRef <- newIORef timersState shutdownMv <- newEmptyMVar installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv) - randsec <- getStdRandom(randomR(1, 10)) - threadDelay $ randsec * 1000000 debugM "main" "Forking state saving thread" stateSavingThread <- forkIO $ forever $ do threadDelay 1000000 storeState params stateRef timersRef straEnv <- newIORef StrategyEnvironment { - _seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, - _seAccount = strategyAccount . strategyInstanceParams $ strategy, - _seVolume = strategyVolume . strategyInstanceParams $ strategy, + _seInstanceId = strategyInstanceId instanceParams, + _seAccount = strategyAccount instanceParams, + _seVolume = strategyVolume instanceParams, _seBars = M.empty, _seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 } @@ -261,9 +265,9 @@ robotMain dataDownloadDelta defaultState initCallback callback = do let qsEp = T.pack $ quotesourceEp params let brEp = T.pack $ brokerEp params agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] - bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do + bracket (startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do debugM "Strategy" "QuoteSource thread forked" - bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do + bracket (startBrokerClientThread (strategyInstanceId instanceParams) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do debugM "Strategy" "Broker thread forked" now <- getCurrentTime >>= newIORef @@ -280,7 +284,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do envAggregator = agg, envLastTimestamp = now } - runReaderT (barStrategyDriver strategy shutdownMv) env `finally` killThread stateSavingThread) + runReaderT (barStrategyDriver dataDownloadDelta instanceParams callback shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = @@ -292,7 +296,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do fortsIntervals = [(fromHMS 4 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)] secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)] - fromHMS h m s = h * 3600 + m * 60 + s inInterval ts (start, end) = ts >= start && ts <= end opts = info (helper <*> paramsParser) @@ -311,14 +314,14 @@ robotMain dataDownloadDelta defaultState initCallback callback = do loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f case bigconfig of - Right conf -> return (confTickers conf, strategyConfig conf) + Right conf -> return (confTickers conf, strategyConfig conf) Left errmsg -> throw $ UnableToLoadConfig $ (T.pack . show) errmsg) loadStrategyTimers :: Params -> IO [UTCTime] loadStrategyTimers params = case redisSocket params of Nothing -> return [] Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers") case res of @@ -334,15 +337,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> do warningM "main" "Unable to load state" return [] -#else - error "Not implemented" -#endif - loadStrategyState params = case redisSocket params of Nothing -> loadStateFromFile (strategyStateFile params) Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params) case res of @@ -358,34 +357,20 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> do warningM "main" "Unable to load state" return defaultState -#else - error "Not implemented" -#endif - + loadStateFromFile filepath = withFile filepath ReadMode (\f -> do maybeState <- decode . BL.fromStrict <$> BS.hGetContents f case maybeState of Just st -> return st Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) - --- | Helper function to make 'Strategy' instances -mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s -mkBarStrategy instanceParams dd params initialState cb = BarStrategy { - downloadDelta = dd, - eventCallback = cb, - currentState = initialState, - strategyParams = params, - strategyTimers = [], - - strategyInstanceParams = instanceParams } -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: (MonadHistory (App hs c s)) => Strategy c s -> MVar () -> App hs c s () -barStrategyDriver strategy shutdownVar = do +barStrategyDriver :: (MonadHistory (App hs c s)) => DiffTime -> StrategyInstanceParams -> EventCallback c s -> MVar () -> App hs c s () +barStrategyDriver downloadDelta instanceParams callback shutdownVar = do now <- liftIO getCurrentTime - history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy) + history <- M.fromList <$> mapM (loadTickerHistory now) (tickers instanceParams) eventChan <- asks envEventChan brokerChan <- asks envBrokerChan agg <- asks envAggregator @@ -400,18 +385,17 @@ barStrategyDriver strategy shutdownVar = do writeChan brokerChan BrokerRequestNotifications lift $ debugM "Strategy" "Wakeup thread forked" - readAndHandleEvents agg strategy + readAndHandleEvents agg instanceParams lift $ debugM "Strategy" "Stopping strategy driver" lift $ killThread wakeupTid where - loadTickerHistory now t = do history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) - ((fromRational . toRational . negate $ downloadDelta strategy) `addUTCTime` now) now + ((fromRational . toRational . negate $ downloadDelta) `addUTCTime` now) now return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history) - readAndHandleEvents agg strategy' = do + readAndHandleEvents agg instanceParams' = do eventChan <- asks envEventChan event <- lift $ readChan eventChan if event /= Shutdown @@ -419,16 +403,17 @@ barStrategyDriver strategy shutdownVar = do env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick - _ -> env ^. seLastTimestamp + _ -> env ^. seLastTimestamp nowRef <- asks envLastTimestamp lift $ writeIORef nowRef newTimestamp - newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') - (eventCallback strategy) event timersRef <- asks envTimers + oldTimers <- lift $ readIORef timersRef + newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers + callback event lift $ writeIORef timersRef newTimers - readAndHandleEvents agg strategy' + readAndHandleEvents agg instanceParams' else lift $ debugM "Strategy" "Shutdown requested" where diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 007f4c7..e1f1d3e 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -24,8 +24,8 @@ import Control.Monad import System.Log.Logger import System.ZMQ4 hiding (Event) -startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId -startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do +startQuoteSourceThread :: Context -> T.Text -> StrategyInstanceParams -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId +startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do tickChan <- newBoundedChan 1000 bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams) (\qs -> do @@ -56,5 +56,5 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim goodTick tick = tickFilter tick && (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) - tickersList = fmap code . (tickers . strategyInstanceParams) $ strategy + tickersList = fmap code . tickers $ instanceParams diff --git a/src/ATrade/Driver/Types.hs b/src/ATrade/Driver/Types.hs index 74b8393..2736a81 100644 --- a/src/ATrade/Driver/Types.hs +++ b/src/ATrade/Driver/Types.hs @@ -2,7 +2,6 @@ module ATrade.Driver.Types ( - Strategy(..), StrategyInstanceParams(..), InitializationCallback ) where @@ -13,17 +12,6 @@ import ATrade.RoboCom.Types import qualified Data.Text as T import Data.Time.Clock --- | Top-level strategy configuration and state -data Strategy c s = BarStrategy { - downloadDelta :: DiffTime, -- ^ How much history to download at strategy start - eventCallback :: EventCallback c s, -- ^ Strategy event callback - currentState :: s, -- ^ Current strategy state. Updated after each 'EventCallback' call - strategyParams :: c, -- ^ Strategy params - strategyTimers :: [UTCTime], - - strategyInstanceParams :: StrategyInstanceParams -- ^ Instance params -} - -- | Strategy instance params store few params which are common for all strategies data StrategyInstanceParams = StrategyInstanceParams { strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) diff --git a/src/ATrade/RoboCom/Utils.hs b/src/ATrade/RoboCom/Utils.hs index f4176d6..ff3df31 100644 --- a/src/ATrade/RoboCom/Utils.hs +++ b/src/ATrade/RoboCom/Utils.hs @@ -9,6 +9,7 @@ module ATrade.RoboCom.Utils ( barNumber, getHMS, getHMS', + fromHMS, fromHMS', parseTime ) where @@ -66,6 +67,9 @@ fromHMS' hms = fromIntegral $ h * 3600 + m * 60 + s m = (hms `mod` 10000) `div` 100 s = (hms `mod` 100) +fromHMS :: Int -> Int -> Int -> DiffTime +fromHMS h m s = fromIntegral $ h * 3600 + m * 60 + s + parseTime :: T.Text -> Maybe DiffTime parseTime x = case readMaybe (T.unpack x) of Just t -> let h = t `div` 10000