Browse Source

Build against libatrade-0.12

master
Denis Tereshkin 4 years ago
parent
commit
192b897e73
  1. 35
      app/Config.hs
  2. 159
      app/Main.hs
  3. 10
      quik-connector.cabal
  4. 85
      src/Broker/PaperBroker.hs
  5. 150
      src/Broker/QuikBroker.hs
  6. 116
      src/Broker/QuikBroker/Trans2QuikApi.hs
  7. 22
      src/QuoteSource/PipeReader.hs
  8. 3
      src/System/Win32/DDE.hs
  9. 3
      src/TickTable.hs
  10. 14
      stack.yaml

35
app/Config.hs

@ -23,22 +23,23 @@ data TableConfig = TableConfig { @@ -23,22 +23,23 @@ data TableConfig = TableConfig {
} deriving (Show)
data Config = Config {
quotesourceEndpoint :: String,
qtisEndpoint :: String,
pipeReaderQsEndpoint :: Maybe String,
tickPipePath :: Maybe String,
brokerserverEndpoint :: String,
whitelist :: [T.Text],
blacklist :: [T.Text],
brokerServerCertPath :: Maybe FilePath,
brokerClientCertificateDir :: Maybe FilePath,
tables :: [TableConfig],
quikPath :: String,
dllPath :: String,
quikAccounts :: [T.Text],
tradeSink :: T.Text,
tradeSink2 :: T.Text,
commissions :: [CommissionConfig]
quotesourceEndpoint :: String,
qtisEndpoint :: String,
pipeReaderQsEndpoint :: Maybe String,
tickPipePath :: Maybe String,
brokerserverEndpoint :: String,
brokerNotificationsEndpoint :: String,
whitelist :: [T.Text],
blacklist :: [T.Text],
brokerServerCertPath :: Maybe FilePath,
brokerClientCertificateDir :: Maybe FilePath,
tables :: [TableConfig],
quikPath :: String,
dllPath :: String,
quikAccounts :: [T.Text],
tradeSink :: T.Text,
tradeSink2 :: T.Text,
commissions :: [CommissionConfig]
} deriving (Show)
readConfig :: String -> IO Config
@ -55,6 +56,7 @@ parseConfig = withObject "object" $ \obj -> do @@ -55,6 +56,7 @@ parseConfig = withObject "object" $ \obj -> do
qsePipe <- obj .:? "quotesource-endpoint-pipe-reader"
pipePath <- obj .:? "pipe-reader-path"
bse <- obj .: "brokerserver-endpoint"
bsne <- obj .: "brokerserver-notifications-endpoint"
whitelist' <- obj .:? "whitelist" .!= []
blacklist' <- obj .:? "blacklist" .!= []
serverCert <- obj .:? "broker_server_certificate"
@ -73,6 +75,7 @@ parseConfig = withObject "object" $ \obj -> do @@ -73,6 +75,7 @@ parseConfig = withObject "object" $ \obj -> do
pipeReaderQsEndpoint = qsePipe,
tickPipePath = pipePath,
brokerserverEndpoint = bse,
brokerNotificationsEndpoint = bsne,
whitelist = whitelist',
blacklist = blacklist',
brokerServerCertPath = serverCert,

159
app/Main.hs

@ -14,8 +14,10 @@ import Control.Concurrent.BoundedChan @@ -14,8 +14,10 @@ import Control.Concurrent.BoundedChan
import Control.Error.Util
import Control.Exception.Safe
import Control.Monad
import Control.Monad.IO.Class (MonadIO)
import Data.GI.Base
import qualified GI.Gtk as Gtk
import Prelude hiding (log)
import QuoteSource.DataImport
import QuoteSource.PipeReader
import QuoteSource.TableParser
@ -27,10 +29,6 @@ import Broker.PaperBroker @@ -27,10 +29,6 @@ import Broker.PaperBroker
import Broker.QuikBroker
import System.Directory
import System.Log.Formatter
import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple
import System.Log.Logger
import System.Timeout
import System.ZMQ4
import System.ZMQ4.ZAP
@ -41,6 +39,13 @@ import Data.Version @@ -41,6 +39,13 @@ import Data.Version
import ATrade (libatrade_gitrev,
libatrade_version)
import ATrade.Logging (Message, Severity (Debug, Info, Warning),
fmtMessage,
logWith)
import Colog (LogAction,
logTextStdout,
(>$<))
import Colog.Actions (logTextHandle)
import Config
import TickTable (mkTickTable)
import Version
@ -58,83 +63,87 @@ forkBoundedChan size sourceChan = do @@ -58,83 +63,87 @@ forkBoundedChan size sourceChan = do
return (tid, sink1, sink2, sinkQss)
initLogging :: IO ()
initLogging = do
handler <- streamHandler stderr DEBUG >>=
(\x -> return $
setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg"))
fhandler <- fileHandler "quik-connector.log" DEBUG >>=
(\x -> return $
setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg"))
hSetBuffering stderr LineBuffering
updateGlobalLogger rootLoggerName (setLevel DEBUG)
updateGlobalLogger rootLoggerName (setHandlers [handler, fhandler])
mkLogger :: (MonadIO m) => Handle -> LogAction m Message
mkLogger h = fmtMessage >$< (logTextStdout <> logTextHandle h)
main :: IO ()
main = do
initLogging
infoM "main" $ "Starting quik-connector-" ++ T.unpack quikConnectorVersionText ++ "; libatrade-" ++ showVersion libatrade_version ++ "(" ++ libatrade_gitrev ++ ")"
infoM "main" "Loading config"
config <- readConfig "quik-connector.config.json"
infoM "main" "Config loaded"
chan <- newBoundedChan 10000
infoM "main" "Starting data import server"
_ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, c0, c1, c2) <- forkBoundedChan 10000 chan
withContext (\ctx -> do
tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config)
brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config)
brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config)
withZapHandler ctx (\zap -> do
zapSetWhitelist zap "global" $ whitelist config
zapSetBlacklist zap "global" $ blacklist config
case brokerClientCertificateDir config of
Just certFile -> do
certs <- loadCertificatesFromDirectory certFile
forM_ certs (\cert -> zapAddClientCertificate zap "global" cert)
Nothing -> return ()
serverCert <- case brokerServerCertPath config of
Just certFile -> do
eitherCert <- loadCertificateFromFile certFile
case eitherCert of
Left errorMessage -> do
warningM "main" $ "Unable to load server certificate: " ++ errorMessage
return Nothing
Right cert -> return $ Just cert
Nothing -> return Nothing
let serverParams = defaultServerSecurityParams { sspDomain = Just "global",
sspCertificate = serverCert }
bracket (forkIO $ pipeReaderThread ctx config c2) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do
bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [zmqTradeSink2, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do
void $ Gtk.init Nothing
window <- new Gtk.Window [ #title := "Quik connector" ]
void $ on window #destroy Gtk.mainQuit
#showAll window
Gtk.main)
infoM "main" "BRS down")
debugM "main" "QS done")
debugM "main" "TGTS done")
debugM "main" "ZMQTS done")
debugM "main" "ZAP done"))
void $ timeout 1000000 $ killThread forkId
infoM "main" "Main thread done"
withFile "quik-connector.log" AppendMode $ \logH -> do
let logger = mkLogger logH
let log = (logWith logger)
log Info "main" $ "Starting quik-connector-" <>
quikConnectorVersionText <>
"; libatrade-" <>
(T.pack . showVersion) libatrade_version <>
"(" <>
T.pack libatrade_gitrev <>
")"
log Info "main" "Loading config"
config <- readConfig "quik-connector.config.json"
log Info "main" "Config loaded"
chan <- newBoundedChan 10000
log Info "main" "Starting data import server"
_ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, c0, c1, c2) <- forkBoundedChan 10000 chan
withContext (\ctx -> do
tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config)
brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config) logger
brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config) logger
withZapHandler ctx (\zap -> do
zapSetWhitelist zap "global" $ whitelist config
zapSetBlacklist zap "global" $ blacklist config
case brokerClientCertificateDir config of
Just certFile -> do
certs <- loadCertificatesFromDirectory certFile
forM_ certs (\cert -> zapAddClientCertificate zap "global" cert)
Nothing -> return ()
serverCert <- case brokerServerCertPath config of
Just certFile -> do
eitherCert <- loadCertificateFromFile certFile
case eitherCert of
Left errorMessage -> do
log Warning "main" $ "Unable to load server certificate: " <> T.pack errorMessage
return Nothing
Right cert -> return $ Just cert
Nothing -> return Nothing
let serverParams = defaultServerSecurityParams { sspDomain = Just "global",
sspCertificate = serverCert }
bracket (forkIO $ pipeReaderThread ctx config c2 logger) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do
bracket (startBrokerServer
[brokerP, brokerQ]
ctx
(T.pack $ brokerserverEndpoint config)
(T.pack $ brokerNotificationsEndpoint config)
[zmqTradeSink2, zmqTradeSink]
serverParams
logger) stopBrokerServer (\_ -> do
void $ Gtk.init Nothing
window <- new Gtk.Window [ #title := "Quik connector" ]
void $ on window #destroy Gtk.mainQuit
#showAll window
Gtk.main)
log Info "main" "BRS down")
log Debug "main" "QS done")
log Debug "main" "TGTS done")
log Debug "main" "ZMQTS done")
log Debug "main" "ZAP done"))
void $ timeout 1000000 $ killThread forkId
log Info "main" "Main thread done"
where
pipeReaderThread ctx config qsdataChan =
pipeReaderThread ctx config qsdataChan logger =
case pipeReaderQsEndpoint config of
Just qsep -> do
infoM "main" $ "QS: " ++ qsep
bracket (startPipeReader ctx (T.pack qsep) qsdataChan) stopPipeReader (\_ -> forever $ threadDelay 1000000)
logWith logger Info "main" $ "QS: " <> T.pack qsep
bracket (startPipeReader ctx (T.pack qsep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000)
_ -> return ()
quoteSourceServerSecurityParams = defaultServerSecurityParams { sspDomain = Just "global" }

10
quik-connector.cabal

@ -42,14 +42,13 @@ library @@ -42,14 +42,13 @@ library
, transformers
, datetime
, BoundedChan
, hslogger
, zeromq4-haskell
, hashable
, unordered-containers
, aeson
, cond
, scientific
, libatrade >= 0.9 && < 0.10
, libatrade >= 0.12 && < 0.13
, deepseq
, errors
, split
@ -63,13 +62,15 @@ library @@ -63,13 +62,15 @@ library
, http-client-tls
, utf8-string
, connection
, text-format
, monad-loops
, extra
, incremental-parser
, attoparsec
, safe-exceptions
, iconv
, th-printf
, co-log
, co-log-core
default-language: Haskell2010
-- extra-libraries: "user32"
other-modules: System.Win32.XlParser
@ -86,7 +87,6 @@ executable quik-connector-exe @@ -86,7 +87,6 @@ executable quik-connector-exe
, haskell-gi-base
, gi-gtk
, BoundedChan
, hslogger
, aeson
, bytestring
, unordered-containers
@ -106,6 +106,8 @@ executable quik-connector-exe @@ -106,6 +106,8 @@ executable quik-connector-exe
, safe-exceptions
, iconv
, th-printf
, co-log
, co-log-core
default-language: Haskell2010
other-modules: Config
, Version

85
src/Broker/PaperBroker.hs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE Strict #-}
module Broker.PaperBroker (
@ -6,10 +7,15 @@ module Broker.PaperBroker ( @@ -6,10 +7,15 @@ module Broker.PaperBroker (
mkPaperBroker
) where
import ATrade.Broker.Backend
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import ATrade.Logging (Message, Severity (..),
logWith)
import ATrade.Quotes.QTIS
import ATrade.Types
import Colog (LogAction)
import Commissions (CommissionConfig (..))
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Monad
@ -20,11 +26,10 @@ import qualified Data.List as L @@ -20,11 +26,10 @@ import qualified Data.List as L
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Time.Clock
import System.Log.Logger
import Language.Haskell.Printf (t)
import System.ZMQ4
import Commissions (CommissionConfig (..))
import TickTable (TickKey (..), TickTableH,
getTick, getTickerInfo)
@ -33,7 +38,7 @@ data PaperBrokerState = PaperBrokerState { @@ -33,7 +38,7 @@ data PaperBrokerState = PaperBrokerState {
tickTable :: TickTableH,
orders :: M.Map OrderId Order,
cash :: !Price,
notificationCallback :: Maybe (Notification -> IO ()),
notificationCallback :: Maybe (BrokerBackendNotification -> IO ()),
pendingOrders :: [Order],
fortsClassCodes :: [T.Text],
@ -45,14 +50,15 @@ data PaperBrokerState = PaperBrokerState { @@ -45,14 +50,15 @@ data PaperBrokerState = PaperBrokerState {
postMarketStartTime :: DiffTime,
postMarketFixTime :: DiffTime,
postMarketCloseTime :: DiffTime,
commissions :: [CommissionConfig]
commissions :: [CommissionConfig],
logger :: LogAction IO Message
}
hourMin :: Integer -> Integer -> DiffTime
hourMin h m = fromIntegral $ h * 3600 + m * 60
mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkPaperBroker tickTableH tickChan startCash accounts comms = do
mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend
mkPaperBroker tickTableH tickChan startCash accounts comms l = do
state <- newIORef PaperBrokerState {
pbTid = Nothing,
tickTable = tickTableH,
@ -68,18 +74,19 @@ mkPaperBroker tickTableH tickChan startCash accounts comms = do @@ -68,18 +74,19 @@ mkPaperBroker tickTableH tickChan startCash accounts comms = do
postMarketStartTime = hourMin 15 40,
postMarketFixTime = hourMin 15 45,
postMarketCloseTime = hourMin 15 50,
commissions = comms
commissions = comms,
logger = l
}
tid <- forkIO $ brokerThread tickChan state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()))
return BrokerInterface {
return BrokerBackend {
accounts = accounts,
setNotificationCallback = pbSetNotificationCallback state,
submitOrder = pbSubmitOrder state,
cancelOrder = pbCancelOrder state,
stopBroker = pbDestroyBroker state }
cancelOrder = void . pbCancelOrder state,
stop = pbDestroyBroker state }
brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
@ -101,7 +108,7 @@ executePendingOrders tick state = do @@ -101,7 +108,7 @@ executePendingOrders tick state = do
then
case orderPrice order of
Market -> do
debugM "PaperBroker" "Executing: pending market order"
log Debug "PaperBroker" "Executing: pending market order"
executeAtTick state order tick
return $ Just $ orderId order
Limit price ->
@ -109,22 +116,27 @@ executePendingOrders tick state = do @@ -109,22 +116,27 @@ executePendingOrders tick state = do
_ -> return Nothing
else return Nothing
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
executeLimitAt price order = case orderOperation order of
Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) || (datatype tick == BestOffer && price > value tick && value tick > 0)
Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) ||
(datatype tick == BestOffer && price > value tick && value tick > 0)
then do
debugM "PaperBroker" $ "[1]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order)
log Debug "PaperBroker" $ TL.toStrict $ [t|[1]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order)
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
Sell -> if (datatype tick == LastTradePrice && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0)
then do
debugM "PaperBroker" $ "[2]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order)
log Debug "PaperBroker" $ TL.toStrict $ [t|[2]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order)
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (BrokerBackendNotification -> IO ()) -> IO()
pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) )
mkTrade :: TickerInfo -> Tick -> Order -> UTCTime -> Maybe CommissionConfig -> Trade
@ -157,10 +169,10 @@ executeAtTick state order tick = do @@ -157,10 +169,10 @@ executeAtTick state order tick = do
comm <- L.find (\comdef -> comPrefix comdef `T.isPrefixOf` security tick) . commissions <$> readIORef state
let tradeVolume = fromInteger (orderQuantity order) * value tick * fromInteger (tiLotSize tickerInfo)
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()))
debugM "PaperBroker" $ "Executed: " ++ show newOrder ++ "; at tick: " ++ show tick
log Debug "PaperBroker" $ TL.toStrict $ [t|Executed: %? at tick: %?|] newOrder tick
ts <- getCurrentTime
maybeCall notificationCallback state $ TradeNotification $ mkTrade tickerInfo tick order ts comm
maybeCall notificationCallback state $ OrderNotification (orderId order) Executed
maybeCall notificationCallback state $ BackendTradeNotification $ mkTrade tickerInfo tick order ts comm
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Executed
where
obtainTickerInfo tickerId = do
table <- tickTable <$> readIORef state
@ -170,16 +182,20 @@ executeAtTick state order tick = do @@ -170,16 +182,20 @@ executeAtTick state order tick = do
_ -> return TickerInfo { tiTicker = tickerId,
tiLotSize = 1,
tiTickSize = 1 }
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
rejectOrder state order = do
let newOrder = order { orderState = Rejected } in
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ OrderNotification (orderId order) Rejected
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Rejected
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do
infoM "PaperBroker" $ "Submitted order: " ++ show order
log Info "PaperBroker" $ "Submitted order: " <> (T.pack . show) order
case orderPrice order of
Market -> executeMarketOrder state order
Limit price -> submitLimitOrder price state order
@ -187,6 +203,9 @@ pbSubmitOrder state order = do @@ -187,6 +203,9 @@ pbSubmitOrder state order = do
StopMarket trigger -> submitStopMarketOrder state order
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
executeMarketOrder state order = do
tm <- tickTable <$> readIORef state
tickMb <- getTick tm key
@ -200,25 +219,26 @@ pbSubmitOrder state order = do @@ -200,25 +219,26 @@ pbSubmitOrder state order = do
else do
tm <- tickTable <$> readIORef state
tickMb <- getTick tm key
debugM "PaperBroker" $ "Limit order submitted, looking up: " ++ show key
log Debug "PaperBroker" $ "Limit order submitted, looking up: " <> (T.pack . show) key
case tickMb of
Nothing -> do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
Just tick -> do
marketOpenTime' <- marketOpenTime <$> readIORef state
if (((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime'))
if (((orderOperation order == Buy) && (value tick < price)) ||
((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime'))
then do
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
executeAtTick state order tick
else do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , pendingOrders = newOrder : pendingOrders s}, ()))
maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted
maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted
submitStopOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order
submitStopMarketOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order
submitStopOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order
submitStopMarketOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order
orderDatatype = case orderOperation order of
Buy -> BestOffer
@ -230,7 +250,7 @@ pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool @@ -230,7 +250,7 @@ pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state oid = do
atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\o -> orderId o /= oid) (pendingOrders s),
orders = M.adjustWithKey (\_ v -> v { orderState = Cancelled }) oid (orders s) }, ()))
maybeCall notificationCallback state $ OrderNotification oid Cancelled
maybeCall notificationCallback state $ BackendOrderNotification oid Cancelled
return True
pbDestroyBroker :: IORef PaperBrokerState -> IO ()
@ -240,8 +260,3 @@ pbDestroyBroker state = do @@ -240,8 +260,3 @@ pbDestroyBroker state = do
Just tid -> killThread tid
Nothing -> return ()
{-
pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order)
pbGetOrder state oid = M.lookup oid . orders <$> readIORef state
-}

150
src/Broker/QuikBroker.hs

@ -1,49 +1,55 @@ @@ -1,49 +1,55 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE QuasiQuotes #-}
module Broker.QuikBroker (
mkQuikBroker
) where
import ATrade.Types
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import ATrade.Quotes.QTIS (TickerInfo(..))
import ATrade.Broker.Backend
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import ATrade.Quotes.QTIS (TickerInfo (..))
import ATrade.Types
import Broker.QuikBroker.Trans2QuikApi hiding (tradeAccount)
import Broker.QuikBroker.Trans2QuikApi hiding (logger, tradeAccount)
import Data.IORef
import Data.List.Split
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Bimap as BM
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Text.Format
import qualified Data.Bimap as BM
import Data.IORef
import qualified Data.List as L
import Data.List.Split
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Control.Monad
import Control.Concurrent
import Control.Concurrent.BoundedChan
import Control.Monad.Trans.Except
import Control.Monad.IO.Class
import System.Log.Logger
import ATrade.Logging (Message, Severity (..),
logWith)
import Colog (LogAction)
import Control.Concurrent
import Control.Concurrent.BoundedChan
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Language.Haskell.Printf (t)
import Safe
import Safe
import Commissions (CommissionConfig(..))
import TickTable (TickTableH, getTick, getTickerInfo, TickKey(..))
import Commissions (CommissionConfig (..))
import TickTable (TickKey (..), TickTableH,
getTick, getTickerInfo)
type QuikOrderId = Integer
data QuikBrokerState = QuikBrokerState {
notificationCallback :: Maybe (Notification -> IO ()),
quik :: IORef Quik,
orderMap :: M.Map OrderId Order,
orderIdMap :: BM.Bimap QuikOrderId OrderId,
trans2orderid :: M.Map Integer Order,
transIdCounter :: Integer,
tickTable :: TickTableH
notificationCallback :: Maybe (BrokerBackendNotification -> IO ()),
quik :: IORef Quik,
orderMap :: M.Map OrderId Order,
orderIdMap :: BM.Bimap QuikOrderId OrderId,
trans2orderid :: M.Map Integer Order,
transIdCounter :: Integer,
tickTable :: TickTableH,
logger :: LogAction IO Message
}
nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s))
@ -52,11 +58,11 @@ maybeCall proj state arg = do @@ -52,11 +58,11 @@ maybeCall proj state arg = do
cb <- proj <$> readIORef state
case cb of
Just callback -> callback arg
Nothing -> return ()
Nothing -> return ()
mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface
mkQuikBroker tt dllPath quikPath accs comms = do
q <- mkQuik dllPath quikPath
mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend
mkQuikBroker tt dllPath quikPath accs comms l = do
q <- mkQuik dllPath quikPath l
msgChan <- newBoundedChan 100
@ -67,17 +73,18 @@ mkQuikBroker tt dllPath quikPath accs comms = do @@ -67,17 +73,18 @@ mkQuikBroker tt dllPath quikPath accs comms = do
orderIdMap = BM.empty,
trans2orderid = M.empty,
transIdCounter = 1,
tickTable = tt
tickTable = tt,
logger = l
}
setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state comms)
return BrokerInterface {
return BrokerBackend {
accounts = accs,
setNotificationCallback = qbSetNotificationCallback state,
submitOrder = qbSubmitOrder state,
cancelOrder = qbCancelOrder state,
stopBroker = qbStopBroker state
cancelOrder = void . qbCancelOrder state,
stop = qbStopBroker state
}
qbSetNotificationCallback state maybecb = atomicModifyIORef' state (\s -> (s {
@ -88,24 +95,28 @@ qbSubmitOrder state order = do @@ -88,24 +95,28 @@ qbSubmitOrder state order = do
transId <- nextTransId state
atomicModifyIORef' state (\s -> (s {
trans2orderid = M.insert transId order (trans2orderid s) }, ()))
debugM "Quik" "Getting ticktable"
log Debug "Quik" "Getting ticktable"
tt <- tickTable <$> readIORef state
debugM "Quik" "Getting tickerinfo from ticktable"
log Debug "Quik" "Getting tickerinfo from ticktable"
tickerInfoMb <- getTickerInfo tt (orderSecurity order)
debugM "Quik" "Getting liquid ticks"
log Debug "Quik" "Getting liquid ticks"
liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid))
debugM "Quik" "Obtained"
log Debug "Quik" "Obtained"
case (tickerInfoMb, liquidTickMb) of
(Just !tickerInfo, Just !liquidTick) ->
(Just !tickerInfo, Just !liquidTick) ->
case makeTransactionString tickerInfo liquidTick transId order of
Just transStr -> do
rc <- quikSendTransaction q transStr
debugM "Quik" $ "Sending transaction string: " ++ transStr
log Debug "Quik" $ "Sending transaction string: " <> T.pack transStr
case rc of
Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg
Right _ -> debugM "Quik" $ "Order submitted: " ++ show order
Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order
_ -> warningM "Quik" $ "Unable to obtain data: " ++ show tickerInfoMb ++ "/" ++ show liquidTickMb
Left errmsg -> log Warning "Quik" $ "Unable to send transaction: " <> errmsg
Right _ -> log Debug "Quik" $ "Order submitted: " <> (T.pack . show) order
Nothing -> log Warning "Quik" $ "Unable to compose transaction string: " <> (T.pack . show) order
_ -> log Warning "Quik" $ TL.toStrict $ [t|Unable to obtain data: %?/%?|] tickerInfoMb liquidTickMb
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
qbCancelOrder state orderid = do
@ -118,10 +129,14 @@ qbCancelOrder state orderid = do @@ -118,10 +129,14 @@ qbCancelOrder state orderid = do
Just transString -> do
rc <- quikSendTransaction q transString
case rc of
Left errmsg -> warningM "Quik" ("Unable to send transaction: " ++ T.unpack errmsg) >> return False
Right _ -> debugM "Quik" ("Order cancelled: " ++ show orderid) >> return True
Nothing -> warningM "Quik" ("Unable to compose transaction string: " ++ show orderid) >> return False
_ -> warningM "Quik" ("Got request to cancel unknown order: " ++ show orderid) >> return False
Left errmsg -> log Warning "Quik" ("Unable to send transaction: " <> errmsg) >> return False
Right _ -> log Debug "Quik" ("Order cancelled: " <> (T.pack . show) orderid) >> return True
Nothing -> log Warning "Quik" ("Unable to compose transaction string: " <> (T.pack . show) orderid) >> return False
_ -> log Warning "Quik" ("Got request to cancel unknown order: " <> (T.pack . show) orderid) >> return False
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
qbStopBroker state = return ()
@ -139,11 +154,11 @@ makeTransactionString tickerInfo liquidTick transId order = @@ -139,11 +154,11 @@ makeTransactionString tickerInfo liquidTick transId order =
_ -> Nothing
where
orderTypeCode = case orderPrice order of
Market -> "L"
Market -> "L"
Limit _ -> "L"
_ -> "X"
_ -> "X"
operationCode = case orderOperation order of
Buy -> "B"
Buy -> "B"
Sell -> "S"
classcode = headMay . splitOn "#" . T.unpack $ orderSecurity order
seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order
@ -179,7 +194,7 @@ qbTransactionCallback state success transactionId orderNum = do @@ -179,7 +194,7 @@ qbTransactionCallback state success transactionId orderNum = do
newOrder <- if success
then registerOrder orderNum $ order { orderState = Unsubmitted }
else registerOrder orderNum $ order { orderState = Rejected }
maybeCall notificationCallback state (OrderNotification (orderId newOrder) (orderState newOrder))
maybeCall notificationCallback state (BackendOrderNotification (orderId newOrder) (orderState newOrder))
Nothing -> return ()
where
@ -190,7 +205,7 @@ qbTransactionCallback state success transactionId orderNum = do @@ -190,7 +205,7 @@ qbTransactionCallback state success transactionId orderNum = do
qbOrderCallback state quikorder = do
orders <- orderMap <$> readIORef state
idMap <- orderIdMap <$> readIORef state
debugM "Quik" $ "Order: " ++ show quikorder
log Debug "Quik" $ "Order: " <> (T.pack . show) quikorder
case BM.lookup (qoOrderId quikorder) idMap >>= flip M.lookup orders of
Just order -> do
updatedOrder <- if | qoStatus quikorder /= 1 && qoStatus quikorder /= 2 ->
@ -201,8 +216,8 @@ qbOrderCallback state quikorder = do @@ -201,8 +216,8 @@ qbOrderCallback state quikorder = do
submitted order
| qoStatus quikorder == 2 ->
cancelled order
maybeCall notificationCallback state (OrderNotification (orderId updatedOrder) (orderState updatedOrder))
Nothing -> warningM "Quik" $ "Unknown order: state callback called: " ++ show quikorder
maybeCall notificationCallback state (BackendOrderNotification (orderId updatedOrder) (orderState updatedOrder))
Nothing -> log Warning "Quik" $ "Unknown order: state callback called: " <> (T.pack . show) quikorder
where
updateOrder :: Order -> IO Order
@ -214,15 +229,19 @@ qbOrderCallback state quikorder = do @@ -214,15 +229,19 @@ qbOrderCallback state quikorder = do
submitted order = updateOrder $ order { orderState = Submitted }
cancelled order = updateOrder $ order { orderState = Cancelled }
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
qbTradeCallback state comms quiktrade = do
orders <- orderMap <$> readIORef state
idMap <- orderIdMap <$> readIORef state
debugM "Quik" $ "Trade: " ++ show quiktrade
log Debug "Quik" $ "Trade: " <> (T.pack . show) quiktrade
case BM.lookup (qtOrderId quiktrade) idMap >>= flip M.lookup orders of
Just order -> do
debugM "Quik" $ "Found comm: " ++ show (L.find (\x -> comPrefix x `T.isPrefixOf` orderSecurity order) comms)
maybeCall notificationCallback state (TradeNotification $ tradeFor order)
Nothing -> warningM "Quik" $ "Incoming trade for unknown order: " ++ show quiktrade
log Debug "Quik" $ "Found comm: " <> (T.pack . show) (L.find (\x -> comPrefix x `T.isPrefixOf` orderSecurity order) comms)
maybeCall notificationCallback state (BackendTradeNotification $ tradeFor order)
Nothing -> log Warning "Quik" $ "Incoming trade for unknown order: " <> (T.pack . show) quiktrade
where
tradeFor order = Trade {
tradeOrderId = orderId order,
@ -241,3 +260,6 @@ qbTradeCallback state comms quiktrade = do @@ -241,3 +260,6 @@ qbTradeCallback state comms quiktrade = do
Just com -> vol * fromDouble (0.01 * comPercentage com) + fromDouble (comFixed com) * fromIntegral qty
Nothing -> 0
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt

116
src/Broker/QuikBroker/Trans2QuikApi.hs

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Broker.QuikBroker.Trans2QuikApi (
Trans2QuikApi(..),
@ -12,7 +13,9 @@ module Broker.QuikBroker.Trans2QuikApi ( @@ -12,7 +13,9 @@ module Broker.QuikBroker.Trans2QuikApi (
quikSendTransaction
) where
import ATrade.Logging (Message, Severity (..), logWith)
import Codec.Text.IConv
import Colog (LogAction)
import Control.Concurrent
import Control.Error.Util
import Control.Exception.Safe
@ -26,6 +29,7 @@ import Data.Ratio @@ -26,6 +29,7 @@ import Data.Ratio
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
import qualified Data.Text.Lazy as TL
import Data.Time.Calendar
import Data.Time.Clock
import Data.Typeable
@ -33,7 +37,8 @@ import Foreign @@ -33,7 +37,8 @@ import Foreign
import Foreign.C.String
import Foreign.C.Types
import Foreign.Marshal.Array
import System.Log.Logger
import Language.Haskell.Printf (t)
import Prelude hiding (log)
import System.Win32.DLL
import System.Win32.Types
@ -393,7 +398,9 @@ data Quik = Quik { @@ -393,7 +398,9 @@ data Quik = Quik {
watchdogTid :: ThreadId,
handledTrades :: S.Set CLLong,
handledOrders :: S.Set QuikOrder
handledOrders :: S.Set QuikOrder,
logger :: LogAction IO Message
}
quikSendTransaction :: IORef Quik -> String -> IO (Either T.Text ())
@ -417,11 +424,12 @@ setCallbacks quik transCb orCb tradeCb = atomicModifyIORef' quik (\s -> @@ -417,11 +424,12 @@ setCallbacks quik transCb orCb tradeCb = atomicModifyIORef' quik (\s ->
hlTradeCallback = Just tradeCb }, ()))
mkQuik :: FilePath -> FilePath -> IO (IORef Quik)
mkQuik dllpath quikpath = do
mkQuik :: FilePath -> FilePath -> LogAction IO Message -> IO (IORef Quik)
mkQuik dllpath quikpath l = do
api <- loadQuikApi dllpath
debugM "Quik" "Dll loaded"
let log = logWith l
log Debug "Quik" "Dll loaded"
myTid <- myThreadId
state <- newIORef Quik { quikApi = api,
@ -432,7 +440,8 @@ mkQuik dllpath quikpath = do @@ -432,7 +440,8 @@ mkQuik dllpath quikpath = do
hlOrderCallback = Nothing,
hlTradeCallback = Nothing,
handledTrades = S.empty,
handledOrders = S.empty }
handledOrders = S.empty,
logger = l }
conncb' <- mkConnectionStatusCallback (defaultConnectionCb state)
transcb' <- mkTransactionsReplyCallback (defaultTransactionReplyCb state)
@ -446,25 +455,29 @@ mkQuik dllpath quikpath = do @@ -446,25 +455,29 @@ mkQuik dllpath quikpath = do
tid <- forkIO $ watchdog quikpath state
atomicModifyIORef' state (\s -> (s { watchdogTid = tid }, ()))
debugM "Quik" "mkQuik done"
log Debug "Quik" "mkQuik done"
return state
defaultConnectionCb :: IORef Quik -> LONG -> LONG -> LPSTR -> IO ()
defaultConnectionCb state event errorCode infoMessage
| event == ecQuikConnected = infoM "Quik" "Quik connected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = True }, ()) )
| event == ecQuikDisconnected = infoM "Quik" "Quik disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = False }, ()) )
| event == ecDllConnected = infoM "Quik" "DLL connected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) )
| event == ecDllDisconnected = infoM "Quik" "DLL disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) )
| otherwise = debugM "Quik" $ "Connection event: " ++ show event
| event == ecQuikConnected = log Info "Quik" "Quik connected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = True }, ()) )
| event == ecQuikDisconnected = log Info "Quik" "Quik disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = False }, ()) )
| event == ecDllConnected = log Info "Quik" "DLL connected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) )
| event == ecDllDisconnected = log Info "Quik" "DLL disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) )
| otherwise = log Debug "Quik" $ "Connection event: " <> (T.pack . show) event
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
defaultTransactionReplyCb :: IORef Quik -> LONG -> LONG -> LONG -> DWORD -> CLLong -> LPSTR -> CIntPtr -> IO ()
defaultTransactionReplyCb state transactionResult errorCode replyCode transId orderNum replyMessage replyDesc = do
debugM "Quik" $ "Transaction cb:" ++ show transactionResult ++ "/" ++ show errorCode ++ "/" ++ show replyCode
log Debug "Quik" $ TL.toStrict $ [t|Transaction cb: %d/%d/%d|] transactionResult errorCode replyCode
when (replyMessage /= nullPtr) $ do
s <- convert "CP1251" "UTF-8" . BL.fromStrict <$> BS.packCString replyMessage
case decodeUtf8' (BL.toStrict s) of
Left _ -> warningM "Quik" "Unable to decode utf-8"
Right msg -> debugM "Quik" $ "Transaction cb message:" ++ T.unpack msg
Left _ -> log Warning "Quik" "Unable to decode utf-8"
Right msg -> log Debug "Quik" $ "Transaction cb message:" <> msg
maybecb <- hlTransactionCallback <$> readIORef state
case maybecb of
@ -472,10 +485,13 @@ defaultTransactionReplyCb state transactionResult errorCode replyCode transId or @@ -472,10 +485,13 @@ defaultTransactionReplyCb state transactionResult errorCode replyCode transId or
Nothing -> return ()
where
rcInsufficientFunds = 4
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
defaultOrderCb :: IORef Quik -> LONG -> DWORD -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> LONG -> CIntPtr -> IO ()
defaultOrderCb state mode transId dnumber classCode secCode price balance value sell status desc = do
debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber ++ "/" ++ show transId
log Debug "Quik" $ TL.toStrict $ [t|Trade cb: %d/%d/%d|] mode dnumber transId
orders <- handledOrders <$> readIORef state
when (mode == 0) $ do
maybecb <- hlOrderCallback <$> readIORef state
@ -487,21 +503,24 @@ defaultOrderCb state mode transId dnumber classCode secCode price balance value @@ -487,21 +503,24 @@ defaultOrderCb state mode transId dnumber classCode secCode price balance value
case maybecb of
Just cb -> cb order
Nothing -> return ()
where
mkOrder :: String -> String -> QuikOrder
mkOrder sclass ssec = QuikOrder {
qoTransId = toInteger transId,
qoOrderId = toInteger dnumber,
qoTicker = sclass ++ "#" ++ ssec,
qoPrice = toDouble price,
qoBalance = toInteger balance,
qoSell = sell == 1,
qoStatus = fromIntegral status
}
where
mkOrder :: String -> String -> QuikOrder
mkOrder sclass ssec = QuikOrder {
qoTransId = toInteger transId,
qoOrderId = toInteger dnumber,
qoTicker = sclass ++ "#" ++ ssec,
qoPrice = toDouble price,
qoBalance = toInteger balance,
qoSell = sell == 1,
qoStatus = fromIntegral status
}
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
defaultTradeCb :: IORef Quik -> LONG -> CLLong -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> CIntPtr -> IO ()
defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sell desc = do
debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber
log Debug "Quik" $ TL.toStrict $ [t|Trade cb: %d/%d|] mode dnumber
trades <- handledTrades <$> readIORef state
when (mode == 0 && dnumber `S.notMember` trades) $ do
atomicModifyIORef' state (\s -> (s { handledTrades = S.insert dnumber (handledTrades s) }, ()))
@ -517,8 +536,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel @@ -517,8 +536,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel
currency <- tradeCurrency api desc >>= peekCString
cb (trade ssec sclass ymd hms us currency)
Nothing -> return ()
where
trade ssec sclass ymd hms us currency = QuikTrade {
where
trade ssec sclass ymd hms us currency = QuikTrade {
qtOrderId = toInteger orderNum,
qtTicker = sclass ++ "#" ++ ssec,
qtPrice = toDouble price,
@ -528,8 +547,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel @@ -528,8 +547,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel
qtVolumeCurrency = currency,
qtTimestamp = adjustTimestamp $ mkTimestamp ymd hms us
}
adjustTimestamp = addUTCTime (-3 * 3600) -- MSK -> UTC
mkTimestamp ymd hms us = UTCTime (fromGregorian y mon d) (fromInteger (h * 3600 + m * 60 + s) + fromRational (us % 1000000))
adjustTimestamp = addUTCTime (-3 * 3600) -- MSK -> UTC
mkTimestamp ymd hms us = UTCTime (fromGregorian y mon d) (fromInteger (h * 3600 + m * 60 + s) + fromRational (us % 1000000))
where
y = ymd `div` 10000
mon = fromEnum $ (ymd `mod` 10000) `div` 100
@ -537,6 +556,9 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel @@ -537,6 +556,9 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel
h = hms `div` 10000
m = (hms `mod` 10000) `div` 100
s = hms `mod` 100
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
watchdog :: FilePath -> IORef Quik -> IO ()
@ -552,23 +574,27 @@ watchdog quikpath state = do @@ -552,23 +574,27 @@ watchdog quikpath state = do
err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024
if err /= ecSuccess
then warningM "Quik.Watchdog" $ "Error: " ++ show err
then log Warning "Quik.Watchdog" $ TL.toStrict $ [t|Error: %d|] err
else forever $ do
conn <- connectedToDll <$> readIORef state
handle
(\(QuikException errmsg rc) -> warningM "Quik.Watchdog" $ (T.unpack errmsg) ++ " (" ++ show rc ++ ")") $
unless conn $
withCString quikpath (\path -> do
err <- connect api path errorCode errorMsg 1024
if err /= ecSuccess && err /= ecAlreadyConnectedToQuik
then warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err
else withCString "" (\emptyStr -> do
throwIfErr "setTransactionsReplyCallback returned error" $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024
throwIfErr "subscribeOrders returned error" $ subscribeOrders api emptyStr emptyStr
startOrders api orcb
throwIfErr "subscribeTrades returned error" $ subscribeTrades api emptyStr emptyStr
startTrades api tradecb))
(\(QuikException errmsg rc) -> log Warning "Quik.Watchdog" $ TL.toStrict $ [t|%Q (%d)|] errmsg rc) $
unless conn $
withCString quikpath (\path -> do
err <- connect api path errorCode errorMsg 1024
if err /= ecSuccess && err /= ecAlreadyConnectedToQuik
then log Debug "Quik.Watchdog" $ "Unable to connect: " <> (T.pack . show) err
else withCString "" (\emptyStr -> do
throwIfErr "setTransactionsReplyCallback returned error" $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024
throwIfErr "subscribeOrders returned error" $ subscribeOrders api emptyStr emptyStr
startOrders api orcb
throwIfErr "subscribeTrades returned error" $ subscribeTrades api emptyStr emptyStr
startTrades api tradecb))
threadDelay 10000000))
where
log sev comp txt = do
l <- logger <$> readIORef state
logWith l sev comp txt
throwIfErr :: T.Text -> IO LONG -> IO ()
throwIfErr errmsg action = do

22
src/QuoteSource/PipeReader.hs

@ -6,8 +6,11 @@ module QuoteSource.PipeReader ( @@ -6,8 +6,11 @@ module QuoteSource.PipeReader (
stopPipeReader
) where
import ATrade.Logging (Message, Severity (..),
logWith)
import ATrade.QuoteSource.Server
import ATrade.Types
import Colog (LogAction)
import Control.Applicative
import Control.Concurrent hiding (readChan, writeChan,
writeList2Chan, yield)
@ -35,7 +38,6 @@ import Data.Time.Clock @@ -35,7 +38,6 @@ import Data.Time.Clock
import Foreign.Marshal.Alloc
import Safe
import System.IO
import System.Log.Logger (debugM, warningM)
import System.ZMQ4
@ -46,10 +48,10 @@ data PipeReaderHandle = @@ -46,10 +48,10 @@ data PipeReaderHandle =
} deriving (Eq)
zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> Source IO [B.ByteString]
zmqSocketConduit ep sock running' = do
zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> LogAction IO Message -> Source IO [B.ByteString]
zmqSocketConduit ep sock running' logger = do
liftIO $ do
debugM "PipeReader" $ "Connecting to: " ++ T.unpack ep
logWith logger Info "PipeReader" $ "Connecting to: " <> ep
connect sock (T.unpack ep)
subscribe sock B.empty
lastHeartbeat <- liftIO $ getCurrentTime >>= newIORef
@ -59,7 +61,7 @@ zmqSocketConduit ep sock running' = do @@ -59,7 +61,7 @@ zmqSocketConduit ep sock running' = do
bs <- liftIO $ receiveMulti sock
when ((not . null $ bs) && (head bs == "SYSTEM#HEARTBEAT")) $ liftIO $ getCurrentTime >>= writeIORef lastHeartbeat
yield bs
zmqSocketConduit ep sock running'
zmqSocketConduit ep sock running' logger
where
notTimeout hb = do
now <- liftIO $ getCurrentTime
@ -80,16 +82,16 @@ chanSink chan = awaitForever @@ -80,16 +82,16 @@ chanSink chan = awaitForever
(\t -> do
liftIO $ writeChan chan t)
startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle
startPipeReader ctx pipeEndpoint tickChan = do
debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint
startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle
startPipeReader ctx pipeEndpoint tickChan logger = do
logWith logger Debug "PipeReader" $ "Trying to open pipe: " <> pipeEndpoint
s <- socket ctx Sub
debugM "PipeReader" "Pipe opened"
logWith logger Info "PipeReader" "Pipe opened"
running' <- newIORef True
tid <- forkIO $ readerThread s running'
return PipeReaderHandle { prThreadId = tid, running = running' }
where
readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running') =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan
readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running' logger) =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan
stopPipeReader :: PipeReaderHandle -> IO ()
stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False

3
src/System/Win32/DDE.hs

@ -40,7 +40,6 @@ import Foreign @@ -40,7 +40,6 @@ import Foreign
import Foreign.C.Types
import Foreign.C.String
import Foreign.Marshal.Array
import System.Log.Logger (debugM, warningM)
import qualified Data.ByteString.Lazy as BL
@ -122,7 +121,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 @@ -122,7 +121,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2
handleConnect state hsz1 hsz2 = do
myDdeState <- readIORef state
maybeAppName <- queryString myDdeState 256 hsz2
debugM "DDE" $ "Handle connect:" ++ show maybeAppName
case maybeAppName of
Just incomingAppName -> do
if incomingAppName == appName myDdeState
@ -140,7 +138,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 @@ -140,7 +138,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2
Just topic -> withDdeData hData (\xlData -> do
case runGetOrFail xlParser $ BL.fromStrict xlData of
Left (_, _, errmsg) -> do
warningM "DDE" $ "Parsing error: " ++ show errmsg
return ddeResultFalse
Right (_, _, table) -> do
rc <- (dataCallback myDdeState) topic table

3
src/TickTable.hs

@ -24,8 +24,6 @@ import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) @@ -24,8 +24,6 @@ import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import System.Log.Logger (debugM)
import System.ZMQ4 (Context)
data TickKey = TickKey TickerId DataType
@ -62,7 +60,6 @@ mkTickTable chan ctx qtisEndpoint = do @@ -62,7 +60,6 @@ mkTickTable chan ctx qtisEndpoint = do
qtisThread r qtisChan ctx qtisEndpoint = forever $ do
threadDelay 1000000
requests <- readListFromChan qtisChan
debugM "TickTable" $ "Requested info for tickers: " ++ show requests
ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests)
forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ())))

14
stack.yaml

@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
# resolver:
# name: custom-snapshot
# location: "./custom-snapshot.yaml"
resolver: lts-17.14
resolver: lts-18.18
# User packages to be built.
# Various formats can be used as shown in the example below.
@ -39,13 +39,19 @@ packages: @@ -39,13 +39,19 @@ packages:
- '.'
- '../libatrade'
- '../zeromq4-haskell-zap'
- '../iconv'
- '../iconv-0.4.1.3'
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1", "gi-gtk-3.0.23", "gi-gdk-3.0.16", "gi-gdkpixbuf-2.0.16", "gi-gio-2.0.18", "gi-pango-1.0.16", "text-format-0.3.2", "th-printf-0.5.1"]
extra-deps:
- datetime-0.3.1
- cond-0.4.1.1
- co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087
- ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226
# Override default flag values for local packages and extra-deps
flags: {}
flags:
mintty:
Win32-2-13-1: false
# Extra package databases containing global packages
extra-package-dbs: []

Loading…
Cancel
Save