diff --git a/db/rocksdb.go b/db/rocksdb.go index b3eacf6c..864e7c89 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -1765,15 +1765,64 @@ func (d *RocksDB) ComputeInternalStateColumnStats(stopCompute chan os.Signal) er return nil } -func (d *RocksDB) fixUtxo(ba *AddrBalance) error { +func (d *RocksDB) fixUtxo(addrDesc bchain.AddressDescriptor, ba *AddrBalance) (bool, error) { var checksum big.Int for i := range ba.Utxos { checksum.Add(&checksum, &ba.Utxos[i].ValueSat) } if checksum.Cmp(&ba.BalanceSat) != 0 { - return errors.Errorf("balance %s, checksum %s, txs %d", ba.BalanceSat.String(), checksum.String(), ba.Txs) + var checksumFromTxs big.Int + var utxos []Utxo + err := d.GetAddrDescTransactions(addrDesc, 0, ^uint32(0), func(txid string, height uint32, indexes []int32) error { + var ta *TxAddresses + var err error + for _, index := range indexes { + // take only outputs + if index >= 0 { + if ta == nil { + ta, err = d.GetTxAddresses(txid) + if err != nil { + return err + } + } + if ta == nil { + return errors.New("DB inconsistency: tx " + txid + ": not found in txAddresses") + } else { + if len(ta.Outputs) <= int(index) { + glog.Warning("DB inconsistency: txAddresses " + txid + " does not have enough outputs") + } else { + tao := &ta.Outputs[index] + if !tao.Spent { + bTxid, _ := d.chainParser.PackTxid(txid) + checksumFromTxs.Add(&checksumFromTxs, &tao.ValueSat) + utxos = append(utxos, Utxo{BtxID: bTxid, Height: height, Vout: index, ValueSat: tao.ValueSat}) + } + } + } + } + } + return nil + }) + if err != nil { + return false, err + } + fixed := false + if checksumFromTxs.Cmp(&ba.BalanceSat) == 0 { + ba.Utxos = utxos + wb := gorocksdb.NewWriteBatch() + err = d.storeBalances(wb, map[string]*AddrBalance{string(addrDesc): ba}) + if err == nil { + err = d.db.Write(d.wo, wb) + } + wb.Destroy() + if err != nil { + return false, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d, error storing fixed utxos %v", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs, err) + } + fixed = true + } + return fixed, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs) } - return nil + return false, nil } // FixUtxos checks and fixes possible @@ -1783,7 +1832,7 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error { return nil } glog.Info("FixUtxos: starting") - var row, errorsCount int64 + var row, errorsCount, fixedCount int64 var seekKey []byte // do not use cache ro := gorocksdb.NewDefaultReadOptions() @@ -1810,16 +1859,22 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error { row++ if len(buf) < 3 { glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", empty data") + errorsCount++ continue } ba, err := unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), AddressBalanceDetailUTXO) if err != nil { glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", unpackAddrBalance error ", err) + errorsCount++ continue } - err = d.fixUtxo(ba) + fixed, err := d.fixUtxo(addrDesc, ba) if err != nil { - glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err) + errorsCount++ + glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err, ", fixed ", fixed) + if fixed { + fixedCount++ + } } } seekKey = append([]byte{}, addrDesc...) @@ -1829,7 +1884,7 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error { break } } - glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors") + glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors, fixed ", fixedCount) return nil }