diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index ba67d0a0..d1d1a530 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -151,7 +151,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (err error) { +func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) return c.b.ResyncMempool(onNewTxAddr) } diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index f928ed2d..ea4bf567 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -598,7 +598,8 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) { // ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool is not reentrant, it should be called from a single thread. -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error { +// It returns number of transactions in mempool +func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index dfe3b23b..1464ebca 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -496,7 +496,7 @@ func (b *EthereumRPC) SendRawTransaction(tx string) (string, error) { return "", errors.New("SendRawTransaction: not implemented") } -func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error { +func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index 0c1f14b7..8ccf018c 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -47,12 +47,12 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { +func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() if err != nil { - return err + return 0, err } parser := m.chain.GetChainParser() // allocate slightly larger capacity of the maps @@ -102,5 +102,5 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) erro } m.updateMappings(newTxToInputOutput, newAddrIDToTx) glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return nil + return len(m.txToInputOutput), nil } diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index e6233889..1963bc19 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -166,13 +166,13 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { +func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr txs, err := m.chain.GetMempool() if err != nil { - return err + return 0, err } glog.V(2).Info("mempool: resync ", len(txs), " txs") // allocate slightly larger capacity of the maps @@ -215,5 +215,5 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { m.updateMappings(newTxToInputOutput, newAddrIDToTx) m.onNewTxAddr = nil glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return nil + return len(m.txToInputOutput), nil } diff --git a/bchain/types.go b/bchain/types.go index 3edc214c..51e34656 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -135,7 +135,7 @@ type BlockChain interface { EstimateFee(blocks int) (float64, error) SendRawTransaction(tx string) (string, error) // mempool - ResyncMempool(onNewTxAddr func(txid string, addr string)) error + ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) GetMempoolTransactions(address string) ([]string, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser diff --git a/blockbook.go b/blockbook.go index 29ed983f..2e71f3f0 100644 --- a/blockbook.go +++ b/blockbook.go @@ -238,7 +238,7 @@ func main() { glog.Error("resyncIndex ", err) return } - if err = chain.ResyncMempool(nil); err != nil { + if _, err = chain.ResyncMempool(nil); err != nil { glog.Error("resyncMempool ", err) return } @@ -385,10 +385,11 @@ func syncMempoolLoop() { // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { internalState.StartedMempoolSync() - if err := chain.ResyncMempool(onNewTxAddr); err != nil { + if count, err := chain.ResyncMempool(onNewTxAddr); err != nil { glog.Error("syncMempoolLoop ", errors.ErrorStack(err)) } else { - internalState.FinishedMempoolSync() + internalState.FinishedMempoolSync(count) + } }) glog.Info("syncMempoolLoop stopped") diff --git a/common/internalstate.go b/common/internalstate.go index 7ff0836e..d5ca398e 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -38,6 +38,7 @@ type InternalState struct { LastSync time.Time `json:"lastSync"` IsMempoolSynchronized bool `json:"isMempoolSynchronized"` + MempoolSize int `json:"mempoolSize"` LastMempoolSync time.Time `json:"lastMempoolSync"` DbColumns []InternalStateColumn `json:"dbColumns"` @@ -81,10 +82,11 @@ func (is *InternalState) StartedMempoolSync() { } // FinishedMempoolSync marks end of mempool synchronization -func (is *InternalState) FinishedMempoolSync() { +func (is *InternalState) FinishedMempoolSync(mempoolSize int) { is.mux.Lock() defer is.mux.Unlock() is.IsMempoolSynchronized = true + is.MempoolSize = mempoolSize is.LastMempoolSync = time.Now() }