diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index eb09af3c..d2406f22 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -191,7 +191,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (count int, err error) { +func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) count, err = c.b.ResyncMempool(onNewTxAddr) if err == nil { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index e0176d16..59d2a46d 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -607,7 +607,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) { // ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool is not reentrant, it should be called from a single thread. // It returns number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { +func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 76c603d9..43fb0a96 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -530,7 +530,7 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { return result, nil } -func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { +func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index d46956aa..d32a8917 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -47,7 +47,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { +func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() @@ -79,7 +79,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int io = append(io, addrIndex{string(addrID), int32(output.N)}) } if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { - onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true) } } for _, input := range tx.Vin { @@ -92,7 +92,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int } io = append(io, addrIndex{string(addrID), int32(^i)}) if onNewTxAddr != nil { - onNewTxAddr(tx.Txid, a) + onNewTxAddr(tx.Txid, a, false) } } } diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index 1963bc19..32eb87af 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -31,7 +31,7 @@ type UTXOMempool struct { addrIDToTx map[string][]outpoint chanTxid chan string chanAddrIndex chan txidio - onNewTxAddr func(txid string, addr string) + onNewTxAddr func(txid string, addr string, isOutput bool) } // NewUTXOMempool creates new mempool handler. @@ -129,7 +129,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul io = append(io, addrIndex{string(addrID), int32(output.N)}) } if m.onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { - m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true) } } dispatched := 0 @@ -166,7 +166,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { +func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr diff --git a/bchain/types.go b/bchain/types.go index e2aa7276..6cba260f 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -136,7 +136,7 @@ type BlockChain interface { EstimateFee(blocks int) (float64, error) SendRawTransaction(tx string) (string, error) // mempool - ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) + ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) GetMempoolTransactions(address string) ([]string, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser diff --git a/blockbook.go b/blockbook.go index cea2c5b6..6111e564 100644 --- a/blockbook.go +++ b/blockbook.go @@ -86,7 +86,7 @@ var ( syncWorker *db.SyncWorker internalState *common.InternalState callbacksOnNewBlockHash []func(hash string) - callbacksOnNewTxAddr []func(txid string, addr string) + callbacksOnNewTxAddr []func(txid string, addr string, isOutput bool) chanOsSignal chan os.Signal inShutdown int32 ) @@ -444,9 +444,9 @@ func storeInternalStateLoop() { glog.Info("storeInternalStateLoop stopped") } -func onNewTxAddr(txid string, addr string) { +func onNewTxAddr(txid string, addr string, isOutput bool) { for _, c := range callbacksOnNewTxAddr { - c(txid, addr) + c(txid, addr, isOutput) } } diff --git a/server/public.go b/server/public.go index 54555325..ebaa3beb 100644 --- a/server/public.go +++ b/server/public.go @@ -145,8 +145,8 @@ func (s *PublicServer) OnNewBlockHash(hash string) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *PublicServer) OnNewTxAddr(txid string, addr string) { - s.socketio.OnNewTxAddr(txid, addr) +func (s *PublicServer) OnNewTxAddr(txid string, addr string, isOutput bool) { + s.socketio.OnNewTxAddr(txid, addr, isOutput) } func splitBinding(binding string) (addr string, path string) { diff --git a/server/socketio.go b/server/socketio.go index 46c0a563..241df197 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -694,8 +694,12 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *SocketIoServer) OnNewTxAddr(txid string, addr string) { - c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", map[string]string{"address": addr, "txid": txid}) +func (s *SocketIoServer) OnNewTxAddr(txid string, addr string, isOutput bool) { + data := map[string]interface{}{"address": addr, "txid": txid} + if !isOutput { + data["input"] = true + } + c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", data) if c > 0 { glog.Info("broadcasting new txid ", txid, " for addr ", addr, " to ", c, " channels") }