Extract mempool common functionality to BaseMempool

This commit is contained in:
Martin Boehm 2019-04-02 16:41:47 +02:00
parent c19f6bfb42
commit 870354bc90
3 changed files with 92 additions and 115 deletions

86
bchain/basemempool.go Normal file
View File

@ -0,0 +1,86 @@
package bchain
import (
"sort"
"sync"
)
type addrIndex struct {
addrDesc string
n int32
}
type txEntry struct {
addrIndexes []addrIndex
time uint32
}
type txidio struct {
txid string
io []addrIndex
}
// BaseMempool is mempool base handle
type BaseMempool struct {
chain BlockChain
mux sync.Mutex
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
OnNewTxAddr OnNewTxAddrFunc
}
// GetTransactions returns slice of mempool transactions for given address
func (m *BaseMempool) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor
func (m *BaseMempool) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
}
func (a MempoolTxidEntries) Len() int { return len(a) }
func (a MempoolTxidEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MempoolTxidEntries) Less(i, j int) bool {
// if the Time is equal, sort by txid to make the order defined
hi := a[i].Time
hj := a[j].Time
if hi == hj {
return a[i].Txid > a[j].Txid
}
// order in reverse
return hi > hj
}
func (m *BaseMempool) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txEntries = newTxEntries
m.addrDescToTx = newAddrDescToTx
}
func getAllEntries(txEntries map[string]txEntry) MempoolTxidEntries {
a := make(MempoolTxidEntries, len(txEntries))
i := 0
for txid, entry := range txEntries {
a[i] = MempoolTxidEntry{
Txid: txid,
Time: entry.time,
}
i++
}
sort.Sort(a)
return a
}
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *BaseMempool) GetAllEntries() MempoolTxidEntries {
return getAllEntries(m.txEntries)
}

View File

@ -1,37 +1,16 @@
package bchain
import (
"sort"
"sync"
"time"
"github.com/golang/glog"
)
type addrIndex struct {
addrDesc string
n int32
}
type txEntry struct {
addrIndexes []addrIndex
time uint32
}
type txidio struct {
txid string
io []addrIndex
}
// MempoolBitcoinType is mempool handle.
type MempoolBitcoinType struct {
chain BlockChain
mux sync.Mutex
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
BaseMempool
chanTxid chan string
chanAddrIndex chan txidio
OnNewTxAddr OnNewTxAddrFunc
AddrDescForOutpoint AddrDescForOutpointFunc
}
@ -39,7 +18,9 @@ type MempoolBitcoinType struct {
// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process
func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *MempoolBitcoinType {
m := &MempoolBitcoinType{
chain: chain,
BaseMempool: BaseMempool{
chain: chain,
},
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
}
@ -68,30 +49,6 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo
return m
}
// GetTransactions returns slice of mempool transactions for given address
func (m *MempoolBitcoinType) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor
func (m *MempoolBitcoinType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
}
func (m *MempoolBitcoinType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txEntries = newTxEntries
m.addrDescToTx = newAddrDescToTx
}
func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex {
var addrDesc AddressDescriptor
if m.AddrDescForOutpoint != nil {
@ -222,35 +179,3 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
return len(m.txEntries), nil
}
func (a MempoolTxidEntries) Len() int { return len(a) }
func (a MempoolTxidEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MempoolTxidEntries) Less(i, j int) bool {
// if the Time is equal, sort by txid to make the order defined
hi := a[i].Time
hj := a[j].Time
if hi == hj {
return a[i].Txid > a[j].Txid
}
// order in reverse
return hi > hj
}
func getAllEntries(txEntries map[string]txEntry) MempoolTxidEntries {
a := make(MempoolTxidEntries, len(txEntries))
i := 0
for txid, entry := range txEntries {
a[i] = MempoolTxidEntry{
Txid: txid,
Time: entry.time,
}
i++
}
sort.Sort(a)
return a
}
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *MempoolBitcoinType) GetAllEntries() MempoolTxidEntries {
return getAllEntries(m.txEntries)
}

View File

@ -1,7 +1,6 @@
package bchain
import (
"sync"
"time"
"github.com/golang/glog"
@ -9,40 +8,12 @@ import (
// MempoolEthereumType is mempool handle of EthereumType chains
type MempoolEthereumType struct {
chain BlockChain
mux sync.Mutex
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
OnNewTxAddr OnNewTxAddrFunc
BaseMempool
}
// NewMempoolEthereumType creates new mempool handler.
func NewMempoolEthereumType(chain BlockChain) *MempoolEthereumType {
return &MempoolEthereumType{chain: chain}
}
// GetTransactions returns slice of mempool transactions for given address
func (m *MempoolEthereumType) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor
func (m *MempoolEthereumType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
}
func (m *MempoolEthereumType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txEntries = newTxEntries
m.addrDescToTx = newAddrDescToTx
return &MempoolEthereumType{BaseMempool: BaseMempool{chain: chain}}
}
func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex {
@ -137,8 +108,3 @@ func (m *MempoolEthereumType) Resync() (int, error) {
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
return len(m.txEntries), nil
}
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *MempoolEthereumType) GetAllEntries() MempoolTxidEntries {
return getAllEntries(m.txEntries)
}