diff --git a/app/Main.hs b/app/Main.hs index 0af12f6..755efb7 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -9,6 +9,8 @@ import Control.Monad.IO.Class import Data.IORef import Graphics.UI.Gtk hiding (Action, backspace) import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import ATrade.Types import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParser @@ -89,14 +91,15 @@ parseConfig = withObject "object" $ \obj -> do tableName = tn, tableParams = params } -forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData) +forkBoundedChan :: Int -> TBQueue Tick -> IO (ThreadId, TBQueue Tick, TBQueue QuoteSourceServerData) forkBoundedChan size source = do - sink <- newBoundedChan size - sinkQss <- newBoundedChan size + sink <- atomically $ newTBQueue size + sinkQss <- atomically $ newTBQueue size tid <- forkIO $ forever $ do - v <- readChan source - tryWriteChan sink v - tryWriteChan sinkQss (QSSTick v) + v <- atomically $ readTBQueue source + atomically $ do + writeTBQueue sink v + writeTBQueue sinkQss (QSSTick v) return (tid, sink, sinkQss) @@ -108,7 +111,7 @@ main = do config <- readConfig "quik-connector.config.json" infoM "main" "Config loaded" - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 infoM "main" "Starting data import server" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" diff --git a/quik-connector.cabal b/quik-connector.cabal index 2c9fc4b..8d7f621 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -51,6 +51,9 @@ library , split , bimap , safe + , conduit + , stm + , stm-conduit default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser @@ -75,6 +78,7 @@ executable quik-connector-exe , zeromq4-haskell , libatrade , transformers + , stm default-language: Haskell2010 extra-libraries: "user32" diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 117a8b9..aebc7ee 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -11,6 +11,8 @@ import Control.DeepSeq import Data.Hashable import Data.Bits import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import ATrade.Types import Data.IORef import qualified Data.HashMap.Strict as M @@ -31,14 +33,14 @@ instance Hashable TickMapKey where data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, - tickChannel :: BoundedChan Tick, + tickChannel :: TBQueue Tick, tickMap :: M.HashMap TickMapKey Tick, orders :: M.HashMap OrderId Order, cash :: ! Decimal, notificationCallback :: Maybe (Notification -> IO ()) } -mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface +mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface mkPaperBroker tickChan startCash accounts = do state <- newIORef PaperBrokerState { pbTid = Nothing, @@ -62,7 +64,7 @@ brokerThread :: IORef PaperBrokerState -> IO () brokerThread state = do chan <- tickChannel <$> readIORef state forever $ do - tick <- readChan chan + tick <- atomically $ readTBQueue chan atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()) ) where makeKey !tick = TickMapKey (security $! tick) (datatype tick) diff --git a/src/QuoteSource/DataImport.hs b/src/QuoteSource/DataImport.hs index 8a20f72..2a56239 100644 --- a/src/QuoteSource/DataImport.hs +++ b/src/QuoteSource/DataImport.hs @@ -7,6 +7,8 @@ module QuoteSource.DataImport ) where import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Monad.State.Strict import ATrade.Types import Data.IORef @@ -20,7 +22,7 @@ import qualified Data.Map as M data ServerState = ServerState { appName :: String, parsers :: IORef (M.Map String TableParserInstance), - tickChannel :: BoundedChan Tick + tickChannel :: TBQueue Tick } ddeCallback :: ServerState -> String -> (Int, Int, [XlData]) -> IO Bool @@ -32,12 +34,12 @@ ddeCallback state topic table = do let stateWithTimeHint = giveTimestampHint myParser timeHint let (ticks, newState) = runState (parseXlTable table) stateWithTimeHint modifyIORef' (parsers state) (\s -> newState `seq` s `seq` M.insert topic (MkTableParser newState) s) - writeList2Chan (tickChannel state) ticks + mapM_ (atomically . writeTBQueue (tickChannel state)) ticks return True _ -> return False -initDataImportServer :: [TableParserInstance] -> BoundedChan Tick -> String -> IO (ServerState, IORef DdeState) +initDataImportServer :: [TableParserInstance] -> TBQueue Tick -> String -> IO (ServerState, IORef DdeState) initDataImportServer parsers tickChan applicationName = do parsers <- newIORef $ M.fromList $ map (\(MkTableParser p) -> (getTableId p, MkTableParser p)) parsers let s = ServerState { appName = applicationName, parsers = parsers, tickChannel = tickChan }