diff --git a/libatrade.cabal b/libatrade.cabal index 8bb31ec..cdd81aa 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -19,6 +19,7 @@ library exposed-modules: ATrade.Types , ATrade.QuoteSource.Client , ATrade.QuoteSource.Server + , ATrade.Broker.Client , ATrade.Broker.Protocol , ATrade.Broker.Server , ATrade.Util @@ -76,6 +77,7 @@ test-suite libatrade-test default-language: Haskell2010 other-modules: ArbitraryInstances , MockBroker + , TestBrokerClient , TestBrokerProtocol , TestBrokerServer , TestQuoteSourceClient diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 6c08fd9..4656bad 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -1,6 +1,10 @@ {-# LANGUAGE OverloadedStrings #-} module ATrade.Broker.Client ( + startBrokerClient, + stopBrokerClient, + submitOrder, + cancelOrder ) where import ATrade.Types @@ -9,6 +13,10 @@ import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent.BoundedChan import Control.Concurrent.MVar import Control.Exception +import Control.Monad +import Data.Aeson +import Data.Int +import Data.IORef import Data.List.NonEmpty import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL @@ -18,15 +26,58 @@ import System.Log.Logger data BrokerClientHandle = BrokerClientHandle { tid :: ThreadId, - completionMvar :: compMv, + completionMvar :: MVar (), submitOrder :: Order -> IO (Either T.Text OrderId), cancelOrder :: OrderId -> IO (Either T.Text ()), cmdVar :: MVar BrokerServerRequest, respVar :: MVar BrokerServerResponse } +brokerClientThread ctx ep cmd resp comp = do + sock <- socket ctx Req + connect sock $ T.unpack ep + finally (brokerClientThread' sock) (cleanup sock) + where + cleanup sock = close sock >> putMVar comp () + brokerClientThread' sock = forever $ do + request <- readMVar cmd + send sock [] (BL.toStrict $ encode request) + maybeResponse <- decode . BL.fromStrict <$> receive sock + case maybeResponse of + Just response -> putMVar resp response + Nothing -> putMVar resp (ResponseError "Unable to decode response") + startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle -startBrokerClient ctx endpoint = undefined +startBrokerClient ctx endpoint = do + idCounter <- newIORef 1 + compMv <- newEmptyMVar + cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) + respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) + tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv) + + return BrokerClientHandle { + tid = tid, + completionMvar = compMv, + submitOrder = bcSubmitOrder idCounter cmdVar respVar, + cancelOrder = bcCancelOrder idCounter cmdVar respVar, + cmdVar = cmdVar, + respVar = respVar + } stopBrokerClient :: BrokerClientHandle -> IO () -stopBrokerClient handle = undefined +stopBrokerClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) + +nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v)) + +bcSubmitOrder :: IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId) +bcSubmitOrder idCounter cmdVar respVar order = do + sqnum <- nextId idCounter + putMVar cmdVar (RequestSubmitOrder sqnum order) + resp <- readMVar respVar + case resp of + (ResponseOrderSubmitted oid) -> return $ Right oid + (ResponseError msg) -> return $ Left msg + + +bcCancelOrder idCounter cmdVar respVar orderId = undefined + diff --git a/test/Spec.hs b/test/Spec.hs index d34cd42..8d57152 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,5 +1,6 @@ import qualified TestTypes +import qualified TestBrokerClient import qualified TestBrokerProtocol import qualified TestBrokerServer import qualified TestQuoteSourceClient @@ -14,7 +15,9 @@ properties :: TestTree properties = testGroup "Properties" [TestTypes.properties, TestBrokerProtocol.properties] unitTests :: TestTree -unitTests = testGroup "Unit-tests" [TestQuoteSourceClient.unitTests +unitTests = testGroup "Unit-tests" [ + TestQuoteSourceClient.unitTests , TestQuoteSourceServer.unitTests - , TestBrokerServer.unitTests] + , TestBrokerServer.unitTests + , TestBrokerClient.unitTests] diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index c971e0f..ab5b95d 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -1,7 +1,7 @@ {-# LANGUAGE OverloadedStrings #-} -module TestBrokerServer ( +module TestBrokerClient ( unitTests ) where @@ -13,7 +13,8 @@ import Test.Tasty.HUnit import ATrade.Types import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL -import ATrade.Broker.Server +import ATrade.Broker.Client +import ATrade.Broker.Server hiding (submitOrder) import ATrade.Broker.Protocol import ATrade.Util import qualified Data.Text as T @@ -34,7 +35,7 @@ import Data.UUID as U import Data.UUID.V4 as UV4 import MockBroker -unitTests = testGroup "Broker.Client" [] +unitTests = testGroup "Broker.Client" [testBrokerClientStartStop] makeEndpoint = do uid <- toText <$> UV4.nextRandom @@ -48,11 +49,13 @@ defaultOrder = mkOrder { orderOperation = Buy } -testBrokerClientStartStop = testCase "Broker client starts and stops" $ withContext (\ctx -> do +testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> + bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder - ))) + case oid of + Left err -> assertFailure "Invalid response" + Right _ -> return ()))) diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index b4d76b5..0ed9c85 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -52,7 +52,9 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} + yield writeChan chan (Just tick) + yield recvdTick <- readChan clientChan tick @=? recvdTick)))