From cee8b6598969bb6f6409442b9ba376ae2286c7fc Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 9 Mar 2016 15:11:36 -0800 Subject: [PATCH] http features. --- lib/bcoin/http.js | 343 +++++++++++++++++++++++++++++++++++++++++- lib/bcoin/node.js | 6 - lib/bcoin/wallet.js | 5 + lib/bcoin/walletdb.js | 15 +- 4 files changed, 354 insertions(+), 15 deletions(-) diff --git a/lib/bcoin/http.js b/lib/bcoin/http.js index 8eee8108..fcc3af88 100644 --- a/lib/bcoin/http.js +++ b/lib/bcoin/http.js @@ -7,6 +7,13 @@ var EventEmitter = require('events').EventEmitter; var StringDecoder = require('string_decoder').StringDecoder; var url = require('url'); +var engine; + +try { + engine = require('engine.io'); +} catch (e) { + ; +} var bcoin = require('../bcoin'); var bn = require('bn.js'); @@ -48,6 +55,7 @@ HTTPServer.prototype._init = function _init() { var self = this; this._initRouter(); + this._initEngine(); this.server.on('connection', function(socket) { socket.on('error', function(err) { @@ -216,6 +224,9 @@ HTTPServer.prototype._init = function _init() { // Wallet Balance this.get('/wallet/:id/balance', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; self.node.walletdb.getBalance(req.params.id, function(err, balance) { if (err) return next(err); @@ -229,6 +240,9 @@ HTTPServer.prototype._init = function _init() { // Wallet UTXOs this.get('/wallet/:id/coin', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; self.node.walletdb.getCoins(req.params.id, function(err, coins) { if (err) return next(err); @@ -240,8 +254,27 @@ HTTPServer.prototype._init = function _init() { }); }); + // Wallet TX + this.get('/wallet/:id/coin/:hash/:index', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; + self.node.walletdb.getCoin(req.params.hash, +req.params.index, function(err, coin) { + if (err) + return next(err); + + if (!coin) + return send(404); + + send(200, coin.toJSON()); + }); + }); + // Wallet TXs - this.get('/wallet/:id/all', function(req, res, next, send) { + this.get('/wallet/:id/tx/all', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; self.node.walletdb.getAll(req.params.id, function(err, txs) { if (err) return next(err); @@ -254,7 +287,10 @@ HTTPServer.prototype._init = function _init() { }); // Wallet Pending TXs - this.get('/wallet/:id/pending', function(req, res, next, send) { + this.get('/wallet/:id/tx/pending', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; self.node.walletdb.getPending(req.params.id, function(err, txs) { if (err) return next(err); @@ -266,9 +302,95 @@ HTTPServer.prototype._init = function _init() { }); }); - // Emit events for any wallet txs that come in. - this.node.on('wallet tx', function(tx, ids) { - self.sendWebhook({ tx: tx, ids: ids }); + // Wallet TXs within time range + this.get('/wallet/:id/tx/range', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; + + var options = { + start: +req.query.start, + end: +req.query.end, + limit: +req.query.limit + }; + + self.node.walletdb.getTimeRange(req.params.id, options, function(err, txs) { + if (err) + return next(err); + + if (!txs.length) + return send(404); + + send(200, txs.map(function(tx) { return tx.toJSON(); })); + }); + }); + + // Wallet TXs within time range + this.get('/wallet/:id/tx/last', function(req, res, next, send) { + var id = req.params.id; + if (id === '_all') + id = null; + + self.node.walletdb.getTimeRange(id, +req.query.limit, function(err, txs) { + if (err) + return next(err); + + if (!txs.length) + return send(404); + + send(200, txs.map(function(tx) { return tx.toJSON(); })); + }); + }); + + // Wallet TX + this.get('/wallet/:id/tx/:hash', function(req, res, next, send) { + self.node.walletdb.getTX(req.params.hash, function(err, tx) { + if (err) + return next(err); + + if (!tx) + return send(404); + + send(200, tx.toJSON()); + }); + }); + + this.post('/broadcast', function(req, res, next, send) { + self.node.pool.broadcast(bcoin.tx.fromRaw(req.body.tx, 'hex')); + send(200, { success: true }); + }); +}; + +HTTPServer.prototype._initEngine = function _initEngine() { + var self = this; + + if (!engine) + return; + + this.clients = []; + + this.engine = new engine.Server(); + + this.engine.attach(this.server); + + this.engine.on('connection', function(socket) { + var s = new Socket(self, socket); + + socket.on('message', function(data) { + s.parse(data); + }); + + socket.on('close', function() { + s.destroy(); + }); + + socket.on('error', function(err) { + self.emit('error', err); + }); + + s.on('error', function(err) { + self.emit('error', err); + }); }); }; @@ -392,8 +514,6 @@ HTTPServer.prototype.del = function del(path, callback) { this.routes.del.push({ path: path, callback: callback }); }; -// Send webhooks to notify other servers -// of incoming txs and wallet updates. HTTPServer.prototype.sendWebhook = function sendWebhook(msg, callback) { var request, body, secret, hmac; @@ -586,6 +706,215 @@ function unescape(str) { } } +function Socket(server, socket) { + this.server = server; + this.engine = server.engine; + this.node = server.node; + this.walletdb = server.node.walletdb; + this.socket = socket; + this.listeners = {}; + this._init(); +} + +Socket.prototype._init = function _init() { + this.server.clients.push(this); +}; + +Socket.prototype.parse = function parse(msg) { + var size, off, header, payload; + + size = utils.readIntv(msg, 0); + off = size.off; + size = size.r; + + if (off + size > msg.length) + return this.send({ event: 'error', msg: 'Size larger than message.' }); + + try { + header = JSON.parse(msg.slice(off, off + size).toString('utf8')); + off += size; + } catch (e) { + return this.send({ event: 'error', msg: 'Header malformed.' }); + } + + payload = data.slice(off); + + try { + return this._handle(header, payload); + } catch (e) { + return this.send({ event: 'error', msg: e.message + '' }); + } +}; + +Socket.prototype._handle = function _handle(header, payload) { + if (header.cmd === 'handshake') + return this._handleHandshake(header, payload); + + if (header.cmd === 'listen') + return this._handleListen(header, payload); + + if (header.cmd === 'unlisten') + return this._handleUnlisten(header, payload); + + if (header.event) + return this.emit(header.event, header, payload); + + throw new Error('Not a valid command.'); +}; + +Socket.prototype.destroy = function destroy() { + this.remove(); + this.cleanup(); + return this.clients.destroy(); +}; + +Socket.prototype.remove = function remove() { + var i = this.server.clients.indexOf(this); + if (i !== -1) + this.server.clients.splice(i, 1); +}; + +Socket.prototype.cleanup = function cleanup() { + Object.keys(this.listeners).forEach(function(id) { + this.unlistenWallet(id); + }, this); +}; + +Socket.prototype.unlistenWallet = function unlistenWallet(id) { + this.listeners[id].forEach(function(listener) { + this.walletdb.removeListener(listener[0], listener[1]); + }, this); + + delete this.listeners[id]; +}; + +Socket.prototype._listenWallet = function _listenWallet(id, event, listener) { + if (!this.listeners[id]) + this.listeners[id] = []; + + this.listeners[id].push([event, listener]); + this.walletdb.on(event, listener); +}; + +Socket.prototype.listenWallet = function listenWallet(event, id, listener) { + if (id === '_all') { + this._listenWallet(id, 'tx', function(tx, map) { + self.send({ event: 'tx', map: map, id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, 'updated', function(tx, map) { + self.send({ event: 'updated', map: map, id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, 'confirmed', function(tx, map) { + self.send({ event: 'confirmed', map: map, id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, 'unconfirmed', function(tx, map) { + self.send({ event: 'unconfirmed', map: map, id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, 'balances', function(balances) { + Object.keys(balances).forEach(function(key) { + balances[key] = utils.btc(balances[key]); + }); + self.send({ event: 'balances', id: id, balances: balances }); + }); + + return; + } + + this._listenWallet(id, id + ' tx', function(tx) { + self.send({ event: 'tx', id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, id + ' updated', function(tx) { + self.send({ event: 'updated', id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, id + ' confirmed', function(tx) { + self.send({ event: 'confirmed', id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, id + ' unconfirmed', function(tx) { + self.send({ event: 'unconfirmed', id: id, _payload: tx.toExtended() }); + }); + + this._listenWallet(id, id + ' balance', function(balance) { + self.send({ event: 'balance', id: id, balance: utils.btc(balance) }); + }); +}; + +Socket.prototype._onListen = function _onListen(header, payload) { + var self = this; + var id = header.id; + + if (typeof id !== 'string') + throw new Error('Wallet ID is not a string.'); + + if (id.length > 1000) + throw new Error('Wallet ID too large.'); + + if (!/^[a-zA-Z0-9]+$/.test(id)) + throw new Error('Wallet ID must be alphanumeric.'); + + if (this.listeners[id]) + throw new Error('Already listening.'); + + this.listenWallet(id); +}; + +Socket.prototype._onUnlisten = function _onUnlisten(header, payload) { + var self = this; + var id = header.id; + + if (typeof id !== 'string') + throw new Error('Wallet ID is not a string.'); + + if (id.length > 1000) + throw new Error('Wallet ID too large.'); + + if (!/^[a-zA-Z0-9]+$/.test(id)) + throw new Error('Wallet ID must be alphanumeric.'); + + if (!this.listeners[id]) + throw new Error('Not listening.'); + + this.unlistenWallet(id); +}; + +Socket.prototype._handleHandshake = function _handleHandshake(header, payload) { + if (this._activated) + throw new Error('Handshake already completed.'); + + if (payload.length > 0) + throw new Error('Unexpected payload.'); + + this._activated = true; +}; + +Socket.prototype.send = function send(data) { + var header, payload, size, msg; + + if (data._payload) { + payload = data._payload; + delete data._payload; + } else { + payload = new Buffer([]); + } + + assert(Buffer.isBuffer(payload)); + + header = new Buffer(JSON.stringify(data), 'utf8'); + + size = new Buffer(utils.sizeIntv(header.length)); + utils.writeIntv(size, header.length, 0); + + msg = Buffer.concat([size, header, payload]); + + return this.socket.send(msg); +}; + /** * Expose */ diff --git a/lib/bcoin/node.js b/lib/bcoin/node.js index 24ab1876..05fa3fed 100644 --- a/lib/bcoin/node.js +++ b/lib/bcoin/node.js @@ -104,12 +104,6 @@ Fullnode.prototype._init = function _init() { self.emit('error', err); }); - // Emit events for any TX we see that's - // is relevant to one of our wallets. - this.walletdb.on('wallet tx', function(tx, ids) { - self.emit('wallet tx', tx, ids); - }); - if (0) this.on('tx', function(tx) { self.walletdb.addTX(tx, function(err) { diff --git a/lib/bcoin/wallet.js b/lib/bcoin/wallet.js index 8e8748f6..3404d11e 100644 --- a/lib/bcoin/wallet.js +++ b/lib/bcoin/wallet.js @@ -303,6 +303,11 @@ Wallet.prototype.createAddress = function createAddress(change) { this.receiveAddress = address; } + if (this.provider && this.provider.sync) { + assert(!this.provider.update); + this.provider.sync(this, address); + } + return address; }; diff --git a/lib/bcoin/walletdb.js b/lib/bcoin/walletdb.js index 03f7e120..08e10d04 100644 --- a/lib/bcoin/walletdb.js +++ b/lib/bcoin/walletdb.js @@ -124,39 +124,50 @@ WalletDB.prototype._init = function _init() { }); this.tx.on('tx', function(tx, map) { + self.emit('tx', tx, map); map.all.forEach(function(id) { self.emit(id + ' tx', tx); }); }); this.tx.on('confirmed', function(tx, map) { + self.emit('confirmed', tx, map); map.all.forEach(function(id) { self.emit(id + ' confirmed', tx); }); }); this.tx.on('unconfirmed', function(tx, map) { + self.emit('unconfirmed', tx, map); map.all.forEach(function(id) { self.emit(id + ' unconfirmed', tx); }); }); this.tx.on('updated', function(tx, map) { - self.emit('wallet tx', tx, map); + var balances = {}; + + self.emit('updated', tx, map); utils.forEachSerial(map.output, function(id, next) { - if (self.listeners(id + ' balance').length === 0) + if (self.listeners('balance').length === 0 + && self.listeners(id + ' balance').length === 0) { return next(); + } self.getBalance(id, function(err, balance) { if (err) return self.emit('error', err); + balances[id] = balance; + self.emit(id + ' balance', balance); }); }, function(err) { if (err) self.emit('error', err); + + self.emit('balances', balances); }); // Only sync for confirmed txs.