{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE RankNTypes            #-}

module ATrade.Driver.Junction
  (
    junctionMain
  ) where

import           ATrade.Broker.Client                        (startBrokerClient,
                                                              stopBrokerClient)
import           ATrade.Broker.Protocol                      (Notification (OrderNotification, TradeNotification),
                                                              NotificationSqnum (unNotificationSqnum),
                                                              getNotificationSqnum)
import           ATrade.Driver.Junction.BrokerService        (getNotifications,
                                                              mkBrokerService)
import           ATrade.Driver.Junction.JunctionMonad        (JunctionEnv (..),
                                                              JunctionM (..),
                                                              saveRobots,
                                                              startRobot)
import           ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..),
                                                              ProgramOptions (ProgramOptions, configPath))
import           ATrade.Driver.Junction.QuoteThread          (DownloaderEnv (DownloaderEnv),
                                                              withQThread)
import           ATrade.Driver.Junction.RemoteControl        (handleRemoteControl)
import           ATrade.Driver.Junction.RobotDriverThread    (RobotDriverHandle,
                                                              postNotificationEvent)
import           ATrade.Driver.Junction.Types                (StrategyDescriptorE)
import           ATrade.Logging                              (Message (..),
                                                              Severity (Debug, Info, Trace, Warning),
                                                              fmtMessage,
                                                              logWith)
import           ATrade.Quotes.QHP                           (mkQHPHandle)
import           ATrade.Types                                (OrderId,
                                                              Trade (tradeOrderId))
import           Colog                                       (LogAction (LogAction),
                                                              cfilter,
                                                              hoistLogAction,
                                                              logTextStdout,
                                                              (<&), (>$<))
import           Colog.Actions                               (logTextHandle)
import           Control.Concurrent.QSem                     (newQSem)
import           Control.Monad                               (forM_, forever)
import           Control.Monad.Extra                         (whenM)
import           Control.Monad.IO.Class                      (MonadIO (liftIO))
import           Control.Monad.Reader                        (ReaderT (runReaderT))
import           Data.IORef                                  (IORef,
                                                              atomicModifyIORef',
                                                              newIORef,
                                                              readIORef)
import qualified Data.Map.Strict                             as M
import           Data.Set                                    (notMember)
import qualified Data.Set                                    as S
import qualified Data.Text                                   as T
import           Data.Text.IO                                (readFile)
import           Database.Redis                              (ConnectInfo (..),
                                                              PortID (UnixSocket),
                                                              checkedConnect,
                                                              defaultConnectInfo)
import           Dhall                                       (auto, input)
import           Options.Applicative                         (Parser,
                                                              ParserInfo,
                                                              execParser,
                                                              fullDesc, header,
                                                              help, helper,
                                                              info, long,
                                                              metavar, progDesc,
                                                              short, strOption,
                                                              (<**>))
import           Prelude                                     hiding (log,
                                                              readFile)
import           System.IO                                   (BufferMode (LineBuffering),
                                                              Handle,
                                                              IOMode (AppendMode),
                                                              hSetBuffering,
                                                              withFile)
import           System.ZMQ4                                 (Router (Router),
                                                              bind,
                                                              withContext,
                                                              withSocket)
import           UnliftIO                                    (MonadUnliftIO)
import           UnliftIO.Exception                          (bracket)
import           UnliftIO.QSem                               (QSem, withQSem)

-- | Wrap a 'LogAction' so that every message is emitted while holding the
-- given semaphore.
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a
locked sem action = LogAction (\m -> withQSem sem (action <& m))

-- | Build the main log action: messages are formatted with 'fmtMessage' and
-- written both to stdout and to the given handle.
--
-- A message is dropped when its component has an entry in the level map and
-- the message severity is below that level; components without an entry are
-- never filtered.
logger :: (MonadIO m) => M.Map T.Text Severity -> Handle -> LogAction m Message
logger loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStdout <> logTextHandle h))
  where
    checkLoglevel msg =
      case M.lookup (msgComponent msg) loglevels of
        Just level -> msgSeverity msg >= level
        Nothing    -> True

-- | Entry point of the junction driver.
--
-- Parses the command line, reads the Dhall configuration, opens the log file,
-- connects to redis, creates the 0mq context, starts the broker client and the
-- quote thread, binds the remote control socket, starts every configured robot
-- instance and then loops forever: dispatching broker notifications, saving
-- robots and handling remote control requests.
--
-- The argument is stored in the environment as 'peDescriptors'.
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
  opts <- parseOptions

  -- Only stdout is available until the configuration (and with it the log
  -- file location) has been read.
  let initialLogger = fmtMessage >$< logTextStdout

  logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts)

  cfg <- readFile (configPath opts) >>= input auto

  withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do

    hSetBuffering h LineBuffering

    locksem <- newQSem 1
    let globalLogger = locked locksem (logger (M.fromList $ logLevels cfg) h)
    let log = logWith globalLogger

    barsMap <- newIORef M.empty
    tickerInfoMap <- newIORef M.empty

    log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg
    redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
    log Info "Junction" "redis: connected"
    withContext $ \ctx -> do
      log Debug "Junction" "0mq context created"
      let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger)
      robotsMap <- newIORef M.empty
      ordersMap <- newIORef M.empty
      handledNotifications <- newIORef S.empty
      withBroker cfg robotsMap ordersMap handledNotifications globalLogger $ \bro ->
        withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt ->
          withSocket ctx Router $ \rcSocket -> do
            liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg)
            broService <- mkBrokerService bro ordersMap
            let junctionLogAction = hoistLogAction liftIO globalLogger
            let env = JunctionEnv
                  { peRedisSocket = redis
                  , peConfigPath = robotsConfigsPath cfg
                  , peQuoteThread = qt
                  , peBroker = bro
                  , peRobots = robotsMap
                  , peRemoteControlSocket = rcSocket
                  , peLogAction = junctionLogAction
                  , peIoLogAction = globalLogger
                  , peProgramConfiguration = cfg
                  , peBarsMap = barsMap
                  , peTickerInfoMap = tickerInfoMap
                  , peBrokerService = broService
                  , peDescriptors = descriptors
                  }
            withJunction env $ do
              startRobots cfg
              forever $ do
                notifications <- getNotifications broService
                forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger)
                saveRobots
                handleRemoteControl 1000
  where
    -- Start one robot per entry of the configuration's instance list.
    startRobots :: ProgramConfiguration -> JunctionM ()
    startRobots cfg = forM_ (instances cfg) startRobot

    -- Run a 'JunctionM' action in IO against the given environment.
    withJunction :: JunctionEnv -> JunctionM () -> IO ()
    withJunction env = (`runReaderT` env) . unJunctionM

    -- Route a single broker notification to the robot that owns its order.
    --
    -- This function is both registered as a broker client callback (see
    -- 'withBroker') and invoked from the polling loop in 'junctionMain', so
    -- the same sequence number may be presented more than once. The set of
    -- handled sequence numbers is tested and updated in one atomic step so
    -- that exactly one caller processes each sequence number.
    --
    -- Because the sequence number is recorded before delivery, a notification
    -- whose delivery throws is not retried by the other path.
    --
    -- Notifications for orders with no known robot are logged and otherwise
    -- dropped; they are still recorded as handled.
    handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle)
                             -> IORef (M.Map OrderId T.Text)
                             -> IORef (S.Set NotificationSqnum)
                             -> LogAction IO Message
                             -> Notification
                             -> IO ()
    handleBrokerNotification robotsRef ordersMapRef handled logger' notification = do
      logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum) sqnum
      whenM claimSqnum $ do
        robotsMap <- readIORef robotsRef
        ordersMap <- readIORef ordersMapRef
        case getNotificationTarget robotsMap ordersMap notification of
          Just robot -> postNotificationEvent robot notification
          Nothing    -> do
            logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification)
            logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap)
      where
        sqnum = getNotificationSqnum notification
        -- Insert the sequence number and report whether it was absent
        -- before, i.e. whether this call is the one that must process it.
        claimSqnum = atomicModifyIORef' handled (\seen -> (S.insert sqnum seen, notMember sqnum seen))

    -- Look up the robot for a notification: order id -> robot id -> handle.
    -- Yields Nothing when either lookup fails.
    getNotificationTarget :: M.Map T.Text RobotDriverHandle
                          -> M.Map OrderId T.Text
                          -> Notification
                          -> Maybe RobotDriverHandle
    getNotificationTarget robotsMap ordersMap notification = do
      robotId <- M.lookup (notificationOrderId notification) ordersMap
      M.lookup robotId robotsMap

    -- Extract the order id a notification refers to.
    notificationOrderId :: Notification -> OrderId
    notificationOrderId (OrderNotification _ oid _) = oid
    notificationOrderId (TradeNotification _ trade) = tradeOrderId trade

    -- Start the broker client with 'handleBrokerNotification' as its
    -- notification callback, run the continuation with the client, and stop
    -- the client afterwards via 'bracket'.
    withBroker cfg robotsMap ordersMap handled logger' f =
      bracket
        (startBrokerClient
          (brokerIdentity cfg)
          (brokerEndpoint cfg)
          [handleBrokerNotification robotsMap ordersMap handled logger']
          logger')
        stopBrokerClient
        f

    -- Parse the command line into 'ProgramOptions'.
    parseOptions :: IO ProgramOptions
    parseOptions = execParser options

    -- Parser description: option parser plus the standard helper.
    options :: ParserInfo ProgramOptions
    options = info (optionsParser <**> helper)
      (fullDesc <>
       progDesc "Robocom-zero junction mode driver" <>
       header "robocom-zero-junction")

    -- The only option is the configuration file path (--config / -c).
    optionsParser :: Parser ProgramOptions
    optionsParser = ProgramOptions
      <$> strOption
            (long "config" <>
             short 'c' <>
             metavar "FILENAME" <>
             help "Configuration file path")