2 changed files with 82 additions and 0 deletions
@ -0,0 +1,81 @@ |
|||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE QuasiQuotes #-} |
||||||
|
|
||||||
|
module ATrade.Broker.TradeSinks.GotifyTradeSink |
||||||
|
( |
||||||
|
withGotifyTradeSink |
||||||
|
) where |
||||||
|
import ATrade.Broker.Protocol (TradeSinkMessage (..)) |
||||||
|
import ATrade.Logging (Severity (Debug, Info), |
||||||
|
logWith) |
||||||
|
import ATrade.Types (SignalId (..), |
||||||
|
Trade (..), toDouble) |
||||||
|
import Control.Concurrent (forkIO, killThread, |
||||||
|
threadDelay) |
||||||
|
import qualified Control.Concurrent.BoundedChan as BC |
||||||
|
import Control.Concurrent.MVar (isEmptyMVar, |
||||||
|
newEmptyMVar, putMVar, |
||||||
|
tryReadMVar) |
||||||
|
import Control.Exception (bracket, handle, |
||||||
|
throwIO) |
||||||
|
import Control.Monad (void, when) |
||||||
|
import Control.Monad.Extra (whenM) |
||||||
|
import Control.Monad.Loops (whileM_) |
||||||
|
import Data.Aeson (encode) |
||||||
|
import qualified Data.ByteString as B |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import qualified Data.List as L |
||||||
|
import Data.Maybe (isJust) |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Text.Encoding (encodeUtf8) |
||||||
|
import qualified Data.Text.Lazy as TL |
||||||
|
import GHC.Exception (SomeException) |
||||||
|
import Language.Haskell.Printf |
||||||
|
import Network.HTTP.Client (defaultManagerSettings, |
||||||
|
httpLbs, newManager, |
||||||
|
parseRequest, |
||||||
|
withManager) |
||||||
|
import Network.HTTP.Client.MultipartFormData (formDataBody, partBS) |
||||||
|
|
||||||
|
withGotifyTradeSink server token logger f = do |
||||||
|
killMv <- newEmptyMVar |
||||||
|
chan <- BC.newBoundedChan 1000 |
||||||
|
bracket (forkIO $ sinkThread server token killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan) |
||||||
|
where |
||||||
|
sink = BC.writeChan |
||||||
|
|
||||||
|
sinkThread server token killMv chan logger = whileM_ (not <$> wasKilled) $ do |
||||||
|
log Info "GotifyTradeSink thread started" |
||||||
|
manager <- newManager defaultManagerSettings |
||||||
|
log Debug "Connected" |
||||||
|
sinkThread' manager |
||||||
|
log Info "Disconnected" |
||||||
|
where |
||||||
|
log sev = logWith logger sev "GotifyTradeSink" |
||||||
|
sinkThread' manager = do |
||||||
|
maybeTrade <- BC.tryReadChan chan |
||||||
|
case maybeTrade of |
||||||
|
Just trade -> do |
||||||
|
request <- parseRequest $ server <> "/message?token=" <> token |
||||||
|
requestWithData <- |
||||||
|
formDataBody [ partBS "title" "Trade" |
||||||
|
, partBS "message" (encodeTrade trade) |
||||||
|
, partBS "priority" "5"] request |
||||||
|
void $ httpLbs requestWithData manager |
||||||
|
Nothing -> do |
||||||
|
threadDelay 1000000 |
||||||
|
whenM (isEmptyMVar killMv) $ sinkThread' manager |
||||||
|
|
||||||
|
wasKilled = isJust <$> tryReadMVar killMv |
||||||
|
encodeTrade :: Trade -> B.ByteString |
||||||
|
encodeTrade = encodeUtf8 . TL.toStrict . formatTrade |
||||||
|
formatTrade trade = [t|New trade: %? %d %Q for %f (%Q/%Q)|] |
||||||
|
(tradeOperation trade) |
||||||
|
(tradeQuantity trade) |
||||||
|
(tradeSecurity trade) |
||||||
|
(toDouble $ tradePrice trade) |
||||||
|
(strategyId . tradeSignalId $ trade) |
||||||
|
(signalName . tradeSignalId $ trade) |
||||||
|
|
||||||
|
stopSinkThread killMv threadId = putMVar killMv () >> threadDelay 10000000 |
||||||
|
|
||||||
Loading…
Reference in new issue