pool: co.

This commit is contained in:
Christopher Jeffrey 2016-09-22 06:25:10 -07:00
parent a7d3626975
commit 19e5359f0f
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 47 additions and 34 deletions

View File

@ -381,6 +381,7 @@ Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses) {
for (i = 0; i < addresses.length; i++) {
hash = bcoin.address.getHash(addresses[i], 'hex');
if (!hash)
continue;
@ -408,6 +409,7 @@ Mempool.prototype.getTXByAddress = function getTXByAddress(addresses) {
for (i = 0; i < addresses.length; i++) {
hash = bcoin.address.getHash(addresses[i], 'hex');
if (!hash)
continue;
@ -1035,10 +1037,13 @@ Mempool.prototype.countAncestors = function countAncestors(tx) {
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
prev = this.getTX(input.prevout.hash);
if (!prev)
continue;
count = 1;
count += this.countAncestors(prev);
if (count > max)
max = count;
}
@ -1060,10 +1065,13 @@ Mempool.prototype.countDescendants = function countDescendants(tx) {
for (i = 0; i < tx.outputs.length; i++) {
next = this.isSpent(hash, i);
if (!next)
continue;
count = 1;
count += this.countDescendants(next.tx);
if (count > max)
max = count;
}
@ -1087,8 +1095,10 @@ Mempool.prototype.getAncestors = function getAncestors(tx) {
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
prev = self.getTX(input.prevout.hash);
if (!prev)
continue;
entries.push(prev);
traverse(prev);
}
@ -1113,8 +1123,10 @@ Mempool.prototype.getDescendants = function getDescendants(tx) {
for (i = 0; i < tx.outputs.length; i++) {
next = self.isSpent(hash, i);
if (!next)
continue;
entries.push(next);
traverse(next.tx);
}
@ -1188,8 +1200,10 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx) {
for (i = 0; i < missing.length; i++) {
prev = missing[i];
if (!this.waiting[prev])
this.waiting[prev] = [];
this.waiting[prev].push(hash);
}
@ -1219,9 +1233,12 @@ Mempool.prototype.getBalance = function getBalance() {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
tx = this.getTX(hash);
if (!tx)
continue;
hash = tx.hash('hex');
for (j = 0; j < tx.outputs.length; j++) {
coin = this.getCoin(hash, j);
if (coin)
@ -1245,8 +1262,10 @@ Mempool.prototype.getHistory = function getHistory() {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
tx = this.getTX(hash);
if (!tx)
continue;
txs.push(tx);
}

View File

@ -1133,11 +1133,13 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
}
});
peer.on('tx', function(tx) {
self._handleTX(tx, peer).catch(function(err) {
self.emit('error', err);
});
});
peer.on('tx', co(function *(tx) {
try {
yield self._handleTX(tx, peer);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('addr', function(addrs) {
var i, addr;
@ -1169,7 +1171,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
self.fillPeers();
});
peer.on('txs', function(txs) {
peer.on('txs', co(function *(txs) {
var i, hash;
self.emit('txs', txs, peer);
@ -1179,9 +1181,13 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
for (i = 0; i < txs.length; i++) {
hash = txs[i];
self.getDataSync(peer, self.txType, hash);
try {
yield self.getData(peer, self.txType, hash);
} catch (e) {
self.emit('error', e);
}
}
});
}));
peer.on('version', function(version) {
self.logger.info(
@ -1197,23 +1203,27 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
self.emit('version', version, peer);
});
peer.on('headers', function(headers) {
peer.on('headers', co(function *(headers) {
if (!self.syncing)
return;
self._handleHeaders(headers, peer).catch(function(err) {
self.emit('error', err);
});
});
try {
yield self._handleHeaders(headers, peer);
} catch (e) {
self.emit('error', e);
}
}));
peer.on('blocks', function(hashes) {
peer.on('blocks', co(function *(hashes) {
if (!self.syncing)
return;
self._handleInv(hashes, peer).catch(function(err) {
self.emit('error', err);
});
});
try {
yield self._handleInv(hashes, peer);
} catch (e) {
self.emit('error', e);
}
}));
return peer;
};
@ -1548,22 +1558,6 @@ Pool.prototype.getData = co(function* getData(peer, type, hash) {
return false;
});
/**
* Queue a `getdata` request to be sent. Promise
* error handler will emit an error on the pool.
* @param {Peer} peer
* @param {Number} type - `getdata` type (see {@link constants.inv}).
* @param {Hash} hash - {@link Block} or {@link TX} hash.
* @param {Function} callback
*/
Pool.prototype.getDataSync = function getDataSync(peer, type, hash) {
var self = this;
return this.getData(peer, type, hash).catch(function(err) {
self.emit('error', err);
});
};
/**
* Test whether the pool has or has seen an item.
* @param {Peer} peer