Browse Source

Minor fix in reconnection logic

master
Denis Tereshkin 6 years ago
parent
commit
c354ca6668
  1. 1
      atradebot.cabal
  2. 32
      src/Bot.hs

1
atradebot.cabal

@ -33,3 +33,4 @@ executable atradebot @@ -33,3 +33,4 @@ executable atradebot
, hslogger
, th-printf
, BoundedChan
, monad-loops

32
src/Bot.hs

@ -15,8 +15,10 @@ import ATrade.Types @@ -15,8 +15,10 @@ import ATrade.Types
import Config
import Control.Concurrent
import qualified Control.Concurrent.BoundedChan as BC
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import Data.Aeson
import qualified Data.ByteString.Lazy as BL
import Data.Default
@ -33,22 +35,28 @@ import System.ZMQ4 hiding (message) @@ -33,22 +35,28 @@ import System.ZMQ4 hiding (message)
import Text.Printf.TH
startBot :: Config -> IO ()
startBot conf = do
startBot conf = forever $ do
killMVar <- newEmptyMVar
mbsess <- session
(cHostname conf)
(Just ((\_ -> [scramSha1 (cUsername conf) Nothing (cPassword conf)]), Just "bot"))
def { onConnectionClosed = onConnectionClosed' }
def { onConnectionClosed = onConnectionClosed' killMVar }
case mbsess of
Left err -> print err
Right sess -> do
sendPresence presenceOnline sess
withContext $ \ctx -> do
forkIO $ threadSinkThread sess ctx
tid1 <- forkIO $ threadSinkThread sess ctx
priceMapRef <- newIORef M.empty
forkIO $ quotesourceThread priceMapRef ctx
handleCommands sess priceMapRef
tid2 <- forkIO $ quotesourceThread priceMapRef ctx
tid3 <- forkIO $ handleCommands sess priceMapRef
(threadDelay 1000000) `untilM` (not <$> isEmptyMVar killMVar)
killThread tid1
killThread tid2
killThread tid3
where
onConnectionClosed' sess _ = void $ reconnect' sess
onConnectionClosed' killMVar sess _ = putMVar killMVar ()
threadSinkThread sess ctx = do
noticeM "ATradeBot.TradeSink" "Started trade sink thread"
@ -73,12 +81,14 @@ startBot conf = do @@ -73,12 +81,14 @@ startBot conf = do
quotesourceThread ref ctx = do
tickChan <- BC.newBoundedChan 1000
bracket (startQuoteSourceClient tickChan (cTickers conf) ctx (cQuoteSourceEndpoint conf)) stopQuoteSourceClient $ \_ ->
bracket (startQuoteSourceClient tickChan (cTickers conf) ctx (cQuoteSourceEndpoint conf) defaultClientSecurityParams) stopQuoteSourceClient $ \_ ->
forever $ do
tick <- BC.readChan tickChan
when (datatype tick == LastTradePrice) $ do
--debugM "ATradeBot.QS" $ "Incoming tick: " ++ show tick
atomicModifyIORef' ref (\s -> ((M.insert (security tick) tick s), ()))
qsdata <- BC.readChan tickChan
case qsdata of
QDTick tick -> when (datatype tick == LastTradePrice) $ do
--debugM "ATradeBot.QS" $ "Incoming tick: " ++ show tick
atomicModifyIORef' ref (\s -> ((M.insert (security tick) tick s), ()))
_ -> return ()
handleCommands sess ref = forever $ do

Loading…
Cancel
Save