pool: make some functions asynchronous

This commit is contained in:
Fedor Indutny 2014-05-08 16:28:45 +04:00
parent 5c103aeb6c
commit 8043be6e40
5 changed files with 135 additions and 64 deletions

View File

@ -40,6 +40,7 @@ function Chain(options) {
this.fromJSON(preload);
this.loading = false;
this._init();
}
util.inherits(Chain, EventEmitter);
@ -53,18 +54,23 @@ Chain.prototype._init = function _init() {
if (!this._storage)
return;
this.loading = true;
var self = this;
var s = this._storage.createReadStream({
start: this._prefix,
end: this._prefix + 'z'
})
});
s.on('data', function(data) {
var hash = data.key.slice(self.prefix.length);
self.addIndex(hash, data.value.ts, data.value.height);
self._addIndex(hash, data.value.ts, data.value.height);
});
s.on('error', function(err) {
self.emit('error', err);
});
s.on('end', function() {
self.loading = false;
self.emit('load');
});
};
Chain.prototype._getRange = function _getRange(hash, ts, futureOnly) {
@ -88,7 +94,7 @@ Chain.prototype._getRange = function _getRange(hash, ts, futureOnly) {
return { start: start, end: end };
};
Chain.prototype.probeIndex = function probeIndex(hash, ts) {
Chain.prototype._probeIndex = function _probeIndex(hash, ts) {
if (!this.index.bloom.test(hash, 'hex'))
return false;
if (!this.strict)
@ -109,8 +115,8 @@ Chain.prototype.probeIndex = function probeIndex(hash, ts) {
return false;
};
Chain.prototype.addIndex = function addIndex(hash, ts, height) {
if (this.probeIndex(hash, ts))
Chain.prototype._addIndex = function _addIndex(hash, ts, height) {
if (this._probeIndex(hash, ts))
return;
var pos = utils.binaryInsert(this.index.ts, ts, compareTs, true);
@ -142,6 +148,13 @@ Chain.prototype.addIndex = function addIndex(hash, ts, height) {
};
Chain.prototype.add = function add(block) {
if (this.loading) {
this.once('load', function() {
this.add(block);
});
return;
}
var res = false;
var initial = block;
do {
@ -157,7 +170,8 @@ Chain.prototype.add = function add(block) {
break;
// If previous block wasn't ever seen - add current to orphans
if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) {
if (!this._probeIndex(hash, block.ts) &&
!this._probeIndex(prev, block.ts)) {
this.orphan.count++;
this.orphan.map[prev] = block;
@ -168,7 +182,7 @@ Chain.prototype.add = function add(block) {
}
// Validated known block at this point - add it to index
this.addIndex(hash, block.ts);
this._addIndex(hash, block.ts);
// At least one block was added
res = true;
@ -213,18 +227,30 @@ Chain.prototype._bloomBlock = function _bloomBlock(block) {
this.block.merkleBloom.add(block.hashes[i], 'hex');
};
Chain.prototype.has = function has(hash, noProbe) {
Chain.prototype.has = function has(hash, noProbe, cb) {
if (typeof noProbe === 'function') {
cb = noProbe;
noProbe = false;
}
if (this.loading) {
this.once('load', function() {
this.has(hash, noProbe, cb);
});
return;
}
cb = utils.asyncify(cb);
if (noProbe && this.block.bloom.test(hash, 'hex')) {
if (this.strict) {
for (var i = 0; i < this.block.list.length; i++)
if (this.block.list[i].hash('hex') === hash)
return true;
return cb(true);
} else {
return true;
return cb(true);
}
}
return !noProbe && this.probeIndex(hash) && !!this.orphan.map[hash];
return cb(!noProbe && this._probeIndex(hash) && !!this.orphan.map[hash]);
};
Chain.prototype.hasMerkle = function hasMerkle(hash) {
@ -268,7 +294,14 @@ Chain.prototype.isFull = function isFull() {
(+new Date / 1000) - this.index.ts[this.index.ts.length - 1] < 40 * 60;
};
Chain.prototype.hashesInRange = function hashesInRange(start, end) {
Chain.prototype.hashesInRange = function hashesInRange(start, end, cb) {
if (this.loading) {
this.once('load', function() {
this.hashesInRange(start, end, cb);
});
return;
}
cb = utils.asyncify(cb);
var ts = this.index.ts;
start = utils.binaryInsert(ts, start, compareTs, true);
@ -279,8 +312,15 @@ Chain.prototype.hashesInRange = function hashesInRange(start, end) {
return this.index.hashes.slice(start, end);
};
Chain.prototype.getLast = function getLast() {
return this.index.hashes[this.index.hashes.length - 1];
Chain.prototype.getLast = function getLast(cb) {
if (this.loading) {
this.once('load', function() {
this.getLast(cb);
});
return;
}
cb = utils.asyncify(cb);
return cb(this.index.hashes[this.index.hashes.length - 1]);
};
Chain.prototype.toJSON = function toJSON() {

View File

@ -156,20 +156,23 @@ Pool.prototype._addLoader = function _addLoader() {
self._scheduleRequests();
// The part of the response is in chain, no need to escalate requests
var allNew = hashes.every(function(hash) {
return !self.chain.has(utils.toHex(hash));
async.every(hashes, function(hash, cb) {
self.chain.has(utils.toHex(hash), function(res) {
cb(!res);
});
}, function(allNew) {
if (!allNew)
return;
// Store last hash to continue global load
self.block.lastHash = hashes[hashes.length - 1];
clearTimeout(timer);
// Reinstantiate timeout
if (self._load())
timer = setTimeout(destroy, self.load.timeout);
});
if (!allNew)
return;
// Store last hash to continue global load
self.block.lastHash = hashes[hashes.length - 1];
clearTimeout(timer);
// Reinstantiate timeout
if (self._load())
timer = setTimeout(destroy, self.load.timeout);
});
};
@ -200,18 +203,20 @@ Pool.prototype._load = function _load() {
return false;
}
this.load.hiReached = false;
var self = this;
// Load more blocks, starting from last hash
var hash;
if (this.block.lastHash)
hash = this.block.lastHash;
next(this.block.lastHash);
else
hash = this.chain.getLast();
this.chain.getLast(next);
if (!this.peers.load)
this._addLoader();
else
this.peers.load.loadBlocks([ hash ]);
function next(hash) {
if (!self.peers.load)
self._addLoader();
else
self.peers.load.loadBlocks([ hash ]);
}
return true;
};
@ -366,16 +371,19 @@ Pool.prototype.search = function search(id, range) {
var self = this;
var e = new EventEmitter();
var hashes = this.chain.hashesInRange(range.start, range.end);
var waiting = hashes.length;
this.chain.hashesInRange(range.start, range.end, function(hashes) {
var waiting = hashes.length;
if (id)
this.watch(id);
if (id)
self.watch(id);
this._loadRange(hashes);
hashes.slice().reverse().forEach(function(hash) {
// Get the block that is in index
this.chain.get(hash, function(block) {
self._loadRange(hashes);
hashes.slice().reverse().forEach(function(hash) {
// Get the block that is in index
self.chain.get(hash, loadBlock);
});
function loadBlock(block) {
// Get block's prev and request it and all of it's parents up to
// the next known block hash
self.chain.get(block.prevBlock, function() {
@ -387,15 +395,15 @@ Pool.prototype.search = function search(id, range) {
e.emit('end');
}
});
});
}, this);
}
// Empty search
if (hashes.length === 0) {
bcoin.utils.nextTick(function() {
e.emit('end', true);
});
}
// Empty search
if (hashes.length === 0) {
bcoin.utils.nextTick(function() {
e.emit('end', true);
});
}
});
return e;
};
@ -416,12 +424,21 @@ Pool.prototype._request = function _request(type, hash, options, cb) {
if (this.request.map[hex])
return this.request.map[hex].addCallback(cb);
// Block should be not in chain, or be requested
if (type === 'block' && this.chain.has(hash, options.force))
return;
var self = this;
var req = new LoadRequest(this, type, hex, cb);
req.add(options.noQueue);
// Block should be not in chain, or be requested
if (type === 'block')
return this.chain.has(hash, options.force, next)
else
return next(false);
function next(has) {
if (has)
return;
var req = new LoadRequest(self, type, hex, cb);
req.add(options.noQueue);
}
};
Pool.prototype._response = function _response(entity) {

View File

@ -34,7 +34,7 @@ TXPool.prototype._init = function init() {
end: this._prefix + 'z'
})
s.on('data', function(data) {
self.add(bcoin.tx.fromJSON(data.value), true);
self.add(bcoin.tx.fromJSON(data), true);
});
s.on('error', function(err) {
self.emit('error', err);
@ -44,14 +44,13 @@ TXPool.prototype._init = function init() {
TXPool.prototype.add = function add(tx, noWrite) {
var hash = tx.hash('hex');
if (!this._wallet.own(tx))
return;
// Do not add TX two times
if (this._all[hash])
return false;
this._all[hash] = tx;
var own = this._wallet.own(tx);
// Consume unspent money or add orphans
for (var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
@ -62,17 +61,21 @@ TXPool.prototype.add = function add(tx, noWrite) {
// Add TX to inputs and spend money
tx.input(unspent.tx, unspent.index);
delete this._unspent[key];
this.emit('update');
continue;
}
// Double-spend?!
if (this._orphans[key])
if (!own || this._orphans[key])
continue;
// Add orphan, if no parent transaction is yet known
this._orphans[key] = { tx: tx, index: i };
}
if (!own)
return;
// Add unspent outputs or fullfill orphans
for (var i = 0; i < tx.outputs.length; i++) {
var out = tx.outputs[i];
@ -81,6 +84,7 @@ TXPool.prototype.add = function add(tx, noWrite) {
var orphan = this._orphans[key];
if (!orphan) {
this._unspent[key] = { tx: tx, index: i };
this.emit('update');
continue;
}
delete this._orphans[key];
@ -97,8 +101,6 @@ TXPool.prototype.add = function add(tx, noWrite) {
});
}
this.emit('tx', tx);
return true;
};

View File

@ -317,3 +317,11 @@ RequestCache.prototype.fullfill = function fullfill(id, err, data) {
cb(err, data);
});
};
utils.asyncify = function asyncify(fn) {
return function _asynicifedFn(err, data1, data2) {
utils.nextTick(function() {
fn(err, data1, data2);
});
}
}

View File

@ -46,8 +46,12 @@ module.exports = Wallet;
Wallet.prototype._init = function init() {
// Notify owners about new accepted transactions
var self = this;
this.tx.on('tx', function(tx) {
self.emit('tx', tx);
var prevBalance = null;
this.tx.on('update', function(tx) {
var b = this.balance();
if (prevBalance && prevBalance.cmp(b) !== 0)
self.emit('balance', b);
prevBalance = b;
});
this.tx.on('error', function(err) {