Refactor https and socketio to internal and public interface

This commit is contained in:
Martin Boehm 2018-06-21 15:37:46 +02:00
parent bb46d0c715
commit 9bd1b374a7
4 changed files with 257 additions and 201 deletions

View File

@ -54,9 +54,9 @@ var (
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
dryRun = flag.Bool("dryrun", false, "do not index blocks, only download")
httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)")
internalBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)")
socketIoBinding = flag.String("socketio", "", "socketio server binding [address]:port[/path], (default no socket.io server)")
publicBinding = flag.String("socketio", "", "socketio server binding [address]:port[/path], (default no socket.io server)")
certFiles = flag.String("certfile", "", "to enable SSL specify path to certificate files without extension, expecting <certfile>.crt and <certfile>.key, (default no SSL)")
@ -232,18 +232,18 @@ func main() {
return
}
var httpServer *server.HTTPServer
if *httpServerBinding != "" {
httpServer, err = server.NewHTTPServer(*httpServerBinding, *certFiles, index, chain, txCache, internalState)
var internalServer *server.InternalServer
if *internalBinding != "" {
internalServer, err = server.NewInternalServer(*internalBinding, *certFiles, index, chain, txCache, internalState)
if err != nil {
glog.Error("https: ", err)
return
}
go func() {
err = httpServer.Run()
err = internalServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info(err)
glog.Info("internal server: closed")
} else {
glog.Error(err)
return
@ -263,27 +263,26 @@ func main() {
}
}
var socketIoServer *server.SocketIoServer
if *socketIoBinding != "" {
socketIoServer, err = server.NewSocketIoServer(
*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState)
var publicServer *server.PublicServer
if *publicBinding != "" {
publicServer, err = server.NewPublicServer(*publicBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState)
if err != nil {
glog.Error("socketio: ", err)
return
}
go func() {
err = socketIoServer.Run()
err = publicServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info(err)
glog.Info("public server: closed")
} else {
glog.Error(err)
return
}
}
}()
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, socketIoServer.OnNewBlockHash)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, socketIoServer.OnNewTxAddr)
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, publicServer.OnNewBlockHash)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
}
if *synchronize {
@ -314,8 +313,8 @@ func main() {
}
}
if httpServer != nil || socketIoServer != nil || chain != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, chain, 10*time.Second)
if internalServer != nil || publicServer != nil || chain != nil {
waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second)
}
if *synchronize {
@ -464,29 +463,29 @@ func pushSynchronizationHandler(nt bchain.NotificationType) {
}
}
func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) {
func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) {
sig := <-chanOsSignal
atomic.StoreInt32(&inShutdown, 1)
glog.Infof("Shutdown: %v", sig)
glog.Infof("shutdown: %v", sig)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if https != nil {
if err := https.Shutdown(ctx); err != nil {
glog.Error("HttpServer.Shutdown error: ", err)
if internal != nil {
if err := internal.Shutdown(ctx); err != nil {
glog.Error("internal server: shutdown error: ", err)
}
}
if socketio != nil {
if err := socketio.Shutdown(ctx); err != nil {
glog.Error("SocketIo.Shutdown error: ", err)
if public != nil {
if err := public.Shutdown(ctx); err != nil {
glog.Error("public server: shutdown error: ", err)
}
}
if chain != nil {
if err := chain.Shutdown(ctx); err != nil {
glog.Error("BlockChain.Shutdown error: ", err)
glog.Error("rpc: shutdown error: ", err)
}
}
}

View File

@ -18,8 +18,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// HTTPServer is handle to HttpServer
type HTTPServer struct {
// InternalServer is handle to internal http server
type InternalServer struct {
https *http.Server
certFiles string
db *db.RocksDB
@ -44,14 +44,14 @@ type resAboutBlockbookInternal struct {
DbColumns []common.InternalStateColumn `json:"dbColumns"`
}
// NewHTTPServer creates new REST interface to blockbook and returns its handle
func NewHTTPServer(httpServerBinding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*HTTPServer, error) {
// NewInternalServer creates new internal http interface to blockbook and returns its handle
func NewInternalServer(httpServerBinding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) {
r := mux.NewRouter()
https := &http.Server{
Addr: httpServerBinding,
Handler: r,
}
s := &HTTPServer{
s := &InternalServer{
https: https,
certFiles: certFiles,
db: db,
@ -73,30 +73,30 @@ func NewHTTPServer(httpServerBinding string, certFiles string, db *db.RocksDB, c
}
// Run starts the server
func (s *HTTPServer) Run() error {
func (s *InternalServer) Run() error {
if s.certFiles == "" {
glog.Info("http server starting to listen on http://", s.https.Addr)
glog.Info("internal server: starting to listen on http://", s.https.Addr)
return s.https.ListenAndServe()
}
glog.Info("http server starting to listen on https://", s.https.Addr)
glog.Info("internal server: starting to listen on https://", s.https.Addr)
return s.https.ListenAndServeTLS(fmt.Sprint(s.certFiles, ".crt"), fmt.Sprint(s.certFiles, ".key"))
}
// Close closes the server
func (s *HTTPServer) Close() error {
glog.Infof("http server closing")
func (s *InternalServer) Close() error {
glog.Infof("internal server: closing")
return s.https.Close()
}
// Shutdown shuts down the server
func (s *HTTPServer) Shutdown(ctx context.Context) error {
glog.Infof("http server shutdown")
func (s *InternalServer) Shutdown(ctx context.Context) error {
glog.Infof("internal server: shutdown")
return s.https.Shutdown(ctx)
}
func respondError(w http.ResponseWriter, err error, context string) {
w.WriteHeader(http.StatusBadRequest)
glog.Errorf("http server (context %s) error: %v", context, err)
glog.Errorf("internal server: (context %s) error: %v", context, err)
}
func respondHashData(w http.ResponseWriter, hash string) {
@ -108,7 +108,7 @@ func respondHashData(w http.ResponseWriter, hash string) {
})
}
func (s *HTTPServer) index(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) index(w http.ResponseWriter, r *http.Request) {
vi := common.GetVersionInfo()
ss, bh, st := s.is.GetSyncState()
ms, mt, msz := s.is.GetMempoolSyncState()
@ -133,7 +133,7 @@ func (s *HTTPServer) index(w http.ResponseWriter, r *http.Request) {
w.Write(buf)
}
func (s *HTTPServer) bestBlockHash(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) bestBlockHash(w http.ResponseWriter, r *http.Request) {
_, hash, err := s.db.GetBestBlock()
if err != nil {
respondError(w, err, "bestBlockHash")
@ -142,7 +142,7 @@ func (s *HTTPServer) bestBlockHash(w http.ResponseWriter, r *http.Request) {
respondHashData(w, hash)
}
func (s *HTTPServer) blockHash(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) blockHash(w http.ResponseWriter, r *http.Request) {
heightString := mux.Vars(r)["height"]
var hash string
height, err := strconv.ParseUint(heightString, 10, 32)
@ -156,7 +156,7 @@ func (s *HTTPServer) blockHash(w http.ResponseWriter, r *http.Request) {
}
}
func (s *HTTPServer) getAddress(r *http.Request) (address string, err error) {
func (s *InternalServer) getAddress(r *http.Request) (address string, err error) {
address, ok := mux.Vars(r)["address"]
if !ok {
err = errors.New("Empty address")
@ -164,7 +164,7 @@ func (s *HTTPServer) getAddress(r *http.Request) (address string, err error) {
return
}
func (s *HTTPServer) getAddressAndHeightRange(r *http.Request) (address string, lower, higher uint32, err error) {
func (s *InternalServer) getAddressAndHeightRange(r *http.Request) (address string, lower, higher uint32, err error) {
address, err = s.getAddress(r)
if err != nil {
return
@ -184,7 +184,7 @@ type transactionList struct {
Txid []string `json:"txid"`
}
func (s *HTTPServer) unconfirmedTransactions(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) unconfirmedTransactions(w http.ResponseWriter, r *http.Request) {
address, err := s.getAddress(r)
if err != nil {
respondError(w, err, fmt.Sprint("unconfirmedTransactions for address", address))
@ -197,7 +197,7 @@ func (s *HTTPServer) unconfirmedTransactions(w http.ResponseWriter, r *http.Requ
json.NewEncoder(w).Encode(txList)
}
func (s *HTTPServer) confirmedTransactions(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) confirmedTransactions(w http.ResponseWriter, r *http.Request) {
address, lower, higher, err := s.getAddressAndHeightRange(r)
if err != nil {
respondError(w, err, fmt.Sprint("confirmedTransactions for address", address))
@ -213,7 +213,7 @@ func (s *HTTPServer) confirmedTransactions(w http.ResponseWriter, r *http.Reques
json.NewEncoder(w).Encode(txList)
}
func (s *HTTPServer) transactions(w http.ResponseWriter, r *http.Request) {
func (s *InternalServer) transactions(w http.ResponseWriter, r *http.Request) {
address, lower, higher, err := s.getAddressAndHeightRange(r)
if err != nil {
respondError(w, err, fmt.Sprint("transactions for address", address))

207
server/public.go Normal file
View File

@ -0,0 +1,207 @@
package server
import (
"blockbook/bchain"
"blockbook/common"
"blockbook/db"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/golang/glog"
)
const blockbookAbout = "Blockbook - blockchain indexer for TREZOR wallet https://trezor.io/. Do not use for any other purpose."
// PublicServer is handle to public http server
type PublicServer struct {
binding string
certFiles string
socketio *SocketIoServer
https *http.Server
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
explorerURL string
metrics *common.Metrics
is *common.InternalState
}
// NewPublicServerS creates new public server http interface to blockbook and returns its handle
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState) (*PublicServer, error) {
socketio, err := NewSocketIoServer(db, chain, txCache, metrics, is)
if err != nil {
return nil, err
}
addr, path := splitBinding(binding)
serveMux := http.NewServeMux()
https := &http.Server{
Addr: addr,
Handler: serveMux,
}
s := &PublicServer{
binding: binding,
certFiles: certFiles,
https: https,
socketio: socketio,
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
explorerURL: explorerURL,
metrics: metrics,
is: is,
}
// support for tests of socket.io interface
serveMux.Handle(path+"test.html", http.FileServer(http.Dir("./static/")))
// redirect to Bitcore for details of transaction
serveMux.HandleFunc(path+"tx/", s.txRedirect)
serveMux.HandleFunc(path+"address/", s.addressRedirect)
// API call used to detect state of Blockbook
serveMux.HandleFunc(path+"api/block-index/", s.apiBlockIndex)
// handle socket.io
serveMux.Handle(path+"socket.io/", socketio.GetHandler())
// default handler
serveMux.HandleFunc(path, s.index)
return s, nil
}
// Run starts the server
func (s *PublicServer) Run() error {
if s.certFiles == "" {
glog.Info("public server: starting to listen on http://", s.https.Addr)
return s.https.ListenAndServe()
}
glog.Info("public server starting to listen on https://", s.https.Addr)
return s.https.ListenAndServeTLS(fmt.Sprint(s.certFiles, ".crt"), fmt.Sprint(s.certFiles, ".key"))
}
// Close closes the server
func (s *PublicServer) Close() error {
glog.Infof("public server: closing")
return s.https.Close()
}
// Shutdown shuts down the server
func (s *PublicServer) Shutdown(ctx context.Context) error {
glog.Infof("public server: shutdown")
return s.https.Shutdown(ctx)
}
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block
func (s *PublicServer) OnNewBlockHash(hash string) {
s.socketio.OnNewBlockHash(hash)
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *PublicServer) OnNewTxAddr(txid string, addr string) {
s.socketio.OnNewTxAddr(txid, addr)
}
func splitBinding(binding string) (addr string, path string) {
i := strings.Index(binding, "/")
if i >= 0 {
return binding[0:i], binding[i:]
}
return binding, "/"
}
func joinURL(base string, part string) string {
if len(base) > 0 {
if len(base) > 0 && base[len(base)-1] == '/' && len(part) > 0 && part[0] == '/' {
return base + part[1:]
}
return base + part
}
return part
}
func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) {
if s.explorerURL != "" {
http.Redirect(w, r, joinURL(s.explorerURL, r.URL.Path), 302)
s.metrics.ExplorerViews.With(common.Labels{"action": "tx"}).Inc()
}
}
func (s *PublicServer) addressRedirect(w http.ResponseWriter, r *http.Request) {
if s.explorerURL != "" {
http.Redirect(w, r, joinURL(s.explorerURL, r.URL.Path), 302)
s.metrics.ExplorerViews.With(common.Labels{"action": "address"}).Inc()
}
}
type resAboutBlockbookPublic struct {
Coin string `json:"coin"`
Host string `json:"host"`
Version string `json:"version"`
GitCommit string `json:"gitcommit"`
BuildTime string `json:"buildtime"`
InSync bool `json:"inSync"`
BestHeight uint32 `json:"bestHeight"`
LastBlockTime time.Time `json:"lastBlockTime"`
InSyncMempool bool `json:"inSyncMempool"`
LastMempoolTime time.Time `json:"lastMempoolTime"`
About string `json:"about"`
}
func (s *PublicServer) index(w http.ResponseWriter, r *http.Request) {
vi := common.GetVersionInfo()
ss, bh, st := s.is.GetSyncState()
ms, mt, _ := s.is.GetMempoolSyncState()
a := resAboutBlockbookPublic{
Coin: s.is.Coin,
Host: s.is.Host,
Version: vi.Version,
GitCommit: vi.GitCommit,
BuildTime: vi.BuildTime,
InSync: ss,
BestHeight: bh,
LastBlockTime: st,
InSyncMempool: ms,
LastMempoolTime: mt,
About: blockbookAbout,
}
buf, err := json.MarshalIndent(a, "", " ")
if err != nil {
glog.Error(err)
}
w.Write(buf)
}
func (s *PublicServer) apiBlockIndex(w http.ResponseWriter, r *http.Request) {
type resBlockIndex struct {
BlockHash string `json:"blockHash"`
About string `json:"about"`
}
var err error
var hash string
height := -1
if i := strings.LastIndexByte(r.URL.Path, '/'); i > 0 {
if h, err := strconv.Atoi(r.URL.Path[i+1:]); err == nil {
height = h
}
}
if height >= 0 {
hash, err = s.db.GetBlockHash(uint32(height))
} else {
_, hash, err = s.db.GetBestBlock()
}
if err != nil {
glog.Error(err)
} else {
r := resBlockIndex{
BlockHash: hash,
About: blockbookAbout,
}
json.NewEncoder(w).Encode(r)
}
}

View File

@ -4,11 +4,8 @@ import (
"blockbook/bchain"
"blockbook/common"
"blockbook/db"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
@ -19,25 +16,19 @@ import (
"github.com/martinboehm/golang-socketio/transport"
)
const blockbookAbout = "Blockbook - blockchain indexer for TREZOR wallet https://trezor.io/. Do not use for any other purpose."
// SocketIoServer is handle to SocketIoServer
type SocketIoServer struct {
binding string
certFiles string
server *gosocketio.Server
https *http.Server
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
explorerURL string
metrics *common.Metrics
is *common.InternalState
}
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
server := gosocketio.NewServer(transport.GetDefaultWebsocketTransport())
server.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
@ -58,166 +49,25 @@ func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain b
Name string `json:"name"`
Message string `json:"message"`
}
addr, path := splitBinding(binding)
serveMux := http.NewServeMux()
https := &http.Server{
Addr: addr,
Handler: serveMux,
}
s := &SocketIoServer{
binding: binding,
certFiles: certFiles,
https: https,
server: server,
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
explorerURL: explorerURL,
metrics: metrics,
is: is,
}
// support for tests of socket.io interface
serveMux.Handle(path+"test.html", http.FileServer(http.Dir("./static/")))
// redirect to Bitcore for details of transaction
serveMux.HandleFunc(path+"tx/", s.txRedirect)
serveMux.HandleFunc(path+"address/", s.addressRedirect)
// API call used to detect state of Blockbook
serveMux.HandleFunc(path+"api/block-index/", s.apiBlockIndex)
// handle socket.io
serveMux.Handle(path+"socket.io/", server)
// default handler
serveMux.HandleFunc(path, s.index)
server.On("message", s.onMessage)
server.On("subscribe", s.onSubscribe)
return s, nil
}
func splitBinding(binding string) (addr string, path string) {
i := strings.Index(binding, "/")
if i >= 0 {
return binding[0:i], binding[i:]
}
return binding, "/"
}
// Run starts the server
func (s *SocketIoServer) Run() error {
if s.certFiles == "" {
glog.Info("socketio server starting to listen on ws://", s.https.Addr)
return s.https.ListenAndServe()
}
glog.Info("socketio server starting to listen on wss://", s.https.Addr)
return s.https.ListenAndServeTLS(fmt.Sprint(s.certFiles, ".crt"), fmt.Sprint(s.certFiles, ".key"))
}
// Close closes the server
func (s *SocketIoServer) Close() error {
glog.Infof("socketio server closing")
return s.https.Close()
}
// Shutdown shuts down the server
func (s *SocketIoServer) Shutdown(ctx context.Context) error {
glog.Infof("socketio server shutdown")
return s.https.Shutdown(ctx)
}
func joinURL(base string, part string) string {
if len(base) > 0 {
if len(base) > 0 && base[len(base)-1] == '/' && len(part) > 0 && part[0] == '/' {
return base + part[1:]
} else {
return base + part
}
}
return part
}
func (s *SocketIoServer) txRedirect(w http.ResponseWriter, r *http.Request) {
if s.explorerURL != "" {
http.Redirect(w, r, joinURL(s.explorerURL, r.URL.Path), 302)
s.metrics.ExplorerViews.With(common.Labels{"action": "tx"}).Inc()
}
}
func (s *SocketIoServer) addressRedirect(w http.ResponseWriter, r *http.Request) {
if s.explorerURL != "" {
http.Redirect(w, r, joinURL(s.explorerURL, r.URL.Path), 302)
s.metrics.ExplorerViews.With(common.Labels{"action": "address"}).Inc()
}
}
type resAboutBlockbookPublic struct {
Coin string `json:"coin"`
Host string `json:"host"`
Version string `json:"version"`
GitCommit string `json:"gitcommit"`
BuildTime string `json:"buildtime"`
InSync bool `json:"inSync"`
BestHeight uint32 `json:"bestHeight"`
LastBlockTime time.Time `json:"lastBlockTime"`
InSyncMempool bool `json:"inSyncMempool"`
LastMempoolTime time.Time `json:"lastMempoolTime"`
About string `json:"about"`
}
func (s *SocketIoServer) index(w http.ResponseWriter, r *http.Request) {
vi := common.GetVersionInfo()
ss, bh, st := s.is.GetSyncState()
ms, mt, _ := s.is.GetMempoolSyncState()
a := resAboutBlockbookPublic{
Coin: s.is.Coin,
Host: s.is.Host,
Version: vi.Version,
GitCommit: vi.GitCommit,
BuildTime: vi.BuildTime,
InSync: ss,
BestHeight: bh,
LastBlockTime: st,
InSyncMempool: ms,
LastMempoolTime: mt,
About: blockbookAbout,
}
buf, err := json.MarshalIndent(a, "", " ")
if err != nil {
glog.Error(err)
}
w.Write(buf)
}
func (s *SocketIoServer) apiBlockIndex(w http.ResponseWriter, r *http.Request) {
type resBlockIndex struct {
BlockHash string `json:"blockHash"`
About string `json:"about"`
}
var err error
var hash string
height := -1
if i := strings.LastIndexByte(r.URL.Path, '/'); i > 0 {
if h, err := strconv.Atoi(r.URL.Path[i+1:]); err == nil {
height = h
}
}
if height >= 0 {
hash, err = s.db.GetBlockHash(uint32(height))
} else {
_, hash, err = s.db.GetBestBlock()
}
if err != nil {
glog.Error(err)
} else {
r := resBlockIndex{
BlockHash: hash,
About: blockbookAbout,
}
json.NewEncoder(w).Encode(r)
}
// GetHandler returns socket.io http handler
func (s *SocketIoServer) GetHandler() http.Handler {
return s.server
}
type addrOpts struct {