diff --git a/lib/bcoin.js b/lib/bcoin.js index 279fd79d..3d5fe6e7 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -7,5 +7,7 @@ bcoin.bloom = require('./bcoin/bloom'); bcoin.protocol = require('./bcoin/protocol'); bcoin.tx = require('./bcoin/tx'); bcoin.block = require('./bcoin/block'); +bcoin.chain = require('./bcoin/chain'); bcoin.wallet = require('./bcoin/wallet'); bcoin.peer = require('./bcoin/peer'); +bcoin.pool = require('./bcoin/pool'); diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index f1ccc64d..62ddcd39 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -14,14 +14,17 @@ function Block(data) { this.nonce = data.nonce; this._hash = null; + this._hexHash = null; } module.exports = Block; Block.prototype.hash = function hash(enc) { // Hash it - if (!this._hash) + if (!this._hash) { this._hash = utils.dsha256(this.abbr()); - return enc === 'hex' ? utils.toHex(this._hash) : this._hash; + this._hexHash = utils.toHex(this._hash); + } + return enc === 'hex' ? this._hexHash : this._hash; }; Block.prototype.abbr = function abbr() { @@ -36,7 +39,11 @@ Block.prototype.abbr = function abbr() { return res; }; +Block.prototype.verify = function verify() { + // TODO(indutny): verify nonce + return true; +}; + Block.prototype.render = function render(framer) { - console.log('here'); return []; }; diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js new file mode 100644 index 00000000..95a8e653 --- /dev/null +++ b/lib/bcoin/chain.js @@ -0,0 +1,63 @@ +var bcoin = require('../bcoin'); +var constants = bcoin.protocol.constants; +var utils = bcoin.utils; + +function Chain() { + if (!(this instanceof Chain)) + return new Chain(); + + this.chain = []; + this.orphan = { + map: {}, + count: 0 + }; + this.map = {}; + this.last = null; + this.add(new bcoin.block(constants.genesis)); +} +module.exports = Chain; + +Chain.prototype.add = function add(block) { + var res = false; + do { + var hash = block.hash('hex'); + var prev = utils.toHex(block.prevBlock); + + // No need to revalidate orphans + if (!res && !block.verify()) + break; + + // Add orphan + if (this.last && prev !== this.last) { + if (!this.orphan.map[prev]) { + this.orphan.count++; + this.orphan.map[prev] = block; + } + break; + } + + this.map[hash] = block; + this.chain.push(block); + this.last = hash; + res = true; + + // We have orphan child for this block - add it to chain + if (this.orphan.map[hash]) { + block = this.orphan.map[hash]; + delete this.orphan.map[hash]; + this.orphan.count--; + continue; + } + + break; + } while (true); +}; + +Chain.prototype.getLast = function getLast() { + return this.chain[this.chain.length - 1]; +}; + +Chain.prototype.has = function hash(hash) { + hash = utils.toHex(hash); + return !!this.map[hash] && !!this.orphan.map[hash]; +}; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 64ce767f..821c1133 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -12,19 +12,21 @@ try { } catch (e) { } -function Peer(socket, options) { +function Peer(pool, socket, options) { if (!(this instanceof Peer)) - return new Peer(socket, options); + return new Peer(pool, socket, options); EventEmitter.call(this); + + this.pool = pool; this.socket = socket; this.parser = new bcoin.protocol.parser(); this.framer = new bcoin.protocol.framer(); - this.bloom = new bcoin.bloom(8 * 10 * 1024, - 10, - (Math.random() * 0xffffffff) | 0), + this.chain = this.pool.chain; + this.bloom = this.pool.bloom; this.version = null; this.destroyed = false; + this.ack = false; this.options = options || {}; this._broadcast = { @@ -33,7 +35,9 @@ function Peer(socket, options) { }; this._request = { - timeout: this.options.requestTimeout || 30000, + timeout: this.options.requestTimeout || 10000, + cont: {}, + skip: {}, queue: [] }; @@ -77,6 +81,7 @@ Peer.prototype._init = function init() { this._req('verack', function(err, payload) { if (err) return self._error(err); + self.ack = true; self.emit('ack'); }); }; @@ -106,16 +111,9 @@ Peer.prototype.broadcast = function broadcast(items) { this._write(this.framer.inv(items)); }; -Peer.prototype.watch = function watch(id) { - this.bloom.add(id); - this._write(this.framer.filterLoad(this.bloom, 'pubkeyOnly')); -}; - -Peer.prototype.loadBlocks = function loadBlocks() { - if (this.loadingBlocks) - return; - this.loadingBlocks = true; - this._write(this.framer.getBlocks([ constants.genesis ])); +Peer.prototype.updateWatch = function updateWatch() { + if (this.ack) + this._write(this.framer.filterLoad(this.bloom, 'pubkeyOnly')); }; Peer.prototype.destroy = function destroy() { @@ -157,41 +155,49 @@ Peer.prototype._req = function _req(cmd, cb) { cb: cb, ontimeout: function() { var i = self._request.queue.indexOf(entry); - if (i !== -1) - self.request.queue.splice(i, 1); - cb(new Error('Timed out'), null); + if (i !== -1) { + self._request.queue.splice(i, 1); + cb(new Error('Timed out: ' + cmd), null); + } }, timer: null }; entry.timer = setTimeout(entry.ontimeout, this._request.timeout) this._request.queue.push(entry); + + return entry; }; Peer.prototype._res = function _res(cmd, payload) { - var entry = this._request.queue[0]; - if (!entry || entry.cmd && entry.cmd !== cmd) - return; + for (var i = 0; i < this._request.queue.length; i++) { + var entry = this._request.queue[i]; + if (!entry || entry.cmd && entry.cmd !== cmd) + return false; - var res = entry.cb(null, payload, cmd); + var res = entry.cb(null, payload, cmd); - // If callback returns false - it hasn't finished processing responses - if (res === false) { - assert(!entry.cmd); + if (res === this._request.cont) { + assert(!entry.cmd); - // Restart timer - entry.timer = setTimeout(entry.ontimeout, this._request.timeout) - } else { - this._request.queue.shift(); - clearTimeout(entry.timer); - entry.timer = null; + // Restart timer + entry.timer = setTimeout(entry.ontimeout, this._request.timeout) + return true; + } else if (res !== this._request.skip) { + this._request.queue.shift(); + clearTimeout(entry.timer); + entry.timer = null; + return true; + } } + + return false; }; -Peer.prototype._getData = function _getData(items, cb) { +Peer.prototype.getData = function getData(items, cb) { var map = {}; var waiting = items.length; items.forEach(function(item) { - map[utils.toHex(item.hash)] = { + map[item.hash.join('.')] = { item: item, once: false, result: null @@ -199,13 +205,10 @@ Peer.prototype._getData = function _getData(items, cb) { }); var self = this; - function markEntry(hash, result) { - var entry = map[utils.toHex(hash)]; - if (!entry || entry.once) { - done(new Error('Invalid notfound entry hash')); + var entry = map[hash.join('.')]; + if (!entry || entry.once) return false; - } entry.once = true; entry.result = result; @@ -215,40 +218,38 @@ Peer.prototype._getData = function _getData(items, cb) { this._write(this.framer.getData(items)); // Process all incoming data, until all data is returned - this._req(null, function(err, payload, cmd) { - var ok = true; + var req = this._req(null, function(err, payload, cmd) { + if (err) + return done(err); + + var ok = false; if (cmd === 'notfound') { - ok = payload.every(function(item) { - return markEntry(item.hash, null); + ok = true; + payload.forEach(function(item) { + if (!markEntry(item.hash, null)) + ok = false; }); - } else if (cmd === 'tx') { - var tx = bcoin.tx(payload); - ok = markEntry(tx.hash(), b); - } else if (cmd === 'merkleblock') { - var b = bcoin.block(payload); - ok = markEntry(b.hash(), b); - } else if (cmd === 'block') { - var b = bcoin.block(payload); - ok = markEntry(b.hash(), b); - } else { - done(new Error('Unknown packet in reply to getdata: ' + cmd)); - return; + } else if (payload.hash) { + ok = markEntry(payload.hash(), payload); } if (!ok) - return; + return self._request.skip; if (waiting === 0) done(); else - return false; + return self._request.cont; }); + // Just for debugging purposes + req._getDataMap = map; + function done(err) { if (err) return cb(err); cb(null, items.map(function(item) { - return map[utils.toHex(item.hash)].result; + return map[item.hash.join('.')].result; })); } }; @@ -263,8 +264,22 @@ Peer.prototype._onPacket = function onPacket(packet) { return this._handleInv(payload); else if (cmd === 'getdata') return this._handleGetData(payload); - else - return this._res(cmd, payload); + else if (cmd === 'addr') + return this._handleAddr(payload); + else if (cmd === 'ping') + return this._handlePing(payload); + else if (cmd === 'pong') + return this._handlePong(payload); + + if (cmd === 'merkleblock' || cmd === 'block') + payload = bcoin.block(payload); + else if (cmd === 'tx') + payload = bcoin.tx(payload); + if (this._res(cmd, payload)) { + return; + } else if (cmd === 'tx') { + console.log('Omg', this.socket._handle.fd, cmd, payload.hash('hex')); + } }; Peer.prototype._handleVersion = function handleVersion(payload) { @@ -288,25 +303,50 @@ Peer.prototype._handleGetData = function handleGetData(items) { }, this); }; +Peer.prototype._handleAddr = function handleAddr(addr) { + // No-op for now +}; + +Peer.prototype._handlePing = function handlePing() { + // No-op for now +}; + +Peer.prototype._handlePong = function handlePong() { + // No-op for now +}; + Peer.prototype._handleInv = function handleInv(items) { - // Always request what advertised - var req = items.filter(function(item) { - return item.type === 'tx' || item.type === 'block'; + // Always request advertised TXs + var txs = items.filter(function(item) { + return item.type === 'tx'; }).map(function(item) { if (item.type === 'tx') return item; - if (item.type === 'block') - return { type: 'filtered', hash: item.hash }; }); + // Emit new blocks to schedule them between multiple peers + var blocks = items.filter(function(item) { + return item.type === 'block'; + }, this).map(function(item) { + return item.hash; + }); + this.emit('blocks', blocks); + + if (txs.length === 0) + return; + var self = this; - this._getData(req, function(err, data) { + this.getData(txs, function(err, data) { if (err) return self._error(err); - console.log(data.join(', ')); + + data.forEach(function(item) { + console.log(item.type, item.hash('hex')); + }); }); }; -Peer.prototype._handleMerkleBlock = function handleMerkleBlock(block) { - console.log(utils.toHex(block.prevBlock)); +Peer.prototype.loadBlocks = function loadBlocks(hash) { + console.log(this.socket._handle.fd, 'loading from: ', utils.toHex(hash), this.chain.chain.length, this.chain.orphan.count); + this._write(this.framer.getBlocks([ hash ])); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js new file mode 100644 index 00000000..b5975d7f --- /dev/null +++ b/lib/bcoin/pool.js @@ -0,0 +1,153 @@ +var assert = require('assert'); +var bcoin = require('../bcoin'); +var utils = bcoin.utils; + +function Pool(options) { + if (!(this instanceof Pool)) + return new Pool(options); + + this.options = options || {}; + this.size = options.size || 16; + this.redundancy = options.redundancy || 3; + this.parallel = options.parallel || this.size; + this.pendingBlocks = {}; + this.chain = new bcoin.chain(); + this.bloom = new bcoin.bloom(8 * 10 * 1024, + 10, + (Math.random() * 0xffffffff) | 0), + this.peers = []; + this.pending = []; + + // Peers that are loading block ids + this.loadPeers = []; + this.createConnection = options.createConnection; + assert(this.createConnection); + + this._init(); +} +module.exports = Pool; + +Pool.prototype._init = function _init() { + for (var i = 0; i < this.size; i++) + this._addPeer(); +}; + +Pool.prototype._addPeer = function _addPeer() { + if (this.peers.length + this.pending.length >= this.size) + return; + + var socket = this.createConnection(); + var peer = bcoin.peer(this, socket, this.options.peer); + this.pending.push(peer); + + var load = false; + if (this.loadPeers.length < this.redundancy) { + this.loadPeers.push(peer); + load = true; + } + + // Create new peer on failure + var self = this; + peer.once('error', function(err) { + self._removePeer(peer); + self._addPeer(); + }); + + peer.once('ack', function() { + var i = self.pending.indexOf(peer); + if (i !== -1) { + self.pending.splice(i, 1); + self.peers.push(peer); + } + + peer.updateWatch(); + if (load) + peer.loadBlocks(self.chain.getLast().hash()); + }); + + // Split blocks and request them using multiple peers + peer.on('blocks', function(hashes) { + if (hashes.length === 0) + return; + self._requestBlocks(hashes); + if (load) + peer.loadBlocks(hashes[hashes.length - 1]); + }); +}; + +Pool.prototype._removePeer = function _removePeer(peer) { + var i = this.pending.indexOf(peer); + if (i !== -1) + this.pending.splice(i, 1); + + i = this.peers.indexOf(peer); + if (i !== -1) + this.peers.splice(i, 1); + + i = this.loadPeers.indexOf(peer); + if (i !== -1) + this.loadPeers.splice(i, 1); +}; + +Pool.prototype.watch = function watch(id) { + if (id) + this.bloom.add(id); + for (var i = 0; i < this.peers.length; i++) + this.peers[i].updateWatch(); +}; + +Pool.prototype._getPeers = function _getPeers() { + var res = []; + + if (this.peers.length <= this.redundancy) + return this.peers.slice(); + + for (var i = 0; i < this.redundancy; i++) { + var peer = this.peers[(Math.random() * this.peers.length) | 0]; + if (res.indexOf(peer) !== -1) + continue; + + res.push(peer); + } + + return res; +}; + +Pool.prototype._requestBlocks = function _requestBlocks(hashes, force) { + // Do not request blocks that either already in chain, or are + // already requested from some of the peers + hashes = hashes.filter(function(hash) { + return !this.chain.has(hash) && + (force || !this.pendingBlocks[utils.toHex(hash)]); + }, this).map(function(hash) { + this.pendingBlocks[utils.toHex(hash)] = true; + return { type: 'filtered', hash: hash }; + }, this); + + // Split blocks into chunks and send each chunk to some peers + var chunks = []; + var count = Math.ceil(hashes.length / this.parallel); + for (var i = 0; i < hashes.length; i += count) + chunks.push(hashes.slice(i, i + count)); + + var self = this; + chunks.forEach(function(chunk) { + var peers = self._getPeers(); + peers.forEach(function(peer) { + peer.getData(chunk, function(err, blocks) { + // Re-request blocks on failure + if (err) { + return self._requestBlocks(chunk.map(function(item) { + return item.hash; + }), true); + } + + // Add blocks to chain on success + blocks.forEach(function(block) { + delete self.pendingBlocks[block.hash('hex')]; + self.chain.add(block); + }); + }); + }); + }); +}; diff --git a/lib/bcoin/protocol/constants.js b/lib/bcoin/protocol/constants.js index 65c1e8fb..b4cbdbbf 100644 --- a/lib/bcoin/protocol/constants.js +++ b/lib/bcoin/protocol/constants.js @@ -4,8 +4,20 @@ var utils = bcoin.utils; exports.minVersion = 70001; exports.version = 70002; exports.magic = 0xd9b4bef9; -exports.genesis = utils.toArray( - '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f', 'hex'); +exports.genesis = { + version: 1, + prevBlock: [ 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0 ], + merkleRoot: utils.toArray( + '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b', + 'hex' + ).reverse(), + ts: 1231006505, + bits: 0x1d00ffff, + nonce: 2083236893 +}; // version - services field exports.services = { diff --git a/lib/bcoin/protocol/parser.js b/lib/bcoin/protocol/parser.js index c294b6ed..2ff2e874 100644 --- a/lib/bcoin/protocol/parser.js +++ b/lib/bcoin/protocol/parser.js @@ -90,6 +90,8 @@ Parser.prototype.parsePayload = function parsePayload(cmd, p) { return this.parseInvList(p); else if (cmd === 'merkleblock') return this.parseMerkleBlock(p); + else if (cmd === 'block') + return this.parseBlock(p); else return p; }; @@ -162,7 +164,7 @@ Parser.prototype.parseInvList = function parseInvList(p) { return items; }; -Parser.prototype.parseMerkleBlock = function parsMerkleBlock(p) { +Parser.prototype.parseMerkleBlock = function parseMerkleBlock(p) { if (p.length < 84) return this.emit('error', new Error('Invalid merkleblock size')); @@ -179,3 +181,21 @@ Parser.prototype.parseMerkleBlock = function parsMerkleBlock(p) { // flags: }; }; + +Parser.prototype.parseBlock = function parseBlock(p) { + if (p.length < 84) + return this.emit('error', new Error('Invalid block size')); + + return { + version: readU32(p, 0), + prevBlock: p.slice(4, 36), + merkleRoot: p.slice(36, 68), + ts: readU32(p, 68), + bits: readU32(p, 72), + nonce: readU32(p, 76), + totalTx: readU32(p, 80) + + // hashes: + // flags: + }; +}; diff --git a/lib/bcoin/tx.js b/lib/bcoin/tx.js index eb81b12c..b81ecbcc 100644 --- a/lib/bcoin/tx.js +++ b/lib/bcoin/tx.js @@ -23,6 +23,5 @@ TX.prototype.hash = function hash(enc) { }; TX.prototype.render = function render(framer) { - console.log('here'); return []; };