diff --git a/api/worker.go b/api/worker.go index c9af9ee4..50372e0d 100644 --- a/api/worker.go +++ b/api/worker.go @@ -29,10 +29,11 @@ type Worker struct { chainType bchain.ChainType mempool bchain.Mempool is *common.InternalState + metrics *common.Metrics } // NewWorker creates new api worker -func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) { +func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*Worker, error) { w := &Worker{ db: db, txCache: txCache, @@ -41,6 +42,10 @@ func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, chainType: chain.GetChainParser().GetChainType(), mempool: mempool, is: is, + metrics: metrics, + } + if w.chainType == bchain.ChainBitcoinType { + w.initXpubCache() } return w, nil } diff --git a/api/xpub.go b/api/xpub.go index 2b0fe8c9..1a458f85 100644 --- a/api/xpub.go +++ b/api/xpub.go @@ -19,10 +19,9 @@ const maxAddressesGap = 10000 const txInput = 1 const txOutput = 2 -const xpubCacheSize = 4096 const xpubCacheExpirationSeconds = 3600 -var cachedXpubs = make(map[string]xpubData) +var cachedXpubs map[string]xpubData var cachedXpubsMux sync.Mutex type xpubTxid struct { @@ -67,6 +66,35 @@ type xpubData struct { changeAddresses []xpubAddress } +func (w *Worker) initXpubCache() { + cachedXpubsMux.Lock() + if cachedXpubs == nil { + cachedXpubs = make(map[string]xpubData) + go func() { + for { + time.Sleep(20 * time.Second) + w.evictXpubCacheItems() + } + }() + } + cachedXpubsMux.Unlock() +} + +func (w *Worker) evictXpubCacheItems() { + cachedXpubsMux.Lock() + defer cachedXpubsMux.Unlock() + threshold := time.Now().Unix() - xpubCacheExpirationSeconds + count := 0 + for k, v := range cachedXpubs { + if v.accessed < threshold { + delete(cachedXpubs, k) + count++ + } + } + w.metrics.XPubCacheSize.Set(float64(len(cachedXpubs))) + glog.Info("Evicted ", count, " items from xpub cache, cache size ", len(cachedXpubs)) +} + func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool, fromHeight, toHeight uint32, maxResults int) ([]xpubTxid, bool, error) { var err error complete := true @@ -247,28 +275,6 @@ func (w *Worker) tokenFromXpubAddress(data *xpubData, ad *xpubAddress, changeInd } } -func evictXpubCacheItems() { - var oldestKey string - oldest := maxInt64 - now := time.Now().Unix() - count := 0 - for k, v := range cachedXpubs { - if v.accessed+xpubCacheExpirationSeconds < now { - delete(cachedXpubs, k) - count++ - } - if v.accessed < oldest { - oldestKey = k - oldest = v.accessed - } - } - if oldestKey != "" && oldest+xpubCacheExpirationSeconds >= now { - delete(cachedXpubs, oldestKey) - count++ - } - glog.V(1).Info("Evicted ", count, " items from xpub cache, oldest item accessed at ", time.Unix(oldest, 0), ", cache size ", len(cachedXpubs)) -} - func (w *Worker) getXpubData(xpub string, page int, txsOnPage int, option AccountDetails, filter *AddressFilter, gap int) (*xpubData, uint32, bool, error) { if w.chainType != bchain.ChainBitcoinType { return nil, 0, false, ErrUnsupportedXpub @@ -345,9 +351,6 @@ func (w *Worker) getXpubData(xpub string, page int, txsOnPage int, option Accoun } data.accessed = time.Now().Unix() cachedXpubsMux.Lock() - if len(cachedXpubs) >= xpubCacheSize { - evictXpubCacheItems() - } cachedXpubs[xpub] = data cachedXpubsMux.Unlock() return &data, bestheight, inCache, nil diff --git a/blockbook.go b/blockbook.go index 05928472..be1184f2 100644 --- a/blockbook.go +++ b/blockbook.go @@ -387,7 +387,7 @@ func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bch } func startInternalServer() (*server.InternalServer, error) { - internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, internalState) + internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, metrics, internalState) if err != nil { return nil, err } @@ -453,7 +453,7 @@ func performRollback() error { } func blockbookAppInfoMetric(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error { - api, err := api.NewWorker(db, chain, mempool, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return err } @@ -708,7 +708,7 @@ func normalizeName(s string) string { func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error { start := time.Now() glog.Info("computeFeeStats start") - api, err := api.NewWorker(db, chain, mempool, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return err } diff --git a/common/metrics.go b/common/metrics.go index b31b2f76..1171c7ca 100644 --- a/common/metrics.go +++ b/common/metrics.go @@ -32,6 +32,7 @@ type Metrics struct { ExplorerPendingRequests *prometheus.GaugeVec WebsocketPendingRequests *prometheus.GaugeVec SocketIOPendingRequests *prometheus.GaugeVec + XPubCacheSize prometheus.Gauge } // Labels represents a collection of label name -> value mappings. @@ -230,6 +231,13 @@ func GetMetrics(coin string) (*Metrics, error) { }, []string{"method"}, ) + metrics.XPubCacheSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "blockbook_xpub_cache_size", + Help: "Number of cached xpubs", + ConstLabels: Labels{"coin": coin}, + }, + ) v := reflect.ValueOf(metrics) for i := 0; i < v.NumField(); i++ { diff --git a/server/internal.go b/server/internal.go index c0c172cc..92d0dbd8 100644 --- a/server/internal.go +++ b/server/internal.go @@ -28,8 +28,8 @@ type InternalServer struct { } // NewInternalServer creates new internal http interface to blockbook and returns its handle -func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) { - api, err := api.NewWorker(db, chain, mempool, txCache, is) +func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*InternalServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } diff --git a/server/public.go b/server/public.go index c84bd965..bd100a67 100644 --- a/server/public.go +++ b/server/public.go @@ -60,7 +60,7 @@ type PublicServer struct { // only basic functionality is mapped, to map all functions, call func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool, enableSubNewTx bool) (*PublicServer, error) { - api, err := api.NewWorker(db, chain, mempool, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } diff --git a/server/socketio.go b/server/socketio.go index 2e844af9..5919db45 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -34,7 +34,7 @@ type SocketIoServer struct { // NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { - api, err := api.NewWorker(db, chain, mempool, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } diff --git a/server/websocket.go b/server/websocket.go index 880d8e71..9e160250 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -81,7 +81,7 @@ type WebsocketServer struct { // NewWebsocketServer creates new websocket interface to blockbook and returns its handle func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, enableSubNewTx bool) (*WebsocketServer, error) { - api, err := api.NewWorker(db, chain, mempool, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err }