7 changed files with 229 additions and 35 deletions
@ -1,5 +1,59 @@
@@ -1,5 +1,59 @@
|
||||
|
||||
module Main where |
||||
|
||||
import Data.Aeson |
||||
import qualified Data.Text as T |
||||
import qualified Data.ByteString.Lazy as BL |
||||
|
||||
import ATrade.MDS.Database |
||||
import ATrade.MDS.HistoryServer |
||||
|
||||
import Control.Concurrent |
||||
import Control.Monad |
||||
|
||||
import System.ZMQ4 |
||||
|
||||
data MdsConfig = MdsConfig { |
||||
cfgDbPath :: T.Text, |
||||
cfgDbName :: T.Text, |
||||
cfgDbAccount :: T.Text, |
||||
cfgDbPassword :: T.Text, |
||||
cfgQHPEndpoint :: T.Text, |
||||
cfgHAPEndpoint :: T.Text |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromJSON MdsConfig where |
||||
parseJSON = withObject "Cfg" $ \v -> |
||||
MdsConfig <$> |
||||
v .: "path" <*> |
||||
v .: "name" <*> |
||||
v .: "account" <*> |
||||
v .: "password" <*> |
||||
v .: "qhp_endpoint" <*> |
||||
v .: "hap_endpoint" |
||||
|
||||
getConfig :: IO MdsConfig |
||||
getConfig = do |
||||
rawCfg <- BL.readFile "mds.conf" |
||||
case eitherDecode' rawCfg of |
||||
Left err -> error err |
||||
Right cfg -> return cfg |
||||
|
||||
main :: IO () |
||||
main = undefined |
||||
main = do |
||||
cfg <- getConfig |
||||
let dbConfig = DatabaseConfig { dbPath = cfgDbPath cfg, |
||||
dbDatabase = cfgDbName cfg, |
||||
dbUser = cfgDbAccount cfg, |
||||
dbPassword = cfgDbPassword cfg } |
||||
|
||||
db <- initDatabase dbConfig |
||||
|
||||
let hsConfig = HistoryServerConfig { |
||||
hspQHPEndpoint = cfgQHPEndpoint cfg, |
||||
hspHAPEndpoint = cfgHAPEndpoint cfg } |
||||
|
||||
withContext $ \ctx -> do |
||||
_ <- startHistoryServer hsConfig db ctx |
||||
forever $ threadDelay 1000000 |
||||
|
||||
|
||||
@ -1,7 +1,8 @@
@@ -1,7 +1,8 @@
|
||||
|
||||
database { |
||||
host = "127.0.0.1", |
||||
name = "atrade_quotes", |
||||
user = "atrade", |
||||
password = "atrade" |
||||
{ |
||||
"path" : "/tmp/test.db", |
||||
"name" : "", |
||||
"account" : "", |
||||
"password" : "", |
||||
"qhp_endpoint" : "tcp://0.0.0.0:5595", |
||||
"hap_endpoint" : "tcp://0.0.0.0:5605" |
||||
} |
||||
|
||||
@ -1,42 +1,106 @@
@@ -1,42 +1,106 @@
|
||||
|
||||
module ATrade.MDS.HistoryServer ( |
||||
HistoryServer, |
||||
HistoryServerConfig(..), |
||||
startHistoryServer |
||||
) where |
||||
|
||||
import System.ZMQ4 |
||||
import ATrade.Types |
||||
import ATrade.MDS.Database |
||||
import ATrade.MDS.Protocol |
||||
import Control.Concurrent |
||||
import Control.Monad |
||||
import Data.Aeson |
||||
import Data.List.NonEmpty |
||||
import Data.Time.Clock.POSIX |
||||
import qualified Data.Vector as V |
||||
import Safe |
||||
import qualified Data.Text as T |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.Binary.Get |
||||
import Data.Binary.Put |
||||
|
||||
data HistoryServer = HistoryServer ThreadId |
||||
data HistoryServer = HistoryServer ThreadId ThreadId |
||||
|
||||
startHistoryServer :: MdsHandle -> Context -> IO HistoryServer |
||||
startHistoryServer db ctx = do |
||||
sock <- socket ctx Router |
||||
tid <- forkIO $ serve db sock |
||||
return $ HistoryServer tid |
||||
data HistoryServerConfig = HistoryServerConfig { |
||||
hspQHPEndpoint :: T.Text, |
||||
hspHAPEndpoint :: T.Text |
||||
} deriving (Show, Eq) |
||||
|
||||
serve :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () |
||||
serve db sock = forever $ do |
||||
startHistoryServer :: HistoryServerConfig -> MdsHandle -> Context -> IO HistoryServer |
||||
startHistoryServer cfg db ctx = do |
||||
qhp <- socket ctx Router |
||||
bind qhp $ T.unpack $ hspQHPEndpoint cfg |
||||
qhpTid <- forkIO $ serveQHP db qhp |
||||
|
||||
hap <- socket ctx Router |
||||
bind hap $ T.unpack $ hspHAPEndpoint cfg |
||||
hapTid <- forkIO $ serveHAP db hap |
||||
|
||||
return $ HistoryServer qhpTid hapTid |
||||
|
||||
serveQHP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () |
||||
serveQHP db sock = forever $ do |
||||
rq <- receiveMulti sock |
||||
let maybeCmd = (BL.fromStrict <$> rq `atMay` 2) >>= decode |
||||
case (headMay rq, maybeCmd) of |
||||
(Just peerId, Just cmd) -> handleCmd peerId cmd |
||||
_ -> return () |
||||
where |
||||
handleCmd :: B.ByteString -> MDSRequest -> IO () |
||||
handleCmd :: B.ByteString -> QHPRequest -> IO () |
||||
handleCmd peerId cmd = case cmd of |
||||
rq -> do |
||||
qdata <- getData db (rqTicker rq) (TimeInterval (rqFrom rq) (rqTo rq)) (Timeframe (rqTimeframe rq)) |
||||
bytes <- serializeBars $ V.concat $ fmap snd qdata |
||||
sendMulti sock $ peerId :| B.empty : bytes |
||||
serializeBars = undefined |
||||
qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) |
||||
let bytes = serializeBars $ V.concat $ fmap snd qdata |
||||
sendMulti sock $ peerId :| B.empty : [BL.toStrict bytes] |
||||
serializeBars :: V.Vector Bar -> BL.ByteString |
||||
serializeBars bars = runPut $ V.forM_ bars serializeBar |
||||
serializeBar bar = do |
||||
putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) |
||||
putDoublele (toDouble . barOpen $ bar) |
||||
putDoublele (toDouble . barHigh $ bar) |
||||
putDoublele (toDouble . barLow $ bar) |
||||
putDoublele (toDouble . barClose $ bar) |
||||
putWord64le (fromInteger . barVolume $ bar) |
||||
|
||||
serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () |
||||
serveHAP db sock = forever $ do |
||||
rq <- receiveMulti sock |
||||
let maybeCmd = (BL.fromStrict <$> rq `atMay` 2) >>= decode |
||||
let mbRawData = rq `atMay` 3 |
||||
case (headMay rq, maybeCmd, mbRawData) of |
||||
(Just peerId, Just cmd, Just rawData) -> do |
||||
let parsedData = deserializeBars (hapTicker cmd) $ BL.fromStrict rawData |
||||
handleCmd peerId cmd parsedData |
||||
_ -> return () |
||||
where |
||||
handleCmd :: B.ByteString -> HAPRequest -> [Bar] -> IO () |
||||
handleCmd peerId cmd bars = case cmd of |
||||
rq -> do |
||||
putData db (hapTicker rq) (TimeInterval (hapStartTime rq) (hapEndTime rq)) (Timeframe $ hapTimeframeSec rq) (V.fromList bars) |
||||
sendMulti sock $ peerId :| B.empty : ["OK"] |
||||
|
||||
deserializeBars tickerId input = |
||||
case runGetOrFail parseBar input of |
||||
Left _ -> [] |
||||
Right (rest, _, bar) -> bar : deserializeBars tickerId rest |
||||
where |
||||
parseBar = do |
||||
rawTimestamp <- realToFrac <$> getWord64le |
||||
baropen <- getDoublele |
||||
barhigh <- getDoublele |
||||
barlow <- getDoublele |
||||
barclose <- getDoublele |
||||
barvolume <- getWord64le |
||||
return Bar |
||||
{ |
||||
barSecurity = tickerId, |
||||
barTimestamp = posixSecondsToUTCTime rawTimestamp, |
||||
barOpen = fromDouble baropen, |
||||
barHigh = fromDouble barhigh, |
||||
barLow = fromDouble barlow, |
||||
barClose = fromDouble barclose, |
||||
barVolume = toInteger barvolume |
||||
} |
||||
|
||||
@ -1,23 +1,88 @@
@@ -1,23 +1,88 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
{-# LANGUAGE MultiWayIf #-} |
||||
|
||||
module ATrade.MDS.Protocol ( |
||||
MDSRequest(..) |
||||
QHPRequest(..), |
||||
HAPRequest(..), |
||||
Period(..), |
||||
periodSeconds |
||||
) where |
||||
|
||||
import GHC.Generics |
||||
|
||||
import ATrade.Types |
||||
-- import ATrade.Types |
||||
|
||||
import Data.Aeson |
||||
import Data.Aeson.Types |
||||
import Data.Time.Clock |
||||
import qualified Data.Text as T |
||||
|
||||
data Period = |
||||
Period1Min | |
||||
Period5Min | |
||||
Period15Min | |
||||
Period30Min | |
||||
PeriodHour | |
||||
PeriodDay | |
||||
PeriodWeek | |
||||
PeriodMonth |
||||
deriving (Eq) |
||||
|
||||
instance Show Period where |
||||
show Period1Min = "M1" |
||||
show Period5Min = "M5" |
||||
show Period15Min = "M15" |
||||
show Period30Min = "M30" |
||||
show PeriodHour = "H1" |
||||
show PeriodDay = "D" |
||||
show PeriodWeek = "W" |
||||
show PeriodMonth = "MN" |
||||
|
||||
periodSeconds :: Period -> Int |
||||
periodSeconds Period1Min = 60 |
||||
periodSeconds Period5Min = 60 * 5 |
||||
periodSeconds Period15Min = 60 * 15 |
||||
periodSeconds Period30Min = 60 * 30 |
||||
periodSeconds PeriodHour = 3600 |
||||
periodSeconds PeriodDay = 86400 |
||||
periodSeconds PeriodWeek = 86400 * 7 |
||||
periodSeconds PeriodMonth = 86400 * 7 * 4 |
||||
|
||||
data MDSRequest = RequestData { |
||||
rqTicker :: TickerId, |
||||
rqFrom :: UTCTime, |
||||
rqTo :: UTCTime, |
||||
rqTimeframe :: Int |
||||
} deriving (Generic, Show, Eq) |
||||
data QHPRequest = |
||||
QHPRequest { |
||||
rqTicker :: T.Text, |
||||
rqStartTime :: UTCTime, |
||||
rqEndTime :: UTCTime, |
||||
rqPeriod :: Period |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromJSON QHPRequest where |
||||
parseJSON = withObject "Request" $ \v -> QHPRequest <$> |
||||
v .: "ticker" <*> |
||||
v .: "from" <*> |
||||
v .: "to" <*> |
||||
(v .: "timeframe" >>= parseTf) |
||||
where |
||||
parseTf :: T.Text -> Parser Period |
||||
parseTf t = if |
||||
| t == "M1" -> return Period1Min |
||||
| t == "M5" -> return Period5Min |
||||
| t == "M15" -> return Period15Min |
||||
| t == "M30" -> return Period30Min |
||||
| t == "H1" -> return PeriodHour |
||||
| t == "D" -> return PeriodDay |
||||
| t == "W" -> return PeriodWeek |
||||
| t == "MN" -> return PeriodMonth |
||||
| otherwise -> fail "Invalid period specified" |
||||
|
||||
instance ToJSON MDSRequest |
||||
instance FromJSON MDSRequest |
||||
data HAPRequest = |
||||
HAPRequest { |
||||
hapTicker :: T.Text, |
||||
hapStartTime :: UTCTime, |
||||
hapEndTime :: UTCTime, |
||||
hapTimeframeSec :: Int |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromJSON HAPRequest where |
||||
parseJSON = withObject "Request" $ \v -> HAPRequest <$> |
||||
v .: "ticker" <*> |
||||
v .: "start_time" <*> |
||||
v .: "end_time" <*> |
||||
v .: "timeframe_sec" |
||||
|
||||
Loading…
Reference in new issue