From d24c2a6ffbd77627701134549b8637ce15149589 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 24 Oct 2016 15:35:41 +0700 Subject: [PATCH] Database++ --- mds.cabal | 2 ++ src/ATrade/MDS/Database.hs | 44 ++++++++++++++++++++++++++------- src/ATrade/MDS/HistoryServer.hs | 14 +++++++++++ 3 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 src/ATrade/MDS/HistoryServer.hs diff --git a/mds.cabal b/mds.cabal index 193a0c4..49139e3 100644 --- a/mds.cabal +++ b/mds.cabal @@ -26,6 +26,8 @@ library , hslogger , time , monad-loops + , text-format + , zeromq4-haskell default-language: Haskell2010 executable mds-exe diff --git a/src/ATrade/MDS/Database.hs b/src/ATrade/MDS/Database.hs index 916aa61..97afb9e 100644 --- a/src/ATrade/MDS/Database.hs +++ b/src/ATrade/MDS/Database.hs @@ -1,10 +1,16 @@ {-# LANGUAGE OverloadedStrings #-} module ATrade.MDS.Database ( + DatabaseConfig(..), + DatabaseInterface(..), + startDatabase, + stopDatabase ) where import qualified Data.Configurator as C import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import Data.Text.Format import qualified Data.Vector as V import ATrade.Types import Data.Time.Clock @@ -44,6 +50,7 @@ data DatabaseInterface = DatabaseInterface { startDatabase :: DatabaseConfig -> IO DatabaseInterface startDatabase config = do conn <- connectPostgreSQL (mkConnectionString config) + makeSchema conn cmdVar <- newEmptyMVar respVar <- newEmptyMVar compVar <- newEmptyMVar @@ -53,7 +60,8 @@ startDatabase config = do getData = doGetData cmdVar respVar, putData = doPutData cmdVar respVar } where - mkConnectionString = undefined + makeSchema conn = runRaw conn "CREATE TABLE IF NOT EXISTS bars (id SERIAL PRIMARY KEY, ticker TEXT, timestamp BIGINT, open NUMERIC(20, 10), high NUMERIC(20, 10), low NUMERIC(20, 10), close NUMERIC(20,10), volume BIGINT);" + mkConnectionString config = TL.unpack $ format "User ID={};Password={};Host={};Port=5432;Database={}" (dbUser config, dbPassword config, dbHost config, dbDatabase config) dbThread conn cmdVar respVar = forever $ do cmd <- readMVar cmdVar handleCmd conn cmd >>= putMVar respVar @@ -61,17 +69,35 @@ startDatabase config = do takeMVar cmdVar cleanup conn cmdVar respVar compVar _ = disconnect conn >> putMVar compVar () handleCmd conn cmd = case cmd of - DBPut tickerId (TimeInterval start end) (Timeframe timeframeSecs) bars -> do + DBPut tickerId (TimeInterval start end) tf@(Timeframe timeframeSecs) bars -> do delStmt <- prepare conn "DELETE FROM bars WHERE timestamp > ? AND timestamp < ? AND ticker == ? AND timeframe == ?;" - execute delStmt [utcTimeToPosixSeconds start, utcTimeToPosixSeconds end, tickerId, timeframeSecs] - stmt <- prepare conn "INSERT INTO bars (ticker, timeframe, timestamp, open, high, low, close, volume)" ++ + execute delStmt [(SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end, (SqlString . T.unpack) tickerId, (SqlInteger . toInteger) timeframeSecs] + stmt <- prepare conn $ "INSERT INTO bars (ticker, timeframe, timestamp, open, high, low, close, volume)" ++ " values (?, ?, ?, ?, ?, ?, ?, ?); " - executeMany stmt (map barToSql $ V.toList bars) + executeMany stmt (map (barToSql tf) $ V.toList bars) return DBOk DBGet tickerId interval@(TimeInterval start end) (Timeframe timeframeSecs) -> do - rows <- quickQuery' conn "SELECT timestamp, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp > ? AND timestamp < ?;" [tickerId, timeframeSecs, utcTimeToPosixSeconds start, utcTimeToPosixSeconds end] - return $ DBData [(interval, V.fromList $ map barFromResult rows)] - barFromResult = undefined + rows <- quickQuery' conn "SELECT timestamp, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp > ? AND timestamp < ?;" [(toSql. T.unpack) tickerId, toSql timeframeSecs, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] + return $ DBData [(interval, V.fromList $ mapMaybe (barFromResult tickerId) rows)] + barFromResult ticker [ts, open, high, low, close, volume] = Just Bar { + barSecurity = ticker, + barTimestamp = fromSql ts, + barOpen = fromRational $ fromSql open, + barHigh = fromRational $ fromSql high, + barLow = fromRational $ fromSql low, + barClose = fromRational $ fromSql close, + barVolume = fromSql volume + } + barFromResult _ _ = Nothing + + barToSql :: Timeframe -> Bar -> [SqlValue] + barToSql (Timeframe timeframeSecs) bar = [(SqlString . T.unpack . barSecurity) bar, + (SqlInteger . toInteger) timeframeSecs, + (SqlRational . toRational . barOpen) bar, + (SqlRational . toRational . barHigh) bar, + (SqlRational . toRational . barLow) bar, + (SqlRational . toRational . barClose) bar, + (SqlRational . toRational . barVolume) bar ] stopDatabase :: MVar () -> DatabaseInterface -> IO () stopDatabase compVar db = killThread (tid db) >> readMVar compVar @@ -100,5 +126,5 @@ doPutData cmdVar respVar tickerId timeInterval timeframe bars = do return () _ -> do warningM "DB.Client" "Unexpected response" - return [] + return () diff --git a/src/ATrade/MDS/HistoryServer.hs b/src/ATrade/MDS/HistoryServer.hs new file mode 100644 index 0000000..37e925c --- /dev/null +++ b/src/ATrade/MDS/HistoryServer.hs @@ -0,0 +1,14 @@ + +module ATrade.MDS.HistoryServer ( +) where + +import System.ZMQ4 +import ATrade.MDS.Database +import Control.Concurrent + +data HistoryServer = HistoryServer ThreadId +} + +startHistoryServer :: DatabaseInterface -> Context -> IO HistoryServer +startHistoryServer db ctx = do +