Merge branch 'metrics'

This commit is contained in:
Jakub Matys 2018-03-13 11:45:44 +01:00
commit 8ed9beef27
14 changed files with 529 additions and 68 deletions

278
Gopkg.lock generated
View File

@ -1,6 +1,24 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
branch = "master"
name = "github.com/alecthomas/template"
packages = [".","parse"]
revision = "a0175ee3bccc567396460bf5acd36800cb10c49c"
[[projects]]
branch = "master"
name = "github.com/alecthomas/units"
packages = ["."]
revision = "2efee857e7cfd4f3d0138cc3cbb1b4966962b93a"
[[projects]]
branch = "master"
name = "github.com/beorn7/perks"
packages = ["quantile"]
revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9"
[[projects]]
branch = "master"
name = "github.com/bsm/go-vlq"
@ -10,7 +28,7 @@
[[projects]]
branch = "master"
name = "github.com/btcsuite/btcd"
packages = ["blockchain","btcec","chaincfg","chaincfg/chainhash","database","txscript","wire"]
packages = ["addrmgr","blockchain","blockchain/fullblocktests","blockchain/indexers","btcec","btcjson","chaincfg","chaincfg/chainhash","connmgr","database","database/ffldb","database/internal/treap","integration/rpctest","limits","mempool","mining","mining/cpuminer","netsync","peer","rpcclient","txscript","wire"]
revision = "a1d1ea70dd212a440beb9caa4b766a58d1ed0254"
[[projects]]
@ -22,9 +40,93 @@
[[projects]]
branch = "master"
name = "github.com/btcsuite/btcutil"
packages = [".","base58","bech32"]
packages = [".","base58","bech32","bloom","coinset","hdkeychain","txsort"]
revision = "501929d3d046174c3d39f0ea54ece471aa17238c"
[[projects]]
branch = "master"
name = "github.com/btcsuite/go-socks"
packages = ["socks"]
revision = "4720035b7bfd2a9bb130b1c184f8bbe41b6f0d0f"
[[projects]]
branch = "master"
name = "github.com/btcsuite/goleveldb"
packages = ["leveldb","leveldb/cache","leveldb/comparer","leveldb/errors","leveldb/filter","leveldb/iterator","leveldb/journal","leveldb/memdb","leveldb/opt","leveldb/storage","leveldb/table","leveldb/util"]
revision = "7834afc9e8cd15233b6c3d97e12674a31ca24602"
[[projects]]
branch = "master"
name = "github.com/btcsuite/snappy-go"
packages = ["."]
revision = "0bdef8d067237991ddaa1bb6072a740bc40601ba"
[[projects]]
branch = "master"
name = "github.com/btcsuite/websocket"
packages = ["."]
revision = "31079b6807923eb23992c421b114992b95131b55"
[[projects]]
branch = "master"
name = "github.com/btcsuite/winsvc"
packages = ["eventlog","mgr","registry","svc","winapi"]
revision = "f8fb11f83f7e860e3769a08e6811d1b399a43722"
[[projects]]
name = "github.com/davecgh/go-spew"
packages = ["spew"]
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/facebookgo/ensure"
packages = ["."]
revision = "b4ab57deab51ee655ae4bd85281f0715a068016d"
[[projects]]
branch = "master"
name = "github.com/facebookgo/stack"
packages = ["."]
revision = "751773369052141c013c6e827a71e8f35c07879c"
[[projects]]
branch = "master"
name = "github.com/facebookgo/subset"
packages = ["."]
revision = "8dac2c3c48703541e481feddf22975953d4ff842"
[[projects]]
name = "github.com/go-kit/kit"
packages = ["log","log/level"]
revision = "4dc7be5d2d12881735283bcab7352178e190fc71"
version = "v0.6.0"
[[projects]]
name = "github.com/go-logfmt/logfmt"
packages = ["."]
revision = "390ab7935ee28ec6b286364bba9b4dd6410cb3d5"
version = "v0.3.0"
[[projects]]
name = "github.com/go-stack/stack"
packages = ["."]
revision = "259ab82a6cad3992b4e21ff5cac294ccb06474bc"
version = "v1.7.0"
[[projects]]
branch = "master"
name = "github.com/golang/glog"
packages = ["."]
revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998"
[[projects]]
name = "github.com/golang/protobuf"
packages = ["descriptor","jsonpb/jsonpb_test_proto","proto","proto/proto3_proto","proto/testdata","protoc-gen-go/descriptor","protoc-gen-go/generator","protoc-gen-go/grpc","protoc-gen-go/plugin","ptypes","ptypes/any","ptypes/duration","ptypes/struct","ptypes/timestamp","ptypes/wrappers"]
revision = "925541529c1fa6821df4e44ce2723319eb2be768"
version = "v1.0.0"
[[projects]]
name = "github.com/gorilla/context"
packages = ["."]
@ -43,12 +145,132 @@
revision = "53c1911da2b537f792e7cafcb446b05ffe33b996"
version = "v1.6.1"
[[projects]]
name = "github.com/gorilla/websocket"
packages = ["."]
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
version = "v1.2.0"
[[projects]]
name = "github.com/jessevdk/go-flags"
packages = ["."]
revision = "96dc06278ce32a0e9d957d590bb987c81ee66407"
version = "v1.3.0"
[[projects]]
branch = "master"
name = "github.com/jrick/logrotate"
packages = ["rotator"]
revision = "a93b200c26cbae3bb09dd0dc2c7c7fe1468a034a"
[[projects]]
branch = "master"
name = "github.com/juju/errors"
packages = ["."]
revision = "c7d06af17c68cd34c835053720b21f6549d9b0ee"
[[projects]]
branch = "master"
name = "github.com/juju/loggo"
packages = ["."]
revision = "8232ab8918d91c72af1a9fb94d3edbe31d88b790"
[[projects]]
branch = "master"
name = "github.com/juju/testing"
packages = ["checkers"]
revision = "43f926548f91d55be6bae26ecb7d2386c64e887c"
[[projects]]
name = "github.com/julienschmidt/httprouter"
packages = ["."]
revision = "8c199fb6259ffc1af525cc3ad52ee60ba8359669"
version = "v1.1"
[[projects]]
branch = "master"
name = "github.com/kr/logfmt"
packages = ["."]
revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0"
[[projects]]
branch = "master"
name = "github.com/martinboehm/golang-socketio"
packages = [".","protocol","transport"]
revision = "a75d7b2d508c4746fd14ec8116c1e183834b7f77"
[[projects]]
name = "github.com/matttproud/golang_protobuf_extensions"
packages = ["pbutil"]
revision = "3247c84500bff8d9fb6d579d800f20b3e091582c"
version = "v1.0.0"
[[projects]]
name = "github.com/onsi/ginkgo"
packages = [".","config","internal/codelocation","internal/containernode","internal/failer","internal/leafnodes","internal/remote","internal/spec","internal/spec_iterator","internal/specrunner","internal/suite","internal/testingtproxy","internal/writer","reporters","reporters/stenographer","reporters/stenographer/support/go-colorable","reporters/stenographer/support/go-isatty","types"]
revision = "9eda700730cba42af70d53180f9dcce9266bc2bc"
version = "v1.4.0"
[[projects]]
name = "github.com/onsi/gomega"
packages = [".","format","internal/assertion","internal/asyncassertion","internal/oraclematcher","internal/testingtsupport","matchers","matchers/support/goraph/bipartitegraph","matchers/support/goraph/edge","matchers/support/goraph/node","matchers/support/goraph/util","types"]
revision = "003f63b7f4cff3fc95357005358af2de0f5fe152"
version = "v1.3.0"
[[projects]]
name = "github.com/pborman/uuid"
packages = ["."]
revision = "e790cca94e6cc75c7064b1332e63811d4aae1a53"
version = "v1.1"
[[projects]]
branch = "master"
name = "github.com/pebbe/zmq4"
packages = [".","draft","examples/bstar","examples/clone","examples/flcliapi","examples/intface","examples/kvmsg","examples/kvsimple","examples/mdapi"]
revision = "6decad45434f1cddeccf1dc5d9d86d8249decd19"
[[projects]]
name = "github.com/pkg/errors"
packages = ["."]
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
version = "v0.8.0"
[[projects]]
name = "github.com/pkg/profile"
packages = ["."]
revision = "5b67d428864e92711fcbd2f8629456121a56d91f"
version = "v1.2.1"
[[projects]]
name = "github.com/prometheus/client_golang"
packages = ["prometheus","prometheus/promhttp","prometheus/push"]
revision = "c5b7fccd204277076155f10851dad72b76a49317"
version = "v0.8.0"
[[projects]]
branch = "master"
name = "github.com/prometheus/client_model"
packages = ["go"]
revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c"
[[projects]]
branch = "master"
name = "github.com/prometheus/common"
packages = ["expfmt","internal/bitbucket.org/ww/goautoneg","model","promlog"]
revision = "6fb6fce6f8b75884b92e1889c150403fc0872c5e"
[[projects]]
branch = "master"
name = "github.com/prometheus/procfs"
packages = [".","bcache","internal/util","nfs","xfs"]
revision = "1c7ff3de94ae006f58cba483a4c9c6d7c61e1d98"
[[projects]]
name = "github.com/sirupsen/logrus"
packages = ["."]
revision = "c155da19408a8799da419ed3eeb0cb5db0ad5dbc"
version = "v1.0.5"
[[projects]]
branch = "master"
name = "github.com/tecbot/gorocksdb"
@ -58,12 +280,60 @@
[[projects]]
branch = "master"
name = "golang.org/x/crypto"
packages = ["ripemd160"]
packages = ["acme","acme/autocert","curve25519","ed25519","ed25519/internal/edwards25519","nacl/auth","pkcs12/internal/rc2","ripemd160","ssh","ssh/terminal"]
revision = "81e90905daefcd6fd217b62423c0908922eadb30"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = ["context","context/ctxhttp","html","html/atom","html/charset"]
revision = "803fdb99c0f72e493c28ef2099d250a9c989d8ff"
[[projects]]
branch = "master"
name = "golang.org/x/sync"
packages = ["errgroup"]
revision = "fd80eb99c8f653c847d294a001bdf2a3a6f768f5"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = ["unix","windows","windows/registry","windows/svc/eventlog"]
revision = "8c0ece68c28377f4c326d85b94f8df0dace46f80"
[[projects]]
name = "golang.org/x/text"
packages = ["encoding","encoding/charmap","encoding/htmlindex","encoding/internal","encoding/internal/identifier","encoding/japanese","encoding/korean","encoding/simplifiedchinese","encoding/traditionalchinese","encoding/unicode","internal/gen","internal/tag","internal/utf8internal","language","runes","transform","unicode/cldr"]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
name = "gopkg.in/alecthomas/kingpin.v2"
packages = ["."]
revision = "947dcec5ba9c011838740e680966fd7087a71d0d"
version = "v2.2.6"
[[projects]]
branch = "v1"
name = "gopkg.in/check.v1"
packages = ["."]
revision = "20d25e2804050c1cd24a7eea1e7a6447dd0e74ec"
[[projects]]
branch = "v2"
name = "gopkg.in/mgo.v2"
packages = ["bson","internal/json"]
revision = "3f83fa5005286a7fe593b055f0d7771a7dce4655"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "7f97868eec74b32b0982dd158a51a446d1da7eb5"
version = "v2.1.1"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b02ba452e22aa8fee86c7c5ba26ab49ca8f59d5541124eece5b5719c12adde9f"
inputs-digest = "9983e204a93e14b83927f3b072bf8ea6411d8d2f799a963f9746ac1b2ac9f005"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -33,14 +33,6 @@
branch = "master"
name = "github.com/btcsuite/btcutil"
[[constraint]]
branch = "master"
name = "github.com/hashicorp/golang-lru"
[[constraint]]
branch = "master"
name = "github.com/tecbot/gorocksdb"
[[constraint]]
name = "github.com/pkg/profile"
version = "1.2.1"

View File

@ -4,6 +4,7 @@ import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"blockbook/bchain/coins/zec"
"blockbook/common"
"fmt"
"reflect"
"time"
@ -11,7 +12,7 @@ import (
"github.com/juju/errors"
)
type blockChainFactory func(url string, user string, password string, timeout time.Duration, parse bool) (bchain.BlockChain, error)
type blockChainFactory func(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error)
var blockChainFactories = make(map[string]blockChainFactory)
@ -21,10 +22,10 @@ func init() {
}
// NewBlockChain creates bchain.BlockChain of type defined by parameter coin
func NewBlockChain(coin string, url string, user string, password string, timeout time.Duration, parse bool) (bchain.BlockChain, error) {
func NewBlockChain(coin string, url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
bcf, ok := blockChainFactories[coin]
if !ok {
return nil, errors.New(fmt.Sprint("Unsupported coin ", coin, ". Must be one of ", reflect.ValueOf(blockChainFactories).MapKeys()))
}
return bcf(url, user, password, timeout, parse)
return bcf(url, user, password, timeout, parse, metrics)
}

View File

@ -2,6 +2,7 @@ package btc
import (
"blockbook/bchain"
"blockbook/common"
"bytes"
"encoding/hex"
"encoding/json"
@ -28,10 +29,11 @@ type BitcoinRPC struct {
Network string
Mempool *bchain.Mempool
ParseBlocks bool
metrics *common.Metrics
}
// NewBitcoinRPC returns new BitcoinRPC instance.
func NewBitcoinRPC(url string, user string, password string, timeout time.Duration, parse bool) (bchain.BlockChain, error) {
func NewBitcoinRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
transport := &http.Transport{
Dial: (&net.Dialer{KeepAlive: 600 * time.Second}).Dial,
MaxIdleConns: 100,
@ -43,6 +45,7 @@ func NewBitcoinRPC(url string, user string, password string, timeout time.Durati
user: user,
password: password,
ParseBlocks: parse,
metrics: metrics,
}
chainName, err := s.GetBlockChainInfo()
if err != nil {
@ -63,7 +66,7 @@ func NewBitcoinRPC(url string, user string, password string, timeout time.Durati
s.Network = "testnet"
}
s.Mempool = bchain.NewMempool(s)
s.Mempool = bchain.NewMempool(s, metrics)
glog.Info("rpc: block chain ", s.Parser.Params.Name)
return s, nil
@ -254,7 +257,7 @@ func (b *BitcoinRPC) GetBestBlockHash() (string, error) {
res := resGetBestBlockHash{}
req := cmdGetBestBlockHash{Method: "getbestblockhash"}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return "", err
@ -271,7 +274,7 @@ func (b *BitcoinRPC) GetBestBlockHeight() (uint32, error) {
res := resGetBlockCount{}
req := cmdGetBlockCount{Method: "getblockcount"}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return 0, err
@ -288,7 +291,7 @@ func (b *BitcoinRPC) GetBlockChainInfo() (string, error) {
res := resGetBlockChainInfo{}
req := cmdGetBlockChainInfo{Method: "getblockchaininfo"}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return "", err
@ -306,7 +309,7 @@ func (b *BitcoinRPC) GetBlockHash(height uint32) (string, error) {
res := resGetBlockHash{}
req := cmdGetBlockHash{Method: "getblockhash"}
req.Params.Height = height
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return "", errors.Annotatef(err, "height %v", height)
@ -325,7 +328,7 @@ func (b *BitcoinRPC) GetBlockHeader(hash string) (*bchain.BlockHeader, error) {
req := cmdGetBlockHeader{Method: "getblockheader"}
req.Params.BlockHash = hash
req.Params.Verbose = true
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, errors.Annotatef(err, "hash %v", hash)
@ -388,7 +391,7 @@ func (b *BitcoinRPC) GetBlockRaw(hash string) ([]byte, error) {
req := cmdGetBlock{Method: "getblock"}
req.Params.BlockHash = hash
req.Params.Verbosity = 0
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, errors.Annotatef(err, "hash %v", hash)
@ -408,7 +411,7 @@ func (b *BitcoinRPC) GetBlockList(hash string) (*bchain.Block, error) {
req := cmdGetBlock{Method: "getblock"}
req.Params.BlockHash = hash
req.Params.Verbosity = 1
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, errors.Annotatef(err, "hash %v", hash)
@ -440,7 +443,7 @@ func (b *BitcoinRPC) GetBlockFull(hash string) (*bchain.Block, error) {
req := cmdGetBlock{Method: "getblock"}
req.Params.BlockHash = hash
req.Params.Verbosity = 2
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, errors.Annotatef(err, "hash %v", hash)
@ -457,7 +460,7 @@ func (b *BitcoinRPC) GetMempool() ([]string, error) {
res := resGetMempool{}
req := cmdGetMempool{Method: "getrawmempool"}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, err
@ -476,7 +479,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) {
req := cmdGetRawTransaction{Method: "getrawtransaction"}
req.Params.Txid = txid
req.Params.Verbose = true
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
@ -515,7 +518,7 @@ func (b *BitcoinRPC) EstimateSmartFee(blocks int, conservative bool) (float64, e
} else {
req.Params.EstimateMode = "ECONOMICAL"
}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return 0, err
@ -533,7 +536,7 @@ func (b *BitcoinRPC) SendRawTransaction(tx string) (string, error) {
res := resSendRawTransaction{}
req := cmdSendRawTransaction{Method: "sendrawtransaction"}
req.Params = []string{tx}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return "", err
@ -552,7 +555,7 @@ func (b *BitcoinRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error)
Method: "getmempoolentry",
Params: []string{txid},
}
err := b.call(&req, &res)
err := b.observeRPCLatency(req.Method, func() error { return b.call(&req, &res) })
if err != nil {
return nil, err
@ -563,6 +566,15 @@ func (b *BitcoinRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error)
return res.Result, nil
}
func (b *BitcoinRPC) observeRPCLatency(method string, fn func() error) error {
start := time.Now()
err := fn()
if err == nil {
b.metrics.BlockChainLatency.With(common.Labels{"coin": "bitcoin", "method": method}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds
}
return err
}
func (b *BitcoinRPC) call(req interface{}, res interface{}) error {
httpData, err := json.Marshal(req)
if err != nil {

View File

@ -3,6 +3,7 @@ package zec
import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"blockbook/common"
"time"
)
@ -10,8 +11,8 @@ type ZCashRPC struct {
*btc.BitcoinRPC
}
func NewZCashRPC(url string, user string, password string, timeout time.Duration, parse bool) (bchain.BlockChain, error) {
b, err := btc.NewBitcoinRPC(url, user, password, timeout, parse)
func NewZCashRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
b, err := btc.NewBitcoinRPC(url, user, password, timeout, parse, metrics)
if err != nil {
return nil, err
}

View File

@ -1,6 +1,7 @@
package bchain
import (
"blockbook/common"
"encoding/hex"
"sync"
"time"
@ -30,11 +31,12 @@ type Mempool struct {
txToInputOutput map[string]inputOutput
scriptToTx map[string][]outpoint
inputs map[outpoint]string
metrics *common.Metrics
}
// NewMempool creates new mempool handler.
func NewMempool(chain BlockChain) *Mempool {
return &Mempool{chain: chain}
func NewMempool(chain BlockChain, metrics *common.Metrics) *Mempool {
return &Mempool{chain: chain, metrics: metrics}
}
// GetTransactions returns slice of mempool transactions for given output script.
@ -76,6 +78,7 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error {
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool()
if err != nil {
m.metrics.MempoolResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
return err
}
newTxToInputOutput := make(map[string]inputOutput, len(m.txToInputOutput)+1)
@ -86,6 +89,7 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error {
if !exists {
tx, err := m.chain.GetTransaction(txid)
if err != nil {
m.metrics.MempoolResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
glog.Error("cannot get transaction ", txid, ": ", err)
continue
}
@ -116,6 +120,8 @@ func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error {
}
}
m.updateMappings(newTxToInputOutput, newScriptToTx, newInputs)
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
d := time.Since(start)
glog.Info("Mempool: resync finished in ", d, ", ", len(m.txToInputOutput), " transactions in mempool")
m.metrics.MempoolResyncDuration.Observe(float64(d) / 1e6) // in milliseconds
return nil
}

View File

@ -14,6 +14,7 @@ import (
"blockbook/bchain"
"blockbook/bchain/coins"
"blockbook/common"
"blockbook/db"
"blockbook/server"
@ -109,8 +110,12 @@ func main() {
return
}
var err error
if chain, err = coins.NewBlockChain(*coin, *rpcURL, *rpcUser, *rpcPass, time.Duration(*rpcTimeout)*time.Second, *parse); err != nil {
metrics, err := common.GetMetrics()
if err != nil {
glog.Fatal("GetMetrics: ", err)
}
if chain, err = coins.NewBlockChain(*coin, *rpcURL, *rpcUser, *rpcPass, time.Duration(*rpcTimeout)*time.Second, *parse, metrics); err != nil {
glog.Fatal("rpc: ", err)
}
@ -138,7 +143,7 @@ func main() {
return
}
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal)
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
@ -154,7 +159,7 @@ func main() {
}
}
if txCache, err = db.NewTxCache(index, chain); err != nil {
if txCache, err = db.NewTxCache(index, chain, metrics); err != nil {
glog.Error("txCache ", err)
return
}
@ -181,7 +186,8 @@ func main() {
var socketIoServer *server.SocketIoServer
if *socketIoBinding != "" {
socketIoServer, err = server.NewSocketIoServer(*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL)
socketIoServer, err = server.NewSocketIoServer(
*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics)
if err != nil {
glog.Error("socketio: ", err)
return

117
common/metrics.go Normal file
View File

@ -0,0 +1,117 @@
package common
import (
"reflect"
"github.com/prometheus/client_golang/prometheus"
)
type Metrics struct {
RPCRequests *prometheus.CounterVec
SubscribeRequests *prometheus.CounterVec
Clients *prometheus.GaugeVec
RequestDuration *prometheus.HistogramVec
IndexResyncDuration prometheus.Histogram
MempoolResyncDuration prometheus.Histogram
TxCacheEfficiency *prometheus.CounterVec
BlockChainLatency *prometheus.HistogramVec
IndexResyncErrors *prometheus.CounterVec
MempoolResyncErrors *prometheus.CounterVec
IndexDBSize prometheus.Gauge
}
type Labels = prometheus.Labels
func GetMetrics() (*Metrics, error) {
metrics := Metrics{}
metrics.RPCRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_rpc_requests",
Help: "Total number of RPC requests by transport, method and status",
},
[]string{"transport", "method", "status"},
)
metrics.SubscribeRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_subscribe_requests",
Help: "Total number of subscribe requests by transport, channel and status",
},
[]string{"transport", "channel", "status"},
)
metrics.Clients = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "blockbook_clients",
Help: "Number of currently connected clients by transport",
},
[]string{"transport"},
)
metrics.RequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "blockbook_request_duration",
Help: "Request duration by method (in microseconds)",
Buckets: []float64{1, 5, 10, 25, 50, 75, 100, 250},
},
[]string{"transport", "method"},
)
metrics.IndexResyncDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "blockbook_index_resync_duration",
Help: "Duration of index resync operation (in milliseconds)",
Buckets: []float64{100, 250, 500, 750, 1000, 10000, 30000, 60000},
},
)
metrics.MempoolResyncDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "blockbook_mempool_resync_duration",
Help: "Duration of mempool resync operation (in milliseconds)",
Buckets: []float64{1, 5, 10, 25, 50, 75, 100, 250},
},
)
metrics.TxCacheEfficiency = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_txcache_efficiency",
Help: "Efficiency of txCache",
},
[]string{"status"},
)
metrics.BlockChainLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "blockbook_blockchain_latency",
Help: "Latency of blockchain RPC by coin and method (in milliseconds)",
Buckets: []float64{1, 5, 10, 25, 50, 75, 100, 250},
},
[]string{"coin", "method"},
)
metrics.IndexResyncErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_index_resync_errors",
Help: "Number of errors of index resync operation",
},
[]string{"error"},
)
metrics.MempoolResyncErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_mempool_resync_errors",
Help: "Number of errors of mempool resync operation",
},
[]string{"error"},
)
metrics.IndexDBSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "blockbook_index_db_size",
Help: "Size of index database (in bytes)",
},
)
v := reflect.ValueOf(metrics)
for i := 0; i < v.NumField(); i++ {
c := v.Field(i).Interface().(prometheus.Collector)
err := prometheus.Register(c)
if err != nil {
return nil, err
}
}
return &metrics, nil
}

View File

@ -541,8 +541,13 @@ func dirSize(path string) (int64, error) {
}
// DatabaseSizeOnDisk returns size of the database in bytes
func (d *RocksDB) DatabaseSizeOnDisk() (int64, error) {
return dirSize(d.path)
func (d *RocksDB) DatabaseSizeOnDisk() int64 {
size, err := dirSize(d.path)
if err != nil {
glog.Error("rocksdb: DatabaseSizeOnDisk: ", err)
return 0
}
return size
}
// GetTx returns transaction stored in db and height of the block containing it

View File

@ -2,6 +2,7 @@ package db
import (
"blockbook/bchain"
"blockbook/common"
"os"
"sync"
"sync/atomic"
@ -19,10 +20,11 @@ type SyncWorker struct {
dryRun bool
startHeight uint32
chanOsSignal chan os.Signal
metrics *common.Metrics
}
// NewSyncWorker creates new SyncWorker and returns its handle
func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal) (*SyncWorker, error) {
func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics) (*SyncWorker, error) {
if minStartHeight < 0 {
minStartHeight = 0
}
@ -34,12 +36,37 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk
dryRun: dryRun,
startHeight: uint32(minStartHeight),
chanOsSignal: chanOsSignal,
metrics: metrics,
}, nil
}
var synced = errors.New("synced")
// ResyncIndex synchronizes index to the top of the blockchain
// onNewBlock is called when new block is connected, but not in initial parallel sync
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
start := time.Now()
err := w.resyncIndex(onNewBlock)
switch err {
case nil:
d := time.Since(start)
glog.Info("resync: finished in ", d)
w.metrics.IndexResyncDuration.Observe(float64(d) / 1e6) // in milliseconds
w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk()))
fallthrough
case synced:
// this is not actually error but flag that resync wasn't necessary
return nil
}
w.metrics.IndexResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
return err
}
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
remote, err := w.chain.GetBestBlockHash()
if err != nil {
return err
@ -53,7 +80,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
// network, we're done.
if local == remote {
glog.Infof("resync: synced on %d %s", localBestHeight, local)
return nil
return synced
}
var header *bchain.BlockHeader
@ -94,7 +121,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
if err != nil {
return err
}
return w.ResyncIndex(onNewBlock)
return w.resyncIndex(onNewBlock)
}
}
@ -128,7 +155,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
return w.ResyncIndex(onNewBlock)
return w.resyncIndex(onNewBlock)
}
}
@ -188,6 +215,7 @@ func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error {
return
}
glog.Error("Worker ", i, " connect block error ", err, ". Retrying...")
w.metrics.IndexResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
time.Sleep(time.Millisecond * 500)
} else {
break
@ -219,6 +247,7 @@ ConnectLoop:
hash, err = w.chain.GetBlockHash(h)
if err != nil {
glog.Error("GetBlockHash error ", err)
w.metrics.IndexResyncErrors.With(common.Labels{"error": err.Error()}).Inc()
time.Sleep(time.Millisecond * 500)
continue
}

View File

@ -2,21 +2,24 @@ package db
import (
"blockbook/bchain"
"blockbook/common"
"github.com/golang/glog"
)
// TxCache is handle to TxCacheServer
type TxCache struct {
db *RocksDB
chain bchain.BlockChain
db *RocksDB
chain bchain.BlockChain
metrics *common.Metrics
}
// NewTxCache creates new TxCache interface and returns its handle
func NewTxCache(db *RocksDB, chain bchain.BlockChain) (*TxCache, error) {
func NewTxCache(db *RocksDB, chain bchain.BlockChain, metrics *common.Metrics) (*TxCache, error) {
return &TxCache{
db: db,
chain: chain,
db: db,
chain: chain,
metrics: metrics,
}, nil
}
@ -29,12 +32,14 @@ func (c *TxCache) GetTransaction(txid string, bestheight uint32) (*bchain.Tx, er
}
if tx != nil {
tx.Confirmations = bestheight - h
c.metrics.TxCacheEfficiency.With(common.Labels{"status": "hit"}).Inc()
return tx, nil
}
tx, err = c.chain.GetTransaction(txid)
if err != nil {
return nil, err
}
c.metrics.TxCacheEfficiency.With(common.Labels{"status": "miss"}).Inc()
// do not cache mempool transactions
if tx.Confirmations > 0 {
err = c.db.PutTx(tx, bestheight-tx.Confirmations, tx.Blocktime)

View File

@ -6,9 +6,11 @@ RUN apt-get update && apt-get install -y \
build-essential git wget pkg-config lxc-dev libzmq3-dev libgflags-dev \
libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev graphviz
ENV GOLANG_VERSION=go1.9.2.linux-amd64
ENV GOLANG_VERSION=go1.10.linux-amd64
ENV GOPATH=/go
ENV PATH=$PATH:$GOPATH/bin
ENV CGO_CFLAGS="-I/opt/rocksdb/include"
ENV CGO_LDFLAGS="-L/opt/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4"
# install and configure go
RUN cd /opt && wget https://storage.googleapis.com/golang/$GOLANG_VERSION.tar.gz && \
@ -30,11 +32,8 @@ RUN cd $GOPATH/src/blockbook && dep ensure
# install gorocksdb
RUN cd $GOPATH/src/blockbook/vendor/github.com/tecbot/gorocksdb && \
CGO_CFLAGS="-I/opt/rocksdb/include" \
CGO_LDFLAGS="-L/opt/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4" \
go install .
WORKDIR $GOPATH/src/blockbook
CMD go build -o /out/blockbook

View File

@ -14,6 +14,7 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// HTTPServer is handle to HttpServer
@ -47,6 +48,7 @@ func NewHTTPServer(httpServerBinding string, certFiles string, db *db.RocksDB, c
r.HandleFunc("/transactions/{address}/{lower}/{higher}", s.transactions)
r.HandleFunc("/confirmedTransactions/{address}/{lower}/{higher}", s.confirmedTransactions)
r.HandleFunc("/unconfirmedTransactions/{address}", s.unconfirmedTransactions)
r.HandleFunc("/metrics", promhttp.Handler().ServeHTTP)
var h http.Handler = r
h = handlers.LoggingHandler(os.Stderr, h)

View File

@ -2,12 +2,14 @@ package server
import (
"blockbook/bchain"
"blockbook/common"
"blockbook/db"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/juju/errors"
@ -27,18 +29,21 @@ type SocketIoServer struct {
chain bchain.BlockChain
chainParser bchain.BlockChainParser
explorerURL string
metrics *common.Metrics
}
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string) (*SocketIoServer, error) {
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics) (*SocketIoServer, error) {
server := gosocketio.NewServer(transport.GetDefaultWebsocketTransport())
server.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
glog.Info("Client connected ", c.Id())
metrics.Clients.With(common.Labels{"transport": "socketio"}).Inc()
})
server.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) {
glog.Info("Client disconnected ", c.Id())
metrics.Clients.With(common.Labels{"transport": "socketio"}).Dec()
})
server.On(gosocketio.OnError, func(c *gosocketio.Channel) {
@ -67,6 +72,7 @@ func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain b
chain: chain,
chainParser: chain.GetChainParser(),
explorerURL: explorerURL,
metrics: metrics,
}
// support for tests of socket.io interface
@ -128,52 +134,52 @@ type reqRange struct {
}
var onMessageHandlers = map[string]func(*SocketIoServer, json.RawMessage) (interface{}, error){
"\"getAddressTxids\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getAddressTxids": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
addr, rr, err := unmarshalGetAddressRequest(params)
if err == nil {
rv, err = s.getAddressTxids(addr, &rr)
}
return
},
"\"getAddressHistory\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getAddressHistory": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
addr, rr, err := unmarshalGetAddressRequest(params)
if err == nil {
rv, err = s.getAddressHistory(addr, &rr)
}
return
},
"\"getBlockHeader\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getBlockHeader": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
height, hash, err := unmarshalGetBlockHeader(params)
if err == nil {
rv, err = s.getBlockHeader(height, hash)
}
return
},
"\"estimateSmartFee\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"estimateSmartFee": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
blocks, conservative, err := unmarshalEstimateSmartFee(params)
if err == nil {
rv, err = s.estimateSmartFee(blocks, conservative)
}
return
},
"\"getInfo\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getInfo": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
return s.getInfo()
},
"\"getDetailedTransaction\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getDetailedTransaction": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
txid, err := unmarshalStringParameter(params)
if err == nil {
rv, err = s.getDetailedTransaction(txid)
}
return
},
"\"sendTransaction\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"sendTransaction": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
tx, err := unmarshalStringParameter(params)
if err == nil {
rv, err = s.sendTransaction(tx)
}
return
},
"\"getMempoolEntry\"": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
"getMempoolEntry": func(s *SocketIoServer, params json.RawMessage) (rv interface{}, err error) {
txid, err := unmarshalStringParameter(params)
if err == nil {
rv, err = s.getMempoolEntry(txid)
@ -191,8 +197,10 @@ type resultError struct {
func (s *SocketIoServer) onMessage(c *gosocketio.Channel, req map[string]json.RawMessage) interface{} {
var err error
var rv interface{}
method := string(req["method"])
t := time.Now()
method := strings.Trim(string(req["method"]), "\"")
params := req["params"]
defer s.metrics.RequestDuration.With(common.Labels{"transport": "socketio", "method": method}).Observe(float64(time.Since(t)) / 1e3) // in microseconds
f, ok := onMessageHandlers[method]
if ok {
rv, err = f(s, params)
@ -201,9 +209,11 @@ func (s *SocketIoServer) onMessage(c *gosocketio.Channel, req map[string]json.Ra
}
if err == nil {
glog.V(1).Info(c.Id(), " onMessage ", method, " success")
s.metrics.RPCRequests.With(common.Labels{"transport": "socketio", "method": method, "status": "success"}).Inc()
return rv
}
glog.Error(c.Id(), " onMessage ", method, ": ", errors.ErrorStack(err))
s.metrics.RPCRequests.With(common.Labels{"transport": "socketio", "method": method, "status": err.Error()}).Inc()
e := resultError{}
e.Error.Message = err.Error()
return e
@ -650,6 +660,11 @@ func (s *SocketIoServer) getMempoolEntry(txid string) (res resultGetMempoolEntry
// "bitcoind/hashblock"
// "bitcoind/addresstxid",["2MzTmvPJLZaLzD9XdN3jMtQA5NexC3rAPww","2NAZRJKr63tSdcTxTN3WaE9ZNDyXy6PgGuv"]
func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interface{} {
onError := func(id, sc, err string) {
glog.Error(id, " onSubscribe ", sc, ": ", err)
s.metrics.SubscribeRequests.With(common.Labels{"transport": "socketio", "channel": sc, "status": err}).Inc()
}
r := string(req)
glog.V(1).Info(c.Id(), " onSubscribe ", r)
var sc string
@ -658,12 +673,12 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac
var addrs []string
sc = r[1:i]
if sc != "bitcoind/addresstxid" {
glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data")
onError(c.Id(), sc, "invalid data")
return nil
}
err := json.Unmarshal([]byte(r[i+2:]), &addrs)
if err != nil {
glog.Error(c.Id(), " onSubscribe ", sc, ": ", err)
onError(c.Id(), sc, err.Error())
return nil
}
for _, a := range addrs {
@ -672,11 +687,12 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac
} else {
sc = r[1 : len(r)-1]
if sc != "bitcoind/hashblock" {
glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data")
onError(c.Id(), sc, "invalid data")
return nil
}
c.Join(sc)
}
s.metrics.SubscribeRequests.With(common.Labels{"transport": "socketio", "channel": sc, "status": "success"}).Inc()
return nil
}