http features.

This commit is contained in:
Christopher Jeffrey 2016-03-09 15:11:36 -08:00
parent 9f56efe0d7
commit cee8b65989
4 changed files with 354 additions and 15 deletions

View File

@ -7,6 +7,13 @@
var EventEmitter = require('events').EventEmitter;
var StringDecoder = require('string_decoder').StringDecoder;
var url = require('url');
var engine;
try {
engine = require('engine.io');
} catch (e) {
;
}
var bcoin = require('../bcoin');
var bn = require('bn.js');
@ -48,6 +55,7 @@ HTTPServer.prototype._init = function _init() {
var self = this;
this._initRouter();
this._initEngine();
this.server.on('connection', function(socket) {
socket.on('error', function(err) {
@ -216,6 +224,9 @@ HTTPServer.prototype._init = function _init() {
// Wallet Balance
this.get('/wallet/:id/balance', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getBalance(req.params.id, function(err, balance) {
if (err)
return next(err);
@ -229,6 +240,9 @@ HTTPServer.prototype._init = function _init() {
// Wallet UTXOs
this.get('/wallet/:id/coin', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getCoins(req.params.id, function(err, coins) {
if (err)
return next(err);
@ -240,8 +254,27 @@ HTTPServer.prototype._init = function _init() {
});
});
// Wallet TX
this.get('/wallet/:id/coin/:hash/:index', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getCoin(req.params.hash, +req.params.index, function(err, coin) {
if (err)
return next(err);
if (!coin)
return send(404);
send(200, coin.toJSON());
});
});
// Wallet TXs
this.get('/wallet/:id/all', function(req, res, next, send) {
this.get('/wallet/:id/tx/all', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getAll(req.params.id, function(err, txs) {
if (err)
return next(err);
@ -254,7 +287,10 @@ HTTPServer.prototype._init = function _init() {
});
// Wallet Pending TXs
this.get('/wallet/:id/pending', function(req, res, next, send) {
this.get('/wallet/:id/tx/pending', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getPending(req.params.id, function(err, txs) {
if (err)
return next(err);
@ -266,9 +302,95 @@ HTTPServer.prototype._init = function _init() {
});
});
// Emit events for any wallet txs that come in.
this.node.on('wallet tx', function(tx, ids) {
self.sendWebhook({ tx: tx, ids: ids });
// Wallet TXs within time range
this.get('/wallet/:id/tx/range', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
var options = {
start: +req.query.start,
end: +req.query.end,
limit: +req.query.limit
};
self.node.walletdb.getTimeRange(req.params.id, options, function(err, txs) {
if (err)
return next(err);
if (!txs.length)
return send(404);
send(200, txs.map(function(tx) { return tx.toJSON(); }));
});
});
// Wallet TXs within time range
this.get('/wallet/:id/tx/last', function(req, res, next, send) {
var id = req.params.id;
if (id === '_all')
id = null;
self.node.walletdb.getTimeRange(id, +req.query.limit, function(err, txs) {
if (err)
return next(err);
if (!txs.length)
return send(404);
send(200, txs.map(function(tx) { return tx.toJSON(); }));
});
});
// Wallet TX
this.get('/wallet/:id/tx/:hash', function(req, res, next, send) {
self.node.walletdb.getTX(req.params.hash, function(err, tx) {
if (err)
return next(err);
if (!tx)
return send(404);
send(200, tx.toJSON());
});
});
this.post('/broadcast', function(req, res, next, send) {
self.node.pool.broadcast(bcoin.tx.fromRaw(req.body.tx, 'hex'));
send(200, { success: true });
});
};
HTTPServer.prototype._initEngine = function _initEngine() {
var self = this;
if (!engine)
return;
this.clients = [];
this.engine = new engine.Server();
this.engine.attach(this.server);
this.engine.on('connection', function(socket) {
var s = new Socket(self, socket);
socket.on('message', function(data) {
s.parse(data);
});
socket.on('close', function() {
s.destroy();
});
socket.on('error', function(err) {
self.emit('error', err);
});
s.on('error', function(err) {
self.emit('error', err);
});
});
};
@ -392,8 +514,6 @@ HTTPServer.prototype.del = function del(path, callback) {
this.routes.del.push({ path: path, callback: callback });
};
// Send webhooks to notify other servers
// of incoming txs and wallet updates.
HTTPServer.prototype.sendWebhook = function sendWebhook(msg, callback) {
var request, body, secret, hmac;
@ -586,6 +706,215 @@ function unescape(str) {
}
}
function Socket(server, socket) {
this.server = server;
this.engine = server.engine;
this.node = server.node;
this.walletdb = server.node.walletdb;
this.socket = socket;
this.listeners = {};
this._init();
}
Socket.prototype._init = function _init() {
this.server.clients.push(this);
};
Socket.prototype.parse = function parse(msg) {
var size, off, header, payload;
size = utils.readIntv(msg, 0);
off = size.off;
size = size.r;
if (off + size > msg.length)
return this.send({ event: 'error', msg: 'Size larger than message.' });
try {
header = JSON.parse(msg.slice(off, off + size).toString('utf8'));
off += size;
} catch (e) {
return this.send({ event: 'error', msg: 'Header malformed.' });
}
payload = data.slice(off);
try {
return this._handle(header, payload);
} catch (e) {
return this.send({ event: 'error', msg: e.message + '' });
}
};
Socket.prototype._handle = function _handle(header, payload) {
if (header.cmd === 'handshake')
return this._handleHandshake(header, payload);
if (header.cmd === 'listen')
return this._handleListen(header, payload);
if (header.cmd === 'unlisten')
return this._handleUnlisten(header, payload);
if (header.event)
return this.emit(header.event, header, payload);
throw new Error('Not a valid command.');
};
Socket.prototype.destroy = function destroy() {
this.remove();
this.cleanup();
return this.clients.destroy();
};
Socket.prototype.remove = function remove() {
var i = this.server.clients.indexOf(this);
if (i !== -1)
this.server.clients.splice(i, 1);
};
Socket.prototype.cleanup = function cleanup() {
Object.keys(this.listeners).forEach(function(id) {
this.unlistenWallet(id);
}, this);
};
Socket.prototype.unlistenWallet = function unlistenWallet(id) {
this.listeners[id].forEach(function(listener) {
this.walletdb.removeListener(listener[0], listener[1]);
}, this);
delete this.listeners[id];
};
Socket.prototype._listenWallet = function _listenWallet(id, event, listener) {
if (!this.listeners[id])
this.listeners[id] = [];
this.listeners[id].push([event, listener]);
this.walletdb.on(event, listener);
};
Socket.prototype.listenWallet = function listenWallet(event, id, listener) {
if (id === '_all') {
this._listenWallet(id, 'tx', function(tx, map) {
self.send({ event: 'tx', map: map, id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, 'updated', function(tx, map) {
self.send({ event: 'updated', map: map, id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, 'confirmed', function(tx, map) {
self.send({ event: 'confirmed', map: map, id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, 'unconfirmed', function(tx, map) {
self.send({ event: 'unconfirmed', map: map, id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, 'balances', function(balances) {
Object.keys(balances).forEach(function(key) {
balances[key] = utils.btc(balances[key]);
});
self.send({ event: 'balances', id: id, balances: balances });
});
return;
}
this._listenWallet(id, id + ' tx', function(tx) {
self.send({ event: 'tx', id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, id + ' updated', function(tx) {
self.send({ event: 'updated', id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, id + ' confirmed', function(tx) {
self.send({ event: 'confirmed', id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, id + ' unconfirmed', function(tx) {
self.send({ event: 'unconfirmed', id: id, _payload: tx.toExtended() });
});
this._listenWallet(id, id + ' balance', function(balance) {
self.send({ event: 'balance', id: id, balance: utils.btc(balance) });
});
};
Socket.prototype._onListen = function _onListen(header, payload) {
var self = this;
var id = header.id;
if (typeof id !== 'string')
throw new Error('Wallet ID is not a string.');
if (id.length > 1000)
throw new Error('Wallet ID too large.');
if (!/^[a-zA-Z0-9]+$/.test(id))
throw new Error('Wallet ID must be alphanumeric.');
if (this.listeners[id])
throw new Error('Already listening.');
this.listenWallet(id);
};
Socket.prototype._onUnlisten = function _onUnlisten(header, payload) {
var self = this;
var id = header.id;
if (typeof id !== 'string')
throw new Error('Wallet ID is not a string.');
if (id.length > 1000)
throw new Error('Wallet ID too large.');
if (!/^[a-zA-Z0-9]+$/.test(id))
throw new Error('Wallet ID must be alphanumeric.');
if (!this.listeners[id])
throw new Error('Not listening.');
this.unlistenWallet(id);
};
Socket.prototype._handleHandshake = function _handleHandshake(header, payload) {
if (this._activated)
throw new Error('Handshake already completed.');
if (payload.length > 0)
throw new Error('Unexpected payload.');
this._activated = true;
};
Socket.prototype.send = function send(data) {
var header, payload, size, msg;
if (data._payload) {
payload = data._payload;
delete data._payload;
} else {
payload = new Buffer([]);
}
assert(Buffer.isBuffer(payload));
header = new Buffer(JSON.stringify(data), 'utf8');
size = new Buffer(utils.sizeIntv(header.length));
utils.writeIntv(size, header.length, 0);
msg = Buffer.concat([size, header, payload]);
return this.socket.send(msg);
};
/**
* Expose
*/

View File

@ -104,12 +104,6 @@ Fullnode.prototype._init = function _init() {
self.emit('error', err);
});
// Emit events for any TX we see that's
// is relevant to one of our wallets.
this.walletdb.on('wallet tx', function(tx, ids) {
self.emit('wallet tx', tx, ids);
});
if (0)
this.on('tx', function(tx) {
self.walletdb.addTX(tx, function(err) {

View File

@ -303,6 +303,11 @@ Wallet.prototype.createAddress = function createAddress(change) {
this.receiveAddress = address;
}
if (this.provider && this.provider.sync) {
assert(!this.provider.update);
this.provider.sync(this, address);
}
return address;
};

View File

@ -124,39 +124,50 @@ WalletDB.prototype._init = function _init() {
});
this.tx.on('tx', function(tx, map) {
self.emit('tx', tx, map);
map.all.forEach(function(id) {
self.emit(id + ' tx', tx);
});
});
this.tx.on('confirmed', function(tx, map) {
self.emit('confirmed', tx, map);
map.all.forEach(function(id) {
self.emit(id + ' confirmed', tx);
});
});
this.tx.on('unconfirmed', function(tx, map) {
self.emit('unconfirmed', tx, map);
map.all.forEach(function(id) {
self.emit(id + ' unconfirmed', tx);
});
});
this.tx.on('updated', function(tx, map) {
self.emit('wallet tx', tx, map);
var balances = {};
self.emit('updated', tx, map);
utils.forEachSerial(map.output, function(id, next) {
if (self.listeners(id + ' balance').length === 0)
if (self.listeners('balance').length === 0
&& self.listeners(id + ' balance').length === 0) {
return next();
}
self.getBalance(id, function(err, balance) {
if (err)
return self.emit('error', err);
balances[id] = balance;
self.emit(id + ' balance', balance);
});
}, function(err) {
if (err)
self.emit('error', err);
self.emit('balances', balances);
});
// Only sync for confirmed txs.