From 8e7b97b47e030f35b2969749ecc894384889d84a Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 27 Sep 2016 16:21:58 +0700 Subject: [PATCH] Basic QuoteSource Server --- libatrade.cabal | 9 +++++ src/ATrade/QuoteSource/Server.hs | 60 ++++++++++++++++++++++++++++++++ test/Spec.hs | 12 ++++--- test/TestQuoteSourceServer.hs | 50 ++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 src/ATrade/QuoteSource/Server.hs create mode 100644 test/TestQuoteSourceServer.hs diff --git a/libatrade.cabal b/libatrade.cabal index 3c8a193..5c293e6 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -16,6 +16,7 @@ cabal-version: >=1.10 library hs-source-dirs: src exposed-modules: ATrade.Types + , ATrade.QuoteSource.Server build-depends: base >= 4.7 && < 5 , Decimal , time @@ -24,6 +25,9 @@ library , text , binary , aeson + , BoundedChan + , hslogger + , zeromq4-haskell default-language: Haskell2010 executable libatrade-exe @@ -55,6 +59,11 @@ test-suite libatrade-test , time , aeson , text + , BoundedChan + , hslogger + , zeromq4-haskell + , bytestring + , monad-loops ghc-options: -threaded -rtsopts -with-rtsopts=-N default-language: Haskell2010 diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs new file mode 100644 index 0000000..3d81d03 --- /dev/null +++ b/src/ATrade/QuoteSource/Server.hs @@ -0,0 +1,60 @@ + +module ATrade.QuoteSource.Server ( + startQuoteSourceServer, + stopQuoteSourceServer +) where + +import ATrade.Types +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (readChan, writeChan) +import Control.Exception +import Control.Monad +import qualified Data.ByteString.Lazy as BL +import Data.List.NonEmpty hiding (map) +import System.Log.Logger +import System.ZMQ4 + +data QuoteSourceServer = QuoteSourceServerState { + ctx :: Context, + outSocket :: Socket Pub, + tickChannel :: BoundedChan (Maybe Tick), + completionMvar :: MVar (), + serverThreadId :: ThreadId +} + +serverThread :: QuoteSourceServer -> IO () +serverThread state = do + finally serverThread' cleanup + debugM "QuoteSource" "server thread done" + where + cleanup = do + close $ outSocket state + putMVar (completionMvar state) () + + serverThread' = do + maybeTick <- readChan $ tickChannel state + case maybeTick of + Nothing -> return () + Just tick -> do + sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick + serverThread' + +startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> String -> IO QuoteSourceServer +startQuoteSourceServer chan c ep = do + sock <- socket c Pub + bind sock ep + tid <- myThreadId + mv <- newEmptyMVar + let state = QuoteSourceServerState { + ctx = c, + outSocket = sock, + tickChannel = chan, + completionMvar = mv, + serverThreadId = tid + } + stid <- forkIO $ serverThread state + return $ state { serverThreadId = stid } + +stopQuoteSourceServer :: QuoteSourceServer -> IO () +stopQuoteSourceServer server = writeChan (tickChannel server) Nothing >> readMVar (completionMvar server) + diff --git a/test/Spec.hs b/test/Spec.hs index dad28cc..4cbd795 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,11 +1,15 @@ -import TestTypes +import qualified TestTypes +import qualified TestQuoteSourceServer import Test.Tasty main :: IO () -main = defaultMain tests +main = defaultMain $ testGroup "Tests" [properties, unitTests] -tests :: TestTree -tests = testGroup "Tests" [TestTypes.properties] +properties :: TestTree +properties = testGroup "Properties" [TestTypes.properties] + +unitTests :: TestTree +unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests] diff --git a/test/TestQuoteSourceServer.hs b/test/TestQuoteSourceServer.hs new file mode 100644 index 0000000..2851975 --- /dev/null +++ b/test/TestQuoteSourceServer.hs @@ -0,0 +1,50 @@ +{-# LANGUAGE OverloadedStrings #-} + +module TestQuoteSourceServer ( + unitTests +) where + +import Test.Tasty +import Test.Tasty.SmallCheck as SC +import Test.Tasty.QuickCheck as QC +import Test.Tasty.HUnit + +import ATrade.Types +import qualified Data.ByteString.Lazy as BL +import ATrade.QuoteSource.Server +import Control.Monad +import Control.Monad.Loops +import Control.Concurrent.MVar +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (writeChan) +import Control.Exception +import System.ZMQ4 +import Data.Time.Clock +import Data.Time.Calendar +import Data.Maybe + +unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] + +testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do + chan <- newBoundedChan 1000 + qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" + stopQuoteSourceServer qss) + +testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do + chan <- newBoundedChan 1000 + bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> + withSocket ctx Sub (\s -> do + connect s "inproc://quotesource-server" + subscribe s "FOOBAR" + let tick = Tick { + security = "FOOBAR", + datatype = Price, + timestamp = UTCTime (fromGregorian 2016 9 27) 16000, + value = 1000, + volume = 1} + tryWriteChan chan (Just tick) + packet <- fmap BL.fromStrict <$> receiveMulti s + case deserializeTick packet of + Just recvdTick -> tick @=? recvdTick + Nothing -> assertFailure "Unable to deserialize tick"))) +