commit 813b15fc6b10947232cd26dd4d1f6a4e622fef38 Author: Denis Tereshkin Date: Wed Sep 19 22:18:57 2018 +0700 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..90d0a0d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.* +*~ +*#*.*# \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..83e56ab --- /dev/null +++ b/LICENSE @@ -0,0 +1,30 @@ +Copyright Author name here (c) 2018 + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Author name here nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4ea628d --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# robocom-zero diff --git a/Setup.hs b/Setup.hs new file mode 100644 index 0000000..9a994af --- /dev/null +++ b/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/robocom-zero.cabal b/robocom-zero.cabal new file mode 100644 index 0000000..5170f28 --- /dev/null +++ b/robocom-zero.cabal @@ -0,0 +1,97 @@ +name: robocom-zero +version: 0.1.0.0 +-- synopsis: +-- description: +homepage: https://github.com/asakul/robocom-zero#readme +license: BSD3 +license-file: LICENSE +author: Denis Tereshkin +maintainer: denis@kasan.ws +copyright: 2018 Denis Tereshkin +category: Web +build-type: Simple +extra-source-files: README.md +cabal-version: >=1.10 + +library + hs-source-dirs: src + ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults + exposed-modules: ATrade.RoboCom.Indicators + , ATrade.RoboCom.Monad + , ATrade.RoboCom.Positions + , ATrade.RoboCom.Types + , ATrade.RoboCom.Utils + , ATrade.Quotes.Finam + , ATrade.Quotes.HAP + , ATrade.Quotes.QHP + , ATrade.Quotes.QTIS + , ATrade.Driver.Real + , ATrade.Driver.Backtest + build-depends: base >= 4.7 && < 5 + , libatrade + , text + , text-icu + , errors + , lens + , bytestring + , cassava + , containers + , time + , vector + , wreq + , safe + , hslogger + , parsec + , parsec-numbers + , aeson + , binary + , binary-ieee754 + , zeromq4-haskell + , unordered-containers + , ether + , th-printf + , BoundedChan + , monad-loops + , conduit + , safe-exceptions + , mtl + , transformers + , list-extras + , optparse-applicative + , split + , signal + , random + , hedis + + default-language: Haskell2010 + other-modules: ATrade.BarAggregator + , ATrade.Exceptions + , ATrade.Driver.Real.BrokerClientThread + , ATrade.Driver.Real.QuoteSourceThread + , ATrade.Driver.Real.Types + +test-suite robots-test + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: Spec.hs + build-depends: base + , robocom-zero + , libatrade + , time + , text + , tasty + , tasty-hunit + , tasty-golden + , tasty-smallcheck + , tasty-quickcheck + , tasty-hspec + , quickcheck-text + , quickcheck-instances + ghc-options: -threaded -rtsopts -with-rtsopts=-N + default-language: Haskell2010 + other-modules: Test.RoboCom.Indicators + , Test.RoboCom.Utils + +source-repository head + type: git + location: https://github.com/asakul/robocom-zero diff --git a/src/ATrade/Backtest/Execution.hs b/src/ATrade/Backtest/Execution.hs new file mode 100644 index 0000000..643c180 --- /dev/null +++ b/src/ATrade/Backtest/Execution.hs @@ -0,0 +1,102 @@ +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Backtest.Execution ( + mkExecutionAgent, + ExecutionAgent(..), + executePending, + executeStep +) where + +import qualified Data.Text as T +import qualified Data.Map as M +import qualified Data.List as L +import ATrade.Types +import ATrade.Strategy.Types +import ATrade.Strategy +import Control.Monad.State +import Control.Monad.Trans.Writer +import Data.Decimal +import Data.Time.Clock +import Data.Time.Calendar + +data Position = Position { + ticker :: T.Text, + balance :: Int } + +data ExecutionAgent = ExecutionAgent { + pendingOrders :: [Order], + cash :: Decimal, + currentTime :: UTCTime, + orderIdCounter :: Integer +} + +mkExecutionAgent startCash = ExecutionAgent { pendingOrders = [], + cash = startCash, + currentTime = UTCTime (fromGregorian 1970 1 1) 0, + orderIdCounter = 1 } + +executeAtPrice :: Order -> Decimal -> WriterT [Event] (State ExecutionAgent) () +executeAtPrice order price = do + when (orderState order == Unsubmitted) $ tell [OrderSubmitted order] + tell [OrderUpdate (orderId order) Executed] + timestamp <- gets currentTime + tell [NewTrade (mkTradeForOrder timestamp order price)] + + case orderOperation order of + Buy -> modify' (\agent -> agent { cash = cash agent - price * realFracToDecimal 10 (toRational $ orderQuantity order) }) + Sell -> modify' (\agent -> agent { cash = cash agent + price * realFracToDecimal 10 (toRational $ orderQuantity order) }) + +mkTradeForOrder timestamp order price = Trade { tradeOrderId = orderId order, + tradePrice = price, + tradeQuantity = orderQuantity order, + tradeVolume = price * realFracToDecimal 10 (toRational $ orderQuantity order), + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = orderOperation order, + tradeAccount = orderAccountId order, + tradeSecurity = orderSecurity order, + tradeTimestamp = timestamp, + tradeSignalId = orderSignalId order } + + +executePending :: Bars -> WriterT [Event] (State ExecutionAgent) () +executePending bars = do + orders <- gets pendingOrders + let (executedOrders, leftover) = L.partition shouldExecute orders + + mapM_ executeAtOrdersPrice executedOrders + modify' (\s -> s { pendingOrders = leftover } ) + where + executeAtOrdersPrice order = case orderPrice order of + Limit price -> executeAtPrice order price + _ -> return () -- TODO handle stops + + shouldExecute order = case M.lookup (orderSecurity order) bars of + Just (DataSeries ((ts, bar) : _)) -> case orderPrice order of + Limit price -> crosses bar price + _ -> False + Nothing -> False + + crosses bar price = (barClose bar > price && barOpen bar < price) || (barClose bar < price && barOpen bar > price) + +executeStep :: Bars -> [Order] -> WriterT [Event] (State ExecutionAgent) () +executeStep bars orders = do + -- Assign consecutive IDs + orders' <- mapM (\o -> do + id <- gets orderIdCounter + modify(\s -> s { orderIdCounter = id + 1 }) + return o { orderId = id }) orders + + let (executableNow, pending) = L.partition isExecutableNow orders' + mapM_ (executeOrderAtLastPrice bars) executableNow + modify' (\s -> s { pendingOrders = pending ++ pendingOrders s }) + + where + isExecutableNow order = case M.lookup (orderSecurity order) bars of + Just (DataSeries (x:xs)) -> case orderPrice order of + Limit price -> (orderOperation order == Buy && price >= (barClose . snd) x) || (orderOperation order == Sell && price <= (barClose . snd) x) + Market -> True + _ -> False + + executeOrderAtLastPrice bars order = case M.lookup (orderSecurity order) bars of + Just (DataSeries ((ts, bar) : _)) -> executeAtPrice order (barClose bar) + _ -> return () diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs new file mode 100644 index 0000000..4fddcef --- /dev/null +++ b/src/ATrade/BarAggregator.hs @@ -0,0 +1,126 @@ +{-# LANGUAGE BangPatterns #-} + +{-| + - Module : ATrade.BarAggregator + - Description : Aggregates incoming tick stream to bars + - Copyright : (c) Denis Tereshkin 2016-2017 + - License : Proprietary + - Maintainer : denis@kasan.ws + - Stability : experimental + - Portability : POSIX + - + - This module defines a set of functions that help to convert stream of ticks into bars. + -} + +module ATrade.BarAggregator ( + lBars, + lLastTicks, + BarAggregator(..), + mkAggregatorFromBars, + handleTick, + hmsToDiffTime +) where + +import ATrade.RoboCom.Types +import ATrade.RoboCom.Utils +import ATrade.Types +import Control.Lens +import Control.Monad.State +import qualified Data.Map.Strict as M +import Data.Time.Clock + +-- | Bar aggregator state +data BarAggregator = BarAggregator { + bars :: !(M.Map TickerId BarSeries), + lastTicks :: !(M.Map (TickerId, DataType) Tick), + tickTimeWindows :: [(DiffTime, DiffTime)] +} deriving (Show) + +-- | Creates `BarAggregator` from history +mkAggregatorFromBars :: M.Map TickerId BarSeries -> [(DiffTime, DiffTime)] -> BarAggregator +mkAggregatorFromBars myBars timeWindows = BarAggregator { + bars = myBars, + lastTicks = M.empty, + tickTimeWindows = timeWindows } + +lBars :: (M.Map TickerId BarSeries -> Identity (M.Map TickerId BarSeries)) -> BarAggregator -> Identity BarAggregator +lBars = lens bars (\s b -> s { bars = b }) + +lLastTicks :: (M.Map (TickerId, DataType) Tick -> Identity (M.Map (TickerId, DataType) Tick)) -> BarAggregator -> Identity BarAggregator +lLastTicks = lens lastTicks (\s b -> s { lastTicks = b }) + +hmsToDiffTime :: Int -> Int -> Int -> DiffTime +hmsToDiffTime h m s = secondsToDiffTime $ toInteger $ h * 3600 + m * 60 + s + +-- | main logic of bar aggregator +handleTick :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) +handleTick tick = runState $ do + lLastTicks %= M.insert (security tick, datatype tick) tick + tws <- gets tickTimeWindows + mybars <- gets bars + if (any (isInTimeInterval tick) tws) + then + case M.lookup (security tick) mybars of + Just series -> case bsBars series of + (b:bs) -> do + let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) + case datatype tick of + LastTradePrice -> + if volume tick > 0 + then + if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + then do + lBars %= M.insert (security tick) series { bsBars = updateBar b tick : bs } + return Nothing + else do + lBars %= M.insert (security tick) series { bsBars = barFromTick tick : b : bs } + return . Just $ b + else + return Nothing + _ -> + if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + then do + lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } + return Nothing + else + return Nothing + _ -> return Nothing + _ -> return Nothing + else + return Nothing + where + isInTimeInterval tick (a, b) = (utctDayTime . timestamp) tick >= a && (utctDayTime . timestamp) tick <= b + barFromTick !newtick = Bar { barSecurity = security newtick, + barTimestamp = timestamp newtick, + barOpen = value newtick, + barHigh = value newtick, + barLow = value newtick, + barClose = value newtick, + barVolume = abs . volume $ newtick } + updateBar !bar newtick = + let newHigh = max (barHigh bar) (value newtick) + newLow = min (barLow bar) (value newtick) in + if timestamp newtick >= barTimestamp bar + then bar { + barTimestamp = timestamp newtick, + barHigh = newHigh, + barLow = newLow, + barClose = value newtick, + barVolume = barVolume bar + (abs . volume $ newtick) } + else bar + + updateBarTimestamp !bar newtick = bar { barTimestamp = newTimestamp } + where + newTimestamp = timestamp newtick + + emptyBarFrom !bar newtick = newBar + where + newTimestamp = timestamp newtick + newBar = Bar { + barSecurity = barSecurity bar, + barTimestamp = newTimestamp, + barOpen = barClose bar, + barHigh = barClose bar, + barLow = barClose bar, + barClose = barClose bar, + barVolume = 0 } diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs new file mode 100644 index 0000000..87aad9b --- /dev/null +++ b/src/ATrade/Driver/Backtest.hs @@ -0,0 +1,313 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Backtest ( + backtestMain +) where + +import ATrade.Driver.Real.Types (InitializationCallback, + Strategy (..), + StrategyInstanceParams (..)) +import ATrade.Exceptions +import ATrade.Quotes.Finam as QF +import ATrade.RoboCom.Monad (Event (..), EventCallback, + StrategyAction (..), + StrategyEnvironment (..), + runStrategyElement) +import ATrade.RoboCom.Positions +import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), + Timeframe (..)) +import ATrade.Types +import Conduit (awaitForever, runConduit, yield, + (.|)) +import Control.Exception.Safe +import Control.Monad.ST (runST) +import Control.Monad.State +import Data.Aeson (FromJSON (..), Result (..), + Value (..), decode) +import Data.Aeson.Types (parseMaybe) +import Data.ByteString.Lazy (readFile, toStrict) +import Data.HashMap.Strict (lookup) +import Data.List (concat, filter, find, partition) +import Data.List.Split (splitOn) +import qualified Data.Map.Strict as M +import Data.Semigroup ((<>)) +import Data.STRef (newSTRef, readSTRef, writeSTRef) +import qualified Data.Text as T +import Data.Text.IO (putStrLn) +import Data.Time.Calendar (fromGregorian) +import Data.Time.Clock (DiffTime, UTCTime (..)) +import Data.Vector ((!), (!?), (//)) +import qualified Data.Vector as V +import Options.Applicative hiding (Success) +import Prelude hiding (lookup, putStrLn, readFile) +import Safe (headMay) + +data Feed = Feed TickerId FilePath + deriving (Show, Eq) + +data Params = Params { + strategyConfigFile :: FilePath, + qtisEndpoint :: Maybe String, + paramsFeeds :: [Feed] +} deriving (Show, Eq) + +paramsParser :: Parser Params +paramsParser = Params + <$> strOption ( + long "config" <> short 'c' + ) + <*> optional ( strOption + ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" )) + <*> some (option feedArgParser ( + long "feed" <> short 'f' + )) + +feedArgParser :: ReadM Feed +feedArgParser = eitherReader (\s -> case splitOn ":" s of + [tid, fpath] -> Right $ Feed (T.pack tid) fpath + _ -> Left $ "Unable to parse feed id: " ++ s) + +backtestMain :: (FromJSON c, StateHasPositions s) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () +backtestMain dataDownloadDelta defaultState initCallback callback = do + params <- execParser opts + (tickerList, config) <- loadStrategyConfig params + + let instanceParams = StrategyInstanceParams { + strategyInstanceId = "foo", + strategyAccount = "foo", + strategyVolume = 1, + tickers = tickerList, + strategyQuotesourceEp = "", + strategyBrokerEp = "", + strategyHistoryProviderType = "", + strategyHistoryProvider = "", + strategyQTISEp = T.pack <$> qtisEndpoint params} + + updatedConfig <- case initCallback of + Just cb -> cb config instanceParams + Nothing -> return config + + feeds <- loadFeeds (paramsFeeds params) + + runBacktestDriver feeds config tickerList + where + opts = info (helper <*> paramsParser) + ( fullDesc <> header "ATrade strategy backtesting framework" ) + + runBacktestDriver feeds params tickerList = do + let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop + let finalState = execState (unBacktestingMonad s) $ defaultBacktestState defaultState params tickerList + print $ cash finalState + print $ tradesLog finalState + forM_ (logs finalState) putStrLn + print $ (M.keys . seBars . strategyEnvironment) finalState + + loadStrategyConfig :: (FromJSON c) => Params -> IO ([Ticker], c) + loadStrategyConfig params = do + content <- readFile (strategyConfigFile params) + case loadStrategyConfig' content of + Just (tickersList, config) -> return (tickersList, config) + _ -> throw $ UnableToLoadConfig (T.pack . strategyConfigFile $ params) + + loadStrategyConfig' content = do + v <- decode content + case v of + Object o -> do + mbTickers <- "tickers" `lookup` o + mbParams <- "params" `lookup` o + tickers <- parseMaybe parseJSON mbTickers + params <- parseMaybe parseJSON mbParams + return (tickers, params) + _ -> Nothing + + resultToMaybe (Error _) = Nothing + resultToMaybe (Success a) = Just a + + barStreamFromFeeds feeds = case nextBar feeds of + Just (bar, feeds') -> yield bar >> barStreamFromFeeds feeds' + _ -> return () + + nextBar :: V.Vector [Bar] -> Maybe (Bar, V.Vector [Bar]) + nextBar feeds = case indexOfNextFeed feeds of + Just ix -> do + f <- feeds !? ix + h <- headMay f + return (h, feeds // [(ix, tail f)]) + _ -> Nothing + + indexOfNextFeed feeds = runST $ do + minTs <- newSTRef Nothing + minIx <- newSTRef Nothing + forM_ [0..(V.length feeds-1)] (\ix -> do + let feed = feeds ! ix + curIx <- readSTRef minIx + curTs <- readSTRef minTs + case feed of + x:_ -> case curTs of + Just ts -> when (barTimestamp x < ts) $ do + writeSTRef minIx $ Just ix + writeSTRef minTs $ Just (barTimestamp x) + _ -> do + writeSTRef minIx $ Just ix + writeSTRef minTs $ Just (barTimestamp x) + _ -> return ()) + readSTRef minIx + + backtestLoop = awaitForever (\bar -> do + env <- gets strategyEnvironment + let oldTimestamp = seLastTimestamp env + let newTimestamp = barTimestamp bar + let newenv = env { seBars = updateBars (seBars env) bar } + curState <- gets robotState + modify' (\s -> s { strategyEnvironment = newenv }) + handleEvents [NewBar bar]) + + handleEvents events = do + newActions <- mapM handleEvent events + newEvents <- executeActions (concat newActions) + unless (null newEvents) $ handleEvents newEvents + + executeActions actions = concat <$> mapM executeAction actions + + executeAction (ActionOrder order) = do + oid <- nextOrderId + let submittedOrder = order { orderState = Submitted, orderId = oid } + modify' (\s -> s { pendingOrders = submittedOrder : pendingOrders s }) + return [OrderSubmitted submittedOrder] + + executeAction (ActionCancelOrder oid) = do + mbOrder <- find (\o -> orderId o == oid && orderState o == Submitted) <$> gets pendingOrders + case mbOrder of + Just _ -> do + modify' (\s -> s { pendingOrders = filter (\o -> orderId o == oid) (pendingOrders s)}) + return [OrderUpdate oid Cancelled] + _ -> return [] + + executeAction (ActionLog t) = modify' (\s -> s { logs = t : logs s }) >> return [] + executeAction (ActionSetupTimer t) = modify' (\s -> s { pendingTimers = t : pendingTimers s }) >> return [] + executeAction (ActionIO _ _) = return [] + + executePendingOrders bar = do + ev1 <- executeMarketOrders bar + ev2 <- executeLimitOrders bar + return $ ev1 ++ ev2 + + executeLimitOrders bar = do + (limitOrders, otherOrders) <- partition + (\o -> case orderPrice o of + Limit _ -> True + _ -> False) <$> gets pendingOrders + let (executableOrders, otherOrders) = partition (isExecutable bar) limitOrders + modify' (\s -> s { pendingOrders = otherOrders } ) + forM executableOrders $ \order -> + order `executeAtPrice` priceForLimitOrder order bar + + isExecutable bar order = case orderPrice order of + Limit price -> if orderOperation order == Buy + then price <= barLow bar + else price >= barHigh bar + _ -> True + + priceForLimitOrder order bar = case orderPrice order of + Limit price -> if orderOperation order == Buy + then if price >= barOpen bar + then barOpen bar + else price + else if price <= barOpen bar + then barOpen bar + else price + _ -> error "Should've been limit order" + + executeMarketOrders bar = do + (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> gets pendingOrders + modify' (\s -> s { pendingOrders = otherOrders }) + forM marketOrders $ \order -> + order `executeAtPrice` barOpen bar + + executeAtPrice order price = do + ts <- seLastTimestamp <$> gets strategyEnvironment + modify' (\s -> s { tradesLog = mkTrade order price ts : tradesLog s }) + return $ OrderUpdate (orderId order) Executed + + mkTrade order price ts = Trade { + tradeOrderId = orderId order, + tradePrice = price, + tradeQuantity = orderQuantity order, + tradeVolume = (fromIntegral . orderQuantity $ order) * price, + tradeVolumeCurrency = "pt", + tradeOperation = orderOperation order, + tradeAccount = orderAccountId order, + tradeSecurity = orderSecurity order, + tradeTimestamp = ts, + tradeCommission = 0, + tradeSignalId = orderSignalId order + } + + handleEvent event@(NewBar bar) = do + events <- executePendingOrders bar + firedTimers <- fireTimers (barTimestamp bar) + actions <- concat <$> mapM handleEvent (events ++ map TimerFired firedTimers) + actions' <- handleEvent' event + return $ actions ++ actions' + + handleEvent event = handleEvent' event + + handleEvent' event = do + env <- gets strategyEnvironment + params <- gets robotParams + curState <- gets robotState + let (newState, actions, _) = runStrategyElement params curState env $ callback event + modify' (\s -> s { robotState = newState } ) + return actions + + updateBars barMap newbar = M.alter (\case + Nothing -> Just BarSeries { bsTickerId = barSecurity newbar, + bsTimeframe = Timeframe 60, + bsBars = [newbar] } + Just bs -> Just bs { bsBars = newbar : bsBars bs }) (barSecurity newbar) barMap + + fireTimers ts = do + (firedTimers, otherTimers) <- partition (< ts) <$> gets pendingTimers + modify' (\s -> s { pendingTimers = otherTimers }) + return firedTimers + + loadFeeds :: [Feed] -> IO (V.Vector [Bar]) + loadFeeds feeds = V.fromList <$> mapM loadFeed feeds + loadFeed (Feed tid path) = do + content <- readFile path + case QF.parseQuotes $ toStrict content of + Just quotes -> return $ fmap (rowToBar tid) quotes + _ -> throw $ UnableToLoadFeed (T.pack path) + + rowToBar tid r = Bar tid (rowTime r) (rowOpen r) (rowHigh r) (rowLow r) (rowClose r) (rowVolume r) + + nextOrderId = do + oid <- gets orderIdCounter + modify' (\s -> s { orderIdCounter = oid + 1 }) + return oid + + +data BacktestState s c = BacktestState { + cash :: Double, + robotState :: s, + robotParams :: c, + strategyEnvironment :: StrategyEnvironment, + pendingOrders :: [Order], + tradesLog :: [Trade], + orderIdCounter :: Integer, + pendingTimers :: [UTCTime], + logs :: [T.Text] +} + +defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers (UTCTime (fromGregorian 1970 1 1) 0)) [] [] 1 [] [] + where + tickers = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList + +newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } + deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) + diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs new file mode 100644 index 0000000..18fc549 --- /dev/null +++ b/src/ATrade/Driver/Real.hs @@ -0,0 +1,455 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Real ( + Strategy(..), + StrategyInstanceParams(..), + robotMain, + BigConfig(..), + mkBarStrategy, + barStrategyDriver +) where + +import Options.Applicative +import System.IO +import System.Signal +import System.Exit +import System.Random +import System.Log.Logger +import System.Log.Handler.Simple +import System.Log.Handler (setFormatter) +import System.Log.Formatter +import Control.Monad +import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) +import Control.Concurrent.BoundedChan as BC +import Control.Exception +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import qualified Data.Map as M +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Aeson +import Data.IORef +import Data.Time.Calendar +import Data.Time.Clock +import Data.Time.Clock.POSIX +import Data.Maybe +import Data.Monoid +import Database.Redis hiding (info, decode) +import ATrade.Types +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) +import ATrade.BarAggregator +import ATrade.Driver.Real.BrokerClientThread +import ATrade.Driver.Real.QuoteSourceThread +import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) +import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) +import ATrade.Exceptions +import ATrade.Quotes.Finam as QF +import ATrade.Quotes.QHP as QQ +import ATrade.Quotes.HAP as QH +import System.ZMQ4 hiding (Event(..)) + +data Params = Params { + instanceId :: String, + strategyConfigFile :: FilePath, + strategyStateFile :: FilePath, + brokerEp :: String, + quotesourceEp :: String, + historyProviderType :: Maybe String, + historyProvider :: Maybe String, + redisSocket :: Maybe String, + qtisSocket :: Maybe String, + accountId :: String, + volumeFactor :: Int +} deriving (Show, Eq) + +paramsParser :: Parser Params +paramsParser = Params + <$> strOption + ( long "instance-id" + <> metavar "ID" ) + <*> strOption + ( long "config" + <> metavar "FILEPATH" ) + <*> strOption + ( long "state" + <> metavar "FILEPATH" ) + <*> strOption + ( long "broker" + <> metavar "BROKER_ENDPOINT" ) + <*> strOption + ( long "quotesource" + <> metavar "QUOTESOURCE_ENDPOINT" ) + <*> optional ( strOption + ( long "history-provider-type" + <> metavar "TYPE/ID" )) + <*> optional ( strOption + ( long "history-provider" + <> metavar "ENDPOINT/ID" )) + <*> optional ( strOption + ( long "redis-socket" + <> metavar "ADDRESS" )) + <*> optional ( strOption + ( long "qtis" + <> metavar "ENDPOINT/ID" )) + <*> strOption + ( long "account" + <> metavar "ACCOUNT" ) + <*> option auto + ( long "volume" + <> metavar "VOLUME" ) + + +data BigConfig c = BigConfig { + confTickers :: [Ticker], + strategyConfig :: c +} + +instance (FromJSON c) => FromJSON (BigConfig c) where + parseJSON = withObject "object" (\obj -> BigConfig <$> + obj .: "tickers" <*> + obj .: "params") + +instance (ToJSON c) => ToJSON (BigConfig c) where + toJSON conf = object ["tickers" .= confTickers conf, + "params" .= strategyConfig conf ] + +storeState :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> IO () +storeState params stateRef timersRef = do + currentStrategyState <- readIORef stateRef + currentTimersState <- readIORef timersRef + case redisSocket params of + Nothing -> withFile (strategyStateFile params) WriteMode (\f -> BS.hPut f $ BL.toStrict $ encode currentStrategyState) + `catch` (\e -> warningM "main" ("Unable to save state: " ++ show (e :: IOException))) + Just sock -> do +#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } + now <- getPOSIXTime + res <- runRedis conn $ mset [(encodeUtf8 $ T.pack $ instanceId params, BL.toStrict $ encode currentStrategyState), + (encodeUtf8 $ T.pack $ instanceId params ++ ":last_store", encodeUtf8 $ T.pack $ show now), + (encodeUtf8 $ T.pack $ instanceId params ++ ":timers", encodeUtf8 $ T.pack $ show now) ] + + case res of + Left _ -> warningM "main" "Unable to save state" + Right _ -> return () +#else + return () +#endif + + +gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () +gracefulShutdown params stateRef timersRef shutdownMv _ = do + infoM "main" "Shutdown, saving state" + storeState params stateRef timersRef + putMVar shutdownMv () + exitSuccess + +robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () +robotMain dataDownloadDelta defaultState initCallback callback = do + params <- execParser opts + initLogging params + infoM "main" "Starting" + + (tickerList, config) <- loadStrategyConfig params + stratState <- loadStrategyState params + + let instanceParams = StrategyInstanceParams { + strategyInstanceId = T.pack . instanceId $ params, + strategyAccount = T.pack . accountId $ params, + strategyVolume = volumeFactor params, + tickers = tickerList, + strategyQuotesourceEp = T.pack . quotesourceEp $ params, + strategyBrokerEp = T.pack . brokerEp $ params, + strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params, + strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params, + strategyQTISEp = T.pack <$> qtisSocket params} + + updatedConfig <- case initCallback of + Just cb -> cb config instanceParams + Nothing -> return config + + let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback + stateRef <- newIORef stratState + timersRef <- newIORef [] + shutdownMv <- newEmptyMVar + installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) + installHandler sigTERM (gracefulShutdown params stateRef timersRef shutdownMv) + randsec <- getStdRandom(randomR(1, 10)) + threadDelay $ randsec * 1000000 + debugM "main" "Forking state saving thread" + stateSavingThread <- forkIO $ forever $ do + threadDelay 1000000 + storeState params stateRef timersRef + + debugM "main" "Starting strategy driver" + barStrategyDriver tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread + where + tickFilter :: Tick -> Bool + tickFilter tick = + let classCode = T.takeWhile (/= '#') (security tick) in + if + | classCode == "SPBFUT" || classCode == "SPBOPT" -> any (inInterval . utctDayTime . timestamp $ tick) fortsIntervals + | otherwise -> any (inInterval . utctDayTime . timestamp $ tick) secIntervals + + fortsIntervals = [(fromHMS 7 0 0, fromHMS 11 0 0), (fromHMS 11 5 0, fromHMS 15 45 0), (fromHMS 16 0 0, fromHMS 20 50 0)] + secIntervals = [(fromHMS 6 50 0, fromHMS 15 51 0)] + + fromHMS h m s = h * 3600 + m * 60 + s + inInterval ts (start, end) = ts >= start && ts <= end + + opts = info (helper <*> paramsParser) + ( fullDesc <> header "ATrade strategy execution framework" ) + + initLogging params = do + handler <- streamHandler stderr DEBUG >>= + (\x -> return $ + setFormatter x (simpleLogFormatter $ + "$utcTime\t[" ++ instanceId params ++ "]\t\t{$loggername}\t\t<$prio> -> $msg")) + + hSetBuffering stderr LineBuffering + updateGlobalLogger rootLoggerName (setLevel DEBUG) + updateGlobalLogger rootLoggerName (setHandlers [handler]) + + loadStrategyConfig params = withFile (strategyConfigFile params) ReadMode (\f -> do + bigconfig <- eitherDecode . BL.fromStrict <$> BS.hGetContents f + case bigconfig of + Right conf -> return (confTickers conf, strategyConfig conf) + Left errmsg -> throw $ UnableToLoadConfig $ (T.pack . show) errmsg) + + loadStrategyState params = case redisSocket params of + Nothing -> loadStateFromFile (strategyStateFile params) + Just sock -> do +#ifdef linux_HOST_OS + conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } + res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params) + case res of + Left _ -> do + warningM "main" "Unable to load state" + return defaultState + Right mv -> case mv of + Just v -> case eitherDecode $ BL.fromStrict v of + Left _ -> do + warningM "main" "Unable to load state" + return defaultState + Right s -> return s + Nothing -> do + warningM "main" "Unable to load state" + return defaultState +#else + error "Not implemented" +#endif + + loadStateFromFile filepath = withFile filepath ReadMode (\f -> do + maybeState <- decode . BL.fromStrict <$> BS.hGetContents f + case maybeState of + Just st -> return st + Nothing -> return defaultState ) `catch` + (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) + +-- | Helper function to make 'Strategy' instances +mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s +mkBarStrategy instanceParams dd params initialState cb = BarStrategy { + downloadDelta = dd, + eventCallback = cb, + currentState = initialState, + strategyParams = params, + strategyTimers = [], + + strategyInstanceParams = instanceParams } + +-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback +-- and executes returned strategy actions +barStrategyDriver :: (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO () +barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do + -- Make channels + -- Event channel is for strategy events, like new tick arrival, or order execution notification + eventChan <- BC.newBoundedChan 1000 + -- Orders channel passes strategy orders to broker thread + ordersChan <- BC.newBoundedChan 1000 + + withContext (\ctx -> do + -- Load tickers data and create BarAggregator from them + historyBars <- + if + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> + M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> + M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + | otherwise -> + M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] + bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter) killThread (\_ -> do + debugM "Strategy" "QuoteSource thread forked" + bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do + debugM "Strategy" "Broker thread forked" + + wakeupTid <- forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan ordersChan BrokerRequestNotifications + debugM "Strategy" "Wakeup thread forked" + + let env = StrategyEnvironment { + seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, + seAccount = strategyAccount . strategyInstanceParams $ strategy, + seVolume = strategyVolume . strategyInstanceParams $ strategy, + seBars = M.empty, + seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 + } + readAndHandleEvents agg ordersChan eventChan strategy env + debugM "Strategy" "Stopping strategy driver" + killThread wakeupTid))) + + debugM "Strategy" "Strategy done" + + where + qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy + brEp = strategyBrokerEp . strategyInstanceParams $ strategy + readAndHandleEvents agg ordersChan eventChan strategy' env = do + event <- readChan eventChan + if event /= Shutdown + then do + currentBars <- bars <$> readIORef agg + let params = strategyParams strategy' + let curState = currentState strategy' + let instId = strategyInstanceId . strategyInstanceParams $ strategy' + let acc = strategyAccount . strategyInstanceParams $ strategy' + let vol = strategyVolume . strategyInstanceParams $ strategy' + + let oldTimestamp = seLastTimestamp env + let newTimestamp = case event of + NewTick tick -> timestamp tick + _ -> seLastTimestamp env + + newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + + let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp } + let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event + writeIORef stateRef newState + writeIORef timersRef newTimers + + newTimers' <- catMaybes <$> mapM handleTimerActions actions + mapM_ (handleActions ordersChan) actions + readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv + else debugM "Strategy" "Shutdown requested" + where + handleTimerActions action = + case action of + ActionSetupTimer timerTime -> return $ Just timerTime + _ -> return Nothing + + handleActions ordersChan' action = + case action of + (ActionLog logText) -> debugM "Strategy" $ T.unpack logText + (ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order + (ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid + (ActionSetupTimer _) -> return () + (ActionIO tag io) -> void $ forkIO $ do + v <- io + writeChan eventChan (ActionCompleted tag v) + + checkTimer eventChan' newTimestamp timerTime = + if newTimestamp >= timerTime + then do + writeChan eventChan' $ TimerFired timerTime + return Nothing + else + return $ Just timerTime + + loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromHAP ctx ep t = do + debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) + case parseHAPPeriod $ timeframeSeconds t of + Just tf -> do + now <- getCurrentTime + historyBars <- QH.getQuotes ctx QH.RequestParams { + QH.endpoint = ep, + QH.ticker = code t, + QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now, + QH.endDate = now, + QH.period = tf } + debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" + return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) + _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) + + + loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromQHP ctx ep t = do + debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) + case parseQHPPeriod $ timeframeSeconds t of + Just tf -> do + now <- getCurrentTime + historyBars <- QQ.getQuotes ctx QQ.RequestParams { + QQ.endpoint = ep, + QQ.ticker = code t, + QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QQ.endDate = utctDay now, + QQ.period = tf } + debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" + return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) + _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) + + + loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries) + loadTickerFromFinam t = do + randDelay <- getStdRandom (randomR (1, 5)) + threadDelay $ randDelay * 1000000 + now <- getCurrentTime + debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) + case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of + (Just finamCode, Just per) -> do + debugM "Strategy" $ "Downloading ticker: " ++ finamCode + history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, + QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QF.endDate = utctDay now, + QF.period = per } + case history of + Just h -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = convertFromFinamHistory (code t) h }) + Nothing -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) + _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) + + convertFromFinamHistory :: TickerId -> [Row] -> [Bar] + convertFromFinamHistory tid = L.reverse . fmap (\row -> Bar { barSecurity = tid, + barTimestamp = rowTime row, + barOpen = rowOpen row, + barHigh = rowHigh row, + barLow = rowLow row, + barClose = rowClose row, + barVolume = rowVolume row }) + + parseFinamPeriod x + | x == 0 = Just QF.PeriodTick + | x == 60 = Just QF.Period1Min + | x == 5 * 60 = Just QF.Period5Min + | x == 10 * 60 = Just QF.Period10Min + | x == 15 * 60 = Just QF.Period15Min + | x == 30 * 60 = Just QF.Period30Min + | x == 60 * 60 = Just QF.PeriodHour + | x == 24 * 60 * 60 = Just QF.PeriodDay + | otherwise = Nothing + + parseQHPPeriod x + | x == 60 = Just QQ.Period1Min + | x == 5 * 60 = Just QQ.Period5Min + | x == 15 * 60 = Just QQ.Period15Min + | x == 30 * 60 = Just QQ.Period30Min + | x == 60 * 60 = Just QQ.PeriodHour + | x == 24 * 60 * 60 = Just QQ.PeriodDay + | otherwise = Nothing + + parseHAPPeriod x + | x == 60 = Just QH.Period1Min + | x == 5 * 60 = Just QH.Period5Min + | x == 15 * 60 = Just QH.Period15Min + | x == 30 * 60 = Just QH.Period30Min + | x == 60 * 60 = Just QH.PeriodHour + | x == 24 * 60 * 60 = Just QH.PeriodDay + | otherwise = Nothing + diff --git a/src/ATrade/Driver/Real/BrokerClientThread.hs b/src/ATrade/Driver/Real/BrokerClientThread.hs new file mode 100644 index 0000000..1159314 --- /dev/null +++ b/src/ATrade/Driver/Real/BrokerClientThread.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Driver.Real.BrokerClientThread ( + startBrokerClientThread, + BrokerCommand(..) +) where + +import ATrade.Broker.Client +import ATrade.Broker.Protocol +import ATrade.RoboCom.Monad hiding (submitOrder, cancelOrder) +import ATrade.RoboCom.Types +import ATrade.Types + +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) +import Control.Exception +import Control.Monad.Loops +import Control.Monad + +import Data.IORef +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Time.Clock +import Data.Maybe + +import System.Log.Logger +import System.ZMQ4 hiding (Event) + +data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications + + +startBrokerClientThread :: T.Text -> Context -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId +startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar = forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $ + bracket (startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams) + (\bro -> do + stopBrokerClient bro + debugM "Strategy" "Broker client: stop") + (\bs -> handle (\e -> do + warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException) + throwIO e) $ do + now <- getCurrentTime + lastNotificationTime <- newIORef now + whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do + brokerCommand <- readChan ordersChan + case brokerCommand of + BrokerSubmitOrder order -> do + debugM "Strategy" $ "Submitting order: " ++ show order + maybeOid <- submitOrder bs order + debugM "Strategy" "Order submitted" + case maybeOid of + Right oid -> writeChan eventChan (OrderSubmitted order { orderId = oid }) + Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg + BrokerCancelOrder oid -> do + debugM "Strategy" $ "Cancelling order: " ++ show oid + _ <- cancelOrder bs oid + debugM "Strategy" "Order cancelled" + BrokerRequestNotifications -> do + t <- getCurrentTime + nt <- readIORef lastNotificationTime + when (t `diffUTCTime` nt > 1) $ do + maybeNs <- getNotifications bs + case maybeNs of + Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg + Right ns -> do + mapM_ (sendNotification eventChan) ns + getCurrentTime >>= (writeIORef lastNotificationTime) + nTimeout <- notTimeout lastNotificationTime + shouldShutdown <- isNothing <$> tryReadMVar shutdownVar + debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) + +notTimeout :: IORef UTCTime -> IO Bool +notTimeout ts = do + now <- getCurrentTime + heartbeatTs <- readIORef ts + return $ diffUTCTime now heartbeatTs < 30 + +sendNotification :: BoundedChan Event -> Notification -> IO () +sendNotification eventChan notification = + writeChan eventChan $ case notification of + OrderNotification oid state -> OrderUpdate oid state + TradeNotification trade -> NewTrade trade diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs new file mode 100644 index 0000000..541cf97 --- /dev/null +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE BangPatterns #-} + +module ATrade.Driver.Real.QuoteSourceThread +( + startQuoteSourceThread +) where + +import ATrade.BarAggregator +import ATrade.QuoteSource.Client +import ATrade.RoboCom.Monad +import ATrade.RoboCom.Types +import ATrade.Types +import ATrade.Driver.Real.Types + +import Data.IORef +import qualified Data.Text as T + +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) +import Control.Exception +import Control.Monad + +import System.Log.Logger +import System.ZMQ4 hiding (Event) + +startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> IO ThreadId +startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do + tickChan <- newBoundedChan 1000 + bracket (startQuoteSourceClient tickChan (fmap code . (tickers . strategyInstanceParams) $ strategy) ctx qsEp) + (\qs -> do + stopQuoteSourceClient qs + debugM "Strategy" "Quotesource client: stop") + (\_ -> forever $ do + tick <- readChan tickChan + when (goodTick tick) $ do + writeChan eventChan (NewTick tick) + aggValue <- readIORef agg + case handleTick tick aggValue of + (Just bar, !newAggValue) -> writeChan eventChan (NewBar bar) >> writeIORef agg newAggValue + (Nothing, !newAggValue) -> writeIORef agg newAggValue) + where + goodTick tick = tickFilter tick && + (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) + diff --git a/src/ATrade/Driver/Real/Types.hs b/src/ATrade/Driver/Real/Types.hs new file mode 100644 index 0000000..c18baac --- /dev/null +++ b/src/ATrade/Driver/Real/Types.hs @@ -0,0 +1,39 @@ +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Real.Types ( + Strategy(..), + StrategyInstanceParams(..), + InitializationCallback +) where + +import ATrade.RoboCom.Monad +import ATrade.RoboCom.Types + +import Data.Time.Clock +import qualified Data.Text as T + +-- | Top-level strategy configuration and state +data Strategy c s = BarStrategy { + downloadDelta :: DiffTime, -- ^ How much history to download at strategy start + eventCallback :: EventCallback c s, -- ^ Strategy event callback + currentState :: s, -- ^ Current strategy state. Updated after each 'EventCallback' call + strategyParams :: c, -- ^ Strategy params + strategyTimers :: [UTCTime], + + strategyInstanceParams :: StrategyInstanceParams -- ^ Instance params +} + +-- | Strategy instance params store few params which are common for all strategies +data StrategyInstanceParams = StrategyInstanceParams { + strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) + strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent + strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) + tickers :: [Ticker], -- ^ List of tickers which is used by this strategy + strategyQuotesourceEp :: T.Text, -- ^ QuoteSource server endpoint + strategyBrokerEp :: T.Text, -- ^ Broker server endpoint + strategyHistoryProviderType :: T.Text, + strategyHistoryProvider :: T.Text, + strategyQTISEp :: Maybe T.Text +} + +type InitializationCallback c = c -> StrategyInstanceParams -> IO c diff --git a/src/ATrade/Exceptions.hs b/src/ATrade/Exceptions.hs new file mode 100644 index 0000000..f18cd37 --- /dev/null +++ b/src/ATrade/Exceptions.hs @@ -0,0 +1,15 @@ +{-# LANGUAGE DeriveGeneric #-} + +module ATrade.Exceptions ( + RoboComException(..) +) where + +import Control.Exception +import qualified Data.Text as T +import GHC.Generics + +data RoboComException = UnableToLoadConfig T.Text | UnableToLoadFeed T.Text + deriving (Show, Generic) + +instance Exception RoboComException + diff --git a/src/ATrade/Forums/Smartlab.hs b/src/ATrade/Forums/Smartlab.hs new file mode 100644 index 0000000..a9145a5 --- /dev/null +++ b/src/ATrade/Forums/Smartlab.hs @@ -0,0 +1,153 @@ +{-# OPTIONS_GHC -Wno-type-defaults #-} + +module ATrade.Forums.Smartlab ( + NewsItem(..), + IndexItem(..), + getIndex, + getItem +) where + +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T +import Data.Text.Encoding +import qualified Data.List as L +import Data.Time.Calendar +import Data.Time.Clock +import Data.Maybe +import Network.HTTP.Simple +import Safe +import Text.HTML.TagSoup +import Text.Parsec +import Text.Parsec.Text +import Text.StringLike + +import Debug.Trace + +data NewsItem = NewsItem { + niUrl :: !T.Text, + niHeader :: !T.Text, + niText :: !T.Text, + niAuthor :: !T.Text, + niPubTime :: !UTCTime +} deriving (Show, Eq) + +data IndexItem = IndexItem { + iiUrl :: !T.Text, + iiTitle :: !T.Text, + iiPubTime :: !UTCTime +} deriving (Show, Eq) + +monthNames :: [T.Text] +monthNames = fmap T.pack ["января", "февраля", "марта", "апреля", "мая", "июня", "июля", "августа", "сентября", "октября", "ноября", "декабря"] + +extractBetween :: StringLike str => String -> [Tag str] -> [Tag str] +extractBetween tagName = takeWhile (~/= closeTag) . dropWhile (~/= openTag) + where + openTag = "<" ++ tagName ++ ">" + closeTag = "" + +matchClass :: T.Text -> T.Text -> Tag T.Text -> Bool +matchClass _ className (TagOpen _ attrs) = case L.lookup (T.pack "class") attrs of + Just klass -> className `L.elem` T.words klass + Nothing -> False + +matchClass _ _ _ = False + +parseTimestamp :: T.Text -> Maybe UTCTime +parseTimestamp text = case parse timestampParser "" text of + Left _ -> Nothing + Right val -> Just val + where + timestampParser :: Parser UTCTime + timestampParser = do + spaces + day <- read <$> many1 digit + spaces + monthName <- T.pack <$> many1 letter + case L.elemIndex monthName monthNames of + Nothing -> fail "Can't parse month" + Just month -> do + spaces + year <- fromIntegral . read <$> many1 digit + _ <- char ',' + spaces + hour <- fromIntegral . read <$> many1 digit + _ <- char ':' + minute <- fromIntegral . read <$> many1 digit + return $ UTCTime (fromGregorian year (month + 1) day) (hour * 3600 + minute * 60) + +getItem :: IndexItem -> IO (Maybe NewsItem) +getItem indexItem = do + rq <- parseRequest $ T.unpack (iiUrl indexItem) + resp <- httpLBS rq + if getResponseStatusCode resp == 200 + then return . parseItem . decodeUtf8 . BL.toStrict . getResponseBody $ resp + else return Nothing + where + parseItem rawHtml = case parseTimestamp timestamp of + Just itemPubtime -> Just NewsItem { + niUrl = iiUrl indexItem, + niHeader = itemHeader, + niText = itemText, + niAuthor = itemAuthor, + niPubTime = itemPubtime + } + Nothing -> Nothing + where + itemHeader = innerText . + extractBetween "span" . + extractBetween "h1" . + dropWhile (not . matchClass (T.pack "div") (T.pack "topic")) $ tags + + itemText = innerText . + extractBetween "div" . + dropWhile (not . matchClass (T.pack "div") (T.pack "content")) . + dropWhile (~/= "
") $ tags + + itemAuthor = innerText . + extractBetween "li" . + dropWhile (not . matchClass (T.pack "li") (T.pack "author")) $ tags + + timestamp = traceShowId $ innerText . + extractBetween "li" . + dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ tags + + tags = parseTags rawHtml + + +getIndex :: T.Text -> Int -> IO ([IndexItem], Bool) +getIndex rootUrl pageNumber = do + rq <- parseRequest $ T.unpack $ makeUrl rootUrl pageNumber + resp <- httpLBS rq + return $ if getResponseStatusCode resp == 200 + then parseIndex . decodeUtf8 . BL.toStrict . getResponseBody $ resp + else ([], False) + where + parseIndex :: T.Text -> ([IndexItem], Bool) + parseIndex x = (mapMaybe parseIndexEntry $ partitions (matchClass (T.pack "div") (T.pack "topic")) $ parseTags x, hasNextPage $ parseTags x) + + parseIndexEntry :: [Tag T.Text] -> Maybe IndexItem + parseIndexEntry divTag = do + a <- headMay . dropWhile (~/= "") $ divTag + let text = innerText . takeWhile (~/= "") . dropWhile (~/= "") $ divTag + case a of + TagOpen _ attr -> do + href <- L.lookup (T.pack "href") attr + ts <- parseTimestamp (innerText $ takeWhile (~/= "") . dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ divTag) + Just IndexItem { iiUrl = href, + iiTitle = text, + iiPubTime = ts } + _ -> Nothing + + + makeUrl root pagenumber + | pagenumber == 0 || pagenumber == 1 = root + | otherwise = root `T.append` (T.pack "/page") `T.append` T.pack (show pagenumber) + + hasNextPage tags = if pageNumber <= 1 + then paginationLinksCount > 0 + else paginationLinksCount > 1 + where + paginationLinksCount = length . filter (~== "") . extractBetween "p" . dropWhile (~/= "