diff --git a/common/metrics.go b/common/metrics.go index 6e338195..6f92b14b 100644 --- a/common/metrics.go +++ b/common/metrics.go @@ -11,6 +11,10 @@ type Metrics struct { SocketIOSubscribes *prometheus.CounterVec SocketIOClients prometheus.Gauge SocketIOReqDuration *prometheus.HistogramVec + WebsocketRequests *prometheus.CounterVec + WebsocketSubscribes *prometheus.CounterVec + WebsocketClients prometheus.Gauge + WebsocketReqDuration *prometheus.HistogramVec IndexResyncDuration prometheus.Histogram MempoolResyncDuration prometheus.Histogram TxCacheEfficiency *prometheus.CounterVec @@ -48,7 +52,7 @@ func GetMetrics(coin string) (*Metrics, error) { metrics.SocketIOClients = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "blockbook_socketio_clients", - Help: "Number of currently connected clients", + Help: "Number of currently connected socketio clients", ConstLabels: Labels{"coin": coin}, }, ) @@ -61,6 +65,38 @@ func GetMetrics(coin string) (*Metrics, error) { }, []string{"method"}, ) + metrics.WebsocketRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_requests", + Help: "Total number of websocket requests by method and status", + ConstLabels: Labels{"coin": coin}, + }, + []string{"method", "status"}, + ) + metrics.WebsocketSubscribes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_subscribes", + Help: "Total number of websocket subscribes by channel and status", + ConstLabels: Labels{"coin": coin}, + }, + []string{"channel", "status"}, + ) + metrics.WebsocketClients = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "blockbook_websocket_clients", + Help: "Number of currently connected websocket clients", + ConstLabels: Labels{"coin": coin}, + }, + ) + metrics.WebsocketReqDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "blockbook_websocket_req_duration", + Help: "Websocket request duration by method (in microseconds)", + Buckets: []float64{1, 5, 10, 25, 50, 75, 100, 250}, + ConstLabels: Labels{"coin": coin}, + }, + []string{"method"}, + ) metrics.IndexResyncDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "blockbook_index_resync_duration", diff --git a/server/public.go b/server/public.go index b4f83e8c..5d149d36 100644 --- a/server/public.go +++ b/server/public.go @@ -31,6 +31,7 @@ type PublicServer struct { binding string certFiles string socketio *SocketIoServer + websocket *WebsocketServer https *http.Server db *db.RocksDB txCache *db.TxCache @@ -59,6 +60,11 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch return nil, err } + websocket, err := NewWebsocketServer(db, chain, txCache, metrics, is) + if err != nil { + return nil, err + } + addr, path := splitBinding(binding) serveMux := http.NewServeMux() https := &http.Server{ @@ -72,6 +78,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch https: https, api: api, socketio: socketio, + websocket: websocket, db: db, txCache: txCache, chain: chain, @@ -109,8 +116,9 @@ func (s *PublicServer) Run() error { func (s *PublicServer) ConnectFullPublicInterface() { serveMux := s.https.Handler.(*http.ServeMux) _, path := splitBinding(s.binding) - // support for tests of socket.io interface - serveMux.Handle(path+"test.html", http.FileServer(http.Dir("./static/"))) + // support for test pages + serveMux.Handle(path+"test-socketio.html", http.FileServer(http.Dir("./static/"))) + serveMux.Handle(path+"test-websocket.html", http.FileServer(http.Dir("./static/"))) if s.internalExplorer { // internal explorer handlers serveMux.HandleFunc(path+"tx/", s.htmlTemplateHandler(s.explorerTx)) @@ -136,6 +144,8 @@ func (s *PublicServer) ConnectFullPublicInterface() { serveMux.HandleFunc(path+"api/estimatefee/", s.jsonHandler(s.apiEstimateFee)) // socket.io interface serveMux.Handle(path+"socket.io/", s.socketio.GetHandler()) + // websocket interface + serveMux.Handle(path+"websocket", s.websocket.GetHandler()) } // Close closes the server @@ -153,6 +163,7 @@ func (s *PublicServer) Shutdown(ctx context.Context) error { // OnNewBlock notifies users subscribed to bitcoind/hashblock about new block func (s *PublicServer) OnNewBlock(hash string, height uint32) { s.socketio.OnNewBlockHash(hash) + s.websocket.OnNewBlock(hash, height) } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block diff --git a/server/socketio.go b/server/socketio.go index bca903a8..131f31e1 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -103,13 +103,6 @@ var onMessageHandlers = map[string]func(*SocketIoServer, json.RawMessage) (inter } return }, - "getAccountInfo": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) { - req, err := unmarshalGetAccountInfoRequest(params) - if err == nil { - rv, err = s.getAccountInfo(req) - } - return - }, "getBlockHeader": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) { height, hash, err := unmarshalGetBlockHeader(params) if err == nil { @@ -214,22 +207,6 @@ func unmarshalGetAddressRequest(params []byte) (addr []string, opts addrOpts, er return } -type accountInfoReq struct { - Descriptor string `json:"descriptor"` - Details string `json:"details"` - PageSize int `json:"pageSize"` - Page int `json:"page"` -} - -func unmarshalGetAccountInfoRequest(params []byte) (*accountInfoReq, error) { - var r accountInfoReq - err := json.Unmarshal(params, &r) - if err != nil { - return nil, err - } - return &r, nil -} - type resultAddressTxids struct { Result []string `json:"result"` } @@ -684,24 +661,6 @@ func (s *SocketIoServer) getMempoolEntry(txid string) (res resultGetMempoolEntry return } -func (s *SocketIoServer) getAccountInfo(req *accountInfoReq) (res *api.Address, err error) { - if s.chainParser.GetChainType() == bchain.ChainEthereumType { - var opt api.GetAddressOption - switch req.Details { - case "balance": - opt = api.Balance - case "txids": - opt = api.TxidHistory - case "txs": - opt = api.TxHistory - default: - opt = api.Basic - } - return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, api.AddressFilterNone) - } - return nil, errors.New("Not implemented") -} - // onSubscribe expects two event subscriptions based on the req parameter (including the doublequotes): // "bitcoind/hashblock" // "bitcoind/addresstxid",["2MzTmvPJLZaLzD9XdN3jMtQA5NexC3rAPww","2NAZRJKr63tSdcTxTN3WaE9ZNDyXy6PgGuv"] diff --git a/server/websocket.go b/server/websocket.go new file mode 100644 index 00000000..bedc95c2 --- /dev/null +++ b/server/websocket.go @@ -0,0 +1,348 @@ +package server + +import ( + "blockbook/api" + "blockbook/bchain" + "blockbook/common" + "blockbook/db" + "encoding/json" + "net/http" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + "github.com/gorilla/websocket" + "github.com/juju/errors" +) + +const upgradeFailed = "Upgrade failed: " +const outChannelSize = 500 +const defaultTimeout = 60 * time.Second + +var ( + // ErrorMethodNotAllowed is returned when client tries to upgrade method other than GET + ErrorMethodNotAllowed = errors.New("Method not allowed") + + connectionCounter uint64 +) + +type websocketReq struct { + ID string `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +type websocketRes struct { + ID string `json:"id"` + Data interface{} `json:"data"` +} + +type websocketChannel struct { + id uint64 + conn *websocket.Conn + out chan *websocketRes + ip string + requestHeader http.Header + alive bool + aliveLock sync.Mutex +} + +// WebsocketServer is a handle to websocket server +type WebsocketServer struct { + socket *websocket.Conn + upgrader *websocket.Upgrader + db *db.RocksDB + txCache *db.TxCache + chain bchain.BlockChain + chainParser bchain.BlockChainParser + metrics *common.Metrics + is *common.InternalState + api *api.Worker + newBlockSubscriptions map[*websocketChannel]string + newBlockSubscriptionsLock sync.Mutex + addressSubscriptions map[string]map[*websocketChannel]string + addressSubscriptionsLock sync.Mutex +} + +// NewWebsocketServer creates new websocket interface to blockbook and returns its handle +func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { + api, err := api.NewWorker(db, chain, txCache, is) + if err != nil { + return nil, err + } + s := &WebsocketServer{ + upgrader: &websocket.Upgrader{ + ReadBufferSize: 1024 * 32, + WriteBufferSize: 1024 * 32, + }, + db: db, + txCache: txCache, + chain: chain, + chainParser: chain.GetChainParser(), + metrics: metrics, + is: is, + api: api, + newBlockSubscriptions: make(map[*websocketChannel]string), + addressSubscriptions: make(map[string]map[*websocketChannel]string), + } + return s, nil +} + +// ServeHTTP sets up handler of websocket channel +func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), 503) + return + } + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + http.Error(w, upgradeFailed+err.Error(), 503) + return + } + c := &websocketChannel{ + id: atomic.AddUint64(&connectionCounter, 1), + conn: conn, + out: make(chan *websocketRes, outChannelSize), + ip: r.RemoteAddr, + requestHeader: r.Header, + alive: true, + } + go s.inputLoop(c) + go s.outputLoop(c) + s.onConnect(c) +} + +// GetHandler returns http handler +func (s *WebsocketServer) GetHandler() http.Handler { + return s +} + +func (s *WebsocketServer) closeChannel(c *websocketChannel) { + c.aliveLock.Lock() + defer c.aliveLock.Unlock() + if c.alive { + c.conn.Close() + c.alive = false + //clean out + close(c.out) + for len(c.out) > 0 { + <-c.out + } + s.onDisconnect(c) + } +} + +func (c *websocketChannel) IsAlive() bool { + c.aliveLock.Lock() + defer c.aliveLock.Unlock() + return c.alive +} + +func (s *WebsocketServer) inputLoop(c *websocketChannel) { + defer func() { + if r := recover(); r != nil { + glog.Error("recovered from panic: ", r, ", ", c.id) + debug.PrintStack() + s.closeChannel(c) + } + }() + for { + t, d, err := c.conn.ReadMessage() + if err != nil { + s.closeChannel(c) + return + } + switch t { + case websocket.TextMessage: + var req websocketReq + err := json.Unmarshal(d, &req) + if err != nil { + glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err) + s.closeChannel(c) + return + } + go s.onRequest(c, &req) + case websocket.BinaryMessage: + glog.Error("Binary message received from ", c.id, ", ", c.ip) + s.closeChannel(c) + return + case websocket.PingMessage: + c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout)) + break + case websocket.CloseMessage: + s.closeChannel(c) + return + case websocket.PongMessage: + // do nothing + } + } +} + +func (s *WebsocketServer) outputLoop(c *websocketChannel) { + for m := range c.out { + err := c.conn.WriteJSON(m) + if err != nil { + glog.Error("Error sending message to ", c.id, ", ", err) + s.closeChannel(c) + } + } +} + +func (s *WebsocketServer) onConnect(c *websocketChannel) { + glog.Info("Client connected ", c.id, ", ", c.ip) + s.metrics.WebsocketClients.Inc() +} + +func (s *WebsocketServer) onDisconnect(c *websocketChannel) { + s.unsubscribeNewBlock(c) + glog.Info("Client disconnected ", c.id, ", ", c.ip) + s.metrics.WebsocketClients.Dec() +} + +var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *websocketReq) (interface{}, error){ + "getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + r, err := unmarshalGetAccountInfoRequest(req.Params) + if err == nil { + rv, err = s.getAccountInfo(r) + } + return + }, + "sendTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + r := struct { + Hex string `json:"hex"` + }{} + err = json.Unmarshal(req.Params, &r) + if err == nil { + rv, err = s.sendTransaction(r.Hex) + } + return + }, + "subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + rv, err = s.subscribeNewBlock(c, req) + return + }, + "unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + rv, err = s.unsubscribeNewBlock(c) + return + }, +} + +func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) { + var err error + var data interface{} + defer func() { + if r := recover(); r != nil { + glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r) + debug.PrintStack() + e := resultError{} + e.Error.Message = "Internal error" + data = e + } + // nil data means no response + if data != nil { + c.out <- &websocketRes{ + ID: req.ID, + Data: data, + } + } + }() + t := time.Now() + defer s.metrics.WebsocketReqDuration.With(common.Labels{"method": req.Method}).Observe(float64(time.Since(t)) / 1e3) // in microseconds + f, ok := requestHandlers[req.Method] + if ok { + data, err = f(s, c, req) + } else { + err = errors.New("unknown method") + } + if err == nil { + glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success") + s.metrics.SocketIORequests.With(common.Labels{"method": req.Method, "status": "success"}).Inc() + } else { + glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err)) + s.metrics.SocketIORequests.With(common.Labels{"method": req.Method, "status": err.Error()}).Inc() + e := resultError{} + e.Error.Message = err.Error() + data = e + } +} + +type accountInfoReq struct { + Descriptor string `json:"descriptor"` + Details string `json:"details"` + PageSize int `json:"pageSize"` + Page int `json:"page"` +} + +func unmarshalGetAccountInfoRequest(params []byte) (*accountInfoReq, error) { + var r accountInfoReq + err := json.Unmarshal(params, &r) + if err != nil { + return nil, err + } + return &r, nil +} + +func (s *WebsocketServer) getAccountInfo(req *accountInfoReq) (res *api.Address, err error) { + if s.chainParser.GetChainType() == bchain.ChainEthereumType { + var opt api.GetAddressOption + switch req.Details { + case "balance": + opt = api.Balance + case "txids": + opt = api.TxidHistory + case "txs": + opt = api.TxHistory + default: + opt = api.Basic + } + return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, api.AddressFilterNone) + } + return nil, errors.New("Not implemented") +} + +func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction, err error) { + txid, err := s.chain.SendRawTransaction(tx) + if err != nil { + return res, err + } + res.Result = txid + return +} + +func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) { + s.newBlockSubscriptionsLock.Lock() + defer s.newBlockSubscriptionsLock.Unlock() + s.newBlockSubscriptions[c] = req.ID + return +} + +func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) { + s.newBlockSubscriptionsLock.Lock() + defer s.newBlockSubscriptionsLock.Unlock() + delete(s.newBlockSubscriptions, c) + return +} + +// OnNewBlock is a callback that broadcasts info about new block to subscribed clients +func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { + s.newBlockSubscriptionsLock.Lock() + defer s.newBlockSubscriptionsLock.Unlock() + data := struct { + Height uint32 `json:"height"` + Hash string `json:"hash"` + }{ + Height: height, + Hash: hash, + } + for c, id := range s.newBlockSubscriptions { + if c.IsAlive() { + c.out <- &websocketRes{ + ID: id, + Data: &data, + } + } + } + glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") +} diff --git a/static/test.html b/static/test-socketio.html similarity index 89% rename from static/test.html rename to static/test-socketio.html index 1047c3c0..c4b07b30 100644 --- a/static/test.html +++ b/static/test-socketio.html @@ -11,7 +11,7 @@ } -