From bf3d822b873a4b646152de3b30f306839b149486 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 19 Nov 2019 08:46:47 +0100 Subject: [PATCH] Add filter from-to to balance history --- api/types.go | 1 - api/worker.go | 58 ++++++++++++++++----------------- common/internalstate.go | 50 ++++++++++++++++++++++++++++ db/bulkconnect.go | 6 ++++ db/rocksdb.go | 38 +++++++++++++++++++-- db/rocksdb_ethereumtype.go | 1 + db/rocksdb_ethereumtype_test.go | 20 ++++++++++++ db/rocksdb_test.go | 28 ++++++++++++++++ server/public.go | 11 ++++++- server/public_test.go | 11 +++++-- 10 files changed, 188 insertions(+), 36 deletions(-) diff --git a/api/types.go b/api/types.go index 5d739846..4e9f22fa 100644 --- a/api/types.go +++ b/api/types.go @@ -302,7 +302,6 @@ type BalanceHistory struct { Txs int `json:"txs"` ReceivedSat *Amount `json:"received"` SentSat *Amount `json:"sent"` - BalanceSat *Amount `json:"balance"` } // Blocks is list of blocks with paging information diff --git a/api/worker.go b/api/worker.go index 7ad85f4b..71b87097 100644 --- a/api/worker.go +++ b/api/worker.go @@ -802,17 +802,26 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco } // GetBalanceHistory returns history of balance for given address -func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) { - var bh []BalanceHistory - var b BalanceHistory - var bi *db.BlockInfo - var balance big.Int +func (w *Worker) GetBalanceHistory(address string, fromTime, toTime time.Time) ([]BalanceHistory, error) { + var bhs []BalanceHistory + bh := BalanceHistory{ + SentSat: &Amount{}, + ReceivedSat: &Amount{}, + } start := time.Now() addrDesc, _, err := w.getAddrDescAndNormalizeAddress(address) if err != nil { return nil, err } - txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff}, maxInt) + fromHeight := uint32(0) + toHeight := maxInt + if !fromTime.IsZero() { + fromHeight = w.is.GetBlockHeightOfTime(uint32(fromTime.Unix())) + } + if !toTime.IsZero() { + toHeight = int(w.is.GetBlockHeightOfTime(uint32(toTime.Unix()))) + } + txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff, FromHeight: fromHeight}, toHeight) if err != nil { return nil, err } @@ -826,19 +835,13 @@ func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) { continue } counted := false - if bi == nil || bi.Height != ta.Height { - bi, err = w.db.GetBlockInfo(ta.Height) - if err != nil { - return nil, err + time := int64(w.is.GetBlockTime(ta.Height)) + hour := time - time%3600 + if bh.Time != hour { + if bh.Txs != 0 { + bhs = append(bhs, bh) } - } - hour := bi.Time - bi.Time%3600 - if b.Time != hour { - if b.Txs != 0 { - b.BalanceSat = (*Amount)(new(big.Int).Set(&balance)) - bh = append(bh, b) - } - b = BalanceHistory{ + bh = BalanceHistory{ Time: hour, SentSat: &Amount{}, ReceivedSat: &Amount{}, @@ -847,32 +850,29 @@ func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) { for i := range ta.Inputs { tai := &ta.Inputs[i] if bytes.Equal(addrDesc, tai.AddrDesc) { - (*big.Int)(b.SentSat).Add((*big.Int)(b.SentSat), &tai.ValueSat) - balance.Sub(&balance, &tai.ValueSat) + (*big.Int)(bh.SentSat).Add((*big.Int)(bh.SentSat), &tai.ValueSat) if !counted { counted = true - b.Txs++ + bh.Txs++ } } } for i := range ta.Outputs { tao := &ta.Outputs[i] if bytes.Equal(addrDesc, tao.AddrDesc) { - (*big.Int)(b.ReceivedSat).Add((*big.Int)(b.ReceivedSat), &tao.ValueSat) - balance.Add(&balance, &tao.ValueSat) + (*big.Int)(bh.ReceivedSat).Add((*big.Int)(bh.ReceivedSat), &tao.ValueSat) if !counted { counted = true - b.Txs++ + bh.Txs++ } } } } - if b.Txs != 0 { - b.BalanceSat = (*Amount)(&balance) - bh = append(bh, b) + if bh.Txs != 0 { + bhs = append(bhs, bh) } - glog.Info("GetBalanceHistory ", address, ", count ", len(bh), " finished in ", time.Since(start)) - return bh, nil + glog.Info("GetBalanceHistory ", address, ", count ", len(bhs), " finished in ", time.Since(start)) + return bhs, nil } func (w *Worker) waitForBackendSync() { diff --git a/common/internalstate.go b/common/internalstate.go index 9419beaa..079b3b8c 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -2,6 +2,7 @@ package common import ( "encoding/json" + "sort" "sync" "time" ) @@ -45,6 +46,7 @@ type InternalState struct { IsSynchronized bool `json:"isSynchronized"` BestHeight uint32 `json:"bestHeight"` LastSync time.Time `json:"lastSync"` + BlockTimes []uint32 `json:"-"` IsMempoolSynchronized bool `json:"isMempoolSynchronized"` MempoolSize int `json:"mempoolSize"` @@ -164,6 +166,54 @@ func (is *InternalState) DBSizeTotal() int64 { return total } +// GetBlockTime returns block time if block found or 0 +func (is *InternalState) GetBlockTime(height uint32) uint32 { + is.mux.Lock() + defer is.mux.Unlock() + if int(height) < len(is.BlockTimes) { + return is.BlockTimes[height] + } + return 0 +} + +// AppendBlockTime appends block time to BlockTimes +func (is *InternalState) AppendBlockTime(time uint32) { + is.mux.Lock() + defer is.mux.Unlock() + is.BlockTimes = append(is.BlockTimes, time) +} + +// RemoveLastBlockTimes removes last times from BlockTimes +func (is *InternalState) RemoveLastBlockTimes(count int) { + is.mux.Lock() + defer is.mux.Unlock() + if len(is.BlockTimes) < count { + count = len(is.BlockTimes) + } + is.BlockTimes = is.BlockTimes[:len(is.BlockTimes)-count] +} + +// GetBlockHeightOfTime returns block height of the first block with time greater or equal to the given time or MaxUint32 if no such block +func (is *InternalState) GetBlockHeightOfTime(time uint32) uint32 { + is.mux.Lock() + defer is.mux.Unlock() + height := sort.Search(len(is.BlockTimes), func(i int) bool { return time <= is.BlockTimes[i] }) + if height == len(is.BlockTimes) { + return ^uint32(0) + } + // as the block times can sometimes be out of order try 20 blocks lower to locate a block with the time greater or equal to the given time + max, height := height, height-20 + if height < 0 { + height = 0 + } + for ; height <= max; height++ { + if time <= is.BlockTimes[height] { + break + } + } + return uint32(height) +} + // Pack marshals internal state to json func (is *InternalState) Pack() ([]byte, error) { is.mux.Lock() diff --git a/db/bulkconnect.go b/db/bulkconnect.go index 8fedd51e..025e7cb2 100644 --- a/db/bulkconnect.go +++ b/db/bulkconnect.go @@ -381,6 +381,12 @@ func (b *BulkConnect) Close() error { return err } } + var err error + b.d.is.BlockTimes, err = b.d.loadBlockTimes() + if err != nil { + return err + } + if err := b.d.SetInconsistentState(false); err != nil { return err } diff --git a/db/rocksdb.go b/db/rocksdb.go index 64a8815f..8dd73d9c 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -348,8 +348,11 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { if err := d.storeAddresses(wb, block.Height, addresses); err != nil { return err } - - return d.db.Write(d.wo, wb) + if err := d.db.Write(d.wo, wb); err != nil { + return err + } + d.is.AppendBlockTime(uint32(block.Time)) + return nil } // Addresses index @@ -1334,6 +1337,7 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e } err := d.db.Write(d.wo, wb) if err == nil { + d.is.RemoveLastBlockTimes(int(higher-lower) + 1) glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) } return err @@ -1448,6 +1452,32 @@ func (d *RocksDB) internalDeleteTx(wb *gorocksdb.WriteBatch, key []byte) { // internal state const internalStateKey = "internalState" +func (d *RocksDB) loadBlockTimes() ([]uint32, error) { + var times []uint32 + it := d.db.NewIteratorCF(d.ro, d.cfh[cfHeight]) + defer it.Close() + counter := uint32(0) + time := uint32(0) + for it.SeekToFirst(); it.Valid(); it.Next() { + height := unpackUint(it.Key().Data()) + if height > counter { + glog.Warning("gap in cfHeight: expecting ", counter, ", got ", height) + for ; counter < height; counter++ { + times = append(times, time) + } + } + counter++ + info, err := d.unpackBlockInfo(it.Value().Data()) + if err != nil { + return nil, err + } + time = uint32(info.Time) + times = append(times, time) + } + glog.Info("loaded ", len(times), " block times") + return times, nil +} + // LoadInternalState loads from db internal state or initializes a new one if not yet stored func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, error) { val, err := d.db.GetCF(d.ro, d.cfh[cfDefault], []byte(internalStateKey)) @@ -1493,6 +1523,10 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro } } is.DbColumns = nc + is.BlockTimes, err = d.loadBlockTimes() + if err != nil { + return nil, err + } // after load, reset the synchronization data is.IsSynchronized = false is.IsMempoolSynchronized = false diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index e7cc5912..64b2eaa4 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -455,6 +455,7 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32) d.storeAddressContracts(wb, contracts) err := d.db.Write(d.wo, wb) if err == nil { + d.is.RemoveLastBlockTimes(int(higher-lower) + 1) glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) } return err diff --git a/db/rocksdb_ethereumtype_test.go b/db/rocksdb_ethereumtype_test.go index c40e6ad4..2c51a4bb 100644 --- a/db/rocksdb_ethereumtype_test.go +++ b/db/rocksdb_ethereumtype_test.go @@ -167,6 +167,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { }) defer closeAndDestroyRocksDB(t, d) + if len(d.is.BlockTimes) != 0 { + t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes)) + } + // connect 1st block block1 := dbtestdata.GetTestEthereumTypeBlock1(d.chainParser) if err := d.ConnectBlock(block1); err != nil { @@ -174,6 +178,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { } verifyAfterEthereumTypeBlock1(t, d, false) + if len(d.is.BlockTimes) != 1 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // connect 2nd block block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser) if err := d.ConnectBlock(block2); err != nil { @@ -181,6 +189,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { } verifyAfterEthereumTypeBlock2(t, d) + if len(d.is.BlockTimes) != 2 { + t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes)) + } + // get transactions for various addresses / low-high ranges verifyGetTransactions(t, d, "0x"+dbtestdata.EthAddr55, 0, 10000000, []txidIndex{ {"0x" + dbtestdata.EthTxidB2T2, ^2}, @@ -275,10 +287,18 @@ func TestRocksDB_Index_EthereumType(t *testing.T) { } } + if len(d.is.BlockTimes) != 1 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // connect block again and verify the state of db if err := d.ConnectBlock(block2); err != nil { t.Fatal(err) } verifyAfterEthereumTypeBlock2(t, d) + if len(d.is.BlockTimes) != 2 { + t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes)) + } + } diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 988a6749..2c9c5aca 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -532,6 +532,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { }) defer closeAndDestroyRocksDB(t, d) + if len(d.is.BlockTimes) != 0 { + t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes)) + } + // connect 1st block - will log warnings about missing UTXO transactions in txAddresses column block1 := dbtestdata.GetTestBitcoinTypeBlock1(d.chainParser) if err := d.ConnectBlock(block1); err != nil { @@ -539,6 +543,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { } verifyAfterBitcoinTypeBlock1(t, d, false) + if len(d.is.BlockTimes) != 1 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // connect 2nd block - use some outputs from the 1st block as the inputs and 1 input uses tx from the same block block2 := dbtestdata.GetTestBitcoinTypeBlock2(d.chainParser) if err := d.ConnectBlock(block2); err != nil { @@ -546,6 +554,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { } verifyAfterBitcoinTypeBlock2(t, d) + if len(d.is.BlockTimes) != 2 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // get transactions for various addresses / low-high ranges verifyGetTransactions(t, d, dbtestdata.Addr2, 0, 1000000, []txidIndex{ {dbtestdata.TxidB2T1, ^1}, @@ -649,12 +661,20 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { } } + if len(d.is.BlockTimes) != 1 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // connect block again and verify the state of db if err := d.ConnectBlock(block2); err != nil { t.Fatal(err) } verifyAfterBitcoinTypeBlock2(t, d) + if len(d.is.BlockTimes) != 2 { + t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes)) + } + // test public methods for address balance and tx addresses ab, err := d.GetAddressBalance(dbtestdata.Addr5, AddressBalanceDetailUTXO) if err != nil { @@ -744,6 +764,10 @@ func Test_BulkConnect_BitcoinType(t *testing.T) { t.Fatal("DB not in DbStateInconsistent") } + if len(d.is.BlockTimes) != 0 { + t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes)) + } + if err := bc.ConnectBlock(dbtestdata.GetTestBitcoinTypeBlock1(d.chainParser), false); err != nil { t.Fatal(err) } @@ -766,6 +790,10 @@ func Test_BulkConnect_BitcoinType(t *testing.T) { } verifyAfterBitcoinTypeBlock2(t, d) + + if len(d.is.BlockTimes) != 225495 { + t.Fatal("Expecting is.BlockTimes 225495, got ", len(d.is.BlockTimes)) + } } func Test_packBigint_unpackBigint(t *testing.T) { diff --git a/server/public.go b/server/public.go index 50cc3567..eea858fb 100644 --- a/server/public.go +++ b/server/public.go @@ -1035,17 +1035,26 @@ func (s *PublicServer) apiUtxo(r *http.Request, apiVersion int) (interface{}, er func (s *PublicServer) apiBalanceHistory(r *http.Request, apiVersion int) (interface{}, error) { var history []api.BalanceHistory + var fromTime, toTime time.Time var err error if i := strings.LastIndexByte(r.URL.Path, '/'); i > 0 { gap, ec := strconv.Atoi(r.URL.Query().Get("gap")) if ec != nil { gap = 0 } + t := r.URL.Query().Get("from") + if t != "" { + fromTime, _ = time.Parse("2006-02-01", t) + } + t = r.URL.Query().Get("to") + if t != "" { + toTime, _ = time.Parse("2006-02-01", t) + } history, err = s.api.GetUtxoBalanceHistory(r.URL.Path[i+1:], gap) if err == nil { s.metrics.ExplorerViews.With(common.Labels{"action": "api-xpub-balancehistory"}).Inc() } else { - history, err = s.api.GetBalanceHistory(r.URL.Path[i+1:]) + history, err = s.api.GetBalanceHistory(r.URL.Path[i+1:], fromTime, toTime) s.metrics.ExplorerViews.With(common.Labels{"action": "api-address-balancehistory"}).Inc() } } diff --git a/server/public_test.go b/server/public_test.go index d25bbad3..1aa76e3c 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -50,8 +50,13 @@ func setupRocksDB(t *testing.T, parser bchain.BlockChainParser) (*db.RocksDB, *c t.Fatal(err) } d.SetInternalState(is) + block1 := dbtestdata.GetTestBitcoinTypeBlock1(parser) + // setup internal state BlockTimes + for i := uint32(0); i < block1.Height; i++ { + is.BlockTimes = append(is.BlockTimes, 0) + } // import data - if err := d.ConnectBlock(dbtestdata.GetTestBitcoinTypeBlock1(parser)); err != nil { + if err := d.ConnectBlock(block1); err != nil { t.Fatal(err) } block2 := dbtestdata.GetTestBitcoinTypeBlock2(parser) @@ -610,7 +615,7 @@ func httpTestsBitcoinType(t *testing.T, ts *httptest.Server) { status: http.StatusOK, contentType: "application/json; charset=utf-8", body: []string{ - `[{"blockTime":1521514800,"txs":1,"received":"12345","sent":"0","balance":"12345"},{"blockTime":1521594000,"txs":1,"received":"0","sent":"12345","balance":"0"}]`, + `[{"blockTime":1521514800,"txs":1,"received":"12345","sent":"0"},{"blockTime":1521594000,"txs":1,"received":"0","sent":"12345"}]`, }, }, { @@ -619,7 +624,7 @@ func httpTestsBitcoinType(t *testing.T, ts *httptest.Server) { status: http.StatusOK, contentType: "application/json; charset=utf-8", body: []string{ - `[{"blockTime":1521514800,"txs":1,"received":"9876","sent":"0","balance":"9876"},{"blockTime":1521594000,"txs":1,"received":"9000","sent":"9876","balance":"9000"}]`, + `[{"blockTime":1521514800,"txs":1,"received":"9876","sent":"0"},{"blockTime":1521594000,"txs":1,"received":"9000","sent":"9876"}]`, }, }, {