Browse Source

Make HistoryProvider and TickerInfoProvider monad typeclasses

junction
Denis Tereshkin 4 years ago
parent
commit
72c421c64f
  1. 8
      robocom-zero.cabal
  2. 47
      src/ATrade/Driver/Junction/QuoteThread.hs
  3. 8
      src/ATrade/Quotes/HistoryProvider.hs
  4. 9
      src/ATrade/Quotes/TickerInfoProvider.hs
  5. 36
      test/Test/Driver/Junction/QuoteThread.hs
  6. 18
      test/Test/Mock/HistoryProvider.hs
  7. 15
      test/Test/Mock/TickerInfoProvider.hs

8
robocom-zero.cabal

@ -17,8 +17,10 @@ library
hs-source-dirs: src hs-source-dirs: src
ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults
exposed-modules: ATrade.RoboCom.Indicators exposed-modules: ATrade.RoboCom.Indicators
, ATrade.RoboCom.ConfigStorage
, ATrade.RoboCom.Monad , ATrade.RoboCom.Monad
, ATrade.RoboCom.Positions , ATrade.RoboCom.Positions
, ATrade.RoboCom.Persistence
, ATrade.RoboCom.Types , ATrade.RoboCom.Types
, ATrade.RoboCom.Utils , ATrade.RoboCom.Utils
, ATrade.Quotes , ATrade.Quotes
@ -29,9 +31,11 @@ library
-- , ATrade.Driver.Backtest -- , ATrade.Driver.Backtest
, ATrade.Driver.Junction , ATrade.Driver.Junction
, ATrade.Driver.Junction.Types , ATrade.Driver.Junction.Types
, ATrade.Driver.Junction.QuoteThread
, ATrade.Driver.Junction.QuoteStream
, ATrade.Driver.Junction.RobotDriverThread
, ATrade.BarAggregator , ATrade.BarAggregator
, ATrade.RoboCom , ATrade.RoboCom
, ATrade.Driver.Junction.QuoteThread
, ATrade.Quotes.HistoryProvider , ATrade.Quotes.HistoryProvider
, ATrade.Quotes.TickerInfoProvider , ATrade.Quotes.TickerInfoProvider
other-modules: Paths_robocom_zero other-modules: Paths_robocom_zero
@ -109,7 +113,7 @@ test-suite robots-test
, zeromq4-haskell , zeromq4-haskell
, zeromq4-haskell-zap , zeromq4-haskell-zap
, BoundedChan , BoundedChan
, hslogger , mtl
ghc-options: -threaded -rtsopts -with-rtsopts=-N ghc-options: -threaded -rtsopts -with-rtsopts=-N
default-language: Haskell2010 default-language: Haskell2010
other-modules: Test.RoboCom.Indicators other-modules: Test.RoboCom.Indicators

47
src/ATrade/Driver/Junction/QuoteThread.hs

@ -1,5 +1,6 @@
{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE LambdaCase #-} {-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
module ATrade.Driver.Junction.QuoteThread module ATrade.Driver.Junction.QuoteThread
( (
@ -9,6 +10,7 @@ module ATrade.Driver.Junction.QuoteThread
addSubscription addSubscription
) where ) where
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..))
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.HistoryProvider (HistoryProvider (..))
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..))
import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick),
@ -20,32 +22,27 @@ import ATrade.RoboCom.Types (Bar (barSecurity),
BarSeries (..), BarSeries (..),
BarSeriesId (BarSeriesId), BarSeriesId (BarSeriesId),
Bars, InstrumentParameters) Bars, InstrumentParameters)
import ATrade.Types (BarTimeframe (BarTimeframe), ClientSecurityParams (ClientSecurityParams), import ATrade.Types (BarTimeframe (BarTimeframe),
ClientSecurityParams (ClientSecurityParams),
Tick (security), TickerId) Tick (security), TickerId)
import Control.Concurrent (ThreadId, forkIO, killThread) import Control.Concurrent (ThreadId, forkIO,
import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, killThread)
readChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, readChan,
writeChan)
import Control.Monad (forever) import Control.Monad (forever)
import Control.Monad.Reader (MonadIO (liftIO), import Control.Monad.Reader (MonadIO (liftIO),
ReaderT (runReaderT), lift) ReaderT (runReaderT), lift)
import Control.Monad.Reader.Class (asks) import Control.Monad.Reader.Class (asks)
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import Data.IORef (IORef, atomicModifyIORef', import Data.IORef (IORef, atomicModifyIORef',
newIORef, readIORef) newIORef, readIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time (addUTCTime, getCurrentTime) import Data.Time (addUTCTime, getCurrentTime)
import GHC.Generics (Generic)
import System.ZMQ4 (Context) import System.ZMQ4 (Context)
import System.ZMQ4.ZAP (CurveCertificate) import System.ZMQ4.ZAP (CurveCertificate)
data QuoteSubscription =
QuoteSubscription TickerId BarTimeframe
deriving (Generic, Eq)
instance Hashable BarTimeframe
instance Hashable QuoteSubscription
data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv
@ -56,48 +53,48 @@ data QuoteThreadEnv =
endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]),
qsclient :: QuoteSourceClientHandle, qsclient :: QuoteSourceClientHandle,
paramsCache :: IORef (M.Map TickerId InstrumentParameters), paramsCache :: IORef (M.Map TickerId InstrumentParameters),
historyProvider :: HistoryProvider,
tickerInfoProvider :: TickerInfoProvider,
downloaderChan :: BoundedChan QuoteSubscription downloaderChan :: BoundedChan QuoteSubscription
} }
startQuoteThread :: (MonadIO m) => startQuoteThread :: (MonadIO m,
MonadIO m1,
HistoryProvider m1,
TickerInfoProvider m1) =>
IORef Bars -> IORef Bars ->
Context -> Context ->
T.Text -> T.Text ->
Maybe CurveCertificate -> Maybe CurveCertificate ->
Maybe CurveCertificate -> Maybe CurveCertificate ->
HistoryProvider -> (m1 () -> IO ()) ->
TickerInfoProvider ->
m QuoteThreadHandle m QuoteThreadHandle
startQuoteThread barsRef ctx ep clientCert serverCert hp tip = do startQuoteThread barsRef ctx ep clientCert serverCert downloadThreadRunner = do
chan <- liftIO $ newBoundedChan 2000 chan <- liftIO $ newBoundedChan 2000
dChan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep (ClientSecurityParams clientCert serverCert) qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep (ClientSecurityParams clientCert serverCert)
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure hp <*> pure tip <*> pure dChan env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan
tid <- liftIO . forkIO $ quoteThread env chan tid <- liftIO . forkIO $ quoteThread env chan
downloaderTid <- liftIO . forkIO $ downloaderThread env dChan downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan)
return $ QuoteThreadHandle tid downloaderTid env return $ QuoteThreadHandle tid downloaderTid env
where where
downloaderThread env chan = forever $ do downloaderThread env chan = forever $ do
QuoteSubscription tickerid tf <- readChan chan QuoteSubscription tickerid tf <- liftIO $ readChan chan
paramsMap <- liftIO $ readIORef $ paramsCache env paramsMap <- liftIO $ readIORef $ paramsCache env
mbParams <- case M.lookup tickerid paramsMap of mbParams <- case M.lookup tickerid paramsMap of
Nothing -> do Nothing -> do
paramsList <- liftIO $ getInstrumentParameters (tickerInfoProvider env) [tickerid] paramsList <- getInstrumentParameters [tickerid]
case paramsList of case paramsList of
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params))
_ -> return Nothing _ -> return Nothing
Just params -> return $ Just params Just params -> return $ Just params
barsMap <- readIORef (bars env) barsMap <- liftIO $ readIORef (bars env)
case M.lookup (BarSeriesId tickerid tf) barsMap of case M.lookup (BarSeriesId tickerid tf) barsMap of
Just _ -> return () -- already downloaded Just _ -> return () -- already downloaded
Nothing -> case mbParams of Nothing -> case mbParams of
Just params -> do Just params -> do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
barsData <- liftIO $ getHistory (historyProvider env) tickerid tf ((-86400 * 60) `addUTCTime` now) now barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now
let barSeries = BarSeries tickerid tf barsData params let barSeries = BarSeries tickerid tf barsData params
atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ()))
_ -> return () -- TODO log _ -> return () -- TODO log

8
src/ATrade/Quotes/HistoryProvider.hs

@ -7,8 +7,6 @@ module ATrade.Quotes.HistoryProvider
import ATrade.RoboCom.Types (Bar) import ATrade.RoboCom.Types (Bar)
import ATrade.Types (BarTimeframe, TickerId) import ATrade.Types (BarTimeframe, TickerId)
import Data.Time (UTCTime) import Data.Time (UTCTime)
newtype HistoryProvider =
HistoryProvider class (Monad m) => HistoryProvider m where
{ getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar]
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> IO [Bar]
}

9
src/ATrade/Quotes/TickerInfoProvider.hs

@ -6,8 +6,7 @@ module ATrade.Quotes.TickerInfoProvider
import ATrade.RoboCom.Types (InstrumentParameters) import ATrade.RoboCom.Types (InstrumentParameters)
import ATrade.Types (TickerId) import ATrade.Types (TickerId)
newtype TickerInfoProvider =
TickerInfoProvider class (Monad m) => TickerInfoProvider m where
{ getInstrumentParameters :: [TickerId] -> m [InstrumentParameters]
getInstrumentParameters :: [TickerId] -> IO [InstrumentParameters]
}

36
test/Test/Driver/Junction/QuoteThread.hs

@ -1,4 +1,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
module Test.Driver.Junction.QuoteThread module Test.Driver.Junction.QuoteThread
( (
@ -13,6 +16,8 @@ import Test.Tasty.SmallCheck as SC
import ATrade.Driver.Junction.QuoteThread (addSubscription, import ATrade.Driver.Junction.QuoteThread (addSubscription,
startQuoteThread, startQuoteThread,
stopQuoteThread) stopQuoteThread)
import ATrade.Quotes.HistoryProvider (HistoryProvider (..))
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..))
import ATrade.QuoteSource.Client (QuoteData (QDBar)) import ATrade.QuoteSource.Client (QuoteData (QDBar))
import ATrade.QuoteSource.Server (QuoteSourceServerData (..), import ATrade.QuoteSource.Server (QuoteSourceServerData (..),
startQuoteSourceServer, startQuoteSourceServer,
@ -26,6 +31,7 @@ import Control.Concurrent.BoundedChan (newBoundedChan, readChan,
writeChan) writeChan)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (forever) import Control.Monad (forever)
import Control.Monad.Reader
import Data.IORef (newIORef, readIORef) import Data.IORef (newIORef, readIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
@ -38,8 +44,31 @@ import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple import System.Log.Handler.Simple
import System.Log.Logger import System.Log.Logger
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import Test.Mock.HistoryProvider (mkMockHistoryProvider) import Test.Mock.HistoryProvider (MockHistoryProvider,
import Test.Mock.TickerInfoProvider (mkMockTickerInfoProvider) mkMockHistoryProvider,
mockGetHistory)
import Test.Mock.TickerInfoProvider (MockTickerInfoProvider,
mkMockTickerInfoProvider,
mockGetInstrumentParameters)
data TestEnv =
TestEnv
{
historyProvider :: MockHistoryProvider,
tickerInfoProvider :: MockTickerInfoProvider
}
type TestM = ReaderT TestEnv IO
instance HistoryProvider TestM where
getHistory tid tf from to = do
hp <- asks historyProvider
liftIO $ mockGetHistory hp tid tf from to
instance TickerInfoProvider TestM where
getInstrumentParameters tickers = do
tip <- asks tickerInfoProvider
liftIO $ mockGetInstrumentParameters tip tickers
qsEndpoint = "inproc://qs" qsEndpoint = "inproc://qs"
@ -61,7 +90,8 @@ testSubscription = testCase "Subscription" $ withContext $ \ctx -> do
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) (startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> stopQuoteSourceServer $ \_ ->
bracket bracket
(startQuoteThread barsRef ctx qsEndpoint Nothing Nothing mockHistoryProvider mockTickerInfoProvider) (startQuoteThread barsRef ctx qsEndpoint Nothing Nothing (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)))
stopQuoteThread $ \qt -> do stopQuoteThread $ \qt -> do
chan <- newBoundedChan 2000 chan <- newBoundedChan 2000
addSubscription qt "FOO" (BarTimeframe 3600) chan addSubscription qt "FOO" (BarTimeframe 3600) chan

18
test/Test/Mock/HistoryProvider.hs

@ -1,7 +1,9 @@
module Test.Mock.HistoryProvider module Test.Mock.HistoryProvider
( (
mkMockHistoryProvider MockHistoryProvider,
mkMockHistoryProvider,
mockGetHistory
) where ) where
import ATrade.Quotes.HistoryProvider import ATrade.Quotes.HistoryProvider
@ -9,17 +11,17 @@ import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars)
import ATrade.Types (Bar (Bar, barTimestamp), import ATrade.Types (Bar (Bar, barTimestamp),
BarTimeframe (BarTimeframe), BarTimeframe (BarTimeframe),
TickerId) TickerId)
import Control.Monad.IO.Class (MonadIO)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Time (UTCTime) import Data.Time (UTCTime)
mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> HistoryProvider data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar])
mkMockHistoryProvider bars = HistoryProvider $ mockGetHistory bars
mockGetHistory :: M.Map BarSeriesId [Bar] -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> IO [Bar] mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider
mockGetHistory bars tid tf from to = mkMockHistoryProvider = MockHistoryProvider
mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar]
mockGetHistory (MockHistoryProvider bars) tid tf from to =
case M.lookup (BarSeriesId tid tf) bars of case M.lookup (BarSeriesId tid tf) bars of
Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series
Nothing -> return [] Nothing -> return []

15
test/Test/Mock/TickerInfoProvider.hs

@ -1,17 +1,22 @@
module Test.Mock.TickerInfoProvider module Test.Mock.TickerInfoProvider
( (
mkMockTickerInfoProvider MockTickerInfoProvider,
mkMockTickerInfoProvider,
mockGetInstrumentParameters
) where ) where
import ATrade.Quotes.TickerInfoProvider import ATrade.Quotes.TickerInfoProvider
import ATrade.RoboCom.Types (InstrumentParameters) import ATrade.RoboCom.Types (InstrumentParameters)
import ATrade.Types (TickerId) import ATrade.Types (TickerId)
import Control.Monad.IO.Class (MonadIO)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, mapMaybe) import Data.Maybe (catMaybes, mapMaybe)
mkMockTickerInfoProvider :: M.Map TickerId InstrumentParameters -> TickerInfoProvider data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters)
mkMockTickerInfoProvider params = TickerInfoProvider $ mockGetInstrumentParameters params
mockGetInstrumentParameters :: M.Map TickerId InstrumentParameters -> [TickerId] -> IO [InstrumentParameters] mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider
mockGetInstrumentParameters params = return . mapMaybe (`M.lookup` params) mkMockTickerInfoProvider = MockTickerInfoProvider
mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters]
mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params)

Loading…
Cancel
Save