From 72c421c64f80bfbbcff97e7a989e4e74925f57a5 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 21 Nov 2021 13:44:21 +0700 Subject: [PATCH] Make HistoryProvider and TickerInfoProvider monad typeclasses --- robocom-zero.cabal | 8 +- src/ATrade/Driver/Junction/QuoteThread.hs | 99 +++++++++++------------ src/ATrade/Quotes/HistoryProvider.hs | 8 +- src/ATrade/Quotes/TickerInfoProvider.hs | 9 +-- test/Test/Driver/Junction/QuoteThread.hs | 38 ++++++++- test/Test/Mock/HistoryProvider.hs | 18 +++-- test/Test/Mock/TickerInfoProvider.hs | 15 ++-- 7 files changed, 115 insertions(+), 80 deletions(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index da3c561..ce6842f 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -17,8 +17,10 @@ library hs-source-dirs: src ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults exposed-modules: ATrade.RoboCom.Indicators + , ATrade.RoboCom.ConfigStorage , ATrade.RoboCom.Monad , ATrade.RoboCom.Positions + , ATrade.RoboCom.Persistence , ATrade.RoboCom.Types , ATrade.RoboCom.Utils , ATrade.Quotes @@ -29,9 +31,11 @@ library -- , ATrade.Driver.Backtest , ATrade.Driver.Junction , ATrade.Driver.Junction.Types + , ATrade.Driver.Junction.QuoteThread + , ATrade.Driver.Junction.QuoteStream + , ATrade.Driver.Junction.RobotDriverThread , ATrade.BarAggregator , ATrade.RoboCom - , ATrade.Driver.Junction.QuoteThread , ATrade.Quotes.HistoryProvider , ATrade.Quotes.TickerInfoProvider other-modules: Paths_robocom_zero @@ -109,7 +113,7 @@ test-suite robots-test , zeromq4-haskell , zeromq4-haskell-zap , BoundedChan - , hslogger + , mtl ghc-options: -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010 other-modules: Test.RoboCom.Indicators diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs index 4753f8e..9c8bac4 100644 --- a/src/ATrade/Driver/Junction/QuoteThread.hs +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -1,5 +1,6 @@ -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE ScopedTypeVariables #-} module ATrade.Driver.Junction.QuoteThread ( @@ -9,43 +10,39 @@ module ATrade.Driver.Junction.QuoteThread addSubscription ) where -import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) -import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) -import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), - QuoteSourceClientHandle, - quoteSourceClientSubscribe, - startQuoteSourceClient, - stopQuoteSourceClient) -import ATrade.RoboCom.Types (Bar (barSecurity), - BarSeries (..), - BarSeriesId (BarSeriesId), - Bars, InstrumentParameters) -import ATrade.Types (BarTimeframe (BarTimeframe), ClientSecurityParams (ClientSecurityParams), - Tick (security), TickerId) -import Control.Concurrent (ThreadId, forkIO, killThread) -import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, - readChan, writeChan) -import Control.Monad (forever) -import Control.Monad.Reader (MonadIO (liftIO), - ReaderT (runReaderT), lift) -import Control.Monad.Reader.Class (asks) -import Data.Hashable (Hashable) -import qualified Data.HashMap.Strict as HM -import Data.IORef (IORef, atomicModifyIORef', - newIORef, readIORef) -import qualified Data.Map.Strict as M -import qualified Data.Text as T -import Data.Time (addUTCTime, getCurrentTime) -import GHC.Generics (Generic) -import System.ZMQ4 (Context) -import System.ZMQ4.ZAP (CurveCertificate) +import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..)) +import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) +import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) +import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), + QuoteSourceClientHandle, + quoteSourceClientSubscribe, + startQuoteSourceClient, + stopQuoteSourceClient) +import ATrade.RoboCom.Types (Bar (barSecurity), + BarSeries (..), + BarSeriesId (BarSeriesId), + Bars, InstrumentParameters) +import ATrade.Types (BarTimeframe (BarTimeframe), + ClientSecurityParams (ClientSecurityParams), + Tick (security), TickerId) +import Control.Concurrent (ThreadId, forkIO, + killThread) +import Control.Concurrent.BoundedChan (BoundedChan, + newBoundedChan, readChan, + writeChan) +import Control.Monad (forever) +import Control.Monad.Reader (MonadIO (liftIO), + ReaderT (runReaderT), lift) +import Control.Monad.Reader.Class (asks) +import qualified Data.HashMap.Strict as HM +import Data.IORef (IORef, atomicModifyIORef', + newIORef, readIORef) +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Time (addUTCTime, getCurrentTime) +import System.ZMQ4 (Context) +import System.ZMQ4.ZAP (CurveCertificate) -data QuoteSubscription = - QuoteSubscription TickerId BarTimeframe - deriving (Generic, Eq) - -instance Hashable BarTimeframe -instance Hashable QuoteSubscription data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv @@ -56,48 +53,48 @@ data QuoteThreadEnv = endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), qsclient :: QuoteSourceClientHandle, paramsCache :: IORef (M.Map TickerId InstrumentParameters), - historyProvider :: HistoryProvider, - tickerInfoProvider :: TickerInfoProvider, downloaderChan :: BoundedChan QuoteSubscription } -startQuoteThread :: (MonadIO m) => +startQuoteThread :: (MonadIO m, + MonadIO m1, + HistoryProvider m1, + TickerInfoProvider m1) => IORef Bars -> Context -> T.Text -> Maybe CurveCertificate -> Maybe CurveCertificate -> - HistoryProvider -> - TickerInfoProvider -> + (m1 () -> IO ()) -> m QuoteThreadHandle -startQuoteThread barsRef ctx ep clientCert serverCert hp tip = do +startQuoteThread barsRef ctx ep clientCert serverCert downloadThreadRunner = do chan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000 qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep (ClientSecurityParams clientCert serverCert) - env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure hp <*> pure tip <*> pure dChan + env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan tid <- liftIO . forkIO $ quoteThread env chan - downloaderTid <- liftIO . forkIO $ downloaderThread env dChan + downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) return $ QuoteThreadHandle tid downloaderTid env where downloaderThread env chan = forever $ do - QuoteSubscription tickerid tf <- readChan chan + QuoteSubscription tickerid tf <- liftIO $ readChan chan paramsMap <- liftIO $ readIORef $ paramsCache env mbParams <- case M.lookup tickerid paramsMap of Nothing -> do - paramsList <- liftIO $ getInstrumentParameters (tickerInfoProvider env) [tickerid] + paramsList <- getInstrumentParameters [tickerid] case paramsList of (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) _ -> return Nothing Just params -> return $ Just params - barsMap <- readIORef (bars env) + barsMap <- liftIO $ readIORef (bars env) case M.lookup (BarSeriesId tickerid tf) barsMap of Just _ -> return () -- already downloaded Nothing -> case mbParams of Just params -> do now <- liftIO getCurrentTime - barsData <- liftIO $ getHistory (historyProvider env) tickerid tf ((-86400 * 60) `addUTCTime` now) now + barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now let barSeries = BarSeries tickerid tf barsData params - atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) + liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) _ -> return () -- TODO log diff --git a/src/ATrade/Quotes/HistoryProvider.hs b/src/ATrade/Quotes/HistoryProvider.hs index ad7a9a4..96147a1 100644 --- a/src/ATrade/Quotes/HistoryProvider.hs +++ b/src/ATrade/Quotes/HistoryProvider.hs @@ -7,8 +7,6 @@ module ATrade.Quotes.HistoryProvider import ATrade.RoboCom.Types (Bar) import ATrade.Types (BarTimeframe, TickerId) import Data.Time (UTCTime) -newtype HistoryProvider = - HistoryProvider - { - getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> IO [Bar] - } + +class (Monad m) => HistoryProvider m where + getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] diff --git a/src/ATrade/Quotes/TickerInfoProvider.hs b/src/ATrade/Quotes/TickerInfoProvider.hs index f66efae..c38097a 100644 --- a/src/ATrade/Quotes/TickerInfoProvider.hs +++ b/src/ATrade/Quotes/TickerInfoProvider.hs @@ -6,8 +6,7 @@ module ATrade.Quotes.TickerInfoProvider import ATrade.RoboCom.Types (InstrumentParameters) import ATrade.Types (TickerId) -newtype TickerInfoProvider = - TickerInfoProvider - { - getInstrumentParameters :: [TickerId] -> IO [InstrumentParameters] - } + +class (Monad m) => TickerInfoProvider m where + getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] + diff --git a/test/Test/Driver/Junction/QuoteThread.hs b/test/Test/Driver/Junction/QuoteThread.hs index 4413764..827fffa 100644 --- a/test/Test/Driver/Junction/QuoteThread.hs +++ b/test/Test/Driver/Junction/QuoteThread.hs @@ -1,4 +1,7 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeSynonymInstances #-} module Test.Driver.Junction.QuoteThread ( @@ -13,6 +16,8 @@ import Test.Tasty.SmallCheck as SC import ATrade.Driver.Junction.QuoteThread (addSubscription, startQuoteThread, stopQuoteThread) +import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) +import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) import ATrade.QuoteSource.Client (QuoteData (QDBar)) import ATrade.QuoteSource.Server (QuoteSourceServerData (..), startQuoteSourceServer, @@ -26,6 +31,7 @@ import Control.Concurrent.BoundedChan (newBoundedChan, readChan, writeChan) import Control.Exception (bracket) import Control.Monad (forever) +import Control.Monad.Reader import Data.IORef (newIORef, readIORef) import qualified Data.Map.Strict as M import qualified Data.Text as T @@ -38,8 +44,31 @@ import System.Log.Handler (setFormatter) import System.Log.Handler.Simple import System.Log.Logger import System.ZMQ4 (withContext) -import Test.Mock.HistoryProvider (mkMockHistoryProvider) -import Test.Mock.TickerInfoProvider (mkMockTickerInfoProvider) +import Test.Mock.HistoryProvider (MockHistoryProvider, + mkMockHistoryProvider, + mockGetHistory) +import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, + mkMockTickerInfoProvider, + mockGetInstrumentParameters) + +data TestEnv = + TestEnv + { + historyProvider :: MockHistoryProvider, + tickerInfoProvider :: MockTickerInfoProvider + } + +type TestM = ReaderT TestEnv IO + +instance HistoryProvider TestM where + getHistory tid tf from to = do + hp <- asks historyProvider + liftIO $ mockGetHistory hp tid tf from to + +instance TickerInfoProvider TestM where + getInstrumentParameters tickers = do + tip <- asks tickerInfoProvider + liftIO $ mockGetInstrumentParameters tip tickers qsEndpoint = "inproc://qs" @@ -61,7 +90,8 @@ testSubscription = testCase "Subscription" $ withContext $ \ctx -> do (startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) stopQuoteSourceServer $ \_ -> bracket - (startQuoteThread barsRef ctx qsEndpoint Nothing Nothing mockHistoryProvider mockTickerInfoProvider) + (startQuoteThread barsRef ctx qsEndpoint Nothing Nothing (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider))) + stopQuoteThread $ \qt -> do chan <- newBoundedChan 2000 addSubscription qt "FOO" (BarTimeframe 3600) chan diff --git a/test/Test/Mock/HistoryProvider.hs b/test/Test/Mock/HistoryProvider.hs index 0630e9f..3dbef67 100644 --- a/test/Test/Mock/HistoryProvider.hs +++ b/test/Test/Mock/HistoryProvider.hs @@ -1,7 +1,9 @@ module Test.Mock.HistoryProvider ( - mkMockHistoryProvider + MockHistoryProvider, + mkMockHistoryProvider, + mockGetHistory ) where import ATrade.Quotes.HistoryProvider @@ -9,17 +11,17 @@ import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) import ATrade.Types (Bar (Bar, barTimestamp), BarTimeframe (BarTimeframe), TickerId) +import Control.Monad.IO.Class (MonadIO) import qualified Data.Map.Strict as M import Data.Time (UTCTime) -mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> HistoryProvider -mkMockHistoryProvider bars = HistoryProvider $ mockGetHistory bars +data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) -mockGetHistory :: M.Map BarSeriesId [Bar] -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> IO [Bar] -mockGetHistory bars tid tf from to = +mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider +mkMockHistoryProvider = MockHistoryProvider + +mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] +mockGetHistory (MockHistoryProvider bars) tid tf from to = case M.lookup (BarSeriesId tid tf) bars of Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series Nothing -> return [] - - - diff --git a/test/Test/Mock/TickerInfoProvider.hs b/test/Test/Mock/TickerInfoProvider.hs index 18d79c7..a0bc6d5 100644 --- a/test/Test/Mock/TickerInfoProvider.hs +++ b/test/Test/Mock/TickerInfoProvider.hs @@ -1,17 +1,22 @@ module Test.Mock.TickerInfoProvider ( - mkMockTickerInfoProvider + MockTickerInfoProvider, + mkMockTickerInfoProvider, + mockGetInstrumentParameters ) where import ATrade.Quotes.TickerInfoProvider import ATrade.RoboCom.Types (InstrumentParameters) import ATrade.Types (TickerId) +import Control.Monad.IO.Class (MonadIO) import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, mapMaybe) -mkMockTickerInfoProvider :: M.Map TickerId InstrumentParameters -> TickerInfoProvider -mkMockTickerInfoProvider params = TickerInfoProvider $ mockGetInstrumentParameters params +data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) -mockGetInstrumentParameters :: M.Map TickerId InstrumentParameters -> [TickerId] -> IO [InstrumentParameters] -mockGetInstrumentParameters params = return . mapMaybe (`M.lookup` params) +mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider +mkMockTickerInfoProvider = MockTickerInfoProvider + +mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] +mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params)