commit 5631fdb5161af20c6e4b281279d2f7a7aed7dfe7 Author: Denis Tereshkin Date: Thu Feb 9 20:50:10 2017 +0700 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..67635a9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +.* diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6a042c2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,30 @@ +Copyright Author name here (c) 2017 + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Author name here nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..1c7dfaa --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# qs-tunnel diff --git a/Setup.hs b/Setup.hs new file mode 100644 index 0000000..9a994af --- /dev/null +++ b/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/qs-tunnel.cabal b/qs-tunnel.cabal new file mode 100644 index 0000000..334eeb9 --- /dev/null +++ b/qs-tunnel.cabal @@ -0,0 +1,29 @@ +name: qs-tunnel +version: 0.1.0.0 +synopsis: Quotesource tunnel proxy +-- description: +homepage: https://github.com/asakul/qs-tunnel#readme +license: BSD3 +license-file: LICENSE +author: Denis Tereshkin +maintainer: denis@kasan.ws +copyright: 2017 Denis Tereshkin +category: Web +build-type: Simple +cabal-version: >=1.10 +extra-source-files: README.md + +executable qs-tunnel + hs-source-dirs: src + main-is: Main.hs + default-language: Haskell2010 + build-depends: base >= 4.7 && < 5 + , libatrade + , aeson + , monad-loops + , zeromq4-haskell + , zeromq4-haskell-zap + , text + , bytestring + , time + , hslogger diff --git a/src/Main.hs b/src/Main.hs new file mode 100644 index 0000000..3b82686 --- /dev/null +++ b/src/Main.hs @@ -0,0 +1,104 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Main where + +import qualified Data.Text as T +import qualified Data.ByteString as B +import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import Data.List.NonEmpty +import Data.IORef +import Data.Time.Clock +import Data.Aeson + +import ATrade.QuoteSource.Server +import ATrade.QuoteSource.Client + +import Control.Monad +import Control.Monad.Loops + +import System.IO +import System.Log.Logger +import System.Log.Handler.Simple +import System.Log.Handler (setFormatter) +import System.Log.Formatter +import System.ZMQ4 +import System.ZMQ4.ZAP + +data Config = Config + { + confDownstreamEp :: T.Text, + confWhitelistIps :: [T.Text], + confBlacklistIps :: [T.Text], + confUpstreamEp :: T.Text, + confTimeout :: Integer + } deriving (Show, Eq) + +instance FromJSON Config where + parseJSON (Object o) = + Config <$> + o .: "downstream" <*> + o .:? "whitelist" .!= [] <*> + o .:? "blacklist" .!= [] <*> + o .: "upstream" <*> + o .: "timeout" + + parseJSON _ = fail "Expected object" + +initLogging = do + handler <- streamHandler stderr DEBUG >>= + (\x -> return $ + setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) + + hSetBuffering stderr LineBuffering + updateGlobalLogger rootLoggerName (setLevel DEBUG) + updateGlobalLogger rootLoggerName (setHandlers [handler]) + +main :: IO () +main = do + initLogging + infoM "main" "Starting" + eConf <- eitherDecode . BL.fromStrict <$> B.readFile "qs-tunnel.conf" + case eConf of + Left errMsg -> error errMsg + Right conf -> runWithConfig conf + +runWithConfig conf = do + withContext $ \ctx -> + withSocket ctx Pub $ \downstream -> do + bind downstream $ T.unpack $ confDownstreamEp conf + + forever $ withSocket ctx Sub $ \upstream -> do + infoM "main" $ "Connecting to: " ++ (T.unpack $ confUpstreamEp conf) + connect upstream $ T.unpack $ confUpstreamEp conf + subscribe upstream B.empty + now <- getCurrentTime + lastHeartbeat <- newIORef now + lastHeartbeatSent <- newIORef now + infoM "main" "Starting proxy loop" + whileM (notTimeout lastHeartbeat conf) $ do + evs <- poll 200 [Sock upstream [In] Nothing] + sendHeartbeatIfNeeded lastHeartbeatSent downstream + unless (null (L.head evs)) $ do + incoming <- receiveMulti upstream + case incoming of + x:xs -> do + now <- getCurrentTime + writeIORef lastHeartbeat now + sendMulti downstream $ x :| xs + _ -> return () + where + notTimeout ref conf = do + now <- getCurrentTime + lastHb <- readIORef ref + return $ now `diffUTCTime` lastHb < (fromInteger . confTimeout) conf + + sendHeartbeatIfNeeded lastHbSent sock = do + now <- getCurrentTime + last <- readIORef lastHbSent + when (now `diffUTCTime` last > 1) $ do + send sock [] $ B8.pack "SYSTEM#HEARTBEAT" + writeIORef lastHbSent now + + diff --git a/stack.yaml b/stack.yaml new file mode 100644 index 0000000..7e619e0 --- /dev/null +++ b/stack.yaml @@ -0,0 +1,68 @@ +# This file was automatically generated by 'stack init' +# +# Some commonly used options have been documented as comments in this file. +# For advanced use and comprehensive documentation of the format, please see: +# http://docs.haskellstack.org/en/stable/yaml_configuration/ + +# Resolver to choose a 'specific' stackage snapshot or a compiler version. +# A snapshot resolver dictates the compiler version and the set of packages +# to be used for project dependencies. For example: +# +# resolver: lts-3.5 +# resolver: nightly-2015-09-21 +# resolver: ghc-7.10.2 +# resolver: ghcjs-0.1.0_ghc-7.10.2 +# resolver: +# name: custom-snapshot +# location: "./custom-snapshot.yaml" +resolver: lts-7.19 + +# User packages to be built. +# Various formats can be used as shown in the example below. +# +# packages: +# - some-directory +# - https://example.com/foo/bar/baz-0.0.2.tar.gz +# - location: +# git: https://github.com/commercialhaskell/stack.git +# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a +# - location: https://github.com/commercialhaskell/stack/commit/e7b331f14bcffb8367cd58fbfc8b40ec7642100a +# extra-dep: true +# subdirs: +# - auto-update +# - wai +# +# A package marked 'extra-dep: true' will only be built if demanded by a +# non-dependency (i.e. a user package), and its test suites and benchmarks +# will not be run. This is useful for tweaking upstream packages. +packages: +- '.' +- '../libatrade' +- '../zeromq4-haskell-zap' +# Dependency packages to be pulled from upstream that are not in the resolver +# (e.g., acme-missiles-0.3) +extra-deps: [ "datetime-0.3.1" ] + +# Override default flag values for local packages and extra-deps +flags: {} + +# Extra package databases containing global packages +extra-package-dbs: [] + +# Control whether we use the GHC we find on the path +# system-ghc: true +# +# Require a specific version of stack, using version ranges +# require-stack-version: -any # Default +# require-stack-version: ">=1.3" +# +# Override the architecture used by stack, especially useful on Windows +# arch: i386 +# arch: x86_64 +# +# Extra directories used by stack for building +# extra-include-dirs: [/path/to/dir] +# extra-lib-dirs: [/path/to/dir] +# +# Allow a newer minor version of GHC than the snapshot specifies +# compiler-check: newer-minor