You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

128 lines
4.9 KiB

3 years ago
{-# LANGUAGE RecordWildCards #-}
module TickerInfoServer
(
TickerInfoServerHandle,
startTickerInfoServer,
stopTickerInfoServer,
withTickerInfoServer,
putTickerInfo,
getTickerInfo,
getAllTickers,
3 years ago
TickerInfo(..)
) where
import ATrade.Logging (Message,
Severity (Debug, Warning),
3 years ago
logWith)
import ATrade.Types (Tick, TickerId, security)
import Colog (LogAction)
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM (TVar, atomically, newTVarIO,
readTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', writeTVar)
import Control.Exception (bracket)
import Control.Monad.Extra (whileM)
import Data.Aeson (FromJSON (parseJSON),
ToJSON (toJSON), decode,
eitherDecode, encode, object,
withObject)
import Data.Aeson.Types ((.!=), (.:), (.:?), (.=))
3 years ago
import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8With, encodeUtf8)
import Data.Text.Encoding.Error (replace)
3 years ago
import Prelude hiding (log)
import System.ZMQ4 (Context, Router (Router), bind,
connect, receiveMulti, sendMulti,
3 years ago
withSocket)
data TickerInfo =
TickerInfo
{
tiTicker :: TickerId
, tiLotSize :: Int
, tiTickSize :: Double
, tiTickPrice :: Double
3 years ago
} deriving (Show, Eq, Ord)
instance FromJSON TickerInfo where
parseJSON = withObject "TickerInfo" (\obj ->
TickerInfo <$>
obj .: "ticker" <*>
obj .: "lot_size" <*>
obj .: "tick_size" <*>
obj .:? "tick_price" .!= 1)
3 years ago
instance ToJSON TickerInfo where
toJSON ti = object [ "ticker" .= tiTicker ti,
"lot_size" .= tiLotSize ti,
"tick_size" .= tiTickSize ti,
"tick_price" .= tiTickPrice ti]
3 years ago
newtype TickerInfoRequest =
TickerInfoRequest
{
tirTickerId :: TickerId
} deriving (Show, Eq, Ord)
instance FromJSON TickerInfoRequest where
parseJSON = withObject "TickerInfoRequest" (\obj ->
TickerInfoRequest <$>
obj .: "ticker")
instance ToJSON TickerInfoRequest where
toJSON tir = object [ "ticker" .= tirTickerId tir ]
data TickerInfoServerHandle =
TickerInfoServerHandle
{
tisThreadId :: ThreadId
, tisRun :: TVar Bool
, tisMap :: TVar (M.Map TickerId TickerInfo)
}
startTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> IO TickerInfoServerHandle
startTickerInfoServer logger ctx endpoint = do
tisRun <- newTVarIO True
tisMap <- newTVarIO M.empty
tisThreadId <- forkIO $ tisThread tisRun tisMap
pure $ TickerInfoServerHandle {..}
where
log = logWith logger
tisThread tisRun tisMap = withSocket ctx Router $ \sock -> do
bind sock (T.unpack endpoint)
3 years ago
whileM $ do
rq <- receiveMulti sock
case rq of
(sender:_:message:_) -> case eitherDecode (BL.fromStrict message) of
Right tir -> do
3 years ago
maybeTi <- M.lookup (tirTickerId tir) <$> readTVarIO tisMap
case maybeTi of
Just ti -> sendMulti sock (sender :| ["", "OK", BL.toStrict $ encode ti])
3 years ago
_ -> do
log Warning "TIS" $ "Requested unknown ticker: " <> tirTickerId tir
sendMulti sock (sender :| ["", "ERROR"])
Left err -> do
log Warning "TIS" $ "Unable to parse incoming request" <> (T.pack . show) err
log Debug "TIS" $ "Request: " <> decodeUtf8With (replace '?') message
3 years ago
_ -> log Warning "TIS" "Malformed packet"
readTVarIO tisRun
stopTickerInfoServer :: TickerInfoServerHandle -> IO ()
stopTickerInfoServer h = atomically $ writeTVar (tisRun h) False
withTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> (TickerInfoServerHandle -> IO ()) -> IO ()
withTickerInfoServer logger ctx endpoint = bracket (startTickerInfoServer logger ctx endpoint) stopTickerInfoServer
getTickerInfo :: TickerId -> TickerInfoServerHandle -> IO (Maybe TickerInfo)
getTickerInfo tickerId tisH = M.lookup tickerId <$> readTVarIO (tisMap tisH)
putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO ()
putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo)
getAllTickers :: TickerInfoServerHandle -> IO [TickerId]
getAllTickers tisH = M.keys <$> readTVarIO (tisMap tisH)