Add aggregation to balance history

This commit is contained in:
Martin Boehm 2019-11-29 19:26:20 +01:00
parent bf3d822b87
commit 62208b9634
5 changed files with 207 additions and 38 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"math/big"
"sort"
"time"
)
@ -298,10 +299,64 @@ func (a Utxos) Less(i, j int) bool {
// BalanceHistory contains info about one point in time of balance history
type BalanceHistory struct {
Time int64 `json:"blockTime"`
Txs int `json:"txs"`
Time uint32 `json:"blockTime"`
Txs uint32 `json:"txs"`
ReceivedSat *Amount `json:"received"`
SentSat *Amount `json:"sent"`
Txid string `json:"txid,omitempty"`
}
// BalanceHistories is array of BalanceHistory
type BalanceHistories []BalanceHistory
func (a BalanceHistories) Len() int { return len(a) }
func (a BalanceHistories) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a BalanceHistories) Less(i, j int) bool {
ti := a[i].Time
tj := a[j].Time
if ti == tj {
return a[i].Txid < a[j].Txid
}
return ti < tj
}
// SortAndAggregate sums BalanceHistories to groups defined by parameter groupByTime
func (a BalanceHistories) SortAndAggregate(groupByTime uint32) BalanceHistories {
bhs := make(BalanceHistories, 0)
if len(a) > 0 {
bha := BalanceHistory{
SentSat: &Amount{},
ReceivedSat: &Amount{},
}
sort.Sort(a)
for i := range a {
bh := &a[i]
time := bh.Time - bh.Time%groupByTime
if bha.Time != time {
if bha.Time != 0 {
// in aggregate, do not return txid as it could multiple of them
bha.Txid = ""
bhs = append(bhs, bha)
}
bha = BalanceHistory{
Time: time,
SentSat: &Amount{},
ReceivedSat: &Amount{},
}
}
if bha.Txid != bh.Txid {
bha.Txs += bh.Txs
bha.Txid = bh.Txid
}
(*big.Int)(bha.SentSat).Add((*big.Int)(bha.SentSat), (*big.Int)(bh.SentSat))
(*big.Int)(bha.ReceivedSat).Add((*big.Int)(bha.ReceivedSat), (*big.Int)(bh.ReceivedSat))
}
if bha.Txs > 0 {
bha.Txid = ""
bhs = append(bhs, bha)
}
}
return bhs
}
// Blocks is list of blocks with paging information

View File

@ -49,3 +49,115 @@ func TestAmount_MarshalJSON(t *testing.T) {
})
}
}
func TestBalanceHistories_SortAndAggregate(t *testing.T) {
tests := []struct {
name string
a BalanceHistories
groupByTime uint32
want BalanceHistories
}{
{
name: "empty",
a: []BalanceHistory{},
groupByTime: 3600,
want: []BalanceHistory{},
},
{
name: "one",
a: []BalanceHistory{
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(1)),
SentSat: (*Amount)(big.NewInt(2)),
Time: 1521514812,
Txid: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840",
Txs: 1,
},
},
groupByTime: 3600,
want: []BalanceHistory{
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(1)),
SentSat: (*Amount)(big.NewInt(2)),
Time: 1521514800,
Txs: 1,
},
},
},
{
name: "aggregate",
a: []BalanceHistory{
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(1)),
SentSat: (*Amount)(big.NewInt(2)),
Time: 1521504812,
Txid: "0011223344556677889900112233445566778899001122334455667788990011",
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(3)),
SentSat: (*Amount)(big.NewInt(4)),
Time: 1521504812,
Txid: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840",
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(5)),
SentSat: (*Amount)(big.NewInt(6)),
Time: 1521514812,
Txid: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840",
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(7)),
SentSat: (*Amount)(big.NewInt(8)),
Time: 1521504812,
Txid: "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840",
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(9)),
SentSat: (*Amount)(big.NewInt(10)),
Time: 1521534812,
Txid: "0011223344556677889900112233445566778899001122334455667788990011",
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(11)),
SentSat: (*Amount)(big.NewInt(12)),
Time: 1521534812,
Txid: "1122334455667788990011223344556677889900112233445566778899001100",
Txs: 1,
},
},
groupByTime: 3600,
want: []BalanceHistory{
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(11)),
SentSat: (*Amount)(big.NewInt(14)),
Time: 1521504000,
Txs: 2,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(5)),
SentSat: (*Amount)(big.NewInt(6)),
Time: 1521514800,
Txs: 1,
},
BalanceHistory{
ReceivedSat: (*Amount)(big.NewInt(20)),
SentSat: (*Amount)(big.NewInt(22)),
Time: 1521532800,
Txs: 2,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.a.SortAndAggregate(tt.groupByTime); !reflect.DeepEqual(got, tt.want) {
t.Errorf("BalanceHistories.SortAndAggregate() = %+v, want %+v", got, tt.want)
}
})
}
}

View File

@ -802,26 +802,29 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
}
// GetBalanceHistory returns history of balance for given address
func (w *Worker) GetBalanceHistory(address string, fromTime, toTime time.Time) ([]BalanceHistory, error) {
var bhs []BalanceHistory
bh := BalanceHistory{
SentSat: &Amount{},
ReceivedSat: &Amount{},
}
func (w *Worker) GetBalanceHistory(address string, fromTime, toTime time.Time) (BalanceHistories, error) {
bhs := make(BalanceHistories, 0)
start := time.Now()
addrDesc, _, err := w.getAddrDescAndNormalizeAddress(address)
if err != nil {
return nil, err
}
fromUnix := uint32(0)
toUnix := maxUint32
fromHeight := uint32(0)
toHeight := maxInt
toHeight := maxUint32
if !fromTime.IsZero() {
fromHeight = w.is.GetBlockHeightOfTime(uint32(fromTime.Unix()))
fromUnix = uint32(fromTime.Unix())
fromHeight = w.is.GetBlockHeightOfTime(fromUnix)
}
if !toTime.IsZero() {
toHeight = int(w.is.GetBlockHeightOfTime(uint32(toTime.Unix())))
toUnix = uint32(toTime.Unix())
toHeight = w.is.GetBlockHeightOfTime(toUnix)
}
txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff, FromHeight: fromHeight}, toHeight)
if fromHeight >= toHeight {
return bhs, nil
}
txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff, FromHeight: fromHeight, ToHeight: toHeight}, maxInt)
if err != nil {
return nil, err
}
@ -834,45 +837,34 @@ func (w *Worker) GetBalanceHistory(address string, fromTime, toTime time.Time) (
glog.Warning("DB inconsistency: tx ", txs[txi], ": not found in txAddresses")
continue
}
counted := false
time := int64(w.is.GetBlockTime(ta.Height))
hour := time - time%3600
if bh.Time != hour {
if bh.Txs != 0 {
bhs = append(bhs, bh)
}
bh = BalanceHistory{
Time: hour,
SentSat: &Amount{},
ReceivedSat: &Amount{},
}
time := w.is.GetBlockTime(ta.Height)
if time < fromUnix || time >= toUnix {
continue
}
bh := BalanceHistory{
Time: time,
Txs: 1,
SentSat: &Amount{},
ReceivedSat: &Amount{},
Txid: txs[txi],
}
for i := range ta.Inputs {
tai := &ta.Inputs[i]
if bytes.Equal(addrDesc, tai.AddrDesc) {
(*big.Int)(bh.SentSat).Add((*big.Int)(bh.SentSat), &tai.ValueSat)
if !counted {
counted = true
bh.Txs++
}
}
}
for i := range ta.Outputs {
tao := &ta.Outputs[i]
if bytes.Equal(addrDesc, tao.AddrDesc) {
(*big.Int)(bh.ReceivedSat).Add((*big.Int)(bh.ReceivedSat), &tao.ValueSat)
if !counted {
counted = true
bh.Txs++
}
}
}
}
if bh.Txs != 0 {
bhs = append(bhs, bh)
}
glog.Info("GetBalanceHistory ", address, ", count ", len(bhs), " finished in ", time.Since(start))
return bhs, nil
bha := bhs.SortAndAggregate(3600)
glog.Info("GetBalanceHistory ", address, ", count ", len(bha), " finished in ", time.Since(start))
return bha, nil
}
func (w *Worker) waitForBackendSync() {

View File

@ -1044,11 +1044,12 @@ func (s *PublicServer) apiBalanceHistory(r *http.Request, apiVersion int) (inter
}
t := r.URL.Query().Get("from")
if t != "" {
fromTime, _ = time.Parse("2006-02-01", t)
fromTime, _ = time.Parse("2006-01-02", t)
}
t = r.URL.Query().Get("to")
if t != "" {
toTime, _ = time.Parse("2006-02-01", t)
// time.RFC3339
toTime, _ = time.Parse("2006-01-02", t)
}
history, err = s.api.GetUtxoBalanceHistory(r.URL.Path[i+1:], gap)
if err == nil {

View File

@ -627,6 +627,15 @@ func httpTestsBitcoinType(t *testing.T, ts *httptest.Server) {
`[{"blockTime":1521514800,"txs":1,"received":"9876","sent":"0"},{"blockTime":1521594000,"txs":1,"received":"9000","sent":"9876"}]`,
},
},
{
name: "apiBalanceHistory Addr2 v2 from=2018-03-20&to=2018-03-21",
r: newGetRequest(ts.URL + "/api/v2/balancehistory/mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz?from=2018-03-20&to=2018-03-21"),
status: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: []string{
`[{"blockTime":1521514800,"txs":1,"received":"12345","sent":"0"}]`,
},
},
{
name: "apiSendTx",
r: newGetRequest(ts.URL + "/api/v2/sendtx/1234567890"),