Compact database during connectBlocksParallel

This commit is contained in:
Martin Boehm 2018-02-05 10:31:22 +01:00
parent d4a35b9889
commit e31095d03f
2 changed files with 90 additions and 25 deletions

View File

@ -113,7 +113,7 @@ func main() {
} }
if *synchronize { if *synchronize {
if err := resyncIndex(); err != nil { if err := resyncIndex(true); err != nil {
glog.Fatal("resyncIndex ", err) glog.Fatal("resyncIndex ", err)
} }
go syncIndexLoop() go syncIndexLoop()
@ -219,7 +219,7 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop starting") glog.Info("syncIndexLoop starting")
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
tickAndDebounce(935093*time.Millisecond, 1009*time.Millisecond, chanSyncIndex, func() { tickAndDebounce(935093*time.Millisecond, 1009*time.Millisecond, chanSyncIndex, func() {
if err := resyncIndex(); err != nil { if err := resyncIndex(false); err != nil {
glog.Error("syncIndexLoop", err) glog.Error("syncIndexLoop", err)
} }
}) })
@ -279,7 +279,7 @@ func printResult(txid string, vout uint32, isOutput bool) error {
return nil return nil
} }
func resyncIndex() error { func resyncIndex(bulk bool) error {
remote, err := chain.GetBestBlockHash() remote, err := chain.GetBestBlockHash()
if err != nil { if err != nil {
return err return err
@ -334,7 +334,7 @@ func resyncIndex() error {
if err != nil { if err != nil {
return err return err
} }
return resyncIndex() return resyncIndex(false)
} }
} }
@ -359,7 +359,7 @@ func resyncIndex() error {
// if parallel operation is enabled and the number of blocks to be connected is large, // if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks // use parallel routine to load majority of blocks
if *syncWorkers > 1 { if bulk && *syncWorkers > 1 {
chainBestHeight, err := chain.GetBestBlockHeight() chainBestHeight, err := chain.GetBestBlockHeight()
if err != nil { if err != nil {
return err return err
@ -376,7 +376,7 @@ func resyncIndex() error {
} }
// after parallel load finish the sync using standard way, // after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime // new blocks may have been created in the meantime
return resyncIndex() return resyncIndex(false)
} }
} }
@ -418,27 +418,31 @@ func connectBlocksParallel(
) error { ) error {
var wg sync.WaitGroup var wg sync.WaitGroup
hch := make(chan string, numWorkers) hch := make(chan string, numWorkers)
running := make([]bool, numWorkers)
work := func() { work := func(i int) {
defer wg.Done() defer wg.Done()
for hash := range hch { for hash := range hch {
running[i] = true
block, err := chain.GetBlock(hash) block, err := chain.GetBlock(hash)
if err != nil { if err != nil {
glog.Error("Connect block ", hash, " error ", err) glog.Error("Connect block ", hash, " error ", err)
running[i] = false
continue continue
} }
if *dryRun { if *dryRun {
running[i] = false
continue continue
} }
err = index.ConnectBlock(block) err = index.ConnectBlock(block)
if err != nil { if err != nil {
glog.Error("Connect block ", hash, " error ", err) glog.Error("Connect block ", hash, " error ", err)
} }
running[i] = false
} }
} }
for i := 0; i < numWorkers; i++ { for i := 0; i < numWorkers; i++ {
wg.Add(1) wg.Add(1)
go work() go work(i)
} }
var ( var (
err error err error
@ -450,8 +454,25 @@ func connectBlocksParallel(
break break
} }
hch <- hash hch <- hash
if h%1000 == 0 { if h > 0 && h%1000 == 0 {
glog.Info("connecting block ", h, " ", hash) glog.Info("connecting block ", h, " ", hash)
if h%50000 == 0 {
// wait for the workers to finish block
WaitAgain:
for {
for _, r := range running {
if r {
glog.Info("Waiting ", running)
time.Sleep(time.Millisecond * 500)
continue WaitAgain
}
}
break
}
if err = index.CompactDatabase(); err != nil {
break
}
}
} }
} }
close(hch) close(hch)

View File

@ -6,6 +6,8 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"errors" "errors"
"os"
"path/filepath"
"github.com/bsm/go-vlq" "github.com/bsm/go-vlq"
"github.com/golang/glog" "github.com/golang/glog"
@ -21,10 +23,11 @@ func RepairRocksDB(name string) error {
// RocksDB handle // RocksDB handle
type RocksDB struct { type RocksDB struct {
db *gorocksdb.DB path string
wo *gorocksdb.WriteOptions db *gorocksdb.DB
ro *gorocksdb.ReadOptions wo *gorocksdb.WriteOptions
cfh []*gorocksdb.ColumnFamilyHandle ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
} }
const ( const (
@ -36,11 +39,7 @@ const (
var cfNames = []string{"default", "height", "outputs", "inputs"} var cfNames = []string{"default", "height", "outputs", "inputs"}
// NewRocksDB opens an internal handle to RocksDB environment. Close func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
// needs to be called to release it.
func NewRocksDB(path string) (d *RocksDB, err error) {
glog.Infof("rocksdb: open %s", path)
fp := gorocksdb.NewBloomFilter(10) fp := gorocksdb.NewBloomFilter(10)
bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockSize(16 << 10) // 16kb bbto.SetBlockSize(16 << 10) // 16kb
@ -61,25 +60,37 @@ func NewRocksDB(path string) (d *RocksDB, err error) {
db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions) db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions)
if err != nil { if err != nil {
return return nil, nil, err
} }
return db, cfh, nil
}
// NewRocksDB opens an internal handle to RocksDB environment. Close
// needs to be called to release it.
func NewRocksDB(path string) (d *RocksDB, err error) {
glog.Infof("rocksdb: open %s", path)
db, cfh, err := openDB(path)
wo := gorocksdb.NewDefaultWriteOptions() wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions() ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false) ro.SetFillCache(false)
return &RocksDB{path, db, wo, ro, cfh}, nil
}
return &RocksDB{db, wo, ro, cfh}, nil func (d *RocksDB) closeDB() error {
for _, h := range d.cfh {
h.Destroy()
}
d.db.Close()
return nil
} }
// Close releases the RocksDB environment opened in NewRocksDB. // Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error { func (d *RocksDB) Close() error {
glog.Infof("rocksdb: close") glog.Infof("rocksdb: close")
for _, h := range d.cfh { d.closeDB()
h.Destroy()
}
d.wo.Destroy() d.wo.Destroy()
d.ro.Destroy() d.ro.Destroy()
d.db.Close()
return nil return nil
} }
@ -448,6 +459,39 @@ func (d *RocksDB) DisconnectBlocks(
return err return err
} }
// dirSize returns the total size in bytes of all non-directory entries found
// while walking the tree rooted at path.
//
// If the walk reports an error for any entry (including a root that cannot be
// stat'ed), the walk stops and that error is returned together with the size
// accumulated so far.
func dirSize(path string) (int64, error) {
	var size int64
	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
		// filepath.Walk may pass a nil info together with a non-nil err,
		// so the error has to be checked before info is touched.
		if err != nil {
			return err
		}
		if !info.IsDir() {
			size += info.Size()
		}
		return nil
	})
	return size, err
}
// CompactDatabase compacts the database by closing the db instance and
// opening it again. An earlier experiment with the CompactRange method was
// unsuccessful (slow and actually fragmenting the db without compacting).
//
// While the compaction is running nobody can access the database!
//
// The on-disk size is logged before and after; errors from measuring the
// size are ignored because the value is used for logging only.
func (d *RocksDB) CompactDatabase() error {
	sizeBefore, _ := dirSize(d.path)
	glog.Info("Compacting database, db size ", sizeBefore)

	if err := d.closeDB(); err != nil {
		return err
	}
	// the old handle is gone; drop the reference before reopening
	d.db = nil

	reopened, handles, err := openDB(d.path)
	if err != nil {
		return err
	}
	d.db = reopened
	d.cfh = handles

	sizeAfter, _ := dirSize(d.path)
	glog.Info("Compacting database finished, db size ", sizeAfter)
	return nil
}
// Helpers // Helpers
const txIdUnpackedLen = 32 const txIdUnpackedLen = 32