diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index aadc6f2e..6f90c1c2 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -7,6 +7,8 @@ import ( "github.com/golang/glog" ) +const numberOfSyncRoutines = 8 + // addrIndex and outpoint are used also in non utxo mempool type addrIndex struct { addrID string @@ -18,17 +20,42 @@ type outpoint struct { vout int32 } +type txidio struct { + txid string + io []addrIndex +} + // UTXOMempool is mempool handle. type UTXOMempool struct { chain BlockChain mux sync.Mutex txToInputOutput map[string][]addrIndex addrIDToTx map[string][]outpoint + chanTxid chan string + chanAddrIndex chan txidio + onNewTxAddr func(txid string, addr string) } // NewUTXOMempool creates new mempool handler. +// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process func NewUTXOMempool(chain BlockChain) *UTXOMempool { - return &UTXOMempool{chain: chain} + m := &UTXOMempool{ + chain: chain, + chanTxid: make(chan string, 1), + chanAddrIndex: make(chan txidio, 1), + } + for i := 0; i < numberOfSyncRoutines; i++ { + go func(i int) { + for txid := range m.chanTxid { + io, ok := m.getMempoolTxAddrs(txid) + if !ok { + io = []addrIndex{} + } + m.chanAddrIndex <- txidio{txid, io} + } + }(i) + } + return m } // GetTransactions returns slice of mempool transactions for given address @@ -55,7 +82,7 @@ func (m *UTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex, m.addrIDToTx = newAddrIDToTx } -func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid string, addr string)) ([]addrIndex, bool) { +func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { parser := m.chain.GetChainParser() tx, err := m.chain.GetTransactionForMempool(txid) if err != nil { @@ -72,8 +99,8 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid strin if len(addrID) > 0 { io = append(io, addrIndex{string(addrID), int32(output.N)}) } - if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { - onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + if m.onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { + m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) } } for _, input := range tx.Vin { @@ -106,6 +133,7 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string, onNewTxAddr func(txid strin func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { start := time.Now() glog.V(1).Info("Mempool: resync") + m.onNewTxAddr = onNewTxAddr txs, err := m.chain.GetMempool() if err != nil { return err @@ -113,21 +141,42 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error { // allocate slightly larger capacity of the maps newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) newAddrIDToTx := make(map[string][]outpoint, len(m.addrIDToTx)+5) + dispatched := 0 + onNewData := func(txid string, io []addrIndex) { + if len(io) > 0 { + newTxToInputOutput[txid] = io + for _, si := range io { + newAddrIDToTx[si.addrID] = append(newAddrIDToTx[si.addrID], outpoint{txid, si.n}) + } + } + } + // get transaction in parallel using goroutines created in NewUTXOMempool for _, txid := range txs { io, exists := m.txToInputOutput[txid] if !exists { - var ok bool - io, ok = m.getMempoolTxAddrs(txid, onNewTxAddr) - if !ok { - continue + loop: + for { + select { + // store as many processed transactions as possible + case tio := <-m.chanAddrIndex: + onNewData(tio.txid, tio.io) + dispatched-- + // send transaction to be processed + case m.chanTxid <- txid: + dispatched++ + break loop + } } - } - newTxToInputOutput[txid] = io - for _, si := range io { - newAddrIDToTx[si.addrID] = append(newAddrIDToTx[si.addrID], outpoint{txid, si.n}) + } else { + onNewData(txid, io) } } + for i := 0; i < dispatched; i++ { + tio := <-m.chanAddrIndex + onNewData(tio.txid, tio.io) + } m.updateMappings(newTxToInputOutput, newAddrIDToTx) + m.onNewTxAddr = nil glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") return nil }