Browse Source

Trade sinks changed

master
Denis Tereshkin 7 years ago
parent
commit
18533e484a
  1. 62
      app/Config.hs
  2. 88
      app/Main.hs
  3. 20
      app/Version.hs
  4. 6
      quik-connector.cabal
  5. 21
      src/ATrade/Quotes/QTIS.hs
  6. 95
      src/Broker/PaperBroker.hs
  7. 11
      src/Broker/QuikBroker.hs
  8. 14
      src/TickTable.hs
  9. 4
      stack.yaml

62
app/Config.hs

@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings, OverloadedLabels #-} {-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
module Config ( module Config (
TableConfig(..), TableConfig(..),
@ -7,38 +8,37 @@ module Config (
) where ) where
import Commissions (CommissionConfig) import Commissions (CommissionConfig)
import Data.Aeson import Data.Aeson
import Data.Aeson.Types import Data.Aeson.Types
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V import qualified Data.Text as T
import qualified Data.Text as T import qualified Data.Vector as V
data TableConfig = TableConfig { data TableConfig = TableConfig {
parserId :: String, parserId :: String,
tableName :: String, tableName :: String,
tableParams :: Value tableParams :: Value
} deriving (Show) } deriving (Show)
data Config = Config { data Config = Config {
quotesourceEndpoint :: String, quotesourceEndpoint :: String,
qtisEndpoint :: String, qtisEndpoint :: String,
pipeReaderQsEndpoint :: Maybe String, pipeReaderQsEndpoint :: Maybe String,
tickPipePath :: Maybe String, tickPipePath :: Maybe String,
brokerserverEndpoint :: String, brokerserverEndpoint :: String,
whitelist :: [T.Text], whitelist :: [T.Text],
blacklist :: [T.Text], blacklist :: [T.Text],
brokerServerCertPath :: Maybe FilePath, brokerServerCertPath :: Maybe FilePath,
brokerClientCertificateDir :: Maybe FilePath, brokerClientCertificateDir :: Maybe FilePath,
tables :: [TableConfig], tables :: [TableConfig],
quikPath :: String, quikPath :: String,
dllPath :: String, dllPath :: String,
quikAccounts :: [T.Text], quikAccounts :: [T.Text],
tradeSink :: T.Text, tradeSink :: T.Text,
telegramToken :: T.Text, tradeSink2 :: T.Text,
telegramChatId :: T.Text, commissions :: [CommissionConfig]
commissions :: [CommissionConfig]
} deriving (Show) } deriving (Show)
readConfig :: String -> IO Config readConfig :: String -> IO Config
@ -46,7 +46,7 @@ readConfig fname = do
content <- BL.readFile fname content <- BL.readFile fname
case decode content >>= parseMaybe parseConfig of case decode content >>= parseMaybe parseConfig of
Just config -> return config Just config -> return config
Nothing -> error "Unable to load config" Nothing -> error "Unable to load config"
parseConfig :: Value -> Parser Config parseConfig :: Value -> Parser Config
parseConfig = withObject "object" $ \obj -> do parseConfig = withObject "object" $ \obj -> do
@ -60,13 +60,12 @@ parseConfig = withObject "object" $ \obj -> do
serverCert <- obj .:? "broker_server_certificate" serverCert <- obj .:? "broker_server_certificate"
clientCerts <- obj .:? "broker_client_certificates" clientCerts <- obj .:? "broker_client_certificates"
rt <- case HM.lookup "tables" obj of rt <- case HM.lookup "tables" obj of
Just v -> parseTables v Just v -> parseTables v
Nothing -> fail "Expected tables array" Nothing -> fail "Expected tables array"
qp <- obj .: "quik-path" qp <- obj .: "quik-path"
dp <- obj .: "dll-path" dp <- obj .: "dll-path"
trsink <- obj .: "trade-sink" trsink <- obj .: "trade-sink"
tgToken <- obj .: "telegram-token" trsink2 <- obj .: "trade-sink2"
tgChatId <- obj .: "telegram-chatid"
commissionsConfig <- obj .: "commissions" commissionsConfig <- obj .: "commissions"
accs <- V.toList <$> obj .: "accounts" accs <- V.toList <$> obj .: "accounts"
return Config { quotesourceEndpoint = qse, return Config { quotesourceEndpoint = qse,
@ -83,8 +82,7 @@ parseConfig = withObject "object" $ \obj -> do
dllPath = dp, dllPath = dp,
quikAccounts = fmap T.pack accs, quikAccounts = fmap T.pack accs,
tradeSink = trsink, tradeSink = trsink,
telegramToken = tgToken, tradeSink2 = trsink2,
telegramChatId = tgChatId,
commissions = commissionsConfig } commissions = commissionsConfig }
where where
parseTables :: Value -> Parser [TableConfig] parseTables :: Value -> Parser [TableConfig]
@ -95,7 +93,7 @@ parseConfig = withObject "object" $ \obj -> do
pid <- obj .: "parser-id" pid <- obj .: "parser-id"
tn <- obj .: "table-name" tn <- obj .: "table-name"
params <- case HM.lookup "params" obj of params <- case HM.lookup "params" obj of
Just x -> return x Just x -> return x
Nothing -> return $ Object HM.empty Nothing -> return $ Object HM.empty
return TableConfig { return TableConfig {
parserId = pid, parserId = pid,

88
app/Main.hs

@ -1,42 +1,46 @@
{-# LANGUAGE OverloadedStrings, OverloadedLabels, LambdaCase #-} {-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where module Main where
import System.IO import System.IO
import QuoteSource.DataImport import ATrade.QuoteSource.Server
import Control.Concurrent hiding (readChan, writeChan) import ATrade.Types
import Control.Monad import Control.Concurrent hiding (readChan,
import Control.Exception.Safe writeChan)
import Control.Error.Util import Control.Concurrent.BoundedChan
import qualified GI.Gtk as Gtk import Control.Error.Util
import Data.GI.Base import Control.Exception.Safe
import Control.Concurrent.BoundedChan import Control.Monad
import ATrade.Types import Data.GI.Base
import QuoteSource.TableParsers.AllParamsTableParser import qualified GI.Gtk as Gtk
import QuoteSource.TableParser import QuoteSource.DataImport
import QuoteSource.PipeReader import QuoteSource.PipeReader
import ATrade.QuoteSource.Server import QuoteSource.TableParser
import QuoteSource.TableParsers.AllParamsTableParser
import ATrade.Broker.TradeSinks.ZMQTradeSink
import ATrade.Broker.TradeSinks.TelegramTradeSink import ATrade.Broker.Server
import ATrade.Broker.Server import ATrade.Broker.TradeSinks.ZMQTradeSink
import Broker.PaperBroker import Broker.PaperBroker
import Broker.QuikBroker import Broker.QuikBroker
import System.Directory import System.Directory
import System.Timeout import System.Log.Formatter
import System.Log.Logger import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple import System.Log.Handler.Simple
import System.Log.Handler (setFormatter) import System.Log.Logger
import System.Log.Formatter import System.Timeout
import System.ZMQ4 import System.ZMQ4
import System.ZMQ4.ZAP import System.ZMQ4.ZAP
import qualified Data.Text as T import Data.Maybe
import Data.Maybe import qualified Data.Text as T
import Config import Config
import TickTable (mkTickTable) import TickTable (mkTickTable)
import Version
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData)
forkBoundedChan size sourceChan = do forkBoundedChan size sourceChan = do
@ -57,14 +61,18 @@ initLogging = do
handler <- streamHandler stderr DEBUG >>= handler <- streamHandler stderr DEBUG >>=
(\x -> return $ (\x -> return $
setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg"))
fhandler <- fileHandler "quik-connector.log" DEBUG >>=
(\x -> return $
setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg"))
hSetBuffering stderr LineBuffering hSetBuffering stderr LineBuffering
updateGlobalLogger rootLoggerName (setLevel DEBUG) updateGlobalLogger rootLoggerName (setLevel DEBUG)
updateGlobalLogger rootLoggerName (setHandlers [handler]) updateGlobalLogger rootLoggerName (setHandlers [handler, fhandler])
main :: IO () main :: IO ()
main = do main = do
initLogging initLogging
infoM "main" $ "Starting quik-connector-" ++ T.unpack quikConnectorVersionText
infoM "main" "Loading config" infoM "main" "Loading config"
config <- readConfig "quik-connector.config.json" config <- readConfig "quik-connector.config.json"
@ -103,9 +111,9 @@ main = do
bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do
bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [zmqTradeSink2, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do
void $ Gtk.init Nothing void $ Gtk.init Nothing
window <- new Gtk.Window [ #title := "Quik connector" ] window <- new Gtk.Window [ #title := "Quik connector" ]
void $ on window #destroy Gtk.mainQuit void $ on window #destroy Gtk.mainQuit

20
app/Version.hs

@ -0,0 +1,20 @@
{-# LANGUAGE QuasiQuotes #-}
module Version
(
quikConnectorVersion,
quikConnectorVersionText
) where
import qualified Data.Text as T
import Text.Printf.TH
quikConnectorVersion :: (Int, Int, Int, Int)
quikConnectorVersion = (0, 2, 3, 0)
quikConnectorVersionText :: T.Text
quikConnectorVersionText =
[st|%d.%d.%d.%d|] v1 v2 v3 v4
where
(v1, v2, v3, v4) = quikConnectorVersion

6
quik-connector.cabal

@ -1,5 +1,5 @@
name: quik-connector name: quik-connector
version: 0.2.1.0 version: 0.2.3.0
synopsis: Atrade-Quik Connector application synopsis: Atrade-Quik Connector application
description: Please see README.md description: Please see README.md
homepage: https://github.com/asakul/quik-connector homepage: https://github.com/asakul/quik-connector
@ -49,7 +49,7 @@ library
, aeson , aeson
, cond , cond
, scientific , scientific
, libatrade == 0.4.0.0 , libatrade == 0.7.0.0
, deepseq , deepseq
, errors , errors
, split , split
@ -105,8 +105,10 @@ executable quik-connector-exe
, errors , errors
, safe-exceptions , safe-exceptions
, iconv , iconv
, th-printf
default-language: Haskell2010 default-language: Haskell2010
other-modules: Config other-modules: Config
, Version
-- extra-libraries: "user32" -- extra-libraries: "user32"
test-suite quik-connector-test test-suite quik-connector-test

21
src/ATrade/Quotes/QTIS.hs

@ -7,19 +7,19 @@ module ATrade.Quotes.QTIS
qtisGetTickersInfo' qtisGetTickersInfo'
) where ) where
import ATrade.Types import ATrade.Types
import Control.Monad import Control.Monad
import Data.Aeson import Data.Aeson
import Data.Maybe
import qualified Data.ByteString.Char8 as BC8 import qualified Data.ByteString.Char8 as BC8
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T import Data.Maybe
import System.ZMQ4 import qualified Data.Text as T
import System.Log.Logger import System.Log.Logger
import System.ZMQ4
data TickerInfo = TickerInfo { data TickerInfo = TickerInfo {
tiTicker :: T.Text, tiTicker :: T.Text,
tiLotSize :: Integer, tiLotSize :: Integer,
tiTickSize :: Price tiTickSize :: Price
} deriving (Show, Eq) } deriving (Show, Eq)
@ -47,6 +47,7 @@ qtisGetTickersInfo ctx endpoint tickers =
debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId
send sock [] $ BL.toStrict (tickerRequest tickerId) send sock [] $ BL.toStrict (tickerRequest tickerId)
response <- receiveMulti sock response <- receiveMulti sock
debugM "QTIS" $ show response
let r = parseResponse response let r = parseResponse response
debugM "QTIS" $ "Got response: " ++ show r debugM "QTIS" $ "Got response: " ++ show r
return r)) return r))

95
src/Broker/PaperBroker.hs

@ -1,51 +1,51 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE Strict #-}
{-# LANGUAGE Strict #-}
module Broker.PaperBroker ( module Broker.PaperBroker (
PaperBrokerState, PaperBrokerState,
mkPaperBroker mkPaperBroker
) where ) where
import Data.Hashable import ATrade.Broker.Protocol
import Data.Bits import ATrade.Broker.Server
import ATrade.Types import ATrade.Quotes.QTIS
import Data.IORef import ATrade.Types
import qualified Data.List as L import Control.Concurrent hiding (readChan, writeChan)
import qualified Data.Map.Strict as M import Control.Concurrent.BoundedChan
import qualified Data.Text as T import Control.Monad
import ATrade.Broker.Protocol import Data.Bits
import ATrade.Broker.Server import Data.Hashable
import Data.Time.Clock import Data.IORef
import Data.Maybe import qualified Data.List as L
import Control.Monad import qualified Data.Map.Strict as M
import Control.Concurrent.BoundedChan import Data.Maybe
import Control.Concurrent hiding (readChan, writeChan) import qualified Data.Text as T
import System.Log.Logger import Data.Time.Clock
import ATrade.Quotes.QTIS import System.Log.Logger
import System.ZMQ4 import System.ZMQ4
import Commissions (CommissionConfig(..)) import Commissions (CommissionConfig (..))
import TickTable (TickTableH, TickKey(..), getTick, getTickerInfo) import TickTable (TickKey (..), TickTableH,
getTick, getTickerInfo)
data PaperBrokerState = PaperBrokerState { data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId, pbTid :: Maybe ThreadId,
tickTable :: TickTableH, tickTable :: TickTableH,
orders :: M.Map OrderId Order, orders :: M.Map OrderId Order,
cash :: !Price, cash :: !Price,
notificationCallback :: Maybe (Notification -> IO ()), notificationCallback :: Maybe (Notification -> IO ()),
pendingOrders :: [Order], pendingOrders :: [Order],
fortsClassCodes :: [T.Text], fortsClassCodes :: [T.Text],
fortsOpenTimeIntervals :: [(DiffTime, DiffTime)], fortsOpenTimeIntervals :: [(DiffTime, DiffTime)],
auctionableClassCodes :: [T.Text], auctionableClassCodes :: [T.Text],
premarketStartTime :: DiffTime, premarketStartTime :: DiffTime,
marketOpenTime :: DiffTime, marketOpenTime :: DiffTime,
postMarketStartTime :: DiffTime, postMarketStartTime :: DiffTime,
postMarketFixTime :: DiffTime, postMarketFixTime :: DiffTime,
postMarketCloseTime :: DiffTime, postMarketCloseTime :: DiffTime,
commissions :: [CommissionConfig] commissions :: [CommissionConfig]
} }
hourMin :: Integer -> Integer -> DiffTime hourMin :: Integer -> Integer -> DiffTime
@ -90,9 +90,11 @@ brokerThread chan state = forever $ do
executePendingOrders tick state executePendingOrders tick state
executePendingOrders tick state = do executePendingOrders tick state = do
marketOpenTime' <- marketOpenTime <$> readIORef state
po <- pendingOrders <$> readIORef state po <- pendingOrders <$> readIORef state
executedIds <- catMaybes <$> mapM execute po when (utctDayTime (timestamp tick) >= marketOpenTime') $ do
atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ())) executedIds <- catMaybes <$> mapM execute po
atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ()))
where where
execute order = execute order =
if security tick == orderSecurity order if security tick == orderSecurity order
@ -147,7 +149,7 @@ maybeCall proj state arg = do
cb <- proj <$> readIORef state cb <- proj <$> readIORef state
case cb of case cb of
Just callback -> callback arg Just callback -> callback arg
Nothing -> return () Nothing -> return ()
executeAtTick state order tick = do executeAtTick state order tick = do
let newOrder = order { orderState = Executed } let newOrder = order { orderState = Executed }
@ -179,8 +181,8 @@ pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do pbSubmitOrder state order = do
infoM "PaperBroker" $ "Submitted order: " ++ show order infoM "PaperBroker" $ "Submitted order: " ++ show order
case orderPrice order of case orderPrice order of
Market -> executeMarketOrder state order Market -> executeMarketOrder state order
Limit price -> submitLimitOrder price state order Limit price -> submitLimitOrder price state order
Stop price trigger -> submitStopOrder state order Stop price trigger -> submitStopOrder state order
StopMarket trigger -> submitStopMarketOrder state order StopMarket trigger -> submitStopMarketOrder state order
@ -204,8 +206,9 @@ pbSubmitOrder state order = do
let newOrder = order { orderState = Submitted } let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
Just tick -> Just tick -> do
if ((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) marketOpenTime' <- marketOpenTime <$> readIORef state
if (((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime'))
then do then do
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
executeAtTick state order tick executeAtTick state order tick
@ -218,7 +221,7 @@ pbSubmitOrder state order = do
submitStopMarketOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order submitStopMarketOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order
orderDatatype = case orderOperation order of orderDatatype = case orderOperation order of
Buy -> BestOffer Buy -> BestOffer
Sell -> BestBid Sell -> BestBid
key = TickKey (orderSecurity order) orderDatatype key = TickKey (orderSecurity order) orderDatatype
@ -235,7 +238,7 @@ pbDestroyBroker state = do
maybeTid <- pbTid <$> readIORef state maybeTid <- pbTid <$> readIORef state
case maybeTid of case maybeTid of
Just tid -> killThread tid Just tid -> killThread tid
Nothing -> return () Nothing -> return ()
{- {-
pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order)

11
src/Broker/QuikBroker.hs

@ -1,5 +1,6 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE BangPatterns #-}
module Broker.QuikBroker ( module Broker.QuikBroker (
mkQuikBroker mkQuikBroker
@ -87,11 +88,15 @@ qbSubmitOrder state order = do
transId <- nextTransId state transId <- nextTransId state
atomicModifyIORef' state (\s -> (s { atomicModifyIORef' state (\s -> (s {
trans2orderid = M.insert transId order (trans2orderid s) }, ())) trans2orderid = M.insert transId order (trans2orderid s) }, ()))
debugM "Quik" "Getting ticktable"
tt <- tickTable <$> readIORef state tt <- tickTable <$> readIORef state
debugM "Quik" "Getting tickerinfo from ticktable"
tickerInfoMb <- getTickerInfo tt (orderSecurity order) tickerInfoMb <- getTickerInfo tt (orderSecurity order)
debugM "Quik" "Getting liquid ticks"
liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid)) liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid))
debugM "Quik" "Obtained"
case (tickerInfoMb, liquidTickMb) of case (tickerInfoMb, liquidTickMb) of
(Just tickerInfo, Just liquidTick) -> (Just !tickerInfo, Just !liquidTick) ->
case makeTransactionString tickerInfo liquidTick transId order of case makeTransactionString tickerInfo liquidTick transId order of
Just transStr -> do Just transStr -> do
rc <- quikSendTransaction q transStr rc <- quikSendTransaction q transStr
@ -144,8 +149,8 @@ makeTransactionString tickerInfo liquidTick transId order =
seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order
price = case orderPrice order of price = case orderPrice order of
Market -> if orderOperation order == Buy Market -> if orderOperation order == Buy
then removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo then removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo
else removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo else removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo
Limit p -> removeTrailingZeros . show $ p Limit p -> removeTrailingZeros . show $ p
_ -> "0" _ -> "0"
removeTrailingZeros v = if '.' `L.elem` v then L.dropWhileEnd (== '.') . L.dropWhileEnd (== '0') $ v else v removeTrailingZeros v = if '.' `L.elem` v then L.dropWhileEnd (== '.') . L.dropWhileEnd (== '0') $ v else v

14
src/TickTable.hs

@ -16,7 +16,7 @@ import Control.Concurrent (forkIO, ThreadId, threadDelay)
import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, tryReadChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, tryReadChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar) import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad (forM_, when, void) import Control.Monad (forM_, when, void, forever)
import Data.Maybe (catMaybes, isNothing) import Data.Maybe (catMaybes, isNothing)
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
@ -24,19 +24,22 @@ import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import System.Log.Logger (debugM)
import System.ZMQ4 (Context) import System.ZMQ4 (Context)
data TickKey = TickKey TickerId DataType data TickKey = TickKey TickerId DataType
deriving (Show, Ord, Eq) deriving (Show, Ord, Eq)
data TickTable = TickTable { data TickTable = TickTable {
ticks :: M.Map TickKey Tick, ticks :: !(M.Map TickKey Tick),
tickerInfo :: M.Map TickerId TickerInfo tickerInfo :: !(M.Map TickerId TickerInfo)
} }
type TickTableH = IORef TickTable type TickTableH = IORef TickTable
data QTISThreadRequest = RequestTickerInfo TickerId | Shutdown data QTISThreadRequest = RequestTickerInfo TickerId | Shutdown
deriving (Show, Eq)
mkTickTable :: BoundedChan Tick -> Context -> T.Text -> IO (IORef TickTable) mkTickTable :: BoundedChan Tick -> Context -> T.Text -> IO (IORef TickTable)
mkTickTable chan ctx qtisEndpoint = do mkTickTable chan ctx qtisEndpoint = do
@ -48,7 +51,7 @@ mkTickTable chan ctx qtisEndpoint = do
void $ forkIO $ tickTableThread qtisChan r shutdownMVar qtisTid void $ forkIO $ tickTableThread qtisChan r shutdownMVar qtisTid
return r return r
where where
tickTableThread qtisChan r shutdownMVar qtisTid = do tickTableThread qtisChan r shutdownMVar qtisTid = forever $ do
t <- readChan chan t <- readChan chan
atomicModifyIORef' r (\s -> (s { ticks = M.insert (TickKey (security t) (datatype t)) t $! ticks s }, ())) atomicModifyIORef' r (\s -> (s { ticks = M.insert (TickKey (security t) (datatype t)) t $! ticks s }, ()))
when (datatype t == LastTradePrice) $ do when (datatype t == LastTradePrice) $ do
@ -56,9 +59,10 @@ mkTickTable chan ctx qtisEndpoint = do
when (isNothing $ M.lookup (security t) infoMap) $ when (isNothing $ M.lookup (security t) infoMap) $
writeChan qtisChan $ RequestTickerInfo (security t) writeChan qtisChan $ RequestTickerInfo (security t)
qtisThread r qtisChan ctx qtisEndpoint = do qtisThread r qtisChan ctx qtisEndpoint = forever $ do
threadDelay 1000000 threadDelay 1000000
requests <- readListFromChan qtisChan requests <- readListFromChan qtisChan
debugM "TickTable" $ "Requested info for tickers: " ++ show requests
ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests) ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests)
forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ()))) forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ())))

4
stack.yaml

@ -15,7 +15,7 @@
# resolver: # resolver:
# name: custom-snapshot # name: custom-snapshot
# location: "./custom-snapshot.yaml" # location: "./custom-snapshot.yaml"
resolver: lts-8.18 resolver: lts-12.9
# User packages to be built. # User packages to be built.
# Various formats can be used as shown in the example below. # Various formats can be used as shown in the example below.
@ -42,7 +42,7 @@ packages:
- '../iconv' - '../iconv'
# Dependency packages to be pulled from upstream that are not in the resolver # Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3) # (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1"] extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1", "gi-gtk-3.0.23", "gi-gdk-3.0.16", "gi-gdkpixbuf-2.0.16", "gi-gio-2.0.18", "gi-pango-1.0.16", "text-format-0.3.2", "th-printf-0.5.1"]
# Override default flag values for local packages and extra-deps # Override default flag values for local packages and extra-deps
flags: {} flags: {}

Loading…
Cancel
Save