async chain.

This commit is contained in:
Christopher Jeffrey 2016-02-16 03:03:11 -08:00
parent 5b7ac01e84
commit e912f78814
6 changed files with 457 additions and 257 deletions

View File

@ -443,6 +443,8 @@ Block.prototype.verifyContext = function verifyContext() {
}
// BIP30 - Ensure there are no duplicate txids
// this.node.hasTX(tx.hash('hex'), function(err, has) {
// if (has)
if (this.chain[tx.hash('hex')]) {
// Blocks 91842 and 91880 created duplicate
// txids by using the same exact output script
@ -465,19 +467,19 @@ Block.prototype.verifyContext = function verifyContext() {
if (!input.output)
continue;
assert(input.output);
// Ensure tx is not double spending an output
if (!input.output) {
utils.debug('Block is using spent inputs: %s (tx: %s, output: %s)',
this.rhash, tx.hash('hex'),
input.prevout.hash + '/' + input.prevout.index);
return false;
}
// Verify the script
if (!tx.verify(j, true, flags)) {
utils.debug('Block has invalid inputs: %s', this.rhash);
return false;
}
// Ensure tx is not double spending an output
// if (this.chain.isSpent(input.prevout.hash, input.prevout.index)) {
// utils.debug('Block is using spent inputs: %s', this.rhash);
// return false;
// }
}
}
}

View File

@ -259,50 +259,149 @@ Chain.prototype._preload = function _preload(callback) {
});
};
Chain.prototype._addEntry = function _addEntry(entry) {
Chain.prototype._saveBlock = function _saveBlock(block, callback) {
var node = bcoin.node.global;
if (!node)
return callback();
node.block.saveBlock(block, function(err) {
if (err)
return callback(err);
node.mempool.addBlock(block);
return callback();
});
};
Chain.prototype._fillCoins = function _fillCoin(block, callback) {
var node = bcoin.node.global;
if (!node)
return callback();
node.block.fillCoins(block, callback);
};
Chain.prototype._verifyContext = function _verifyContext(block, prev, callback) {
var node = bcoin.node.global;
if (!node)
return callback(null, block.verifyContext());
var height = prev.height + 1;
var scriptChecks = true;
node.block.fillCoins(block, function(err) {
var pending;
if (err)
return callback(err);
pending = block.txs.length;
// If we are an ancestor of a checkpoint, we can
// skip the input verification.
if (height < network.checkpoints.lastHeight && !network.checkpoints[height])
scriptChecks = false;
if (!block.verifyContext())
return callback(null, false);
if (!pending)
return callback(null, true);
// Check all transactions
block.txs.forEach(function(tx) {
var i;
for (i = 0; j < tx.inputs.length; i++) {
input = tx.inputs[i];
// Ensure tx is not double spending an output
if (!input.output) {
utils.debug('Block is using spent inputs: %s (tx: %s, output: %s)',
this.rhash, tx.hash('hex'),
input.prevout.hash + '/' + input.prevout.index);
return callback(null, false);
}
}
// BIP30 - Ensure there are no duplicate txids
node.block.hasTX(tx.hash('hex'), function(err, has) {
// Blocks 91842 and 91880 created duplicate
// txids by using the same exact output script
// and extraNonce.
if (has) {
utils.debug('Block is overwriting txids: %s', this.rhash);
if (!(network.type === 'main' && (height === 91842 || height === 91880)))
return callback(null, false);
}
return callback(null, true);
});
});
});
};
Chain.prototype._removeBlock = function _removeBlock(tip, callback) {
var node = bcoin.node.global;
if (!node)
return callback();
node.block.removeBlock(tip, function(err, block) {
if (err)
return callback(err);
if (!block)
return;
node.mempool.removeBlock(block);
});
};
Chain.prototype._addEntry = function _addEntry(entry, block, callback) {
var self = this;
var existing;
callback = utils.asyncify(callback);
// Already added
if (this.heightLookup[entry.hash] != null) {
assert(this.heightLookup[entry.hash] === entry.height);
return Chain.codes.unchanged;
return callback(null, Chain.codes.unchanged);
}
// Duplicate height
existing = this.db.get(entry.height);
if (existing && existing.hash === entry.hash)
return Chain.codes.unchanged;
return callback(null, Chain.codes.unchanged);
// Fork at checkpoint
checkpoint = network.checkpoints[entry.height];
if (checkpoint) {
this.emit('checkpoint', entry.height, entry.hash, checkpoint);
if (hash !== checkpoint) {
// Resetting to the last checkpoint _really_ isn't
// necessary (even bitcoind doesn't do it), but it
// could be used if you want to be on the overly
// safe (see: paranoid) side.
// this.resetLastCheckpoint(entry.height);
return Chain.codes.badCheckpoint;
}
}
this._saveBlock(block, function(err) {
if (err)
return callback(err);
this._saveEntry(entry, true);
self._saveEntry(entry, function(err) {
if (err)
return callback(err);
return Chain.codes.okay;
return callback(null, Chain.codes.okay);
});
});
};
Chain.prototype._saveEntry = function _saveEntry(entry, save) {
if (save)
this.db.save(entry);
Chain.prototype._saveEntry = function _saveEntry(entry, callback) {
this.heightLookup[entry.hash] = entry.height;
if (!this.tip || entry.height > this.tip.height) {
this.tip = entry;
this.emit('tip', this.tip);
}
if (callback) {
if (typeof callback !== 'function')
callback = null;
this.db.save(entry, callback);
}
};
Chain.prototype.resetLastCheckpoint = function resetLastCheckpoint(height) {
@ -355,18 +454,19 @@ Chain.prototype.resetTime = function resetTime(ts) {
return this.resetHeight(entry.height);
};
Chain.prototype.add = function add(block, peer) {
Chain.prototype.add = function add(block, peer, callback) {
var self = this;
var initial = block;
var code = Chain.codes.unchanged;
var hash, prevHash, prevHeight, entry, tip, existing;
var hash, prevHash, prevHeight, entry, tip, existing, checkpoint;
var total = 0;
for (;;) {
(function next() {
hash = block.hash('hex');
prevHash = block.prevBlock;
// Find the previous block height/index.
prevHeight = this.heightLookup[prevHash];
prevHeight = self.heightLookup[prevHash];
// Validate the block we want to add.
// This is only necessary for new
@ -374,50 +474,50 @@ Chain.prototype.add = function add(block, peer) {
// orphans.
if (block === initial && !block.verify()) {
code = Chain.codes.invalid;
this.emit('invalid', {
self.emit('invalid', {
height: prevHeight + 1,
hash: hash
}, peer);
break;
return done(null, code);
}
// If the block is already known to be
// an orphan, ignore it.
if (this.orphan.map[prevHash]) {
if (self.orphan.map[prevHash]) {
// If the orphan chain forked, simply
// reset the orphans and find a new peer.
if (this.orphan.map[prevHash].hash('hex') !== hash) {
this.orphan.map = {};
this.orphan.bmap = {};
this.orphan.count = 0;
this.orphan.size = 0;
this.emit('fork', {
if (self.orphan.map[prevHash].hash('hex') !== hash) {
self.orphan.map = {};
self.orphan.bmap = {};
self.orphan.count = 0;
self.orphan.size = 0;
self.emit('fork', {
height: -1,
expected: this.orphan.map[prevHash].hash('hex'),
expected: self.orphan.map[prevHash].hash('hex'),
received: hash,
checkpoint: false
}, peer);
code = Chain.codes.forked;
break;
return done(null, code);
}
code = Chain.codes.knownOrphan;
break;
return done(null, code);
}
// If previous block wasn't ever seen,
// add it current to orphans and break.
if (prevHeight == null) {
this.orphan.count++;
this.orphan.size += block.getSize();
this.orphan.map[prevHash] = block;
this.orphan.bmap[hash] = block;
self.orphan.count++;
self.orphan.size += block.getSize();
self.orphan.map[prevHash] = block;
self.orphan.bmap[hash] = block;
code = Chain.codes.newOrphan;
total++;
break;
return done(null, code);
}
// Create a new chain entry.
entry = new bcoin.chainblock(this, {
entry = new bcoin.chainblock(self, {
hash: hash,
version: block.version,
prevBlock: prevHash,
@ -430,9 +530,9 @@ Chain.prototype.add = function add(block, peer) {
// Add entry if we do not have it (or if
// there is another entry at its height)
existing = this.db.get(entry.height);
existing = self.db.get(entry.height);
if (!existing || existing.hash !== hash) {
assert(this.heightLookup[entry.hash] == null);
assert(self.heightLookup[entry.hash] == null);
// A valid block with an already existing
// height came in, that spells fork. We
@ -443,9 +543,9 @@ Chain.prototype.add = function add(block, peer) {
// The tip has more chainwork, it is a
// higher height than the entry. This is
// not an alternate tip. Ignore it.
if (this.tip.chainwork.cmp(entry.chainwork) > 0) {
if (self.tip.chainwork.cmp(entry.chainwork) > 0) {
code = Chain.codes.unchanged;
break;
return done(null, code);
}
// Get _our_ tip as opposed to
// the attempted alternate tip.
@ -453,57 +553,63 @@ Chain.prototype.add = function add(block, peer) {
// The block has equal chainwork (an
// alternate tip). Reset the chain, find
// a new peer, and wait to see who wins.
this.resetHeight(entry.height - 1);
this.emit('fork', {
self.resetHeight(entry.height - 1);
self.emit('fork', {
height: prevHeight + 1,
expected: tip.hash,
received: hash,
checkpoint: false
}, peer);
code = Chain.codes.forked;
break;
return self._removeBlock(tip.hash, function(err) {
if (err)
return done(err);
return done(null, code);
});
}
// Do "contextual" verification on our block
// now that we're certain its previous
// block is in the chain.
if (!block.verifyContext()) {
code = Chain.codes.invalid;
this.emit('invalid', {
height: prevHeight + 1,
hash: hash
}, peer);
break;
}
// Attempt to add block to the chain index.
code = this._addEntry(entry);
// Result should never be `unchanged` since
// we already verified there were no
// duplicate heights, etc.
assert(code !== Chain.codes.unchanged);
// Fork at checkpoint
// Block did not match the checkpoint. The
// chain could be reset to the last sane
// checkpoint, but it really isn't necessary,
// so we don't do it. The misbehaving peer has
// been killed and hopefully we find a peer
// who isn't trying to fool us.
if (code === Chain.codes.badCheckpoint) {
this.emit('fork', {
height: entry.height,
expected: network.checkpoints[entry.height],
received: entry.hash,
checkpoint: true
});
break;
checkpoint = network.checkpoints[entry.height];
if (checkpoint) {
self.emit('checkpoint', entry.height, entry.hash, checkpoint);
if (hash !== checkpoint) {
// Resetting to the last checkpoint _really_ isn't
// necessary (even bitcoind doesn't do it), but it
// could be used if you want to be on the overly
// safe (see: paranoid) side.
// this.resetLastCheckpoint(entry.height);
code = Chain.codes.badCheckpoint;
self.emit('fork', {
height: entry.height,
expected: network.checkpoints[entry.height],
received: entry.hash,
checkpoint: true
});
return done(null, code);
}
}
// Should never happen, but... something
// went wrong. Ignore this block.
if (code !== Chain.codes.okay)
break;
// Could fill here for contextual verification.
// Also check isSpent here!
// self._fillCoins(block, function(err) {
// Do "contextual" verification on our block
// now that we're certain its previous
// block is in the chain.
if (!block.verifyContext()) {
code = Chain.codes.invalid;
self.emit('invalid', {
height: prevHeight + 1,
hash: hash
}, peer);
return done(null, code);
}
// Update the block height
block.height = entry.height;
@ -511,50 +617,79 @@ Chain.prototype.add = function add(block, peer) {
tx.height = entry.height;
});
// Keep track of the number of blocks we
// added and the number of orphans resolved.
total++;
// Attempt to add block to the chain index.
self._addEntry(entry, block, function(err, code_) {
if (err)
return done(err);
// Emit our block (and potentially resolved
// orphan) so the programmer can save it.
this.emit('block', block, peer);
this.emit('entry', entry);
if (block !== initial)
this.emit('resolved', block, peer);
code = code_;
// Fullfill request
this.request.fullfill(hash, block);
// Result should never be `unchanged` since
// we already verified there were no
// duplicate heights, etc.
assert(code !== Chain.codes.unchanged);
// Should always be okay.
assert(code === Chain.codes.okay);
// Keep track of the number of blocks we
// added and the number of orphans resolved.
total++;
// Emit our block (and potentially resolved
// orphan) so the programmer can save it.
self.emit('block', block, peer);
self.emit('entry', entry);
if (block !== initial)
self.emit('resolved', block, peer);
// Fullfill request
self.request.fullfill(hash, block);
handleOrphans();
});
} else {
handleOrphans();
}
if (!this.orphan.map[hash])
break;
function handleOrphans() {
if (!self.orphan.map[hash])
return done(null, code);
// An orphan chain was found, start resolving.
block = this.orphan.map[hash];
delete this.orphan.bmap[block.hash('hex')];
delete this.orphan.map[hash];
this.orphan.count--;
this.orphan.size -= block.getSize();
// An orphan chain was found, start resolving.
block = self.orphan.map[hash];
delete self.orphan.bmap[block.hash('hex')];
delete self.orphan.map[hash];
self.orphan.count--;
self.orphan.size -= block.getSize();
next();
}
})();
function done(err, code) {
// Failsafe for large orphan chains. Do not
// allow more than 20mb stored in memory.
if (self.orphan.size > 20971520) {
Object.keys(self.orphan.bmap).forEach(function(hash) {
self.emit('unresolved', self.orphan.bmap[hash], peer);
});
self.orphan.map = {};
self.orphan.bmap = {};
self.orphan.count = 0;
self.orphan.size = 0;
}
if (code !== Chain.codes.okay) {
if (!(self.options.multiplePeers && code === Chain.codes.newOrphan))
utils.debug('Chain Error: %s', Chain.msg(code));
}
if (err)
return callback(err);
return callback(null, total);
}
// Failsafe for large orphan chains. Do not
// allow more than 20mb stored in memory.
if (this.orphan.size > 20971520) {
Object.keys(this.orphan.bmap).forEach(function(hash) {
this.emit('unresolved', this.orphan.bmap[hash], peer);
}, this);
this.orphan.map = {};
this.orphan.bmap = {};
this.orphan.count = 0;
this.orphan.size = 0;
}
if (code !== Chain.codes.okay) {
if (!(this.options.multiplePeers && code === Chain.codes.newOrphan))
utils.debug('Chain Error: %s', Chain.msg(code));
}
return total;
};
Chain.prototype.has = function has(hash) {

View File

@ -222,8 +222,8 @@ ChainDB.prototype.getAsync = function getAsync(height, callback) {
});
};
ChainDB.prototype.save = function save(entry) {
return this.saveAsync(entry);
ChainDB.prototype.save = function save(entry, callback) {
return this.saveAsync(entry, callback);
};
ChainDB.prototype.saveSync = function saveSync(entry) {

View File

@ -60,6 +60,12 @@ Node.prototype._init = function _init() {
this.pool = new bcoin.pool(this.options.pool);
this.chain = this.pool.chain;
if (0)
this.pool.on('block', function(block, peer) {
self.mempool.addBlock(block);
});
if (0)
this.pool.on('block', function(block, peer) {
self.block.saveBlock(block, function(err) {
if (err)
@ -93,6 +99,12 @@ Node.prototype._init = function _init() {
self.emit('error', err);
});
this.pool.on('fork', function(data) {
if (data.block)
self.mempool.removeBlock(block);
});
if (0)
this.pool.on('fork', function(a, b) {
[a, b].forEach(function(hash) {
self.block.removeBlock(hash, function(err, block) {

View File

@ -380,10 +380,15 @@ Pool.prototype._addLoader = function _addLoader() {
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
if (self._handleBlock(block, peer)) {
self._startInterval();
self._startTimer();
}
self._handleBlock(block, peer, function(err, added) {
if (err)
self.emit('error', err);
if (added) {
self._startInterval();
self._startTimer();
}
});
});
peer.on('block', function(block) {
@ -391,10 +396,15 @@ Pool.prototype._addLoader = function _addLoader() {
return;
// If the peer sent us a block that was added
// to the chain (not orphans), reset the timeout.
if (self._handleBlock(block, peer)) {
self._startInterval();
self._startTimer();
}
self._handleBlock(block, peer, function(err, added) {
if (err)
self.emit('error', err);
if (added) {
self._startInterval();
self._startTimer();
}
});
});
if (self.options.headers) {
@ -590,126 +600,152 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback)
return callback(null, block);
};
Pool.prototype._handleBlock = function _handleBlock(block, peer) {
Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
var self = this;
var requested;
// Fulfill our request.
requested = this._response(block);
callback = utils.asyncify(callback);
// Emulate BIP37: emit all the filtered transactions.
if (this.options.fullNode && this.listeners('watched').length > 0) {
block.txs.forEach(function(tx) {
if (self.isWatched(tx))
self.emit('watched', tx, peer);
this._prehandleBlock(block, peer, function(err) {
var requested;
if (err)
return callback(err);
// Fulfill our request.
requested = self._response(block);
// Emulate BIP37: emit all the filtered transactions.
if (self.options.fullNode && self.listeners('watched').length > 0) {
block.txs.forEach(function(tx) {
if (self.isWatched(tx))
self.emit('watched', tx, peer);
});
}
// Ensure the block was not invalid last time.
// Someone might be sending us bad blocks to DoS us.
if (self.block.invalid[block.hash('hex')]) {
utils.debug('Peer is sending an invalid chain (%s)', peer.host);
self.setMisbehavior(peer, 100);
return callback(null, false);
}
// Ensure this is not a continuation
// of an invalid chain.
if (self.block.invalid[block.prevBlock]) {
utils.debug(
'Peer is sending an invalid continuation chain (%s)',
peer.host);
self.setMisbehavior(peer, 100);
return callback(null, false);
}
// Ignore if we already have.
if (self.chain.has(block)) {
utils.debug('Already have block %s (%s)', block.height, peer.host);
self.setMisbehavior(peer, 1);
return callback(null, false);
}
// Make sure the block is valid.
if (!block.verify()) {
utils.debug(
'Block verification failed for %s (%s)',
block.rhash, peer.host);
self.block.invalid[block.hash('hex')] = true;
self.setMisbehavior(peer, 100);
return callback(null, false);
}
// Someone is sending us blocks without
// us requesting them.
if (!requested) {
utils.debug(
'Recieved unrequested block: %s (%s)',
block.rhash, peer.host);
}
// Resolve orphan chain
if (!self.options.headers) {
if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return callback(null, false);
// Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes.
if (self.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host);
self.setMisbehavior(peer, 100);
return callback(null, false);
}
// NOTE: If we were to emit new orphans here, we
// would not need to store full blocks as orphans.
// However, the listener would not be able to see
// the height until later.
self._addIndex(block, peer, function(err, added) {
if (err)
return callback(err);
if (added)
self.emit('pool block', block, peer);
// Resolve orphan chain.
self.peers.load.loadBlocks(
self.chain.getLocator(),
self.chain.getOrphanRoot(block)
);
utils.debug('Handled orphan %s (%s)', block.rhash, peer.host);
return callback(null, false);
});
return;
}
} else {
if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return callback(null, false);
// Increase banscore by 10 if we're using getheaders.
if (!self.options.multiplePeers) {
if (self.setMisbehavior(peer, 10))
return callback(null, false);
}
}
}
// Add to index and emit/save
self._addIndex(block, peer, function(err, added) {
if (err)
return callback(err);
if (added) {
self.emit('pool block', block, peer);
return callback(null, true);
}
return callback(null, false);
});
}
// Ensure the block was not invalid last time.
// Someone might be sending us bad blocks to DoS us.
if (this.block.invalid[block.hash('hex')]) {
utils.debug('Peer is sending an invalid chain (%s)', peer.host);
this.setMisbehavior(peer, 100);
return false;
}
// Ensure this is not a continuation
// of an invalid chain.
if (this.block.invalid[block.prevBlock]) {
utils.debug(
'Peer is sending an invalid continuation chain (%s)',
peer.host);
this.setMisbehavior(peer, 100);
return false;
}
// Ignore if we already have.
if (this.chain.has(block)) {
utils.debug('Already have block %s (%s)', block.height, peer.host);
this.setMisbehavior(peer, 1);
return false;
}
// Make sure the block is valid.
if (!block.verify()) {
utils.debug(
'Block verification failed for %s (%s)',
block.rhash, peer.host);
this.block.invalid[block.hash('hex')] = true;
this.setMisbehavior(peer, 100);
return false;
}
// Someone is sending us blocks without
// us requesting them.
if (!requested) {
utils.debug(
'Recieved unrequested block: %s (%s)',
block.rhash, peer.host);
}
// Resolve orphan chain
if (!this.options.headers) {
if (!this.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return false;
// Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes.
if (this.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host);
this.setMisbehavior(peer, 100);
return false;
}
// NOTE: If we were to emit new orphans here, we
// would not need to store full blocks as orphans.
// However, the listener would not be able to see
// the height until later.
if (this._addIndex(block, peer))
this.emit('pool block', block, peer);
// Resolve orphan chain.
this.peers.load.loadBlocks(
this.chain.getLocator(),
this.chain.getOrphanRoot(block)
);
utils.debug('Handled orphan %s (%s)', block.rhash, peer.host);
return false;
}
} else {
if (!this.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return false;
// Increase banscore by 10 if we're using getheaders.
if (!this.options.multiplePeers) {
if (this.setMisbehavior(peer, 10))
return false;
}
}
}
// Add to index and emit/save
if (this._addIndex(block, peer)) {
this.emit('pool block', block, peer);
return true;
}
});
};
Pool.prototype._addIndex = function _addIndex(block, peer) {
var added = this.chain.add(block, peer);
Pool.prototype._addIndex = function _addIndex(block, peer, callback) {
var self = this;
this.chain.add(block, peer, function(err, added) {
if (err)
return callback(err);
if (added === 0)
return false;
if (added === 0)
return callback(null, false);
this.emit('chain-progress', this.chain.fillPercent(), peer);
self.emit('chain-progress', self.chain.fillPercent(), peer);
return true;
return callback(null, true);
});
};
Pool.prototype.isFull = function isFull() {
@ -816,15 +852,28 @@ Pool.prototype._createPeer = function _createPeer(options) {
return peer;
};
Pool.prototype._handleTX = function _handleTX(tx, peer) {
var requested = this._response(tx);
var added = this._addTX(tx, 1);
Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
var self = this;
if (added || tx.block)
this.emit('tx', tx, peer);
callback = utils.asyncify(callback);
if (!this.options.fullNode && tx.block)
this.emit('watched', tx, peer);
this._prehandleTX(tx, peer, function(err) {
var requested, added;
if (err)
return callback(err);
requested = self._response(tx);
added = self._addTX(tx, 1);
if (added || tx.block)
self.emit('tx', tx, peer);
if (!self.options.fullNode && tx.block)
self.emit('watched', tx, peer);
return callback();
});
};
Pool.prototype._addLeech = function _addLeech(socket) {

View File

@ -419,7 +419,9 @@ RequestCache.prototype.fullfill = function fullfill(id, err, data) {
utils.RequestCache = RequestCache;
utils.asyncify = function asyncify(fn) {
return function _asynicifedFn(err, data1, data2) {
if (fn && fn.name === '_asyncifiedFn')
return fn;
return function _asyncifiedFn(err, data1, data2) {
if (!fn)
return err || data1;
utils.nextTick(function() {