Improve xpub cache

This commit is contained in:
Martin Boehm 2021-04-28 18:00:13 +02:00
parent 1df8f8eb69
commit 1d55a66fab
8 changed files with 52 additions and 36 deletions

View File

@ -29,10 +29,11 @@ type Worker struct {
chainType bchain.ChainType
mempool bchain.Mempool
is *common.InternalState
metrics *common.Metrics
}
// NewWorker creates new api worker
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) {
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*Worker, error) {
w := &Worker{
db: db,
txCache: txCache,
@ -41,6 +42,10 @@ func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool,
chainType: chain.GetChainParser().GetChainType(),
mempool: mempool,
is: is,
metrics: metrics,
}
if w.chainType == bchain.ChainBitcoinType {
w.initXpubCache()
}
return w, nil
}

View File

@ -19,10 +19,9 @@ const maxAddressesGap = 10000
const txInput = 1
const txOutput = 2
const xpubCacheSize = 4096
const xpubCacheExpirationSeconds = 3600
var cachedXpubs = make(map[string]xpubData)
var cachedXpubs map[string]xpubData
var cachedXpubsMux sync.Mutex
type xpubTxid struct {
@ -67,6 +66,35 @@ type xpubData struct {
changeAddresses []xpubAddress
}
func (w *Worker) initXpubCache() {
cachedXpubsMux.Lock()
if cachedXpubs == nil {
cachedXpubs = make(map[string]xpubData)
go func() {
for {
time.Sleep(20 * time.Second)
w.evictXpubCacheItems()
}
}()
}
cachedXpubsMux.Unlock()
}
func (w *Worker) evictXpubCacheItems() {
cachedXpubsMux.Lock()
defer cachedXpubsMux.Unlock()
threshold := time.Now().Unix() - xpubCacheExpirationSeconds
count := 0
for k, v := range cachedXpubs {
if v.accessed < threshold {
delete(cachedXpubs, k)
count++
}
}
w.metrics.XPubCacheSize.Set(float64(len(cachedXpubs)))
glog.Info("Evicted ", count, " items from xpub cache, cache size ", len(cachedXpubs))
}
func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool, fromHeight, toHeight uint32, maxResults int) ([]xpubTxid, bool, error) {
var err error
complete := true
@ -247,28 +275,6 @@ func (w *Worker) tokenFromXpubAddress(data *xpubData, ad *xpubAddress, changeInd
}
}
func evictXpubCacheItems() {
var oldestKey string
oldest := maxInt64
now := time.Now().Unix()
count := 0
for k, v := range cachedXpubs {
if v.accessed+xpubCacheExpirationSeconds < now {
delete(cachedXpubs, k)
count++
}
if v.accessed < oldest {
oldestKey = k
oldest = v.accessed
}
}
if oldestKey != "" && oldest+xpubCacheExpirationSeconds >= now {
delete(cachedXpubs, oldestKey)
count++
}
glog.V(1).Info("Evicted ", count, " items from xpub cache, oldest item accessed at ", time.Unix(oldest, 0), ", cache size ", len(cachedXpubs))
}
func (w *Worker) getXpubData(xpub string, page int, txsOnPage int, option AccountDetails, filter *AddressFilter, gap int) (*xpubData, uint32, bool, error) {
if w.chainType != bchain.ChainBitcoinType {
return nil, 0, false, ErrUnsupportedXpub
@ -345,9 +351,6 @@ func (w *Worker) getXpubData(xpub string, page int, txsOnPage int, option Accoun
}
data.accessed = time.Now().Unix()
cachedXpubsMux.Lock()
if len(cachedXpubs) >= xpubCacheSize {
evictXpubCacheItems()
}
cachedXpubs[xpub] = data
cachedXpubsMux.Unlock()
return &data, bestheight, inCache, nil

View File

@ -387,7 +387,7 @@ func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bch
}
func startInternalServer() (*server.InternalServer, error) {
internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, internalState)
internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, metrics, internalState)
if err != nil {
return nil, err
}
@ -453,7 +453,7 @@ func performRollback() error {
}
func blockbookAppInfoMetric(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return err
}
@ -708,7 +708,7 @@ func normalizeName(s string) string {
func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
start := time.Now()
glog.Info("computeFeeStats start")
api, err := api.NewWorker(db, chain, mempool, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return err
}

View File

@ -32,6 +32,7 @@ type Metrics struct {
ExplorerPendingRequests *prometheus.GaugeVec
WebsocketPendingRequests *prometheus.GaugeVec
SocketIOPendingRequests *prometheus.GaugeVec
XPubCacheSize prometheus.Gauge
}
// Labels represents a collection of label name -> value mappings.
@ -230,6 +231,13 @@ func GetMetrics(coin string) (*Metrics, error) {
},
[]string{"method"},
)
metrics.XPubCacheSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "blockbook_xpub_cache_size",
Help: "Number of cached xpubs",
ConstLabels: Labels{"coin": coin},
},
)
v := reflect.ValueOf(metrics)
for i := 0; i < v.NumField(); i++ {

View File

@ -28,8 +28,8 @@ type InternalServer struct {
}
// NewInternalServer creates new internal http interface to blockbook and returns its handle
func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*InternalServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}

View File

@ -60,7 +60,7 @@ type PublicServer struct {
// only basic functionality is mapped, to map all functions, call
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool, enableSubNewTx bool) (*PublicServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}

View File

@ -34,7 +34,7 @@ type SocketIoServer struct {
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}

View File

@ -81,7 +81,7 @@ type WebsocketServer struct {
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, enableSubNewTx bool) (*WebsocketServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}