rewrite broadcast system.
This commit is contained in:
parent
ee3d3eaea1
commit
07517ecd04
@ -514,10 +514,10 @@ Block.reward = function reward(height, network) {
|
||||
network = bcoin.network.get(network);
|
||||
halvings = height / network.halvingInterval | 0;
|
||||
|
||||
// BIP 42 (not technically necessary since we
|
||||
// don't suffer from the undefined behavior of C++).
|
||||
// BIP 42 (well, our own version of it,
|
||||
// since we can only handle 32 bit shifts).
|
||||
// https://github.com/bitcoin/bips/blob/master/bip-0042.mediawiki
|
||||
if (halvings >= 64)
|
||||
if (halvings >= 33)
|
||||
return 0;
|
||||
|
||||
// We need to shift right by `halvings`,
|
||||
@ -527,13 +527,6 @@ Block.reward = function reward(height, network) {
|
||||
if (halvings === 0)
|
||||
return 5000000000;
|
||||
|
||||
// We can't shift right by 32 bits.
|
||||
// 25m bitcoin is 32 bits, so we
|
||||
// can safely return zero if the
|
||||
// shift will be 32.
|
||||
if (halvings >= 33)
|
||||
return 0;
|
||||
|
||||
return 2500000000 >>> (halvings - 1);
|
||||
};
|
||||
|
||||
|
||||
@ -673,6 +673,7 @@ Iterator.prototype.next = function(callback) {
|
||||
|
||||
Iterator.prototype.seek = function seek(key) {
|
||||
var self = this;
|
||||
var item;
|
||||
|
||||
assert(!this.ended, 'Already ended.');
|
||||
|
||||
@ -682,6 +683,11 @@ Iterator.prototype.seek = function seek(key) {
|
||||
this.index = utils.binarySearch(this.items, key, true, function(a, b) {
|
||||
return self.tree.compare(a.key, b);
|
||||
});
|
||||
|
||||
item = this.items[this.index];
|
||||
|
||||
if (!item || this.tree.compare(item.key, key) !== 0)
|
||||
this.index += 1;
|
||||
};
|
||||
|
||||
Iterator.prototype._end = function end(callback) {
|
||||
|
||||
@ -285,7 +285,7 @@ ChainDB.prototype.getHeight = function getHeight(hash, callback) {
|
||||
|
||||
this.db.fetch('h/' + hash, function(data) {
|
||||
assert(data.length === 4, 'Database corruption.');
|
||||
return utils.readU32(data, 0);
|
||||
return data.readUInt32LE(0, true);
|
||||
}, function(err, height) {
|
||||
if (err)
|
||||
return callback(err);
|
||||
|
||||
@ -179,7 +179,7 @@ Fullnode.prototype._init = function _init() {
|
||||
});
|
||||
|
||||
this.miner.on('block', function(block) {
|
||||
self.pool.sendBlock(block);
|
||||
self.pool.announce(block);
|
||||
});
|
||||
|
||||
function load(err) {
|
||||
@ -262,11 +262,11 @@ Fullnode.prototype.sendTX = function sendTX(item, wait, callback) {
|
||||
return callback(err);
|
||||
|
||||
if (!wait) {
|
||||
self.pool.broadcast(item);
|
||||
self.pool.announce(item);
|
||||
return callback();
|
||||
}
|
||||
|
||||
self.pool.broadcast(item, callback);
|
||||
self.pool.announce(item, callback);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
@ -113,19 +113,13 @@ function Peer(pool, options) {
|
||||
if (!this.socket)
|
||||
throw new Error('No socket');
|
||||
|
||||
this._broadcast = {
|
||||
timeout: this.options.broadcastTimeout || 30000,
|
||||
interval: this.options.broadcastInterval || 3000,
|
||||
this.requests = {
|
||||
timeout: this.options.requestTimeout || 10000,
|
||||
skip: {},
|
||||
map: {}
|
||||
};
|
||||
|
||||
this._request = {
|
||||
timeout: this.options.requestTimeout || 10000,
|
||||
skip: {},
|
||||
queue: {}
|
||||
};
|
||||
|
||||
this._ping = {
|
||||
this.pings = {
|
||||
timer: null,
|
||||
interval: this.options.pingInterval || 30000
|
||||
};
|
||||
@ -135,9 +129,8 @@ function Peer(pool, options) {
|
||||
tx: []
|
||||
};
|
||||
|
||||
Peer.uid.iaddn(1);
|
||||
|
||||
this.id = Peer.uid.toString(10);
|
||||
this.uid = 0;
|
||||
this.id = Peer.uid++;
|
||||
|
||||
this.setMaxListeners(10000);
|
||||
|
||||
@ -146,7 +139,7 @@ function Peer(pool, options) {
|
||||
|
||||
utils.inherits(Peer, EventEmitter);
|
||||
|
||||
Peer.uid = new bn(0);
|
||||
Peer.uid = 0;
|
||||
|
||||
Peer.prototype._init = function init() {
|
||||
var self = this;
|
||||
@ -193,13 +186,13 @@ Peer.prototype._init = function init() {
|
||||
|
||||
this.challenge = utils.nonce();
|
||||
|
||||
this._ping.timer = setInterval(function() {
|
||||
this.pings.timer = setInterval(function() {
|
||||
self.write(self.framer.ping({
|
||||
nonce: self.challenge
|
||||
}));
|
||||
}, this._ping.interval);
|
||||
}, this.pings.interval);
|
||||
|
||||
this._req('verack', function(err, payload) {
|
||||
this.request('verack', function(err, payload) {
|
||||
if (err) {
|
||||
self._error(err);
|
||||
self.destroy();
|
||||
@ -275,97 +268,45 @@ Peer.prototype.createSocket = function createSocket(port, host) {
|
||||
|
||||
/**
|
||||
* Broadcast items to peer (transactions or blocks).
|
||||
* @param {TX|Block|TX[]|Block[]} items
|
||||
* @returns {BroadcastPromise[]}
|
||||
* @param {BroadcastEntry[]} items
|
||||
*/
|
||||
|
||||
Peer.prototype.broadcast = function broadcast(items) {
|
||||
var self = this;
|
||||
var result = [];
|
||||
var payload = [];
|
||||
|
||||
if (!this.relay)
|
||||
Peer.prototype.sendInv = function sendInv(items) {
|
||||
if (this.destroyed)
|
||||
return;
|
||||
|
||||
if (this.destroyed)
|
||||
if (!this.relay)
|
||||
return;
|
||||
|
||||
if (!Array.isArray(items))
|
||||
items = [items];
|
||||
|
||||
items.forEach(function(item) {
|
||||
var key, old, type, entry, packetType;
|
||||
if (this.filter)
|
||||
items = items.map(this.isWatched, this);
|
||||
|
||||
if (item.mutable)
|
||||
item = item.toTX();
|
||||
this.write(this.framer.inv(items));
|
||||
};
|
||||
|
||||
key = item.hash('hex');
|
||||
old = this._broadcast.map[key];
|
||||
type = item.type;
|
||||
/**
|
||||
* Test whether an is being watched by the peer.
|
||||
* @param {BroadcastItem|TX} item
|
||||
* @returns {Boolean}
|
||||
*/
|
||||
|
||||
if (typeof type === 'string')
|
||||
type = constants.inv[type.toUpperCase()];
|
||||
Peer.prototype.isWatched = function isWatched(item) {
|
||||
if (!this.filter)
|
||||
return true;
|
||||
|
||||
// INV does not set the witness
|
||||
// mask (only GETDATA does this).
|
||||
type &= ~constants.WITNESS_MASK;
|
||||
if (!item)
|
||||
return true;
|
||||
|
||||
if (type === constants.inv.BLOCK)
|
||||
packetType = 'block';
|
||||
else if (type === constants.inv.TX)
|
||||
packetType = 'tx';
|
||||
else
|
||||
assert(false, 'Bad type.');
|
||||
if (item instanceof bcoin.tx)
|
||||
return item.isWatched(this.filter);
|
||||
|
||||
if (self.filter && type === constants.inv.TX) {
|
||||
if (!item.isWatched(self.filter))
|
||||
return;
|
||||
}
|
||||
if (!(item.msg instanceof bcoin.tx))
|
||||
return true;
|
||||
|
||||
if (old) {
|
||||
clearTimeout(old.timer);
|
||||
clearInterval(old.interval);
|
||||
}
|
||||
|
||||
// Auto-cleanup broadcast map after timeout
|
||||
entry = {
|
||||
e: new EventEmitter(),
|
||||
timeout: setTimeout(function() {
|
||||
entry.e.emit('timeout');
|
||||
clearInterval(entry.interval);
|
||||
delete self._broadcast.map[key];
|
||||
}, this._broadcast.timeout),
|
||||
|
||||
// Retransmit
|
||||
interval: setInterval(function() {
|
||||
self.write(entry.inv);
|
||||
}, this._broadcast.interval),
|
||||
|
||||
inv: this.framer.inv([{
|
||||
type: type,
|
||||
hash: item.hash()
|
||||
}]),
|
||||
|
||||
packetType: packetType,
|
||||
type: type,
|
||||
hash: item.hash(),
|
||||
value: item.renderNormal(),
|
||||
witnessValue: item.renderWitness()
|
||||
};
|
||||
|
||||
this._broadcast.map[key] = entry;
|
||||
|
||||
result.push(entry.e);
|
||||
|
||||
payload.push({
|
||||
type: entry.type,
|
||||
hash: entry.hash
|
||||
});
|
||||
}, this);
|
||||
|
||||
this.write(this.framer.inv(payload));
|
||||
|
||||
return result;
|
||||
return item.msg.isWatched(this.filter);
|
||||
};
|
||||
|
||||
/**
|
||||
@ -399,17 +340,11 @@ Peer.prototype.destroy = function destroy() {
|
||||
this.socket = null;
|
||||
this.emit('close');
|
||||
|
||||
// Clean-up timeouts
|
||||
Object.keys(this._broadcast.map).forEach(function(key) {
|
||||
clearTimeout(this._broadcast.map[key].timer);
|
||||
clearInterval(this._broadcast.map[key].interval);
|
||||
}, this);
|
||||
clearInterval(this.pings.timer);
|
||||
this.pings.timer = null;
|
||||
|
||||
clearInterval(this._ping.timer);
|
||||
this._ping.timer = null;
|
||||
|
||||
Object.keys(this._request).forEach(function(cmd) {
|
||||
var queue = this._request[cmd];
|
||||
Object.keys(this.requests.map).forEach(function(cmd) {
|
||||
var queue = this.requests.map[cmd];
|
||||
var i;
|
||||
|
||||
for (i = 0; i < queue.length; i++)
|
||||
@ -457,52 +392,51 @@ Peer.prototype._error = function error(err) {
|
||||
* Executed on timeout or once packet is received.
|
||||
*/
|
||||
|
||||
Peer.prototype._req = function _req(cmd, cb) {
|
||||
Peer.prototype.request = function request(cmd, callback) {
|
||||
var self = this;
|
||||
var entry;
|
||||
|
||||
if (this.destroyed)
|
||||
return utils.asyncify(cb)(new Error('Destroyed, sorry'));
|
||||
return utils.asyncify(callback)(new Error('Destroyed, sorry'));
|
||||
|
||||
entry = {
|
||||
cmd: cmd,
|
||||
cb: cb,
|
||||
callback: callback,
|
||||
id: this.uid++,
|
||||
ontimeout: function() {
|
||||
var queue = self._request.queue[cmd];
|
||||
var i;
|
||||
var queue = self.requests.map[cmd];
|
||||
|
||||
if (!queue)
|
||||
return;
|
||||
|
||||
i = queue.indexOf(entry);
|
||||
|
||||
if (i !== -1) {
|
||||
queue.splice(i, 1);
|
||||
cb(new Error('Timed out: ' + cmd), null);
|
||||
if (utils.binaryRemove(queue, entry, compare)) {
|
||||
if (queue.length === 0)
|
||||
delete self.requests.map[cmd];
|
||||
callback(new Error('Timed out: ' + cmd));
|
||||
}
|
||||
},
|
||||
timer: null
|
||||
};
|
||||
|
||||
entry.timer = setTimeout(entry.ontimeout, this._request.timeout);
|
||||
entry.timer = setTimeout(entry.ontimeout, this.requests.timeout);
|
||||
|
||||
if (!this._request.queue[cmd])
|
||||
this._request.queue[cmd] = [];
|
||||
if (!this.requests.map[cmd])
|
||||
this.requests.map[cmd] = [];
|
||||
|
||||
this._request.queue[cmd].push(entry);
|
||||
this.requests.map[cmd].push(entry);
|
||||
|
||||
return entry;
|
||||
};
|
||||
|
||||
/**
|
||||
* Fulfill awaiting requests created with {@link Peer#_req}.
|
||||
* Fulfill awaiting requests created with {@link Peer#request}.
|
||||
* @private
|
||||
* @param {String} cmd - Packet name.
|
||||
* @param {Object} payload
|
||||
*/
|
||||
|
||||
Peer.prototype._res = function _res(cmd, payload) {
|
||||
var queue = this._request.queue[cmd];
|
||||
Peer.prototype.response = function response(cmd, payload) {
|
||||
var queue = this.requests.map[cmd];
|
||||
var entry, res;
|
||||
|
||||
if (!queue)
|
||||
@ -513,12 +447,12 @@ Peer.prototype._res = function _res(cmd, payload) {
|
||||
if (!entry)
|
||||
return false;
|
||||
|
||||
res = entry.cb(null, payload, cmd);
|
||||
res = entry.callback(null, payload, cmd);
|
||||
|
||||
if (res !== this._request.skip) {
|
||||
if (res !== this.requests.skip) {
|
||||
queue.shift();
|
||||
if (queue.length === 0)
|
||||
delete this._request.queue[cmd];
|
||||
delete this.requests.map[cmd];
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = null;
|
||||
return true;
|
||||
@ -601,11 +535,11 @@ Peer.prototype._onPacket = function onPacket(packet) {
|
||||
break;
|
||||
case 'sendheaders':
|
||||
this.sendHeaders = true;
|
||||
this._res(cmd, payload);
|
||||
this.response(cmd, payload);
|
||||
break;
|
||||
case 'havewitness':
|
||||
this.haveWitness = true;
|
||||
this._res(cmd, payload);
|
||||
this.response(cmd, payload);
|
||||
break;
|
||||
case 'verack':
|
||||
this._emit(cmd, payload);
|
||||
@ -621,7 +555,7 @@ Peer.prototype._onPacket = function onPacket(packet) {
|
||||
};
|
||||
|
||||
Peer.prototype._emit = function _emit(cmd, payload) {
|
||||
if (this._res(cmd, payload))
|
||||
if (this.response(cmd, payload))
|
||||
return;
|
||||
|
||||
this.emit(cmd, payload);
|
||||
@ -706,7 +640,7 @@ Peer.prototype._getUTXOs = function getUTXOs(utxos, callback) {
|
||||
var index = 0;
|
||||
var i, prevout, coin;
|
||||
|
||||
this._req('utxos', function(err, payload) {
|
||||
this.request('utxos', function(err, payload) {
|
||||
if (err)
|
||||
return callback(err);
|
||||
|
||||
@ -904,7 +838,7 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) {
|
||||
function done(err) {
|
||||
if (err)
|
||||
return self.emit('error', err);
|
||||
self.write(self.framer.inv(blocks));
|
||||
self.sendInv(blocks);
|
||||
}
|
||||
|
||||
this.chain.findLocator(payload.locator, function(err, tip) {
|
||||
@ -974,7 +908,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) {
|
||||
|
||||
if (this.options.witness) {
|
||||
if (!(services & constants.services.WITNESS)) {
|
||||
this._req('havewitness', function(err) {
|
||||
this.request('havewitness', function(err) {
|
||||
if (err) {
|
||||
self._error('Peer does not support segregated witness.');
|
||||
self.setMisbehavior(100);
|
||||
@ -1015,7 +949,7 @@ Peer.prototype._handleMempool = function _handleMempool() {
|
||||
|
||||
bcoin.debug('Sending mempool snapshot to %s.', self.host);
|
||||
|
||||
self.write(self.framer.inv(items));
|
||||
self.sendInv(items);
|
||||
});
|
||||
};
|
||||
|
||||
@ -1032,8 +966,8 @@ Peer.prototype._handleGetData = function handleGetData(items) {
|
||||
item = items[i];
|
||||
|
||||
hash = item.hash;
|
||||
entry = this._broadcast.map[hash];
|
||||
isWitness = item.type & constants.WITNESS_MASK;
|
||||
entry = this.pool.inv.map[hash];
|
||||
isWitness = (item.type & constants.WITNESS_MASK) !== 0;
|
||||
value = null;
|
||||
|
||||
if (!entry) {
|
||||
@ -1052,15 +986,10 @@ Peer.prototype._handleGetData = function handleGetData(items) {
|
||||
'Peer %s requested %s:%s as a %s packet.',
|
||||
this.host,
|
||||
entry.packetType,
|
||||
utils.revHex(entry.hash),
|
||||
utils.revHex(entry.key),
|
||||
isWitness ? 'witness' : 'normal');
|
||||
|
||||
if (isWitness)
|
||||
this.write(this.framer.packet(entry.packetType, entry.witnessValue));
|
||||
else
|
||||
this.write(this.framer.packet(entry.packetType, entry.value));
|
||||
|
||||
entry.e.emit('request');
|
||||
entry.sendTo(peer, isWitness);
|
||||
}
|
||||
|
||||
if (this.pool.options.selfish)
|
||||
@ -1068,7 +997,7 @@ Peer.prototype._handleGetData = function handleGetData(items) {
|
||||
|
||||
utils.forEachSerial(check, function(item, next) {
|
||||
var isWitness = item.type & constants.WITNESS_MASK;
|
||||
var type = item.type & ~constants.WITNESS_MASK;
|
||||
var type = (item.type & ~constants.WITNESS_MASK) !== 0;
|
||||
var hash = item.hash;
|
||||
var i, tx, data;
|
||||
|
||||
@ -1123,10 +1052,10 @@ Peer.prototype._handleGetData = function handleGetData(items) {
|
||||
self.write(self.framer.packet('block', data));
|
||||
|
||||
if (hash === self.hashContinue) {
|
||||
self.write(self.framer.inv([{
|
||||
self.sendInv({
|
||||
type: constants.inv.BLOCK,
|
||||
hash: self.chain.tip.hash
|
||||
}]));
|
||||
});
|
||||
self.hashContinue = null;
|
||||
}
|
||||
|
||||
@ -1168,10 +1097,10 @@ Peer.prototype._handleGetData = function handleGetData(items) {
|
||||
}
|
||||
|
||||
if (hash === self.hashContinue) {
|
||||
self.write(self.framer.inv([{
|
||||
self.sendInv({
|
||||
type: constants.inv.BLOCK,
|
||||
hash: self.chain.tip.hash
|
||||
}]));
|
||||
});
|
||||
self.hashContinue = null;
|
||||
}
|
||||
|
||||
@ -1324,12 +1253,12 @@ Peer.prototype._handleReject = function handleReject(payload) {
|
||||
return;
|
||||
|
||||
hash = payload.data;
|
||||
entry = this._broadcast.map[hash];
|
||||
entry = this.pool.inv.map[hash];
|
||||
|
||||
if (!entry)
|
||||
return;
|
||||
|
||||
entry.e.emit('reject', payload);
|
||||
entry.reject(this);
|
||||
};
|
||||
|
||||
Peer.prototype._handleAlert = function handleAlert(details) {
|
||||
@ -1442,6 +1371,14 @@ Peer.prototype.sendReject = function sendReject(obj, code, reason, score) {
|
||||
return this.pool.reject(this, obj, code, reason, score);
|
||||
};
|
||||
|
||||
/*
|
||||
* Helpers
|
||||
*/
|
||||
|
||||
function compare(a, b) {
|
||||
return a.id - b.id;
|
||||
}
|
||||
|
||||
/*
|
||||
* Expose
|
||||
*/
|
||||
|
||||
@ -113,6 +113,7 @@ function Pool(options) {
|
||||
this.loaded = false;
|
||||
this.size = options.size || 8;
|
||||
this.connected = false;
|
||||
this.uid = 0;
|
||||
|
||||
this.syncing = false;
|
||||
this.synced = false;
|
||||
@ -145,7 +146,9 @@ function Pool(options) {
|
||||
// All peers
|
||||
all: [],
|
||||
// Misbehaving hosts
|
||||
misbehaving: {}
|
||||
misbehaving: {},
|
||||
// Map of hosts
|
||||
map: {}
|
||||
};
|
||||
|
||||
this.block = {
|
||||
@ -184,7 +187,9 @@ function Pool(options) {
|
||||
// Currently broadcasted objects
|
||||
this.inv = {
|
||||
list: [],
|
||||
timeout: options.invTimeout || 60000
|
||||
map: {},
|
||||
timeout: options.invTimeout || 60000,
|
||||
interval: options.invInterval || 3000
|
||||
};
|
||||
|
||||
function done(err) {
|
||||
@ -232,7 +237,7 @@ Pool.prototype.connect = function connect() {
|
||||
if (this.options.broadcast) {
|
||||
if (this.mempool) {
|
||||
this.mempool.on('tx', function(tx) {
|
||||
self.broadcast(tx);
|
||||
self.announce(tx);
|
||||
});
|
||||
}
|
||||
|
||||
@ -242,11 +247,13 @@ Pool.prototype.connect = function connect() {
|
||||
// miner sends us an invalid competing
|
||||
// chain that we can't connect and
|
||||
// verify yet.
|
||||
this.chain.on('block', function(block) {
|
||||
if (!self.synced)
|
||||
return;
|
||||
self.broadcast(block);
|
||||
});
|
||||
if (!this.options.spv) {
|
||||
this.chain.on('block', function(block) {
|
||||
if (!self.synced)
|
||||
return;
|
||||
self.announce(block);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (this.originalSeeds.length > 0) {
|
||||
@ -524,6 +531,7 @@ Pool.prototype._addLoader = function _addLoader() {
|
||||
|
||||
this.peers.load = peer;
|
||||
this.peers.all.push(peer);
|
||||
this.peers.map[peer.host] = peer;
|
||||
|
||||
peer.once('close', function() {
|
||||
self._stopInterval();
|
||||
@ -1010,6 +1018,10 @@ Pool.prototype._createPeer = function _createPeer(options) {
|
||||
self.emit('version', version, peer);
|
||||
});
|
||||
|
||||
peer.on('ack', function() {
|
||||
peer.sendInv(self.inv.list);
|
||||
});
|
||||
|
||||
return peer;
|
||||
};
|
||||
|
||||
@ -1070,6 +1082,7 @@ Pool.prototype._addLeech = function _addLeech(socket) {
|
||||
|
||||
this.peers.leeches.push(peer);
|
||||
this.peers.all.push(peer);
|
||||
this.peers.map[peer.host] = peer;
|
||||
|
||||
peer.once('close', function() {
|
||||
self._removePeer(peer);
|
||||
@ -1080,20 +1093,6 @@ Pool.prototype._addLeech = function _addLeech(socket) {
|
||||
return;
|
||||
|
||||
peer.updateWatch();
|
||||
|
||||
self.inv.list.forEach(function(entry) {
|
||||
var result = peer.broadcast(entry.msg);
|
||||
if (!result)
|
||||
return;
|
||||
|
||||
result[0].once('request', function() {
|
||||
entry.e.emit('ack', peer);
|
||||
});
|
||||
|
||||
result[0].once('reject', function(payload) {
|
||||
entry.e.emit('reject', payload, peer);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
peer.on('merkleblock', function(block) {
|
||||
@ -1151,6 +1150,7 @@ Pool.prototype._addPeer = function _addPeer() {
|
||||
|
||||
this.peers.pending.push(peer);
|
||||
this.peers.all.push(peer);
|
||||
this.peers.map[peer.host] = peer;
|
||||
|
||||
peer.once('close', function() {
|
||||
self._removePeer(peer);
|
||||
@ -1165,27 +1165,10 @@ Pool.prototype._addPeer = function _addPeer() {
|
||||
if (self.destroyed)
|
||||
return;
|
||||
|
||||
i = self.peers.pending.indexOf(peer);
|
||||
if (i !== -1) {
|
||||
self.peers.pending.splice(i, 1);
|
||||
self.peers.regular.push(peer);
|
||||
}
|
||||
if (utils.binaryRemove(self.peers.pending, peer, compare))
|
||||
utils.binaryInsert(self.peers.regular, peer, compare);
|
||||
|
||||
peer.updateWatch();
|
||||
|
||||
self.inv.list.forEach(function(entry) {
|
||||
var result = peer.broadcast(entry.msg);
|
||||
if (!result)
|
||||
return;
|
||||
|
||||
result[0].once('request', function() {
|
||||
entry.e.emit('ack', peer);
|
||||
});
|
||||
|
||||
result[0].once('reject', function(payload) {
|
||||
entry.e.emit('reject', payload, peer);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
peer.on('merkleblock', function(block) {
|
||||
@ -1245,21 +1228,11 @@ Pool.prototype.bestPeer = function bestPeer() {
|
||||
};
|
||||
|
||||
Pool.prototype._removePeer = function _removePeer(peer) {
|
||||
var i = this.peers.pending.indexOf(peer);
|
||||
if (i !== -1)
|
||||
this.peers.pending.splice(i, 1);
|
||||
|
||||
i = this.peers.regular.indexOf(peer);
|
||||
if (i !== -1)
|
||||
this.peers.regular.splice(i, 1);
|
||||
|
||||
i = this.peers.leeches.indexOf(peer);
|
||||
if (i !== -1)
|
||||
this.peers.leeches.splice(i, 1);
|
||||
|
||||
i = this.peers.all.indexOf(peer);
|
||||
if (i !== -1)
|
||||
this.peers.all.splice(i, 1);
|
||||
utils.binaryRemove(this.peers.pending, peer);
|
||||
utils.binaryRemove(this.peers.regular, peer);
|
||||
utils.binaryRemove(this.peers.leeches, peer);
|
||||
utils.binaryRemove(this.peers.all, peer);
|
||||
delete this.peers.map[peer.host];
|
||||
|
||||
if (this.peers.load === peer) {
|
||||
Object.keys(this.request.map).forEach(function(hash) {
|
||||
@ -1867,51 +1840,44 @@ Pool.prototype.sendTX = function sendTX(tx, callback) {
|
||||
* @param {TX|Block} msg
|
||||
* @param {Function} callback - Returns [Error]. Executes on request, reject,
|
||||
* or timeout.
|
||||
* @returns {BroadcastItem}
|
||||
*/
|
||||
|
||||
Pool.prototype.broadcast = function broadcast(msg, callback) {
|
||||
var self = this;
|
||||
var e = new EventEmitter();
|
||||
var entry;
|
||||
var hash = msg.hash('hex');
|
||||
var item = this.inv.map[hash];
|
||||
|
||||
callback = utils.once(callback);
|
||||
if (item) {
|
||||
item.refresh(msg);
|
||||
item.addCallback(callback);
|
||||
return item;
|
||||
}
|
||||
|
||||
if (msg.mutable)
|
||||
msg = msg.toTX();
|
||||
item = new BroadcastItem(this, msg, callback);
|
||||
|
||||
entry = {
|
||||
msg: msg,
|
||||
e: e,
|
||||
timer: setTimeout(function() {
|
||||
var i = self.inv.list.indexOf(entry);
|
||||
if (i !== -1)
|
||||
self.inv.list.splice(i, 1);
|
||||
callback(new Error('Timed out.'));
|
||||
}, this.inv.timeout)
|
||||
// return item.start();
|
||||
return item;
|
||||
};
|
||||
|
||||
/**
|
||||
* Announce an item by sending an inv to all
|
||||
* peers. This does not add it to the broadcast
|
||||
* queue.
|
||||
* @param {TX|Block} tx
|
||||
*/
|
||||
|
||||
Pool.prototype.announce = function announce(msg) {
|
||||
var i, msg;
|
||||
|
||||
msg = {
|
||||
type: (msg instanceof bcoin.tx)
|
||||
? constants.inv.TX
|
||||
: constants.inv.BLOCK,
|
||||
hash: msg.hash()
|
||||
};
|
||||
|
||||
this.inv.list.push(entry);
|
||||
|
||||
this.peers.all.forEach(function(peer) {
|
||||
var result = peer.broadcast(msg);
|
||||
if (!result)
|
||||
return;
|
||||
|
||||
result[0].once('request', function() {
|
||||
e.emit('ack', peer);
|
||||
// Give them a chance to send a reject.
|
||||
setTimeout(function() {
|
||||
callback();
|
||||
}, 100);
|
||||
});
|
||||
|
||||
result[0].once('reject', function(payload) {
|
||||
e.emit('reject', payload, peer);
|
||||
callback(new Error('TX was rejected: ' + payload.reason));
|
||||
});
|
||||
});
|
||||
|
||||
return e;
|
||||
for (i = 0; i < this.peers.all.length; i++)
|
||||
this.peers.all[i].sendInv(msg);
|
||||
};
|
||||
|
||||
/**
|
||||
@ -1965,18 +1931,12 @@ Pool.prototype.destroy = function destroy(callback) {
|
||||
*/
|
||||
|
||||
Pool.prototype.getPeer = function getPeer(addr) {
|
||||
var i, peer;
|
||||
|
||||
if (!addr)
|
||||
return;
|
||||
|
||||
addr = utils.parseHost(addr);
|
||||
|
||||
for (i = 0; i < this.peers.all.length; i++) {
|
||||
peer = this.peers.all[i];
|
||||
if (peer.host === addr.host)
|
||||
return peer;
|
||||
}
|
||||
return this.peers.map[addr.host];
|
||||
};
|
||||
|
||||
/**
|
||||
@ -2258,12 +2218,16 @@ Pool.prototype.reject = function reject(peer, obj, code, reason, score) {
|
||||
*/
|
||||
|
||||
function LoadRequest(pool, peer, type, hash, callback) {
|
||||
if (!(this instanceof LoadRequest))
|
||||
return new LoadRequest(pool, peer, type, hash, callback);
|
||||
|
||||
this.pool = pool;
|
||||
this.peer = peer;
|
||||
this.type = type;
|
||||
this.hash = hash;
|
||||
this.callback = [];
|
||||
this.active = false;
|
||||
this.id = this.pool.uid++;
|
||||
|
||||
if (callback)
|
||||
this.callback.push(callback);
|
||||
@ -2312,15 +2276,10 @@ LoadRequest.prototype.finish = function finish() {
|
||||
}
|
||||
}
|
||||
|
||||
if (this.type === this.pool.tx.type) {
|
||||
index = this.peer.queue.tx.indexOf(this);
|
||||
if (index !== -1)
|
||||
this.peer.queue.tx.splice(index, 1);
|
||||
} else {
|
||||
index = this.peer.queue.block.indexOf(this);
|
||||
if (index !== -1)
|
||||
this.peer.queue.block.splice(index, 1);
|
||||
}
|
||||
if (this.type === this.pool.tx.type)
|
||||
utils.binaryRemove(this.peer.queue.tx, this, compare);
|
||||
else
|
||||
utils.binaryRemove(this.peer.queue.block, this, compare);
|
||||
|
||||
this.peer.removeListener('close', this._finish);
|
||||
|
||||
@ -2330,6 +2289,176 @@ LoadRequest.prototype.finish = function finish() {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Represents an item that is broadcasted via an inv/getdata cycle.
|
||||
* @exports BroadcastItem
|
||||
* @constructor
|
||||
* @private
|
||||
* @param {Pool} pool
|
||||
* @param {TX|Block} item
|
||||
* @param {Function?} callback
|
||||
* @emits BroadcastItem#ack
|
||||
* @emits BroadcastItem#reject
|
||||
* @emits BroadcastItem#timeout
|
||||
*/
|
||||
|
||||
function BroadcastItem(pool, item, callback) {
|
||||
if (!(this instanceof BroadcastItem))
|
||||
return new BroadcastItem(pool, item);
|
||||
|
||||
if (item instanceof bcoin.tx) {
|
||||
if (item.mutable)
|
||||
item = item.toTX();
|
||||
}
|
||||
|
||||
this.pool = pool;
|
||||
this.callback = [];
|
||||
|
||||
this.id = this.pool.uid++;
|
||||
this.key = item.hash('hex');
|
||||
this.type = (item instanceof bcoin.tx)
|
||||
? constants.inv.TX
|
||||
: constants.inv.BLOCK;
|
||||
// this.msg = item;
|
||||
this.hash = item.hash();
|
||||
this.normalValue = item.renderNormal();
|
||||
this.witnessValue = item.render();
|
||||
|
||||
// INV does not set the witness
|
||||
// mask (only GETDATA does this).
|
||||
assert((this.type & constants.WITNESS_MASK) === 0);
|
||||
|
||||
this.addCallback(callback);
|
||||
|
||||
this.start(item);
|
||||
};
|
||||
|
||||
utils.inherits(BroadcastItem, EventEmitter);
|
||||
|
||||
/**
|
||||
* Add a callback to be executed on ack, timeout, or reject.
|
||||
* @param {
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.addCallback = function addCallback(callback) {
|
||||
if (callback)
|
||||
this.callback.push(callback);
|
||||
};
|
||||
|
||||
/**
|
||||
* Start the broadcast.
|
||||
* @param {TX|Block} item
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.start = function start(item) {
|
||||
var self = this;
|
||||
var i;
|
||||
|
||||
assert(!this.timeout, 'Already started.');
|
||||
assert(!this.pool.inv.map[this.key], 'Already started.');
|
||||
|
||||
this.pool.inv.map[this.key] = this;
|
||||
utils.binaryInsert(this.pool.inv.list, this, compare);
|
||||
|
||||
this.refresh(item);
|
||||
|
||||
return this;
|
||||
};
|
||||
|
||||
/**
|
||||
* Refresh the timeout on the broadcast.
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.refresh = function refresh(item) {
|
||||
if (this.timeout) {
|
||||
clearTimeout(this.timeout);
|
||||
this.timeout = null;
|
||||
}
|
||||
|
||||
this.timeout = setTimeout(function() {
|
||||
self.emit('timeout');
|
||||
self.finish(new Error('Timed out.'));
|
||||
}, this.pool.inv.timeout);
|
||||
|
||||
for (i = 0; i < this.pool.peers.all.length; i++) {
|
||||
if (this.pool.peers.all[i].isWatched(item))
|
||||
this.pool.peers.all[i].sendInv(this);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Finish the broadcast, potentially with an error.
|
||||
* @param {Error?} err
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.finish = function finish(err) {
|
||||
var i;
|
||||
|
||||
assert(this.timeout, 'Already finished.');
|
||||
assert(this.pool.inv.map[this.key], 'Already finished.');
|
||||
|
||||
clearInterval(this.timeout);
|
||||
this.timeout = null;
|
||||
|
||||
delete this.pool.inv.map[this.key];
|
||||
utils.binaryRemove(this.pool.inv.list, this, compare);
|
||||
|
||||
for (i = 0; i < this.callback.length; i++)
|
||||
this.callback[i](err);
|
||||
|
||||
this.callback.length = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send the item to a peer.
|
||||
* @param {Peer} peer
|
||||
* @param {Boolean} witness - Whether to use the witness serialization.
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.sendTo = function sendTo(peer, witness) {
|
||||
var self = this;
|
||||
var value = witness ? this.witnessValue : this.normalValue;
|
||||
var packetType = this.type === constants.inv.TX ? 'tx' : 'block';
|
||||
var i;
|
||||
|
||||
peer.write(peer.framer.packet(packetType, value));
|
||||
|
||||
setTimeout(function() {
|
||||
self.emit('ack', peer);
|
||||
|
||||
for (i = 0; i < this.callback.length; i++)
|
||||
self.callback[i]();
|
||||
|
||||
self.callback.length = 0;
|
||||
}, 1000);
|
||||
};
|
||||
|
||||
/**
|
||||
* Handle a reject from a peer.
|
||||
* @param {Peer} peer
|
||||
*/
|
||||
|
||||
BroadcastItem.prototype.reject = function reject(peer) {
|
||||
var i, err;
|
||||
|
||||
this.emit('reject', peer);
|
||||
|
||||
err = new Error('Rejected by ' + peer.host);
|
||||
|
||||
for (i = 0; i < this.callback.length; i++)
|
||||
this.callback[i](err);
|
||||
|
||||
this.callback.length = 0;
|
||||
};
|
||||
|
||||
/*
|
||||
* Helpers
|
||||
*/
|
||||
|
||||
function compare(a, b) {
|
||||
return a.id - b.id;
|
||||
}
|
||||
|
||||
/*
|
||||
* Expose
|
||||
*/
|
||||
|
||||
@ -60,8 +60,7 @@ TimeData.prototype.add = function add(host, time) {
|
||||
|
||||
this.known[host] = sample;
|
||||
|
||||
i = utils.binarySearch(this.samples, sample, true, compare);
|
||||
this.samples.splice(i + 1, 0, sample);
|
||||
utils.binaryInsert(this.samples, sample, compare);
|
||||
|
||||
bcoin.debug('Added time data: samples=%d, offset=%d (%d minutes)',
|
||||
this.samples.length, sample, sample / 60 | 0);
|
||||
|
||||
@ -47,14 +47,6 @@
|
||||
* @global
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {EventEmitter} BroadcastPromise
|
||||
* @emits BroadcastPromise#ack
|
||||
* @emits BroadcastPromise#timeout
|
||||
* @emits BroadcastPromise#reject
|
||||
* @global
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} ParsedURI
|
||||
* @property {Base58Address} address
|
||||
|
||||
@ -831,6 +831,13 @@ utils.parseHost = function parseHost(addr) {
|
||||
if (typeof addr === 'object')
|
||||
return addr;
|
||||
|
||||
if (!/[\[:]/.test(addr)) {
|
||||
return {
|
||||
host: addr,
|
||||
port: 0
|
||||
};
|
||||
}
|
||||
|
||||
if (addr.indexOf(']') !== -1)
|
||||
parts = addr.split(/\]:?/);
|
||||
else
|
||||
@ -2417,6 +2424,36 @@ utils.binarySearch = function binarySearch(items, key, insert, compare) {
|
||||
return start - 1;
|
||||
};
|
||||
|
||||
/**
|
||||
* Perform a binary insert on a sorted array.
|
||||
* @param {Array} items
|
||||
* @param {Object} item
|
||||
* @param {Function?} compare
|
||||
* @returns {Number} Length.
|
||||
*/
|
||||
|
||||
utils.binaryInsert = function binaryInsert(items, item, compare) {
|
||||
var i = utils.binarySearch(items, item, true, compare);
|
||||
items.splice(i + 1, 0, item);
|
||||
return items.length;
|
||||
};
|
||||
|
||||
/**
|
||||
* Perform a binary removal on a sorted array.
|
||||
* @param {Array} items
|
||||
* @param {Object} item
|
||||
* @param {Function?} compare
|
||||
* @returns {Number} Length.
|
||||
*/
|
||||
|
||||
utils.binaryRemove = function binaryRemove(items, item, compare) {
|
||||
var i = utils.binarySearch(items, item, false, compare);
|
||||
if (i === -1)
|
||||
return false;
|
||||
items.splice(i, 1);
|
||||
return true;
|
||||
};
|
||||
|
||||
/**
|
||||
* Reference to the global object.
|
||||
* @const {Object}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user