refactor mempool orphans.

This commit is contained in:
Christopher Jeffrey 2016-05-16 04:30:48 -07:00
parent c37a069978
commit 88a5b8a784
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD

View File

@ -247,8 +247,10 @@ Mempool.prototype.addBlock = function addBlock(block, callback, force) {
if (err)
return next(err);
if (!entry)
return self.removeOrphan(hash, next);
if (!entry) {
self.removeOrphan(hash);
return next();
}
self.removeUnchecked(entry, false, function(err) {
if (err)
@ -380,29 +382,21 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(entryHash, callba
/**
* Purge orphan transactions from the mempool.
* @param {Function} callback
*/
Mempool.prototype.limitOrphans = function limitOrphans(callback) {
var self = this;
Mempool.prototype.limitOrphans = function limitOrphans() {
var orphans = Object.keys(this.orphans);
var i, hash;
(function next(err) {
if (err)
return callback(err);
if (self.totalOrphans <= constants.mempool.MAX_ORPHAN_TX)
return callback();
while (this.totalOrphans > constants.mempool.MAX_ORPHAN_TX) {
i = bcoin.ec.rand(0, orphans.length);
hash = orphans[i];
orphans.splice(i, 1);
bcoin.debug('Removing orphan %s from mempool.', utils.revHex(hash));
self.removeOrphan(hash, next);
})();
this.removeOrphan(hash);
}
};
/**
@ -606,20 +600,13 @@ Mempool.prototype.getRange = function getRange(options, callback) {
*/
Mempool.prototype.has = function has(hash, callback) {
var self = this;
if (this.locker.hasPending(hash))
return utils.asyncify(callback)(null, true);
return this.hasTX(hash, function(err, exists) {
if (err)
return callback(err);
if (this.hasOrphan(hash))
return utils.asyncify(callback)(null, true);
if (exists)
return callback(null, true);
self.hasOrphan(hash, callback);
});
return this.hasTX(hash, callback);
};
/**
@ -722,8 +709,10 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) {
if (err)
return callback(err);
if (!tx.hasCoins())
return self.storeOrphan(tx, callback);
if (!tx.hasCoins()) {
self.storeOrphan(tx);
return callback();
}
entry = MempoolEntry.fromTX(tx, self.chain.height);
@ -769,6 +758,7 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) {
Mempool.prototype.addUnchecked = function addUnchecked(entry, callback, force) {
var self = this;
var resolved;
var unlock = this._lock(addUnchecked, [entry, callback], force);
if (!unlock)
@ -788,34 +778,31 @@ Mempool.prototype.addUnchecked = function addUnchecked(entry, callback, force) {
bcoin.debug('Added tx %s to the mempool.', entry.tx.rhash);
self.resolveOrphans(entry.tx, function(err, resolved) {
if (err)
return callback(err);
resolved = self.resolveOrphans(entry.tx);
utils.forEachSerial(resolved, function(tx, next) {
var entry = MempoolEntry.fromTX(tx, self.chain.height);
self.verify(entry, function(err) {
utils.forEachSerial(resolved, function(tx, next) {
var entry = MempoolEntry.fromTX(tx, self.chain.height);
self.verify(entry, function(err) {
if (err) {
if (err.type === 'VerifyError') {
bcoin.debug('Could not resolve orphan %s: %s.',
tx.rhash,
err.message);
return next();
}
self.emit('error', err);
return next();
}
self.addUnchecked(entry, function(err) {
if (err) {
if (err.type === 'VerifyError') {
bcoin.debug('Could not resolve orphan %s: %s.',
tx.rhash,
err.message);
return next();
}
self.emit('error', err);
return next();
}
self.addUnchecked(entry, function(err) {
if (err) {
self.emit('error', err);
return next();
}
bcoin.debug('Resolved orphan %s in mempool.', entry.tx.rhash);
next();
}, true);
});
}, callback);
});
bcoin.debug('Resolved orphan %s in mempool.', entry.tx.rhash);
next();
}, true);
});
}, callback);
});
};
@ -840,31 +827,28 @@ Mempool.prototype.removeUnchecked = function removeUnchecked(entry, limit, callb
if (err)
return callback(err);
self.removeOrphan(entry.tx, function(err) {
self.removeOrphan(entry.tx);
self._removeUnchecked(entry, limit, function(err) {
if (err)
return callback(err);
self._removeUnchecked(entry, limit, function(err) {
if (err)
return callback(err);
self.spent -= entry.tx.inputs.length;
self.size -= self.memUsage(entry.tx);
self.total--;
self.spent -= entry.tx.inputs.length;
self.size -= self.memUsage(entry.tx);
self.total--;
if (limit) {
rate = Math.floor(entry.fees * 1000 / entry.size);
rate += self.minReasonableFee;
if (rate > self.minFeeRate) {
self.minFeeRate = rate;
self.blockSinceBump = false;
}
if (limit) {
rate = Math.floor(entry.fees * 1000 / entry.size);
rate += self.minReasonableFee;
if (rate > self.minFeeRate) {
self.minFeeRate = rate;
self.blockSinceBump = false;
}
}
self.emit('remove tx', entry.tx);
self.emit('remove tx', entry.tx);
return callback();
});
return callback();
});
});
};
@ -1097,17 +1081,15 @@ Mempool.prototype.countAncestors = function countAncestors(tx, callback) {
/**
* Store an orphaned transaction.
* @param {TX} tx
* @param {Function} callback
*/
Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
var self = this;
Mempool.prototype.storeOrphan = function storeOrphan(tx) {
var prevout = {};
var i, hash, batch, input, prev;
if (tx.getSize() > 5000) {
bcoin.debug('Ignoring large orphan: %s', tx.rhash);
return callback();
return;
}
hash = tx.hash('hex');
@ -1134,10 +1116,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
bcoin.debug('Added orphan %s to mempool.', tx.rhash);
if (this.totalOrphans > constants.mempool.MAX_ORPHAN_TX)
return this.limitOrphans(callback);
return callback();
this.limitOrphans();
};
/**
@ -1187,45 +1166,38 @@ Mempool.prototype.getHistory = function getHistory(callback) {
}, callback);
};
/**
* Get hashes of all orphans a transaction hash potentially resolves.
* @param {Hash} hash - Resolving transaction.
* @param {Function} callback - Return [Error, {@link Hash}[]].
*/
Mempool.prototype.getWaiting = function getWaiting(hash, callback) {
return callback(null, this.waiting[hash] || []);
};
/**
* Retrieve an orphan transaction.
* @param {Hash} orphanHash
* @param {Function} callback - Returns [Error, {@link TX}].
* @param {Hash} hash
* @returns {TX}
*/
Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) {
var self = this;
var orphan = this.orphans[orphanHash];
Mempool.prototype.getOrphan = function getOrphan(hash) {
var orphan = this.orphans[hash];
if (!orphan)
return callback();
return;
try {
orphan = bcoin.tx.fromExtended(orphan, true);
} catch (e) {
return callback(e);
delete this.orphans[hash];
bcoin.debug('%s %s',
'Warning: possible memory corruption.',
'Orphan failed deserialization.');
return;
}
return callback(null, orphan);
return orphan;
};
/**
* @param {Hash} orphanHash
* @param {Function} callback - Returns [Error, Boolean].
* @param {Hash} hash
* @returns {Boolean}
*/
Mempool.prototype.hasOrphan = function hasOrphan(orphanHash, callback) {
return callback(null, this.orphans[orphanHash] != null);
Mempool.prototype.hasOrphan = function hasOrphan(hash) {
return this.orphans[hash] != null;
};
/**
@ -1234,107 +1206,81 @@ Mempool.prototype.hasOrphan = function hasOrphan(orphanHash, callback) {
* Deletes all orphan entries and
* returns orphan hashes.
* @param {TX} tx
* @param {Function} callback - Returns [Error, {@link Hash}[]].
* @returns {Array} Resolved
*/
Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) {
var self = this;
Mempool.prototype.resolveOrphans = function resolveOrphans(tx) {
var hash = tx.hash('hex');
var resolved = [];
var hashes = this.waiting[hash];
var i, orphanHash, orphan;
this.getWaiting(hash, function(err, hashes) {
if (err)
return callback(err);
if (!hashes)
return resolved;
utils.forEachSerial(hashes, function(orphanHash, next, i) {
self.getOrphan(orphanHash, function(err, orphan) {
if (err)
return next(err);
for (i = 0; i < hashes.length; i++) {
orphanHash = hashes[i];
orphan = this.getOrphan(orphanHash);
if (!orphan)
return next();
if (!orphan)
continue;
orphan.fillCoins(tx);
orphan.fillCoins(tx);
if (orphan.hasCoins()) {
self.totalOrphans--;
delete self.orphans[orphanHash];
resolved.push(orphan);
return next();
}
if (orphan.hasCoins()) {
this.totalOrphans--;
delete this.orphans[orphanHash];
resolved.push(orphan);
continue;
}
self.orphans[orphanHash] = orphan.toExtended(true);
this.orphans[orphanHash] = orphan.toExtended(true);
}
next();
});
}, function(err) {
if (err)
return callback(err);
delete this.waiting[hash];
delete self.waiting[hash];
return callback(null, resolved);
});
});
return resolved;
};
/**
* Remove a transaction from the mempool.
* @param {TX|Hash} tx
* @param {Function} callback
*/
Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
var self = this;
var i, prevout, hash;
Mempool.prototype.removeOrphan = function removeOrphan(tx) {
var i, j, hashes, prevout, prev, hash;
function getOrphan(tx, callback) {
if (typeof tx === 'string')
return self.getOrphan(tx, callback);
return callback(null, tx);
if (typeof tx === 'string')
tx = this.getOrphan(tx);
if (!tx)
return;
hash = tx.hash('hex');
prevout = tx.getPrevout();
for (i = 0; i < prevout.length; i++) {
prev = prevout[i];
hashes = this.waiting[prev];
if (!hashes)
continue;
j = hashes.indexOf(hash);
if (j !== -1)
hashes.splice(j, 1);
if (hashes.length === 0) {
delete this.waiting[prev];
continue;
}
this.waiting[prev] = hashes;
}
return getOrphan(tx, function(err, tx) {
if (err)
return callback(err);
delete this.orphans[hash];
if (!tx)
return callback();
hash = tx.hash('hex');
prevout = tx.getPrevout();
utils.forEachSerial(prevout, function(prev, next) {
self.getWaiting(prev, function(err, hashes) {
if (err)
return next(err);
if (hashes.length === 0)
return next();
i = hashes.indexOf(hash);
if (i !== -1)
hashes.splice(i, 1);
if (hashes.length === 0) {
delete self.waiting[prev];
return next();
}
self.waiting[prev] = hashes;
next();
});
}, function(err) {
if (err)
return callback(err);
delete self.orphans[hash];
self.totalOrphans--;
callback();
});
});
this.totalOrphans--;
};
/**