diff --git a/blockbook.go b/blockbook.go index 732a6932..f8f41d24 100644 --- a/blockbook.go +++ b/blockbook.go @@ -31,6 +31,7 @@ type Index interface { GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) error ConnectBlock(block *bitcoin.Block) error DisconnectBlock(block *bitcoin.Block) error + DisconnectBlocks(lower uint32, higher uint32) error } var ( @@ -92,8 +93,13 @@ func main() { } defer db.Close() - var httpServer *server.HttpServer + if *resync { + if err := resyncIndex(rpc, db); err != nil { + log.Fatalf("resyncIndex %v", err) + } + } + var httpServer *server.HttpServer if *httpServerBinding != "" { httpServer, err = server.New(*httpServerBinding, db) if err != nil { @@ -115,12 +121,6 @@ func main() { } } - if *resync { - if err := resyncIndex(rpc, db); err != nil { - log.Fatalf("resyncIndex %v", err) - } - } - if *blockHeight >= 0 { if *blockUntil < 0 { *blockUntil = *blockHeight @@ -166,7 +166,7 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - log.Printf("Shutdown with reason: %v", sig) + log.Printf("Shutdown: %v", sig) if mq != nil { if err := mq.Shutdown(); err != nil { @@ -192,11 +192,18 @@ func resyncIndex(chain Blockchain, index Index) error { if err != nil { return err } - _, local, err := index.GetBestBlock() + localBestHeight, local, err := index.GetBestBlock() if err != nil { local = "" } + // If the locally indexed block is the same as the best block on the + // network, we're done. + if local == remote { + log.Printf("resync: synced on %d %s", localBestHeight, local) + return nil + } + // If the local block is missing, we're indexing from the genesis block. if local == "" { log.Printf("resync: genesis") @@ -205,14 +212,7 @@ func resyncIndex(chain Blockchain, index Index) error { if err != nil { return err } - return connectBlock(chain, index, hash) - } - - // If the locally indexed block is the same as the best block on the - // network, we're done. - if local == remote { - log.Printf("resync: synced on %s", local) - return nil + return connectBlocks(chain, index, hash) } // Is local tip on the best chain? @@ -232,15 +232,32 @@ func resyncIndex(chain Blockchain, index Index) error { if forked { log.Printf("resync: local is forked") - // TODO: resync after disconnecting - return disconnectBlock(chain, index, header.Hash) - } else { - log.Printf("resync: local is behind") - return connectBlock(chain, index, header.Next) + var height uint32 + for height = localBestHeight - 1; height >= 0; height-- { + local, err = index.GetBlockHash(height) + if err != nil { + return err + } + remote, err = chain.GetBlockHash(height) + if err != nil { + return err + } + if local == remote { + break + } + } + err = index.DisconnectBlocks(height+1, localBestHeight) + if err != nil { + return err + } + return resyncIndex(chain, index) } + + log.Printf("resync: local is behind") + return connectBlocks(chain, index, header.Next) } -func connectBlock( +func connectBlocks( chain Blockchain, index Index, hash string, @@ -264,14 +281,6 @@ func connectBlock( return nil } -func disconnectBlock( - chain Blockchain, - index Index, - hash string, -) error { - return nil -} - func connectBlocksParallel( chain Blockchain, index Index, diff --git a/db/rocksdb.go b/db/rocksdb.go index ee7baf9d..ed9ebea1 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -338,6 +338,64 @@ func (d *RocksDB) writeHeight( return nil } +// DisconnectBlocks removes all data belonging to blocks in range lower-higher +func (d *RocksDB) DisconnectBlocks( + lower uint32, + higher uint32, +) error { + log.Printf("rocksdb: disconnecting blocks %d-%d", lower, higher) + it := d.db.NewIteratorCF(d.ro, d.cfh[cfOutputs]) + defer it.Close() + outputKeys := [][]byte{} + outputValues := [][]byte{} + var totalOutputs uint64 + for it.SeekToFirst(); it.Valid(); it.Next() { + totalOutputs++ + key := it.Key().Data() + l := len(key) + if l > 4 { + height := unpackUint(key[l-4 : l]) + if height >= lower && height <= higher { + outputKey := make([]byte, len(key)) + copy(outputKey, key) + outputKeys = append(outputKeys, outputKey) + value := it.Value().Data() + outputValue := make([]byte, len(value)) + copy(outputValue, value) + outputValues = append(outputValues, outputValue) + } + } + } + log.Printf("rocksdb: about to disconnect %d outputs from %d", len(outputKeys), totalOutputs) + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + for i := 0; i < len(outputKeys); i++ { + log.Printf("output %s", hex.EncodeToString(outputKeys[i])) + wb.DeleteCF(d.cfh[cfOutputs], outputKeys[i]) + outpoints, err := unpackOutputValue(outputValues[i]) + if err != nil { + return err + } + for _, o := range outpoints { + boutpoint, err := packOutpoint(o.txid, o.vout) + if err != nil { + return err + } + log.Printf("input %s", hex.EncodeToString(boutpoint)) + wb.DeleteCF(d.cfh[cfInputs], boutpoint) + } + } + for height := lower; height <= higher; height++ { + log.Printf("height %d", height) + wb.DeleteCF(d.cfh[cfHeight], packUint(height)) + } + err := d.db.Write(d.wo, wb) + if err == nil { + log.Printf("rocksdb: blocks %d-%d disconnected", lower, higher) + } + return err +} + // Helpers const txIdUnpackedLen = 32