diff --git a/db/rocksdb.go b/db/rocksdb.go
index 6f6b3793..35a52cde 100644
--- a/db/rocksdb.go
+++ b/db/rocksdb.go
@@ -6,6 +6,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"encoding/hex"
+	"fmt"
 	"math/big"
 	"os"
 	"path/filepath"
@@ -42,6 +43,7 @@ type RocksDB struct {
 	chainParser bchain.BlockChainParser
 	is          *common.InternalState
 	metrics     *common.Metrics
+	cache       *gorocksdb.Cache
 }
 
 const (
@@ -56,57 +58,49 @@
 
 var cfNames = []string{"default", "height", "addresses", "txAddresses", "addressBalance", "blockTxs", "transactions"}
 
-func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
-	c := gorocksdb.NewLRUCache(8 << 30) // 8GB
-	fp := gorocksdb.NewBloomFilter(10)
-	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
-	bbto.SetBlockSize(16 << 10) // 16kB
-	bbto.SetBlockCache(c)
-	bbto.SetFilterPolicy(fp)
+func openDB(path string, c *gorocksdb.Cache) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
+	bloom := gorocksdb.NewBloomFilter(10)
+	blockOpts := gorocksdb.NewDefaultBlockBasedTableOptions()
+	blockOpts.SetBlockSize(16 << 10) // 16kB
+	blockOpts.SetBlockCache(c)
+	blockOpts.SetFilterPolicy(bloom)
+	blockOpts.SetCacheIndexAndFilterBlocks(true)
+	blockOpts.SetPinL0FilterAndIndexBlocksInCache(true)
 
-	optsNoCompression := gorocksdb.NewDefaultOptions()
-	optsNoCompression.SetBlockBasedTableFactory(bbto)
-	optsNoCompression.SetCreateIfMissing(true)
-	optsNoCompression.SetCreateIfMissingColumnFamilies(true)
-	optsNoCompression.SetMaxBackgroundCompactions(4)
-	optsNoCompression.SetMaxBackgroundFlushes(2)
-	optsNoCompression.SetBytesPerSync(1 << 20)    // 1MB
-	optsNoCompression.SetWriteBufferSize(1 << 27) // 128MB
-	optsNoCompression.SetMaxOpenFiles(25000)
-	optsNoCompression.SetCompression(gorocksdb.NoCompression)
-
-	optsLZ4 := gorocksdb.NewDefaultOptions()
-	optsLZ4.SetBlockBasedTableFactory(bbto)
-	optsLZ4.SetCreateIfMissing(true)
-	optsLZ4.SetCreateIfMissingColumnFamilies(true)
-	optsLZ4.SetMaxBackgroundCompactions(4)
-	optsLZ4.SetMaxBackgroundFlushes(2)
-	optsLZ4.SetBytesPerSync(1 << 20)    // 1MB
-	optsLZ4.SetWriteBufferSize(1 << 27) // 128MB
-	optsLZ4.SetMaxOpenFiles(25000)
-	optsLZ4.SetCompression(gorocksdb.LZ4HCCompression)
+	opts := gorocksdb.NewDefaultOptions()
+	opts.SetBlockBasedTableFactory(blockOpts)
+	opts.SetCreateIfMissing(true)
+	opts.SetCreateIfMissingColumnFamilies(true)
+	opts.SetMaxBackgroundCompactions(6)
+	opts.SetMaxBackgroundFlushes(6)
+	opts.SetBytesPerSync(1 << 20)    // 1MB
+	opts.SetWriteBufferSize(1 << 27) // 128MB
+	opts.SetMaxOpenFiles(25000)
+	opts.SetCompression(gorocksdb.LZ4HCCompression)
 
 	// opts for addresses are different:
 	// no bloom filter - from documentation: If most of your queries are executed using iterators, you shouldn't set bloom filter
-	bbtoAddresses := gorocksdb.NewDefaultBlockBasedTableOptions()
-	bbtoAddresses.SetBlockSize(16 << 10) // 16kB
-	bbtoAddresses.SetBlockCache(c)       // 8GB
+	blockOptsAddress := gorocksdb.NewDefaultBlockBasedTableOptions()
+	blockOptsAddress.SetBlockSize(16 << 10) // 16kB
+	blockOptsAddress.SetBlockCache(c)       // 8GB
+	blockOptsAddress.SetCacheIndexAndFilterBlocks(true)
+	blockOptsAddress.SetPinL0FilterAndIndexBlocksInCache(true)
 
 	optsAddresses := gorocksdb.NewDefaultOptions()
-	optsAddresses.SetBlockBasedTableFactory(bbtoAddresses)
+	optsAddresses.SetBlockBasedTableFactory(blockOptsAddress)
 	optsAddresses.SetCreateIfMissing(true)
 	optsAddresses.SetCreateIfMissingColumnFamilies(true)
-	optsAddresses.SetMaxBackgroundCompactions(4)
-	optsAddresses.SetMaxBackgroundFlushes(2)
+	optsAddresses.SetMaxBackgroundCompactions(6)
+	optsAddresses.SetMaxBackgroundFlushes(6)
 	optsAddresses.SetBytesPerSync(1 << 20)    // 1MB
 	optsAddresses.SetWriteBufferSize(1 << 27) // 128MB
 	optsAddresses.SetMaxOpenFiles(25000)
 	optsAddresses.SetCompression(gorocksdb.LZ4HCCompression)
 
 	// default, height, addresses, txAddresses, addressBalance, blockTxids, transactions
-	fcOptions := []*gorocksdb.Options{optsLZ4, optsLZ4, optsAddresses, optsLZ4, optsLZ4, optsLZ4, optsLZ4}
+	fcOptions := []*gorocksdb.Options{opts, opts, optsAddresses, opts, opts, opts, opts}
 
-	db, cfh, err := gorocksdb.OpenDbColumnFamilies(optsNoCompression, path, cfNames, fcOptions)
+	db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions)
	if err != nil {
 		return nil, nil, err
 	}
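
openDB now receives the block cache instead of building one per call, so a reopen reuses the process-wide 8GB LRU cache rather than allocating a fresh one, and SetCacheIndexAndFilterBlocks(true) charges index and filter blocks to that same cache, making its size a real ceiling on block memory (SetPinL0FilterAndIndexBlocksInCache keeps the hottest L0 metadata from being evicted). A minimal sketch of the resulting ownership model, using the same tecbot/gorocksdb calls as the patch; the helper name is illustrative, not part of the patch:

	// the caller owns one cache for the whole process lifetime and threads
	// it through every open, so a reopen cannot double the memory budget
	func newTableOptions(c *gorocksdb.Cache) *gorocksdb.BlockBasedTableOptions {
		bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
		bbto.SetBlockCache(c)                          // data blocks
		bbto.SetCacheIndexAndFilterBlocks(true)        // index+filter blocks charged to the same cache
		bbto.SetPinL0FilterAndIndexBlocksInCache(true) // but keep L0 metadata resident
		return bbto
	}
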
@@ -117,11 +111,11 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error)
 // needs to be called to release it.
 func NewRocksDB(path string, parser bchain.BlockChainParser, metrics *common.Metrics) (d *RocksDB, err error) {
 	glog.Infof("rocksdb: open %s, version %v", path, dbVersion)
-	db, cfh, err := openDB(path)
+	c := gorocksdb.NewLRUCache(8 << 30) // 8GB
+	db, cfh, err := openDB(path, c)
 	wo := gorocksdb.NewDefaultWriteOptions()
 	ro := gorocksdb.NewDefaultReadOptions()
-	ro.SetFillCache(false)
-	return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics}, nil
+	return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c}, nil
 }
 
 func (d *RocksDB) closeDB() error {
@@ -159,7 +153,7 @@ func (d *RocksDB) Reopen() error {
 		return err
 	}
 	d.db = nil
-	db, cfh, err := openDB(d.path)
+	db, cfh, err := openDB(d.path, d.cache)
 	if err != nil {
 		return err
 	}
@@ -167,6 +161,34 @@ func (d *RocksDB) Reopen() error {
 	return nil
 }
 
+func (d *RocksDB) GetMemoryStats() string {
+	type columnStats struct {
+		name           string
+		indexAndFilter string
+		memtable       string
+	}
+	cs := make([]columnStats, len(cfNames))
+	for i := 0; i < len(cfNames); i++ {
+		cs[i].name = cfNames[i]
+		cs[i].indexAndFilter = d.db.GetPropertyCF("rocksdb.estimate-table-readers-mem", d.cfh[i])
+		cs[i].memtable = d.db.GetPropertyCF("rocksdb.cur-size-all-mem-tables", d.cfh[i])
+	}
+	m := struct {
+		cacheUsage       int
+		pinnedCacheUsage int
+		indexAndFilter   string
+		memtable         string
+		columns          []columnStats
+	}{
+		cacheUsage:       d.cache.GetUsage(),
+		pinnedCacheUsage: d.cache.GetPinnedUsage(),
+		indexAndFilter:   d.db.GetProperty("rocksdb.estimate-table-readers-mem"),
+		memtable:         d.db.GetProperty("rocksdb.cur-size-all-mem-tables"),
+		columns:          cs,
+	}
+	return fmt.Sprintf("%+v", m)
+}
+
 // GetTransactions finds all input/output transactions for address
 // Transaction are passed to callback function.
 func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txid string, vout uint32, isOutput bool) error) (err error) {
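
GetMemoryStats pairs two RocksDB properties per column family: rocksdb.estimate-table-readers-mem (memory held by table readers for indexes and filters, outside the block cache) and rocksdb.cur-size-all-mem-tables (active plus unflushed memtables), alongside usage and pinned usage of the shared LRU cache; resident memory is then roughly cache usage + memtables + reader memory. A hypothetical way to expose the same string on demand, assuming net/http; the handler and route are mine, not in the patch:

	// assumes: import ("fmt"; "net/http") and an opened *RocksDB
	func registerMemoryStatsHandler(d *RocksDB) {
		http.HandleFunc("/debug/rocksdb-mem", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, d.GetMemoryStats()) // same string the sync loop logs once a minute
		})
	}
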
@@ -308,8 +330,8 @@ type BulkConnect struct {
 }
 
 const (
-	maxBulkAddresses      = 300000
-	maxBulkTxAddresses    = 2000000
+	maxBulkAddresses      = 400000
+	maxBulkTxAddresses    = 1500000
 	partialStoreAddresses = maxBulkTxAddresses / 10
 	maxBulkBalances       = 2500000
 	partialStoreBalances  = maxBulkBalances / 10
@@ -445,40 +467,36 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
 	if !b.isUTXO {
 		return b.d.ConnectBlock(block)
 	}
+	wb := gorocksdb.NewWriteBatch()
+	defer wb.Destroy()
 	addresses := make(map[string][]outpoint)
 	if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil {
 		return err
 	}
+	start := time.Now()
+	var sa bool
+	var storeAddressesChan, storeBalancesChan chan error
+	if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
+		sa = true
+		if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
+			storeAddressesChan = make(chan error)
+			go b.parallelStoreTxAddresses(storeAddressesChan, false)
+		}
+		if len(b.balances)+partialStoreBalances > maxBulkBalances {
+			storeBalancesChan = make(chan error)
+			go b.parallelStoreBalances(storeBalancesChan, false)
+		}
+	}
 	b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
 		height:    block.Height,
 		addresses: addresses,
 	})
 	b.bulkAddressesCount += len(addresses)
-	wb := gorocksdb.NewWriteBatch()
-	defer wb.Destroy()
-	start := time.Now()
-	var count, count1, storeType int
-	var err error
-	const (
-		storeNone = iota
-		storeTxAddresses
-		storeBalances
-		storeBulkAddresses
-	)
-	// in one block store maximally one type of data
-	if len(b.txAddressesMap) > maxBulkTxAddresses {
-		storeType = storeTxAddresses
-		count, count1, err = b.storeTxAddresses(wb, false)
-	} else if len(b.balances) > maxBulkBalances {
-		storeType = storeBalances
-		count, err = b.storeBalances(wb, false)
-	} else if b.bulkAddressesCount > maxBulkAddresses {
-		storeType = storeBulkAddresses
-		count = b.bulkAddressesCount
-		err = b.storeBulkAddresses(wb)
-	}
-	if err != nil {
-		return err
+	bac := b.bulkAddressesCount
+	if sa || b.bulkAddressesCount > maxBulkAddresses {
+		if err := b.storeBulkAddresses(wb); err != nil {
+			return err
+		}
 	}
 	if storeBlockTxs {
 		if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
@@ -491,13 +509,18 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
 	if err := b.d.db.Write(b.d.wo, wb); err != nil {
 		return err
 	}
-	switch storeType {
-	case storeTxAddresses:
-		glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", count1, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
-	case storeBalances:
-		glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
-	case storeBulkAddresses:
-		glog.Info("rocksdb: height ", b.height, ", stored ", count, " addresses, done in ", time.Since(start))
+	if bac > b.bulkAddressesCount {
+		glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
+	}
+	if storeAddressesChan != nil {
+		if err := <-storeAddressesChan; err != nil {
+			return err
+		}
+	}
+	if storeBalancesChan != nil {
+		if err := <-storeBalancesChan; err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -1628,9 +1651,12 @@ func (d *RocksDB) storeState(is *common.InternalState) error {
 func (d *RocksDB) computeColumnSize(col int, stopCompute chan os.Signal) (int64, int64, int64, error) {
 	var rows, keysSum, valuesSum int64
 	var seekKey []byte
+	// do not use cache
+	ro := gorocksdb.NewDefaultReadOptions()
+	ro.SetFillCache(false)
 	for {
 		var key []byte
-		it := d.db.NewIteratorCF(d.ro, d.cfh[col])
+		it := d.db.NewIteratorCF(ro, d.cfh[col])
 		if rows == 0 {
 			it.SeekToFirst()
 		} else {
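
ConnectBlock drops the old one-store-type-per-block dispatch: bulk addresses still flush in the foreground write batch once a threshold trips, while txAddresses and balances now flush on background goroutines (parallelStoreTxAddresses, parallelStoreBalances), each reporting through an error channel that is drained before the function returns. The retuned constants (maxBulkAddresses up to 400000, maxBulkTxAddresses down to 1500000) presumably rebalance how much each map may accumulate now that it coexists with the pinned cache. Reduced to its essential shape, with helper names that are illustrative, not taken from the patch:

	// fan out a flush, keep doing foreground work, then join the error;
	// a buffered channel lets the goroutine finish even on an early return
	func storeAndJoin(flush, foreground func() error) error {
		done := make(chan error, 1)
		go func() { done <- flush() }()
		if err := foreground(); err != nil {
			return err
		}
		return <-done
	}
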
diff --git a/db/sync.go b/db/sync.go
index 32bc143d..8f52e589 100644
--- a/db/sync.go
+++ b/db/sync.go
@@ -286,6 +286,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
 	go writeBlockWorker()
 	var hash string
 	start := time.Now()
+	msTime := time.Now().Add(1 * time.Minute)
 ConnectLoop:
 	for h := lower; h <= higher; {
 		select {
@@ -305,6 +306,10 @@ ConnectLoop:
 			glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start))
 			start = time.Now()
 		}
+		if msTime.Before(time.Now()) {
+			glog.Info(w.db.GetMemoryStats())
+			msTime = time.Now().Add(1 * time.Minute)
+		}
 		h++
 	}
 }
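
In sync.go the memory report piggybacks on the connect loop: the msTime deadline is checked between blocks and pushed forward a minute each time it fires, which keeps all database access on the syncing goroutine. A free-standing ticker, sketched below under the assumption that reading the stats concurrently is safe (not established by the patch), would be the usual alternative shape:

	// assumes: import ("time"; "github.com/golang/glog")
	func logMemoryStatsEvery(every time.Duration, stats func() string, stop <-chan struct{}) {
		t := time.NewTicker(every)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				glog.Info(stats())
			case <-stop:
				return
			}
		}
	}

The in-loop variant trades ticking precision (logging drifts if a block takes long to connect) for not touching the DB from a second goroutine during initial sync.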