diff --git a/bchain/basemempool.go b/bchain/basemempool.go index 807f1ed6..68c09331 100644 --- a/bchain/basemempool.go +++ b/bchain/basemempool.go @@ -65,11 +65,24 @@ func (a MempoolTxidEntries) Less(i, j int) bool { return hi > hj } -func (m *BaseMempool) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) { - m.mux.Lock() - m.txEntries = newTxEntries - m.addrDescToTx = newAddrDescToTx - m.mux.Unlock() +func (m *BaseMempool) removeEntryFromMempool(txid string, entry txEntry) { + delete(m.txEntries, txid) + for _, si := range entry.addrIndexes { + outpoints, found := m.addrDescToTx[si.addrDesc] + if found { + newOutpoints := make([]Outpoint, 0, len(outpoints)-1) + for _, o := range outpoints { + if o.Txid != txid { + newOutpoints = append(newOutpoints, o) + } + } + if len(newOutpoints) > 0 { + m.addrDescToTx[si.addrDesc] = newOutpoints + } else { + delete(m.addrDescToTx, si.addrDesc) + } + } + } } // GetAllEntries returns all mempool entries sorted by fist seen time in descending order diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index 929dd26c..68882886 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -19,7 +19,9 @@ type MempoolBitcoinType struct { func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *MempoolBitcoinType { m := &MempoolBitcoinType{ BaseMempool: BaseMempool{ - chain: chain, + chain: chain, + txEntries: make(map[string]txEntry), + addrDescToTx: make(map[string][]Outpoint), }, chanTxid: make(chan string, 1), chanAddrIndex: make(chan txidio, 1), @@ -137,29 +139,30 @@ func (m *MempoolBitcoinType) Resync() (int, error) { return 0, err } glog.V(2).Info("mempool: resync ", len(txs), " txs") - // allocate slightly larger capacity of the maps - newTxEntries := make(map[string]txEntry, len(m.txEntries)+5) - newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) - dispatched := 0 - txTime := uint32(time.Now().Unix()) - onNewData := func(txid string, entry txEntry) { + onNewEntry := func(txid string, entry txEntry) { if len(entry.addrIndexes) > 0 { - newTxEntries[txid] = entry + m.mux.Lock() + m.txEntries[txid] = entry for _, si := range entry.addrIndexes { - newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n}) } + m.mux.Unlock() } } + txsMap := make(map[string]struct{}, len(txs)) + dispatched := 0 + txTime := uint32(time.Now().Unix()) // get transaction in parallel using goroutines created in NewUTXOMempool for _, txid := range txs { - io, exists := m.txEntries[txid] + txsMap[txid] = struct{}{} + _, exists := m.txEntries[txid] if !exists { loop: for { select { // store as many processed transactions as possible case tio := <-m.chanAddrIndex: - onNewData(tio.txid, txEntry{tio.io, txTime}) + onNewEntry(tio.txid, txEntry{tio.io, txTime}) dispatched-- // send transaction to be processed case m.chanTxid <- txid: @@ -167,15 +170,20 @@ func (m *MempoolBitcoinType) Resync() (int, error) { break loop } } - } else { - onNewData(txid, io) } } for i := 0; i < dispatched; i++ { tio := <-m.chanAddrIndex - onNewData(tio.txid, txEntry{tio.io, txTime}) + onNewEntry(tio.txid, txEntry{tio.io, txTime}) + } + + for txid, entry := range m.txEntries { + if _, exists := txsMap[txid]; !exists { + m.mux.Lock() + m.removeEntryFromMempool(txid, entry) + m.mux.Unlock() + } } - m.updateMappings(newTxEntries, newAddrDescToTx) glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool") return len(m.txEntries), nil } diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index 52f176c5..4eb5b969 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -132,26 +132,6 @@ func (m *MempoolEthereumType) AddTransactionToMempool(txid string) { } } -func (m *MempoolEthereumType) removeEntryFromMempool(txid string, entry txEntry) { - delete(m.txEntries, txid) - for _, si := range entry.addrIndexes { - outpoints, found := m.addrDescToTx[si.addrDesc] - if found { - newOutpoints := make([]Outpoint, 0, len(outpoints)-1) - for _, o := range outpoints { - if o.Txid != txid { - newOutpoints = append(newOutpoints, o) - } - } - if len(newOutpoints) > 0 { - m.addrDescToTx[si.addrDesc] = newOutpoints - } else { - delete(m.addrDescToTx, si.addrDesc) - } - } - } -} - // RemoveTransactionFromMempool removes transaction from mempool func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) { m.mux.Lock()