diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 9fabed7..a49c781 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -17,63 +17,65 @@ library hs-source-dirs: src ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults exposed-modules: ATrade.RoboCom.Indicators + , ATrade.RoboCom.ConfigStorage , ATrade.RoboCom.Monad , ATrade.RoboCom.Positions + , ATrade.RoboCom.Persistence , ATrade.RoboCom.Types , ATrade.RoboCom.Utils , ATrade.Quotes - , ATrade.Quotes.Finam , ATrade.Quotes.QHP , ATrade.Quotes.QTIS - , ATrade.Driver.Real - , ATrade.Driver.Backtest +-- , ATrade.Driver.Backtest , ATrade.Driver.Junction , ATrade.Driver.Junction.Types + , ATrade.Driver.Junction.QuoteThread + , ATrade.Driver.Junction.QuoteStream + , ATrade.Driver.Junction.RobotDriverThread + , ATrade.Driver.Junction.ProgramConfiguration + , ATrade.Driver.Junction.BrokerService , ATrade.BarAggregator , ATrade.RoboCom + , ATrade.Quotes.HistoryProvider + , ATrade.Quotes.TickerInfoProvider other-modules: Paths_robocom_zero build-depends: base >= 4.7 && < 5 - , libatrade >= 0.9.0.0 && < 0.10.0.0 + , libatrade >= 0.12.0.0 && < 0.13.0.0 , text , text-icu - , errors , lens , bytestring - , cassava , containers , time , vector - , wreq , safe - , hslogger - , parsec - , parsec-numbers , aeson , binary , binary-ieee754 , zeromq4-haskell + , zeromq4-haskell-zap , unordered-containers + , hashable , th-printf , BoundedChan , monad-loops - , conduit , safe-exceptions , mtl , transformers - , list-extras , optparse-applicative - , split , signal - , random , hedis , gitrev , data-default , template-haskell - + , bimap + , dhall + , extra + , co-log + , text-show + default-language: Haskell2010 other-modules: ATrade.Exceptions - , ATrade.Driver.Real.BrokerClientThread - , ATrade.Driver.Real.QuoteSourceThread , ATrade.Driver.Types test-suite robots-test @@ -95,12 +97,19 @@ test-suite robots-test , quickcheck-instances , containers , safe + , zeromq4-haskell + , zeromq4-haskell-zap + , BoundedChan + , mtl ghc-options: -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010 other-modules: Test.RoboCom.Indicators - , Test.RoboCom.Positions , Test.RoboCom.Utils + , Test.Driver.Junction.QuoteThread , Test.BarAggregator + , ArbitraryInstances + , Test.Mock.HistoryProvider + , Test.Mock.TickerInfoProvider source-repository head type: git diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 904ec74..f385e52 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -79,12 +79,12 @@ handleTick tick = runState $ do case M.lookup (security tick) mybars of Just series -> case bsBars series of (b:bs) -> do - let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) + let currentBn = barNumber (barTimestamp b) (fromIntegral . unBarTimeframe $ bsTimeframe series) case datatype tick of LastTradePrice -> if volume tick > 0 then - if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + if currentBn == barNumber (timestamp tick) (fromIntegral . unBarTimeframe $ bsTimeframe series) then do lBars %= M.insert (security tick) series { bsBars = updateBar b tick : bs } return Nothing @@ -94,7 +94,7 @@ handleTick tick = runState $ do else return Nothing _ -> - if currentBn == barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + if currentBn == barNumber (timestamp tick) (fromIntegral . unBarTimeframe $ bsTimeframe series) then do lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } return Nothing @@ -147,8 +147,8 @@ updateTime tick = runState $ do case M.lookup (security tick) mybars of Just series -> case bsBars series of (b:bs) -> do - let currentBn = barNumber (barTimestamp b) (tfSeconds $ bsTimeframe series) - let thisBn = barNumber (timestamp tick) (tfSeconds $ bsTimeframe series) + let currentBn = barNumber (barTimestamp b) (fromIntegral . unBarTimeframe $ bsTimeframe series) + let thisBn = barNumber (timestamp tick) (fromIntegral . unBarTimeframe $ bsTimeframe series) if | currentBn == thisBn -> do lBars %= M.insert (security tick) series { bsBars = updateBarTimestamp b tick : bs } diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index e89de78..45276be 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -1,58 +1,338 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} module ATrade.Driver.Junction ( junctionMain ) where -import ATrade.Driver.Junction.Types (StrategyDescriptor (..), - StrategyInstance (..), - StrategyInstanceDescriptor (..)) -import Data.Aeson (decode) -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL -import Data.IORef -import qualified Data.Map.Strict as M -import qualified Data.Text as T +import ATrade.Broker.Client (BrokerClientHandle, + startBrokerClient, + stopBrokerClient) +import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), + NotificationSqnum (unNotificationSqnum), + getNotificationSqnum) +import ATrade.Driver.Junction.BrokerService (BrokerService, + getNotifications, + mkBrokerService) +import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), + ProgramOptions (ProgramOptions, configPath)) +import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), + QuoteSubscription (QuoteSubscription), + SubscriptionId (SubscriptionId)) +import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), + QuoteThreadHandle, + withQThread) +import qualified ATrade.Driver.Junction.QuoteThread as QT +import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), + RobotM (..), + createRobotDriverThread, + onStrategyInstance, + postNotificationEvent) +import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), + StrategyInstance (strategyInstanceId), + StrategyInstanceDescriptor (..), + confStrategy, + confTickers, + strategyState, + strategyTimers, + tickerId, + timeframe) +import ATrade.Logging (Message, Severity (Debug, Error, Info, Trace, Warning), + fmtMessage, + logWarning, + logWith) +import ATrade.Quotes.QHP (mkQHPHandle) +import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) +import ATrade.RoboCom.Monad (StrategyEnvironment (..)) +import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) +import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), + Bars) +import ATrade.Types (ClientSecurityParams (ClientSecurityParams), + OrderId, + Trade (tradeOrderId)) +import Colog (HasLog (getLogAction, setLogAction), + LogAction, + logTextStdout, + (>$<)) +import Colog.Actions (logTextHandle) +import Control.Concurrent (threadDelay) +import Control.Exception.Safe (MonadThrow, + bracket) +import Control.Monad (forM_, forever) +import Control.Monad.Extra (whenM) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), + asks) +import Data.Aeson (eitherDecode, + encode) +import qualified Data.ByteString.Lazy as BL +import Data.Default (Default (def)) +import Data.Foldable (traverse_) +import Data.IORef (IORef, + atomicModifyIORef', + newIORef, + readIORef) +import Data.List.NonEmpty (NonEmpty ((:|))) +import qualified Data.Map.Strict as M +import Data.Set (notMember) +import qualified Data.Set as S +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) +import Data.Text.IO (readFile) +import Data.Time (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime) +import Database.Redis (ConnectInfo (..), + Connection, + PortID (UnixSocket), + checkedConnect, + defaultConnectInfo, + get, mset, + runRedis) +import Dhall (auto, input) +import Options.Applicative (Parser, + execParser, + fullDesc, header, + help, helper, + info, long, + metavar, progDesc, + short, strOption, + (<**>)) +import Prelude hiding (log, + readFile) +import System.IO (BufferMode (LineBuffering), + Handle, + IOMode (AppendMode), + hSetBuffering, + openFile, + withFile) +import System.ZMQ4 (withContext) +import System.ZMQ4.ZAP (loadCertificateFromFile) -load :: T.Text -> IO B.ByteString -load = undefined +data JunctionEnv = + JunctionEnv + { + peRedisSocket :: Connection, + peConfigPath :: FilePath, + peQuoteThread :: QuoteThreadHandle, + peBroker :: BrokerClientHandle, + peRobots :: IORef (M.Map T.Text RobotDriverHandle), + peLogAction :: LogAction JunctionM Message + } -junctionMain :: M.Map T.Text StrategyDescriptor -> IO () +newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } + deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow) + +instance HasLog JunctionEnv Message JunctionM where + getLogAction = peLogAction + setLogAction a e = e { peLogAction = a } + +instance ConfigStorage JunctionM where + loadConfig key = do + basePath <- asks peConfigPath + let path = basePath <> "/" <> T.unpack key -- TODO fix path construction + liftIO $ readFile path >>= input auto + +instance MonadPersistence JunctionM where + saveState newState key = do + conn <- asks peRedisSocket + now <- liftIO getPOSIXTime + res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), + (encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] + case res of + Left _ -> logWarning "Junction " "Unable to save state" + Right _ -> return () + + loadState key = do + conn <- asks peRedisSocket + res <- liftIO $ runRedis conn $ get (encodeUtf8 key) + -- TODO: just chain eithers + case res of + Left _ -> do + logWarning "Junction" "Unable to load state" + return def + Right maybeRawState -> + case maybeRawState of + Just rawState -> case eitherDecode $ BL.fromStrict rawState of + Left _ -> do + logWarning "Junction" "Unable to decode state" + return def + Right decodedState -> return decodedState + Nothing -> do + logWarning "Junction" "Unable to decode state" + return def + +instance QuoteStream JunctionM where + addSubscription (QuoteSubscription ticker timeframe) chan = do + qt <- asks peQuoteThread + QT.addSubscription qt ticker timeframe chan + return (SubscriptionId 0) -- TODO subscription Ids + removeSubscription _ = undefined + +logger :: (MonadIO m) => Handle -> LogAction m Message +logger h = fmtMessage >$< (logTextStdout <> logTextHandle h) + +junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () junctionMain descriptors = do - parseOptions - instanceDescriptors <- undefined - strategies <- mkStrategies instanceDescriptors + opts <- parseOptions - start strategies + let initialLogger = fmtMessage >$< logTextStdout - where - parseOptions = undefined - - mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] - mkStrategies = mapM mkStrategy - - mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance - mkStrategy desc = do - sState <- load (stateKey desc) - sCfg <- load (configKey desc) - case M.lookup (strategyId desc) descriptors of - Just (StrategyDescriptor _sName sCallback _sDefState) -> - case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of - (Just pCfg, Just pState) -> do - cfgRef <- newIORef pCfg - stateRef <- newIORef pState - return $ StrategyInstance + logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) + + cfg <- readFile (configPath opts) >>= input auto + + withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do + + let log = logWith (logger h) + + barsMap <- newIORef M.empty + + log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg + redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) + log Info "Junction" "redis: connected" + withContext $ \ctx -> do + log Debug "Junction" "0mq context created" + let downloaderLogAction = logger h + let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) downloaderLogAction + robotsMap <- newIORef M.empty + ordersMap <- newIORef M.empty + handledNotifications <- newIORef S.empty + withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro -> + withQThread downloaderEnv barsMap cfg ctx (logger h) $ \qt -> do + broService <- mkBrokerService bro ordersMap + let junctionLogAction = logger h + let env = + JunctionEnv { - strategyInstanceId = strategyName desc, - strategyEventCallback = sCallback, - strategyState = stateRef, - strategyConfig = cfgRef + peRedisSocket = redis, + peConfigPath = robotsConfigsPath cfg, + peQuoteThread = qt, + peBroker = bro, + peRobots = robotsMap, + peLogAction = junctionLogAction } - _ -> undefined - _ -> undefined + withJunction env $ do + startRobots h cfg barsMap broService + forever $ do + notifications <- liftIO $ getNotifications broService + forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h)) + saveRobots + liftIO $ threadDelay 1000000 + where + saveRobots :: JunctionM () + saveRobots = do + robotsMap <- asks peRobots >>= (liftIO . readIORef) + traverse_ saveRobotState robotsMap + + saveRobotState :: RobotDriverHandle -> JunctionM () + saveRobotState handle = onStrategyInstance handle $ \inst -> do + currentState <- liftIO $ readIORef (strategyState inst) + saveState currentState (strategyInstanceId inst) + currentTimers <- liftIO $ readIORef (strategyTimers inst) + saveState currentTimers (strategyInstanceId inst <> ":timers") + + startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM () + startRobots logHandle cfg barsMap broService = forM_ (instances cfg) $ \inst -> do + now <- liftIO getCurrentTime + case M.lookup (strategyBaseName inst) descriptors of + Just (StrategyDescriptorE desc) -> do + bigConf <- loadConfig (configKey inst) + case confTickers bigConf of + (firstTicker:restTickers) -> do + rConf <- liftIO $ newIORef (confStrategy bigConf) + rState <- loadState (stateKey inst) >>= liftIO . newIORef + rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef + localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode + liftIO $ hSetBuffering localH LineBuffering + let robotLogAction = logger logHandle <> (fmtMessage >$< logTextHandle localH) + stratEnv <- liftIO $ newIORef StrategyEnvironment + { + _seInstanceId = strategyId inst, + _seAccount = accountId inst, + _seVolume = 1, + _seLastTimestamp = now + } + let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) + robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers + robotsMap' <- asks peRobots + liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) + _ -> logWith (logger logHandle) Error (strategyId inst) $ "No tickers configured !!!" + Nothing -> error "Unknown strategy" + + toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) + + withJunction :: JunctionEnv -> JunctionM () -> IO () + withJunction env = (`runReaderT` env) . unJunctionM + + handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> + IORef (M.Map OrderId T.Text) -> + IORef (S.Set NotificationSqnum) -> + LogAction IO Message -> + Notification -> + IO () + handleBrokerNotification robotsRef ordersMapRef handled logger notification= do + logWith logger Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification + whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do + robotsMap <- readIORef robotsRef + ordersMap <- readIORef ordersMapRef + + case getNotificationTarget robotsMap ordersMap notification of + Just robot -> postNotificationEvent robot notification + Nothing -> do + logWith logger Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) + logWith logger Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) + + atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) + + getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle + getNotificationTarget robotsMap ordersMap notification = do + robotId <- M.lookup (notificationOrderId notification) ordersMap + M.lookup robotId robotsMap + + notificationOrderId (OrderNotification _ oid _) = oid + notificationOrderId (TradeNotification _ trade) = tradeOrderId trade + + withBroker cfg ctx robotsMap ordersMap handled logger f = do + securityParameters <- loadBrokerSecurityParameters cfg + bracket + (startBrokerClient + "broker" + ctx + (brokerEndpoint cfg) + (brokerNotificationEndpoint cfg) + [handleBrokerNotification robotsMap ordersMap handled logger] + securityParameters + logger) + stopBrokerClient f - start = undefined + loadBrokerSecurityParameters cfg = + case (brokerClientCert cfg, brokerServerCert cfg) of + (Just clientCertPath, Just serverCertPath) -> do + eClientCert <- loadCertificateFromFile clientCertPath + eServerCert <- loadCertificateFromFile serverCertPath + case (eClientCert, eServerCert) of + (Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) + (_, _) -> return $ ClientSecurityParams Nothing Nothing + _ -> return $ ClientSecurityParams Nothing Nothing + parseOptions = execParser options + options = info (optionsParser <**> helper) + (fullDesc <> + progDesc "Robocom-zero junction mode driver" <> + header "robocom-zero-junction") + optionsParser :: Parser ProgramOptions + optionsParser = ProgramOptions + <$> strOption + (long "config" <> + short 'c' <> + metavar "FILENAME" <> + help "Configuration file path") diff --git a/src/ATrade/Driver/Junction/BrokerService.hs b/src/ATrade/Driver/Junction/BrokerService.hs new file mode 100644 index 0000000..a03f085 --- /dev/null +++ b/src/ATrade/Driver/Junction/BrokerService.hs @@ -0,0 +1,56 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Driver.Junction.BrokerService + ( + BrokerService, + mkBrokerService, + submitOrder, + cancelOrder, + getNotifications + ) where + +import qualified ATrade.Broker.Client as Bro +import ATrade.Broker.Protocol (Notification (..)) +import ATrade.Logging (Message, logDebug) +import ATrade.Types (Order (..), OrderId) +import Colog (WithLog) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader.Class (MonadReader) +import Data.IORef (IORef, atomicModifyIORef', + newIORef) +import qualified Data.Map.Strict as M +import qualified Data.Text as T + +data BrokerService = + BrokerService + { + broker :: Bro.BrokerClientHandle, + orderMap :: IORef (M.Map OrderId T.Text), + orderIdCounter :: IORef OrderId + } + +mkBrokerService :: Bro.BrokerClientHandle -> IORef (M.Map OrderId T.Text) -> IO BrokerService +mkBrokerService h om = BrokerService h om <$> newIORef 1 + +submitOrder :: (MonadIO m, WithLog env Message m, MonadReader env m) => BrokerService -> T.Text -> Order -> m OrderId +submitOrder service identity order = do + oid <- nextOrderId service + logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid + liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) + _ <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } + return oid + where + nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) + +cancelOrder :: BrokerService -> OrderId -> IO () +cancelOrder service oid = do + _ <- Bro.cancelOrder (broker service) oid + return () + +getNotifications :: BrokerService -> IO [Notification] +getNotifications service = do + v <- Bro.getNotifications (broker service) + case v of + Left _ -> return [] + Right n -> return n diff --git a/src/ATrade/Driver/Junction/ProgramConfiguration.hs b/src/ATrade/Driver/Junction/ProgramConfiguration.hs new file mode 100644 index 0000000..af2cde5 --- /dev/null +++ b/src/ATrade/Driver/Junction/ProgramConfiguration.hs @@ -0,0 +1,37 @@ +{-# LANGUAGE DeriveGeneric #-} + +module ATrade.Driver.Junction.ProgramConfiguration + ( + ProgramOptions(..), + ProgramConfiguration(..) + ) where +import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) +import qualified Data.Text as T +import Dhall (FromDhall) +import GHC.Generics (Generic) + +newtype ProgramOptions = + ProgramOptions + { + configPath :: FilePath + } + +data ProgramConfiguration = + ProgramConfiguration + { + brokerEndpoint :: T.Text, + brokerNotificationEndpoint :: T.Text, + brokerServerCert :: Maybe FilePath, + brokerClientCert :: Maybe FilePath, + quotesourceEndpoint :: T.Text, + quotesourceServerCert :: Maybe FilePath, + quotesourceClientCert :: Maybe FilePath, + qhpEndpoint :: T.Text, + qtisEndpoint :: T.Text, + redisSocket :: T.Text, + robotsConfigsPath :: FilePath, + logBasePath :: FilePath, + instances :: [StrategyInstanceDescriptor] + } deriving (Generic, Show) + +instance FromDhall ProgramConfiguration diff --git a/src/ATrade/Driver/Junction/QuoteStream.hs b/src/ATrade/Driver/Junction/QuoteStream.hs new file mode 100644 index 0000000..d391147 --- /dev/null +++ b/src/ATrade/Driver/Junction/QuoteStream.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE DeriveGeneric #-} + +module ATrade.Driver.Junction.QuoteStream + ( + QuoteSubscription(..), + QuoteStream(..), + SubscriptionId(..) + ) where + +import ATrade.QuoteSource.Client (QuoteData) +import ATrade.Types (BarTimeframe, TickerId) +import Control.Concurrent.BoundedChan (BoundedChan) +import Data.Hashable (Hashable) +import GHC.Generics (Generic) + +data QuoteSubscription = + QuoteSubscription TickerId BarTimeframe + deriving (Generic, Eq) + +instance Hashable BarTimeframe +instance Hashable QuoteSubscription + +newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } + +class (Monad m) => QuoteStream m where + addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId + removeSubscription :: SubscriptionId -> m () diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs new file mode 100644 index 0000000..baa5230 --- /dev/null +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -0,0 +1,228 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeSynonymInstances #-} + +module ATrade.Driver.Junction.QuoteThread + ( + QuoteThreadHandle, + startQuoteThread, + stopQuoteThread, + addSubscription, + DownloaderM, + DownloaderEnv(..), + runDownloaderM, + withQThread + ) where + +import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) +import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..)) +import ATrade.Logging (Message) +import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) +import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) +import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), + qtisGetTickersInfo) +import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) +import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), + QuoteSourceClientHandle, + quoteSourceClientSubscribe, + startQuoteSourceClient, + stopQuoteSourceClient) +import ATrade.RoboCom.Types (Bar (barSecurity), + BarSeries (..), + BarSeriesId (BarSeriesId), + Bars, + InstrumentParameters (InstrumentParameters)) +import ATrade.Types (BarTimeframe (BarTimeframe), + ClientSecurityParams (ClientSecurityParams), + Tick (security), + TickerId) +import Colog (HasLog (getLogAction, setLogAction), + LogAction, + WithLog) +import Control.Concurrent (ThreadId, forkIO, + killThread) +import Control.Concurrent.BoundedChan (BoundedChan, + newBoundedChan, + readChan, + writeChan) +import Control.Exception.Safe (MonadMask, + MonadThrow, + bracket) +import Control.Monad (forM, forever) +import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), + lift) +import Control.Monad.Reader.Class (MonadReader, asks) +import qualified Data.HashMap.Strict as HM +import Data.IORef (IORef, + atomicModifyIORef', + newIORef, + readIORef) +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Time (addUTCTime, + getCurrentTime) +import System.ZMQ4 (Context) +import System.ZMQ4.ZAP (loadCertificateFromFile) + + +data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv + +data QuoteThreadEnv = + QuoteThreadEnv + { + bars :: IORef Bars, + endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), + qsclient :: QuoteSourceClientHandle, + paramsCache :: IORef (M.Map TickerId InstrumentParameters), + downloaderChan :: BoundedChan QuoteSubscription + } + +startQuoteThread :: (MonadIO m, + MonadIO m1, + WithLog DownloaderEnv Message m1, + HistoryProvider m1, + TickerInfoProvider m1) => + IORef Bars -> + Context -> + T.Text -> + ClientSecurityParams -> + (m1 () -> IO ()) -> + LogAction IO Message -> + m QuoteThreadHandle +startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do + chan <- liftIO $ newBoundedChan 2000 + dChan <- liftIO $ newBoundedChan 2000 + qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger + env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan + tid <- liftIO . forkIO $ quoteThread env chan + downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) + return $ QuoteThreadHandle tid downloaderTid env + where + downloaderThread env chan = forever $ do + QuoteSubscription tickerid tf <- liftIO $ readChan chan + paramsMap <- liftIO $ readIORef $ paramsCache env + mbParams <- case M.lookup tickerid paramsMap of + Nothing -> do + paramsList <- getInstrumentParameters [tickerid] + case paramsList of + (params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) + _ -> return Nothing + Just params -> return $ Just params + barsMap <- liftIO $ readIORef (bars env) + case M.lookup (BarSeriesId tickerid tf) barsMap of + Just _ -> return () -- already downloaded + Nothing -> case mbParams of + Just params -> do + now <- liftIO getCurrentTime + barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now + let barSeries = BarSeries tickerid tf barsData params + liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) + _ -> return () -- TODO log + + + quoteThread env chan = flip runReaderT env $ forever $ do + qssData <- lift $ readChan chan + case qssData of + QDBar (tf, bar) -> do + barsRef' <- asks bars + lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) + _ -> return () -- TODO pass to bar aggregator + let key = case qssData of + QDTick tick -> QuoteSubscription (security tick) (BarTimeframe 0) + QDBar (tf, bar) -> QuoteSubscription (barSecurity bar) tf + subs <- asks endpoints >>= (lift . readIORef) + case HM.lookup key subs of + Just clientChannels -> do + lift $ mapM_ (`writeChan` qssData) clientChannels + Nothing -> return () + +stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () +stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do + killThread tid + killThread dtid + stopQuoteSourceClient (qsclient env) + +addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m () +addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do + writeChan (downloaderChan env) (QuoteSubscription tid tf) + atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m tid, ())) + quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] + where + doAddSubscription m tickerid = + let m1 = HM.alter (\case + Just chans -> Just (chan : chans) + _ -> Just [chan]) (QuoteSubscription tickerid tf) m in + HM.alter (\case + Just chans -> Just (chan : chans) + _ -> Just [chan]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 + +updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars +updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap + +addToSeries :: Bar -> BarSeries -> BarSeries +addToSeries bar series = series { bsBars = bar : bsBars series } + +data DownloaderEnv = + DownloaderEnv + { + qhp :: QHPHandle, + downloaderContext :: Context, + downloaderQtisEndpoint :: T.Text, + logAction :: LogAction DownloaderM Message + } + +newtype DownloaderM a = DownloaderM { unDownloaderM :: ReaderT DownloaderEnv IO a } + deriving (Functor, Applicative, Monad, MonadReader DownloaderEnv, MonadIO, MonadThrow) + +instance HasLog DownloaderEnv Message DownloaderM where + getLogAction = logAction + setLogAction a e = e { logAction = a } + +instance HistoryProvider DownloaderM where + getHistory tid tf from to = do + q <- asks qhp + requestHistoryFromQHP q tid tf from to + +instance TickerInfoProvider DownloaderM where + getInstrumentParameters tickers = do + ctx <- asks downloaderContext + ep <- asks downloaderQtisEndpoint + tis <- forM tickers (qtisGetTickersInfo ctx ep) + pure $ convert `fmap` tis + where + convert ti = InstrumentParameters + (tiTicker ti) + (fromInteger $ tiLotSize ti) + (tiTickSize ti) + +withQThread :: DownloaderEnv -> IORef Bars -> ProgramConfiguration -> Context -> LogAction IO Message -> (QuoteThreadHandle -> IO ()) -> IO () +withQThread env barsMap cfg ctx logger f = do + securityParameters <- loadSecurityParameters + bracket + (startQuoteThread + barsMap + ctx + (quotesourceEndpoint cfg) + securityParameters + (runDownloaderM env) + logger) + stopQuoteThread f + where + loadSecurityParameters = + case (quotesourceClientCert cfg, quotesourceServerCert cfg) of + (Just clientCertPath, Just serverCertPath) -> do + eClientCert <- loadCertificateFromFile clientCertPath + eServerCert <- loadCertificateFromFile serverCertPath + case (eClientCert, eServerCert) of + (Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) + (_, _) -> return $ ClientSecurityParams Nothing Nothing + + _ -> return $ ClientSecurityParams Nothing Nothing + +runDownloaderM :: DownloaderEnv -> DownloaderM () -> IO () +runDownloaderM env = (`runReaderT` env) . unDownloaderM diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs new file mode 100644 index 0000000..ae40d36 --- /dev/null +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -0,0 +1,194 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} + +module ATrade.Driver.Junction.RobotDriverThread + ( + createRobotDriverThread, + RobotEnv(..), + RobotM(..), + RobotDriverHandle, + onStrategyInstance, + postNotificationEvent) where + +import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) +import qualified ATrade.Driver.Junction.BrokerService as Bro +import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), + QuoteSubscription (QuoteSubscription)) +import ATrade.Driver.Junction.Types (BigConfig, + StrategyDescriptor, + StrategyInstance (StrategyInstance, strategyEventCallback), + StrategyInstanceDescriptor (configKey), + confStrategy, + confTickers, + eventCallback, stateKey, + strategyId, tickerId, + timeframe) +import ATrade.Logging (Message, log, logDebug, + logInfo, logWarning) +import ATrade.QuoteSource.Client (QuoteData (..)) +import ATrade.RoboCom.ConfigStorage (ConfigStorage) +import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderSubmitted, OrderUpdate), + MonadRobot (..), + StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp)) +import ATrade.RoboCom.Persistence (MonadPersistence) +import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), + Bars) +import ATrade.Types (Order (orderId), OrderId, + OrderState, Trade) +import Colog (HasLog (getLogAction, setLogAction), + LogAction) +import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent.BoundedChan (BoundedChan, + newBoundedChan, readChan, + writeChan) +import Control.Exception.Safe (MonadThrow) +import Control.Monad (forM_, forever, void) +import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Reader (MonadReader (local), + ReaderT, asks) +import Data.Aeson (FromJSON, ToJSON) +import Data.Default +import Data.IORef (IORef, + atomicModifyIORef', + readIORef, writeIORef) +import Data.List.NonEmpty (NonEmpty) +import qualified Data.Map.Strict as M +import qualified Data.Text.Lazy as TL +import Data.Time (UTCTime, getCurrentTime) +import Dhall (FromDhall) +import Prelude hiding (log) + +data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => + RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) + +data RobotDriverRequest + +data RobotDriverEvent = + EventRequest RobotDriverRequest + | QuoteEvent QuoteData + | NewTradeEvent Trade + | OrderEvent OrderId OrderState + + +robotDriverThread :: (MonadIO m, + MonadRobot m c s) => + StrategyInstance c s -> + BoundedChan RobotDriverEvent -> + m () + +robotDriverThread inst eventQueue = + forever $ liftIO (readChan eventQueue) >>= handleEvent + where + handleEvent (EventRequest _) = return () + handleEvent (QuoteEvent d) = + case d of + QDTick tick -> strategyEventCallback inst (NewTick tick) + QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) + handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) + handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) + +createRobotDriverThread :: (MonadIO m1, + ConfigStorage m1, + MonadPersistence m1, + QuoteStream m1, + Default s, + FromJSON s, + ToJSON s, + FromDhall c, + MonadIO m, + MonadReader (RobotEnv c s) m, + MonadRobot m c s) => + StrategyInstanceDescriptor + -> StrategyDescriptor c s + -> (m () -> IO ()) + -> BigConfig c + -> IORef c + -> IORef s + -> IORef [UTCTime] + -> m1 RobotDriverHandle + +createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = do + eventQueue <- liftIO $ newBoundedChan 2000 + + let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers + + quoteQueue <- liftIO $ newBoundedChan 2000 + forM_ (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) + qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue + + driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue + return $ RobotDriverHandle inst driver qthread eventQueue + + where + passQuoteEvents eventQueue quoteQueue = do + v <- readChan quoteQueue + writeChan eventQueue (QuoteEvent v) + +onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r +onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst + +data RobotEnv c s = + RobotEnv + { + stateRef :: IORef s, + configRef :: IORef c, + timersRef :: IORef [UTCTime], + bars :: IORef Bars, + env :: IORef StrategyEnvironment, + logAction :: LogAction (RobotM c s) Message, + brokerService :: Bro.BrokerService, + tickers :: NonEmpty BarSeriesId + } + +newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } + deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) + +instance HasLog (RobotEnv c s) Message (RobotM c s) where + getLogAction = logAction + setLogAction a e = e { logAction = a } + +instance MonadRobot (RobotM c s) c s where + submitOrder order = do + instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) + bro <- asks brokerService + Bro.submitOrder bro instId order + + cancelOrder oid = do + bro <- asks brokerService + liftIO . void $ Bro.cancelOrder bro oid + + appendToLog s t = do + instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) + log s instId $ TL.toStrict t + + setupTimer t = do + ref <- asks timersRef + liftIO $ atomicModifyIORef' ref (\s -> (t : s, ())) + + enqueueIOAction = undefined + getConfig = asks configRef >>= liftIO . readIORef + getState = asks stateRef >>= liftIO . readIORef + setState newState = asks stateRef >>= liftIO . flip writeIORef newState + getEnvironment = do + ref <- asks env + now <- liftIO getCurrentTime + liftIO $ atomicModifyIORef' ref (\e -> (e { _seLastTimestamp = now }, e { _seLastTimestamp = now})) + + getTicker tid tf = do + b <- asks bars >>= liftIO . readIORef + return $ M.lookup (BarSeriesId tid tf) b + + getAvailableTickers = asks tickers + +postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () +postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $ + case notification of + OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) + TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) + + diff --git a/src/ATrade/Driver/Junction/Types.hs b/src/ATrade/Driver/Junction/Types.hs index d0cdd3c..8054daf 100644 --- a/src/ATrade/Driver/Junction/Types.hs +++ b/src/ATrade/Driver/Junction/Types.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE RankNTypes #-} @@ -6,49 +8,72 @@ module ATrade.Driver.Junction.Types StrategyDescriptor(..), TickerConfig(..), StrategyInstanceDescriptor(..), - StrategyInstance(..) + StrategyInstance(..), + BigConfig(..), + StrategyDescriptorE(..), + StrategyInstanceE(..) ) where import ATrade.RoboCom.Monad (EventCallback) -import ATrade.Types (BarTimeframe, TickerId) +import ATrade.Types (BarTimeframe (..), TickerId) import Data.Aeson (FromJSON (..), ToJSON (..)) -import qualified Data.ByteString as B -import Data.IORef +import Data.Default (Default) +import Data.IORef (IORef) import qualified Data.Text as T +import Data.Time (UTCTime) +import Dhall (FromDhall, autoWith, natural) +import GHC.Generics (Generic) -data StrategyDescriptor = - forall c s. (FromJSON s, ToJSON s, FromJSON c) => +data StrategyDescriptor c s = StrategyDescriptor { baseStrategyName :: T.Text, - eventCallback :: EventCallback c s, - defaultState :: s + eventCallback :: EventCallback c s } +data StrategyDescriptorE = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyDescriptorE (StrategyDescriptor c s) + data TickerConfig = TickerConfig { tickerId :: TickerId, timeframe :: BarTimeframe } + deriving (Generic) + +instance FromDhall BarTimeframe where + autoWith _ = BarTimeframe . fromIntegral <$> natural + +instance FromDhall TickerConfig + +data BigConfig c = BigConfig { + confTickers :: [TickerConfig], + confStrategy :: c +} deriving (Generic) + +instance (FromDhall c) => FromDhall (BigConfig c) data StrategyInstanceDescriptor = StrategyInstanceDescriptor { - strategyId :: T.Text, - strategyName :: T.Text, - configKey :: T.Text, - stateKey :: T.Text, - logPath :: T.Text, - tickers :: [TickerConfig] - } + accountId :: T.Text, + strategyId :: T.Text, + strategyBaseName :: T.Text, + configKey :: T.Text, + stateKey :: T.Text, + logPath :: T.Text + } deriving (Generic, Show) -data StrategyInstance = - forall c s. (FromJSON s, ToJSON s, FromJSON c) => +instance FromDhall StrategyInstanceDescriptor + +data StrategyInstance c s = StrategyInstance { strategyInstanceId :: T.Text, strategyEventCallback :: EventCallback c s, strategyState :: IORef s, - strategyConfig :: IORef c + strategyConfig :: IORef c, + strategyTimers :: IORef [UTCTime] } + +data StrategyInstanceE = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstanceE (StrategyInstance c s) diff --git a/src/ATrade/Quotes/Finam.hs b/src/ATrade/Quotes/Finam.hs deleted file mode 100644 index 005409d..0000000 --- a/src/ATrade/Quotes/Finam.hs +++ /dev/null @@ -1,361 +0,0 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TypeSynonymInstances #-} - -module ATrade.Quotes.Finam ( - downloadFinamSymbols, - Symbol(..), - Period(..), - DateFormat(..), - TimeFormat(..), - FieldSeparator(..), - RequestParams(..), - defaultParams, - downloadQuotes, - parseQuotes, - downloadAndParseQuotes, - Row(..) -) where - -import ATrade.Types -import Control.Error.Util -import Control.Exception -import Control.Lens -import Control.Monad -import qualified Data.ByteString as B -import qualified Data.ByteString.Char8 as B8 -import qualified Data.ByteString.Lazy as BL -import Data.Csv hiding (Options) -import Data.List -import qualified Data.Map as M -import Data.Maybe -import qualified Data.Text as T -import qualified Data.Text.ICU.Convert as TC -import Data.Time.Calendar -import Data.Time.Clock -import Data.Time.Format -import qualified Data.Vector as V -import Network.Wreq -import Safe -import System.Log.Logger -import Text.Parsec -import Text.ParserCombinators.Parsec.Number - -data Period = - PeriodTick | - Period1Min | - Period5Min | - Period10Min | - Period15Min | - Period30Min | - PeriodHour | - PeriodDay | - PeriodWeek | - PeriodMonth - deriving (Show, Eq) - -instance Enum Period where - fromEnum PeriodTick = 1 - fromEnum Period1Min = 2 - fromEnum Period5Min = 3 - fromEnum Period10Min = 4 - fromEnum Period15Min = 5 - fromEnum Period30Min = 6 - fromEnum PeriodHour = 7 - fromEnum PeriodDay = 8 - fromEnum PeriodWeek = 9 - fromEnum PeriodMonth = 10 - - toEnum 1 = PeriodTick - toEnum 2 = Period1Min - toEnum 3 = Period5Min - toEnum 4 = Period10Min - toEnum 5 = Period15Min - toEnum 6 = Period30Min - toEnum 7 = PeriodHour - toEnum 8 = PeriodDay - toEnum 9 = PeriodWeek - toEnum 10 = PeriodMonth - toEnum _ = PeriodDay - -data DateFormat = - FormatYYYYMMDD | - FormatYYMMDD | - FormatDDMMYY | - FormatDD_MM_YY | - FormatMM_DD_YY - deriving (Show, Eq) - -instance Enum DateFormat where - fromEnum FormatYYYYMMDD = 1 - fromEnum FormatYYMMDD = 2 - fromEnum FormatDDMMYY = 3 - fromEnum FormatDD_MM_YY = 4 - fromEnum FormatMM_DD_YY = 5 - - toEnum 1 = FormatYYYYMMDD - toEnum 2 = FormatYYMMDD - toEnum 3 = FormatDDMMYY - toEnum 4 = FormatDD_MM_YY - toEnum 5 = FormatMM_DD_YY - toEnum _ = FormatYYYYMMDD - - -data TimeFormat = - FormatHHMMSS | - FormatHHMM | - FormatHH_MM_SS | - FormatHH_MM - deriving (Show, Eq) - -instance Enum TimeFormat where - fromEnum FormatHHMMSS = 1 - fromEnum FormatHHMM = 2 - fromEnum FormatHH_MM_SS = 3 - fromEnum FormatHH_MM = 4 - - toEnum 1 = FormatHHMMSS - toEnum 2 = FormatHHMM - toEnum 3 = FormatHH_MM_SS - toEnum 4 = FormatHH_MM - toEnum _ = FormatHHMMSS - -data FieldSeparator = - SeparatorComma | - SeparatorPeriod | - SeparatorSemicolon | - SeparatorTab | - SeparatorSpace - deriving (Show, Eq) - -instance Enum FieldSeparator where - fromEnum SeparatorComma = 1 - fromEnum SeparatorPeriod = 2 - fromEnum SeparatorSemicolon = 3 - fromEnum SeparatorTab = 4 - fromEnum SeparatorSpace = 5 - - toEnum 1 = SeparatorComma - toEnum 2 = SeparatorPeriod - toEnum 3 = SeparatorSemicolon - toEnum 4 = SeparatorTab - toEnum 5 = SeparatorSpace - toEnum _ = SeparatorComma - -data RequestParams = RequestParams { - ticker :: T.Text, - startDate :: Day, - endDate :: Day, - period :: Period, - dateFormat :: DateFormat, - timeFormat :: TimeFormat, - fieldSeparator :: FieldSeparator, - includeHeader :: Bool, - fillEmpty :: Bool -} - -defaultParams :: RequestParams -defaultParams = RequestParams { - ticker = "", - startDate = fromGregorian 1970 1 1, - endDate = fromGregorian 1970 1 1, - period = PeriodDay, - dateFormat = FormatYYYYMMDD, - timeFormat = FormatHHMMSS, - fieldSeparator = SeparatorComma, - includeHeader = True, - fillEmpty = False -} - -data Symbol = Symbol { - symCode :: T.Text, - symName :: T.Text, - symId :: Integer, - symMarketCode :: Integer, - symMarketName :: T.Text -} - deriving (Show, Eq) - -data Row = Row { - rowTicker :: T.Text, - rowTime :: UTCTime, - rowOpen :: Price, - rowHigh :: Price, - rowLow :: Price, - rowClose :: Price, - rowVolume :: Integer -} deriving (Show, Eq) - -instance FromField Price where - parseField s = fromDouble <$> (parseField s :: Parser Double) - -instance FromRecord Row where - parseRecord v - | length v == 9 = do - tkr <- v .! 0 - date <- v .! 2 - time <- v .! 3 - dt <- addUTCTime (-3 * 3600) <$> (parseDt date time) - open <- v .! 4 - high <- v .! 5 - low <- v .! 6 - close <- v .! 7 - vol <- v .! 8 - return $ Row tkr dt open high low close vol - | otherwise = mzero - where - parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime - parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of - Just dt -> return dt - Nothing -> fail "Unable to parse date/time" - -downloadAndParseQuotes :: RequestParams -> IO (Maybe [Row]) -downloadAndParseQuotes requestParams = downloadAndParseQuotes' 3 - where - downloadAndParseQuotes' iter = do - raw <- downloadQuotes requestParams `catch` (\e -> do - debugM "History" $ "exception: " ++ show (e :: SomeException) - return Nothing) - case raw of - Just r -> return $ parseQuotes r - Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' (iter - 1) - -parseQuotes :: B.ByteString -> Maybe [Row] -parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of - Left _ -> Nothing - Right d -> Just $ V.toList d - -downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) -downloadQuotes requestParams = do - symbols <- downloadFinamSymbols - case requestUrl symbols requestParams of - Just (url, options') -> do - resp <- getWith options' url - return $ Just $ BL.toStrict $ resp ^. responseBody - Nothing -> return Nothing - -requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) -requestUrl symbols requestParams = case getFinamCode symbols (ticker requestParams) of - Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market) - Nothing -> Nothing - where - getOptions sym market = defaults & - param "market" .~ [T.pack . show $ market] & - param "f" .~ [ticker requestParams] & - param "e" .~ [".csv"] & - param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & - param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & - param "MSOR" .~ ["0"] & - param "mstime" .~ ["on"] & - param "mstimever" .~ ["1"] & - param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ requestParams] & - param "sep2" .~ ["1"] & - param "at" .~ [if includeHeader requestParams then "1" else "0"] & - param "fsp" .~ [if fillEmpty requestParams then "1" else "0"] & - param "p" .~ [T.pack . show . fromEnum $ period requestParams] & - param "em" .~ [T.pack . show $ sym ] & - param "df" .~ [T.pack . show $ dayFrom] & - param "mf" .~ [T.pack . show $ (monthFrom - 1)] & - param "yf" .~ [T.pack . show $ yearFrom] & - param "dt" .~ [T.pack . show $ dayTo] & - param "mt" .~ [T.pack . show $ (monthTo - 1)] & - param "yt" .~ [T.pack . show $ yearTo] & - param "code" .~ [ticker requestParams] & - param "datf" .~ if period requestParams == PeriodTick then ["11"] else ["1"] - (yearFrom, monthFrom, dayFrom) = toGregorian $ startDate requestParams - (yearTo, monthTo, dayTo) = toGregorian $ endDate requestParams - -getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) -getFinamCode symbols tickerCode = case find (\x -> symCode x == tickerCode && symMarketCode x `notElem` archives) symbols of - Just sym -> Just (symId sym, symMarketCode sym) - Nothing -> Nothing - -downloadFinamSymbols :: IO [Symbol] -downloadFinamSymbols = do - conv <- TC.open "cp1251" Nothing - result <- get "http://www.finam.ru/cache/icharts/icharts.js" - if result ^. responseStatus . statusCode == 200 - then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody - else return [] - where - parseSymbols :: [T.Text] -> [Symbol] - parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames - where - getWithParser parser pos = fromMaybe [] $ do - s <- T.unpack <$> strs `atMay` pos - hush $ parse parser "" s - - ids :: [Integer] - ids = getWithParser intlist 0 - - names :: [T.Text] - names = T.pack <$> getWithParser strlist 1 - - codes :: [T.Text] - codes = T.pack <$> getWithParser strlist 2 - - marketCodes :: [Integer] - marketCodes = getWithParser intlist 3 - - marketNames :: [T.Text] - marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes - - intlist = do - _ <- string "var" - spaces - skipMany1 alphaNum - spaces - _ <- char '=' - spaces - _ <- char '[' - manyTill (do - i <- int - _ <- char ',' <|> char ']' - return i) (char '\'' <|> char ';') - - strlist = do - _ <- string "var" - spaces - skipMany1 alphaNum - spaces - _ <- char '=' - spaces - _ <- char '[' - (char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' - -codeToName :: M.Map Integer T.Text -codeToName = M.fromList [ - (200, "МосБиржа топ"), - (1 , "МосБиржа акции"), - (14 , "МосБиржа фьючерсы"), - (41, "Курс рубля"), - (45, "МосБиржа валютный рынок"), - (2, "МосБиржа облигации"), - (12, "МосБиржа внесписочные облигации"), - (29, "МосБиржа пифы"), - (8, "Расписки"), - (6, "Мировые Индексы"), - (24, "Товары"), - (5, "Мировые валюты"), - (25, "Акции США(BATS)"), - (7, "Фьючерсы США"), - (27, "Отрасли экономики США"), - (26, "Гособлигации США"), - (28, "ETF"), - (30, "Индексы мировой экономики"), - (3, "РТС"), - (20, "RTS Board"), - (10, "РТС-GAZ"), - (17, "ФОРТС Архив"), - (31, "Сырье Архив"), - (38, "RTS Standard Архив"), - (16, "ММВБ Архив"), - (18, "РТС Архив"), - (9, "СПФБ Архив"), - (32, "РТС-BOARD Архив"), - (39, "Расписки Архив"), - (-1, "Отрасли") ] - - -archives :: [Integer] -archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] diff --git a/src/ATrade/Quotes/HistoryProvider.hs b/src/ATrade/Quotes/HistoryProvider.hs new file mode 100644 index 0000000..96147a1 --- /dev/null +++ b/src/ATrade/Quotes/HistoryProvider.hs @@ -0,0 +1,12 @@ + +module ATrade.Quotes.HistoryProvider + ( + HistoryProvider(..) + ) where + +import ATrade.RoboCom.Types (Bar) +import ATrade.Types (BarTimeframe, TickerId) +import Data.Time (UTCTime) + +class (Monad m) => HistoryProvider m where + getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] diff --git a/src/ATrade/Quotes/QHP.hs b/src/ATrade/Quotes/QHP.hs index 28ed63d..a33d5ee 100644 --- a/src/ATrade/Quotes/QHP.hs +++ b/src/ATrade/Quotes/QHP.hs @@ -1,4 +1,6 @@ +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} module ATrade.Quotes.QHP ( Period(..), @@ -9,18 +11,20 @@ module ATrade.Quotes.QHP ( ) where import ATrade.Exceptions +import ATrade.Logging (Message, logInfo, logDebug) import ATrade.Types -import Control.Exception.Safe (MonadThrow, throw) -import Control.Monad.IO.Class (MonadIO, liftIO) +import Colog (WithLog) +import Control.Exception.Safe (MonadThrow, throw) +import Control.Monad.IO.Class (MonadIO, liftIO) import Data.Aeson import Data.Binary.Get -import qualified Data.ByteString.Lazy as BL -import qualified Data.Text as T +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T import Data.Time.Calendar import Data.Time.Clock import Data.Time.Clock.POSIX import Data.Time.Format -import System.Log.Logger +import Language.Haskell.Printf (t) import System.ZMQ4 data Period = @@ -53,10 +57,10 @@ data QHPHandle = QHPHandle mkQHPHandle :: Context -> T.Text -> QHPHandle mkQHPHandle = QHPHandle -requestHistoryFromQHP :: (MonadThrow m, MonadIO m) => QHPHandle -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] +requestHistoryFromQHP :: (WithLog env Message m, MonadThrow m, MonadIO m) => QHPHandle -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] requestHistoryFromQHP qhp tickerId timeframe fromTime toTime = case parseQHPPeriod (unBarTimeframe timeframe) of - Just tf -> liftIO $ getQuotes (qhpContext qhp) (params tf) + Just tf -> getQuotes (qhpContext qhp) (params tf) _ -> throw $ BadParams "QHP: Unable to parse timeframe" where params tf = RequestParams @@ -96,10 +100,11 @@ instance ToJSON RequestParams where "to" .= printDatetime (UTCTime (endDate p) 0), "timeframe" .= show (period p) ] -getQuotes :: Context -> RequestParams -> IO [Bar] -getQuotes ctx params = - withSocket ctx Req $ \sock -> do - debugM "QHP" $ "Connecting to ep: " ++ show (endpoint params) +getQuotes :: (WithLog env Message m, MonadIO m) => Context -> RequestParams -> m [Bar] +getQuotes ctx params = do + logInfo "QHP" $ "Connecting to ep: " <> endpoint params + logDebug "QHP" $ "From: " <> (T.pack . show) (startDate params) <> "; To: " <> (T.pack . show) (endDate params) + result <- liftIO $ withSocket ctx Req $ \sock -> do connect sock $ (T.unpack . endpoint) params send sock [] (BL.toStrict $ encode params) response <- receiveMulti sock @@ -108,6 +113,8 @@ getQuotes ctx params = then return $ reverse $ parseBars (ticker params) $ BL.fromStrict rest else return [] _ -> return [] + logInfo "QHP" $ "Obtained bars: " <> (T.pack . show . length) result + return result parseBars :: TickerId -> BL.ByteString -> [Bar] parseBars tickerId input = diff --git a/src/ATrade/Quotes/QTIS.hs b/src/ATrade/Quotes/QTIS.hs index b4d2163..83e95ff 100644 --- a/src/ATrade/Quotes/QTIS.hs +++ b/src/ATrade/Quotes/QTIS.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} module ATrade.Quotes.QTIS @@ -7,13 +8,15 @@ module ATrade.Quotes.QTIS ) where import ATrade.Exceptions +import ATrade.Logging (Message, logInfo) import ATrade.Types +import Colog (WithLog) import Control.Exception.Safe +import Control.Monad.IO.Class (MonadIO (liftIO)) import Data.Aeson import qualified Data.ByteString.Char8 as BC8 import qualified Data.ByteString.Lazy as BL import qualified Data.Text as T -import System.Log.Logger import System.ZMQ4 data TickerInfo = TickerInfo { @@ -34,16 +37,14 @@ instance ToJSON TickerInfo where "lot_size" .= tiLotSize ti, "tick_size" .= tiTickSize ti ] -qtisGetTickersInfo :: Context -> T.Text -> TickerId -> IO TickerInfo -qtisGetTickersInfo ctx endpoint tickerId = - withSocket ctx Req $ \sock -> do - debugM "QTIS" $ "Connecting to: " ++ T.unpack endpoint +qtisGetTickersInfo :: (WithLog env Message m, MonadIO m) => Context -> T.Text -> TickerId -> m TickerInfo +qtisGetTickersInfo ctx endpoint tickerId = do + logInfo "QTIS" $ "Requesting ticker: " <> tickerId <> " from " <> endpoint + liftIO $ withSocket ctx Req $ \sock -> do connect sock $ T.unpack endpoint - debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId send sock [] $ BL.toStrict tickerRequest response <- receiveMulti sock let r = parseResponse response - debugM "QTIS" $ "Got response: " ++ show r case r of Just resp -> return resp Nothing -> throw $ QTISFailure "Can't parse response" diff --git a/src/ATrade/Quotes/TickerInfoProvider.hs b/src/ATrade/Quotes/TickerInfoProvider.hs new file mode 100644 index 0000000..c38097a --- /dev/null +++ b/src/ATrade/Quotes/TickerInfoProvider.hs @@ -0,0 +1,12 @@ + +module ATrade.Quotes.TickerInfoProvider + ( + TickerInfoProvider(..) + ) where + +import ATrade.RoboCom.Types (InstrumentParameters) +import ATrade.Types (TickerId) + +class (Monad m) => TickerInfoProvider m where + getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] + diff --git a/src/ATrade/Quotes/Types.hs b/src/ATrade/Quotes/Types.hs new file mode 100644 index 0000000..e69de29 diff --git a/src/ATrade/RoboCom/ConfigStorage.hs b/src/ATrade/RoboCom/ConfigStorage.hs new file mode 100644 index 0000000..4a18ef8 --- /dev/null +++ b/src/ATrade/RoboCom/ConfigStorage.hs @@ -0,0 +1,14 @@ +{-# LANGUAGE RankNTypes #-} + +module ATrade.RoboCom.ConfigStorage +( + ConfigStorage(..) +) where + +import qualified Data.Text as T +import Dhall (FromDhall) + +class (Monad m) => ConfigStorage m where + loadConfig :: forall c. (FromDhall c) => T.Text -> m c + + diff --git a/src/ATrade/RoboCom/Monad.hs b/src/ATrade/RoboCom/Monad.hs index b9cd3be..c30c18c 100644 --- a/src/ATrade/RoboCom/Monad.hs +++ b/src/ATrade/RoboCom/Monad.hs @@ -13,15 +13,14 @@ module ATrade.RoboCom.Monad ( seInstanceId, seAccount, seVolume, - seBars, seLastTimestamp, EventCallback, Event(..), MonadRobot(..), also, t, - st -) where + st, + getFirstTickerId) where import ATrade.RoboCom.Types import ATrade.Types @@ -33,11 +32,14 @@ import qualified Data.Text.Lazy as TL import Data.Time.Clock import Language.Haskell.Printf import Language.Haskell.TH.Quote (QuasiQuoter) +import ATrade.Logging (Severity) +import Data.List.NonEmpty (NonEmpty) +import qualified Data.List.NonEmpty as NE class (Monad m) => MonadRobot m c s | m -> c, m -> s where - submitOrder :: Order -> m () + submitOrder :: Order -> m OrderId cancelOrder :: OrderId -> m () - appendToLog :: TL.Text -> m () + appendToLog :: Severity -> TL.Text -> m () setupTimer :: UTCTime -> m () enqueueIOAction :: Int -> IO Value -> m () getConfig :: m c @@ -48,13 +50,18 @@ class (Monad m) => MonadRobot m c s | m -> c, m -> s where oldState <- getState setState (f oldState) getEnvironment :: m StrategyEnvironment + getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries) + getAvailableTickers :: m (NonEmpty BarSeriesId) + +getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId +getFirstTickerId = NE.head <$> getAvailableTickers st :: QuasiQuoter st = t type EventCallback c s = forall m . MonadRobot m c s => Event -> m () -data Event = NewBar Bar +data Event = NewBar (BarTimeframe, Bar) | NewTick Tick | OrderSubmitted Order | OrderUpdate OrderId OrderState @@ -68,7 +75,6 @@ data StrategyEnvironment = StrategyEnvironment { _seInstanceId :: !T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) _seAccount :: !T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent _seVolume :: !Int, -- ^ Volume to use for this instance (in lots/contracts) - _seBars :: !Bars, -- ^ List of tickers which is used by this strategy _seLastTimestamp :: !UTCTime } deriving (Eq) makeLenses ''StrategyEnvironment diff --git a/src/ATrade/RoboCom/Persistence.hs b/src/ATrade/RoboCom/Persistence.hs new file mode 100644 index 0000000..602a1fc --- /dev/null +++ b/src/ATrade/RoboCom/Persistence.hs @@ -0,0 +1,16 @@ +{-# LANGUAGE RankNTypes #-} + +module ATrade.RoboCom.Persistence +( + MonadPersistence(..) +) where + +import Data.Aeson +import Data.Default (Default) +import qualified Data.Text as T + +class (Monad m) => MonadPersistence m where + saveState :: forall s. (ToJSON s) => s -> T.Text -> m () + loadState :: forall s. (Default s, FromJSON s) => T.Text -> m s + + diff --git a/src/ATrade/RoboCom/Positions.hs b/src/ATrade/RoboCom/Positions.hs index 7ddb711..c71d8bf 100644 --- a/src/ATrade/RoboCom/Positions.hs +++ b/src/ATrade/RoboCom/Positions.hs @@ -20,7 +20,6 @@ module ATrade.RoboCom.Positions ( StateHasPositions(..), - ParamsHasMainTicker(..), PositionState(..), Position(..), posIsOpen, @@ -65,7 +64,8 @@ module ATrade.RoboCom.Positions setStopLoss, setLimitStopLoss, setTakeProfit, - setStopLossAndTakeProfit + setStopLossAndTakeProfit, + handlePositions ) where import GHC.Generics @@ -77,9 +77,11 @@ import ATrade.Types import Control.Lens import Control.Monad +import ATrade.Logging (Severity (Trace, Warning)) +import ATrade.RoboCom.Monad (MonadRobot (getAvailableTickers)) import Data.Aeson import qualified Data.List as L -import qualified Data.Map as M +import qualified Data.List.NonEmpty as NE import qualified Data.Text as T import qualified Data.Text.Lazy as TL import Data.Time.Clock @@ -145,7 +147,7 @@ modifyPositions f = do modifyState (\s -> setPositions s (f pos)) class ParamsHasMainTicker a where - mainTicker :: a -> TickerId + mainTicker :: a -> (BarTimeframe, TickerId) -- | Helper function. Finds first element in list which satisfies predicate 'p' and if found, applies 'm' to it, leaving other elements intact. findAndModify :: (a -> Bool) -> (a -> a) -> [a] -> [a] @@ -192,9 +194,9 @@ dispatchPosition event pos = case posState pos of if orderDeadline (posSubmissionDeadline pos) lastTs then return $ pos { posState = PositionCancelled } -- TODO call TimeoutHandler if present else case event of - OrderSubmitted order -> - return $ if order `orderCorrespondsTo` pendingOrder - then pos { posCurrentOrder = Just order, + OrderUpdate oid Submitted -> do + return $ if orderId pendingOrder == oid + then pos { posCurrentOrder = Just pendingOrder, posState = PositionWaitingOpen, posSubmissionDeadline = Nothing } else pos @@ -207,49 +209,52 @@ dispatchPosition event pos = case posState pos of then if posBalance pos == 0 then do - appendToLog $ [t|"In PositionWaitingOpen: execution timeout: %?/%?"|] (posExecutionDeadline pos) lastTs cancelOrder $ orderId order return $ pos { posState = PositionWaitingPendingCancellation, posNextState = Just PositionCancelled } else do - appendToLog $ [t|Order executed (partially, %? / %?): %?|] (posBalance pos) (orderQuantity order) order + appendToLog Trace $ [t|Order executed (partially, %? / %?): %?|] (posBalance pos) (orderQuantity order) order return pos { posState = PositionOpen, posCurrentOrder = Nothing, posExecutionDeadline = Nothing, posEntryTime = Just lastTs} else case event of OrderUpdate oid newstate -> if oid == orderId order then case newstate of Cancelled -> do - appendToLog $ [t|Order cancelled in PositionWaitingOpen: balance %d, max %d|] (posBalance pos) (orderQuantity order) + appendToLog Trace $ [t|Order cancelled in PositionWaitingOpen: balance %d, max %d|] (posBalance pos) (orderQuantity order) if posBalance pos /= 0 then return pos { posState = PositionOpen, posCurrentOrder = Nothing, posExecutionDeadline = Nothing, posEntryTime = Just lastTs} else return pos { posState = PositionCancelled } Executed -> do - appendToLog $ [t|Order executed: %?|] order - return pos { posState = PositionOpen, posCurrentOrder = Nothing, posExecutionDeadline = Nothing, posBalance = balanceForOrder order, posEntryTime = Just lastTs} + appendToLog Trace $ [t|Order executed: %?|] order + return pos { posState = PositionOpen, + posCurrentOrder = Nothing, + posExecutionDeadline = Nothing, + posBalance = balanceForOrder order, + posEntryTime = Just lastTs } Rejected -> do - appendToLog $ [t|Order rejected: %?|] order + appendToLog Trace $ [t|Order rejected: %?|] order return pos { posState = PositionCancelled, posCurrentOrder = Nothing, posExecutionDeadline = Nothing, posBalance = 0, posEntryTime = Nothing } _ -> do - appendToLog $ [t|In PositionWaitingOpen: order state update: %?|] newstate + appendToLog Trace $ [t|In PositionWaitingOpen: order state update: %?|] newstate return pos else return pos -- Update for another position's order NewTrade trade -> do - appendToLog $ [t|Order new trade: %?/%?|] order trade + appendToLog Trace $ [t|Order new trade: %?/%?|] order trade return $ if tradeOrderId trade == orderId order then pos { posBalance = if tradeOperation trade == Buy then posBalance pos + tradeQuantity trade else posBalance pos - tradeQuantity trade } else pos _ -> return pos Nothing -> do - appendToLog $ [t|W: No current order in PositionWaitingOpen state: %?|] pos + appendToLog Warning $ [t|W: No current order in PositionWaitingOpen state: %?|] pos return pos handlePositionOpen = do lastTs <- view seLastTimestamp <$> getEnvironment if | orderDeadline (posSubmissionDeadline pos) lastTs -> do - appendToLog $ [t|PositionId: %? : Missed submission deadline: %?, remaining in PositionOpen state|] (posId pos) (posSubmissionDeadline pos) + appendToLog Warning $ [t|PositionId: %? : Missed submission deadline: %?, remaining in PositionOpen state|] (posId pos) (posSubmissionDeadline pos) return pos { posSubmissionDeadline = Nothing, posExecutionDeadline = Nothing } | orderDeadline (posExecutionDeadline pos) lastTs -> do - appendToLog $ [t|PositionId: %? : Missed execution deadline: %?, remaining in PositionOpen state|] (posId pos) (posExecutionDeadline pos) + appendToLog Warning $ [t|PositionId: %? : Missed execution deadline: %?, remaining in PositionOpen state|] (posId pos) (posExecutionDeadline pos) return pos { posExecutionDeadline = Nothing } | otherwise -> case event of NewTick tick -> if @@ -272,8 +277,11 @@ dispatchPosition event pos = case posState pos of (OrderUpdate _ newstate, Just _, Just (PositionWaitingCloseSubmission nextOrder)) -> if newstate == Cancelled then do - submitOrder nextOrder - return pos { posState = PositionWaitingCloseSubmission nextOrder, posSubmissionDeadline = Just (10 `addUTCTime` lastTs), posExecutionDeadline = Nothing } + oid <- submitOrder nextOrder + return pos + { posState = PositionWaitingCloseSubmission nextOrder { orderId = oid }, + posSubmissionDeadline = Just (10 `addUTCTime` lastTs), + posExecutionDeadline = Nothing } else return pos (OrderUpdate _ newstate, Just _, Just PositionCancelled) -> if newstate == Cancelled @@ -281,7 +289,7 @@ dispatchPosition event pos = case posState pos of else return pos _ -> return pos else do - appendToLog "Deadline when cancelling pending order" + appendToLog Warning "Deadline when cancelling pending order" return pos { posState = PositionCancelled } handlePositionWaitingCloseSubmission pendingOrder = do @@ -293,9 +301,9 @@ dispatchPosition event pos = case posState pos of Nothing -> doNothing return $ pos { posCurrentOrder = Nothing, posState = PositionOpen, posSubmissionDeadline = Nothing } -- TODO call TimeoutHandler if present else case event of - OrderSubmitted order -> - return $ if order `orderCorrespondsTo` pendingOrder - then pos { posCurrentOrder = Just order, + OrderUpdate oid Submitted -> + return $ if orderId pendingOrder == oid + then pos { posCurrentOrder = Just pendingOrder, posState = PositionWaitingClose, posSubmissionDeadline = Nothing } else pos @@ -308,7 +316,7 @@ dispatchPosition event pos = case posState pos of case posCurrentOrder pos of Just order -> cancelOrder (orderId order) _ -> doNothing - appendToLog $ [t|Was unable to close position, remaining balance: %?|] (posBalance pos) + appendToLog Warning $ [t|Was unable to close position, remaining balance: %?|] (posBalance pos) return $ pos { posState = PositionOpen, posSubmissionDeadline = Nothing, posExecutionDeadline = Nothing } -- TODO call TimeoutHandler if present else case (event, posCurrentOrder pos) of (OrderUpdate oid newstate, Just order) -> @@ -364,14 +372,10 @@ newPosition order account tickerId operation quantity submissionDeadline = do posExitTime = Nothing } modifyPositions (\p -> position : p) - positions <- getPositions <$> getState - appendToLog $ [t|All positions: %?|] positions return position reapDeadPositions :: (StateHasPositions s) => EventCallback c s -reapDeadPositions _ = do - ts <- view seLastTimestamp <$> getEnvironment - when (floor (utctDayTime ts) `mod` 300 == 0) $ modifyPositions (L.filter (not . posIsDead)) +reapDeadPositions _ = modifyPositions (L.filter (not . posIsDead)) defaultHandler :: (StateHasPositions s) => EventCallback c s defaultHandler = reapDeadPositions `also` handlePositions @@ -386,18 +390,18 @@ modifyPosition f oldpos = do return $ f oldpos Nothing -> return oldpos -getCurrentTicker :: (ParamsHasMainTicker c, MonadRobot m c s) => m [Bar] +getCurrentTicker :: (MonadRobot m c s) => m [Bar] getCurrentTicker = do - mainTicker' <- mainTicker <$> getConfig - maybeBars <- view (seBars . at mainTicker') <$> getEnvironment + (BarSeriesId mainTicker' tf) <- NE.head <$> getAvailableTickers + maybeBars <- getTicker mainTicker' tf case maybeBars of Just b -> return $ bsBars b _ -> return [] -getCurrentTickerSeries :: (ParamsHasMainTicker c, MonadRobot m c s) => m (Maybe BarSeries) +getCurrentTickerSeries :: (MonadRobot m c s) => m (Maybe BarSeries) getCurrentTickerSeries = do - bars <- view seBars <$> getEnvironment - flip M.lookup bars . mainTicker <$> getConfig + (BarSeriesId mainTicker' tf) <- NE.head <$> getAvailableTickers + getTicker mainTicker' tf getLastActivePosition :: (StateHasPositions s, MonadRobot m c s) => m (Maybe Position) getLastActivePosition = L.find (\pos -> posState pos == PositionOpen) . getPositions <$> getState @@ -418,8 +422,8 @@ getAllActiveAndPendingPositions = L.filter onNewBarEvent :: (MonadRobot m c s) => Event -> (Bar -> m ()) -> m () onNewBarEvent event f = case event of - NewBar bar -> f bar - _ -> doNothing + NewBar (_, bar) -> f bar + _ -> doNothing onNewTickEvent :: (MonadRobot m c s) => Event -> (Tick -> m ()) -> m () onNewTickEvent event f = case event of @@ -457,16 +461,16 @@ onActionCompletedEvent event f = case event of ActionCompleted tag v -> f tag v _ -> doNothing -enterAtMarket :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> Operation -> m Position +enterAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> Operation -> m Position enterAtMarket operationSignalName operation = do env <- getEnvironment enterAtMarketWithParams (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) operationSignalName "") operation -enterAtMarketWithParams :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> Int -> SignalId -> Operation -> m Position +enterAtMarketWithParams :: (StateHasPositions s, MonadRobot m c s) => T.Text -> Int -> SignalId -> Operation -> m Position enterAtMarketWithParams account quantity signalId operation = do - tickerId <- mainTicker <$> getConfig - submitOrder $ order tickerId - newPosition (order tickerId) account tickerId operation quantity 20 + BarSeriesId tickerId _ <- getFirstTickerId + oid <- submitOrder $ order tickerId + newPosition ((order tickerId) { orderId = oid }) account tickerId operation quantity 20 where order tickerId = mkOrder { orderAccountId = account, @@ -477,20 +481,20 @@ enterAtMarketWithParams account quantity signalId operation = do orderSignalId = signalId } -enterAtLimit :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Operation -> m Position +enterAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Operation -> m Position enterAtLimit timeToCancel operationSignalName price operation = do env <- getEnvironment enterAtLimitWithParams timeToCancel (env ^. seAccount) (env ^. seVolume) (SignalId (env ^. seInstanceId) operationSignalName "") price operation -enterAtLimitWithVolume :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position +enterAtLimitWithVolume :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position enterAtLimitWithVolume timeToCancel operationSignalName price vol operation = do acc <- view seAccount <$> getEnvironment inst <- view seInstanceId <$> getEnvironment enterAtLimitWithParams timeToCancel acc vol (SignalId inst operationSignalName "") price operation -enterAtLimitWithParams :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position +enterAtLimitWithParams :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position enterAtLimitWithParams timeToCancel account quantity signalId price operation = do - tickerId <- mainTicker <$> getConfig + BarSeriesId tickerId _ <- getFirstTickerId enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation enterAtLimitForTickerWithVolume :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Price -> Int -> Operation -> m Position @@ -509,9 +513,9 @@ enterAtLimitForTicker tickerId timeToCancel operationSignalName price operation enterAtLimitForTickerWithParams :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> T.Text -> Int -> SignalId -> Price -> Operation -> m Position enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId price operation = do lastTs <- view seLastTimestamp <$> getEnvironment - submitOrder order - appendToLog $ [t|enterAtLimit: %?, deadline: %?|] tickerId (timeToCancel `addUTCTime` lastTs) - newPosition order account tickerId operation quantity 20 >>= + oid <- submitOrder order + appendToLog Trace $ [t|enterAtLimit: %?, deadline: %?|] tickerId (timeToCancel `addUTCTime` lastTs) + newPosition (order {orderId = oid}) account tickerId operation quantity 20 >>= modifyPosition (\p -> p { posExecutionDeadline = Just $ timeToCancel `addUTCTime` lastTs }) where order = mkOrder { @@ -523,19 +527,19 @@ enterAtLimitForTickerWithParams tickerId timeToCancel account quantity signalId orderSignalId = signalId } -enterLongAtMarket :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> m Position +enterLongAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> m Position enterLongAtMarket operationSignalName = enterAtMarket operationSignalName Buy -enterShortAtMarket :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => T.Text -> m Position +enterShortAtMarket :: (StateHasPositions s, MonadRobot m c s) => T.Text -> m Position enterShortAtMarket operationSignalName = enterAtMarket operationSignalName Sell -enterLongAtLimit :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position +enterLongAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position enterLongAtLimit timeToCancel price operationSignalName = enterAtLimit timeToCancel operationSignalName price Buy enterLongAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> Price -> T.Text -> m Position enterLongAtLimitForTicker tickerId timeToCancel price operationSignalName = enterAtLimitForTicker tickerId timeToCancel operationSignalName price Buy -enterShortAtLimit :: (StateHasPositions s, ParamsHasMainTicker c, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position +enterShortAtLimit :: (StateHasPositions s, MonadRobot m c s) => NominalDiffTime -> Price -> T.Text -> m Position enterShortAtLimit timeToCancel price operationSignalName = enterAtLimit timeToCancel operationSignalName price Sell enterShortAtLimitForTicker :: (StateHasPositions s, MonadRobot m c s) => TickerId -> NominalDiffTime -> Price -> T.Text -> m Position @@ -555,10 +559,10 @@ exitAtMarket position operationSignalName = do posExecutionDeadline = Nothing }) position Nothing -> do - submitOrder (closeOrder inst) + oid <- submitOrder (closeOrder inst) modifyPosition (\pos -> pos { posCurrentOrder = Nothing, - posState = PositionWaitingCloseSubmission (closeOrder inst), + posState = PositionWaitingCloseSubmission (closeOrder inst) { orderId = oid }, posNextState = Just PositionClosed, posSubmissionDeadline = Just $ 10 `addUTCTime` lastTs, posExecutionDeadline = Nothing }) position @@ -579,11 +583,11 @@ exitAtLimit timeToCancel price position operationSignalName = do case posCurrentOrder position of Just order -> cancelOrder (orderId order) Nothing -> doNothing - submitOrder (closeOrder inst) - appendToLog $ [t|exitAtLimit: %?, deadline: %?|] (posTicker position) (timeToCancel `addUTCTime` lastTs) + oid <- submitOrder (closeOrder inst) + appendToLog Trace $ [t|exitAtLimit: %?, deadline: %?|] (posTicker position) (timeToCancel `addUTCTime` lastTs) modifyPosition (\pos -> pos { posCurrentOrder = Nothing, - posState = PositionWaitingCloseSubmission (closeOrder inst), + posState = PositionWaitingCloseSubmission (closeOrder inst) { orderId = oid }, posNextState = Just PositionClosed, posSubmissionDeadline = Just $ 10 `addUTCTime` lastTs, posExecutionDeadline = Just $ timeToCancel `addUTCTime` lastTs }) position diff --git a/src/ATrade/RoboCom/Types.hs b/src/ATrade/RoboCom/Types.hs index 935e798..136ebcf 100644 --- a/src/ATrade/RoboCom/Types.hs +++ b/src/ATrade/RoboCom/Types.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE OverloadedStrings #-} @@ -6,12 +7,12 @@ module ATrade.RoboCom.Types ( Bar(..), + BarSeriesId(..), BarSeries(..), - Timeframe(..), - tfSeconds, Ticker(..), Bars, - InstrumentParameters(..) + InstrumentParameters(..), + bsidTickerId ) where import ATrade.Types @@ -20,15 +21,12 @@ import Data.Aeson.Types import qualified Data.HashMap.Strict as HM import qualified Data.Map.Strict as M import qualified Data.Text as T +import GHC.Generics (Generic) -newtype Timeframe = - Timeframe Integer deriving (Show, Eq) - -tfSeconds :: (Num a) => Timeframe -> a -tfSeconds (Timeframe s) = fromInteger s data InstrumentParameters = InstrumentParameters { + ipTickerId :: TickerId, ipLotSize :: Int, ipTickSize :: Price } deriving (Show, Eq) @@ -36,7 +34,7 @@ data InstrumentParameters = data BarSeries = BarSeries { bsTickerId :: TickerId, - bsTimeframe :: Timeframe, + bsTimeframe :: BarTimeframe, bsBars :: [Bar], bsParams :: InstrumentParameters } deriving (Show, Eq) @@ -68,5 +66,11 @@ instance ToJSON Ticker where "timeframe" .= timeframeSeconds t, "aliases" .= Object (HM.fromList $ fmap (\(x, y) -> (T.pack x, String $ T.pack y)) $ aliases t) ] -type Bars = M.Map TickerId BarSeries +data BarSeriesId = BarSeriesId TickerId BarTimeframe + deriving (Show, Eq, Generic, Ord) + +bsidTickerId :: BarSeriesId -> TickerId +bsidTickerId (BarSeriesId tid _) = tid + +type Bars = M.Map BarSeriesId BarSeries diff --git a/src/ATrade/RoboCom/Utils.hs b/src/ATrade/RoboCom/Utils.hs index ff3df31..f9f5f54 100644 --- a/src/ATrade/RoboCom/Utils.hs +++ b/src/ATrade/RoboCom/Utils.hs @@ -20,6 +20,7 @@ import qualified Data.Text as T import Data.Time.Calendar import Data.Time.Clock +import Data.Int (Int64) import Text.Read hiding (String) rescaleToDaily :: [Bar] -> [Bar] @@ -36,13 +37,13 @@ rescaleToDaily (firstBar:restBars) = rescaleToDaily' restBars firstBar rescaleToDaily [] = [] -barEndTime :: Bar -> Integer -> UTCTime +barEndTime :: Bar -> Int64 -> UTCTime barEndTime bar tframe = addUTCTime (fromIntegral $ (1 + barNumber (barTimestamp bar) tframe) * tframe) epoch -barStartTime :: Bar -> Integer -> UTCTime +barStartTime :: Bar -> Int64 -> UTCTime barStartTime bar tframe = addUTCTime (fromIntegral $ barNumber (barTimestamp bar) tframe * tframe) epoch -barNumber :: UTCTime -> Integer -> Integer +barNumber :: UTCTime -> Int64 -> Int64 barNumber ts barlen = floor (diffUTCTime ts epoch) `div` barlen epoch :: UTCTime diff --git a/stack.yaml b/stack.yaml index ae397ab..1f6d9a3 100644 --- a/stack.yaml +++ b/stack.yaml @@ -18,7 +18,7 @@ # # resolver: ./custom-snapshot.yaml # resolver: https://example.com/snapshots/2018-01-01.yaml -resolver: lts-17.14 +resolver: lts-18.18 # User packages to be built. # Various formats can be used as shown in the example below. @@ -48,6 +48,8 @@ extra-deps: - binary-ieee754-0.1.0.0 - th-printf-0.7 - normaldistribution-1.1.0.3 +- co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087 +- ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226 # Override default flag values for local packages and extra-deps # flags: {} diff --git a/test/ArbitraryInstances.hs b/test/ArbitraryInstances.hs index e732f7d..e857cb4 100644 --- a/test/ArbitraryInstances.hs +++ b/test/ArbitraryInstances.hs @@ -52,7 +52,7 @@ instance Arbitrary OrderPrice where | v == 2 -> Limit <$> arbitrary `suchThat` notTooBig | v == 3 -> Stop <$> arbitrary `suchThat` notTooBig <*> arbitrary `suchThat` notTooBig | v == 4 -> StopMarket <$> arbitrary `suchThat` notTooBig - | otherwise -> fail "Invalid case" + | otherwise -> error "invalid case" instance Arbitrary Operation where arbitrary = elements [Buy, Sell] diff --git a/test/Spec.hs b/test/Spec.hs index 364f9e6..efdc2c4 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,6 +1,6 @@ import qualified Test.BarAggregator +import qualified Test.Driver.Junction.QuoteThread import qualified Test.RoboCom.Indicators -import qualified Test.RoboCom.Positions import qualified Test.RoboCom.Utils import Test.Tasty @@ -11,9 +11,9 @@ main = defaultMain $ testGroup "Tests" [unitTests, properties] unitTests :: TestTree unitTests = testGroup "Unit Tests" [Test.RoboCom.Indicators.unitTests, - Test.RoboCom.Positions.unitTests, Test.RoboCom.Utils.unitTests, - Test.BarAggregator.unitTests ] + Test.BarAggregator.unitTests, + Test.Driver.Junction.QuoteThread.unitTests] properties :: TestTree properties = testGroup "Properties" diff --git a/test/Test/BarAggregator.hs b/test/Test/BarAggregator.hs index f6c5d0c..878c241 100644 --- a/test/Test/BarAggregator.hs +++ b/test/Test/BarAggregator.hs @@ -30,11 +30,6 @@ unitTests = testGroup "BarAggregator" [ , testOneTick , testTwoTicksInSameBar , testTwoTicksInDifferentBars - , testOneBar - , testTwoBarsInSameBar - , testTwoBarsInSameBarLastBar - , testNextBarAfterBarClose - , testUpdateTime ] properties = testGroup "BarAggregator" [ @@ -42,6 +37,8 @@ properties = testGroup "BarAggregator" [ prop_ticksInTwoBars ] +secParams = InstrumentParameters 1 0.01 + testUnknownBarSeries :: TestTree testUnknownBarSeries = testCase "Tick with unknown ticker id" $ do let agg = BarAggregator M.empty M.empty [(0, 86400)] @@ -59,7 +56,7 @@ testUnknownBarSeries = testCase "Tick with unknown ticker id" $ do testOneTick :: TestTree testOneTick = testCase "One tick" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 60) [] + let series = BarSeries "TEST_TICKER" (BarTimeframe 60) [] secParams let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] let (mbar, newagg) = handleTick tick agg mbar @?= Nothing @@ -75,7 +72,7 @@ testOneTick = testCase "One tick" $ do testTwoTicksInSameBar :: TestTree testTwoTicksInSameBar = testCase "Two ticks - same bar" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 60) [] + let series = BarSeries "TEST_TICKER" (BarTimeframe 60) [] secParams let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] let (mbar, newagg) = handleTick (tick testTimestamp1 12.00) agg mbar @?= Nothing @@ -94,7 +91,7 @@ testTwoTicksInSameBar = testCase "Two ticks - same bar" $ do testTwoTicksInDifferentBars :: TestTree testTwoTicksInDifferentBars = testCase "Two ticks - different bar" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 60) [] + let series = BarSeries "TEST_TICKER" (BarTimeframe 60) [] secParams let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] let (mbar, newagg) = handleTick (tick testTimestamp1 12.00) agg mbar @?= Nothing @@ -111,120 +108,6 @@ testTwoTicksInDifferentBars = testCase "Two ticks - different bar" $ do value = fromDouble val, volume = 1 } -testOneBar :: TestTree -testOneBar = testCase "One bar" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 3600) [] - let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] - let (mbar, newagg) = handleBar bar agg - mbar @?= Nothing - (bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg)) @?= Just [Bar "TEST_TICKER" testTimestamp 12.00 18.00 10.00 12.00 68] - where - testTimestamp = (UTCTime (fromGregorian 1970 1 1) 60) - bar = Bar { - barSecurity = "TEST_TICKER", - barTimestamp = testTimestamp, - barOpen = fromDouble 12.00, - barHigh = fromDouble 18.00, - barLow = fromDouble 10.00, - barClose = fromDouble 12.00, - barVolume = 68 } - - -testTwoBarsInSameBar :: TestTree -testTwoBarsInSameBar = testCase "Two bars (smaller timeframe) - same bar" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 600) [] - let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] - let (mbar, newagg) = handleBar (bar testTimestamp1 12.00 13.00 10.00 11.00 1) agg - mbar @?= Nothing - let (mbar', newagg') = handleBar (bar testTimestamp2 12.00 15.00 11.00 12.00 2) newagg - mbar' @?= Nothing - (bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg')) @?= Just [Bar "TEST_TICKER" testTimestamp2 12.00 15.00 10.00 12.00 3] - where - testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 60) - testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 120) - bar ts o h l c v = Bar { - barSecurity = "TEST_TICKER", - barTimestamp = ts, - barOpen = fromDouble o, - barHigh = fromDouble h, - barLow = fromDouble l, - barClose = fromDouble c, - barVolume = v } - -testTwoBarsInSameBarLastBar :: TestTree -testTwoBarsInSameBarLastBar = testCase "Two bars (smaller timeframe) - same bar: last bar is exactly at the end of the bigger tf bar" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 600) [] - let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] - let (mbar, newagg) = handleBar (bar testTimestamp1 12.00 13.00 10.00 11.00 1) agg - mbar @?= Nothing - let (mbar', newagg') = handleBar (bar testTimestamp2 12.00 15.00 11.00 12.00 2) newagg - let expectedBar = Bar "TEST_TICKER" testTimestamp2 12.00 15.00 10.00 12.00 3 - mbar' @?= Just expectedBar - (head . tail <$> bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg')) @?= Just expectedBar - where - testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 560) - testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 600) - bar ts o h l c v = Bar { - barSecurity = "TEST_TICKER", - barTimestamp = ts, - barOpen = fromDouble o, - barHigh = fromDouble h, - barLow = fromDouble l, - barClose = fromDouble c, - barVolume = v } - -testNextBarAfterBarClose :: TestTree -testNextBarAfterBarClose = testCase "Three bars (smaller timeframe) - next bar after bigger tf bar close" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 600) [] - let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] - let (_, newagg) = handleBar (bar testTimestamp1 12.00 13.00 10.00 11.00 1) agg - let (_, newagg') = handleBar (bar testTimestamp2 12.00 15.00 11.00 12.00 2) newagg - let (_, newagg'') = handleBar (bar testTimestamp3 12.00 15.00 11.00 12.00 12) newagg' - let expectedBar = Bar "TEST_TICKER" testTimestamp3 12.00 15.00 11.00 12.00 12 - (head <$> bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg'')) @?= Just expectedBar - where - testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 560) - testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 600) - testTimestamp3 = (UTCTime (fromGregorian 1970 1 1) 660) - bar ts o h l c v = Bar { - barSecurity = "TEST_TICKER", - barTimestamp = ts, - barOpen = fromDouble o, - barHigh = fromDouble h, - barLow = fromDouble l, - barClose = fromDouble c, - barVolume = v } - -testUpdateTime :: TestTree -testUpdateTime = testCase "updateTime - next bar - creates new bar with zero volume" $ do - let series = BarSeries "TEST_TICKER" (Timeframe 3600) [] - let agg = mkAggregatorFromBars (M.fromList [("TEST_TICKER", series)]) [(0, 86400)] - let (_, newagg) = handleBar (bar testTimestamp1 12.00 13.00 10.00 11.00 1) agg - let (_, newagg') = handleBar (bar testTimestamp2 12.00 15.00 11.00 12.00 2) newagg - let (newBar, newagg'') = updateTime (tick testTimestamp4 13.00 100) newagg' - let expectedNewBar = Bar "TEST_TICKER" testTimestamp2 12.00 15.00 10.00 12.00 3 - let expectedBar = Bar "TEST_TICKER" testTimestamp4 13.00 13.00 13.00 13.00 0 - (head <$> bsBars <$> (M.lookup "TEST_TICKER" $ bars newagg'')) @?= Just expectedBar - newBar @?= Just expectedNewBar - where - testTimestamp1 = (UTCTime (fromGregorian 1970 1 1) 560) - testTimestamp2 = (UTCTime (fromGregorian 1970 1 1) 600) - testTimestamp3 = (UTCTime (fromGregorian 1970 1 1) 3600) - testTimestamp4 = (UTCTime (fromGregorian 1970 1 1) 3660) - tick ts v vol = Tick { - security = "TEST_TICKER" - , datatype = LastTradePrice - , timestamp = ts - , value = v - , volume = vol } - bar ts o h l c v = Bar { - barSecurity = "TEST_TICKER", - barTimestamp = ts, - barOpen = fromDouble o, - barHigh = fromDouble h, - barLow = fromDouble l, - barClose = fromDouble c, - barVolume = v } prop_allTicksInOneBar :: TestTree prop_allTicksInOneBar = testProperty "All ticks in one bar" $ property $ do @@ -240,13 +123,13 @@ prop_allTicksInOneBar = testProperty "All ticks in one bar" $ property $ do HH.assert $ null newbars where - genTick :: T.Text -> UTCTime -> Integer -> Gen Tick + genTick :: T.Text -> UTCTime -> Int -> Gen Tick genTick tickerId base tf = do - difftime <- fromRational . toRational . picosecondsToDiffTime <$> Gen.integral (Range.linear 0 (truncate 1e12 * tf)) - val <- fromDouble <$> Gen.double (Range.exponentialFloat 0.00001 100) - vol <- Gen.integral (Range.exponential 1 100) - return $ Tick tickerId LastTradePrice (difftime `addUTCTime` base) val vol - mkAggregator tickerId tf = mkAggregatorFromBars (M.singleton tickerId (BarSeries tickerId (Timeframe tf) [])) [(0, 86400)] + difftime <- fromRational . toRational . picosecondsToDiffTime <$> choose (0, truncate 1e12 * fromIntegral tf) + val <- arbitrary + vol <- arbitrary `suchThat` (> 0) + return $ Tick tickerId LastTradePrice (difftime `addUTCTime` baseTime) val vol + mkAggregator tickerId tf = mkAggregatorFromBars (M.singleton tickerId (BarSeries tickerId (BarTimeframe tf) [] secParams)) [(0, 86400)] currentBar tickerId agg = headMay =<< (bsBars <$> M.lookup tickerId (bars agg)) baseTime = UTCTime (fromGregorian 1970 1 1) 0 diff --git a/test/Test/Driver/Junction/QuoteThread.hs b/test/Test/Driver/Junction/QuoteThread.hs new file mode 100644 index 0000000..827fffa --- /dev/null +++ b/test/Test/Driver/Junction/QuoteThread.hs @@ -0,0 +1,112 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeSynonymInstances #-} + +module Test.Driver.Junction.QuoteThread +( + unitTests +) where + +import Test.Tasty +import Test.Tasty.HUnit +import Test.Tasty.QuickCheck as QC +import Test.Tasty.SmallCheck as SC + +import ATrade.Driver.Junction.QuoteThread (addSubscription, + startQuoteThread, + stopQuoteThread) +import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) +import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) +import ATrade.QuoteSource.Client (QuoteData (QDBar)) +import ATrade.QuoteSource.Server (QuoteSourceServerData (..), + startQuoteSourceServer, + stopQuoteSourceServer) +import ATrade.RoboCom.Types (BarSeries (bsBars), + BarSeriesId (BarSeriesId), + InstrumentParameters (InstrumentParameters)) +import ATrade.Types +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.BoundedChan (newBoundedChan, readChan, + writeChan) +import Control.Exception (bracket) +import Control.Monad (forever) +import Control.Monad.Reader +import Data.IORef (newIORef, readIORef) +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Time (UTCTime (UTCTime), + fromGregorian) +import System.IO (BufferMode (LineBuffering), + hSetBuffering, stderr) +import System.Log.Formatter +import System.Log.Handler (setFormatter) +import System.Log.Handler.Simple +import System.Log.Logger +import System.ZMQ4 (withContext) +import Test.Mock.HistoryProvider (MockHistoryProvider, + mkMockHistoryProvider, + mockGetHistory) +import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, + mkMockTickerInfoProvider, + mockGetInstrumentParameters) + +data TestEnv = + TestEnv + { + historyProvider :: MockHistoryProvider, + tickerInfoProvider :: MockTickerInfoProvider + } + +type TestM = ReaderT TestEnv IO + +instance HistoryProvider TestM where + getHistory tid tf from to = do + hp <- asks historyProvider + liftIO $ mockGetHistory hp tid tf from to + +instance TickerInfoProvider TestM where + getInstrumentParameters tickers = do + tip <- asks tickerInfoProvider + liftIO $ mockGetInstrumentParameters tip tickers + +qsEndpoint = "inproc://qs" + +mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] + where + bars = [] + +mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters 10 0.1)] + +unitTests = testGroup "Driver.Junction.QuoteThread" [ + testSubscription + ] + +testSubscription :: TestTree +testSubscription = testCase "Subscription" $ withContext $ \ctx -> do + barsRef <- newIORef M.empty + serverChan <- newBoundedChan 2000 + bracket + (startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) + stopQuoteSourceServer $ \_ -> + bracket + (startQuoteThread barsRef ctx qsEndpoint Nothing Nothing (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider))) + + stopQuoteThread $ \qt -> do + chan <- newBoundedChan 2000 + addSubscription qt "FOO" (BarTimeframe 3600) chan + + forkIO $ forever $ threadDelay 50000 >> writeChan serverChan (QSSBar (BarTimeframe 3600, bar)) + + clientData <- readChan chan + assertEqual "Invalid client data" clientData (QDBar (BarTimeframe 3600, bar)) + + bars <- readIORef barsRef + case M.lookup (BarSeriesId "FOO" (BarTimeframe 3600)) bars of + Just series -> assertBool "Length should be >= 1" $ (not . null . bsBars) series + Nothing -> assertFailure "Bar Series should be present" + where + bar = + Bar { + barSecurity="FOO", barTimestamp=UTCTime (fromGregorian 2021 11 20) 7200, barOpen=10, barHigh=12, barLow=9, barClose=11, barVolume=100 + } diff --git a/test/Test/Mock/HistoryProvider.hs b/test/Test/Mock/HistoryProvider.hs new file mode 100644 index 0000000..3dbef67 --- /dev/null +++ b/test/Test/Mock/HistoryProvider.hs @@ -0,0 +1,27 @@ + +module Test.Mock.HistoryProvider +( + MockHistoryProvider, + mkMockHistoryProvider, + mockGetHistory +) where + +import ATrade.Quotes.HistoryProvider +import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) +import ATrade.Types (Bar (Bar, barTimestamp), + BarTimeframe (BarTimeframe), + TickerId) +import Control.Monad.IO.Class (MonadIO) +import qualified Data.Map.Strict as M +import Data.Time (UTCTime) + +data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) + +mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider +mkMockHistoryProvider = MockHistoryProvider + +mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] +mockGetHistory (MockHistoryProvider bars) tid tf from to = + case M.lookup (BarSeriesId tid tf) bars of + Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series + Nothing -> return [] diff --git a/test/Test/Mock/TickerInfoProvider.hs b/test/Test/Mock/TickerInfoProvider.hs new file mode 100644 index 0000000..a0bc6d5 --- /dev/null +++ b/test/Test/Mock/TickerInfoProvider.hs @@ -0,0 +1,22 @@ + +module Test.Mock.TickerInfoProvider +( + MockTickerInfoProvider, + mkMockTickerInfoProvider, + mockGetInstrumentParameters +) where + +import ATrade.Quotes.TickerInfoProvider +import ATrade.RoboCom.Types (InstrumentParameters) +import ATrade.Types (TickerId) +import Control.Monad.IO.Class (MonadIO) +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes, mapMaybe) + +data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) + +mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider +mkMockTickerInfoProvider = MockTickerInfoProvider + +mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] +mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params)