From ad5ddbd02961663525e1c9f222bc8664cae3584e Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Thu, 23 Aug 2018 23:20:07 +0200 Subject: [PATCH] Write height column in bulk mode --- api/worker.go | 10 ++++----- db/bulkconnect.go | 52 +++++++++++++++++++++++++++------------------- db/rocksdb.go | 41 ++++++++++++++++++++++-------------- db/rocksdb_test.go | 9 ++++---- 4 files changed, 65 insertions(+), 47 deletions(-) diff --git a/api/worker.go b/api/worker.go index b0ddfe89..f8867c63 100644 --- a/api/worker.go +++ b/api/worker.go @@ -181,7 +181,7 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn vin.Value = w.chainParser.AmountToDecimalString(&vin.ValueSat) valInSat.Add(&valInSat, &vin.ValueSat) a, err := tai.Addresses(w.chainParser) - if err != nil && len(a) == 1 { + if err == nil && len(a) == 1 { vin.Addr = a[0] } } @@ -194,7 +194,7 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn vout.Value = w.chainParser.AmountToDecimalString(&vout.ValueSat) valOutSat.Add(&valOutSat, &vout.ValueSat) a, err := tao.Addresses(w.chainParser) - if err != nil { + if err == nil { vout.ScriptPubKey.Addresses = a } } @@ -204,12 +204,12 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn feesSat.SetUint64(0) } r := &Tx{ - Blockhash: bi.BlockHash, + Blockhash: bi.Hash, Blockheight: int(ta.Height), - Blocktime: bi.Time.Unix(), + Blocktime: bi.Time, Confirmations: bestheight - ta.Height + 1, Fees: w.chainParser.AmountToDecimalString(&feesSat), - Time: bi.Time.Unix(), + Time: bi.Time, Txid: txid, ValueIn: w.chainParser.AmountToDecimalString(&valInSat), ValueOut: w.chainParser.AmountToDecimalString(&valOutSat), diff --git a/db/bulkconnect.go b/db/bulkconnect.go index bece9c6b..eeefd234 100644 --- a/db/bulkconnect.go +++ b/db/bulkconnect.go @@ -16,6 +16,7 @@ import ( type bulkAddresses struct { height uint32 + bi BlockInfo addresses map[string][]outpoint } @@ -156,6 +157,9 @@ func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error { if err := b.d.storeAddresses(wb, ba.height, ba.addresses); err != nil { return err } + if err := b.d.writeHeight(wb, ba.height, &ba.bi, opInsert); err != nil { + return err + } } b.bulkAddressesCount = 0 b.bulkAddresses = b.bulkAddresses[:0] @@ -168,15 +172,12 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro if !b.isUTXO { return b.d.ConnectBlock(block) } - wb := gorocksdb.NewWriteBatch() - defer wb.Destroy() addresses := make(map[string][]outpoint) if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil { return err } - start := time.Now() - var sa bool var storeAddressesChan, storeBalancesChan chan error + var sa bool if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances { sa = true if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses { @@ -189,30 +190,39 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro } } b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{ - height: block.Height, + height: block.Height, + bi: BlockInfo{ + Hash: block.Hash, + Time: block.Time, + Txs: uint32(len(block.Txs)), + Size: uint32(block.Size), + }, addresses: addresses, }) b.bulkAddressesCount += len(addresses) - bac := b.bulkAddressesCount - if sa || b.bulkAddressesCount > maxBulkAddresses { - if err := b.storeBulkAddresses(wb); err != nil { + // open WriteBatch only if going to write + if sa || b.bulkAddressesCount > maxBulkAddresses || storeBlockTxs { + start := time.Now() + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + bac := b.bulkAddressesCount + if sa || b.bulkAddressesCount > maxBulkAddresses { + if err := b.storeBulkAddresses(wb); err != nil { + return err + } + } + if storeBlockTxs { + if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil { + return err + } + } + if err := b.d.db.Write(b.d.wo, wb); err != nil { return err } - } - if storeBlockTxs { - if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil { - return err + if bac > b.bulkAddressesCount { + glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) } } - if err := b.d.writeHeight(wb, block, opInsert); err != nil { - return err - } - if err := b.d.db.Write(b.d.wo, wb); err != nil { - return err - } - if bac > b.bulkAddressesCount { - glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start)) - } if storeAddressesChan != nil { if err := <-storeAddressesChan; err != nil { return err diff --git a/db/rocksdb.go b/db/rocksdb.go index bceb1b4d..de498388 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -237,7 +237,7 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error { isUTXO := d.chainParser.IsUTXOChain() - if err := d.writeHeight(wb, block, op); err != nil { + if err := d.writeHeightFromBlock(wb, block, op); err != nil { return err } if isUTXO { @@ -862,14 +862,15 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain. // Block index +// BlockInfo holds information about blocks kept in column height type BlockInfo struct { - BlockHash string - Time time.Time - Txs uint32 - Size uint32 + Hash string + Time int64 + Txs uint32 + Size uint32 } -func (d *RocksDB) packBlockInfo(block *bchain.Block) ([]byte, error) { +func (d *RocksDB) packBlockInfo(block *BlockInfo) ([]byte, error) { packed := make([]byte, 0, 64) varBuf := make([]byte, vlq.MaxLen64) b, err := d.chainParser.PackBlockHash(block.Hash) @@ -878,7 +879,7 @@ func (d *RocksDB) packBlockInfo(block *bchain.Block) ([]byte, error) { } packed = append(packed, b...) packed = append(packed, packUint(uint32(block.Time))...) - l := packVaruint(uint(len(block.Txs)), varBuf) + l := packVaruint(uint(block.Txs), varBuf) packed = append(packed, varBuf[:l]...) l = packVaruint(uint(block.Size), varBuf) packed = append(packed, varBuf[:l]...) @@ -899,10 +900,10 @@ func (d *RocksDB) unpackBlockInfo(buf []byte) (*BlockInfo, error) { txs, l := unpackVaruint(buf[pl+4:]) size, _ := unpackVaruint(buf[pl+4+l:]) return &BlockInfo{ - BlockHash: txid, - Time: time.Unix(int64(t), 0), - Txs: uint32(txs), - Size: uint32(size), + Hash: txid, + Time: int64(t), + Txs: uint32(txs), + Size: uint32(size), }, nil } @@ -917,7 +918,7 @@ func (d *RocksDB) GetBestBlock() (uint32, string, error) { if glog.V(1) { glog.Infof("rocksdb: bestblock %d %+v", bestHeight, info) } - return bestHeight, info.BlockHash, err + return bestHeight, info.Hash, err } } return 0, "", nil @@ -935,7 +936,7 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) { if info == nil { return "", err } - return info.BlockHash, nil + return info.Hash, nil } // GetBlockInfo returns block info stored in db @@ -949,12 +950,20 @@ func (d *RocksDB) GetBlockInfo(height uint32) (*BlockInfo, error) { return d.unpackBlockInfo(val.Data()) } -func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { - key := packUint(block.Height) +func (d *RocksDB) writeHeightFromBlock(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { + return d.writeHeight(wb, block.Height, &BlockInfo{ + Hash: block.Hash, + Time: block.Time, + Txs: uint32(len(block.Txs)), + Size: uint32(block.Size), + }, op) +} +func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *BlockInfo, op int) error { + key := packUint(height) switch op { case opInsert: - val, err := d.packBlockInfo(block) + val, err := d.packBlockInfo(bi) if err != nil { return err } diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index e36a3ab0..3b21b93d 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -16,7 +16,6 @@ import ( "sort" "strings" "testing" - "time" vlq "github.com/bsm/go-vlq" "github.com/juju/errors" @@ -731,10 +730,10 @@ func TestRocksDB_Index_UTXO(t *testing.T) { t.Fatal(err) } iw := &BlockInfo{ - BlockHash: "00000000eb0443fd7dc4a1ed5c686a8e995057805f9a161d9a5a77a95e72b7b6", - Txs: 4, - Size: 2345678, - Time: time.Unix(1534859123, 0), + Hash: "00000000eb0443fd7dc4a1ed5c686a8e995057805f9a161d9a5a77a95e72b7b6", + Txs: 4, + Size: 2345678, + Time: 1534859123, } if !reflect.DeepEqual(info, iw) { t.Errorf("GetAddressBalance() = %+v, want %+v", info, iw)