diff --git a/src/ATrade/Driver/Real/BrokerClientThread.hs b/src/ATrade/Driver/Real/BrokerClientThread.hs index ba0ce49..2146b86 100644 --- a/src/ATrade/Driver/Real/BrokerClientThread.hs +++ b/src/ATrade/Driver/Real/BrokerClientThread.hs @@ -1,4 +1,5 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} module ATrade.Driver.Real.BrokerClientThread ( startBrokerClientThread, @@ -27,47 +28,58 @@ import Data.Time.Clock import System.Log.Logger import System.ZMQ4 hiding (Event) -data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications +data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications | BrokerHandleNotification Notification - -startBrokerClientThread :: T.Text -> Context -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId -startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar = forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $ - bracket (startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams) - (\bro -> do - stopBrokerClient bro - debugM "Strategy" "Broker client: stop") - (\bs -> handle (\e -> do - warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException) - throwIO e) $ do - now <- getCurrentTime - lastNotificationTime <- newIORef now - whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do - brokerCommand <- readChan ordersChan - case brokerCommand of - BrokerSubmitOrder order -> do - debugM "Strategy" $ "Submitting order: " ++ show order - maybeOid <- submitOrder bs order - debugM "Strategy" "Order submitted" - case maybeOid of - Right oid -> writeChan eventChan (OrderSubmitted order { orderId = oid }) - Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg - BrokerCancelOrder oid -> do - debugM "Strategy" $ "Cancelling order: " ++ show oid - _ <- cancelOrder bs oid - debugM "Strategy" "Order cancelled" - BrokerRequestNotifications -> do - t <- getCurrentTime - nt <- readIORef lastNotificationTime - when (t `diffUTCTime` nt > 1) $ do - maybeNs <- getNotifications bs - case maybeNs of +startBrokerClientThread :: T.Text -> Context -> T.Text -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId +startBrokerClientThread instId ctx brEp notifEp ordersChan eventChan shutdownVar = do + let callback = writeChan ordersChan . BrokerHandleNotification + forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $ + bracket (startBrokerClient (encodeUtf8 instId) ctx brEp notifEp [callback] defaultClientSecurityParams) + (\bro -> do + stopBrokerClient bro + debugM "Strategy" "Broker client: stop") + (\bs -> handle (\e -> do + warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException) + throwIO e) $ do + now <- getCurrentTime + lastNotificationTime <- newIORef now + lastKnownSqnum <- newIORef 0 + whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do + brokerCommand <- readChan ordersChan + case brokerCommand of + BrokerSubmitOrder order -> do + debugM "Strategy" $ "Submitting order: " ++ show order + result <- submitOrder bs order + debugM "Strategy" "Order submitted" + case result of + Right _ -> debugM "Strategy" $ "Order submitted: " ++ show (orderId order) Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg - Right ns -> do - mapM_ (sendNotification eventChan) ns - getCurrentTime >>= (writeIORef lastNotificationTime) - nTimeout <- notTimeout lastNotificationTime - shouldShutdown <- isNothing <$> tryReadMVar shutdownVar - debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) + BrokerCancelOrder oid -> do + debugM "Strategy" $ "Cancelling order: " ++ show oid + _ <- cancelOrder bs oid + debugM "Strategy" "Order cancelled" + BrokerRequestNotifications -> do + t <- getCurrentTime + nt <- readIORef lastNotificationTime + when (t `diffUTCTime` nt > 1) $ do + maybeNs <- getNotifications bs + case maybeNs of + Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg + Right ns -> do + mapM_ (\n -> do + prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s)) + when (prevSqnum + 1 < getNotificationSqnum n) $ + warningM "Strategy" $ "Sqnum jump: " ++ show prevSqnum ++ "->" ++ show (getNotificationSqnum n) + sendNotification eventChan n) ns + getCurrentTime >>= writeIORef lastNotificationTime + BrokerHandleNotification notification -> do + sendNotification eventChan n + prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s)) + + undefined + nTimeout <- notTimeout lastNotificationTime + shouldShutdown <- isNothing <$> tryReadMVar shutdownVar + debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown) notTimeout :: IORef UTCTime -> IO Bool notTimeout ts = do @@ -78,5 +90,5 @@ notTimeout ts = do sendNotification :: BoundedChan Event -> Notification -> IO () sendNotification eventChan notification = writeChan eventChan $ case notification of - OrderNotification oid state -> OrderUpdate oid state - TradeNotification trade -> NewTrade trade + OrderNotification sqnum oid state -> OrderUpdate oid state + TradeNotification sqnum trade -> NewTrade trade