Execution layer for algorithmic trading
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
9.0 KiB

4 years ago
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Driver.Junction
(
junctionMain
) where
4 years ago
import ATrade.Broker.Client (BrokerClientHandle,
startBrokerClient,
4 years ago
stopBrokerClient)
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerEndpoint, brokerNotificationEndpoint, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath),
ProgramOptions (ProgramOptions, configPath))
4 years ago
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription),
SubscriptionId (SubscriptionId))
4 years ago
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv),
4 years ago
QuoteThreadHandle,
4 years ago
withQThread)
4 years ago
import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotEnv (..),
RobotM (..),
createRobotDriverThread)
4 years ago
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstanceDescriptor (..),
confStrategy)
import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.Types (ClientSecurityParams (ClientSecurityParams))
import Control.Exception.Safe (MonadThrow,
bracket)
import Control.Monad (forM_)
4 years ago
import Control.Monad (void)
4 years ago
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (MonadReader, ReaderT (runReaderT),
asks)
import Data.Aeson (eitherDecode,
encode)
import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def))
4 years ago
import Data.IORef (newIORef)
4 years ago
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Text.IO (readFile)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Database.Redis (ConnectInfo (..),
Connection,
PortID (UnixSocket),
checkedConnect,
defaultConnectInfo,
get, mset,
runRedis)
import Dhall (auto, input)
import Options.Applicative (Parser,
execParser,
fullDesc, header,
help, helper,
info, long,
metavar, progDesc,
short, strOption,
(<**>))
import Prelude hiding (readFile)
import System.Log.Logger (warningM)
import System.ZMQ4 (withContext)
4 years ago
data JunctionEnv =
JunctionEnv
{
4 years ago
peRedisSocket :: Connection,
4 years ago
peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle
}
4 years ago
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow)
4 years ago
4 years ago
instance ConfigStorage JunctionM where
4 years ago
loadConfig key = do
basePath <- asks peConfigPath
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto
4 years ago
instance MonadPersistence JunctionM where
4 years ago
saveState newState key = do
conn <- asks peRedisSocket
now <- liftIO getPOSIXTime
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState),
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)]
case res of
Left _ -> liftIO $ warningM "main" "Unable to save state"
Right _ -> return ()
loadState key = do
conn <- asks peRedisSocket
res <- liftIO $ runRedis conn $ get (encodeUtf8 key)
-- TODO: just chain eithers
case res of
Left _ -> do
liftIO $ warningM "main" "Unable to load state"
return def
Right maybeRawState ->
case maybeRawState of
Just rawState -> case eitherDecode $ BL.fromStrict rawState of
Left _ -> do
liftIO $ warningM "main" "Unable to decode state"
return def
Right decodedState -> return decodedState
Nothing -> do
liftIO $ warningM "main" "Unable to decode state"
return def
4 years ago
instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker timeframe) chan = do
qt <- asks peQuoteThread
QT.addSubscription qt ticker timeframe chan
return (SubscriptionId 0) -- TODO subscription Ids
removeSubscription _ = undefined
4 years ago
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
opts <- parseOptions
cfg <- readFile (configPath opts) >>= input auto
4 years ago
barsMap <- newIORef M.empty
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
withContext $ \ctx -> do
4 years ago
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg)
4 years ago
withBroker cfg ctx $ \bro ->
4 years ago
withQThread downloaderEnv barsMap cfg ctx $ \qt -> do
let env =
JunctionEnv
{
peRedisSocket = redis,
peConfigPath = robotsConfigsPath cfg,
peQuoteThread = qt,
peBroker = bro
}
withJunction env $
4 years ago
forM_ (instances cfg) $ \inst ->
case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do
bigConf <- loadConfig (configKey inst)
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
4 years ago
let robotEnv = RobotEnv rState rConf bro barsMap
4 years ago
createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState
Nothing -> error "Unknown strategy"
where
4 years ago
withJunction :: JunctionEnv -> JunctionM () -> IO ()
withJunction env = (`runReaderT` env) . unJunctionM
4 years ago
withBroker cfg ctx f = bracket
(startBrokerClient
"broker"
ctx
(brokerEndpoint cfg)
(brokerNotificationEndpoint cfg)
[]
(ClientSecurityParams -- TODO load certificates from file
Nothing
Nothing))
stopBrokerClient f
parseOptions = execParser options
options = info (optionsParser <**> helper)
(fullDesc <>
progDesc "Robocom-zero junction mode driver" <>
header "robocom-zero-junction")
optionsParser :: Parser ProgramOptions
optionsParser = ProgramOptions
<$> strOption
(long "config" <>
short 'c' <>
metavar "FILENAME" <>
help "Configuration file path")