experiment with _lock.

This commit is contained in:
Christopher Jeffrey 2016-02-19 09:10:50 -08:00
parent bccc833299
commit a6668f860f

View File

@ -42,6 +42,7 @@ function Chain(options) {
// this.locked = false;
this.handling = false;
this.busy = false;
this.jobs = [];
this.pending = [];
this.pendingBlocks = {};
this.pendingSize = 0;
@ -693,9 +694,7 @@ Chain.prototype._addEntry = function _addEntry(entry, block, callback) {
});
};
Chain.prototype.resetHeight = function resetHeight(height) {
this._clearPending();
Chain.prototype.resetHeight = function resetHeight(height, force) {
if (height === this.db.getSize() - 1)
return;
@ -709,24 +708,35 @@ Chain.prototype.resetHeight = function resetHeight(height) {
this.orphan.bmap = {};
this.orphan.count = 0;
this.orphan.size = 0;
unlock();
};
Chain.prototype._lock = function _lock(func, args) {
// Maybe do this:
// Chain.prototype._lock = function _lock(func, args, callback, force) {
// And return wrapped callback with an unlock call in it
Chain.prototype._lock = function _lock(func, args, force) {
var self = this;
var block;
if (force) {
assert(this.busy);
return function() {};
}
if (this.busy) {
if (func === Chain.add) {
if (func === Chain.prototype.add) {
block = args[0];
this.pending.push(block);
this.pendingBlocks[block.hash('hex')] = true;
assert(typeof block._size === 'number');
this.pendingSize += block._size;
this.pendingSize += block.getSize();
if (this.pendingSize > this.pendingLimit) {
utils.debug('Warning: %dmb of pending blocks.',
utils.mb(this.pendingSize));
}
}
this.pending.push([func, args]);
this.jobs.push([func, args]);
return;
}
@ -735,36 +745,38 @@ Chain.prototype._lock = function _lock(func, args) {
return function unlock() {
var item, block;
if (func === Chain.add) {
block = args[0];
delete self.pendingBlocks[block.hash('hex')];
assert(typeof block._size === 'number');
self.pendingSize -= block._size;
}
self.busy = false;
if (func === Chain.add && self.pendingSize === 0)
self.emit('flush');
if (self.pending.length === 0) {
return;
if (func === Chain.prototype.add) {
if (self.pending.length === 0)
self.emit('flush');
}
if (self.jobs.length === 0)
return;
item = self.jobs.shift();
if (item[0] === Chain.prototype.add) {
block = item[1][0];
assert(block === self.pending.shift());
delete self.pendingBlocks[block.hash('hex')];
self.pendingSize -= block.getSize();
}
item = self.pending.shift();
item[0].apply(self, item[1]);
};
};
Chain.prototype.resetHeightAsync = function resetHeightAsync(height, callback) {
Chain.prototype.resetHeightAsync = function resetHeightAsync(height, callback, force) {
var self = this;
var lock = this.lock;
this.lock = true;
var unlock = this._lock(resetHeightAsync, [height, callback], force);
if (!unlock)
return;
function done(err, result) {
self.lock = lock;
self._clearPending();
unlock();
callback(err, result);
}
@ -788,18 +800,18 @@ Chain.prototype.resetHeightAsync = function resetHeightAsync(height, callback) {
});
};
Chain.prototype.revertHeight = function revertHeight(height, callback) {
Chain.prototype.revertHeight = function revertHeight(height, callback, force) {
var self = this;
var chainHeight;
var lock = this.lock;
var unlock = this._lock(revertHeight, [height, callback], force);
if (!unlock)
return;
callback = utils.asyncify(callback);
this.lock = true;
function done(err, result) {
self.lock = lock;
self._clearPending();
unlock();
callback(err, result);
}
@ -843,18 +855,18 @@ Chain.prototype.revertHeight = function revertHeight(height, callback) {
self.emit('remove block', block);
});
});
});
}, true);
};
Chain.prototype._revertLast = function _revertLast(existing, callback) {
Chain.prototype._revertLast = function _revertLast(existing, callback, force) {
var self = this;
var lock = this.lock;
this.lock = true;
var unlock = this._lock(_revertLast, [existing, callback], force);
if (!unlock)
return;
function done(err, result) {
self.lock = lock;
self._clearPending();
unlock();
callback(err, result);
}
@ -871,21 +883,21 @@ Chain.prototype._revertLast = function _revertLast(existing, callback) {
return done();
});
});
}, true);
};
Chain.prototype.syncHeight = function syncHeight(callback) {
Chain.prototype.syncHeight = function syncHeight(callback, force) {
var self = this;
var chainHeight;
var lock = this.lock;
var unlock = this._lock(syncHeight, [callback], force);
if (!unlock)
return;
callback = utils.asyncify(callback);
this.lock = true;
function done(err, result) {
self.lock = lock;
self._clearPending();
unlock();
callback(err, result);
}
@ -911,7 +923,7 @@ Chain.prototype.syncHeight = function syncHeight(callback) {
if (blockHeight < chainHeight) {
utils.debug('BlockDB is higher than ChainDB. Syncing...');
return self.resetHeightAsync(blockHeight, done);
return self.resetHeightAsync(blockHeight, done, true);
}
if (blockHeight > chainHeight) {
@ -935,18 +947,29 @@ Chain.prototype.resetTime = function resetTime(ts) {
return this.resetHeight(entry.height);
};
Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback) {
Chain.prototype.resetTimeAsync = function resetTimeAsync(ts, callback, force) {
var self = this;
var unlock = this._lock(resetTimeAsync, [ts, callback], force);
if (!unlock)
return;
this.byTimeAsync(ts, function(err, entry) {
if (err)
if (err) {
unlock();
return callback(err);
}
if (!entry)
if (!entry) {
unlock();
return callback();
}
self.resetHeightAsync(entry.height, callback);
});
self.resetHeightAsync(entry.height, function(err) {
unlock();
callback(err);
}, true);
}, true);
};
Chain.prototype._onFlush = function _onFlush(callback) {
@ -963,14 +986,14 @@ Chain.prototype._onFlush = function _onFlush(callback) {
// this.once('unlock', callback);
// };
Chain.prototype.add = function add(initial, peer, callback) {
Chain.prototype.add = function add(initial, peer, callback, force) {
var self = this;
var host = peer ? peer.host : 'unknown';
var total = 0;
assert(!this.loading);
var unlock = this._lock(add, [initial, peer, callback]);
var unlock = this._lock(add, [initial, peer, callback], force);
if (!unlock)
return;
@ -1049,10 +1072,6 @@ Chain.prototype.add = function add(initial, peer, callback) {
checkpoint: false
}, peer);
// Clear the queue. No telling what other
// blocks we're going to get after this.
self._clearPending();
return done();
}
@ -1122,10 +1141,6 @@ Chain.prototype.add = function add(initial, peer, callback) {
checkpoint: true
}, peer);
// Clear the queue. No telling what other
// blocks we're going to get after this.
self._clearPending();
return done();
}
}
@ -1183,12 +1198,8 @@ Chain.prototype.add = function add(initial, peer, callback) {
checkpoint: false
}, peer);
// Clear the queue. No telling what other
// blocks we're going to get after this.
self._clearPending();
return done();
});
}, true);
});
}
@ -1321,15 +1332,6 @@ Chain.prototype.add = function add(initial, peer, callback) {
}
};
Chain.prototype._clearPending = function _clearPending() {
var item;
while (this.pending.length) {
item = this.pending.pop();
delete this.pendingBlocks[item[0].hash('hex')];
this.pendingSize -= item[0].getSize();
}
};
Chain.prototype.has = function has(hash) {
if (this.hasBlock(hash))
return true;
@ -1372,22 +1374,33 @@ Chain.prototype.byTime = function byTime(ts) {
return this.db.getSync(start);
};
Chain.prototype.byTimeAsync = function byTimeAsync(ts, callback) {
Chain.prototype.byTimeAsync = function byTimeAsync(ts, callback, force) {
var self = this;
var start = 0;
var end = this.height + 1;
var pos, delta;
var unlock = this._lock(byTimeAsync, [ts, callback], force);
if (!unlock)
return;
callback = utils.asyncify(callback);
function done(err, result) {
if (err)
if (err) {
unlock();
return callback(err);
}
if (result)
if (result) {
unlock();
return callback(null, result);
}
self.db.getAsync(start, callback);
self.db.getAsync(start, function(err, entry) {
unlock();
callback(err, entry);
});
}
if (ts >= this.tip.ts)
@ -1513,14 +1526,19 @@ Chain.prototype.getHashRange = function getHashRange(start, end) {
return hashes;
};
Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callback) {
Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callback, force) {
var self = this;
var called;
var unlock = this._lock(getHashRangeAsync, [start, end, callback], force);
if (!unlock)
return;
function done(err, result) {
if (called)
return;
called = true;
unlock();
callback(err, result);
}
@ -1558,8 +1576,8 @@ Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callb
return done(null, hashes);
});
}
});
});
}, true);
}, true);
};
Chain.prototype.getLocator = function getLocator(start) {
@ -1610,13 +1628,17 @@ Chain.prototype.getLocator = function getLocator(start) {
return hashes;
};
Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback) {
Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback, force) {
var self = this;
var hashes = [];
var top = this.height;
var step = 1;
var i, called, pending;
var unlock = this._lock(getLocatorAsync, [start, callback], force);
if (!unlock)
return;
if (start) {
if (utils.isBuffer(start))
start = utils.toHex(start);
@ -1649,6 +1671,8 @@ Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback) {
called = true;
unlock();
if (err)
return callback(err);
@ -1692,8 +1716,6 @@ Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback) {
});
};
// Chain.prototype.getLocatorAsync = wrap(Chain.prototype.getLocatorAsync);
Chain.prototype.getOrphanRoot = function getOrphanRoot(hash) {
var self = this;
var root;