diff --git a/libatrade.cabal b/libatrade.cabal index 78cd0cd..8bb31ec 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -17,6 +17,7 @@ library hs-source-dirs: src ghc-options: -Wincomplete-patterns exposed-modules: ATrade.Types + , ATrade.QuoteSource.Client , ATrade.QuoteSource.Server , ATrade.Broker.Protocol , ATrade.Broker.Server @@ -77,6 +78,7 @@ test-suite libatrade-test , MockBroker , TestBrokerProtocol , TestBrokerServer + , TestQuoteSourceClient , TestQuoteSourceServer , TestTypes diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs new file mode 100644 index 0000000..1426e0b --- /dev/null +++ b/src/ATrade/QuoteSource/Client.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.QuoteSource.Client ( + startQuoteSourceClient, + stopQuoteSourceClient +) where + +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Concurrent.MVar +import Control.Exception +import Data.List.NonEmpty +import qualified Data.Text as T +import qualified Data.ByteString.Lazy as BL +import Data.Text.Encoding +import System.ZMQ4 +import System.Log.Logger + +data QuoteSourceClientHandle = QuoteSourceClientHandle { + tid :: ThreadId, + completionMvar :: MVar () +} + +startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle +startQuoteSourceClient chan tickers ctx endpoint = do + compMv <- newEmptyMVar + tid <- forkIO $ do + sock <- socket ctx Sub + connect sock $ T.unpack endpoint + mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers + finally (clientThread sock) (cleanup compMv sock) + return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } + where + clientThread sock = do + rawTick <- fmap BL.fromStrict <$> receiveMulti sock + case deserializeTick rawTick of + Just tick -> writeChan chan tick + Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" + cleanup compMv sock = close sock >> putMVar compMv () + +stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () +stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index 3d81d03..c93b6bc 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -9,6 +9,7 @@ import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) import Control.Exception import Control.Monad +import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL import Data.List.NonEmpty hiding (map) import System.Log.Logger @@ -39,10 +40,10 @@ serverThread state = do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick serverThread' -startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> String -> IO QuoteSourceServer +startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do sock <- socket c Pub - bind sock ep + bind sock $ T.unpack ep tid <- myThreadId mv <- newEmptyMVar let state = QuoteSourceServerState { diff --git a/test/Spec.hs b/test/Spec.hs index 4f1bcdf..d34cd42 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -2,6 +2,7 @@ import qualified TestTypes import qualified TestBrokerProtocol import qualified TestBrokerServer +import qualified TestQuoteSourceClient import qualified TestQuoteSourceServer import Test.Tasty @@ -13,5 +14,7 @@ properties :: TestTree properties = testGroup "Properties" [TestTypes.properties, TestBrokerProtocol.properties] unitTests :: TestTree -unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests, TestBrokerServer.unitTests] +unitTests = testGroup "Unit-tests" [TestQuoteSourceClient.unitTests + , TestQuoteSourceServer.unitTests + , TestBrokerServer.unitTests] diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs new file mode 100644 index 0000000..e794409 --- /dev/null +++ b/test/TestQuoteSourceClient.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE OverloadedStrings #-} + +module TestQuoteSourceClient ( + unitTests +) where + +import Test.Tasty +import Test.Tasty.SmallCheck as SC +import Test.Tasty.QuickCheck as QC +import Test.Tasty.HUnit + +import ATrade.Types +import ATrade.QuoteSource.Server +import ATrade.QuoteSource.Client +import Control.Monad +import Control.Monad.Loops +import Control.Concurrent.MVar +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (writeChan) +import Control.Exception +import System.ZMQ4 +import Data.Time.Clock +import Data.Time.Calendar +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T +import Data.Maybe +import Data.UUID as U +import Data.UUID.V4 as UV4 + +makeEndpoint = do + uid <- toText <$> UV4.nextRandom + return $ "inproc://server" `T.append` uid + +unitTests = testGroup "QuoteSource.Client" [testStartStop] + +testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do + ep <- makeEndpoint + chan <- newBoundedChan 1000 + clientChan <- newBoundedChan 1000 + bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> + bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) + +