You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
82 lines
3.6 KiB
82 lines
3.6 KiB
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
module ATrade.Driver.Real.BrokerClientThread ( |
|
startBrokerClientThread, |
|
BrokerCommand(..) |
|
) where |
|
|
|
import ATrade.Broker.Client |
|
import ATrade.Broker.Protocol |
|
import ATrade.RoboCom.Monad hiding (cancelOrder, |
|
submitOrder) |
|
import ATrade.Types |
|
|
|
import Control.Concurrent hiding (readChan, writeChan, |
|
writeList2Chan, yield) |
|
import Control.Concurrent.BoundedChan |
|
import Control.Exception |
|
import Control.Monad |
|
import Control.Monad.Loops |
|
|
|
import Data.IORef |
|
import Data.Maybe |
|
import qualified Data.Text as T |
|
import Data.Text.Encoding |
|
import Data.Time.Clock |
|
|
|
import System.Log.Logger |
|
import System.ZMQ4 hiding (Event) |
|
|
|
-- | Commands that can be sent to the broker client thread through its
-- command channel.
data BrokerCommand
  = BrokerSubmitOrder Order
    -- ^ Submit the given order to the broker.
  | BrokerCancelOrder Integer
    -- ^ Cancel the order with the given order id.
  | BrokerRequestNotifications
    -- ^ Ask the broker client to fetch pending notifications.
|
|
|
|
|
-- | Fork the thread that owns the broker client connection.
--
-- The thread keeps creating broker clients until @shutdownVar@ is filled.
-- While a client is alive, 'BrokerCommand's are read from @ordersChan@ one at
-- a time and executed. A session ends when @shutdownVar@ is filled or when
-- 'notTimeout' reports that the last successful notification fetch is too
-- old; the client is then stopped and, unless shutdown was requested, a new
-- one is started.
--
-- Any exception raised while a client is in use is logged as a warning and
-- rethrown. Nothing in this function catches it again, so it ends the forked
-- thread; the client is still stopped by 'bracket'.
--
-- NOTE(review): the shutdown and timeout conditions are only re-checked after
-- a command has been read from @ordersChan@, so a thread waiting on an empty
-- channel presumably does not notice either of them until the next command
-- arrives - confirm that some producer sends 'BrokerRequestNotifications'
-- periodically.
startBrokerClientThread
  :: T.Text                     -- ^ Instance id; UTF-8 encoded and handed to 'startBrokerClient'.
  -> Context                    -- ^ ZeroMQ context handed to 'startBrokerClient'.
  -> T.Text                     -- ^ Handed to 'startBrokerClient' (presumably the broker endpoint - confirm).
  -> BoundedChan BrokerCommand  -- ^ Source of commands to execute.
  -> BoundedChan Event          -- ^ Sink for events produced while executing commands.
  -> MVar a                     -- ^ Shutdown signal: the thread runs while this is empty.
  -> IO ThreadId
startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar =
  forkIO $ whileM_ notShutdown $
    bracket connect disconnect (handle logAndRethrow . serve)
  where
    -- True for as long as nobody has filled the shutdown variable.
    notShutdown :: IO Bool
    notShutdown = isNothing <$> tryReadMVar shutdownVar

    connect = startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams

    disconnect bro = do
      stopBrokerClient bro
      debugM "Strategy" "Broker client: stop"

    -- Log the failure, then let it propagate so that 'bracket' can clean up.
    logAndRethrow :: SomeException -> IO b
    logAndRethrow e = do
      warningM "Strategy" $ "Broker client: exception: " ++ show e
      throwIO e

    -- Run one client session: execute commands until timeout or shutdown.
    serve bs = do
      lastNotificationTime <- newIORef =<< getCurrentTime
      whileM_ (andM [notTimeout lastNotificationTime, notShutdown]) $
        readChan ordersChan >>= execute bs lastNotificationTime
      nTimeout <- notTimeout lastNotificationTime
      -- True means shutdown has NOT been requested; the log format is kept as is.
      stillRunning <- notShutdown
      debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show stillRunning

    -- Execute a single command against the given broker client.
    execute bs lastNotificationTime command = case command of
      BrokerSubmitOrder order -> do
        debugM "Strategy" $ "Submitting order: " ++ show order
        result <- submitOrder bs order
        case result of
          Right oid -> do
            -- Success is reported only once the broker has actually accepted the order.
            debugM "Strategy" "Order submitted"
            writeChan eventChan (OrderSubmitted order { orderId = oid })
          Left errmsg -> logError errmsg
      BrokerCancelOrder oid -> do
        debugM "Strategy" $ "Cancelling order: " ++ show oid
        result <- cancelOrder bs oid
        -- The outcome used to be discarded and success logged unconditionally.
        case result of
          Right _     -> debugM "Strategy" "Order cancelled"
          Left errmsg -> logError errmsg
      BrokerRequestNotifications -> do
        t <- getCurrentTime
        nt <- readIORef lastNotificationTime
        -- Query the broker only if more than one second has passed since the
        -- last successful fetch.
        when (t `diffUTCTime` nt > 1) $ do
          maybeNs <- getNotifications bs
          case maybeNs of
            Left errmsg -> logError errmsg
            Right ns -> do
              mapM_ (sendNotification eventChan) ns
              -- Only a successful fetch refreshes the timestamp checked by 'notTimeout'.
              getCurrentTime >>= writeIORef lastNotificationTime

    logError errmsg = debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
|
|
|
-- | Tell whether the timestamp stored in the reference is still fresh, i.e.
-- strictly less than 30 seconds older than the current time.
notTimeout :: IORef UTCTime -> IO Bool
notTimeout ts = isFresh <$> getCurrentTime <*> readIORef ts
  where
    -- The clock is sampled before the reference is read, as written above.
    isFresh now heartbeatTs = now `diffUTCTime` heartbeatTs < 30
|
|
|
-- | Convert a broker notification into the corresponding event and write it
-- to the given event channel.
sendNotification :: BoundedChan Event -> Notification -> IO ()
sendNotification eventChan notification = writeChan eventChan (toEvent notification)
  where
    -- Order notifications become order updates; trade notifications become new trades.
    toEvent (OrderNotification oid state) = OrderUpdate oid state
    toEvent (TradeNotification trade)     = NewTrade trade
|
|
|