From 3c29b07c7cb6cb15f932a8cf6c151c2f88d2e16c Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 19 Nov 2023 21:29:00 +0100 Subject: [PATCH] Disconnect ws connections exceeding limit of requests --- api/worker.go | 2 +- blockbook.go | 7 +++ common/internalstate.go | 16 +++++++ server/internal.go | 29 ++++++++++++ server/websocket.go | 44 ++++++++++++++----- static/internal_templates/index.html | 3 ++ .../ws_limit_exceeding_ips.html | 29 ++++++++++++ 7 files changed, 119 insertions(+), 11 deletions(-) create mode 100644 static/internal_templates/ws_limit_exceeding_ips.html diff --git a/api/worker.go b/api/worker.go index e29c90e0..8b07f805 100644 --- a/api/worker.go +++ b/api/worker.go @@ -1393,7 +1393,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco if ed.contractInfo != nil && ed.contractInfo.Type == bchain.ERC20TokenType { r.Erc20Contract = ed.contractInfo } - glog.Info("GetAddress ", address, ", ", time.Since(start)) + glog.Info("GetAddress-", option, " ", address, ", ", time.Since(start)) return r, nil } diff --git a/blockbook.go b/blockbook.go index 47bdaf69..a0065ec6 100644 --- a/blockbook.go +++ b/blockbook.go @@ -10,6 +10,7 @@ import ( "os" "os/signal" "runtime/debug" + "strconv" "strings" "syscall" "time" @@ -505,6 +506,12 @@ func newInternalState(config *common.Config, d *db.RocksDB, enableSubNewTx bool) } is.Host = name } + + is.WsGetAccountInfoLimit, _ = strconv.Atoi(os.Getenv(strings.ToUpper(is.CoinShortcut) + "_WS_GETACCOUNTINFO_LIMIT")) + if is.WsGetAccountInfoLimit > 0 { + glog.Info("WsGetAccountInfoLimit enabled with limit ", is.WsGetAccountInfoLimit) + is.WsLimitExceedingIPs = make(map[string]int) + } return is, nil } diff --git a/common/internalstate.go b/common/internalstate.go index d02da854..7c5c95ae 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -97,6 +97,10 @@ type InternalState struct { BlockGolombFilterP uint8 `json:"block_golomb_filter_p"` BlockFilterScripts string `json:"block_filter_scripts"` BlockFilterUseZeroedKey bool `json:"block_filter_use_zeroed_key"` + + // allowed number of fetched accounts over websocket + WsGetAccountInfoLimit int `json:"-"` + WsLimitExceedingIPs map[string]int `json:"-"` } // StartedSync signals start of synchronization @@ -341,3 +345,15 @@ func SetInShutdown() { func IsInShutdown() bool { return atomic.LoadInt32(&inShutdown) != 0 } + +func (is *InternalState) AddWsLimitExceedingIP(ip string) { + is.mux.Lock() + defer is.mux.Unlock() + is.WsLimitExceedingIPs[ip] = is.WsLimitExceedingIPs[ip] + 1 +} + +func (is *InternalState) ResetWsLimitExceedingIPs() { + is.mux.Lock() + defer is.mux.Unlock() + is.WsLimitExceedingIPs = make(map[string]int) +} diff --git a/server/internal.go b/server/internal.go index 17d07a45..7b5369ab 100644 --- a/server/internal.go +++ b/server/internal.go @@ -7,6 +7,7 @@ import ( "html/template" "net/http" "path/filepath" + "sort" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -67,6 +68,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B serveMux.HandleFunc(path+"metrics", promhttp.Handler().ServeHTTP) serveMux.HandleFunc(path, s.index) serveMux.HandleFunc(path+"admin", s.htmlTemplateHandler(s.adminIndex)) + serveMux.HandleFunc(path+"admin/ws-limit-exceeding-ips", s.htmlTemplateHandler(s.wsLimitExceedingIPs)) if s.chainParser.GetChainType() == bchain.ChainEthereumType { serveMux.HandleFunc(path+"admin/internal-data-errors", s.htmlTemplateHandler(s.internalDataErrors)) } @@ -115,10 +117,17 @@ func (s *InternalServer) index(w http.ResponseWriter, r *http.Request) { const ( adminIndexTpl = iota + errorInternalTpl + 1 adminInternalErrorsTpl + adminLimitExceedingIPS internalTplCount ) +// WsLimitExceedingIP is used to transfer data to the templates +type WsLimitExceedingIP struct { + IP string + Count int +} + // InternalTemplateData is used to transfer data to the templates type InternalTemplateData struct { CoinName string @@ -128,6 +137,8 @@ type InternalTemplateData struct { Error *api.APIError InternalDataErrors []db.BlockInternalDataError RefetchingInternalData bool + WsGetAccountInfoLimit int + WsLimitExceedingIPs []WsLimitExceedingIP } func (s *InternalServer) newTemplateData(r *http.Request) *InternalTemplateData { @@ -161,6 +172,7 @@ func (s *InternalServer) parseTemplates() []*template.Template { t[errorInternalTpl] = createTemplate("./static/internal_templates/error.html", "./static/internal_templates/base.html") t[adminIndexTpl] = createTemplate("./static/internal_templates/index.html", "./static/internal_templates/base.html") t[adminInternalErrorsTpl] = createTemplate("./static/internal_templates/block_internal_data_errors.html", "./static/internal_templates/base.html") + t[adminLimitExceedingIPS] = createTemplate("./static/internal_templates/ws_limit_exceeding_ips.html", "./static/internal_templates/base.html") return t } @@ -185,3 +197,20 @@ func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Reque data.RefetchingInternalData = s.api.IsRefetchingInternalData() return adminInternalErrorsTpl, data, nil } + +func (s *InternalServer) wsLimitExceedingIPs(w http.ResponseWriter, r *http.Request) (tpl, *InternalTemplateData, error) { + if r.Method == http.MethodPost { + s.is.ResetWsLimitExceedingIPs() + } + data := s.newTemplateData(r) + ips := make([]WsLimitExceedingIP, 0, len(s.is.WsLimitExceedingIPs)) + for k, v := range s.is.WsLimitExceedingIPs { + ips = append(ips, WsLimitExceedingIP{k, v}) + } + sort.Slice(ips, func(i, j int) bool { + return ips[i].Count > ips[j].Count + }) + data.WsLimitExceedingIPs = ips + data.WsGetAccountInfoLimit = s.is.WsGetAccountInfoLimit + return adminLimitExceedingIPS, data, nil +} diff --git a/server/websocket.go b/server/websocket.go index 899e4159..cba5f27d 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -36,14 +36,16 @@ var ( ) type websocketChannel struct { - id uint64 - conn *websocket.Conn - out chan *WsRes - ip string - requestHeader http.Header - alive bool - aliveLock sync.Mutex - addrDescs []string // subscribed address descriptors as strings + id uint64 + conn *websocket.Conn + out chan *WsRes + ip string + requestHeader http.Header + alive bool + aliveLock sync.Mutex + addrDescs []string // subscribed address descriptors as strings + getAddressInfoDescriptorsMux sync.Mutex + getAddressInfoDescriptors map[string]struct{} } // WebsocketServer is a handle to websocket server @@ -112,7 +114,11 @@ func checkOrigin(r *http.Request) bool { } func getIP(r *http.Request) string { - ip := r.Header.Get("X-Real-Ip") + ip := r.Header.Get("cf-connecting-ip") + if ip != "" { + return ip + } + ip = r.Header.Get("X-Real-Ip") if ip != "" { return ip } @@ -138,6 +144,9 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { requestHeader: r.Header, alive: true, } + if s.is.WsGetAccountInfoLimit > 0 { + c.getAddressInfoDescriptors = make(map[string]struct{}) + } go s.inputLoop(c) go s.outputLoop(c) s.onConnect(c) @@ -148,11 +157,13 @@ func (s *WebsocketServer) GetHandler() http.Handler { return s } -func (s *WebsocketServer) closeChannel(c *websocketChannel) { +func (s *WebsocketServer) closeChannel(c *websocketChannel) bool { if c.CloseOut() { c.conn.Close() s.onDisconnect(c) + return true } + return false } func (c *websocketChannel) CloseOut() bool { @@ -259,6 +270,19 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe "getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r, err := unmarshalGetAccountInfoRequest(req.Params) if err == nil { + if s.is.WsGetAccountInfoLimit > 0 { + c.getAddressInfoDescriptorsMux.Lock() + c.getAddressInfoDescriptors[r.Descriptor] = struct{}{} + l := len(c.getAddressInfoDescriptors) + c.getAddressInfoDescriptorsMux.Unlock() + if l > s.is.WsGetAccountInfoLimit { + if s.closeChannel(c) { + glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip) + s.is.AddWsLimitExceedingIP(c.ip) + } + return + } + } rv, err = s.getAccountInfo(r) } return diff --git a/static/internal_templates/index.html b/static/internal_templates/index.html index 325f8494..5fef7011 100644 --- a/static/internal_templates/index.html +++ b/static/internal_templates/index.html @@ -1,4 +1,7 @@ {{define "specific"}} +
+ +
{{if eq .ChainType 1}}
diff --git a/static/internal_templates/ws_limit_exceeding_ips.html b/static/internal_templates/ws_limit_exceeding_ips.html new file mode 100644 index 00000000..081431fb --- /dev/null +++ b/static/internal_templates/ws_limit_exceeding_ips.html @@ -0,0 +1,29 @@ +{{define "specific"}} +

IP addresses disconnected for exceeding websocket limit

+
+
Distinct ip addresses that exceeded limit of {{.WsGetAccountInfoLimit}} requests since last reset: {{len .WsLimitExceedingIPs}}
+
+
+ +
+
+ +
+ + + + + + + + + {{range $d := .WsLimitExceedingIPs}} + + + + + {{end}} + +
IPCount
{{$d.IP}}{{$d.Count}}
+
+{{end}} \ No newline at end of file