more pool work.

This commit is contained in:
Christopher Jeffrey 2016-05-19 16:08:56 -07:00
parent 65125f7b4f
commit cef88c6e4e
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
5 changed files with 167 additions and 106 deletions

View File

@ -102,7 +102,7 @@ Chain.prototype._init = function _init() {
// Hook into events for debugging
this.on('block', function(block, entry) {
if (self.height < self.network.block.slowHeight)
if (!self.synced && self.height < self.network.block.slowHeight)
return;
bcoin.debug('Block %s (%d) added to chain.',

View File

@ -61,6 +61,8 @@ var constants = bcoin.protocol.constants;
*/
function Peer(pool, options) {
var seed;
if (!(this instanceof Peer))
return new Peer(pool, options);
@ -80,8 +82,8 @@ function Peer(pool, options) {
this.mempool = this.pool.mempool;
this.bloom = this.pool.bloom;
this.network = this.chain.network;
this.parser = new bcoin.protocol.parser(this.chain.options);
this.framer = new bcoin.protocol.framer(this.chain.options);
this.parser = new bcoin.protocol.parser({ network: this.network });
this.framer = new bcoin.protocol.framer({ network: this.network });
this.version = null;
this.destroyed = false;
this.ack = false;
@ -104,14 +106,18 @@ function Peer(pool, options) {
this.socket = options.socket;
this.host = this.socket.remoteAddress;
this.port = this.socket.remotePort;
assert(this.host);
assert(this.port != null);
} else if (options.seed) {
options.seed = utils.parseHost(options.seed);
options.seed.port = options.seed.port || this.network.port;
this.socket = this.createSocket(options.seed.port, options.seed.host);
seed = utils.parseHost(options.seed);
this.host = seed.host;
this.port = seed.port || this.network.port;
this.socket = this.createSocket(this.port, this.host);
} else {
assert(false, 'No seed or socket.');
}
assert(typeof this.host === 'string');
assert(typeof this.port === 'number');
if (!this.socket)
throw new Error('No socket');
@ -146,19 +152,9 @@ Peer.uid = 0;
Peer.prototype._init = function init() {
var self = this;
if (!this.host)
this.host = this.socket.remoteAddress || this.socket._host || null;
if (!this.port)
this.port = this.socket.remotePort || 0;
this.socket.once('connect', function() {
self.ts = utils.now();
self.connected = true;
if (!self.host)
self.host = self.socket.remoteAddress;
if (!self.port)
self.port = self.socket.remotePort;
self.emit('connect');
});
@ -181,6 +177,7 @@ Peer.prototype._init = function init() {
});
this.parser.on('error', function(err) {
bcoin.debug('Parse error:');
bcoin.debug(err.stack + '');
self.sendReject(null, 'malformed', 'error parsing message', 100);
self._error(err);
@ -249,12 +246,6 @@ Peer.prototype.createSocket = function createSocket(port, host) {
var self = this;
var socket, net;
assert(port != null);
assert(host);
this.host = host;
this.port = port;
if (this._createSocket) {
socket = this._createSocket(port, host);
} else if (bcoin.isBrowser) {
@ -264,13 +255,16 @@ Peer.prototype.createSocket = function createSocket(port, host) {
socket = net.connect(port, host);
}
if (utils.isIP(host) === 6)
host = '[' + host + ']';
bcoin.debug(
'Connecting to %s:%d (priority=%s)',
'Connecting to %s:%d (priority=%s).',
host, port, this.priority);
socket.on('connect', function() {
bcoin.debug(
'Connected to %s:%d (priority=%s)',
'Connected to %s:%d (priority=%s).',
host, port, self.priority);
});
@ -411,7 +405,7 @@ Peer.prototype._error = function error(err) {
if (typeof err === 'string')
err = new Error(err);
err.message += ' (' + this.host + ':' + this.port + ')';
err.message += ' (' + this.hostname + ')';
this.destroy();
this.emit('error', err);
@ -583,7 +577,7 @@ Peer.prototype._onPacket = function onPacket(packet) {
this.fire(cmd, payload);
break;
default:
bcoin.debug('Unknown packet: %s', cmd);
bcoin.debug('Unknown packet: %s.', cmd);
this.fire(cmd, payload);
break;
}
@ -642,7 +636,8 @@ Peer.prototype._handleUTXOs = function _handleUTXOs(payload) {
payload.coins = payload.coins(function(coin) {
return new bcoin.coin(coin);
});
bcoin.debug('Received %d utxos from %s.', payload.coins.length, this.host);
bcoin.debug('Received %d utxos (%s).',
payload.coins.length, this.hostname);
this.fire('utxos', payload);
};
@ -1006,7 +1001,7 @@ Peer.prototype._handleMempool = function _handleMempool() {
for (i = 0; i < hashes.length; i++)
items.push({ type: constants.inv.TX, hash: hashes[i] });
bcoin.debug('Sending mempool snapshot to %s.', self.host);
bcoin.debug('Sending mempool snapshot (%s).', self.hostname);
self.sendInv(items);
});
@ -1033,17 +1028,17 @@ Peer.prototype._handleGetData = function handleGetData(items) {
if ((item.type & ~constants.WITNESS_MASK) !== entry.type) {
bcoin.debug(
'Peer %s requested an existing item with the wrong type.',
this.host);
'Peer requested an existing item with the wrong type (%s).',
this.hostname);
continue;
}
bcoin.debug(
'Peer %s requested %s:%s as a %s packet.',
this.host,
'Peer requested %s:%s as a %s packet (%s).',
entry.packetType,
utils.revHex(entry.key),
witness ? 'witness' : 'normal');
utils.revHex(entry.hash),
witness ? 'witness' : 'normal',
this.hostname);
entry.sendTo(peer, witness);
}
@ -1182,10 +1177,10 @@ Peer.prototype._handleGetData = function handleGetData(items) {
self.emit('error', err);
bcoin.debug(
'Served %d items to %s with getdata (notfound=%d).',
'Served %d items s with getdata (notfound=%d) (%s).',
items.length - notfound.length,
self.host,
notfound.length);
notfound.length,
self.hostname);
if (notfound.length > 0)
self.write(self.framer.notFound(notfound));
@ -1217,10 +1212,11 @@ Peer.prototype._handleAddr = function handleAddr(addrs) {
}
bcoin.debug(
'Recieved %d peers (seeds=%d, peers=%d).',
'Received %d addrs (seeds=%d, peers=%d) (%s).',
addrs.length,
this.pool.seeds.length,
this.pool.peers.all.length);
this.pool.peers.all.length,
this.hostname);
};
Peer.prototype._handlePing = function handlePing(data) {
@ -1250,7 +1246,7 @@ Peer.prototype._handleGetAddr = function handleGetAddr() {
for (i = 0; i < this.pool.seeds.length; i++) {
seed = utils.parseHost(this.pool.seeds[i]);
seed = this.pool.getPeer(seed) || seed;
seed = this.pool.getPeer(seed.host) || seed;
version = utils.isIP(seed.host);
if (!version)
@ -1274,6 +1270,11 @@ Peer.prototype._handleGetAddr = function handleGetAddr() {
break;
}
bcoin.debug(
'Sending %d addrs to peer (%s)',
addrs.length,
this.hostname);
return this.write(this.framer.addr(items));
};
@ -1329,7 +1330,7 @@ Peer.prototype._handleAlert = function handleAlert(details) {
var signature = details.signature;
if (!bcoin.ec.verify(hash, signature, this.network.alertKey)) {
bcoin.debug('Peer %s sent a phony alert packet.', this.host);
bcoin.debug('Peer sent a phony alert packet (%s).', this.hostname);
// Let's look at it because why not?
bcoin.debug(details);
this.setMisbehavior(100);
@ -1348,8 +1349,8 @@ Peer.prototype._handleAlert = function handleAlert(details) {
Peer.prototype.getHeaders = function getHeaders(locator, stop) {
bcoin.debug(
'Requesting headers packet from %s with getheaders',
this.host);
'Requesting headers packet from peer with getheaders (%s).',
this.hostname);
bcoin.debug('Height: %s, Hash: %s, Stop: %s',
locator && locator.length ? this.chain._getCachedHeight(locator[0]) : -1,
@ -1367,8 +1368,8 @@ Peer.prototype.getHeaders = function getHeaders(locator, stop) {
Peer.prototype.getBlocks = function getBlocks(locator, stop) {
bcoin.debug(
'Requesting inv packet from %s with getblocks',
this.host);
'Requesting inv packet from peer with getblocks (%s).',
this.hostname);
bcoin.debug('Height: %s, Hash: %s, Stop: %s',
locator && locator.length ? this.chain._getCachedHeight(locator[0]) : null,
@ -1384,8 +1385,8 @@ Peer.prototype.getBlocks = function getBlocks(locator, stop) {
Peer.prototype.getMempool = function getMempool() {
bcoin.debug(
'Requesting inv packet from %s with mempool',
this.host);
'Requesting inv packet from peer with mempool (%s).',
this.hostname);
this.write(this.framer.mempool());
};
@ -1397,8 +1398,8 @@ Peer.prototype.getMempool = function getMempool() {
Peer.prototype.reject = function reject(details) {
bcoin.debug(
'Sending reject packet to %s',
this.host);
'Sending reject packet to peer (%s).',
this.hostname);
this.write(this.framer.reject(details));
};
@ -1434,6 +1435,28 @@ Peer.prototype.sendReject = function sendReject(obj, code, reason, score) {
return this.pool.reject(this, obj, code, reason, score);
};
Peer.prototype.__defineGetter__('hostname', function() {
var host = this.host;
if (utils.isIP(host) === 6)
host = '[' + host + ']';
return host + ':' + this.port;
});
/**
* Inspect the peer.
* @returns {String}
*/
Peer.prototype.inspect = function inspect() {
return '<Peer:'
+ ' id=' + this.id
+ ' connected=' + this.connected
+ ' host=' + this.hostname
+ '>';
};
/*
* Helpers
*/

View File

@ -253,7 +253,7 @@ Pool.prototype.connect = function connect() {
if (this.originalSeeds.length > 0) {
this._addLoader();
for (i = 0; i < this.size; i++)
for (i = 0; i < this.size - 1; i++)
this._addPeer();
this.connected = true;
@ -413,7 +413,7 @@ Pool.prototype.listen = function listen(callback) {
this.server.on('listening', function() {
var data = self.server.address();
bcoin.debug(
'Bitcoin server listening on %s (port=%d)',
'Bitcoin server listening on %s (port=%d).',
data.address, data.port);
});
@ -484,9 +484,7 @@ Pool.prototype._startInterval = function _startInterval() {
if (self.chain.isBusy())
return self._startTimer();
bcoin.debug('Stall recovery: loading again.');
// self._load();
bcoin.debug('Warning: Stalling.');
}
this._interval = setInterval(load, this.load.interval);
@ -507,7 +505,7 @@ Pool.prototype._addLoader = function _addLoader() {
if (this.destroyed)
return;
if (this.peers.load != null)
if (this.peers.load)
return;
peer = this._createPeer({
@ -518,7 +516,7 @@ Pool.prototype._addLoader = function _addLoader() {
witness: this.options.witness
});
bcoin.debug('Added loader peer: %s', peer.host);
bcoin.debug('Added loader peer (%s).', peer.hostname);
this.peers.load = peer;
this.peers.all.push(peer);
@ -528,8 +526,16 @@ Pool.prototype._addLoader = function _addLoader() {
self._stopInterval();
self._stopTimer();
self._removePeer(peer);
if (self.destroyed)
if (self.peers.regular.length === 0) {
bcoin.debug('%s %s %s',
'Could not connect to any peers.',
'Do you have a network connection?',
'Retrying in 5 seconds.');
setTimeout(function() {
self._addLoader();
}, 5000);
return;
}
self._addLoader();
});
@ -660,9 +666,9 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
return callback();
bcoin.debug(
'Recieved %s headers from %s',
'Received %s headers from peer (%s).',
headers.length,
peer.host);
peer.hostname);
if (headers.length > 2000) {
peer.setMisbehavior(100);
@ -718,9 +724,9 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
return callback();
bcoin.debug(
'Recieved %s block hashes from %s',
'Received %s block hashes from peer (%s).',
hashes.length,
peer.host);
peer.hostname);
// Normally this is 500, but with older
// versions locator.GetDistanceBack() is called.
@ -812,8 +818,8 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
// us requesting them.
if (!requested) {
bcoin.debug(
'Recieved unrequested block: %s (%s)',
block.rhash, peer.host);
'Received unrequested block: %s (%s).',
block.rhash, peer.hostname);
return callback();
}
@ -916,7 +922,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
bcoin.debug(
'Reject (%s): msg=%s ccode=%s reason=%s data=%s',
peer.host,
peer.hostname,
payload.message,
payload.ccode,
payload.reason,
@ -926,7 +932,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
});
peer.on('alert', function(payload) {
bcoin.debug('Received alert from: %s', peer.host);
bcoin.debug('Received alert from peer (%s).', peer.hostname);
bcoin.debug(payload);
self.emit('alert', payload, peer);
});
@ -1001,8 +1007,8 @@ Pool.prototype._createPeer = function _createPeer(options) {
self.block.versionHeight = version.height;
bcoin.debug(
'Received version from %s: version=%d height=%d agent=%s',
peer.host, version.version, version.height, version.agent);
'Received version (%s): version=%d height=%d agent=%s',
peer.hostname, version.version, version.height, version.agent);
bcoin.time.add(peer.host, version.ts);
@ -1065,7 +1071,7 @@ Pool.prototype._addLeech = function _addLeech(socket) {
witness: false
});
bcoin.debug('Added leech peer: %s', peer.host);
bcoin.debug('Added leech peer (%s).', peer.hostname);
this.peers.leeches.push(peer);
this.peers.all.push(peer);
@ -1117,7 +1123,7 @@ Pool.prototype._addPeer = function _addPeer() {
if (this.destroyed)
return;
if (this.peers.regular.length + this.peers.pending.length >= this.size)
if (this.peers.regular.length + this.peers.pending.length >= this.size - 1)
return;
seed = this.getSeed(false);
@ -1215,10 +1221,10 @@ Pool.prototype.bestPeer = function bestPeer() {
};
Pool.prototype._removePeer = function _removePeer(peer) {
utils.binaryRemove(this.peers.pending, peer);
utils.binaryRemove(this.peers.regular, peer);
utils.binaryRemove(this.peers.leeches, peer);
utils.binaryRemove(this.peers.all, peer);
utils.binaryRemove(this.peers.pending, peer, compare);
utils.binaryRemove(this.peers.regular, peer, compare);
utils.binaryRemove(this.peers.leeches, peer, compare);
utils.binaryRemove(this.peers.all, peer, compare);
delete this.peers.map[peer.host];
if (this.peers.load === peer) {
@ -1227,7 +1233,7 @@ Pool.prototype._removePeer = function _removePeer(peer) {
if (item.peer === peer)
item.finish();
}, this);
bcoin.debug('Removed loader peer (%s).', peer.host);
bcoin.debug('Removed loader peer (%s).', peer.hostname);
this.peers.load = null;
}
};
@ -1424,13 +1430,13 @@ Pool.prototype.searchWallet = function(wallet, callback) {
self.chain.reset(height, function(err) {
if (err) {
bcoin.debug('Failed to reset height: %s', err.stack + '');
bcoin.debug('Failed to reset height: %s', err.message);
return callback(err);
}
bcoin.debug('Wallet height: %s', height);
bcoin.debug('Wallet height: %s.', height);
bcoin.debug(
'Reverted chain to height=%d (%s)',
'Reverted chain to height=%d (%s).',
self.chain.height,
utils.date(self.chain.tip.ts)
);
@ -1446,13 +1452,13 @@ Pool.prototype.searchWallet = function(wallet, callback) {
self.chain.resetTime(ts, function(err) {
if (err) {
bcoin.debug('Failed to reset time: %s', err.stack + '');
bcoin.debug('Failed to reset time: %s', err.message);
return callback(err);
}
bcoin.debug('Wallet time: %s', utils.date(ts));
bcoin.debug('Wallet time is %s.', utils.date(ts));
bcoin.debug(
'Reverted chain to height=%d (%s)',
'Reverted chain to height=%d (%s).',
self.chain.height,
utils.date(self.chain.tip.ts)
);
@ -1511,10 +1517,10 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
if (peer.queue.tx.length === 0) {
utils.nextTick(function() {
bcoin.debug(
'Requesting %d/%d txs from %s with getdata',
'Requesting %d/%d txs from peer with getdata (%s).',
peer.queue.tx.length,
self.request.activeTX,
peer.host);
peer.hostname);
peer.getData(peer.queue.tx);
peer.queue.tx.length = 0;
@ -1559,9 +1565,12 @@ Pool.prototype.scheduleRequests = function scheduleRequests(peer) {
this._scheduled = true;
utils.print(this.request);
this.chain.onFlush(function() {
utils.nextTick(function() {
self._sendRequests(peer);
utils.print(self.request);
self._scheduled = false;
});
});
@ -1599,10 +1608,10 @@ Pool.prototype._sendRequests = function _sendRequests(peer) {
});
bcoin.debug(
'Requesting %d/%d blocks from %s with getdata',
'Requesting %d/%d blocks from peer with getdata (%s).',
items.length,
this.request.activeBlocks,
peer.host);
peer.hostname);
peer.getData(items);
};
@ -1661,7 +1670,12 @@ Pool.prototype.broadcast = function broadcast(msg, callback) {
*/
Pool.prototype.announce = function announce(msg) {
for (var i = 0; i < this.peers.all.length; i++)
var i;
if (this.peers.load)
this.peers.load.sendInv(msg);
for (i = 0; i < this.peers.regular.length; i++)
this.peers.all[i].sendInv(msg);
};
@ -1907,7 +1921,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) {
if (peer.banScore >= constants.BAN_SCORE) {
this.peers.misbehaving[peer.host] = utils.now();
bcoin.debug('Ban threshold exceeded for %s', peer.host);
bcoin.debug('Ban threshold exceeded (%s).', peer.host);
peer.destroy();
return true;
}
@ -1959,8 +1973,8 @@ Pool.prototype.reject = function reject(peer, obj, code, reason, score) {
if (obj) {
type = (obj instanceof bcoin.tx) ? 'tx' : 'block';
bcoin.debug('Rejecting %s %s from %s: ccode=%s reason=%s',
type, obj.rhash, peer.host, code, reason);
bcoin.debug('Rejecting %s %s (%s): ccode=%s reason=%s.',
type, obj.rhash, peer.hostname, code, reason);
peer.reject({
message: type,
@ -1969,8 +1983,8 @@ Pool.prototype.reject = function reject(peer, obj, code, reason, score) {
data: obj.hash()
});
} else {
bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s',
peer.host, code, reason);
bcoin.debug('Rejecting packet from %s: ccode=%s reason=%s.',
peer.hostname, code, reason);
peer.reject({
ccode: code,
@ -2067,6 +2081,20 @@ LoadRequest.prototype.finish = function finish() {
}
};
/**
* Inspect the load request.
* @returns {String}
*/
LoadRequest.prototype.inspect = function inspect() {
return '<LoadRequest:'
+ ' id=' + this.id
+ ' type=' + (this.type === this.pool.tx.type ? 'tx' : 'block')
+ ' active=' + this.active
+ ' hash=' + utils.revHex(this.hash)
+ '>';
};
/**
* Represents an item that is broadcasted via an inv/getdata cycle.
* @exports BroadcastItem
@ -2093,12 +2121,11 @@ function BroadcastItem(pool, item, callback) {
this.callback = [];
this.id = this.pool.uid++;
this.key = item.hash('hex');
this.hash = item.hash('hex');
this.type = (item instanceof bcoin.tx)
? constants.inv.TX
: constants.inv.BLOCK;
this.msg = item;
this.hash = item.hash();
this.normalValue = item.renderNormal();
this.witnessValue = item.render();
@ -2130,9 +2157,9 @@ BroadcastItem.prototype.start = function start() {
var i;
assert(!this.timeout, 'Already started.');
assert(!this.pool.inv.map[this.key], 'Already started.');
assert(!this.pool.inv.map[this.hash], 'Already started.');
this.pool.inv.map[this.key] = this;
this.pool.inv.map[this.hash] = this;
utils.binaryInsert(this.pool.inv.list, this, compare);
this.refresh();
@ -2170,12 +2197,12 @@ BroadcastItem.prototype.finish = function finish(err) {
var i;
assert(this.timeout, 'Already finished.');
assert(this.pool.inv.map[this.key], 'Already finished.');
assert(this.pool.inv.map[this.hash], 'Already finished.');
clearInterval(this.timeout);
this.timeout = null;
delete this.pool.inv.map[this.key];
delete this.pool.inv.map[this.hash];
utils.binaryRemove(this.pool.inv.list, this, compare);
for (i = 0; i < this.callback.length; i++)
@ -2218,7 +2245,7 @@ BroadcastItem.prototype.reject = function reject(peer) {
this.emit('reject', peer);
err = new Error('Rejected by ' + peer.host);
err = new Error('Rejected by ' + peer.hostname);
for (i = 0; i < this.callback.length; i++)
this.callback[i](err);
@ -2226,6 +2253,18 @@ BroadcastItem.prototype.reject = function reject(peer) {
this.callback.length = 0;
};
/**
* Inspect the broadcast item.
*/
BroadcastItem.prototype.inspect = function inspect() {
return '<BroadcastItem:'
+ ' id=' + this.id
+ ' type=' + (this.type === constants.TX ? 'tx' : 'block')
+ ' hash=' + utils.revHex(this.hash)
+ '>';
};
/*
* Helpers
*/

View File

@ -31,7 +31,7 @@ function Framer(options) {
this.options = options;
this.network = bcoin.network.get(options.network);
this.agent = new Buffer(options.USER_AGENT || constants.USER_AGENT, 'ascii');
this.agent = options.agent || constants.USER_AGENT;
}
/**
@ -454,9 +454,6 @@ Framer.version = function version(options, writer) {
var local = options.local || {};
var nonce = options.nonce;
if (typeof agent === 'string')
agent = new Buffer(agent, 'ascii');
if (local.network == null)
local.network = options.network;
@ -475,7 +472,7 @@ Framer.version = function version(options, writer) {
Framer.address(remote, false, p);
Framer.address(local, false, p);
p.writeU64(nonce);
p.writeVarString(agent);
p.writeVarString(agent, 'ascii');
p.write32(options.height || 0);
p.writeU8(options.relay ? 1 : 0);

View File

@ -805,7 +805,8 @@ segnet3.block = {
bip34height: -1,
bip34hash: null,
pruneAfterHeight: 1000,
maxTipAge: 0x7fffffff,
// maxTipAge: 0x7fffffff,
maxTipAge: 24 * 60 * 60,
slowHeight: 0x7fffffff
};
@ -925,7 +926,8 @@ segnet4.block = {
bip34height: -1,
bip34hash: null,
pruneAfterHeight: 1000,
maxTipAge: 0x7fffffff,
// maxTipAge: 0x7fffffff,
maxTipAge: 24 * 60 * 60,
slowHeight: 0x7fffffff
};