diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 1b56a6a..9fabed7 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -1,5 +1,5 @@ name: robocom-zero -version: 0.1.0.0 +version: 0.2.0.0 -- synopsis: -- description: homepage: https://github.com/asakul/robocom-zero#readme @@ -7,7 +7,7 @@ license: BSD3 license-file: LICENSE author: Denis Tereshkin maintainer: denis@kasan.ws -copyright: 2018 Denis Tereshkin +copyright: 2021 Denis Tereshkin category: Web build-type: Simple extra-source-files: README.md @@ -21,15 +21,19 @@ library , ATrade.RoboCom.Positions , ATrade.RoboCom.Types , ATrade.RoboCom.Utils + , ATrade.Quotes , ATrade.Quotes.Finam - , ATrade.Quotes.HAP , ATrade.Quotes.QHP , ATrade.Quotes.QTIS , ATrade.Driver.Real , ATrade.Driver.Backtest + , ATrade.Driver.Junction + , ATrade.Driver.Junction.Types , ATrade.BarAggregator + , ATrade.RoboCom + other-modules: Paths_robocom_zero build-depends: base >= 4.7 && < 5 - , libatrade == 0.8.0.0 + , libatrade >= 0.9.0.0 && < 0.10.0.0 , text , text-icu , errors @@ -49,7 +53,6 @@ library , binary-ieee754 , zeromq4-haskell , unordered-containers - , ether , th-printf , BoundedChan , monad-loops @@ -63,12 +66,15 @@ library , signal , random , hedis + , gitrev + , data-default + , template-haskell default-language: Haskell2010 other-modules: ATrade.Exceptions , ATrade.Driver.Real.BrokerClientThread , ATrade.Driver.Real.QuoteSourceThread - , ATrade.Driver.Real.Types + , ATrade.Driver.Types test-suite robots-test type: exitcode-stdio-1.0 diff --git a/src/ATrade/Backtest/Execution.hs b/src/ATrade/Backtest/Execution.hs deleted file mode 100644 index 643c180..0000000 --- a/src/ATrade/Backtest/Execution.hs +++ /dev/null @@ -1,102 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} - -module ATrade.Backtest.Execution ( - mkExecutionAgent, - ExecutionAgent(..), - executePending, - executeStep -) where - -import qualified Data.Text as T -import qualified Data.Map as M -import qualified Data.List as L -import ATrade.Types -import ATrade.Strategy.Types -import ATrade.Strategy -import Control.Monad.State -import Control.Monad.Trans.Writer -import Data.Decimal -import Data.Time.Clock -import Data.Time.Calendar - -data Position = Position { - ticker :: T.Text, - balance :: Int } - -data ExecutionAgent = ExecutionAgent { - pendingOrders :: [Order], - cash :: Decimal, - currentTime :: UTCTime, - orderIdCounter :: Integer -} - -mkExecutionAgent startCash = ExecutionAgent { pendingOrders = [], - cash = startCash, - currentTime = UTCTime (fromGregorian 1970 1 1) 0, - orderIdCounter = 1 } - -executeAtPrice :: Order -> Decimal -> WriterT [Event] (State ExecutionAgent) () -executeAtPrice order price = do - when (orderState order == Unsubmitted) $ tell [OrderSubmitted order] - tell [OrderUpdate (orderId order) Executed] - timestamp <- gets currentTime - tell [NewTrade (mkTradeForOrder timestamp order price)] - - case orderOperation order of - Buy -> modify' (\agent -> agent { cash = cash agent - price * realFracToDecimal 10 (toRational $ orderQuantity order) }) - Sell -> modify' (\agent -> agent { cash = cash agent + price * realFracToDecimal 10 (toRational $ orderQuantity order) }) - -mkTradeForOrder timestamp order price = Trade { tradeOrderId = orderId order, - tradePrice = price, - tradeQuantity = orderQuantity order, - tradeVolume = price * realFracToDecimal 10 (toRational $ orderQuantity order), - tradeVolumeCurrency = "TEST_CURRENCY", - tradeOperation = orderOperation order, - tradeAccount = orderAccountId order, - tradeSecurity = orderSecurity order, - tradeTimestamp = timestamp, - tradeSignalId = orderSignalId order } - - -executePending :: Bars -> WriterT [Event] (State ExecutionAgent) () -executePending bars = do - orders <- gets pendingOrders - let (executedOrders, leftover) = L.partition shouldExecute orders - - mapM_ executeAtOrdersPrice executedOrders - modify' (\s -> s { pendingOrders = leftover } ) - where - executeAtOrdersPrice order = case orderPrice order of - Limit price -> executeAtPrice order price - _ -> return () -- TODO handle stops - - shouldExecute order = case M.lookup (orderSecurity order) bars of - Just (DataSeries ((ts, bar) : _)) -> case orderPrice order of - Limit price -> crosses bar price - _ -> False - Nothing -> False - - crosses bar price = (barClose bar > price && barOpen bar < price) || (barClose bar < price && barOpen bar > price) - -executeStep :: Bars -> [Order] -> WriterT [Event] (State ExecutionAgent) () -executeStep bars orders = do - -- Assign consecutive IDs - orders' <- mapM (\o -> do - id <- gets orderIdCounter - modify(\s -> s { orderIdCounter = id + 1 }) - return o { orderId = id }) orders - - let (executableNow, pending) = L.partition isExecutableNow orders' - mapM_ (executeOrderAtLastPrice bars) executableNow - modify' (\s -> s { pendingOrders = pending ++ pendingOrders s }) - - where - isExecutableNow order = case M.lookup (orderSecurity order) bars of - Just (DataSeries (x:xs)) -> case orderPrice order of - Limit price -> (orderOperation order == Buy && price >= (barClose . snd) x) || (orderOperation order == Sell && price <= (barClose . snd) x) - Market -> True - _ -> False - - executeOrderAtLastPrice bars order = case M.lookup (orderSecurity order) bars of - Just (DataSeries ((ts, bar) : _)) -> executeAtPrice order (barClose bar) - _ -> return () diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 226639b..904ec74 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MultiWayIf #-} {-| - Module : ATrade.BarAggregator @@ -19,8 +20,10 @@ module ATrade.BarAggregator ( mkAggregatorFromBars, handleTicks, handleTick, + updateTime, handleBar, - hmsToDiffTime + hmsToDiffTime, + replaceHistory ) where import ATrade.RoboCom.Types @@ -30,7 +33,6 @@ import Control.Lens import Control.Monad.State import qualified Data.Map.Strict as M import Data.Time.Clock -import Debug.Trace -- | Bar aggregator state data BarAggregator = BarAggregator { @@ -46,6 +48,9 @@ mkAggregatorFromBars myBars timeWindows = BarAggregator { lastTicks = M.empty, tickTimeWindows = timeWindows } +replaceHistory :: BarAggregator -> M.Map TickerId BarSeries -> BarAggregator +replaceHistory agg bars' = agg { bars = bars' } + lBars :: (M.Map TickerId BarSeries -> Identity (M.Map TickerId BarSeries)) -> BarAggregator -> Identity BarAggregator lBars = lens bars (\s b -> s { bars = b }) @@ -108,7 +113,7 @@ handleTick tick = runState $ do else return Nothing where - isInTimeInterval tick (a, b) = (utctDayTime . timestamp) tick >= a && (utctDayTime . timestamp) tick <= b + isInTimeInterval tick' (a, b) = (utctDayTime . timestamp) tick' >= a && (utctDayTime . timestamp) tick' <= b barFromTick !newtick = Bar { barSecurity = security newtick, barTimestamp = timestamp newtick, barOpen = value newtick, @@ -132,64 +137,65 @@ handleTick tick = runState $ do where newTimestamp = timestamp newtick - emptyBarFrom !bar newtick = newBar - where - newTimestamp = timestamp newtick - newBar = Bar { - barSecurity = barSecurity bar, - barTimestamp = newTimestamp, - barOpen = barClose bar, - barHigh = barClose bar, - barLow = barClose bar, - barClose = barClose bar, - barVolume = 0 } - -handleBar :: Bar -> BarAggregator -> (Maybe Bar, BarAggregator) -handleBar bar = runState $ do +updateTime :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) +updateTime tick = runState $ do + lLastTicks %= M.insert (security tick, datatype tick) tick tws <- gets tickTimeWindows mybars <- gets bars - if (any (isInTimeInterval bar) tws) + if (any (isInTimeInterval tick) tws) then - case M.lookup (barSecurity bar) mybars of + case M.lookup (security tick) mybars of Just series -> case bsBars series of (b:bs) -> do let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - if currentBn == barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) - then do - lBars %= M.insert (barSecurity bar) series { bsBars = updateBar b bar : bs } - return Nothing - else - if barEndTime b (tfSeconds $ bsTimeframe series) == barTimestamp bar - then do - lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : (updateBar b bar : bs) } - return . Just $ updateBar b bar - else do - lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } - return . Just $ b - _ -> do - lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } - return Nothing + let thisBn = barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + if + | currentBn == thisBn -> do + lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } + return Nothing + | currentBn < thisBn -> do + lBars %= M.insert (security tick) series { bsBars = emptyBarFromTick tick : b : bs } + return $ Just b + | otherwise -> return Nothing + _ -> return Nothing _ -> return Nothing else return Nothing where - isInTimeInterval bar' (a, b) = (utctDayTime . barTimestamp) bar' >= a && (utctDayTime . barTimestamp) bar' <= b - updateBar !bar' newbar = - let newHigh = max (barHigh bar') (barHigh newbar) - newLow = min (barLow bar') (barLow newbar) in - bar' { - barTimestamp = barTimestamp newbar, - barHigh = newHigh, - barLow = newLow, - barClose = barClose newbar, - barVolume = barVolume bar' + (abs . barVolume $ newbar) } + isInTimeInterval t (a, b) = (utctDayTime . timestamp) t >= a && (utctDayTime . timestamp) t <= b + emptyBarFromTick !newtick = Bar { barSecurity = security newtick, + barTimestamp = timestamp newtick, + barOpen = value newtick, + barHigh = value newtick, + barLow = value newtick, + barClose = value newtick, + barVolume = 0 } + + updateBarTimestamp !bar newtick = bar { barTimestamp = newTimestamp } + where + newTimestamp = timestamp newtick +handleBar :: Bar -> BarAggregator -> (Maybe Bar, BarAggregator) +handleBar bar = runState $ do + mybars <- gets bars + case M.lookup (barSecurity bar) mybars of + Just series -> case bsBars series of + (_:bs) -> do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : bs } + return . Just $ bar + _ -> do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : [bar] } + return Nothing + _ -> return Nothing + where emptyBarFrom bar' = Bar { - barSecurity = barSecurity bar', - barTimestamp = barTimestamp bar', - barOpen = barClose bar', - barHigh = barClose bar', - barLow = barClose bar', - barClose = barClose bar', - barVolume = 0 } + barSecurity = barSecurity bar', + barTimestamp = barTimestamp bar', + barOpen = barClose bar', + barHigh = barClose bar', + barLow = barClose bar', + barClose = barClose bar', + barVolume = 0 } + + diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index bbd99fc..1c31d51 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -1,46 +1,52 @@ {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE TemplateHaskell #-} module ATrade.Driver.Backtest ( backtestMain ) where -import ATrade.Driver.Real.Types (InitializationCallback, - Strategy (..), +import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) import ATrade.Exceptions +import ATrade.Quotes import ATrade.Quotes.Finam as QF +import ATrade.Quotes.QTIS import ATrade.RoboCom.Monad (Event (..), EventCallback, - StrategyAction (..), + MonadRobot (..), StrategyEnvironment (..), - runStrategyElement, st, - appendToLog) + appendToLog, seBars, seLastTimestamp) import ATrade.RoboCom.Positions -import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), - Timeframe (..)) +import ATrade.RoboCom.Types (BarSeries (..), Bars, InstrumentParameters (InstrumentParameters), + Ticker (..), Timeframe (..)) import ATrade.Types import Conduit (awaitForever, runConduit, yield, (.|)) import Control.Exception.Safe +import Control.Lens hiding (ix, (<|), (|>)) import Control.Monad.ST (runST) import Control.Monad.State -import Data.Aeson (FromJSON (..), Result (..), - Value (..), decode) +import Data.Aeson (FromJSON (..), Value (..), decode) import Data.Aeson.Types (parseMaybe) import Data.ByteString.Lazy (readFile, toStrict) +import Data.Default import Data.HashMap.Strict (lookup) -import Data.List (concat, filter, find, partition) +import Data.List (partition) import Data.List.Split (splitOn) import qualified Data.Map.Strict as M -import Data.Semigroup ((<>)) +import Data.Sequence (Seq (..), (<|), (|>)) +import qualified Data.Sequence as Seq import Data.STRef (newSTRef, readSTRef, writeSTRef) import qualified Data.Text as T import Data.Text.IO (putStrLn) +import qualified Data.Text.Lazy as TL import Data.Time.Calendar (fromGregorian) import Data.Time.Clock (DiffTime, UTCTime (..)) import Data.Vector ((!), (!?), (//)) @@ -48,23 +54,39 @@ import qualified Data.Vector as V import Options.Applicative hiding (Success) import Prelude hiding (lookup, putStrLn, readFile) import Safe (headMay) +import System.ZMQ4 hiding (Event) data Feed = Feed TickerId FilePath deriving (Show, Eq) data Params = Params { strategyConfigFile :: FilePath, - qtisEndpoint :: Maybe String, + qtisEndpoint :: String, paramsFeeds :: [Feed] } deriving (Show, Eq) +data BacktestState c s = BacktestState { + _cash :: Double, + _robotState :: s, + _robotParams :: c, + _strategyEnvironment :: StrategyEnvironment, + _pendingOrders :: [Order], + _pendingEvents :: Seq Event, + _tradesLog :: [Trade], + _orderIdCounter :: Integer, + _pendingTimers :: [UTCTime], + _logs :: [T.Text] +} + +makeLenses ''BacktestState + paramsParser :: Parser Params paramsParser = Params <$> strOption ( long "config" <> short 'c' ) - <*> optional ( strOption - ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" )) + <*> strOption + ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" ) <*> some (option feedArgParser ( long "feed" <> short 'f' )) @@ -74,8 +96,8 @@ feedArgParser = eitherReader (\s -> case splitOn ":" s of [tid, fpath] -> Right $ Feed (T.pack tid) fpath _ -> Left $ "Unable to parse feed id: " ++ s) -backtestMain :: (FromJSON c, StateHasPositions s) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () -backtestMain dataDownloadDelta defaultState initCallback callback = do +backtestMain :: (FromJSON c, StateHasPositions s) => DiffTime -> s -> EventCallback c s -> IO () +backtestMain _dataDownloadDelta defaultState callback = do params <- execParser opts (tickerList, config) <- loadStrategyConfig params @@ -84,29 +106,34 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do strategyAccount = "foo", strategyVolume = 1, tickers = tickerList, - strategyQuotesourceEp = "", - strategyBrokerEp = "", - strategyHistoryProviderType = "", - strategyHistoryProvider = "", - strategyQTISEp = T.pack <$> qtisEndpoint params} - - updatedConfig <- case initCallback of - Just cb -> cb config instanceParams - Nothing -> return config + strategyQTISEp = Nothing } feeds <- loadFeeds (paramsFeeds params) - runBacktestDriver feeds config tickerList + bars <- makeBars (T.pack $ qtisEndpoint params) tickerList + + runBacktestDriver feeds config bars where opts = info (helper <*> paramsParser) ( fullDesc <> header "ATrade strategy backtesting framework" ) + makeBars :: T.Text -> [Ticker] -> IO (M.Map TickerId BarSeries) + makeBars qtisEp tickersList = + withContext $ \ctx -> + M.fromList <$> mapM (mkBarEntry ctx qtisEp) tickersList + + mkBarEntry ctx qtisEp tickerEntry = do + info <- qtisGetTickersInfo ctx qtisEp (code tickerEntry) + return (code tickerEntry, BarSeries (code tickerEntry) (Timeframe (timeframeSeconds tickerEntry)) [] (InstrumentParameters (fromInteger $ tiLotSize info) (tiTickSize info))) + + + runBacktestDriver feeds params tickerList = do let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop let finalState = execState (unBacktestingMonad s) $ defaultBacktestState defaultState params tickerList - print $ cash finalState - print $ tradesLog finalState - forM_ (reverse . logs $ finalState) putStrLn + print $ finalState ^. cash + print $ finalState ^. tradesLog + forM_ (reverse $ finalState ^. logs) putStrLn loadStrategyConfig :: (FromJSON c) => Params -> IO ([Ticker], c) loadStrategyConfig params = do @@ -121,14 +148,11 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do Object o -> do mbTickers <- "tickers" `lookup` o mbParams <- "params" `lookup` o - tickers <- parseMaybe parseJSON mbTickers + tickers' <- parseMaybe parseJSON mbTickers params <- parseMaybe parseJSON mbParams - return (tickers, params) + return (tickers', params) _ -> Nothing - resultToMaybe (Error _) = Nothing - resultToMaybe (Success a) = Just a - barStreamFromFeeds feeds = case nextBar feeds of Just (bar, feeds') -> yield bar >> barStreamFromFeeds feeds' _ -> return () @@ -146,7 +170,6 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do minIx <- newSTRef Nothing forM_ [0..(V.length feeds-1)] (\ix -> do let feed = feeds ! ix - curIx <- readSTRef minIx curTs <- readSTRef minTs case feed of x:_ -> case curTs of @@ -160,53 +183,35 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do readSTRef minIx backtestLoop = awaitForever (\bar -> do - env <- gets strategyEnvironment - let oldTimestamp = seLastTimestamp env + _curState <- use robotState + _env <- gets _strategyEnvironment let newTimestamp = barTimestamp bar - let newenv = env { seBars = updateBars (seBars env) bar, seLastTimestamp = newTimestamp } - curState <- gets robotState - modify' (\s -> s { strategyEnvironment = newenv }) - handleEvents [NewBar bar]) - - handleEvents events = do - newActions <- mapM handleEvent events - newEvents <- executeActions (concat newActions) - unless (null newEvents) $ handleEvents newEvents - - executeActions actions = concat <$> mapM executeAction actions - - executeAction (ActionOrder order) = do - oid <- nextOrderId - let submittedOrder = order { orderState = Submitted, orderId = oid } - modify' (\s -> s { pendingOrders = submittedOrder : pendingOrders s }) - return [OrderSubmitted submittedOrder] - - executeAction (ActionCancelOrder oid) = do - mbOrder <- find (\o -> orderId o == oid && orderState o == Submitted) <$> gets pendingOrders - case mbOrder of - Just _ -> do - modify' (\s -> s { pendingOrders = filter (\o -> orderId o == oid) (pendingOrders s)}) - return [OrderUpdate oid Cancelled] - _ -> return [] - - executeAction (ActionLog t) = modify' (\s -> s { logs = t : logs s }) >> return [] - executeAction (ActionSetupTimer t) = modify' (\s -> s { pendingTimers = t : pendingTimers s }) >> return [] - executeAction (ActionIO _ _) = return [] + strategyEnvironment . seBars %= (flip updateBars bar) + strategyEnvironment . seLastTimestamp .= newTimestamp + enqueueEvent (NewBar bar) + lift handleEvents) + + handleEvents = do + events <- use pendingEvents + case events of + x :<| xs -> do + pendingEvents .= xs + handleEvent x + handleEvents + _ -> return () executePendingOrders bar = do - ev1 <- executeMarketOrders bar - ev2 <- executeLimitOrders bar - return $ ev1 ++ ev2 + executeMarketOrders bar + executeLimitOrders bar executeLimitOrders bar = do - (limitOrders, otherOrders) <- partition + (limitOrders, otherOrders'') <- partition (\o -> case orderPrice o of Limit _ -> True - _ -> False) <$> gets pendingOrders - let (executableOrders, otherOrders) = partition (isExecutable bar) limitOrders - modify' (\s -> s { pendingOrders = otherOrders } ) - forM executableOrders $ \order -> - order `executeAtPrice` priceForLimitOrder order bar + _ -> False) <$> use pendingOrders + let (executableOrders, otherOrders') = partition (isExecutable bar) limitOrders + pendingOrders .= otherOrders' ++ otherOrders'' + forM_ executableOrders $ \order -> order `executeAtPrice` priceForLimitOrder order bar isExecutable bar order = case orderPrice order of Limit price -> if orderOperation order == Buy @@ -225,16 +230,19 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do _ -> error "Should've been limit order" executeMarketOrders bar = do - (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> gets pendingOrders - modify' (\s -> s { pendingOrders = otherOrders }) - forM marketOrders $ \order -> + (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> use pendingOrders + pendingOrders .= otherOrders + forM_ marketOrders $ \order -> order `executeAtPrice` barOpen bar executeAtPrice order price = do - ts <- seLastTimestamp <$> gets strategyEnvironment - modify' (\s -> s { tradesLog = mkTrade order price ts : tradesLog s }) - return $ OrderUpdate (orderId order) Executed + ts <- use $ strategyEnvironment . seLastTimestamp + let thisTrade = mkTrade order price ts + tradesLog %= (\log' -> thisTrade : log') + pendingEvents %= (\s -> (OrderUpdate (orderId order) Executed) <| s) + pendingEvents %= (\s -> (NewTrade thisTrade) <| s) + mkTrade :: Order -> Price -> UTCTime -> Trade mkTrade order price ts = Trade { tradeOrderId = orderId order, tradePrice = price, @@ -250,21 +258,16 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do } handleEvent event@(NewBar bar) = do - events <- executePendingOrders bar + executePendingOrders bar + handleEvents -- This should pass OrderUpdate events to the callback before NewBar events firedTimers <- fireTimers (barTimestamp bar) - actions <- concat <$> mapM handleEvent (events ++ map TimerFired firedTimers) - actions' <- handleEvent' event - return $ actions ++ actions' + mapM_ (\x -> enqueueEvent (TimerFired x)) firedTimers + handleEvent' event + return () handleEvent event = handleEvent' event - handleEvent' event = do - env <- gets strategyEnvironment - params <- gets robotParams - curState <- gets robotState - let (newState, actions, _) = runStrategyElement params curState env $ callback event - modify' (\s -> s { robotState = newState } ) - return actions + handleEvent' event = callback event updateBars barMap newbar = M.alter (\case Nothing -> Just BarSeries { bsTickerId = barSecurity newbar, @@ -273,11 +276,11 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do Just bs -> Just bs { bsBars = updateBarList newbar (bsBars bs) }) (barSecurity newbar) barMap updateBarList newbar (_:bs) = newbar:newbar:bs - updateBarList newbar _ = newbar:[newbar] + updateBarList newbar _ = newbar:[newbar] fireTimers ts = do - (firedTimers, otherTimers) <- partition (< ts) <$> gets pendingTimers - modify' (\s -> s { pendingTimers = otherTimers }) + (firedTimers, otherTimers) <- partition (< ts) <$> use pendingTimers + pendingTimers .= otherTimers return firedTimers loadFeeds :: [Feed] -> IO (V.Vector [Bar]) @@ -290,28 +293,43 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do rowToBar tid r = Bar tid (rowTime r) (rowOpen r) (rowHigh r) (rowLow r) (rowClose r) (rowVolume r) - nextOrderId = do - oid <- gets orderIdCounter - modify' (\s -> s { orderIdCounter = oid + 1 }) - return oid - - -data BacktestState s c = BacktestState { - cash :: Double, - robotState :: s, - robotParams :: c, - strategyEnvironment :: StrategyEnvironment, - pendingOrders :: [Order], - tradesLog :: [Trade], - orderIdCounter :: Integer, - pendingTimers :: [UTCTime], - logs :: [T.Text] -} -defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers (UTCTime (fromGregorian 1970 1 1) 0)) [] [] 1 [] [] + enqueueEvent event = pendingEvents %= (\s -> s |> event) + +instance (Default c, Default s) => Default (BacktestState c s) where - tickers = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList + def = defaultBacktestState def def def + +defaultBacktestState :: s -> c -> Bars -> BacktestState c s +defaultBacktestState s c bars = BacktestState 0 s c (StrategyEnvironment "" "" 1 bars (UTCTime (fromGregorian 1970 1 1) 0)) [] Seq.empty [] 1 [] [] newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) +nextOrderId :: BacktestingMonad s c OrderId +nextOrderId = do + orderIdCounter += 1 + use orderIdCounter + +instance MonadRobot (BacktestingMonad c s) c s where + submitOrder order = do + oid <- nextOrderId + let orderWithId = order { orderId = oid } + pendingOrders %= ((:) orderWithId) + pendingEvents %= (\s -> s |> (OrderSubmitted orderWithId)) + cancelOrder oid = do + orders <- use pendingOrders + let (matchingOrders, otherOrders) = partition (\o -> orderId o == oid) orders + case matchingOrders of + [] -> return () + xs -> do + mapM_ (\o -> pendingEvents %= (\s -> s |> (OrderUpdate (orderId o) Cancelled))) xs + pendingOrders .= otherOrders + appendToLog txt = logs %= ((:) (TL.toStrict txt)) + setupTimer time = pendingTimers %= ((:) time) + enqueueIOAction _actionId _action = error "Backtesting io actions is not supported" + getConfig = use robotParams + getState = use robotState + setState s = robotState .= s + getEnvironment = use strategyEnvironment + diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs new file mode 100644 index 0000000..e89de78 --- /dev/null +++ b/src/ATrade/Driver/Junction.hs @@ -0,0 +1,58 @@ + +module ATrade.Driver.Junction + ( + junctionMain + ) where + +import ATrade.Driver.Junction.Types (StrategyDescriptor (..), + StrategyInstance (..), + StrategyInstanceDescriptor (..)) +import Data.Aeson (decode) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import Data.IORef +import qualified Data.Map.Strict as M +import qualified Data.Text as T + +load :: T.Text -> IO B.ByteString +load = undefined + +junctionMain :: M.Map T.Text StrategyDescriptor -> IO () +junctionMain descriptors = do + parseOptions + instanceDescriptors <- undefined + strategies <- mkStrategies instanceDescriptors + + start strategies + + where + parseOptions = undefined + + mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] + mkStrategies = mapM mkStrategy + + mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance + mkStrategy desc = do + sState <- load (stateKey desc) + sCfg <- load (configKey desc) + case M.lookup (strategyId desc) descriptors of + Just (StrategyDescriptor _sName sCallback _sDefState) -> + case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of + (Just pCfg, Just pState) -> do + cfgRef <- newIORef pCfg + stateRef <- newIORef pState + return $ StrategyInstance + { + strategyInstanceId = strategyName desc, + strategyEventCallback = sCallback, + strategyState = stateRef, + strategyConfig = cfgRef + } + _ -> undefined + _ -> undefined + + start = undefined + + + + diff --git a/src/ATrade/Driver/Junction/Types.hs b/src/ATrade/Driver/Junction/Types.hs new file mode 100644 index 0000000..d0cdd3c --- /dev/null +++ b/src/ATrade/Driver/Junction/Types.hs @@ -0,0 +1,54 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Junction.Types + ( + StrategyDescriptor(..), + TickerConfig(..), + StrategyInstanceDescriptor(..), + StrategyInstance(..) + ) where + +import ATrade.RoboCom.Monad (EventCallback) +import ATrade.Types (BarTimeframe, TickerId) +import Data.Aeson (FromJSON (..), ToJSON (..)) +import qualified Data.ByteString as B +import Data.IORef +import qualified Data.Text as T + +data StrategyDescriptor = + forall c s. (FromJSON s, ToJSON s, FromJSON c) => + StrategyDescriptor + { + baseStrategyName :: T.Text, + eventCallback :: EventCallback c s, + defaultState :: s + } + +data TickerConfig = + TickerConfig + { + tickerId :: TickerId, + timeframe :: BarTimeframe + } + +data StrategyInstanceDescriptor = + StrategyInstanceDescriptor + { + strategyId :: T.Text, + strategyName :: T.Text, + configKey :: T.Text, + stateKey :: T.Text, + logPath :: T.Text, + tickers :: [TickerConfig] + } + +data StrategyInstance = + forall c s. (FromJSON s, ToJSON s, FromJSON c) => + StrategyInstance + { + strategyInstanceId :: T.Text, + strategyEventCallback :: EventCallback c s, + strategyState :: IORef s, + strategyConfig :: IORef c + } diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index a0f5acf..9661b27 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -1,62 +1,69 @@ -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE CPP #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE TypeSynonymInstances #-} -{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} module ATrade.Driver.Real ( - Strategy(..), StrategyInstanceParams(..), robotMain, BigConfig(..), - mkBarStrategy, barStrategyDriver ) where -import Options.Applicative -import System.IO -import System.Signal -import System.Exit -import System.Random -import System.Log.Logger -import System.Log.Handler.Simple -import System.Log.Handler (setFormatter) -import System.Log.Formatter -import Control.Monad -import Control.Monad.IO.Class -import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) -import Control.Concurrent.BoundedChan as BC -import Control.Exception -import qualified Data.ByteString as BS -import qualified Data.ByteString.Lazy as BL -import qualified Data.List as L -import qualified Data.Map as M -import qualified Data.Text as T -import Data.Text.Encoding -import Data.Aeson -import Data.IORef -import Data.Time.Calendar -import Data.Time.Clock -import Data.Time.Clock.POSIX -import Data.Maybe -import Data.Monoid -import Database.Redis hiding (info, decode) -import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) -import ATrade.BarAggregator -import ATrade.Driver.Real.BrokerClientThread -import ATrade.Driver.Real.QuoteSourceThread -import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) -import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) -import ATrade.Exceptions -import ATrade.Quotes.Finam as QF -import ATrade.Quotes.QHP as QQ -import ATrade.Quotes.HAP as QH -import System.ZMQ4 hiding (Event(..)) +import ATrade.BarAggregator +import ATrade.Driver.Real.BrokerClientThread +import ATrade.Driver.Real.QuoteSourceThread +import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) +import ATrade.Exceptions +import ATrade.Quotes (MonadHistory (..), MonadInstrumentParametersSource (..)) +import ATrade.Quotes.QHP as QQ +import ATrade.Quotes.QTIS (TickerInfo (..), + qtisGetTickersInfo) +import ATrade.RoboCom.Monad (Event (..), + EventCallback, + MonadRobot (..), + StrategyEnvironment (..), + seBars, seLastTimestamp) +import ATrade.RoboCom.Types (BarSeries (..), InstrumentParameters (..), + Ticker (..), + Timeframe (..)) +import ATrade.RoboCom.Utils (fromHMS) +import ATrade.Types +import Control.Concurrent hiding (readChan, + writeChan, + writeList2Chan, yield) +import Control.Concurrent.BoundedChan as BC +import Control.Exception.Safe +import Control.Lens hiding (Context, (.=)) +import Control.Monad +import Control.Monad.Reader +import Data.Aeson +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BL +import Data.IORef +import qualified Data.Map as M +import Data.Maybe +import qualified Data.Text as T +import Data.Text.Encoding +import qualified Data.Text.Lazy as TL +import Data.Time.Calendar +import Data.Time.Clock +import Data.Time.Clock.POSIX +import Database.Redis hiding (decode, info) +import GHC.Generics +import Options.Applicative +import System.Exit +import System.IO +import System.Log.Formatter +import System.Log.Handler (setFormatter) +import System.Log.Handler.Simple +import System.Log.Logger +import System.Signal +import System.ZMQ4 hiding (Event (..)) import Ether.Reader @@ -106,18 +113,18 @@ instance (MonadRobot (RealDriver c s) c s) where getEnvironment = asks @RDriverEnv environmentRef >>= liftIO . readIORef data Params = Params { - instanceId :: String, - strategyConfigFile :: FilePath, - strategyStateFile :: FilePath, - brokerEp :: String, - quotesourceEp :: String, + instanceId :: String, + strategyConfigFile :: FilePath, + strategyStateFile :: FilePath, + brokerEp :: String, + quotesourceEp :: String, historyProviderType :: Maybe String, - historyProvider :: Maybe String, - redisSocket :: Maybe String, - qtisSocket :: Maybe String, - accountId :: String, - volumeFactor :: Int, - sourceBarTimeframe :: Maybe Int + historyProvider :: Maybe String, + redisSocket :: Maybe String, + qtisEndpoint :: String, + accountId :: String, + volumeFactor :: Int, + sourceBarTimeframe :: Maybe Int } deriving (Show, Eq) paramsParser :: Parser Params @@ -146,9 +153,9 @@ paramsParser = Params <*> optional ( strOption ( long "redis-socket" <> metavar "ADDRESS" )) - <*> optional ( strOption + <*> strOption ( long "qtis" - <> metavar "ENDPOINT/ID" )) + <> metavar "ENDPOINT/ID" ) <*> strOption ( long "account" <> metavar "ACCOUNT" ) @@ -159,9 +166,79 @@ paramsParser = Params ( long "source-timeframe" <> metavar "SECONDS" )) +data Env historySource c s = Env { + envZeromqContext :: Context, + envHistorySource :: historySource, + envQtisEndpoint :: T.Text, + envStrategyInstanceParams :: StrategyInstanceParams, + envStrategyEnvironment :: IORef StrategyEnvironment, + envConfigRef :: IORef c, + envStateRef :: IORef s, + envBrokerChan :: BC.BoundedChan BrokerCommand, + envTimers :: IORef [UTCTime], + envEventChan :: BC.BoundedChan Event, + envAggregator :: IORef BarAggregator, + envLastTimestamp :: IORef UTCTime +} deriving (Generic) + +type App historySource c s = ReaderT (Env historySource c s) IO + +instance MonadRobot (App historySource c s) c s where + submitOrder order = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerSubmitOrder order + + cancelOrder oId = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerCancelOrder oId + + appendToLog = lift . debugM "Strategy" . T.unpack . TL.toStrict + setupTimer t = do + timers <- asks envTimers + lift $ atomicModifyIORef' timers (\s -> (t : s, ())) + + enqueueIOAction actionId action' = do + eventChan <- asks envEventChan + lift $ void $ forkIO $ do + v <- action' + BC.writeChan eventChan $ ActionCompleted actionId v + + getConfig = asks envConfigRef >>= lift . readIORef + getState = asks envStateRef >>= lift . readIORef + setState s = do + ref <- asks envStateRef + lift $ writeIORef ref s + + getEnvironment = do + aggRef <- asks envAggregator + envRef <- asks envStrategyEnvironment + agg <- lift $ readIORef aggRef + env <- lift $ readIORef envRef + nowRef <- asks envLastTimestamp + now <- lift $ readIORef nowRef + return $ env & seBars .~ bars agg & seLastTimestamp .~ now + +instance MonadHistory (App QQ.QHPHandle c s) where + getHistory tickerId timeframe fromTime toTime = do + qhp <- asks envHistorySource + QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime + +instance MonadInstrumentParametersSource (App hs c s) where + getInstrumentParameters tickerIds = do + ctx <- asks envZeromqContext + ep <- asks envQtisEndpoint + info <- liftIO $ qtisGetTickersInfo ctx ep tickerIds + return $ (tiTicker info, convert info) + where + convert info = InstrumentParameters + { + ipLotSize = fromInteger $ tiLotSize info, + ipTickSize = tiTickSize info + } + data BigConfig c = BigConfig { - confTickers :: [Ticker], + confTickers :: [Ticker], strategyConfig :: c } @@ -182,7 +259,7 @@ storeState params stateRef timersRef = do Nothing -> withFile (strategyStateFile params) WriteMode (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState) `catch` (\e -> warningM "main" ("Unable to save state: " ++ show (e :: IOException))) Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } now <- getPOSIXTime res <- runRedis conn $ mset [(encodeUtf8 $ T.pack $ instanceId params, BL.toStrict $ encode currentStrategyState), @@ -190,12 +267,8 @@ storeState params stateRef timersRef = do (encodeUtf8 $ T.pack $ instanceId params ++ ":timers", BL.toStrict $ encode currentTimersState) ] case res of - Left _ -> warningM "main" "Unable to save state" + Left _ -> warningM "main" "Unable to save state" Right _ -> return () -#else - return () -#endif - gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () gracefulShutdown params stateRef timersRef shutdownMv _ = do @@ -204,8 +277,8 @@ gracefulShutdown params stateRef timersRef shutdownMv _ = do putMVar shutdownMv () exitSuccess -robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () -robotMain dataDownloadDelta defaultState initCallback callback = do +robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> EventCallback c s -> IO () +robotMain dataDownloadDelta defaultState callback = do params <- execParser opts initLogging params infoM "main" "Starting" @@ -219,43 +292,70 @@ robotMain dataDownloadDelta defaultState initCallback callback = do strategyAccount = T.pack . accountId $ params, strategyVolume = volumeFactor params, tickers = tickerList, - strategyQuotesourceEp = T.pack . quotesourceEp $ params, - strategyBrokerEp = T.pack . brokerEp $ params, - strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params, - strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params, - strategyQTISEp = T.pack <$> qtisSocket params} + strategyQTISEp = Nothing } - updatedConfig <- case initCallback of - Just cb -> cb config instanceParams - Nothing -> return config - - let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback stateRef <- newIORef stratState + configRef <- newIORef config timersRef <- newIORef timersState shutdownMv <- newEmptyMVar installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv) - randsec <- getStdRandom(randomR(1, 10)) - threadDelay $ randsec * 1000000 debugM "main" "Forking state saving thread" stateSavingThread <- forkIO $ forever $ do threadDelay 1000000 storeState params stateRef timersRef + straEnv <- newIORef StrategyEnvironment { + _seInstanceId = strategyInstanceId instanceParams, + _seAccount = strategyAccount instanceParams, + _seVolume = strategyVolume instanceParams, + _seBars = M.empty, + _seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 + } + -- Event channel is for strategy events, like new tick arrival, or order execution notification + eventChan <- BC.newBoundedChan 1000 + -- Orders channel passes strategy orders to broker thread + brokerChan <- BC.newBoundedChan 1000 + debugM "main" "Starting strategy driver" - barStrategyDriver (sourceBarTimeframe params) tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread + withContext (\ctx -> do + + let qsEp = T.pack $ quotesourceEp params + let brEp = T.pack $ brokerEp params + agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] + bracket (startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do + debugM "Strategy" "QuoteSource thread forked" + bracket (startBrokerClientThread (strategyInstanceId instanceParams) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do + debugM "Strategy" "Broker thread forked" + + now <- getCurrentTime >>= newIORef + + let env = Env { + envZeromqContext = ctx, + envQtisEndpoint = T.pack . qtisEndpoint $ params, + envHistorySource = mkQHPHandle ctx (T.pack . fromMaybe "" . historyProvider $ params), + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan, + envAggregator = agg, + envLastTimestamp = now + } + runReaderT (barStrategyDriver dataDownloadDelta instanceParams callback shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = let classCode = T.takeWhile (/= '#') (security tick) in - if - | classCode == "SPBFUT" || classCode == "SPBOPT" -> any (inInterval . utctDayTime . timestamp $ tick) fortsIntervals - | otherwise -> any (inInterval . utctDayTime . timestamp $ tick) secIntervals + if classCode == "SPBFUT" || classCode == "SPBOPT" + then any (inInterval . utctDayTime . timestamp $ tick) fortsIntervals + else any (inInterval . utctDayTime . timestamp $ tick) secIntervals - fortsIntervals = [(fromHMS 7 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)] + fortsIntervals = [(fromHMS 4 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)] secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)] - fromHMS h m s = h * 3600 + m * 60 + s inInterval ts (start, end) = ts >= start && ts <= end opts = info (helper <*> paramsParser) @@ -274,16 +374,16 @@ robotMain dataDownloadDelta defaultState initCallback callback = do loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f case bigconfig of - Right conf -> return (confTickers conf, strategyConfig conf) + Right conf -> return (confTickers conf, strategyConfig conf) Left errmsg -> throw $ UnableToLoadConfig $ (T.pack . show) errmsg) loadStrategyTimers :: Params -> IO [UTCTime] loadStrategyTimers params = case redisSocket params of Nothing -> return [] Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } - res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ "timers") + res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers") case res of Left _ -> do warningM "main" "Unable to load state" @@ -297,15 +397,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> do warningM "main" "Unable to load state" return [] -#else - error "Not implemented" -#endif - loadStrategyState params = case redisSocket params of Nothing -> loadStateFromFile (strategyStateFile params) Just sock -> do -#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params) case res of @@ -321,10 +417,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> do warningM "main" "Unable to load state" return defaultState -#else - error "Not implemented" -#endif - + loadStateFromFile filepath = withFile filepath ReadMode (\f -> do maybeState <- decode . BL.fromStrict <$> BS.hGetContents f case maybeState of @@ -332,43 +425,27 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) --- | Helper function to make 'Strategy' instances -mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s -mkBarStrategy instanceParams dd params initialState cb = BarStrategy { - downloadDelta = dd, - eventCallback = cb, - currentState = initialState, - strategyParams = params, - strategyTimers = [], - - strategyInstanceParams = instanceParams } - -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO () -barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutdownVar = do - -- Make channels - -- Event channel is for strategy events, like new tick arrival, or order execution notification - eventChan <- BC.newBoundedChan 1000 - -- Orders channel passes strategy orders to broker thread - ordersChan <- BC.newBoundedChan 1000 - - withContext (\ctx -> do - -- Load tickers data and create BarAggregator from them - historyBars <- - if - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> - M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> - M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - | otherwise -> - M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] - bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do - debugM "Strategy" "QuoteSource thread forked" - bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do - debugM "Strategy" "Broker thread forked" - +barStrategyDriver :: (MonadHistory (App hs c s)) => DiffTime -> StrategyInstanceParams -> EventCallback c s -> MVar () -> App hs c s () +barStrategyDriver downloadDelta instanceParams callback shutdownVar = do + now <- liftIO getCurrentTime + history <- M.fromList <$> mapM (loadTickerHistory now) (tickers instanceParams) + eventChan <- asks envEventChan + brokerChan <- asks envBrokerChan + agg <- asks envAggregator + liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ())) + + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" + +<<<<<<< HEAD wakeupTid <- forkIO $ forever $ do maybeShutdown <- tryTakeMVar shutdownVar if isJust maybeShutdown @@ -407,10 +484,33 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd env <- readIORef envRef let oldTimestamp = seLastTimestamp env +======= + readAndHandleEvents agg instanceParams + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid + + where + loadTickerHistory now t = do + history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) + ((fromRational . toRational . negate $ downloadDelta) `addUTCTime` now) now + instrumentParams <- snd <$> getInstrumentParameters (code t) + return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history instrumentParams) + + readAndHandleEvents agg instanceParams' = do + eventChan <- asks envEventChan + event <- lift $ readChan eventChan + if event /= Shutdown + then do + env <- getEnvironment +>>>>>>> stable let newTimestamp = case event of NewTick tick -> timestamp tick - _ -> seLastTimestamp env + NewBar bar -> barTimestamp bar + _ -> env ^. seLastTimestamp + nowRef <- asks envLastTimestamp + lift $ writeIORef nowRef newTimestamp +<<<<<<< HEAD newTimers <- catMaybes <$> (readIORef timersRef >>= mapM (checkTimer eventChan newTimestamp)) atomicWriteIORef timersRef newTimers @@ -419,117 +519,24 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd readAndHandleEvents agg ordersChan eventChan strategy' envRef else debugM "Strategy" "Shutdown requested" +======= + timersRef <- asks envTimers + oldTimers <- lift $ readIORef timersRef + newTimers <- catMaybes <$> mapM (checkTimer eventChan newTimestamp) oldTimers + callback event + lift $ writeIORef timersRef newTimers + + readAndHandleEvents agg instanceParams' + else + lift $ debugM "Strategy" "Shutdown requested" +>>>>>>> stable where - handleTimerActions action = - case action of - ActionSetupTimer timerTime -> return $ Just timerTime - _ -> return Nothing - - handleActions ordersChan' action = - case action of - (ActionLog logText) -> debugM "Strategy" $ T.unpack logText - (ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order - (ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid - (ActionSetupTimer _) -> return () - (ActionIO tag io) -> void $ forkIO $ do - v <- io - writeChan eventChan (ActionCompleted tag v) checkTimer eventChan' newTimestamp timerTime = if newTimestamp >= timerTime then do - writeChan eventChan' $ TimerFired timerTime + lift $ writeChan eventChan' $ TimerFired timerTime return Nothing else return $ Just timerTime - loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromHAP ctx ep t = do - debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) - case parseHAPPeriod $ timeframeSeconds t of - Just tf -> do - now <- getCurrentTime - historyBars <- QH.getQuotes ctx QH.RequestParams { - QH.endpoint = ep, - QH.ticker = code t, - QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now, - QH.endDate = now, - QH.period = tf } - debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" - return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - - loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromQHP ctx ep t = do - debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) - case parseQHPPeriod $ timeframeSeconds t of - Just tf -> do - now <- getCurrentTime - historyBars <- QQ.getQuotes ctx QQ.RequestParams { - QQ.endpoint = ep, - QQ.ticker = code t, - QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), - QQ.endDate = utctDay now, - QQ.period = tf } - debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" - return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - - loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries) - loadTickerFromFinam t = do - randDelay <- getStdRandom (randomR (1, 5)) - threadDelay $ randDelay * 1000000 - now <- getCurrentTime - debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) - case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of - (Just finamCode, Just per) -> do - debugM "Strategy" $ "Downloading ticker: " ++ finamCode - history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, - QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), - QF.endDate = utctDay now, - QF.period = per } - case history of - Just h -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = convertFromFinamHistory (code t) h }) - Nothing -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - convertFromFinamHistory :: TickerId -> [Row] -> [Bar] - convertFromFinamHistory tid = L.reverse . fmap (\row -> Bar { barSecurity = tid, - barTimestamp = rowTime row, - barOpen = rowOpen row, - barHigh = rowHigh row, - barLow = rowLow row, - barClose = rowClose row, - barVolume = rowVolume row }) - - parseFinamPeriod x - | x == 0 = Just QF.PeriodTick - | x == 60 = Just QF.Period1Min - | x == 5 * 60 = Just QF.Period5Min - | x == 10 * 60 = Just QF.Period10Min - | x == 15 * 60 = Just QF.Period15Min - | x == 30 * 60 = Just QF.Period30Min - | x == 60 * 60 = Just QF.PeriodHour - | x == 24 * 60 * 60 = Just QF.PeriodDay - | otherwise = Nothing - - parseQHPPeriod x - | x == 60 = Just QQ.Period1Min - | x == 5 * 60 = Just QQ.Period5Min - | x == 15 * 60 = Just QQ.Period15Min - | x == 30 * 60 = Just QQ.Period30Min - | x == 60 * 60 = Just QQ.PeriodHour - | x == 24 * 60 * 60 = Just QQ.PeriodDay - | otherwise = Nothing - - parseHAPPeriod x - | x == 60 = Just QH.Period1Min - | x == 5 * 60 = Just QH.Period5Min - | x == 15 * 60 = Just QH.Period15Min - | x == 30 * 60 = Just QH.Period30Min - | x == 60 * 60 = Just QH.PeriodHour - | x == 24 * 60 * 60 = Just QH.PeriodDay - | otherwise = Nothing - diff --git a/src/ATrade/Driver/Real/BrokerClientThread.hs b/src/ATrade/Driver/Real/BrokerClientThread.hs index ee994da..2146b86 100644 --- a/src/ATrade/Driver/Real/BrokerClientThread.hs +++ b/src/ATrade/Driver/Real/BrokerClientThread.hs @@ -1,4 +1,5 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} module ATrade.Driver.Real.BrokerClientThread ( startBrokerClientThread, @@ -9,7 +10,6 @@ import ATrade.Broker.Client import ATrade.Broker.Protocol import ATrade.RoboCom.Monad hiding (cancelOrder, submitOrder) -import ATrade.RoboCom.Types import ATrade.Types import Control.Concurrent hiding (readChan, writeChan, @@ -28,47 +28,58 @@ import Data.Time.Clock import System.Log.Logger import System.ZMQ4 hiding (Event) -data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications +data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications | BrokerHandleNotification Notification - -startBrokerClientThread :: T.Text -> Context -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId -startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar = forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $ - bracket (startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams) - (\bro -> do - stopBrokerClient bro - debugM "Strategy" "Broker client: stop") - (\bs -> handle (\e -> do - warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException) - throwIO e) $ do - now <- getCurrentTime - lastNotificationTime <- newIORef now - whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do - brokerCommand <- readChan ordersChan - case brokerCommand of - BrokerSubmitOrder order -> do - debugM "Strategy" $ "Submitting order: " ++ show order - maybeOid <- submitOrder bs order - debugM "Strategy" "Order submitted" - case maybeOid of - Right oid -> writeChan eventChan (OrderSubmitted order { orderId = oid }) - Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg - BrokerCancelOrder oid -> do - debugM "Strategy" $ "Cancelling order: " ++ show oid - _ <- cancelOrder bs oid - debugM "Strategy" "Order cancelled" - BrokerRequestNotifications -> do - t <- getCurrentTime - nt <- readIORef lastNotificationTime - when (t `diffUTCTime` nt > 1) $ do - maybeNs <- getNotifications bs - case maybeNs of +startBrokerClientThread :: T.Text -> Context -> T.Text -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId +startBrokerClientThread instId ctx brEp notifEp ordersChan eventChan shutdownVar = do + let callback = writeChan ordersChan . BrokerHandleNotification + forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $ + bracket (startBrokerClient (encodeUtf8 instId) ctx brEp notifEp [callback] defaultClientSecurityParams) + (\bro -> do + stopBrokerClient bro + debugM "Strategy" "Broker client: stop") + (\bs -> handle (\e -> do + warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException) + throwIO e) $ do + now <- getCurrentTime + lastNotificationTime <- newIORef now + lastKnownSqnum <- newIORef 0 + whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do + brokerCommand <- readChan ordersChan + case brokerCommand of + BrokerSubmitOrder order -> do + debugM "Strategy" $ "Submitting order: " ++ show order + result <- submitOrder bs order + debugM "Strategy" "Order submitted" + case result of + Right _ -> debugM "Strategy" $ "Order submitted: " ++ show (orderId order) Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg - Right ns -> do - mapM_ (sendNotification eventChan) ns - getCurrentTime >>= (writeIORef lastNotificationTime) - nTimeout <- notTimeout lastNotificationTime - shouldShutdown <- isNothing <$> tryReadMVar shutdownVar - debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) + BrokerCancelOrder oid -> do + debugM "Strategy" $ "Cancelling order: " ++ show oid + _ <- cancelOrder bs oid + debugM "Strategy" "Order cancelled" + BrokerRequestNotifications -> do + t <- getCurrentTime + nt <- readIORef lastNotificationTime + when (t `diffUTCTime` nt > 1) $ do + maybeNs <- getNotifications bs + case maybeNs of + Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg + Right ns -> do + mapM_ (\n -> do + prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s)) + when (prevSqnum + 1 < getNotificationSqnum n) $ + warningM "Strategy" $ "Sqnum jump: " ++ show prevSqnum ++ "->" ++ show (getNotificationSqnum n) + sendNotification eventChan n) ns + getCurrentTime >>= writeIORef lastNotificationTime + BrokerHandleNotification notification -> do + sendNotification eventChan n + prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s)) + + undefined + nTimeout <- notTimeout lastNotificationTime + shouldShutdown <- isNothing <$> tryReadMVar shutdownVar + debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) notTimeout :: IORef UTCTime -> IO Bool notTimeout ts = do @@ -79,5 +90,5 @@ notTimeout ts = do sendNotification :: BoundedChan Event -> Notification -> IO () sendNotification eventChan notification = writeChan eventChan $ case notification of - OrderNotification oid state -> OrderUpdate oid state - TradeNotification trade -> NewTrade trade + OrderNotification sqnum oid state -> OrderUpdate oid state + TradeNotification sqnum trade -> NewTrade trade diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 9d592c0..e1f1d3e 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -6,14 +6,13 @@ module ATrade.Driver.Real.QuoteSourceThread ) where import ATrade.BarAggregator -import ATrade.Driver.Real.Types +import ATrade.Driver.Types import ATrade.QuoteSource.Client import ATrade.RoboCom.Monad import ATrade.RoboCom.Types import ATrade.Types import Data.IORef -import Data.Maybe import qualified Data.Text as T import Control.Concurrent hiding (readChan, writeChan, @@ -25,32 +24,37 @@ import Control.Monad import System.Log.Logger import System.ZMQ4 hiding (Event) -startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId -startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do +startQuoteSourceThread :: Context -> T.Text -> StrategyInstanceParams -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId +startQuoteSourceThread ctx qsEp instanceParams eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do tickChan <- newBoundedChan 1000 - bracket (startQuoteSourceClient tickChan tickersList ctx qsEp) + bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams) (\qs -> do stopQuoteSourceClient qs - debugM "Strategy" "Quotesource client: stop") + debugM "QSThread" "Quotesource client: stop") (\_ -> forever $ do qdata <- readChan tickChan case qdata of QDTick tick -> when (goodTick tick) $ do writeChan eventChan (NewTick tick) - when (isNothing maybeSourceTimeframe) $ do - aggValue <- readIORef agg - case handleTick tick aggValue of - (Just bar, !newAggValue) -> writeChan eventChan (NewBar bar) >> writeIORef agg newAggValue - (Nothing, !newAggValue) -> writeIORef agg newAggValue - QDBar (_, bar) -> do + case maybeSourceTimeframe of + Nothing -> do + aggValue <- readIORef agg + case handleTick tick aggValue of + (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) + (Nothing, !newAggValue) -> writeIORef agg newAggValue + Just _ -> return () + QDBar (incomingTf, bar) -> do aggValue <- readIORef agg - when (isJust maybeSourceTimeframe) $ do - case handleBar bar aggValue of - (Just bar', !newAggValue) -> writeChan eventChan (NewBar bar') >> writeIORef agg newAggValue - (Nothing, !newAggValue) -> writeIORef agg newAggValue) + -- debugM "QSThread" $ "Incoming bar: " ++ show incomingTf ++ ": " ++ show bar + case maybeSourceTimeframe of + Just tf -> when (tf == unBarTimeframe incomingTf) $ + case handleBar bar aggValue of + (Just bar', !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar') + (Nothing, !newAggValue) -> writeIORef agg newAggValue + _ -> return ()) where goodTick tick = tickFilter tick && (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) - tickersList = fmap code . (tickers . strategyInstanceParams) $ strategy + tickersList = fmap code . tickers $ instanceParams diff --git a/src/ATrade/Driver/Real/Types.hs b/src/ATrade/Driver/Real/Types.hs deleted file mode 100644 index 0728fa8..0000000 --- a/src/ATrade/Driver/Real/Types.hs +++ /dev/null @@ -1,39 +0,0 @@ -{-# LANGUAGE RankNTypes #-} - -module ATrade.Driver.Real.Types ( - Strategy(..), - StrategyInstanceParams(..), - InitializationCallback -) where - -import ATrade.RoboCom.Monad -import ATrade.RoboCom.Types - -import qualified Data.Text as T -import Data.Time.Clock - --- | Top-level strategy configuration and state -data Strategy c s = BarStrategy { - downloadDelta :: DiffTime, -- ^ How much history to download at strategy start - eventCallback :: EventCallback c s, -- ^ Strategy event callback - currentState :: s, -- ^ Current strategy state. Updated after each 'EventCallback' call - strategyParams :: c, -- ^ Strategy params - strategyTimers :: [UTCTime], - - strategyInstanceParams :: StrategyInstanceParams -- ^ Instance params -} - --- | Strategy instance params store few params which are common for all strategies -data StrategyInstanceParams = StrategyInstanceParams { - strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) - strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent - strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) - tickers :: [Ticker], -- ^ List of tickers which is used by this strategy - strategyQuotesourceEp :: T.Text, -- ^ QuoteSource server endpoint - strategyBrokerEp :: T.Text, -- ^ Broker server endpoint - strategyHistoryProviderType :: T.Text, - strategyHistoryProvider :: T.Text, - strategyQTISEp :: Maybe T.Text -} - -type InitializationCallback c = c -> StrategyInstanceParams -> IO c diff --git a/src/ATrade/Driver/Types.hs b/src/ATrade/Driver/Types.hs new file mode 100644 index 0000000..04e2a53 --- /dev/null +++ b/src/ATrade/Driver/Types.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Types +( + StrategyInstanceParams(..), + InitializationCallback +) where + +import ATrade.RoboCom.Types + +import qualified Data.Text as T + +-- | Strategy instance params store few params which are common for all strategies +data StrategyInstanceParams = StrategyInstanceParams { + strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) + strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent + strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) + tickers :: [Ticker], -- ^ List of tickers which is used by this strategy + strategyQTISEp :: Maybe T.Text +} + +type InitializationCallback c = c -> StrategyInstanceParams -> IO c diff --git a/src/ATrade/Exceptions.hs b/src/ATrade/Exceptions.hs index f18cd37..6fd7d12 100644 --- a/src/ATrade/Exceptions.hs +++ b/src/ATrade/Exceptions.hs @@ -8,7 +8,12 @@ import Control.Exception import qualified Data.Text as T import GHC.Generics -data RoboComException = UnableToLoadConfig T.Text | UnableToLoadFeed T.Text +data RoboComException = UnableToLoadConfig T.Text + | UnableToLoadFeed T.Text + | UnableToLoadState T.Text + | UnableToSaveState T.Text + | BadParams T.Text + | QTISFailure T.Text deriving (Show, Generic) instance Exception RoboComException diff --git a/src/ATrade/Forums/Smartlab.hs b/src/ATrade/Forums/Smartlab.hs deleted file mode 100644 index ba79a14..0000000 --- a/src/ATrade/Forums/Smartlab.hs +++ /dev/null @@ -1,153 +0,0 @@ -{-# OPTIONS_GHC -Wno-type-defaults #-} - -module ATrade.Forums.Smartlab ( - NewsItem(..), - IndexItem(..), - getIndex, - getItem -) where - -import qualified Data.ByteString.Lazy as BL -import qualified Data.List as L -import Data.Maybe -import qualified Data.Text as T -import Data.Text.Encoding -import Data.Time.Calendar -import Data.Time.Clock -import Network.HTTP.Simple -import Safe -import Text.HTML.TagSoup -import Text.Parsec -import Text.Parsec.Text -import Text.StringLike - -import Debug.Trace - -data NewsItem = NewsItem { - niUrl :: !T.Text, - niHeader :: !T.Text, - niText :: !T.Text, - niAuthor :: !T.Text, - niPubTime :: !UTCTime -} deriving (Show, Eq) - -data IndexItem = IndexItem { - iiUrl :: !T.Text, - iiTitle :: !T.Text, - iiPubTime :: !UTCTime -} deriving (Show, Eq) - -monthNames :: [T.Text] -monthNames = fmap T.pack ["января", "февраля", "марта", "апреля", "мая", "июня", "июля", "августа", "сентября", "октября", "ноября", "декабря"] - -extractBetween :: StringLike str => String -> [Tag str] -> [Tag str] -extractBetween tagName = takeWhile (~/= closeTag) . dropWhile (~/= openTag) - where - openTag = "<" ++ tagName ++ ">" - closeTag = "" - -matchClass :: T.Text -> T.Text -> Tag T.Text -> Bool -matchClass _ className (TagOpen _ attrs) = case L.lookup (T.pack "class") attrs of - Just klass -> className `L.elem` T.words klass - Nothing -> False - -matchClass _ _ _ = False - -parseTimestamp :: T.Text -> Maybe UTCTime -parseTimestamp text = case parse timestampParser "" text of - Left _ -> Nothing - Right val -> Just val - where - timestampParser :: Parser UTCTime - timestampParser = do - spaces - day <- read <$> many1 digit - spaces - monthName <- T.pack <$> many1 letter - case L.elemIndex monthName monthNames of - Nothing -> fail "Can't parse month" - Just month -> do - spaces - year <- fromIntegral . read <$> many1 digit - _ <- char ',' - spaces - hour <- fromIntegral . read <$> many1 digit - _ <- char ':' - minute <- fromIntegral . read <$> many1 digit - return $ UTCTime (fromGregorian year (month + 1) day) (hour * 3600 + minute * 60) - -getItem :: IndexItem -> IO (Maybe NewsItem) -getItem indexItem = do - rq <- parseRequest $ T.unpack (iiUrl indexItem) - resp <- httpLBS rq - if getResponseStatusCode resp == 200 - then return . parseItem . decodeUtf8 . BL.toStrict . getResponseBody $ resp - else return Nothing - where - parseItem rawHtml = case parseTimestamp timestamp of - Just itemPubtime -> Just NewsItem { - niUrl = iiUrl indexItem, - niHeader = itemHeader, - niText = itemText, - niAuthor = itemAuthor, - niPubTime = itemPubtime - } - Nothing -> Nothing - where - itemHeader = innerText . - extractBetween "span" . - extractBetween "h1" . - dropWhile (not . matchClass (T.pack "div") (T.pack "topic")) $ tags - - itemText = innerText . - extractBetween "div" . - dropWhile (not . matchClass (T.pack "div") (T.pack "content")) . - dropWhile (~/= "
") $ tags - - itemAuthor = innerText . - extractBetween "li" . - dropWhile (not . matchClass (T.pack "li") (T.pack "author")) $ tags - - timestamp = traceShowId $ innerText . - extractBetween "li" . - dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ tags - - tags = parseTags rawHtml - - -getIndex :: T.Text -> Int -> IO ([IndexItem], Bool) -getIndex rootUrl pageNumber = do - rq <- parseRequest $ T.unpack $ makeUrl rootUrl pageNumber - resp <- httpLBS rq - return $ if getResponseStatusCode resp == 200 - then parseIndex . decodeUtf8 . BL.toStrict . getResponseBody $ resp - else ([], False) - where - parseIndex :: T.Text -> ([IndexItem], Bool) - parseIndex x = (mapMaybe parseIndexEntry $ partitions (matchClass (T.pack "div") (T.pack "topic")) $ parseTags x, hasNextPage $ parseTags x) - - parseIndexEntry :: [Tag T.Text] -> Maybe IndexItem - parseIndexEntry divTag = do - a <- headMay . dropWhile (~/= "") $ divTag - let text = innerText . takeWhile (~/= "") . dropWhile (~/= "") $ divTag - case a of - TagOpen _ attr -> do - href <- L.lookup (T.pack "href") attr - ts <- parseTimestamp (innerText $ takeWhile (~/= "") . dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ divTag) - Just IndexItem { iiUrl = href, - iiTitle = text, - iiPubTime = ts } - _ -> Nothing - - - makeUrl root pagenumber - | pagenumber == 0 || pagenumber == 1 = root - | otherwise = root `T.append` (T.pack "/page") `T.append` T.pack (show pagenumber) - - hasNextPage tags = if pageNumber <= 1 - then paginationLinksCount > 0 - else paginationLinksCount > 1 - where - paginationLinksCount = length . filter (~== "") . extractBetween "p" . dropWhile (~/= "