Put parameters for mempool sync to coin configuration json

This commit is contained in:
Martin Boehm 2018-05-28 16:04:05 +02:00
parent 8b20c38136
commit e5d79b09bc
7 changed files with 48 additions and 23 deletions

View File

@ -42,6 +42,8 @@ type Configuration struct {
ZeroMQBinding string `json:"zeroMQBinding"`
Subversion string `json:"subversion"`
BlockAddressesToKeep int `json:"blockAddressesToKeep"`
MempoolWorkers int `json:"mempoolWorkers"`
MempoolSubWorkers int `json:"mempoolSubWorkers"`
}
// NewBitcoinRPC returns new BitcoinRPC instance.
@ -56,6 +58,14 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT
if c.BlockAddressesToKeep < 100 {
c.BlockAddressesToKeep = 100
}
// at least 1 mempool worker/subworker for synchronous mempool synchronization
if c.MempoolWorkers < 1 {
c.MempoolWorkers = 1
}
if c.MempoolSubWorkers < 1 {
c.MempoolSubWorkers = 1
}
transport := &http.Transport{
Dial: (&net.Dialer{KeepAlive: 600 * time.Second}).Dial,
MaxIdleConns: 100,
@ -92,7 +102,7 @@ func (b *BitcoinRPC) GetChainInfoAndInitializeMempool(bc bchain.BlockChain) (str
}
b.mq = mq
b.Mempool = bchain.NewUTXOMempool(bc)
b.Mempool = bchain.NewUTXOMempool(bc, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers)
return chainName, nil
}

View File

@ -7,8 +7,6 @@ import (
"github.com/golang/glog"
)
const numberOfSyncRoutines = 8
// addrIndex and outpoint are used also in non utxo mempool
type addrIndex struct {
addrID string
@ -38,13 +36,13 @@ type UTXOMempool struct {
// NewUTXOMempool creates new mempool handler.
// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process
func NewUTXOMempool(chain BlockChain) *UTXOMempool {
func NewUTXOMempool(chain BlockChain, workers int, subworkers int) *UTXOMempool {
m := &UTXOMempool{
chain: chain,
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
}
for i := 0; i < numberOfSyncRoutines; i++ {
for i := 0; i < workers; i++ {
go func(i int) {
for txid := range m.chanTxid {
io, ok := m.getMempoolTxAddrs(txid)
@ -55,7 +53,7 @@ func NewUTXOMempool(chain BlockChain) *UTXOMempool {
}
}(i)
}
glog.Info("mempool: starting with ", numberOfSyncRoutines, " sync workers")
glog.Info("mempool: starting with ", workers, "*", subworkers, " sync workers")
return m
}
@ -83,8 +81,27 @@ func (m *UTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex,
m.addrIDToTx = newAddrIDToTx
}
func (m *UTXOMempool) getInputAddress(input *Vin) *addrIndex {
// TODO - possibly get from DB unspenttxs - however some output txs can be also in mempool
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
return nil
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
return nil
}
addrID, err := m.chain.GetChainParser().GetAddrIDFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err)
return nil
}
return &addrIndex{string(addrID), int32(^input.Vout)}
}
func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
parser := m.chain.GetChainParser()
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
@ -93,7 +110,7 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
glog.V(2).Info("mempool: gettxaddrs ", txid, ", ", len(tx.Vin), " inputs")
io := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrID, err := parser.GetAddrIDFromVout(&output)
addrID, err := m.chain.GetChainParser().GetAddrIDFromVout(&output)
if err != nil {
glog.Error("error in addrID in ", txid, " ", output.N, ": ", err)
continue
@ -109,22 +126,10 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
if input.Coinbase != "" {
continue
}
// TODO - possibly get from DB unspenttxs - however some output txs can be also in mempool
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
continue
ai := m.getInputAddress(&input)
if ai != nil {
io = append(io, *ai)
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
continue
}
addrID, err := parser.GetAddrIDFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err)
continue
}
io = append(io, addrIndex{string(addrID), int32(^input.Vout)})
}
return io, true
}

View File

@ -6,5 +6,7 @@
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:48331",
"subversion": "/Bitcoin ABC:0.17.1/",
"mempoolWorkers": 8,
"mempoolSubWorkers": 2,
"blockAddressesToKeep": 300
}

View File

@ -6,5 +6,7 @@
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:38331",
"subversion": "/Bitcoin ABC:0.17.1/",
"mempoolWorkers": 8,
"mempoolSubWorkers": 2,
"blockAddressesToKeep": 300
}

View File

@ -5,5 +5,7 @@
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:48330",
"mempoolWorkers": 8,
"mempoolSubWorkers": 2,
"blockAddressesToKeep": 300
}

View File

@ -5,5 +5,7 @@
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:38330",
"mempoolWorkers": 8,
"mempoolSubWorkers": 2,
"blockAddressesToKeep": 300
}

View File

@ -5,5 +5,7 @@
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:38332",
"mempoolWorkers": 4,
"mempoolSubWorkers": 8,
"blockAddressesToKeep": 300
}