Implement loading of transactions in GetAddressForXpub - WIP

This commit is contained in:
Martin Boehm 2019-02-03 23:42:44 +01:00
parent 57b40ad6dc
commit 9d3cd3b3e9
4 changed files with 241 additions and 101 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
)
const maxUint32 = ^uint32(0)
const maxInt = int(^uint(0) >> 1)
// GetAddressOption specifies what data returns GetAddress api call

View File

@ -582,6 +582,39 @@ func (w *Worker) getEthereumTypeAddressBalances(addrDesc bchain.AddressDescripto
return ba, tokens, ci, n, nonContractTxs, totalResults, nil
}
func (w *Worker) txFromTxid(txid string, bestheight uint32, option GetAddressOption) (*Tx, error) {
var tx *Tx
var err error
// only ChainBitcoinType supports TxHistoryLight
if option == TxHistoryLight && w.chainType == bchain.ChainBitcoinType {
ta, err := w.db.GetTxAddresses(txid)
if err != nil {
return nil, errors.Annotatef(err, "GetTxAddresses %v", txid)
}
if ta == nil {
glog.Warning("DB inconsistency: tx ", txid, ": not found in txAddresses")
// as fallback, provide empty TxAddresses to return at least something
ta = &db.TxAddresses{}
}
bi, err := w.db.GetBlockInfo(ta.Height)
if err != nil {
return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height)
}
if bi == nil {
glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db")
// provide empty BlockInfo to return the rest of tx data
bi = &db.BlockInfo{}
}
tx = w.txFromTxAddress(txid, ta, bi, bestheight)
} else {
tx, err = w.GetTransaction(txid, false, true)
if err != nil {
return nil, errors.Annotatef(err, "GetTransaction %v", txid)
}
}
return tx, nil
}
// GetAddress computes address value and gets transactions for given address
func (w *Worker) GetAddress(address string, page int, txsOnPage int, option GetAddressOption, filter *AddressFilter) (*Address, error) {
start := time.Now()
@ -703,32 +736,8 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option GetA
if option == TxidHistory {
txids[txi] = txid
} else {
// only ChainBitcoinType supports TxHistoryLight
if option == TxHistoryLight && w.chainType == bchain.ChainBitcoinType {
ta, err := w.db.GetTxAddresses(txid)
if err != nil {
return nil, errors.Annotatef(err, "GetTxAddresses %v", txid)
}
if ta == nil {
glog.Warning("DB inconsistency: tx ", txid, ": not found in txAddresses")
// as fallback, provide empty TxAddresses to return at least something
ta = &db.TxAddresses{}
}
bi, err := w.db.GetBlockInfo(ta.Height)
if err != nil {
return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height)
}
if bi == nil {
glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db")
// provide empty BlockInfo to return the rest of tx data
bi = &db.BlockInfo{}
}
txs[txi] = w.txFromTxAddress(txid, ta, bi, bestheight)
} else {
txs[txi], err = w.GetTransaction(txid, false, true)
if err != nil {
return nil, errors.Annotatef(err, "GetTransaction %v", txid)
}
if txs[txi], err = w.txFromTxid(txid, bestheight, option); err != nil {
return nil, err
}
}
txi++

View File

@ -5,6 +5,7 @@ import (
"blockbook/db"
"fmt"
"math/big"
"sort"
"sync"
"time"
@ -15,19 +16,31 @@ import (
const xpubLen = 111
const defaultAddressesGap = 20
const txInput = 1
const txOutput = 2
var cachedXpubs = make(map[string]*xpubData)
var cachedXpubsMux sync.Mutex
type txHeight struct {
txid string
height uint32
addrIndex uint32
type xpubTxid struct {
txid string
height uint32
inputOutput byte
}
type xpubTxids []xpubTxid
func (a xpubTxids) Len() int { return len(a) }
func (a xpubTxids) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a xpubTxids) Less(i, j int) bool { return a[i].height >= a[j].height }
type xpubAddress struct {
addrDesc bchain.AddressDescriptor
balance *db.AddrBalance
bottomHeight uint32
addrDesc bchain.AddressDescriptor
balance *db.AddrBalance
txs uint32
maxHeight uint32
complete bool
txids xpubTxids
}
type xpubData struct {
@ -39,71 +52,101 @@ type xpubData struct {
balanceSat big.Int
addresses []xpubAddress
changeAddresses []xpubAddress
txids []txHeight
}
func (w *Worker) getAddressTxHeights(addrDesc bchain.AddressDescriptor, addrIndex uint32, mempool bool, filter *AddressFilter, maxResults int) ([]txHeight, error) {
func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool, fromHeight, toHeight uint32, maxResults int) ([]xpubTxid, bool, error) {
var err error
txHeights := make([]txHeight, 0, 4)
complete := true
txs := make([]xpubTxid, 0, 4)
var callback db.GetTransactionsCallback
if filter.Vout == AddressFilterVoutOff {
callback = func(txid string, height uint32, indexes []int32) error {
txHeights = append(txHeights, txHeight{txid, height, addrIndex})
// take all txs in the last found block even if it exceeds maxResults
if len(txHeights) >= maxResults && txHeights[len(txHeights)-1].height != height {
return &db.StopIteration{}
}
return nil
callback = func(txid string, height uint32, indexes []int32) error {
// take all txs in the last found block even if it exceeds maxResults
if len(txs) >= maxResults && txs[len(txs)-1].height != height {
complete = false
return &db.StopIteration{}
}
} else {
callback = func(txid string, height uint32, indexes []int32) error {
for _, index := range indexes {
vout := index
if vout < 0 {
vout = ^vout
}
if (filter.Vout == AddressFilterVoutInputs && index < 0) ||
(filter.Vout == AddressFilterVoutOutputs && index >= 0) ||
(vout == int32(filter.Vout)) {
txHeights = append(txHeights, txHeight{txid, height, addrIndex})
if len(txHeights) >= maxResults {
return &db.StopIteration{}
}
break
}
inputOutput := byte(0)
for _, index := range indexes {
if index < 0 {
inputOutput |= txInput
} else {
inputOutput |= txOutput
}
return nil
}
txs = append(txs, xpubTxid{txid, height, inputOutput})
return nil
}
if mempool {
uniqueTxs := make(map[string]struct{})
uniqueTxs := make(map[string]int)
o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc)
if err != nil {
return nil, err
return nil, false, err
}
for _, m := range o {
if _, found := uniqueTxs[m.Txid]; !found {
l := len(txHeights)
if l, found := uniqueTxs[m.Txid]; !found {
l = len(txs)
callback(m.Txid, 0, []int32{m.Vout})
if len(txHeights) > l {
uniqueTxs[m.Txid] = struct{}{}
if len(txs) > l {
uniqueTxs[m.Txid] = l - 1
}
} else {
if m.Vout < 0 {
txs[l].inputOutput |= txInput
} else {
txs[l].inputOutput |= txOutput
}
}
}
} else {
to := filter.ToHeight
if to == 0 {
to = ^uint32(0)
}
err = w.db.GetAddrDescTransactions(addrDesc, filter.FromHeight, to, callback)
err = w.db.GetAddrDescTransactions(addrDesc, fromHeight, toHeight, callback)
if err != nil {
return nil, err
return nil, false, err
}
}
return txHeights, nil
return txs, complete, nil
}
func (w *Worker) derivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, error) {
func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, maxHeight uint32, pageSize int) error {
// skip if not discovered
if ad.balance == nil {
return nil
}
// if completely read, check if there are not some new txs and load if necessary
if ad.complete {
if ad.balance.Txs != ad.txs {
newTxids, _, err := w.xpubGetAddressTxids(ad.addrDesc, false, ad.maxHeight+1, maxHeight, maxInt)
if err == nil {
ad.txids = append(newTxids, ad.txids...)
ad.maxHeight = maxHeight
ad.txs = uint32(len(ad.txids))
if ad.txs != ad.balance.Txs {
glog.Warning("xpubCheckAndLoadTxids inconsistency ", ad.addrDesc, ", ad.txs=", ad.txs, ", ad.balance.Txs=", ad.balance.Txs)
}
}
return err
}
}
// unless the filter is completely off, load all txids
if filter.FromHeight != 0 || filter.ToHeight != 0 || filter.Vout != AddressFilterVoutOff {
pageSize = maxInt
}
newTxids, complete, err := w.xpubGetAddressTxids(ad.addrDesc, false, 0, maxHeight, pageSize)
if err != nil {
return err
}
ad.txids = newTxids
ad.complete = complete
ad.maxHeight = maxHeight
if complete {
ad.txs = uint32(len(ad.txids))
if ad.txs != ad.balance.Txs {
glog.Warning("xpubCheckAndLoadTxids inconsistency ", ad.addrDesc, ", ad.txs=", ad.txs, ", ad.balance.Txs=", ad.balance.Txs)
}
}
return nil
}
func (w *Worker) xpubDerivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, error) {
var err error
if ad.balance, err = w.db.GetAddrDescBalance(ad.addrDesc); err != nil {
return false, err
@ -117,15 +160,19 @@ func (w *Worker) derivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, e
return false, nil
}
func (w *Worker) scanAddresses(xpub string, data *xpubData, addresses []xpubAddress, gap int, change int, minDerivedIndex int, fork bool) (int, []xpubAddress, error) {
func (w *Worker) xpubScanAddresses(xpub string, data *xpubData, addresses []xpubAddress, gap int, change int, minDerivedIndex int, fork bool) (int, []xpubAddress, error) {
// rescan known addresses
lastUsed := 0
for i := range addresses {
ad := &addresses[i]
if fork {
ad.bottomHeight = 0
// reset the cached data
ad.txs = 0
ad.maxHeight = 0
ad.complete = false
ad.txids = nil
}
used, err := w.derivedAddressBalance(data, ad)
used, err := w.xpubDerivedAddressBalance(data, ad)
if err != nil {
return 0, nil, err
}
@ -147,7 +194,7 @@ func (w *Worker) scanAddresses(xpub string, data *xpubData, addresses []xpubAddr
}
for i, a := range descriptors {
ad := xpubAddress{addrDesc: a}
used, err := w.derivedAddressBalance(data, &ad)
used, err := w.xpubDerivedAddressBalance(data, &ad)
if err != nil {
return 0, nil, err
}
@ -192,9 +239,19 @@ func (w *Worker) GetAddressForXpub(xpub string, page int, txsOnPage int, option
cachedXpubsMux.Lock()
data, found := cachedXpubs[xpub]
cachedXpubsMux.Unlock()
// to load all data for xpub may take some time, perform it in a loop to process a possible new block
var (
txm []string
txs []*Tx
txids []string
pg Paging
totalResults int
err error
bestheight uint32
besthash string
)
// to load all data for xpub may take some time, do it in a loop to process a possible new block
for {
bestheight, besthash, err := w.db.GetBestBlock()
bestheight, besthash, err = w.db.GetBestBlock()
if err != nil {
return nil, errors.Annotatef(err, "GetBestBlock")
}
@ -210,9 +267,8 @@ func (w *Worker) GetAddressForXpub(xpub string, page int, txsOnPage int, option
return nil, err
}
if hash != data.dataHash {
// in case of for reset all cached txids
// in case of for reset all cached data
fork = true
data.txids = nil
}
}
processedHash = besthash
@ -220,19 +276,101 @@ func (w *Worker) GetAddressForXpub(xpub string, page int, txsOnPage int, option
data.dataHeight = bestheight
data.dataHash = besthash
var lastUsedIndex int
lastUsedIndex, data.addresses, err = w.scanAddresses(xpub, data, data.addresses, gap, 0, 0, fork)
lastUsedIndex, data.addresses, err = w.xpubScanAddresses(xpub, data, data.addresses, gap, 0, 0, fork)
if err != nil {
return nil, err
}
_, data.changeAddresses, err = w.scanAddresses(xpub, data, data.changeAddresses, gap, 1, lastUsedIndex, fork)
_, data.changeAddresses, err = w.xpubScanAddresses(xpub, data, data.changeAddresses, gap, 1, lastUsedIndex, fork)
if err != nil {
return nil, err
}
}
if option >= TxidHistory {
for i := range data.addresses {
if err = w.xpubCheckAndLoadTxids(&data.addresses[i], filter, bestheight, txsOnPage); err != nil {
return nil, err
}
}
for i := range data.changeAddresses {
if err = w.xpubCheckAndLoadTxids(&data.changeAddresses[i], filter, bestheight, txsOnPage); err != nil {
return nil, err
}
}
}
}
cachedXpubsMux.Lock()
cachedXpubs[xpub] = data
cachedXpubsMux.Unlock()
// TODO mempool
if option >= TxidHistory {
txc := make(xpubTxids, 0, 32)
var addTxids func(ad *xpubAddress)
if filter.FromHeight != 0 || filter.ToHeight != 0 || filter.Vout != AddressFilterVoutOff {
addTxids = func(ad *xpubAddress) {
txc = append(txc, ad.txids...)
}
totalResults = int(data.txs)
} else {
toHeight := maxUint32
if filter.ToHeight != 0 {
toHeight = filter.ToHeight
}
addTxids = func(ad *xpubAddress) {
for _, txid := range ad.txids {
if txid.height < filter.FromHeight || txid.height > toHeight {
continue
}
if filter.Vout != AddressFilterVoutOff {
if filter.Vout == AddressFilterVoutInputs && txid.inputOutput&txInput == 0 ||
filter.Vout == AddressFilterVoutOutputs && txid.inputOutput&txOutput == 0 {
continue
}
}
txc = append(txc, txid)
}
}
totalResults = -1
}
for i := range data.addresses {
addTxids(&data.addresses[i])
}
for i := range data.changeAddresses {
addTxids(&data.changeAddresses[i])
}
sort.Stable(txc)
var from, to int
pg, from, to, page = computePaging(len(txc), page, txsOnPage)
if len(txc) >= txsOnPage {
if totalResults < 0 {
pg.TotalPages = -1
} else {
pg, _, _, _ = computePaging(totalResults, page, txsOnPage)
}
}
if option == TxidHistory {
txids = make([]string, len(txm)+to-from)
} else {
txs = make([]*Tx, len(txm)+to-from)
}
txi := 0
// get confirmed transactions
for i := from; i < to; i++ {
xpubTxid := &txc[i]
if option == TxidHistory {
txids[txi] = xpubTxid.txid
} else {
if txs[txi], err = w.txFromTxid(xpubTxid.txid, bestheight, option); err != nil {
return nil, err
}
}
txi++
}
if option == TxidHistory {
txids = txids[:txi]
} else if option >= TxHistoryLight {
txs = txs[:txi]
}
}
totalTokens := 0
tokens := make([]Token, 0, 4)
for i := range data.addresses {
@ -256,7 +394,7 @@ func (w *Worker) GetAddressForXpub(xpub string, page int, txsOnPage int, option
var totalReceived big.Int
totalReceived.Add(&data.balanceSat, &data.sentSat)
addr := Address{
// Paging: pg,
Paging: pg,
AddrStr: xpub,
BalanceSat: (*Amount)(&data.balanceSat),
TotalReceivedSat: (*Amount)(&totalReceived),
@ -264,13 +402,11 @@ func (w *Worker) GetAddressForXpub(xpub string, page int, txsOnPage int, option
Txs: int(data.txs),
// UnconfirmedBalanceSat: (*Amount)(&uBalSat),
// UnconfirmedTxs: len(txm),
// Transactions: txs,
// Txids: txids,
TotalTokens: totalTokens,
Tokens: tokens,
// Erc20Contract: erc20c,
// Nonce: nonce,
Transactions: txs,
Txids: txids,
TotalTokens: totalTokens,
Tokens: tokens,
}
glog.Info("GetAddressForXpub ", xpub[:10], ", ", len(data.addresses)+len(data.changeAddresses), " derived addresses, ", data.txs, " total txs finished in ", time.Since(start))
glog.Info("GetAddressForXpub ", xpub[:16], ", ", len(data.addresses)+len(data.changeAddresses), " derived addresses, ", data.txs, " total txs finished in ", time.Since(start))
return &addr, nil
}

View File

@ -26,12 +26,12 @@
<td class="data">{{$addr.Txs}}</td>
</tr>
<tr>
<td>Total XPUB addresses</td>
<td>Total XPUB Addresses</td>
<td class="data">{{$addr.TotalTokens}}</td>
</tr>
{{- if $addr.Tokens -}}
<tr>
<td>{{if $data.AllTokens}}XPUB Addresses{{else}}Nonzero XPUB Addresses{{end}}</td>
<td>{{if $data.AllTokens}}XPUB Addresses{{else}}XPUB Addresses with Balance{{end}}</td>
<td style="padding: 0;">
<table class="table data-table">
<tbody>
@ -93,12 +93,6 @@
<option>All</option>
<option {{if eq $addr.Filter "inputs" -}} selected{{end}} value="inputs">Inputs</option>
<option {{if eq $addr.Filter "outputs" -}} selected{{end}} value="outputs">Outputs</option>
{{- if $addr.Tokens -}}
<option {{if eq $addr.Filter "0" -}} selected{{end}} value="0">Non-contract</option>
{{- range $t := $addr.Tokens -}}
<option {{if eq $addr.Filter $t.ContractIndex -}} selected{{end}} value="{{$t.ContractIndex}}">{{$t.Name}}</option>
{{- end -}}
{{- end -}}
</select>
<div class="col-md-7">
<nav>{{template "paging" $data}}</nav>