Browse Source

TXMLConnector: extract handleTransaqData

master
Denis Tereshkin 3 years ago
parent
commit
13723bb436
  1. 210
      src/TXMLConnector/Internal.hs

210
src/TXMLConnector/Internal.hs

@ -221,7 +221,7 @@ workThread = do @@ -221,7 +221,7 @@ workThread = do
serverConnectionState <- asks serverConnected
timerVar' <- asks timerVar
void $ liftIO $ forkIO $ whileM $ do
threadDelay 1000000
threadDelay 5000000
void . liftIO . atomically $ tryPutTMVar timerVar' ()
connStatus <- liftIO . readTVarIO $ serverConnectionState
pure $ connStatus /= StageShutdown
@ -260,50 +260,25 @@ parseContent (Elem el) = parseElement @@ -260,50 +260,25 @@ parseContent (Elem el) = parseElement
_ -> Nothing
parseContent _ = Nothing
handleConnected :: (MonadIO m,
handleTransaqData :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m (Maybe ConnectionStage)
handleConnected = do
checkRequestTimeout
rqVar <- asks requestVar
runVar' <- asks runVar
queue <- asks transaqQueue
timerVar' <- asks timerVar
item <- liftIO . atomically $
(readTMVar runVar' >> pure MainQueueShutdown) `orElse`
(MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer)
case item of
MainQueueShutdown -> pure $ Just StageShutdown
MainQueuePingServer -> do
maybeServerStatus <- sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> do
pure $ Just StageConnection
_ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
pure Nothing
Right () -> pure Nothing
MainQueueTransaqData transaqData -> do
HasLog Env Message m) => TransaqResponse -> m (Maybe ConnectionStage)
handleTransaqData transaqData = do
tm <- asks tickMap
case transaqData of
TransaqResponseAllTrades (ResponseAllTrades trades) -> do
qssChan <- asks qssChannel
let ticks = fmap allTradeToTick trades
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
liftIO $ forM_ ticks (writeChan qssChan . QSSTick)
liftIO $ forM_ ticks (insertTick tm)
pure Nothing
TransaqResponseQuotations (ResponseQuotations quotations) -> do
qssChan <- asks qssChannel
now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
liftIO $ forM_ ticks (writeChan qssChan . QSSTick)
liftIO $ forM_ ticks (insertTick tm)
pure Nothing
TransaqResponseCandles respCandle -> do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
@ -326,67 +301,17 @@ handleConnected = do @@ -326,67 +301,17 @@ handleConnected = do
pure Nothing
TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing
TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing
_ -> pure Nothing
MainQueueRequest (RequestHistory request) -> do
cur <- asks currentCandles
liftIO $ atomically $ writeTVar cur []
maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
case maybeCk of
Just candleKindId -> do
case parseSecurityId (hrTickerId request) of
Just secId -> void $ sendCommand $ toXml CommandGetHistoryData
{
security = secId
, periodId = candleKindId
, count = hrCount request
, reset = hrReset request
}
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
pure Nothing
MainQueueRequest (RequestSubmitOrder order) -> do
case mkNewOrderCommand order of
Just cmd -> do
v <- sendCommand . toXml $ cmd
case v of
Left result -> do
case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
brState <- asks brokerState
respVar <- asks responseVar
liftIO $ atomically $ do
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
resp <- readTMVar respVar
putTMVar resp ResponseOrderSubmitted
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Submitted
liftIO $ cb notif
_ -> pure ()
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
Just (TransaqResponseResult (ResponseFailure err)) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Rejected
liftIO $ cb notif
_ -> pure ()
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
pure Nothing
Right _ -> do
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
TransaqResponseSecurities (ResponseSecurities securities) -> do
tisH <- asks tisHandle
cfg <- asks config
let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH)
makeSubscriptions cfg
pure Nothing
_ -> pure Nothing
_ -> pure Nothing
where
requestTimeout = 10
handleTrade transaqTrade = do
brState <- asks brokerState
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
@ -401,7 +326,7 @@ handleConnected = do @@ -401,7 +326,7 @@ handleConnected = do
liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!"
Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!"
fromTransaqTrade transaqTrade order =
Trade
@ -457,6 +382,97 @@ handleConnected = do @@ -457,6 +382,97 @@ handleConnected = do
OrderWatching -> Unsubmitted
_ -> OrderError
handleConnected :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m (Maybe ConnectionStage)
handleConnected = do
checkRequestTimeout
rqVar <- asks requestVar
runVar' <- asks runVar
queue <- asks transaqQueue
timerVar' <- asks timerVar
item <- liftIO . atomically $
(readTMVar runVar' >> pure MainQueueShutdown) `orElse`
(MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer)
case item of
MainQueueShutdown -> pure $ Just StageShutdown
MainQueuePingServer -> do
maybeServerStatus <- sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> do
pure $ Just StageConnection
_ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
pure Nothing
Right () -> pure Nothing
MainQueueTransaqData transaqData -> handleTransaqData transaqData
MainQueueRequest (RequestHistory request) -> do
cur <- asks currentCandles
liftIO $ atomically $ writeTVar cur []
maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
case maybeCk of
Just candleKindId -> do
case parseSecurityId (hrTickerId request) of
Just secId -> void $ sendCommand $ toXml CommandGetHistoryData
{
security = secId
, periodId = candleKindId
, count = hrCount request
, reset = hrReset request
}
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
pure Nothing
MainQueueRequest (RequestSubmitOrder order) -> do
case mkNewOrderCommand order of
Just cmd -> do
v <- sendCommand . toXml $ cmd
case v of
Left result -> do
case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
brState <- asks brokerState
respVar <- asks responseVar
liftIO $ atomically $ do
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
resp <- readTMVar respVar
putTMVar resp ResponseOrderSubmitted
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Submitted
liftIO $ cb notif
_ -> pure ()
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
Just (TransaqResponseResult (ResponseFailure err)) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Rejected
liftIO $ cb notif
_ -> pure ()
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
pure Nothing
Right _ -> do
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
pure Nothing
_ -> pure Nothing
_ -> pure Nothing
where
requestTimeout = 10
checkRequestTimeout = do
now <- liftIO getCurrentTime
tsVar <- asks requestTimestamp
@ -469,8 +485,6 @@ handleConnected = do @@ -469,8 +485,6 @@ handleConnected = do
liftIO . atomically . putTMVar tmvar $ ResponseTimeout
_ -> pure ()
insertToTickMap = insertTick
handleGetInfo :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
@ -525,7 +539,7 @@ handleGetInfo = do @@ -525,7 +539,7 @@ handleGetInfo = do
tisH <- asks tisHandle
let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
--forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH)
pure Nothing
TransaqResponseSecInfo secInfo -> do
@ -536,8 +550,12 @@ handleGetInfo = do @@ -536,8 +550,12 @@ handleGetInfo = do
"Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
pure Nothing
_ -> pure Nothing
where
makeSubscriptions :: (MonadIO m,
MonadTXML m,
HasLog Env Message m) => TransaqConnectorConfig -> m (Either T.Text ())
makeSubscriptions config = sendCommand . toXml $ cmdSubscription config
where
cmdSubscription config =
CommandSubscribe
{

Loading…
Cancel
Save