refactor peer and pool.

This commit is contained in:
Christopher Jeffrey 2016-05-21 09:51:33 -07:00
parent 3e2f43e178
commit c398564036
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
9 changed files with 204 additions and 144 deletions

View File

@ -157,6 +157,18 @@ AbstractBlock.prototype.__defineGetter__('rhash', function() {
return utils.revHex(this.hash('hex'));
});
/**
* Convert the block to an inv item.
* @returns {InvItem}
*/
AbstractBlock.prototype.toInv = function toInv() {
return {
type: constants.inv.BLOCK,
hash: this.hash()
};
};
/*
* Expose
*/

View File

@ -689,7 +689,7 @@ Block.prototype.toMerkle = function toMerkle(filter) {
};
/**
* Test an object to see if it is a Block.
* Test whether an object is a Block.
* @param {Object} obj
* @returns {Boolean}
*/
@ -697,7 +697,7 @@ Block.prototype.toMerkle = function toMerkle(filter) {
Block.isBlock = function isBlock(obj) {
return obj
&& typeof obj.merkleRoot === 'string'
&& typeof obj.toCompact === 'function';
&& typeof obj.getCommitmentHash === 'function';
};
/*

View File

@ -107,7 +107,7 @@ CompactBlock.prototype.toBlock = function toBlock() {
};
/**
* Test an object to see if it is a CompactBlock.
* Test whether an object is a CompactBlock.
* @param {Object} obj
* @returns {Boolean}
*/

View File

@ -391,7 +391,7 @@ MerkleBlock.fromBlock = function fromBlock(block, bloom) {
};
/**
* Test an object to see if it is a MerkleBlock object.
* Test whether an object is a MerkleBlock.
* @param {Object} obj
* @returns {Boolean}
*/

View File

@ -1275,7 +1275,7 @@ MTX.prototype.toTX = function toTX() {
};
/**
* Test an object to see if it is a TX.
* Test whether an object is an MTX.
* @param {Object} obj
* @returns {Boolean}
*/
@ -1283,8 +1283,7 @@ MTX.prototype.toTX = function toTX() {
MTX.isMTX = function isMTX(obj) {
return obj
&& Array.isArray(obj.inputs)
&& typeof obj.ps === 'number'
&& typeof obj.changeIndex === 'number'
&& typeof obj.locktime === 'number'
&& typeof obj.scriptInput === 'function';
};

View File

@ -24,7 +24,6 @@ var constants = bcoin.protocol.constants;
* priority (i.e. a loader).
* @param {Chain} options.chain
* @param {Mempool} options.mempool
* @param {Bloom} options.bloom - The _local_ bloom filter.
* @param {Number?} options.ts - Time at which peer was discovered (unix time).
* @param {net.Socket?} options.socket
* @param {Seed?} options.seed - Host to connect to.
@ -37,7 +36,6 @@ var constants = bcoin.protocol.constants;
* @property {Framer} framer
* @property {Chain} chain
* @property {Mempool} mempool
* @property {Bloom} bloom
* @property {Object?} version - Version packet payload.
* @property {Boolean} destroyed
* @property {Boolean} ack - Whether verack has been received.
@ -49,7 +47,7 @@ var constants = bcoin.protocol.constants;
* either notified via service bits or deprecated `havewitness` packet.
* @property {Hash?} hashContinue - The block hash at which to continue
* the sync for the peer.
* @property {Bloom?} filter - The _peer's_ bloom filter.
* @property {Bloom?} spvFilter - The _peer's_ bloom spvFilter.
* @property {Boolean} relay - Whether to relay transactions
* immediately to the peer.
* @property {BN} challenge - Local nonce.
@ -84,7 +82,6 @@ function Peer(pool, options) {
this.priority = this.options.priority;
this.chain = this.pool.chain;
this.mempool = this.pool.mempool;
this.bloom = this.pool.bloom;
this.network = this.chain.network;
this.parser = new bcoin.protocol.parser({ network: this.network });
this.framer = new bcoin.protocol.framer({ network: this.network });
@ -96,10 +93,10 @@ function Peer(pool, options) {
this.sendHeaders = false;
this.haveWitness = false;
this.hashContinue = null;
this.filter = null;
this.spvFilter = null;
this.relay = true;
this.localNonce = utils.nonce();
this.filterRate = -1;
this.feeRate = -1;
this.addrFilter = new bcoin.bloom.rolling(5000, 0.001);
this.invFilter = new bcoin.bloom.rolling(50000, 0.000001);
@ -187,52 +184,71 @@ Peer.prototype._init = function init() {
});
this.parser.on('error', function(err) {
bcoin.debug('Parse error:');
bcoin.debug(err.stack + '');
self.sendReject(null, 'malformed', 'error parsing message', 100);
self.sendReject(null, 'malformed', 'error parsing message', 1);
self._error(err);
});
this.request('verack', function(err, payload) {
this.request('verack', function callee(err) {
if (err) {
self._error(err);
self.destroy();
return;
}
self.ack = true;
self.emit('ack');
self.ts = utils.now();
// Wait for _their_ version.
if (!self.version) {
bcoin.debug(
'Peer sent a verack without a version (%s).',
self.hostname);
self.request('version', callee);
return;
}
// Setup the ping interval.
self.ping.timer = setInterval(function() {
self.sendPing();
}, self.ping.interval);
// Ask for headers-only.
if (self.options.headers) {
if (self.version && self.version.version > 70012)
self.write(self.framer.sendHeaders());
}
// Let them know we support segwit (old
// segwit3 nodes require this instead
// of service bits).
if (self.options.witness) {
if (self.version && self.version.version >= 70012)
self.write(self.framer.haveWitness());
}
// Find some more peers.
self.write(self.framer.getAddr());
// Relay our spv filter if we have one.
self.updateWatch();
// Announce our currently broadcasted items.
self.sendInv(self.pool.inv.items);
if (self.pool.options.filterRate != null) {
self.write(self.framer.feeFilter({
rate: self.pool.options.filterRate
}));
}
// Set a fee rate filter.
if (self.pool.feeRate !== -1)
self.setFeeRate(self.pool.feeRate);
if (self.chain.isFull())
// If we're fully synced, see
// what we missed out on.
if (self.pool.synced)
self.getMempool();
// Finally we can let the pool know
// that this peer is ready to go.
self.ack = true;
self.ts = utils.now();
self.emit('ack');
});
// Send hello
// Say hello.
this.write(this.framer.version({
height: this.chain.height,
relay: this.options.relay,
@ -277,11 +293,13 @@ Peer.prototype.createSocket = function createSocket(port, host) {
/**
* Broadcast items to peer (transactions or blocks).
* @param {BroadcastEntry[]} items
* @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items
*/
Peer.prototype.sendInv = function sendInv(items) {
var self = this;
var inv = [];
var i, item, chunk;
if (this.destroyed)
return;
@ -295,33 +313,25 @@ Peer.prototype.sendInv = function sendInv(items) {
if (!Array.isArray(items))
items = [items];
if (items.length === 0)
return;
for (i = 0; i < items.length; i++) {
item = items[i];
if (items.length > 50000)
items = items.slice(0, 50000);
if (!this.isWatched(item))
continue;
if (this.filter)
items = items.filter(this.isWatched, this);
if (item.toInv)
item = item.toInv();
items = items.map(function(msg) {
if (!(msg instanceof bcoin.tx)
&& !(msg instanceof bcoin.block)) {
return msg;
}
return {
type: (msg instanceof bcoin.tx)
? constants.inv.TX
: constants.inv.BLOCK,
hash: msg.hash()
};
});
if (!this.invFilter.added(item.hash, 'hex'))
continue;
items = items.filter(function(msg) {
return self.invFilter.added(msg.hash, 'hex');
});
inv.push(item);
}
this.write(this.framer.inv(items));
for (i = 0; i < inv.length; i += 50000) {
chunk = inv.slice(i, i + 50000);
this.write(this.framer.inv(chunk));
}
};
/**
@ -357,19 +367,19 @@ Peer.prototype.sendPing = function sendPing() {
*/
Peer.prototype.isWatched = function isWatched(item) {
if (!this.filter)
if (!this.spvFilter)
return true;
if (!item)
return true;
if (item instanceof bcoin.tx)
return item.isWatched(this.filter);
return item.isWatched(this.spvFilter);
if (!(item.msg instanceof bcoin.tx))
return true;
if (item.msg instanceof bcoin.tx)
return item.msg.isWatched(this.spvFilter);
return item.msg.isWatched(this.filter);
return true;
};
/**
@ -381,7 +391,18 @@ Peer.prototype.updateWatch = function updateWatch() {
return;
if (this.ack)
this.write(this.framer.filterLoad(this.bloom));
this.write(this.framer.filterLoad(this.pool.spvFilter));
};
/**
* Set a fee rate filter for the peer.
* @param {Rate} rate
*/
Peer.prototype.setFeeRate = function setFeeRate(rate) {
this.write(this.framer.feeFilter({
rate: rate
}));
};
/**
@ -596,11 +617,11 @@ Peer.prototype._onPacket = function onPacket(packet) {
break;
case 'sendheaders':
this.sendHeaders = true;
this.response(cmd, payload);
this.fire(cmd, payload);
break;
case 'havewitness':
this.haveWitness = true;
this.response(cmd, payload);
this.fire(cmd, payload);
break;
case 'verack':
this.fire(cmd, payload);
@ -616,9 +637,7 @@ Peer.prototype._onPacket = function onPacket(packet) {
};
Peer.prototype.fire = function fire(cmd, payload) {
if (this.response(cmd, payload))
return;
this.response(cmd, payload);
this.emit(cmd, payload);
};
@ -629,15 +648,15 @@ Peer.prototype._flushMerkle = function _flushMerkle() {
};
Peer.prototype._handleFilterLoad = function _handleFilterLoad(payload) {
this.filter = new bcoin.bloom(
this.spvFilter = new bcoin.bloom(
payload.filter,
payload.n,
payload.tweak,
payload.update
);
if (!this.filter.isWithinConstraints()) {
delete this.filter;
if (!this.spvFilter.isWithinConstraints()) {
this.spvFilter = null;
this.setMisbehavior(100);
return;
}
@ -651,15 +670,15 @@ Peer.prototype._handleFilterAdd = function _handleFilterAdd(payload) {
return;
}
if (this.filter)
this.filter.add(payload.data);
if (this.spvFilter)
this.spvFilter.add(payload.data);
this.relay = true;
};
Peer.prototype._handleFilterClear = function _handleFilterClear(payload) {
if (this.filter)
this.filter.reset();
if (this.spvFilter)
this.spvFilter.reset();
this.relay = true;
};
@ -679,7 +698,7 @@ Peer.prototype._handleFeeFilter = function _handleFeeFilter(payload) {
return;
}
this.filterRate = payload.rate;
this.feeRate = payload.rate;
this.fire('feefilter', payload);
};
@ -1012,7 +1031,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) {
// ACK
this.write(this.framer.verack());
this.version = payload;
this.emit('version', payload);
this.fire('version', payload);
};
Peer.prototype._handleMempool = function _handleMempool() {
@ -1103,8 +1122,8 @@ Peer.prototype._handleGetData = function handleGetData(items) {
// We should technically calculate this in
// the `mempool` handler, but it would be
// too slow.
if (self.filterRate !== -1) {
if (bcoin.tx.getRate(entry.size, entry.fees) < self.filterRate)
if (self.feeRate !== -1) {
if (bcoin.tx.getRate(entry.size, entry.fees) < self.feeRate)
return next();
}
@ -1174,7 +1193,7 @@ Peer.prototype._handleGetData = function handleGetData(items) {
return next();
}
block = block.toMerkle(self.filter);
block = block.toMerkle(self.spvFilter);
self.write(self.framer.merkleBlock(block));
@ -1255,7 +1274,7 @@ Peer.prototype._handleAddr = function handleAddr(addrs) {
Peer.prototype._handlePing = function handlePing(data) {
this.write(this.framer.pong(data));
this.emit('ping', this.minPing);
this.fire('ping', this.minPing);
};
Peer.prototype._handlePong = function handlePong(data) {
@ -1287,7 +1306,7 @@ Peer.prototype._handlePong = function handlePong(data) {
this.challenge = null;
this.emit('pong', this.minPing);
this.fire('pong', this.minPing);
};
Peer.prototype._handleGetAddr = function handleGetAddr() {
@ -1341,7 +1360,7 @@ Peer.prototype._handleInv = function handleInv(items) {
var txs = [];
var i, item, unknown;
this.emit('inv', items);
this.fire('inv', items);
for (i = 0; i < items.length; i++) {
item = items[i];
@ -1371,13 +1390,13 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) {
return new bcoin.headers(header);
});
this.emit('headers', headers);
this.fire('headers', headers);
};
Peer.prototype._handleReject = function handleReject(payload) {
var hash, entry;
this.emit('reject', payload);
this.fire('reject', payload);
if (!payload.data)
return;
@ -1403,7 +1422,7 @@ Peer.prototype._handleAlert = function handleAlert(details) {
return;
}
this.emit('alert', details);
this.fire('alert', details);
};
/**

View File

@ -25,6 +25,7 @@ var VerifyError = bcoin.errors.VerifyError;
* for relayed transactions.
* @param {Boolean?} options.headers - Whether
* to use `getheaders` for sync.
* @param {Number?} [options.feeRate] - Fee filter rate.
* @param {Number?} [options.loadTimeout=120000] - Sync timeout before
* finding a new loader peer.
* @param {Number?} [options.loadInterval=20000] - Timeout before attempting to
@ -92,19 +93,11 @@ function Pool(options) {
this.network = this.chain.network;
if (options.relay == null) {
if (options.spv)
options.relay = false;
else
options.relay = true;
}
if (options.relay == null)
options.relay = !options.spv;
if (options.headers == null) {
if (options.spv)
options.headers = true;
else
options.headers = false;
}
if (options.headers == null)
options.headers = options.spv;
seeds = (options.seeds || this.network.seeds).slice();
@ -128,6 +121,7 @@ function Pool(options) {
this.syncing = false;
this.synced = false;
this._scheduled = false;
this._pendingWatch = false;
this.load = {
timeout: options.loadTimeout || 120000,
@ -138,7 +132,11 @@ function Pool(options) {
this.watchMap = {};
this.bloom = bcoin.bloom.fromRate(10000, 0.01, constants.bloom.NONE);
this.feeRate = options.feeRate != null ? options.feeRate : -1;
this.spvFilter = options.spv
? bcoin.bloom.fromRate(10000, 0.01, constants.bloom.NONE)
: null;
this.peers = {
// Peers that are loading blocks themselves
@ -236,7 +234,7 @@ Pool.prototype.connect = function connect() {
if (this.connected)
return;
if (this.options.broadcast) {
if (!this.options.selfish && !this.options.spv) {
if (this.mempool) {
this.mempool.on('tx', function(tx) {
self.announce(tx);
@ -249,13 +247,11 @@ Pool.prototype.connect = function connect() {
// miner sends us an invalid competing
// chain that we can't connect and
// verify yet.
if (!this.options.spv) {
this.chain.on('block', function(block) {
if (!self.synced)
return;
self.announce(block);
});
}
this.chain.on('block', function(block) {
if (!self.synced)
return;
self.announce(block);
});
}
if (this.originalSeeds.length > 0) {
@ -550,7 +546,6 @@ Pool.prototype._addLoader = function _addLoader() {
});
peer.once('ack', function() {
peer.updateWatch();
if (!self.syncing)
return;
self._load();
@ -1119,13 +1114,6 @@ Pool.prototype._addLeech = function _addLeech(socket) {
self._removePeer(peer);
});
peer.once('ack', function() {
if (self.destroyed)
return;
peer.updateWatch();
});
peer.on('merkleblock', function(block) {
if (!self.options.spv)
return;
@ -1198,8 +1186,6 @@ Pool.prototype._addPeer = function _addPeer() {
if (utils.binaryRemove(self.peers.pending, peer, compare))
utils.binaryInsert(self.peers.regular, peer, compare);
peer.updateWatch();
});
peer.on('merkleblock', function(block) {
@ -1255,58 +1241,57 @@ Pool.prototype._removePeer = function _removePeer(peer) {
/**
* Watch a piece of data (filterload, SPV-only).
* @param {Buffer} id
* @param {Buffer|Hash} data
*/
Pool.prototype.watch = function watch(id) {
Pool.prototype.watch = function watch(data) {
var self = this;
var hid, i;
if (id instanceof bcoin.wallet) {
this.watchWallet(id);
if (Buffer.isBuffer(data))
data = data.toString('hex');
if (data instanceof bcoin.wallet) {
this.watchWallet(data);
return;
}
if (id) {
hid = id.toString('hex');
if (this.watchMap[hid]) {
this.watchMap[hid]++;
return;
}
this.watchMap[hid] = 1;
this.bloom.add(id);
if (this.watchMap[data]) {
this.watchMap[data]++;
return;
}
// Send it to peers
this.watchMap[data] = 1;
this.spvFilter.add(data, 'hex');
this.updateWatch();
};
/**
* Unwatch a piece of data (filterload, SPV-only).
* @param {Buffer} id
* @param {Buffer|Hash} data
*/
Pool.prototype.unwatch = function unwatch(id) {
Pool.prototype.unwatch = function unwatch(data) {
var self = this;
var i, hid;
hid = id.toString('hex');
if (Buffer.isBuffer(data))
data = data.toString('hex');
if (!this.watchMap[hid] || --this.watchMap[hid] !== 0)
if (!this.watchMap[data])
return;
delete this.watchMap[hid];
if (--this.watchMap[data] !== 0)
return;
// Reset bloom filter
this.bloom.reset();
Object.keys(this.watchMap).forEach(function(id) {
this.bloom.add(id);
delete this.watchMap[data];
this.spvFilter.reset();
Object.keys(this.watchMap).forEach(function(data) {
this.spvFilter.add(data, 'hex');
}, this);
// Resend it to peers
this.updateWatch();
};
@ -1316,6 +1301,7 @@ Pool.prototype.unwatch = function unwatch(id) {
Pool.prototype.updateWatch = function updateWatch() {
var self = this;
var i;
if (this._pendingWatch)
return;
@ -1353,7 +1339,7 @@ Pool.prototype.addWallet = function addWallet(wallet, callback) {
return callback(err);
for (i = 0; i < txs.length; i++)
self.sendTX(txs[i]);
self.broadcast(txs[i]);
if (!self.options.spv)
return callback();
@ -1401,7 +1387,7 @@ Pool.prototype.unwatchAddress = function unwatchAddress(address) {
Pool.prototype.watchWallet = function watchWallet(wallet) {
Object.keys(wallet.addressMap).forEach(function(address) {
this.watch(new Buffer(address, 'hex'));
this.watch(address);
}, this);
};
@ -1412,7 +1398,7 @@ Pool.prototype.watchWallet = function watchWallet(wallet) {
Pool.prototype.unwatchWallet = function unwatchWallet(wallet) {
Object.keys(wallet.addressMap).forEach(function(address) {
this.unwatch(new Buffer(address, 'hex'));
this.unwatch(address);
}, this);
};
@ -1729,6 +1715,23 @@ Pool.prototype.announce = function announce(msg) {
this.peers.regular[i].sendInv(msg);
};
/**
* Set a fee rate filter for all peers.
* @param {Rate} rate
*/
Pool.prototype.setFeeRate = function setFeeRate(rate) {
var i;
this.feeRate = rate;
if (this.peers.load)
this.peers.load.setFeeRate(rate);
for (i = 0; i < this.peers.regular.length; i++)
this.peers.regular[i].setFeeRate(rate);
};
/**
* Close and destroy the pool.
* @method
@ -2327,6 +2330,15 @@ BroadcastItem.prototype.reject = function reject(peer) {
this.callback.length = 0;
};
/**
* Convert to an InvItem.
* @returns {InvItem}
*/
BroadcastItem.prototype.toInv = function toInv() {
return this;
};
/**
* Inspect the broadcast item.
*/

View File

@ -1700,6 +1700,18 @@ TX.prototype.__defineGetter__('wtxid', function() {
return this.rwhash;
});
/**
* Convert the tx to an inv item.
* @returns {InvItem}
*/
TX.prototype.toInv = function toInv() {
return {
type: constants.inv.TX,
hash: this.hash()
};
};
/**
* Calculate minimum fee based on rate and size.
* @param {Number?} size
@ -1954,7 +1966,7 @@ TX.prototype.toCoins = function toCoins() {
};
/**
* Test an object to see if it is a TX.
* Test whether an object is a TX.
* @param {Object} obj
* @returns {Boolean}
*/
@ -1962,8 +1974,8 @@ TX.prototype.toCoins = function toCoins() {
TX.isTX = function isTX(obj) {
return obj
&& Array.isArray(obj.inputs)
&& typeof obj.ps === 'number'
&& typeof obj.getVirtualSize === 'function';
&& typeof obj.locktime === 'number'
&& typeof obj.witnessHash === 'function';
};
/*

View File

@ -11,6 +11,12 @@
* @global
*/
/**
* One of {@link module:constants.inv}.
* @typedef {Number|String} InvType
* @global
*/
/**
* @typedef {Object} Outpoint
* @property {Hash} hash