4 changed files with 127 additions and 4 deletions
@ -0,0 +1,60 @@
@@ -0,0 +1,60 @@
|
||||
|
||||
module ATrade.QuoteSource.Server ( |
||||
startQuoteSourceServer, |
||||
stopQuoteSourceServer |
||||
) where |
||||
|
||||
import ATrade.Types |
||||
import Control.Concurrent.BoundedChan |
||||
import Control.Concurrent hiding (readChan, writeChan) |
||||
import Control.Exception |
||||
import Control.Monad |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.List.NonEmpty hiding (map) |
||||
import System.Log.Logger |
||||
import System.ZMQ4 |
||||
|
||||
data QuoteSourceServer = QuoteSourceServerState { |
||||
ctx :: Context, |
||||
outSocket :: Socket Pub, |
||||
tickChannel :: BoundedChan (Maybe Tick), |
||||
completionMvar :: MVar (), |
||||
serverThreadId :: ThreadId |
||||
} |
||||
|
||||
serverThread :: QuoteSourceServer -> IO () |
||||
serverThread state = do |
||||
finally serverThread' cleanup |
||||
debugM "QuoteSource" "server thread done" |
||||
where |
||||
cleanup = do |
||||
close $ outSocket state |
||||
putMVar (completionMvar state) () |
||||
|
||||
serverThread' = do |
||||
maybeTick <- readChan $ tickChannel state |
||||
case maybeTick of |
||||
Nothing -> return () |
||||
Just tick -> do |
||||
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick |
||||
serverThread' |
||||
|
||||
startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> String -> IO QuoteSourceServer |
||||
startQuoteSourceServer chan c ep = do |
||||
sock <- socket c Pub |
||||
bind sock ep |
||||
tid <- myThreadId |
||||
mv <- newEmptyMVar |
||||
let state = QuoteSourceServerState { |
||||
ctx = c, |
||||
outSocket = sock, |
||||
tickChannel = chan, |
||||
completionMvar = mv, |
||||
serverThreadId = tid |
||||
} |
||||
stid <- forkIO $ serverThread state |
||||
return $ state { serverThreadId = stid } |
||||
|
||||
stopQuoteSourceServer :: QuoteSourceServer -> IO () |
||||
stopQuoteSourceServer server = writeChan (tickChannel server) Nothing >> readMVar (completionMvar server) |
||||
|
||||
@ -1,11 +1,15 @@
@@ -1,11 +1,15 @@
|
||||
|
||||
import TestTypes |
||||
import qualified TestTypes |
||||
import qualified TestQuoteSourceServer |
||||
|
||||
import Test.Tasty |
||||
|
||||
main :: IO () |
||||
main = defaultMain tests |
||||
main = defaultMain $ testGroup "Tests" [properties, unitTests] |
||||
|
||||
tests :: TestTree |
||||
tests = testGroup "Tests" [TestTypes.properties] |
||||
properties :: TestTree |
||||
properties = testGroup "Properties" [TestTypes.properties] |
||||
|
||||
unitTests :: TestTree |
||||
unitTests = testGroup "Unit-tests" [TestQuoteSourceServer.unitTests] |
||||
|
||||
|
||||
@ -0,0 +1,50 @@
@@ -0,0 +1,50 @@
|
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
module TestQuoteSourceServer ( |
||||
unitTests |
||||
) where |
||||
|
||||
import Test.Tasty |
||||
import Test.Tasty.SmallCheck as SC |
||||
import Test.Tasty.QuickCheck as QC |
||||
import Test.Tasty.HUnit |
||||
|
||||
import ATrade.Types |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import ATrade.QuoteSource.Server |
||||
import Control.Monad |
||||
import Control.Monad.Loops |
||||
import Control.Concurrent.MVar |
||||
import Control.Concurrent.BoundedChan |
||||
import Control.Concurrent hiding (writeChan) |
||||
import Control.Exception |
||||
import System.ZMQ4 |
||||
import Data.Time.Clock |
||||
import Data.Time.Calendar |
||||
import Data.Maybe |
||||
|
||||
unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] |
||||
|
||||
testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do |
||||
chan <- newBoundedChan 1000 |
||||
qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" |
||||
stopQuoteSourceServer qss) |
||||
|
||||
testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do |
||||
chan <- newBoundedChan 1000 |
||||
bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> |
||||
withSocket ctx Sub (\s -> do |
||||
connect s "inproc://quotesource-server" |
||||
subscribe s "FOOBAR" |
||||
let tick = Tick { |
||||
security = "FOOBAR", |
||||
datatype = Price, |
||||
timestamp = UTCTime (fromGregorian 2016 9 27) 16000, |
||||
value = 1000, |
||||
volume = 1} |
||||
tryWriteChan chan (Just tick) |
||||
packet <- fmap BL.fromStrict <$> receiveMulti s |
||||
case deserializeTick packet of |
||||
Just recvdTick -> tick @=? recvdTick |
||||
Nothing -> assertFailure "Unable to deserialize tick"))) |
||||
|
||||
Loading…
Reference in new issue