From f45cf72c13c4e412993c81c8bcfd3a01070bb8ed Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 4 Jul 2019 11:32:12 +0700 Subject: [PATCH 01/51] Fix possible race condition --- src/ATrade/Driver/Real/QuoteSourceThread.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 9d592c0..089a41e 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -40,13 +40,13 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim when (isNothing maybeSourceTimeframe) $ do aggValue <- readIORef agg case handleTick tick aggValue of - (Just bar, !newAggValue) -> writeChan eventChan (NewBar bar) >> writeIORef agg newAggValue + (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) (Nothing, !newAggValue) -> writeIORef agg newAggValue QDBar (_, bar) -> do aggValue <- readIORef agg when (isJust maybeSourceTimeframe) $ do case handleBar bar aggValue of - (Just bar', !newAggValue) -> writeChan eventChan (NewBar bar') >> writeIORef agg newAggValue + (Just bar', !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar') (Nothing, !newAggValue) -> writeIORef agg newAggValue) where goodTick tick = tickFilter tick && From be73227e2ce77166777919f685a4843e498c1023 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 4 Jul 2019 11:49:35 +0700 Subject: [PATCH 02/51] Fix timers storage --- src/ATrade/Driver/Real.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index e415dea..04d0cc4 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -231,7 +231,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Just sock -> do #ifdef linux_HOST_OS conn <- checkedConnect $ defaultConnectInfo { connectPort = UnixSocket sock } - res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ "timers") + res <- runRedis conn $ get (encodeUtf8 $ T.pack $ instanceId params ++ ":timers") case res of Left _ -> do warningM "main" "Unable to load state" From e47d786317f7c54110c941c25a971bd5e2a999f8 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 9 Jul 2019 13:58:15 +0700 Subject: [PATCH 03/51] BarAggregator: use tick timestamps --- src/ATrade/BarAggregator.hs | 37 +++++++++++++++++++++ src/ATrade/Driver/Real/QuoteSourceThread.hs | 16 ++++++--- test/Test/BarAggregator.hs | 33 +++++++++++++++++- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 226639b..672a97e 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -19,6 +19,7 @@ module ATrade.BarAggregator ( mkAggregatorFromBars, handleTicks, handleTick, + updateTime, handleBar, hmsToDiffTime ) where @@ -144,6 +145,42 @@ handleTick tick = runState $ do barClose = barClose bar, barVolume = 0 } +updateTime :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) +updateTime tick = runState $ do + lLastTicks %= M.insert (security tick, datatype tick) tick + tws <- gets tickTimeWindows + mybars <- gets bars + if (any (isInTimeInterval tick) tws) + then + case M.lookup (security tick) mybars of + Just series -> case bsBars series of + (b:bs) -> do + let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) + if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + then do + lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } + return Nothing + else do + lBars %= M.insert (security tick) series { bsBars = emptyBarFromTick tick : b : bs } + return $ Just b + _ -> return Nothing + _ -> return Nothing + else + return Nothing + where + isInTimeInterval t (a, b) = (utctDayTime . timestamp) t >= a && (utctDayTime . timestamp) t <= b + emptyBarFromTick !newtick = Bar { barSecurity = security newtick, + barTimestamp = timestamp newtick, + barOpen = value newtick, + barHigh = value newtick, + barLow = value newtick, + barClose = value newtick, + barVolume = 0 } + + updateBarTimestamp !bar newtick = bar { barTimestamp = newTimestamp } + where + newTimestamp = timestamp newtick + handleBar :: Bar -> BarAggregator -> (Maybe Bar, BarAggregator) handleBar bar = runState $ do tws <- gets tickTimeWindows diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 089a41e..28426df 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -37,11 +37,17 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim case qdata of QDTick tick -> when (goodTick tick) $ do writeChan eventChan (NewTick tick) - when (isNothing maybeSourceTimeframe) $ do - aggValue <- readIORef agg - case handleTick tick aggValue of - (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) - (Nothing, !newAggValue) -> writeIORef agg newAggValue + case maybeSourceTimeframe of + Nothing -> do + aggValue <- readIORef agg + case handleTick tick aggValue of + (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) + (Nothing, !newAggValue) -> writeIORef agg newAggValue + Just _ -> do + aggValue <- readIORef agg + case updateTime tick aggValue of + (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) + (Nothing, !newAggValue) -> writeIORef agg newAggValue QDBar (_, bar) -> do aggValue <- readIORef agg when (isJust maybeSourceTimeframe) $ do diff --git a/test/Test/BarAggregator.hs b/test/Test/BarAggregator.hs index d9269d0..a9263b5 100644 --- a/test/Test/BarAggregator.hs +++ b/test/Test/BarAggregator.hs @@ -33,6 +33,7 @@ unitTests = testGroup "BarAggregator" [ , testTwoBarsInSameBar , testTwoBarsInSameBarLastBar , testNextBarAfterBarClose + , testUpdateTime ] properties = testGroup "BarAggregator" [ @@ -192,6 +193,37 @@ testNextBarAfterBarClose = testCase "Three bars (smaller timeframe) - next bar a barClose = fromDouble c, barVolume = v } +testUpdateTime :: TestTree +testUpdateTime = testCase "updateTime - next bar - creates new bar with zero volume" $ do + let series = BarSeries "TEST_TICKER" (Timeframe 3600) [] + let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] + let (_, newagg) = handleBar (bar testTimestamp1 12.00 13.00 10.00 11.00 1) agg + let (_, newagg') = handleBar (bar testTimestamp2 12.00 15.00 11.00 12.00 2) newagg + let (newBar, newagg'') = updateTime (tick testTimestamp4 13.00 100) newagg' + let expectedNewBar = Bar "TEST_TICKER" testTimestamp2 12.00 15.00 10.00 12.00 3 + let expectedBar = Bar "TEST_TICKER" testTimestamp4 13.00 13.00 13.00 13.00 0 + (head <$> bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg'')) @?= Just expectedBar + newBar @?= Just expectedNewBar + where + testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 560) + testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 600) + testTimestamp3 = (UTCTime (fromGregorian 1970 1 1) 3600) + testTimestamp4 = (UTCTime (fromGregorian 1970 1 1) 3660) + tick ts v vol = Tick { + security = "TEST_TICKER" + , datatype = LastTradePrice + , timestamp = ts + , value = v + , volume = vol } + bar ts o h l c v = Bar { + barSecurity = "TEST_TICKER", + barTimestamp = ts, + barOpen = fromDouble o, + barHigh = fromDouble h, + barLow = fromDouble l, + barClose = fromDouble c, + barVolume = v } + prop_allTicksInOneBar :: TestTree prop_allTicksInOneBar = QC.testProperty "All ticks in one bar" $ QC.forAll (QC.choose (1, 86400)) $ \timeframe -> QC.forAll (QC.listOf1 (genTick "TEST_TICKER" baseTime timeframe)) $ \ticks -> @@ -215,4 +247,3 @@ prop_allTicksInOneBar = QC.testProperty "All ticks in one bar" $ QC.forAll (QC.c currentBar tickerId agg = headMay =<< (bsBars <$> M.lookup tickerId (bars agg)) baseTime = UTCTime (fromGregorian 1970 1 1) 0 - From 4b7112a8d742b59885daea13caf51f1ad6463557 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 10 Jul 2019 15:33:31 +0700 Subject: [PATCH 04/51] BarAggregator: ignore earlier ticks --- src/ATrade/BarAggregator.hs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 672a97e..d0fb70f 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MultiWayIf #-} {-| - Module : ATrade.BarAggregator @@ -156,13 +157,15 @@ updateTime tick = runState $ do Just series -> case bsBars series of (b:bs) -> do let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) - then do - lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } - return Nothing - else do - lBars %= M.insert (security tick) series { bsBars = emptyBarFromTick tick : b : bs } - return $ Just b + let thisBn = barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + if + | currentBn == thisBn -> do + lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } + return Nothing + | currentBn < thisBn -> do + lBars %= M.insert (security tick) series { bsBars = emptyBarFromTick tick : b : bs } + return $ Just b + | otherwise -> return Nothing _ -> return Nothing _ -> return Nothing else From cd22e7ee1feec3622242097a900cd4c77142b876 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 14 Jul 2019 13:03:57 +0700 Subject: [PATCH 05/51] BarAggregator: handleBar: do not handle old bars --- robocom-zero.cabal | 2 +- src/ATrade/BarAggregator.hs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 83a275b..37514b8 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -29,7 +29,7 @@ library , ATrade.Driver.Backtest , ATrade.BarAggregator build-depends: base >= 4.7 && < 5 - , libatrade == 0.8.0.0 + , libatrade >= 0.8.0.0 && < 0.9.0.0 , text , text-icu , errors diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index d0fb70f..457305e 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -194,11 +194,11 @@ handleBar bar = runState $ do Just series -> case bsBars series of (b:bs) -> do let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - if currentBn == barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) - then do + if + | currentBn == barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do lBars %= M.insert (barSecurity bar) series { bsBars = updateBar b bar : bs } return Nothing - else + | currentBn < barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do if barEndTime b (tfSeconds $ bsTimeframe series) == barTimestamp bar then do lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : (updateBar b bar : bs) } From 3e12d803e4714592171c5e425554442ee293eb4d Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 17 Jul 2019 14:40:51 +0700 Subject: [PATCH 06/51] Request only given timeframe --- src/ATrade/Driver/Real/QuoteSourceThread.hs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 28426df..e33dc2e 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -28,7 +28,7 @@ import System.ZMQ4 hiding (Event) startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do tickChan <- newBoundedChan 1000 - bracket (startQuoteSourceClient tickChan tickersList ctx qsEp) + bracket (startQuoteSourceClient tickChan (fmap applyTimeframeSpec tickersList) ctx qsEp) (\qs -> do stopQuoteSourceClient qs debugM "Strategy" "Quotesource client: stop") @@ -59,4 +59,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) tickersList = fmap code . (tickers . strategyInstanceParams) $ strategy + applyTimeframeSpec t = case maybeSourceTimeframe of + Just tf -> t `T.append` T.pack (":" ++ show tf ++ ";") + Nothing -> t From d9b88a501f93ab27b6d0fdf614db95cdcb7637bb Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 17 Jul 2019 14:43:31 +0700 Subject: [PATCH 07/51] More debug --- src/ATrade/Driver/Real.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 04d0cc4..8877cdc 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -423,6 +423,7 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd QQ.endDate = utctDay now, QQ.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" + debugM "Strategy" $ show (take 20 historyBars) return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) From a932aa052b29bd13bdfe23922d9ae6e071b1d157 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 18 Jul 2019 09:34:12 +0700 Subject: [PATCH 08/51] Bar aggregator: fix non-exhaustive multiway if --- src/ATrade/BarAggregator.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 457305e..bcd2e4c 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -206,6 +206,7 @@ handleBar bar = runState $ do else do lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } return . Just $ b + | otherwise -> return Nothing _ -> do lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } return Nothing From d258bc42efac19dce468795ca50be7718495d32d Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 18 Jul 2019 20:53:54 +0700 Subject: [PATCH 09/51] Update for libatrade-0.9 Some fixes in tick/bar handling --- robocom-zero.cabal | 2 +- src/ATrade/BarAggregator.hs | 2 +- src/ATrade/Driver/Real/QuoteSourceThread.hs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 37514b8..9496e51 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -29,7 +29,7 @@ library , ATrade.Driver.Backtest , ATrade.BarAggregator build-depends: base >= 4.7 && < 5 - , libatrade >= 0.8.0.0 && < 0.9.0.0 + , libatrade >= 0.9.0.0 && < 0.10.0.0 , text , text-icu , errors diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index bcd2e4c..55c54c0 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -227,7 +227,7 @@ handleBar bar = runState $ do emptyBarFrom bar' = Bar { barSecurity = barSecurity bar', - barTimestamp = barTimestamp bar', + barTimestamp = 0.000001 `addUTCTime` barTimestamp bar', barOpen = barClose bar', barHigh = barClose bar', barLow = barClose bar', diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index e33dc2e..769ee91 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -28,7 +28,7 @@ import System.ZMQ4 hiding (Event) startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do tickChan <- newBoundedChan 1000 - bracket (startQuoteSourceClient tickChan (fmap applyTimeframeSpec tickersList) ctx qsEp) + bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams) (\qs -> do stopQuoteSourceClient qs debugM "Strategy" "Quotesource client: stop") From 8be20b57dd5d32015efd791e31405bebc1e9a15e Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 28 Jul 2019 16:08:38 +0700 Subject: [PATCH 10/51] QuoteSource: do not use ticks time --- src/ATrade/Driver/Real/QuoteSourceThread.hs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 769ee91..c402982 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -43,11 +43,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim case handleTick tick aggValue of (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) (Nothing, !newAggValue) -> writeIORef agg newAggValue - Just _ -> do - aggValue <- readIORef agg - case updateTime tick aggValue of - (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) - (Nothing, !newAggValue) -> writeIORef agg newAggValue + Just _ -> return () QDBar (_, bar) -> do aggValue <- readIORef agg when (isJust maybeSourceTimeframe) $ do From 21cbcf7826b8c843f07cd33ddf7c1dd9aa0fbf18 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 7 Aug 2019 13:05:21 +0700 Subject: [PATCH 11/51] Filter incoming bars by timeframe --- src/ATrade/Driver/Real/QuoteSourceThread.hs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index c402982..e5d3b1a 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -44,12 +44,14 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim (Just bar, !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar) (Nothing, !newAggValue) -> writeIORef agg newAggValue Just _ -> return () - QDBar (_, bar) -> do + QDBar (incomingTf, bar) -> do aggValue <- readIORef agg - when (isJust maybeSourceTimeframe) $ do - case handleBar bar aggValue of - (Just bar', !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar') - (Nothing, !newAggValue) -> writeIORef agg newAggValue) + case maybeSourceTimeframe of + Just tf -> when (tf == unBarTimeframe incomingTf) $ + case handleBar bar aggValue of + (Just bar', !newAggValue) -> writeIORef agg newAggValue >> writeChan eventChan (NewBar bar') + (Nothing, !newAggValue) -> writeIORef agg newAggValue + _ -> return ()) where goodTick tick = tickFilter tick && (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) From 41652e4d1e0e07cbe0c036784941625bbba7ca7a Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 11 Aug 2019 11:45:19 +0700 Subject: [PATCH 12/51] QHP: iso8601 dates --- src/ATrade/Quotes/QHP.hs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ATrade/Quotes/QHP.hs b/src/ATrade/Quotes/QHP.hs index bd6841b..469336e 100644 --- a/src/ATrade/Quotes/QHP.hs +++ b/src/ATrade/Quotes/QHP.hs @@ -13,7 +13,9 @@ import Data.Binary.IEEE754 import qualified Data.ByteString.Lazy as BL import qualified Data.Text as T import Data.Time.Calendar +import Data.Time.Clock import Data.Time.Clock.POSIX +import Data.Time.Format import System.Log.Logger import System.ZMQ4 @@ -48,10 +50,13 @@ data RequestParams = period :: Period } deriving (Show, Eq) +printDatetime :: UTCTime -> String +printDatetime = formatTime defaultTimeLocale (iso8601DateFormat (Just "%H:%M:%S")) + instance ToJSON RequestParams where toJSON p = object [ "ticker" .= ticker p, - "from" .= showGregorian (startDate p), - "to" .= showGregorian (endDate p), + "from" .= printDatetime (UTCTime (startDate p) 0), + "to" .= printDatetime (UTCTime (endDate p) 0), "timeframe" .= show (period p) ] getQuotes :: Context -> RequestParams -> IO [Bar] From eb61223e181e667fcd8e48f909903281c9774f65 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 26 Aug 2019 14:56:21 +0700 Subject: [PATCH 13/51] Real: read params from IORef --- src/ATrade/Driver/Real.hs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 8877cdc..8a312ba 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -144,7 +144,6 @@ storeState params stateRef timersRef = do return () #endif - gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () gracefulShutdown params stateRef timersRef shutdownMv _ = do infoM "main" "Shutdown, saving state" @@ -179,6 +178,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do let strategy = mkBarStrategy instanceParams dataDownloadDelta updatedConfig stratState callback stateRef <- newIORef stratState + configRef <- newIORef config timersRef <- newIORef timersState shutdownMv <- newEmptyMVar installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) @@ -191,7 +191,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do storeState params stateRef timersRef debugM "main" "Starting strategy driver" - barStrategyDriver (sourceBarTimeframe params) tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread + barStrategyDriver (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv `finally` killThread stateSavingThread where tickFilter :: Tick -> Bool tickFilter tick = @@ -293,8 +293,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO () -barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutdownVar = do +barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> IO () +barStrategyDriver mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do -- Make channels -- Event channel is for strategy events, like new tick arrival, or order execution notification eventChan <- BC.newBoundedChan 1000 @@ -347,8 +347,8 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd if event /= Shutdown then do currentBars <- bars <$> readIORef agg - let params = strategyParams strategy' - let curState = currentState strategy' + params <- readIORef configRef + curState <- readIORef stateRef let instId = strategyInstanceId . strategyInstanceParams $ strategy' let acc = strategyAccount . strategyInstanceParams $ strategy' let vol = strategyVolume . strategyInstanceParams $ strategy' From 6c12329d86264811022fa9d549dd71e91ce3e37a Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 28 Aug 2019 15:07:55 +0700 Subject: [PATCH 14/51] Moved Driver.Real.Types to Driver.Types --- robocom-zero.cabal | 2 +- src/ATrade/Driver/Backtest.hs | 9 ++++----- src/ATrade/Driver/Real.hs | 2 +- src/ATrade/Driver/Real/QuoteSourceThread.hs | 2 +- src/ATrade/Driver/{Real => }/Types.hs | 3 ++- 5 files changed, 9 insertions(+), 9 deletions(-) rename src/ATrade/Driver/{Real => }/Types.hs (98%) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 9496e51..3945aab 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -68,7 +68,7 @@ library other-modules: ATrade.Exceptions , ATrade.Driver.Real.BrokerClientThread , ATrade.Driver.Real.QuoteSourceThread - , ATrade.Driver.Real.Types + , ATrade.Driver.Types test-suite robots-test type: exitcode-stdio-1.0 diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index bbd99fc..b88d08c 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -2,15 +2,15 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE QuasiQuotes #-} module ATrade.Driver.Backtest ( backtestMain ) where -import ATrade.Driver.Real.Types (InitializationCallback, +import ATrade.Driver.Types (InitializationCallback, Strategy (..), StrategyInstanceParams (..)) import ATrade.Exceptions @@ -18,8 +18,7 @@ import ATrade.Quotes.Finam as QF import ATrade.RoboCom.Monad (Event (..), EventCallback, StrategyAction (..), StrategyEnvironment (..), - runStrategyElement, st, - appendToLog) + appendToLog, runStrategyElement, st) import ATrade.RoboCom.Positions import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), Timeframe (..)) @@ -273,7 +272,7 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do Just bs -> Just bs { bsBars = updateBarList newbar (bsBars bs) }) (barSecurity newbar) barMap updateBarList newbar (_:bs) = newbar:newbar:bs - updateBarList newbar _ = newbar:[newbar] + updateBarList newbar _ = newbar:[newbar] fireTimers ts = do (firedTimers, otherTimers) <- partition (< ts) <$> gets pendingTimers diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 8a312ba..9a667db 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -45,7 +45,7 @@ import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, E import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread -import ATrade.Driver.Real.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) +import ATrade.Driver.Types (Strategy(..), StrategyInstanceParams(..), InitializationCallback) import ATrade.RoboCom.Types (BarSeries(..), Ticker(..), Timeframe(..)) import ATrade.Exceptions import ATrade.Quotes.Finam as QF diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index e5d3b1a..51891df 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -6,7 +6,7 @@ module ATrade.Driver.Real.QuoteSourceThread ) where import ATrade.BarAggregator -import ATrade.Driver.Real.Types +import ATrade.Driver.Types import ATrade.QuoteSource.Client import ATrade.RoboCom.Monad import ATrade.RoboCom.Types diff --git a/src/ATrade/Driver/Real/Types.hs b/src/ATrade/Driver/Types.hs similarity index 98% rename from src/ATrade/Driver/Real/Types.hs rename to src/ATrade/Driver/Types.hs index 0728fa8..ce247f6 100644 --- a/src/ATrade/Driver/Real/Types.hs +++ b/src/ATrade/Driver/Types.hs @@ -1,6 +1,7 @@ {-# LANGUAGE RankNTypes #-} -module ATrade.Driver.Real.Types ( +module ATrade.Driver.Types +( Strategy(..), StrategyInstanceParams(..), InitializationCallback From 04e53b9f0d57b6bf07260d92f05bed7555bfe209 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 28 Aug 2019 16:35:02 +0700 Subject: [PATCH 15/51] Driver.Real refactoring --- src/ATrade/Driver/Real.hs | 195 ++++++++++++++++++++++---------------- 1 file changed, 113 insertions(+), 82 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 9a667db..95c02cc 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -3,6 +3,10 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FlexibleContexts #-} module ATrade.Driver.Real ( Strategy(..), @@ -23,9 +27,10 @@ import System.Log.Handler.Simple import System.Log.Handler (setFormatter) import System.Log.Formatter import Control.Monad +import Control.Monad.Reader import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) import Control.Concurrent.BoundedChan as BC -import Control.Exception +import Control.Exception.Safe import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BL import qualified Data.List as L @@ -41,7 +46,7 @@ import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread @@ -107,6 +112,44 @@ paramsParser = Params ( long "source-timeframe" <> metavar "SECONDS" )) +data Env c s = Env { + envStrategyInstanceParams :: StrategyInstanceParams, + envStrategyEnvironment :: IORef StrategyEnvironment, + envConfigRef :: IORef c, + envStateRef :: IORef s, + envBrokerChan :: BC.BoundedChan BrokerCommand, + envTimers :: IORef [UTCTime], + envEventChan :: BC.BoundedChan Event +} + +type App c s = ReaderT (Env c s) IO + +instance MonadRobot (App c s) c s where + submitOrder order = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerSubmitOrder order + + cancelOrder oId = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerCancelOrder oId + + appendToLog = lift . debugM "Strategy" . T.unpack + setupTimer t = do + timers <- asks envTimers + lift $ atomicModifyIORef' timers (\s -> (t : s, ())) + + enqueueIOAction actionId action = do + eventChan <- asks envEventChan + lift $ void $ forkIO $ do + v <- action + BC.writeChan eventChan $ ActionCompleted actionId v + + getConfig = asks envConfigRef >>= lift . readIORef + getState = asks envStateRef >>= lift . readIORef + setState s = do + ref <- asks envStateRef + lift $ writeIORef ref s + getEnvironment = asks envStrategyEnvironment >>= lift . readIORef data BigConfig c = BigConfig { confTickers :: [Ticker], @@ -190,8 +233,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do threadDelay 1000000 storeState params stateRef timersRef + straEnv <- newIORef StrategyEnvironment { + seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, + seAccount = strategyAccount . strategyInstanceParams $ strategy, + seVolume = strategyVolume . strategyInstanceParams $ strategy, + seBars = M.empty, + seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 + } + -- Event channel is for strategy events, like new tick arrival, or order execution notification + eventChan <- BC.newBoundedChan 1000 + -- Orders channel passes strategy orders to broker thread + brokerChan <- BC.newBoundedChan 1000 + debugM "main" "Starting strategy driver" - barStrategyDriver (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv `finally` killThread stateSavingThread + let env = Env { + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan + } + withContext (\ctx -> + runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = @@ -293,102 +358,68 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> IO () -barStrategyDriver mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do - -- Make channels - -- Event channel is for strategy events, like new tick arrival, or order execution notification - eventChan <- BC.newBoundedChan 1000 - -- Orders channel passes strategy orders to broker thread - ordersChan <- BC.newBoundedChan 1000 - - withContext (\ctx -> do - -- Load tickers data and create BarAggregator from them - historyBars <- - if - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> - M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> - M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - | otherwise -> - M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] - bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do - debugM "Strategy" "QuoteSource thread forked" - bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do - debugM "Strategy" "Broker thread forked" - - wakeupTid <- forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan ordersChan BrokerRequestNotifications - debugM "Strategy" "Wakeup thread forked" - - let env = StrategyEnvironment { - seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, - seAccount = strategyAccount . strategyInstanceParams $ strategy, - seVolume = strategyVolume . strategyInstanceParams $ strategy, - seBars = M.empty, - seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 - } - readAndHandleEvents agg ordersChan eventChan strategy env - debugM "Strategy" "Stopping strategy driver" - killThread wakeupTid))) - - debugM "Strategy" "Strategy done" +barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () +barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do + -- Load tickers data and create BarAggregator from them + historyBars <- + lift $ if + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> + M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> + M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + | otherwise -> + M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + + agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] + eventChan <- asks envEventChan + brokerChan <- asks envBrokerChan + bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "QuoteSource thread forked" + bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "Broker thread forked" + + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" + + readAndHandleEvents agg strategy + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid)) + + lift $ debugM "Strategy" "Strategy done" where qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy - readAndHandleEvents agg ordersChan eventChan strategy' env = do - event <- readChan eventChan + + readAndHandleEvents agg strategy' = do + eventChan <- asks envEventChan + event <- lift $ readChan eventChan if event /= Shutdown then do - currentBars <- bars <$> readIORef agg - params <- readIORef configRef - curState <- readIORef stateRef - let instId = strategyInstanceId . strategyInstanceParams $ strategy' - let acc = strategyAccount . strategyInstanceParams $ strategy' - let vol = strategyVolume . strategyInstanceParams $ strategy' - - let oldTimestamp = seLastTimestamp env + env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick _ -> seLastTimestamp env newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + (eventCallback strategy) event + lift $ writeIORef timersRef newTimers - let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp } - let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event - writeIORef stateRef newState - writeIORef timersRef newTimers - - newTimers' <- catMaybes <$> mapM handleTimerActions actions - mapM_ (handleActions ordersChan) actions - readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv - else debugM "Strategy" "Shutdown requested" + readAndHandleEvents agg strategy' + else + lift $ debugM "Strategy" "Shutdown requested" where - handleTimerActions action = - case action of - ActionSetupTimer timerTime -> return $ Just timerTime - _ -> return Nothing - - handleActions ordersChan' action = - case action of - (ActionLog logText) -> debugM "Strategy" $ T.unpack logText - (ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order - (ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid - (ActionSetupTimer _) -> return () - (ActionIO tag io) -> void $ forkIO $ do - v <- io - writeChan eventChan (ActionCompleted tag v) checkTimer eventChan' newTimestamp timerTime = if newTimestamp >= timerTime then do - writeChan eventChan' $ TimerFired timerTime + lift $ writeChan eventChan' $ TimerFired timerTime return Nothing else return $ Just timerTime From 91de243a70c7a01d4f6e8491b3191b7e20062051 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 1 Sep 2019 11:09:46 +0700 Subject: [PATCH 16/51] More debug in QuoteSourceThread --- src/ATrade/Driver/Real/QuoteSourceThread.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index e5d3b1a..cd23dbb 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -31,7 +31,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams) (\qs -> do stopQuoteSourceClient qs - debugM "Strategy" "Quotesource client: stop") + debugM "QSThread" "Quotesource client: stop") (\_ -> forever $ do qdata <- readChan tickChan case qdata of @@ -46,6 +46,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim Just _ -> return () QDBar (incomingTf, bar) -> do aggValue <- readIORef agg + debugM "QSThread" $ "Incoming bar: " ++ show incomingTf ++ ": " ++ show bar case maybeSourceTimeframe of Just tf -> when (tf == unBarTimeframe incomingTf) $ case handleBar bar aggValue of From 2e0802a2fdad2ab7cbdb1760d45a2cc663afa388 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 7 Sep 2019 11:49:23 +0700 Subject: [PATCH 17/51] BarAggregator: increased max time --- src/ATrade/Driver/Real.hs | 2 +- src/ATrade/Driver/Real/QuoteSourceThread.hs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 8877cdc..2653176 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -311,7 +311,7 @@ barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutd M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) | otherwise -> M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] + agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do debugM "Strategy" "QuoteSource thread forked" bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index cd23dbb..7cad432 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -46,7 +46,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim Just _ -> return () QDBar (incomingTf, bar) -> do aggValue <- readIORef agg - debugM "QSThread" $ "Incoming bar: " ++ show incomingTf ++ ": " ++ show bar + -- debugM "QSThread" $ "Incoming bar: " ++ show incomingTf ++ ": " ++ show bar case maybeSourceTimeframe of Just tf -> when (tf == unBarTimeframe incomingTf) $ case handleBar bar aggValue of From 5476bbf1ca7caa8905265611b2067b0e32ead8b0 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 7 Sep 2019 12:59:59 +0700 Subject: [PATCH 18/51] Debug --- src/ATrade/Driver/Real.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 95c02cc..ce2720d 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -370,7 +370,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t | otherwise -> M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] + agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] eventChan <- asks envEventChan brokerChan <- asks envBrokerChan bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do @@ -402,6 +402,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t event <- lift $ readChan eventChan if event /= Shutdown then do + lift $ debugM "Strategy" $ "event: " ++ show event env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick From 07a16fec1e7b7798bca289acab2c0451b06c2449 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 11 Sep 2019 20:03:39 +0700 Subject: [PATCH 19/51] Baraggregator bugfix --- src/ATrade/BarAggregator.hs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 55c54c0..0829274 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -204,8 +204,13 @@ handleBar bar = runState $ do lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : (updateBar b bar : bs) } return . Just $ updateBar b bar else do - lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } - return . Just $ b + if barVolume b > 0 + then do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : b : bs } + return . Just $ bar + else do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : bs } + return . Just $ bar | otherwise -> return Nothing _ -> do lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } From f4c4fc811cca603e56b029c26487a00f528ad72a Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 29 Sep 2019 16:29:57 +0700 Subject: [PATCH 20/51] bugfix: update strategy environment --- src/ATrade/Driver/Real.hs | 224 +++++++++++++++++++++----------------- 1 file changed, 122 insertions(+), 102 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index ce2720d..7d870f9 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -119,7 +119,9 @@ data Env c s = Env { envStateRef :: IORef s, envBrokerChan :: BC.BoundedChan BrokerCommand, envTimers :: IORef [UTCTime], - envEventChan :: BC.BoundedChan Event + envEventChan :: BC.BoundedChan Event, + envAggregator :: IORef BarAggregator, + envLastTimestamp :: IORef UTCTime } type App c s = ReaderT (Env c s) IO @@ -149,7 +151,15 @@ instance MonadRobot (App c s) c s where setState s = do ref <- asks envStateRef lift $ writeIORef ref s - getEnvironment = asks envStrategyEnvironment >>= lift . readIORef + + getEnvironment = do + aggRef <- asks envAggregator + envRef <- asks envStrategyEnvironment + agg <- lift $ readIORef aggRef + env <- lift $ readIORef envRef + nowRef <- asks envLastTimestamp + now <- lift $ readIORef nowRef + return $ env { seBars = bars agg, seLastTimestamp = now } data BigConfig c = BigConfig { confTickers :: [Ticker], @@ -246,16 +256,31 @@ robotMain dataDownloadDelta defaultState initCallback callback = do brokerChan <- BC.newBoundedChan 1000 debugM "main" "Starting strategy driver" - let env = Env { - envStrategyInstanceParams = instanceParams, - envStrategyEnvironment = straEnv, - envConfigRef = configRef, - envStateRef = stateRef, - envBrokerChan = brokerChan, - envTimers = timersRef, - envEventChan = eventChan - } - withContext (\ctx -> + withContext (\ctx -> do + infoM "main" "Loading history" + -- Load tickers data and create BarAggregator from them + historyBars <- + if + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> + M.fromList <$> mapM (loadTickerFromFinam (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> + M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + | otherwise -> + M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + + agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] + now <- getCurrentTime >>= newIORef + let env = Env { + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan, + envAggregator = agg, + envLastTimestamp = now + } runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool @@ -344,89 +369,9 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Just st -> return st Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) - --- | Helper function to make 'Strategy' instances -mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s -mkBarStrategy instanceParams dd params initialState cb = BarStrategy { - downloadDelta = dd, - eventCallback = cb, - currentState = initialState, - strategyParams = params, - strategyTimers = [], - - strategyInstanceParams = instanceParams } - --- | Main function which handles incoming events (ticks/orders), passes them to strategy callback --- and executes returned strategy actions -barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () -barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do - -- Load tickers data and create BarAggregator from them - historyBars <- - lift $ if - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> - M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> - M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - | otherwise -> - M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - - agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] - eventChan <- asks envEventChan - brokerChan <- asks envBrokerChan - bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "QuoteSource thread forked" - bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "Broker thread forked" - - wakeupTid <- lift . forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan brokerChan BrokerRequestNotifications - lift $ debugM "Strategy" "Wakeup thread forked" - - readAndHandleEvents agg strategy - lift $ debugM "Strategy" "Stopping strategy driver" - lift $ killThread wakeupTid)) - - lift $ debugM "Strategy" "Strategy done" - - where - qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy - brEp = strategyBrokerEp . strategyInstanceParams $ strategy - - readAndHandleEvents agg strategy' = do - eventChan <- asks envEventChan - event <- lift $ readChan eventChan - if event /= Shutdown - then do - lift $ debugM "Strategy" $ "event: " ++ show event - env <- getEnvironment - let newTimestamp = case event of - NewTick tick -> timestamp tick - _ -> seLastTimestamp env - - newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') - (eventCallback strategy) event - lift $ writeIORef timersRef newTimers - - readAndHandleEvents agg strategy' - else - lift $ debugM "Strategy" "Shutdown requested" - where - - checkTimer eventChan' newTimestamp timerTime = - if newTimestamp >= timerTime - then do - lift $ writeChan eventChan' $ TimerFired timerTime - return Nothing - else - return $ Just timerTime - - loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromHAP ctx ep t = do + + loadTickerFromHAP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromHAP ctx ep delta t = do debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) case parseHAPPeriod $ timeframeSeconds t of Just tf -> do @@ -434,7 +379,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t historyBars <- QH.getQuotes ctx QH.RequestParams { QH.endpoint = ep, QH.ticker = code t, - QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now, + QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ delta) now, QH.endDate = now, QH.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" @@ -442,8 +387,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromQHP ctx ep t = do + loadTickerFromQHP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromQHP ctx ep delta t = do debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) case parseQHPPeriod $ timeframeSeconds t of Just tf -> do @@ -451,7 +396,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t historyBars <- QQ.getQuotes ctx QQ.RequestParams { QQ.endpoint = ep, QQ.ticker = code t, - QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QQ.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), QQ.endDate = utctDay now, QQ.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" @@ -460,8 +405,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries) - loadTickerFromFinam t = do + loadTickerFromFinam :: DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromFinam delta t = do randDelay <- getStdRandom (randomR (1, 5)) threadDelay $ randDelay * 1000000 now <- getCurrentTime @@ -470,7 +415,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t (Just finamCode, Just per) -> do debugM "Strategy" $ "Downloading ticker: " ++ finamCode history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, - QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QF.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), QF.endDate = utctDay now, QF.period = per } case history of @@ -516,3 +461,78 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t | x == 24 * 60 * 60 = Just QH.PeriodDay | otherwise = Nothing + +-- | Helper function to make 'Strategy' instances +mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s +mkBarStrategy instanceParams dd params initialState cb = BarStrategy { + downloadDelta = dd, + eventCallback = cb, + currentState = initialState, + strategyParams = params, + strategyTimers = [], + + strategyInstanceParams = instanceParams } + +-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback +-- and executes returned strategy actions +barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () +barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do + eventChan <- asks envEventChan + brokerChan <- asks envBrokerChan + agg <- asks envAggregator + bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "QuoteSource thread forked" + bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "Broker thread forked" + + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" + + readAndHandleEvents agg strategy + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid)) + + lift $ debugM "Strategy" "Strategy done" + + where + qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy + brEp = strategyBrokerEp . strategyInstanceParams $ strategy + + readAndHandleEvents agg strategy' = do + eventChan <- asks envEventChan + event <- lift $ readChan eventChan + if event /= Shutdown + then do + case event of + NewTick _ -> return () + _ -> lift $ debugM "Strategy" $ "event: " ++ show event + env <- getEnvironment + let newTimestamp = case event of + NewTick tick -> timestamp tick + _ -> seLastTimestamp env + nowRef <- asks envLastTimestamp + lift $ writeIORef nowRef newTimestamp + + newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + (eventCallback strategy) event + lift $ writeIORef timersRef newTimers + + readAndHandleEvents agg strategy' + else + lift $ debugM "Strategy" "Shutdown requested" + where + + checkTimer eventChan' newTimestamp timerTime = + if newTimestamp >= timerTime + then do + lift $ writeChan eventChan' $ TimerFired timerTime + return Nothing + else + return $ Just timerTime + From 7f12ce1e644a9f6c22cf9a55206430c7a8f36176 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 29 Sep 2019 17:19:53 +0700 Subject: [PATCH 21/51] Bar aggregator: use incoming bars as is --- src/ATrade/BarAggregator.hs | 60 ++++++------------------------------- 1 file changed, 9 insertions(+), 51 deletions(-) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 0829274..7389745 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -186,56 +186,14 @@ updateTime tick = runState $ do handleBar :: Bar -> BarAggregator -> (Maybe Bar, BarAggregator) handleBar bar = runState $ do - tws <- gets tickTimeWindows mybars <- gets bars - if (any (isInTimeInterval bar) tws) - then - case M.lookup (barSecurity bar) mybars of - Just series -> case bsBars series of - (b:bs) -> do - let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - if - | currentBn == barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do - lBars %= M.insert (barSecurity bar) series { bsBars = updateBar b bar : bs } - return Nothing - | currentBn < barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do - if barEndTime b (tfSeconds $ bsTimeframe series) == barTimestamp bar - then do - lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : (updateBar b bar : bs) } - return . Just $ updateBar b bar - else do - if barVolume b > 0 - then do - lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : b : bs } - return . Just $ bar - else do - lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : bs } - return . Just $ bar - | otherwise -> return Nothing - _ -> do - lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } - return Nothing - _ -> return Nothing - else - return Nothing - where - isInTimeInterval bar' (a, b) = (utctDayTime . barTimestamp) bar' >= a && (utctDayTime . barTimestamp) bar' <= b - updateBar !bar' newbar = - let newHigh = max (barHigh bar') (barHigh newbar) - newLow = min (barLow bar') (barLow newbar) in - bar' { - barTimestamp = barTimestamp newbar, - barHigh = newHigh, - barLow = newLow, - barClose = barClose newbar, - barVolume = barVolume bar' + (abs . barVolume $ newbar) } - - emptyBarFrom bar' = Bar { - barSecurity = barSecurity bar', - barTimestamp = 0.000001 `addUTCTime` barTimestamp bar', - barOpen = barClose bar', - barHigh = barClose bar', - barLow = barClose bar', - barClose = barClose bar', - barVolume = 0 } + case M.lookup (barSecurity bar) mybars of + Just series -> case bsBars series of + (b:bs) -> do + lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } + return . Just $ b + _ -> do + lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } + return Nothing + _ -> return Nothing From daeb7b0cb2f9d7f1cde462083f265e749d4f33de Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 5 Oct 2019 14:45:00 +0700 Subject: [PATCH 22/51] BarAggregator: return correct bar --- src/ATrade/BarAggregator.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 7389745..12a1329 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -191,7 +191,7 @@ handleBar bar = runState $ do Just series -> case bsBars series of (b:bs) -> do lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } - return . Just $ b + return . Just $ bar _ -> do lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } return Nothing From 6880925a272e4082ac1c3b034e1d7d5ecb2cddb1 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 18 Jan 2020 19:51:33 +0700 Subject: [PATCH 23/51] Backtest++ --- src/ATrade/Driver/Backtest.hs | 15 ++++++++++++++- src/ATrade/RoboCom/Indicators.hs | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index b88d08c..9565c24 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -1,6 +1,8 @@ {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE RankNTypes #-} @@ -16,7 +18,7 @@ import ATrade.Driver.Types (InitializationCallback, import ATrade.Exceptions import ATrade.Quotes.Finam as QF import ATrade.RoboCom.Monad (Event (..), EventCallback, - StrategyAction (..), + MonadRobot (..), StrategyAction (..), StrategyEnvironment (..), appendToLog, runStrategyElement, st) import ATrade.RoboCom.Positions @@ -314,3 +316,14 @@ defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment " newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) +instance MonadRobot (BacktestingMonad s c) s c where + submitOrder order = undefined + cancelOrder oid = undefined + appendToLog txt = undefined + setupTimer time = undefined + enqueueIOAction actionId action = undefined + getConfig = undefined + getState = undefined + setState s = undefined + getEnvironment = undefined + diff --git a/src/ATrade/RoboCom/Indicators.hs b/src/ATrade/RoboCom/Indicators.hs index 31109a1..9907bae 100644 --- a/src/ATrade/RoboCom/Indicators.hs +++ b/src/ATrade/RoboCom/Indicators.hs @@ -40,7 +40,7 @@ cci period bars = (head tp - tpMean) / (0.015 * meanDev) typicalPrice a b c = (a + b + c) / 3 atr :: Int -> [Bar] -> Double -atr period bars = case reverse (take (5 * period) trueranges) of +atr period bars = case reverse (take (10 * period) trueranges) of (firstValue:rest) -> foldl (\x y -> (x * (period' - 1) + y) / period') firstValue rest _ -> 0 where From 03d60a5ccd4a9f6160cb9f9fccfd487721db8cb5 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 28 Jan 2020 19:05:05 +0700 Subject: [PATCH 24/51] Merge commit 'a3464483ea429aafa5049b32886490f2373a6a5e' into stable --- robocom-zero.cabal | 2 + src/ATrade/BarAggregator.hs | 53 ++++++--------------- src/ATrade/Driver/Real/QuoteSourceThread.hs | 3 +- src/ATrade/RoboCom.hs | 26 ++++++++++ 4 files changed, 44 insertions(+), 40 deletions(-) create mode 100644 src/ATrade/RoboCom.hs diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 3945aab..67586d1 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -28,6 +28,8 @@ library , ATrade.Driver.Real , ATrade.Driver.Backtest , ATrade.BarAggregator + , ATrade.RoboCom + other-modules: Paths_robocom_zero build-depends: base >= 4.7 && < 5 , libatrade >= 0.9.0.0 && < 0.10.0.0 , text diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 55c54c0..7d09e68 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -186,51 +186,26 @@ updateTime tick = runState $ do handleBar :: Bar -> BarAggregator -> (Maybe Bar, BarAggregator) handleBar bar = runState $ do - tws <- gets tickTimeWindows mybars <- gets bars - if (any (isInTimeInterval bar) tws) - then - case M.lookup (barSecurity bar) mybars of - Just series -> case bsBars series of - (b:bs) -> do - let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - if - | currentBn == barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do - lBars %= M.insert (barSecurity bar) series { bsBars = updateBar b bar : bs } - return Nothing - | currentBn < barNumber (barTimestamp bar) (tfSeconds $ bsTimeframe series) -> do - if barEndTime b (tfSeconds $ bsTimeframe series) == barTimestamp bar - then do - lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : (updateBar b bar : bs) } - return . Just $ updateBar b bar - else do - lBars %= M.insert (barSecurity bar) series { bsBars = bar : b : bs } - return . Just $ b - | otherwise -> return Nothing - _ -> do - lBars %= M.insert (barSecurity bar) series { bsBars = [bar] } - return Nothing - _ -> return Nothing - else - return Nothing + case M.lookup (barSecurity bar) mybars of + Just series -> case bsBars series of + (_:bs) -> do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : bar : bs } + return . Just $ bar + _ -> do + lBars %= M.insert (barSecurity bar) series { bsBars = emptyBarFrom bar : [bar] } + return Nothing + _ -> return Nothing where - isInTimeInterval bar' (a, b) = (utctDayTime . barTimestamp) bar' >= a && (utctDayTime . barTimestamp) bar' <= b - updateBar !bar' newbar = - let newHigh = max (barHigh bar') (barHigh newbar) - newLow = min (barLow bar') (barLow newbar) in - bar' { - barTimestamp = barTimestamp newbar, - barHigh = newHigh, - barLow = newLow, - barClose = barClose newbar, - barVolume = barVolume bar' + (abs . barVolume $ newbar) } - - emptyBarFrom bar' = Bar { + emptyBarFrom bar' = newBar + where + newBar = Bar { barSecurity = barSecurity bar', - barTimestamp = 0.000001 `addUTCTime` barTimestamp bar', + barTimestamp = barTimestamp bar', barOpen = barClose bar', barHigh = barClose bar', barLow = barClose bar', barClose = barClose bar', barVolume = 0 } + diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 51891df..9ad36b9 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -31,7 +31,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim bracket (startQuoteSourceClient tickChan tickersList ctx qsEp defaultClientSecurityParams) (\qs -> do stopQuoteSourceClient qs - debugM "Strategy" "Quotesource client: stop") + debugM "QSThread" "Quotesource client: stop") (\_ -> forever $ do qdata <- readChan tickChan case qdata of @@ -46,6 +46,7 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim Just _ -> return () QDBar (incomingTf, bar) -> do aggValue <- readIORef agg + -- debugM "QSThread" $ "Incoming bar: " ++ show incomingTf ++ ": " ++ show bar case maybeSourceTimeframe of Just tf -> when (tf == unBarTimeframe incomingTf) $ case handleBar bar aggValue of diff --git a/src/ATrade/RoboCom.hs b/src/ATrade/RoboCom.hs new file mode 100644 index 0000000..9d3a2bc --- /dev/null +++ b/src/ATrade/RoboCom.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE TemplateHaskell #-} + +module ATrade.RoboCom +( + robocom_version +) where + +import Data.Version +import Paths_robocom_zero + +import Development.GitRev + +robocom_version :: Version +robocom_version = version + +robocom_gitrev :: String +robocom_gitrev = concat [ "robocom-zero-", + $(gitBranch), + "@", + $(gitHash), + dirty ] + where + dirty | $(gitDirty) = "+" + | otherwise = "" + + From 356bc279b519316f898a56d4e8f65ecef165835d Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 28 Jan 2020 19:09:13 +0700 Subject: [PATCH 25/51] Export robocom_gitrev --- robocom-zero.cabal | 1 + src/ATrade/RoboCom.hs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 67586d1..befc1f6 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -65,6 +65,7 @@ library , signal , random , hedis + , gitrev default-language: Haskell2010 other-modules: ATrade.Exceptions diff --git a/src/ATrade/RoboCom.hs b/src/ATrade/RoboCom.hs index 9d3a2bc..a57b83a 100644 --- a/src/ATrade/RoboCom.hs +++ b/src/ATrade/RoboCom.hs @@ -2,7 +2,8 @@ module ATrade.RoboCom ( - robocom_version + robocom_version + , robocom_gitrev ) where import Data.Version From 84da78268a798caa975444c0750628040291719b Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 2 Feb 2020 11:56:21 +0700 Subject: [PATCH 26/51] tweak(monad): lenses in StrategyEnvironment accessors --- src/ATrade/Driver/Backtest.hs | 9 +++--- src/ATrade/Driver/Real.hs | 17 +++++------ src/ATrade/RoboCom/Monad.hs | 19 ++++++++---- src/ATrade/RoboCom/Positions.hs | 51 +++++++++++++++++---------------- 4 files changed, 53 insertions(+), 43 deletions(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index 9565c24..450cb59 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -20,7 +20,8 @@ import ATrade.Quotes.Finam as QF import ATrade.RoboCom.Monad (Event (..), EventCallback, MonadRobot (..), StrategyAction (..), StrategyEnvironment (..), - appendToLog, runStrategyElement, st) + appendToLog, runStrategyElement, + seBars, seLastTimestamp, st) import ATrade.RoboCom.Positions import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), Timeframe (..)) @@ -28,6 +29,7 @@ import ATrade.Types import Conduit (awaitForever, runConduit, yield, (.|)) import Control.Exception.Safe +import Control.Lens import Control.Monad.ST (runST) import Control.Monad.State import Data.Aeson (FromJSON (..), Result (..), @@ -162,9 +164,8 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do backtestLoop = awaitForever (\bar -> do env <- gets strategyEnvironment - let oldTimestamp = seLastTimestamp env let newTimestamp = barTimestamp bar - let newenv = env { seBars = updateBars (seBars env) bar, seLastTimestamp = newTimestamp } + let newenv = env & seBars %~ (flip updateBars $ bar) & seLastTimestamp .~ newTimestamp curState <- gets robotState modify' (\s -> s { strategyEnvironment = newenv }) handleEvents [NewBar bar]) @@ -232,7 +233,7 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do order `executeAtPrice` barOpen bar executeAtPrice order price = do - ts <- seLastTimestamp <$> gets strategyEnvironment + ts <- view seLastTimestamp <$> gets strategyEnvironment modify' (\s -> s { tradesLog = mkTrade order price ts : tradesLog s }) return $ OrderUpdate (orderId order) Executed diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 7d870f9..2611f41 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -31,6 +31,7 @@ import Control.Monad.Reader import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) import Control.Concurrent.BoundedChan as BC import Control.Exception.Safe +import Control.Lens hiding (Context, (.=)) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BL import qualified Data.List as L @@ -46,7 +47,7 @@ import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread @@ -159,7 +160,7 @@ instance MonadRobot (App c s) c s where env <- lift $ readIORef envRef nowRef <- asks envLastTimestamp now <- lift $ readIORef nowRef - return $ env { seBars = bars agg, seLastTimestamp = now } + return $ env & seBars .~ bars agg & seLastTimestamp .~ now data BigConfig c = BigConfig { confTickers :: [Ticker], @@ -244,11 +245,11 @@ robotMain dataDownloadDelta defaultState initCallback callback = do storeState params stateRef timersRef straEnv <- newIORef StrategyEnvironment { - seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, - seAccount = strategyAccount . strategyInstanceParams $ strategy, - seVolume = strategyVolume . strategyInstanceParams $ strategy, - seBars = M.empty, - seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 + _seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, + _seAccount = strategyAccount . strategyInstanceParams $ strategy, + _seVolume = strategyVolume . strategyInstanceParams $ strategy, + _seBars = M.empty, + _seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 } -- Event channel is for strategy events, like new tick arrival, or order execution notification eventChan <- BC.newBoundedChan 1000 @@ -515,7 +516,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick - _ -> seLastTimestamp env + _ -> env ^. seLastTimestamp nowRef <- asks envLastTimestamp lift $ writeIORef nowRef newTimestamp diff --git a/src/ATrade/RoboCom/Monad.hs b/src/ATrade/RoboCom/Monad.hs index 14b734d..71c8af8 100644 --- a/src/ATrade/RoboCom/Monad.hs +++ b/src/ATrade/RoboCom/Monad.hs @@ -14,6 +14,11 @@ module ATrade.RoboCom.Monad ( RActions, REnv, StrategyEnvironment(..), + seInstanceId, + seAccount, + seVolume, + seBars, + seLastTimestamp, StrategyElement, runStrategyElement, EventCallback, @@ -31,12 +36,12 @@ import ATrade.Types import Ether +import Control.Lens import Data.Aeson.Types import qualified Data.Text as T import Data.Time.Clock import Text.Printf.TH - class (Monad m) => MonadRobot m c s | m -> c, m -> s where submitOrder :: Order -> m () cancelOrder :: OrderId -> m () @@ -84,12 +89,14 @@ data StrategyAction = ActionOrder Order | ActionIO Int (IO Value) data StrategyEnvironment = StrategyEnvironment { - seInstanceId :: !T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) - seAccount :: !T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent - seVolume :: !Int, -- ^ Volume to use for this instance (in lots/contracts) - seBars :: !Bars, -- ^ List of tickers which is used by this strategy - seLastTimestamp :: !UTCTime + _seInstanceId :: !T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) + _seAccount :: !T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent + _seVolume :: !Int, -- ^ Volume to use for this instance (in lots/contracts) + _seBars :: !Bars, -- ^ List of tickers which is used by this strategy + _seLastTimestamp :: !UTCTime } deriving (Eq) +makeLenses ''StrategyEnvironment + instance Show StrategyAction where show (ActionOrder order) = "ActionOrder " ++ show order diff --git a/src/ATrade/RoboCom/Positions.hs b/src/ATrade/RoboCom/Positions.hs index 05df475..8b2d372 100644 --- a/src/ATrade/RoboCom/Positions.hs +++ b/src/ATrade/RoboCom/Positions.hs @@ -73,6 +73,7 @@ import ATrade.RoboCom.Monad import ATrade.RoboCom.Types import ATrade.Types +import Control.Lens import Control.Monad import Ether @@ -186,7 +187,7 @@ dispatchPosition event pos = case posState pos of PositionCancelled -> handlePositionCancelled pos where handlePositionWaitingOpenSubmission pendingOrder = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment if orderDeadline (posSubmissionDeadline pos) lastTs then return $ pos { posState = PositionCancelled } -- TODO call TimeoutHandler if present else case event of @@ -199,7 +200,7 @@ dispatchPosition event pos = case posState pos of _ -> return pos handlePositionWaitingOpen = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment case posCurrentOrder pos of Just order -> if orderDeadline (posExecutionDeadline pos) lastTs then do -- TODO call TimeoutHandler @@ -238,7 +239,7 @@ dispatchPosition event pos = case posState pos of return pos handlePositionOpen = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment if | orderDeadline (posSubmissionDeadline pos) lastTs -> do appendToLog $ [st|PositionId: %? : Missed submission deadline: %?, remaining in PositionOpen state|] (posId pos) (posSubmissionDeadline pos) @@ -261,7 +262,7 @@ dispatchPosition event pos = case posState pos of _ -> return pos handlePositionWaitingPendingCancellation = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment if not $ orderDeadline (posSubmissionDeadline pos) lastTs then case (event, posCurrentOrder pos, posNextState pos) of (OrderUpdate _ newstate, Just _, Just (PositionWaitingCloseSubmission nextOrder)) -> @@ -280,7 +281,7 @@ dispatchPosition event pos = case posState pos of return pos { posState = PositionCancelled } handlePositionWaitingCloseSubmission pendingOrder = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment if orderDeadline (posSubmissionDeadline pos) lastTs then do case posCurrentOrder pos of @@ -297,7 +298,7 @@ dispatchPosition event pos = case posState pos of _ -> return pos handlePositionWaitingClose = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment if orderDeadline (posExecutionDeadline pos) lastTs then do case posCurrentOrder pos of @@ -335,7 +336,7 @@ dispatchPosition event pos = case posState pos of newPosition :: (StateHasPositions s, MonadRobot m c s) => Order -> T.Text -> TickerId -> Operation -> Int -> NominalDiffTime -> m Position newPosition order account tickerId operation quantity submissionDeadline = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment let position = Position { posId = [st|%?/%?/%?/%?/%?|] account tickerId operation quantity lastTs, posAccount = account, @@ -359,7 +360,7 @@ newPosition order account tickerId operation quantity submissionDeadline = do reapDeadPositions :: (StateHasPositions s) => EventCallback c s reapDeadPositions _ = do - ts <- seLastTimestamp <$> getEnvironment + ts <- view seLastTimestamp <$> getEnvironment when (floor (utctDayTime ts) `mod` 300 == 0) $ modifyPositions (L.filter (not . posIsDead)) defaultHandler :: (StateHasPositions s) => EventCallback c s @@ -377,15 +378,15 @@ modifyPosition f oldpos = do getCurrentTicker :: (ParamsHasMainTicker c, MonadRobot m c s) => m [Bar] getCurrentTicker = do - bars <- seBars <$> getEnvironment - maybeBars <- flip M.lookup bars . mainTicker <$> getConfig + mainTicker' <- mainTicker <$> getConfig + maybeBars <- view (seBars . at mainTicker') <$> getEnvironment case maybeBars of Just b -> return $ bsBars b _ -> return [] getCurrentTickerSeries :: (ParamsHasMainTicker c, MonadRobot m c s) => m (Maybe BarSeries) getCurrentTickerSeries = do - bars <- seBars <$> getEnvironment + bars <- view seBars <$> getEnvironment flip M.lookup bars . mainTicker <$> getConfig getLastActivePosition :: (StateHasPositions s, MonadRobot m c s) => m (Maybe Position) @@ -449,7 +450,7 @@ onActionCompletedEvent event f = case event of enterAtMarket :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> Operation -> m Position enterAtMarket signalName operation = do env <- getEnvironment - enterAtMarketWithParams (seAccount env) (seVolume env) (SignalId (seInstanceId env) signalName "") operation + enterAtMarketWithParams (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) signalName "") operation enterAtMarketWithParams :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> Int -> SignalId -> Operation -> m Position enterAtMarketWithParams account quantity signalId operation = do @@ -469,12 +470,12 @@ enterAtMarketWithParams account quantity signalId operation = do enterAtLimit :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Operation -> m Position enterAtLimit timeToCancel signalName price operation = do env <- getEnvironment - enterAtLimitWithParams timeToCancel (seAccount env) (seVolume env) (SignalId (seInstanceId env) signalName "") price operation + enterAtLimitWithParams timeToCancel (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) signalName "") price operation enterAtLimitWithVolume :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position enterAtLimitWithVolume timeToCancel signalName price vol operation = do - acc <- seAccount <$> getEnvironment - inst <- seInstanceId <$> getEnvironment + acc <- view seAccount <$> getEnvironment + inst <- view seInstanceId <$> getEnvironment enterAtLimitWithParams timeToCancel acc vol (SignalId inst signalName "") price operation enterAtLimitWithParams :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position @@ -484,20 +485,20 @@ enterAtLimitWithParams timeToCancel account quantity signalId price operation = enterAtLimitForTickerWithVolume :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position enterAtLimitForTickerWithVolume tickerId timeToCancel signalName price vol operation = do - acc <- seAccount <$> getEnvironment - inst <- seInstanceId <$> getEnvironment + acc <- view seAccount <$> getEnvironment + inst <- view seInstanceId <$> getEnvironment enterAtLimitForTickerWithParams tickerId timeToCancel acc vol (SignalId inst signalName "") price operation enterAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Price -> Operation -> m Position enterAtLimitForTicker tickerId timeToCancel signalName price operation = do - acc <- seAccount <$> getEnvironment - inst <- seInstanceId <$> getEnvironment - vol <- seVolume <$> getEnvironment + acc <- view seAccount <$> getEnvironment + inst <- view seInstanceId <$> getEnvironment + vol <- view seVolume <$> getEnvironment enterAtLimitForTickerWithParams tickerId timeToCancel acc vol (SignalId inst signalName "") price operation enterAtLimitForTickerWithParams :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation = do - lastTs <- seLastTimestamp <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment submitOrder order appendToLog $ [st|enterAtLimit: %?, deadline: %?|] tickerId (timeToCancel `addUTCTime` lastTs) newPosition order account tickerId operation quantity 20 >>= @@ -532,8 +533,8 @@ enterShortAtLimitForTicker tickerId timeToCancel price signalName = enterAtLimit exitAtMarket :: (StateHasPositions s, MonadRobot m c s) => Position -> T.Text -> m Position exitAtMarket position signalName = do - inst <- seInstanceId <$> getEnvironment - lastTs <- seLastTimestamp <$> getEnvironment + inst <- view seInstanceId <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment case posCurrentOrder position of Just order -> do cancelOrder (orderId order) @@ -563,8 +564,8 @@ exitAtMarket position signalName = do exitAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Price -> Position -> T.Text -> m Position exitAtLimit timeToCancel price position signalName = do - lastTs <- seLastTimestamp <$> getEnvironment - inst <- seInstanceId <$> getEnvironment + lastTs <- view seLastTimestamp <$> getEnvironment + inst <- view seInstanceId <$> getEnvironment case posCurrentOrder position of Just order -> cancelOrder (orderId order) Nothing -> doNothing From cdb6bf048af44bf6f8324aed0697fd413be5e1eb Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 13 Apr 2020 12:33:44 +0700 Subject: [PATCH 27/51] Backtest driver: refactoring --- robocom-zero.cabal | 1 + src/ATrade/Driver/Backtest.hs | 162 +++++++++++++++++++--------------- 2 files changed, 94 insertions(+), 69 deletions(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index befc1f6..38b9a43 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -66,6 +66,7 @@ library , random , hedis , gitrev + , data-default default-language: Haskell2010 other-modules: ATrade.Exceptions diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index 450cb59..0cfb9a4 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -7,6 +7,7 @@ {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TemplateHaskell #-} module ATrade.Driver.Backtest ( backtestMain @@ -36,6 +37,7 @@ import Data.Aeson (FromJSON (..), Result (..), Value (..), decode) import Data.Aeson.Types (parseMaybe) import Data.ByteString.Lazy (readFile, toStrict) +import Data.Default import Data.HashMap.Strict (lookup) import Data.List (concat, filter, find, partition) import Data.List.Split (splitOn) @@ -61,6 +63,21 @@ data Params = Params { paramsFeeds :: [Feed] } deriving (Show, Eq) +data BacktestState c s = BacktestState { + _cash :: Double, + _robotState :: s, + _robotParams :: c, + _strategyEnvironment :: StrategyEnvironment, + _pendingOrders :: [Order], + _pendingEvents :: [Event], + _tradesLog :: [Trade], + _orderIdCounter :: Integer, + _pendingTimers :: [UTCTime], + _logs :: [T.Text] +} + +makeLenses ''BacktestState + paramsParser :: Parser Params paramsParser = Params <$> strOption ( @@ -107,9 +124,9 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do runBacktestDriver feeds params tickerList = do let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop let finalState = execState (unBacktestingMonad s) $ defaultBacktestState defaultState params tickerList - print $ cash finalState - print $ tradesLog finalState - forM_ (reverse . logs $ finalState) putStrLn + print $ finalState ^. cash + print $ finalState ^. tradesLog + forM_ (reverse $ finalState ^. logs) putStrLn loadStrategyConfig :: (FromJSON c) => Params -> IO ([Ticker], c) loadStrategyConfig params = do @@ -163,18 +180,24 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do readSTRef minIx backtestLoop = awaitForever (\bar -> do - env <- gets strategyEnvironment + _curState <- use robotState + _env <- gets _strategyEnvironment let newTimestamp = barTimestamp bar - let newenv = env & seBars %~ (flip updateBars $ bar) & seLastTimestamp .~ newTimestamp - curState <- gets robotState - modify' (\s -> s { strategyEnvironment = newenv }) - handleEvents [NewBar bar]) - - handleEvents events = do - newActions <- mapM handleEvent events - newEvents <- executeActions (concat newActions) - unless (null newEvents) $ handleEvents newEvents - + strategyEnvironment . seBars %= (flip updateBars bar) + strategyEnvironment . seLastTimestamp .= newTimestamp + enqueueEvent (NewBar bar) + lift handleEvents) + + handleEvents = do + events <- use pendingEvents + case events of + (x:xs) -> do + pendingEvents .= xs + handleEvent x + handleEvents + _ -> return () + + {- executeActions actions = concat <$> mapM executeAction actions executeAction (ActionOrder order) = do @@ -194,21 +217,20 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do executeAction (ActionLog t) = modify' (\s -> s { logs = t : logs s }) >> return [] executeAction (ActionSetupTimer t) = modify' (\s -> s { pendingTimers = t : pendingTimers s }) >> return [] executeAction (ActionIO _ _) = return [] + -} executePendingOrders bar = do - ev1 <- executeMarketOrders bar - ev2 <- executeLimitOrders bar - return $ ev1 ++ ev2 + executeMarketOrders bar + executeLimitOrders bar executeLimitOrders bar = do - (limitOrders, otherOrders) <- partition + (limitOrders, otherOrders'') <- partition (\o -> case orderPrice o of Limit _ -> True - _ -> False) <$> gets pendingOrders - let (executableOrders, otherOrders) = partition (isExecutable bar) limitOrders - modify' (\s -> s { pendingOrders = otherOrders } ) - forM executableOrders $ \order -> - order `executeAtPrice` priceForLimitOrder order bar + _ -> False) <$> use pendingOrders + let (executableOrders, otherOrders') = partition (isExecutable bar) limitOrders + pendingOrders .= otherOrders' ++ otherOrders'' + forM_ executableOrders $ \order -> order `executeAtPrice` priceForLimitOrder order bar isExecutable bar order = case orderPrice order of Limit price -> if orderOperation order == Buy @@ -227,16 +249,19 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do _ -> error "Should've been limit order" executeMarketOrders bar = do - (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> gets pendingOrders - modify' (\s -> s { pendingOrders = otherOrders }) - forM marketOrders $ \order -> + (marketOrders, otherOrders) <- partition (\o -> orderPrice o == Market) <$> use pendingOrders + pendingOrders .= otherOrders + forM_ marketOrders $ \order -> order `executeAtPrice` barOpen bar executeAtPrice order price = do - ts <- view seLastTimestamp <$> gets strategyEnvironment - modify' (\s -> s { tradesLog = mkTrade order price ts : tradesLog s }) - return $ OrderUpdate (orderId order) Executed + ts <- use $ strategyEnvironment . seLastTimestamp + let thisTrade = mkTrade order price ts + tradesLog %= (\log' -> thisTrade : log') + pendingEvents %= ((:) (OrderUpdate (orderId order) Executed)) + pendingEvents %= ((:) (NewTrade thisTrade)) + mkTrade :: Order -> Price -> UTCTime -> Trade mkTrade order price ts = Trade { tradeOrderId = orderId order, tradePrice = price, @@ -254,19 +279,13 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do handleEvent event@(NewBar bar) = do events <- executePendingOrders bar firedTimers <- fireTimers (barTimestamp bar) - actions <- concat <$> mapM handleEvent (events ++ map TimerFired firedTimers) - actions' <- handleEvent' event - return $ actions ++ actions' + mapM_ (\x -> enqueueEvent (TimerFired x)) firedTimers + handleEvent' event + return () handleEvent event = handleEvent' event - handleEvent' event = do - env <- gets strategyEnvironment - params <- gets robotParams - curState <- gets robotState - let (newState, actions, _) = runStrategyElement params curState env $ callback event - modify' (\s -> s { robotState = newState } ) - return actions + handleEvent' event = callback event updateBars barMap newbar = M.alter (\case Nothing -> Just BarSeries { bsTickerId = barSecurity newbar, @@ -278,8 +297,8 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do updateBarList newbar _ = newbar:[newbar] fireTimers ts = do - (firedTimers, otherTimers) <- partition (< ts) <$> gets pendingTimers - modify' (\s -> s { pendingTimers = otherTimers }) + (firedTimers, otherTimers) <- partition (< ts) <$> use pendingTimers + pendingTimers .= otherTimers return firedTimers loadFeeds :: [Feed] -> IO (V.Vector [Bar]) @@ -292,39 +311,44 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do rowToBar tid r = Bar tid (rowTime r) (rowOpen r) (rowHigh r) (rowLow r) (rowClose r) (rowVolume r) - nextOrderId = do - oid <- gets orderIdCounter - modify' (\s -> s { orderIdCounter = oid + 1 }) - return oid - - -data BacktestState s c = BacktestState { - cash :: Double, - robotState :: s, - robotParams :: c, - strategyEnvironment :: StrategyEnvironment, - pendingOrders :: [Order], - tradesLog :: [Trade], - orderIdCounter :: Integer, - pendingTimers :: [UTCTime], - logs :: [T.Text] -} -defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers (UTCTime (fromGregorian 1970 1 1) 0)) [] [] 1 [] [] + enqueueEvent event = pendingEvents %= ((:) event) + +instance (Default c, Default s) => Default (BacktestState s c) + where + def = defaultBacktestState def def [] + +defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers (UTCTime (fromGregorian 1970 1 1) 0)) [] [] [] 1 [] [] where tickers = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) -instance MonadRobot (BacktestingMonad s c) s c where - submitOrder order = undefined - cancelOrder oid = undefined - appendToLog txt = undefined - setupTimer time = undefined - enqueueIOAction actionId action = undefined - getConfig = undefined - getState = undefined - setState s = undefined - getEnvironment = undefined +nextOrderId :: BacktestingMonad s c OrderId +nextOrderId = do + orderIdCounter += 1 + use orderIdCounter + +instance MonadRobot (BacktestingMonad c s) c s where + submitOrder order = do + oid <- nextOrderId + let orderWithId = order { orderId = oid } + pendingOrders %= ((:) orderWithId) + pendingEvents %= ((:) (OrderSubmitted orderWithId)) + cancelOrder oid = do + orders <- use pendingOrders + let (matchingOrders, otherOrders) = partition (\o -> orderId o == oid) orders + case matchingOrders of + [] -> return () + xs -> do + mapM_ (\o -> pendingEvents %= ((:) (OrderUpdate (orderId o) Cancelled))) xs + pendingOrders .= otherOrders + appendToLog txt = logs %= ((:) txt) + setupTimer time = pendingTimers %= ((:) time) + enqueueIOAction actionId action = error "Backtesting io actions is not supported" + getConfig = use robotParams + getState = use robotState + setState s = robotState .= s + getEnvironment = use strategyEnvironment From 9055091fdeb495f596c007260834aed34aed1256 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 13 Apr 2020 12:35:36 +0700 Subject: [PATCH 28/51] Backtest driver: cleanup --- src/ATrade/Driver/Backtest.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index 0cfb9a4..894ef99 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -277,7 +277,7 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do } handleEvent event@(NewBar bar) = do - events <- executePendingOrders bar + executePendingOrders bar firedTimers <- fireTimers (barTimestamp bar) mapM_ (\x -> enqueueEvent (TimerFired x)) firedTimers handleEvent' event From 17eb32ecc1e07d656efc9a98fc29da9105fe4a06 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 13 Apr 2020 12:38:55 +0700 Subject: [PATCH 29/51] Cleanup: old Monad definitions Finally, both Real and Backtest drivers use MonadRobot instances --- src/ATrade/Driver/Backtest.hs | 28 +++---------------- src/ATrade/Driver/Real.hs | 2 +- src/ATrade/RoboCom/Monad.hs | 51 ----------------------------------- 3 files changed, 4 insertions(+), 77 deletions(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index 894ef99..c20de38 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -19,10 +19,10 @@ import ATrade.Driver.Types (InitializationCallback, import ATrade.Exceptions import ATrade.Quotes.Finam as QF import ATrade.RoboCom.Monad (Event (..), EventCallback, - MonadRobot (..), StrategyAction (..), + MonadRobot (..), StrategyEnvironment (..), - appendToLog, runStrategyElement, - seBars, seLastTimestamp, st) + appendToLog, seBars, seLastTimestamp, + st) import ATrade.RoboCom.Positions import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), Timeframe (..)) @@ -197,28 +197,6 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do handleEvents _ -> return () - {- - executeActions actions = concat <$> mapM executeAction actions - - executeAction (ActionOrder order) = do - oid <- nextOrderId - let submittedOrder = order { orderState = Submitted, orderId = oid } - modify' (\s -> s { pendingOrders = submittedOrder : pendingOrders s }) - return [OrderSubmitted submittedOrder] - - executeAction (ActionCancelOrder oid) = do - mbOrder <- find (\o -> orderId o == oid && orderState o == Submitted) <$> gets pendingOrders - case mbOrder of - Just _ -> do - modify' (\s -> s { pendingOrders = filter (\o -> orderId o == oid) (pendingOrders s)}) - return [OrderUpdate oid Cancelled] - _ -> return [] - - executeAction (ActionLog t) = modify' (\s -> s { logs = t : logs s }) >> return [] - executeAction (ActionSetupTimer t) = modify' (\s -> s { pendingTimers = t : pendingTimers s }) >> return [] - executeAction (ActionIO _ _) = return [] - -} - executePendingOrders bar = do executeMarketOrders bar executeLimitOrders bar diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 2611f41..77b087d 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -47,7 +47,7 @@ import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) +import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread diff --git a/src/ATrade/RoboCom/Monad.hs b/src/ATrade/RoboCom/Monad.hs index 71c8af8..0f24a80 100644 --- a/src/ATrade/RoboCom/Monad.hs +++ b/src/ATrade/RoboCom/Monad.hs @@ -9,23 +9,14 @@ {-# LANGUAGE TypeSynonymInstances #-} module ATrade.RoboCom.Monad ( - RState, - RConfig, - RActions, - REnv, StrategyEnvironment(..), seInstanceId, seAccount, seVolume, seBars, seLastTimestamp, - StrategyElement, - runStrategyElement, EventCallback, Event(..), - StrategyMonad, - StrategyAction(..), - tellAction, MonadRobot(..), also, st @@ -34,8 +25,6 @@ module ATrade.RoboCom.Monad ( import ATrade.RoboCom.Types import ATrade.Types -import Ether - import Control.Lens import Data.Aeson.Types import qualified Data.Text as T @@ -57,19 +46,6 @@ class (Monad m) => MonadRobot m c s | m -> c, m -> s where setState (f oldState) getEnvironment :: m StrategyEnvironment -data RState -data RConfig -data RActions -data REnv - -type StrategyMonad c s = WriterT RActions [StrategyAction] (StateT RState s (ReaderT REnv StrategyEnvironment (Reader RConfig c))) -type StrategyElement c s r = (StrategyMonad c s) r - -runStrategyElement :: c -> s -> StrategyEnvironment -> StrategyElement c s r -> (s, [StrategyAction], r) -runStrategyElement conf sta env action = (newState, actions, retValue) - where - ((retValue, actions), newState) = runReader @RConfig (runReaderT @REnv (runStateT @RState (runWriterT @RActions action) sta) env) conf - type EventCallback c s = forall m . MonadRobot m c s => Event -> m () data Event = NewBar Bar @@ -82,12 +58,6 @@ data Event = NewBar Bar | ActionCompleted Int Value deriving (Show, Eq) -data StrategyAction = ActionOrder Order - | ActionCancelOrder OrderId - | ActionLog T.Text - | ActionSetupTimer UTCTime - | ActionIO Int (IO Value) - data StrategyEnvironment = StrategyEnvironment { _seInstanceId :: !T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) _seAccount :: !T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent @@ -98,27 +68,6 @@ data StrategyEnvironment = StrategyEnvironment { makeLenses ''StrategyEnvironment -instance Show StrategyAction where - show (ActionOrder order) = "ActionOrder " ++ show order - show (ActionCancelOrder oid) = "ActionCancelOrder " ++ show oid - show (ActionLog t) = "ActionLog " ++ show t - show (ActionIO x _) = "ActionIO " ++ show x - show (ActionSetupTimer t) = "ActionSetupTimer e" ++ show t - -tellAction :: StrategyAction -> StrategyElement c s () -tellAction a = tell @RActions [a] - -instance MonadRobot (StrategyMonad c s) c s where - submitOrder order = tellAction $ ActionOrder order - cancelOrder oId = tellAction $ ActionCancelOrder oId - appendToLog = tellAction . ActionLog - setupTimer = tellAction . ActionSetupTimer - enqueueIOAction actionId action = tellAction $ ActionIO actionId action - getConfig = ask @RConfig - getState = get @RState - setState = put @RState - getEnvironment = ask @REnv - also :: EventCallback c s -> EventCallback c s -> EventCallback c s also cb1 cb2 = (\event -> cb1 event >> cb2 event) From 8e34ac61d5320d878a05d1344ca651ac2ea6ed9e Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 13 Apr 2020 13:05:03 +0700 Subject: [PATCH 30/51] More cleanup & warning elimination --- robocom-zero.cabal | 2 +- src/ATrade/Backtest/Execution.hs | 102 ------------- src/ATrade/BarAggregator.hs | 15 +- src/ATrade/Driver/Backtest.hs | 32 ++-- src/ATrade/Driver/Real.hs | 12 +- src/ATrade/Driver/Real/BrokerClientThread.hs | 1 - src/ATrade/Driver/Real/QuoteSourceThread.hs | 4 - src/ATrade/Forums/Smartlab.hs | 153 ------------------- src/ATrade/Quotes/Finam.hs | 4 +- src/ATrade/Quotes/HAP.hs | 8 +- src/ATrade/Quotes/QHP.hs | 1 - src/ATrade/RoboCom/Indicators.hs | 4 +- src/ATrade/RoboCom/Positions.hs | 42 ++--- src/ATrade/RoboCom/Types.hs | 3 - src/ATrade/RoboCom/Utils.hs | 5 +- 15 files changed, 53 insertions(+), 335 deletions(-) delete mode 100644 src/ATrade/Backtest/Execution.hs delete mode 100644 src/ATrade/Forums/Smartlab.hs diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 38b9a43..1a21917 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -15,7 +15,7 @@ cabal-version: >=1.10 library hs-source-dirs: src - ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults + ghc-options: -Wall -Werror -fno-warn-orphans -Wno-type-defaults exposed-modules: ATrade.RoboCom.Indicators , ATrade.RoboCom.Monad , ATrade.RoboCom.Positions diff --git a/src/ATrade/Backtest/Execution.hs b/src/ATrade/Backtest/Execution.hs deleted file mode 100644 index 643c180..0000000 --- a/src/ATrade/Backtest/Execution.hs +++ /dev/null @@ -1,102 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} - -module ATrade.Backtest.Execution ( - mkExecutionAgent, - ExecutionAgent(..), - executePending, - executeStep -) where - -import qualified Data.Text as T -import qualified Data.Map as M -import qualified Data.List as L -import ATrade.Types -import ATrade.Strategy.Types -import ATrade.Strategy -import Control.Monad.State -import Control.Monad.Trans.Writer -import Data.Decimal -import Data.Time.Clock -import Data.Time.Calendar - -data Position = Position { - ticker :: T.Text, - balance :: Int } - -data ExecutionAgent = ExecutionAgent { - pendingOrders :: [Order], - cash :: Decimal, - currentTime :: UTCTime, - orderIdCounter :: Integer -} - -mkExecutionAgent startCash = ExecutionAgent { pendingOrders = [], - cash = startCash, - currentTime = UTCTime (fromGregorian 1970 1 1) 0, - orderIdCounter = 1 } - -executeAtPrice :: Order -> Decimal -> WriterT [Event] (State ExecutionAgent) () -executeAtPrice order price = do - when (orderState order == Unsubmitted) $ tell [OrderSubmitted order] - tell [OrderUpdate (orderId order) Executed] - timestamp <- gets currentTime - tell [NewTrade (mkTradeForOrder timestamp order price)] - - case orderOperation order of - Buy -> modify' (\agent -> agent { cash = cash agent - price * realFracToDecimal 10 (toRational $ orderQuantity order) }) - Sell -> modify' (\agent -> agent { cash = cash agent + price * realFracToDecimal 10 (toRational $ orderQuantity order) }) - -mkTradeForOrder timestamp order price = Trade { tradeOrderId = orderId order, - tradePrice = price, - tradeQuantity = orderQuantity order, - tradeVolume = price * realFracToDecimal 10 (toRational $ orderQuantity order), - tradeVolumeCurrency = "TEST_CURRENCY", - tradeOperation = orderOperation order, - tradeAccount = orderAccountId order, - tradeSecurity = orderSecurity order, - tradeTimestamp = timestamp, - tradeSignalId = orderSignalId order } - - -executePending :: Bars -> WriterT [Event] (State ExecutionAgent) () -executePending bars = do - orders <- gets pendingOrders - let (executedOrders, leftover) = L.partition shouldExecute orders - - mapM_ executeAtOrdersPrice executedOrders - modify' (\s -> s { pendingOrders = leftover } ) - where - executeAtOrdersPrice order = case orderPrice order of - Limit price -> executeAtPrice order price - _ -> return () -- TODO handle stops - - shouldExecute order = case M.lookup (orderSecurity order) bars of - Just (DataSeries ((ts, bar) : _)) -> case orderPrice order of - Limit price -> crosses bar price - _ -> False - Nothing -> False - - crosses bar price = (barClose bar > price && barOpen bar < price) || (barClose bar < price && barOpen bar > price) - -executeStep :: Bars -> [Order] -> WriterT [Event] (State ExecutionAgent) () -executeStep bars orders = do - -- Assign consecutive IDs - orders' <- mapM (\o -> do - id <- gets orderIdCounter - modify(\s -> s { orderIdCounter = id + 1 }) - return o { orderId = id }) orders - - let (executableNow, pending) = L.partition isExecutableNow orders' - mapM_ (executeOrderAtLastPrice bars) executableNow - modify' (\s -> s { pendingOrders = pending ++ pendingOrders s }) - - where - isExecutableNow order = case M.lookup (orderSecurity order) bars of - Just (DataSeries (x:xs)) -> case orderPrice order of - Limit price -> (orderOperation order == Buy && price >= (barClose . snd) x) || (orderOperation order == Sell && price <= (barClose . snd) x) - Market -> True - _ -> False - - executeOrderAtLastPrice bars order = case M.lookup (orderSecurity order) bars of - Just (DataSeries ((ts, bar) : _)) -> executeAtPrice order (barClose bar) - _ -> return () diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 5523473..f8b73bc 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -32,7 +32,6 @@ import Control.Lens import Control.Monad.State import qualified Data.Map.Strict as M import Data.Time.Clock -import Debug.Trace -- | Bar aggregator state data BarAggregator = BarAggregator { @@ -110,7 +109,7 @@ handleTick tick = runState $ do else return Nothing where - isInTimeInterval tick (a, b) = (utctDayTime . timestamp) tick >= a && (utctDayTime . timestamp) tick <= b + isInTimeInterval tick' (a, b) = (utctDayTime . timestamp) tick' >= a && (utctDayTime . timestamp) tick' <= b barFromTick !newtick = Bar { barSecurity = security newtick, barTimestamp = timestamp newtick, barOpen = value newtick, @@ -134,18 +133,6 @@ handleTick tick = runState $ do where newTimestamp = timestamp newtick - emptyBarFrom !bar newtick = newBar - where - newTimestamp = timestamp newtick - newBar = Bar { - barSecurity = barSecurity bar, - barTimestamp = newTimestamp, - barOpen = barClose bar, - barHigh = barClose bar, - barLow = barClose bar, - barClose = barClose bar, - barVolume = 0 } - updateTime :: Tick -> BarAggregator -> (Maybe Bar, BarAggregator) updateTime tick = runState $ do lLastTicks %= M.insert (security tick, datatype tick) tick diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index c20de38..eb8d71d 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -14,15 +14,13 @@ module ATrade.Driver.Backtest ( ) where import ATrade.Driver.Types (InitializationCallback, - Strategy (..), StrategyInstanceParams (..)) import ATrade.Exceptions import ATrade.Quotes.Finam as QF import ATrade.RoboCom.Monad (Event (..), EventCallback, MonadRobot (..), StrategyEnvironment (..), - appendToLog, seBars, seLastTimestamp, - st) + appendToLog, seBars, seLastTimestamp) import ATrade.RoboCom.Positions import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), Timeframe (..)) @@ -30,16 +28,15 @@ import ATrade.Types import Conduit (awaitForever, runConduit, yield, (.|)) import Control.Exception.Safe -import Control.Lens +import Control.Lens hiding (ix) import Control.Monad.ST (runST) import Control.Monad.State -import Data.Aeson (FromJSON (..), Result (..), - Value (..), decode) +import Data.Aeson (FromJSON (..), Value (..), decode) import Data.Aeson.Types (parseMaybe) import Data.ByteString.Lazy (readFile, toStrict) import Data.Default import Data.HashMap.Strict (lookup) -import Data.List (concat, filter, find, partition) +import Data.List (partition) import Data.List.Split (splitOn) import qualified Data.Map.Strict as M import Data.Semigroup ((<>)) @@ -95,7 +92,7 @@ feedArgParser = eitherReader (\s -> case splitOn ":" s of _ -> Left $ "Unable to parse feed id: " ++ s) backtestMain :: (FromJSON c, StateHasPositions s) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () -backtestMain dataDownloadDelta defaultState initCallback callback = do +backtestMain _dataDownloadDelta defaultState initCallback callback = do params <- execParser opts (tickerList, config) <- loadStrategyConfig params @@ -116,7 +113,7 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do feeds <- loadFeeds (paramsFeeds params) - runBacktestDriver feeds config tickerList + runBacktestDriver feeds updatedConfig tickerList where opts = info (helper <*> paramsParser) ( fullDesc <> header "ATrade strategy backtesting framework" ) @@ -141,14 +138,11 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do Object o -> do mbTickers <- "tickers" `lookup` o mbParams <- "params" `lookup` o - tickers <- parseMaybe parseJSON mbTickers + tickers' <- parseMaybe parseJSON mbTickers params <- parseMaybe parseJSON mbParams - return (tickers, params) + return (tickers', params) _ -> Nothing - resultToMaybe (Error _) = Nothing - resultToMaybe (Success a) = Just a - barStreamFromFeeds feeds = case nextBar feeds of Just (bar, feeds') -> yield bar >> barStreamFromFeeds feeds' _ -> return () @@ -166,7 +160,6 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do minIx <- newSTRef Nothing forM_ [0..(V.length feeds-1)] (\ix -> do let feed = feeds ! ix - curIx <- readSTRef minIx curTs <- readSTRef minTs case feed of x:_ -> case curTs of @@ -292,13 +285,14 @@ backtestMain dataDownloadDelta defaultState initCallback callback = do enqueueEvent event = pendingEvents %= ((:) event) -instance (Default c, Default s) => Default (BacktestState s c) +instance (Default c, Default s) => Default (BacktestState c s) where def = defaultBacktestState def def [] -defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers (UTCTime (fromGregorian 1970 1 1) 0)) [] [] [] 1 [] [] +defaultBacktestState :: s -> c -> [Ticker] -> BacktestState c s +defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers' (UTCTime (fromGregorian 1970 1 1) 0)) [] [] [] 1 [] [] where - tickers = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList + tickers' = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) @@ -324,7 +318,7 @@ instance MonadRobot (BacktestingMonad c s) c s where pendingOrders .= otherOrders appendToLog txt = logs %= ((:) txt) setupTimer time = pendingTimers %= ((:) time) - enqueueIOAction actionId action = error "Backtesting io actions is not supported" + enqueueIOAction _actionId _action = error "Backtesting io actions is not supported" getConfig = use robotParams getState = use robotState setState s = robotState .= s diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 77b087d..bb77742 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -44,7 +44,6 @@ import Data.Time.Calendar import Data.Time.Clock import Data.Time.Clock.POSIX import Data.Maybe -import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) @@ -141,10 +140,10 @@ instance MonadRobot (App c s) c s where timers <- asks envTimers lift $ atomicModifyIORef' timers (\s -> (t : s, ())) - enqueueIOAction actionId action = do + enqueueIOAction actionId action' = do eventChan <- asks envEventChan lift $ void $ forkIO $ do - v <- action + v <- action' BC.writeChan eventChan $ ActionCompleted actionId v getConfig = asks envConfigRef >>= lift . readIORef @@ -282,7 +281,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do envAggregator = agg, envLastTimestamp = now } - runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) + runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = @@ -476,8 +475,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () -barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do +barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App c s () +barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do eventChan <- asks envEventChan brokerChan <- asks envBrokerChan agg <- asks envAggregator @@ -522,6 +521,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') (eventCallback strategy) event + timersRef <- asks envTimers lift $ writeIORef timersRef newTimers readAndHandleEvents agg strategy' diff --git a/src/ATrade/Driver/Real/BrokerClientThread.hs b/src/ATrade/Driver/Real/BrokerClientThread.hs index ee994da..ba0ce49 100644 --- a/src/ATrade/Driver/Real/BrokerClientThread.hs +++ b/src/ATrade/Driver/Real/BrokerClientThread.hs @@ -9,7 +9,6 @@ import ATrade.Broker.Client import ATrade.Broker.Protocol import ATrade.RoboCom.Monad hiding (cancelOrder, submitOrder) -import ATrade.RoboCom.Types import ATrade.Types import Control.Concurrent hiding (readChan, writeChan, diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 9ad36b9..007f4c7 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -13,7 +13,6 @@ import ATrade.RoboCom.Types import ATrade.Types import Data.IORef -import Data.Maybe import qualified Data.Text as T import Control.Concurrent hiding (readChan, writeChan, @@ -58,7 +57,4 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTim (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) tickersList = fmap code . (tickers . strategyInstanceParams) $ strategy - applyTimeframeSpec t = case maybeSourceTimeframe of - Just tf -> t `T.append` T.pack (":" ++ show tf ++ ";") - Nothing -> t diff --git a/src/ATrade/Forums/Smartlab.hs b/src/ATrade/Forums/Smartlab.hs deleted file mode 100644 index ba79a14..0000000 --- a/src/ATrade/Forums/Smartlab.hs +++ /dev/null @@ -1,153 +0,0 @@ -{-# OPTIONS_GHC -Wno-type-defaults #-} - -module ATrade.Forums.Smartlab ( - NewsItem(..), - IndexItem(..), - getIndex, - getItem -) where - -import qualified Data.ByteString.Lazy as BL -import qualified Data.List as L -import Data.Maybe -import qualified Data.Text as T -import Data.Text.Encoding -import Data.Time.Calendar -import Data.Time.Clock -import Network.HTTP.Simple -import Safe -import Text.HTML.TagSoup -import Text.Parsec -import Text.Parsec.Text -import Text.StringLike - -import Debug.Trace - -data NewsItem = NewsItem { - niUrl :: !T.Text, - niHeader :: !T.Text, - niText :: !T.Text, - niAuthor :: !T.Text, - niPubTime :: !UTCTime -} deriving (Show, Eq) - -data IndexItem = IndexItem { - iiUrl :: !T.Text, - iiTitle :: !T.Text, - iiPubTime :: !UTCTime -} deriving (Show, Eq) - -monthNames :: [T.Text] -monthNames = fmap T.pack ["января", "февраля", "марта", "апреля", "мая", "июня", "июля", "августа", "сентября", "октября", "ноября", "декабря"] - -extractBetween :: StringLike str => String -> [Tag str] -> [Tag str] -extractBetween tagName = takeWhile (~/= closeTag) . dropWhile (~/= openTag) - where - openTag = "<" ++ tagName ++ ">" - closeTag = "" - -matchClass :: T.Text -> T.Text -> Tag T.Text -> Bool -matchClass _ className (TagOpen _ attrs) = case L.lookup (T.pack "class") attrs of - Just klass -> className `L.elem` T.words klass - Nothing -> False - -matchClass _ _ _ = False - -parseTimestamp :: T.Text -> Maybe UTCTime -parseTimestamp text = case parse timestampParser "" text of - Left _ -> Nothing - Right val -> Just val - where - timestampParser :: Parser UTCTime - timestampParser = do - spaces - day <- read <$> many1 digit - spaces - monthName <- T.pack <$> many1 letter - case L.elemIndex monthName monthNames of - Nothing -> fail "Can't parse month" - Just month -> do - spaces - year <- fromIntegral . read <$> many1 digit - _ <- char ',' - spaces - hour <- fromIntegral . read <$> many1 digit - _ <- char ':' - minute <- fromIntegral . read <$> many1 digit - return $ UTCTime (fromGregorian year (month + 1) day) (hour * 3600 + minute * 60) - -getItem :: IndexItem -> IO (Maybe NewsItem) -getItem indexItem = do - rq <- parseRequest $ T.unpack (iiUrl indexItem) - resp <- httpLBS rq - if getResponseStatusCode resp == 200 - then return . parseItem . decodeUtf8 . BL.toStrict . getResponseBody $ resp - else return Nothing - where - parseItem rawHtml = case parseTimestamp timestamp of - Just itemPubtime -> Just NewsItem { - niUrl = iiUrl indexItem, - niHeader = itemHeader, - niText = itemText, - niAuthor = itemAuthor, - niPubTime = itemPubtime - } - Nothing -> Nothing - where - itemHeader = innerText . - extractBetween "span" . - extractBetween "h1" . - dropWhile (not . matchClass (T.pack "div") (T.pack "topic")) $ tags - - itemText = innerText . - extractBetween "div" . - dropWhile (not . matchClass (T.pack "div") (T.pack "content")) . - dropWhile (~/= "
") $ tags - - itemAuthor = innerText . - extractBetween "li" . - dropWhile (not . matchClass (T.pack "li") (T.pack "author")) $ tags - - timestamp = traceShowId $ innerText . - extractBetween "li" . - dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ tags - - tags = parseTags rawHtml - - -getIndex :: T.Text -> Int -> IO ([IndexItem], Bool) -getIndex rootUrl pageNumber = do - rq <- parseRequest $ T.unpack $ makeUrl rootUrl pageNumber - resp <- httpLBS rq - return $ if getResponseStatusCode resp == 200 - then parseIndex . decodeUtf8 . BL.toStrict . getResponseBody $ resp - else ([], False) - where - parseIndex :: T.Text -> ([IndexItem], Bool) - parseIndex x = (mapMaybe parseIndexEntry $ partitions (matchClass (T.pack "div") (T.pack "topic")) $ parseTags x, hasNextPage $ parseTags x) - - parseIndexEntry :: [Tag T.Text] -> Maybe IndexItem - parseIndexEntry divTag = do - a <- headMay . dropWhile (~/= "") $ divTag - let text = innerText . takeWhile (~/= "") . dropWhile (~/= "") $ divTag - case a of - TagOpen _ attr -> do - href <- L.lookup (T.pack "href") attr - ts <- parseTimestamp (innerText $ takeWhile (~/= "") . dropWhile (not . matchClass (T.pack "li") (T.pack "date")) $ divTag) - Just IndexItem { iiUrl = href, - iiTitle = text, - iiPubTime = ts } - _ -> Nothing - - - makeUrl root pagenumber - | pagenumber == 0 || pagenumber == 1 = root - | otherwise = root `T.append` (T.pack "/page") `T.append` T.pack (show pagenumber) - - hasNextPage tags = if pageNumber <= 1 - then paginationLinksCount > 0 - else paginationLinksCount > 1 - where - paginationLinksCount = length . filter (~== "") . extractBetween "p" . dropWhile (~/= "