{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TypeSynonymInstances #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FlexibleContexts #-} module ATrade.Driver.Real ( Strategy(..), StrategyInstanceParams(..), robotMain, BigConfig(..), mkBarStrategy, barStrategyDriver ) where import Options.Applicative import System.IO import System.Signal import System.Exit import System.Random import System.Log.Logger import System.Log.Handler.Simple import System.Log.Handler (setFormatter) import System.Log.Formatter import Control.Monad import Control.Monad.Reader import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) import Control.Concurrent.BoundedChan as BC import Control.Exception.Safe import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BL import qualified Data.List as L import qualified Data.Map as M import qualified Data.Text as T import Data.Text.Encoding import Data.Aeson import Data.IORef import Data.Time.Calendar import Data.Time.Clock import Data.Time.Clock.POSIX import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread import ATrade.Driver.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) import ATrade.Exceptions import ATrade.Quotes.Finam as QF import ATrade.Quotes.QHP as QQ import ATrade.Quotes.HAP as QH import System.ZMQ4 hiding (Event(..)) data Params = Params { instanceId :: String, strategyConfigFile :: FilePath, strategyStateFile :: FilePath, brokerEp :: String, quotesourceEp :: String, historyProviderType :: Maybe String, historyProvider :: Maybe String, redisSocket :: Maybe String, qtisSocket :: Maybe String, accountId :: String, volumeFactor :: Int, sourceBarTimeframe :: Maybe Int } deriving (Show, Eq) paramsParser :: Parser Params paramsParser = Params <$> strOption ( long "instance-id" <> metavar "ID" ) <*> strOption ( long "config" <> metavar "FILEPATH" ) <*> strOption ( long "state" <> metavar "FILEPATH" ) <*> strOption ( long "broker" <> metavar "BROKER_ENDPOINT" ) <*> strOption ( long "quotesource" <> metavar "QUOTESOURCE_ENDPOINT" ) <*> optional ( strOption ( long "history-provider-type" <> metavar "TYPE/ID" )) <*> optional ( strOption ( long "history-provider" <> metavar "ENDPOINT/ID" )) <*> optional ( strOption ( long "redis-socket" <> metavar "ADDRESS" )) <*> optional ( strOption ( long "qtis" <> metavar "ENDPOINT/ID" )) <*> strOption ( long "account" <> metavar "ACCOUNT" ) <*> option auto ( long "volume" <> metavar "VOLUME" ) <*> optional ( option auto ( long "source-timeframe" <> metavar "SECONDS" )) data Env c s = Env { envStrategyInstanceParams :: StrategyInstanceParams, envStrategyEnvironment :: IORef StrategyEnvironment, envConfigRef :: IORef c, envStateRef :: IORef s, envBrokerChan :: BC.BoundedChan BrokerCommand, envTimers :: IORef [UTCTime], envEventChan :: BC.BoundedChan Event } type App c s = ReaderT (Env c s) IO instance MonadRobot (App c s) c s where submitOrder order = do bc <- asks envBrokerChan lift $ BC.writeChan bc $ BrokerSubmitOrder order cancelOrder oId = do bc <- asks envBrokerChan lift $ BC.writeChan bc $ BrokerCancelOrder oId appendToLog = lift . debugM "Strategy" . T.unpack setupTimer t = do timers <- asks envTimers lift $ atomicModifyIORef' timers (\s -> (t : s, ())) enqueueIOAction actionId action = do eventChan <- asks envEventChan lift $ void $ forkIO $ do v <- action BC.writeChan eventChan $ ActionCompleted actionId v getConfig = asks envConfigRef >>= lift . readIORef getState = asks envStateRef >>= lift . readIORef setState s = do ref <- asks envStateRef lift $ writeIORef ref s getEnvironment = asks envStrategyEnvironment >>= lift . readIORef data BigConfig c = BigConfig { confTickers :: [Ticker], strategyConfig :: c } instance (FromJSON c) => FromJSON (BigConfig c) where parseJSON = withObject "object" (\obj -> BigConfig <$> obj .: "tickers" <*> obj .: "params") instance (ToJSON c) => ToJSON (BigConfig c) where toJSON conf = object ["tickers" .= confTickers conf, "params" .= strategyConfig conf ] storeState :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> IO () storeState params stateRef timersRef = do currentStrategyState <- readIORef stateRef currentTimersState <- readIORef timersRef case redisSocket params of Nothing -> withFile (strategyStateFile params) WriteMode (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState) `catch` (\e -> warningM "main" ("Unable to save state: " ++ show (e :: IOException))) Just sock -> do #ifdef linux_HOST_OS conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } now <- getPOSIXTime res <- runRedis conn $ mset [(encodeUtf8 $ T.pack $ instanceId params, BL.toStrict $ encode currentStrategyState), (encodeUtf8 $ T.pack $ instanceId params ++ ":last_store", encodeUtf8 $ T.pack $ show now), (encodeUtf8 $ T.pack $ instanceId params ++ ":timers", BL.toStrict $ encode currentTimersState) ] case res of Left _ -> warningM "main" "Unable to save state" Right _ -> return () #else return () #endif gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () gracefulShutdown params stateRef timersRef shutdownMv _ = do infoM "main" "Shutdown, saving state" storeState params stateRef timersRef putMVar shutdownMv () exitSuccess robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () robotMain dataDownloadDelta defaultState initCallback callback = do params <- execParser opts initLogging params infoM "main" "Starting" (tickerList, config) <- loadStrategyConfig params stratState <- loadStrategyState params timersState <- loadStrategyTimers params let instanceParams = StrategyInstanceParams { strategyInstanceId = T.pack . instanceId $ params, strategyAccount = T.pack . accountId $ params, strategyVolume = volumeFactor params, tickers = tickerList, strategyQuotesourceEp = T.pack . quotesourceEp $ params, strategyBrokerEp = T.pack . brokerEp $ params, strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params, strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params, strategyQTISEp = T.pack <$> qtisSocket params} updatedConfig <- case initCallback of Just cb -> cb config instanceParams Nothing -> return config let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback stateRef <- newIORef stratState configRef <- newIORef config timersRef <- newIORef timersState shutdownMv <- newEmptyMVar installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv) randsec <- getStdRandom(randomR(1, 10)) threadDelay $ randsec * 1000000 debugM "main" "Forking state saving thread" stateSavingThread <- forkIO $ forever $ do threadDelay 1000000 storeState params stateRef timersRef straEnv <- newIORef StrategyEnvironment { seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, seAccount = strategyAccount . strategyInstanceParams $ strategy, seVolume = strategyVolume . strategyInstanceParams $ strategy, seBars = M.empty, seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 } -- Event channel is for strategy events, like new tick arrival, or order execution notification eventChan <- BC.newBoundedChan 1000 -- Orders channel passes strategy orders to broker thread brokerChan <- BC.newBoundedChan 1000 debugM "main" "Starting strategy driver" let env = Env { envStrategyInstanceParams = instanceParams, envStrategyEnvironment = straEnv, envConfigRef = configRef, envStateRef = stateRef, envBrokerChan = brokerChan, envTimers = timersRef, envEventChan = eventChan } withContext (\ctx -> runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = let classCode = T.takeWhile (/= '#') (security tick) in if | classCode == "SPBFUT" || classCode == "SPBOPT" -> any (inInterval . utctDayTime . timestamp $ tick) fortsIntervals | otherwise -> any (inInterval . utctDayTime . timestamp $ tick) secIntervals fortsIntervals = [(fromHMS 7 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)] secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)] fromHMS h m s = h * 3600 + m * 60 + s inInterval ts (start, end) = ts >= start && ts <= end opts = info (helper <*> paramsParser) ( fullDesc <> header "ATrade strategy execution framework" ) initLogging params = do handler <- streamHandler stderr DEBUG >>= (\x -> return $ setFormatter x (simpleLogFormatter $ "$utcTime\t[" ++ instanceId params ++ "]\t\t{$loggername}\t\t<$prio> -> $msg")) hSetBuffering stderr LineBuffering updateGlobalLogger rootLoggerName (setLevel DEBUG) updateGlobalLogger rootLoggerName (setHandlers [handler]) loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f case bigconfig of Right conf -> return (confTickers conf, strategyConfig conf) Left errmsg -> throw $ UnableToLoadConfig $ (T.pack . show) errmsg) loadStrategyTimers :: Params -> IO [UTCTime] loadStrategyTimers params = case redisSocket params of Nothing -> return [] Just sock -> do #ifdef linux_HOST_OS conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers") case res of Left _ -> do warningM "main" "Unable to load state" return [] Right mv -> case mv of Just v -> case eitherDecode $ BL.fromStrict v of Left _ -> do warningM "main" "Unable to load state" return [] Right s -> return s Nothing -> do warningM "main" "Unable to load state" return [] #else error "Not implemented" #endif loadStrategyState params = case redisSocket params of Nothing -> loadStateFromFile (strategyStateFile params) Just sock -> do #ifdef linux_HOST_OS conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params) case res of Left _ -> do warningM "main" "Unable to load state" return defaultState Right mv -> case mv of Just v -> case eitherDecode $ BL.fromStrict v of Left _ -> do warningM "main" "Unable to load state" return defaultState Right s -> return s Nothing -> do warningM "main" "Unable to load state" return defaultState #else error "Not implemented" #endif loadStateFromFile filepath = withFile filepath ReadMode (\f -> do maybeState <- decode . BL.fromStrict <$> BS.hGetContents f case maybeState of Just st -> return st Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) -- | Helper function to make 'Strategy' instances mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s mkBarStrategy instanceParams dd params initialState cb = BarStrategy { downloadDelta = dd, eventCallback = cb, currentState = initialState, strategyParams = params, strategyTimers = [], strategyInstanceParams = instanceParams } -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do -- Load tickers data and create BarAggregator from them historyBars <- lift $ if | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) | otherwise -> M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] eventChan <- asks envEventChan brokerChan <- asks envBrokerChan bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do lift $ debugM "Strategy" "QuoteSource thread forked" bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do lift $ debugM "Strategy" "Broker thread forked" wakeupTid <- lift . forkIO $ forever $ do maybeShutdown <- tryTakeMVar shutdownVar if isJust maybeShutdown then writeChan eventChan Shutdown else do threadDelay 1000000 writeChan brokerChan BrokerRequestNotifications lift $ debugM "Strategy" "Wakeup thread forked" readAndHandleEvents agg strategy lift $ debugM "Strategy" "Stopping strategy driver" lift $ killThread wakeupTid)) lift $ debugM "Strategy" "Strategy done" where qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy readAndHandleEvents agg strategy' = do eventChan <- asks envEventChan event <- lift $ readChan eventChan if event /= Shutdown then do lift $ debugM "Strategy" $ "event: " ++ show event env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick _ -> seLastTimestamp env newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') (eventCallback strategy) event lift $ writeIORef timersRef newTimers readAndHandleEvents agg strategy' else lift $ debugM "Strategy" "Shutdown requested" where checkTimer eventChan' newTimestamp timerTime = if newTimestamp >= timerTime then do lift $ writeChan eventChan' $ TimerFired timerTime return Nothing else return $ Just timerTime loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) loadTickerFromHAP ctx ep t = do debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) case parseHAPPeriod $ timeframeSeconds t of Just tf -> do now <- getCurrentTime historyBars <- QH.getQuotes ctx QH.RequestParams { QH.endpoint = ep, QH.ticker = code t, QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now, QH.endDate = now, QH.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) loadTickerFromQHP ctx ep t = do debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) case parseQHPPeriod $ timeframeSeconds t of Just tf -> do now <- getCurrentTime historyBars <- QQ.getQuotes ctx QQ.RequestParams { QQ.endpoint = ep, QQ.ticker = code t, QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), QQ.endDate = utctDay now, QQ.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" debugM "Strategy" $ show (take 20 historyBars) return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries) loadTickerFromFinam t = do randDelay <- getStdRandom (randomR (1, 5)) threadDelay $ randDelay * 1000000 now <- getCurrentTime debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of (Just finamCode, Just per) -> do debugM "Strategy" $ "Downloading ticker: " ++ finamCode history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), QF.endDate = utctDay now, QF.period = per } case history of Just h -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = convertFromFinamHistory (code t) h }) Nothing -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) convertFromFinamHistory :: TickerId -> [Row] -> [Bar] convertFromFinamHistory tid = L.reverse . fmap (\row -> Bar { barSecurity = tid, barTimestamp = rowTime row, barOpen = rowOpen row, barHigh = rowHigh row, barLow = rowLow row, barClose = rowClose row, barVolume = rowVolume row }) parseFinamPeriod x | x == 0 = Just QF.PeriodTick | x == 60 = Just QF.Period1Min | x == 5 * 60 = Just QF.Period5Min | x == 10 * 60 = Just QF.Period10Min | x == 15 * 60 = Just QF.Period15Min | x == 30 * 60 = Just QF.Period30Min | x == 60 * 60 = Just QF.PeriodHour | x == 24 * 60 * 60 = Just QF.PeriodDay | otherwise = Nothing parseQHPPeriod x | x == 60 = Just QQ.Period1Min | x == 5 * 60 = Just QQ.Period5Min | x == 15 * 60 = Just QQ.Period15Min | x == 30 * 60 = Just QQ.Period30Min | x == 60 * 60 = Just QQ.PeriodHour | x == 24 * 60 * 60 = Just QQ.PeriodDay | otherwise = Nothing parseHAPPeriod x | x == 60 = Just QH.Period1Min | x == 5 * 60 = Just QH.Period5Min | x == 15 * 60 = Just QH.Period15Min | x == 30 * 60 = Just QH.Period30Min | x == 60 * 60 = Just QH.PeriodHour | x == 24 * 60 * 60 = Just QH.PeriodDay | otherwise = Nothing