diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index f28cbe08..0abaad90 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -42,6 +42,8 @@ type Configuration struct { ZeroMQBinding string `json:"zeroMQBinding"` Subversion string `json:"subversion"` BlockAddressesToKeep int `json:"blockAddressesToKeep"` + MempoolWorkers int `json:"mempoolWorkers"` + MempoolSubWorkers int `json:"mempoolSubWorkers"` } // NewBitcoinRPC returns new BitcoinRPC instance. @@ -56,6 +58,14 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT if c.BlockAddressesToKeep < 100 { c.BlockAddressesToKeep = 100 } + // at least 1 mempool worker/subworker for synchronous mempool synchronization + if c.MempoolWorkers < 1 { + c.MempoolWorkers = 1 + } + if c.MempoolSubWorkers < 1 { + c.MempoolSubWorkers = 1 + } + transport := &http.Transport{ Dial: (&net.Dialer{KeepAlive: 600 * time.Second}).Dial, MaxIdleConns: 100, @@ -92,7 +102,7 @@ func (b *BitcoinRPC) GetChainInfoAndInitializeMempool(bc bchain.BlockChain) (str } b.mq = mq - b.Mempool = bchain.NewUTXOMempool(bc) + b.Mempool = bchain.NewUTXOMempool(bc, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers) return chainName, nil } diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index aca925c2..be91514b 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -7,8 +7,6 @@ import ( "github.com/golang/glog" ) -const numberOfSyncRoutines = 8 - // addrIndex and outpoint are used also in non utxo mempool type addrIndex struct { addrID string @@ -38,13 +36,13 @@ type UTXOMempool struct { // NewUTXOMempool creates new mempool handler. // For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process -func NewUTXOMempool(chain BlockChain) *UTXOMempool { +func NewUTXOMempool(chain BlockChain, workers int, subworkers int) *UTXOMempool { m := &UTXOMempool{ chain: chain, chanTxid: make(chan string, 1), chanAddrIndex: make(chan txidio, 1), } - for i := 0; i < numberOfSyncRoutines; i++ { + for i := 0; i < workers; i++ { go func(i int) { for txid := range m.chanTxid { io, ok := m.getMempoolTxAddrs(txid) @@ -55,7 +53,7 @@ func NewUTXOMempool(chain BlockChain) *UTXOMempool { } }(i) } - glog.Info("mempool: starting with ", numberOfSyncRoutines, " sync workers") + glog.Info("mempool: starting with ", workers, "*", subworkers, " sync workers") return m } @@ -83,8 +81,27 @@ func (m *UTXOMempool) updateMappings(newTxToInputOutput map[string][]addrIndex, m.addrIDToTx = newAddrIDToTx } +func (m *UTXOMempool) getInputAddress(input *Vin) *addrIndex { + // TODO - possibly get from DB unspenttxs - however some output txs can be also in mempool + itx, err := m.chain.GetTransactionForMempool(input.Txid) + if err != nil { + glog.Error("cannot get transaction ", input.Txid, ": ", err) + return nil + } + if int(input.Vout) >= len(itx.Vout) { + glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) + return nil + } + addrID, err := m.chain.GetChainParser().GetAddrIDFromVout(&itx.Vout[input.Vout]) + if err != nil { + glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err) + return nil + } + return &addrIndex{string(addrID), int32(^input.Vout)} + +} + func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { - parser := m.chain.GetChainParser() tx, err := m.chain.GetTransactionForMempool(txid) if err != nil { glog.Error("cannot get transaction ", txid, ": ", err) @@ -93,7 +110,7 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { glog.V(2).Info("mempool: gettxaddrs ", txid, ", ", len(tx.Vin), " inputs") io := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) for _, output := range tx.Vout { - addrID, err := parser.GetAddrIDFromVout(&output) + addrID, err := m.chain.GetChainParser().GetAddrIDFromVout(&output) if err != nil { glog.Error("error in addrID in ", txid, " ", output.N, ": ", err) continue @@ -109,22 +126,10 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) { if input.Coinbase != "" { continue } - // TODO - possibly get from DB unspenttxs - however some output txs can be also in mempool - itx, err := m.chain.GetTransactionForMempool(input.Txid) - if err != nil { - glog.Error("cannot get transaction ", input.Txid, ": ", err) - continue + ai := m.getInputAddress(&input) + if ai != nil { + io = append(io, *ai) } - if int(input.Vout) >= len(itx.Vout) { - glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) - continue - } - addrID, err := parser.GetAddrIDFromVout(&itx.Vout[input.Vout]) - if err != nil { - glog.Error("error in addrID in ", input.Txid, " ", input.Vout, ": ", err) - continue - } - io = append(io, addrIndex{string(addrID), int32(^input.Vout)}) } return io, true } diff --git a/configs/bch-testnet.json b/configs/bch-testnet.json index f518fac2..51731adc 100644 --- a/configs/bch-testnet.json +++ b/configs/bch-testnet.json @@ -6,5 +6,7 @@ "parse": true, "zeroMQBinding": "tcp://127.0.0.1:48331", "subversion": "/Bitcoin ABC:0.17.1/", + "mempoolWorkers": 8, + "mempoolSubWorkers": 2, "blockAddressesToKeep": 300 } diff --git a/configs/bch.json b/configs/bch.json index 55993b90..31ecda0b 100644 --- a/configs/bch.json +++ b/configs/bch.json @@ -6,5 +6,7 @@ "parse": true, "zeroMQBinding": "tcp://127.0.0.1:38331", "subversion": "/Bitcoin ABC:0.17.1/", + "mempoolWorkers": 8, + "mempoolSubWorkers": 2, "blockAddressesToKeep": 300 } diff --git a/configs/btc-testnet.json b/configs/btc-testnet.json index 5b8beab3..4d1f9745 100644 --- a/configs/btc-testnet.json +++ b/configs/btc-testnet.json @@ -5,5 +5,7 @@ "rpcTimeout": 25, "parse": true, "zeroMQBinding": "tcp://127.0.0.1:48330", + "mempoolWorkers": 8, + "mempoolSubWorkers": 2, "blockAddressesToKeep": 300 } diff --git a/configs/btc.json b/configs/btc.json index 538dd217..8cbbc218 100644 --- a/configs/btc.json +++ b/configs/btc.json @@ -5,5 +5,7 @@ "rpcTimeout": 25, "parse": true, "zeroMQBinding": "tcp://127.0.0.1:38330", + "mempoolWorkers": 8, + "mempoolSubWorkers": 2, "blockAddressesToKeep": 300 } diff --git a/configs/zec.json b/configs/zec.json index 431e8f2a..b957b110 100644 --- a/configs/zec.json +++ b/configs/zec.json @@ -5,5 +5,7 @@ "rpcTimeout": 25, "parse": true, "zeroMQBinding": "tcp://127.0.0.1:38332", + "mempoolWorkers": 4, + "mempoolSubWorkers": 8, "blockAddressesToKeep": 300 }