diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index be91514b..5c2469f7 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -44,8 +44,18 @@ func NewUTXOMempool(chain BlockChain, workers int, subworkers int) *UTXOMempool } for i := 0; i < workers; i++ { go func(i int) { + chanInput := make(chan *Vin, 1) + chanResult := make(chan *addrIndex, 1) + for j := 0; j < subworkers; j++ { + go func(j int) { + for input := range chanInput { + ai := m.getInputAddress(input) + chanResult <- ai + } + }(j) + } for txid := range m.chanTxid { - io, ok := m.getMempoolTxAddrs(txid) + io, ok := m.getTxAddrs(txid, chanInput, chanResult) if !ok { io = []addrIndex{} } @@ -101,7 +111,7 @@ func (m *UTXOMempool) getInputAddress(input *Vin) *addrIndex { } -func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { +func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan *Vin, chanResult chan *addrIndex) ([]addrIndex, bool) { tx, err := m.chain.GetTransactionForMempool(txid) if err != nil { glog.Error("cannot get transaction ", txid, ": ", err) @@ -122,11 +132,29 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) } } + dispatched := 0 for _, input := range tx.Vin { if input.Coinbase != "" { continue } - ai := m.getInputAddress(&input) + loop: + for { + select { + // store as many processed results as possible + case ai := <-chanResult: + if ai != nil { + io = append(io, *ai) + } + dispatched-- + // send input to be processed + case chanInput <- &input: + dispatched++ + break loop + } + } + } + for i := 0; i < dispatched; i++ { + ai := <-chanResult if ai != nil { io = append(io, *ai) }