Add getMempoolFilters websocket method

This commit is contained in:
Martin Boehm 2023-05-10 00:59:17 +02:00
parent 3ab5e636ff
commit 4c41b414c4
15 changed files with 194 additions and 73 deletions

View File

@ -14,11 +14,13 @@ type addrIndex struct {
type txEntry struct {
addrIndexes []addrIndex
time uint32
filter string
}
type txidio struct {
txid string
io []addrIndex
txid string
io []addrIndex
filter string
}
// BaseMempool is mempool base handle

View File

@ -383,3 +383,7 @@ func (c *mempoolWithMetrics) GetAllEntries() (v bchain.MempoolTxidEntries) {
func (c *mempoolWithMetrics) GetTransactionTime(txid string) uint32 {
return c.mempool.GetTransactionTime(txid)
}
func (c *mempoolWithMetrics) GetTxidFilterEntries(filterScripts string, fromTimestamp uint32) (bchain.MempoolTxidFilterEntries, error) {
return c.mempool.GetTxidFilterEntries(filterScripts, fromTimestamp)
}

View File

@ -23,18 +23,18 @@ import (
// BitcoinRPC is an interface to JSON-RPC bitcoind service.
type BitcoinRPC struct {
*bchain.BaseChain
client http.Client
rpcURL string
user string
password string
Mempool *bchain.MempoolBitcoinType
ParseBlocks bool
pushHandler func(bchain.NotificationType)
mq *bchain.MQ
ChainConfig *Configuration
RPCMarshaler RPCMarshaler
golombFilterP uint8
golombFilterScripts string
client http.Client
rpcURL string
user string
password string
Mempool *bchain.MempoolBitcoinType
ParseBlocks bool
pushHandler func(bchain.NotificationType)
mq *bchain.MQ
ChainConfig *Configuration
RPCMarshaler RPCMarshaler
golombFilterP uint8
mempoolFilterScripts string
}
// Configuration represents json config file
@ -63,7 +63,7 @@ type Configuration struct {
AlternativeEstimateFeeParams string `json:"alternative_estimate_fee_params,omitempty"`
MinimumCoinbaseConfirmations int `json:"minimumCoinbaseConfirmations,omitempty"`
GolombFilterP uint8 `json:"golomb_filter_p,omitempty"`
GolombFilterScripts string `json:"golomb_filter_scripts,omitempty"`
MempoolFilterScripts string `json:"mempool_filter_scripts,omitempty"`
}
// NewBitcoinRPC returns new BitcoinRPC instance.
@ -100,17 +100,17 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT
}
s := &BitcoinRPC{
BaseChain: &bchain.BaseChain{},
client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport},
rpcURL: c.RPCURL,
user: c.RPCUser,
password: c.RPCPass,
ParseBlocks: c.Parse,
ChainConfig: &c,
pushHandler: pushHandler,
RPCMarshaler: JSONMarshalerV2{},
golombFilterP: c.GolombFilterP,
golombFilterScripts: c.GolombFilterScripts,
BaseChain: &bchain.BaseChain{},
client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport},
rpcURL: c.RPCURL,
user: c.RPCUser,
password: c.RPCPass,
ParseBlocks: c.Parse,
ChainConfig: &c,
pushHandler: pushHandler,
RPCMarshaler: JSONMarshalerV2{},
golombFilterP: c.GolombFilterP,
mempoolFilterScripts: c.MempoolFilterScripts,
}
return s, nil
@ -156,7 +156,7 @@ func (b *BitcoinRPC) Initialize() error {
// CreateMempool creates mempool if not already created, however does not initialize it
func (b *BitcoinRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) {
if b.Mempool == nil {
b.Mempool = bchain.NewMempoolBitcoinType(chain, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers, b.golombFilterP, b.golombFilterScripts)
b.Mempool = bchain.NewMempoolBitcoinType(chain, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers, b.golombFilterP, b.mempoolFilterScripts)
}
return b.Mempool, nil
}

View File

@ -2,6 +2,8 @@ package bchain
import (
"encoding/hex"
"errors"
"fmt"
"math/big"
"time"
@ -14,10 +16,13 @@ type chanInputPayload struct {
index int
}
type golombFilterScriptsType int
type filterScriptsType int
const golombFilterScriptsAll = golombFilterScriptsType(0)
const golombFilterScriptsTaproot = golombFilterScriptsType(1)
const (
filterScriptsInvalid = filterScriptsType(iota)
filterScriptsAll
filterScriptsTaproot
)
// MempoolBitcoinType is mempool handle.
type MempoolBitcoinType struct {
@ -27,20 +32,15 @@ type MempoolBitcoinType struct {
AddrDescForOutpoint AddrDescForOutpointFunc
golombFilterP uint8
golombFilterM uint64
golombFilterScripts golombFilterScriptsType
filterScripts filterScriptsType
}
// NewMempoolBitcoinType creates new mempool handler.
// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process
func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int, golombFilterP uint8, golombFilterScripts string) *MempoolBitcoinType {
var filterScripts golombFilterScriptsType
switch golombFilterScripts {
case "":
filterScripts = golombFilterScriptsAll
case "taproot":
filterScripts = golombFilterScriptsTaproot
default:
glog.Error("Invalid golombFilterScripts ", golombFilterScripts, ", switching off golomb filter")
func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int, golombFilterP uint8, filterScripts string) *MempoolBitcoinType {
filterScriptsType := filterScriptsToScriptsType(filterScripts)
if filterScriptsType == filterScriptsInvalid {
glog.Error("Invalid filterScripts ", filterScripts, ", switching off golomb filter")
golombFilterP = 0
}
golombFilterM := uint64(1 << golombFilterP)
@ -50,11 +50,11 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int, golomb
txEntries: make(map[string]txEntry),
addrDescToTx: make(map[string][]Outpoint),
},
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
golombFilterP: golombFilterP,
golombFilterM: golombFilterM,
golombFilterScripts: filterScripts,
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
golombFilterP: golombFilterP,
golombFilterM: golombFilterM,
filterScripts: filterScriptsType,
}
for i := 0; i < workers; i++ {
go func(i int) {
@ -69,11 +69,11 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int, golomb
}(j)
}
for txid := range m.chanTxid {
io, ok := m.getTxAddrs(txid, chanInput, chanResult)
io, golombFilter, ok := m.getTxAddrs(txid, chanInput, chanResult)
if !ok {
io = []addrIndex{}
}
m.chanAddrIndex <- txidio{txid, io}
m.chanAddrIndex <- txidio{txid, io, golombFilter}
}
}(i)
}
@ -81,6 +81,16 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int, golomb
return m
}
func filterScriptsToScriptsType(filterScripts string) filterScriptsType {
switch filterScripts {
case "":
return filterScriptsAll
case "taproot":
return filterScriptsTaproot
}
return filterScriptsInvalid
}
func (m *MempoolBitcoinType) getInputAddress(payload *chanInputPayload) *addrIndex {
var addrDesc AddressDescriptor
var value *big.Int
@ -126,7 +136,7 @@ func (m *MempoolBitcoinType) computeGolombFilter(mtx *MempoolTx) string {
filterData := make([][]byte, 0)
for i := range mtx.Vin {
vin := &mtx.Vin[i]
if m.golombFilterScripts == golombFilterScriptsAll || (m.golombFilterScripts == golombFilterScriptsTaproot && isTaproot(vin.AddrDesc)) {
if m.filterScripts == filterScriptsAll || (m.filterScripts == filterScriptsTaproot && isTaproot(vin.AddrDesc)) {
filterData = append(filterData, vin.AddrDesc)
}
}
@ -134,7 +144,7 @@ func (m *MempoolBitcoinType) computeGolombFilter(mtx *MempoolTx) string {
vout := &mtx.Vout[i]
b, err := hex.DecodeString(vout.ScriptPubKey.Hex)
if err == nil {
if m.golombFilterScripts == golombFilterScriptsAll || (m.golombFilterScripts == golombFilterScriptsTaproot && isTaproot(b)) {
if m.filterScripts == filterScriptsAll || (m.filterScripts == filterScriptsTaproot && isTaproot(b)) {
filterData = append(filterData, b)
}
}
@ -149,16 +159,21 @@ func (m *MempoolBitcoinType) computeGolombFilter(mtx *MempoolTx) string {
filter, err := gcs.BuildGCSFilter(m.golombFilterP, m.golombFilterM, *(*[gcs.KeySize]byte)(b[:gcs.KeySize]), filterData)
if err != nil {
glog.Error("Cannot create golomb filter for ", mtx.Txid, ", ", err)
return ""
}
fb, err := filter.NBytes()
if err != nil {
glog.Error("Error getting NBytes from golomb filter for ", mtx.Txid, ", ", err)
return ""
}
fb, _ := filter.Bytes()
return hex.EncodeToString(fb)
}
func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan chanInputPayload, chanResult chan *addrIndex) ([]addrIndex, bool) {
func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan chanInputPayload, chanResult chan *addrIndex) ([]addrIndex, string, bool) {
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
return nil, false
return nil, "", false
}
glog.V(2).Info("mempool: gettxaddrs ", txid, ", ", len(tx.Vin), " inputs")
mtx := m.txToMempoolTx(tx)
@ -205,13 +220,14 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan chanInputPay
io = append(io, *ai)
}
}
var golombFilter string
if m.golombFilterP > 0 {
mtx.GolombFilter = m.computeGolombFilter(mtx)
golombFilter = m.computeGolombFilter(mtx)
}
if m.OnNewTx != nil {
m.OnNewTx(mtx)
}
return io, true
return io, golombFilter, true
}
// Resync gets mempool transactions and maps outputs to transactions.
@ -248,7 +264,7 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
select {
// store as many processed transactions as possible
case tio := <-m.chanAddrIndex:
onNewEntry(tio.txid, txEntry{tio.io, txTime})
onNewEntry(tio.txid, txEntry{tio.io, txTime, tio.filter})
dispatched--
// send transaction to be processed
case m.chanTxid <- txid:
@ -260,7 +276,7 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
}
for i := 0; i < dispatched; i++ {
tio := <-m.chanAddrIndex
onNewEntry(tio.txid, txEntry{tio.io, txTime})
onNewEntry(tio.txid, txEntry{tio.io, txTime, tio.filter})
}
for txid, entry := range m.txEntries {
@ -273,3 +289,19 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
return len(m.txEntries), nil
}
// GetTxidFilterEntries returns all mempool entries with golomb filter from
func (m *MempoolBitcoinType) GetTxidFilterEntries(filterScripts string, fromTimestamp uint32) (MempoolTxidFilterEntries, error) {
if m.filterScripts != filterScriptsToScriptsType(filterScripts) {
return MempoolTxidFilterEntries{}, errors.New(fmt.Sprint("Unsupported script filter ", filterScripts))
}
m.mux.Lock()
entries := make(map[string]string)
for txid, entry := range m.txEntries {
if entry.filter != "" && entry.time >= fromTimestamp {
entries[txid] = entry.filter
}
}
m.mux.Unlock()
return MempoolTxidFilterEntries{entries}, nil
}

View File

@ -15,19 +15,17 @@ func hexToBytes(h string) []byte {
func TestMempoolBitcoinType_computeGolombFilter_taproot(t *testing.T) {
randomScript := hexToBytes("a914ff074800343a81ada8fe86c2d5d5a0e55b93dd7a87")
m := &MempoolBitcoinType{
golombFilterP: 20,
golombFilterM: uint64(1 << 20),
golombFilterScripts: golombFilterScriptsTaproot,
golombFilterP: 20,
golombFilterM: uint64(1 << 20),
filterScripts: filterScriptsTaproot,
}
tests := []struct {
name string
N uint32
mtx MempoolTx
want string
}{
{
name: "taproot",
N: 2,
mtx: MempoolTx{
Txid: "86336c62a63f509a278624e3f400cdd50838d035a44e0af8a7d6d133c04cc2d2",
Vin: []MempoolVin{
@ -47,11 +45,10 @@ func TestMempoolBitcoinType_computeGolombFilter_taproot(t *testing.T) {
},
},
},
want: "35dddcce5d60",
want: "0235dddcce5d60",
},
{
name: "taproot multiple",
N: 7,
mtx: MempoolTx{
Txid: "86336c62a63f509a278624e3f400cdd50838d035a44e0af8a7d6d133c04cc2d2",
Vin: []MempoolVin{
@ -103,11 +100,10 @@ func TestMempoolBitcoinType_computeGolombFilter_taproot(t *testing.T) {
},
},
},
want: "1143e4ad12730965a5247ac15db8c81c89b0bc",
want: "071143e4ad12730965a5247ac15db8c81c89b0bc",
},
{
name: "partial taproot",
N: 1,
mtx: MempoolTx{
Txid: "86336c62a63f509a278624e3f400cdd50838d035a44e0af8a7d6d133c04cc2d2",
Vin: []MempoolVin{
@ -127,11 +123,10 @@ func TestMempoolBitcoinType_computeGolombFilter_taproot(t *testing.T) {
},
},
},
want: "1aeee8",
want: "011aeee8",
},
{
name: "no taproot",
N: 0,
mtx: MempoolTx{
Txid: "86336c62a63f509a278624e3f400cdd50838d035a44e0af8a7d6d133c04cc2d2",
Vin: []MempoolVin{
@ -162,7 +157,7 @@ func TestMempoolBitcoinType_computeGolombFilter_taproot(t *testing.T) {
}
if got != "" {
// build the filter from computed value
filter, err := gcs.FromBytes(tt.N, m.golombFilterP, m.golombFilterM, hexToBytes(got))
filter, err := gcs.FromNBytes(m.golombFilterP, m.golombFilterM, hexToBytes(got))
if err != nil {
t.Errorf("gcs.BuildGCSFilter() unexpected error %v", err)
}

View File

@ -1,6 +1,7 @@
package bchain
import (
"errors"
"time"
"github.com/golang/glog"
@ -165,3 +166,8 @@ func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) {
}
m.mux.Unlock()
}
// GetTxidFilterEntries returns all mempool entries with golomb filter from
func (m *MempoolEthereumType) GetTxidFilterEntries(filterScripts string, fromTimestamp uint32) (MempoolTxidFilterEntries, error) {
return MempoolTxidFilterEntries{}, errors.New("Not supported")
}

View File

@ -113,7 +113,6 @@ type MempoolTx struct {
Blocktime int64 `json:"blocktime,omitempty"`
TokenTransfers TokenTransfers `json:"-"`
CoinSpecificData interface{} `json:"-"`
GolombFilter string `json:"-"`
}
// TokenType - type of token
@ -266,6 +265,11 @@ type XpubDescriptor struct {
// MempoolTxidEntries is array of MempoolTxidEntry
type MempoolTxidEntries []MempoolTxidEntry
// MempoolTxidFilterEntries is a map of txids to mempool golomb filters
type MempoolTxidFilterEntries struct {
Entries map[string]string `json:"entries,omitempty"`
}
// OnNewBlockFunc is used to send notification about a new block
type OnNewBlockFunc func(hash string, height uint32)
@ -379,4 +383,5 @@ type Mempool interface {
GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error)
GetAllEntries() MempoolTxidEntries
GetTransactionTime(txid string) uint32
GetTxidFilterEntries(filterScripts string, fromTimestamp uint32) (MempoolTxidFilterEntries, error)
}

View File

@ -309,7 +309,8 @@ export interface WsReq {
| 'ping'
| 'getCurrentFiatRates'
| 'getFiatRatesForTimestamps'
| 'getFiatRatesTickersList';
| 'getFiatRatesTickersList'
| 'getMempoolFilters';
params: any;
}
export interface WsRes {
@ -412,3 +413,10 @@ export interface WsFiatRatesTickersListReq {
timestamp?: number;
token?: string;
}
export interface WsMempoolFiltersReq {
scriptType: string;
fromTimestamp: number;
}
export interface MempoolTxidFilterEntries {
entries?: { [key: string]: string };
}

View File

@ -7,6 +7,7 @@ import (
"github.com/tkrajina/typescriptify-golang-structs/typescriptify"
"github.com/trezor/blockbook/api"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/server"
)
@ -56,6 +57,8 @@ func main() {
t.Add(server.WsCurrentFiatRatesReq{})
t.Add(server.WsFiatRatesForTimestampsReq{})
t.Add(server.WsFiatRatesTickersListReq{})
t.Add(server.WsMempoolFiltersReq{})
t.Add(bchain.MempoolTxidFilterEntries{})
err := t.ConvertToFile("blockbook-api.ts")
if err != nil {

View File

@ -69,7 +69,7 @@
"fiat_rates_vs_currencies": "AED,ARS,AUD,BDT,BHD,BMD,BRL,CAD,CHF,CLP,CNY,CZK,DKK,EUR,GBP,HKD,HUF,IDR,ILS,INR,JPY,KRW,KWD,LKR,MMK,MXN,MYR,NGN,NOK,NZD,PHP,PKR,PLN,RUB,SAR,SEK,SGD,THB,TRY,TWD,UAH,USD,VEF,VND,ZAR,BTC,ETH",
"fiat_rates_params": "{\"coin\": \"bitcoin\", \"periodSeconds\": 900}",
"golomb_filter_p": 20,
"golomb_filter_scripts": "taproot"
"mempool_filter_scripts": "taproot"
}
}
},

View File

@ -65,7 +65,7 @@
"slip44": 1,
"additional_params": {
"golomb_filter_p": 20,
"golomb_filter_scripts": "taproot"
"mempool_filter_scripts": "taproot"
}
}
},

View File

@ -1451,6 +1451,26 @@ func websocketTestsBitcoinType(t *testing.T, ts *httptest.Server) {
},
want: `{"id":"40","data":{"error":{"message":"Not supported"}}}`,
},
{
name: "websocket getMempoolFilters",
req: websocketReq{
Method: "getMempoolFilters",
Params: map[string]interface{}{
"scriptType": "",
},
},
want: `{"id":"41","data":{}}`,
},
{
name: "websocket getMempoolFilters invalid type",
req: websocketReq{
Method: "getMempoolFilters",
Params: map[string]interface{}{
"scriptType": "invalid",
},
},
want: `{"id":"42","data":{"error":{"message":"Unsupported script filter invalid"}}}`,
},
}
// send all requests at once

View File

@ -342,6 +342,14 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
}
return
},
"getMempoolFilters": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsMempoolFiltersReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getMempoolFilters(&r)
}
return
},
"subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.subscribeNewBlock(c, req)
},
@ -632,6 +640,11 @@ func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction,
return
}
func (s *WebsocketServer) getMempoolFilters(r *WsMempoolFiltersReq) (res bchain.MempoolTxidFilterEntries, err error) {
res, err = s.mempool.GetTxidFilterEntries(r.ScriptType, r.FromTimestamp)
return
}
type subscriptionResponse struct {
Subscribed bool `json:"subscribed"`
}

View File

@ -4,7 +4,7 @@ import "encoding/json"
type WsReq struct {
ID string `json:"id"`
Method string `json:"method" ts_type:"'getAccountInfo' | 'getInfo' | 'getBlockHash'| 'getBlock' | 'getAccountUtxo' | 'getBalanceHistory' | 'getTransaction' | 'getTransactionSpecific' | 'estimateFee' | 'sendTransaction' | 'subscribeNewBlock' | 'unsubscribeNewBlock' | 'subscribeNewTransaction' | 'unsubscribeNewTransaction' | 'subscribeAddresses' | 'unsubscribeAddresses' | 'subscribeFiatRates' | 'unsubscribeFiatRates' | 'ping' | 'getCurrentFiatRates' | 'getFiatRatesForTimestamps' | 'getFiatRatesTickersList'"`
Method string `json:"method" ts_type:"'getAccountInfo' | 'getInfo' | 'getBlockHash'| 'getBlock' | 'getAccountUtxo' | 'getBalanceHistory' | 'getTransaction' | 'getTransactionSpecific' | 'estimateFee' | 'sendTransaction' | 'subscribeNewBlock' | 'unsubscribeNewBlock' | 'subscribeNewTransaction' | 'unsubscribeNewTransaction' | 'subscribeAddresses' | 'unsubscribeAddresses' | 'subscribeFiatRates' | 'unsubscribeFiatRates' | 'ping' | 'getCurrentFiatRates' | 'getFiatRatesForTimestamps' | 'getFiatRatesTickersList' | 'getMempoolFilters'"`
Params json.RawMessage `json:"params" ts_type:"any"`
}
@ -76,6 +76,11 @@ type WsTransactionReq struct {
Txid string `json:"txid"`
}
type WsMempoolFiltersReq struct {
ScriptType string `json:"scriptType"`
FromTimestamp uint32 `json:"fromTimestamp"`
}
type WsTransactionSpecificReq struct {
Txid string `json:"txid"`
}

View File

@ -397,6 +397,20 @@
});
}
function getMempoolFilters() {
const method = 'getMempoolFilters';
var timestamp = document.getElementById('getMempoolFiltersFromTimestamp').value;
var scriptType = document.getElementById('getMempoolFiltersScriptType').value;
fromTimestamp = parseInt(timestamp);
const params = {
scriptType,
fromTimestamp,
};
send(method, params, function (result) {
document.getElementById('getMempoolFiltersResult').innerText = JSON.stringify(result).replace(/,/g, ", ");
});
}
function subscribeNewFiatRatesTicker() {
const method = 'subscribeFiatRates';
var currency = document.getElementById('subscribeFiatRatesCurrency').value;
@ -661,6 +675,20 @@
<div class="row">
<div class="col" id="getFiatRatesTickersListResult"></div>
</div>
<div class="row">
<div class="col-2">
<input class="btn btn-secondary" type="button" value="get mempool filters" onclick="getMempoolFilters()">
</div>
<div class="col-5">
<input type="text" class="form-control" id="getMempoolFiltersScriptType" value="taproot" placeholder="filter script">
</div>
<div class="col-4">
<input type="text" class="form-control" id="getMempoolFiltersFromTimestamp" value="" placeholder="From unix timestamp">
</div>
</div>
<div class="row">
<div class="col" id="getMempoolFiltersResult"></div>
</div>
<div class="row">
<div class="col">
<input class="btn btn-secondary" type="button" value="subscribe new block" onclick="subscribeNewBlock()">