Execution layer for algorithmic trading
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

486 lines
20 KiB

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Real (
Strategy(..),
StrategyInstanceParams(..),
robotMain,
BigConfig(..),
mkBarStrategy,
barStrategyDriver
) where
import Options.Applicative
import System.IO
import System.Signal
import System.Exit
import System.Random
import System.Log.Logger
import System.Log.Handler.Simple
import System.Log.Handler (setFormatter)
import System.Log.Formatter
import Control.Monad
import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Aeson
import Data.IORef
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Maybe
import Data.Monoid
import Database.Redis hiding (info, decode)
import qualified Database.Redis as Redis
import ATrade.Types
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..))
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
import ATrade.Driver.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback)
import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..))
import ATrade.Exceptions
import ATrade.Quotes.Finam as QF
import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.HAP as QH
import System.ZMQ4 hiding (Event(..))
-- | Command-line parameters of a strategy instance.
--
-- The field order matters: the command-line parser applies the constructor
-- positionally.
data Params = Params
  { instanceId          :: String        -- ^ @--instance-id@; shown in the log prefix and used as the base Redis key for saved state
  , strategyConfigFile  :: FilePath      -- ^ @--config@; JSON file with tickers and strategy parameters
  , strategyStateFile   :: FilePath      -- ^ @--state@; file used for state when no Redis socket is given
  , brokerEp            :: String        -- ^ @--broker@; broker endpoint
  , quotesourceEp       :: String        -- ^ @--quotesource@; quote source endpoint
  , historyProviderType :: Maybe String  -- ^ @--history-provider-type@; optional
  , historyProvider     :: Maybe String  -- ^ @--history-provider@; optional
  , redisSocket         :: Maybe String  -- ^ @--redis-socket@; when present, state is kept in Redis via this Unix socket
  , qtisSocket          :: Maybe String  -- ^ @--qtis@; optional
  , accountId           :: String        -- ^ @--account@
  , volumeFactor        :: Int           -- ^ @--volume@
  , sourceBarTimeframe  :: Maybe Int     -- ^ @--source-timeframe@, in seconds; optional
  } deriving (Show, Eq)
-- | Command-line parser producing 'Params'.
--
-- Options are listed in the same order as the fields of 'Params', because the
-- constructor is applied positionally.
paramsParser :: Parser Params
paramsParser =
  Params
    <$> requiredStr "instance-id" "ID"
    <*> requiredStr "config" "FILEPATH"
    <*> requiredStr "state" "FILEPATH"
    <*> requiredStr "broker" "BROKER_ENDPOINT"
    <*> requiredStr "quotesource" "QUOTESOURCE_ENDPOINT"
    <*> optionalStr "history-provider-type" "TYPE/ID"
    <*> optionalStr "history-provider" "ENDPOINT/ID"
    <*> optionalStr "redis-socket" "ADDRESS"
    <*> optionalStr "qtis" "ENDPOINT/ID"
    <*> requiredStr "account" "ACCOUNT"
    <*> option auto (long "volume" <> metavar "VOLUME")
    <*> optional (option auto (long "source-timeframe" <> metavar "SECONDS"))
  where
    -- Mandatory string option with the given long flag name and metavar.
    requiredStr :: String -> String -> Parser String
    requiredStr flag meta = strOption (long flag <> metavar meta)

    -- Same as 'requiredStr', but the option may be omitted.
    optionalStr :: String -> String -> Parser (Maybe String)
    optionalStr flag meta = optional (requiredStr flag meta)
-- | Contents of the strategy configuration file: the list of tickers plus
-- the strategy-specific parameters of type @c@.
data BigConfig c = BigConfig
  { confTickers    :: [Ticker]  -- ^ tickers listed in the configuration
  , strategyConfig :: c         -- ^ strategy-specific parameters
  }
-- | Decodes a JSON object with a @"tickers"@ key and a @"params"@ key.
instance FromJSON c => FromJSON (BigConfig c) where
  parseJSON = withObject "object" $ \obj -> do
    tickerList <- obj .: "tickers"
    stratParams <- obj .: "params"
    return (BigConfig tickerList stratParams)
-- | Encodes to a JSON object with a @"tickers"@ key and a @"params"@ key.
instance ToJSON c => ToJSON (BigConfig c) where
  toJSON conf = object pairs
    where
      pairs =
        [ "tickers" .= confTickers conf
        , "params" .= strategyConfig conf
        ]
-- | Persists the current strategy state (and, with Redis, the pending timers).
--
-- Without @--redis-socket@ the JSON-encoded state is written to the state file
-- (timers are not written in this mode). With a Redis socket, on Linux builds,
-- three keys are set: @<instanceId>@ (state), @<instanceId>:last_store@
-- (POSIX time of this store) and @<instanceId>:timers@ (timers). On other
-- platforms the Redis branch does nothing.
--
-- This is best-effort: I/O failures are logged as warnings and never
-- propagate, so the periodic saver thread that calls this keeps running.
storeState :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> IO ()
storeState params stateRef timersRef = do
  currentStrategyState <- readIORef stateRef
  currentTimersState <- readIORef timersRef
  case redisSocket params of
    Nothing ->
      withFile (strategyStateFile params) WriteMode
        (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState)
        `catch` reportIOError
    Just sock -> do
#ifdef linux_HOST_OS
      let connInfo = defaultConnectInfo { connectPort = UnixSocket sock }
          key suffix = encodeUtf8 $ T.pack $ instanceId params ++ suffix
      -- A fresh connection is opened on every call, so it must be released
      -- explicitly; 'bracket' guarantees that even if the write fails.
      -- 'disconnect' is qualified because System.ZMQ4 exports the same name.
      (bracket (checkedConnect connInfo) Redis.disconnect $ \conn -> do
          now <- getPOSIXTime
          res <- runRedis conn $ mset
            [ (key "", BL.toStrict $ encode currentStrategyState)
            , (key ":last_store", encodeUtf8 $ T.pack $ show now)
            , (key ":timers", BL.toStrict $ encode currentTimersState)
            ]
          case res of
            Left _  -> warningM "main" "Unable to save state"
            Right _ -> return ())
        `catch` reportIOError
#else
      return ()
#endif
  where
    -- Only IOException is caught, so asynchronous exceptions such as
    -- 'killThread' on the saver thread still get through.
    reportIOError :: IOException -> IO ()
    reportIOError e = warningM "main" ("Unable to save state: " ++ show e)
-- | Signal handler: logs the shutdown, saves the current state via
-- 'storeState', fills the shutdown 'MVar' and then calls 'exitSuccess'.
-- The received signal itself is ignored.
gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO ()
gracefulShutdown params stateRef timersRef shutdownMv _signal =
  infoM "main" "Shutdown, saving state"
    >> storeState params stateRef timersRef
    >> putMVar shutdownMv ()
    >> exitSuccess
-- | Entry point of a strategy executable.
--
-- Parses the command line, sets up logging, loads the configuration, the saved
-- state and the saved timers, installs SIGINT/SIGTERM handlers that save state,
-- forks a thread that stores state once per second, and runs
-- 'barStrategyDriver' until it returns.
--
-- Arguments, in order:
--
-- * the download delta handed to 'mkBarStrategy' (used by the history loaders
--   to compute the start of the requested history range);
-- * the default state, used whenever saved state cannot be loaded or decoded;
-- * an optional initialization callback, run once with the loaded config and
--   the instance parameters; its result replaces the loaded config;
-- * the strategy event callback.
robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO ()
robotMain dataDownloadDelta defaultState initCallback callback = do
  params <- execParser opts
  initLogging params
  infoM "main" "Starting"

  (tickerList, config) <- loadStrategyConfig params
  stratState <- loadStrategyState params
  timersState <- loadStrategyTimers params

  let instanceParams = StrategyInstanceParams {
        strategyInstanceId = T.pack . instanceId $ params,
        strategyAccount = T.pack . accountId $ params,
        strategyVolume = volumeFactor params,
        tickers = tickerList,
        strategyQuotesourceEp = T.pack . quotesourceEp $ params,
        strategyBrokerEp = T.pack . brokerEp $ params,
        strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params,
        strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params,
        strategyQTISEp = T.pack <$> qtisSocket params }

  updatedConfig <- case initCallback of
    Just cb -> cb config instanceParams
    Nothing -> return config

  let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback
  stateRef <- newIORef stratState
  -- The driver reads the config from this ref on every event, so it must hold
  -- the config returned by the initialization callback, not the raw one.
  configRef <- newIORef updatedConfig
  -- NOTE(review): the restored timers only seed 'timersRef'; the strategy built
  -- above starts with an empty timer list, so it looks like restored timers
  -- never fire and are overwritten on the first event. Confirm intent.
  timersRef <- newIORef timersState
  shutdownMv <- newEmptyMVar
  installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv)
  installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv)

  -- Random 1..10 second pause before starting (presumably to stagger the
  -- start-up of several instances; confirm).
  randsec <- getStdRandom (randomR (1, 10))
  threadDelay $ randsec * 1000000

  debugM "main" "Forking state saving thread"
  stateSavingThread <- forkIO $ forever $ do
    threadDelay 1000000
    storeState params stateRef timersRef

  debugM "main" "Starting strategy driver"
  barStrategyDriver (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv `finally` killThread stateSavingThread
  where
    -- Accepts a tick only if its time of day falls into one of the intervals
    -- for its class code (the part of the security id before '#').
    tickFilter :: Tick -> Bool
    tickFilter tick =
      let classCode = T.takeWhile (/= '#') (security tick) in
      if
        | classCode == "SPBFUT" || classCode == "SPBOPT" -> any (inInterval . utctDayTime . timestamp $ tick) fortsIntervals
        | otherwise -> any (inInterval . utctDayTime . timestamp $ tick) secIntervals

    fortsIntervals = [(fromHMS 7 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)]
    secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)]

    -- Hours/minutes/seconds to seconds since midnight.
    fromHMS h m s = h * 3600 + m * 60 + s
    -- Both interval ends are inclusive.
    inInterval ts (start, end) = ts >= start && ts <= end

    opts = info (helper <*> paramsParser)
      ( fullDesc <> header "ATrade strategy execution framework" )

    -- Routes all log levels to stderr, with the instance id in every line.
    initLogging params = do
      handler <- streamHandler stderr DEBUG >>=
        (\x -> return $
          setFormatter x (simpleLogFormatter $
            "$utcTime\t[" ++ instanceId params ++ "]\t\t{$loggername}\t\t<$prio> -> $msg"))

      hSetBuffering stderr LineBuffering
      updateGlobalLogger rootLoggerName (setLevel DEBUG)
      updateGlobalLogger rootLoggerName (setHandlers [handler])

    -- Reads and decodes the config file; throws 'UnableToLoadConfig' when the
    -- JSON cannot be decoded.
    loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do
      bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f
      case bigconfig of
        Right conf -> return (confTickers conf, strategyConfig conf)
        Left errmsg -> throwIO $ UnableToLoadConfig $ (T.pack . show) errmsg)

    -- Loads saved timers from the @<instanceId>:timers@ Redis key; yields an
    -- empty list when no Redis socket is configured or the value is missing
    -- or undecodable.
    loadStrategyTimers :: Params -> IO [UTCTime]
    loadStrategyTimers params = case redisSocket params of
      Nothing -> return []
      Just sock -> do
#ifdef linux_HOST_OS
        conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
        res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers")
        case res of
          Right (Just v) | Right s <- eitherDecode (BL.fromStrict v) -> return s
          _ -> do
            warningM "main" "Unable to load state"
            return []
#else
        error "Not implemented"
#endif

    -- Loads saved state from the state file, or from the @<instanceId>@ Redis
    -- key when a Redis socket is configured; falls back to the default state.
    loadStrategyState params = case redisSocket params of
      Nothing -> loadStateFromFile (strategyStateFile params)
      Just sock -> do
#ifdef linux_HOST_OS
        conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
        res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params)
        case res of
          Right (Just v) | Right s <- eitherDecode (BL.fromStrict v) -> return s
          _ -> do
            warningM "main" "Unable to load state"
            return defaultState
#else
        error "Not implemented"
#endif

    -- An unreadable file logs a warning; an undecodable one is silent. Both
    -- yield the default state.
    loadStateFromFile filepath =
      withFile filepath ReadMode (\f -> do
        maybeState <- decode . BL.fromStrict <$> BS.hGetContents f
        case maybeState of
          Just st -> return st
          Nothing -> return defaultState)
        `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState)
-- | Builds a 'Strategy' from instance parameters, download delta, strategy
-- parameters, initial state and event callback. The timer list starts empty.
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instParams delta config startState callback =
  BarStrategy
    { strategyInstanceParams = instParams
    , strategyParams = config
    , currentState = startState
    , strategyTimers = []
    , eventCallback = callback
    , downloadDelta = delta
    }
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions.
--
-- Loads history bars for every ticker from the configured history provider
-- (@"finam"@, @"hap"@, anything else is treated as QHP), starts the quote
-- source, broker client and wakeup threads, and then processes events until a
-- 'Shutdown' event arrives. All three threads are killed on exit, including
-- when the event loop throws.
--
-- Arguments, in order: optional source bar timeframe (passed through to the
-- quote source thread), tick filter, the strategy, and the refs holding the
-- config (read on every event), the state and the pending timers (both
-- written after every event), and the 'MVar' that signals shutdown.
barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> IO ()
barStrategyDriver mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do
  -- Make channels
  -- Event channel is for strategy events, like new tick arrival, or order execution notification
  eventChan <- BC.newBoundedChan 1000
  -- Orders channel passes strategy orders to broker thread
  ordersChan <- BC.newBoundedChan 1000

  withContext (\ctx -> do
    -- Load tickers data and create BarAggregator from them
    historyBars <-
      if
        | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
            M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy)
        | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" ->
            M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
        | otherwise ->
            M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
    agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)]
    bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do
      debugM "Strategy" "QuoteSource thread forked"
      bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do
        debugM "Strategy" "Broker thread forked"

        -- Polls the shutdown MVar: when it is filled, a Shutdown event is
        -- queued; otherwise, after a one second pause, the broker thread is
        -- asked for notifications.
        let wakeupLoop = forever $ do
              maybeShutdown <- tryTakeMVar shutdownVar
              if isJust maybeShutdown
                then writeChan eventChan Shutdown
                else do
                  threadDelay 1000000
                  writeChan ordersChan BrokerRequestNotifications

        -- bracket (like the two threads above) so the wakeup thread is killed
        -- even when the event loop throws.
        bracket (forkIO wakeupLoop) killThread (\_ -> do
          debugM "Strategy" "Wakeup thread forked"

          let env = StrategyEnvironment {
                seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy,
                seAccount = strategyAccount . strategyInstanceParams $ strategy,
                seVolume = strategyVolume . strategyInstanceParams $ strategy,
                seBars = M.empty,
                seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
              }
          readAndHandleEvents agg ordersChan eventChan strategy env
          debugM "Strategy" "Stopping strategy driver"))))

  debugM "Strategy" "Strategy done"
  where
    qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
    brEp = strategyBrokerEp . strategyInstanceParams $ strategy

    -- Event loop: takes one event, fires due timers, runs the strategy
    -- callback, stores the new state and timers, executes the returned
    -- actions, and recurses until a Shutdown event is read.
    readAndHandleEvents agg ordersChan eventChan strategy' env = do
      event <- readChan eventChan
      if event /= Shutdown
        then do
          currentBars <- bars <$> readIORef agg
          params <- readIORef configRef
          curState <- readIORef stateRef
          -- Only ticks advance the strategy clock.
          let newTimestamp = case event of
                NewTick tick -> timestamp tick
                _ -> seLastTimestamp env

          pendingTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) (strategyTimers strategy')

          let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp }
          let (!newState, !actions, _) = runStrategyElement params curState newenv $ eventCallback strategy' event
          -- Timers still pending plus the ones requested by this event's
          -- actions; the full list is persisted, so a state save right after
          -- this event already includes the new timers.
          let allTimers = pendingTimers ++ [timerTime | ActionSetupTimer timerTime <- actions]
          writeIORef stateRef newState
          writeIORef timersRef allTimers

          mapM_ (handleActions ordersChan) actions
          readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = allTimers }) newenv
        else debugM "Strategy" "Shutdown requested"
      where
        handleActions ordersChan' action =
          case action of
            (ActionLog logText) -> debugM "Strategy" $ T.unpack logText
            (ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order
            (ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid
            -- Timers are collected separately in the event loop.
            (ActionSetupTimer _) -> return ()
            -- Runs the IO action in its own thread and reports the result
            -- back as an ActionCompleted event.
            (ActionIO tag io) -> void $ forkIO $ do
              v <- io
              writeChan eventChan (ActionCompleted tag v)

        -- Returns Nothing for a timer that is due (after queuing TimerFired),
        -- Just the timer otherwise.
        -- NOTE(review): this writes to the bounded event channel from the
        -- thread that drains it; presumably writeChan blocks when the channel
        -- is full. Confirm this cannot stall the loop.
        checkTimer eventChan' newTimestamp timerTime =
          if newTimestamp >= timerTime
            then do
              writeChan eventChan' $ TimerFired timerTime
              return Nothing
            else
              return $ Just timerTime

    -- Pairs the ticker code with a bar series holding the given bars.
    seriesFor t barList = (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = barList })

    -- Requests bars from HAP for the range [now - downloadDelta - 1s, now];
    -- yields an empty series when the timeframe has no HAP period.
    loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries)
    loadTickerFromHAP ctx ep t = do
      debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t)
      case parseHAPPeriod $ timeframeSeconds t of
        Just tf -> do
          now <- getCurrentTime
          historyBars <- QH.getQuotes ctx QH.RequestParams {
            QH.endpoint = ep,
            QH.ticker = code t,
            QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now,
            QH.endDate = now,
            QH.period = tf }
          debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
          return $ seriesFor t historyBars
        _ -> return $ seriesFor t []

    -- Requests bars from QHP starting (ceiling of downloadDelta in days) + 1
    -- days before today; yields an empty series when the timeframe has no QHP
    -- period.
    loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries)
    loadTickerFromQHP ctx ep t = do
      debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t)
      case parseQHPPeriod $ timeframeSeconds t of
        Just tf -> do
          now <- getCurrentTime
          historyBars <- QQ.getQuotes ctx QQ.RequestParams {
            QQ.endpoint = ep,
            QQ.ticker = code t,
            QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now),
            QQ.endDate = utctDay now,
            QQ.period = tf }
          debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
          debugM "Strategy" $ show (take 20 historyBars)
          return $ seriesFor t historyBars
        _ -> return $ seriesFor t []

    -- Downloads bars from Finam after a random 1..5 second pause. Yields an
    -- empty series when the ticker has no "finam" alias, the timeframe has no
    -- Finam period, or the download returns nothing.
    loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries)
    loadTickerFromFinam t = do
      randDelay <- getStdRandom (randomR (1, 5))
      threadDelay $ randDelay * 1000000
      now <- getCurrentTime
      debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t)
      case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of
        (Just finamCode, Just per) -> do
          debugM "Strategy" $ "Downloading ticker: " ++ finamCode
          history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode,
            QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now),
            QF.endDate = utctDay now,
            QF.period = per }
          return $ seriesFor t $ maybe [] (convertFromFinamHistory (code t)) history
        _ -> return $ seriesFor t []
-- | Turns downloaded Finam rows into bars tagged with the given ticker id.
-- The resulting list is in the reverse of the row order.
convertFromFinamHistory :: TickerId -> [Row] -> [Bar]
convertFromFinamHistory tid rows = L.reverse (map toBar rows)
  where
    toBar row = Bar
      { barSecurity = tid
      , barTimestamp = rowTime row
      , barOpen = rowOpen row
      , barHigh = rowHigh row
      , barLow = rowLow row
      , barClose = rowClose row
      , barVolume = rowVolume row
      }
-- | Maps a timeframe in seconds to the matching Finam period; 'Nothing' for
-- any other value. Zero means tick data.
parseFinamPeriod x =
  case x of
    0     -> Just QF.PeriodTick
    60    -> Just QF.Period1Min
    300   -> Just QF.Period5Min   -- 5 * 60
    600   -> Just QF.Period10Min  -- 10 * 60
    900   -> Just QF.Period15Min  -- 15 * 60
    1800  -> Just QF.Period30Min  -- 30 * 60
    3600  -> Just QF.PeriodHour   -- 60 * 60
    86400 -> Just QF.PeriodDay    -- 24 * 60 * 60
    _     -> Nothing
-- | Maps a timeframe in seconds to the matching QHP period; 'Nothing' for any
-- other value.
parseQHPPeriod x =
  case x of
    60    -> Just QQ.Period1Min
    300   -> Just QQ.Period5Min   -- 5 * 60
    900   -> Just QQ.Period15Min  -- 15 * 60
    1800  -> Just QQ.Period30Min  -- 30 * 60
    3600  -> Just QQ.PeriodHour   -- 60 * 60
    86400 -> Just QQ.PeriodDay    -- 24 * 60 * 60
    _     -> Nothing
-- | Maps a timeframe in seconds to the matching HAP period; 'Nothing' for any
-- other value.
parseHAPPeriod x =
  case x of
    60    -> Just QH.Period1Min
    300   -> Just QH.Period5Min   -- 5 * 60
    900   -> Just QH.Period15Min  -- 15 * 60
    1800  -> Just QH.Period30Min  -- 30 * 60
    3600  -> Just QH.PeriodHour   -- 60 * 60
    86400 -> Just QH.PeriodDay    -- 24 * 60 * 60
    _     -> Nothing