From a47316c264da66787a5f3d55be1508d4ac96936c Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 9 Mar 2017 14:06:13 -0800 Subject: [PATCH] http: refactor node websocket api. --- lib/http/base.js | 44 ++- lib/http/server.js | 656 +++++++++++++---------------------------- lib/node/nodeclient.js | 5 +- lib/utils/validator.js | 19 ++ lib/wallet/client.js | 34 +-- lib/wallet/walletdb.js | 11 +- 6 files changed, 290 insertions(+), 479 deletions(-) diff --git a/lib/http/base.js b/lib/http/base.js index 7bb19e2f..ee22d9de 100644 --- a/lib/http/base.js +++ b/lib/http/base.js @@ -562,7 +562,7 @@ HTTPBase.prototype.all = function all() { HTTPBase.prototype.addSocket = function addSocket(ws) { var self = this; - var socket = new WebSocket(ws); + var socket = new WebSocket(ws, this); var i, route; socket.on('error', function(err) { @@ -631,7 +631,6 @@ HTTPBase.prototype.joinChannel = function joinChannel(socket, name) { socket.channels[name] = item; }; - /** * Initialize websockets. * @private @@ -653,6 +652,22 @@ HTTPBase.prototype.leaveChannel = function leaveChannel(socket, name) { delete socket.channels[name]; }; +/** + * Initialize websockets. + * @private + */ + +HTTPBase.prototype.channel = function channel(name) { + var list = this.channels[name]; + + if (!list) + return; + + assert(list.size > 0); + + return list; +}; + /** * Open the server. * @alias HTTPBase#open @@ -1598,17 +1613,19 @@ function parseType(type) { * @param {SocketIO.Socket} */ -function WebSocket(socket) { +function WebSocket(socket, ctx) { if (!(this instanceof WebSocket)) - return new WebSocket(socket); + return new WebSocket(socket, ctx); EventEmitter.call(this); + this.context = ctx; this.socket = socket; this.remoteAddress = socket.conn.remoteAddress; this.hooks = {}; this.channels = {}; this.auth = false; + this.filter = null; this.prev = null; this.next = null; @@ -1638,7 +1655,6 @@ WebSocket.prototype.init = function init() { }; WebSocket.prototype.onevent = co(function* onevent(packet) { - var socket = this.socket; var args = (packet.data || []).slice(); var type = args.shift() || ''; var ack, result; @@ -1678,7 +1694,7 @@ WebSocket.prototype.fire = co(function* fire(type, args) { if (!handler) return; - return yield handler(args); + return yield handler.call(this.context, args); }); WebSocket.prototype.join = function join(name) { @@ -1698,7 +1714,21 @@ WebSocket.prototype.emit = function emit() { return this.socket.emit.apply(this.socket, arguments); }; -WebSocket.prototype.disconnect = function disconnect() { +WebSocket.prototype.call = function call() { + var socket = this.socket; + var args = new Array(arguments.length); + var i; + + for (i = 0; i < arguments.length; i++) + args[i] = arguments[i]; + + return new Promise(function(resolve, reject) { + args.push(co.wrap(resolve, reject)); + socket.emit.apply(socket, args); + }); +}; + +WebSocket.prototype.destroy = function destroy() { return this.socket.disconnect(); }; diff --git a/lib/http/server.js b/lib/http/server.js index de8c3df6..e6001c82 100644 --- a/lib/http/server.js +++ b/lib/http/server.js @@ -8,7 +8,6 @@ 'use strict'; var assert = require('assert'); -var EventEmitter = require('events').EventEmitter; var HTTPBase = require('./base'); var util = require('../utils/util'); var co = require('../utils/co'); @@ -383,43 +382,31 @@ HTTPServer.prototype.initSockets = function initSockets() { * @param {WebSocket} socket */ -HTTPServer.prototype.handleSocket = function handleSocket(ws) { - var self = this; - var socket = new ClientSocket(this, ws); - - socket.start(); - - socket.on('close', function() { - socket.destroy(); - }); - +HTTPServer.prototype.handleSocket = function handleSocket(socket) { socket.hook('auth', function(args) { var valid = new Validator([args]); + var hash = this.options.apiHash; var key = valid.str(0); - var hash; if (socket.auth) throw new Error('Already authed.'); - socket.stop(); - - if (!self.options.noAuth) { - hash = hash256(key); - if (!crypto.ccmp(hash, self.options.apiHash)) + if (!this.options.noAuth) { + if (!crypto.ccmp(hash256(key), hash)) throw new Error('Bad key.'); } socket.auth = true; - self.logger.info('Successful auth from %s.', socket.host); - self.handleAuth(socket); + this.logger.info('Successful auth from %s.', socket.host); + this.handleAuth(socket); return null; }); socket.emit('version', { version: pkg.version, - network: self.network.type + network: this.network.type }); }; @@ -430,59 +417,47 @@ HTTPServer.prototype.handleSocket = function handleSocket(ws) { */ HTTPServer.prototype.handleAuth = function handleAuth(socket) { - var self = this; - - socket.hook('options', function (args) { - var options = args[0]; - socket.setOptions(options); - }); - socket.hook('watch chain', function(args) { - if (!socket.auth) - throw new Error('Not authorized.'); - - socket.watchChain(); + socket.join('chain'); }); socket.hook('unwatch chain', function(args) { - if (!socket.auth) - throw new Error('Not authorized.'); + socket.leave('chain'); + }); - socket.unwatchChain(); + socket.hook('watch mempool', function(args) { + socket.join('mempool'); + }); + + socket.hook('unwatch mempool', function(args) { + socket.leave('mempool'); }); socket.hook('set filter', function(args) { - var data = args[0]; - var filter; + var valid = new Validator([args]); + var data = valid.buf(0); - if (!util.isHex(data) && !Buffer.isBuffer(data)) + if (!data) throw new Error('Invalid parameter.'); - if (!socket.auth) - throw new Error('Not authorized.'); + socket.filter = Bloom.fromRaw(data); - filter = Bloom.fromRaw(data, 'hex'); - socket.setFilter(filter); + return null; }); socket.hook('get tip', function(args) { - return socket.frameEntry(self.chain.tip); + return this.chain.tip.toRaw(); }); socket.hook('get entry', co(function* (args) { - var block = args[0]; + var valid = new Validator([args]); + var block = valid.numhash(0); var entry; - if (typeof block === 'string') { - if (!util.isHex256(block)) - throw new Error('Invalid parameter.'); - block = util.revHex(block); - } else { - if (!util.isUInt32(block)) - throw new Error('Invalid parameter.'); - } + if (!block) + throw new Error('Invalid parameter.'); - entry = yield self.chain.db.getEntry(block); + entry = yield this.chain.db.getEntry(block); if (!(yield entry.isMainChain())) entry = null; @@ -490,73 +465,225 @@ HTTPServer.prototype.handleAuth = function handleAuth(socket) { if (!entry) return null; - return socket.frameEntry(entry); + return entry.toRaw(); })); socket.hook('add filter', function(args) { - var chunks = args[0]; + var valid = new Validator([args]); + var chunks = valid.array(0); + var i, data; - if (!Array.isArray(chunks)) + if (!chunks) throw new Error('Invalid parameter.'); - if (!socket.auth) - throw new Error('Not authorized.'); + if (!socket.filter) + throw new Error('No filter set.'); - socket.addFilter(chunks); + valid = new Validator([chunks]); + + for (i = 0; i < chunks.length; i++) { + data = valid.buf(i); + + if (!data) + throw new Error('Bad data chunk.'); + + this.filter.add(data); + + if (this.node.spv) + this.pool.watch(data); + } + + return null; }); socket.hook('reset filter', function(args) { - if (!socket.auth) - throw new Error('Not authorized.'); - - socket.resetFilter(); + socket.filter = null; + return null; }); socket.hook('estimate fee', function(args) { - var blocks = args[0]; + var valid = new Validator([args]); + var blocks = valid.num(0); var rate; - if (blocks != null && !util.isNumber(blocks)) - throw new Error('Invalid parameter.'); - - if (!self.fees) { - rate = self.network.feeRate; + if (!this.fees) { + rate = this.network.feeRate; rate = Amount.btc(rate); return rate; } - rate = self.fees.estimateFee(blocks); + rate = this.fees.estimateFee(blocks); rate = Amount.btc(rate); return rate; }); socket.hook('send', function(args) { - var data = args[0]; + var valid = new Validator([args]); + var data = valid.buf(0); var tx; - if (!util.isHex(data) && !Buffer.isBuffer(data)) + if (!data) throw new Error('Invalid parameter.'); - tx = TX.fromRaw(data, 'hex'); + tx = TX.fromRaw(data); - self.node.send(tx); + this.node.send(tx); + + return null; }); socket.hook('rescan', function(args) { - var start = args[0]; + var valid = new Validator([args]); + var start = valid.numhash(0); - if (!util.isHex256(start) && !util.isUInt32(start)) + if (!start) throw new Error('Invalid parameter.'); - if (!socket.auth) - throw new Error('Not authorized.'); - - if (typeof start === 'string') - start = util.revHex(start); - - return socket.scan(start); + return this.scan(socket, start); }); + + this.bindChain(); +}; + +HTTPServer.prototype.bindChain = function bindChain() { + var self = this; + var pool = this.mempool || this.pool; + + this.chain.on('connect', function(entry, block, view) { + var list = self.channel('chain'); + var item, socket, raw, txs; + + if (!list) + return; + + raw = entry.toRaw(); + + self.to('chain', 'chain connect', raw); + + for (item = list.head; item; item = item.next) { + socket = item.value; + txs = self.filterBlock(socket, block); + socket.emit('block connect', raw, txs); + } + }); + + this.chain.on('disconnect', function(entry, block, view) { + var list = self.channel('chain'); + var raw; + + if (!list) + return; + + raw = entry.toRaw(); + + self.to('chain', 'chain disconnect', raw); + self.to('chain', 'block disconnect', raw); + }); + + this.chain.on('reset', function(tip) { + var list = self.channel('chain'); + var raw; + + if (!list) + return; + + raw = tip.toRaw(); + + self.to('chain', 'chain reset', raw); + }); + + pool.on('tx', function(tx) { + var list = self.channel('mempool'); + var item, socket, raw; + + if (!list) + return; + + raw = tx.toRaw(); + + for (item = list.head; item; item = item.next) { + socket = item.value; + + if (!self.filterTX(socket, tx)) + continue; + + socket.emit('tx', raw); + } + }); +}; + +HTTPServer.prototype.filterBlock = function filterBlock(socket, block) { + var txs = []; + var i, tx; + + if (!socket.filter) + return txs; + + for (i = 0; i < block.txs.length; i++) { + tx = block.txs[i]; + if (this.filterTX(socket, tx)) + txs.push(tx.toRaw()); + } + + return txs; +}; + +HTTPServer.prototype.filterTX = function filterTX(socket, tx) { + var found = false; + var i, hash, input, prevout, output; + + if (!socket.filter) + return false; + + for (i = 0; i < tx.outputs.length; i++) { + output = tx.outputs[i]; + hash = output.getHash(); + + if (!hash) + continue; + + if (socket.filter.test(hash)) { + prevout = Outpoint.fromTX(tx, i); + socket.filter.add(prevout.toRaw()); + found = true; + } + } + + if (found) + return true; + + if (!tx.isCoinbase()) { + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + prevout = input.prevout; + if (socket.filter.test(prevout.toRaw())) + return true; + } + } + + return false; +}; + +HTTPServer.prototype.scan = co(function* scan(socket, start) { + var scanner = this.scanner.bind(this, socket); + yield this.node.scan(start, socket.filter, scanner); + return null; +}); + +HTTPServer.prototype.scanner = function scanner(socket, entry, txs) { + var block = entry.toRaw(); + var raw = []; + var i, tx; + + for (i = 0; i < txs.length; i++) { + tx = txs[i]; + raw.push(tx.toRaw()); + } + + socket.emit('block rescan', block, raw); + + return Promise.resolve(); }; /** @@ -683,330 +810,6 @@ HTTPOptions.fromOptions = function fromOptions(options) { return new HTTPOptions().fromOptions(options); }; -/** - * ClientSocket - * @constructor - * @ignore - * @param {HTTPServer} server - * @param {SocketIO.Socket} - */ - -function ClientSocket(server, socket) { - if (!(this instanceof ClientSocket)) - return new ClientSocket(server, socket); - - EventEmitter.call(this); - - this.server = server; - this.socket = socket; - this.host = socket.remoteAddress; - this.timeout = null; - this.auth = false; - this.filter = null; - this.raw = false; - this.watching = false; - - this.network = this.server.network; - this.node = this.server.node; - this.chain = this.server.chain; - this.mempool = this.server.mempool; - this.pool = this.server.pool; - this.logger = this.server.logger; - this.events = []; - - this.init(); -} - -util.inherits(ClientSocket, EventEmitter); - -ClientSocket.prototype.init = function init() { - var self = this; - var socket = this.socket; - var emit = EventEmitter.prototype.emit; - - socket.on('error', function(err) { - emit.call(self, 'error', err); - }); - - socket.on('close', function() { - emit.call(self, 'close'); - }); -}; - -ClientSocket.prototype.setOptions = function setOptions(options) { - assert(options && typeof options === 'object', 'Invalid parameter.'); - - if (options.raw != null) { - assert(typeof options.raw === 'boolean', 'Invalid parameter.'); - this.raw = options.raw; - } -}; - -ClientSocket.prototype.setFilter = function setFilter(filter) { - this.filter = filter; -}; - -ClientSocket.prototype.addFilter = function addFilter(chunks) { - var i, data; - - if (!this.filter) - throw new Error('No filter set.'); - - for (i = 0; i < chunks.length; i++) { - data = chunks[i]; - - if (!util.isHex(data) && !Buffer.isBuffer(data)) - throw new Error('Not a hex string.'); - - this.filter.add(data, 'hex'); - - if (this.pool.options.spv) - this.pool.watch(data, 'hex'); - } -}; - -ClientSocket.prototype.resetFilter = function resetFilter() { - if (!this.filter) - throw new Error('No filter set.'); - - this.filter.reset(); -}; - -ClientSocket.prototype.bind = function bind(obj, event, listener) { - this.events.push([obj, event, listener]); - obj.on(event, listener); -}; - -ClientSocket.prototype.unbind = function unbind(obj, event) { - var i, item; - - for (i = this.events.length - 1; i >= 0; i--) { - item = this.events[i]; - if (item[0] === obj && item[1] === event) { - obj.removeListener(event, item[2]); - this.events.splice(i, 1); - } - } -}; - -ClientSocket.prototype.unbindAll = function unbindAll() { - var i, event; - - for (i = 0; i < this.events.length; i++) { - event = this.events[i]; - event[0].removeListener(event[1], event[2]); - } - - this.events.length = 0; -}; - -ClientSocket.prototype.watchChain = function watchChain() { - var self = this; - var pool = this.mempool || this.pool; - - if (this.watching) - throw new Error('Already watching chain.'); - - this.watching = true; - - this.bind(this.chain, 'connect', function(entry, block, view) { - self.connectBlock(entry, block, view); - }); - - this.bind(this.chain, 'disconnect', function(entry, block, view) { - self.disconnectBlock(entry, block, view); - }); - - this.bind(this.chain, 'reset', function(tip) { - self.emit('chain reset', self.frameEntry(tip)); - }); - - this.bind(pool, 'tx', function(tx) { - self.sendTX(tx); - }); -}; - -ClientSocket.prototype.onError = function onError(err) { - var emit = EventEmitter.prototype.emit; - emit.call(this, 'error', err); -}; - -ClientSocket.prototype.unwatchChain = function unwatchChain() { - var pool = this.mempool || this.pool; - - if (!this.watching) - throw new Error('Not watching chain.'); - - this.watching = false; - - this.unbind(this.chain, 'connect'); - this.unbind(this.chain, 'disconnect'); - this.unbind(this.chain, 'reset'); - this.unbind(pool, 'tx'); -}; - -ClientSocket.prototype.connectBlock = function connectBlock(entry, block, view) { - var raw = this.frameEntry(entry); - var txs; - - this.emit('entry connect', raw); - - if (!this.filter) - return; - - txs = this.filterBlock(entry, block, view); - - this.emit('block connect', raw, txs); -}; - -ClientSocket.prototype.disconnectBlock = function disconnectBlock(entry, block, view) { - var raw = this.frameEntry(entry); - - this.emit('entry disconnect', raw); - - if (!this.filter) - return; - - this.emit('block disconnect', raw); -}; - -ClientSocket.prototype.sendTX = function sendTX(tx) { - var raw; - - if (!this.filterTX(tx)) - return; - - raw = this.frameTX(tx); - - this.emit('tx', raw); -}; - -ClientSocket.prototype.rescanBlock = function rescanBlock(entry, txs) { - var self = this; - return new Promise(function(resolve, reject) { - var cb = TimedCB.wrap(resolve, reject); - self.emit('block rescan', entry, txs, cb); - }); -}; - -ClientSocket.prototype.filterBlock = function filterBlock(entry, block, view) { - var txs = []; - var i, tx; - - if (!this.filter) - return; - - for (i = 0; i < block.txs.length; i++) { - tx = block.txs[i]; - if (this.filterTX(tx)) - txs.push(this.frameTX(tx, view, entry, i)); - } - - return txs; -}; - -ClientSocket.prototype.filterTX = function filterTX(tx) { - var found = false; - var i, hash, input, prevout, output; - - if (!this.filter) - return false; - - for (i = 0; i < tx.outputs.length; i++) { - output = tx.outputs[i]; - hash = output.getHash(); - - if (!hash) - continue; - - if (this.filter.test(hash)) { - prevout = Outpoint.fromTX(tx, i); - this.filter.add(prevout.toRaw()); - found = true; - } - } - - if (found) - return true; - - if (!tx.isCoinbase()) { - for (i = 0; i < tx.inputs.length; i++) { - input = tx.inputs[i]; - prevout = input.prevout; - if (this.filter.test(prevout.toRaw())) - return true; - } - } - - return false; -}; - -ClientSocket.prototype.scan = co(function* scan(start) { - var scanner = this.scanner.bind(this); - yield this.node.scan(start, this.filter, scanner); -}); - -ClientSocket.prototype.scanner = function scanner(entry, txs) { - var block = this.frameEntry(entry); - var raw = new Array(txs.length); - var i; - - for (i = 0; i < txs.length; i++) - raw[i] = this.frameTX(txs[i], null, entry, i); - - return this.rescanBlock(block, raw); -}; - -ClientSocket.prototype.frameEntry = function frameEntry(entry) { - if (this.raw) - return entry.toRaw(); - return entry.toJSON(); -}; - -ClientSocket.prototype.frameTX = function frameTX(tx, view, entry, index) { - if (this.raw) - return tx.toRaw(); - return tx.getJSON(this.network, view, entry, index); -}; - -ClientSocket.prototype.join = function join(id) { - this.socket.join(id); -}; - -ClientSocket.prototype.leave = function leave(id) { - this.socket.leave(id); -}; - -ClientSocket.prototype.hook = function hook(type, handler) { - this.socket.hook(type, handler); -}; - -ClientSocket.prototype.emit = function emit() { - this.socket.emit.apply(this.socket, arguments); -}; - -ClientSocket.prototype.start = function start() { - var self = this; - this.stop(); - this.timeout = setTimeout(function() { - self.timeout = null; - self.destroy(); - }, 60000); -}; - -ClientSocket.prototype.stop = function stop() { - if (this.timeout != null) { - clearTimeout(this.timeout); - this.timeout = null; - } -}; - -ClientSocket.prototype.destroy = function() { - this.unbindAll(); - this.stop(); - this.socket.disconnect(); -}; - /* * Helpers */ @@ -1031,47 +834,6 @@ function enforce(value, msg) { } } -/** - * TimedCB - * @constructor - * @ignore - */ - -function TimedCB(resolve, reject) { - this.resolve = resolve; - this.reject = reject; - this.done = false; -} - -TimedCB.wrap = function wrap(resolve, reject) { - return new TimedCB(resolve, reject).start(); -}; - -TimedCB.prototype.start = function start() { - var self = this; - var timeout; - - timeout = setTimeout(function() { - self.cleanup(timeout); - self.reject(new Error('Callback timed out.')); - }, 5000); - - return function(err) { - self.cleanup(timeout); - if (err) { - self.reject(err); - return; - } - self.resolve(); - }; -}; - -TimedCB.prototype.cleanup = function cleanup(timeout) { - assert(!this.done); - this.done = true; - clearTimeout(timeout); -}; - /* * Expose */ diff --git a/lib/node/nodeclient.js b/lib/node/nodeclient.js index de438ba9..ecf85cf4 100644 --- a/lib/node/nodeclient.js +++ b/lib/node/nodeclient.js @@ -184,10 +184,7 @@ NodeClient.prototype.estimateFee = function estimateFee(blocks) { NodeClient.prototype.rescan = function rescan(start) { var self = this; return this.node.chain.scan(start, this.filter, function(entry, txs) { - return new Promise(function(resolve, reject) { - var cb = co.wrap(resolve, reject); - self.emit('block rescan', entry, txs, cb); - }); + return self.fire('block rescan', entry, txs); }); }; diff --git a/lib/utils/validator.js b/lib/utils/validator.js index 0ecd3b32..b9c17b5d 100644 --- a/lib/utils/validator.js +++ b/lib/utils/validator.js @@ -225,6 +225,25 @@ Validator.prototype.hash = function hash(key, fallback) { return out; }; +/** + * Get a config option (as a number). + * @param {String} key + * @param {Object?} fallback + * @returns {Number|null} + */ + +Validator.prototype.numhash = function numhash(key, fallback) { + var value = this.get(key); + + if (value === null) + return fallback; + + if (typeof value === 'string') + return this.hash(key); + + return this.num(key); +}; + /** * Get a config option (as a number). * @param {String} key diff --git a/lib/wallet/client.js b/lib/wallet/client.js index 67ae38f2..b71736be 100644 --- a/lib/wallet/client.js +++ b/lib/wallet/client.js @@ -108,7 +108,7 @@ WalletClient.prototype._open = co(function* _open() { return cb(); } - self.emit('block rescan', block.entry, block.txs, cb); + self.fire('block rescan', block.entry, block.txs).then(cb, cb); }); this.socket.on('chain reset', function(tip) { @@ -136,8 +136,8 @@ WalletClient.prototype._open = co(function* _open() { yield this.onConnect(); yield this.sendAuth(); - yield this.sendOptions({ raw: true }); yield this.watchChain(); + yield this.watchMempool(); }); /** @@ -183,20 +183,7 @@ WalletClient.prototype.sendAuth = function sendAuth() { }; /** - * Wait for websocket options. - * @private - * @returns {Promise} - */ - -WalletClient.prototype.sendOptions = function sendOptions(options) { - var self = this; - return new Promise(function(resolve, reject) { - self.socket.emit('options', options, wrap(resolve, reject)); - }); -}; - -/** - * Wait for websocket options. + * Watch the blockchain. * @private * @returns {Promise} */ @@ -208,6 +195,19 @@ WalletClient.prototype.watchChain = function watchChain() { }); }; +/** + * Watch the blockchain. + * @private + * @returns {Promise} + */ + +WalletClient.prototype.watchMempool = function watchMempool() { + var self = this; + return new Promise(function(resolve, reject) { + self.socket.emit('watch mempool', wrap(resolve, reject)); + }); +}; + /** * Get chain tip. * @returns {Promise} @@ -375,7 +375,7 @@ function BlockResult(entry, txs) { function wrap(resolve, reject, parse) { return function(err, result) { if (err) { - reject(new Error(err.error)); + reject(new Error(err.message)); return; } diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index b450f70a..63a97b5e 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -36,7 +36,6 @@ var PathMapRecord = records.PathMapRecord; var OutpointMapRecord = records.OutpointMapRecord; var TXRecord = records.TXRecord; var U32 = encoding.U32; -var cob = co.cob; var DUMMY = new Buffer([0]); /** @@ -288,8 +287,12 @@ WalletDB.prototype.bind = function bind() { } })); - this.client.on('block rescan', cob(function* (entry, txs) { - yield self.rescanBlock(entry, txs); + this.client.hook('block rescan', co(function* (entry, txs) { + try { + yield self.rescanBlock(entry, txs); + } catch (e) { + self.emit('error', e); + } })); this.client.on('tx', co(function* (tx) { @@ -320,7 +323,7 @@ WalletDB.prototype.connect = co(function* connect() { this.bind(); - yield this.client.open({ raw: true }); + yield this.client.open(); yield this.setFilter(); });