Browse Source

BrokerCLient: hslogger => co-log

master
Denis Tereshkin 4 years ago
parent
commit
e45d51f6a3
  1. 130
      src/ATrade/Broker/Client.hs

130
src/ATrade/Broker/Client.hs

@ -10,30 +10,57 @@ module ATrade.Broker.Client (
NotificationCallback(..) NotificationCallback(..)
) where ) where
import ATrade.Broker.Protocol import ATrade.Broker.Protocol (BrokerServerRequest (..),
import ATrade.Types BrokerServerResponse (..),
import Control.Concurrent hiding (readChan, writeChan) ClientIdentity, Notification,
import Control.Concurrent.BoundedChan NotificationSqnum (NotificationSqnum),
import Control.Concurrent.MVar RequestSqnum,
import Control.Exception getNotificationSqnum,
import Control.Monad nextSqnum)
import Control.Monad.Loops import ATrade.Logging (Message,
import Data.Aeson Severity (Debug, Info, Warning),
logWith)
import ATrade.Types (ClientSecurityParams (cspCertificate, cspServerCertificate),
Order, OrderId)
import Colog (LogAction)
import Control.Concurrent (MVar, ThreadId, forkIO,
killThread, newEmptyMVar,
putMVar, readMVar, takeMVar,
threadDelay, tryReadMVar,
yield)
import Control.Concurrent.BoundedChan ()
import Control.Concurrent.MVar ()
import Control.Exception (SomeException, finally, handle,
throwIO)
import Control.Monad (forM_, when)
import Control.Monad.Loops (andM, whileM_)
import Data.Aeson (decode, encode)
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.Int import Data.Int (Int64)
import Data.IORef import Data.IORef (IORef, atomicModifyIORef',
atomicWriteIORef, newIORef,
readIORef, writeIORef)
import qualified Data.List as L import qualified Data.List as L
import Data.List.NonEmpty import Data.List.NonEmpty ()
import Data.Maybe import Data.Maybe (isNothing)
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding import Data.Text.Encoding (decodeUtf8)
import qualified Data.Text.Encoding as T import qualified Data.Text.Encoding as T
import Safe import Safe (lastMay)
import System.Log.Logger import System.Timeout (timeout)
import System.Timeout import System.ZMQ4 (Context, Event (In),
import System.ZMQ4 Poll (Sock), Req (Req),
import System.ZMQ4.ZAP Sub (Sub), Switch (On),
connect, poll, receive,
receiveMulti, restrict, send,
setLinger, setTcpKeepAlive,
setTcpKeepAliveCount,
setTcpKeepAliveIdle,
setTcpKeepAliveInterval,
subscribe, withSocket)
import System.ZMQ4.ZAP (zapApplyCertificate,
zapSetServerCertificate)
type NotificationCallback = Notification -> IO () type NotificationCallback = Notification -> IO ()
@ -50,17 +77,26 @@ data BrokerClientHandle = BrokerClientHandle {
notificationThreadId :: ThreadId notificationThreadId :: ThreadId
} }
brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> MVar () -> ClientSecurityParams -> IO () brokerClientThread :: B.ByteString ->
brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally brokerClientThread' cleanup Context ->
T.Text ->
MVar (BrokerServerRequest, MVar BrokerServerResponse) ->
MVar () ->
MVar () ->
ClientSecurityParams ->
LogAction IO Message ->
IO ()
brokerClientThread socketIdentity ctx ep cmd comp killMv secParams logger = finally brokerClientThread' cleanup
where where
cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () log = logWith logger
cleanup = log Info "Broker.Client" "Quitting broker client thread" >> putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
debugM "Broker.Client" "Starting event loop" log Debug "Broker.Client" "Starting event loop"
handle (\e -> do handle (\e -> do
warningM "Broker.Client" $ "Broker client: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) log Warning "Broker.Client" $ "Broker client: exception: " <> (T.pack . show) (e :: SomeException) <> "; isZMQ: " <> (T.pack . show) (isZMQError e)
if isZMQError e if isZMQError e
then do then do
debugM "Broker.Client" "Rethrowing exception" log Debug "Broker.Client" "Rethrowing exception"
throwIO e throwIO e
else do else do
return ()) $ withSocket ctx Req (\sock -> do return ()) $ withSocket ctx Req (\sock -> do
@ -74,7 +110,7 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro
Nothing -> return () Nothing -> return ()
connect sock $ T.unpack ep connect sock $ T.unpack ep
debugM "Broker.Client" "Connected" log Debug "Broker.Client" "Connected"
isTimeout <- newIORef False isTimeout <- newIORef False
whileM_ (andM [isNothing <$> tryReadMVar killMv, (== False) <$> readIORef isTimeout]) $ do whileM_ (andM [isNothing <$> tryReadMVar killMv, (== False) <$> readIORef isTimeout]) $ do
@ -92,8 +128,17 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro
isZMQError e = "ZMQError" `L.isPrefixOf` show e isZMQError e = "ZMQError" `L.isPrefixOf` show e
notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> ClientSecurityParams -> IO () notificationThread :: ClientIdentity ->
notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams = flip finally (return ()) $ do [NotificationCallback] ->
Context ->
T.Text ->
IORef RequestSqnum ->
MVar (BrokerServerRequest, MVar BrokerServerResponse) ->
MVar () ->
ClientSecurityParams ->
LogAction IO Message ->
IO ()
notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger = flip finally (return ()) $ do
whileM_ (isNothing <$> tryReadMVar killMv) $ whileM_ (isNothing <$> tryReadMVar killMv) $
withSocket ctx Sub $ \sock -> do withSocket ctx Sub $ \sock -> do
setLinger (restrict 0) sock setLinger (restrict 0) sock
@ -108,7 +153,7 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa
setTcpKeepAliveIdle (restrict 60) sock setTcpKeepAliveIdle (restrict 60) sock
setTcpKeepAliveInterval (restrict 10) sock setTcpKeepAliveInterval (restrict 10) sock
connect sock $ T.unpack ep connect sock $ T.unpack ep
debugM "Broker.Client" $ "Subscribing: [" <> T.unpack clientIdentity <> "]" log Debug "Broker.Client" $ "Subscribing: [" <> clientIdentity <> "]"
subscribe sock $ T.encodeUtf8 clientIdentity subscribe sock $ T.encodeUtf8 clientIdentity
initialSqnum <- requestCurrentSqnum cmdVar idCounter clientIdentity initialSqnum <- requestCurrentSqnum cmdVar idCounter clientIdentity
@ -130,8 +175,8 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa
when (getNotificationSqnum notif >= lastSqnum) $ do when (getNotificationSqnum notif >= lastSqnum) $ do
forM_ callbacks $ \c -> c notif forM_ callbacks $ \c -> c notif
atomicWriteIORef notifSqnumRef (nextSqnum lastSqnum) atomicWriteIORef notifSqnumRef (nextSqnum lastSqnum)
(ResponseError msg) -> warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg (ResponseError msg) -> log Warning "Broker.Client" $ "ResponseError: " <> msg
_ -> warningM "Broker.Client" "Unknown error when requesting notifications" _ -> log Warning "Broker.Client" "Unknown error when requesting notifications"
else do else do
msg <- receiveMulti sock msg <- receiveMulti sock
case msg of case msg of
@ -141,14 +186,16 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa
if getNotificationSqnum notification /= currentSqnum if getNotificationSqnum notification /= currentSqnum
then then
if currentSqnum > getNotificationSqnum notification if currentSqnum > getNotificationSqnum notification
then debugM "Broker.Client" $ "Already processed notification: " <> show (getNotificationSqnum notification) then log Debug "Broker.Client" $ "Already processed notification: " <> (T.pack . show) (getNotificationSqnum notification)
else warningM "Broker.Client" $ "Notification sqnum mismatch: " <> show currentSqnum <> " -> " <> show (getNotificationSqnum notification) else log Warning "Broker.Client" $
"Notification sqnum mismatch: " <> (T.pack . show) currentSqnum <> " -> " <> (T.pack . show) (getNotificationSqnum notification)
else do else do
atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum) atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum)
forM_ callbacks $ \c -> c notification forM_ callbacks $ \c -> c notification
_ -> return () _ -> return ()
_ -> return () _ -> return ()
where where
log = logWith logger
requestCurrentSqnum cmdVar idCounter clientIdentity = do requestCurrentSqnum cmdVar idCounter clientIdentity = do
respVar <- newEmptyMVar respVar <- newEmptyMVar
sqnum <- nextId idCounter sqnum <- nextId idCounter
@ -157,10 +204,10 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa
case resp of case resp of
(ResponseCurrentSqnum sqnum) -> return sqnum (ResponseCurrentSqnum sqnum) -> return sqnum
(ResponseError msg) -> do (ResponseError msg) -> do
warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg log Warning "Broker.Client" $ "ResponseError: " <> msg
return (NotificationSqnum 1) return (NotificationSqnum 1)
_ -> do _ -> do
warningM "Broker.Client" "Unknown error when requesting notifications" log Warning "Broker.Client" "Unknown error when requesting notifications"
return (NotificationSqnum 1) return (NotificationSqnum 1)
@ -170,15 +217,16 @@ startBrokerClient :: B.ByteString -- ^ Socket Identity
-> T.Text -- ^ Notification endpoing -> T.Text -- ^ Notification endpoing
-> [NotificationCallback] -- ^ List of notification callbacks -> [NotificationCallback] -- ^ List of notification callbacks
-> ClientSecurityParams -- ^ -> ClientSecurityParams -- ^
-> LogAction IO Message
-> IO BrokerClientHandle -> IO BrokerClientHandle
startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallbacks secParams = do startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallbacks secParams logger = do
idCounter <- newIORef 1 idCounter <- newIORef 1
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse))
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams logger)
notifSqnumRef <- newIORef (NotificationSqnum 0) notifSqnumRef <- newIORef (NotificationSqnum 0)
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams) notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger)
return BrokerClientHandle { return BrokerClientHandle {
tid = tid, tid = tid,
@ -226,7 +274,11 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do
(ResponseError msg) -> return $ Left msg (ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error" _ -> return $ Left "Unknown error"
bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) bcGetNotifications :: ClientIdentity ->
IORef RequestSqnum ->
IORef NotificationSqnum ->
MVar (BrokerServerRequest, MVar BrokerServerResponse) ->
IO (Either T.Text [Notification])
bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do
respVar <- newEmptyMVar respVar <- newEmptyMVar
sqnum <- nextId idCounter sqnum <- nextId idCounter

Loading…
Cancel
Save