5 changed files with 95 additions and 3 deletions
@ -0,0 +1,43 @@ |
|||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
module ATrade.QuoteSource.Client ( |
||||||
|
startQuoteSourceClient, |
||||||
|
stopQuoteSourceClient |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Types |
||||||
|
import Control.Concurrent hiding (readChan, writeChan) |
||||||
|
import Control.Concurrent.BoundedChan |
||||||
|
import Control.Concurrent.MVar |
||||||
|
import Control.Exception |
||||||
|
import Data.List.NonEmpty |
||||||
|
import qualified Data.Text as T |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import Data.Text.Encoding |
||||||
|
import System.ZMQ4 |
||||||
|
import System.Log.Logger |
||||||
|
|
||||||
|
data QuoteSourceClientHandle = QuoteSourceClientHandle { |
||||||
|
tid :: ThreadId, |
||||||
|
completionMvar :: MVar () |
||||||
|
} |
||||||
|
|
||||||
|
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle |
||||||
|
startQuoteSourceClient chan tickers ctx endpoint = do |
||||||
|
compMv <- newEmptyMVar |
||||||
|
tid <- forkIO $ do |
||||||
|
sock <- socket ctx Sub |
||||||
|
connect sock $ T.unpack endpoint |
||||||
|
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers |
||||||
|
finally (clientThread sock) (cleanup compMv sock) |
||||||
|
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } |
||||||
|
where |
||||||
|
clientThread sock = do |
||||||
|
rawTick <- fmap BL.fromStrict <$> receiveMulti sock |
||||||
|
case deserializeTick rawTick of |
||||||
|
Just tick -> writeChan chan tick |
||||||
|
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" |
||||||
|
cleanup compMv sock = close sock >> putMVar compMv () |
||||||
|
|
||||||
|
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () |
||||||
|
stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) |
||||||
@ -0,0 +1,43 @@ |
|||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
module TestQuoteSourceClient ( |
||||||
|
unitTests |
||||||
|
) where |
||||||
|
|
||||||
|
import Test.Tasty |
||||||
|
import Test.Tasty.SmallCheck as SC |
||||||
|
import Test.Tasty.QuickCheck as QC |
||||||
|
import Test.Tasty.HUnit |
||||||
|
|
||||||
|
import ATrade.Types |
||||||
|
import ATrade.QuoteSource.Server |
||||||
|
import ATrade.QuoteSource.Client |
||||||
|
import Control.Monad |
||||||
|
import Control.Monad.Loops |
||||||
|
import Control.Concurrent.MVar |
||||||
|
import Control.Concurrent.BoundedChan |
||||||
|
import Control.Concurrent hiding (writeChan) |
||||||
|
import Control.Exception |
||||||
|
import System.ZMQ4 |
||||||
|
import Data.Time.Clock |
||||||
|
import Data.Time.Calendar |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Maybe |
||||||
|
import Data.UUID as U |
||||||
|
import Data.UUID.V4 as UV4 |
||||||
|
|
||||||
|
makeEndpoint = do |
||||||
|
uid <- toText <$> UV4.nextRandom |
||||||
|
return $ "inproc://server" `T.append` uid |
||||||
|
|
||||||
|
unitTests = testGroup "QuoteSource.Client" [testStartStop] |
||||||
|
|
||||||
|
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do |
||||||
|
ep <- makeEndpoint |
||||||
|
chan <- newBoundedChan 1000 |
||||||
|
clientChan <- newBoundedChan 1000 |
||||||
|
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> |
||||||
|
bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) |
||||||
|
|
||||||
|
|
||||||
Loading…
Reference in new issue