Browse Source

Various fixes

master
Denis Tereshkin 9 years ago
parent
commit
f22dd216b8
  1. 5
      src/ATrade/QuoteSource/Client.hs
  2. 12
      src/ATrade/QuoteSource/Server.hs
  3. 11
      src/ATrade/Types.hs

5
src/ATrade/QuoteSource/Client.hs

@ -43,6 +43,8 @@ startQuoteSourceClient chan tickers ctx endpoint = do
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
now <- getCurrentTime
writeIORef lastHeartbeat now
whileM_ (notTimeout lastHeartbeat) $ do whileM_ (notTimeout lastHeartbeat) $ do
evs <- poll 200 [Sock sock [In] Nothing] evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do when ((L.length . L.head) evs > 0) $ do
@ -53,7 +55,8 @@ startQuoteSourceClient chan tickers ctx endpoint = do
then writeIORef lastHeartbeat now then writeIORef lastHeartbeat now
else case deserializeTick rawTick of else case deserializeTick rawTick of
Just tick -> writeChan chan tick Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick") Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
debugM "QuoteSource.Client" "Heartbeat timeout")
notTimeout ts = do notTimeout ts = do
now <- getCurrentTime now <- getCurrentTime

12
src/ATrade/QuoteSource/Server.hs

@ -22,7 +22,8 @@ data QuoteSourceServer = QuoteSourceServerState {
outSocket :: Socket Pub, outSocket :: Socket Pub,
tickChannel :: BoundedChan QuoteSourceServerData, tickChannel :: BoundedChan QuoteSourceServerData,
completionMvar :: MVar (), completionMvar :: MVar (),
serverThreadId :: ThreadId serverThreadId :: ThreadId,
heartbeatThreadId :: ThreadId
} }
data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill
@ -53,17 +54,22 @@ startQuoteSourceServer chan c ep = do
sock <- socket c Pub sock <- socket c Pub
bind sock $ T.unpack ep bind sock $ T.unpack ep
tid <- myThreadId tid <- myThreadId
hbTid <- forkIO $ forever $ do
threadDelay 1000000
writeChan chan QSSHeartbeat
mv <- newEmptyMVar mv <- newEmptyMVar
let state = QuoteSourceServerState { let state = QuoteSourceServerState {
ctx = c, ctx = c,
outSocket = sock, outSocket = sock,
tickChannel = chan, tickChannel = chan,
completionMvar = mv, completionMvar = mv,
serverThreadId = tid serverThreadId = tid,
heartbeatThreadId = hbTid
} }
stid <- forkIO $ serverThread state stid <- forkIO $ serverThread state
return $ state { serverThreadId = stid } return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO () stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server) stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server)

11
src/ATrade/Types.hs

@ -3,6 +3,7 @@
module ATrade.Types ( module ATrade.Types (
TickerId, TickerId,
Tick(..), Tick(..),
Bar(..),
DataType(..), DataType(..),
serializeTick, serializeTick,
deserializeTick, deserializeTick,
@ -136,6 +137,16 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
deserializeTick _ = Nothing deserializeTick _ = Nothing
data Bar = Bar {
barSecurity :: !T.Text,
barTimestamp :: !UTCTime,
barOpen :: !Decimal,
barHigh :: !Decimal,
barLow :: !Decimal,
barClose :: !Decimal,
barVolume :: !Integer
} deriving (Show, Eq)
data SignalId = SignalId { data SignalId = SignalId {
strategyId :: T.Text, strategyId :: T.Text,
signalName :: T.Text, signalName :: T.Text,

Loading…
Cancel
Save