@ -1,62 +1,69 @@
@@ -1,62 +1,69 @@
{- # LANGUAGE OverloadedStrings # -}
{- # LANGUAGE MultiWayIf # -}
{- # LANGUAGE BangPatterns # -}
{- # LANGUAGE CPP # -}
{- # LANGUAGE RankNTypes # -}
{- # LANGUAGE TypeApplications # -}
{- # LANGUAGE TypeSynonymInstances # -}
{- # LANGUAGE FlexibleInstances # -}
{- # LANGUAGE CPP # -}
{- # LANGUAGE DeriveGeneric # -}
{- # LANGUAGE FlexibleContexts # -}
{- # LANGUAGE FlexibleInstances # -}
{- # LANGUAGE MultiParamTypeClasses # -}
{- # LANGUAGE MultiWayIf # -}
{- # LANGUAGE OverloadedStrings # -}
{- # LANGUAGE RankNTypes # -}
module ATrade.Driver.Real (
Strategy ( .. ) ,
StrategyInstanceParams ( .. ) ,
robotMain ,
BigConfig ( .. ) ,
mkBarStrategy ,
barStrategyDriver
) where
import Options.Applicative
import System.IO
import System.Signal
import System.Exit
import System.Random
import System.Log.Logger
import System.Log.Handler.Simple
import System.Log.Handler ( setFormatter )
import System.Log.Formatter
import Control.Monad
import Control.Monad.IO.Class
import Control.Concurrent hiding ( writeChan , readChan , writeList2Chan , yield )
import Control.Concurrent.BoundedChan as BC
import Control.Exception
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Aeson
import Data.IORef
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Maybe
import Data.Monoid
import Database.Redis hiding ( info , decode )
import ATrade.Types
import ATrade.RoboCom.Monad ( StrategyMonad , StrategyAction ( .. ) , EventCallback , Event ( .. ) , runStrategyElement , StrategyEnvironment ( .. ) , Event ( .. ) , MonadRobot ( .. ) )
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
import ATrade.Driver.Real.Types ( Strategy ( .. ) , StrategyInstanceParams ( .. ) , InitializationCallback )
import ATrade.RoboCom.Types ( BarSeries ( .. ) , Ticker ( .. ) , Timeframe ( .. ) )
import ATrade.Exceptions
import ATrade.Quotes.Finam as QF
import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.HAP as QH
import System.ZMQ4 hiding ( Event ( .. ) )
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
import ATrade.Driver.Types ( InitializationCallback , StrategyInstanceParams ( .. ) )
import ATrade.Exceptions
import ATrade.Quotes ( MonadHistory ( .. ) , MonadInstrumentParametersSource ( .. ) )
import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.QTIS ( TickerInfo ( .. ) ,
qtisGetTickersInfo )
import ATrade.RoboCom.Monad ( Event ( .. ) ,
EventCallback ,
MonadRobot ( .. ) ,
StrategyEnvironment ( .. ) ,
seBars , seLastTimestamp )
import ATrade.RoboCom.Types ( BarSeries ( .. ) , InstrumentParameters ( .. ) ,
Ticker ( .. ) ,
Timeframe ( .. ) )
import ATrade.RoboCom.Utils ( fromHMS )
import ATrade.Types
import Control.Concurrent hiding ( readChan ,
writeChan ,
writeList2Chan , yield )
import Control.Concurrent.BoundedChan as BC
import Control.Exception.Safe
import Control.Lens hiding ( Context , ( .= ) )
import Control.Monad
import Control.Monad.Reader
import Data.Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as T
import Data.Text.Encoding
import qualified Data.Text.Lazy as TL
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Database.Redis hiding ( decode , info )
import GHC.Generics
import Options.Applicative
import System.Exit
import System.IO
import System.Log.Formatter
import System.Log.Handler ( setFormatter )
import System.Log.Handler.Simple
import System.Log.Logger
import System.Signal
import System.ZMQ4 hiding ( Event ( .. ) )
import Ether.Reader
@ -106,18 +113,18 @@ instance (MonadRobot (RealDriver c s) c s) where
@@ -106,18 +113,18 @@ instance (MonadRobot (RealDriver c s) c s) where
getEnvironment = asks @ RDriverEnv environmentRef >>= liftIO . readIORef
data Params = Params {
instanceId :: String ,
strategyConfigFile :: FilePath ,
strategyStateFile :: FilePath ,
brokerEp :: String ,
quotesourceEp :: String ,
instanceId :: String ,
strategyConfigFile :: FilePath ,
strategyStateFile :: FilePath ,
brokerEp :: String ,
quotesourceEp :: String ,
historyProviderType :: Maybe String ,
historyProvider :: Maybe String ,
redisSocket :: Maybe String ,
qtisSocket :: Maybe String ,
accountId :: String ,
volumeFactor :: Int ,
sourceBarTimeframe :: Maybe Int
historyProvider :: Maybe String ,
redisSocket :: Maybe String ,
qtisEndpoint :: String ,
accountId :: String ,
volumeFactor :: Int ,
sourceBarTimeframe :: Maybe Int
} deriving ( Show , Eq )
paramsParser :: Parser Params
@ -146,9 +153,9 @@ paramsParser = Params
@@ -146,9 +153,9 @@ paramsParser = Params
<*> optional ( strOption
( long " redis-socket "
<> metavar " ADDRESS " ) )
<*> optional ( strOption
<*> strOption
( long " qtis "
<> metavar " ENDPOINT/ID " ) )
<> metavar " ENDPOINT/ID " )
<*> strOption
( long " account "
<> metavar " ACCOUNT " )
@ -159,9 +166,79 @@ paramsParser = Params
@@ -159,9 +166,79 @@ paramsParser = Params
( long " source-timeframe "
<> metavar " SECONDS " ) )
data Env historySource c s = Env {
envZeromqContext :: Context ,
envHistorySource :: historySource ,
envQtisEndpoint :: T . Text ,
envStrategyInstanceParams :: StrategyInstanceParams ,
envStrategyEnvironment :: IORef StrategyEnvironment ,
envConfigRef :: IORef c ,
envStateRef :: IORef s ,
envBrokerChan :: BC . BoundedChan BrokerCommand ,
envTimers :: IORef [ UTCTime ] ,
envEventChan :: BC . BoundedChan Event ,
envAggregator :: IORef BarAggregator ,
envLastTimestamp :: IORef UTCTime
} deriving ( Generic )
type App historySource c s = ReaderT ( Env historySource c s ) IO
instance MonadRobot ( App historySource c s ) c s where
submitOrder order = do
bc <- asks envBrokerChan
lift $ BC . writeChan bc $ BrokerSubmitOrder order
cancelOrder oId = do
bc <- asks envBrokerChan
lift $ BC . writeChan bc $ BrokerCancelOrder oId
appendToLog = lift . debugM " Strategy " . T . unpack . TL . toStrict
setupTimer t = do
timers <- asks envTimers
lift $ atomicModifyIORef' timers ( \ s -> ( t : s , () ) )
enqueueIOAction actionId action' = do
eventChan <- asks envEventChan
lift $ void $ forkIO $ do
v <- action'
BC . writeChan eventChan $ ActionCompleted actionId v
getConfig = asks envConfigRef >>= lift . readIORef
getState = asks envStateRef >>= lift . readIORef
setState s = do
ref <- asks envStateRef
lift $ writeIORef ref s
getEnvironment = do
aggRef <- asks envAggregator
envRef <- asks envStrategyEnvironment
agg <- lift $ readIORef aggRef
env <- lift $ readIORef envRef
nowRef <- asks envLastTimestamp
now <- lift $ readIORef nowRef
return $ env & seBars .~ bars agg & seLastTimestamp .~ now
instance MonadHistory ( App QQ . QHPHandle c s ) where
getHistory tickerId timeframe fromTime toTime = do
qhp <- asks envHistorySource
QQ . requestHistoryFromQHP qhp tickerId timeframe fromTime toTime
instance MonadInstrumentParametersSource ( App hs c s ) where
getInstrumentParameters tickerIds = do
ctx <- asks envZeromqContext
ep <- asks envQtisEndpoint
info <- liftIO $ qtisGetTickersInfo ctx ep tickerIds
return $ ( tiTicker info , convert info )
where
convert info = InstrumentParameters
{
ipLotSize = fromInteger $ tiLotSize info ,
ipTickSize = tiTickSize info
}
data BigConfig c = BigConfig {
confTickers :: [ Ticker ] ,
confTickers :: [ Ticker ] ,
strategyConfig :: c
}
@ -182,7 +259,7 @@ storeState params stateRef timersRef = do
@@ -182,7 +259,7 @@ storeState params stateRef timersRef = do
Nothing -> withFile ( strategyStateFile params ) WriteMode ( \ f -> BS . hPut f $ BL . toStrict $ encode currentStrategyState )
` catch ` ( \ e -> warningM " main " ( " Unable to save state: " ++ show ( e :: IOException ) ) )
Just sock -> do
# ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
now <- getPOSIXTime
res <- runRedis conn $ mset [ ( encodeUtf8 $ T . pack $ instanceId params , BL . toStrict $ encode currentStrategyState ) ,
@ -190,12 +267,8 @@ storeState params stateRef timersRef = do
@@ -190,12 +267,8 @@ storeState params stateRef timersRef = do
( encodeUtf8 $ T . pack $ instanceId params ++ " :timers " , BL . toStrict $ encode currentTimersState ) ]
case res of
Left _ -> warningM " main " " Unable to save state "
Left _ -> warningM " main " " Unable to save state "
Right _ -> return ()
# else
return ()
# endif
gracefulShutdown :: ( ToJSON s ) => Params -> IORef s -> IORef [ UTCTime ] -> MVar () -> Signal -> IO ()
gracefulShutdown params stateRef timersRef shutdownMv _ = do
@ -204,8 +277,8 @@ gracefulShutdown params stateRef timersRef shutdownMv _ = do
@@ -204,8 +277,8 @@ gracefulShutdown params stateRef timersRef shutdownMv _ = do
putMVar shutdownMv ()
exitSuccess
robotMain :: ( ToJSON s , FromJSON s , FromJSON c ) => DiffTime -> s -> Maybe ( InitializationCallback c ) -> EventCallback c s -> IO ()
robotMain dataDownloadDelta defaultState initCallback callback = do
robotMain :: ( ToJSON s , FromJSON s , FromJSON c ) => DiffTime -> s -> EventCallback c s -> IO ()
robotMain dataDownloadDelta defaultState callback = do
params <- execParser opts
initLogging params
infoM " main " " Starting "
@ -219,43 +292,70 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
@@ -219,43 +292,70 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
strategyAccount = T . pack . accountId $ params ,
strategyVolume = volumeFactor params ,
tickers = tickerList ,
strategyQuotesourceEp = T . pack . quotesourceEp $ params ,
strategyBrokerEp = T . pack . brokerEp $ params ,
strategyHistoryProviderType = T . pack $ fromMaybe " finam " $ historyProviderType params ,
strategyHistoryProvider = T . pack $ fromMaybe " " $ historyProvider params ,
strategyQTISEp = T . pack <$> qtisSocket params }
strategyQTISEp = Nothing }
updatedConfig <- case initCallback of
Just cb -> cb config instanceParams
Nothing -> return config
let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback
stateRef <- newIORef stratState
configRef <- newIORef config
timersRef <- newIORef timersState
shutdownMv <- newEmptyMVar
installHandler sigINT ( gracefulShutdown params stateRef timersRef shutdownMv )
installHandler sigTERM ( gracefulShutdown params stateRef timersRef shutdownMv )
randsec <- getStdRandom ( randomR ( 1 , 10 ) )
threadDelay $ randsec * 1000000
debugM " main " " Forking state saving thread "
stateSavingThread <- forkIO $ forever $ do
threadDelay 1000000
storeState params stateRef timersRef
straEnv <- newIORef StrategyEnvironment {
_seInstanceId = strategyInstanceId instanceParams ,
_seAccount = strategyAccount instanceParams ,
_seVolume = strategyVolume instanceParams ,
_seBars = M . empty ,
_seLastTimestamp = UTCTime ( fromGregorian 1970 1 1 ) 0
}
-- Event channel is for strategy events, like new tick arrival, or order execution notification
eventChan <- BC . newBoundedChan 1000
-- Orders channel passes strategy orders to broker thread
brokerChan <- BC . newBoundedChan 1000
debugM " main " " Starting strategy driver "
barStrategyDriver ( sourceBarTimeframe params ) tickFilter strategy stateRef timersRef shutdownMv ` finally ` killThread stateSavingThread
withContext ( \ ctx -> do
let qsEp = T . pack $ quotesourceEp params
let brEp = T . pack $ brokerEp params
agg <- newIORef $ mkAggregatorFromBars M . empty [ ( hmsToDiffTime 3 50 0 , hmsToDiffTime 21 10 0 ) ]
bracket ( startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter ( sourceBarTimeframe params ) ) killThread $ \ _ -> do
debugM " Strategy " " QuoteSource thread forked "
bracket ( startBrokerClientThread ( strategyInstanceId instanceParams ) ctx brEp brokerChan eventChan shutdownMv ) killThread $ \ _ -> do
debugM " Strategy " " Broker thread forked "
now <- getCurrentTime >>= newIORef
let env = Env {
envZeromqContext = ctx ,
envQtisEndpoint = T . pack . qtisEndpoint $ params ,
envHistorySource = mkQHPHandle ctx ( T . pack . fromMaybe " " . historyProvider $ params ) ,
envStrategyInstanceParams = instanceParams ,
envStrategyEnvironment = straEnv ,
envConfigRef = configRef ,
envStateRef = stateRef ,
envBrokerChan = brokerChan ,
envTimers = timersRef ,
envEventChan = eventChan ,
envAggregator = agg ,
envLastTimestamp = now
}
runReaderT ( barStrategyDriver dataDownloadDelta instanceParams callback shutdownMv ) env ` finally ` killThread stateSavingThread )
where
tickFilter :: Tick -> Bool
tickFilter tick =
let classCode = T . takeWhile ( /= '#' ) ( security tick ) in
if
| classCode == " SPBFUT " || classCode == " SPBOPT " -> any ( inInterval . utctDayTime . timestamp $ tick ) fortsIntervals
| otherwise -> any ( inInterval . utctDayTime . timestamp $ tick ) secIntervals
if classCode == " SPBFUT " || classCode == " SPBOPT "
then any ( inInterval . utctDayTime . timestamp $ tick ) fortsIntervals
else any ( inInterval . utctDayTime . timestamp $ tick ) secIntervals
fortsIntervals = [ ( fromHMS 7 0 0 , fromHMS 11 0 0 ) , ( fromHMS 11 5 0 , fromHMS 15 45 0 ) , ( fromHMS 16 0 0 , fromHMS 20 50 0 ) ]
fortsIntervals = [ ( fromHMS 4 0 0 , fromHMS 11 0 0 ) , ( fromHMS 11 5 0 , fromHMS 15 45 0 ) , ( fromHMS 16 0 0 , fromHMS 20 50 0 ) ]
secIntervals = [ ( fromHMS 6 50 0 , fromHMS 15 51 0 ) ]
fromHMS h m s = h * 3600 + m * 60 + s
inInterval ts ( start , end ) = ts >= start && ts <= end
opts = info ( helper <*> paramsParser )
@ -274,16 +374,16 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
@@ -274,16 +374,16 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
loadStrategyConfig params = withFile ( strategyConfigFile params ) ReadMode ( \ f -> do
bigconfig <- eitherDecode . BL . fromStrict <$> BS . hGetContents f
case bigconfig of
Right conf -> return ( confTickers conf , strategyConfig conf )
Right conf -> return ( confTickers conf , strategyConfig conf )
Left errmsg -> throw $ UnableToLoadConfig $ ( T . pack . show ) errmsg )
loadStrategyTimers :: Params -> IO [ UTCTime ]
loadStrategyTimers params = case redisSocket params of
Nothing -> return []
Just sock -> do
# ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
res <- runRedis conn $ get ( encodeUtf8 $ T . pack $ instanceId params ++ " timers " )
res <- runRedis conn $ get ( encodeUtf8 $ T . pack $ instanceId params ++ " : timers" )
case res of
Left _ -> do
warningM " main " " Unable to load state "
@ -297,15 +397,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
@@ -297,15 +397,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> do
warningM " main " " Unable to load state "
return []
# else
error " Not implemented "
# endif
loadStrategyState params = case redisSocket params of
Nothing -> loadStateFromFile ( strategyStateFile params )
Just sock -> do
# ifdef linux_HOST_OS
conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock }
res <- runRedis conn $ get ( encodeUtf8 $ T . pack $ instanceId params )
case res of
@ -321,10 +417,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
@@ -321,10 +417,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> do
warningM " main " " Unable to load state "
return defaultState
# else
error " Not implemented "
# endif
loadStateFromFile filepath = withFile filepath ReadMode ( \ f -> do
maybeState <- decode . BL . fromStrict <$> BS . hGetContents f
case maybeState of
@ -332,43 +425,27 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
@@ -332,43 +425,27 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> return defaultState ) ` catch `
( \ e -> warningM " main " ( " Unable to load state: " ++ show ( e :: IOException ) ) >> return defaultState )
-- | Helper function to make 'Strategy' instances
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
downloadDelta = dd ,
eventCallback = cb ,
currentState = initialState ,
strategyParams = params ,
strategyTimers = [] ,
strategyInstanceParams = instanceParams }
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: Maybe Int -> ( Tick -> Bool ) -> Strategy c s -> IORef s -> IORef [ UTCTime ] -> MVar () -> IO ()
barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutdownVar = do
-- Make channels
-- Event channel is for strategy events, like new tick arrival, or order execution notification
eventChan <- BC . newBoundedChan 1000
-- Orders channel passes strategy orders to broker thread
ordersChan <- BC . newBoundedChan 1000
withContext ( \ ctx -> do
-- Load tickers data and create BarAggregator from them
historyBars <-
if
| ( strategyHistoryProviderType . strategyInstanceParams ) strategy == " finam " ->
M . fromList <$> mapM loadTickerFromFinam ( tickers . strategyInstanceParams $ strategy )
| ( strategyHistoryProviderType . strategyInstanceParams ) strategy == " hap " ->
M . fromList <$> mapM ( loadTickerFromHAP ctx ( ( strategyHistoryProvider . strategyInstanceParams ) strategy ) ) ( tickers . strategyInstanceParams $ strategy )
| otherwise ->
M . fromList <$> mapM ( loadTickerFromQHP ctx ( ( strategyHistoryProvider . strategyInstanceParams ) strategy ) ) ( tickers . strategyInstanceParams $ strategy )
agg <- newIORef $ mkAggregatorFromBars historyBars [ ( hmsToDiffTime 6 50 0 , hmsToDiffTime 21 0 0 ) ]
bracket ( startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe ) killThread ( \ _ -> do
debugM " Strategy " " QuoteSource thread forked "
bracket ( startBrokerClientThread ( strategyInstanceId . strategyInstanceParams $ strategy ) ctx brEp ordersChan eventChan shutdownVar ) killThread ( \ _ -> do
debugM " Strategy " " Broker thread forked "
barStrategyDriver :: ( MonadHistory ( App hs c s ) ) => DiffTime -> StrategyInstanceParams -> EventCallback c s -> MVar () -> App hs c s ()
barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
now <- liftIO getCurrentTime
history <- M . fromList <$> mapM ( loadTickerHistory now ) ( tickers instanceParams )
eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan
agg <- asks envAggregator
liftIO $ atomicModifyIORef' agg ( \ s -> ( replaceHistory s history , () ) )
wakeupTid <- lift . forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan brokerChan BrokerRequestNotifications
lift $ debugM " Strategy " " Wakeup thread forked "
<<<<<<< HEAD
wakeupTid <- forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
@ -407,10 +484,33 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
@@ -407,10 +484,33 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
env <- readIORef envRef
let oldTimestamp = seLastTimestamp env
=======
readAndHandleEvents agg instanceParams
lift $ debugM " Strategy " " Stopping strategy driver "
lift $ killThread wakeupTid
where
loadTickerHistory now t = do
history <- getHistory ( code t ) ( BarTimeframe ( fromInteger . timeframeSeconds $ t ) )
( ( fromRational . toRational . negate $ downloadDelta ) ` addUTCTime ` now ) now
instrumentParams <- snd <$> getInstrumentParameters ( code t )
return ( code t , BarSeries ( code t ) ( Timeframe ( timeframeSeconds t ) ) history instrumentParams )
readAndHandleEvents agg instanceParams' = do
eventChan <- asks envEventChan
event <- lift $ readChan eventChan
if event /= Shutdown
then do
env <- getEnvironment
>>>>>>> stable
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> seLastTimestamp env
NewBar bar -> barTimestamp bar
_ -> env ^. seLastTimestamp
nowRef <- asks envLastTimestamp
lift $ writeIORef nowRef newTimestamp
<<<<<<< HEAD
newTimers <- catMaybes <$> ( readIORef timersRef >>= mapM ( checkTimer eventChan newTimestamp ) )
atomicWriteIORef timersRef newTimers
@ -419,117 +519,24 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
@@ -419,117 +519,24 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd
readAndHandleEvents agg ordersChan eventChan strategy' envRef
else debugM " Strategy " " Shutdown requested "
=======
timersRef <- asks envTimers
oldTimers <- lift $ readIORef timersRef
newTimers <- catMaybes <$> mapM ( checkTimer eventChan newTimestamp ) oldTimers
callback event
lift $ writeIORef timersRef newTimers
readAndHandleEvents agg instanceParams'
else
lift $ debugM " Strategy " " Shutdown requested "
>>>>>>> stable
where
handleTimerActions action =
case action of
ActionSetupTimer timerTime -> return $ Just timerTime
_ -> return Nothing
handleActions ordersChan' action =
case action of
( ActionLog logText ) -> debugM " Strategy " $ T . unpack logText
( ActionOrder order ) -> writeChan ordersChan' $ BrokerSubmitOrder order
( ActionCancelOrder oid ) -> writeChan ordersChan' $ BrokerCancelOrder oid
( ActionSetupTimer _ ) -> return ()
( ActionIO tag io ) -> void $ forkIO $ do
v <- io
writeChan eventChan ( ActionCompleted tag v )
checkTimer eventChan' newTimestamp timerTime =
if newTimestamp >= timerTime
then do
writeChan eventChan' $ TimerFired timerTime
lift $ writeChan eventChan' $ TimerFired timerTime
return Nothing
else
return $ Just timerTime
loadTickerFromHAP :: Context -> T . Text -> Ticker -> IO ( TickerId , BarSeries )
loadTickerFromHAP ctx ep t = do
debugM " Strategy " $ " Loading ticker from HAP: " ++ show ( code t )
case parseHAPPeriod $ timeframeSeconds t of
Just tf -> do
now <- getCurrentTime
historyBars <- QH . getQuotes ctx QH . RequestParams {
QH . endpoint = ep ,
QH . ticker = code t ,
QH . startDate = addUTCTime ( negate . ( 1 + ) . fromRational . toRational $ downloadDelta strategy ) now ,
QH . endDate = now ,
QH . period = tf }
debugM " Strategy " $ " Obtained " ++ show ( length historyBars ) ++ " bars "
return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = historyBars } )
_ -> return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = [] } )
loadTickerFromQHP :: Context -> T . Text -> Ticker -> IO ( TickerId , BarSeries )
loadTickerFromQHP ctx ep t = do
debugM " Strategy " $ " Loading ticker from QHP: " ++ show ( code t )
case parseQHPPeriod $ timeframeSeconds t of
Just tf -> do
now <- getCurrentTime
historyBars <- QQ . getQuotes ctx QQ . RequestParams {
QQ . endpoint = ep ,
QQ . ticker = code t ,
QQ . startDate = addDays ( negate . ( 1 + ) . ceiling $ downloadDelta strategy / 86400 ) ( utctDay now ) ,
QQ . endDate = utctDay now ,
QQ . period = tf }
debugM " Strategy " $ " Obtained " ++ show ( length historyBars ) ++ " bars "
return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = historyBars } )
_ -> return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = [] } )
loadTickerFromFinam :: Ticker -> IO ( TickerId , BarSeries )
loadTickerFromFinam t = do
randDelay <- getStdRandom ( randomR ( 1 , 5 ) )
threadDelay $ randDelay * 1000000
now <- getCurrentTime
debugM " Strategy " $ show ( L . lookup " finam " ( aliases t ) , parseFinamPeriod $ timeframeSeconds t )
case ( L . lookup " finam " ( aliases t ) , parseFinamPeriod $ timeframeSeconds t ) of
( Just finamCode , Just per ) -> do
debugM " Strategy " $ " Downloading ticker: " ++ finamCode
history <- downloadAndParseQuotes $ defaultParams { QF . ticker = T . pack finamCode ,
QF . startDate = addDays ( negate . ( 1 + ) . ceiling $ downloadDelta strategy / 86400 ) ( utctDay now ) ,
QF . endDate = utctDay now ,
QF . period = per }
case history of
Just h -> return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = convertFromFinamHistory ( code t ) h } )
Nothing -> return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = [] } )
_ -> return ( code t , BarSeries { bsTickerId = code t , bsTimeframe = Timeframe ( timeframeSeconds t ) , bsBars = [] } )
convertFromFinamHistory :: TickerId -> [ Row ] -> [ Bar ]
convertFromFinamHistory tid = L . reverse . fmap ( \ row -> Bar { barSecurity = tid ,
barTimestamp = rowTime row ,
barOpen = rowOpen row ,
barHigh = rowHigh row ,
barLow = rowLow row ,
barClose = rowClose row ,
barVolume = rowVolume row } )
parseFinamPeriod x
| x == 0 = Just QF . PeriodTick
| x == 60 = Just QF . Period1Min
| x == 5 * 60 = Just QF . Period5Min
| x == 10 * 60 = Just QF . Period10Min
| x == 15 * 60 = Just QF . Period15Min
| x == 30 * 60 = Just QF . Period30Min
| x == 60 * 60 = Just QF . PeriodHour
| x == 24 * 60 * 60 = Just QF . PeriodDay
| otherwise = Nothing
parseQHPPeriod x
| x == 60 = Just QQ . Period1Min
| x == 5 * 60 = Just QQ . Period5Min
| x == 15 * 60 = Just QQ . Period15Min
| x == 30 * 60 = Just QQ . Period30Min
| x == 60 * 60 = Just QQ . PeriodHour
| x == 24 * 60 * 60 = Just QQ . PeriodDay
| otherwise = Nothing
parseHAPPeriod x
| x == 60 = Just QH . Period1Min
| x == 5 * 60 = Just QH . Period5Min
| x == 15 * 60 = Just QH . Period15Min
| x == 30 * 60 = Just QH . Period30Min
| x == 60 * 60 = Just QH . PeriodHour
| x == 24 * 60 * 60 = Just QH . PeriodDay
| otherwise = Nothing