rocksdb column families

This commit is contained in:
Martin Boehm 2018-01-24 16:57:05 +01:00
parent ab562d05c5
commit e72b437e23

View File

@ -21,11 +21,21 @@ func RepairRocksDB(name string) error {
}
type RocksDB struct {
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
}
const (
cfDefault = iota
cfHeight
cfOutputs
cfInputs
)
var cfNames = []string{"default", "height", "outputs", "inputs"}
// NewRocksDB opens an internal handle to RocksDB environment. Close
// needs to be called to release it.
func NewRocksDB(path string) (d *RocksDB, err error) {
@ -40,13 +50,16 @@ func NewRocksDB(path string) (d *RocksDB, err error) {
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetMaxBackgroundCompactions(4)
opts.SetMaxBackgroundFlushes(2)
opts.SetBytesPerSync(1 << 20) // 1mb
opts.SetWriteBufferSize(2 << 30) // 2 gb
opts.SetMaxOpenFiles(25000)
db, err := gorocksdb.OpenDb(opts, path)
fcOptions := []*gorocksdb.Options{opts, opts, opts, opts}
db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions)
if err != nil {
return
}
@ -55,12 +68,15 @@ func NewRocksDB(path string) (d *RocksDB, err error) {
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
return &RocksDB{db, wo, ro}, nil
return &RocksDB{db, wo, ro, cfh}, nil
}
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
log.Printf("rocksdb: close")
for _, h := range d.cfh {
h.Destroy()
}
d.wo.Destroy()
d.ro.Destroy()
d.db.Close()
@ -79,7 +95,7 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f
return err
}
it := d.db.NewIterator(d.ro)
it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs])
defer it.Close()
for it.Seek(kstart); it.Valid(); it.Next() {
@ -178,9 +194,9 @@ func (d *RocksDB) writeOutputs(
switch op {
case opInsert:
wb.Put(key, val)
wb.PutCF(d.cfh[cfOutputs], key, val)
case opDelete:
wb.Delete(key)
wb.DeleteCF(d.cfh[cfOutputs], key)
}
}
@ -253,9 +269,9 @@ func (d *RocksDB) writeInputs(
}
switch op {
case opInsert:
wb.Put(key, val)
wb.PutCF(d.cfh[cfInputs], key, val)
case opDelete:
wb.Delete(key)
wb.DeleteCF(d.cfh[cfInputs], key)
}
}
}
@ -282,7 +298,7 @@ func (d *RocksDB) GetBestBlockHash() (string, error) {
func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
key := packUint(height)
val, err := d.db.Get(d.ro, key)
val, err := d.db.GetCF(d.ro, d.cfh[cfHeight], key)
if err != nil {
return "", err
}
@ -303,9 +319,9 @@ func (d *RocksDB) writeHeight(
if err != nil {
return err
}
wb.Put(key, val)
wb.PutCF(d.cfh[cfHeight], key, val)
case opDelete:
wb.Delete(key)
wb.DeleteCF(d.cfh[cfHeight], key)
}
return nil