Use PrepareForBulkLoad option for connectBlocksParallel
This commit is contained in:
parent
e31095d03f
commit
97cc2aca17
18
blockbook.go
18
blockbook.go
@ -416,6 +416,11 @@ func connectBlocksParallel(
|
||||
higher uint32,
|
||||
numWorkers int,
|
||||
) error {
|
||||
err := index.ReopenWithBulk(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
hch := make(chan string, numWorkers)
|
||||
running := make([]bool, numWorkers)
|
||||
@ -444,10 +449,8 @@ func connectBlocksParallel(
|
||||
wg.Add(1)
|
||||
go work(i)
|
||||
}
|
||||
var (
|
||||
err error
|
||||
hash string
|
||||
)
|
||||
var hash string
|
||||
|
||||
for h := lower; h <= higher; h++ {
|
||||
hash, err = chain.GetBlockHash(h)
|
||||
if err != nil {
|
||||
@ -469,7 +472,7 @@ func connectBlocksParallel(
|
||||
}
|
||||
break
|
||||
}
|
||||
if err = index.CompactDatabase(); err != nil {
|
||||
if err = index.CompactDatabase(true); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -477,6 +480,11 @@ func connectBlocksParallel(
|
||||
}
|
||||
close(hch)
|
||||
wg.Wait()
|
||||
|
||||
if err == nil {
|
||||
err = index.ReopenWithBulk(false)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -39,7 +39,7 @@ const (
|
||||
|
||||
var cfNames = []string{"default", "height", "outputs", "inputs"}
|
||||
|
||||
func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
|
||||
func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
|
||||
fp := gorocksdb.NewBloomFilter(10)
|
||||
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
|
||||
bbto.SetBlockSize(16 << 10) // 16kb
|
||||
@ -55,6 +55,9 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error)
|
||||
opts.SetBytesPerSync(1 << 20) // 1mb
|
||||
opts.SetWriteBufferSize(2 << 30) // 2 gb
|
||||
opts.SetMaxOpenFiles(25000)
|
||||
if bulk {
|
||||
opts.PrepareForBulkLoad()
|
||||
}
|
||||
|
||||
fcOptions := []*gorocksdb.Options{opts, opts, opts, opts}
|
||||
|
||||
@ -70,7 +73,7 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error)
|
||||
// needs to be called to release it.
|
||||
func NewRocksDB(path string) (d *RocksDB, err error) {
|
||||
glog.Infof("rocksdb: open %s", path)
|
||||
db, cfh, err := openDB(path)
|
||||
db, cfh, err := openDB(path, false)
|
||||
wo := gorocksdb.NewDefaultWriteOptions()
|
||||
ro := gorocksdb.NewDefaultReadOptions()
|
||||
ro.SetFillCache(false)
|
||||
@ -474,21 +477,32 @@ func dirSize(path string) (int64, error) {
|
||||
// After unsuccessful experiment with CompactRange method (slow and actually fragmenting the db without compacting)
|
||||
// the method now closes the db instance and opens it again.
|
||||
// This means that during compact nobody can access the dababase!
|
||||
func (d *RocksDB) CompactDatabase() error {
|
||||
func (d *RocksDB) CompactDatabase(bulk bool) error {
|
||||
size, _ := dirSize(d.path)
|
||||
glog.Info("Compacting database, db size ", size)
|
||||
if err := d.ReopenWithBulk(bulk); err != nil {
|
||||
return err
|
||||
}
|
||||
size, _ = dirSize(d.path)
|
||||
glog.Info("Compacting database finished, db size ", size)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReopenWithBulk reopens the database with different settings:
|
||||
// if bulk==true, reopens for bulk load
|
||||
// if bulk==false, reopens for normal operation
|
||||
// It closes and reopens db, nobody can access the database during the operation!
|
||||
func (d *RocksDB) ReopenWithBulk(bulk bool) error {
|
||||
err := d.closeDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.db = nil
|
||||
db, cfh, err := openDB(d.path)
|
||||
db, cfh, err := openDB(d.path, bulk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.db, d.cfh = db, cfh
|
||||
size, _ = dirSize(d.path)
|
||||
glog.Info("Compacting database finished, db size ", size)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user