Browse Source

Adjust to libatrade-0.10

stable
Denis Tereshkin 4 years ago
parent
commit
7e72d1ae0a
  1. 94
      src/ATrade/Driver/Real/BrokerClientThread.hs

94
src/ATrade/Driver/Real/BrokerClientThread.hs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module ATrade.Driver.Real.BrokerClientThread (
startBrokerClientThread,
@ -27,47 +28,58 @@ import Data.Time.Clock @@ -27,47 +28,58 @@ import Data.Time.Clock
import System.Log.Logger
import System.ZMQ4 hiding (Event)
data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications
data BrokerCommand = BrokerSubmitOrder Order | BrokerCancelOrder Integer | BrokerRequestNotifications | BrokerHandleNotification Notification
startBrokerClientThread :: T.Text -> Context -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId
startBrokerClientThread instId ctx brEp ordersChan eventChan shutdownVar = forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $
bracket (startBrokerClient (encodeUtf8 instId) ctx brEp defaultClientSecurityParams)
(\bro -> do
stopBrokerClient bro
debugM "Strategy" "Broker client: stop")
(\bs -> handle (\e -> do
warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException)
throwIO e) $ do
now <- getCurrentTime
lastNotificationTime <- newIORef now
whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do
brokerCommand <- readChan ordersChan
case brokerCommand of
BrokerSubmitOrder order -> do
debugM "Strategy" $ "Submitting order: " ++ show order
maybeOid <- submitOrder bs order
debugM "Strategy" "Order submitted"
case maybeOid of
Right oid -> writeChan eventChan (OrderSubmitted order { orderId = oid })
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
BrokerCancelOrder oid -> do
debugM "Strategy" $ "Cancelling order: " ++ show oid
_ <- cancelOrder bs oid
debugM "Strategy" "Order cancelled"
BrokerRequestNotifications -> do
t <- getCurrentTime
nt <- readIORef lastNotificationTime
when (t `diffUTCTime` nt > 1) $ do
maybeNs <- getNotifications bs
case maybeNs of
startBrokerClientThread :: T.Text -> Context -> T.Text -> T.Text -> BoundedChan BrokerCommand -> BoundedChan Event -> MVar a -> IO ThreadId
startBrokerClientThread instId ctx brEp notifEp ordersChan eventChan shutdownVar = do
let callback = writeChan ordersChan . BrokerHandleNotification
forkIO $ whileM_ (isNothing <$> tryReadMVar shutdownVar) $
bracket (startBrokerClient (encodeUtf8 instId) ctx brEp notifEp [callback] defaultClientSecurityParams)
(\bro -> do
stopBrokerClient bro
debugM "Strategy" "Broker client: stop")
(\bs -> handle (\e -> do
warningM "Strategy" $ "Broker client: exception: " ++ show (e :: SomeException)
throwIO e) $ do
now <- getCurrentTime
lastNotificationTime <- newIORef now
lastKnownSqnum <- newIORef 0
whileM_ (andM [notTimeout lastNotificationTime, isNothing <$> tryReadMVar shutdownVar]) $ do
brokerCommand <- readChan ordersChan
case brokerCommand of
BrokerSubmitOrder order -> do
debugM "Strategy" $ "Submitting order: " ++ show order
result <- submitOrder bs order
debugM "Strategy" "Order submitted"
case result of
Right _ -> debugM "Strategy" $ "Order submitted: " ++ show (orderId order)
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
Right ns -> do
mapM_ (sendNotification eventChan) ns
getCurrentTime >>= (writeIORef lastNotificationTime)
nTimeout <- notTimeout lastNotificationTime
shouldShutdown <- isNothing <$> tryReadMVar shutdownVar
debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown)
BrokerCancelOrder oid -> do
debugM "Strategy" $ "Cancelling order: " ++ show oid
_ <- cancelOrder bs oid
debugM "Strategy" "Order cancelled"
BrokerRequestNotifications -> do
t <- getCurrentTime
nt <- readIORef lastNotificationTime
when (t `diffUTCTime` nt > 1) $ do
maybeNs <- getNotifications bs
case maybeNs of
Left errmsg -> debugM "Strategy" $ T.unpack $ "Error: " `T.append` errmsg
Right ns -> do
mapM_ (\n -> do
prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s))
when (prevSqnum + 1 < getNotificationSqnum n) $
warningM "Strategy" $ "Sqnum jump: " ++ show prevSqnum ++ "->" ++ show (getNotificationSqnum n)
sendNotification eventChan n) ns
getCurrentTime >>= writeIORef lastNotificationTime
BrokerHandleNotification notification -> do
sendNotification eventChan n
prevSqnum <- atomicModifyIORef lastKnownSqnum (\s -> (getNotificationSqnum n, s))
undefined
nTimeout <- notTimeout lastNotificationTime
shouldShutdown <- isNothing <$> tryReadMVar shutdownVar
debugM "Strategy" $ "Broker loop end: " ++ show nTimeout ++ "/" ++ show shouldShutdown)
notTimeout :: IORef UTCTime -> IO Bool
notTimeout ts = do
@ -78,5 +90,5 @@ notTimeout ts = do @@ -78,5 +90,5 @@ notTimeout ts = do
sendNotification :: BoundedChan Event -> Notification -> IO ()
sendNotification eventChan notification =
writeChan eventChan $ case notification of
OrderNotification oid state -> OrderUpdate oid state
TradeNotification trade -> NewTrade trade
OrderNotification sqnum oid state -> OrderUpdate oid state
TradeNotification sqnum trade -> NewTrade trade

Loading…
Cancel
Save