Write height column in bulk mode

Martin Boehm 2018-08-23 23:20:07 +02:00
parent 328312e48a
commit ad5ddbd029
4 changed files with 65 additions and 47 deletions

View File

@@ -181,7 +181,7 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn
 		vin.Value = w.chainParser.AmountToDecimalString(&vin.ValueSat)
 		valInSat.Add(&valInSat, &vin.ValueSat)
 		a, err := tai.Addresses(w.chainParser)
-		if err != nil && len(a) == 1 {
+		if err == nil && len(a) == 1 {
 			vin.Addr = a[0]
 		}
 	}
@@ -194,7 +194,7 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn
 		vout.Value = w.chainParser.AmountToDecimalString(&vout.ValueSat)
 		valOutSat.Add(&valOutSat, &vout.ValueSat)
 		a, err := tao.Addresses(w.chainParser)
-		if err != nil {
+		if err == nil {
 			vout.ScriptPubKey.Addresses = a
 		}
 	}
@@ -204,12 +204,12 @@ func (w *Worker) txFromTxAddress(txid string, ta *db.TxAddresses, bi *db.BlockIn
 		feesSat.SetUint64(0)
 	}
 	r := &Tx{
-		Blockhash:     bi.BlockHash,
+		Blockhash:     bi.Hash,
 		Blockheight:   int(ta.Height),
-		Blocktime:     bi.Time.Unix(),
+		Blocktime:     bi.Time,
 		Confirmations: bestheight - ta.Height + 1,
 		Fees:          w.chainParser.AmountToDecimalString(&feesSat),
-		Time:          bi.Time.Unix(),
+		Time:          bi.Time,
 		Txid:          txid,
 		ValueIn:       w.chainParser.AmountToDecimalString(&valInSat),
 		ValueOut:      w.chainParser.AmountToDecimalString(&valOutSat),
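For reference, the three replaced assignments above follow from the reworked BlockInfo in the RocksDB hunks further down: the hash field is renamed and the block time is now kept as a plain Unix timestamp, so bi.Hash and bi.Time can be copied into the API Tx without conversion. A minimal sketch of the struct shape this call site now assumes (not part of the diff itself):

	type BlockInfo struct {
		Hash string // renamed from BlockHash
		Time int64  // Unix seconds, previously time.Time
		Txs  uint32
		Size uint32
	}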

View File

@@ -16,6 +16,7 @@ import (
 type bulkAddresses struct {
 	height    uint32
+	bi        BlockInfo
 	addresses map[string][]outpoint
 }
@@ -156,6 +157,9 @@ func (b *BulkConnect) storeBulkAddresses(wb *gorocksdb.WriteBatch) error {
 		if err := b.d.storeAddresses(wb, ba.height, ba.addresses); err != nil {
 			return err
 		}
+		if err := b.d.writeHeight(wb, ba.height, &ba.bi, opInsert); err != nil {
+			return err
+		}
 	}
 	b.bulkAddressesCount = 0
 	b.bulkAddresses = b.bulkAddresses[:0]
@@ -168,15 +172,12 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
 	if !b.isUTXO {
 		return b.d.ConnectBlock(block)
 	}
-	wb := gorocksdb.NewWriteBatch()
-	defer wb.Destroy()
 	addresses := make(map[string][]outpoint)
 	if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil {
 		return err
 	}
-	start := time.Now()
-	var sa bool
 	var storeAddressesChan, storeBalancesChan chan error
+	var sa bool
 	if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
 		sa = true
 		if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
@@ -189,30 +190,39 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) erro
 		}
 	}
 	b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
-		height:    block.Height,
+		height: block.Height,
+		bi: BlockInfo{
+			Hash: block.Hash,
+			Time: block.Time,
+			Txs:  uint32(len(block.Txs)),
+			Size: uint32(block.Size),
+		},
 		addresses: addresses,
 	})
 	b.bulkAddressesCount += len(addresses)
-	bac := b.bulkAddressesCount
-	if sa || b.bulkAddressesCount > maxBulkAddresses {
-		if err := b.storeBulkAddresses(wb); err != nil {
+	// open WriteBatch only if going to write
+	if sa || b.bulkAddressesCount > maxBulkAddresses || storeBlockTxs {
+		start := time.Now()
+		wb := gorocksdb.NewWriteBatch()
+		defer wb.Destroy()
+		bac := b.bulkAddressesCount
+		if sa || b.bulkAddressesCount > maxBulkAddresses {
+			if err := b.storeBulkAddresses(wb); err != nil {
+				return err
+			}
+		}
+		if storeBlockTxs {
+			if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
+				return err
+			}
+		}
+		if err := b.d.db.Write(b.d.wo, wb); err != nil {
 			return err
 		}
-	}
-	if storeBlockTxs {
-		if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
-			return err
+		if bac > b.bulkAddressesCount {
+			glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
 		}
 	}
-	if err := b.d.writeHeight(wb, block, opInsert); err != nil {
-		return err
-	}
-	if err := b.d.db.Write(b.d.wo, wb); err != nil {
-		return err
-	}
-	if bac > b.bulkAddressesCount {
-		glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
-	}
 	if storeAddressesChan != nil {
 		if err := <-storeAddressesChan; err != nil {
 			return err
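Two things fall out of the hunk above: each buffered bulkAddresses entry now carries the BlockInfo of its block, so storeBulkAddresses can emit the matching height column row in the same batch, and the WriteBatch is created lazily, only when a flush of addresses or block txs is actually due. A condensed sketch of that lazy-batch pattern, simplified from the diff rather than quoted from it:

	// open the batch only when something will be written
	if sa || b.bulkAddressesCount > maxBulkAddresses || storeBlockTxs {
		wb := gorocksdb.NewWriteBatch()
		defer wb.Destroy()
		// ... stage bulk addresses, height rows and block txs into wb ...
		if err := b.d.db.Write(b.d.wo, wb); err != nil {
			return err
		}
	}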

View File

@@ -237,7 +237,7 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
 	isUTXO := d.chainParser.IsUTXOChain()
-	if err := d.writeHeight(wb, block, op); err != nil {
+	if err := d.writeHeightFromBlock(wb, block, op); err != nil {
 		return err
 	}
 	if isUTXO {
@@ -862,14 +862,15 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.
 // Block index
 // BlockInfo holds information about blocks kept in column height
 type BlockInfo struct {
-	BlockHash string
-	Time      time.Time
-	Txs       uint32
-	Size      uint32
+	Hash string
+	Time int64
+	Txs  uint32
+	Size uint32
 }
-func (d *RocksDB) packBlockInfo(block *bchain.Block) ([]byte, error) {
+func (d *RocksDB) packBlockInfo(block *BlockInfo) ([]byte, error) {
 	packed := make([]byte, 0, 64)
 	varBuf := make([]byte, vlq.MaxLen64)
 	b, err := d.chainParser.PackBlockHash(block.Hash)
@@ -878,7 +879,7 @@ func (d *RocksDB) packBlockInfo(block *bchain.Block) ([]byte, error) {
 	}
 	packed = append(packed, b...)
 	packed = append(packed, packUint(uint32(block.Time))...)
-	l := packVaruint(uint(len(block.Txs)), varBuf)
+	l := packVaruint(uint(block.Txs), varBuf)
 	packed = append(packed, varBuf[:l]...)
 	l = packVaruint(uint(block.Size), varBuf)
 	packed = append(packed, varBuf[:l]...)
@@ -899,10 +900,10 @@ func (d *RocksDB) unpackBlockInfo(buf []byte) (*BlockInfo, error) {
 	txs, l := unpackVaruint(buf[pl+4:])
 	size, _ := unpackVaruint(buf[pl+4+l:])
 	return &BlockInfo{
-		BlockHash: txid,
-		Time:      time.Unix(int64(t), 0),
-		Txs:       uint32(txs),
-		Size:      uint32(size),
+		Hash: txid,
+		Time: int64(t),
+		Txs:  uint32(txs),
+		Size: uint32(size),
 	}, nil
 }
@@ -917,7 +918,7 @@ func (d *RocksDB) GetBestBlock() (uint32, string, error) {
 			if glog.V(1) {
 				glog.Infof("rocksdb: bestblock %d %+v", bestHeight, info)
 			}
-			return bestHeight, info.BlockHash, err
+			return bestHeight, info.Hash, err
 		}
 	}
 	return 0, "", nil
@@ -935,7 +936,7 @@ func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
 	if info == nil {
 		return "", err
 	}
-	return info.BlockHash, nil
+	return info.Hash, nil
 }

 // GetBlockInfo returns block info stored in db
@@ -949,12 +950,20 @@ func (d *RocksDB) GetBlockInfo(height uint32) (*BlockInfo, error) {
 	return d.unpackBlockInfo(val.Data())
 }
-func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
-	key := packUint(block.Height)
+func (d *RocksDB) writeHeightFromBlock(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
+	return d.writeHeight(wb, block.Height, &BlockInfo{
+		Hash: block.Hash,
+		Time: block.Time,
+		Txs:  uint32(len(block.Txs)),
+		Size: uint32(block.Size),
+	}, op)
+}
+func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *BlockInfo, op int) error {
+	key := packUint(height)
 	switch op {
 	case opInsert:
-		val, err := d.packBlockInfo(block)
+		val, err := d.packBlockInfo(bi)
 		if err != nil {
 			return err
 		}
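For orientation, the height column entry that writeHeight and packBlockInfo assemble above has roughly this layout (a sketch based on the pack helpers visible in the diff; exact byte widths depend on packUint, packVaruint and the chain parser's PackBlockHash):

	key:   packUint(height)                // packed block height
	value: PackBlockHash(bi.Hash)          // chain-specific packed block hash
	       packUint(uint32(bi.Time))       // block time as 32-bit Unix timestamp
	       packVaruint(uint(bi.Txs), ...)  // varint transaction count
	       packVaruint(uint(bi.Size), ...) // varint block size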

View File

@@ -16,7 +16,6 @@ import (
 	"sort"
 	"strings"
 	"testing"
-	"time"
 	vlq "github.com/bsm/go-vlq"
 	"github.com/juju/errors"
@ -731,10 +730,10 @@ func TestRocksDB_Index_UTXO(t *testing.T) {
t.Fatal(err)
}
iw := &BlockInfo{
BlockHash: "00000000eb0443fd7dc4a1ed5c686a8e995057805f9a161d9a5a77a95e72b7b6",
Txs: 4,
Size: 2345678,
Time: time.Unix(1534859123, 0),
Hash: "00000000eb0443fd7dc4a1ed5c686a8e995057805f9a161d9a5a77a95e72b7b6",
Txs: 4,
Size: 2345678,
Time: 1534859123,
}
if !reflect.DeepEqual(info, iw) {
t.Errorf("GetAddressBalance() = %+v, want %+v", info, iw)