You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
120 lines
4.1 KiB
120 lines
4.1 KiB
{-# LANGUAGE OverloadedStrings, OverloadedLabels #-} |
|
module Main where |
|
|
|
import System.IO |
|
|
|
import QuoteSource.DataImport |
|
import Control.Concurrent hiding (readChan, writeChan) |
|
import Control.Monad |
|
import Control.Exception |
|
import Control.Error.Util |
|
import qualified GI.Gtk as Gtk |
|
import Data.GI.Base |
|
import Control.Concurrent.BoundedChan |
|
import ATrade.Types |
|
import QuoteSource.TableParsers.AllParamsTableParser |
|
import QuoteSource.TableParser |
|
import ATrade.QuoteSource.Server |
|
|
|
import ATrade.Broker.TradeSinks.ZMQTradeSink |
|
import ATrade.Broker.TradeSinks.TelegramTradeSink |
|
import ATrade.Broker.Server |
|
import Broker.PaperBroker |
|
import Broker.QuikBroker |
|
|
|
import System.Directory |
|
import System.Timeout |
|
import System.Log.Logger |
|
import System.Log.Handler.Simple |
|
import System.Log.Handler (setFormatter) |
|
import System.Log.Formatter |
|
import System.ZMQ4 |
|
import System.ZMQ4.ZAP |
|
|
|
import qualified Data.Text as T |
|
import Data.Maybe |
|
|
|
import Config |
|
|
|
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData) |
|
forkBoundedChan size sourceChan = do |
|
sink <- newBoundedChan size |
|
sinkQss <- newBoundedChan size |
|
tid <- forkIO $ forever $ do |
|
v <- readChan sourceChan |
|
writeChan sink v |
|
writeChan sinkQss (QSSTick v) |
|
|
|
return (tid, sink, sinkQss) |
|
|
|
|
|
initLogging :: IO () |
|
initLogging = do |
|
handler <- streamHandler stderr DEBUG >>= |
|
(\x -> return $ |
|
setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) |
|
|
|
hSetBuffering stderr LineBuffering |
|
updateGlobalLogger rootLoggerName (setLevel DEBUG) |
|
updateGlobalLogger rootLoggerName (setHandlers [handler]) |
|
|
|
main :: IO () |
|
main = do |
|
initLogging |
|
infoM "main" "Loading config" |
|
config <- readConfig "quik-connector.config.json" |
|
|
|
infoM "main" "Config loaded" |
|
chan <- newBoundedChan 10000 |
|
infoM "main" "Starting data import server" |
|
_ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" |
|
|
|
(forkId, c1, c2) <- forkBoundedChan 10000 chan |
|
|
|
brokerP <- mkPaperBroker c1 1000000 ["demo"] |
|
brokerQ <- mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) |
|
withContext (\ctx -> do |
|
withZapHandler ctx (\zap -> do |
|
zapSetWhitelist zap $ whitelist config |
|
zapSetBlacklist zap $ blacklist config |
|
|
|
case brokerClientCertificateDir config of |
|
Just certFile -> do |
|
certs <- loadCertificatesFromDirectory certFile |
|
forM_ certs (\cert -> zapAddClientCertificate zap cert) |
|
Nothing -> return () |
|
|
|
serverCert <- case brokerServerCertPath config of |
|
Just certFile -> do |
|
eitherCert <- loadCertificateFromFile certFile |
|
case eitherCert of |
|
Left errorMessage -> do |
|
warningM "main" $ "Unable to load server certificate: " ++ errorMessage |
|
return Nothing |
|
Right cert -> return $ Just cert |
|
Nothing -> return Nothing |
|
let serverParams = defaultServerSecurityParams { sspDomain = Just "global", |
|
sspCertificate = serverCert } |
|
|
|
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do |
|
withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do |
|
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\_ -> do |
|
bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do |
|
void $ Gtk.init Nothing |
|
window <- new Gtk.Window [ #title := "Quik connector" ] |
|
void $ on window #destroy Gtk.mainQuit |
|
#showAll window |
|
Gtk.main) |
|
infoM "main" "BRS down") |
|
debugM "main" "QS done") |
|
debugM "main" "TGTS done") |
|
debugM "main" "ZMQTS done") |
|
debugM "main" "ZAP done") |
|
void $ timeout 1000000 $ killThread forkId |
|
infoM "main" "Main thread done" |
|
|
|
loadCertificatesFromDirectory :: FilePath -> IO [CurveCertificate] |
|
loadCertificatesFromDirectory filepath = do |
|
files <- listDirectory filepath |
|
catMaybes <$> forM files (\file -> hush <$> loadCertificateFromFile file) |
|
|
|
|