diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index c17953e0..2d1c166d 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -77,9 +77,8 @@ function Peer(pool, options) { this._request = { timeout: this.options.requestTimeout || 10000, - cont: {}, skip: {}, - queue: [] + queue: {} }; this._ping = { @@ -316,13 +315,17 @@ Peer.prototype.updateWatch = function updateWatch() { if (!this.pool.options.spv) return; - if (this.ack) - this._write(this.framer.filterLoad(this.bloom, 'none')); + if (this.ack) { + this._write(this.framer.filterLoad({ + filter: this.bloom.toBuffer(), + n: this.bloom.n, + tweak: this.bloom.tweak, + update: 'none' + })); + } }; Peer.prototype.destroy = function destroy() { - var i; - if (this.destroyed) return; @@ -340,8 +343,13 @@ Peer.prototype.destroy = function destroy() { clearInterval(this._ping.timer); this._ping.timer = null; - for (i = 0; i < this._request.queue.length; i++) - clearTimeout(this._request.queue[i].timer); + Object.keys(this._request).forEach(function(cmd) { + var queue = this._request[cmd]; + var i; + + for (i = 0; i < queue.length; i++) + clearTimeout(queue[i].timer); + }, this); }; Peer.prototype._write = function write(chunk) { @@ -375,9 +383,16 @@ Peer.prototype._req = function _req(cmd, cb) { cmd: cmd, cb: cb, ontimeout: function() { - var i = self._request.queue.indexOf(entry); + var queue = self._request.queue[cmd]; + var i; + + if (!queue) + return; + + i = queue.indexOf(entry); + if (i !== -1) { - self._request.queue.splice(i, 1); + queue.splice(i, 1); cb(new Error('Timed out: ' + cmd), null); } }, @@ -386,35 +401,35 @@ Peer.prototype._req = function _req(cmd, cb) { entry.timer = setTimeout(entry.ontimeout, this._request.timeout); - this._request.queue.push(entry); + if (!this._request.queue[cmd]) + this._request.queue[cmd] = []; + + this._request.queue[cmd].push(entry); return entry; }; Peer.prototype._res = function _res(cmd, payload) { - var i, entry, res; + var queue = this._request.queue[cmd]; + var entry, res; - for (i = 0; i < this._request.queue.length; i++) { - entry = this._request.queue[i]; + if (!queue) + return false; - if (!entry || (entry.cmd && entry.cmd !== cmd)) - return false; + entry = queue[0]; - res = entry.cb(null, payload, cmd); + if (!entry) + return false; - if (res === this._request.cont) { - assert(!entry.cmd); + res = entry.cb(null, payload, cmd); - // Restart timer - if (!this.destroyed) - entry.timer = setTimeout(entry.ontimeout, this._request.timeout); - return true; - } else if (res !== this._request.skip) { - this._request.queue.shift(); - clearTimeout(entry.timer); - entry.timer = null; - return true; - } + if (res !== this._request.skip) { + queue.shift(); + if (queue.length === 0) + delete this._request.queue[cmd]; + clearTimeout(entry.timer); + entry.timer = null; + return true; } return false; @@ -543,13 +558,71 @@ Peer.prototype._handleUTXOs = function _handleUTXOs(payload) { return new bcoin.coin(coin); }); utils.debug('Received %d utxos from %s.', payload.coins.length, this.host); - this.emit('utxos', payload); + this._emit('utxos', payload); +}; + +Peer.prototype.getUTXOs = function getUTXOs(utxos, callback) { + var self = this; + var reqs = []; + var coins = []; + var i; + + for (i = 0; i < utxos.length; i += 15) + reqs.push(utxos.slice(i, i + 15)); + + utils.forEachSerial(reqs, function(utxos, next) { + self._getUTXOs(utxos, function(err, coin) { + if (err) + return next(err); + + coins = coins.concat(coin); + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + return callback(null, coins); + }); +}; + +Peer.prototype._getUTXOs = function getUTXOs(utxos, callback) { + var index = 0; + var i, prevout, coin; + + this._req('utxos', function(err, payload) { + if (err) + return callback(err); + + for (i = 0; i < payload.hits.length; i++) { + if (payload.hits[i]) { + prevout = utxos[i]; + coin = payload.coins[index++]; + + if (!prevout || !coin) + return callback(new Error('Malformed utxos message.')); + + coin.hash = prevout.hash; + coin.index = prevout.index; + } + } + + return callback(null, payload.coins); + }); + + this._write(this.framer.getUTXOs({ + mempool: true, + prevout: utxos.map(function(item) { + return { hash: item[0], index: item[1] }; + }) + })); }; Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { var self = this; var coins = []; - var notfound = []; + var hits = []; if (this.pool.options.selfish) return; @@ -577,6 +650,9 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { self.mempool.isSpent(hash, index, callback); } + if (payload.prevout.length > 15) + return; + utils.forEachSerial(payload.prevout, function(prevout, next, i) { var hash = prevout.hash; var index = prevout.index; @@ -586,6 +662,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { return next(err); if (coin) { + hits.push(1); coins.push(coin); return next(); } @@ -595,7 +672,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { return next(err); if (result) { - notfound.push(i); + hits.push(0); return next(); } @@ -604,10 +681,11 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { return next(err); if (!coin) { - notfound.push(i); + hits.push(0); return next(); } + hits.push(1); coins.push(coin); next(); @@ -621,7 +699,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { self._write(self.framer.UTXOs({ height: self.chain.height, tip: self.chain.tip.hash, - notfound: notfound, + hits: hits, coins: coins })); }); @@ -778,7 +856,7 @@ Peer.prototype._handleVersion = function handleVersion(payload) { if (this.options.witness) { if (!payload.witness) { - this._req('havewitness', function(err, payload) { + this._req('havewitness', function(err) { if (err) { self._error('Peer does not support segregated witness.'); self.setMisbehavior(100); @@ -1151,30 +1229,30 @@ Peer.prototype._handleAlert = function handleAlert(details) { this.emit('alert', details); }; -Peer.prototype.getHeaders = function getHeaders(hashes, stop) { +Peer.prototype.getHeaders = function getHeaders(locator, stop) { utils.debug( 'Requesting headers packet from %s with getheaders', this.host); utils.debug('Height: %s, Hash: %s, Stop: %s', - this.chain._getCachedHeight(hashes[0]), - hashes ? utils.revHex(hashes[0]) : 0, + locator && locator.length ? this.chain._getCachedHeight(locator[0]) : null, + locator && locator.length ? utils.revHex(locator[0]) : 0, stop ? utils.revHex(stop) : 0); - this._write(this.framer.getHeaders(hashes, stop)); + this._write(this.framer.getHeaders({ locator: locator, stop: stop })); }; -Peer.prototype.getBlocks = function getBlocks(hashes, stop) { +Peer.prototype.getBlocks = function getBlocks(locator, stop) { utils.debug( 'Requesting inv packet from %s with getblocks', this.host); utils.debug('Height: %s, Hash: %s, Stop: %s', - this.chain._getCachedHeight(hashes[0]), - hashes ? utils.revHex(hashes[0]) : 0, + locator && locator.length ? this.chain._getCachedHeight(locator[0]) : null, + locator && locator.length ? utils.revHex(locator[0]) : 0, stop ? utils.revHex(stop) : 0); - this._write(this.framer.getBlocks(hashes, stop)); + this._write(this.framer.getBlocks({ locator: locator, stop: stop })); }; Peer.prototype.getMempool = function getMempool() { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 3c9501aa..c41d6d22 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -1730,6 +1730,39 @@ Pool.prototype.getPeer = function getPeer(addr) { } }; +Pool.prototype.getUTXOs = function getUTXOs(utxos, callback) { + var peer = this.peers.load || this.peers.regular[0]; + + if (!peer) + return utils.asyncify(callback)(new Error('No peer available.')); + + peer.getUTXOs(utxos, callback); +}; + +Pool.prototype.fillTX = function fillTX(tx, callback) { + var utxos = []; + var reqs = []; + var i, input; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + if (!input.coin) + utxos.push([input.prevout.hash, input.prevout.index]); + } + + if (utxos.length === 0) + return utils.asyncify(callback)(null, tx); + + this.getUTXOs(utxos, function(err, coins) { + if (err) + return callback(err); + + tx.fillCoins(coins); + + return callback(null, tx); + }); +}; + Pool.prototype.getSeed = function getSeed(priority) { var addr; diff --git a/lib/bcoin/protocol/framer.js b/lib/bcoin/protocol/framer.js index d86ca0ee..3980e214 100644 --- a/lib/bcoin/protocol/framer.js +++ b/lib/bcoin/protocol/framer.js @@ -11,7 +11,6 @@ var utils = require('../utils'); var assert = utils.assert; var BufferWriter = require('../writer'); var DUMMY = new Buffer([]); -var bn = require('bn.js'); /** * Framer @@ -141,16 +140,16 @@ Framer.prototype.pong = function pong(data) { return this.packet('pong', Framer.pong(data)); }; -Framer.prototype.filterLoad = function filterLoad(bloom, update) { - return this.packet('filterload', Framer.filterLoad(bloom, update)); +Framer.prototype.filterLoad = function filterLoad(data) { + return this.packet('filterload', Framer.filterLoad(data)); }; -Framer.prototype.getHeaders = function getHeaders(hashes, stop) { - return this.packet('getheaders', Framer.getHeaders(hashes, stop)); +Framer.prototype.getHeaders = function getHeaders(data) { + return this.packet('getheaders', Framer.getHeaders(data)); }; -Framer.prototype.getBlocks = function getBlocks(hashes, stop) { - return this.packet('getblocks', Framer.getBlocks(hashes, stop)); +Framer.prototype.getBlocks = function getBlocks(data) { + return this.packet('getblocks', Framer.getBlocks(data)); }; Framer.prototype.utxo = function _coin(coin) { @@ -192,38 +191,30 @@ Framer.prototype.addr = function addr(peers) { Framer.address = function address(data, full, writer) { var p = new BufferWriter(writer); - if (!data) - data = {}; + if (full) { + if (!data.ts) + p.writeU32(utils.now() - (process.uptime() | 0)); + else + p.writeU32(data.ts); + } - if (!data.ts) - data.ts = utils.now() - (process.uptime() | 0); - - if (!data.services) - data.services = 0; - - if (!data.port) - data.port = network.port; - - if (full) - p.writeU32(data.ts); - - p.writeU64(data.services); + p.writeU64(data.services || 0); if (data.ipv6) { - data.ipv6 = utils.ip2array(data.ipv6, 6); - p.writeBytes(data.ipv6); + p.writeBytes(utils.ip2array(data.ipv6, 6)); } else { - if (!data.ipv4) - data.ipv4 = new Buffer([0, 0, 0, 0]); - data.ipv4 = utils.ip2array(data.ipv4, 4); - // We don't have an ipv6, convert ipv4 to ipv4-mapped ipv6 address + // We don't have an ipv6 address: convert + // ipv4 to ipv4-mapped ipv6 address. p.writeU32BE(0x00000000); p.writeU32BE(0x00000000); p.writeU32BE(0x0000ffff); - p.writeBytes(data.ipv4); + if (data.ipv4) + p.writeBytes(utils.ip2array(data.ipv4, 4)); + else + p.writeU32BE(0x00000000); } - p.writeU16BE(data.port); + p.writeU16BE(data.port || network.port); if (!writer) p = p.render(); @@ -233,29 +224,23 @@ Framer.address = function address(data, full, writer) { Framer.version = function version(options, writer) { var p = new BufferWriter(writer); + var agent = options.agent || constants.userAgent; + var remote = options.remote || {}; + var local = options.local || {}; - if (!options.agent) - options.agent = new Buffer(constants.userAgent, 'ascii'); + if (typeof agent === 'string') + agent = new Buffer(agent, 'ascii'); - if (!options) - options = {}; - - if (!options.remote) - options.remote = {}; - - if (!options.local) - options.local = {}; - - if (options.local.services == null) - options.local.services = constants.localServices; + if (local.services == null) + local.services = constants.localServices; p.write32(constants.version); p.writeU64(constants.localServices); p.write64(utils.now()); - Framer.address(options.remote, false, p); - Framer.address(options.local, false, p); + Framer.address(remote, false, p); + Framer.address(local, false, p); p.writeU64(utils.nonce()); - p.writeVarString(options.agent); + p.writeVarString(agent); p.write32(options.height || 0); p.writeU8(options.relay ? 1 : 0); @@ -317,30 +302,18 @@ Framer.pong = function pong(data) { return p; }; -Framer.filterLoad = function filterLoad(bloom, update, writer) { +Framer.filterLoad = function filterLoad(data, writer) { var p = new BufferWriter(writer); - var filter, n, tweak; - - if (bloom instanceof bcoin.bloom) { - filter = bloom.toBuffer(); - n = bloom.n; - tweak = bloom.tweak; - } else { - writer = update; - update = bloom.update; - filter = bloom.filter; - n = bloom.n; - tweak = bloom.tweak; - } + var update = data.update; if (typeof update === 'string') update = constants.filterFlags[update]; assert(update != null, 'Bad filter flag.'); - p.writeVarBytes(filter); - p.writeU32(n); - p.writeU32(tweak); + p.writeVarBytes(data.filter); + p.writeU32(data.n); + p.writeU32(data.tweak); p.writeU8(update); if (!writer) @@ -349,33 +322,33 @@ Framer.filterLoad = function filterLoad(bloom, update, writer) { return p; }; -Framer.getHeaders = function getHeaders(locator, stop, writer) { - // NOTE: getheaders can have an empty locator. - return Framer._getBlocks(locator || [], stop, writer); +Framer.getHeaders = function getHeaders(data, writer) { + return Framer._getBlocks(data, writer, true); }; -Framer.getBlocks = function getBlocks(locator, stop, writer) { - return Framer._getBlocks(locator, stop, writer); +Framer.getBlocks = function getBlocks(data, writer) { + return Framer._getBlocks(data, writer, false); }; -Framer._getBlocks = function _getBlocks(locator, stop, writer) { - var p, i, version; - - if (locator.locator) { - writer = stop; - stop = locator.stop; - version = locator.version; - locator = locator.locator; - } +Framer._getBlocks = function _getBlocks(data, writer, headers) { + var version = data.version; + var locator = data.locator; + var stop = data.stop; + var p, i; if (!version) version = constants.version; + if (!locator) { + if (headers) + locator = []; + else + assert(false, 'getblocks requires a locator'); + } + if (!stop) stop = constants.zeroHash; - assert(locator, 'getblocks requires a locator'); - p = new BufferWriter(writer); p.writeU32(version); @@ -819,19 +792,10 @@ Framer.UTXOs = function UTXOs(data, writer) { var i, j, coin, height, index, map; if (!data.map) { - assert(data.notfound); - map = new bn(0); - j = -1; - for (i = 0; i < data.notfound.length; i++) { - index = data.notfound[i]; - while (++j < index) { - map.iushln(1); - map.iuorn(1); - } - map.iushln(1); - map.iuorn(0); - } - map = map.toBuffer('be'); + assert(data.hits); + map = new Buffer((data.hits.length + 7) / 8 | 0); + for (i = 0; i < data.hits.length; i++) + map[i / 8 | 0] |= +data.hits[i] << (i % 8); } else { map = data.map; } diff --git a/lib/bcoin/protocol/parser.js b/lib/bcoin/protocol/parser.js index a167b1c2..5e6e8d0c 100644 --- a/lib/bcoin/protocol/parser.js +++ b/lib/bcoin/protocol/parser.js @@ -287,7 +287,7 @@ Parser.parseGetUTXOs = function parseGetUTXOs(p) { Parser.parseUTXOs = function parseUTXOs(p) { var chainHeight, tip, map, count, coins; - var coin, version, height, i, notfound, ch, j; + var coin, version, height, i, hits, ch, j; p = new BufferReader(p); p.start(); @@ -297,15 +297,11 @@ Parser.parseUTXOs = function parseUTXOs(p) { map = p.readVarBytes(); count = p.readVarint(); coins = []; - notfound = []; + hits = []; for (i = 0; i < map.length; i++) { - ch = map[i]; - for (j = 0; j < 8; j++) { - if ((ch & 1) === 0) - notfound.push(i + j); - ch >>>= 1; - } + for (j = 0; j < 8; j++) + hits.push((map[i] >> j) & 1); } for (i = 0; i < count; i++) { @@ -326,7 +322,7 @@ Parser.parseUTXOs = function parseUTXOs(p) { tip: tip, map: map, coins: coins, - notfound: notfound, + hits: hits, _size: p.end() }; };