diff --git a/api/types.go b/api/types.go index a2ddde4a..e0e562ca 100644 --- a/api/types.go +++ b/api/types.go @@ -337,3 +337,16 @@ type SystemInfo struct { Blockbook *BlockbookInfo `json:"blockbook"` Backend *bchain.ChainInfo `json:"backend"` } + +// MempoolTxid contains information about a transaction in mempool +type MempoolTxid struct { + Time int64 `json:"time"` + Txid string `json:"txid"` +} + +// MempoolTxids contains a list of mempool txids with paging information +type MempoolTxids struct { + Paging + Mempool []MempoolTxid `json:"mempool"` + MempoolSize int `json:"mempoolSize"` +} diff --git a/api/worker.go b/api/worker.go index c8632758..8209c83b 100644 --- a/api/worker.go +++ b/api/worker.go @@ -23,17 +23,19 @@ type Worker struct { chain bchain.BlockChain chainParser bchain.BlockChainParser chainType bchain.ChainType + mempool bchain.Mempool is *common.InternalState } // NewWorker creates new api worker -func NewWorker(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*Worker, error) { +func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) { w := &Worker{ db: db, txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), chainType: chain.GetChainParser().GetChainType(), + mempool: mempool, is: is, } return w, nil @@ -292,6 +294,10 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32, return nil, err } } + // for mempool transaction get first seen time + if bchainTx.Confirmations == 0 { + bchainTx.Blocktime = int64(w.mempool.GetTransactionTime(bchainTx.Txid)) + } r := &Tx{ Blockhash: blockhash, Blockheight: int(height), @@ -348,7 +354,7 @@ func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool } if mempool { uniqueTxs := make(map[string]struct{}) - o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc) + o, err := w.mempool.GetAddrDescTransactions(addrDesc) if err != nil { return nil, err } @@ -1068,3 +1074,26 @@ func (w *Worker) GetSystemInfo(internal bool) (*SystemInfo, error) { glog.Info("GetSystemInfo finished in ", time.Since(start)) return &SystemInfo{bi, ci}, nil } + +// GetMempool returns a page of mempool txids +func (w *Worker) GetMempool(page int, itemsOnPage int) (*MempoolTxids, error) { + page-- + if page < 0 { + page = 0 + } + entries := w.mempool.GetAllEntries() + pg, from, to, page := computePaging(len(entries), page, itemsOnPage) + r := &MempoolTxids{ + Paging: pg, + MempoolSize: len(entries), + } + r.Mempool = make([]MempoolTxid, to-from) + for i := from; i < to; i++ { + entry := &entries[i] + r.Mempool[i-from] = MempoolTxid{ + Txid: entry.Txid, + Time: int64(entry.Time), + } + } + return r, nil +} diff --git a/api/xpub.go b/api/xpub.go index 79bb2fe5..664a12ff 100644 --- a/api/xpub.go +++ b/api/xpub.go @@ -92,7 +92,7 @@ func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool } if mempool { uniqueTxs := make(map[string]int) - o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc) + o, err := w.mempool.GetAddrDescTransactions(addrDesc) if err != nil { return nil, false, err } @@ -383,13 +383,13 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc return nil, err } // setup filtering of txids - var useTxids func(txid *xpubTxid, ad *xpubAddress) bool + var txidFilter func(txid *xpubTxid, ad *xpubAddress) bool if !(filter.FromHeight == 0 && filter.ToHeight == 0 && filter.Vout == AddressFilterVoutOff) { toHeight := maxUint32 if filter.ToHeight != 0 { toHeight = filter.ToHeight } - useTxids = func(txid *xpubTxid, ad *xpubAddress) bool { + txidFilter = func(txid *xpubTxid, ad *xpubAddress) bool { if txid.height < filter.FromHeight || txid.height > toHeight { return false } @@ -406,6 +406,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc // process mempool, only if ToHeight is not specified if filter.ToHeight == 0 && !filter.OnlyConfirmed { txmMap = make(map[string]*Tx) + mempoolEntries := make(bchain.MempoolTxidEntries, 0) for _, da := range [][]xpubAddress{data.addresses, data.changeAddresses} { for i := range da { ad := &da[i] @@ -432,18 +433,23 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc } uBalSat.Add(&uBalSat, tx.getAddrVoutValue(ad.addrDesc)) uBalSat.Sub(&uBalSat, tx.getAddrVinValue(ad.addrDesc)) - if page == 0 && !foundTx && (useTxids == nil || useTxids(&txid, ad)) { - if option == AccountDetailsTxidHistory { - txids = append(txids, tx.Txid) - } else if option >= AccountDetailsTxHistoryLight { - txs = append(txs, tx) - } + // mempool txs are returned only on the first page, uniquely and filtered + if page == 0 && !foundTx && (txidFilter == nil || txidFilter(&txid, ad)) { + mempoolEntries = append(mempoolEntries, bchain.MempoolTxidEntry{Txid: txid.txid, Time: uint32(tx.Blocktime)}) } } - } } } + // sort the entries by time descending + sort.Sort(mempoolEntries) + for _, entry := range mempoolEntries { + if option == AccountDetailsTxidHistory { + txids = append(txids, entry.Txid) + } else if option >= AccountDetailsTxHistoryLight { + txs = append(txs, txmMap[entry.Txid]) + } + } } if option >= AccountDetailsTxidHistory { txcMap := make(map[string]bool) @@ -459,7 +465,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc } // add tx only once if !added { - add := useTxids == nil || useTxids(&txid, ad) + add := txidFilter == nil || txidFilter(&txid, ad) txcMap[txid.txid] = add if add { txc = append(txc, txid) diff --git a/bchain/basechain.go b/bchain/basechain.go index 1de31a48..f1a58e35 100644 --- a/bchain/basechain.go +++ b/bchain/basechain.go @@ -29,6 +29,11 @@ func (b *BaseChain) GetNetworkName() string { return b.Network } +// GetMempoolEntry is not supported by default +func (b *BaseChain) GetMempoolEntry(txid string) (*MempoolEntry, error) { + return nil, errors.New("GetMempoolEntry: not supported") +} + // EthereumTypeGetBalance is not supported func (b *BaseChain) EthereumTypeGetBalance(addrDesc AddressDescriptor) (*big.Int, error) { return nil, errors.New("Not supported") diff --git a/bchain/basemempool.go b/bchain/basemempool.go new file mode 100644 index 00000000..ae585bcc --- /dev/null +++ b/bchain/basemempool.go @@ -0,0 +1,115 @@ +package bchain + +import ( + "sort" + "sync" +) + +type addrIndex struct { + addrDesc string + n int32 +} + +type txEntry struct { + addrIndexes []addrIndex + time uint32 +} + +type txidio struct { + txid string + io []addrIndex +} + +// BaseMempool is mempool base handle +type BaseMempool struct { + chain BlockChain + mux sync.Mutex + txEntries map[string]txEntry + addrDescToTx map[string][]Outpoint + OnNewTxAddr OnNewTxAddrFunc +} + +// GetTransactions returns slice of mempool transactions for given address +func (m *BaseMempool) GetTransactions(address string) ([]Outpoint, error) { + parser := m.chain.GetChainParser() + addrDesc, err := parser.GetAddrDescFromAddress(address) + if err != nil { + return nil, err + } + return m.GetAddrDescTransactions(addrDesc) +} + +// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor, in reverse order +func (m *BaseMempool) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) { + m.mux.Lock() + defer m.mux.Unlock() + outpoints := m.addrDescToTx[string(addrDesc)] + rv := make([]Outpoint, len(outpoints)) + for i, j := len(outpoints)-1, 0; i >= 0; i-- { + rv[j] = outpoints[i] + j++ + } + return rv, nil +} + +func (a MempoolTxidEntries) Len() int { return len(a) } +func (a MempoolTxidEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a MempoolTxidEntries) Less(i, j int) bool { + // if the Time is equal, sort by txid to make the order defined + hi := a[i].Time + hj := a[j].Time + if hi == hj { + return a[i].Txid > a[j].Txid + } + // order in reverse + return hi > hj +} + +// removeEntryFromMempool removes entry from mempool structs. The caller is responsible for locking! +func (m *BaseMempool) removeEntryFromMempool(txid string, entry txEntry) { + delete(m.txEntries, txid) + for _, si := range entry.addrIndexes { + outpoints, found := m.addrDescToTx[si.addrDesc] + if found { + newOutpoints := make([]Outpoint, 0, len(outpoints)-1) + for _, o := range outpoints { + if o.Txid != txid { + newOutpoints = append(newOutpoints, o) + } + } + if len(newOutpoints) > 0 { + m.addrDescToTx[si.addrDesc] = newOutpoints + } else { + delete(m.addrDescToTx, si.addrDesc) + } + } + } +} + +// GetAllEntries returns all mempool entries sorted by fist seen time in descending order +func (m *BaseMempool) GetAllEntries() MempoolTxidEntries { + i := 0 + m.mux.Lock() + entries := make(MempoolTxidEntries, len(m.txEntries)) + for txid, entry := range m.txEntries { + entries[i] = MempoolTxidEntry{ + Txid: txid, + Time: entry.time, + } + i++ + } + m.mux.Unlock() + sort.Sort(entries) + return entries +} + +// GetTransactionTime returns first seen time of a transaction +func (m *BaseMempool) GetTransactionTime(txid string) uint32 { + m.mux.Lock() + e, found := m.txEntries[txid] + m.mux.Unlock() + if !found { + return 0 + } + return e.time +} diff --git a/bchain/coins/bch/bcashrpc.go b/bchain/coins/bch/bcashrpc.go index 62569377..79500773 100644 --- a/bchain/coins/bch/bcashrpc.go +++ b/bchain/coins/bch/bcashrpc.go @@ -34,10 +34,11 @@ func NewBCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes BCashRPC instance. func (b *BCashRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/bellcoin/bellcoinrpc.go b/bchain/coins/bellcoin/bellcoinrpc.go index bd359fd2..2317e063 100644 --- a/bchain/coins/bellcoin/bellcoinrpc.go +++ b/bchain/coins/bellcoin/bellcoinrpc.go @@ -31,10 +31,11 @@ func NewBellcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes BellcoinRPC instance. func (b *BellcoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index de8d374c..c4662e4a 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -103,30 +103,34 @@ func GetCoinNameFromConfig(configfile string) (string, string, string, error) { return cn.CoinName, cn.CoinShortcut, cn.CoinLabel, nil } -// NewBlockChain creates bchain.BlockChain of type defined by parameter coin -func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, error) { +// NewBlockChain creates bchain.BlockChain and bchain.Mempool for the coin passed by the parameter coin +func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, bchain.Mempool, error) { data, err := ioutil.ReadFile(configfile) if err != nil { - return nil, errors.Annotatef(err, "Error reading file %v", configfile) + return nil, nil, errors.Annotatef(err, "Error reading file %v", configfile) } var config json.RawMessage err = json.Unmarshal(data, &config) if err != nil { - return nil, errors.Annotatef(err, "Error parsing file %v", configfile) + return nil, nil, errors.Annotatef(err, "Error parsing file %v", configfile) } bcf, ok := BlockChainFactories[coin] if !ok { - return nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys())) + return nil, nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys())) } bc, err := bcf(config, pushHandler) if err != nil { - return nil, err + return nil, nil, err } err = bc.Initialize() if err != nil { - return nil, err + return nil, nil, err } - return &blockChainWithMetrics{b: bc, m: metrics}, nil + mempool, err := bc.CreateMempool(bc) + if err != nil { + return nil, nil, err + } + return &blockChainWithMetrics{b: bc, m: metrics}, &mempoolWithMetrics{mempool: mempool, m: metrics}, nil } type blockChainWithMetrics struct { @@ -146,6 +150,14 @@ func (c *blockChainWithMetrics) Initialize() error { return c.b.Initialize() } +func (c *blockChainWithMetrics) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) { + return c.b.CreateMempool(chain) +} + +func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { + return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr) +} + func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { return c.b.Shutdown(ctx) } @@ -201,9 +213,9 @@ func (c *blockChainWithMetrics) GetBlockInfo(hash string) (v *bchain.BlockInfo, return c.b.GetBlockInfo(hash) } -func (c *blockChainWithMetrics) GetMempool() (v []string, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempool", s, err) }(time.Now()) - return c.b.GetMempool() +func (c *blockChainWithMetrics) GetMempoolTransactions() (v []string, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) + return c.b.GetMempoolTransactions() } func (c *blockChainWithMetrics) GetTransaction(txid string) (v *bchain.Tx, err error) { @@ -236,25 +248,6 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { - defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) - count, err = c.b.ResyncMempool(onNewTxAddr) - if err == nil { - c.m.MempoolSize.Set(float64(count)) - } - return count, err -} - -func (c *blockChainWithMetrics) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) - return c.b.GetMempoolTransactions(address) -} - -func (c *blockChainWithMetrics) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now()) - return c.b.GetMempoolTransactionsForAddrDesc(addrDesc) -} - func (c *blockChainWithMetrics) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) { defer func(s time.Time) { c.observeRPCLatency("GetMempoolEntry", s, err) }(time.Now()) return c.b.GetMempoolEntry(txid) @@ -288,3 +281,44 @@ func (c *blockChainWithMetrics) EthereumTypeGetErc20ContractBalance(addrDesc, co defer func(s time.Time) { c.observeRPCLatency("EthereumTypeGetErc20ContractInfo", s, err) }(time.Now()) return c.b.EthereumTypeGetErc20ContractBalance(addrDesc, contractDesc) } + +type mempoolWithMetrics struct { + mempool bchain.Mempool + m *common.Metrics +} + +func (c *mempoolWithMetrics) observeRPCLatency(method string, start time.Time, err error) { + var e string + if err != nil { + e = err.Error() + } + c.m.RPCLatency.With(common.Labels{"method": method, "error": e}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds +} + +func (c *mempoolWithMetrics) Resync() (count int, err error) { + defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) + count, err = c.mempool.Resync() + if err == nil { + c.m.MempoolSize.Set(float64(count)) + } + return count, err +} + +func (c *mempoolWithMetrics) GetTransactions(address string) (v []bchain.Outpoint, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) + return c.mempool.GetTransactions(address) +} + +func (c *mempoolWithMetrics) GetAddrDescTransactions(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now()) + return c.mempool.GetAddrDescTransactions(addrDesc) +} + +func (c *mempoolWithMetrics) GetAllEntries() (v bchain.MempoolTxidEntries) { + defer func(s time.Time) { c.observeRPCLatency("GetAllEntries", s, nil) }(time.Now()) + return c.mempool.GetAllEntries() +} + +func (c *mempoolWithMetrics) GetTransactionTime(txid string) uint32 { + return c.mempool.GetTransactionTime(txid) +} diff --git a/bchain/coins/btc/bitcoinparser_test.go b/bchain/coins/btc/bitcoinparser_test.go index 45468869..cf7bc0d5 100644 --- a/bchain/coins/btc/bitcoinparser_test.go +++ b/bchain/coins/btc/bitcoinparser_test.go @@ -218,7 +218,7 @@ func TestGetAddressesFromAddrDesc(t *testing.T) { { name: "OP_RETURN omni simple send tether", args: args{script: "6a146f6d6e69000000000000001f00000709bb647351"}, - want: []string{"OMNI Simple Send 77383.80022609 TetherUS (#31)"}, + want: []string{"OMNI Simple Send: 77383.80022609 TetherUS (#31)"}, want2: false, wantErr: false, }, diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index ef7574cd..ffda6650 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -101,37 +101,15 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT return s, nil } -// GetChainInfoAndInitializeMempool is called by Initialize and reused by other coins -// it contacts the blockchain rpc interface for the first time -// and if successful it connects to ZeroMQ and creates mempool handler -func (b *BitcoinRPC) GetChainInfoAndInitializeMempool(bc bchain.BlockChain) (string, error) { - // try to connect to block chain and get some info - ci, err := bc.GetChainInfo() - if err != nil { - return "", err - } - chainName := ci.Chain - - mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) - if err != nil { - glog.Error("mq: ", err) - return "", err - } - b.mq = mq - - b.Mempool = bchain.NewMempoolBitcoinType(bc, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers) - - return chainName, nil -} - // Initialize initializes BitcoinRPC instance. func (b *BitcoinRPC) Initialize() error { b.ChainConfig.SupportsEstimateFee = false - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) @@ -152,6 +130,33 @@ func (b *BitcoinRPC) Initialize() error { return nil } +// CreateMempool creates mempool if not already created, however does not initialize it +func (b *BitcoinRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) { + if b.Mempool == nil { + b.Mempool = bchain.NewMempoolBitcoinType(chain, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers) + } + return b.Mempool, nil +} + +// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool +func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { + if b.Mempool == nil { + return errors.New("Mempool not created") + } + b.Mempool.AddrDescForOutpoint = addrDescForOutpoint + b.Mempool.OnNewTxAddr = onNewTxAddr + if b.mq == nil { + mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) + if err != nil { + glog.Error("mq: ", err) + return err + } + b.mq = mq + } + return nil +} + +// Shutdown ZeroMQ and other resources func (b *BitcoinRPC) Shutdown(ctx context.Context) error { if b.mq != nil { if err := b.mq.Shutdown(ctx); err != nil { @@ -162,10 +167,12 @@ func (b *BitcoinRPC) Shutdown(ctx context.Context) error { return nil } +// GetCoinName returns the coin name func (b *BitcoinRPC) GetCoinName() string { return b.ChainConfig.CoinName } +// GetSubversion returns the backend subversion func (b *BitcoinRPC) GetSubversion() string { return b.ChainConfig.Subversion } @@ -458,6 +465,7 @@ func (b *BitcoinRPC) GetChainInfo() (*bchain.ChainInfo, error) { return rv, nil } +// IsErrBlockNotFound returns true if error means block was not found func IsErrBlockNotFound(err *bchain.RPCError) bool { return err.Message == "Block not found" || err.Message == "Block height out of range" @@ -634,8 +642,8 @@ func (b *BitcoinRPC) GetBlockFull(hash string) (*bchain.Block, error) { return &res.Result, nil } -// GetMempool returns transactions in mempool -func (b *BitcoinRPC) GetMempool() ([]string, error) { +// GetMempoolTransactions returns transactions in mempool +func (b *BitcoinRPC) GetMempoolTransactions() ([]string, error) { glog.V(1).Info("rpc: getrawmempool") res := ResGetMempool{} @@ -651,6 +659,7 @@ func (b *BitcoinRPC) GetMempool() ([]string, error) { return res.Result, nil } +// IsMissingTx return true if error means missing tx func IsMissingTx(err *bchain.RPCError) bool { if err.Code == -5 { // "No such mempool or blockchain transaction" return true @@ -732,23 +741,6 @@ func (b *BitcoinRPC) getRawTransaction(txid string) (json.RawMessage, error) { return res.Result, nil } -// ResyncMempool gets mempool transactions and maps output scripts to transactions. -// ResyncMempool is not reentrant, it should be called from a single thread. -// Return value is number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { - return b.Mempool.Resync(onNewTxAddr) -} - -// GetMempoolTransactions returns slice of mempool transactions for given address -func (b *BitcoinRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) { - return b.Mempool.GetTransactions(address) -} - -// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor -func (b *BitcoinRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) { - return b.Mempool.GetAddrDescTransactions(addrDesc) -} - // EstimateSmartFee returns fee estimation func (b *BitcoinRPC) EstimateSmartFee(blocks int, conservative bool) (big.Int, error) { // use EstimateFee if EstimateSmartFee is not supported @@ -810,7 +802,7 @@ func (b *BitcoinRPC) EstimateFee(blocks int) (big.Int, error) { return r, nil } -// SendRawTransaction sends raw transaction. +// SendRawTransaction sends raw transaction func (b *BitcoinRPC) SendRawTransaction(tx string) (string, error) { glog.V(1).Info("rpc: sendrawtransaction") diff --git a/bchain/coins/btg/bgoldrpc.go b/bchain/coins/btg/bgoldrpc.go index b158ba32..a590de97 100644 --- a/bchain/coins/btg/bgoldrpc.go +++ b/bchain/coins/btg/bgoldrpc.go @@ -29,10 +29,11 @@ func NewBGoldRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes BGoldRPC instance. func (b *BGoldRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/dash/dashrpc.go b/bchain/coins/dash/dashrpc.go index 66629d79..2320113d 100644 --- a/bchain/coins/dash/dashrpc.go +++ b/bchain/coins/dash/dashrpc.go @@ -34,10 +34,11 @@ func NewDashRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes DashRPC instance. func (b *DashRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/digibyte/digibyterpc.go b/bchain/coins/digibyte/digibyterpc.go index 0cc47e9e..d0f6f32c 100644 --- a/bchain/coins/digibyte/digibyterpc.go +++ b/bchain/coins/digibyte/digibyterpc.go @@ -31,10 +31,11 @@ func NewDigiByteRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes DigiByteRPC instance. func (b *DigiByteRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/dogecoin/dogecoinrpc.go b/bchain/coins/dogecoin/dogecoinrpc.go index 1fc21618..9d6b4e63 100644 --- a/bchain/coins/dogecoin/dogecoinrpc.go +++ b/bchain/coins/dogecoin/dogecoinrpc.go @@ -31,10 +31,11 @@ func NewDogecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes DogecoinRPC instance. func (b *DogecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 2a34c077..eb0bdc2d 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -31,32 +31,33 @@ const ( // Configuration represents json config file type Configuration struct { - CoinName string `json:"coin_name"` - CoinShortcut string `json:"coin_shortcut"` - RPCURL string `json:"rpc_url"` - RPCTimeout int `json:"rpc_timeout"` - BlockAddressesToKeep int `json:"block_addresses_to_keep"` + CoinName string `json:"coin_name"` + CoinShortcut string `json:"coin_shortcut"` + RPCURL string `json:"rpc_url"` + RPCTimeout int `json:"rpc_timeout"` + BlockAddressesToKeep int `json:"block_addresses_to_keep"` + MempoolTxTimeoutHours int `json:"mempoolTxTimeoutHours"` + QueryBackendOnMempoolResync bool `json:"queryBackendOnMempoolResync"` } // EthereumRPC is an interface to JSON-RPC eth service. type EthereumRPC struct { *bchain.BaseChain - client *ethclient.Client - rpc *rpc.Client - timeout time.Duration - Parser *EthereumParser - Mempool *bchain.MempoolEthereumType - bestHeaderLock sync.Mutex - bestHeader *ethtypes.Header - bestHeaderTime time.Time - chanNewBlock chan *ethtypes.Header - newBlockSubscription *rpc.ClientSubscription - chanNewTx chan ethcommon.Hash - newTxSubscription *rpc.ClientSubscription - pendingTransactions map[string]struct{} - pendingTransactionsLock sync.Mutex - ChainConfig *Configuration - isETC bool + client *ethclient.Client + rpc *rpc.Client + timeout time.Duration + Parser *EthereumParser + Mempool *bchain.MempoolEthereumType + mempoolInitialized bool + bestHeaderLock sync.Mutex + bestHeader *ethtypes.Header + bestHeaderTime time.Time + chanNewBlock chan *ethtypes.Header + newBlockSubscription *rpc.ClientSubscription + chanNewTx chan ethcommon.Hash + newTxSubscription *rpc.ClientSubscription + ChainConfig *Configuration + isETC bool } // NewEthereumRPC returns new EthRPC instance. @@ -78,11 +79,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification ec := ethclient.NewClient(rc) s := &EthereumRPC{ - BaseChain: &bchain.BaseChain{}, - client: ec, - rpc: rc, - ChainConfig: &c, - pendingTransactions: make(map[string]struct{}), + BaseChain: &bchain.BaseChain{}, + client: ec, + rpc: rc, + ChainConfig: &c, } // always create parser @@ -125,9 +125,7 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification if glog.V(2) { glog.Info("rpc: new tx ", hex) } - s.pendingTransactionsLock.Lock() - s.pendingTransactions[hex] = struct{}{} - s.pendingTransactionsLock.Unlock() + s.Mempool.AddTransactionToMempool(hex) pushHandler(bchain.NotificationNewTx) } }() @@ -160,11 +158,40 @@ func (b *EthereumRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) + return nil +} + +// CreateMempool creates mempool if not already created, however does not initialize it +func (b *EthereumRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) { + if b.Mempool == nil { + b.Mempool = bchain.NewMempoolEthereumType(chain, b.ChainConfig.MempoolTxTimeoutHours, b.ChainConfig.QueryBackendOnMempoolResync) + glog.Info("mempool created, MempoolTxTimeoutHours=", b.ChainConfig.MempoolTxTimeoutHours, ", QueryBackendOnMempoolResync=", b.ChainConfig.QueryBackendOnMempoolResync) + } + return b.Mempool, nil +} + +// InitializeMempool creates subscriptions to newHeads and newPendingTransactions +func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { + if b.Mempool == nil { + return errors.New("Mempool not created") + } + + // get initial mempool transactions + txs, err := b.GetMempoolTransactions() + if err != nil { + return err + } + for _, txid := range txs { + b.Mempool.AddTransactionToMempool(txid) + } + + b.Mempool.OnNewTxAddr = onNewTxAddr + if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { // subscriptions - if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + if err := b.subscribe(func() (*rpc.ClientSubscription, error) { // invalidate the previous subscription - it is either the first one or there was an error b.newBlockSubscription = nil ctx, cancel := context.WithTimeout(context.Background(), b.timeout) @@ -180,7 +207,8 @@ func (b *EthereumRPC) Initialize() error { return err } } - if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + + if err := b.subscribe(func() (*rpc.ClientSubscription, error) { // invalidate the previous subscription - it is either the first one or there was an error b.newTxSubscription = nil ctx, cancel := context.WithTimeout(context.Background(), b.timeout) @@ -196,8 +224,7 @@ func (b *EthereumRPC) Initialize() error { return err } - // create mempool - b.Mempool = bchain.NewMempoolEthereumType(b) + b.mempoolInitialized = true return nil } @@ -481,9 +508,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash) } btxs[i] = *btx - b.pendingTransactionsLock.Lock() - delete(b.pendingTransactions, tx.Hash) - b.pendingTransactionsLock.Unlock() + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(tx.Hash) + } } bbk := bchain.Block{ BlockHeader: *bbh, @@ -521,14 +548,7 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) { // GetTransactionForMempool returns a transaction by the transaction ID. // It could be optimized for mempool, i.e. without block time and confirmations func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) { - tx, err := b.GetTransaction(txid) - // if there is an error getting the tx or the tx is confirmed, remove it from pending transactions - if err == bchain.ErrTxNotFound || (tx != nil && tx.Confirmations > 0) { - b.pendingTransactionsLock.Lock() - delete(b.pendingTransactions, txid) - b.pendingTransactionsLock.Unlock() - } - return tx, err + return b.GetTransaction(txid) } // GetTransaction returns a transaction by the transaction ID. @@ -541,6 +561,9 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, err } else if tx == nil { + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(txid) + } return nil, bchain.ErrTxNotFound } var btx *bchain.Tx @@ -607,6 +630,10 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } + // remove tx from mempool if it is there + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(txid) + } } return btx, nil } @@ -628,8 +655,8 @@ func (b *EthereumRPC) GetTransactionSpecific(tx *bchain.Tx) (json.RawMessage, er return json.RawMessage(m), err } -// GetMempool returns transactions in mempool -func (b *EthereumRPC) GetMempool() ([]string, error) { +// GetMempoolTransactions returns transactions in mempool +func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) { raw, err := b.getBlockRaw("pending", 0, false) if err != nil { return nil, err @@ -640,19 +667,7 @@ func (b *EthereumRPC) GetMempool() ([]string, error) { return nil, err } } - b.pendingTransactionsLock.Lock() - // join transactions returned by getBlockRaw with pendingTransactions from subscription - for _, txid := range body.Transactions { - b.pendingTransactions[txid] = struct{}{} - } - txids := make([]string, len(b.pendingTransactions)) - i := 0 - for txid := range b.pendingTransactions { - txids[i] = txid - i++ - } - b.pendingTransactionsLock.Unlock() - return txids, nil + return body.Transactions, nil } // EstimateFee returns fee estimation @@ -737,28 +752,6 @@ func (b *EthereumRPC) EthereumTypeGetNonce(addrDesc bchain.AddressDescriptor) (u return b.client.NonceAt(ctx, ethcommon.BytesToAddress(addrDesc), nil) } -// ResyncMempool gets mempool transactions and maps output scripts to transactions. -// ResyncMempool is not reentrant, it should be called from a single thread. -// Return value is number of transactions in mempool -func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { - return b.Mempool.Resync(onNewTxAddr) -} - -// GetMempoolTransactions returns slice of mempool transactions for given address -func (b *EthereumRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) { - return b.Mempool.GetTransactions(address) -} - -// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor -func (b *EthereumRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) { - return b.Mempool.GetAddrDescTransactions(addrDesc) -} - -// GetMempoolEntry is not supported by etherem -func (b *EthereumRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) { - return nil, errors.New("GetMempoolEntry: not supported") -} - // GetChainParser returns ethereum BlockChainParser func (b *EthereumRPC) GetChainParser() bchain.BlockChainParser { return b.Parser diff --git a/bchain/coins/flo/florpc.go b/bchain/coins/flo/florpc.go index 94f831bb..42549034 100644 --- a/bchain/coins/flo/florpc.go +++ b/bchain/coins/flo/florpc.go @@ -4,6 +4,7 @@ import ( "blockbook/bchain" "blockbook/bchain/coins/btc" "encoding/json" + "github.com/juju/errors" "github.com/golang/glog" @@ -32,10 +33,11 @@ func NewFloRPC(config json.RawMessage, pushHandler func(bchain.NotificationType) // Initialize initializes FloRPC instance. func (f *FloRPC) Initialize() error { - chainName, err := f.GetChainInfoAndInitializeMempool(f) + ci, err := f.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/fujicoin/fujicoinrpc.go b/bchain/coins/fujicoin/fujicoinrpc.go index 16c562cc..d7588812 100644 --- a/bchain/coins/fujicoin/fujicoinrpc.go +++ b/bchain/coins/fujicoin/fujicoinrpc.go @@ -31,10 +31,11 @@ func NewFujicoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes FujicoinRPC instance. func (b *FujicoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/gamecredits/gamecreditsrpc.go b/bchain/coins/gamecredits/gamecreditsrpc.go index bb3cecb0..763ff018 100644 --- a/bchain/coins/gamecredits/gamecreditsrpc.go +++ b/bchain/coins/gamecredits/gamecreditsrpc.go @@ -31,10 +31,11 @@ func NewGameCreditsRPC(config json.RawMessage, pushHandler func(bchain.Notificat // Initialize initializes GameCreditsRPC instance. func (b *GameCreditsRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/grs/grsrpc.go b/bchain/coins/grs/grsrpc.go index d3afb9f0..8d4495fa 100644 --- a/bchain/coins/grs/grsrpc.go +++ b/bchain/coins/grs/grsrpc.go @@ -27,10 +27,11 @@ func NewGroestlcoinRPC(config json.RawMessage, pushHandler func(bchain.Notificat // Initialize initializes GroestlcoinRPC instance. func (g *GroestlcoinRPC) Initialize() error { - chainName, err := g.GetChainInfoAndInitializeMempool(g) + ci, err := g.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/koto/kotorpc.go b/bchain/coins/koto/kotorpc.go index 7bb35868..5dc87295 100644 --- a/bchain/coins/koto/kotorpc.go +++ b/bchain/coins/koto/kotorpc.go @@ -28,10 +28,11 @@ func NewKotoRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes KotoRPC instance. func (z *KotoRPC) Initialize() error { - chainName, err := z.GetChainInfoAndInitializeMempool(z) + ci, err := z.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/liquid/liquidrpc.go b/bchain/coins/liquid/liquidrpc.go index 7bfc54a7..1ec45fb2 100644 --- a/bchain/coins/liquid/liquidrpc.go +++ b/bchain/coins/liquid/liquidrpc.go @@ -31,10 +31,11 @@ func NewLiquidRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy // Initialize initializes GameCreditsRPC instance. func (b *LiquidRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/litecoin/litecoinrpc.go b/bchain/coins/litecoin/litecoinrpc.go index e7345251..274e3054 100644 --- a/bchain/coins/litecoin/litecoinrpc.go +++ b/bchain/coins/litecoin/litecoinrpc.go @@ -31,10 +31,11 @@ func NewLitecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes LitecoinRPC instance. func (b *LitecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/monacoin/monacoinrpc.go b/bchain/coins/monacoin/monacoinrpc.go index dacddb4c..77accd43 100644 --- a/bchain/coins/monacoin/monacoinrpc.go +++ b/bchain/coins/monacoin/monacoinrpc.go @@ -31,10 +31,11 @@ func NewMonacoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes MonacoinRPC instance. func (b *MonacoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/myriad/myriadrpc.go b/bchain/coins/myriad/myriadrpc.go index f0bd201a..cafa00b6 100644 --- a/bchain/coins/myriad/myriadrpc.go +++ b/bchain/coins/myriad/myriadrpc.go @@ -31,10 +31,11 @@ func NewMyriadRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy // Initialize initializes MyriadRPC instance. func (b *MyriadRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/namecoin/namecoinrpc.go b/bchain/coins/namecoin/namecoinrpc.go index 89dc245c..927c095d 100644 --- a/bchain/coins/namecoin/namecoinrpc.go +++ b/bchain/coins/namecoin/namecoinrpc.go @@ -31,10 +31,11 @@ func NewNamecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes NamecoinRPC instance. func (b *NamecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/nuls/nulsrpc.go b/bchain/coins/nuls/nulsrpc.go index 3bd7b24e..49e4178f 100644 --- a/bchain/coins/nuls/nulsrpc.go +++ b/bchain/coins/nuls/nulsrpc.go @@ -5,9 +5,8 @@ import ( "blockbook/bchain/coins/btc" "bytes" "encoding/base64" + "encoding/hex" "encoding/json" - "github.com/gobuffalo/packr/v2/file/resolver/encoding/hex" - "github.com/juju/errors" "io" "io/ioutil" "math/big" @@ -17,6 +16,8 @@ import ( "strconv" "time" + "github.com/juju/errors" + "github.com/golang/glog" ) diff --git a/bchain/coins/pivx/pivxrpc.go b/bchain/coins/pivx/pivxrpc.go index 8979ee4f..38f2c2cc 100644 --- a/bchain/coins/pivx/pivxrpc.go +++ b/bchain/coins/pivx/pivxrpc.go @@ -32,10 +32,11 @@ func NewPivXRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes PivXRPC instance. func (b *PivXRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/polis/polisrpc.go b/bchain/coins/polis/polisrpc.go index aa31f22e..47e9fb7a 100644 --- a/bchain/coins/polis/polisrpc.go +++ b/bchain/coins/polis/polisrpc.go @@ -31,10 +31,11 @@ func NewPolisRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes PolisRPC instance. func (b *PolisRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/qtum/qtumrpc.go b/bchain/coins/qtum/qtumrpc.go index b5a8cafc..a942ce3b 100644 --- a/bchain/coins/qtum/qtumrpc.go +++ b/bchain/coins/qtum/qtumrpc.go @@ -4,6 +4,7 @@ import ( "blockbook/bchain" "blockbook/bchain/coins/btc" "encoding/json" + "github.com/golang/glog" ) @@ -30,10 +31,11 @@ func NewQtumRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes QtumRPC instance. func (b *QtumRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/vertcoin/vertcoinrpc.go b/bchain/coins/vertcoin/vertcoinrpc.go index f282dc78..ec1e1ba2 100644 --- a/bchain/coins/vertcoin/vertcoinrpc.go +++ b/bchain/coins/vertcoin/vertcoinrpc.go @@ -31,10 +31,11 @@ func NewVertcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes VertcoinRPC instance. func (b *VertcoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/xzc/zcoinrpc.go b/bchain/coins/xzc/zcoinrpc.go index 1117cbc3..22dfbe92 100644 --- a/bchain/coins/xzc/zcoinrpc.go +++ b/bchain/coins/xzc/zcoinrpc.go @@ -36,10 +36,11 @@ func NewZcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp } func (zc *ZcoinRPC) Initialize() error { - chainName, err := zc.GetChainInfoAndInitializeMempool(zc) + ci, err := zc.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/zec/zcashrpc.go b/bchain/coins/zec/zcashrpc.go index 3ecffea3..4da5f8d4 100644 --- a/bchain/coins/zec/zcashrpc.go +++ b/bchain/coins/zec/zcashrpc.go @@ -30,10 +30,11 @@ func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes ZCashRPC instance func (z *ZCashRPC) Initialize() error { - chainName, err := z.GetChainInfoAndInitializeMempool(z) + ci, err := z.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index db12ffd4..68882886 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -1,38 +1,28 @@ package bchain import ( - "sync" "time" "github.com/golang/glog" ) -type addrIndex struct { - addrDesc string - n int32 -} - -type txidio struct { - txid string - io []addrIndex -} - // MempoolBitcoinType is mempool handle. type MempoolBitcoinType struct { - chain BlockChain - mux sync.Mutex - txToInputOutput map[string][]addrIndex - addrDescToTx map[string][]Outpoint - chanTxid chan string - chanAddrIndex chan txidio - onNewTxAddr OnNewTxAddrFunc + BaseMempool + chanTxid chan string + chanAddrIndex chan txidio + AddrDescForOutpoint AddrDescForOutpointFunc } // NewMempoolBitcoinType creates new mempool handler. // For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *MempoolBitcoinType { m := &MempoolBitcoinType{ - chain: chain, + BaseMempool: BaseMempool{ + chain: chain, + txEntries: make(map[string]txEntry), + addrDescToTx: make(map[string][]Outpoint), + }, chanTxid: make(chan string, 1), chanAddrIndex: make(chan txidio, 1), } @@ -61,44 +51,26 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo return m } -// GetTransactions returns slice of mempool transactions for given address -func (m *MempoolBitcoinType) GetTransactions(address string) ([]Outpoint, error) { - parser := m.chain.GetChainParser() - addrDesc, err := parser.GetAddrDescFromAddress(address) - if err != nil { - return nil, err - } - return m.GetAddrDescTransactions(addrDesc) -} - -// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor -func (m *MempoolBitcoinType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) { - m.mux.Lock() - defer m.mux.Unlock() - return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil -} - -func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) { - m.mux.Lock() - defer m.mux.Unlock() - m.txToInputOutput = newTxToInputOutput - m.addrDescToTx = newAddrDescToTx -} - func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex { - itx, err := m.chain.GetTransactionForMempool(input.Txid) - if err != nil { - glog.Error("cannot get transaction ", input.Txid, ": ", err) - return nil + var addrDesc AddressDescriptor + if m.AddrDescForOutpoint != nil { + addrDesc = m.AddrDescForOutpoint(input) } - if int(input.Vout) >= len(itx.Vout) { - glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) - return nil - } - addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout]) - if err != nil { - glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err) - return nil + if addrDesc == nil { + itx, err := m.chain.GetTransactionForMempool(input.Txid) + if err != nil { + glog.Error("cannot get transaction ", input.Txid, ": ", err) + return nil + } + if int(input.Vout) >= len(itx.Vout) { + glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) + return nil + } + addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout]) + if err != nil { + glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err) + return nil + } } return &addrIndex{string(addrDesc), ^input.Vout} @@ -121,8 +93,8 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch if len(addrDesc) > 0 { io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } - if m.onNewTxAddr != nil { - m.onNewTxAddr(tx, addrDesc) + if m.OnNewTxAddr != nil { + m.OnNewTxAddr(tx, addrDesc) } } dispatched := 0 @@ -159,37 +131,38 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { +func (m *MempoolBitcoinType) Resync() (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") - m.onNewTxAddr = onNewTxAddr - txs, err := m.chain.GetMempool() + txs, err := m.chain.GetMempoolTransactions() if err != nil { return 0, err } glog.V(2).Info("mempool: resync ", len(txs), " txs") - // allocate slightly larger capacity of the maps - newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) - newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) - dispatched := 0 - onNewData := func(txid string, io []addrIndex) { - if len(io) > 0 { - newTxToInputOutput[txid] = io - for _, si := range io { - newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + onNewEntry := func(txid string, entry txEntry) { + if len(entry.addrIndexes) > 0 { + m.mux.Lock() + m.txEntries[txid] = entry + for _, si := range entry.addrIndexes { + m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n}) } + m.mux.Unlock() } } + txsMap := make(map[string]struct{}, len(txs)) + dispatched := 0 + txTime := uint32(time.Now().Unix()) // get transaction in parallel using goroutines created in NewUTXOMempool for _, txid := range txs { - io, exists := m.txToInputOutput[txid] + txsMap[txid] = struct{}{} + _, exists := m.txEntries[txid] if !exists { loop: for { select { // store as many processed transactions as possible case tio := <-m.chanAddrIndex: - onNewData(tio.txid, tio.io) + onNewEntry(tio.txid, txEntry{tio.io, txTime}) dispatched-- // send transaction to be processed case m.chanTxid <- txid: @@ -197,16 +170,20 @@ func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { break loop } } - } else { - onNewData(txid, io) } } for i := 0; i < dispatched; i++ { tio := <-m.chanAddrIndex - onNewData(tio.txid, tio.io) + onNewEntry(tio.txid, txEntry{tio.io, txTime}) } - m.updateMappings(newTxToInputOutput, newAddrDescToTx) - m.onNewTxAddr = nil - glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return len(m.txToInputOutput), nil + + for txid, entry := range m.txEntries { + if _, exists := txsMap[txid]; !exists { + m.mux.Lock() + m.removeEntryFromMempool(txid, entry) + m.mux.Unlock() + } + } + glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool") + return len(m.txEntries), nil } diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index eff4bc53..33e44e72 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -1,47 +1,34 @@ package bchain import ( - "sync" "time" "github.com/golang/glog" ) +const mempoolTimeoutRunPeriod = 10 * time.Minute + // MempoolEthereumType is mempool handle of EthereumType chains type MempoolEthereumType struct { - chain BlockChain - mux sync.Mutex - txToInputOutput map[string][]addrIndex - addrDescToTx map[string][]Outpoint + BaseMempool + mempoolTimeoutTime time.Duration + queryBackendOnResync bool + nextTimeoutRun time.Time } // NewMempoolEthereumType creates new mempool handler. -func NewMempoolEthereumType(chain BlockChain) *MempoolEthereumType { - return &MempoolEthereumType{chain: chain} -} - -// GetTransactions returns slice of mempool transactions for given address -func (m *MempoolEthereumType) GetTransactions(address string) ([]Outpoint, error) { - parser := m.chain.GetChainParser() - addrDesc, err := parser.GetAddrDescFromAddress(address) - if err != nil { - return nil, err +func NewMempoolEthereumType(chain BlockChain, mempoolTxTimeoutHours int, queryBackendOnResync bool) *MempoolEthereumType { + mempoolTimeoutTime := time.Duration(mempoolTxTimeoutHours) * time.Hour + return &MempoolEthereumType{ + BaseMempool: BaseMempool{ + chain: chain, + txEntries: make(map[string]txEntry), + addrDescToTx: make(map[string][]Outpoint), + }, + mempoolTimeoutTime: mempoolTimeoutTime, + queryBackendOnResync: queryBackendOnResync, + nextTimeoutRun: time.Now().Add(mempoolTimeoutTime), } - return m.GetAddrDescTransactions(addrDesc) -} - -// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor -func (m *MempoolEthereumType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) { - m.mux.Lock() - defer m.mux.Unlock() - return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil -} - -func (m *MempoolEthereumType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) { - m.mux.Lock() - defer m.mux.Unlock() - m.txToInputOutput = newTxToInputOutput - m.addrDescToTx = newAddrDescToTx } func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex { @@ -56,73 +43,117 @@ func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) [ return io } -// Resync gets mempool transactions and maps outputs to transactions. -// Resync is not reentrant, it should be called from a single thread. -// Read operations (GetTransactions) are safe. -func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { - start := time.Now() - glog.V(1).Info("Mempool: resync") - txs, err := m.chain.GetMempool() +func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) { + tx, err := m.chain.GetTransactionForMempool(txid) if err != nil { - return 0, err + if err != ErrTxNotFound { + glog.Warning("cannot get transaction ", txid, ": ", err) + } + return txEntry{}, false } parser := m.chain.GetChainParser() - // allocate slightly larger capacity of the maps - newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5) - newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5) - for _, txid := range txs { - io, exists := m.txToInputOutput[txid] - if !exists { - tx, err := m.chain.GetTransactionForMempool(txid) - if err != nil { - if err != ErrTxNotFound { - glog.Warning("cannot get transaction ", txid, ": ", err) - } - continue - } - io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) - for _, output := range tx.Vout { - addrDesc, err := parser.GetAddrDescFromVout(&output) - if err != nil { - if err != ErrAddressMissing { - glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err) - } - continue - } - if len(addrDesc) > 0 { - io = append(io, addrIndex{string(addrDesc), int32(output.N)}) - } - } - for _, input := range tx.Vin { - for i, a := range input.Addresses { - appendAddress(io, ^int32(i), a, parser) - } - } - t, err := parser.EthereumTypeGetErc20FromTx(tx) - if err != nil { - glog.Error("GetErc20FromTx for tx ", txid, ", ", err) - } else { - for i := range t { - io = appendAddress(io, ^int32(i+1), t[i].From, parser) - io = appendAddress(io, int32(i+1), t[i].To, parser) - } - } - if onNewTxAddr != nil { - sent := make(map[string]struct{}) - for _, si := range io { - if _, found := sent[si.addrDesc]; !found { - onNewTxAddr(tx, AddressDescriptor(si.addrDesc)) - sent[si.addrDesc] = struct{}{} - } - } + addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) + for _, output := range tx.Vout { + addrDesc, err := parser.GetAddrDescFromVout(&output) + if err != nil { + if err != ErrAddressMissing { + glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err) } + continue } - newTxToInputOutput[txid] = io - for _, si := range io { - newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + if len(addrDesc) > 0 { + addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)}) } } - m.updateMappings(newTxToInputOutput, newAddrDescToTx) - glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool") - return len(m.txToInputOutput), nil + for _, input := range tx.Vin { + for i, a := range input.Addresses { + addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser) + } + } + t, err := parser.EthereumTypeGetErc20FromTx(tx) + if err != nil { + glog.Error("GetErc20FromTx for tx ", txid, ", ", err) + } else { + for i := range t { + addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser) + addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser) + } + } + if m.OnNewTxAddr != nil { + sent := make(map[string]struct{}) + for _, si := range addrIndexes { + if _, found := sent[si.addrDesc]; !found { + m.OnNewTxAddr(tx, AddressDescriptor(si.addrDesc)) + sent[si.addrDesc] = struct{}{} + } + } + } + return txEntry{addrIndexes: addrIndexes, time: txTime}, true +} + +// Resync ethereum type removes timed out transactions and returns number of transactions in mempool. +// Transactions are added/removed by AddTransactionToMempool/RemoveTransactionFromMempool methods +func (m *MempoolEthereumType) Resync() (int, error) { + if m.queryBackendOnResync { + txs, err := m.chain.GetMempoolTransactions() + if err != nil { + return 0, err + } + for _, txid := range txs { + m.AddTransactionToMempool(txid) + } + } + m.mux.Lock() + entries := len(m.txEntries) + now := time.Now() + if m.nextTimeoutRun.Before(now) { + threshold := now.Add(-m.mempoolTimeoutTime) + for txid, entry := range m.txEntries { + if time.Unix(int64(entry.time), 0).Before(threshold) { + m.removeEntryFromMempool(txid, entry) + } + } + removed := entries - len(m.txEntries) + entries = len(m.txEntries) + glog.Info("Mempool: cleanup, removed ", removed, " transactions from mempool") + m.nextTimeoutRun = now.Add(mempoolTimeoutRunPeriod) + } + m.mux.Unlock() + glog.Info("Mempool: resync ", entries, " transactions in mempool") + return entries, nil +} + +// AddTransactionToMempool adds transactions to mempool +func (m *MempoolEthereumType) AddTransactionToMempool(txid string) { + m.mux.Lock() + _, exists := m.txEntries[txid] + m.mux.Unlock() + if glog.V(1) { + glog.Info("AddTransactionToMempool ", txid, ", existed ", exists) + } + if !exists { + entry, ok := m.createTxEntry(txid, uint32(time.Now().Unix())) + if !ok { + return + } + m.mux.Lock() + m.txEntries[txid] = entry + for _, si := range entry.addrIndexes { + m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n}) + } + m.mux.Unlock() + } +} + +// RemoveTransactionFromMempool removes transaction from mempool +func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) { + m.mux.Lock() + entry, exists := m.txEntries[txid] + if glog.V(1) { + glog.Info("RemoveTransactionFromMempool ", txid, ", existed ", exists) + } + if exists { + m.removeEntryFromMempool(txid, entry) + } + m.mux.Unlock() } diff --git a/bchain/types.go b/bchain/types.go index ead62535..4b3c1abd 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -185,16 +185,34 @@ type Erc20Transfer struct { Tokens big.Int } +// MempoolTxidEntry contains mempool txid with first seen time +type MempoolTxidEntry struct { + Txid string + Time uint32 +} + +// MempoolTxidEntries is array of MempoolTxidEntry +type MempoolTxidEntries []MempoolTxidEntry + // OnNewBlockFunc is used to send notification about a new block type OnNewBlockFunc func(hash string, height uint32) // OnNewTxAddrFunc is used to send notification about a new transaction/address type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) +// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found +type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor + // BlockChain defines common interface to block chain daemon type BlockChain interface { // life-cycle methods + // initialize the block chain connector Initialize() error + // create mempool but do not initialize it + CreateMempool(BlockChain) (Mempool, error) + // initialize mempool, create ZeroMQ (or other) subscription + InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error + // shutdown mempool, ZeroMQ and block chain connections Shutdown(ctx context.Context) error // chain info IsTestnet() bool @@ -209,17 +227,13 @@ type BlockChain interface { GetBlockHeader(hash string) (*BlockHeader, error) GetBlock(hash string, height uint32) (*Block, error) GetBlockInfo(hash string) (*BlockInfo, error) - GetMempool() ([]string, error) + GetMempoolTransactions() ([]string, error) GetTransaction(txid string) (*Tx, error) GetTransactionForMempool(txid string) (*Tx, error) GetTransactionSpecific(tx *Tx) (json.RawMessage, error) EstimateSmartFee(blocks int, conservative bool) (big.Int, error) EstimateFee(blocks int) (big.Int, error) SendRawTransaction(tx string) (string, error) - // mempool - ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error) - GetMempoolTransactions(address string) ([]Outpoint, error) - GetMempoolTransactionsForAddrDesc(addrDesc AddressDescriptor) ([]Outpoint, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser GetChainParser() BlockChainParser @@ -270,3 +284,12 @@ type BlockChainParser interface { // EthereumType specific EthereumTypeGetErc20FromTx(tx *Tx) ([]Erc20Transfer, error) } + +// Mempool defines common interface to mempool +type Mempool interface { + Resync() (int, error) + GetTransactions(address string) ([]Outpoint, error) + GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) + GetAllEntries() MempoolTxidEntries + GetTransactionTime(txid string) uint32 +} diff --git a/blockbook.go b/blockbook.go index 44d8def9..a0893ec9 100644 --- a/blockbook.go +++ b/blockbook.go @@ -82,6 +82,7 @@ var ( chanSyncMempoolDone = make(chan struct{}) chanStoreInternalStateDone = make(chan struct{}) chain bchain.BlockChain + mempool bchain.Mempool index *db.RocksDB txCache *db.TxCache metrics *common.Metrics @@ -98,29 +99,6 @@ func init() { glog.CopyStandardLogTo("INFO") } -func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, error) { - var chain bchain.BlockChain - var err error - timer := time.NewTimer(time.Second) - for i := 0; ; i++ { - if chain, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil { - if i < seconds { - glog.Error("rpc: ", err, " Retrying...") - select { - case <-chanOsSignal: - return nil, errors.New("Interrupted") - case <-timer.C: - timer.Reset(time.Second) - continue - } - } else { - return nil, err - } - } - return chain, nil - } -} - func main() { flag.Parse() @@ -141,34 +119,40 @@ func main() { if *repair { if err := db.RepairRocksDB(*dbPath); err != nil { - glog.Fatalf("RepairRocksDB %s: %v", *dbPath, err) + glog.Errorf("RepairRocksDB %s: %v", *dbPath, err) + return } return } if *blockchain == "" { - glog.Fatal("Missing blockchaincfg configuration parameter") + glog.Error("Missing blockchaincfg configuration parameter") + return } coin, coinShortcut, coinLabel, err := coins.GetCoinNameFromConfig(*blockchain) if err != nil { - glog.Fatal("config: ", err) + glog.Error("config: ", err) + return } // gspt.SetProcTitle("blockbook-" + normalizeName(coin)) metrics, err = common.GetMetrics(coin) if err != nil { - glog.Fatal("metrics: ", err) + glog.Error("metrics: ", err) + return } - if chain, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil { - glog.Fatal("rpc: ", err) + if chain, mempool, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil { + glog.Error("rpc: ", err) + return } index, err = db.NewRocksDB(*dbPath, *dbCache, *dbMaxOpenFiles, chain.GetChainParser(), metrics) if err != nil { - glog.Fatal("rocksDB: ", err) + glog.Error("rocksDB: ", err) + return } defer index.Close() @@ -198,40 +182,20 @@ func main() { syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics, internalState) if err != nil { - glog.Fatalf("NewSyncWorker %v", err) + glog.Errorf("NewSyncWorker %v", err) + return } // set the DbState to open at this moment, after all important workers are initialized internalState.DbState = common.DbStateOpen err = index.StoreInternalState(internalState) if err != nil { - glog.Fatal("internalState: ", err) + glog.Error("internalState: ", err) + return } if *rollbackHeight >= 0 { - bestHeight, bestHash, err := index.GetBestBlock() - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - if uint32(*rollbackHeight) > bestHeight { - glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) - } else { - hashes := []string{bestHash} - for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- { - hash, err := index.GetBlockHash(height) - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - hashes = append(hashes, hash) - } - err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes) - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - } + performRollback() return } @@ -247,45 +211,20 @@ func main() { var internalServer *server.InternalServer if *internalBinding != "" { - internalServer, err = server.NewInternalServer(*internalBinding, *certFiles, index, chain, txCache, internalState) + internalServer, err = startInternalServer() if err != nil { - glog.Error("https: ", err) + glog.Error("internal server: ", err) return } - go func() { - err = internalServer.Run() - if err != nil { - if err.Error() == "http: Server closed" { - glog.Info("internal server: closed") - } else { - glog.Error(err) - return - } - } - }() } var publicServer *server.PublicServer if *publicBinding != "" { - // start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface - publicServer, err = server.NewPublicServer(*publicBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState, *debugMode) + publicServer, err = startPublicServer() if err != nil { - glog.Error("socketio: ", err) + glog.Error("public server: ", err) return } - go func() { - err = publicServer.Run() - if err != nil { - if err.Error() == "http: Server closed" { - glog.Info("public server: closed") - } else { - glog.Error(err) - return - } - } - }() - callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) - callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) } if *synchronize { @@ -295,8 +234,18 @@ func main() { glog.Error("resyncIndex ", err) return } + // initialize mempool after the initial sync is complete + var addrDescForOutpoint bchain.AddrDescForOutpointFunc + if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType { + addrDescForOutpoint = index.AddrDescForOutpoint + } + err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr) + if err != nil { + glog.Error("initializeMempool ", err) + return + } var mempoolCount int - if mempoolCount, err = chain.ResyncMempool(nil); err != nil { + if mempoolCount, err = mempool.Resync(); err != nil { glog.Error("resyncMempool ", err) return } @@ -307,8 +256,10 @@ func main() { } go storeInternalStateLoop() - if *publicBinding != "" { + if publicServer != nil { // start full public interface + callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) + callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) publicServer.ConnectFullPublicInterface() } @@ -341,8 +292,97 @@ func main() { } } +func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, bchain.Mempool, error) { + var chain bchain.BlockChain + var mempool bchain.Mempool + var err error + timer := time.NewTimer(time.Second) + for i := 0; ; i++ { + if chain, mempool, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil { + if i < seconds { + glog.Error("rpc: ", err, " Retrying...") + select { + case <-chanOsSignal: + return nil, nil, errors.New("Interrupted") + case <-timer.C: + timer.Reset(time.Second) + continue + } + } else { + return nil, nil, err + } + } + return chain, mempool, nil + } +} + +func startInternalServer() (*server.InternalServer, error) { + internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, internalState) + if err != nil { + return nil, err + } + go func() { + err = internalServer.Run() + if err != nil { + if err.Error() == "http: Server closed" { + glog.Info("internal server: closed") + } else { + glog.Error(err) + return + } + } + }() + return internalServer, nil +} + +func startPublicServer() (*server.PublicServer, error) { + // start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface + publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode) + if err != nil { + return nil, err + } + go func() { + err = publicServer.Run() + if err != nil { + if err.Error() == "http: Server closed" { + glog.Info("public server: closed") + } else { + glog.Error(err) + return + } + } + }() + return publicServer, err +} + +func performRollback() { + bestHeight, bestHash, err := index.GetBestBlock() + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + if uint32(*rollbackHeight) > bestHeight { + glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) + } else { + hashes := []string{bestHash} + for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- { + hash, err := index.GetBlockHash(height) + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + hashes = append(hashes, hash) + } + err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes) + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + } +} + func blockbookAppInfoMetric(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error { - api, err := api.NewWorker(db, chain, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return err } @@ -442,7 +482,7 @@ func syncMempoolLoop() { // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { internalState.StartedMempoolSync() - if count, err := chain.ResyncMempool(onNewTxAddr); err != nil { + if count, err := mempool.Resync(); err != nil { glog.Error("syncMempoolLoop ", errors.ErrorStack(err)) } else { internalState.FinishedMempoolSync(count) diff --git a/configs/coins/ethereum-classic.json b/configs/coins/ethereum-classic.json index 93e4268f..7889e415 100644 --- a/configs/coins/ethereum-classic.json +++ b/configs/coins/ethereum-classic.json @@ -1,9 +1,9 @@ { "coin": { - "name": "Ethereum Classic", - "shortcut": "ETC", - "label": "Ethereum Classic", - "alias": "ethereum-classic" + "name": "Ethereum Classic", + "shortcut": "ETC", + "label": "Ethereum Classic", + "alias": "ethereum-classic" }, "ports": { "backend_rpc": 8037, @@ -41,17 +41,20 @@ "internal_binding_template": ":{{.Ports.BlockbookInternal}}", "public_binding_template": ":{{.Ports.BlockbookPublic}}", "explorer_url": "", - "additional_params": "-resyncindexperiod=4441", + "additional_params": "-resyncindexperiod=4441 -resyncmempoolperiod=2011", "block_chain": { "parse": true, "mempool_workers": 8, "mempool_sub_workers": 2, "block_addresses_to_keep": 300, - "additional_params": {} + "additional_params": { + "mempoolTxTimeoutHours": 48, + "queryBackendOnMempoolResync": true + } } }, "meta": { "package_maintainer": "Petr Kracik", "package_maintainer_email": "petr.kracik@satoshilabs.com" } -} +} \ No newline at end of file diff --git a/configs/coins/ethereum.json b/configs/coins/ethereum.json index 0b03085d..fbf18f1a 100644 --- a/configs/coins/ethereum.json +++ b/configs/coins/ethereum.json @@ -1,9 +1,9 @@ { "coin": { - "name": "Ethereum", - "shortcut": "ETH", - "label": "Ethereum", - "alias": "ethereum" + "name": "Ethereum", + "shortcut": "ETH", + "label": "Ethereum", + "alias": "ethereum" }, "ports": { "backend_rpc": 8036, @@ -49,11 +49,14 @@ "mempool_workers": 8, "mempool_sub_workers": 2, "block_addresses_to_keep": 300, - "additional_params": {} + "additional_params": { + "mempoolTxTimeoutHours": 48, + "queryBackendOnMempoolResync": false + } } }, "meta": { "package_maintainer": "Petr Kracik", "package_maintainer_email": "petr.kracik@satoshilabs.com" } -} +} \ No newline at end of file diff --git a/configs/coins/ethereum_testnet_ropsten.json b/configs/coins/ethereum_testnet_ropsten.json index bc3c417e..e55985de 100644 --- a/configs/coins/ethereum_testnet_ropsten.json +++ b/configs/coins/ethereum_testnet_ropsten.json @@ -1,9 +1,9 @@ { "coin": { - "name": "Ethereum Testnet Ropsten", - "shortcut": "tROP", - "label": "Ethereum Ropsten", - "alias": "ethereum_testnet_ropsten" + "name": "Ethereum Testnet Ropsten", + "shortcut": "tROP", + "label": "Ethereum Ropsten", + "alias": "ethereum_testnet_ropsten" }, "ports": { "backend_rpc": 18036, @@ -48,11 +48,14 @@ "mempool_workers": 8, "mempool_sub_workers": 2, "block_addresses_to_keep": 300, - "additional_params": {} + "additional_params": { + "mempoolTxTimeoutHours": 12, + "queryBackendOnMempoolResync": false + } } }, "meta": { "package_maintainer": "Petr Kracik", "package_maintainer_email": "petr.kracik@satoshilabs.com" } -} +} \ No newline at end of file diff --git a/configs/environ.json b/configs/environ.json index 6171a41b..f57fa21d 100644 --- a/configs/environ.json +++ b/configs/environ.json @@ -1,5 +1,5 @@ { - "version": "0.2.1", + "version": "0.2.2", "backend_install_path": "/opt/coins/nodes", "backend_data_path": "/opt/coins/data", "blockbook_install_path": "/opt/coins/blockbook", diff --git a/db/rocksdb.go b/db/rocksdb.go index 0ec55b03..735402e3 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -757,6 +757,25 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) { return d.getTxAddresses(btxID) } +// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found +func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor { + ta, err := d.GetTxAddresses(outpoint.Txid) + if err != nil || ta == nil { + return nil + } + if outpoint.Vout < 0 { + vin := ^outpoint.Vout + if len(ta.Inputs) <= int(vin) { + return nil + } + return ta.Inputs[vin].AddrDesc + } + if len(ta.Outputs) <= int(outpoint.Vout) { + return nil + } + return ta.Outputs[outpoint.Vout].AddrDesc +} + func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte { buf = buf[:0] l := packVaruint(uint(ta.Height), varBuf) diff --git a/db/sync.go b/db/sync.go index e755c027..e81cbb2c 100644 --- a/db/sync.go +++ b/db/sync.go @@ -63,11 +63,15 @@ func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b if err == nil { w.is.FinishedSync(bh) } - return nil + return err case errSynced: // this is not actually error but flag that resync wasn't necessary w.is.FinishedSyncNoChange() w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + if initialSync { + d := time.Since(start) + glog.Info("resync: finished in ", d) + } return nil } @@ -113,7 +117,8 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b } // if parallel operation is enabled and the number of blocks to be connected is large, // use parallel routine to load majority of blocks - if w.syncWorkers > 1 { + // use parallel sync only in case of initial sync because it puts the db to inconsistent state + if w.syncWorkers > 1 && initialSync { remoteBestHeight, err := w.chain.GetBestBlockHeight() if err != nil { return err diff --git a/docs/api.md b/docs/api.md index b028268d..728ec337 100644 --- a/docs/api.md +++ b/docs/api.md @@ -167,6 +167,10 @@ Response for Ethereum-type coins. There is always only one *vin*, only one *vout } ``` +A note about the `blocktime` field: +- for already mined transaction (`confirmations > 0`), the field `blocktime` contains time of the block +- for transactions in mempool (`confirmations == 0`), the field contains time when the running instance of Blockbook was first time notified about the transaction. This time may be different in different instances of Blockbook. + #### Get transaction specific Returns transaction data in the exact format as returned by backend, including all coin specific fields: diff --git a/server/internal.go b/server/internal.go index 6ef09f1e..1f7c3b89 100644 --- a/server/internal.go +++ b/server/internal.go @@ -23,13 +23,14 @@ type InternalServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool is *common.InternalState api *api.Worker } // NewInternalServer creates new internal http interface to blockbook and returns its handle -func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -47,6 +48,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, is: is, api: api, } diff --git a/server/public.go b/server/public.go index 12deb354..60ca2e53 100644 --- a/server/public.go +++ b/server/public.go @@ -26,6 +26,7 @@ import ( const txsOnPage = 25 const blocksOnPage = 50 +const mempoolTxsOnPage = 50 const txsInAPI = 1000 const ( @@ -45,6 +46,7 @@ type PublicServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool api *api.Worker explorerURL string internalExplorer bool @@ -56,19 +58,19 @@ type PublicServer struct { // NewPublicServer creates new public server http interface to blockbook and returns its handle // only basic functionality is mapped, to map all functions, call -func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) { +func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } - socketio, err := NewSocketIoServer(db, chain, txCache, metrics, is) + socketio, err := NewSocketIoServer(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } - websocket, err := NewWebsocketServer(db, chain, txCache, metrics, is) + websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } @@ -91,6 +93,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, explorerURL: explorerURL, internalExplorer: explorerURL == "", metrics: metrics, @@ -137,6 +140,7 @@ func (s *PublicServer) ConnectFullPublicInterface() { serveMux.HandleFunc(path+"block/", s.htmlTemplateHandler(s.explorerBlock)) serveMux.HandleFunc(path+"spending/", s.htmlTemplateHandler(s.explorerSpendingTx)) serveMux.HandleFunc(path+"sendtx", s.htmlTemplateHandler(s.explorerSendTx)) + serveMux.HandleFunc(path+"mempool", s.htmlTemplateHandler(s.explorerMempool)) } else { // redirect to wallet requests for tx and address, possibly to external site serveMux.HandleFunc(path+"tx/", s.txRedirect) @@ -382,6 +386,7 @@ const ( blocksTpl blockTpl sendTransactionTpl + mempoolTpl tplCount ) @@ -400,6 +405,7 @@ type TemplateData struct { Blocks *api.Blocks Block *api.Block Info *api.SystemInfo + MempoolTxids *api.MempoolTxids Page int PrevPage int NextPage int @@ -475,6 +481,7 @@ func (s *PublicServer) parseTemplates() []*template.Template { t[blockTpl] = createTemplate("./static/templates/block.html", "./static/templates/txdetail.html", "./static/templates/paging.html", "./static/templates/base.html") } t[xpubTpl] = createTemplate("./static/templates/xpub.html", "./static/templates/txdetail.html", "./static/templates/paging.html", "./static/templates/base.html") + t[mempoolTpl] = createTemplate("./static/templates/mempool.html", "./static/templates/paging.html", "./static/templates/base.html") return t } @@ -796,6 +803,25 @@ func (s *PublicServer) explorerSendTx(w http.ResponseWriter, r *http.Request) (t return sendTransactionTpl, data, nil } +func (s *PublicServer) explorerMempool(w http.ResponseWriter, r *http.Request) (tpl, *TemplateData, error) { + var mempoolTxids *api.MempoolTxids + var err error + s.metrics.ExplorerViews.With(common.Labels{"action": "mempool"}).Inc() + page, ec := strconv.Atoi(r.URL.Query().Get("page")) + if ec != nil { + page = 0 + } + mempoolTxids, err = s.api.GetMempool(page, mempoolTxsOnPage) + if err != nil { + return errorTpl, nil, err + } + data := s.newTemplateData() + data.MempoolTxids = mempoolTxids + data.Page = mempoolTxids.Page + data.PagingRange, data.PrevPage, data.NextPage = getPagingRange(mempoolTxids.Page, mempoolTxids.TotalPages) + return mempoolTpl, data, nil +} + func getPagingRange(page int, total int) ([]int, int, int) { // total==-1 means total is unknown, show only prev/next buttons if total >= 0 && total < 2 { diff --git a/server/public_test.go b/server/public_test.go index e203d947..ccff606b 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -85,6 +85,11 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) { glog.Fatal("fakechain: ", err) } + mempool, err := chain.CreateMempool(chain) + if err != nil { + glog.Fatal("mempool: ", err) + } + // caching is switched off because test transactions do not have hex data txCache, err := db.NewTxCache(d, chain, metrics, is, false) if err != nil { @@ -92,7 +97,7 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) { } // s.Run is never called, binding can be to any port - s, err := NewPublicServer("localhost:12345", "", d, chain, txCache, "", metrics, is, false) + s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false) if err != nil { t.Fatal(err) } diff --git a/server/socketio.go b/server/socketio.go index fc70510d..4b350f27 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -26,14 +26,15 @@ type SocketIoServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker } // NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle -func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -64,6 +65,7 @@ func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCa txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, metrics: metrics, is: is, api: api, @@ -224,7 +226,7 @@ func (s *SocketIoServer) getAddressTxids(addr []string, opts *addrOpts) (res res return res, err } } else { - o, err := s.chain.GetMempoolTransactions(address) + o, err := s.mempool.GetTransactions(address) if err != nil { return res, err } @@ -326,13 +328,15 @@ func txToResTx(tx *api.Tx) resTx { outputs[i] = output } var h int + var blocktime int64 if tx.Confirmations == 0 { h = -1 } else { h = int(tx.Blockheight) + blocktime = tx.Blocktime } return resTx{ - BlockTimestamp: tx.Blocktime, + BlockTimestamp: blocktime, FeeSatoshis: tx.FeesSat.AsInt64(), Hash: tx.Txid, Height: h, diff --git a/server/websocket.go b/server/websocket.go index 0c9ec060..3f504ea3 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -59,6 +59,7 @@ type WebsocketServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker @@ -70,8 +71,8 @@ type WebsocketServer struct { } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle -func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -89,6 +90,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxC txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, metrics: metrics, is: is, api: api, diff --git a/static/css/main.css b/static/css/main.css index d54f9841..c0bf1ee2 100644 --- a/static/css/main.css +++ b/static/css/main.css @@ -268,6 +268,11 @@ table.data-table table.data-table th { margin: 0; } +.h-container h5 { + margin-top: 6px; + margin-bottom: 0; +} + .page-link { color: #428bca; } diff --git a/static/templates/index.html b/static/templates/index.html index f6638adc..5e275fc3 100644 --- a/static/templates/index.html +++ b/static/templates/index.html @@ -43,6 +43,10 @@ Last Mempool Update {{formatTime $bb.LastMempoolTime}} + + Transactions in Mempool + {{if .InternalExplorer}}{{$bb.MempoolSize}}{{else}}{{$bb.MempoolSize}}{{end}} + Size On Disk {{$bb.DbSize}} diff --git a/static/templates/mempool.html b/static/templates/mempool.html new file mode 100644 index 00000000..eb8b0da8 --- /dev/null +++ b/static/templates/mempool.html @@ -0,0 +1,27 @@ +{{define "specific"}}{{$txs := .MempoolTxids.Mempool}}{{$data := .}} +

Mempool Transactions by first seen time +

+
+
{{$.MempoolTxids.MempoolSize}} Transactions in mempool
+ +
+
+ + + + + + + + + {{- range $tx := $txs -}} + + + + + {{- end -}} + +
TransactionFirst Seen Time
{{$tx.Txid}}{{formatUnixTime $tx.Time}}
+
+ +{{end}} \ No newline at end of file diff --git a/static/templates/txdetail.html b/static/templates/txdetail.html index 6b3a3f82..6f8ae69c 100644 --- a/static/templates/txdetail.html +++ b/static/templates/txdetail.html @@ -4,9 +4,7 @@
{{$tx.Txid}}
- {{- if $tx.Confirmations -}} -
mined {{formatUnixTime $tx.Blocktime}}
- {{- end -}} + {{- if $tx.Blocktime}}
{{if $tx.Confirmations}}mined{{else}}first seen{{end}} {{formatUnixTime $tx.Blocktime}}
{{end -}}
diff --git a/static/templates/txdetail_ethereumtype.html b/static/templates/txdetail_ethereumtype.html index bb71a8cc..8f873453 100644 --- a/static/templates/txdetail_ethereumtype.html +++ b/static/templates/txdetail_ethereumtype.html @@ -5,9 +5,7 @@ {{$tx.Txid}} {{if eq $tx.EthereumSpecific.Status 1}}{{end}}{{if eq $tx.EthereumSpecific.Status 0}}{{end}}
- {{- if $tx.Confirmations -}} -
mined {{formatUnixTime $tx.Blocktime}}
- {{- end -}} + {{- if $tx.Blocktime}}
{{if $tx.Confirmations}}mined{{else}}first seen{{end}} {{formatUnixTime $tx.Blocktime}}
{{end -}}
diff --git a/static/templates/xpub.html b/static/templates/xpub.html index df324738..dfe196b8 100644 --- a/static/templates/xpub.html +++ b/static/templates/xpub.html @@ -29,8 +29,8 @@ Used XPUB Addresses {{$addr.TotalTokens}} - {{- if or $addr.Tokens $addr.TotalTokens -}} - + + {{- if or $addr.Tokens $addr.TotalTokens -}} {{if $data.NonZeroBalanceTokens}}XPUB Addresses with Balance{{else}}XPUB Addresses{{end}} @@ -51,15 +51,17 @@ {{- end -}} {{- if $data.NonZeroBalanceTokens -}} - + {{- end -}}
Show all XPUB addressesShow used XPUB addressesShow derived XPUB addresses
+ {{- else -}} + Show derived XPUB addresses + {{- end -}} - {{- end -}} - +
diff --git a/tests/dbtestdata/fakechain.go b/tests/dbtestdata/fakechain.go index 71515bbb..cee7c16b 100644 --- a/tests/dbtestdata/fakechain.go +++ b/tests/dbtestdata/fakechain.go @@ -17,10 +17,18 @@ func NewFakeBlockChain(parser bchain.BlockChainParser) (bchain.BlockChain, error return &fakeBlockChain{&bchain.BaseChain{Parser: parser}}, nil } +func (b *fakeBlockChain) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) { + return bchain.NewMempoolBitcoinType(chain, 1, 1), nil +} + func (c *fakeBlockChain) Initialize() error { return nil } +func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { + return nil +} + func (c *fakeBlockChain) Shutdown(ctx context.Context) error { return nil } @@ -118,10 +126,6 @@ func (c *fakeBlockChain) GetBlockInfo(hash string) (v *bchain.BlockInfo, err err return nil, bchain.ErrBlockNotFound } -func (c *fakeBlockChain) GetMempool() (v []string, err error) { - return nil, errors.New("Not implemented") -} - func getTxInBlock(b *bchain.Block, txid string) *bchain.Tx { for _, tx := range b.Txs { if tx.Txid == txid { @@ -179,22 +183,12 @@ func (c *fakeBlockChain) SendRawTransaction(tx string) (v string, err error) { return "", errors.New("Invalid data") } -func (c *fakeBlockChain) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { - return 0, errors.New("Not implemented") -} - -func (c *fakeBlockChain) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) { - return nil, errors.New("Not implemented") -} - -func (c *fakeBlockChain) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { - return []bchain.Outpoint{}, nil -} - -func (c *fakeBlockChain) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) { - return nil, errors.New("Not implemented") -} - +// GetChainParser returns parser for the blockchain func (c *fakeBlockChain) GetChainParser() bchain.BlockChainParser { return c.Parser } + +// GetMempoolTransactions returns transactions in mempool +func (b *fakeBlockChain) GetMempoolTransactions() ([]string, error) { + return nil, errors.New("Not implemented") +} diff --git a/tests/integration.go b/tests/integration.go index ad0815f4..a33026b3 100644 --- a/tests/integration.go +++ b/tests/integration.go @@ -5,7 +5,7 @@ package tests import ( "blockbook/bchain" "blockbook/bchain/coins" - "blockbook/build/tools" + build "blockbook/build/tools" "blockbook/tests/rpc" "blockbook/tests/sync" "encoding/json" @@ -23,7 +23,7 @@ import ( "github.com/martinboehm/btcutil/chaincfg" ) -type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) +type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) var integrationTests = map[string]TestFunc{ "rpc": rpc.IntegrationTest, @@ -76,7 +76,7 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) { } defer chaincfg.ResetParams() - bc, err := makeBlockChain(coin) + bc, m, err := makeBlockChain(coin) if err != nil { if err == notConnectedError { t.Fatal(err) @@ -86,44 +86,44 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) { for test, c := range cfg { if fn, found := integrationTests[test]; found { - t.Run(test, func(t *testing.T) { fn(t, coin, bc, c) }) + t.Run(test, func(t *testing.T) { fn(t, coin, bc, m, c) }) } else { t.Errorf("Test not found: %s", test) } } } -func makeBlockChain(coin string) (bchain.BlockChain, error) { +func makeBlockChain(coin string) (bchain.BlockChain, bchain.Mempool, error) { c, err := build.LoadConfig("../configs", coin) if err != nil { - return nil, err + return nil, nil, err } outputDir, err := ioutil.TempDir("", "integration_test") if err != nil { - return nil, err + return nil, nil, err } defer os.RemoveAll(outputDir) err = build.GeneratePackageDefinitions(c, "../build/templates", outputDir) if err != nil { - return nil, err + return nil, nil, err } b, err := ioutil.ReadFile(filepath.Join(outputDir, "blockbook", "blockchaincfg.json")) if err != nil { - return nil, err + return nil, nil, err } var cfg json.RawMessage err = json.Unmarshal(b, &cfg) if err != nil { - return nil, err + return nil, nil, err } coinName, err := getName(cfg) if err != nil { - return nil, err + return nil, nil, err } return initBlockChain(coinName, cfg) @@ -147,29 +147,39 @@ func getName(raw json.RawMessage) (string, error) { } } -func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, error) { +func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bchain.Mempool, error) { factory, found := coins.BlockChainFactories[coinName] if !found { - return nil, fmt.Errorf("Factory function not found") + return nil, nil, fmt.Errorf("Factory function not found") } cli, err := factory(cfg, func(_ bchain.NotificationType) {}) if err != nil { if isNetError(err) { - return nil, notConnectedError + return nil, nil, notConnectedError } - return nil, fmt.Errorf("Factory function failed: %s", err) + return nil, nil, fmt.Errorf("Factory function failed: %s", err) } err = cli.Initialize() if err != nil { if isNetError(err) { - return nil, notConnectedError + return nil, nil, notConnectedError } - return nil, fmt.Errorf("BlockChain initialization failed: %s", err) + return nil, nil, fmt.Errorf("BlockChain initialization failed: %s", err) } - return cli, nil + mempool, err := cli.CreateMempool(cli) + if err != nil { + return nil, nil, fmt.Errorf("Mempool creation failed: %s", err) + } + + err = cli.InitializeMempool(nil, nil) + if err != nil { + return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err) + } + + return cli, mempool, nil } func isNetError(err error) bool { diff --git a/tests/rpc/rpc.go b/tests/rpc/rpc.go index e8722062..d33e6fb7 100644 --- a/tests/rpc/rpc.go +++ b/tests/rpc/rpc.go @@ -30,6 +30,7 @@ var testMap = map[string]func(t *testing.T, th *TestHandler){ type TestHandler struct { Chain bchain.BlockChain + Mempool bchain.Mempool TestData *TestData } @@ -42,7 +43,7 @@ type TestData struct { TxDetails map[string]*bchain.Tx `json:"txDetails"` } -func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) { +func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) { tests, err := getTests(testConfig) if err != nil { t.Fatalf("Failed loading of test list: %s", err) @@ -54,7 +55,11 @@ func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testCon t.Fatalf("Failed loading of test data: %s", err) } - h := TestHandler{Chain: chain, TestData: td} + h := TestHandler{ + Chain: chain, + Mempool: mempool, + TestData: td, + } for _, test := range tests { if f, found := testMap[test]; found { @@ -195,7 +200,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) { for i := 0; i < 3; i++ { txs := getMempool(t, h) - n, err := h.Chain.ResyncMempool(nil) + n, err := h.Mempool.Resync() if err != nil { t.Fatal(err) } @@ -217,7 +222,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) { for txid, addrs := range txid2addrs { for _, a := range addrs { - got, err := h.Chain.GetMempoolTransactions(a) + got, err := h.Mempool.GetTransactions(a) if err != nil { t.Fatalf("address %q: %s", a, err) } @@ -337,7 +342,7 @@ func testGetBlockHeader(t *testing.T, h *TestHandler) { } func getMempool(t *testing.T, h *TestHandler) []string { - txs, err := h.Chain.GetMempool() + txs, err := h.Chain.GetMempoolTransactions() if err != nil { t.Fatal(err) } diff --git a/tests/sync/sync.go b/tests/sync/sync.go index 6c01bfc0..2364da44 100644 --- a/tests/sync/sync.go +++ b/tests/sync/sync.go @@ -54,7 +54,7 @@ type BlockInfo struct { TxDetails []*bchain.Tx `json:"txDetails"` } -func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) { +func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) { tests, err := getTests(testConfig) if err != nil { t.Fatalf("Failed loading of test list: %s", err)