Speedup btc mempool synchronization - use nonverbose getrawtransaction

This commit is contained in:
Martin Boehm 2018-05-14 18:12:01 +02:00
parent 20c51c1f50
commit a2c7625a59
7 changed files with 99 additions and 40 deletions

View File

@ -124,6 +124,11 @@ func (c *blockChainWithMetrics) GetTransaction(txid string) (v *bchain.Tx, err e
return c.b.GetTransaction(txid)
}
func (c *blockChainWithMetrics) GetTransactionForMempool(txid string) (v *bchain.Tx, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetTransactionForMempool", s, err) }(time.Now())
return c.b.GetTransactionForMempool(txid)
}
func (c *blockChainWithMetrics) EstimateSmartFee(blocks int, conservative bool) (v float64, err error) {
defer func(s time.Time) { c.observeRPCLatency("EstimateSmartFee", s, err) }(time.Now())
return c.b.EstimateSmartFee(blocks, conservative)

View File

@ -244,6 +244,11 @@ type resGetRawTransaction struct {
Result bchain.Tx `json:"result"`
}
type resGetRawTransactionNonverbose struct {
Error *bchain.RPCError `json:"error"`
Result string `json:"result"`
}
// estimatesmartfee
type cmdEstimateSmartFee struct {
@ -545,6 +550,33 @@ func (b *BitcoinRPC) GetMempool() ([]string, error) {
return res.Result, nil
}
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *BitcoinRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
glog.V(1).Info("rpc: getrawtransaction nonverbose ", txid)
res := resGetRawTransactionNonverbose{}
req := cmdGetRawTransaction{Method: "getrawtransaction"}
req.Params.Txid = txid
req.Params.Verbose = false
err := b.Call(&req, &res)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
if res.Error != nil {
return nil, errors.Annotatef(res.Error, "txid %v", txid)
}
data, err := hex.DecodeString(res.Result)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
tx, err := b.Parser.ParseTx(data)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
return tx, nil
}
// GetTransaction returns a transaction by the transaction ID.
func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) {
glog.V(1).Info("rpc: getrawtransaction ", txid)

View File

@ -388,6 +388,12 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
return &bbk, nil
}
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
return b.GetTransaction(txid)
}
// GetTransaction returns a transaction by the transaction ID.
func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)

View File

@ -133,6 +133,12 @@ func isInvalidTx(err error) bool {
return false
}
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (z *ZCashRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
return z.GetTransaction(txid)
}
// GetTransaction returns a transaction by the transaction ID.
func (z *ZCashRPC) GetTransaction(txid string) (*bchain.Tx, error) {
glog.V(1).Info("rpc: getrawtransaction ", txid)

View File

@ -61,7 +61,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) erro
for _, txid := range txs {
io, exists := m.txToInputOutput[txid]
if !exists {
tx, err := m.chain.GetTransaction(txid)
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
continue

View File

@ -55,6 +55,51 @@ func (m *UTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex,
m.addrIDToTx = newAddrIDToTx
}
func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid string, addr string)) ([]addrIndex, bool) {
parser := m.chain.GetChainParser()
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
return nil, false
}
io := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrID, err := parser.GetAddrIDFromVout(&output)
if err != nil {
glog.Error("error in addrID in ", txid, " ", output.N, ": ", err)
continue
}
if len(addrID) > 0 {
io = append(io, addrIndex{string(addrID), int32(output.N)})
}
if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
}
}
for _, input := range tx.Vin {
if input.Coinbase != "" {
continue
}
// TODO - possibly get from DB unspenttxs - however some output txs can be in mempool only
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
continue
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
continue
}
addrID, err := parser.GetAddrIDFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err)
continue
}
io = append(io, addrIndex{string(addrID), int32(^input.Vout)})
}
return io, true
}
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
@ -65,53 +110,17 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
if err != nil {
return err
}
parser := m.chain.GetChainParser()
// allocate slightly larger capacity of the maps
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+5)
for _, txid := range txs {
io, exists := m.txToInputOutput[txid]
if !exists {
tx, err := m.chain.GetTransaction(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
var ok bool
io, ok = m.getMempoolTxAddrs(txid, onNewTxAddr)
if !ok {
continue
}
io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrID, err := parser.GetAddrIDFromVout(&output)
if err != nil {
glog.Error("error in addrID in ", txid, " ", output.N, ": ", err)
continue
}
if len(addrID) > 0 {
io = append(io, addrIndex{string(addrID), int32(output.N)})
}
if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
}
}
for _, input := range tx.Vin {
if input.Coinbase != "" {
continue
}
// TODO - possibly get from DB unspenttxs - however some output txs can be in mempool only
itx, err := m.chain.GetTransaction(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
continue
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
continue
}
addrID, err := parser.GetAddrIDFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err)
continue
}
io = append(io, addrIndex{string(addrID), int32(^input.Vout)})
}
}
newTxToInputOutput[txid] = io
for _, si := range io {

View File

@ -135,6 +135,7 @@ type BlockChain interface {
GetBlock(hash string, height uint32) (*Block, error)
GetMempool() ([]string, error)
GetTransaction(txid string) (*Tx, error)
GetTransactionForMempool(txid string) (*Tx, error)
EstimateSmartFee(blocks int, conservative bool) (float64, error)
EstimateFee(blocks int) (float64, error)
SendRawTransaction(tx string) (string, error)