Execution layer for algorithmic trading
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

57 lines
2.2 KiB

7 years ago
{-# LANGUAGE BangPatterns #-}
module ATrade.Driver.Real.QuoteSourceThread
(
startQuoteSourceThread
) where
7 years ago
import ATrade.BarAggregator
import ATrade.Driver.Real.Types
import ATrade.QuoteSource.Client
import ATrade.RoboCom.Monad
import ATrade.RoboCom.Types
import ATrade.Types
7 years ago
7 years ago
import Data.IORef
import Data.Maybe
7 years ago
import qualified Data.Text as T
7 years ago
7 years ago
import Control.Concurrent hiding (readChan, writeChan,
writeList2Chan, yield)
import Control.Concurrent.BoundedChan
import Control.Exception
import Control.Monad
7 years ago
7 years ago
import System.Log.Logger
import System.ZMQ4 hiding (Event)
7 years ago
-- | Fork a thread that pumps market data from a quote source client into the
-- strategy's event channel.
--
-- The thread starts a quote source client for the tickers listed in the
-- strategy's instance parameters and then loops forever over incoming data:
--
-- * A tick accepted by the tick filter (and, for 'LastTradePrice' ticks,
--   carrying a positive volume) is forwarded as 'NewTick'. If no source
--   timeframe is given, the tick is additionally passed to 'handleTick' and
--   any bar it yields is forwarded as 'NewBar'.
-- * A bar is passed to 'handleBar' only if a source timeframe is given; any
--   bar it yields is forwarded as 'NewBar'. Otherwise the bar is ignored.
--
-- Whenever an aggregator step runs, the updated aggregator is written back to
-- the 'IORef' before the resulting bar (if any) is emitted. 'bracket' makes
-- sure the client is stopped when the loop is left by an exception.
startQuoteSourceThread
  :: Context              -- ^ context handed to 'startQuoteSourceClient'
  -> T.Text               -- ^ quote source endpoint
  -> Strategy c s         -- ^ strategy whose tickers are requested
  -> BoundedChan Event    -- ^ channel receiving 'NewTick' / 'NewBar' events
  -> IORef BarAggregator  -- ^ aggregator state, read and rewritten per update
  -> (Tick -> Bool)       -- ^ predicate a tick must satisfy to be processed
  -> Maybe Int            -- ^ source timeframe; only its presence is inspected
  -> IO ThreadId
startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe =
  forkIO $ do
    incoming <- newBoundedChan 1000
    bracket
      (startQuoteSourceClient incoming subscribedCodes ctx qsEp)
      shutdown
      (const (pump incoming))
  where
    -- Ticker codes taken from the strategy's instance parameters.
    subscribedCodes = code <$> tickers (strategyInstanceParams strategy)

    -- True when bars come from the source instead of being built from ticks.
    usesSourceBars = isJust maybeSourceTimeframe

    -- Release action for 'bracket': stop the client, then log it.
    shutdown client =
      stopQuoteSourceClient client
        >> debugM "Strategy" "Quotesource client: stop"

    -- Endless receive loop over the client's output channel.
    pump chan = forever (readChan chan >>= dispatch)

    -- Route one item received from the quote source.
    dispatch item = case item of
      QDTick tick ->
        when (goodTick tick) $ do
          -- The raw tick is always announced before any bar derived from it.
          writeChan eventChan (NewTick tick)
          unless usesSourceBars $ do
            current <- readIORef agg
            publish (handleTick tick current)
      QDBar (_, bar) -> do
        current <- readIORef agg
        when usesSourceBars $
          publish (handleBar bar current)

    -- Store the new aggregator (forced to WHNF by the bang pattern) and, if
    -- the step produced a bar, emit it afterwards.
    publish step = case step of
      (Nothing, !updated) -> writeIORef agg updated
      (Just bar, !updated) -> do
        writeIORef agg updated
        writeChan eventChan (NewBar bar)

    -- A tick is processed when it passes the caller's filter and, if it is a
    -- last-trade price, has a strictly positive volume.
    goodTick tick
      | not (tickFilter tick) = False
      | datatype tick == LastTradePrice = volume tick > 0
      | otherwise = True