Add websocket interface

This commit is contained in:
Martin Boehm 2018-12-10 17:22:37 +01:00
parent 75e2ffa025
commit 13f7b48ae6
9 changed files with 128 additions and 21 deletions

View File

@ -101,20 +101,25 @@ func (w *Worker) GetSpendingTxid(txid string, n int) (string, error) {
// GetTransaction reads transaction data from txid // GetTransaction reads transaction data from txid
func (w *Worker) GetTransaction(txid string, spendingTxs bool, specificJSON bool) (*Tx, error) { func (w *Worker) GetTransaction(txid string, spendingTxs bool, specificJSON bool) (*Tx, error) {
start := time.Now()
bchainTx, height, err := w.txCache.GetTransaction(txid) bchainTx, height, err := w.txCache.GetTransaction(txid)
if err != nil { if err != nil {
return nil, NewAPIError(fmt.Sprintf("Tx not found, %v", err), true) return nil, NewAPIError(fmt.Sprintf("Tx not found, %v", err), true)
} }
return w.GetTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON)
}
// GetTransactionFromBchainTx reads transaction data from txid
func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32, spendingTxs bool, specificJSON bool) (*Tx, error) {
var err error
var ta *db.TxAddresses var ta *db.TxAddresses
var erc20t []Erc20Transfer var erc20t []Erc20Transfer
var ethSpecific *eth.EthereumTxData var ethSpecific *eth.EthereumTxData
var blockhash string var blockhash string
if bchainTx.Confirmations > 0 { if bchainTx.Confirmations > 0 {
if w.chainType == bchain.ChainBitcoinType { if w.chainType == bchain.ChainBitcoinType {
ta, err = w.db.GetTxAddresses(txid) ta, err = w.db.GetTxAddresses(bchainTx.Txid)
if err != nil { if err != nil {
return nil, errors.Annotatef(err, "GetTxAddresses %v", txid) return nil, errors.Annotatef(err, "GetTxAddresses %v", bchainTx.Txid)
} }
} }
blockhash, err = w.db.GetBlockHash(height) blockhash, err = w.db.GetBlockHash(height)
@ -284,9 +289,6 @@ func (w *Worker) GetTransaction(txid string, spendingTxs bool, specificJSON bool
Erc20Transfers: erc20t, Erc20Transfers: erc20t,
EthereumSpecific: ethSpecific, EthereumSpecific: ethSpecific,
} }
if spendingTxs {
glog.Info("GetTransaction ", txid, " finished in ", time.Since(start))
}
return r, nil return r, nil
} }

View File

@ -122,7 +122,7 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
io = append(io, addrIndex{string(addrDesc), int32(output.N)}) io = append(io, addrIndex{string(addrDesc), int32(output.N)})
} }
if m.onNewTxAddr != nil { if m.onNewTxAddr != nil {
m.onNewTxAddr(tx.Txid, addrDesc, true) m.onNewTxAddr(tx, addrDesc, true)
} }
} }
dispatched := 0 dispatched := 0

View File

@ -79,7 +79,7 @@ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
io = append(io, addrIndex{string(addrDesc), int32(output.N)}) io = append(io, addrIndex{string(addrDesc), int32(output.N)})
} }
if onNewTxAddr != nil { if onNewTxAddr != nil {
onNewTxAddr(tx.Txid, addrDesc, true) onNewTxAddr(tx, addrDesc, true)
} }
} }
for _, input := range tx.Vin { for _, input := range tx.Vin {
@ -92,7 +92,7 @@ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
} }
io = append(io, addrIndex{string(addrDesc), int32(^i)}) io = append(io, addrIndex{string(addrDesc), int32(^i)})
if onNewTxAddr != nil { if onNewTxAddr != nil {
onNewTxAddr(tx.Txid, addrDesc, false) onNewTxAddr(tx, addrDesc, false)
} }
} }
} }

View File

@ -179,7 +179,7 @@ type Erc20Contract struct {
type OnNewBlockFunc func(hash string, height uint32) type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address // OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(txid string, desc AddressDescriptor, isOutput bool) type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor, isOutput bool)
// BlockChain defines common interface to block chain daemon // BlockChain defines common interface to block chain daemon
type BlockChain interface { type BlockChain interface {

View File

@ -498,9 +498,9 @@ func storeInternalStateLoop() {
glog.Info("storeInternalStateLoop stopped") glog.Info("storeInternalStateLoop stopped")
} }
func onNewTxAddr(txid string, desc bchain.AddressDescriptor, isOutput bool) { func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) {
for _, c := range callbacksOnNewTxAddr { for _, c := range callbacksOnNewTxAddr {
c(txid, desc, isOutput) c(tx, desc, isOutput)
} }
} }

View File

@ -167,8 +167,9 @@ func (s *PublicServer) OnNewBlock(hash string, height uint32) {
} }
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *PublicServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor, isOutput bool) { func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) {
s.socketio.OnNewTxAddr(txid, desc, isOutput) s.socketio.OnNewTxAddr(tx.Txid, desc, isOutput)
s.websocket.OnNewTxAddr(tx, desc, isOutput)
} }
func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) { func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) {

View File

@ -197,6 +197,15 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) {
func (s *WebsocketServer) onDisconnect(c *websocketChannel) { func (s *WebsocketServer) onDisconnect(c *websocketChannel) {
s.unsubscribeNewBlock(c) s.unsubscribeNewBlock(c)
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
for _, sa := range s.addressSubscriptions {
for sc := range sa {
if sc == c {
delete(sa, c)
}
}
}
glog.Info("Client disconnected ", c.id, ", ", c.ip) glog.Info("Client disconnected ", c.id, ", ", c.ip)
s.metrics.WebsocketClients.Dec() s.metrics.WebsocketClients.Dec()
} }
@ -227,6 +236,20 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
rv, err = s.unsubscribeNewBlock(c) rv, err = s.unsubscribeNewBlock(c)
return return
}, },
"subscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddress(req.Params)
if err == nil {
rv, err = s.subscribeAddress(c, ad, req)
}
return
},
"unsubscribeAddress": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddress(req.Params)
if err == nil {
rv, err = s.unsubscribeAddress(c, ad)
}
return
},
} }
func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) { func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) {
@ -325,6 +348,39 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac
return return
} }
func (s *WebsocketServer) unmarshalAddress(params []byte) (bchain.AddressDescriptor, error) {
r := struct {
Address string `json:"address"`
}{}
err := json.Unmarshal(params, &r)
if err != nil {
return nil, err
}
return s.chainParser.GetAddrDescFromAddress(r.Address)
}
func (s *WebsocketServer) subscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[string(addrDesc)]
if !ok {
as = make(map[*websocketChannel]string)
s.addressSubscriptions[string(addrDesc)] = as
}
as[c] = req.ID
return
}
func (s *WebsocketServer) unsubscribeAddress(c *websocketChannel, addrDesc bchain.AddressDescriptor) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[string(addrDesc)]
if ok {
delete(as, c)
}
return
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients // OnNewBlock is a callback that broadcasts info about new block to subscribed clients
func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
s.newBlockSubscriptionsLock.Lock() s.newBlockSubscriptionsLock.Lock()
@ -346,3 +402,49 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
} }
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
} }
// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor, isOutput bool) {
// check if there is any subscription but release lock immediately, GetTransactionFromBchainTx may take some time
s.addressSubscriptionsLock.Lock()
as, ok := s.addressSubscriptions[string(addrDesc)]
s.addressSubscriptionsLock.Unlock()
if ok && len(as) > 0 {
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
return
}
if len(addr) == 1 {
atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
if err != nil {
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
return
}
data := struct {
Address string `json:"address"`
Input bool `json:"input"`
Tx *api.Tx `json:"tx"`
}{
Address: addr[0],
Input: !isOutput,
Tx: atx,
}
// get the list of subscriptions again, this time keep the lock
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok = s.addressSubscriptions[string(addrDesc)]
if ok {
for c, id := range as {
if c.IsAlive() {
c.out <- &websocketRes{
ID: id,
Data: &data,
}
}
}
glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels")
}
}
}
}

View File

@ -11,7 +11,7 @@
} }
</style> </style>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/1.7.4/socket.io.js"></script> <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/1.7.4/socket.io.js"></script>
<title>Blockbook test socket.io</title> <title>Blockbook Socket.io Test Page</title>
<script> <script>
var socket; var socket;
function connect(server) { function connect(server) {
@ -251,7 +251,7 @@
<body> <body>
<div class="container"> <div class="container">
<div class="row justify-content-center"> <div class="row justify-content-center">
<h1>Socket.io tester</h1> <h1>Blockbook Socket.io Test Page</h1>
</div> </div>
<div class="row"> <div class="row">
<div class="col"> <div class="col">

View File

@ -10,7 +10,7 @@
margin-top: 1%; margin-top: 1%;
} }
</style> </style>
<title>Blockbook test websocket</title> <title>Blockbook Websocket Test Page</title>
<script> <script>
var ws; var ws;
var messageID; var messageID;
@ -130,8 +130,8 @@
var id = subscribe(method, params, function (result) { var id = subscribe(method, params, function (result) {
document.getElementById('subscribeAddressResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; document.getElementById('subscribeAddressResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
}); });
document.getElementById('subscribeAddressIds').innerText += id+" "; document.getElementById('subscribeAddressIds').innerText += " " + id ;
} }
function getBlockHeader() { function getBlockHeader() {
var param = document.getElementById('getBlockHeaderParam').value.trim(); var param = document.getElementById('getBlockHeaderParam').value.trim();
@ -188,7 +188,7 @@
<body> <body>
<div class="container"> <div class="container">
<div class="row justify-content-center"> <div class="row justify-content-center">
<h1>Socket.io tester</h1> <h1>Blockbook Websocket Test Page</h1>
</div> </div>
<div class="row"> <div class="row">
<div class="col"> <div class="col">
@ -223,6 +223,7 @@
<div class="col" id="getAccountInfoResult"> <div class="col" id="getAccountInfoResult">
</div> </div>
</div> </div>
<!--
<div class="row"> <div class="row">
<div class="col"> <div class="col">
<input class="btn btn-secondary" type="button" value="getBlockHeader" onclick="getBlockHeader()"> <input class="btn btn-secondary" type="button" value="getBlockHeader" onclick="getBlockHeader()">
@ -257,6 +258,7 @@
<div class="col-10" id="getInfoResult"> <div class="col-10" id="getInfoResult">
</div> </div>
</div> </div>
-->
<div class="row"> <div class="row">
<div class="col"> <div class="col">
<input class="btn btn-secondary" type="button" value="sendTransaction" onclick="sendTransaction()"> <input class="btn btn-secondary" type="button" value="sendTransaction" onclick="sendTransaction()">
@ -293,7 +295,7 @@
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col" id="subscribeAddressTxidResult"> <div class="col" id="subscribeAddressResult">
</div> </div>
</div> </div>
</div> </div>