diff --git a/robocom-zero.cabal b/robocom-zero.cabal index a4e0f12..e42b95d 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -73,6 +73,7 @@ library , extra , co-log , text-show + , unliftio default-language: Haskell2010 other-modules: ATrade.Exceptions diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index 83be723..7b934b2 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -5,6 +5,7 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} module ATrade.Driver.Junction ( @@ -52,18 +53,19 @@ import ATrade.RoboCom.ConfigStorage (ConfigStorage (loa import ATrade.RoboCom.Monad (StrategyEnvironment (..)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), - Bars, TickerInfoMap) + Bars, + TickerInfoMap) import ATrade.Types (ClientSecurityParams (ClientSecurityParams), OrderId, Trade (tradeOrderId)) import Colog (HasLog (getLogAction, setLogAction), - LogAction, + LogAction (LogAction), + hoistLogAction, logTextStdout, - (>$<)) + (<&), (>$<)) import Colog.Actions (logTextHandle) -import Control.Concurrent (threadDelay, QSem, waitQSem, signalQSem) -import Control.Exception.Safe (MonadThrow, - bracket) +import Control.Concurrent.QSem (newQSem) +import Control.Exception.Safe (MonadThrow) import Control.Monad (forM_, forever) import Control.Monad.Extra (whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) @@ -113,6 +115,10 @@ import System.IO (BufferMode (LineBu withFile) import System.ZMQ4 (withContext) import System.ZMQ4.ZAP (loadCertificateFromFile) +import UnliftIO (MonadUnliftIO) +import UnliftIO.Concurrent (threadDelay) +import UnliftIO.Exception (bracket) +import UnliftIO.QSem (QSem, withQSem) data JunctionEnv = JunctionEnv @@ -174,6 +180,9 @@ instance QuoteStream JunctionM where return (SubscriptionId 0) -- TODO subscription Ids removeSubscription _ = undefined +locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a +locked sem action = LogAction (\m -> withQSem sem (action <& m)) + logger :: (MonadIO m) => Handle -> LogAction m Message logger h = fmtMessage >$< (logTextStdout <> logTextHandle h) @@ -189,7 +198,11 @@ junctionMain descriptors = do withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do - let log = logWith (logger h) + hSetBuffering h LineBuffering + + locksem <- newQSem 1 + let globalLogger = locked locksem (logger h) + let log = logWith globalLogger barsMap <- newIORef M.empty tickerInfoMap <- newIORef M.empty @@ -199,15 +212,14 @@ junctionMain descriptors = do log Info "Junction" "redis: connected" withContext $ \ctx -> do log Debug "Junction" "0mq context created" - let downloaderLogAction = logger h - let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) downloaderLogAction + let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger) robotsMap <- newIORef M.empty ordersMap <- newIORef M.empty handledNotifications <- newIORef S.empty - withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro -> - withQThread downloaderEnv barsMap tickerInfoMap cfg ctx (logger h) $ \qt -> do + withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro -> + withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> do broService <- mkBrokerService bro ordersMap - let junctionLogAction = logger h + let junctionLogAction = hoistLogAction liftIO globalLogger let env = JunctionEnv { @@ -219,10 +231,10 @@ junctionMain descriptors = do peLogAction = junctionLogAction } withJunction env $ do - startRobots h cfg barsMap tickerInfoMap broService + startRobots (hoistLogAction liftIO globalLogger) cfg barsMap tickerInfoMap broService forever $ do notifications <- liftIO $ getNotifications broService - forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h)) + forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) saveRobots liftIO $ threadDelay 1000000 where @@ -238,10 +250,11 @@ junctionMain descriptors = do currentTimers <- liftIO $ readIORef (strategyTimers inst) saveState currentTimers (strategyInstanceId inst <> ":timers") - startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> JunctionM () - startRobots logHandle cfg barsMap tickerInfoMap broService = forM_ (instances cfg) $ \inst -> do + startRobots :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> JunctionM () + startRobots gLogger cfg barsMap tickerInfoMap broService = forM_ (instances cfg) $ \inst -> do now <- liftIO getCurrentTime - logWith (logger logHandle) Info "Junction" $ "Starting strategy: " <> (strategyBaseName inst) + let lLogger = hoistLogAction liftIO gLogger + logWith lLogger Info "Junction" $ "Starting strategy: " <> (strategyBaseName inst) case M.lookup (strategyBaseName inst) descriptors of Just (StrategyDescriptorE desc) -> do bigConf <- loadConfig (configKey inst) @@ -252,7 +265,7 @@ junctionMain descriptors = do rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode liftIO $ hSetBuffering localH LineBuffering - let robotLogAction = logger logHandle <> (fmtMessage >$< logTextHandle localH) + let robotLogAction = (hoistLogAction liftIO gLogger) <> (fmtMessage >$< logTextHandle localH) stratEnv <- liftIO $ newIORef StrategyEnvironment { _seInstanceId = strategyId inst, @@ -265,8 +278,8 @@ junctionMain descriptors = do robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers robotsMap' <- asks peRobots liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) - _ -> logWith (logger logHandle) Error (strategyId inst) $ "No tickers configured !!!" - Nothing -> logWith (logger logHandle) Error "Junction" $ "Unknown strategy: " <> (strategyBaseName inst) + _ -> logWith lLogger Error (strategyId inst) $ "No tickers configured !!!" + Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) @@ -279,8 +292,8 @@ junctionMain descriptors = do LogAction IO Message -> Notification -> IO () - handleBrokerNotification robotsRef ordersMapRef handled logger notification= do - logWith logger Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification + handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do + logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do robotsMap <- readIORef robotsRef ordersMap <- readIORef ordersMapRef @@ -288,8 +301,8 @@ junctionMain descriptors = do case getNotificationTarget robotsMap ordersMap notification of Just robot -> postNotificationEvent robot notification Nothing -> do - logWith logger Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) - logWith logger Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) + logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) + logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) @@ -301,7 +314,7 @@ junctionMain descriptors = do notificationOrderId (OrderNotification _ oid _) = oid notificationOrderId (TradeNotification _ trade) = tradeOrderId trade - withBroker cfg ctx robotsMap ordersMap handled logger f = do + withBroker cfg ctx robotsMap ordersMap handled logger' f = do securityParameters <- loadBrokerSecurityParameters cfg bracket (startBrokerClient @@ -309,9 +322,9 @@ junctionMain descriptors = do ctx (brokerEndpoint cfg) (brokerNotificationEndpoint cfg) - [handleBrokerNotification robotsMap ordersMap handled logger] + [handleBrokerNotification robotsMap ordersMap handled logger'] securityParameters - logger) + logger') stopBrokerClient f loadBrokerSecurityParameters cfg =