Execution layer for algorithmic trading
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
13 KiB

4 years ago
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
4 years ago
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module ATrade.Driver.Junction
(
junctionMain
) where
import ATrade.Broker.Client (startBrokerClient,
4 years ago
stopBrokerClient)
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification),
NotificationSqnum (unNotificationSqnum),
getNotificationSqnum)
import ATrade.Driver.Junction.BrokerService (BrokerService,
getNotifications,
mkBrokerService)
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..),
JunctionM (..),
saveRobots,
startRobot)
4 years ago
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..),
4 years ago
ProgramOptions (ProgramOptions, configPath))
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv),
withQThread)
import ATrade.Driver.Junction.RemoteControl (handleRemoteControl)
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent)
import ATrade.Driver.Junction.Types (StrategyDescriptorE,
confStrategy,
confTickers,
strategyState,
strategyTimers,
tickerId,
timeframe)
import ATrade.Logging (Message, Severity (Debug, Info, Trace, Warning),
fmtMessage,
logWith)
4 years ago
import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.RoboCom.Types (Bars,
TickerInfoMap)
import ATrade.Types (ClientSecurityParams (ClientSecurityParams),
4 years ago
OrderId,
Trade (tradeOrderId))
import Colog (LogAction (LogAction),
hoistLogAction,
logTextStdout,
(<&), (>$<))
import Colog.Actions (logTextHandle)
import Control.Concurrent.QSem (newQSem)
import Control.Monad (forM_, forever)
import Control.Monad.Extra (whenM)
4 years ago
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Data.IORef (IORef,
atomicModifyIORef',
newIORef,
readIORef)
4 years ago
import qualified Data.Map.Strict as M
import Data.Set (notMember)
import qualified Data.Set as S
4 years ago
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Text.IO (readFile)
import Database.Redis (ConnectInfo (..), PortID (UnixSocket),
4 years ago
checkedConnect,
defaultConnectInfo)
4 years ago
import Dhall (auto, input)
import Options.Applicative (Parser,
execParser,
fullDesc, header,
help, helper,
info, long,
metavar, progDesc,
short, strOption,
(<**>))
import Prelude hiding (log,
readFile)
import System.IO (BufferMode (LineBuffering),
Handle,
IOMode (AppendMode),
hSetBuffering,
withFile)
import System.ZMQ4 (Router (Router),
bind, withContext,
withSocket)
import System.ZMQ4.ZAP (loadCertificateFromFile)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (bracket)
import UnliftIO.QSem (QSem, withQSem)
4 years ago
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a
locked sem action = LogAction (\m -> withQSem sem (action <& m))
logger :: (MonadIO m) => Handle -> LogAction m Message
logger h = fmtMessage >$< (logTextStdout <> logTextHandle h)
4 years ago
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
opts <- parseOptions
let initialLogger = fmtMessage >$< logTextStdout
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts)
cfg <- readFile (configPath opts) >>= input auto
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do
hSetBuffering h LineBuffering
locksem <- newQSem 1
let globalLogger = locked locksem (logger h)
let log = logWith globalLogger
barsMap <- newIORef M.empty
tickerInfoMap <- newIORef M.empty
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
log Info "Junction" "redis: connected"
withContext $ \ctx -> do
log Debug "Junction" "0mq context created"
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger)
robotsMap <- newIORef M.empty
ordersMap <- newIORef M.empty
handledNotifications <- newIORef S.empty
withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro ->
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt ->
withSocket ctx Router $ \rcSocket -> do
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg)
broService <- mkBrokerService bro ordersMap
let junctionLogAction = hoistLogAction liftIO globalLogger
let env =
JunctionEnv
{
peRedisSocket = redis,
peConfigPath = robotsConfigsPath cfg,
peQuoteThread = qt,
peBroker = bro,
peRobots = robotsMap,
peRemoteControlSocket = rcSocket,
peLogAction = junctionLogAction,
peIoLogAction = globalLogger,
peProgramConfiguration = cfg,
peBarsMap = barsMap,
peTickerInfoMap = tickerInfoMap,
peBrokerService = broService,
peDescriptors = descriptors
}
withJunction env $ do
startRobots cfg
forever $ do
notifications <- getNotifications broService
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger)
saveRobots
handleRemoteControl 1000
where
startRobots :: ProgramConfiguration -> JunctionM ()
startRobots cfg = forM_ (instances cfg) startRobot
4 years ago
withJunction :: JunctionEnv -> JunctionM () -> IO ()
withJunction env = (`runReaderT` env) . unJunctionM
4 years ago
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) ->
IORef (M.Map OrderId T.Text) ->
IORef (S.Set NotificationSqnum) ->
LogAction IO Message ->
Notification ->
IO ()
handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do
logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do
robotsMap <- readIORef robotsRef
ordersMap <- readIORef ordersMapRef
case getNotificationTarget robotsMap ordersMap notification of
Just robot -> postNotificationEvent robot notification
Nothing -> do
logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification)
logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap)
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ()))
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle
getNotificationTarget robotsMap ordersMap notification = do
robotId <- M.lookup (notificationOrderId notification) ordersMap
M.lookup robotId robotsMap
notificationOrderId (OrderNotification _ oid _) = oid
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade
withBroker cfg ctx robotsMap ordersMap handled logger' f = do
securityParameters <- loadBrokerSecurityParameters cfg
bracket
(startBrokerClient
(brokerIdentity cfg)
(brokerEndpoint cfg)
[handleBrokerNotification robotsMap ordersMap handled logger']
logger')
stopBrokerClient f
loadBrokerSecurityParameters cfg =
case (brokerClientCert cfg, brokerServerCert cfg) of
(Just clientCertPath, Just serverCertPath) -> do
eClientCert <- loadCertificateFromFile clientCertPath
eServerCert <- loadCertificateFromFile serverCertPath
case (eClientCert, eServerCert) of
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert)
(_, _) -> return $ ClientSecurityParams Nothing Nothing
_ -> return $ ClientSecurityParams Nothing Nothing
parseOptions = execParser options
options = info (optionsParser <**> helper)
(fullDesc <>
progDesc "Robocom-zero junction mode driver" <>
header "robocom-zero-junction")
optionsParser :: Parser ProgramOptions
optionsParser = ProgramOptions
<$> strOption
(long "config" <>
short 'c' <>
metavar "FILENAME" <>
help "Configuration file path")