Browse Source

Use TBQueue where needed

master
Denis Tereshkin 9 years ago
parent
commit
b748c4dba1
  1. 17
      app/Main.hs
  2. 4
      quik-connector.cabal
  3. 8
      src/Broker/PaperBroker.hs
  4. 8
      src/QuoteSource/DataImport.hs

17
app/Main.hs

@ -9,6 +9,8 @@ import Control.Monad.IO.Class
import Data.IORef import Data.IORef
import Graphics.UI.Gtk hiding (Action, backspace) import Graphics.UI.Gtk hiding (Action, backspace)
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import ATrade.Types import ATrade.Types
import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParsers.AllParamsTableParser
import QuoteSource.TableParser import QuoteSource.TableParser
@ -89,14 +91,15 @@ parseConfig = withObject "object" $ \obj -> do
tableName = tn, tableName = tn,
tableParams = params } tableParams = params }
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan :: Int -> TBQueue Tick -> IO (ThreadId, TBQueue Tick, TBQueue QuoteSourceServerData)
forkBoundedChan size source = do forkBoundedChan size source = do
sink <- newBoundedChan size sink <- atomically $ newTBQueue size
sinkQss <- newBoundedChan size sinkQss <- atomically $ newTBQueue size
tid <- forkIO $ forever $ do tid <- forkIO $ forever $ do
v <- readChan source v <- atomically $ readTBQueue source
tryWriteChan sink v atomically $ do
tryWriteChan sinkQss (QSSTick v) writeTBQueue sink v
writeTBQueue sinkQss (QSSTick v)
return (tid, sink, sinkQss) return (tid, sink, sinkQss)
@ -108,7 +111,7 @@ main = do
config <- readConfig "quik-connector.config.json" config <- readConfig "quik-connector.config.json"
infoM "main" "Config loaded" infoM "main" "Config loaded"
chan <- newBoundedChan 1000 chan <- atomically $ newTBQueue 1000
infoM "main" "Starting data import server" infoM "main" "Starting data import server"
dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"

4
quik-connector.cabal

@ -51,6 +51,9 @@ library
, split , split
, bimap , bimap
, safe , safe
, conduit
, stm
, stm-conduit
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"
other-modules: System.Win32.XlParser other-modules: System.Win32.XlParser
@ -75,6 +78,7 @@ executable quik-connector-exe
, zeromq4-haskell , zeromq4-haskell
, libatrade , libatrade
, transformers , transformers
, stm
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"

8
src/Broker/PaperBroker.hs

@ -11,6 +11,8 @@ import Control.DeepSeq
import Data.Hashable import Data.Hashable
import Data.Bits import Data.Bits
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import ATrade.Types import ATrade.Types
import Data.IORef import Data.IORef
import qualified Data.HashMap.Strict as M import qualified Data.HashMap.Strict as M
@ -31,14 +33,14 @@ instance Hashable TickMapKey where
data PaperBrokerState = PaperBrokerState { data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId, pbTid :: Maybe ThreadId,
tickChannel :: BoundedChan Tick, tickChannel :: TBQueue Tick,
tickMap :: M.HashMap TickMapKey Tick, tickMap :: M.HashMap TickMapKey Tick,
orders :: M.HashMap OrderId Order, orders :: M.HashMap OrderId Order,
cash :: ! Decimal, cash :: ! Decimal,
notificationCallback :: Maybe (Notification -> IO ()) notificationCallback :: Maybe (Notification -> IO ())
} }
mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface
mkPaperBroker tickChan startCash accounts = do mkPaperBroker tickChan startCash accounts = do
state <- newIORef PaperBrokerState { state <- newIORef PaperBrokerState {
pbTid = Nothing, pbTid = Nothing,
@ -62,7 +64,7 @@ brokerThread :: IORef PaperBrokerState -> IO ()
brokerThread state = do brokerThread state = do
chan <- tickChannel <$> readIORef state chan <- tickChannel <$> readIORef state
forever $ do forever $ do
tick <- readChan chan tick <- atomically $ readTBQueue chan
atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()) ) atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()) )
where where
makeKey !tick = TickMapKey (security $! tick) (datatype tick) makeKey !tick = TickMapKey (security $! tick) (datatype tick)

8
src/QuoteSource/DataImport.hs

@ -7,6 +7,8 @@ module QuoteSource.DataImport
) where ) where
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Monad.State.Strict import Control.Monad.State.Strict
import ATrade.Types import ATrade.Types
import Data.IORef import Data.IORef
@ -20,7 +22,7 @@ import qualified Data.Map as M
data ServerState = ServerState { data ServerState = ServerState {
appName :: String, appName :: String,
parsers :: IORef (M.Map String TableParserInstance), parsers :: IORef (M.Map String TableParserInstance),
tickChannel :: BoundedChan Tick tickChannel :: TBQueue Tick
} }
ddeCallback :: ServerState -> String -> (Int, Int, [XlData]) -> IO Bool ddeCallback :: ServerState -> String -> (Int, Int, [XlData]) -> IO Bool
@ -32,12 +34,12 @@ ddeCallback state topic table = do
let stateWithTimeHint = giveTimestampHint myParser timeHint let stateWithTimeHint = giveTimestampHint myParser timeHint
let (ticks, newState) = runState (parseXlTable table) stateWithTimeHint let (ticks, newState) = runState (parseXlTable table) stateWithTimeHint
modifyIORef' (parsers state) (\s -> newState `seq` s `seq` M.insert topic (MkTableParser newState) s) modifyIORef' (parsers state) (\s -> newState `seq` s `seq` M.insert topic (MkTableParser newState) s)
writeList2Chan (tickChannel state) ticks mapM_ (atomically . writeTBQueue (tickChannel state)) ticks
return True return True
_ -> return False _ -> return False
initDataImportServer :: [TableParserInstance] -> BoundedChan Tick -> String -> IO (ServerState, IORef DdeState) initDataImportServer :: [TableParserInstance] -> TBQueue Tick -> String -> IO (ServerState, IORef DdeState)
initDataImportServer parsers tickChan applicationName = do initDataImportServer parsers tickChan applicationName = do
parsers <- newIORef $ M.fromList $ map (\(MkTableParser p) -> (getTableId p, MkTableParser p)) parsers parsers <- newIORef $ M.fromList $ map (\(MkTableParser p) -> (getTableId p, MkTableParser p)) parsers
let s = ServerState { appName = applicationName, parsers = parsers, tickChannel = tickChan } let s = ServerState { appName = applicationName, parsers = parsers, tickChannel = tickChan }

Loading…
Cancel
Save