diff --git a/mds.cabal b/mds.cabal index 886d5db..fb0f94e 100644 --- a/mds.cabal +++ b/mds.cabal @@ -29,6 +29,7 @@ library , monad-loops , text-format , zeromq4-haskell + , safe default-language: Haskell2010 executable mds-exe diff --git a/src/ATrade/MDS/Database.hs b/src/ATrade/MDS/Database.hs index 9a2ebc6..157916b 100644 --- a/src/ATrade/MDS/Database.hs +++ b/src/ATrade/MDS/Database.hs @@ -23,6 +23,7 @@ import Database.HDBC import Database.HDBC.PostgreSQL import Control.Monad import Control.Monad.Loops +import Safe data TimeInterval = TimeInterval UTCTime UTCTime @@ -33,7 +34,7 @@ timeframeHour = Timeframe 3600 timeframeMinute = Timeframe 60 data DatabaseCommand = DBGet TickerId TimeInterval Timeframe | DBPut TickerId TimeInterval Timeframe (V.Vector Bar) -data DatabaseResponse = DBOk | DBData [(TimeInterval, V.Vector Bar)] | DBError T.Text +data DatabaseResponse = DBOk | DBData Timeframe [(TimeInterval, V.Vector Bar)] | DBError T.Text data DatabaseConfig = DatabaseConfig { dbHost :: T.Text, dbDatabase :: T.Text, @@ -60,7 +61,9 @@ startDatabase config = do getData = doGetData cmdVar respVar, putData = doPutData cmdVar respVar } where - makeSchema conn = runRaw conn "CREATE TABLE IF NOT EXISTS bars (id SERIAL PRIMARY KEY, ticker TEXT, timestamp BIGINT, open NUMERIC(20, 10), high NUMERIC(20, 10), low NUMERIC(20, 10), close NUMERIC(20,10), volume BIGINT);" + makeSchema conn = do + runRaw conn "CREATE TABLE IF NOT EXISTS bars (id SERIAL PRIMARY KEY, ticker TEXT, timeframe INTEGER, timestamp BIGINT, open NUMERIC(20, 10), high NUMERIC(20, 10), low NUMERIC(20, 10), close NUMERIC(20,10), volume BIGINT);" + runRaw conn "CREATE TABLE IF NOT EXISTS intervals (id SERIAL PRIMARY KEY, ticker TEXT, start BIGINT, end BIGINT, timeframe INTEGER);" mkConnectionString config = TL.unpack $ format "User ID={};Password={};Host={};Port=5432;Database={}" (dbUser config, dbPassword config, dbHost config, dbDatabase config) dbThread conn cmdVar respVar = forever $ do cmd <- readMVar cmdVar @@ -69,17 +72,24 @@ startDatabase config = do takeMVar cmdVar cleanup conn cmdVar respVar compVar _ = disconnect conn >> putMVar compVar () handleCmd conn cmd = case cmd of - DBPut tickerId (TimeInterval start end) tf@(Timeframe timeframeSecs) bars -> do - delStmt <- prepare conn "DELETE FROM bars WHERE timestamp > ? AND timestamp < ? AND ticker == ? AND timeframe == ?;" - execute delStmt [(SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end, (SqlString . T.unpack) tickerId, (SqlInteger . toInteger) timeframeSecs] - stmt <- prepare conn $ "INSERT INTO bars (ticker, timeframe, timestamp, open, high, low, close, volume)" ++ - " values (?, ?, ?, ?, ?, ?, ?, ?); " - executeMany stmt (map (barToSql tf) $ V.toList bars) + DBPut tickerId (TimeInterval start end) timeframe bars -> do + deleteFullyIncludedBars conn tickerId start end timeframe + deleteFullyIncludedIntervals conn tickerId start end timeframe + updatePartiallyIncludedIntervals conn tickerId start end timeframe + insertBars conn timeframe bars + commit conn + return DBOk - DBGet tickerId interval@(TimeInterval start end) (Timeframe timeframeSecs) -> do - rows <- quickQuery' conn "SELECT timestamp, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp > ? AND timestamp < ?;" [(toSql. T.unpack) tickerId, toSql timeframeSecs, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] - return $ DBData [(interval, V.fromList $ mapMaybe (barFromResult tickerId) rows)] - barFromResult ticker [ts, open, high, low, close, volume] = Just Bar { + DBGet tickerId interval@(TimeInterval start end) timeframe@(Timeframe timeframeSecs) -> do + intervals <- getIntervals conn tickerId start end timeframe + result <- mapM (getInterval conn tickerId timeframe) intervals + return $ DBData timeframe result + + rows <- quickQuery' conn "SELECT timestamp, timeframe, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp > ? AND timestamp < ?;" [(toSql. T.unpack) tickerId, toSql timeframeSecs, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] + case headMay rows >>= (`atMay` 1) of + Just timeframe -> return $ DBData (Timeframe $ fromSql timeframe) [(interval, V.fromList $ mapMaybe (barFromResult tickerId) rows)] + Nothing -> return $ DBError "Unable to get bars timeframe" + barFromResult ticker [ts, _, open, high, low, close, volume] = Just Bar { barSecurity = ticker, barTimestamp = fromSql ts, barOpen = fromRational $ fromSql open, @@ -90,6 +100,36 @@ startDatabase config = do } barFromResult _ _ = Nothing + deleteFullyIncludedBars conn tickerId start end (Timeframe timeframeSecs) = do + delStmt <- prepare conn "DELETE FROM bars WHERE timestamp >= ? AND timestamp <= ? AND ticker == ? AND timeframe == ?;" + execute delStmt [(SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end, (SqlString . T.unpack) tickerId, (SqlInteger . toInteger) timeframeSecs] + + deleteFullyIncludedIntervals conn tickerId start end (Timeframe timeframeSecs) = do + delStmt <- prepare conn "DELETE FROM intervals WHERE start >= ? AND end <= ? AND ticker == ? AND timeframe == ?;" + execute delStmt [(SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end, (SqlString . T.unpack) tickerId, (SqlInteger . toInteger) timeframeSecs] + updatePartiallyIncludedIntervals conn tickerId start end (Timeframe timeframeSecs) = do + updStmt <- prepare conn "UPDATE intervals SET start=? WHERE start >= ? AND end <= ?;" + execute updStmt [(SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end] + + updStmt2 <- prepare conn "UPDATE intervals SET end=? WHERE end >= ? AND end <= ?;" + execute updStmt2 [(SqlPOSIXTime . utcTimeToPOSIXSeconds) end, (SqlPOSIXTime . utcTimeToPOSIXSeconds) start, (SqlPOSIXTime . utcTimeToPOSIXSeconds) end] + insertBars conn timeframe bars = do + stmt <- prepare conn $ "INSERT INTO bars (ticker, timeframe, timestamp, open, high, low, close, volume)" ++ + " values (?, ?, ?, ?, ?, ?, ?, ?); " + executeMany stmt (map (barToSql timeframe) $ V.toList bars) + + getIntervals :: Connection -> TickerId -> UTCTime -> UTCTime -> Timeframe -> IO [TimeInterval] + getIntervals conn tickerId start end (Timeframe timeframeSecs)= do + rows <- quickQuery' conn "SELECT start, end FROM intervals WHERE ticker == ? AND timeframe == ? AND start >= ? AND end <= ?;" [(toSql. T.unpack) tickerId, toSql timeframeSecs, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] + return $ mapMaybe (\x -> do + s <- headMay x + e <- x `atMay` 1 + return $ TimeInterval (posixSecondsToUTCTime $ fromSql s) (posixSecondsToUTCTime $ fromSql e)) rows + + getInterval conn tickerId (Timeframe timeframeSecs) interval@(TimeInterval start end) = do + rows <- quickQuery' conn "SELECT timestamp, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND start >= ? AND end <= ?;" [(toSql. T.unpack) tickerId, toSql timeframeSecs, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] + return (interval, V.fromList $ mapMaybe (barFromSql tickerId) rows) + barToSql :: Timeframe -> Bar -> [SqlValue] barToSql (Timeframe timeframeSecs) bar = [(SqlString . T.unpack . barSecurity) bar, (SqlInteger . toInteger) timeframeSecs, @@ -97,7 +137,19 @@ startDatabase config = do (SqlRational . toRational . barHigh) bar, (SqlRational . toRational . barLow) bar, (SqlRational . toRational . barClose) bar, - (SqlRational . toRational . barVolume) bar ] + (SqlInteger . barVolume) bar ] + + barFromSql :: TickerId -> [SqlValue] -> Maybe Bar + barFromSql tickerId [ts, open, high, low, close, volume] = Just Bar { + barSecurity = tickerId, + barTimestamp = (posixSecondsToUTCTime . fromSql) ts, + barOpen = fromRational $ fromSql open, + barHigh = fromRational $ fromSql high, + barLow = fromRational $ fromSql low, + barClose = fromRational $ fromSql close, + barVolume = fromSql volume } + + barFromSql tickerId _ = Nothing stopDatabase :: MVar () -> DatabaseInterface -> IO () stopDatabase compVar db = killThread (tid db) >> readMVar compVar @@ -107,7 +159,7 @@ doGetData cmdVar respVar tickerId timeInterval timeframe = do putMVar cmdVar (DBGet tickerId timeInterval timeframe) resp <- takeMVar respVar case resp of - DBData x -> return x + DBData _ x -> return x DBError err -> do warningM "DB.Client" $ "Error while calling getData: " ++ show err return [] diff --git a/src/ATrade/Quotes/Finam.hs b/src/ATrade/Quotes/Finam.hs new file mode 100644 index 0000000..c750ffa --- /dev/null +++ b/src/ATrade/Quotes/Finam.hs @@ -0,0 +1,361 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE FlexibleInstances #-} + +module ATrade.Quotes.Finam ( + downloadFinamSymbols, + Symbol(..), + Period(..), + DateFormat(..), + TimeFormat(..), + FieldSeparator(..), + RequestParams(..), + defaultParams, + downloadQuotes, + parseQuotes, + downloadAndParseQuotes, + Row(..) +) where + +import qualified Data.Text as T +import qualified Data.Text.ICU.Convert as TC +import Data.Time.Calendar +import Network.Wreq +import Control.Lens +import Data.Either.Combinators +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString as B +import Safe +import qualified Data.Map as M +import Text.Parsec +import Text.ParserCombinators.Parsec.Char +import Text.ParserCombinators.Parsec.Number +import Data.List +import Data.Maybe +import Control.Error.Util +import Data.Text.Format +import Data.Csv +import Data.Time.Format +import qualified Data.ByteString.Char8 as B8 +import Data.Time.Clock +import Data.Decimal +import Control.Monad +import Control.Exception +import qualified Data.Vector as V +import System.Log.Logger + +data Period = + PeriodTick | + Period1Min | + Period5Min | + Period10Min | + Period15Min | + Period30Min | + PeriodHour | + PeriodDay | + PeriodWeek | + PeriodMonth + deriving (Show, Eq) + +instance Enum Period where + fromEnum PeriodTick = 1 + fromEnum Period1Min = 2 + fromEnum Period5Min = 3 + fromEnum Period10Min = 4 + fromEnum Period15Min = 5 + fromEnum Period30Min = 6 + fromEnum PeriodHour = 7 + fromEnum PeriodDay = 8 + fromEnum PeriodWeek = 9 + fromEnum PeriodMonth = 10 + + toEnum 1 = PeriodTick + toEnum 2 = Period1Min + toEnum 3 = Period5Min + toEnum 4 = Period10Min + toEnum 5 = Period15Min + toEnum 6 = Period30Min + toEnum 7 = PeriodHour + toEnum 8 = PeriodDay + toEnum 9 = PeriodWeek + toEnum 10 = PeriodMonth + toEnum _ = PeriodDay + +data DateFormat = + FormatYYYYMMDD | + FormatYYMMDD | + FormatDDMMYY | + FormatDD_MM_YY | + FormatMM_DD_YY + deriving (Show, Eq) + +instance Enum DateFormat where + fromEnum FormatYYYYMMDD = 1 + fromEnum FormatYYMMDD = 2 + fromEnum FormatDDMMYY = 3 + fromEnum FormatDD_MM_YY = 4 + fromEnum FormatMM_DD_YY = 5 + + toEnum 1 = FormatYYYYMMDD + toEnum 2 = FormatYYMMDD + toEnum 3 = FormatDDMMYY + toEnum 4 = FormatDD_MM_YY + toEnum 5 = FormatMM_DD_YY + toEnum _ = FormatYYYYMMDD + + +data TimeFormat = + FormatHHMMSS | + FormatHHMM | + FormatHH_MM_SS | + FormatHH_MM + deriving (Show, Eq) + +instance Enum TimeFormat where + fromEnum FormatHHMMSS = 1 + fromEnum FormatHHMM = 2 + fromEnum FormatHH_MM_SS = 3 + fromEnum FormatHH_MM = 4 + + toEnum 1 = FormatHHMMSS + toEnum 2 = FormatHHMM + toEnum 3 = FormatHH_MM_SS + toEnum 4 = FormatHH_MM + toEnum _ = FormatHHMMSS + +data FieldSeparator = + SeparatorComma | + SeparatorPeriod | + SeparatorSemicolon | + SeparatorTab | + SeparatorSpace + deriving (Show, Eq) + +instance Enum FieldSeparator where + fromEnum SeparatorComma = 1 + fromEnum SeparatorPeriod = 2 + fromEnum SeparatorSemicolon = 3 + fromEnum SeparatorTab = 4 + fromEnum SeparatorSpace = 5 + + toEnum 1 = SeparatorComma + toEnum 2 = SeparatorPeriod + toEnum 3 = SeparatorSemicolon + toEnum 4 = SeparatorTab + toEnum 5 = SeparatorSpace + toEnum _ = SeparatorComma + +data RequestParams = RequestParams { + ticker :: T.Text, + startDate :: Day, + endDate :: Day, + period :: Period, + dateFormat :: DateFormat, + timeFormat :: TimeFormat, + fieldSeparator :: FieldSeparator, + includeHeader :: Bool, + fillEmpty :: Bool +} + +defaultParams = RequestParams { + ticker = "", + startDate = fromGregorian 1970 1 1, + endDate = fromGregorian 1970 1 1, + period = PeriodDay, + dateFormat = FormatYYYYMMDD, + timeFormat = FormatHHMMSS, + fieldSeparator = SeparatorComma, + includeHeader = True, + fillEmpty = False +} + +data Symbol = Symbol { + symCode :: T.Text, + symName :: T.Text, + symId :: Integer, + symMarketCode :: Integer, + symMarketName :: T.Text +} + deriving (Show, Eq) + +data Row = Row { + rowTicker :: T.Text, + rowTime :: UTCTime, + rowOpen :: Decimal, + rowHigh :: Decimal, + rowLow :: Decimal, + rowClose :: Decimal, + rowVolume :: Integer +} deriving (Show, Eq) + +instance FromField Decimal where + parseField s = realFracToDecimal 10 <$> (parseField s :: Parser Double) + +instance FromRecord Row where + parseRecord v + | length v == 9 = do + tkr <- v .! 0 + date <- v .! 2 + time <- v .! 3 + dt <- parseDt date time + open <- v .! 4 + high <- v .! 5 + low <- v .! 6 + close <- v .! 7 + volume <- v .! 8 + return $ Row tkr dt open high low close volume + | otherwise = mzero + where + parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime + parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of + Just dt -> return dt + Nothing -> fail "Unable to parse date/time" + +downloadAndParseQuotes params = downloadAndParseQuotes' params 3 + where + downloadAndParseQuotes' params iter = do + raw <- downloadQuotes params `catch` (\e -> do + debugM "History" $ "exception: " ++ show (e :: SomeException) + return Nothing) + case raw of + Just r -> return $ parseQuotes r + Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' params (iter - 1) + +parseQuotes :: B.ByteString -> Maybe [Row] +parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of + Left _ -> Nothing + Right d -> Just $ V.toList d + +downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) +downloadQuotes params = do + symbols <- downloadFinamSymbols + case requestUrl symbols params of + Just (url, options) -> do + resp <- getWith options url + return $ Just $ BL.toStrict $ resp ^. responseBody + Nothing -> return Nothing + +requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) +requestUrl symbols params = case getFinamCode symbols (ticker params) of + Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market params) + Nothing -> Nothing + where + getOptions sym market params = defaults & + param "market" .~ [T.pack . show $ market] & + param "f" .~ [ticker params] & + param "e" .~ [".csv"] & + param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ params] & + param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ params] & + param "MSOR" .~ ["0"] & + param "mstime" .~ ["on"] & + param "mstimever" .~ ["1"] & + param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ params] & + param "sep2" .~ ["1"] & + param "at" .~ [if includeHeader params then "1" else "0"] & + param "fsp" .~ [if fillEmpty params then "1" else "0"] & + param "p" .~ [T.pack . show . fromEnum $ period params] & + param "em" .~ [T.pack . show $ sym ] & + param "df" .~ [T.pack . show $ dayFrom] & + param "mf" .~ [T.pack . show $ (monthFrom - 1)] & + param "yf" .~ [T.pack . show $ yearFrom] & + param "dt" .~ [T.pack . show $ dayTo] & + param "mt" .~ [T.pack . show $ (monthTo - 1)] & + param "yt" .~ [T.pack . show $ yearTo] & + param "code" .~ [ticker params] & + param "datf" .~ if period params == PeriodTick then ["11"] else ["1"] + (yearFrom, monthFrom, dayFrom) = toGregorian $ startDate params + (yearTo, monthTo, dayTo) = toGregorian $ endDate params + +getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) +getFinamCode symbols ticker = case find (\x -> symCode x == ticker && symMarketCode x `notElem` archives) symbols of + Just sym -> Just (symId sym, symMarketCode sym) + Nothing -> Nothing + +downloadFinamSymbols :: IO [Symbol] +downloadFinamSymbols = do + conv <- TC.open "cp1251" Nothing + result <- get "http://www.finam.ru/cache/icharts/icharts.js" + if result ^. responseStatus . statusCode == 200 + then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody + else return [] + where + parseSymbols :: [T.Text] -> [Symbol] + parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames + where + getWithParser parser pos = fromMaybe [] $ do + s <- T.unpack <$> strs `atMay` pos + hush $ parse parser "" s + + ids :: [Integer] + ids = getWithParser intlist 0 + + names :: [T.Text] + names = T.pack <$> getWithParser strlist 1 + + codes :: [T.Text] + codes = T.pack <$> getWithParser strlist 2 + + marketCodes :: [Integer] + marketCodes = getWithParser intlist 3 + + marketNames :: [T.Text] + marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes + + intlist = do + string "var" + spaces + skipMany1 alphaNum + spaces + char '=' + spaces + char '[' + manyTill (do + i <- int + char ',' <|> char ']' + return i) (char '\'' <|> char ';') + + strlist = do + string "var" + spaces + skipMany1 alphaNum + spaces + char '=' + spaces + char '[' + (char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' + +codeToName :: M.Map Integer T.Text +codeToName = M.fromList [ + (200, "МосБиржа топ"), + (1 , "МосБиржа акции"), + (14 , "МосБиржа фьючерсы"), + (41, "Курс рубля"), + (45, "МосБиржа валютный рынок"), + (2, "МосБиржа облигации"), + (12, "МосБиржа внесписочные облигации"), + (29, "МосБиржа пифы"), + (8, "Расписки"), + (6, "Мировые Индексы"), + (24, "Товары"), + (5, "Мировые валюты"), + (25, "Акции США(BATS)"), + (7, "Фьючерсы США"), + (27, "Отрасли экономики США"), + (26, "Гособлигации США"), + (28, "ETF"), + (30, "Индексы мировой экономики"), + (3, "РТС"), + (20, "RTS Board"), + (10, "РТС-GAZ"), + (17, "ФОРТС Архив"), + (31, "Сырье Архив"), + (38, "RTS Standard Архив"), + (16, "ММВБ Архив"), + (18, "РТС Архив"), + (9, "СПФБ Архив"), + (32, "РТС-BOARD Архив"), + (39, "Расписки Архив"), + (-1, "Отрасли") ] + + +archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] diff --git a/stack.yaml b/stack.yaml index db9f144..bf699b9 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-7.4 +resolver: lts-7.19 # User packages to be built. # Various formats can be used as shown in the example below. @@ -38,6 +38,7 @@ resolver: lts-7.4 packages: - '.' - '../libatrade' +- '../zeromq4-haskell-zap' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) extra-deps: ["HDBC-postgresql-2.3.2.4", "datetime-0.3.1"]