Write data to DB synchronously in bulk connect

RocksDB has a memory leak/fragmentation problem
when inserting data in parallel.
This commit is contained in:
Martin Boehm 2018-08-21 18:56:30 +02:00
parent c9471bf867
commit 188eed8881

View File

@ -285,12 +285,18 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
return d.db.Write(d.wo, wb)
}
// BulkConnect is used to connect blocks in bulk, faster but if interrupted inconsistent way
// bulk connect
// in bulk mode the data are cached and stored to the DB in batches
// it speeds up the import in two ways:
// 1) balances and txAddresses are modified several times during the import; there is a chance that multiple modifications are merged before the write to the DB
// 2) rocksdb seems to handle fewer, larger batches better than a continuous stream of smaller batches
// bulkAddresses holds, for a single block, the block height and the map of
// address descriptors to their outpoints, accumulated so that several blocks
// can later be written to the DB in one batch.
type bulkAddresses struct {
height uint32
addresses map[string][]outpoint
}
// BulkConnect is used to connect blocks in bulk; it is faster but, if interrupted, may leave the DB in an inconsistent state
type BulkConnect struct {
d *RocksDB
isUTXO bool
@ -302,13 +308,14 @@ type BulkConnect struct {
}
const (
maxBulkAddresses = 400000
maxBulkAddresses = 300000
maxBulkTxAddresses = 2000000
partialStoreAddresses = maxBulkTxAddresses / 10
maxBulkBalances = 2500000
partialStoreBalances = maxBulkBalances / 10
)
// InitBulkConnect initializes bulk connect and switches DB to inconsistent state
func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) {
bc := &BulkConnect{
d: d,
@ -323,9 +330,7 @@ func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) {
return bc, nil
}
func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
defer close(c)
start := time.Now()
func (b *BulkConnect) storeTxAddresses(wb *gorocksdb.WriteBatch, all bool) (int, int, error) {
var txm map[string]*TxAddresses
var sp int
if all {
@ -359,21 +364,31 @@ func (b *BulkConnect) storeTxAddresses(c chan error, all bool) {
}
}
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := b.d.storeTxAddresses(wb, txm); err != nil {
c <- err
} else {
if err := b.d.db.Write(b.d.wo, wb); err != nil {
c <- err
}
return 0, 0, err
}
glog.Info("rocksdb: height ", b.height, ", stored ", len(txm), " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
return len(txm), sp, nil
}
func (b *BulkConnect) storeBalances(c chan error, all bool) {
// parallelStoreTxAddresses stores cached txAddresses to the DB using its own
// write batch and reports the outcome on channel c; the channel is closed on
// exit. If all is true the whole cache is stored, otherwise only a partial
// set (as selected by storeTxAddresses). Intended to be run as a goroutine.
func (b *BulkConnect) parallelStoreTxAddresses(c chan error, all bool) {
// close the channel on every exit path so the receiver never blocks forever
defer close(c)
start := time.Now()
wb := gorocksdb.NewWriteBatch()
// the write batch must be destroyed to release its underlying resources
defer wb.Destroy()
count, sp, err := b.storeTxAddresses(wb, all)
if err != nil {
c <- err
return
}
// flush the accumulated batch to the DB in a single synchronous write
if err := b.d.db.Write(b.d.wo, wb); err != nil {
c <- err
return
}
glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", sp, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
c <- nil
}
func (b *BulkConnect) storeBalances(wb *gorocksdb.WriteBatch, all bool) (int, error) {
var bal map[string]*AddrBalance
if all {
bal = b.balances
@ -389,16 +404,28 @@ func (b *BulkConnect) storeBalances(c chan error, all bool) {
}
}
}
if err := b.d.storeBalances(wb, bal); err != nil {
return 0, err
}
return len(bal), nil
}
func (b *BulkConnect) parallelStoreBalances(c chan error, all bool) {
defer close(c)
start := time.Now()
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := b.d.storeBalances(wb, bal); err != nil {
count, err := b.storeBalances(wb, all)
if err != nil {
c <- err
} else {
if err := b.d.db.Write(b.d.wo, wb); err != nil {
c <- err
}
return
}
glog.Info("rocksdb: height ", b.height, ", stored ", len(bal), " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
if err := b.d.db.Write(b.d.wo, wb); err != nil {
c <- err
return
}
glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
c <- nil
}
func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error {
@ -412,41 +439,46 @@ func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error {
return nil
}
// ConnectBlock connects block in bulk mode
func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error {
b.height = block.Height
if !b.isUTXO {
return b.d.ConnectBlock(block)
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
addresses := make(map[string][]outpoint)
if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil {
return err
}
start := time.Now()
var sa bool
var storeAddressesChan, storeBalancesChan chan error
if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
sa = true
if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
storeAddressesChan = make(chan error)
go b.storeTxAddresses(storeAddressesChan, false)
}
if len(b.balances)+partialStoreBalances > maxBulkBalances {
storeBalancesChan = make(chan error)
go b.storeBalances(storeBalancesChan, false)
}
}
b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
height: block.Height,
addresses: addresses,
})
b.bulkAddressesCount += len(addresses)
bac := b.bulkAddressesCount
if sa || b.bulkAddressesCount > maxBulkAddresses {
if err := b.storeBulkAddresses(wb); err != nil {
return err
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
start := time.Now()
var count, count1, storeType int
var err error
const (
storeNone = iota
storeTxAddresses
storeBalances
storeBulkAddresses
)
// in one block store maximally one type of data
if len(b.txAddressesMap) > maxBulkTxAddresses {
storeType = storeTxAddresses
count, count1, err = b.storeTxAddresses(wb, false)
} else if len(b.balances) > maxBulkBalances {
storeType = storeBalances
count, err = b.storeBalances(wb, false)
} else if b.bulkAddressesCount > maxBulkAddresses {
storeType = storeBulkAddresses
count = b.bulkAddressesCount
err = b.storeBulkAddresses(wb)
}
if err != nil {
return err
}
if storeBlockTxs {
if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
@ -459,29 +491,26 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
if err := b.d.db.Write(b.d.wo, wb); err != nil {
return err
}
if bac > b.bulkAddressesCount {
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
}
if storeAddressesChan != nil {
if err := <-storeAddressesChan; err != nil {
return err
}
}
if storeBalancesChan != nil {
if err := <-storeBalancesChan; err != nil {
return err
}
switch storeType {
case storeTxAddresses:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", count1, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
case storeBalances:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
case storeBulkAddresses:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " addresses, done in ", time.Since(start))
}
return nil
}
// Close flushes the cached data and switches the DB from the inconsistent state back to open
// after Close, the BulkConnect cannot be used
func (b *BulkConnect) Close() error {
glog.Info("rocksdb: bulk connect closing")
start := time.Now()
storeAddressesChan := make(chan error)
go b.storeTxAddresses(storeAddressesChan, true)
go b.parallelStoreTxAddresses(storeAddressesChan, true)
storeBalancesChan := make(chan error)
go b.storeBalances(storeBalancesChan, true)
go b.parallelStoreBalances(storeBalancesChan, true)
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
bac := b.bulkAddressesCount
@ -634,6 +663,7 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
spendingTxid := blockTxIDs[txi]
ta := blockTxAddresses[txi]
ta.Inputs = make([]TxInput, len(tx.Vin))
logged := false
for i, input := range tx.Vin {
tai := &ta.Inputs[i]
btxID, err := d.chainParser.PackTxid(input.Txid)
@ -670,7 +700,10 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
// mark the output as spent in tx
ot.Spent = true
if len(ot.addrID) == 0 {
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v skipping empty address", block.Height, tx.Txid, input.Txid, input.Vout)
if !logged {
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v skipping empty address", block.Height, tx.Txid, input.Txid, input.Vout)
logged = true
}
continue
}
strAddrID := string(ot.addrID)