fix chaindb queue system.

This commit is contained in:
Christopher Jeffrey 2016-02-19 06:30:10 -08:00
parent 7df05b971d
commit 7c3191aee8
3 changed files with 110 additions and 60 deletions

View File

@ -40,6 +40,8 @@ function Chain(options) {
this.mempool = options.mempool;
this.blockdb = options.blockdb;
this.locked = false;
this.handling = false;
this.busy = false;
this.pending = [];
this.pendingBlocks = {};
this.pendingSize = 0;
@ -900,6 +902,7 @@ Chain.prototype.resetTime = function resetTime(ts) {
Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback) {
var self = this;
this.byTimeAsync(ts, function(err, entry) {
if (err)
return callback(err);
@ -911,6 +914,50 @@ Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback) {
});
};
Chain.prototype._onFlush = function _onFlush(callback) {
if (this.pending.length === 0)
return callback();
this.once('flush', callback);
};
Chain.prototype._onUnlock = function _onUnlock(callback) {
if (!this.handling)
return callback();
this.once('unlock', callback);
};
Chain.prototype._onBored = function _onBored(callback) {
if (!this.busy)
return callback();
this.once('bored', callback);
};
// REALLY? Did it have to be this fucking complicated?
Chain.prototype._onReady = function _onReady(internal, callback) {
var self = this;
if (internal)
return callback();
this._onUnlock(function() {
if (self.locked)
return self._onReady(internal, callback);
self.locked = true;
self._onBored(function() {
if (self.busy)
return self._onReady(internal, callback);
self.busy = true;
callback(function unlock() {
self.busy = false;
self.locked = false;
self.emit('unlock');
self.emit('bored');
});
});
});
};
Chain.prototype.add = function add(initial, peer, callback) {
var self = this;
var host = peer ? peer.host : 'unknown';
@ -929,7 +976,10 @@ Chain.prototype.add = function add(initial, peer, callback) {
return;
}
assert(!this.handling);
this.locked = true;
this.handling = true;
(function next(block) {
var hash = block.hash('hex');
@ -1240,6 +1290,9 @@ Chain.prototype.add = function add(initial, peer, callback) {
self.total += total;
self.locked = false;
self.handling = false;
self.emit('unlock');
// Start resolving the queue
// (I love asynchronous IO).

View File

@ -39,6 +39,7 @@ function ChainDB(chain, options) {
this.heightLookup = {};
this.queue = {};
this.queueSize = 0;
this.cache = {};
this.bufferPool = { used: {} };
this.highest = -1;
@ -260,24 +261,26 @@ ChainDB.prototype._populate = function _populate(entry) {
}
};
ChainDB.prototype.getSync = function getSync(height) {
ChainDB.prototype.getSync = function getSync(height, force) {
var data, entry;
if (typeof height === 'string')
height = this.heightLookup[height];
if (height < 0 || height == null)
return;
if (!force) {
if ((height + 1) * BLOCK_SIZE > this.size)
return;
}
if (this.cache[height])
return this.cache[height];
if (this.queue[height])
return this.queue[height];
if (height < 0 || height == null)
return;
if ((height + 1) * BLOCK_SIZE > this.size)
return;
data = this._readSync(BLOCK_SIZE, height * BLOCK_SIZE);
if (!data)
@ -303,12 +306,6 @@ ChainDB.prototype.getAsync = function getAsync(height, callback, force) {
if (typeof height === 'string')
height = this.heightLookup[height];
if (this.cache[height])
return callback(null, this.cache[height]);
if (this.queue[height])
return callback(null, this.queue[height]);
if (height < 0 || height == null)
return callback();
@ -317,6 +314,12 @@ ChainDB.prototype.getAsync = function getAsync(height, callback, force) {
return callback();
}
if (this.cache[height])
return callback(null, this.cache[height]);
if (this.queue[height])
return callback(null, this.queue[height]);
return this._readAsync(BLOCK_SIZE, height * BLOCK_SIZE, function(err, data) {
var entry;
@ -382,17 +385,13 @@ ChainDB.prototype.saveAsync = function saveAsync(entry, callback) {
// Populate the entry.
this._populate(entry);
// Something is already writing. Cancel it
// and synchronously write the data after
// it cancels.
if (this.queue[entry.height]) {
this.queue[entry.height] = entry;
return callback();
}
// Something is already writing.
assert(!this.queue[entry.height]);
// Speed up writes by doing them asynchronously
// and keeping the data to be written in memory.
this.queue[entry.height] = entry;
this.queueSize++;
// Write asynchronously to the db.
raw = entry.toRaw();
@ -402,18 +401,11 @@ ChainDB.prototype.saveAsync = function saveAsync(entry, callback) {
if (err)
return callback(err);
var item = self.queue[entry.height];
// Something tried to write here but couldn't.
// Synchronously write it and get it over with.
try {
if (item && item !== entry)
success = self._writeSync(item.toRaw(), offset);
} catch (e) {
err = e;
}
assert(self.queue[entry.height]);
delete self.queue[entry.height];
self.queueSize--;
if (self.queueSize === 0)
self.emit('flush');
return callback(null, success);
});
@ -426,13 +418,14 @@ ChainDB.prototype._drop = function _drop(height) {
// to handle this.
if (this.queue[height]) {
utils.debug('Warning: write job in progress.');
delete this.queue[height];
// delete this.queue[height];
}
delete this.cache[height];
};
ChainDB.prototype.resetHeightSync = function resetHeightSync(height) {
var self = this;
var size, count, existing;
if (typeof height === 'string')
@ -450,23 +443,26 @@ ChainDB.prototype.resetHeightSync = function resetHeightSync(height) {
assert(this.tip);
for (i = height + 1; i < count; i++) {
existing = this.get(i);
existing = this.getSync(i);
assert(existing);
this._drop(i);
delete this.heightLookup[existing.hash];
}
if (!bcoin.fs)
this.ramdisk.truncate(size);
else
fs.ftruncateSync(this.fd, size);
this.size = size;
this.highest = height;
this.tip = this.get(height);
this.tip = this.getSync(height);
assert(this.tip);
this.height = this.tip.height;
this.emit('tip', this.tip);
// This will be synchronous 99% of the time.
this._onFlush(function() {
if (!bcoin.fs)
self.ramdisk.truncate(size);
else
fs.ftruncateSync(self.fd, size);
});
};
ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback) {
@ -492,7 +488,7 @@ ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback)
this.size = size;
this.highest = height;
this.tip = this.get(height);
this.tip = this.getSync(height);
assert(this.tip);
this.height = this.tip.height;
this.emit('tip', this.tip);
@ -522,20 +518,28 @@ ChainDB.prototype.resetHeightAsync = function resetHeightAsync(height, callback)
if (err)
return callback(err);
if (!bcoin.fs) {
self.ramdisk.truncate(size);
return callback();
}
self._onFlush(function() {
if (!bcoin.fs) {
self.ramdisk.truncate(size);
return callback();
}
fs.ftruncate(self.fd, size, function(err) {
if (err)
return callback(err);
fs.ftruncate(self.fd, size, function(err) {
if (err)
return callback(err);
return callback();
return callback();
});
});
}
};
ChainDB.prototype._onFlush = function _onFlush(callback) {
if (this.queueSize === 0)
return callback();
this.once('flush', callback);
};
ChainDB.prototype.has = function has(height) {
if (typeof height === 'string')
height = this.heightLookup[height];

View File

@ -249,7 +249,7 @@ Pool.prototype._init = function _init() {
Pool.prototype.loadBlocks = function loadBlocks(peer, top, stop) {
var self = this;
this._onFlush(function() {
this.chain._onFlush(function() {
peer.loadBlocks(self.chain.getLocator(top), stop);
});
};
@ -257,7 +257,7 @@ Pool.prototype.loadBlocks = function loadBlocks(peer, top, stop) {
Pool.prototype.loadOrphan = function loadOrphan(peer, top, orphan) {
var self = this;
assert(orphan);
this._onFlush(function() {
this.chain._onFlush(function() {
peer.loadBlocks(
self.chain.getLocator(top),
self.chain.getOrphanRoot(orphan)
@ -267,7 +267,7 @@ Pool.prototype.loadOrphan = function loadOrphan(peer, top, orphan) {
Pool.prototype.loadHeaders = function loadHeaders(peer, top, stop) {
var self = this;
this._onFlush(function() {
this.chain._onFlush(function() {
peer.loadHeaders(self.chain.getLocator(top), stop);
});
};
@ -570,7 +570,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('blocks', hashes);
this._onFlush(function() {
this.chain._onFlush(function() {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
@ -612,13 +612,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this._startTimer();
};
Pool.prototype._onFlush = function _onFlush(callback) {
if (this.chain.pending.length === 0)
return callback();
this.chain.once('flush', callback);
};
Pool.prototype._handleInv = function _handleInv(hashes, peer) {
var i, hash;
@ -1491,7 +1484,7 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
}
if (peer._blockQueue.length === 0) {
this._onFlush(function() {
this.chain._onFlush(function() {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d blocks from %s with getdata',