diff --git a/src/HistoryProviderServer.hs b/src/HistoryProviderServer.hs index 7c1ba51..0f5db12 100644 --- a/src/HistoryProviderServer.hs +++ b/src/HistoryProviderServer.hs @@ -12,51 +12,48 @@ module HistoryProviderServer , withHistoryProviderServer ) where -import ATrade.Logging (Message, - Severity (Debug, Info, Warning), - log) -import ATrade.Types (Bar (..), BarTimeframe (..), - TickerId, toDouble) -import Colog (HasLog (getLogAction, setLogAction), - LogAction (LogAction, unLogAction)) -import Control.Concurrent (ThreadId, forkIO) -import Control.Concurrent.STM (TVar, atomically, newTVarIO, - putTMVar, readTVarIO, takeTMVar, - writeTVar) -import Control.Concurrent.STM.TMVar (TMVar) -import Control.Exception (bracket) -import Control.Monad (forM_, void) -import Control.Monad.Extra (whileM) -import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Reader (MonadReader, asks) -import Control.Monad.Trans.Reader (ReaderT (runReaderT)) -import Data.Aeson (FromJSON (..), eitherDecode, - withObject, (.:)) -import qualified Data.Aeson.KeyMap as KM -import Data.Aeson.Types as Aeson -import Data.Attoparsec.Text as Attoparsec -import Data.Binary.Put (putDoublele, putWord64le, runPut) -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL -import qualified Data.List as L -import Data.List.NonEmpty (NonEmpty ((:|))) -import qualified Data.Text as T -import Data.Text.Encoding (encodeUtf8) -import Data.Time (Day, UTCTime (UTCTime), - fromGregorianValid) -import Data.Time.Clock (diffUTCTime, getCurrentTime, - secondsToDiffTime) -import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) -import Prelude hiding (log) -import System.ZMQ4 (Context, Router (Router), bind, - close, receive, receiveMulti, - sendMulti, socket, withSocket) -import TickerInfoServer (TickerInfoServerHandle, - getAllTickers) -import TXMLConnector (TXMLConnectorHandle, makeRequest) -import TXMLConnector.Internal (HistoryRequest (..), - HistoryResponse (..), - Request (..), Response (..)) +import ATrade.Logging (Message, + Severity (Debug, Info, Warning), + log) +import ATrade.Types (Bar (..), BarTimeframe (..), + TickerId, toDouble) +import Colog (HasLog (getLogAction, setLogAction), + LogAction (LogAction, unLogAction)) +import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent.STM (TVar, atomically, newTVarIO, + readTVarIO, writeTVar) +import Control.Exception (bracket) +import Control.Monad (forM_, void) +import Control.Monad.Extra (whileM) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (MonadReader, asks) +import Control.Monad.Trans.Reader (ReaderT (runReaderT)) +import Data.Aeson (eitherDecode) +import qualified Data.Aeson.KeyMap as KM +import Data.Aeson.Types as Aeson +import Data.Attoparsec.Text as Attoparsec hiding (count) +import Data.Binary.Put (putDoublele, putWord64le, runPut) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import Data.List.NonEmpty (NonEmpty ((:|))) +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) +import Data.Time (UTCTime (UTCTime), + fromGregorianValid) +import Data.Time.Clock (diffUTCTime, getCurrentTime, + secondsToDiffTime) +import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) +import Prelude hiding (log) +import System.ZMQ4 (Context, Router (Router), bind, + close, receiveMulti, sendMulti, + socket) +import TickerInfoServer (TickerInfoServerHandle, + getAllTickers) +import TXMLConnector (TXMLConnectorHandle, makeRequest) +import TXMLConnector.Internal (HistoryRequest (..), + HistoryResponse (..), Request (..), + Response (..)) data HistoryProviderServerHandle = HistoryProviderServerHandle @@ -76,17 +73,6 @@ data Period = PeriodMonth deriving (Eq, Show) -parsePeriod :: T.Text -> Maybe Period -parsePeriod "M1" = Just Period1Min -parsePeriod "M5" = Just Period5Min -parsePeriod "M15" = Just Period15Min -parsePeriod "M30" = Just Period30Min -parsePeriod "H1" = Just PeriodHour -parsePeriod "D" = Just PeriodDay -parsePeriod "W" = Just PeriodWeek -parsePeriod "M" = Just PeriodMonth -parsePeriod _ = Nothing - periodToSeconds :: Period -> Int periodToSeconds Period1Min = 60 periodToSeconds Period5Min = 60 * 5 @@ -237,13 +223,13 @@ workThread = do Right request -> do response <- handleRequest sender request sendResponseWithDelimiter sock sender response - Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" + Left _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" (sender:rawRq:_) -> case eitherDecode $ BL.fromStrict rawRq of Right request -> do response <- handleRequest sender request sendResponse sock sender response - Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" + Left _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" liftIO $ readTVarIO runVar liftIO $ close sock diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index ae413f0..de5aa54 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -13,8 +13,12 @@ module TXMLConnector , makeBrokerBackend ) where +import ATrade.Broker.Backend (BrokerBackend (..), + BrokerBackendNotification (..)) import ATrade.Logging (Message, Severity (..), log, logWith) +import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) +import ATrade.Types (Order, OrderId) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Config (SubscriptionConfig (SubscriptionConfig), @@ -23,6 +27,7 @@ import Config (SubscriptionConfig (Subscriptio transaqLogPath, transaqLogin, transaqPassword, transaqPort) import Control.Concurrent (ThreadId, forkIO, threadDelay) +import Control.Concurrent.BoundedChan (BoundedChan) import Control.Concurrent.STM (TVar, atomically, modifyTVar', newEmptyTMVar, newEmptyTMVarIO, newTVarIO, orElse, putTMVar, @@ -33,89 +38,30 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, readTBQueue, writeTBQueue) +import Control.Concurrent.STM.TMVar (TMVar) import Control.Monad (forM_, forever, void, when) import Control.Monad.Extra (whileM) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (ReaderT (runReaderT)) +import Control.Monad.Reader.Class (MonadReader) import qualified Data.Bimap as BM +import qualified Data.Map.Strict as M import Data.Maybe (mapMaybe) import qualified Data.Text as T +import Data.Time.Clock (UTCTime, getCurrentTime) import qualified Deque.Strict as D +import GHC.Exts (IsList (..)) +import Prelude hiding (log) import Text.XML.Light.Input (parseXML) import Text.XML.Light.Types (Content (Elem), Element (elName), QName (qName)) -import TickTable (TickTable, insertTick, - lookupTick, newTickTable) -import Transaq (AllTradesTrade (..), - Candle (..), ClientData (..), - CommandChangePass (..), - CommandConnect (..), - CommandDisconnect (CommandDisconnect), - CommandGetHistoryData (CommandGetHistoryData), - CommandServerStatus (..), - CommandSubscribe (..), - ConnectionState (Disconnected), - Language (LanguageEn), - MarketInfo (..), - OrderNotification (..), - OrderStatus (..), - Quotation (..), - ResponseAllTrades (ResponseAllTrades), - ResponseCandleKinds (ResponseCandleKinds), - ResponseCandles (..), - ResponseCandlesStatus (StatusPending), - ResponseClient (ResponseClient), - ResponseMarkets (ResponseMarkets), - ResponseOrders (ResponseOrders), - ResponseQuotations (ResponseQuotations), - ResponseQuotes (ResponseQuotes), - ResponseResult (..), - ResponseSecurities (ResponseSecurities), - ResponseTrades (ResponseTrades), - Security (..), SecurityId (..), - TradeNotification (..), - TransaqCommand (toXml), - TransaqResponse (..), - TransaqResponse (..), - TransaqResponseC (fromXml), - UnfilledAction (..), - kCandleKindId, kPeriod, state) +import TickerInfoServer (TickerInfoServerHandle) +import TickTable (newTickTable) +import Transaq (TransaqResponse) import TXML (LogLevel, MonadTXML, - freeCallback, initialize, - sendCommand, setCallback) - -import ATrade.Broker.Backend (BrokerBackend (..), - BrokerBackendNotification (..)) -import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) -import ATrade.Types (Bar (..), - BarTimeframe (unBarTimeframe), - DataType (BestBid, BestOffer, LastTradePrice), - Order (..), OrderId, - OrderPrice (..), - OrderState (..), Tick (..), - TickerId, Trade (..), - fromDouble, toDouble) -import qualified ATrade.Types as AT -import Colog.Monad (WithLog) -import Control.Applicative ((<|>)) -import Control.Concurrent.BoundedChan (BoundedChan, writeChan) -import Control.Concurrent.STM.TMVar (TMVar) -import Control.Error (headMay) -import Control.Monad.IO.Class (MonadIO (liftIO)) -import Control.Monad.Reader (ReaderT (runReaderT)) -import Control.Monad.Reader.Class (MonadReader, asks) -import Data.Int (Int64) -import qualified Data.Map.Strict as M -import Data.Time.Clock (UTCTime, diffUTCTime, - getCurrentTime) -import FSM (FSMCallback (..), - FSMState (isTerminalState), - makeFsm, runFsm) -import GHC.Exts (IsList (..)) -import Prelude hiding (log) -import TickerInfoServer (TickerInfo (..), - TickerInfoServerHandle, - putTickerInfo) -import qualified Transaq + initialize, sendCommand, + setCallback) import qualified TXML import TXMLConnector.Internal (BrokerState (..), ConnectionStage (..), Env (..), @@ -128,10 +74,10 @@ import qualified Win32.TXML as TXMLImpl import qualified Linux.TXML as TXMLImpl #endif + data ConnectionParams = ConnectionParams - { - cpLogin :: T.Text + { cpLogin :: T.Text , cpPassword :: T.Text , cpHost :: T.Text , cpPort :: Int @@ -142,8 +88,7 @@ data ConnectionParams = data TXMLConnectorHandle = TXMLConnectorHandle - { - threadId :: ThreadId + { threadId :: ThreadId , notificationQueue :: TBQueue TransaqResponse , hRequestVar :: TMVar Request , hResponseVar :: TMVar (TMVar Response) @@ -165,71 +110,69 @@ instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } setLogAction _ env = env -- fuck it + start :: LogAction IO Message -> TransaqConnectorConfig -> BoundedChan QuoteSourceServerData -> TickerInfoServerHandle -> IO TXMLConnectorHandle -start logger config qssChannel tisH = do - logWith logger Info "TXMLConnector" "Starting" - notificationQueue <- atomically $ newTBQueue 50000 +start logger' config' qssChannel' tisH = do + logWith logger' Info "TXMLConnector" "Starting" + notificationQueue' <- atomically $ newTBQueue 50000 tickTable <- newTickTable - requestVar <- newEmptyTMVarIO - responseVar <- newEmptyTMVarIO - currentCandles <- newTVarIO [] - serverConnected <- liftIO $ newTVarIO StageConnection - candleKindMap <- newTVarIO M.empty - requestTimestamp <- getCurrentTime >>= newTVarIO + requestVar' <- newEmptyTMVarIO + responseVar' <- newEmptyTMVarIO + currentCandles' <- newTVarIO [] + serverConnected' <- liftIO $ newTVarIO StageConnection + candleKindMap' <- newTVarIO M.empty + requestTimestamp' <- getCurrentTime >>= newTVarIO orderMap <- newTVarIO M.empty notificationCallback <- newTVarIO Nothing orderTransactionIdMap <- newTVarIO BM.empty pendingOrders <- newTVarIO (fromList []) - runVar <- newEmptyTMVarIO - timerVar <- newEmptyTMVarIO - let brokerState = + runVar' <- newEmptyTMVarIO + timerVar' <- newEmptyTMVarIO + let brokerState' = BrokerState - { - bsOrderTransactionIdMap = orderTransactionIdMap + { bsOrderTransactionIdMap = orderTransactionIdMap , bsNotificationCallback = notificationCallback , bsOrderMap = orderMap , bsPendingOrders = pendingOrders } + let env = Env - { - qssChannel = qssChannel + { qssChannel = qssChannel' , tisHandle = tisH - , requestVar = requestVar - , responseVar = responseVar - , requestTimestamp = requestTimestamp - , currentCandles = currentCandles + , requestVar = requestVar' + , responseVar = responseVar' + , requestTimestamp = requestTimestamp' + , currentCandles = currentCandles' , tickMap = tickTable - , transaqQueue = notificationQueue - , logger = logger - , config = config - , serverConnected = serverConnected - , candleKindMap = candleKindMap - , brokerState = brokerState - , runVar = runVar - , timerVar = timerVar + , transaqQueue = notificationQueue' + , logger = logger' + , config = config' + , serverConnected = serverConnected' + , candleKindMap = candleKindMap' + , brokerState = brokerState' + , runVar = runVar' + , timerVar = timerVar' } - threadId <- forkIO $ (runReaderT . unApp) workThread env + workThreadId <- forkIO $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle - { - threadId = threadId - , notificationQueue = notificationQueue - , hRequestVar = requestVar - , hResponseVar = responseVar - , hRequestTimestamp = requestTimestamp + { threadId = workThreadId + , notificationQueue = notificationQueue' + , hRequestVar = requestVar' + , hResponseVar = responseVar' + , hRequestTimestamp = requestTimestamp' , hNotificationCallback = notificationCallback - , hRunVar = runVar + , hRunVar = runVar' } stop :: TXMLConnectorHandle -> IO () stop h = atomically $ putTMVar (hRunVar h) () - brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) @@ -240,8 +183,8 @@ brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotifica brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend -makeBrokerBackend h account = - BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) +makeBrokerBackend h accountId = + BrokerBackend [accountId] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest h request = do diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 7b3d454..1e653b5 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -1,8 +1,7 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE DuplicateRecordFields #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE MultiParamTypeClasses #-} module TXMLConnector.Internal ( @@ -16,10 +15,8 @@ module TXMLConnector.Internal , HistoryResponse(..) ) where -import ATrade.Logging (Message, Severity (..), log, - logWith) -import Colog (HasLog (getLogAction, setLogAction), - LogAction (LogAction, unLogAction)) +import ATrade.Logging (Message, Severity (..), log) +import Colog (HasLog, LogAction (LogAction)) import Config (SubscriptionConfig (SubscriptionConfig), TransaqConnectorConfig (..), transaqHost, transaqLogLevel, @@ -36,7 +33,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, readTBQueue, writeTBQueue) -import Control.Monad (forever, void, when) +import Control.Monad (forM_, forever, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM import Data.Maybe (mapMaybe) @@ -103,7 +100,6 @@ import Control.Applicative ((<|>)) import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.STM.TMVar (TMVar) import Control.Error (headMay) -import Control.Monad (forM_) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader.Class (MonadReader, asks) @@ -199,9 +195,6 @@ data HistoryResponse = } deriving (Show, Eq) -newtype RespCallback = - RespCallback { unCallback :: T.Text -> IO Bool } - workThread :: (MonadIO m, MonadReader Env m, MonadTXML m, @@ -213,9 +206,8 @@ workThread = do Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Right _ -> do queue <- asks transaqQueue - logger' <- asks logger - rc <- setCallback (parseAndWrite queue) - case rc of + rc' <- setCallback (parseAndWrite queue) + case rc' of Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Just cb -> do serverConnectionState <- asks serverConnected @@ -273,10 +265,10 @@ handleTransaqData transaqData = do liftIO $ forM_ ticks (writeChan qssChan . QSSTick) liftIO $ forM_ ticks (insertTick tm) pure Nothing - TransaqResponseQuotations (ResponseQuotations quotations) -> do + TransaqResponseQuotations (ResponseQuotations quotations') -> do qssChan <- asks qssChannel now <- liftIO getCurrentTime - let ticks = concatMap (quotationToTicks now) quotations + let ticks = concatMap (quotationToTicks now) quotations' liftIO $ forM_ ticks (writeChan qssChan . QSSTick) liftIO $ forM_ ticks (insertTick tm) pure Nothing @@ -293,8 +285,7 @@ handleTransaqData transaqData = do liftIO $ atomically $ do candles <- readTVar cur putTMVar tmvar $ ResponseHistory $ HistoryResponse - { - hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) + { hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) , hrMoreData = False } _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" @@ -308,7 +299,7 @@ handleTransaqData transaqData = do log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) forM_ tickerInfos (liftIO . putTickerInfo tisH) - makeSubscriptions cfg + _ <- makeSubscriptions cfg pure Nothing _ -> pure Nothing where @@ -470,14 +461,13 @@ handleConnected = do _ -> pure Nothing where - requestTimeout = 10 - + requestTimeoutValue = 10 checkRequestTimeout = do now <- liftIO getCurrentTime tsVar <- asks requestTimestamp ts <- liftIO $ readTVarIO tsVar - when (now `diffUTCTime` ts >= requestTimeout) $ do + when (now `diffUTCTime` ts >= requestTimeoutValue) $ do resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar case resp of Just tmvar -> do @@ -551,17 +541,15 @@ handleGetInfo = do pure Nothing _ -> pure Nothing -makeSubscriptions :: (MonadIO m, - MonadTXML m, - HasLog Env Message m) => TransaqConnectorConfig -> m (Either T.Text ()) -makeSubscriptions config = sendCommand . toXml $ cmdSubscription config +makeSubscriptions :: (MonadTXML m) => TransaqConnectorConfig -> m (Either T.Text ()) +makeSubscriptions = sendCommand . toXml . cmdSubscription where - cmdSubscription config = + cmdSubscription config' = CommandSubscribe { - alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), - quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config), - quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) + alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config'), + quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config'), + quotes = fmap subscriptionToSecurityId (quotesSubscriptions config') } subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code @@ -612,33 +600,30 @@ handleUnconnected = do allTradeToTick :: AllTradesTrade -> Tick allTradeToTick att = Tick - { - security = attBoard att <> "#" <> attSecCode att, - datatype = LastTradePrice, - timestamp = attTimestamp att, - value = fromDouble $ attPrice att, - volume = fromIntegral $ attQuantity att + { security = attBoard att <> "#" <> attSecCode att + , datatype = LastTradePrice + , timestamp = attTimestamp att + , value = fromDouble $ attPrice att + , volume = fromIntegral $ attQuantity att } quotationToTicks :: UTCTime -> Quotation -> [Tick] -quotationToTicks timestamp q = - let security = qBoard q <> "#" <> qSeccode q in +quotationToTicks timestamp' q = + let security' = qBoard q <> "#" <> qSeccode q in [ Tick - { - security = security, - datatype = BestBid, - timestamp = timestamp, - value = fromDouble $ qBid q, - volume = fromIntegral $ qQuantity q + { security = security' + , datatype = BestBid + , timestamp = timestamp' + , value = fromDouble $ qBid q + , volume = fromIntegral $ qQuantity q }, Tick - { - security = security, - datatype = BestOffer, - timestamp = timestamp, - value = fromDouble $ qOffer q, - volume = fromIntegral $ qQuantity q + { security = security' + , datatype = BestOffer + , timestamp = timestamp' + , value = fromDouble $ qOffer q + , volume = fromIntegral $ qQuantity q }] securityToTickerInfo :: Security -> TickerInfo @@ -663,7 +648,6 @@ parseAccountId accId = case T.findIndex (== '#') accId of Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId) Nothing -> Nothing - mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder mkNewOrderCommand order = case parseSecurityId (orderSecurity order) of