node: refactor.

This commit is contained in:
Christopher Jeffrey 2016-08-26 05:02:08 -07:00
parent c2bcd8fd74
commit 06bb874573
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 131 additions and 216 deletions

View File

@ -10,7 +10,6 @@
var bcoin = require('../env');
var constants = bcoin.constants;
var utils = require('../utils/utils');
var assert = utils.assert;
var Node = bcoin.node;
/**
@ -43,6 +42,7 @@ var Node = bcoin.node;
* @property {HTTPServer} http
* @emits Fullnode#block
* @emits Fullnode#tx
* @emits Fullnode#alert
* @emits Fullnode#error
*/
@ -56,8 +56,7 @@ function Fullnode(options) {
this.chain = new bcoin.chain({
network: this.network,
logger: this.logger,
profiler: this.profiler,
db: this.db,
db: this.options.db,
location: this.location('chain'),
preload: false,
spv: false,
@ -132,7 +131,7 @@ function Fullnode(options) {
network: this.network,
logger: this.logger,
fees: this.fees,
db: this.db,
db: this.options.db,
location: this.location('walletdb'),
witness: this.options.witness,
useCheckpoints: this.options.useCheckpoints,
@ -168,50 +167,25 @@ utils.inherits(Fullnode, Node);
Fullnode.prototype._init = function _init() {
var self = this;
var onError = this._error.bind(this);
// Bind to errors
this.mempool.on('error', function(err) {
self._error(err);
});
this.chain.on('error', onError);
this.mempool.on('error', onError);
this.pool.on('error', onError);
this.miner.on('error', onError);
this.walletdb.on('error', onError);
this.miner.on('error', function(err) {
self._error(err);
});
if (this.http)
this.http.on('error', onError);
this.pool.on('error', function(err) {
switch (err.reason) {
case 'insufficient priority':
case 'non-final':
return self.logger.spam(err.message);
default:
return self._error(err);
}
});
this.chain.on('error', function(err) {
self._error(err);
});
this.walletdb.on('error', function(err) {
self._error(err);
});
if (this.http) {
this.http.on('error', function(err) {
self._error(err);
});
}
this.pool.on('alert', function(details) {
self.emit('alert', details);
this.pool.on('alert', function(alert) {
self.emit('alert', alert);
});
this.mempool.on('tx', function(tx) {
self.emit('tx', tx);
self.walletdb.addTX(tx, function(err) {
if (err)
self._error(err);
});
self.walletdb.addTX(tx, onError);
});
this.chain.on('block', function(block) {
@ -219,44 +193,25 @@ Fullnode.prototype._init = function _init() {
});
this.chain.on('connect', function(entry, block) {
self.walletdb.addBlock(entry, block.txs, function(err) {
if (err)
self._error(err);
});
self.walletdb.addBlock(entry, block.txs, onError);
if (!self.chain.isFull())
return;
self.mempool.addBlock(block, function(err) {
if (err)
self._error(err);
});
if (self.chain.synced)
self.mempool.addBlock(block, onError);
});
this.chain.on('disconnect', function(entry, block) {
self.walletdb.removeBlock(entry, function(err) {
if (err)
self._error(err);
});
self.walletdb.removeBlock(entry, onError);
if (!self.chain.isFull())
return;
self.mempool.removeBlock(block, function(err) {
if (err)
self._error(err);
});
if (self.chain.synced)
self.mempool.removeBlock(block, onError);
});
this.miner.on('block', function(block) {
self.pool.broadcast(block.toInv());
self.broadcast(block.toInv());
});
this.walletdb.on('send', function(tx) {
self.sendTX(tx, function(err) {
if (err)
self.emit('error', err);
});
self.sendTX(tx, onError);
});
};
@ -280,10 +235,10 @@ Fullnode.prototype._open = function open(callback) {
callback();
}
options = utils.merge({
options = {
id: 'primary',
passphrase: this.options.passphrase
}, this.options.wallet || {});
};
utils.serial([
this.chain.open.bind(this.chain),
@ -291,10 +246,13 @@ Fullnode.prototype._open = function open(callback) {
this.miner.open.bind(this.miner),
this.pool.open.bind(this.pool),
this.walletdb.open.bind(this.walletdb),
function(next) {
self.createWallet(options, function(err, wallet) {
function (next) {
self.walletdb.ensure(options, function(err, wallet) {
if (err)
return next(err);
return callback(err);
self.logger.info('Loaded wallet with id=%s address=%s',
wallet.id, wallet.getAddress());
// Set the miner payout address if the
// programmer didn't pass one in.
@ -311,8 +269,10 @@ Fullnode.prototype._open = function open(callback) {
self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next);
return next();
}
// Always rescan to make sure we didn't miss anything:
// there is no atomicity between the chaindb and walletdb.
// Always rescan to make sure we didn't
// miss anything: there is no atomicity
// between the chaindb and walletdb.
self.walletdb.rescan(self.chain.db, next);
},
function(next) {
@ -356,7 +316,7 @@ Fullnode.prototype._close = function close(callback) {
* Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from
* bitcoind nodes).
* @param {TX|MTX|Block} item
* @param {TX|Block} item
* @param {Function} callback
*/
@ -370,13 +330,13 @@ Fullnode.prototype.broadcast = function broadcast(item, callback) {
* @example
* node.sendTX(tx, callback);
* node.sendTX(tx, true, callback);
* @param {TX|MTX} item
* @param {TX} tx
* @param {Boolean?} wait - Wait to execute callback until a node
* requests our TX, rejects it, or the broadcast itself times out.
* @param {Function} callback - Returns [{@link VerifyError}|Error].
*/
Fullnode.prototype.sendTX = function sendTX(item, wait, callback) {
Fullnode.prototype.sendTX = function sendTX(tx, wait, callback) {
var self = this;
if (!callback) {
@ -384,19 +344,30 @@ Fullnode.prototype.sendTX = function sendTX(item, wait, callback) {
wait = null;
}
this.mempool.addTX(item, function(err) {
if (err)
this.mempool.addTX(tx, function(err) {
if (err) {
if (err.type === 'VerifyError') {
self._error(err);
self.logger.warning('Verification failed for tx: %.', tx.rhash);
self.logger.warning('Attempting to broadcast anyway...');
if (!wait) {
self.pool.broadcast(tx);
return callback();
}
return self.pool.broadcast(tx, callback);
}
return callback(err);
}
if (!self.pool.options.selfish)
item = item.toInv();
if (!self.options.selfish)
tx = tx.toInv();
if (!wait) {
self.pool.broadcast(item);
self.pool.broadcast(tx);
return callback();
}
self.pool.broadcast(item, callback);
self.pool.broadcast(tx, callback);
});
};
@ -433,37 +404,6 @@ Fullnode.prototype.stopSync = function stopSync() {
return this.pool.stopSync();
};
/**
* Create a {@link Wallet} in the wallet database.
* @param {Object} options - See {@link Wallet}.
* @param {Function} callback - Returns [Error, {@link Wallet}].
*/
Fullnode.prototype.createWallet = function createWallet(options, callback) {
var self = this;
this.walletdb.ensure(options, function(err, wallet) {
if (err)
return callback(err);
assert(wallet);
self.logger.info('Loaded wallet with id=%s address=%s',
wallet.id, wallet.getAddress());
callback(null, wallet);
});
};
/**
* Retrieve a wallet from the wallet database.
* @param {String} id - Wallet ID.
* @param {Function} callback - Returns [Error, {@link Wallet}].
*/
Fullnode.prototype.getWallet = function getWallet(id, callback) {
this.walletdb.get(id, callback);
};
/**
* Retrieve a block from the chain database.
* @param {Hash} hash
@ -507,7 +447,7 @@ Fullnode.prototype.getCoin = function getCoin(hash, index, callback) {
/**
* Get coins that pertain to an address from the mempool or chain database.
* Takes into account spent coins in the mempool.
* @param {Base58Address|Base58Address[]} addresses
* @param {Address} addresses
* @param {Function} callback - Returns [Error, {@link Coin}[]].
*/
@ -532,6 +472,24 @@ Fullnode.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, cal
});
};
/**
* Retrieve transactions pertaining to an
* address from the mempool or chain database.
* @param {Address} addresses
* @param {Function} callback - Returns [Error, {@link TX}[]].
*/
Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback) {
var mempool = this.mempool.getTXByAddress(addresses);
this.chain.db.getTXByAddress(addresses, function(err, txs) {
if (err)
return callback(err);
callback(null, mempool.concat(txs));
});
};
/**
* Retrieve a transaction from the mempool or chain database.
* @param {Hash} hash
@ -574,24 +532,6 @@ Fullnode.prototype.isSpent = function isSpent(hash, index, callback) {
this.chain.db.isSpent(hash, index, callback);
};
/**
* Retrieve transactions pertaining to an
* address from the mempool or chain database.
* @param {Base58Address|Base58Address[]} addresses
* @param {Function} callback - Returns [Error, {@link TX}[]].
*/
Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback) {
var mempool = this.mempool.getTXByAddress(addresses);
this.chain.db.getTXByAddress(addresses, function(err, txs) {
if (err)
return callback(err);
callback(null, mempool.concat(txs));
});
};
/**
* Fill a transaction with coins from the mempool
* and chain database (unspent only).

View File

@ -29,14 +29,13 @@ function Node(options) {
if (!options)
options = {};
options = this.parseOptions(options);
this.parseOptions(options);
this.options = options;
this.network = bcoin.network.get(options.network);
this.prefix = options.prefix;
this.logger = options.logger;
this.db = options.db;
this.mempool = null;
this.pool = null;
this.chain = null;
@ -44,6 +43,7 @@ function Node(options) {
this.miner = null;
this.walletdb = null;
this.wallet = null;
this.http = null;
this._bound = [];
@ -153,7 +153,19 @@ Node.prototype._bind = function _bind(obj, event, listener) {
*/
Node.prototype._error = function _error(err) {
this.logger.error(err);
if (!err)
return;
switch (err.reason) {
case 'insufficient priority':
case 'non-final':
this.logger.spam(err.message);
break;
default:
this.logger.error(err);
break;
}
this.emit('error', err);
};

View File

@ -9,7 +9,6 @@
var bcoin = require('../env');
var utils = require('../utils/utils');
var assert = utils.assert;
var Node = bcoin.node;
/**
@ -31,6 +30,7 @@ var Node = bcoin.node;
* @property {HTTPServer} http
* @emits SPVNode#block
* @emits SPVNode#tx
* @emits SPVNode#alert
* @emits SPVNode#error
*/
@ -43,8 +43,7 @@ function SPVNode(options) {
this.chain = new bcoin.chain({
network: this.network,
logger: this.logger,
profiler: this.profiler,
db: this.db,
db: this.options.db,
location: this.location('spvchain'),
witness: this.options.witness,
useCheckpoints: this.options.useCheckpoints,
@ -73,7 +72,7 @@ function SPVNode(options) {
this.walletdb = new bcoin.walletdb({
network: this.network,
logger: this.logger,
db: this.db,
db: this.options.db,
location: this.location('walletdb'),
witness: this.options.witness,
maxFiles: this.options.maxFiles,
@ -107,44 +106,28 @@ utils.inherits(SPVNode, Node);
SPVNode.prototype._init = function _init() {
var self = this;
var onError = this._error.bind(this);
// Bind to errors
this.pool.on('error', function(err) {
self._error(err);
});
this.chain.on('error', onError);
this.pool.on('error', onError);
this.walletdb.on('error', onError);
this.chain.on('error', function(err) {
self._error(err);
});
if (this.http)
this.http.on('error', onError);
this.walletdb.on('error', function(err) {
self._error(err);
});
if (this.http) {
this.http.on('error', function(err) {
self._error(err);
});
}
this.pool.on('alert', function(details) {
self.emit('alert', details);
this.pool.on('alert', function(alert) {
self.emit('alert', alert);
});
this.pool.on('tx', function(tx) {
self.emit('tx', tx);
self.walletdb.addTX(tx, function(err) {
if (err)
self._error(err);
});
self.walletdb.addTX(tx, onError);
});
this.chain.on('block', function(block, entry) {
self.emit('block', block);
self.walletdb.addBlock(entry, block.txs, function(err) {
if (err)
self._error(err);
});
self.walletdb.addBlock(entry, block.txs, onError);
});
this.walletdb.on('save address', function(address, path) {
@ -152,10 +135,7 @@ SPVNode.prototype._init = function _init() {
});
this.walletdb.on('send', function(tx) {
self.sendTX(tx, function(err) {
if (err)
self.emit('error', err);
});
self.sendTX(tx, onError);
});
};
@ -179,28 +159,27 @@ SPVNode.prototype._open = function open(callback) {
callback();
}
options = utils.merge({
options = {
id: 'primary',
passphrase: this.options.passphrase
}, this.options.wallet || {});
};
// Create or load the primary wallet.
utils.serial([
this.chain.open.bind(this.chain),
this.pool.open.bind(this.pool),
this.walletdb.open.bind(this.walletdb),
function (next) {
self.walletdb.open(function(err) {
self.walletdb.ensure(options, function(err, wallet) {
if (err)
return next(err);
return callback(err);
self.createWallet(options, function(err, wallet) {
if (err)
return next(err);
self.logger.info('Loaded wallet with id=%s address=%s',
wallet.id, wallet.getAddress());
self.wallet = wallet;
self.wallet = wallet;
next();
});
next();
});
},
function(next) {
@ -218,6 +197,21 @@ SPVNode.prototype._open = function open(callback) {
next();
});
},
function(next) {
if (self.options.noScan) {
self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next);
return next();
}
if (self.walletdb.height === 0)
return next();
// Always replay the last block to make
// sure we didn't miss anything: there
// is no atomicity between the chaindb
// and walletdb.
self.chain.reset(self.walletdb.height - 1, next);
},
function(next) {
// Rebroadcast pending transactions.
self.wallet.resend(next);
@ -257,7 +251,7 @@ SPVNode.prototype._close = function close(callback) {
* Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from
* bitcoind nodes).
* @param {TX|MTX|Block} item
* @param {TX|Block} item
* @param {Function} callback
*/
@ -269,22 +263,22 @@ SPVNode.prototype.broadcast = function broadcast(item, callback) {
* Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from
* bitcoind nodes).
* @param {TX|MTX} item
* @param {TX} tx
* @param {Function} callback
*/
SPVNode.prototype.sendTX = function sendTX(item, wait, callback) {
SPVNode.prototype.sendTX = function sendTX(tx, wait, callback) {
if (!callback) {
callback = wait;
wait = null;
}
if (!wait) {
this.pool.broadcast(item);
this.pool.broadcast(tx);
return utils.nextTick(callback);
}
this.pool.broadcast(item, callback);
this.pool.broadcast(tx, callback);
};
/**
@ -311,37 +305,6 @@ SPVNode.prototype.stopSync = function stopSync() {
return this.pool.stopSync();
};
/**
* Create a {@link Wallet} in the wallet database.
* @param {Object} options - See {@link Wallet}.
* @param {Function} callback - Returns [Error, {@link Wallet}].
*/
SPVNode.prototype.createWallet = function createWallet(options, callback) {
var self = this;
this.walletdb.ensure(options, function(err, wallet) {
if (err)
return callback(err);
assert(wallet);
self.logger.info('Loaded wallet with id=%s address=%s',
wallet.id, wallet.getAddress());
callback(null, wallet);
});
};
/**
* Retrieve a wallet from the wallet database.
* @param {String} id - Wallet ID.
* @param {Function} callback - Returns [Error, {@link Wallet}].
*/
SPVNode.prototype.getWallet = function getWallet(id, callback) {
this.walletdb.get(id, callback);
};
/*
* Expose
*/