diff --git a/app/Main.hs b/app/Main.hs index 87b10b0..7c03401 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -12,11 +12,13 @@ import Control.Concurrent.BoundedChan import Data.ATrade import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParser +import QuoteSource.Server import System.Log.Logger import System.Log.Handler.Simple import System.Log.Handler (setFormatter) import System.Log.Formatter +import System.ZMQ4 import Data.Aeson import Data.Aeson.Types @@ -78,16 +80,18 @@ main = do config <- readConfig "quik-connector.config.json" infoM "main" "Config loaded" chan <- newBoundedChan 1000 - forkIO $ forever $ do - tick <- readChan chan - when (datatype tick == Price) $ print tick infoM "main" "Starting data import server" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" - void initGUI - window <- windowNew - window `on` deleteEvent $ do - liftIO mainQuit - return False - widgetShowAll window - mainGUI + withContext (\ctx -> do + qsServer <- startQuoteSourceServer chan ctx (quotesourceEndpoint config) + + void initGUI + window <- windowNew + window `on` deleteEvent $ do + liftIO mainQuit + return False + widgetShowAll window + mainGUI + stopQuoteSourceServer qsServer + infoM "main" "Main thread done") diff --git a/quik-connector.cabal b/quik-connector.cabal index 1726cfa..54bbf9d 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -20,6 +20,7 @@ library , Data.ATrade , QuoteSource.TableParser , QuoteSource.TableParsers.AllParamsTableParser + , QuoteSource.Server ghc-options: -Wincomplete-patterns build-depends: base >= 4.7 && < 5 , Win32 @@ -57,6 +58,7 @@ executable quik-connector-exe , unordered-containers , vector , text + , zeromq4-haskell default-language: Haskell2010 extra-libraries: "user32" diff --git a/src/Broker.hs b/src/Broker.hs new file mode 100644 index 0000000..3008fd2 --- /dev/null +++ b/src/Broker.hs @@ -0,0 +1,50 @@ + +module Broker ( +) where + +import Data.Decimal +import Data.Time.Clock + +data SignalId = SignalId { + strategyId :: String, + signalName :: String, + comment :: String } + deriving (Show, Eq) + +data OrderPrice = Market | Limit Decimal | Stop Decimal Decimal + +data Operation = Buy | Sell + deriving (Show, Eq) + +data OrderState = Unsubmitted + | Submitted + | PartiallyExecuted + | Executed + | Cancelled + | Rejected String + | Error String + +data Order = Order { + orderId :: Integer, + orderAccountId :: String, + orderSecurity :: String, + orderPrice :: OrderPrice, + orderQuantity :: Integer, + orderExecutedQuantity :: Integer, + orderOperation :: Operation, + orderState :: OrderState, + orderSignalId :: SignalId } + deriving (Show, Eq) + +data Trade = Trade { + tradeOrderId :: Integer, + tradePrice :: Decimal, + tradeQuantity :: Integer, + tradeVolume :: Decimal, + tradeVolumeCurrency :: String, + tradeAccount :: String, + tradeSecurity :: String, + tradeTimestamp :: UTCTime, + tradeSignalId :: SignalId } + deriving (Show, Eq) + diff --git a/src/QuoteSource/Server.hs b/src/QuoteSource/Server.hs index 53e9f2f..05033d5 100644 --- a/src/QuoteSource/Server.hs +++ b/src/QuoteSource/Server.hs @@ -1,28 +1,36 @@ module QuoteSource.Server ( + startQuoteSourceServer, + stopQuoteSourceServer ) where import System.ZMQ4 import Control.Concurrent.BoundedChan import Data.ATrade -import Control.Concurrent +import Control.Concurrent hiding (readChan) import Control.Monad +import Control.Exception +import qualified Data.ByteString.Lazy as BL +import Data.List.NonEmpty hiding (map) +import System.Log.Logger data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, tickChannel :: BoundedChan Tick, - serverThread :: ThreadId + serverThreadId :: ThreadId } serverThread :: QuoteSourceServer -> IO () -serverThread state = finally serverThread' cleanup +serverThread state = do + finally serverThread' cleanup + debugM "QuoteSource" "server thread done" where cleanup = close $ outSocket state - serverThread = forever $ do + serverThread' = forever $ do tick <- readChan $ tickChannel state - sendMulti (outSocket state) serializeTick tick + sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick startQuoteSourceServer :: BoundedChan Tick -> Context -> String -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do @@ -33,8 +41,11 @@ startQuoteSourceServer chan c ep = do ctx = c, outSocket = sock, tickChannel = chan, - serverThread = tid + serverThreadId = tid } stid <- forkIO $ serverThread state - return $ state { serverThread = stid } + return $ state { serverThreadId = stid } + +stopQuoteSourceServer :: QuoteSourceServer -> IO () +stopQuoteSourceServer server = killThread $ serverThreadId server