diff --git a/atradebot.cabal b/atradebot.cabal index d4aecb8..7577d60 100644 --- a/atradebot.cabal +++ b/atradebot.cabal @@ -33,3 +33,4 @@ executable atradebot , hslogger , th-printf , BoundedChan + , monad-loops diff --git a/src/Bot.hs b/src/Bot.hs index 4d485e5..eede38a 100644 --- a/src/Bot.hs +++ b/src/Bot.hs @@ -15,8 +15,10 @@ import ATrade.Types import Config import Control.Concurrent import qualified Control.Concurrent.BoundedChan as BC +import Control.Concurrent.MVar import Control.Exception import Control.Monad +import Control.Monad.Loops import Data.Aeson import qualified Data.ByteString.Lazy as BL import Data.Default @@ -33,22 +35,28 @@ import System.ZMQ4 hiding (message) import Text.Printf.TH startBot :: Config -> IO () -startBot conf = do +startBot conf = forever $ do + killMVar <- newEmptyMVar mbsess <- session (cHostname conf) (Just ((\_ -> [scramSha1 (cUsername conf) Nothing (cPassword conf)]), Just "bot")) - def { onConnectionClosed = onConnectionClosed' } + def { onConnectionClosed = onConnectionClosed' killMVar } case mbsess of Left err -> print err Right sess -> do sendPresence presenceOnline sess withContext $ \ctx -> do - forkIO $ threadSinkThread sess ctx + tid1 <- forkIO $ threadSinkThread sess ctx priceMapRef <- newIORef M.empty - forkIO $ quotesourceThread priceMapRef ctx - handleCommands sess priceMapRef + tid2 <- forkIO $ quotesourceThread priceMapRef ctx + tid3 <- forkIO $ handleCommands sess priceMapRef + (threadDelay 1000000) `untilM` (not <$> isEmptyMVar killMVar) + killThread tid1 + killThread tid2 + killThread tid3 + where - onConnectionClosed' sess _ = void $ reconnect' sess + onConnectionClosed' killMVar sess _ = putMVar killMVar () threadSinkThread sess ctx = do noticeM "ATradeBot.TradeSink" "Started trade sink thread" @@ -73,12 +81,14 @@ startBot conf = do quotesourceThread ref ctx = do tickChan <- BC.newBoundedChan 1000 - bracket (startQuoteSourceClient tickChan (cTickers conf) ctx (cQuoteSourceEndpoint conf)) stopQuoteSourceClient $ \_ -> + bracket (startQuoteSourceClient tickChan (cTickers conf) ctx (cQuoteSourceEndpoint conf) defaultClientSecurityParams) stopQuoteSourceClient $ \_ -> forever $ do - tick <- BC.readChan tickChan - when (datatype tick == LastTradePrice) $ do - --debugM "ATradeBot.QS" $ "Incoming tick: " ++ show tick - atomicModifyIORef' ref (\s -> ((M.insert (security tick) tick s), ())) + qsdata <- BC.readChan tickChan + case qsdata of + QDTick tick -> when (datatype tick == LastTradePrice) $ do + --debugM "ATradeBot.QS" $ "Incoming tick: " ++ show tick + atomicModifyIORef' ref (\s -> ((M.insert (security tick) tick s), ())) + _ -> return () handleCommands sess ref = forever $ do