Disconnect ws connections exceeding limit of requests
This commit is contained in:
parent
08389b5e05
commit
3c29b07c7c
@ -1393,7 +1393,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
|
||||
if ed.contractInfo != nil && ed.contractInfo.Type == bchain.ERC20TokenType {
|
||||
r.Erc20Contract = ed.contractInfo
|
||||
}
|
||||
glog.Info("GetAddress ", address, ", ", time.Since(start))
|
||||
glog.Info("GetAddress-", option, " ", address, ", ", time.Since(start))
|
||||
return r, nil
|
||||
}
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -505,6 +506,12 @@ func newInternalState(config *common.Config, d *db.RocksDB, enableSubNewTx bool)
|
||||
}
|
||||
is.Host = name
|
||||
}
|
||||
|
||||
is.WsGetAccountInfoLimit, _ = strconv.Atoi(os.Getenv(strings.ToUpper(is.CoinShortcut) + "_WS_GETACCOUNTINFO_LIMIT"))
|
||||
if is.WsGetAccountInfoLimit > 0 {
|
||||
glog.Info("WsGetAccountInfoLimit enabled with limit ", is.WsGetAccountInfoLimit)
|
||||
is.WsLimitExceedingIPs = make(map[string]int)
|
||||
}
|
||||
return is, nil
|
||||
}
|
||||
|
||||
|
||||
@ -97,6 +97,10 @@ type InternalState struct {
|
||||
BlockGolombFilterP uint8 `json:"block_golomb_filter_p"`
|
||||
BlockFilterScripts string `json:"block_filter_scripts"`
|
||||
BlockFilterUseZeroedKey bool `json:"block_filter_use_zeroed_key"`
|
||||
|
||||
// allowed number of fetched accounts over websocket
|
||||
WsGetAccountInfoLimit int `json:"-"`
|
||||
WsLimitExceedingIPs map[string]int `json:"-"`
|
||||
}
|
||||
|
||||
// StartedSync signals start of synchronization
|
||||
@ -341,3 +345,15 @@ func SetInShutdown() {
|
||||
func IsInShutdown() bool {
|
||||
return atomic.LoadInt32(&inShutdown) != 0
|
||||
}
|
||||
|
||||
func (is *InternalState) AddWsLimitExceedingIP(ip string) {
|
||||
is.mux.Lock()
|
||||
defer is.mux.Unlock()
|
||||
is.WsLimitExceedingIPs[ip] = is.WsLimitExceedingIPs[ip] + 1
|
||||
}
|
||||
|
||||
func (is *InternalState) ResetWsLimitExceedingIPs() {
|
||||
is.mux.Lock()
|
||||
defer is.mux.Unlock()
|
||||
is.WsLimitExceedingIPs = make(map[string]int)
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"html/template"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
@ -67,6 +68,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B
|
||||
serveMux.HandleFunc(path+"metrics", promhttp.Handler().ServeHTTP)
|
||||
serveMux.HandleFunc(path, s.index)
|
||||
serveMux.HandleFunc(path+"admin", s.htmlTemplateHandler(s.adminIndex))
|
||||
serveMux.HandleFunc(path+"admin/ws-limit-exceeding-ips", s.htmlTemplateHandler(s.wsLimitExceedingIPs))
|
||||
if s.chainParser.GetChainType() == bchain.ChainEthereumType {
|
||||
serveMux.HandleFunc(path+"admin/internal-data-errors", s.htmlTemplateHandler(s.internalDataErrors))
|
||||
}
|
||||
@ -115,10 +117,17 @@ func (s *InternalServer) index(w http.ResponseWriter, r *http.Request) {
|
||||
const (
|
||||
adminIndexTpl = iota + errorInternalTpl + 1
|
||||
adminInternalErrorsTpl
|
||||
adminLimitExceedingIPS
|
||||
|
||||
internalTplCount
|
||||
)
|
||||
|
||||
// WsLimitExceedingIP is used to transfer data to the templates
|
||||
type WsLimitExceedingIP struct {
|
||||
IP string
|
||||
Count int
|
||||
}
|
||||
|
||||
// InternalTemplateData is used to transfer data to the templates
|
||||
type InternalTemplateData struct {
|
||||
CoinName string
|
||||
@ -128,6 +137,8 @@ type InternalTemplateData struct {
|
||||
Error *api.APIError
|
||||
InternalDataErrors []db.BlockInternalDataError
|
||||
RefetchingInternalData bool
|
||||
WsGetAccountInfoLimit int
|
||||
WsLimitExceedingIPs []WsLimitExceedingIP
|
||||
}
|
||||
|
||||
func (s *InternalServer) newTemplateData(r *http.Request) *InternalTemplateData {
|
||||
@ -161,6 +172,7 @@ func (s *InternalServer) parseTemplates() []*template.Template {
|
||||
t[errorInternalTpl] = createTemplate("./static/internal_templates/error.html", "./static/internal_templates/base.html")
|
||||
t[adminIndexTpl] = createTemplate("./static/internal_templates/index.html", "./static/internal_templates/base.html")
|
||||
t[adminInternalErrorsTpl] = createTemplate("./static/internal_templates/block_internal_data_errors.html", "./static/internal_templates/base.html")
|
||||
t[adminLimitExceedingIPS] = createTemplate("./static/internal_templates/ws_limit_exceeding_ips.html", "./static/internal_templates/base.html")
|
||||
return t
|
||||
}
|
||||
|
||||
@ -185,3 +197,20 @@ func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Reque
|
||||
data.RefetchingInternalData = s.api.IsRefetchingInternalData()
|
||||
return adminInternalErrorsTpl, data, nil
|
||||
}
|
||||
|
||||
func (s *InternalServer) wsLimitExceedingIPs(w http.ResponseWriter, r *http.Request) (tpl, *InternalTemplateData, error) {
|
||||
if r.Method == http.MethodPost {
|
||||
s.is.ResetWsLimitExceedingIPs()
|
||||
}
|
||||
data := s.newTemplateData(r)
|
||||
ips := make([]WsLimitExceedingIP, 0, len(s.is.WsLimitExceedingIPs))
|
||||
for k, v := range s.is.WsLimitExceedingIPs {
|
||||
ips = append(ips, WsLimitExceedingIP{k, v})
|
||||
}
|
||||
sort.Slice(ips, func(i, j int) bool {
|
||||
return ips[i].Count > ips[j].Count
|
||||
})
|
||||
data.WsLimitExceedingIPs = ips
|
||||
data.WsGetAccountInfoLimit = s.is.WsGetAccountInfoLimit
|
||||
return adminLimitExceedingIPS, data, nil
|
||||
}
|
||||
|
||||
@ -36,14 +36,16 @@ var (
|
||||
)
|
||||
|
||||
type websocketChannel struct {
|
||||
id uint64
|
||||
conn *websocket.Conn
|
||||
out chan *WsRes
|
||||
ip string
|
||||
requestHeader http.Header
|
||||
alive bool
|
||||
aliveLock sync.Mutex
|
||||
addrDescs []string // subscribed address descriptors as strings
|
||||
id uint64
|
||||
conn *websocket.Conn
|
||||
out chan *WsRes
|
||||
ip string
|
||||
requestHeader http.Header
|
||||
alive bool
|
||||
aliveLock sync.Mutex
|
||||
addrDescs []string // subscribed address descriptors as strings
|
||||
getAddressInfoDescriptorsMux sync.Mutex
|
||||
getAddressInfoDescriptors map[string]struct{}
|
||||
}
|
||||
|
||||
// WebsocketServer is a handle to websocket server
|
||||
@ -112,7 +114,11 @@ func checkOrigin(r *http.Request) bool {
|
||||
}
|
||||
|
||||
func getIP(r *http.Request) string {
|
||||
ip := r.Header.Get("X-Real-Ip")
|
||||
ip := r.Header.Get("cf-connecting-ip")
|
||||
if ip != "" {
|
||||
return ip
|
||||
}
|
||||
ip = r.Header.Get("X-Real-Ip")
|
||||
if ip != "" {
|
||||
return ip
|
||||
}
|
||||
@ -138,6 +144,9 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
requestHeader: r.Header,
|
||||
alive: true,
|
||||
}
|
||||
if s.is.WsGetAccountInfoLimit > 0 {
|
||||
c.getAddressInfoDescriptors = make(map[string]struct{})
|
||||
}
|
||||
go s.inputLoop(c)
|
||||
go s.outputLoop(c)
|
||||
s.onConnect(c)
|
||||
@ -148,11 +157,13 @@ func (s *WebsocketServer) GetHandler() http.Handler {
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *WebsocketServer) closeChannel(c *websocketChannel) {
|
||||
func (s *WebsocketServer) closeChannel(c *websocketChannel) bool {
|
||||
if c.CloseOut() {
|
||||
c.conn.Close()
|
||||
s.onDisconnect(c)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *websocketChannel) CloseOut() bool {
|
||||
@ -259,6 +270,19 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
|
||||
"getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
|
||||
r, err := unmarshalGetAccountInfoRequest(req.Params)
|
||||
if err == nil {
|
||||
if s.is.WsGetAccountInfoLimit > 0 {
|
||||
c.getAddressInfoDescriptorsMux.Lock()
|
||||
c.getAddressInfoDescriptors[r.Descriptor] = struct{}{}
|
||||
l := len(c.getAddressInfoDescriptors)
|
||||
c.getAddressInfoDescriptorsMux.Unlock()
|
||||
if l > s.is.WsGetAccountInfoLimit {
|
||||
if s.closeChannel(c) {
|
||||
glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip)
|
||||
s.is.AddWsLimitExceedingIP(c.ip)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
rv, err = s.getAccountInfo(r)
|
||||
}
|
||||
return
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
{{define "specific"}}
|
||||
<div class="row">
|
||||
<div class="col"><a href="/admin/ws-limit-exceeding-ips">IP addresses that exceeded websocket usage limit</a></div>
|
||||
</div>
|
||||
{{if eq .ChainType 1}}
|
||||
<div class="row">
|
||||
<div class="col"><a href="/admin/internal-data-errors">Internal Data Errors</a></div>
|
||||
|
||||
29
static/internal_templates/ws_limit_exceeding_ips.html
Normal file
29
static/internal_templates/ws_limit_exceeding_ips.html
Normal file
@ -0,0 +1,29 @@
|
||||
{{define "specific"}}
|
||||
<h3>IP addresses disconnected for exceeding websocket limit</h3>
|
||||
<div class="row g-0">
|
||||
<div class="col-md-11">Distinct ip addresses that exceeded limit of {{.WsGetAccountInfoLimit}} requests since last reset: {{len .WsLimitExceedingIPs}}</div>
|
||||
<div class="col-md-1 justify-content-right">
|
||||
<form method="POST" action="/admin/ws-limit-exceeding-ips">
|
||||
<button type="submit" class="btn btn-outline-secondary">Reset</button>
|
||||
</form>
|
||||
</div>
|
||||
</row>
|
||||
<div>
|
||||
<table class="table table-hover">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>IP</th>
|
||||
<th>Count</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{{range $d := .WsLimitExceedingIPs}}
|
||||
<tr>
|
||||
<td>{{$d.IP}}</a></td>
|
||||
<td>{{$d.Count}}</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
{{end}}
|
||||
Loading…
Reference in New Issue
Block a user