diff --git a/app/Main.hs b/app/Main.hs index 9d184e6..f91a60e 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -1,5 +1,59 @@ + module Main where +import Data.Aeson +import qualified Data.Text as T +import qualified Data.ByteString.Lazy as BL + +import ATrade.MDS.Database +import ATrade.MDS.HistoryServer + +import Control.Concurrent +import Control.Monad + +import System.ZMQ4 + +data MdsConfig = MdsConfig { + cfgDbPath :: T.Text, + cfgDbName :: T.Text, + cfgDbAccount :: T.Text, + cfgDbPassword :: T.Text, + cfgQHPEndpoint :: T.Text, + cfgHAPEndpoint :: T.Text +} deriving (Show, Eq) +instance FromJSON MdsConfig where + parseJSON = withObject "Cfg" $ \v -> + MdsConfig <$> + v .: "path" <*> + v .: "name" <*> + v .: "account" <*> + v .: "password" <*> + v .: "qhp_endpoint" <*> + v .: "hap_endpoint" + +getConfig :: IO MdsConfig +getConfig = do + rawCfg <- BL.readFile "mds.conf" + case eitherDecode' rawCfg of + Left err -> error err + Right cfg -> return cfg + main :: IO () -main = undefined +main = do + cfg <- getConfig + let dbConfig = DatabaseConfig { dbPath = cfgDbPath cfg, + dbDatabase = cfgDbName cfg, + dbUser = cfgDbAccount cfg, + dbPassword = cfgDbPassword cfg } + + db <- initDatabase dbConfig + + let hsConfig = HistoryServerConfig { + hspQHPEndpoint = cfgQHPEndpoint cfg, + hspHAPEndpoint = cfgHAPEndpoint cfg } + + withContext $ \ctx -> do + _ <- startHistoryServer hsConfig db ctx + forever $ threadDelay 1000000 + diff --git a/mds.cabal b/mds.cabal index b770c91..8bcb887 100644 --- a/mds.cabal +++ b/mds.cabal @@ -33,8 +33,12 @@ library , aeson , safe , bytestring + , attoparsec + , binary + , binary-ieee754 default-language: Haskell2010 other-modules: ATrade.MDS.Protocol + default-extensions: OverloadedStrings executable mds-exe hs-source-dirs: app @@ -42,7 +46,12 @@ executable mds-exe ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wall -Werror build-depends: base , mds + , aeson + , text + , bytestring + , zeromq4-haskell default-language: Haskell2010 + default-extensions: OverloadedStrings test-suite mds-test type: exitcode-stdio-1.0 @@ -62,7 +71,7 @@ test-suite mds-test default-language: Haskell2010 other-modules: Integration.Spec , Integration.Database - extensions: OverloadedStrings + default-extensions: OverloadedStrings source-repository head type: git diff --git a/mds.conf b/mds.conf index 8f6d4d8..d834f0e 100644 --- a/mds.conf +++ b/mds.conf @@ -1,7 +1,8 @@ - -database { - host = "127.0.0.1", - name = "atrade_quotes", - user = "atrade", - password = "atrade" +{ + "path" : "/tmp/test.db", + "name" : "", + "account" : "", + "password" : "", + "qhp_endpoint" : "tcp://0.0.0.0:5595", + "hap_endpoint" : "tcp://0.0.0.0:5605" } diff --git a/src/ATrade/MDS/Database.hs b/src/ATrade/MDS/Database.hs index 85add5d..a076b59 100644 --- a/src/ATrade/MDS/Database.hs +++ b/src/ATrade/MDS/Database.hs @@ -81,6 +81,7 @@ putData db tickerId (TimeInterval start end) tf@(Timeframe tfSec) bars = do stmt <- prepare db $ "INSERT INTO bars (ticker, timeframe, timestamp, open, high, low, close, volume)" ++ " values (?, ?, ?, ?, ?, ?, ?, ?); " executeMany stmt (map (barToSql tf) $ V.toList bars) + runRaw db "COMMIT;" where barToSql :: Timeframe -> Bar -> [SqlValue] barToSql (Timeframe timeframeSecs) bar = [(SqlString . T.unpack . barSecurity) bar, diff --git a/src/ATrade/MDS/HistoryServer.hs b/src/ATrade/MDS/HistoryServer.hs index 05a91bd..e1d06b3 100644 --- a/src/ATrade/MDS/HistoryServer.hs +++ b/src/ATrade/MDS/HistoryServer.hs @@ -1,42 +1,106 @@ module ATrade.MDS.HistoryServer ( HistoryServer, + HistoryServerConfig(..), startHistoryServer ) where import System.ZMQ4 +import ATrade.Types import ATrade.MDS.Database import ATrade.MDS.Protocol import Control.Concurrent import Control.Monad import Data.Aeson import Data.List.NonEmpty +import Data.Time.Clock.POSIX import qualified Data.Vector as V import Safe +import qualified Data.Text as T import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL +import Data.Binary.Get +import Data.Binary.Put -data HistoryServer = HistoryServer ThreadId +data HistoryServer = HistoryServer ThreadId ThreadId -startHistoryServer :: MdsHandle -> Context -> IO HistoryServer -startHistoryServer db ctx = do - sock <- socket ctx Router - tid <- forkIO $ serve db sock - return $ HistoryServer tid +data HistoryServerConfig = HistoryServerConfig { + hspQHPEndpoint :: T.Text, + hspHAPEndpoint :: T.Text +} deriving (Show, Eq) -serve :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () -serve db sock = forever $ do +startHistoryServer :: HistoryServerConfig -> MdsHandle -> Context -> IO HistoryServer +startHistoryServer cfg db ctx = do + qhp <- socket ctx Router + bind qhp $ T.unpack $ hspQHPEndpoint cfg + qhpTid <- forkIO $ serveQHP db qhp + + hap <- socket ctx Router + bind hap $ T.unpack $ hspHAPEndpoint cfg + hapTid <- forkIO $ serveHAP db hap + + return $ HistoryServer qhpTid hapTid + +serveQHP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () +serveQHP db sock = forever $ do rq <- receiveMulti sock let maybeCmd = (BL.fromStrict <$> rq `atMay` 2) >>= decode case (headMay rq, maybeCmd) of (Just peerId, Just cmd) -> handleCmd peerId cmd _ -> return () where - handleCmd :: B.ByteString -> MDSRequest -> IO () + handleCmd :: B.ByteString -> QHPRequest -> IO () handleCmd peerId cmd = case cmd of rq -> do - qdata <- getData db (rqTicker rq) (TimeInterval (rqFrom rq) (rqTo rq)) (Timeframe (rqTimeframe rq)) - bytes <- serializeBars $ V.concat $ fmap snd qdata - sendMulti sock $ peerId :| B.empty : bytes - serializeBars = undefined + qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) + let bytes = serializeBars $ V.concat $ fmap snd qdata + sendMulti sock $ peerId :| B.empty : [BL.toStrict bytes] + serializeBars :: V.Vector Bar -> BL.ByteString + serializeBars bars = runPut $ V.forM_ bars serializeBar + serializeBar bar = do + putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) + putDoublele (toDouble . barOpen $ bar) + putDoublele (toDouble . barHigh $ bar) + putDoublele (toDouble . barLow $ bar) + putDoublele (toDouble . barClose $ bar) + putWord64le (fromInteger . barVolume $ bar) + +serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () +serveHAP db sock = forever $ do + rq <- receiveMulti sock + let maybeCmd = (BL.fromStrict <$> rq `atMay` 2) >>= decode + let mbRawData = rq `atMay` 3 + case (headMay rq, maybeCmd, mbRawData) of + (Just peerId, Just cmd, Just rawData) -> do + let parsedData = deserializeBars (hapTicker cmd) $ BL.fromStrict rawData + handleCmd peerId cmd parsedData + _ -> return () + where + handleCmd :: B.ByteString -> HAPRequest -> [Bar] -> IO () + handleCmd peerId cmd bars = case cmd of + rq -> do + putData db (hapTicker rq) (TimeInterval (hapStartTime rq) (hapEndTime rq)) (Timeframe $ hapTimeframeSec rq) (V.fromList bars) + sendMulti sock $ peerId :| B.empty : ["OK"] + deserializeBars tickerId input = + case runGetOrFail parseBar input of + Left _ -> [] + Right (rest, _, bar) -> bar : deserializeBars tickerId rest + where + parseBar = do + rawTimestamp <- realToFrac <$> getWord64le + baropen <- getDoublele + barhigh <- getDoublele + barlow <- getDoublele + barclose <- getDoublele + barvolume <- getWord64le + return Bar + { + barSecurity = tickerId, + barTimestamp = posixSecondsToUTCTime rawTimestamp, + barOpen = fromDouble baropen, + barHigh = fromDouble barhigh, + barLow = fromDouble barlow, + barClose = fromDouble barclose, + barVolume = toInteger barvolume + } diff --git a/src/ATrade/MDS/Protocol.hs b/src/ATrade/MDS/Protocol.hs index 5060d7a..0723f9c 100644 --- a/src/ATrade/MDS/Protocol.hs +++ b/src/ATrade/MDS/Protocol.hs @@ -1,23 +1,88 @@ -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE MultiWayIf #-} module ATrade.MDS.Protocol ( - MDSRequest(..) + QHPRequest(..), + HAPRequest(..), + Period(..), + periodSeconds ) where -import GHC.Generics - -import ATrade.Types +-- import ATrade.Types import Data.Aeson +import Data.Aeson.Types import Data.Time.Clock +import qualified Data.Text as T + +data Period = + Period1Min | + Period5Min | + Period15Min | + Period30Min | + PeriodHour | + PeriodDay | + PeriodWeek | + PeriodMonth + deriving (Eq) + +instance Show Period where + show Period1Min = "M1" + show Period5Min = "M5" + show Period15Min = "M15" + show Period30Min = "M30" + show PeriodHour = "H1" + show PeriodDay = "D" + show PeriodWeek = "W" + show PeriodMonth = "MN" + +periodSeconds :: Period -> Int +periodSeconds Period1Min = 60 +periodSeconds Period5Min = 60 * 5 +periodSeconds Period15Min = 60 * 15 +periodSeconds Period30Min = 60 * 30 +periodSeconds PeriodHour = 3600 +periodSeconds PeriodDay = 86400 +periodSeconds PeriodWeek = 86400 * 7 +periodSeconds PeriodMonth = 86400 * 7 * 4 -data MDSRequest = RequestData { - rqTicker :: TickerId, - rqFrom :: UTCTime, - rqTo :: UTCTime, - rqTimeframe :: Int -} deriving (Generic, Show, Eq) +data QHPRequest = + QHPRequest { + rqTicker :: T.Text, + rqStartTime :: UTCTime, + rqEndTime :: UTCTime, + rqPeriod :: Period + } deriving (Show, Eq) + +instance FromJSON QHPRequest where + parseJSON = withObject "Request" $ \v -> QHPRequest <$> + v .: "ticker" <*> + v .: "from" <*> + v .: "to" <*> + (v .: "timeframe" >>= parseTf) + where + parseTf :: T.Text -> Parser Period + parseTf t = if + | t == "M1" -> return Period1Min + | t == "M5" -> return Period5Min + | t == "M15" -> return Period15Min + | t == "M30" -> return Period30Min + | t == "H1" -> return PeriodHour + | t == "D" -> return PeriodDay + | t == "W" -> return PeriodWeek + | t == "MN" -> return PeriodMonth + | otherwise -> fail "Invalid period specified" -instance ToJSON MDSRequest -instance FromJSON MDSRequest +data HAPRequest = + HAPRequest { + hapTicker :: T.Text, + hapStartTime :: UTCTime, + hapEndTime :: UTCTime, + hapTimeframeSec :: Int + } deriving (Show, Eq) +instance FromJSON HAPRequest where + parseJSON = withObject "Request" $ \v -> HAPRequest <$> + v .: "ticker" <*> + v .: "start_time" <*> + v .: "end_time" <*> + v .: "timeframe_sec" diff --git a/stack.yaml b/stack.yaml index 66cbf3c..4b6c3b3 100644 --- a/stack.yaml +++ b/stack.yaml @@ -41,7 +41,7 @@ packages: - '../zeromq4-haskell-zap' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: ["HDBC-sqlite3-2.3.3.1", "datetime-0.3.1"] +extra-deps: ["HDBC-sqlite3-2.3.3.1", "datetime-0.3.1", "th-printf-0.5.1"] # Override default flag values for local packages and extra-deps flags: {}