diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index d2406f22..6469966e 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -191,7 +191,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (count int, err error) { +func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) count, err = c.b.ResyncMempool(onNewTxAddr) if err == nil { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index 59d2a46d..e0211b28 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -607,7 +607,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) { // ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool is not reentrant, it should be called from a single thread. // It returns number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 43fb0a96..ec3790a2 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -530,7 +530,7 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { return result, nil } -func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index d32a8917..23f950c6 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -47,7 +47,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (m *NonUTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index 32eb87af..5483ca8e 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -31,7 +31,7 @@ type UTXOMempool struct { addrIDToTx map[string][]outpoint chanTxid chan string chanAddrIndex chan txidio - onNewTxAddr func(txid string, addr string, isOutput bool) + onNewTxAddr OnNewTxAddrFunc } // NewUTXOMempool creates new mempool handler. @@ -166,7 +166,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (m *UTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr diff --git a/bchain/types.go b/bchain/types.go index 6cba260f..c2d350bd 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -112,6 +112,12 @@ func (e *RPCError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) } +// OnNewBlockFunc is used to send notification about a new block +type OnNewBlockFunc func(hash string, height uint32) + +// OnNewTxAddrFunc is used to send notification about a new transaction/address +type OnNewTxAddrFunc func(txid string, addr string, isOutput bool) + // BlockChain defines common interface to block chain daemon type BlockChain interface { // life-cycle methods @@ -136,7 +142,7 @@ type BlockChain interface { EstimateFee(blocks int) (float64, error) SendRawTransaction(tx string) (string, error) // mempool - ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) + ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error) GetMempoolTransactions(address string) ([]string, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser diff --git a/blockbook.go b/blockbook.go index 6111e564..1de45946 100644 --- a/blockbook.go +++ b/blockbook.go @@ -85,8 +85,8 @@ var ( txCache *db.TxCache syncWorker *db.SyncWorker internalState *common.InternalState - callbacksOnNewBlockHash []func(hash string) - callbacksOnNewTxAddr []func(txid string, addr string, isOutput bool) + callbacksOnNewBlock []bchain.OnNewBlockFunc + callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc chanOsSignal chan os.Signal inShutdown int32 ) @@ -281,7 +281,7 @@ func main() { } } }() - callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, publicServer.OnNewBlockHash) + callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) } @@ -392,9 +392,9 @@ func syncIndexLoop() { glog.Info("syncIndexLoop stopped") } -func onNewBlockHash(hash string) { - for _, c := range callbacksOnNewBlockHash { - c(hash) +func onNewBlockHash(hash string, height uint32) { + for _, c := range callbacksOnNewBlock { + c(hash, height) } } diff --git a/db/sync.go b/db/sync.go index a664e94f..6c2e281e 100644 --- a/db/sync.go +++ b/db/sync.go @@ -47,7 +47,7 @@ var errSynced = errors.New("synced") // ResyncIndex synchronizes index to the top of the blockchain // onNewBlock is called when new block is connected, but not in initial parallel sync -func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc) error { start := time.Now() w.is.StartedSync() @@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { return err } -func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc) error { remoteBestHash, err := w.chain.GetBestBlockHash() if err != nil { return err @@ -135,7 +135,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { return w.connectBlocks(onNewBlock) } -func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error { +func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock bchain.OnNewBlockFunc) error { // find forked blocks, disconnect them and then synchronize again var height uint32 hashes := []string{localBestHash} @@ -163,7 +163,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on return w.resyncIndex(onNewBlock) } -func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { +func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) @@ -181,7 +181,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { return err } if onNewBlock != nil { - onNewBlock(res.block.Hash) + onNewBlock(res.block.Hash, res.block.Height) } if res.block.Height > 0 && res.block.Height%1000 == 0 { glog.Info("connected block ", res.block.Height, " ", res.block.Hash) diff --git a/server/public.go b/server/public.go index ebaa3beb..d208a3de 100644 --- a/server/public.go +++ b/server/public.go @@ -139,8 +139,8 @@ func (s *PublicServer) Shutdown(ctx context.Context) error { return s.https.Shutdown(ctx) } -// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block -func (s *PublicServer) OnNewBlockHash(hash string) { +// OnNewBlock notifies users subscribed to bitcoind/hashblock about new block +func (s *PublicServer) OnNewBlock(hash string, height uint32) { s.socketio.OnNewBlockHash(hash) }