ATrade core infrastructure
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

103 lines
3.2 KiB

-- | Broker server built on a ZeroMQ 'Router' socket: it receives
-- order-submission requests from clients and dispatches each order to the
-- 'BrokerInterface' whose account list contains the order's account.
module ATrade.Broker.Server (
startBrokerServer,
stopBrokerServer,
BrokerInterface(..)
) where
import ATrade.Types
import ATrade.Broker.Protocol
import System.ZMQ4
import Data.List.NonEmpty
import qualified Data.Map as M
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
import qualified Data.List as L
import Data.Aeson
import Data.Time.Clock
import Data.IORef
import Control.Concurrent
import Control.Exception
import Control.Monad
import System.Log.Logger
-- | An 'IO' action that yields an 'OrderId'.
--
-- This is a type alias on purpose: writing it as
-- @newtype OrderIdGenerator = IO OrderId@ declares a data constructor that
-- merely happens to be called @IO@ and wraps a single 'OrderId'; it does not
-- refer to the @IO OrderId@ action type.
type OrderIdGenerator = IO OrderId
-- | Record of operations that a broker backend provides to the server.
data BrokerInterface = BrokerInterface
  { accounts :: [T.Text]
    -- ^ Account identifiers handled by this broker.  The server routes an
    -- incoming order to the first broker whose list contains the order's
    -- account.
  , setNotificationCallback :: Maybe (Notification -> IO ()) -> IO ()
    -- ^ Takes an optional notification handler.
    -- NOTE(review): presumably 'Nothing' removes a previously installed
    -- handler - confirm against the broker implementations.
  , submitOrder :: Order -> IO ()
    -- ^ Called by the server with each accepted order, after the server has
    -- assigned the order its id.
  , cancelOrder :: OrderId -> IO ()
    -- ^ Takes the id of an order; judging by the name it cancels that order.
    -- Not called from the server code in this module.
  , stopBroker :: IO ()
    -- ^ Judging by the name, shuts the broker backend down.  Not called from
    -- the server code in this module.
  }
-- | State of a running broker server.  It is created by 'startBrokerServer',
-- stored in an 'IORef' and handed to the server thread.
data BrokerServerState = BrokerServerState
  { bsSocket :: Socket Router
    -- Socket on which requests are received and replies are sent.
  , orderMap :: M.Map OrderId B.ByteString
    -- Associates each order with the 0mq identity of its client.
  , lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString)
    -- NOTE(review): presumably the last request sequence number and packet
    -- per client identity; nothing in this module reads or updates it yet.
  , pendingNotifications :: [(Notification, UTCTime)]
    -- Pairs of (order with its new state, time the notification was enqueued).
  , brokers :: [BrokerInterface]
    -- Broker backends that incoming orders are dispatched to.
  , completionMvar :: MVar ()
    -- Filled by the server thread's cleanup when the thread terminates.
  , orderIdCounter :: OrderId
    -- Id that will be assigned to the next submitted order.
  }
-- | Handle to a running broker server: the id of the server thread together
-- with the 'MVar' that the thread fills once it has terminated.
data BrokerServerHandle
  = BrokerServerHandle ThreadId (MVar ())
-- | Start a broker server on a thread of its own.
--
-- A 'Router' socket is created in the given ZeroMQ context and bound to the
-- endpoint; the request-processing thread is then forked with a fresh server
-- state (no known orders, order ids starting at 1).
--
-- Arguments, in order: the broker backends that orders are dispatched to,
-- the ZeroMQ context, and the endpoint to bind.
--
-- Returns the handle expected by 'stopBrokerServer'.  If binding the
-- endpoint fails, the socket is closed and the exception is rethrown.
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle
startBrokerServer bros c ep = do
  sock <- socket c Router
  -- Do not leak the socket when the endpoint cannot be bound.
  bind sock (T.unpack ep) `onException` close sock
  compMv <- newEmptyMVar
  state <- newIORef BrokerServerState
    { bsSocket = sock
    , orderMap = M.empty
    , lastPacket = M.empty
    , pendingNotifications = []
    , brokers = bros
    , completionMvar = compMv
    , orderIdCounter = 1
    }
  tid <- forkIO (brokerServerThread state)
  return (BrokerServerHandle tid compMv)
-- | Body of the server thread.
--
-- Receives multipart requests from the socket stored in the state, handles
-- them and sends the replies, until the thread is killed or an exception
-- escapes.  On the way out the socket is closed and the completion 'MVar'
-- is filled.
--
-- A request that cannot be served (unexpected frame layout, undecodable
-- payload, unsupported request kind, or an account no broker handles) is
-- logged as a warning and dropped: a single bad packet must not terminate
-- the server for every other client.
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally serveForever cleanup
  where
    serveForever = forever $ do
      sock <- bsSocket <$> readIORef state
      -- 'mapM_' over the 'Maybe': a reply is sent only when there is one.
      receiveMulti sock >>= handleMessage >>= mapM_ (sendMessage sock)

    -- Signal completion even when closing the socket throws; otherwise a
    -- caller blocked on the completion MVar would wait forever.
    cleanup = do
      st <- readIORef state
      close (bsSocket st) `finally` putMVar (completionMvar st) ()

    -- Produces the reply for a request, or Nothing when the request was
    -- dropped.  Expected frames: client identity, delimiter, JSON payload.
    handleMessage :: [B.ByteString] -> IO (Maybe (B.ByteString, BrokerServerResponse))
    handleMessage [peerId, _, payload] =
      case decode (BL.fromStrict payload) of
        Just (RequestSubmitOrder _sqnum order) -> do
          bros <- brokers <$> readIORef state
          case findBroker (orderAccountId order) bros of
            Just bro -> do
              oid <- nextOrderId
              submitOrder bro order { orderId = oid }
              return (Just (peerId, ResponseOrderSubmitted oid))
            Nothing ->
              dropRequest ("No broker for account: " ++ T.unpack (orderAccountId order))
        Just _ -> dropRequest "Unsupported request received"
        Nothing -> dropRequest "Unable to decode request payload"
    handleMessage x = dropRequest ("Invalid packet received: " ++ show x)

    -- Log why a request is being ignored and produce no reply.
    -- NOTE(review): the client gets no answer at all; if the protocol has an
    -- error response, it should be sent here instead - confirm.
    dropRequest reason = do
      warningM "Broker.Server" reason
      return Nothing

    -- Reply frames mirror the request: identity, empty delimiter, JSON body.
    sendMessage sock (peerId, resp) =
      sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp])

    -- First broker whose account list contains the given account.
    findBroker account = L.find (L.elem account . accounts)

    -- Atomically hand out the current counter value and advance the counter.
    nextOrderId =
      atomicModifyIORef' state
        (\s -> (s { orderIdCounter = 1 + orderIdCounter s }, orderIdCounter s))
-- | Stop a running broker server and wait until it has shut down.
--
-- Yields, kills the server thread, and then blocks until the completion
-- 'MVar' carried by the handle has been filled.
stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle serverThread done) = do
  -- NOTE(review): presumably the yield gives a freshly forked server thread
  -- the chance to install its cleanup handler before it is killed - confirm.
  yield
  killThread serverThread
  readMVar done