wait for flush.

This commit is contained in:
Christopher Jeffrey 2016-02-17 19:40:56 -08:00
parent 0139e1340c
commit d22dc02e8f
6 changed files with 153 additions and 52 deletions

View File

@ -44,6 +44,9 @@ function BlockDB(options) {
tx: new bcoin.lru(32 * 1024 * 1024)
};
if (+process.env.BCOIN_FRESH === 1 && bcoin.cp)
bcoin.cp.execFileSync('rm', ['-rf', this.file], { stdio: 'ignore' });
this.index = new levelup(this.file, {
keyEncoding: 'ascii',
valueEncoding: 'binary',
@ -895,6 +898,50 @@ BlockDB.prototype.hasCoin = function hasCoin(hash, index, callback) {
});
};
// For BIP30
// https://bitcointalk.org/index.php?topic=67738.0
BlockDB.prototype.hasUnspentTX = function hasUnspentTX(hash, callback) {
var self = this;
this.getTX(hash, function(err, tx) {
var called, hash, pending, spent;
if (err)
return callback(err);
if (!tx)
return callback(null, false);
hash = tx.hash('hex');
pending = tx.outputs.length;
spent = 0;
if (!pending)
return callback(null, false);
function done(err) {
if (called)
return;
called = true;
if (err)
return callback(err);
return callback(null, spent < tx.outputs.length);
}
tx.outputs.forEach(function(output, i) {
self.isSpent(hash, i, function(err, result) {
if (err)
return done(err);
if (result)
spent++;
if (!--pending)
done();
});
});
});
};
BlockDB.prototype.hasTX = function hasTX(hash, callback) {
var self = this;
var id = 't/t/' + hash;

View File

@ -1128,8 +1128,10 @@ Chain.prototype.add = function add(initial, peer, callback) {
// Start resolving the queue
// (I love asynchronous IO).
if (self.pending.length === 0)
if (self.pending.length === 0) {
self.emit('flush');
return;
}
item = self.pending.shift();
delete self.pendingBlocks[item[0].hash('hex')];

View File

@ -77,6 +77,13 @@ Mempool.prototype.removeBlock = function removeBlock(block) {
self.removeTX(mtx);
});
// Add transaction back into mempool
tx = tx.clone();
tx.ps = utils.now();
tx.ts = 0;
tx.block = null;
tx.network = true;
self.addTX(tx);
});
};
@ -201,6 +208,8 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback) {
if (tx.isCoinbase())
return callback(new Error('What?'));
assert(tx.ts === 0);
this._lockTX(tx);
this.blockdb.fillCoin(tx, function(err) {
@ -217,7 +226,6 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback) {
data: tx.hash(),
reason: 'no-prevout'
});
pool.setMisbehavior(peer, 100);
return callback(new Error('Previous outputs not found.'));
}
@ -316,8 +324,12 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback) {
priority = tx.getPriority();
tx.inputs.forEach(function(input) {
var type = input.getType();
var address = input.getAddress();
if (type === 'pubkey' || type === 'multisig')
address = null;
if (!address)
return;
@ -328,8 +340,12 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback) {
});
tx.outputs.forEach(function(output) {
var type = output.getType();
var address = output.getAddress();
if (type === 'pubkey' || type === 'multisig')
address = null;
if (!address)
return;
@ -401,8 +417,12 @@ Mempool.prototype.removeTX = function removeTX(hash, callback) {
}
tx.inputs.forEach(function(input) {
var type = input.getType();
var address = input.getAddress();
if (type === 'pubkey' || type === 'multisig')
address = null;
if (!address)
return;
@ -414,8 +434,12 @@ Mempool.prototype.removeTX = function removeTX(hash, callback) {
});
tx.outputs.forEach(function(output) {
var type = output.getType();
var address = output.getAddress();
if (type === 'pubkey' || type === 'multisig')
address = null;
if (!address)
return;

View File

@ -182,6 +182,7 @@ Peer.prototype._init = function init() {
};
Peer.prototype.createSocket = function createSocket(port, host) {
var self = this;
var net, socket;
assert(port != null);
@ -205,7 +206,7 @@ Peer.prototype.createSocket = function createSocket(port, host) {
socket.on('connect', function() {
utils.debug(
'Connected to %s:%d (priority=%s)',
host, port, this.priority);
host, port, self.priority);
});
return socket;

View File

@ -233,10 +233,7 @@ Pool.prototype._init = function _init() {
}
// Resolve orphan chain.
self.peers.load.loadBlocks(
self.chain.getLocator(),
self.chain.getOrphanRoot(data.hash)
);
self.loadOrphan(self.peers.load, null, data.hash);
} else {
// Increase banscore by 10 if we're using getheaders.
if (!self.options.multiplePeers)
@ -258,6 +255,31 @@ Pool.prototype._init = function _init() {
this.startServer();
};
Pool.prototype.loadBlocks = function loadBlocks(peer, top, stop) {
var self = this;
this._onFlush(function() {
peer.loadBlocks(self.chain.getLocator(top), stop);
});
};
Pool.prototype.loadOrphan = function loadOrphan(peer, top, orphan) {
var self = this;
assert(orphan);
this._onFlush(function() {
peer.loadBlocks(
self.chain.getLocator(top),
self.chain.getOrphanRoot(orphan)
);
});
};
Pool.prototype.loadHeaders = function loadHeaders(peer, top, stop) {
var self = this;
this._onFlush(function() {
peer.loadHeaders(self.chain.getLocator(top), stop);
});
};
Pool.prototype.startServer = function startServer() {
var self = this;
@ -529,13 +551,14 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
// simply tries to find the latest block in
// the peer's chain.
if (last && headers.length === 2000)
peer.loadHeaders(this.chain.getLocator(last), null);
this.loadHeaders(peer, last, null);
// Reset interval to avoid calling getheaders unnecessarily
this._startInterval();
};
Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
var self = this;
var i, hash;
assert(!this.options.headers);
@ -555,37 +578,36 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('blocks', hashes);
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
this._onFlush(function() {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
if (this.chain.hasOrphan(hash)) {
utils.debug('Peer sent a hash that is already a known orphan.');
if (self.chain.hasOrphan(hash)) {
utils.debug('Peer sent a hash that is already a known orphan.');
// Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes.
if (this.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host);
this.setMisbehavior(peer, 100);
return;
// Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes.
if (self.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host);
self.setMisbehavior(peer, 100);
return;
}
// Resolve orphan chain.
self.loadOrphan(peer, null, hash);
continue;
}
// Resolve orphan chain.
peer.loadBlocks(
this.chain.getLocator(),
this.chain.getOrphanRoot(hash)
);
continue;
// Request block if we don't have it or if
// this is the last hash: this is done as
// a failsafe because we _need_ to request
// the hashContinue no matter what.
if (!self.chain.has(hash) || i === hashes.length - 1) {
self._request(peer, self.block.type, hash);
continue;
}
}
// Request block if we don't have it or if
// this is the last hash: this is done as
// a failsafe because we _need_ to request
// the hashContinue no matter what.
if (!this.chain.has(hash) || i === hashes.length - 1) {
this._request(peer, this.block.type, hash);
continue;
}
}
});
// Reset interval to avoid calling getblocks unnecessarily
this._startInterval();
@ -594,6 +616,13 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this._startTimer();
};
Pool.prototype._onFlush = function _onFlush(callback) {
if (this.chain.pending.length === 0)
return callback();
this.chain.once('flush', callback);
};
Pool.prototype._handleInv = function _handleInv(hashes, peer) {
var i, hash;
@ -605,7 +634,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) {
hash = utils.toHex(hashes[i]);
if (!this.chain.has(hash)) {
if (this.options.headers)
this.peers.load.loadHeaders(this.chain.getLocator(), hash);
this.loadHeaders(this.peers.load, null, hash);
else
this._request(peer, this.block.type, hash);
}
@ -635,6 +664,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug(
'Recieved unrequested block: %s (%s)',
block.rhash, peer.host);
return;
}
this._prehandleBlock(block, peer, function(err) {
@ -687,9 +717,9 @@ Pool.prototype._load = function _load() {
}
if (this.options.headers)
this.peers.load.loadHeaders(this.chain.getLocator(), null);
this.loadHeaders(this.peers.load, null, null);
else
this.peers.load.loadBlocks(this.chain.getLocator(), null);
this.loadBlocks(this.peers.load, null, null);
};
Pool.prototype.loadMempool = function loadMempool() {
@ -1435,13 +1465,6 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
return;
}
// Block should be not in chain, or be requested
// Do not use with headers-first
// if (!options.force && (type === 'block' || type === 'filtered')) {
// if (this.chain.has(hash))
// return;
// }
item = new LoadRequest(this, peer, type, hash, cb);
if (item.type === 'tx') {
@ -1467,15 +1490,17 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
}
if (peer._blockQueue.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d blocks from %s with getdata',
peer._blockQueue.length,
self.request.activeBlocks,
peer.host);
this._onFlush(function() {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d blocks from %s with getdata',
peer._blockQueue.length,
self.request.activeBlocks,
peer.host);
peer.getData(peer._blockQueue);
peer._blockQueue.length = 0;
peer.getData(peer._blockQueue);
peer._blockQueue.length = 0;
});
});
}

View File

@ -309,6 +309,8 @@ script.checksig = function checksig(msg, sig, key) {
try {
return bcoin.ecdsa.verify(msg, sig, key);
} catch (e) {
utils.debug('Elliptic threw during verification:');
utils.debug(e.stack + '');
return false;
}
};