http: misc fixes.

This commit is contained in:
Christopher Jeffrey 2017-03-10 05:32:42 -08:00
parent 1b098b61ba
commit 48066f1634
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
7 changed files with 195 additions and 157 deletions

View File

@ -18,16 +18,16 @@ var util = require('../utils/util');
var co = require('../utils/co');
var Validator = require('../utils/validator');
var List = require('../utils/list');
var ListItem = List.Item;
var fs = require('../utils/fs');
var crypto = require('../crypto/crypto');
var ListItem = List.Item;
/**
* HTTPBase
* @alias module:http.Base
* @constructor
* @param {Object?} options
* @emits HTTPBase#websocket
* @emits HTTPBase#socket
*/
function HTTPBase(options) {
@ -193,22 +193,19 @@ HTTPBase.prototype.basicAuth = function basicAuth(options) {
var pass = options.password;
var realm = options.realm;
if (!Buffer.isBuffer(user)) {
assert(typeof user === 'string');
user = new Buffer(user, 'utf8');
if (user) {
if (typeof user === 'string')
user = new Buffer(user, 'utf8');
assert(Buffer.isBuffer(user));
user = crypto.hash256(user);
}
if (!Buffer.isBuffer(pass)) {
assert(typeof pass === 'string');
if (typeof pass === 'string')
pass = new Buffer(pass, 'utf8');
}
user = crypto.hash256(user);
assert(Buffer.isBuffer(pass));
pass = crypto.hash256(pass);
// XXX
user = null;
if (!realm)
realm = 'server';
@ -586,8 +583,9 @@ HTTPBase.prototype.all = function all() {
};
/**
* Initialize websockets.
* Add and initialize a websocket.
* @private
* @param {SocketIO.Socket} ws
*/
HTTPBase.prototype.addSocket = function addSocket(ws) {
@ -622,8 +620,9 @@ HTTPBase.prototype.addSocket = function addSocket(ws) {
};
/**
* Initialize websockets.
* Remove a socket from lists.
* @private
* @param {WebSocket} socket
*/
HTTPBase.prototype.removeSocket = function removeSocket(socket) {
@ -639,8 +638,10 @@ HTTPBase.prototype.removeSocket = function removeSocket(socket) {
};
/**
* Initialize websockets.
* Add a socket to channel list.
* @private
* @param {WebSocket} socket
* @param {String} name
*/
HTTPBase.prototype.joinChannel = function joinChannel(socket, name) {
@ -662,8 +663,10 @@ HTTPBase.prototype.joinChannel = function joinChannel(socket, name) {
};
/**
* Initialize websockets.
* Remove a socket from channel list.
* @private
* @param {WebSocket} socket
* @param {String} name
*/
HTTPBase.prototype.leaveChannel = function leaveChannel(socket, name) {
@ -683,8 +686,9 @@ HTTPBase.prototype.leaveChannel = function leaveChannel(socket, name) {
};
/**
* Initialize websockets.
* Get channel list.
* @private
* @param {String} name
*/
HTTPBase.prototype.channel = function channel(name) {
@ -1533,6 +1537,130 @@ Response.prototype.send = function send(code, msg, type) {
assert(false, 'Bad object passed to send.');
};
/**
* WebSocket
* @constructor
* @ignore
* @param {SocketIO.Socket}
*/
function WebSocket(socket, ctx) {
if (!(this instanceof WebSocket))
return new WebSocket(socket, ctx);
EventEmitter.call(this);
this.context = ctx;
this.socket = socket;
this.remoteAddress = socket.conn.remoteAddress;
this.hooks = {};
this.channels = {};
this.auth = false;
this.filter = null;
this.prev = null;
this.next = null;
this.init();
}
util.inherits(WebSocket, EventEmitter);
WebSocket.prototype.init = function init() {
var self = this;
var socket = this.socket;
var onevent = socket.onevent.bind(socket);
socket.onevent = function(packet) {
var result = onevent(packet);
self.onevent(packet);
return result;
};
socket.on('error', function(err) {
self.dispatch('error', err);
});
socket.on('disconnect', function() {
self.dispatch('close');
});
};
WebSocket.prototype.onevent = co(function* onevent(packet) {
var args = (packet.data || []).slice();
var type = args.shift() || '';
var ack, result;
if (typeof args[args.length - 1] === 'function')
ack = args.pop();
else
ack = this.socket.ack(packet.id);
try {
result = yield this.fire(type, args);
} catch (e) {
ack({
type: e.type || 'Error',
message: e.stack,
code: e.code
});
return;
}
if (result === undefined)
return;
ack(null, result);
});
WebSocket.prototype.hook = function hook(type, handler) {
assert(!this.hooks[type], 'Event already added.');
this.hooks[type] = handler;
};
WebSocket.prototype.fire = co(function* fire(type, args) {
var handler = this.hooks[type];
if (!handler)
return;
return yield handler.call(this.context, args);
});
WebSocket.prototype.join = function join(name) {
this.dispatch('join channel', name);
};
WebSocket.prototype.leave = function leave(name) {
this.dispatch('leave channel', name);
};
WebSocket.prototype.dispatch = function dispatch() {
var emit = EventEmitter.prototype.emit;
return emit.apply(this, arguments);
};
WebSocket.prototype.emit = function emit() {
return this.socket.emit.apply(this.socket, arguments);
};
WebSocket.prototype.call = function call() {
var socket = this.socket;
var args = new Array(arguments.length);
var i;
for (i = 0; i < arguments.length; i++)
args[i] = arguments[i];
return new Promise(function(resolve, reject) {
args.push(co.wrap(resolve, reject));
socket.emit.apply(socket, args);
});
};
WebSocket.prototype.destroy = function destroy() {
return this.socket.disconnect();
};
/*
* Helpers
*/
@ -1636,132 +1764,6 @@ function parseType(type) {
}
}
/**
* WebSocket
* @constructor
* @ignore
* @param {SocketIO.Socket}
*/
function WebSocket(socket, ctx) {
if (!(this instanceof WebSocket))
return new WebSocket(socket, ctx);
EventEmitter.call(this);
this.context = ctx;
this.socket = socket;
this.remoteAddress = socket.conn.remoteAddress;
this.hooks = {};
this.channels = {};
this.auth = false;
this.filter = null;
this.prev = null;
this.next = null;
this.init();
}
util.inherits(WebSocket, EventEmitter);
WebSocket.prototype.init = function init() {
var self = this;
var socket = this.socket;
var onevent = socket.onevent.bind(socket);
socket.onevent = function(packet) {
var result = onevent(packet);
self.onevent(packet);
return result;
};
socket.on('error', function(err) {
self.dispatch('error', err);
});
socket.on('disconnect', function() {
self.dispatch('close');
});
};
WebSocket.prototype.onevent = co(function* onevent(packet) {
var args = (packet.data || []).slice();
var type = args.shift() || '';
var ack, result;
if (typeof args[args.length - 1] === 'function')
ack = args.pop();
else
ack = this.socket.ack(packet.id);
try {
result = yield this.fire(type, args);
} catch (e) {
ack({
type: e.type || 'Error',
message: e.stack,
code: e.code
});
return;
}
if (result === undefined)
return;
ack(null, result);
});
WebSocket.prototype.hook = function hook(type, handler) {
// assert(!this.hooks[type], 'Event already added.');
if (this.hooks[type])
return;
this.hooks[type] = handler;
};
WebSocket.prototype.fire = co(function* fire(type, args) {
var handler = this.hooks[type];
if (!handler)
return;
return yield handler.call(this.context, args);
});
WebSocket.prototype.join = function join(name) {
this.dispatch('join channel', name);
};
WebSocket.prototype.leave = function leave(name) {
this.dispatch('leave channel', name);
};
WebSocket.prototype.dispatch = function dispatch() {
var emit = EventEmitter.prototype.emit;
return emit.apply(this, arguments);
};
WebSocket.prototype.emit = function emit() {
return this.socket.emit.apply(this.socket, arguments);
};
WebSocket.prototype.call = function call() {
var socket = this.socket;
var args = new Array(arguments.length);
var i;
for (i = 0; i < arguments.length; i++)
args[i] = arguments[i];
return new Promise(function(resolve, reject) {
args.push(co.wrap(resolve, reject));
socket.emit.apply(socket, args);
});
};
WebSocket.prototype.destroy = function destroy() {
return this.socket.disconnect();
};
/*
* Expose
*/

View File

@ -1228,7 +1228,7 @@ RPC.prototype._createTemplate = co(function* _createTemplate(version, coinbase,
attempt.version |= 1 << deploy.bit;
case common.thresholdStates.STARTED:
if (!deploy.force) {
if (rules.indexOf(name) === -1)
if (!rules || rules.indexOf(name) === -1)
attempt.version &= ~(1 << deploy.bit);
name = '!' + name;
}
@ -1236,7 +1236,7 @@ RPC.prototype._createTemplate = co(function* _createTemplate(version, coinbase,
break;
case common.thresholdStates.ACTIVE:
if (!deploy.force) {
if (rules.indexOf(name) === -1)
if (!rules || rules.indexOf(name) === -1)
throw new RPCError('Client must support ' + name + '.');
name = '!' + name;
}

View File

@ -154,7 +154,7 @@ RPCBase.prototype.execute = co(function* execute(json, help) {
if (!func) {
for (i = 0; i < this.mounts.length; i++) {
mount = this.mounts[i];
if (mount.call[json.method])
if (mount.calls[json.method])
return yield mount.execute(json, help);
}
throw new RPCError('Method not found: ' + json.method + '.');

View File

@ -29,7 +29,7 @@ var RPC = require('./rpc');
* @param {Object} options
* @param {Fullnode} options.node
* @see HTTPBase
* @emits HTTPServer#websocket
* @emits HTTPServer#socket
*/
function HTTPServer(options) {
@ -92,7 +92,6 @@ HTTPServer.prototype.initRouter = function initRouter() {
if (!this.options.noAuth) {
this.use(this.basicAuth({
username: 'bitcoinrpc',
password: this.options.apiKey,
realm: 'node'
}));
@ -361,8 +360,8 @@ HTTPServer.prototype.initSockets = function initSockets() {
if (!this.io)
return;
this.on('socket', function(ws) {
self.handleSocket(ws);
this.on('socket', function(socket) {
self.handleSocket(socket);
});
};
@ -536,6 +535,11 @@ HTTPServer.prototype.handleAuth = function handleAuth(socket) {
this.bindChain();
};
/**
* Bind to chain events.
* @private
*/
HTTPServer.prototype.bindChain = function bindChain() {
var self = this;
var pool = this.mempool || this.pool;
@ -603,6 +607,14 @@ HTTPServer.prototype.bindChain = function bindChain() {
});
};
/**
* Filter block by socket.
* @private
* @param {WebSocket} socket
* @param {Block} block
* @returns {TX[]}
*/
HTTPServer.prototype.filterBlock = function filterBlock(socket, block) {
var txs = [];
var i, tx;
@ -619,6 +631,14 @@ HTTPServer.prototype.filterBlock = function filterBlock(socket, block) {
return txs;
};
/**
* Filter transaction by socket.
* @private
* @param {WebSocket} socket
* @param {TX} tx
* @returns {Boolean}
*/
HTTPServer.prototype.filterTX = function filterTX(socket, tx) {
var found = false;
var i, hash, input, prevout, output;
@ -655,12 +675,29 @@ HTTPServer.prototype.filterTX = function filterTX(socket, tx) {
return false;
};
/**
* Scan using a socket's filter.
* @private
* @param {WebSocket} socket
* @param {Hash} start
* @returns {Promise}
*/
HTTPServer.prototype.scan = co(function* scan(socket, start) {
var scanner = this.scanner.bind(this, socket);
yield this.node.scan(start, socket.filter, scanner);
return null;
});
/**
* Handle rescan iteration.
* @private
* @param {WebSocket} socket
* @param {ChainEntry} entry
* @param {TX[]} txs
* @returns {Promise}
*/
HTTPServer.prototype.scanner = function scanner(socket, entry, txs) {
var block = entry.toRaw();
var raw = [];

View File

@ -170,7 +170,7 @@ FullNode.prototype._init = function _init() {
this.http.on('error', onError);
this.mempool.on('tx', function(tx) {
self.miner.notifyEntry();
self.miner.cpu.notifyEntry();
self.emit('tx', tx);
});

View File

@ -23,11 +23,11 @@ var RPC = require('./rpc');
/**
* HTTPServer
* @alias module:http.Server
* @alias module:wallet.HTTPServer
* @constructor
* @param {Object} options
* @see HTTPBase
* @emits HTTPServer#websocket
* @emits HTTPServer#socket
*/
function HTTPServer(options) {
@ -62,7 +62,7 @@ HTTPServer.prototype.attach = function attach(server) {
};
/**
* Initialize routes.
* Initialize http server.
* @private
*/
@ -96,7 +96,6 @@ HTTPServer.prototype.initRouter = function initRouter() {
if (!this.options.noAuth) {
this.use(this.basicAuth({
username: 'walletrpc',
password: this.options.apiKey,
realm: 'wallet'
}));
@ -769,8 +768,6 @@ HTTPServer.prototype.initRouter = function initRouter() {
yield req.wallet.resend();
res.send(200, { success: true });
}));
this.initSockets();
};
/**

View File

@ -7,9 +7,11 @@
exports.Account = require('./account');
exports.Client = require('./client');
exports.common = require('./common');
exports.HTTPServer = require('./http');
exports.layout = require('./layout');
exports.MasterKey = require('./masterkey');
exports.Path = require('./path');
exports.RPC = require('./rpc');
exports.records = require('./records');
exports.TXDB = require('./txdb');
exports.WalletDB = require('./walletdb');