@ -17,7 +17,9 @@ import ATrade.Broker.Client (BrokerClientHandle
@@ -17,7 +17,9 @@ import ATrade.Broker.Client (BrokerClientHandle
import ATrade.Broker.Protocol ( Notification ( OrderNotification , TradeNotification ) ,
NotificationSqnum ,
getNotificationSqnum )
import ATrade.Driver.Junction.ProgramConfiguration ( ProgramConfiguration ( brokerClientCert , brokerEndpoint , brokerNotificationEndpoint , brokerServerCert , instances , qhpEndpoint , qtisEndpoint , redisSocket , robotsConfigsPath ) ,
import ATrade.Driver.Junction.BrokerService ( BrokerService ,
mkBrokerService )
import ATrade.Driver.Junction.ProgramConfiguration ( ProgramConfiguration ( ProgramConfiguration , brokerClientCert , brokerEndpoint , brokerNotificationEndpoint , brokerServerCert , instances , qhpEndpoint , qtisEndpoint , redisSocket , robotsConfigsPath ) ,
ProgramOptions ( ProgramOptions , configPath ) )
import ATrade.Driver.Junction.QuoteStream ( QuoteStream ( addSubscription , removeSubscription ) ,
QuoteSubscription ( QuoteSubscription ) ,
@ -38,16 +40,17 @@ import ATrade.Driver.Junction.Types (StrategyDescriptor
@@ -38,16 +40,17 @@ import ATrade.Driver.Junction.Types (StrategyDescriptor
confStrategy ,
strategyState ,
strategyTimers )
import ATrade.Logging ( Message ,
Severity ( Info ) ,
import ATrade.Logging ( Message , Severity ( Debug , Error , Info , Trace , Warning ) ,
fmtMessage ,
logWarning ,
logWith )
import ATrade.Quotes.QHP ( mkQHPHandle )
import ATrade.RoboCom.ConfigStorage ( ConfigStorage ( loadConfig ) )
import ATrade.RoboCom.Monad ( StrategyEnvironment ( .. ) )
import ATrade.RoboCom.Persistence ( MonadPersistence ( loadState , saveState ) )
import ATrade.RoboCom.Types ( Bars )
import ATrade.Types ( ClientSecurityParams ( ClientSecurityParams ) ,
OrderId ,
Order , Order Id,
Trade ( tradeOrderId ) )
import Colog ( HasLog ( getLogAction , setLogAction ) ,
LogAction ,
@ -76,6 +79,7 @@ import qualified Data.Set as S
@@ -76,6 +79,7 @@ import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding ( encodeUtf8 )
import Data.Text.IO ( readFile )
import Data.Time ( getCurrentTime )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import Database.Redis ( ConnectInfo ( .. ) ,
Connection ,
@ -162,8 +166,8 @@ junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
@@ -162,8 +166,8 @@ junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
opts <- parseOptions
let bootstrapLogAction = fmtMessage >$< logTextStdout
let log = logWith bootstrapLogAction
let logger = fmtMessage >$< logTextStdout
let log = logWith logger
log Info " Junction " $ " Reading config from: " <> ( T . pack . show ) ( configPath opts )
@ -171,15 +175,19 @@ junctionMain descriptors = do
@@ -171,15 +175,19 @@ junctionMain descriptors = do
barsMap <- newIORef M . empty
log Info " Junction " $ " Connecting to redis: " <> redisSocket cfg
redis <- checkedConnect ( defaultConnectInfo { connectPort = UnixSocket ( T . unpack $ redisSocket cfg ) } )
log Info " Junction " " redis: connected "
withContext $ \ ctx -> do
log Debug " Junction " " 0mq context created "
let downloaderLogAction = fmtMessage >$< logTextStdout
let downloaderEnv = DownloaderEnv ( mkQHPHandle ctx ( qhpEndpoint cfg ) ) ctx ( qtisEndpoint cfg ) downloaderLogAction
robotsMap <- newIORef M . empty
ordersMap <- newIORef M . empty
handledNotifications <- newIORef S . empty
withBroker cfg ctx robotsMap ordersMap handledNotifications $ \ bro ->
withBroker cfg ctx robotsMap ordersMap handledNotifications logger $ \ bro ->
withQThread downloaderEnv barsMap cfg ctx $ \ qt -> do
broService <- mkBrokerService bro ordersMap
let junctionLogAction = fmtMessage >$< logTextStdout
let env =
JunctionEnv
@ -192,7 +200,7 @@ junctionMain descriptors = do
@@ -192,7 +200,7 @@ junctionMain descriptors = do
peLogAction = junctionLogAction
}
withJunction env $ do
startRobots cfg bro barsMap
startRobots cfg barsMap broService
forever $ do
saveRobots
liftIO $ threadDelay 5000000
@ -209,7 +217,9 @@ junctionMain descriptors = do
@@ -209,7 +217,9 @@ junctionMain descriptors = do
currentTimers <- liftIO $ readIORef ( strategyTimers inst )
saveState currentTimers ( strategyInstanceId inst <> " :timers " )
startRobots cfg bro barsMap = forM_ ( instances cfg ) $ \ inst ->
startRobots :: ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM ()
startRobots cfg barsMap broService = forM_ ( instances cfg ) $ \ inst -> do
now <- liftIO getCurrentTime
case M . lookup ( strategyBaseName inst ) descriptors of
Just ( StrategyDescriptorE desc ) -> do
bigConf <- loadConfig ( configKey inst )
@ -217,7 +227,14 @@ junctionMain descriptors = do
@@ -217,7 +227,14 @@ junctionMain descriptors = do
rState <- loadState ( stateKey inst ) >>= liftIO . newIORef
rTimers <- loadState ( stateKey inst <> " :timers " ) >>= liftIO . newIORef
let robotLogAction = fmtMessage >$< logTextStdout
let robotEnv = RobotEnv rState rConf rTimers bro barsMap robotLogAction
stratEnv <- liftIO $ newIORef StrategyEnvironment
{
_seInstanceId = strategyId inst ,
_seAccount = " test " , -- TODO configure
_seVolume = 1 ,
_seLastTimestamp = now
}
let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService
robot <- createRobotDriverThread inst desc ( flip runReaderT robotEnv . unRobotM ) bigConf rConf rState rTimers
robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' ( \ s -> ( M . insert ( strategyId inst ) robot s , () ) )
@ -229,16 +246,20 @@ junctionMain descriptors = do
@@ -229,16 +246,20 @@ junctionMain descriptors = do
handleBrokerNotification :: IORef ( M . Map T . Text RobotDriverHandle ) ->
IORef ( M . Map OrderId T . Text ) ->
IORef ( S . Set NotificationSqnum ) ->
LogAction IO Message ->
Notification ->
IO ()
handleBrokerNotification robotsRef ordersMapRef handled notification =
handleBrokerNotification robotsRef ordersMapRef handled logger notification = do
logWith logger Trace " Junction " $ " Incoming notification: " <> ( T . pack . show ) notification
whenM ( notMember ( getNotificationSqnum notification ) <$> readIORef handled ) $ do
robotsMap <- readIORef robotsRef
ordersMap <- readIORef ordersMapRef
case getNotificationTarget robotsMap ordersMap notification of
Just robot -> postNotificationEvent robot notification
Nothing -> return () --logWarning "Junction" "Unknown order" -- TODO log
Nothing -> do
logWith logger Warning " Junction " $ " Unknown order: " <> ( T . pack . show ) ( notificationOrderId notification )
logWith logger Debug " Junction " $ " Ordermap: " <> ( T . pack . show ) ( M . toList ordersMap )
atomicModifyIORef' handled ( \ s -> ( S . insert ( getNotificationSqnum notification ) s , () ) )
@ -250,7 +271,7 @@ junctionMain descriptors = do
@@ -250,7 +271,7 @@ junctionMain descriptors = do
notificationOrderId ( OrderNotification _ oid _ ) = oid
notificationOrderId ( TradeNotification _ trade ) = tradeOrderId trade
withBroker cfg ctx robotsMap ordersMap handled f = do
withBroker cfg ctx robotsMap ordersMap handled logger f = do
securityParameters <- loadBrokerSecurityParameters cfg
bracket
( startBrokerClient
@ -258,8 +279,9 @@ junctionMain descriptors = do
@@ -258,8 +279,9 @@ junctionMain descriptors = do
ctx
( brokerEndpoint cfg )
( brokerNotificationEndpoint cfg )
[ handleBrokerNotification robotsMap ordersMap handled ]
securityParameters )
[ handleBrokerNotification robotsMap ordersMap handled logger ]
securityParameters
logger )
stopBrokerClient f
loadBrokerSecurityParameters cfg =