resyncIndex DisconnectBlocks

This commit is contained in:
Martin Boehm 2018-01-28 00:59:54 +01:00
parent 8226fae4dc
commit 9afa5b4187
2 changed files with 98 additions and 31 deletions

View File

@ -31,6 +31,7 @@ type Index interface {
GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) error
ConnectBlock(block *bitcoin.Block) error
DisconnectBlock(block *bitcoin.Block) error
DisconnectBlocks(lower uint32, higher uint32) error
}
var (
@ -92,8 +93,13 @@ func main() {
}
defer db.Close()
var httpServer *server.HttpServer
if *resync {
if err := resyncIndex(rpc, db); err != nil {
log.Fatalf("resyncIndex %v", err)
}
}
var httpServer *server.HttpServer
if *httpServerBinding != "" {
httpServer, err = server.New(*httpServerBinding, db)
if err != nil {
@ -115,12 +121,6 @@ func main() {
}
}
if *resync {
if err := resyncIndex(rpc, db); err != nil {
log.Fatalf("resyncIndex %v", err)
}
}
if *blockHeight >= 0 {
if *blockUntil < 0 {
*blockUntil = *blockHeight
@ -166,7 +166,7 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
log.Printf("Shutdown with reason: %v", sig)
log.Printf("Shutdown: %v", sig)
if mq != nil {
if err := mq.Shutdown(); err != nil {
@ -192,11 +192,18 @@ func resyncIndex(chain Blockchain, index Index) error {
if err != nil {
return err
}
_, local, err := index.GetBestBlock()
localBestHeight, local, err := index.GetBestBlock()
if err != nil {
local = ""
}
// If the locally indexed block is the same as the best block on the
// network, we're done.
if local == remote {
log.Printf("resync: synced on %d %s", localBestHeight, local)
return nil
}
// If the local block is missing, we're indexing from the genesis block.
if local == "" {
log.Printf("resync: genesis")
@ -205,14 +212,7 @@ func resyncIndex(chain Blockchain, index Index) error {
if err != nil {
return err
}
return connectBlock(chain, index, hash)
}
// If the locally indexed block is the same as the best block on the
// network, we're done.
if local == remote {
log.Printf("resync: synced on %s", local)
return nil
return connectBlocks(chain, index, hash)
}
// Is local tip on the best chain?
@ -232,15 +232,32 @@ func resyncIndex(chain Blockchain, index Index) error {
if forked {
log.Printf("resync: local is forked")
// TODO: resync after disconnecting
return disconnectBlock(chain, index, header.Hash)
} else {
log.Printf("resync: local is behind")
return connectBlock(chain, index, header.Next)
var height uint32
for height = localBestHeight - 1; height >= 0; height-- {
local, err = index.GetBlockHash(height)
if err != nil {
return err
}
remote, err = chain.GetBlockHash(height)
if err != nil {
return err
}
if local == remote {
break
}
}
err = index.DisconnectBlocks(height+1, localBestHeight)
if err != nil {
return err
}
return resyncIndex(chain, index)
}
log.Printf("resync: local is behind")
return connectBlocks(chain, index, header.Next)
}
func connectBlock(
func connectBlocks(
chain Blockchain,
index Index,
hash string,
@ -264,14 +281,6 @@ func connectBlock(
return nil
}
func disconnectBlock(
chain Blockchain,
index Index,
hash string,
) error {
return nil
}
func connectBlocksParallel(
chain Blockchain,
index Index,

View File

@ -338,6 +338,64 @@ func (d *RocksDB) writeHeight(
return nil
}
// DisconnectBlocks removes all data belonging to blocks in range lower-higher
func (d *RocksDB) DisconnectBlocks(
lower uint32,
higher uint32,
) error {
log.Printf("rocksdb: disconnecting blocks %d-%d", lower, higher)
it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs])
defer it.Close()
outputKeys := [][]byte{}
outputValues := [][]byte{}
var totalOutputs uint64
for it.SeekToFirst(); it.Valid(); it.Next() {
totalOutputs++
key := it.Key().Data()
l := len(key)
if l > 4 {
height := unpackUint(key[l-4 : l])
if height >= lower && height <= higher {
outputKey := make([]byte, len(key))
copy(outputKey, key)
outputKeys = append(outputKeys, outputKey)
value := it.Value().Data()
outputValue := make([]byte, len(value))
copy(outputValue, value)
outputValues = append(outputValues, outputValue)
}
}
}
log.Printf("rocksdb: about to disconnect %d outputs from %d", len(outputKeys), totalOutputs)
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
for i := 0; i < len(outputKeys); i++ {
log.Printf("output %s", hex.EncodeToString(outputKeys[i]))
wb.DeleteCF(d.cfh[cfOutputs], outputKeys[i])
outpoints, err := unpackOutputValue(outputValues[i])
if err != nil {
return err
}
for _, o := range outpoints {
boutpoint, err := packOutpoint(o.txid, o.vout)
if err != nil {
return err
}
log.Printf("input %s", hex.EncodeToString(boutpoint))
wb.DeleteCF(d.cfh[cfInputs], boutpoint)
}
}
for height := lower; height <= higher; height++ {
log.Printf("height %d", height)
wb.DeleteCF(d.cfh[cfHeight], packUint(height))
}
err := d.db.Write(d.wo, wb)
if err == nil {
log.Printf("rocksdb: blocks %d-%d disconnected", lower, higher)
}
return err
}
// Helpers
const txIdUnpackedLen = 32