play around with _lock method.
This commit is contained in:
parent
3a63da5735
commit
bccc833299
@ -39,10 +39,9 @@ function Chain(options) {
|
||||
this.loading = false;
|
||||
this.mempool = options.mempool;
|
||||
this.blockdb = options.blockdb;
|
||||
this.locked = false;
|
||||
// this.locked = false;
|
||||
this.handling = false;
|
||||
this.busy = false;
|
||||
this.jobQueue = [];
|
||||
this.pending = [];
|
||||
this.pendingBlocks = {};
|
||||
this.pendingSize = 0;
|
||||
@ -712,6 +711,51 @@ Chain.prototype.resetHeight = function resetHeight(height) {
|
||||
this.orphan.size = 0;
|
||||
};
|
||||
|
||||
Chain.prototype._lock = function _lock(func, args) {
|
||||
var self = this;
|
||||
var block;
|
||||
|
||||
if (this.busy) {
|
||||
if (func === Chain.add) {
|
||||
block = args[0];
|
||||
this.pendingBlocks[block.hash('hex')] = true;
|
||||
assert(typeof block._size === 'number');
|
||||
this.pendingSize += block._size;
|
||||
if (this.pendingSize > this.pendingLimit) {
|
||||
utils.debug('Warning: %dmb of pending blocks.',
|
||||
utils.mb(this.pendingSize));
|
||||
}
|
||||
}
|
||||
this.pending.push([func, args]);
|
||||
return;
|
||||
}
|
||||
|
||||
this.busy = true;
|
||||
|
||||
return function unlock() {
|
||||
var item, block;
|
||||
|
||||
if (func === Chain.add) {
|
||||
block = args[0];
|
||||
delete self.pendingBlocks[block.hash('hex')];
|
||||
assert(typeof block._size === 'number');
|
||||
self.pendingSize -= block._size;
|
||||
}
|
||||
|
||||
self.busy = false;
|
||||
|
||||
if (func === Chain.add && self.pendingSize === 0)
|
||||
self.emit('flush');
|
||||
|
||||
if (self.pending.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
item = self.pending.shift();
|
||||
item[0].apply(self, item[1]);
|
||||
};
|
||||
};
|
||||
|
||||
Chain.prototype.resetHeightAsync = function resetHeightAsync(height, callback) {
|
||||
var self = this;
|
||||
var lock = this.lock;
|
||||
@ -912,96 +956,12 @@ Chain.prototype._onFlush = function _onFlush(callback) {
|
||||
this.once('flush', callback);
|
||||
};
|
||||
|
||||
Chain.prototype._onUnlock = function _onUnlock(callback) {
|
||||
if (!this.handling)
|
||||
return callback();
|
||||
|
||||
this.once('unlock', callback);
|
||||
};
|
||||
|
||||
// REALLY? Did it have to be this fucking complicated?
|
||||
Chain.prototype._onReady = function _onReady(internal, callback) {
|
||||
var self = this;
|
||||
|
||||
if (typeof internal === 'function') {
|
||||
callback = internal;
|
||||
internal = false;
|
||||
}
|
||||
|
||||
if (internal)
|
||||
return callback(function() {});
|
||||
|
||||
if (this.busy) {
|
||||
this.jobQueue.push(callback);
|
||||
return;
|
||||
}
|
||||
|
||||
self.busy = true;
|
||||
|
||||
this._onUnlock(function() {
|
||||
assert(!self.locked);
|
||||
assert(!self.handling);
|
||||
assert(self.busy);
|
||||
|
||||
self.locked = true;
|
||||
|
||||
callback(function unlock() {
|
||||
var item;
|
||||
|
||||
assert(self.locked);
|
||||
assert(!self.handling);
|
||||
assert(self.busy);
|
||||
|
||||
self.busy = false;
|
||||
self.locked = false;
|
||||
|
||||
if (self.jobQueue.length > 0) {
|
||||
item = self.jobQueue.shift();
|
||||
self._onReady(false, item);
|
||||
return;
|
||||
}
|
||||
|
||||
if (self.pending.length === 0)
|
||||
return;
|
||||
|
||||
item = self.pending.shift();
|
||||
delete self.pendingBlocks[item[0].hash('hex')];
|
||||
self.pendingSize -= item[0].getSize();
|
||||
|
||||
self.add(item[0], item[1], item[2]);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
function wrap(method) {
|
||||
return function wrapper() {
|
||||
var self = this;
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var callback, internal;
|
||||
|
||||
if (typeof args[args.length - 1] === 'boolean')
|
||||
internal = args.pop();
|
||||
|
||||
if (typeof args[args.length - 1] === 'function')
|
||||
callback = args.pop();
|
||||
|
||||
this._onReady(internal, function(unlock) {
|
||||
args.push(function() {
|
||||
unlock();
|
||||
|
||||
if (!callback)
|
||||
return;
|
||||
|
||||
return callback.apply(null, arguments);
|
||||
});
|
||||
|
||||
method.apply(self, args);
|
||||
|
||||
if (!callback)
|
||||
unlock();
|
||||
});
|
||||
};
|
||||
}
|
||||
// Chain.prototype._onUnlock = function _onUnlock(callback) {
|
||||
// if (!this.handling)
|
||||
// return callback();
|
||||
//
|
||||
// this.once('unlock', callback);
|
||||
// };
|
||||
|
||||
Chain.prototype.add = function add(initial, peer, callback) {
|
||||
var self = this;
|
||||
@ -1010,16 +970,20 @@ Chain.prototype.add = function add(initial, peer, callback) {
|
||||
|
||||
assert(!this.loading);
|
||||
|
||||
if (this.locked) {
|
||||
this.pending.push([initial, peer, callback]);
|
||||
this.pendingBlocks[initial.hash('hex')] = true;
|
||||
this.pendingSize += initial.getSize();
|
||||
if (this.pendingSize > this.pendingLimit) {
|
||||
utils.debug('Warning: %dmb of pending blocks.',
|
||||
utils.mb(this.pendingSize));
|
||||
}
|
||||
var unlock = this._lock(add, [initial, peer, callback]);
|
||||
if (!unlock)
|
||||
return;
|
||||
}
|
||||
|
||||
// if (this.locked) {
|
||||
// this.pending.push([initial, peer, callback]);
|
||||
// this.pendingBlocks[initial.hash('hex')] = true;
|
||||
// this.pendingSize += initial.getSize();
|
||||
// if (this.pendingSize > this.pendingLimit) {
|
||||
// utils.debug('Warning: %dmb of pending blocks.',
|
||||
// utils.mb(this.pendingSize));
|
||||
// }
|
||||
// return;
|
||||
// }
|
||||
|
||||
assert(!this.handling);
|
||||
|
||||
@ -1334,26 +1298,25 @@ Chain.prototype.add = function add(initial, peer, callback) {
|
||||
callback(null, total);
|
||||
|
||||
self.total += total;
|
||||
self.locked = false;
|
||||
// self.locked = false;
|
||||
self.handling = false;
|
||||
|
||||
self.emit('unlock');
|
||||
unlock();
|
||||
|
||||
if (self.locked)
|
||||
return;
|
||||
|
||||
// Start resolving the queue
|
||||
// (I love asynchronous IO).
|
||||
if (self.pending.length === 0) {
|
||||
self.emit('flush');
|
||||
return;
|
||||
}
|
||||
|
||||
item = self.pending.shift();
|
||||
delete self.pendingBlocks[item[0].hash('hex')];
|
||||
self.pendingSize -= item[0].getSize();
|
||||
|
||||
self.add(item[0], item[1], item[2]);
|
||||
// self.emit('unlock');
|
||||
//
|
||||
// // Start resolving the queue
|
||||
// // (I love asynchronous IO).
|
||||
// if (self.pending.length === 0) {
|
||||
// self.emit('flush');
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// item = self.pending.shift();
|
||||
// delete self.pendingBlocks[item[0].hash('hex')];
|
||||
// self.pendingSize -= item[0].getSize();
|
||||
//
|
||||
// self.add(item[0], item[1], item[2]);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user