Implement bulk connect blocks
This commit is contained in:
parent
878d25ea42
commit
d45d028ef2
@ -175,7 +175,11 @@ func main() {
|
|||||||
}
|
}
|
||||||
index.SetInternalState(internalState)
|
index.SetInternalState(internalState)
|
||||||
if internalState.DbState != common.DbStateClosed {
|
if internalState.DbState != common.DbStateClosed {
|
||||||
glog.Warning("internalState: database in not closed state ", internalState.DbState, ", possibly previous ungraceful shutdown")
|
if internalState.DbState == common.DbStateInconsistent {
|
||||||
|
glog.Error("internalState: database is in inconsistent state and cannot be used")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
glog.Warning("internalState: database was left in open state, possibly previous ungraceful shutdown")
|
||||||
}
|
}
|
||||||
|
|
||||||
if *computeColumnStats {
|
if *computeColumnStats {
|
||||||
|
|||||||
@ -11,6 +11,8 @@ const (
|
|||||||
DbStateClosed = uint32(iota)
|
DbStateClosed = uint32(iota)
|
||||||
// DbStateOpen means db is open or application died without closing the db
|
// DbStateOpen means db is open or application died without closing the db
|
||||||
DbStateOpen
|
DbStateOpen
|
||||||
|
// DbStateInconsistent means db is in inconsistent state and cannot be used
|
||||||
|
DbStateInconsistent
|
||||||
)
|
)
|
||||||
|
|
||||||
// InternalStateColumn contains the data of a db column
|
// InternalStateColumn contains the data of a db column
|
||||||
|
|||||||
225
db/rocksdb.go
225
db/rocksdb.go
@ -258,7 +258,18 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
|||||||
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
// unspentTxs; therefore it is not possible to DisconnectBlocks this way
|
||||||
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
return errors.New("DisconnectBlock is not supported for UTXO chains")
|
||||||
}
|
}
|
||||||
if err := d.writeAddressesUTXO(wb, block); err != nil {
|
txAddressesMap := make(map[string]*txAddresses)
|
||||||
|
balances := make(map[string]*addrBalance)
|
||||||
|
if err := d.writeAddressesUTXO(wb, block, txAddressesMap, balances); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := d.storeBalances(wb, balances); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := d.storeAndCleanupBlockTxs(wb, block); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -270,6 +281,177 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
|
|||||||
return d.db.Write(d.wo, wb)
|
return d.db.Write(d.wo, wb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way
|
||||||
|
type BulkConnect struct {
|
||||||
|
d *RocksDB
|
||||||
|
isUTXO bool
|
||||||
|
txAddressesMap map[string]*txAddresses
|
||||||
|
balances map[string]*addrBalance
|
||||||
|
height uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxBulkTxAddresses = 2000000
|
||||||
|
partialStoreAddresses = maxBulkTxAddresses / 10
|
||||||
|
maxBulkBalances = 2500000
|
||||||
|
partialStoreBalances = maxBulkBalances / 10
|
||||||
|
)
|
||||||
|
|
||||||
|
func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) {
|
||||||
|
bc := &BulkConnect{
|
||||||
|
d: d,
|
||||||
|
isUTXO: d.chainParser.IsUTXOChain(),
|
||||||
|
txAddressesMap: make(map[string]*txAddresses),
|
||||||
|
balances: make(map[string]*addrBalance),
|
||||||
|
}
|
||||||
|
if err := d.SetInconsistentState(true); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
glog.Info("rocksdb: bulk connect init, db set to inconsistent state")
|
||||||
|
return bc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
|
||||||
|
defer close(c)
|
||||||
|
start := time.Now()
|
||||||
|
var txm map[string]*txAddresses
|
||||||
|
if all {
|
||||||
|
txm = b.txAddressesMap
|
||||||
|
b.txAddressesMap = make(map[string]*txAddresses)
|
||||||
|
} else {
|
||||||
|
txm = make(map[string]*txAddresses)
|
||||||
|
for k, a := range b.txAddressesMap {
|
||||||
|
// store all completely spent transactions, they will not be modified again
|
||||||
|
r := true
|
||||||
|
for _, o := range a.outputs {
|
||||||
|
if o.spent == false {
|
||||||
|
r = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r {
|
||||||
|
txm[k] = a
|
||||||
|
delete(b.txAddressesMap, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// store some other random transactions if necessary
|
||||||
|
if len(txm) < partialStoreAddresses {
|
||||||
|
for k, a := range b.txAddressesMap {
|
||||||
|
txm[k] = a
|
||||||
|
delete(b.txAddressesMap, k)
|
||||||
|
if len(txm) >= partialStoreAddresses {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wb := gorocksdb.NewWriteBatch()
|
||||||
|
defer wb.Destroy()
|
||||||
|
if err := b.d.storeTxAddresses(wb, txm); err != nil {
|
||||||
|
c <- err
|
||||||
|
} else {
|
||||||
|
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||||
|
c <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BulkConnect) storeBalances(c chan error, all bool) {
|
||||||
|
defer close(c)
|
||||||
|
start := time.Now()
|
||||||
|
var bal map[string]*addrBalance
|
||||||
|
if all {
|
||||||
|
bal = b.balances
|
||||||
|
b.balances = make(map[string]*addrBalance)
|
||||||
|
} else {
|
||||||
|
bal = make(map[string]*addrBalance)
|
||||||
|
// store some random balances
|
||||||
|
for k, a := range b.balances {
|
||||||
|
bal[k] = a
|
||||||
|
delete(b.balances, k)
|
||||||
|
if len(bal) >= partialStoreBalances {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wb := gorocksdb.NewWriteBatch()
|
||||||
|
defer wb.Destroy()
|
||||||
|
if err := b.d.storeBalances(wb, bal); err != nil {
|
||||||
|
c <- err
|
||||||
|
} else {
|
||||||
|
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||||
|
c <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error {
|
||||||
|
b.height = block.Height
|
||||||
|
if !b.isUTXO {
|
||||||
|
return b.d.ConnectBlock(block)
|
||||||
|
}
|
||||||
|
wb := gorocksdb.NewWriteBatch()
|
||||||
|
defer wb.Destroy()
|
||||||
|
if err := b.d.writeAddressesUTXO(wb, block, b.txAddressesMap, b.balances); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var storeAddressesChan, storeBalancesChan chan error
|
||||||
|
if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
|
||||||
|
if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
|
||||||
|
storeAddressesChan = make(chan error)
|
||||||
|
go b.storeTxAddresses(storeAddressesChan, false)
|
||||||
|
}
|
||||||
|
if len(b.balances)+partialStoreBalances > maxBulkBalances {
|
||||||
|
storeBalancesChan = make(chan error)
|
||||||
|
go b.storeBalances(storeBalancesChan, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if storeBlockTxs {
|
||||||
|
if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := b.d.writeHeight(wb, block, opInsert); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.d.db.Write(b.d.wo, wb); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if storeAddressesChan != nil {
|
||||||
|
if err := <-storeAddressesChan; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if storeBalancesChan != nil {
|
||||||
|
if err := <-storeBalancesChan; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BulkConnect) Close() error {
|
||||||
|
glog.Info("rocksdb: bulk connect closing")
|
||||||
|
storeAddressesChan := make(chan error)
|
||||||
|
go b.storeTxAddresses(storeAddressesChan, true)
|
||||||
|
storeBalancesChan := make(chan error)
|
||||||
|
go b.storeBalances(storeBalancesChan, true)
|
||||||
|
if err := <-storeAddressesChan; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := <-storeBalancesChan; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.d.SetInconsistentState(false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.Info("rocksdb: bulk connect closed, db set to open state")
|
||||||
|
b.d = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Addresses index
|
// Addresses index
|
||||||
|
|
||||||
type outpoint struct {
|
type outpoint struct {
|
||||||
@ -315,11 +497,9 @@ func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrID []byte, logText
|
|||||||
valueSat.SetInt64(0)
|
valueSat.SetInt64(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block) error {
|
func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Block, txAddressesMap map[string]*txAddresses, balances map[string]*addrBalance) error {
|
||||||
addresses := make(map[string][]outpoint)
|
addresses := make(map[string][]outpoint)
|
||||||
blockTxIDs := make([][]byte, len(block.Txs))
|
blockTxIDs := make([][]byte, len(block.Txs))
|
||||||
txAddressesMap := make(map[string]*txAddresses)
|
|
||||||
balances := make(map[string]*addrBalance)
|
|
||||||
// first process all outputs so that inputs can point to txs in this block
|
// first process all outputs so that inputs can point to txs in this block
|
||||||
for txi := range block.Txs {
|
for txi := range block.Txs {
|
||||||
tx := &block.Txs[txi]
|
tx := &block.Txs[txi]
|
||||||
@ -454,16 +634,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
|||||||
ab.sentSat.Add(&ab.sentSat, &ot.valueSat)
|
ab.sentSat.Add(&ab.sentSat, &ot.valueSat)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := d.storeAddresses(wb, block, addresses); err != nil {
|
return d.storeAddresses(wb, block, addresses)
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := d.storeBalances(wb, balances); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.storeAndCleanupBlockTxs(wb, block)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func processedInTx(o []outpoint, btxID []byte) bool {
|
func processedInTx(o []outpoint, btxID []byte) bool {
|
||||||
@ -1228,6 +1399,18 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro
|
|||||||
return is, nil
|
return is, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *RocksDB) SetInconsistentState(inconsistent bool) error {
|
||||||
|
if d.is == nil {
|
||||||
|
return errors.New("Internal state not created")
|
||||||
|
}
|
||||||
|
if inconsistent {
|
||||||
|
d.is.DbState = common.DbStateInconsistent
|
||||||
|
} else {
|
||||||
|
d.is.DbState = common.DbStateOpen
|
||||||
|
}
|
||||||
|
return d.storeState(d.is)
|
||||||
|
}
|
||||||
|
|
||||||
// SetInternalState sets the InternalState to be used by db to collect internal state
|
// SetInternalState sets the InternalState to be used by db to collect internal state
|
||||||
func (d *RocksDB) SetInternalState(is *common.InternalState) {
|
func (d *RocksDB) SetInternalState(is *common.InternalState) {
|
||||||
d.is = is
|
d.is = is
|
||||||
@ -1235,11 +1418,17 @@ func (d *RocksDB) SetInternalState(is *common.InternalState) {
|
|||||||
|
|
||||||
// StoreInternalState stores the internal state to db
|
// StoreInternalState stores the internal state to db
|
||||||
func (d *RocksDB) StoreInternalState(is *common.InternalState) error {
|
func (d *RocksDB) StoreInternalState(is *common.InternalState) error {
|
||||||
for c := 0; c < len(cfNames); c++ {
|
if d.metrics != nil {
|
||||||
rows, keyBytes, valueBytes := d.is.GetDBColumnStatValues(c)
|
for c := 0; c < len(cfNames); c++ {
|
||||||
d.metrics.DbColumnRows.With(common.Labels{"column": cfNames[c]}).Set(float64(rows))
|
rows, keyBytes, valueBytes := d.is.GetDBColumnStatValues(c)
|
||||||
d.metrics.DbColumnSize.With(common.Labels{"column": cfNames[c]}).Set(float64(keyBytes + valueBytes))
|
d.metrics.DbColumnRows.With(common.Labels{"column": cfNames[c]}).Set(float64(rows))
|
||||||
|
d.metrics.DbColumnSize.With(common.Labels{"column": cfNames[c]}).Set(float64(keyBytes + valueBytes))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return d.storeState(is)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *RocksDB) storeState(is *common.InternalState) error {
|
||||||
buf, err := is.Pack()
|
buf, err := is.Pack()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|||||||
@ -5,6 +5,7 @@ package db
|
|||||||
import (
|
import (
|
||||||
"blockbook/bchain"
|
"blockbook/bchain"
|
||||||
"blockbook/bchain/coins/btc"
|
"blockbook/bchain/coins/btc"
|
||||||
|
"blockbook/common"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -720,7 +721,45 @@ func TestRocksDB_Index_UTXO(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
verifyAfterUTXOBlock2(t, d)
|
verifyAfterUTXOBlock2(t, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_BulkConnect_UTXO(t *testing.T) {
|
||||||
|
d := setupRocksDB(t, &testBitcoinParser{
|
||||||
|
BitcoinParser: bitcoinTestnetParser(),
|
||||||
|
})
|
||||||
|
defer closeAndDestroyRocksDB(t, d)
|
||||||
|
|
||||||
|
bc, err := d.InitBulkConnect()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.is.DbState != common.DbStateInconsistent {
|
||||||
|
t.Fatal("DB not in DbStateInconsistent")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bc.ConnectBlock(getTestUTXOBlock1(t, d), false); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := checkColumn(d, cfBlockTxs, []keyPair{}); err != nil {
|
||||||
|
{
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bc.ConnectBlock(getTestUTXOBlock2(t, d), true); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bc.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.is.DbState != common.DbStateOpen {
|
||||||
|
t.Fatal("DB not in DbStateOpen")
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyAfterUTXOBlock2(t, d)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_packBigint_unpackBigint(t *testing.T) {
|
func Test_packBigint_unpackBigint(t *testing.T) {
|
||||||
|
|||||||
15
db/sync.go
15
db/sync.go
@ -213,17 +213,26 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
|||||||
writeBlockDone := make(chan struct{})
|
writeBlockDone := make(chan struct{})
|
||||||
writeBlockWorker := func() {
|
writeBlockWorker := func() {
|
||||||
defer close(writeBlockDone)
|
defer close(writeBlockDone)
|
||||||
|
bc, err := w.db.InitBulkConnect()
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("sync: InitBulkConnect error ", err)
|
||||||
|
}
|
||||||
lastBlock := lower - 1
|
lastBlock := lower - 1
|
||||||
|
keep := uint32(w.chain.GetChainParser().KeepBlockAddresses())
|
||||||
for b := range bch {
|
for b := range bch {
|
||||||
if lastBlock+1 != b.Height {
|
if lastBlock+1 != b.Height {
|
||||||
glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height)
|
glog.Error("writeBlockWorker skipped block, last connected block", lastBlock, ", new block ", b.Height)
|
||||||
}
|
}
|
||||||
err := w.db.ConnectBlock(b)
|
err := bc.ConnectBlock(b, b.Height+keep > higher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
|
glog.Error("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
|
||||||
}
|
}
|
||||||
lastBlock = b.Height
|
lastBlock = b.Height
|
||||||
}
|
}
|
||||||
|
err = bc.Close()
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("sync: bulkconnect.Close error ", err)
|
||||||
|
}
|
||||||
glog.Info("WriteBlock exiting...")
|
glog.Info("WriteBlock exiting...")
|
||||||
}
|
}
|
||||||
getBlockWorker := func(i int) {
|
getBlockWorker := func(i int) {
|
||||||
@ -276,6 +285,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
|||||||
}
|
}
|
||||||
go writeBlockWorker()
|
go writeBlockWorker()
|
||||||
var hash string
|
var hash string
|
||||||
|
start := time.Now()
|
||||||
ConnectLoop:
|
ConnectLoop:
|
||||||
for h := lower; h <= higher; {
|
for h := lower; h <= higher; {
|
||||||
select {
|
select {
|
||||||
@ -292,7 +302,8 @@ ConnectLoop:
|
|||||||
}
|
}
|
||||||
hch <- hashHeight{hash, h}
|
hch <- hashHeight{hash, h}
|
||||||
if h > 0 && h%1000 == 0 {
|
if h > 0 && h%1000 == 0 {
|
||||||
glog.Info("connecting block ", h, " ", hash)
|
glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
}
|
}
|
||||||
h++
|
h++
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user