Compare commits

..

No commits in common. 'master' and 'junction' have entirely different histories.

  1. 20
      robocom-zero.cabal
  2. 24
      src/ATrade/BarAggregator.hs
  3. 533
      src/ATrade/Driver/Backtest.hs
  4. 305
      src/ATrade/Driver/Junction.hs
  5. 22
      src/ATrade/Driver/Junction/BrokerService.hs
  6. 258
      src/ATrade/Driver/Junction/JunctionMonad.hs
  7. 39
      src/ATrade/Driver/Junction/ProgramConfiguration.hs
  8. 3
      src/ATrade/Driver/Junction/QuoteStream.hs
  9. 166
      src/ATrade/Driver/Junction/QuoteThread.hs
  10. 151
      src/ATrade/Driver/Junction/RemoteControl.hs
  11. 64
      src/ATrade/Driver/Junction/RobotDriverThread.hs
  12. 15
      src/ATrade/Driver/Junction/Types.hs
  13. 114
      src/ATrade/Driver/Real.hs
  14. 94
      src/ATrade/Driver/Real/BrokerClientThread.hs
  15. 3
      src/ATrade/Quotes/QTIS.hs
  16. 14
      src/ATrade/RoboCom/Monad.hs
  17. 258
      src/ATrade/RoboCom/Positions.hs
  18. 18
      src/ATrade/RoboCom/Types.hs
  19. 7
      stack.yaml
  20. 107
      test/Test/BarAggregator.hs
  21. 19
      test/Test/Driver/Junction/QuoteThread.hs
  22. 2
      test/Test/RoboCom/Indicators.hs
  23. 165
      test/Test/RoboCom/Positions.hs
  24. 4
      test/Test/RoboCom/Utils.hs

20
robocom-zero.cabal

@ -1,5 +1,5 @@
name: robocom-zero name: robocom-zero
version: 0.2.1.0 version: 0.2.0.0
-- synopsis: -- synopsis:
-- description: -- description:
homepage: https://github.com/asakul/robocom-zero#readme homepage: https://github.com/asakul/robocom-zero#readme
@ -26,7 +26,7 @@ library
, ATrade.Quotes , ATrade.Quotes
, ATrade.Quotes.QHP , ATrade.Quotes.QHP
, ATrade.Quotes.QTIS , ATrade.Quotes.QTIS
, ATrade.Driver.Backtest -- , ATrade.Driver.Backtest
, ATrade.Driver.Junction , ATrade.Driver.Junction
, ATrade.Driver.Junction.Types , ATrade.Driver.Junction.Types
, ATrade.Driver.Junction.QuoteThread , ATrade.Driver.Junction.QuoteThread
@ -39,10 +39,8 @@ library
, ATrade.Quotes.HistoryProvider , ATrade.Quotes.HistoryProvider
, ATrade.Quotes.TickerInfoProvider , ATrade.Quotes.TickerInfoProvider
other-modules: Paths_robocom_zero other-modules: Paths_robocom_zero
, ATrade.Driver.Junction.RemoteControl
, ATrade.Driver.Junction.JunctionMonad
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, libatrade >= 0.16.0.0 && < 0.17.0.0 , libatrade >= 0.12.0.0 && < 0.13.0.0
, text , text
, text-icu , text-icu
, lens , lens
@ -75,10 +73,6 @@ library
, extra , extra
, co-log , co-log
, text-show , text-show
, unliftio
, conduit
, split
, cassava
default-language: Haskell2010 default-language: Haskell2010
other-modules: ATrade.Exceptions other-modules: ATrade.Exceptions
@ -93,14 +87,12 @@ test-suite robots-test
, libatrade , libatrade
, time , time
, text , text
, hedgehog
, tasty , tasty
, tasty-hunit , tasty-hunit
, tasty-golden , tasty-golden
, tasty-hedgehog
, tasty-hspec
, tasty-quickcheck
, tasty-smallcheck , tasty-smallcheck
, tasty-quickcheck
, tasty-hspec
, quickcheck-text , quickcheck-text
, quickcheck-instances , quickcheck-instances
, containers , containers
@ -109,8 +101,6 @@ test-suite robots-test
, zeromq4-haskell-zap , zeromq4-haskell-zap
, BoundedChan , BoundedChan
, mtl , mtl
, co-log-core
, co-log
ghc-options: -threaded -rtsopts -with-rtsopts=-N ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010 default-language: Haskell2010
other-modules: Test.RoboCom.Indicators other-modules: Test.RoboCom.Indicators

24
src/ATrade/BarAggregator.hs

@ -72,28 +72,25 @@ handleTicks ticks aggregator = foldl f ([], aggregator) ticks
handleTick :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) handleTick :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator)
handleTick tick = runState $ do handleTick tick = runState $ do
lLastTicks %= M.insert (security tick, datatype tick) tick lLastTicks %= M.insert (security tick, datatype tick) tick
timeWindows <- gets tickTimeWindows tws <- gets tickTimeWindows
mybars <- gets bars mybars <- gets bars
if any (isInTimeInterval tick) timeWindows if (any (isInTimeInterval tick) tws)
then then
case M.lookup (security tick) mybars of case M.lookup (security tick) mybars of
Just series -> case bsBars series of Just series -> case bsBars series of
(b:bs) -> do (b:bs) -> do
let timeframeInSeconds = fromIntegral . unBarTimeframe $ bsTimeframe series let currentBn = barNumber (barTimestamp b) (fromIntegral . unBarTimeframe $ bsTimeframe series)
let currentBn = barNumber (barTimestamp b) timeframeInSeconds
case datatype tick of case datatype tick of
LastTradePrice -> LastTradePrice ->
if volume tick > 0 if volume tick > 0
then then
if currentBn == barNumber (timestamp tick) timeframeInSeconds if currentBn == barNumber (timestamp tick) (fromIntegral . unBarTimeframe $ bsTimeframe series)
then do then do
lBars %= M.insert (security tick) series { bsBars = updateBar b tick : bs } lBars %= M.insert (security tick) series { bsBars = updateBar b tick : bs }
return Nothing return Nothing
else do else do
let barEndTimestamp = barEndTime b timeframeInSeconds lBars %= M.insert (security tick) series { bsBars = barFromTick tick : b : bs }
let resultingBar = b { barTimestamp = barEndTimestamp } return . Just $ b
lBars %= M.insert (security tick) series { bsBars = barFromTick tick : resultingBar : bs }
return . Just $ resultingBar
else else
return Nothing return Nothing
_ -> _ ->
@ -143,16 +140,15 @@ handleTick tick = runState $ do
updateTime :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) updateTime :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator)
updateTime tick = runState $ do updateTime tick = runState $ do
lLastTicks %= M.insert (security tick, datatype tick) tick lLastTicks %= M.insert (security tick, datatype tick) tick
timeWindows <- gets tickTimeWindows tws <- gets tickTimeWindows
mybars <- gets bars mybars <- gets bars
if any (isInTimeInterval tick) timeWindows if (any (isInTimeInterval tick) tws)
then then
case M.lookup (security tick) mybars of case M.lookup (security tick) mybars of
Just series -> case bsBars series of Just series -> case bsBars series of
(b:bs) -> do (b:bs) -> do
let timeframeInSeconds = fromIntegral . unBarTimeframe $ bsTimeframe series let currentBn = barNumber (barTimestamp b) (fromIntegral . unBarTimeframe $ bsTimeframe series)
let currentBn = barNumber (barTimestamp b) timeframeInSeconds let thisBn = barNumber (timestamp tick) (fromIntegral . unBarTimeframe $ bsTimeframe series)
let thisBn = barNumber (timestamp tick) timeframeInSeconds
if if
| currentBn == thisBn -> do | currentBn == thisBn -> do
lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs }

533
src/ATrade/Driver/Backtest.hs

@ -13,101 +13,59 @@ module ATrade.Driver.Backtest (
backtestMain backtestMain
) where ) where
import ATrade.Driver.Junction.Types (StrategyDescriptor (StrategyDescriptor), import ATrade.Driver.Types (InitializationCallback,
StrategyDescriptorE (StrategyDescriptorE), StrategyInstanceParams (..))
TickerConfig, confStrategy, import ATrade.Exceptions
confTickers, eventCallback, import ATrade.Quotes
strategyBaseName, tickerId, import ATrade.Quotes.Finam as QF
timeframe) import ATrade.Quotes.QTIS
import ATrade.Exceptions (RoboComException (UnableToLoadConfig, UnableToLoadFeed)) import ATrade.RoboCom.Monad (Event (..), EventCallback,
import ATrade.Logging (Message, Severity (Error, Trace), MonadRobot (..),
fmtMessage, logWith) StrategyEnvironment (..),
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize), appendToLog, seBars, seLastTimestamp)
qtisGetTickersInfo) import ATrade.RoboCom.Positions
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.Types (BarSeries (..), Bars, InstrumentParameters (InstrumentParameters),
import ATrade.RoboCom.Monad (Event (..), MonadRobot (..), Ticker (..), Timeframe (..))
StrategyEnvironment (..), import ATrade.Types
appendToLog, seLastTimestamp) import Conduit (awaitForever, runConduit, yield,
import ATrade.RoboCom.Types (BarSeries (..), (.|))
BarSeriesId (BarSeriesId), Bars, import Control.Exception.Safe
InstrumentParameters (InstrumentParameters), import Control.Lens hiding (ix, (<|), (|>))
Ticker (..)) import Control.Monad.ST (runST)
import ATrade.Types (Bar (Bar, barHigh, barLow, barOpen, barSecurity, barTimestamp), import Control.Monad.State
BarTimeframe (BarTimeframe), import Data.Aeson (FromJSON (..), Value (..), decode)
Operation (Buy), import Data.Aeson.Types (parseMaybe)
Order (orderAccountId, orderId, orderOperation, orderPrice, orderQuantity, orderSecurity, orderSignalId), import Data.ByteString.Lazy (readFile, toStrict)
OrderId, import Data.Default
OrderPrice (Limit, Market), import Data.HashMap.Strict (lookup)
OrderState (Cancelled, Executed, Submitted), import Data.List (partition)
Price, TickerId, Trade (..), import Data.List.Split (splitOn)
fromDouble) import qualified Data.Map.Strict as M
import Colog (LogAction, (>$<)) import Data.Sequence (Seq (..), (<|), (|>))
import Colog.Actions (logTextStdout) import qualified Data.Sequence as Seq
import Conduit (ConduitT, Void, awaitForever, import Data.STRef (newSTRef, readSTRef, writeSTRef)
runConduit, yield, (.|)) import qualified Data.Text as T
import Control.Exception.Safe (catchAny, throw) import Data.Text.IO (putStrLn)
import Control.Lens (makeLenses, use, (%=), (+=), import qualified Data.Text.Lazy as TL
(.=), (^.)) import Data.Time.Calendar (fromGregorian)
import Control.Monad.ST (runST) import Data.Time.Clock (DiffTime, UTCTime (..))
import Control.Monad.State (MonadIO, MonadPlus (mzero), import Data.Vector ((!), (!?), (//))
MonadState, MonadTrans (lift), import qualified Data.Vector as V
State, StateT (StateT), import Options.Applicative hiding (Success)
execState, forM_, gets, when) import Prelude hiding (lookup, putStrLn, readFile)
import Data.Aeson (FromJSON (..), Value (..), import Safe (headMay)
decode) import System.ZMQ4 hiding (Event)
import Data.Aeson.Types (parseMaybe)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.ByteString.Lazy (readFile, toStrict)
import qualified Data.ByteString.Lazy as BL
import Data.Csv (FromField (parseField),
FromRecord (parseRecord),
HasHeader (HasHeader), (.!))
import qualified Data.Csv as Csv
import Data.Default (Default (def))
import Data.HashMap.Strict (lookup)
import Data.IORef (newIORef)
import Data.List (partition)
import qualified Data.List as L
import Data.List.NonEmpty (NonEmpty ((:|)))
import Data.List.Split (splitOn)
import qualified Data.Map.Strict as M
import Data.Sequence (Seq (..), (<|), (|>))
import qualified Data.Sequence as Seq
import Data.STRef (newSTRef, readSTRef, writeSTRef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Data.Text.IO (putStrLn)
import qualified Data.Text.Lazy as TL
import Data.Time (defaultTimeLocale, parseTimeM)
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime)
import Data.Vector ((!), (!?), (//))
import qualified Data.Vector as V
import Dhall (FromDhall, auto, input)
import Options.Applicative (Alternative (some), Parser,
ReadM, eitherReader, execParser,
fullDesc, header, helper, info,
long, metavar, option, short,
strOption)
import Prelude hiding (log, lookup, putStrLn,
readFile)
import Safe (headMay)
import System.IO (IOMode (ReadMode), withFile)
import System.ZMQ4 (withContext)
data Feed = Feed TickerId FilePath data Feed = Feed TickerId FilePath
deriving (Show, Eq) deriving (Show, Eq)
data Params = Params { data Params = Params {
strategyBasename :: String,
strategyConfigFile :: FilePath, strategyConfigFile :: FilePath,
qtisEndpoint :: String, qtisEndpoint :: String,
paramsFeeds :: [Feed] paramsFeeds :: [Feed]
} deriving (Show, Eq) } deriving (Show, Eq)
data BacktestState c s = BacktestState { data BacktestState c s = BacktestState {
_descriptor :: StrategyDescriptor c s,
_cash :: Double, _cash :: Double,
_robotState :: s, _robotState :: s,
_robotParams :: c, _robotParams :: c,
@ -117,135 +75,101 @@ data BacktestState c s = BacktestState {
_tradesLog :: [Trade], _tradesLog :: [Trade],
_orderIdCounter :: Integer, _orderIdCounter :: Integer,
_pendingTimers :: [UTCTime], _pendingTimers :: [UTCTime],
_logs :: [T.Text], _logs :: [T.Text]
_barsMap :: M.Map BarSeriesId BarSeries,
_availableTickers :: NonEmpty BarSeriesId
} }
makeLenses ''BacktestState makeLenses ''BacktestState
data Row = Row {
rowTicker :: T.Text,
rowTimeframe :: Int,
rowTime :: UTCTime,
rowOpen :: Price,
rowHigh :: Price,
rowLow :: Price,
rowClose :: Price,
rowVolume :: Integer
} deriving (Show, Eq)
instance FromField Price where
parseField s = fromDouble <$> (parseField s :: Csv.Parser Double)
instance FromRecord Row where
parseRecord v
| length v == 9 = do
tkr <- v .! 0
tf <- v .! 1
date <- v .! 2
time <- v .! 3
dt <- addUTCTime (-3 * 3600) <$> parseDt date time
open <- v .! 4
high <- v .! 5
low <- v .! 6
close <- v .! 7
vol <- v .! 8
return $ Row tkr tf dt open high low close vol
| otherwise = mzero
where
parseDt :: B.ByteString -> B.ByteString -> Csv.Parser UTCTime
parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of
Just dt -> return dt
Nothing -> fail "Unable to parse date/time"
parseQuotes :: B.ByteString -> Maybe [Row]
parseQuotes csvData = case Csv.decode HasHeader $ BL.fromStrict csvData of
Left _ -> Nothing
Right d -> Just $ V.toList d
paramsParser :: Parser Params paramsParser :: Parser Params
paramsParser = Params paramsParser = Params
<$> strOption ( <$> strOption (
long "strategy-name" <> short 'n') long "config" <> short 'c'
<*> strOption ( )
long "config" <> short 'c')
<*> strOption <*> strOption
( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID") ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" )
<*> some (option feedArgParser ( <*> some (option feedArgParser (
long "feed" <> short 'f')) long "feed" <> short 'f'
))
feedArgParser :: ReadM Feed feedArgParser :: ReadM Feed
feedArgParser = eitherReader (\s -> case splitOn ":" s of feedArgParser = eitherReader (\s -> case splitOn ":" s of
[tid, fpath] -> Right $ Feed (T.pack tid) fpath [tid, fpath] -> Right $ Feed (T.pack tid) fpath
_ -> Left $ "Unable to parse feed id: " ++ s) _ -> Left $ "Unable to parse feed id: " ++ s)
logger :: (MonadIO m) => LogAction m Message backtestMain :: (FromJSON c, StateHasPositions s) => DiffTime -> s -> EventCallback c s -> IO ()
logger = fmtMessage >$< logTextStdout backtestMain _dataDownloadDelta defaultState callback = do
backtestMain :: M.Map T.Text StrategyDescriptorE -> IO ()
backtestMain descriptors = do
params <- execParser opts params <- execParser opts
let log = logWith logger (tickerList, config) <- loadStrategyConfig params
let strategyName = T.pack $ strategyBasename params
let instanceParams = StrategyInstanceParams {
strategyInstanceId = "foo",
strategyAccount = "foo",
strategyVolume = 1,
tickers = tickerList,
strategyQTISEp = Nothing }
feeds <- loadFeeds (paramsFeeds params) feeds <- loadFeeds (paramsFeeds params)
case M.lookup strategyName descriptors of bars <- makeBars (T.pack $ qtisEndpoint params) tickerList
Just (StrategyDescriptorE desc) -> flip catchAny (\e -> log Error "Backtest" $ "Exception: " <> (T.pack . show $ e)) $
runBacktestDriver desc feeds params runBacktestDriver feeds config bars
Nothing -> log Error "Backtest" $ "Can't find strategy: " <> strategyName
where where
opts = info (helper <*> paramsParser) opts = info (helper <*> paramsParser)
( fullDesc <> header "ATrade strategy backtesting framework" ) ( fullDesc <> header "ATrade strategy backtesting framework" )
makeBars :: T.Text -> [TickerConfig] -> IO (M.Map BarSeriesId BarSeries) makeBars :: T.Text -> [Ticker] -> IO (M.Map TickerId BarSeries)
makeBars qtisEp confs = makeBars qtisEp tickersList =
withContext $ \ctx -> withContext $ \ctx ->
M.fromList <$> mapM (mkBarEntry ctx qtisEp) confs M.fromList <$> mapM (mkBarEntry ctx qtisEp) tickersList
mkBarEntry ctx qtisEp conf = do mkBarEntry ctx qtisEp tickerEntry = do
info <- qtisGetTickersInfo ctx qtisEp (tickerId conf) info <- qtisGetTickersInfo ctx qtisEp (code tickerEntry)
return (BarSeriesId (tickerId conf) (timeframe conf), return (code tickerEntry, BarSeries (code tickerEntry) (Timeframe (timeframeSeconds tickerEntry)) [] (InstrumentParameters (fromInteger $ tiLotSize info) (tiTickSize info)))
BarSeries
(tickerId conf)
(timeframe conf)
[] runBacktestDriver feeds params tickerList = do
(InstrumentParameters (tickerId conf) (fromInteger $ tiLotSize info) (tiTickSize info))) let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop
let finalState = execState (unBacktestingMonad s) $ defaultBacktestState defaultState params tickerList
runBacktestDriver desc feeds params = do print $ finalState ^. cash
bigConf <- loadConfig (T.pack $ strategyConfigFile params) print $ finalState ^. tradesLog
case confTickers bigConf of forM_ (reverse $ finalState ^. logs) putStrLn
tickerList@(firstTicker:restTickers) -> do
bars <- makeBars (T.pack $ qtisEndpoint params) tickerList loadStrategyConfig :: (FromJSON c) => Params -> IO ([Ticker], c)
let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop desc loadStrategyConfig params = do
let finalState = content <- readFile (strategyConfigFile params)
execState (unBacktestingMonad s) $ defaultBacktestState def (confStrategy bigConf) desc bars (fmap toBarSeriesId (firstTicker :| restTickers)) case loadStrategyConfig' content of
print $ finalState ^. cash Just (tickersList, config) -> return (tickersList, config)
print $ finalState ^. tradesLog _ -> throw $ UnableToLoadConfig (T.pack . strategyConfigFile $ params)
forM_ (reverse $ finalState ^. logs) putStrLn
_ -> return () loadStrategyConfig' content = do
v <- decode content
toBarSeriesId conf = BarSeriesId (tickerId conf) (timeframe conf) case v of
Object o -> do
mbTickers <- "tickers" `lookup` o
mbParams <- "params" `lookup` o
tickers' <- parseMaybe parseJSON mbTickers
params <- parseMaybe parseJSON mbParams
return (tickers', params)
_ -> Nothing
barStreamFromFeeds :: (Monad m) => V.Vector (BarTimeframe, [Bar]) -> ConduitT () (BarSeriesId, Bar) m ()
barStreamFromFeeds feeds = case nextBar feeds of barStreamFromFeeds feeds = case nextBar feeds of
Just (tf, bar, feeds') -> yield (BarSeriesId (barSecurity bar) tf, bar) >> barStreamFromFeeds feeds' Just (bar, feeds') -> yield bar >> barStreamFromFeeds feeds'
_ -> return () _ -> return ()
nextBar :: V.Vector (BarTimeframe, [Bar]) -> Maybe (BarTimeframe, Bar, V.Vector (BarTimeframe, [Bar])) nextBar :: V.Vector [Bar] -> Maybe (Bar, V.Vector [Bar])
nextBar feeds = case indexOfNextFeed feeds of nextBar feeds = case indexOfNextFeed feeds of
Just ix -> do Just ix -> do
(tf, f) <- feeds !? ix f <- feeds !? ix
h <- headMay f h <- headMay f
return (tf, h, feeds // [(ix, (tf, tail f))]) return (h, feeds // [(ix, tail f)])
_ -> Nothing _ -> Nothing
indexOfNextFeed feeds = runST $ do indexOfNextFeed feeds = runST $ do
minTs <- newSTRef Nothing minTs <- newSTRef Nothing
minIx <- newSTRef Nothing minIx <- newSTRef Nothing
forM_ [0..(V.length feeds-1)] (\ix -> do forM_ [0..(V.length feeds-1)] (\ix -> do
let (_, feed) = feeds ! ix let feed = feeds ! ix
curTs <- readSTRef minTs curTs <- readSTRef minTs
case feed of case feed of
x:_ -> case curTs of x:_ -> case curTs of
@ -258,124 +182,126 @@ backtestMain descriptors = do
_ -> return ()) _ -> return ())
readSTRef minIx readSTRef minIx
backtestLoop :: StrategyDescriptor c s -> ConduitT (BarSeriesId, Bar) Void (BacktestingMonad c s) () backtestLoop = awaitForever (\bar -> do
backtestLoop desc =
awaitForever (\(bsId, bar) -> do
_curState <- use robotState _curState <- use robotState
_env <- gets _strategyEnvironment _env <- gets _strategyEnvironment
let newTimestamp = barTimestamp bar let newTimestamp = barTimestamp bar
barsMap %= updateBars bsId bar strategyEnvironment . seBars %= (flip updateBars bar)
strategyEnvironment . seLastTimestamp .= newTimestamp strategyEnvironment . seLastTimestamp .= newTimestamp
enqueueEvent (NewBar (bsIdTf bsId, bar)) enqueueEvent (NewBar bar)
lift (handleEvents desc)) lift handleEvents)
bsIdTf (BarSeriesId _ tf) = tf handleEvents = do
events <- use pendingEvents
case events of
handleEvents :: StrategyDescriptor c s -> BacktestingMonad c s () x :<| xs -> do
handleEvents desc = do pendingEvents .= xs
events <- use pendingEvents handleEvent x
case events of handleEvents
x :<| xs -> do _ -> return ()
pendingEvents .= xs
handleEvent desc x executePendingOrders bar = do
handleEvents desc executeMarketOrders bar
_ -> return () executeLimitOrders bar
executePendingOrders bar = do executeLimitOrders bar = do
executeMarketOrders bar (limitOrders, otherOrders'') <- partition
executeLimitOrders bar (\o -> case orderPrice o of
Limit _ -> True
executeLimitOrders bar = do _ -> False) <$> use pendingOrders
(limitOrders, otherOrders'') <- partition let (executableOrders, otherOrders') = partition (isExecutable bar) limitOrders
(\o -> case orderPrice o of pendingOrders .= otherOrders' ++ otherOrders''
Limit _ -> True forM_ executableOrders $ \order -> order `executeAtPrice` priceForLimitOrder order bar
_ -> False) <$> use pendingOrders
let (executableOrders, otherOrders') = partition (isExecutable bar) limitOrders isExecutable bar order = case orderPrice order of
pendingOrders .= otherOrders' ++ otherOrders'' Limit price -> if orderOperation order == Buy
forM_ executableOrders $ \order -> order `executeAtPrice` priceForLimitOrder order bar then price >= barLow bar
else price <= barHigh bar
isExecutable bar order = case orderPrice order of _ -> True
Limit price -> if orderOperation order == Buy
then price >= barLow bar priceForLimitOrder order bar = case orderPrice order of
else price <= barHigh bar Limit price -> if orderOperation order == Buy
_ -> True then if price >= barOpen bar
then barOpen bar
priceForLimitOrder order bar = case orderPrice order of else price
Limit price -> if orderOperation order == Buy else if price <= barOpen bar
then if price >= barOpen bar then barOpen bar
then barOpen bar else price
else price _ -> error "Should've been limit order"
else if price <= barOpen bar
then barOpen bar executeMarketOrders bar = do
else price (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> use pendingOrders
_ -> error "Should've been limit order" pendingOrders .= otherOrders
forM_ marketOrders $ \order ->
executeMarketOrders bar = do order `executeAtPrice` barOpen bar
(marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> use pendingOrders
pendingOrders .= otherOrders executeAtPrice order price = do
forM_ marketOrders $ \order -> ts <- use $ strategyEnvironment . seLastTimestamp
order `executeAtPrice` barOpen bar let thisTrade = mkTrade order price ts
tradesLog %= (\log' -> thisTrade : log')
executeAtPrice order price = do pendingEvents %= (\s -> (OrderUpdate (orderId order) Executed) <| s)
ts <- use $ strategyEnvironment . seLastTimestamp pendingEvents %= (\s -> (NewTrade thisTrade) <| s)
let thisTrade = mkTrade order price ts
tradesLog %= (thisTrade :) mkTrade :: Order -> Price -> UTCTime -> Trade
pendingEvents %= (\s -> OrderUpdate (orderId order) Executed <| s) mkTrade order price ts = Trade {
pendingEvents %= (\s -> NewTrade thisTrade <| s) tradeOrderId = orderId order,
tradePrice = price,
mkTrade :: Order -> Price -> UTCTime -> Trade tradeQuantity = orderQuantity order,
mkTrade order price ts = Trade { tradeVolume = (fromIntegral . orderQuantity $ order) * price,
tradeOrderId = orderId order, tradeVolumeCurrency = "pt",
tradePrice = price, tradeOperation = orderOperation order,
tradeQuantity = orderQuantity order, tradeAccount = orderAccountId order,
tradeVolume = (fromIntegral . orderQuantity $ order) * price, tradeSecurity = orderSecurity order,
tradeVolumeCurrency = "pt", tradeTimestamp = ts,
tradeOperation = orderOperation order, tradeCommission = 0,
tradeAccount = orderAccountId order, tradeSignalId = orderSignalId order
tradeSecurity = orderSecurity order, }
tradeTimestamp = ts,
tradeCommission = 0, handleEvent event@(NewBar bar) = do
tradeSignalId = orderSignalId order executePendingOrders bar
} handleEvents -- This should pass OrderUpdate events to the callback before NewBar events
firedTimers <- fireTimers (barTimestamp bar)
handleEvent :: StrategyDescriptor c s -> Event -> BacktestingMonad c s () mapM_ (\x -> enqueueEvent (TimerFired x)) firedTimers
handleEvent desc event@(NewBar (_, bar)) = do handleEvent' event
executePendingOrders bar return ()
handleEvents desc -- This should pass OrderUpdate events to the callback before NewBar events
firedTimers <- fireTimers (barTimestamp bar) handleEvent event = handleEvent' event
mapM_ (enqueueEvent . TimerFired) firedTimers
handleEvent' desc event handleEvent' event = callback event
return ()
updateBars barMap newbar = M.alter (\case
handleEvent desc event = handleEvent' desc event Nothing -> Just BarSeries { bsTickerId = barSecurity newbar,
bsTimeframe = Timeframe 60,
handleEvent' desc event = eventCallback desc event bsBars = [newbar, newbar] }
Just bs -> Just bs { bsBars = updateBarList newbar (bsBars bs) }) (barSecurity newbar) barMap
updateBars bsId newbar barMap = M.adjust (\bs -> bs { bsBars = newbar : bsBars bs }) bsId barMap
updateBarList newbar (_:bs) = newbar:newbar:bs
fireTimers ts = do updateBarList newbar _ = newbar:[newbar]
(firedTimers, otherTimers) <- partition (< ts) <$> use pendingTimers
pendingTimers .= otherTimers fireTimers ts = do
return firedTimers (firedTimers, otherTimers) <- partition (< ts) <$> use pendingTimers
pendingTimers .= otherTimers
loadFeeds :: [Feed] -> IO (V.Vector (BarTimeframe, [Bar])) return firedTimers
loadFeeds feeds = V.fromList <$> mapM loadFeed feeds
loadFeed (Feed tid path) = do loadFeeds :: [Feed] -> IO (V.Vector [Bar])
content <- readFile path loadFeeds feeds = V.fromList <$> mapM loadFeed feeds
case parseQuotes $ toStrict content of loadFeed (Feed tid path) = do
Just quotes -> case headMay quotes of content <- readFile path
Just first -> return (BarTimeframe (rowTimeframe first), fmap (rowToBar tid) quotes) case QF.parseQuotes $ toStrict content of
Nothing -> throw $ UnableToLoadFeed (T.pack path) Just quotes -> return $ fmap (rowToBar tid) quotes
_ -> throw $ UnableToLoadFeed (T.pack path) _ -> throw $ UnableToLoadFeed (T.pack path)
rowToBar tid r = Bar tid (rowTime r) (rowOpen r) (rowHigh r) (rowLow r) (rowClose r) (rowVolume r) rowToBar tid r = Bar tid (rowTime r) (rowOpen r) (rowHigh r) (rowLow r) (rowClose r) (rowVolume r)
enqueueEvent :: MonadState (BacktestState c s) m => Event -> m ()
enqueueEvent event = pendingEvents %= (|> event) enqueueEvent event = pendingEvents %= (\s -> s |> event)
defaultBacktestState :: s -> c -> StrategyDescriptor c s -> M.Map BarSeriesId BarSeries -> NonEmpty BarSeriesId -> BacktestState c s instance (Default c, Default s) => Default (BacktestState c s)
defaultBacktestState s c desc = BacktestState desc 0 s c (StrategyEnvironment "" "" 1 (UTCTime (fromGregorian 1970 1 1) 0)) [] Seq.empty [] 1 [] [] where
def = defaultBacktestState def def def
defaultBacktestState :: s -> c -> Bars -> BacktestState c s
defaultBacktestState s c bars = BacktestState 0 s c (StrategyEnvironment "" "" 1 bars (UTCTime (fromGregorian 1970 1 1) 0)) [] Seq.empty [] 1 [] []
newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a }
deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) deriving (Functor, Applicative, Monad, MonadState (BacktestState s c))
@ -389,38 +315,21 @@ instance MonadRobot (BacktestingMonad c s) c s where
submitOrder order = do submitOrder order = do
oid <- nextOrderId oid <- nextOrderId
let orderWithId = order { orderId = oid } let orderWithId = order { orderId = oid }
pendingOrders %= (orderWithId :) pendingOrders %= ((:) orderWithId)
pendingEvents %= (\s -> s |> OrderUpdate oid Submitted) pendingEvents %= (\s -> s |> (OrderSubmitted orderWithId))
return oid
cancelOrder oid = do cancelOrder oid = do
orders <- use pendingOrders orders <- use pendingOrders
let (matchingOrders, otherOrders) = partition (\o -> orderId o == oid) orders let (matchingOrders, otherOrders) = partition (\o -> orderId o == oid) orders
case matchingOrders of case matchingOrders of
[] -> return () [] -> return ()
xs -> do xs -> do
mapM_ (\o -> pendingEvents %= (\s -> s |> OrderUpdate (orderId o) Cancelled)) xs mapM_ (\o -> pendingEvents %= (\s -> s |> (OrderUpdate (orderId o) Cancelled))) xs
pendingOrders .= otherOrders pendingOrders .= otherOrders
appendToLog _ txt = logs %= ((TL.toStrict txt) :) appendToLog txt = logs %= ((:) (TL.toStrict txt))
setupTimer time = pendingTimers %= (time :) setupTimer time = pendingTimers %= ((:) time)
enqueueIOAction _actionId _action = error "Backtesting io actions is not supported" enqueueIOAction _actionId _action = error "Backtesting io actions is not supported"
getConfig = use robotParams getConfig = use robotParams
getState = use robotState getState = use robotState
setState s = robotState .= s setState s = robotState .= s
getEnvironment = use strategyEnvironment getEnvironment = use strategyEnvironment
getTicker tid tf = do
m <- gets _barsMap
return $ M.lookup (BarSeriesId tid tf) m
getTickerInfo tid = do
tickers <- getAvailableTickers
case L.find (\(BarSeriesId t _) -> t == tid) tickers of
Just (BarSeriesId t tf) -> do
ticker <- getTicker t tf
return (bsParams <$> ticker)
Nothing -> return Nothing
getAvailableTickers = use availableTickers
instance ConfigStorage IO where
loadConfig filepath = do
cfg <- B.readFile $ T.unpack filepath
input auto (decodeUtf8 cfg)

305
src/ATrade/Driver/Junction.hs

@ -1,61 +1,99 @@
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Driver.Junction module ATrade.Driver.Junction
( (
junctionMain junctionMain
) where ) where
import ATrade.Broker.Client (startBrokerClient, import ATrade.Broker.Client (BrokerClientHandle,
startBrokerClient,
stopBrokerClient) stopBrokerClient)
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification),
NotificationSqnum (unNotificationSqnum), NotificationSqnum (unNotificationSqnum),
getNotificationSqnum) getNotificationSqnum)
import ATrade.Driver.Junction.BrokerService (getNotifications, import ATrade.Driver.Junction.BrokerService (BrokerService,
getNotifications,
mkBrokerService) mkBrokerService)
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..),
JunctionM (..),
saveRobots,
startRobot)
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..),
ProgramOptions (ProgramOptions, configPath)) ProgramOptions (ProgramOptions, configPath))
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription),
SubscriptionId (SubscriptionId))
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv),
QuoteThreadHandle,
withQThread) withQThread)
import ATrade.Driver.Junction.RemoteControl (handleRemoteControl) import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent) import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv),
import ATrade.Driver.Junction.Types (StrategyDescriptorE) RobotM (..),
import ATrade.Logging (Message (..), Severity (Debug, Info, Trace, Warning), createRobotDriverThread,
onStrategyInstance,
postNotificationEvent)
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstance (strategyInstanceId),
StrategyInstanceDescriptor (..),
confStrategy,
confTickers,
strategyState,
strategyTimers,
tickerId,
timeframe)
import ATrade.Logging (Message, Severity (Debug, Error, Info, Trace, Warning),
fmtMessage, fmtMessage,
logWarning,
logWith) logWith)
import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.Types (OrderId, Trade (tradeOrderId)) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import Colog (LogAction (LogAction), import ATrade.RoboCom.Monad (StrategyEnvironment (..))
cfilter, import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
hoistLogAction, import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
logTextStderr, Bars)
(<&), (>$<)) import ATrade.Types (ClientSecurityParams (ClientSecurityParams),
OrderId,
Trade (tradeOrderId))
import Colog (HasLog (getLogAction, setLogAction),
LogAction,
logTextStdout,
(>$<))
import Colog.Actions (logTextHandle) import Colog.Actions (logTextHandle)
import Control.Concurrent.QSem (newQSem) import Control.Concurrent (threadDelay)
import Control.Exception.Safe (MonadThrow,
bracket)
import Control.Monad (forM_, forever) import Control.Monad (forM_, forever)
import Control.Monad.Extra (whenM) import Control.Monad.Extra (whenM)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader (MonadReader, ReaderT (runReaderT),
asks)
import Data.Aeson (eitherDecode,
encode)
import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def))
import Data.Foldable (traverse_)
import Data.IORef (IORef, import Data.IORef (IORef,
atomicModifyIORef', atomicModifyIORef',
newIORef, newIORef,
readIORef) readIORef)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Set (notMember) import Data.Set (notMember)
import qualified Data.Set as S import qualified Data.Set as S
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Text.IO (readFile) import Data.Text.IO (readFile)
import Database.Redis (ConnectInfo (..), PortID (UnixSocket), import Data.Time (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Database.Redis (ConnectInfo (..),
Connection,
PortID (UnixSocket),
checkedConnect, checkedConnect,
defaultConnectInfo) defaultConnectInfo,
get, mset,
runRedis)
import Dhall (auto, input) import Dhall (auto, input)
import Options.Applicative (Parser, import Options.Applicative (Parser,
execParser, execParser,
@ -71,31 +109,79 @@ import System.IO (BufferMode (LineBu
Handle, Handle,
IOMode (AppendMode), IOMode (AppendMode),
hSetBuffering, hSetBuffering,
openFile,
withFile) withFile)
import System.ZMQ4 (Router (Router), import System.ZMQ4 (withContext)
bind, withContext, import System.ZMQ4.ZAP (loadCertificateFromFile)
withSocket)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (bracket)
import UnliftIO.QSem (QSem, withQSem)
data JunctionEnv =
JunctionEnv
{
peRedisSocket :: Connection,
peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle,
peRobots :: IORef (M.Map T.Text RobotDriverHandle),
peLogAction :: LogAction JunctionM Message
}
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
locked sem action = LogAction (\m -> withQSem sem (action <& m)) deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow)
logger :: (MonadIO m) => M.Map T.Text Severity -> Handle -> LogAction m Message instance HasLog JunctionEnv Message JunctionM where
logger loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStderr <> logTextHandle h)) getLogAction = peLogAction
where setLogAction a e = e { peLogAction = a }
checkLoglevel msg =
case M.lookup (msgComponent msg) loglevels of instance ConfigStorage JunctionM where
Just level -> msgSeverity msg >= level loadConfig key = do
Nothing -> True basePath <- asks peConfigPath
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto
instance MonadPersistence JunctionM where
saveState newState key = do
conn <- asks peRedisSocket
now <- liftIO getPOSIXTime
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState),
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)]
case res of
Left _ -> logWarning "Junction " "Unable to save state"
Right _ -> return ()
loadState key = do
conn <- asks peRedisSocket
res <- liftIO $ runRedis conn $ get (encodeUtf8 key)
-- TODO: just chain eithers
case res of
Left _ -> do
logWarning "Junction" "Unable to load state"
return def
Right maybeRawState ->
case maybeRawState of
Just rawState -> case eitherDecode $ BL.fromStrict rawState of
Left _ -> do
logWarning "Junction" "Unable to decode state"
return def
Right decodedState -> return decodedState
Nothing -> do
logWarning "Junction" "Unable to decode state"
return def
instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker timeframe) chan = do
qt <- asks peQuoteThread
QT.addSubscription qt ticker timeframe chan
return (SubscriptionId 0) -- TODO subscription Ids
removeSubscription _ = undefined
logger :: (MonadIO m) => Handle -> LogAction m Message
logger h = fmtMessage >$< (logTextStdout <> logTextHandle h)
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do junctionMain descriptors = do
opts <- parseOptions opts <- parseOptions
let initialLogger = fmtMessage >$< logTextStderr let initialLogger = fmtMessage >$< logTextStdout
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts)
@ -103,57 +189,83 @@ junctionMain descriptors = do
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do
hSetBuffering h LineBuffering let log = logWith (logger h)
locksem <- newQSem 1
let globalLogger = locked locksem (logger (M.fromList $ logLevels cfg) h)
let log = logWith globalLogger
barsMap <- newIORef M.empty barsMap <- newIORef M.empty
tickerInfoMap <- newIORef M.empty
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
log Info "Junction" "redis: connected" log Info "Junction" "redis: connected"
withContext $ \ctx -> do withContext $ \ctx -> do
log Debug "Junction" "0mq context created" log Debug "Junction" "0mq context created"
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger) let downloaderLogAction = logger h
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) downloaderLogAction
robotsMap <- newIORef M.empty robotsMap <- newIORef M.empty
ordersMap <- newIORef M.empty ordersMap <- newIORef M.empty
handledNotifications <- newIORef S.empty handledNotifications <- newIORef S.empty
withBroker cfg robotsMap ordersMap handledNotifications globalLogger $ \bro -> withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro ->
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> withQThread downloaderEnv barsMap cfg ctx (logger h) $ \qt -> do
withSocket ctx Router $ \rcSocket -> do broService <- mkBrokerService bro ordersMap
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) let junctionLogAction = logger h
broService <- mkBrokerService bro ordersMap let env =
let junctionLogAction = hoistLogAction liftIO globalLogger JunctionEnv
let env = {
JunctionEnv peRedisSocket = redis,
{ peConfigPath = robotsConfigsPath cfg,
peRedisSocket = redis, peQuoteThread = qt,
peConfigPath = robotsConfigsPath cfg, peBroker = bro,
peQuoteThread = qt, peRobots = robotsMap,
peBroker = bro, peLogAction = junctionLogAction
peRobots = robotsMap, }
peRemoteControlSocket = rcSocket, withJunction env $ do
peLogAction = junctionLogAction, startRobots h cfg barsMap broService
peIoLogAction = globalLogger, forever $ do
peProgramConfiguration = cfg, notifications <- liftIO $ getNotifications broService
peBarsMap = barsMap, forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h))
peTickerInfoMap = tickerInfoMap, saveRobots
peBrokerService = broService, liftIO $ threadDelay 1000000
peDescriptors = descriptors
}
withJunction env $ do
startRobots cfg
forever $ do
notifications <- getNotifications broService
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger)
saveRobots
handleRemoteControl 1000
where where
startRobots :: ProgramConfiguration -> JunctionM () saveRobots :: JunctionM ()
startRobots cfg = forM_ (instances cfg) startRobot saveRobots = do
robotsMap <- asks peRobots >>= (liftIO . readIORef)
traverse_ saveRobotState robotsMap
saveRobotState :: RobotDriverHandle -> JunctionM ()
saveRobotState handle = onStrategyInstance handle $ \inst -> do
currentState <- liftIO $ readIORef (strategyState inst)
saveState currentState (strategyInstanceId inst)
currentTimers <- liftIO $ readIORef (strategyTimers inst)
saveState currentTimers (strategyInstanceId inst <> ":timers")
startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM ()
startRobots logHandle cfg barsMap broService = forM_ (instances cfg) $ \inst -> do
now <- liftIO getCurrentTime
case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do
bigConf <- loadConfig (configKey inst)
case confTickers bigConf of
(firstTicker:restTickers) -> do
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode
liftIO $ hSetBuffering localH LineBuffering
let robotLogAction = logger logHandle <> (fmtMessage >$< logTextHandle localH)
stratEnv <- liftIO $ newIORef StrategyEnvironment
{
_seInstanceId = strategyId inst,
_seAccount = accountId inst,
_seVolume = 1,
_seLastTimestamp = now
}
let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers))
robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers
robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ()))
_ -> logWith (logger logHandle) Error (strategyId inst) $ "No tickers configured !!!"
Nothing -> error "Unknown strategy"
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t)
withJunction :: JunctionEnv -> JunctionM () -> IO () withJunction :: JunctionEnv -> JunctionM () -> IO ()
withJunction env = (`runReaderT` env) . unJunctionM withJunction env = (`runReaderT` env) . unJunctionM
@ -164,8 +276,8 @@ junctionMain descriptors = do
LogAction IO Message -> LogAction IO Message ->
Notification -> Notification ->
IO () IO ()
handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do handleBrokerNotification robotsRef ordersMapRef handled logger notification= do
logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification logWith logger Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do
robotsMap <- readIORef robotsRef robotsMap <- readIORef robotsRef
ordersMap <- readIORef ordersMapRef ordersMap <- readIORef ordersMapRef
@ -173,8 +285,8 @@ junctionMain descriptors = do
case getNotificationTarget robotsMap ordersMap notification of case getNotificationTarget robotsMap ordersMap notification of
Just robot -> postNotificationEvent robot notification Just robot -> postNotificationEvent robot notification
Nothing -> do Nothing -> do
logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) logWith logger Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification)
logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) logWith logger Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap)
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ()))
@ -186,15 +298,30 @@ junctionMain descriptors = do
notificationOrderId (OrderNotification _ oid _) = oid notificationOrderId (OrderNotification _ oid _) = oid
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade notificationOrderId (TradeNotification _ trade) = tradeOrderId trade
withBroker cfg robotsMap ordersMap handled logger' f = do withBroker cfg ctx robotsMap ordersMap handled logger f = do
securityParameters <- loadBrokerSecurityParameters cfg
bracket bracket
(startBrokerClient (startBrokerClient
(brokerIdentity cfg) "broker"
ctx
(brokerEndpoint cfg) (brokerEndpoint cfg)
[handleBrokerNotification robotsMap ordersMap handled logger'] (brokerNotificationEndpoint cfg)
logger') [handleBrokerNotification robotsMap ordersMap handled logger]
securityParameters
logger)
stopBrokerClient f stopBrokerClient f
loadBrokerSecurityParameters cfg =
case (brokerClientCert cfg, brokerServerCert cfg) of
(Just clientCertPath, Just serverCertPath) -> do
eClientCert <- loadCertificateFromFile clientCertPath
eServerCert <- loadCertificateFromFile serverCertPath
case (eClientCert, eServerCert) of
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert)
(_, _) -> return $ ClientSecurityParams Nothing Nothing
_ -> return $ ClientSecurityParams Nothing Nothing
parseOptions = execParser options parseOptions = execParser options
options = info (optionsParser <**> helper) options = info (optionsParser <**> helper)
(fullDesc <> (fullDesc <>

22
src/ATrade/Driver/Junction/BrokerService.hs

@ -12,7 +12,7 @@ module ATrade.Driver.Junction.BrokerService
import qualified ATrade.Broker.Client as Bro import qualified ATrade.Broker.Client as Bro
import ATrade.Broker.Protocol (Notification (..)) import ATrade.Broker.Protocol (Notification (..))
import ATrade.Logging (Message, logDebug, logWarning) import ATrade.Logging (Message, logDebug)
import ATrade.Types (Order (..), OrderId) import ATrade.Types (Order (..), OrderId)
import Colog (WithLog) import Colog (WithLog)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
@ -38,27 +38,19 @@ submitOrder service identity order = do
oid <- nextOrderId service oid <- nextOrderId service
logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid
liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ()))
r <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } _ <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid }
case r of
Left err -> logWarning "BrokerService" $ "Submit order error: " <> err
_ -> return ()
return oid return oid
where where
nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s))
cancelOrder :: (MonadIO m, WithLog env Message m) => BrokerService -> OrderId -> m () cancelOrder :: BrokerService -> OrderId -> IO ()
cancelOrder service oid = do cancelOrder service oid = do
r <- liftIO $ Bro.cancelOrder (broker service) oid _ <- Bro.cancelOrder (broker service) oid
case r of
Left err -> logWarning "BrokerServer" $ "Cancel order error: " <> err
_ -> return ()
return () return ()
getNotifications :: (MonadIO m, WithLog env Message m) => BrokerService -> m [Notification] getNotifications :: BrokerService -> IO [Notification]
getNotifications service = do getNotifications service = do
v <- liftIO $ Bro.getNotifications (broker service) v <- Bro.getNotifications (broker service)
case v of case v of
Left err -> do Left _ -> return []
logWarning "BrokerServer" $ "Get notifications order error: " <> err
return []
Right n -> return n Right n -> return n

258
src/ATrade/Driver/Junction/JunctionMonad.hs

@ -1,258 +0,0 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Driver.Junction.JunctionMonad
(
JunctionEnv(..),
JunctionM(..),
startRobot,
saveRobots,
reloadConfig,
getState,
setState
) where
import ATrade.Broker.Client (BrokerClientHandle)
import ATrade.Driver.Junction.BrokerService (BrokerService)
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (logBasePath))
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription))
import ATrade.Driver.Junction.QuoteThread (QuoteThreadHandle)
import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv),
RobotM (unRobotM),
createRobotDriverThread,
getInstanceDescriptor,
onStrategyInstance,
onStrategyInstanceM)
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstanceDescriptor,
accountId,
confStrategy,
confTickers,
configKey,
stateKey,
strategyBaseName,
strategyConfig,
strategyId,
strategyInstanceId,
strategyState,
strategyTimers,
tickerId,
timeframe)
import ATrade.Logging (Message, Severity (Error, Info),
fmtMessage,
logWarning,
logWith)
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Monad (StrategyEnvironment (..))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars,
TickerInfoMap)
import Colog (HasLog (getLogAction, setLogAction),
LogAction,
hoistLogAction,
logTextHandle,
(>$<))
import Control.Exception.Safe (finally)
import Control.Monad.Reader (MonadIO (liftIO),
MonadReader,
ReaderT (runReaderT),
asks)
import Data.Aeson (decode,
eitherDecode,
encode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def))
import Data.Foldable (traverse_)
import Data.IORef (IORef,
atomicModifyIORef',
newIORef,
readIORef,
writeIORef)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Text.IO (readFile)
import Data.Time (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Database.Redis (Connection, get,
mset, runRedis)
import Dhall (auto, input)
import Prelude hiding (log,
readFile)
import System.IO (BufferMode (LineBuffering),
IOMode (AppendMode),
hClose,
hSetBuffering,
openFile)
import System.ZMQ4 (Router, Socket)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (catchAny,
onException)
data JunctionEnv =
JunctionEnv
{
peRedisSocket :: Connection,
peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle,
peRobots :: IORef (M.Map T.Text RobotDriverHandle),
peRemoteControlSocket :: Socket Router,
peLogAction :: LogAction JunctionM Message,
peIoLogAction :: LogAction IO Message,
peProgramConfiguration :: ProgramConfiguration,
peBarsMap :: IORef Bars,
peTickerInfoMap :: IORef TickerInfoMap,
peBrokerService :: BrokerService,
peDescriptors :: M.Map T.Text StrategyDescriptorE
}
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadUnliftIO)
instance HasLog JunctionEnv Message JunctionM where
getLogAction = peLogAction
setLogAction a e = e { peLogAction = a }
instance ConfigStorage JunctionM where
loadConfig key = do
basePath <- asks peConfigPath
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto
instance MonadPersistence JunctionM where
saveState newState key = do
conn <- asks peRedisSocket
now <- liftIO getPOSIXTime
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState),
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)]
case res of
Left _ -> logWarning "Junction " "Unable to save state"
Right _ -> return ()
loadState key = do
conn <- asks peRedisSocket
res <- liftIO $ runRedis conn $ get (encodeUtf8 key)
-- TODO: just chain eithers
case res of
Left _ -> do
logWarning "Junction" "Unable to load state"
return def
Right maybeRawState ->
case maybeRawState of
Just rawState -> case eitherDecode $ BL.fromStrict rawState of
Left _ -> do
logWarning "Junction" "Unable to decode state"
return def
Right decodedState -> return decodedState
Nothing -> do
logWarning "Junction" "Unable to decode state"
return def
instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker tf) chan = do
qt <- asks peQuoteThread
QT.addSubscription qt ticker tf chan
removeSubscription subId = do
qt <- asks peQuoteThread
QT.removeSubscription qt subId
startRobot :: StrategyInstanceDescriptor -> JunctionM ()
startRobot inst = do
ioLogger <- asks peIoLogAction
descriptors <- asks peDescriptors
cfg <- asks peProgramConfiguration
barsMap <- asks peBarsMap
tickerInfoMap <- asks peTickerInfoMap
broService <- asks peBrokerService
now <- liftIO getCurrentTime
let lLogger = hoistLogAction liftIO ioLogger
logWith lLogger Info "Junction" $ "Starting strategy: " <> strategyBaseName inst
case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> flip catchAny (\e -> logWith lLogger Error "Junction" $ "Exception: " <> (T.pack . show $ e)) $ do
bigConf <- loadConfig (configKey inst)
case confTickers bigConf of
(firstTicker:restTickers) -> do
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode
liftIO $ hSetBuffering localH LineBuffering
let robotLogAction = hoistLogAction liftIO ioLogger <> (fmtMessage >$< logTextHandle localH)
stratEnv <- liftIO $ newIORef StrategyEnvironment
{
_seInstanceId = strategyId inst,
_seAccount = accountId inst,
_seVolume = 1,
_seLastTimestamp = now
}
let robotEnv =
RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers))
robot <- createRobotDriverThread inst desc (\a -> (flip runReaderT robotEnv . unRobotM) a `finally` hClose localH) bigConf rConf rState rTimers
robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ()))
_ -> logWith lLogger Error (strategyId inst) "No tickers configured !!!"
Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst
where
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t)
saveRobots :: JunctionM ()
saveRobots = do
robotsMap <- asks peRobots >>= (liftIO . readIORef)
traverse_ saveRobotState robotsMap
saveRobotState :: RobotDriverHandle -> JunctionM ()
saveRobotState handle = onStrategyInstance handle $ \inst -> do
currentState <- liftIO $ readIORef (strategyState inst)
saveState currentState (strategyInstanceId inst)
currentTimers <- liftIO $ readIORef (strategyTimers inst)
saveState currentTimers (strategyInstanceId inst <> ":timers")
reloadConfig :: T.Text -> JunctionM (Either T.Text ())
reloadConfig instId = flip catchAny (\_ -> return $ Left "Exception") $ do
robotsMap' <- asks peRobots
robots <- liftIO $ readIORef robotsMap'
case M.lookup instId robots of
Just robot -> do
onStrategyInstanceM robot
(\inst -> do
let instDesc = getInstanceDescriptor robot
bigConf <- loadConfig (configKey instDesc)
liftIO $ writeIORef (strategyConfig inst) (confStrategy bigConf))
return $ Right ()
Nothing -> return $ Left "Unable to load config"
getState :: T.Text -> JunctionM (Either T.Text B.ByteString)
getState instId = do
robotsMap' <- asks peRobots
robots <- liftIO $ readIORef robotsMap'
case M.lookup instId robots of
Just robot -> do
Right <$> onStrategyInstanceM robot
(\inst -> do
v <- liftIO $ readIORef (strategyState inst)
return $ BL.toStrict $ encode v)
Nothing -> return $ Left $ "Unknown robot: " <> instId
setState :: T.Text -> B.ByteString -> JunctionM (Either T.Text ())
setState instId newState = do
robotsMap' <- asks peRobots
robots <- liftIO $ readIORef robotsMap'
case M.lookup instId robots of
Just robot -> do
onStrategyInstanceM robot
(\inst -> do
case decode . BL.fromStrict $ newState of
Just newS -> do
liftIO $ writeIORef (strategyState inst) newS
return $ Right ()
Nothing -> return $ Left $ "Unable to decode state for " <> instId)
Nothing -> return $ Left $ "Unknown robot: " <> instId

39
src/ATrade/Driver/Junction/ProgramConfiguration.hs

@ -1,7 +1,4 @@
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module ATrade.Driver.Junction.ProgramConfiguration module ATrade.Driver.Junction.ProgramConfiguration
( (
@ -9,12 +6,8 @@ module ATrade.Driver.Junction.ProgramConfiguration
ProgramConfiguration(..) ProgramConfiguration(..)
) where ) where
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor)
import ATrade.Logging (Severity (..))
import qualified Data.Text as T import qualified Data.Text as T
import Dhall (FromDhall, autoWith) import Dhall (FromDhall)
import Dhall.Core (Expr (..), FieldSelection (..))
import qualified Dhall.Map
import Dhall.Marshal.Decode (Decoder (..), typeError)
import GHC.Generics (Generic) import GHC.Generics (Generic)
newtype ProgramOptions = newtype ProgramOptions =
@ -30,43 +23,15 @@ data ProgramConfiguration =
brokerNotificationEndpoint :: T.Text, brokerNotificationEndpoint :: T.Text,
brokerServerCert :: Maybe FilePath, brokerServerCert :: Maybe FilePath,
brokerClientCert :: Maybe FilePath, brokerClientCert :: Maybe FilePath,
brokerIdentity :: T.Text,
quotesourceEndpoint :: T.Text, quotesourceEndpoint :: T.Text,
quotesourceServerCert :: Maybe FilePath, quotesourceServerCert :: Maybe FilePath,
quotesourceClientCert :: Maybe FilePath, quotesourceClientCert :: Maybe FilePath,
qhpEndpoint :: T.Text, qhpEndpoint :: T.Text,
qtisEndpoint :: T.Text, qtisEndpoint :: T.Text,
remoteControlEndpoint :: T.Text,
redisSocket :: T.Text, redisSocket :: T.Text,
robotsConfigsPath :: FilePath, robotsConfigsPath :: FilePath,
logBasePath :: FilePath, logBasePath :: FilePath,
logLevels :: [(T.Text, Severity)],
instances :: [StrategyInstanceDescriptor] instances :: [StrategyInstanceDescriptor]
} deriving (Generic, Show) } deriving (Generic, Show)
instance FromDhall Severity where
autoWith _ = Decoder {..}
where
extract expr@(Field _ FieldSelection{ fieldSelectionLabel }) =
case fieldSelectionLabel of
"Trace" -> pure Trace
"Debug" -> pure Debug
"Info" -> pure Info
"Warning" -> pure Warning
"Error" -> pure Error
_ -> typeError expected expr
extract expr = typeError expected expr
expected = pure
(Union
(Dhall.Map.fromList
[ ("Trace", Nothing)
, ("Debug", Nothing)
, ("Info", Nothing)
, ("Warning", Nothing)
, ("Error", Nothing)
]
)
)
instance FromDhall ProgramConfiguration instance FromDhall ProgramConfiguration

3
src/ATrade/Driver/Junction/QuoteStream.hs

@ -21,9 +21,6 @@ instance Hashable BarTimeframe
instance Hashable QuoteSubscription instance Hashable QuoteSubscription
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int }
deriving (Show, Eq, Generic)
instance Hashable SubscriptionId
class (Monad m) => QuoteStream m where class (Monad m) => QuoteStream m where
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId

166
src/ATrade/Driver/Junction/QuoteThread.hs

@ -4,7 +4,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-} {-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeSynonymInstances #-} {-# LANGUAGE TypeSynonymInstances #-}
@ -14,20 +13,15 @@ module ATrade.Driver.Junction.QuoteThread
startQuoteThread, startQuoteThread,
stopQuoteThread, stopQuoteThread,
addSubscription, addSubscription,
removeSubscription,
DownloaderM, DownloaderM,
DownloaderEnv(..), DownloaderEnv(..),
runDownloaderM, runDownloaderM,
withQThread withQThread
) where ) where
import qualified ATrade.BarAggregator as BA
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..))
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..))
SubscriptionId (SubscriptionId)) import ATrade.Logging (Message)
import ATrade.Logging (Message, logDebug,
logInfo,
logWarning)
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.HistoryProvider (HistoryProvider (..))
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP)
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker),
@ -42,8 +36,7 @@ import ATrade.RoboCom.Types (Bar (barSecurity),
BarSeries (..), BarSeries (..),
BarSeriesId (BarSeriesId), BarSeriesId (BarSeriesId),
Bars, Bars,
InstrumentParameters (InstrumentParameters), InstrumentParameters (InstrumentParameters))
TickerInfoMap)
import ATrade.Types (BarTimeframe (BarTimeframe), import ATrade.Types (BarTimeframe (BarTimeframe),
ClientSecurityParams (ClientSecurityParams), ClientSecurityParams (ClientSecurityParams),
Tick (security), Tick (security),
@ -56,13 +49,11 @@ import Control.Concurrent (ThreadId, forkIO,
import Control.Concurrent.BoundedChan (BoundedChan, import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, newBoundedChan,
readChan, readChan,
tryWriteChan,
writeChan) writeChan)
import Control.Exception.Safe (MonadMask, import Control.Exception.Safe (MonadMask,
MonadThrow, MonadThrow,
bracket) bracket)
import Control.Monad (forM, forM_, import Control.Monad (forM, forever)
forever)
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT),
lift) lift)
import Control.Monad.Reader.Class (MonadReader, asks) import Control.Monad.Reader.Class (MonadReader, asks)
@ -85,84 +76,54 @@ data QuoteThreadEnv =
QuoteThreadEnv QuoteThreadEnv
{ {
bars :: IORef Bars, bars :: IORef Bars,
endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]), endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]),
qsclient :: QuoteSourceClientHandle, qsclient :: QuoteSourceClientHandle,
paramsCache :: IORef TickerInfoMap, paramsCache :: IORef (M.Map TickerId InstrumentParameters),
downloaderChan :: BoundedChan QuoteSubscription, downloaderChan :: BoundedChan QuoteSubscription
subscriptionIdCounter :: IORef Int,
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription),
aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator)
} }
startQuoteThread :: (MonadIO m, startQuoteThread :: (MonadIO m,
MonadIO m1, MonadIO m1,
WithLog env Message m1, WithLog DownloaderEnv Message m1,
HistoryProvider m1, HistoryProvider m1,
TickerInfoProvider m1) => TickerInfoProvider m1) =>
IORef Bars -> IORef Bars ->
IORef TickerInfoMap ->
Context -> Context ->
T.Text -> T.Text ->
ClientSecurityParams -> ClientSecurityParams ->
(m1 () -> IO ()) -> (m1 () -> IO ()) ->
LogAction IO Message -> LogAction IO Message ->
m QuoteThreadHandle m QuoteThreadHandle
startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do
chan <- liftIO $ newBoundedChan 2000 chan <- liftIO $ newBoundedChan 2000
dChan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan
tid <- liftIO . forkIO $ quoteThread env chan tid <- liftIO . forkIO $ quoteThread env chan
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan)
return $ QuoteThreadHandle tid downloaderTid env return $ QuoteThreadHandle tid downloaderTid env
where where
downloaderThread env chan = do downloaderThread env chan = forever $ do
logInfo "QuoteThread" "Started" QuoteSubscription tickerid tf <- liftIO $ readChan chan
forever $ do paramsMap <- liftIO $ readIORef $ paramsCache env
QuoteSubscription tickerid tf <- liftIO $ readChan chan mbParams <- case M.lookup tickerid paramsMap of
logInfo "QuoteThread" $ "Subscription: " <> tickerid Nothing -> do
paramsMap <- liftIO $ readIORef $ paramsCache env paramsList <- getInstrumentParameters [tickerid]
mbParams <- case M.lookup tickerid paramsMap of case paramsList of
Nothing -> do (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params))
paramsList <- getInstrumentParameters [tickerid] _ -> return Nothing
case paramsList of Just params -> return $ Just params
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) barsMap <- liftIO $ readIORef (bars env)
_ -> return Nothing case M.lookup (BarSeriesId tickerid tf) barsMap of
Just params -> return $ Just params Just _ -> return () -- already downloaded
logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) Nothing -> case mbParams of
barsMap <- liftIO $ readIORef (bars env) Just params -> do
case M.lookup (BarSeriesId tickerid tf) barsMap of now <- liftIO getCurrentTime
Just _ -> return () -- already downloaded barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now
Nothing -> case mbParams of let barSeries = BarSeries tickerid tf barsData params
Just params -> do liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ()))
now <- liftIO getCurrentTime _ -> return () -- TODO log
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time.
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day.
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now)
let barSeries = BarSeries tickerid tf barsData params
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ()))
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf)
pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick)
pushTickToAggregator tick tf = do
aggsRef <- asks aggregators
aggs <- liftIO . readIORef $ aggsRef
let key = (security tick, tf)
case HM.lookup key aggs of
Just agg -> do
let (mbar, agg') = BA.handleTick tick agg
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ()))
barsRef' <- asks bars
case mbar of
Just bar -> do
liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ()))
writeBarData bar tf (QDBar (tf, bar))
_ -> do
pure ()
_ -> do
let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)]
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ()))
quoteThread env chan = flip runReaderT env $ forever $ do quoteThread env chan = flip runReaderT env $ forever $ do
qssData <- lift $ readChan chan qssData <- lift $ readChan chan
@ -170,25 +131,14 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
QDBar (tf, bar) -> do QDBar (tf, bar) -> do
barsRef' <- asks bars barsRef' <- asks bars
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ()))
writeBarData bar tf qssData _ -> return () -- TODO pass to bar aggregator
QDTick tick -> do let key = case qssData of
pushToBarAggregators tick QDTick tick -> QuoteSubscription (security tick) (BarTimeframe 0)
writeTickData tick qssData QDBar (tf, bar) -> QuoteSubscription (barSecurity bar) tf
writeTickData tick qssData = do
let key = QuoteSubscription (security tick) (BarTimeframe 0)
subs <- asks endpoints >>= (lift . readIORef)
case HM.lookup key subs of
Just clientChannels -> do
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels
Nothing -> return ()
writeBarData bar tf qssData = do
let key = QuoteSubscription (barSecurity bar) tf
subs <- asks endpoints >>= (lift . readIORef) subs <- asks endpoints >>= (lift . readIORef)
case HM.lookup key subs of case HM.lookup key subs of
Just clientChannels -> do Just clientChannels -> do
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels lift $ mapM_ (`writeChan` qssData) clientChannels
Nothing -> return () Nothing -> return ()
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m ()
@ -197,36 +147,19 @@ stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do
killThread dtid killThread dtid
stopQuoteSourceClient (qsclient env) stopQuoteSourceClient (qsclient env)
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m ()
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do
cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c)) writeChan (downloaderChan env) (QuoteSubscription tid tf)
let subscription = QuoteSubscription tid tf atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m tid, ()))
let subid = SubscriptionId cnt
writeChan (downloaderChan env) subscription
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ()))
atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ()))
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)]
return subid
where where
doAddSubscription m subid tickerid = doAddSubscription m tickerid =
let m1 = HM.alter (\case let m1 = HM.alter (\case
Just chans -> Just ((subid, chan) : chans) Just chans -> Just (chan : chans)
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in _ -> Just [chan]) (QuoteSubscription tickerid tf) m in
HM.alter (\case HM.alter (\case
Just chans -> Just ((subid, chan) : chans) Just chans -> Just (chan : chans)
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 _ -> Just [chan]) (QuoteSubscription tickerid (BarTimeframe 0)) m1
removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m ()
removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do
subs <- readIORef (subscriptions env)
case HM.lookup subId subs of
Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ()))
Nothing -> return ()
where
doRemoveSubscription m sub =
let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in
HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1
sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0)
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap
@ -267,21 +200,12 @@ instance TickerInfoProvider DownloaderM where
(fromInteger $ tiLotSize ti) (fromInteger $ tiLotSize ti)
(tiTickSize ti) (tiTickSize ti)
withQThread :: withQThread :: DownloaderEnv -> IORef Bars -> ProgramConfiguration -> Context -> LogAction IO Message -> (QuoteThreadHandle -> IO ()) -> IO ()
DownloaderEnv withQThread env barsMap cfg ctx logger f = do
-> IORef Bars
-> IORef TickerInfoMap
-> ProgramConfiguration
-> Context
-> LogAction IO Message
-> (QuoteThreadHandle -> IO ())
-> IO ()
withQThread env barsMap tiMap cfg ctx logger f = do
securityParameters <- loadSecurityParameters securityParameters <- loadSecurityParameters
bracket bracket
(startQuoteThread (startQuoteThread
barsMap barsMap
tiMap
ctx ctx
(quotesourceEndpoint cfg) (quotesourceEndpoint cfg)
securityParameters securityParameters

151
src/ATrade/Driver/Junction/RemoteControl.hs

@ -1,151 +0,0 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Driver.Junction.RemoteControl
(
handleRemoteControl
) where
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots),
JunctionM, getState,
reloadConfig,
setState, startRobot)
import ATrade.Driver.Junction.RobotDriverThread (stopRobot)
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor)
import ATrade.Logging (Severity (Info),
logErrorWith,
logWith)
import Control.Monad (unless)
import Control.Monad.Reader (asks)
import Data.Aeson (decode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8',
encodeUtf8)
import System.ZMQ4 (Event (In),
Poll (Sock), poll,
receiveMulti,
sendMulti)
import UnliftIO (MonadIO (liftIO),
atomicModifyIORef',
readIORef)
data RemoteControlResponse =
ResponseOk
| ResponseError T.Text
| ResponseData B.ByteString
deriving (Show, Eq)
data RemoteControlRequest =
StartRobot StrategyInstanceDescriptor
| StopRobot T.Text
| ReloadConfig T.Text
| GetState T.Text
| SetState T.Text B.ByteString
| Ping
deriving (Show)
data ParseError =
UnknownCmd
| UtfDecodeError
| JsonDecodeError
deriving (Show, Eq)
parseRemoteControlRequest :: B.ByteString -> Either ParseError RemoteControlRequest
parseRemoteControlRequest bs =
if
| cmd == "START" -> parseStart
| cmd == "STOP" -> parseStop
| cmd == "RELOAD_CONFIG" -> parseReloadConfig
| cmd == "GET_STATE" -> parseGetState
| cmd == "SET_STATE" -> parseSetState
| cmd == "PING" -> Right Ping
| otherwise -> Left UnknownCmd
where
cmd = B.takeWhile (/= 0x20) bs
rest = B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ bs
parseStart = case decode . BL.fromStrict $ rest of
Just inst -> Right (StartRobot inst)
Nothing -> Left JsonDecodeError
parseStop = case decodeUtf8' rest of
Left _ -> Left UtfDecodeError
Right r -> Right (StopRobot (T.strip r))
parseReloadConfig = case decodeUtf8' rest of
Left _ -> Left UtfDecodeError
Right r -> Right (ReloadConfig (T.strip r))
parseGetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of
Left _ -> Left UtfDecodeError
Right r -> Right (GetState r)
parseSetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of
Left _ -> Left UtfDecodeError
Right r -> Right (SetState r (B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ rest))
makeRemoteControlResponse :: RemoteControlResponse -> B.ByteString
makeRemoteControlResponse ResponseOk = "OK"
makeRemoteControlResponse (ResponseError msg) = "ERROR " <> encodeUtf8 msg
makeRemoteControlResponse (ResponseData d) = "DATA\n" <> d
handleRemoteControl :: Int -> JunctionM ()
handleRemoteControl timeout = do
sock <- asks peRemoteControlSocket
logger <- asks peLogAction
evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing]
case evs of
(x:_) -> unless (null x) $ do
frames <- liftIO $ receiveMulti sock
case frames of
[peerId, _, rawRequest] -> do
case parseRemoteControlRequest rawRequest of
Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err)
Right request -> do
response <- handleRequest request
liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response]
_ -> logErrorWith logger "RemoteControl" "Invalid incoming request"
_ -> return ()
where
handleRequest (StartRobot inst) = do
startRobot inst
return ResponseOk
handleRequest (StopRobot instId) = do
robotsRef <- asks peRobots
robots <- readIORef robotsRef
case M.lookup instId robots of
Just robot -> do
logger <- asks peLogAction
logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId
stopRobot robot
liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ()))
return ResponseOk
Nothing -> return $ ResponseError $ "Not started: " <> instId
handleRequest (ReloadConfig instId) = do
res <- reloadConfig instId
case res of
Left errmsg -> return $ ResponseError errmsg
Right () -> return ResponseOk
handleRequest (GetState instId) = do
res <- getState instId
case res of
Left errmsg -> return $ ResponseError errmsg
Right d -> return $ ResponseData d
handleRequest (SetState instId rawState) = do
res <- setState instId rawState
case res of
Left errmsg -> return $ ResponseError errmsg
Right () -> return ResponseOk
handleRequest Ping = return ResponseOk

64
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -13,17 +13,12 @@ module ATrade.Driver.Junction.RobotDriverThread
RobotM(..), RobotM(..),
RobotDriverHandle, RobotDriverHandle,
onStrategyInstance, onStrategyInstance,
onStrategyInstanceM, postNotificationEvent) where
postNotificationEvent,
stopRobot,
getInstanceDescriptor
) where
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification))
import qualified ATrade.Driver.Junction.BrokerService as Bro import qualified ATrade.Driver.Junction.BrokerService as Bro
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription),
QuoteSubscription (QuoteSubscription), QuoteSubscription (QuoteSubscription))
SubscriptionId)
import ATrade.Driver.Junction.Types (BigConfig, import ATrade.Driver.Junction.Types (BigConfig,
StrategyDescriptor, StrategyDescriptor,
StrategyInstance (StrategyInstance, strategyEventCallback), StrategyInstance (StrategyInstance, strategyEventCallback),
@ -33,32 +28,31 @@ import ATrade.Driver.Junction.Types (BigConfig,
eventCallback, stateKey, eventCallback, stateKey,
strategyId, tickerId, strategyId, tickerId,
timeframe) timeframe)
import ATrade.Logging (Message, log) import ATrade.Logging (Message, log, logDebug,
logInfo, logWarning)
import ATrade.QuoteSource.Client (QuoteData (..)) import ATrade.QuoteSource.Client (QuoteData (..))
import ATrade.RoboCom.ConfigStorage (ConfigStorage) import ATrade.RoboCom.ConfigStorage (ConfigStorage)
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderSubmitted, OrderUpdate),
MonadRobot (..), MonadRobot (..),
StrategyEnvironment (..)) StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp))
import ATrade.RoboCom.Persistence (MonadPersistence) import ATrade.RoboCom.Persistence (MonadPersistence)
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars, TickerInfoMap) Bars)
import ATrade.Types (OrderId, OrderState, import ATrade.Types (Order (orderId), OrderId,
Tick (value), Trade) OrderState, Trade)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction) LogAction)
import Control.Concurrent (ThreadId, forkIO, import Control.Concurrent (ThreadId, forkIO)
killThread)
import Control.Concurrent.BoundedChan (BoundedChan, import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, readChan, newBoundedChan, readChan,
writeChan) writeChan)
import Control.Exception.Safe (MonadThrow) import Control.Exception.Safe (MonadThrow)
import Control.Monad (forM, forM_, forever, import Control.Monad (forM_, forever, void)
void, when)
import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (MonadReader (local), import Control.Monad.Reader (MonadReader (local),
ReaderT, asks) ReaderT, asks)
import Data.Aeson (FromJSON, ToJSON) import Data.Aeson (FromJSON, ToJSON)
import Data.Default (Default) import Data.Default
import Data.IORef (IORef, import Data.IORef (IORef,
atomicModifyIORef', atomicModifyIORef',
readIORef, writeIORef) readIORef, writeIORef)
@ -70,7 +64,7 @@ import Dhall (FromDhall)
import Prelude hiding (log) import Prelude hiding (log)
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) =>
RobotDriverHandle StrategyInstanceDescriptor (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent)
data RobotDriverRequest data RobotDriverRequest
@ -93,7 +87,7 @@ robotDriverThread inst eventQueue =
handleEvent (EventRequest _) = return () handleEvent (EventRequest _) = return ()
handleEvent (QuoteEvent d) = handleEvent (QuoteEvent d) =
case d of case d of
QDTick tick -> when (value tick /= 0) $ strategyEventCallback inst (NewTick tick) QDTick tick -> strategyEventCallback inst (NewTick tick)
QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar))
handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade)
handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState)
@ -124,29 +118,19 @@ createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = d
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers
quoteQueue <- liftIO $ newBoundedChan 2000 quoteQueue <- liftIO $ newBoundedChan 2000
subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) forM_ (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue)
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue
return $ RobotDriverHandle instDesc inst driver qthread eventQueue subIds return $ RobotDriverHandle inst driver qthread eventQueue
where where
passQuoteEvents eventQueue quoteQueue = do passQuoteEvents eventQueue quoteQueue = do
v <- readChan quoteQueue v <- readChan quoteQueue
writeChan eventQueue (QuoteEvent v) writeChan eventQueue (QuoteEvent v)
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m ()
stopRobot (RobotDriverHandle _ _ driver qthread _ subIds) = do
forM_ subIds removeSubscription
liftIO $ killThread driver
liftIO $ killThread qthread
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r
onStrategyInstance (RobotDriverHandle _ inst _ _ _ _) f = f inst onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst
onStrategyInstanceM :: (MonadIO m) => RobotDriverHandle ->
(forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> m r) -> m r
onStrategyInstanceM (RobotDriverHandle _ inst _ _ _ _) f = f inst
data RobotEnv c s = data RobotEnv c s =
RobotEnv RobotEnv
@ -155,7 +139,6 @@ data RobotEnv c s =
configRef :: IORef c, configRef :: IORef c,
timersRef :: IORef [UTCTime], timersRef :: IORef [UTCTime],
bars :: IORef Bars, bars :: IORef Bars,
tickerInfoMap :: IORef TickerInfoMap,
env :: IORef StrategyEnvironment, env :: IORef StrategyEnvironment,
logAction :: LogAction (RobotM c s) Message, logAction :: LogAction (RobotM c s) Message,
brokerService :: Bro.BrokerService, brokerService :: Bro.BrokerService,
@ -177,7 +160,7 @@ instance MonadRobot (RobotM c s) c s where
cancelOrder oid = do cancelOrder oid = do
bro <- asks brokerService bro <- asks brokerService
Bro.cancelOrder bro oid liftIO . void $ Bro.cancelOrder bro oid
appendToLog s t = do appendToLog s t = do
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef)
@ -200,17 +183,12 @@ instance MonadRobot (RobotM c s) c s where
b <- asks bars >>= liftIO . readIORef b <- asks bars >>= liftIO . readIORef
return $ M.lookup (BarSeriesId tid tf) b return $ M.lookup (BarSeriesId tid tf) b
getTickerInfo tid = do
b <- asks tickerInfoMap >>= liftIO . readIORef
return $ M.lookup tid b
getAvailableTickers = asks tickers getAvailableTickers = asks tickers
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m ()
postNotificationEvent (RobotDriverHandle _ _ _ _ eventQueue _) notification = liftIO $ postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $
case notification of case notification of
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state)
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade)
getInstanceDescriptor :: RobotDriverHandle -> StrategyInstanceDescriptor
getInstanceDescriptor (RobotDriverHandle instDesc _ _ _ _ _) = instDesc

15
src/ATrade/Driver/Junction/Types.hs

@ -1,7 +1,6 @@
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Junction.Types module ATrade.Driver.Junction.Types
@ -17,8 +16,7 @@ module ATrade.Driver.Junction.Types
import ATrade.RoboCom.Monad (EventCallback) import ATrade.RoboCom.Monad (EventCallback)
import ATrade.Types (BarTimeframe (..), TickerId) import ATrade.Types (BarTimeframe (..), TickerId)
import Data.Aeson (FromJSON (..), ToJSON (..), withObject, import Data.Aeson (FromJSON (..), ToJSON (..))
(.:))
import Data.Default (Default) import Data.Default (Default)
import Data.IORef (IORef) import Data.IORef (IORef)
import qualified Data.Text as T import qualified Data.Text as T
@ -68,17 +66,6 @@ data StrategyInstanceDescriptor =
instance FromDhall StrategyInstanceDescriptor instance FromDhall StrategyInstanceDescriptor
instance FromJSON StrategyInstanceDescriptor where
parseJSON = withObject "StrategyInstanceDescriptor" $ \obj ->
StrategyInstanceDescriptor <$>
obj .: "account_id" <*>
obj .: "strategy_id" <*>
obj .: "strategy_base_name" <*>
obj .: "config_key" <*>
obj .: "state_key" <*>
obj .: "log_path"
data StrategyInstance c s = data StrategyInstance c s =
StrategyInstance StrategyInstance
{ {

114
src/ATrade/Driver/Real.hs

@ -1,8 +1,11 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE CPP #-}
{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Real ( module ATrade.Driver.Real (
StrategyInstanceParams(..), StrategyInstanceParams(..),
@ -11,45 +14,56 @@ module ATrade.Driver.Real (
barStrategyDriver barStrategyDriver
) where ) where
import Options.Applicative import ATrade.BarAggregator
import System.IO import ATrade.Driver.Real.BrokerClientThread
import System.Signal import ATrade.Driver.Real.QuoteSourceThread
import System.Exit import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..))
import System.Random import ATrade.Exceptions
import System.Log.Logger import ATrade.Quotes (MonadHistory (..), MonadInstrumentParametersSource (..))
import System.Log.Handler.Simple import ATrade.Quotes.QHP as QQ
import System.Log.Handler (setFormatter) import ATrade.Quotes.QTIS (TickerInfo (..),
import System.Log.Formatter qtisGetTickersInfo)
import Control.Monad import ATrade.RoboCom.Monad (Event (..),
import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) EventCallback,
import Control.Concurrent.BoundedChan as BC MonadRobot (..),
import Control.Exception StrategyEnvironment (..),
import qualified Data.ByteString as BS seBars, seLastTimestamp)
import qualified Data.ByteString.Lazy as BL import ATrade.RoboCom.Types (BarSeries (..), InstrumentParameters (..),
import qualified Data.List as L Ticker (..),
import qualified Data.Map as M Timeframe (..))
import qualified Data.Text as T import ATrade.RoboCom.Utils (fromHMS)
import Data.Text.Encoding import ATrade.Types
import Data.Aeson import Control.Concurrent hiding (readChan,
import Data.IORef writeChan,
import Data.Time.Calendar writeList2Chan, yield)
import Data.Time.Clock import Control.Concurrent.BoundedChan as BC
import Data.Time.Clock.POSIX import Control.Exception.Safe
import Data.Maybe import Control.Lens hiding (Context, (.=))
import Data.Monoid import Control.Monad
import Database.Redis hiding (info, decode) import Control.Monad.Reader
import ATrade.Types import Data.Aeson
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) import qualified Data.ByteString as BS
import ATrade.BarAggregator import qualified Data.ByteString.Lazy as BL
import ATrade.Driver.Real.BrokerClientThread import Data.IORef
import ATrade.Driver.Real.QuoteSourceThread import qualified Data.Map as M
import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) import Data.Maybe
import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) import qualified Data.Text as T
import ATrade.Exceptions import Data.Text.Encoding
import ATrade.Quotes.Finam as QF import qualified Data.Text.Lazy as TL
import ATrade.Quotes.QHP as QQ import Data.Time.Calendar
import ATrade.Quotes.HAP as QH import Data.Time.Clock
import System.ZMQ4 hiding (Event(..)) import Data.Time.Clock.POSIX
import Database.Redis hiding (decode, info)
import GHC.Generics
import Options.Applicative
import System.Exit
import System.IO
import System.Log.Formatter
import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple
import System.Log.Logger
import System.Signal
import System.ZMQ4 hiding (Event (..))
data Params = Params { data Params = Params {
instanceId :: String, instanceId :: String,
@ -408,18 +422,6 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do
nowRef <- asks envLastTimestamp nowRef <- asks envLastTimestamp
lift $ writeIORef nowRef newTimestamp lift $ writeIORef nowRef newTimestamp
newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp))
atomicWriteIORef timersRef newTimers
let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp }
let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event
writeIORef stateRef newState
writeIORef timersRef newTimers
newTimers' <- catMaybes <$> mapM handleTimerActions actions
mapM_ (handleActions ordersChan) actions
readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv
else debugM "Strategy" "Shutdown requested"
timersRef <- asks envTimers timersRef <- asks envTimers
oldTimers <- lift $ readIORef timersRef oldTimers <- lift $ readIORef timersRef
newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers

94
src/ATrade/Driver/Real/BrokerClientThread.hs

@ -1,5 +1,4 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module ATrade.Driver.Real.BrokerClientThread ( module ATrade.Driver.Real.BrokerClientThread (
startBrokerClientThread, startBrokerClientThread,
@ -28,58 +27,47 @@ import Data.Time.Clock
import System.Log.Logger import System.Log.Logger
import System.ZMQ4 hiding (Event) import System.ZMQ4 hiding (Event)
data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications | BrokerHandleNotification Notification data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications
startBrokerClientThread :: T.Text -> Context -> T.Text -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId
startBrokerClientThread instId ctx brEp notifEp ordersChan eventChan shutdownVar = do
let callback = writeChan ordersChan . BrokerHandleNotification
forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $
bracket (startBrokerClient (encodeUtf8 instId) ctx brEp notifEp [callback] defaultClientSecurityParams)
(\bro -> do
stopBrokerClient bro
debugM "Strategy" "Broker client: stop")
(\bs -> handle (\e -> do
warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException)
throwIO e) $ do
now <- getCurrentTime
lastNotificationTime <- newIORef now
lastKnownSqnum <- newIORef 0
whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do
brokerCommand <- readChan ordersChan
case brokerCommand of
BrokerSubmitOrder order -> do
debugM "Strategy" $ "Submitting order: " ++ show order
result <- submitOrder bs order
debugM "Strategy" "Order submitted"
case result of
Right _ -> debugM "Strategy" $ "Order submitted: " ++ show (orderId order)
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
BrokerCancelOrder oid -> do
debugM "Strategy" $ "Cancelling order: " ++ show oid
_ <- cancelOrder bs oid
debugM "Strategy" "Order cancelled"
BrokerRequestNotifications -> do
t <- getCurrentTime
nt <- readIORef lastNotificationTime
when (t `diffUTCTime` nt > 1) $ do
maybeNs <- getNotifications bs
case maybeNs of
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
Right ns -> do
mapM_ (\n -> do
prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s))
when (prevSqnum + 1 < getNotificationSqnum n) $
warningM "Strategy" $ "Sqnum jump: " ++ show prevSqnum ++ "->" ++ show (getNotificationSqnum n)
sendNotification eventChan n) ns
getCurrentTime >>= writeIORef lastNotificationTime
BrokerHandleNotification notification -> do
sendNotification eventChan n
prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s))
undefined startBrokerClientThread :: T.Text -> Context -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId
nTimeout <- notTimeout lastNotificationTime startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar = forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $
shouldShutdown <- isNothing <$> tryReadMVar shutdownVar bracket (startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams)
debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) (\bro -> do
stopBrokerClient bro
debugM "Strategy" "Broker client: stop")
(\bs -> handle (\e -> do
warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException)
throwIO e) $ do
now <- getCurrentTime
lastNotificationTime <- newIORef now
whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do
brokerCommand <- readChan ordersChan
case brokerCommand of
BrokerSubmitOrder order -> do
debugM "Strategy" $ "Submitting order: " ++ show order
maybeOid <- submitOrder bs order
debugM "Strategy" "Order submitted"
case maybeOid of
Right oid -> writeChan eventChan (OrderSubmitted order { orderId = oid })
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
BrokerCancelOrder oid -> do
debugM "Strategy" $ "Cancelling order: " ++ show oid
_ <- cancelOrder bs oid
debugM "Strategy" "Order cancelled"
BrokerRequestNotifications -> do
t <- getCurrentTime
nt <- readIORef lastNotificationTime
when (t `diffUTCTime` nt > 1) $ do
maybeNs <- getNotifications bs
case maybeNs of
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
Right ns -> do
mapM_ (sendNotification eventChan) ns
getCurrentTime >>= (writeIORef lastNotificationTime)
nTimeout <- notTimeout lastNotificationTime
shouldShutdown <- isNothing <$> tryReadMVar shutdownVar
debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown)
notTimeout :: IORef UTCTime -> IO Bool notTimeout :: IORef UTCTime -> IO Bool
notTimeout ts = do notTimeout ts = do
@ -90,5 +78,5 @@ notTimeout ts = do
sendNotification :: BoundedChan Event -> Notification -> IO () sendNotification :: BoundedChan Event -> Notification -> IO ()
sendNotification eventChan notification = sendNotification eventChan notification =
writeChan eventChan $ case notification of writeChan eventChan $ case notification of
OrderNotification sqnum oid state -> OrderUpdate oid state OrderNotification oid state -> OrderUpdate oid state
TradeNotification sqnum trade -> NewTrade trade TradeNotification trade -> NewTrade trade

3
src/ATrade/Quotes/QTIS.hs

@ -37,8 +37,9 @@ instance ToJSON TickerInfo where
"lot_size" .= tiLotSize ti, "lot_size" .= tiLotSize ti,
"tick_size" .= tiTickSize ti ] "tick_size" .= tiTickSize ti ]
qtisGetTickersInfo :: (MonadIO m) => Context -> T.Text -> TickerId -> m TickerInfo qtisGetTickersInfo :: (WithLog env Message m, MonadIO m) => Context -> T.Text -> TickerId -> m TickerInfo
qtisGetTickersInfo ctx endpoint tickerId = do qtisGetTickersInfo ctx endpoint tickerId = do
logInfo "QTIS" $ "Requesting ticker: " <> tickerId <> " from " <> endpoint
liftIO $ withSocket ctx Req $ \sock -> do liftIO $ withSocket ctx Req $ \sock -> do
connect sock $ T.unpack endpoint connect sock $ T.unpack endpoint
send sock [] $ BL.toStrict tickerRequest send sock [] $ BL.toStrict tickerRequest

14
src/ATrade/RoboCom/Monad.hs

@ -20,9 +20,7 @@ module ATrade.RoboCom.Monad (
also, also,
t, t,
st, st,
getFirstTickerId, getFirstTickerId) where
getTickerAnyTimeframe
) where
import ATrade.RoboCom.Types import ATrade.RoboCom.Types
import ATrade.Types import ATrade.Types
@ -32,7 +30,6 @@ import Data.Aeson.Types
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.Text.Lazy as TL import qualified Data.Text.Lazy as TL
import Data.Time.Clock import Data.Time.Clock
import qualified Data.List as L
import Language.Haskell.Printf import Language.Haskell.Printf
import Language.Haskell.TH.Quote (QuasiQuoter) import Language.Haskell.TH.Quote (QuasiQuoter)
import ATrade.Logging (Severity) import ATrade.Logging (Severity)
@ -54,20 +51,11 @@ class (Monad m) => MonadRobot m c s | m -> c, m -> s where
setState (f oldState) setState (f oldState)
getEnvironment :: m StrategyEnvironment getEnvironment :: m StrategyEnvironment
getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries) getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries)
getTickerInfo :: TickerId -> m (Maybe InstrumentParameters)
getAvailableTickers :: m (NonEmpty BarSeriesId) getAvailableTickers :: m (NonEmpty BarSeriesId)
getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId
getFirstTickerId = NE.head <$> getAvailableTickers getFirstTickerId = NE.head <$> getAvailableTickers
getTickerAnyTimeframe :: forall c s m. (Monad m, MonadRobot m c s) => TickerId -> m (Maybe BarSeries)
getTickerAnyTimeframe requestedTickerId = do
tickers <- getAvailableTickers
case L.find (\(BarSeriesId tid _) -> tid == requestedTickerId) tickers of
Just (BarSeriesId tid tf) -> getTicker tid tf
Nothing -> return Nothing
st :: QuasiQuoter st :: QuasiQuoter
st = t st = t

258
src/ATrade/RoboCom/Positions.hs

@ -1,4 +1,3 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE MultiWayIf #-}
@ -9,20 +8,18 @@
{-| {-|
- Module : ATrade.RoboCom.Combinators - Module : ATrade.RoboCom.Combinators
- Description : Reusable behavioural components of strategies - Description : Reusable behavioural components of strategies
- Copyright : (c) Denis Tereshkin 2021 - Copyright : (c) Denis Tereshkin 2016
- License : BSD 3-clause - License : Proprietary
- Maintainer : denis@kasan.ws - Maintainer : denis@kasan.ws
- Stability : experimental - Stability : experimental
- Portability : POSIX - Portability : POSIX
- -
- A lot of behaviour is common for most of the strategies. - A lot of behaviour is common for most of the strategies. This module contains those common blocks which can be composed to avoid boilerplate in main strategy code.
- This module contains those common blocks which can be composed to avoid boilerplate in main strategy code.
-} -}
module ATrade.RoboCom.Positions module ATrade.RoboCom.Positions
( (
StateHasPositions(..), StateHasPositions(..),
ParamsSize(..),
PositionState(..), PositionState(..),
Position(..), Position(..),
posIsOpen, posIsOpen,
@ -48,10 +45,12 @@ module ATrade.RoboCom.Positions
onTradeEvent, onTradeEvent,
onActionCompletedEvent, onActionCompletedEvent,
enterAtMarket, enterAtMarket,
enterAtMarketForTicker,
enterAtMarketWithParams, enterAtMarketWithParams,
enterAtLimit, enterAtLimit,
enterAtLimitWithVolume,
enterAtLimitWithParams,
enterAtLimitForTicker, enterAtLimitForTicker,
enterAtLimitForTickerWithVolume,
enterAtLimitForTickerWithParams, enterAtLimitForTickerWithParams,
enterLongAtMarket, enterLongAtMarket,
enterShortAtMarket, enterShortAtMarket,
@ -66,14 +65,8 @@ module ATrade.RoboCom.Positions
setLimitStopLoss, setLimitStopLoss,
setTakeProfit, setTakeProfit,
setStopLossAndTakeProfit, setStopLossAndTakeProfit,
handlePositions
handlePositions, ) where
calculateSizeIVS,
calculateSizeIVSWith,
calculateSizeFixed,
calculateSizeFixedCash,
calculateSizeFixedCashWith,
calculateSizeIVSWithMinimum) where
import GHC.Generics import GHC.Generics
@ -81,18 +74,17 @@ import ATrade.RoboCom.Monad
import ATrade.RoboCom.Types import ATrade.RoboCom.Types
import ATrade.Types import ATrade.Types
import Control.Lens hiding (op) import Control.Lens
import Control.Monad import Control.Monad
import ATrade.Logging (Severity (Trace, Warning)) import ATrade.Logging (Severity (Trace, Warning))
import qualified ATrade.RoboCom.Indicators as I import ATrade.RoboCom.Monad (MonadRobot (getAvailableTickers))
import Data.Aeson import Data.Aeson
import qualified Data.List as L import qualified Data.List as L
import qualified Data.List.NonEmpty as NE import qualified Data.List.NonEmpty as NE
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.Text.Lazy as TL import qualified Data.Text.Lazy as TL
import Data.Time.Clock import Data.Time.Clock
import GHC.Records (HasField (..))
data PositionState = PositionWaitingOpenSubmission Order data PositionState = PositionWaitingOpenSubmission Order
| PositionWaitingOpen | PositionWaitingOpen
@ -154,44 +146,8 @@ modifyPositions f = do
pos <- getPositions <$> getState pos <- getPositions <$> getState
modifyState (\s -> setPositions s (f pos)) modifyState (\s -> setPositions s (f pos))
class ParamsSize a where class ParamsHasMainTicker a where
getPositionSize :: a -> BarSeries -> Operation -> Int mainTicker :: a -> (BarTimeframe, TickerId)
calculateSizeIVS :: (HasField "riskSize" a Double,
HasField "stopSize" a Double,
HasField "atrPeriod" a Int) =>
a -> BarSeries -> Operation -> Int
calculateSizeIVS cfg = calculateSizeIVSWith (getField @"atrPeriod" cfg) (getField @"riskSize" cfg) (getField @"stopSize" cfg) cfg
calculateSizeIVSWithMinimum :: (HasField "riskSize" a Double,
HasField "stopSize" a Double,
HasField "atrPeriod" a Int) =>
Int -> a -> BarSeries -> Operation -> Int
calculateSizeIVSWithMinimum minVolume cfg series op = max (calculateSizeIVS cfg series op) minVolume
calculateSizeIVSWith :: Int -> Double -> Double -> a -> BarSeries -> Operation -> Int
calculateSizeIVSWith atrPeriod riskSize stopSize _ series _ =
let atr = I.atr atrPeriod (bsBars series) in
truncate (riskSize / (atr * stopSize))
calculateSizeFixed :: (HasField "positionSize" a Int) =>
a -> BarSeries -> Operation -> Int
calculateSizeFixed cfg _ _ = getField @"positionSize" cfg
calculateSizeFixedCash :: ( HasField "totalCash" a Double,
HasField "maxPositions" a Int) =>
a -> BarSeries -> Operation -> Int
calculateSizeFixedCash cfg = calculateSizeFixedCashWith (getField @"totalCash" cfg) (getField @"maxPositions" cfg) cfg
calculateSizeFixedCashWith :: Double -> Int -> a -> BarSeries -> Operation -> Int
calculateSizeFixedCashWith totalCash maxPositions cfg series _ =
case bsBars $ series of
(lastBar:_) ->
let cashPerPosition = totalCash / fromIntegral maxPositions in
truncate (cashPerPosition / ((toDouble $ barClose lastBar) * (fromIntegral $ ipLotSize . bsParams $ series)))
_ -> 0
-- | Helper function. Finds first element in list which satisfies predicate 'p' and if found, applies 'm' to it, leaving other elements intact. -- | Helper function. Finds first element in list which satisfies predicate 'p' and if found, applies 'm' to it, leaving other elements intact.
findAndModify :: (a -> Bool) -> (a -> a) -> [a] -> [a] findAndModify :: (a -> Bool) -> (a -> a) -> [a] -> [a]
@ -223,23 +179,20 @@ orderDeadline maybeDeadline lastTs =
dispatchPosition :: (StateHasPositions s, MonadRobot m c s) => Event -> Position -> m Position dispatchPosition :: (StateHasPositions s, MonadRobot m c s) => Event -> Position -> m Position
dispatchPosition event pos = dispatchPosition event pos = case posState pos of
case posState pos of PositionWaitingOpenSubmission pendingOrder -> handlePositionWaitingOpenSubmission pendingOrder
PositionWaitingOpenSubmission pendingOrder -> handlePositionWaitingOpenSubmission pendingOrder PositionWaitingOpen -> handlePositionWaitingOpen
PositionWaitingOpen -> handlePositionWaitingOpen PositionOpen -> handlePositionOpen
PositionOpen -> handlePositionOpen PositionWaitingPendingCancellation -> handlePositionWaitingPendingCancellation
PositionWaitingPendingCancellation -> handlePositionWaitingPendingCancellation PositionWaitingCloseSubmission pendingOrder -> handlePositionWaitingCloseSubmission pendingOrder
PositionWaitingCloseSubmission pendingOrder -> handlePositionWaitingCloseSubmission pendingOrder PositionWaitingClose -> handlePositionWaitingClose
PositionWaitingClose -> handlePositionWaitingClose PositionClosed -> handlePositionClosed pos
PositionClosed -> handlePositionClosed pos PositionCancelled -> handlePositionCancelled pos
PositionCancelled -> handlePositionCancelled pos
where where
handlePositionWaitingOpenSubmission pendingOrder = do handlePositionWaitingOpenSubmission pendingOrder = do
lastTs <- view seLastTimestamp <$> getEnvironment lastTs <- view seLastTimestamp <$> getEnvironment
if orderDeadline (posSubmissionDeadline pos) lastTs if orderDeadline (posSubmissionDeadline pos) lastTs
then do then return $ pos { posState = PositionCancelled } -- TODO call TimeoutHandler if present
appendToLog Warning $ [t|Submission deadline: %?, %?|] lastTs (posSubmissionDeadline pos)
return $ pos { posState = PositionCancelled } -- TODO call TimeoutHandler if present
else case event of else case event of
OrderUpdate oid Submitted -> do OrderUpdate oid Submitted -> do
return $ if orderId pendingOrder == oid return $ if orderId pendingOrder == oid
@ -421,25 +374,6 @@ newPosition order account tickerId operation quantity submissionDeadline = do
modifyPositions (\p -> position : p) modifyPositions (\p -> position : p)
return position return position
rejectedPosition :: (StateHasPositions s, MonadRobot m c s) => m Position
rejectedPosition =
return Position {
posId = "Rejected",
posAccount = "",
posTicker = "",
posBalance = 0,
posState = PositionCancelled,
posNextState = Nothing,
posStopPrice = Nothing,
posStopLimitPrice = Nothing,
posTakeProfitPrice = Nothing,
posCurrentOrder = Nothing,
posSubmissionDeadline = Nothing,
posExecutionDeadline = Nothing,
posEntryTime = Nothing,
posExitTime = Nothing
}
reapDeadPositions :: (StateHasPositions s) => EventCallback c s reapDeadPositions :: (StateHasPositions s) => EventCallback c s
reapDeadPositions _ = modifyPositions (L.filter (not . posIsDead)) reapDeadPositions _ = modifyPositions (L.filter (not . posIsDead))
@ -527,31 +461,16 @@ onActionCompletedEvent event f = case event of
ActionCompleted tag v -> f tag v ActionCompleted tag v -> f tag v
_ -> doNothing _ -> doNothing
roundTo :: Price -> Price -> Price enterAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> Operation -> m Position
roundTo quant v = quant * (fromIntegral . floor . toDouble) (v / quant)
enterAtMarket :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => T.Text -> Operation -> m Position
enterAtMarket operationSignalName operation = do enterAtMarket operationSignalName operation = do
bsId <- getFirstTickerId env <- getEnvironment
enterAtMarketForTicker operationSignalName bsId operation enterAtMarketWithParams (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) operationSignalName "") operation
enterAtMarketForTicker :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => T.Text -> BarSeriesId -> Operation -> m Position
enterAtMarketForTicker operationSignalName (BarSeriesId tid tf) operation = do
maybeSeries <- getTicker tid tf
case maybeSeries of
Just series -> do
env <- getEnvironment
cfg <- getConfig
let quantity = getPositionSize cfg series operation
enterAtMarketWithParams (env ^. seAccount) tid quantity (SignalId (env ^. seInstanceId) operationSignalName "") operation
Nothing -> do
appendToLog Warning $ "Unable to get ticker series: " <> TL.fromStrict tid
rejectedPosition
enterAtMarketWithParams :: (StateHasPositions s, MonadRobot m c s) => T.Text -> TickerId -> Int -> SignalId -> Operation -> m Position enterAtMarketWithParams :: (StateHasPositions s, MonadRobot m c s) => T.Text -> Int -> SignalId -> Operation -> m Position
enterAtMarketWithParams account tid quantity signalId operation = do enterAtMarketWithParams account quantity signalId operation = do
oid <- submitOrder $ order tid BarSeriesId tickerId _ <- getFirstTickerId
newPosition ((order tid) { orderId = oid }) account tid operation quantity 20 oid <- submitOrder $ order tickerId
newPosition ((order tickerId) { orderId = oid }) account tickerId operation quantity 20
where where
order tickerId = mkOrder { order tickerId = mkOrder {
orderAccountId = account, orderAccountId = account,
@ -562,36 +481,36 @@ enterAtMarketWithParams account tid quantity signalId operation = do
orderSignalId = signalId orderSignalId = signalId
} }
enterAtLimit :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => T.Text -> Price -> Operation -> m Position enterAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Operation -> m Position
enterAtLimit operationSignalName price operation = do enterAtLimit timeToCancel operationSignalName price operation = do
bsId <- getFirstTickerId
env <- getEnvironment env <- getEnvironment
enterAtLimitForTicker bsId operationSignalName price operation enterAtLimitWithParams timeToCancel (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) operationSignalName "") price operation
enterAtLimitWithVolume :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position
enterAtLimitWithVolume timeToCancel operationSignalName price vol operation = do
acc <- view seAccount <$> getEnvironment
inst <- view seInstanceId <$> getEnvironment
enterAtLimitWithParams timeToCancel acc vol (SignalId inst operationSignalName "") price operation
enterAtLimitWithParams :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position
enterAtLimitWithParams timeToCancel account quantity signalId price operation = do
BarSeriesId tickerId _ <- getFirstTickerId
enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation
enterAtLimitForTicker :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => BarSeriesId -> T.Text -> Price -> Operation -> m Position enterAtLimitForTickerWithVolume :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position
enterAtLimitForTicker (BarSeriesId tid tf) operationSignalName price operation = do enterAtLimitForTickerWithVolume tickerId timeToCancel operationSignalName price vol operation = do
acc <- view seAccount <$> getEnvironment acc <- view seAccount <$> getEnvironment
inst <- view seInstanceId <$> getEnvironment inst <- view seInstanceId <$> getEnvironment
maybeSeries <- getTicker tid tf enterAtLimitForTickerWithParams tickerId timeToCancel acc vol (SignalId inst operationSignalName "") price operation
case maybeSeries of
Just series -> do enterAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Price -> Operation -> m Position
cfg <- getConfig enterAtLimitForTicker tickerId timeToCancel operationSignalName price operation = do
let quantity = getPositionSize cfg series operation acc <- view seAccount <$> getEnvironment
let roundedPrice = roundTo (ipTickSize . bsParams $ series) price inst <- view seInstanceId <$> getEnvironment
enterAtLimitForTickerWithParams tid (fromIntegral $ unBarTimeframe tf) acc quantity (SignalId inst operationSignalName "") roundedPrice operation vol <- view seVolume <$> getEnvironment
Nothing -> rejectedPosition enterAtLimitForTickerWithParams tickerId timeToCancel acc vol (SignalId inst operationSignalName "") price operation
enterAtLimitForTickerWithParams :: enterAtLimitForTickerWithParams :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position
(StateHasPositions s,
MonadRobot m c s) =>
TickerId
-> NominalDiffTime
-> T.Text
-> Int
-> SignalId
-> Price
-> Operation
-> m Position
enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation = do enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation = do
lastTs <- view seLastTimestamp <$> getEnvironment lastTs <- view seLastTimestamp <$> getEnvironment
oid <- submitOrder order oid <- submitOrder order
@ -608,23 +527,23 @@ enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId
orderSignalId = signalId orderSignalId = signalId
} }
enterLongAtMarket :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => T.Text -> m Position enterLongAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> m Position
enterLongAtMarket operationSignalName = enterAtMarket operationSignalName Buy enterLongAtMarket operationSignalName = enterAtMarket operationSignalName Buy
enterShortAtMarket :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => T.Text -> m Position enterShortAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> m Position
enterShortAtMarket operationSignalName = enterAtMarket operationSignalName Sell enterShortAtMarket operationSignalName = enterAtMarket operationSignalName Sell
enterLongAtLimit :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => Price -> T.Text -> m Position enterLongAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position
enterLongAtLimit price operationSignalName = enterAtLimit operationSignalName price Buy enterLongAtLimit timeToCancel price operationSignalName = enterAtLimit timeToCancel operationSignalName price Buy
enterLongAtLimitForTicker :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => BarSeriesId -> Price -> T.Text -> m Position enterLongAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> Price -> T.Text -> m Position
enterLongAtLimitForTicker tickerId price operationSignalName = enterAtLimitForTicker tickerId operationSignalName price Buy enterLongAtLimitForTicker tickerId timeToCancel price operationSignalName = enterAtLimitForTicker tickerId timeToCancel operationSignalName price Buy
enterShortAtLimit :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => Price -> T.Text -> m Position enterShortAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position
enterShortAtLimit price operationSignalName = enterAtLimit operationSignalName price Sell enterShortAtLimit timeToCancel price operationSignalName = enterAtLimit timeToCancel operationSignalName price Sell
enterShortAtLimitForTicker :: (StateHasPositions s, ParamsSize c, MonadRobot m c s) => BarSeriesId -> Price -> T.Text -> m Position enterShortAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> Price -> T.Text -> m Position
enterShortAtLimitForTicker tickerId price operationSignalName = enterAtLimitForTicker tickerId operationSignalName price Sell enterShortAtLimitForTicker tickerId timeToCancel price operationSignalName = enterAtLimitForTicker tickerId timeToCancel operationSignalName price Sell
exitAtMarket :: (StateHasPositions s, MonadRobot m c s) => Position -> T.Text -> m Position exitAtMarket :: (StateHasPositions s, MonadRobot m c s) => Position -> T.Text -> m Position
exitAtMarket position operationSignalName = do exitAtMarket position operationSignalName = do
@ -661,32 +580,23 @@ exitAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Pri
exitAtLimit timeToCancel price position operationSignalName = do exitAtLimit timeToCancel price position operationSignalName = do
lastTs <- view seLastTimestamp <$> getEnvironment lastTs <- view seLastTimestamp <$> getEnvironment
inst <- view seInstanceId <$> getEnvironment inst <- view seInstanceId <$> getEnvironment
cfg <- getConfig case posCurrentOrder position of
(BarSeriesId tid tf) <- getFirstTickerId Just order -> cancelOrder (orderId order)
maybeSeries <- getTicker tid tf Nothing -> doNothing
case maybeSeries of oid <- submitOrder (closeOrder inst)
Just series -> do appendToLog Trace $ [t|exitAtLimit: %?, deadline: %?|] (posTicker position) (timeToCancel `addUTCTime` lastTs)
let roundedPrice = roundTo (ipTickSize . bsParams $ series) price modifyPosition (\pos ->
case posCurrentOrder position of pos { posCurrentOrder = Nothing,
Just order -> cancelOrder (orderId order) posState = PositionWaitingCloseSubmission (closeOrder inst) { orderId = oid },
Nothing -> doNothing posNextState = Just PositionClosed,
oid <- submitOrder (closeOrder inst roundedPrice) posSubmissionDeadline = Just $ 10 `addUTCTime` lastTs,
appendToLog Trace $ [t|exitAtLimit: %?, deadline: %?|] (posTicker position) (timeToCancel `addUTCTime` lastTs) posExecutionDeadline = Just $ timeToCancel `addUTCTime` lastTs }) position
modifyPosition (\pos ->
pos { posCurrentOrder = Nothing,
posState = PositionWaitingCloseSubmission (closeOrder inst roundedPrice) { orderId = oid },
posNextState = Just PositionClosed,
posSubmissionDeadline = Just $ 10 `addUTCTime` lastTs,
posExecutionDeadline = Just $ timeToCancel `addUTCTime` lastTs }) position
Nothing -> do
appendToLog Warning $ "Unable to locate first bar series"
return position
where where
closeOrder inst roundedPrice = mkOrder { closeOrder inst = mkOrder {
orderAccountId = posAccount position, orderAccountId = posAccount position,
orderSecurity = posTicker position, orderSecurity = posTicker position,
orderQuantity = (abs . posBalance) position, orderQuantity = (abs . posBalance) position,
orderPrice = Limit roundedPrice, orderPrice = Limit price,
orderOperation = if posBalance position > 0 then Sell else Buy, orderOperation = if posBalance position > 0 then Sell else Buy,
orderSignalId = SignalId inst operationSignalName "" orderSignalId = SignalId inst operationSignalName ""
} }

18
src/ATrade/RoboCom/Types.hs

@ -11,19 +11,14 @@ module ATrade.RoboCom.Types (
BarSeries(..), BarSeries(..),
Ticker(..), Ticker(..),
Bars, Bars,
TickerInfoMap,
InstrumentParameters(..), InstrumentParameters(..),
bsidTickerId, bsidTickerId
barSeriesId
) where ) where
import ATrade.Types import ATrade.Types
import Control.Lens.Setter (over)
import Control.Lens.Tuple (_1)
import Data.Aeson import Data.Aeson
import Data.Aeson.Key (fromText, toText)
import Data.Aeson.KeyMap as KM
import Data.Aeson.Types import Data.Aeson.Types
import qualified Data.HashMap.Strict as HM
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import GHC.Generics (Generic) import GHC.Generics (Generic)
@ -36,8 +31,6 @@ data InstrumentParameters =
ipTickSize :: Price ipTickSize :: Price
} deriving (Show, Eq) } deriving (Show, Eq)
type TickerInfoMap = M.Map TickerId InstrumentParameters
data BarSeries = data BarSeries =
BarSeries { BarSeries {
bsTickerId :: TickerId, bsTickerId :: TickerId,
@ -46,9 +39,6 @@ data BarSeries =
bsParams :: InstrumentParameters bsParams :: InstrumentParameters
} deriving (Show, Eq) } deriving (Show, Eq)
barSeriesId :: BarSeries -> BarSeriesId
barSeriesId s = BarSeriesId (bsTickerId s) (bsTimeframe s)
-- | Ticker description record -- | Ticker description record
data Ticker = Ticker { data Ticker = Ticker {
code :: T.Text, -- ^ Main ticker code, which is used to make orders and tick parsing code :: T.Text, -- ^ Main ticker code, which is used to make orders and tick parsing
@ -67,14 +57,14 @@ instance FromJSON Ticker where
return $ Ticker nm als' tf) return $ Ticker nm als' tf)
where where
parseAliases :: Value -> Parser [(String, String)] parseAliases :: Value -> Parser [(String, String)]
parseAliases = withObject "object1" (mapM (parseAlias . over _1 toText) . KM.toList) parseAliases = withObject "object1" (mapM parseAlias . HM.toList)
parseAlias :: (T.Text, Value) -> Parser (String, String) parseAlias :: (T.Text, Value) -> Parser (String, String)
parseAlias (k, v) = withText "string1" (\s -> return (T.unpack k, T.unpack s)) v parseAlias (k, v) = withText "string1" (\s -> return (T.unpack k, T.unpack s)) v
instance ToJSON Ticker where instance ToJSON Ticker where
toJSON t = object [ "name" .= code t, toJSON t = object [ "name" .= code t,
"timeframe" .= timeframeSeconds t, "timeframe" .= timeframeSeconds t,
"aliases" .= Object (KM.fromList $ fmap (\(x, y) -> (fromText . T.pack $ x, String $ T.pack y)) $ aliases t) ] "aliases" .= Object (HM.fromList $ fmap (\(x, y) -> (T.pack x, String $ T.pack y)) $ aliases t) ]
data BarSeriesId = BarSeriesId TickerId BarTimeframe data BarSeriesId = BarSeriesId TickerId BarTimeframe
deriving (Show, Eq, Generic, Ord) deriving (Show, Eq, Generic, Ord)

7
stack.yaml

@ -18,7 +18,7 @@
# #
# resolver: ./custom-snapshot.yaml # resolver: ./custom-snapshot.yaml
# resolver: https://example.com/snapshots/2018-01-01.yaml # resolver: https://example.com/snapshots/2018-01-01.yaml
resolver: lts-20.26 resolver: lts-18.18
# User packages to be built. # User packages to be built.
# Various formats can be used as shown in the example below. # Various formats can be used as shown in the example below.
@ -48,9 +48,8 @@ extra-deps:
- binary-ieee754-0.1.0.0 - binary-ieee754-0.1.0.0
- th-printf-0.7 - th-printf-0.7
- normaldistribution-1.1.0.3 - normaldistribution-1.1.0.3
- co-log-0.5.0.0 - co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087
- chronos-1.1.5@sha256:ca35be5fdbbb384414226b4467c6d1c8b44defe59a9c8a3af32c1c5fb250c781,3830 - ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226
- typerep-map-0.5.0.0@sha256:34f1ba9b268a6d52e26ae460011a5571e8099b50a3f4a7c8db25dd8efe3be8ee,4667
# Override default flag values for local packages and extra-deps # Override default flag values for local packages and extra-deps
# flags: {} # flags: {}

107
test/Test/BarAggregator.hs

@ -10,21 +10,18 @@ import ATrade.BarAggregator
import ATrade.RoboCom.Types import ATrade.RoboCom.Types
import ATrade.Types import ATrade.Types
import Data.List import Data.List
import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.Map.Strict as M
import qualified Data.List.NonEmpty as NE import qualified Data.Text as T
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Time.Calendar import Data.Time.Calendar
import Data.Time.Clock import Data.Time.Clock
import Safe import Safe
import Hedgehog as HH
import qualified Hedgehog.Gen as Gen
import qualified Hedgehog.Range as Range
import Test.Tasty import Test.Tasty
import Test.Tasty.Hedgehog
import Test.Tasty.HUnit import Test.Tasty.HUnit
import Test.Tasty.QuickCheck as QC
import Test.Tasty.SmallCheck as SC
import ArbitraryInstances
unitTests = testGroup "BarAggregator" [ unitTests = testGroup "BarAggregator" [
@ -36,36 +33,9 @@ unitTests = testGroup "BarAggregator" [
properties = testGroup "BarAggregator" [ properties = testGroup "BarAggregator" [
prop_allTicksInOneBar prop_allTicksInOneBar
, prop_threeBars
] ]
secParams = InstrumentParameters "TEST_TICKER" 1 0.01 secParams = InstrumentParameters 1 0.01
genTick :: T.Text -> UTCTime -> Int -> Gen Tick
genTick tickerId baseTime timeframe = do
ts <- generateTimestampInsideBar baseTime timeframe
val <- fromIntegral <$> Gen.int (Range.linear 1 1000000)
vol <- Gen.integral (Range.linear 1 1000000)
return $ Tick tickerId LastTradePrice ts (fromDouble $ val / 1000) vol
where
generateTimestampInsideBar base timeframe =
flip addUTCTime base .
fromRational .
toRational .
picosecondsToDiffTime <$> Gen.integral (Range.linear 0 (truncate 1e12 * fromIntegral timeframe))
mkAggregator :: TickerId -> Int -> BarAggregator
mkAggregator tickerId tf = mkAggregatorFromBars (M.singleton tickerId (BarSeries tickerId (BarTimeframe tf) [] secParams)) [(0, 86400)]
assertBarCorrespondence :: (MonadTest m) => Bar -> NE.NonEmpty Tick -> m ()
assertBarCorrespondence bar ticks = do
barHigh bar === maximum (value <$> sortedTicks)
barLow bar === minimum (value <$> sortedTicks)
barOpen bar === value (NE.head sortedTicks)
barClose bar === value (NE.last sortedTicks)
barVolume bar === sum (volume <$> sortedTicks)
where
sortedTicks = NE.fromList . sortOn timestamp . NE.toList $ ticks
testUnknownBarSeries :: TestTree testUnknownBarSeries :: TestTree
testUnknownBarSeries = testCase "Tick with unknown ticker id" $ do testUnknownBarSeries = testCase "Tick with unknown ticker id" $ do
@ -124,13 +94,11 @@ testTwoTicksInDifferentBars = testCase "Two ticks - different bar" $ do
let (mbar, newagg) = handleTick (tick testTimestamp1 12.00) agg let (mbar, newagg) = handleTick (tick testTimestamp1 12.00) agg
mbar @?= Nothing mbar @?= Nothing
let (mbar', newagg') = handleTick (tick testTimestamp2 14.00) newagg let (mbar', newagg') = handleTick (tick testTimestamp2 14.00) newagg
mbar' @?= Just (Bar "TEST_TICKER" barEndTime 12.00 12.00 12.00 12.00 1) mbar' @?= Just (Bar "TEST_TICKER" testTimestamp1 12.00 12.00 12.00 12.00 1)
(bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg')) @?= Just [Bar "TEST_TICKER" testTimestamp2 14.00 14.00 14.00 14.00 1, Bar "TEST_TICKER" testTimestamp1 12.00 12.00 12.00 12.00 1]
(bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg')) @?= Just [Bar "TEST_TICKER" testTimestamp2 14.00 14.00 14.00 14.00 1, Bar "TEST_TICKER" barEndTime 12.00 12.00 12.00 12.00 1]
where where
testTimestamp1 = UTCTime (fromGregorian 1970 1 1) 58 testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 58)
barEndTime = UTCTime (fromGregorian 1970 1 1) 60 testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 61)
testTimestamp2 = UTCTime (fromGregorian 1970 1 1) 61
tick ts val = Tick { tick ts val = Tick {
security = "TEST_TICKER", security = "TEST_TICKER",
datatype = LastTradePrice, datatype = LastTradePrice,
@ -138,42 +106,27 @@ testTwoTicksInDifferentBars = testCase "Two ticks - different bar" $ do
value = fromDouble val, value = fromDouble val,
volume = 1 } volume = 1 }
prop_allTicksInOneBar :: TestTree prop_allTicksInOneBar :: TestTree
prop_allTicksInOneBar = testProperty "All ticks in one bar" $ property $ do prop_allTicksInOneBar = QC.testProperty "All ticks in one bar" $ QC.forAll (QC.choose (1, 86400)) $ \timeframe ->
tf <- forAll $ Gen.integral (Range.constant 1 86400) QC.forAll (QC.listOf1 (genTick "TEST_TICKER" baseTime timeframe)) $ \ticks ->
ticks <- forAll $ Gen.list (Range.linear 1 100) (genTick "TEST_TICKER" baseTime tf) let ticks' = sortOn timestamp ticks in
let ticks' = sortOn timestamp ticks let (newbars, agg) = handleTicks ticks' (mkAggregator "TEST_TICKER" timeframe) in
let (newbars, agg) = handleTicks ticks' (mkAggregator "TEST_TICKER" tf) null newbars &&
let (Just lastBar) = currentBar "TEST_TICKER" agg ((barHigh <$> currentBar "TEST_TICKER" agg) == Just (maximum $ value <$> ticks)) &&
HH.assert $ null newbars ((barLow <$> currentBar "TEST_TICKER" agg) == Just (minimum $ value <$> ticks)) &&
assertBarCorrespondence lastBar $ NE.fromList ticks ((barOpen <$> currentBar "TEST_TICKER" agg) == (value <$> headMay ticks')) &&
((barClose <$> currentBar "TEST_TICKER" agg) == (value <$> lastMay ticks')) &&
((barVolume <$> currentBar "TEST_TICKER" agg) == Just (sum $ volume <$> ticks))
where where
currentBar tickerId agg = headMay =<< (bsBars <$> M.lookup tickerId (bars agg)) genTick :: T.Text -> UTCTime -> Int -> Gen Tick
baseTime = UTCTime (fromGregorian 1970 1 1) 0 genTick tickerId base tf = do
difftime <- fromRational . toRational . picosecondsToDiffTime <$> choose (0, truncate 1e12 * fromIntegral tf)
val <- arbitrary
vol <- arbitrary `suchThat` (> 0)
return $ Tick tickerId LastTradePrice (difftime `addUTCTime` baseTime) val vol
mkAggregator tickerId tf = mkAggregatorFromBars (M.singleton tickerId (BarSeries tickerId (BarTimeframe tf) [] secParams)) [(0, 86400)]
prop_threeBars :: TestTree
prop_threeBars = testProperty "Three bars" $ property $ do
tf <- forAll $ Gen.integral (Range.constant 1 86400)
ticks1 <- forAll $ Gen.list (Range.linear 1 100) (genTick "TEST_TICKER" baseTime tf)
let secondBarBaseTime = addUTCTime (fromIntegral tf) baseTime
ticks2 <- forAll $ Gen.list (Range.linear 1 100) (genTick "TEST_TICKER" secondBarBaseTime tf)
let thirdBarBaseTime = addUTCTime (fromIntegral $ 2 * tf) baseTime
ticks3 <- forAll $ Gen.list (Range.linear 1 100) (genTick "TEST_TICKER" thirdBarBaseTime tf)
let ticks' = sortOn timestamp $ ticks1 <> ticks2 <> ticks3
let ([secondBar, firstBar], agg) = handleTicks ticks' (mkAggregator "TEST_TICKER" tf)
assertBarCorrespondence firstBar (NE.fromList ticks1)
assertBarCorrespondence secondBar (NE.fromList ticks2)
barTimestamp firstBar === secondBarBaseTime
barTimestamp secondBar === thirdBarBaseTime
let (Just lastBar) = currentBar "TEST_TICKER" agg
assertBarCorrespondence lastBar (NE.fromList ticks3)
where
currentBar tickerId agg = headMay =<< (bsBars <$> M.lookup tickerId (bars agg)) currentBar tickerId agg = headMay =<< (bsBars <$> M.lookup tickerId (bars agg))
baseTime = UTCTime (fromGregorian 1970 1 1) 0 baseTime = UTCTime (fromGregorian 1970 1 1) 0

19
test/Test/Driver/Junction/QuoteThread.hs

@ -1,6 +1,5 @@
{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-} {-# LANGUAGE TypeSynonymInstances #-}
@ -17,7 +16,6 @@ import Test.Tasty.SmallCheck as SC
import ATrade.Driver.Junction.QuoteThread (addSubscription, import ATrade.Driver.Junction.QuoteThread (addSubscription,
startQuoteThread, startQuoteThread,
stopQuoteThread) stopQuoteThread)
import ATrade.Logging (Message)
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.HistoryProvider (HistoryProvider (..))
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..))
import ATrade.QuoteSource.Client (QuoteData (QDBar)) import ATrade.QuoteSource.Client (QuoteData (QDBar))
@ -28,21 +26,23 @@ import ATrade.RoboCom.Types (BarSeries (bsBars),
BarSeriesId (BarSeriesId), BarSeriesId (BarSeriesId),
InstrumentParameters (InstrumentParameters)) InstrumentParameters (InstrumentParameters))
import ATrade.Types import ATrade.Types
import Colog.Core (LogAction (..))
import Colog.Core.Class (HasLog (..))
import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.BoundedChan (newBoundedChan, readChan, import Control.Concurrent.BoundedChan (newBoundedChan, readChan,
writeChan) writeChan)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (forever) import Control.Monad (forever)
import Control.Monad.Reader import Control.Monad.Reader
import Data.IORef (IORef, newIORef, readIORef) import Data.IORef (newIORef, readIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time (UTCTime (UTCTime), import Data.Time (UTCTime (UTCTime),
fromGregorian) fromGregorian)
import System.IO (BufferMode (LineBuffering), import System.IO (BufferMode (LineBuffering),
hSetBuffering, stderr) hSetBuffering, stderr)
import System.Log.Formatter
import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple
import System.Log.Logger
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import Test.Mock.HistoryProvider (MockHistoryProvider, import Test.Mock.HistoryProvider (MockHistoryProvider,
mkMockHistoryProvider, mkMockHistoryProvider,
@ -70,16 +70,13 @@ instance TickerInfoProvider TestM where
tip <- asks tickerInfoProvider tip <- asks tickerInfoProvider
liftIO $ mockGetInstrumentParameters tip tickers liftIO $ mockGetInstrumentParameters tip tickers
instance HasLog TestEnv Message TestM where
getLogAction env = LogAction $ \msg -> return ()
qsEndpoint = "inproc://qs" qsEndpoint = "inproc://qs"
mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)]
where where
bars = [] bars = []
mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters "FOO" 10 0.1)] mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters 10 0.1)]
unitTests = testGroup "Driver.Junction.QuoteThread" [ unitTests = testGroup "Driver.Junction.QuoteThread" [
testSubscription testSubscription
@ -88,14 +85,12 @@ unitTests = testGroup "Driver.Junction.QuoteThread" [
testSubscription :: TestTree testSubscription :: TestTree
testSubscription = testCase "Subscription" $ withContext $ \ctx -> do testSubscription = testCase "Subscription" $ withContext $ \ctx -> do
barsRef <- newIORef M.empty barsRef <- newIORef M.empty
tiRef <- newIORef M.empty
serverChan <- newBoundedChan 2000 serverChan <- newBoundedChan 2000
let clientSecurityParams = defaultClientSecurityParams
bracket bracket
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) (startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> stopQuoteSourceServer $ \_ ->
bracket bracket
(startQuoteThread barsRef tiRef ctx qsEndpoint clientSecurityParams (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)) (LogAction $ \_ -> return ())) (startQuoteThread barsRef ctx qsEndpoint Nothing Nothing (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)))
stopQuoteThread $ \qt -> do stopQuoteThread $ \qt -> do
chan <- newBoundedChan 2000 chan <- newBoundedChan 2000

2
test/Test/RoboCom/Indicators.hs

@ -7,6 +7,8 @@ module Test.RoboCom.Indicators
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
import Test.Tasty.QuickCheck as QC
import Test.Tasty.SmallCheck as SC
import ATrade.Types import ATrade.Types
import qualified Data.Text as T import qualified Data.Text as T

165
test/Test/RoboCom/Positions.hs

@ -1,165 +0,0 @@
{-# LANGUAGE OverloadedStrings #-}
module Test.RoboCom.Positions
(
unitTests
) where
import Test.Tasty
import Test.Tasty.HUnit
import ATrade.Types
import qualified Data.List as L
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Time.Calendar
import Data.Time.Clock
import ATrade.RoboCom.Monad
import ATrade.RoboCom.Positions
import ATrade.RoboCom.Types
data TestState = TestState
{
positions :: [Position],
testInt :: Int
}
defaultState = TestState {
positions = [],
testInt = 0
}
data TestConfig = TestConfig
instance ParamsHasMainTicker TestConfig where
mainTicker _ = "TEST_TICKER"
instance StateHasPositions TestState where
getPositions = positions
setPositions a p = a { positions = p }
defaultStrategyEnvironment = StrategyEnvironment
{
seInstanceId = "test_instance",
seAccount = "test_account",
seVolume = 1,
seBars = M.empty,
seLastTimestamp = (UTCTime (fromGregorian 1970 1 1) 0)
}
unitTests = testGroup "RoboCom.Positions" [
testEnterAtMarket,
testEnterAtMarketSendsAction,
testDefaultHandlerSubmissionDeadline,
testDefaultHandlerAfterSubmissionPositionIsWaitingOpen,
testDefaultHandlerPositionWaitingOpenOrderOpenExecuted1
]
testEnterAtMarket = testCase "enterAtMarket creates position in PositionWaitingOpenSubmission state" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
assertBool "Should be exactly 1 position" ((length . positions) newState == 1)
let pos = head . positions $ newState
assertBool "Should be in PositionWaitingOpenSubmission" (isPositionWaitingOpenSubmission . posState $ pos)
let (PositionWaitingOpenSubmission order) = posState pos
assertBool "Account should be 'test_account'" (orderAccountId order == "test_account")
assertBool "Security should be 'TEST_TICKER'" (orderSecurity order == "TEST_TICKER")
assertBool "Order price should be Market" (orderPrice order == Market)
assertBool "Order quantity should be 1" (orderQuantity order == 1)
assertBool "Executed order quantity should be 0" (orderExecutedQuantity order == 0)
assertBool "Order operation should be Buy" (orderOperation order == Buy)
assertBool "Order signal id should be correct" (orderSignalId order == (SignalId "test_instance" "long" ""))
where
element = enterAtMarket "long" Buy
isPositionWaitingOpenSubmission (PositionWaitingOpenSubmission _) = True
isPositionWaitingOpenSubmission _ = False
testEnterAtMarketSendsAction = testCase "enterAtMarket sends ActionSubmitOrder" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
case (L.find isActionOrder actions) of
Just (ActionOrder order) -> do
assertBool "Account should be 'test_account'" (orderAccountId order == "test_account")
assertBool "Security should be 'TEST_TICKER'" (orderSecurity order == "TEST_TICKER")
assertBool "Order price should be Market" (orderPrice order == Market)
assertBool "Order quantity should be 1" (orderQuantity order == 1)
assertBool "Executed order quantity should be 0" (orderExecutedQuantity order == 0)
assertBool "Order operation should be Buy" (orderOperation order == Buy)
assertBool "Order signal id should be correct" (orderSignalId order == (SignalId "test_instance" "long" ""))
Nothing -> assertFailure "Should be exactly 1 ActionOrder"
where
element = enterAtMarket "long" Buy
isActionOrder (ActionOrder _) = True
isActionOrder _ = False
testDefaultHandlerSubmissionDeadline = testCase "defaultHandler after submission deadline marks position as cancelled" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
let (newState', actions', _) = runStrategyElement TestConfig newState defaultStrategyEnvironment { seLastTimestamp = afterDeadline } $ defaultHandler (NewTick tick)
let pos = head . positions $ newState'
assertBool "Cancelled position" (posState pos == PositionCancelled)
where
element = enterAtMarket "long" Buy
afterDeadline = (UTCTime (fromGregorian 1970 1 1) 100)
tick = Tick {
security = "TEST_TICKER",
datatype = LastTradePrice,
timestamp = afterDeadline,
value = fromDouble 12.00,
volume = 1 }
testDefaultHandlerAfterSubmissionPositionIsWaitingOpen = testCase "defaultHandler after successful submission sets position state as PositionWaitingOpen" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
let pos = head . positions $ newState
let (PositionWaitingOpenSubmission order) = posState pos
let (newState', actions', _) = runStrategyElement TestConfig newState defaultStrategyEnvironment { seLastTimestamp = beforeDeadline } $ defaultHandler (OrderSubmitted order {orderId = 1 })
let pos' = head . positions $ newState'
assertEqual "New position state should be PositionWaitingOpen" (posState pos') PositionWaitingOpen
where
element = enterAtMarket "long" Buy
beforeDeadline = (UTCTime (fromGregorian 1970 1 1) 1)
testDefaultHandlerPositionWaitingOpenOrderCancelledExecuted0 = testCase "defaultHandler in PositionWaitingOpen, if order is cancelled and nothing is executed, marks position as cancelled" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
let pos = head . positions $ newState
let (PositionWaitingOpenSubmission order) = posState pos
let (newState', actions', _) = runStrategyElement TestConfig newState defaultStrategyEnvironment { seLastTimestamp = ts1 } $ defaultHandler (OrderSubmitted order {orderId = 1 })
let (newState'', actions'', _) = runStrategyElement TestConfig newState defaultStrategyEnvironment { seLastTimestamp = ts2 } $ defaultHandler (OrderUpdate 1 Cancelled)
let pos = head . positions $ newState''
assertEqual "New position state should be PositionCancelled" (posState pos) PositionCancelled
where
element = enterAtMarket "long" Buy
ts1 = (UTCTime (fromGregorian 1970 1 1) 1)
ts2 = (UTCTime (fromGregorian 1970 1 1) 2)
testDefaultHandlerPositionWaitingOpenOrderOpenExecuted1 = testCase "defaultHandler in PositionWaitingOpen, if order is cancelled and something is executed, marks position as open" $ do
let (newState, actions, _) = runStrategyElement TestConfig defaultState defaultStrategyEnvironment element
let pos = head . positions $ newState
let (PositionWaitingOpenSubmission order) = posState pos
let (newState', actions', _) = runStrategyElement TestConfig newState defaultStrategyEnvironment { seLastTimestamp = ts1, seVolume = 2 } $ defaultHandler (OrderSubmitted order {orderId = 1 })
let (newState'', actions'', _) = runStrategyElement TestConfig newState' defaultStrategyEnvironment { seLastTimestamp = ts2 } $ defaultHandler (NewTrade trade)
let (newState''', actions''', _) = runStrategyElement TestConfig newState'' defaultStrategyEnvironment { seLastTimestamp = ts3 } $ defaultHandler (OrderUpdate 1 Cancelled)
let pos = head . positions $ newState'''
assertEqual "New position state should be PositionOpen" (posState pos) PositionOpen
where
element = enterAtMarket "long" Buy
ts1 = (UTCTime (fromGregorian 1970 1 1) 1)
ts2 = (UTCTime (fromGregorian 1970 1 1) 2)
ts3 = (UTCTime (fromGregorian 1970 1 1) 3)
trade = Trade
{
tradeOrderId = 1,
tradePrice = fromDouble 10,
tradeQuantity = 1,
tradeVolume = fromDouble 10,
tradeVolumeCurrency = "FOO",
tradeOperation = Buy,
tradeAccount = "test_account",
tradeSecurity = "TEST_TICKER",
tradeTimestamp = ts3,
tradeCommission = fromDouble 0,
tradeSignalId = SignalId "test_instance" "long" ""
}

4
test/Test/RoboCom/Utils.hs

@ -7,9 +7,11 @@ module Test.RoboCom.Utils
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
import Test.Tasty.QuickCheck as QC
import Test.Tasty.SmallCheck as SC
import ATrade.Types import ATrade.Types
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time.Calendar import Data.Time.Calendar
import Data.Time.Clock import Data.Time.Clock

Loading…
Cancel
Save