diff --git a/blockbook.go b/blockbook.go index b12aef36..29e87d73 100644 --- a/blockbook.go +++ b/blockbook.go @@ -416,6 +416,11 @@ func connectBlocksParallel( higher uint32, numWorkers int, ) error { + err := index.ReopenWithBulk(true) + if err != nil { + return err + } + var wg sync.WaitGroup hch := make(chan string, numWorkers) running := make([]bool, numWorkers) @@ -444,10 +449,8 @@ func connectBlocksParallel( wg.Add(1) go work(i) } - var ( - err error - hash string - ) + var hash string + for h := lower; h <= higher; h++ { hash, err = chain.GetBlockHash(h) if err != nil { @@ -469,7 +472,7 @@ func connectBlocksParallel( } break } - if err = index.CompactDatabase(); err != nil { + if err = index.CompactDatabase(true); err != nil { break } } @@ -477,6 +480,11 @@ func connectBlocksParallel( } close(hch) wg.Wait() + + if err == nil { + err = index.ReopenWithBulk(false) + } + return err } diff --git a/db/rocksdb.go b/db/rocksdb.go index 69d7ce16..726f3fd9 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -39,7 +39,7 @@ const ( var cfNames = []string{"default", "height", "outputs", "inputs"} -func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) { +func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) { fp := gorocksdb.NewBloomFilter(10) bbto := gorocksdb.NewDefaultBlockBasedTableOptions() bbto.SetBlockSize(16 << 10) // 16kb @@ -55,6 +55,9 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) opts.SetBytesPerSync(1 << 20) // 1mb opts.SetWriteBufferSize(2 << 30) // 2 gb opts.SetMaxOpenFiles(25000) + if bulk { + opts.PrepareForBulkLoad() + } fcOptions := []*gorocksdb.Options{opts, opts, opts, opts} @@ -70,7 +73,7 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) // needs to be called to release it. func NewRocksDB(path string) (d *RocksDB, err error) { glog.Infof("rocksdb: open %s", path) - db, cfh, err := openDB(path) + db, cfh, err := openDB(path, false) wo := gorocksdb.NewDefaultWriteOptions() ro := gorocksdb.NewDefaultReadOptions() ro.SetFillCache(false) @@ -474,21 +477,32 @@ func dirSize(path string) (int64, error) { // After unsuccessful experiment with CompactRange method (slow and actually fragmenting the db without compacting) // the method now closes the db instance and opens it again. // This means that during compact nobody can access the dababase! -func (d *RocksDB) CompactDatabase() error { +func (d *RocksDB) CompactDatabase(bulk bool) error { size, _ := dirSize(d.path) glog.Info("Compacting database, db size ", size) + if err := d.ReopenWithBulk(bulk); err != nil { + return err + } + size, _ = dirSize(d.path) + glog.Info("Compacting database finished, db size ", size) + return nil +} + +// ReopenWithBulk reopens the database with different settings: +// if bulk==true, reopens for bulk load +// if bulk==false, reopens for normal operation +// It closes and reopens db, nobody can access the database during the operation! +func (d *RocksDB) ReopenWithBulk(bulk bool) error { err := d.closeDB() if err != nil { return err } d.db = nil - db, cfh, err := openDB(d.path) + db, cfh, err := openDB(d.path, bulk) if err != nil { return err } d.db, d.cfh = db, cfh - size, _ = dirSize(d.path) - glog.Info("Compacting database finished, db size ", size) return nil }