store mempool orphans in memory always.
This commit is contained in:
parent
a940ea5158
commit
299a49e76b
@ -10,8 +10,6 @@ module.exports = function(bcoin) {
|
|||||||
/*
|
/*
|
||||||
* Database Layout:
|
* Database Layout:
|
||||||
* (inherits all from txdb)
|
* (inherits all from txdb)
|
||||||
* o/[hash] -> orphan/waiting hashes
|
|
||||||
* O/[hash] -> orphan tx
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
var EventEmitter = require('events').EventEmitter;
|
var EventEmitter = require('events').EventEmitter;
|
||||||
@ -43,7 +41,7 @@ var DUMMY = new Buffer([0]);
|
|||||||
* @property {Boolean} loaded
|
* @property {Boolean} loaded
|
||||||
* @property {Object} db
|
* @property {Object} db
|
||||||
* @property {Number} size
|
* @property {Number} size
|
||||||
* @property {Number} orphans
|
* @property {Number} totalOrphans
|
||||||
* @property {Locker} locker
|
* @property {Locker} locker
|
||||||
* @property {Number} freeCount
|
* @property {Number} freeCount
|
||||||
* @property {Number} lastTime
|
* @property {Number} lastTime
|
||||||
@ -76,7 +74,9 @@ function Mempool(options) {
|
|||||||
this.db = null;
|
this.db = null;
|
||||||
this.tx = null;
|
this.tx = null;
|
||||||
this.size = 0;
|
this.size = 0;
|
||||||
this.orphans = 0;
|
this.waiting = {};
|
||||||
|
this.orphans = {};
|
||||||
|
this.totalOrphans = 0;
|
||||||
this.spent = 0;
|
this.spent = 0;
|
||||||
this.total = 0;
|
this.total = 0;
|
||||||
|
|
||||||
@ -406,47 +406,14 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
|
|||||||
|
|
||||||
Mempool.prototype.pruneOrphans = function pruneOrphans(callback) {
|
Mempool.prototype.pruneOrphans = function pruneOrphans(callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
var hashes = [];
|
|
||||||
var iter;
|
|
||||||
|
|
||||||
callback = utils.ensure(callback);
|
callback = utils.ensure(callback);
|
||||||
|
|
||||||
iter = this.db.iterator({
|
utils.forEachSerial(Object.keys(this.orphans), function(hash, next) {
|
||||||
gte: 'O',
|
if (self.totalOrphans <= constants.mempool.MAX_ORPHAN_TX)
|
||||||
lte: 'O~',
|
return next();
|
||||||
keys: true,
|
self.removeOrphan(hash, next);
|
||||||
values: false,
|
}, callback);
|
||||||
fillCache: false,
|
|
||||||
keyAsBuffer: false
|
|
||||||
});
|
|
||||||
|
|
||||||
function done(err) {
|
|
||||||
if (err)
|
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
utils.forEachSerial(hashes, function(hash, next) {
|
|
||||||
if (self.orphans <= constants.mempool.MAX_ORPHAN_TX)
|
|
||||||
return next();
|
|
||||||
self.removeOrphan(hash, next);
|
|
||||||
}, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
(function next() {
|
|
||||||
iter.next(function(err, key, value) {
|
|
||||||
if (err) {
|
|
||||||
return iter.end(function() {
|
|
||||||
callback(err);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (key === undefined)
|
|
||||||
return iter.end(done);
|
|
||||||
|
|
||||||
hashes.push(key.split('/')[1]);
|
|
||||||
|
|
||||||
next();
|
|
||||||
});
|
|
||||||
})();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1020,7 +987,7 @@ Mempool.prototype.countAncestors = function countAncestors(tx, callback) {
|
|||||||
Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
|
Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
|
||||||
var self = this;
|
var self = this;
|
||||||
var prevout = {};
|
var prevout = {};
|
||||||
var i, hash, batch, input, p;
|
var i, hash, batch, input, prev;
|
||||||
|
|
||||||
if (tx.getSize() > 5000) {
|
if (tx.getSize() > 5000) {
|
||||||
bcoin.debug('Ignoring large orphan: %s', tx.rhash);
|
bcoin.debug('Ignoring large orphan: %s', tx.rhash);
|
||||||
@ -1028,7 +995,6 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
hash = tx.hash('hex');
|
hash = tx.hash('hex');
|
||||||
batch = this.db.batch();
|
|
||||||
|
|
||||||
for (i = 0; i < tx.inputs.length; i++) {
|
for (i = 0; i < tx.inputs.length; i++) {
|
||||||
input = tx.inputs[i];
|
input = tx.inputs[i];
|
||||||
@ -1040,40 +1006,25 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
|
|||||||
|
|
||||||
assert(prevout.length > 0);
|
assert(prevout.length > 0);
|
||||||
|
|
||||||
utils.forEachSerial(prevout, function(prev, next) {
|
for (i = 0; i < prevout.length; i++) {
|
||||||
self.getWaiting(prev, function(err, orphans, buf) {
|
prev = prevout[i];
|
||||||
|
if (!this.waiting[prev])
|
||||||
|
this.waiting[prev] = [];
|
||||||
|
this.waiting[prev].push(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.orphans[hash] = tx.toExtended(true);
|
||||||
|
this.totalOrphans++;
|
||||||
|
|
||||||
|
if (self.totalOrphans > constants.mempool.MAX_ORPHAN_TX) {
|
||||||
|
return self.pruneOrphans(function(err) {
|
||||||
if (err)
|
if (err)
|
||||||
return next(err);
|
return callback(err);
|
||||||
|
callback();
|
||||||
p = new BufferWriter();
|
|
||||||
|
|
||||||
if (buf)
|
|
||||||
p.writeBytes(buf);
|
|
||||||
|
|
||||||
p.writeHash(hash);
|
|
||||||
|
|
||||||
batch.put('o/' + prev, p.render());
|
|
||||||
|
|
||||||
next();
|
|
||||||
});
|
});
|
||||||
}, function(err) {
|
}
|
||||||
if (err)
|
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
self.orphans++;
|
return callback();
|
||||||
|
|
||||||
batch.put('O/' + hash, tx.toExtended(true));
|
|
||||||
|
|
||||||
if (self.orphans > constants.mempool.MAX_ORPHAN_TX) {
|
|
||||||
return self.pruneOrphans(function(err) {
|
|
||||||
if (err)
|
|
||||||
return callback(err);
|
|
||||||
batch.write(callback);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
batch.write(callback);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1101,28 +1052,7 @@ Mempool.prototype.getHistory = function getHistory(callback) {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
Mempool.prototype.getWaiting = function getWaiting(hash, callback) {
|
Mempool.prototype.getWaiting = function getWaiting(hash, callback) {
|
||||||
var self = this;
|
return callback(null, this.waiting[hash] || []);
|
||||||
var hashes = [];
|
|
||||||
var p;
|
|
||||||
|
|
||||||
this.db.get('o/' + hash, function(err, buf) {
|
|
||||||
if (err && err.type !== 'NotFoundError')
|
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
if (!buf)
|
|
||||||
return callback(null, hashes, buf);
|
|
||||||
|
|
||||||
p = new BufferReader(buf);
|
|
||||||
|
|
||||||
try {
|
|
||||||
while (p.left())
|
|
||||||
hashes.push(p.readHash('hex'));
|
|
||||||
} catch (e) {
|
|
||||||
return callback(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return callback(null, hashes, buf);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1133,22 +1063,18 @@ Mempool.prototype.getWaiting = function getWaiting(hash, callback) {
|
|||||||
|
|
||||||
Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) {
|
Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
var orphan = this.orphans[orphanHash];
|
||||||
|
|
||||||
this.db.get('O/' + orphanHash, function(err, orphan) {
|
if (!orphan)
|
||||||
if (err && err.type !== 'NotFoundError')
|
return callback();
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
if (!orphan)
|
try {
|
||||||
return callback();
|
orphan = bcoin.tx.fromExtended(orphan, true);
|
||||||
|
} catch (e) {
|
||||||
|
return callback(e);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
return callback(null, orphan);
|
||||||
orphan = bcoin.tx.fromExtended(orphan, true);
|
|
||||||
} catch (e) {
|
|
||||||
return callback(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return callback(null, orphan);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1157,12 +1083,7 @@ Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
Mempool.prototype.hasOrphan = function hasOrphan(orphanHash, callback) {
|
Mempool.prototype.hasOrphan = function hasOrphan(orphanHash, callback) {
|
||||||
this.db.get('O/' + orphanHash, function(err, orphan) {
|
return callback(null, this.orphans[orphanHash] != null);
|
||||||
if (err && err.type !== 'NotFoundError')
|
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
return callback(null, orphan != null);
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1178,7 +1099,6 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
|
|||||||
var self = this;
|
var self = this;
|
||||||
var hash = tx.hash('hex');
|
var hash = tx.hash('hex');
|
||||||
var resolved = [];
|
var resolved = [];
|
||||||
var batch = this.db.batch();
|
|
||||||
|
|
||||||
this.getWaiting(hash, function(err, hashes) {
|
this.getWaiting(hash, function(err, hashes) {
|
||||||
if (err)
|
if (err)
|
||||||
@ -1195,29 +1115,23 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
|
|||||||
orphan.fillCoins(tx);
|
orphan.fillCoins(tx);
|
||||||
|
|
||||||
if (orphan.hasCoins()) {
|
if (orphan.hasCoins()) {
|
||||||
self.orphans--;
|
self.totalOrphans--;
|
||||||
batch.del('O/' + orphanHash);
|
delete self.orphans[orphanHash];
|
||||||
resolved.push(orphan);
|
resolved.push(orphan);
|
||||||
return next();
|
return next();
|
||||||
}
|
}
|
||||||
|
|
||||||
batch.put('O/' + orphanHash, orphan.toExtended(true));
|
self.orphans[orphanHash] = orphan.toExtended(true);
|
||||||
|
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
}, function(err) {
|
}, function(err) {
|
||||||
if (err)
|
if (err)
|
||||||
return callback(err);
|
return callback(err);
|
||||||
|
|
||||||
function done(err) {
|
delete self.waiting[hash];
|
||||||
if (err)
|
|
||||||
return callback(err);
|
|
||||||
|
|
||||||
return callback(null, resolved);
|
return callback(null, resolved);
|
||||||
}
|
|
||||||
|
|
||||||
batch.del('o/' + hash);
|
|
||||||
|
|
||||||
return batch.write(done);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -1230,7 +1144,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
|
|||||||
|
|
||||||
Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
|
Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
var batch, prevout, hash;
|
var prevout, hash;
|
||||||
|
|
||||||
function getOrphan(tx, callback) {
|
function getOrphan(tx, callback) {
|
||||||
if (typeof tx === 'string')
|
if (typeof tx === 'string')
|
||||||
@ -1245,14 +1159,10 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
|
|||||||
if (!tx)
|
if (!tx)
|
||||||
return callback();
|
return callback();
|
||||||
|
|
||||||
batch = self.db.batch();
|
|
||||||
|
|
||||||
hash = tx.hash('hex');
|
hash = tx.hash('hex');
|
||||||
prevout = tx.getPrevout();
|
prevout = tx.getPrevout();
|
||||||
|
|
||||||
batch.del('O/' + hash);
|
utils.forEachSerial(prevout, function(prev, next) {
|
||||||
|
|
||||||
utils.forEach(prevout, function(prev, next) {
|
|
||||||
var i, p;
|
var i, p;
|
||||||
self.getWaiting(prev, function(err, hashes) {
|
self.getWaiting(prev, function(err, hashes) {
|
||||||
if (err)
|
if (err)
|
||||||
@ -1266,16 +1176,11 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
|
|||||||
hashes.splice(i, 1);
|
hashes.splice(i, 1);
|
||||||
|
|
||||||
if (hashes.length === 0) {
|
if (hashes.length === 0) {
|
||||||
batch.del('o/' + prev);
|
delete self.waiting[prev];
|
||||||
return next();
|
return next();
|
||||||
}
|
}
|
||||||
|
|
||||||
p = new BufferWriter();
|
self.waiting[prev] = hashes;
|
||||||
|
|
||||||
for (i = 0; i < hashes.length; i++)
|
|
||||||
p.writeHash(hashes[i]);
|
|
||||||
|
|
||||||
batch.put('o/' + prev, p.render());
|
|
||||||
|
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
@ -1283,9 +1188,10 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) {
|
|||||||
if (err)
|
if (err)
|
||||||
return callback(err);
|
return callback(err);
|
||||||
|
|
||||||
self.orphans--;
|
delete self.orphans[hash];
|
||||||
|
self.totalOrphans--;
|
||||||
|
|
||||||
batch.write(callback);
|
callback();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user