Browse Source

Moved infrastructure-related things to libatrade

master
Denis Tereshkin 9 years ago
parent
commit
6a0345ca27
  1. 34
      app/Main.hs
  2. 11
      quik-connector.cabal
  3. 136
      src/Broker.hs
  4. 49
      src/Broker/PaperBroker.hs
  5. 117
      src/Broker/Server.hs
  6. 77
      src/Data/ATrade.hs
  7. 6
      src/Lib.hs
  8. 2
      src/QuoteSource/DataImport.hs
  9. 51
      src/QuoteSource/Server.hs
  10. 2
      src/QuoteSource/TableParser.hs
  11. 12
      src/QuoteSource/TableParsers/AllParamsTableParser.hs
  12. 3
      stack.yaml

34
app/Main.hs

@ -1,20 +1,21 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
module Main where module Main where
import Lib
import QuoteSource.DataImport import QuoteSource.DataImport
import Control.Concurrent hiding (readChan) import Control.Concurrent hiding (readChan)
import Control.Monad import Control.Monad
import Control.Exception
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Data.IORef import Data.IORef
import Graphics.UI.Gtk hiding (Action, backspace) import Graphics.UI.Gtk hiding (Action, backspace)
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Data.ATrade import ATrade.Types
import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParsers.AllParamsTableParser
import QuoteSource.TableParser import QuoteSource.TableParser
import QuoteSource.Server import ATrade.QuoteSource.Server
import Broker import ATrade.Broker.Server
import ATrade.Broker.Protocol
import Broker.PaperBroker import Broker.PaperBroker
import System.Log.Logger import System.Log.Logger
@ -75,14 +76,16 @@ parseConfig = withObject "object" $ \obj -> do
tableName = tn, tableName = tn,
tableParams = params } tableParams = params }
forkBoundedChan :: Int -> Int -> BoundedChan a -> IO (ThreadId, [BoundedChan a]) forkBoundedChan :: Int -> BoundedChan a -> IO (ThreadId, BoundedChan a, BoundedChan (Maybe a))
forkBoundedChan chans size source = do forkBoundedChan size source = do
sinks <- replicateM chans (newBoundedChan size) sink <- newBoundedChan size
sinkMaybe <- newBoundedChan size
tid <- forkIO $ forever $ do tid <- forkIO $ forever $ do
v <- readChan source v <- readChan source
mapM_ (`tryWriteChan` v) sinks tryWriteChan sink v
tryWriteChan sinkMaybe (Just v)
return (tid, sinks) return (tid, sink, sinkMaybe)
main :: IO () main :: IO ()
@ -95,12 +98,12 @@ main = do
infoM "main" "Starting data import server" infoM "main" "Starting data import server"
dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, [c1, c2]) <- forkBoundedChan 2 1000 chan (forkId, c1, c2) <- forkBoundedChan 1000 chan
broker <- mkPaperBroker c2 1000000 ["demo"]
withContext (\ctx -> do
qsServer <- startQuoteSourceServer c1 ctx (quotesourceEndpoint config)
broker <- mkPaperBroker c1 1000000 ["demo"]
withContext (\ctx ->
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\qsServer ->
bracket (startBrokerServer [broker] ctx (T.pack $ brokerserverEndpoint config)) stopBrokerServer (\broServer -> do
void initGUI void initGUI
window <- windowNew window <- windowNew
window `on` deleteEvent $ do window `on` deleteEvent $ do
@ -108,7 +111,6 @@ main = do
return False return False
widgetShowAll window widgetShowAll window
mainGUI mainGUI
stopQuoteSourceServer qsServer infoM "main" "Main thread done")))
infoM "main" "Main thread done")
killThread forkId killThread forkId

11
quik-connector.cabal

@ -15,15 +15,10 @@ cabal-version: >=1.10
library library
hs-source-dirs: src hs-source-dirs: src
exposed-modules: Lib exposed-modules: QuoteSource.DataImport
, QuoteSource.DataImport
, Data.ATrade
, QuoteSource.TableParser , QuoteSource.TableParser
, QuoteSource.TableParsers.AllParamsTableParser , QuoteSource.TableParsers.AllParamsTableParser
, QuoteSource.Server
, Broker
, Broker.PaperBroker , Broker.PaperBroker
, Broker.Server
ghc-options: -Wincomplete-patterns ghc-options: -Wincomplete-patterns
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, Win32 , Win32
@ -45,6 +40,9 @@ library
, hashable , hashable
, unordered-containers , unordered-containers
, aeson , aeson
, cond
, scientific
, libatrade
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"
other-modules: System.Win32.XlParser other-modules: System.Win32.XlParser
@ -66,6 +64,7 @@ executable quik-connector-exe
, vector , vector
, text , text
, zeromq4-haskell , zeromq4-haskell
, libatrade
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"

136
src/Broker.hs

@ -1,136 +0,0 @@
{-# LANGUAGE OverloadedStrings #-}
module Broker (
SignalId(..),
OrderPrice(..),
Operation(..),
OrderId(..),
OrderState(..),
Order(..),
Trade(..),
Broker(..)
) where
import Data.Decimal
import Data.Time.Clock
import Data.Aeson
import Data.Aeson.Types
import Control.Monad
data SignalId = SignalId {
strategyId :: String,
signalName :: String,
comment :: String }
deriving (Show, Eq)
instance FromJSON SignalId where
parseJSON (Object o) = SignalId <$>
o .: "strategy-id" .!= "" <*>
o .: "signal-name" .!= "" <*>
o .: "commen" .!= ""
parseJSON _ = fail "Should be object"
data OrderPrice = Market | Limit Decimal | Stop Decimal Decimal | StopMarket Decimal
deriving (Show, Eq)
decimal :: (RealFrac r) => r -> Decimal
decimal = realFracToDecimal 10
instance FromJSON OrderPrice where
parseJSON (String s) = when (s /= "market") (fail "If string, then should be 'market'") >>
return Market
parseJSON (Number n) = return $ Limit $ decimal n
parseJSON (Object v) = do
triggerPrice <- v .: "trigger" :: Parser Double
execPrice <- v .: "execution"
case execPrice of
(String s) -> when (s /= "market") (fail "If string, then should be 'market'") >> return $ StopMarket (decimal triggerPrice)
(Number n) -> return $ Stop (decimal triggerPrice) (decimal n)
_ -> fail "Should be either number or 'market'"
parseJSON _ = fail "OrderPrice"
data Operation = Buy | Sell
deriving (Show, Eq)
instance FromJSON Operation where
parseJSON (String s)
| s == "buy" = return Buy
| s == "sell" = return Sell
| otherwise = fail "Should be either 'buy' or 'sell'"
parseJSON _ = fail "Should be string"
type OrderId = Integer
data OrderState = Unsubmitted
| Submitted
| PartiallyExecuted
| Executed
| Cancelled
| Rejected String
| Error String
deriving (Show, Eq)
instance FromJSON OrderState where
parseJSON (String s)
| s == "unsubmitted" = return Unsubmitted
| s == "submitted" = return Submitted
| s == "partially-executed" = return PartiallyExecuted
| s == "executed" = return Executed
| s == "cancelled" = return Cancelled
| s == "rejected" = return $ Rejected ""
| s == "error" = return $ Broker.Error ""
| otherwise = fail "Invlaid state"
parseJSON _ = fail "Should be string"
data Order = Order {
orderId :: OrderId,
orderAccountId :: String,
orderSecurity :: String,
orderPrice :: OrderPrice,
orderQuantity :: Integer,
orderExecutedQuantity :: Integer,
orderOperation :: Operation,
orderState :: OrderState,
orderSignalId :: SignalId }
deriving (Show, Eq)
instance FromJSON Order where
parseJSON (Object v) = Order <$>
v .:? "order-id" .!= 0 <*>
v .: "account" <*>
v .: "security" <*>
v .: "price" <*>
v .: "quantity" <*>
v .:? "executed-quantity" .!= 0 <*>
v .: "operation" <*>
v .: "state" .!= Unsubmitted <*>
v .: "signal-id"
parseJSON _ = fail "Should be string"
data Trade = Trade {
tradeOrderId :: OrderId,
tradePrice :: Decimal,
tradeQuantity :: Integer,
tradeVolume :: Decimal,
tradeVolumeCurrency :: String,
tradeAccount :: String,
tradeSecurity :: String,
tradeTimestamp :: UTCTime,
tradeSignalId :: SignalId }
deriving (Show, Eq)
data Broker = Broker {
accounts :: [String],
setTradeCallback :: Maybe (Trade -> IO ()) -> IO(),
setOrderCallback :: Maybe (Order -> IO ()) -> IO(),
submitOrder :: Order -> IO OrderId,
cancelOrder :: OrderId -> IO (),
getOrder :: OrderId -> IO (Maybe Order),
destroyBroker :: IO ()
}

49
src/Broker/PaperBroker.hs

@ -1,3 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
module Broker.PaperBroker ( module Broker.PaperBroker (
PaperBrokerState, PaperBrokerState,
@ -7,17 +8,19 @@ module Broker.PaperBroker (
import Data.Hashable import Data.Hashable
import Data.Bits import Data.Bits
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Data.ATrade import ATrade.Types
import Data.IORef import Data.IORef
import qualified Data.HashMap as M import qualified Data.HashMap as M
import Broker import qualified Data.Text as T
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import Data.Time.Clock import Data.Time.Clock
import Data.Decimal import Data.Decimal
import Control.Monad import Control.Monad
import Control.Concurrent hiding (readChan) import Control.Concurrent hiding (readChan)
import System.Log.Logger import System.Log.Logger
data TickMapKey = TickMapKey String DataType data TickMapKey = TickMapKey T.Text DataType
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
instance Hashable TickMapKey where instance Hashable TickMapKey where
@ -30,11 +33,10 @@ data PaperBrokerState = PaperBrokerState {
orders :: M.Map OrderId Order, orders :: M.Map OrderId Order,
cash :: Decimal, cash :: Decimal,
orderIdCounter :: OrderId, orderIdCounter :: OrderId,
tradeCallback :: Maybe (Trade -> IO ()), notificationCallback :: Maybe (Notification -> IO ())
orderCallback :: Maybe (Order -> IO ())
} }
mkPaperBroker :: BoundedChan Tick -> Decimal -> [String] -> IO Broker mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface
mkPaperBroker tickChan startCash accounts = do mkPaperBroker tickChan startCash accounts = do
state <- newIORef PaperBrokerState { state <- newIORef PaperBrokerState {
pbTid = Nothing, pbTid = Nothing,
@ -43,20 +45,17 @@ mkPaperBroker tickChan startCash accounts = do
orders = M.empty, orders = M.empty,
cash = startCash, cash = startCash,
orderIdCounter = 1, orderIdCounter = 1,
tradeCallback = Nothing, notificationCallback = Nothing }
orderCallback = Nothing }
tid <- forkIO $ brokerThread state tid <- forkIO $ brokerThread state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()) ) atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()) )
return Broker { return BrokerInterface {
accounts = accounts, accounts = accounts,
setTradeCallback = pbSetTradeCallback state, setNotificationCallback = pbSetNotificationCallback state,
setOrderCallback = pbSetOrderCallback state,
submitOrder = pbSubmitOrder state, submitOrder = pbSubmitOrder state,
cancelOrder = pbCancelOrder state, cancelOrder = pbCancelOrder state,
getOrder = pbGetOrder state, stopBroker = pbDestroyBroker state }
destroyBroker = pbDestroyBroker state }
brokerThread :: IORef PaperBrokerState -> IO () brokerThread :: IORef PaperBrokerState -> IO ()
brokerThread state = do brokerThread state = do
@ -73,13 +72,11 @@ nextOrderId state = do
modifyIORef state (\s -> s { orderIdCounter = id + 1 } ) modifyIORef state (\s -> s { orderIdCounter = id + 1 } )
return id return id
pbSetTradeCallback :: IORef PaperBrokerState -> Maybe (Trade -> IO ()) -> IO() pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetTradeCallback state callback = modifyIORef state (\s -> s { tradeCallback = callback } ) pbSetNotificationCallback state callback = modifyIORef state (\s -> s { notificationCallback = callback } )
pbSetOrderCallback :: IORef PaperBrokerState -> Maybe (Order -> IO ()) -> IO()
pbSetOrderCallback state callback = modifyIORef state (\s -> s { orderCallback = callback } )
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO OrderId pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do pbSubmitOrder state order = do
curState <- readIORef state curState <- readIORef state
case orderPrice order of case orderPrice order of
@ -91,18 +88,15 @@ pbSubmitOrder state order = do
where where
executeMarketOrder state order = do executeMarketOrder state order = do
tm <- tickMap <$> readIORef state tm <- tickMap <$> readIORef state
oid <- nextOrderId state
case M.lookup key tm of case M.lookup key tm of
Nothing -> let newOrder = order { orderState = Error "Unable to execute order: no bid/ask", orderId = oid } in Nothing -> let newOrder = order { orderState = OrderError } in
atomicModifyIORef' state (\s -> (s { orders = M.insert oid newOrder $ orders s }, ()) ) atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()) )
Just tick -> let newOrder = order { orderState = Executed, orderId = oid } Just tick -> let newOrder = order { orderState = Executed }
tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do
atomicModifyIORef' state (\s -> (s { orders = M.insert oid newOrder $ orders s , cash = cash s - tradeVolume}, ()) ) atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()) )
ts <- getCurrentTime ts <- getCurrentTime
maybeCall tradeCallback state $ mkTrade tick order ts maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts
return oid
submitLimitOrder = undefined submitLimitOrder = undefined
submitStopOrder = undefined submitStopOrder = undefined
@ -126,13 +120,14 @@ pbSubmitOrder state order = do
tradeQuantity = orderQuantity order, tradeQuantity = orderQuantity order,
tradeVolume = realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick, tradeVolume = realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick,
tradeVolumeCurrency = "TEST", tradeVolumeCurrency = "TEST",
tradeOperation = orderOperation order,
tradeAccount = orderAccountId order, tradeAccount = orderAccountId order,
tradeSecurity = orderSecurity order, tradeSecurity = orderSecurity order,
tradeTimestamp = timestamp, tradeTimestamp = timestamp,
tradeSignalId = orderSignalId order } tradeSignalId = orderSignalId order }
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO () pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state order = undefined pbCancelOrder state order = undefined
pbDestroyBroker :: IORef PaperBrokerState -> IO () pbDestroyBroker :: IORef PaperBrokerState -> IO ()

117
src/Broker/Server.hs

@ -1,117 +0,0 @@
{-# LANGUAGE OverloadedStrings #-}
module Broker.Server (
) where
import System.ZMQ4
import qualified Data.Map as M
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.ATrade
import Data.IORef
import qualified Data.HashMap.Strict as HM
import Broker
import Control.Concurrent
import Control.Exception
import Data.Aeson
import Data.Aeson.Types
import Data.Int
import Data.Time.Clock
import Data.List as L
import qualified Data.List.NonEmpty as LN
import System.Log.Logger
type RequestSqnum = Int64
type PeerId = B.ByteString
data BrokerServerState = BrokerServerState {
bsSocket :: Socket Router,
orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders
lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString),
pendingNotifications :: [(Order, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued)
brokers :: [Broker]
}
newtype BrokerServerHandle = BrokerServerHandle ThreadId
mkBrokerServer :: [Broker] -> Context -> String -> IO BrokerServerHandle
mkBrokerServer brokers c ep = do
sock <- socket c Router
bind sock ep
tid <- myThreadId
state <- newIORef BrokerServerState {
bsSocket = sock,
orderMap = M.empty,
lastPacket = M.empty,
pendingNotifications = [],
brokers = brokers
}
BrokerServerHandle <$> forkIO (brokerServerThread state)
data BrokerServerMessage = SubmitOrder RequestSqnum Order | CancelOrder RequestSqnum OrderId
data BrokerServerResponse = OrderIdResponse OrderId
instance ToJSON BrokerServerResponse where
toJSON (OrderIdResponse oid) = object ["order-id" .= oid]
parseMessage :: Value -> Parser BrokerServerMessage
parseMessage (Object obj) = do
rqsqnum <- obj .: "request-sqnum" :: Parser Int64
case HM.lookup "order" obj of
Just (Object orderJson) -> do
order <- obj .: "order"
return $ SubmitOrder rqsqnum order
_ -> case HM.lookup "cancel-order" obj of
Just orderIdJson -> do
order <- obj .: "cancel-order"
return $ CancelOrder rqsqnum order
Nothing -> fail "Either 'order' or 'cancel-order' field should be present"
where
parseMessage _ = fail "Should be object"
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup
where
cleanup = do
s <- bsSocket <$> readIORef state
close s
brokerServerThread' = do
s <- bsSocket <$> readIORef state
msg <- receiveMulti s
tryDeliverPendingNotifications
handleMessage msg
tryDeliverPendingNotifications = return ()
handleMessage :: [B.ByteString] -> IO ()
handleMessage (peerId:_:json:_) = maybe (return ()) (handleMessage' peerId) (decode (BL.fromStrict json) >>= parseMaybe parseMessage)
handleMessage _ = warningM "BrokerServer" "Invalid packet received, should be at least 3 parts"
handleMessage' :: PeerId -> BrokerServerMessage -> IO ()
handleMessage' peerId (SubmitOrder sqnum order) = do
s <- bsSocket <$> readIORef state
lastPack <- M.lookup peerId . lastPacket <$> readIORef state
case shouldResend lastPack sqnum of
Just packet -> sendMulti s $ LN.fromList [peerId, B.empty, packet]
Nothing -> do
brs <- brokers <$> readIORef state
case findBroker brs (orderAccountId order) of
Just broker -> do
orderId <- submitOrder broker order
let packet = BL.toStrict . encode $ OrderIdResponse orderId
atomicModifyIORef' state (\s -> (s { lastPacket = M.insert peerId (sqnum, packet) $ lastPacket s }, ()))
sendMulti s $ LN.fromList [peerId, B.empty, packet]
Nothing -> warningM "BrokerServer" $ "Invalid account requested: " ++ orderAccountId order
where
shouldResend lastPack sqnum = case lastPack of
Nothing -> Nothing
Just (oldSqnum, packet) -> if oldSqnum == sqnum
then Just packet
else Nothing
findBroker brokers account = L.find (L.elem account . accounts) brokers
handleMessage' peerId (CancelOrder sqnum orderId) = undefined

77
src/Data/ATrade.hs

@ -1,77 +0,0 @@
module Data.ATrade (
Tick(..),
DataType(..),
serializeTick
) where
import Data.Decimal
import Data.Time.Clock
import Data.DateTime
import Data.ByteString.Lazy as B
import Data.Text as T
import Data.Text.Encoding as E
import Data.List as L
import Data.Binary.Builder
data DataType = Unknown
| Price
| OpenInterest
| BestBid
| BestOffer
| Depth
| TheoryPrice
| Volatility
| TotalSupply
| TotalDemand
deriving (Show, Eq, Ord)
instance Enum DataType where
fromEnum x
| x == Price = 1
| x == OpenInterest = 3
| x == BestBid = 4
| x == BestOffer = 5
| x == Depth = 6
| x == TheoryPrice = 7
| x == Volatility = 8
| x == TotalSupply = 9
| x == TotalDemand = 10
| x == Unknown = -1
| otherwise = -1
toEnum x
| x == 1 = Price
| x == 3 = OpenInterest
| x == 4 = BestBid
| x == 5 = BestOffer
| x == 6 = Depth
| x == 7 = TheoryPrice
| x == 8 = Volatility
| x == 9 = TotalSupply
| x == 10 = TotalDemand
| otherwise = Unknown
data Tick = Tick {
security :: String,
datatype :: DataType,
timestamp :: UTCTime,
value :: Decimal,
volume :: Integer
} deriving (Show, Eq)
serializeTick :: Tick -> [ByteString]
serializeTick tick = header : [rawdata]
where
header = B.fromChunks [ E.encodeUtf8 . T.pack $ security tick ]
rawdata = toLazyByteString $ mconcat [
putWord32le 1,
putWord64le $ fromIntegral . toSeconds . timestamp $ tick,
putWord32le $ fromIntegral . truncate . (* 1000000) . fractionalPart . utctDayTime . timestamp $ tick,
putWord32le $ fromIntegral . fromEnum . datatype $ tick,
putWord64le $ truncate . value $ tick,
putWord32le $ truncate . (* 1000000000) . fractionalPart $ value tick,
putWord32le $ fromIntegral $ volume tick ]
fractionalPart :: (RealFrac a) => a -> a
fractionalPart x = x - fromIntegral (floor x)

6
src/Lib.hs

@ -1,6 +0,0 @@
module Lib
( someFunc
) where
someFunc :: IO ()
someFunc = putStrLn "someFunc"

2
src/QuoteSource/DataImport.hs

@ -8,7 +8,7 @@ module QuoteSource.DataImport
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Monad.State.Strict import Control.Monad.State.Strict
import Data.ATrade import ATrade.Types
import Data.IORef import Data.IORef
import Data.Time.Clock import Data.Time.Clock
import QuoteSource.TableParser import QuoteSource.TableParser

51
src/QuoteSource/Server.hs

@ -1,51 +0,0 @@
module QuoteSource.Server (
startQuoteSourceServer,
stopQuoteSourceServer
) where
import System.ZMQ4
import Control.Concurrent.BoundedChan
import Data.ATrade
import Control.Concurrent hiding (readChan)
import Control.Monad
import Control.Exception
import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty hiding (map)
import System.Log.Logger
data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan Tick,
serverThreadId :: ThreadId
}
serverThread :: QuoteSourceServer -> IO ()
serverThread state = do
finally serverThread' cleanup
debugM "QuoteSource" "server thread done"
where
cleanup = close $ outSocket state
serverThread' = forever $ do
tick <- readChan $ tickChannel state
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
startQuoteSourceServer :: BoundedChan Tick -> Context -> String -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do
sock <- socket c Pub
bind sock ep
tid <- myThreadId
let state = QuoteSourceServerState {
ctx = c,
outSocket = sock,
tickChannel = chan,
serverThreadId = tid
}
stid <- forkIO $ serverThread state
return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = killThread $ serverThreadId server

2
src/QuoteSource/TableParser.hs

@ -6,7 +6,7 @@ module QuoteSource.TableParser (
) where ) where
import System.Win32.XlParser import System.Win32.XlParser
import Data.ATrade import ATrade.Types
import Control.Monad.State.Strict import Control.Monad.State.Strict
import Data.Time.Clock import Data.Time.Clock

12
src/QuoteSource/TableParsers/AllParamsTableParser.hs

@ -1,3 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
module QuoteSource.TableParsers.AllParamsTableParser ( module QuoteSource.TableParsers.AllParamsTableParser (
AllParamsTableParser, AllParamsTableParser,
@ -6,7 +7,7 @@ module QuoteSource.TableParsers.AllParamsTableParser (
import qualified Data.Map.Lazy as M import qualified Data.Map.Lazy as M
import QuoteSource.TableParser import QuoteSource.TableParser
import Data.ATrade import ATrade.Types
import System.Win32.XlParser import System.Win32.XlParser
import Data.Tuple import Data.Tuple
import Data.Decimal import Data.Decimal
@ -14,6 +15,7 @@ import Control.Monad.State.Strict
import Data.Time.Clock import Data.Time.Clock
import Data.Maybe import Data.Maybe
import Data.DateTime import Data.DateTime
import qualified Data.Text as T
data TableColumn = CUnknown data TableColumn = CUnknown
| CTicker | CTicker
@ -54,7 +56,7 @@ type TableSchema = M.Map TableColumn Int
data AllParamsTableParser = AllParamsTableParser { data AllParamsTableParser = AllParamsTableParser {
schema :: Maybe TableSchema, schema :: Maybe TableSchema,
tableId :: String, tableId :: String,
volumes :: M.Map String Integer, volumes :: M.Map T.Text Integer,
timestampHint :: UTCTime timestampHint :: UTCTime
} }
@ -64,8 +66,8 @@ mkAllParamsTableParser id = AllParamsTableParser {
volumes = M.empty, volumes = M.empty,
timestampHint = startOfTime } timestampHint = startOfTime }
securityName :: String -> String -> String securityName :: String -> String -> T.Text
securityName classCode ticker = classCode ++ ('#' : ticker) securityName classCode ticker = T.pack $ classCode ++ ('#' : ticker)
parseSchema (width, height, cells) = M.fromList . zipWith (curry swap) [0..] $ map parseSchemaItem . take width $ cells parseSchema (width, height, cells) = M.fromList . zipWith (curry swap) [0..] $ map parseSchemaItem . take width $ cells
where where
@ -126,7 +128,7 @@ parseWithSchema sch (width, height, cells) = do
return Nothing return Nothing
_ -> return Nothing _ -> return Nothing
calculateTickVolume :: [XlData] -> String -> State AllParamsTableParser Integer calculateTickVolume :: [XlData] -> T.Text -> State AllParamsTableParser Integer
calculateTickVolume row secname = case M.lookup CVolume sch of calculateTickVolume row secname = case M.lookup CVolume sch of
Nothing -> return 1 Nothing -> return 1
Just index -> case row `safeAt` index of Just index -> case row `safeAt` index of

3
stack.yaml

@ -37,9 +37,10 @@ resolver: lts-7.0
# will not be run. This is useful for tweaking upstream packages. # will not be run. This is useful for tweaking upstream packages.
packages: packages:
- '.' - '.'
- '../libatrade'
# Dependency packages to be pulled from upstream that are not in the resolver # Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3) # (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1"] extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1"]
# Override default flag values for local packages and extra-deps # Override default flag values for local packages and extra-deps
flags: {} flags: {}

Loading…
Cancel
Save