Speedup btc mempool synchronization - get transactions in parallel

This commit is contained in:
Martin Boehm 2018-05-15 23:41:09 +02:00
parent a2c7625a59
commit 83907e08b8

View File

@ -7,6 +7,8 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
const numberOfSyncRoutines = 8
// addrIndex and outpoint are used also in non utxo mempool // addrIndex and outpoint are used also in non utxo mempool
type addrIndex struct { type addrIndex struct {
addrID string addrID string
@ -18,17 +20,42 @@ type outpoint struct {
vout int32 vout int32
} }
type txidio struct {
txid string
io []addrIndex
}
// UTXOMempool is mempool handle. // UTXOMempool is mempool handle.
type UTXOMempool struct { type UTXOMempool struct {
chain BlockChain chain BlockChain
mux sync.Mutex mux sync.Mutex
txToInputOutput map[string][]addrIndex txToInputOutput map[string][]addrIndex
addrIDToTx map[string][]outpoint addrIDToTx map[string][]outpoint
chanTxid chan string
chanAddrIndex chan txidio
onNewTxAddr func(txid string, addr string)
} }
// NewUTXOMempool creates new mempool handler. // NewUTXOMempool creates new mempool handler.
// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process
func NewUTXOMempool(chain BlockChain) *UTXOMempool { func NewUTXOMempool(chain BlockChain) *UTXOMempool {
return &UTXOMempool{chain: chain} m := &UTXOMempool{
chain: chain,
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
}
for i := 0; i < numberOfSyncRoutines; i++ {
go func(i int) {
for txid := range m.chanTxid {
io, ok := m.getMempoolTxAddrs(txid)
if !ok {
io = []addrIndex{}
}
m.chanAddrIndex <- txidio{txid, io}
}
}(i)
}
return m
} }
// GetTransactions returns slice of mempool transactions for given address // GetTransactions returns slice of mempool transactions for given address
@ -55,7 +82,7 @@ func (m *UTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex,
m.addrIDToTx = newAddrIDToTx m.addrIDToTx = newAddrIDToTx
} }
func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid string, addr string)) ([]addrIndex, bool) { func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
parser := m.chain.GetChainParser() parser := m.chain.GetChainParser()
tx, err := m.chain.GetTransactionForMempool(txid) tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil { if err != nil {
@ -72,8 +99,8 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid strin
if len(addrID) > 0 { if len(addrID) > 0 {
io = append(io, addrIndex{string(addrID), int32(output.N)}) io = append(io, addrIndex{string(addrID), int32(output.N)})
} }
if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { if m.onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
} }
} }
for _, input := range tx.Vin { for _, input := range tx.Vin {
@ -106,6 +133,7 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid strin
func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
start := time.Now() start := time.Now()
glog.V(1).Info("Mempool: resync") glog.V(1).Info("Mempool: resync")
m.onNewTxAddr = onNewTxAddr
txs, err := m.chain.GetMempool() txs, err := m.chain.GetMempool()
if err != nil { if err != nil {
return err return err
@ -113,21 +141,42 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
// allocate slightly larger capacity of the maps // allocate slightly larger capacity of the maps
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+5) newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+5)
dispatched := 0
onNewData := func(txid string, io []addrIndex) {
if len(io) > 0 {
newTxToInputOutput[txid] = io
for _, si := range io {
newAddrIDToTx[si.addrID] = append(newAddrIDToTx[si.addrID], outpoint{txid, si.n})
}
}
}
// get transaction in parallel using goroutines created in NewUTXOMempool
for _, txid := range txs { for _, txid := range txs {
io, exists := m.txToInputOutput[txid] io, exists := m.txToInputOutput[txid]
if !exists { if !exists {
var ok bool loop:
io, ok = m.getMempoolTxAddrs(txid, onNewTxAddr) for {
if !ok { select {
continue // store as many processed transactions as possible
case tio := <-m.chanAddrIndex:
onNewData(tio.txid, tio.io)
dispatched--
// send transaction to be processed
case m.chanTxid <- txid:
dispatched++
break loop
}
} }
} } else {
newTxToInputOutput[txid] = io onNewData(txid, io)
for _, si := range io {
newAddrIDToTx[si.addrID] = append(newAddrIDToTx[si.addrID], outpoint{txid, si.n})
} }
} }
for i := 0; i < dispatched; i++ {
tio := <-m.chanAddrIndex
onNewData(tio.txid, tio.io)
}
m.updateMappings(newTxToInputOutput, newAddrIDToTx) m.updateMappings(newTxToInputOutput, newAddrIDToTx)
m.onNewTxAddr = nil
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
return nil return nil
} }