From e31095d03ff977bf8f01ae0e81448331c87cac67 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 5 Feb 2018 10:31:22 +0100 Subject: [PATCH] Compact database during connectBlocksParallel --- blockbook.go | 41 +++++++++++++++++++++------- db/rocksdb.go | 74 ++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 90 insertions(+), 25 deletions(-) diff --git a/blockbook.go b/blockbook.go index 75201623..b12aef36 100644 --- a/blockbook.go +++ b/blockbook.go @@ -113,7 +113,7 @@ func main() { } if *synchronize { - if err := resyncIndex(); err != nil { + if err := resyncIndex(true); err != nil { glog.Fatal("resyncIndex ", err) } go syncIndexLoop() @@ -219,7 +219,7 @@ func syncIndexLoop() { glog.Info("syncIndexLoop starting") // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second tickAndDebounce(935093*time.Millisecond, 1009*time.Millisecond, chanSyncIndex, func() { - if err := resyncIndex(); err != nil { + if err := resyncIndex(false); err != nil { glog.Error("syncIndexLoop", err) } }) @@ -279,7 +279,7 @@ func printResult(txid string, vout uint32, isOutput bool) error { return nil } -func resyncIndex() error { +func resyncIndex(bulk bool) error { remote, err := chain.GetBestBlockHash() if err != nil { return err @@ -334,7 +334,7 @@ func resyncIndex() error { if err != nil { return err } - return resyncIndex() + return resyncIndex(false) } } @@ -359,7 +359,7 @@ func resyncIndex() error { // if parallel operation is enabled and the number of blocks to be connected is large, // use parallel routine to load majority of blocks - if *syncWorkers > 1 { + if bulk && *syncWorkers > 1 { chainBestHeight, err := chain.GetBestBlockHeight() if err != nil { return err @@ -376,7 +376,7 @@ func resyncIndex() error { } // after parallel load finish the sync using standard way, // new blocks may have been created in the meantime - return resyncIndex() + return resyncIndex(false) } } @@ -418,27 +418,31 @@ func connectBlocksParallel( ) error { var wg sync.WaitGroup hch := make(chan string, numWorkers) - - work := func() { + running := make([]bool, numWorkers) + work := func(i int) { defer wg.Done() for hash := range hch { + running[i] = true block, err := chain.GetBlock(hash) if err != nil { glog.Error("Connect block ", hash, " error ", err) + running[i] = false continue } if *dryRun { + running[i] = false continue } err = index.ConnectBlock(block) if err != nil { glog.Error("Connect block ", hash, " error ", err) } + running[i] = false } } for i := 0; i < numWorkers; i++ { wg.Add(1) - go work() + go work(i) } var ( err error @@ -450,8 +454,25 @@ func connectBlocksParallel( break } hch <- hash - if h%1000 == 0 { + if h > 0 && h%1000 == 0 { glog.Info("connecting block ", h, " ", hash) + if h%50000 == 0 { + // wait for the workers to finish block + WaitAgain: + for { + for _, r := range running { + if r { + glog.Info("Waiting ", running) + time.Sleep(time.Millisecond * 500) + continue WaitAgain + } + } + break + } + if err = index.CompactDatabase(); err != nil { + break + } + } } } close(hch) diff --git a/db/rocksdb.go b/db/rocksdb.go index 6c94d241..69d7ce16 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -6,6 +6,8 @@ import ( "encoding/binary" "encoding/hex" "errors" + "os" + "path/filepath" "github.com/bsm/go-vlq" "github.com/golang/glog" @@ -21,10 +23,11 @@ func RepairRocksDB(name string) error { // RocksDB handle type RocksDB struct { - db *gorocksdb.DB - wo *gorocksdb.WriteOptions - ro *gorocksdb.ReadOptions - cfh []*gorocksdb.ColumnFamilyHandle + path string + db *gorocksdb.DB + wo *gorocksdb.WriteOptions + ro *gorocksdb.ReadOptions + cfh []*gorocksdb.ColumnFamilyHandle } const ( @@ -36,11 +39,7 @@ const ( var cfNames = []string{"default", "height", "outputs", "inputs"} -// NewRocksDB opens an internal handle to RocksDB environment. Close -// needs to be called to release it. -func NewRocksDB(path string) (d *RocksDB, err error) { - glog.Infof("rocksdb: open %s", path) - +func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) { fp := gorocksdb.NewBloomFilter(10) bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockSize(16 << 10) // 16kb @@ -61,25 +60,37 @@ func NewRocksDB(path string) (d *RocksDB, err error) { db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions) if err != nil { - return + return nil, nil, err } + return db, cfh, nil +} + +// NewRocksDB opens an internal handle to RocksDB environment. Close +// needs to be called to release it. +func NewRocksDB(path string) (d *RocksDB, err error) { + glog.Infof("rocksdb: open %s", path) + db, cfh, err := openDB(path) wo := gorocksdb.NewDefaultWriteOptions() ro := gorocksdb.NewDefaultReadOptions() ro.SetFillCache(false) + return &RocksDB{path, db, wo, ro, cfh}, nil +} - return &RocksDB{db, wo, ro, cfh}, nil +func (d *RocksDB) closeDB() error { + for _, h := range d.cfh { + h.Destroy() + } + d.db.Close() + return nil } // Close releases the RocksDB environment opened in NewRocksDB. func (d *RocksDB) Close() error { glog.Infof("rocksdb: close") - for _, h := range d.cfh { - h.Destroy() - } + d.closeDB() d.wo.Destroy() d.ro.Destroy() - d.db.Close() return nil } @@ -448,6 +459,39 @@ func (d *RocksDB) DisconnectBlocks( return err } +func dirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +} + +// CompactDatabase compacts the database +// After unsuccessful experiment with CompactRange method (slow and actually fragmenting the db without compacting) +// the method now closes the db instance and opens it again. +// This means that during compact nobody can access the dababase! +func (d *RocksDB) CompactDatabase() error { + size, _ := dirSize(d.path) + glog.Info("Compacting database, db size ", size) + err := d.closeDB() + if err != nil { + return err + } + d.db = nil + db, cfh, err := openDB(d.path) + if err != nil { + return err + } + d.db, d.cfh = db, cfh + size, _ = dirSize(d.path) + glog.Info("Compacting database finished, db size ", size) + return nil +} + // Helpers const txIdUnpackedLen = 32