diff --git a/lib/service.js b/lib/service.js
index 1ca4bcba..cd77a02d 100644
--- a/lib/service.js
+++ b/lib/service.js
@@ -3,6 +3,7 @@
 var util = require('util');
 var EventEmitter = require('events').EventEmitter;
 var LRU = require('lru-cache');
+var assert = require('assert');
 
 var Service = function(options) {
   EventEmitter.call(this);
@@ -131,4 +132,53 @@ Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) {
   return resolvedDeps;
 };
 
+Service.prototype._getGenesisBlock = function() {
+
+  this.genesis = {};
+  this.genesis.height = 0;
+  this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
+  return this.genesis;
+
+};
+
+Service.prototype._decodeTipData = function(tipBuf) {
+
+  assert(tipBuf && Buffer.isBuffer(tipBuf) && tipBuf.length === 36,
+    'tip data: ' + tipBuf.toString('hex') + ' is not in the proper format');
+
+  var hash = tipBuf.slice(0, 32).toString('hex');
+  var height = tipBuf.readUInt32BE(32);
+  return {
+    hash: hash,
+    height: height
+  };
+};
+
+Service.prototype._loadTip = function(callback) {
+
+  var self = this;
+
+  self.db.get(new Buffer('00', 'hex') + self.name, function(err, tipBuf) {
+
+    if (err) {
+      return callback(err);
+    }
+
+    if (!tipBuf) {
+      self.tip = self._getGenesisBlock();
+      return callback();
+    }
+
+    self.tip = self._decodeTipData(tipBuf);
+
+    log.info('Loaded tip for: ' + self.name + ' service, hash: ' +
+      self.tip.hash + ' height: ' + self.tip.height);
+
+    callback();
+
+  });
+
+
+};
+
 module.exports = Service;
diff --git a/lib/services/block/index.js b/lib/services/block/index.js
index 6582ef38..8804add2 100644
--- a/lib/services/block/index.js
+++ b/lib/services/block/index.js
@@ -18,8 +18,10 @@ var constants = require('../../constants');
 
 var BlockService = function(options) {
   BaseService.call(this, options);
-  this.bitcoind = this.node.services.bitcoind;
+  this.p2p = this.node.services.p2p;
   this.db = this.node.services.db;
+  this.subscriptions = {};
+  this.subscriptions.block = [];
   this._blockHandler = new BlockHandler(this.node, this);
   this._lockTimes = [];
   this.tip = null;
@@ -30,12 +32,13 @@ var BlockService = function(options) {
   };
   this._blockHeaderQueue = LRU(50); //hash -> header, height -> header,
   this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's
+  this._lastReportedTime = Date.now();
 };
 
 inherits(BlockService, BaseService);
 
 BlockService.dependencies = [
-  'bitcoind',
+  'p2p',
   'db'
 ];
 
@@ -62,28 +65,41 @@ BlockService.prototype.stop = function(callback) {
   }
 };
 
+BlockService.prototype.getAPIMethods = function() {
+  var methods = [
+    ['processBlockOperations', this, this.processBlockOperations, 1]
+  ];
+  return methods;
+};
+
+BlockService.prototype.subscribe = function(name, emitter) {
+  this.subscriptions[name].push(emitter);
+  log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this.subscriptions[name].length);
+};
+
+BlockService.prototype.unsubscribe = function(name, emitter) {
+  var index = this.subscriptions[name].indexOf(emitter);
+  if (index > -1) {
+    this.subscriptions[name].splice(index, 1);
+  }
+  log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this.subscriptions[name].length);
+};
+
+BlockService.prototype.getPublishEvents = function() {
+  return [
+    {
+      name: 'block/block',
+      scope: this,
+      subscribe: this.subscribe.bind(this, 'block'),
+      unsubscribe: this.unsubscribe.bind(this, 'block')
+    }
+  ];
+};
+
 BlockService.prototype._sync = function() {
 
   var self = this;
 
-  async.waterfall([
-
-    self._loadTips.bind(self),
-    self._detectStartupReorg.bind(self),
-    self._getBlocks.bind(self)
-
-  ], function(err, blocksDiff) {
-
-    if(err) {
-      log.error(err);
-      self.node.stop();
-    }
-
-    if (blocksDiff > 0) {
-      return self._blockHandler.sync();
-    }
-
-    self._startSubscriptions();
-  });
+  self._startSubscriptions();
 
 };
@@ -91,58 +107,49 @@ BlockService.prototype.printTipInfo = function(prependedMessage) {
 
   log.info(
     prependedMessage + ' Serial Tip: ' + this.tip.hash +
-    ' Concurrent tip: ' + this.concurrentTip.hash +
-    ' Network tip: ' + this.bitcoind.tiphash
+    ' Concurrent tip: ' + this.concurrentTip.hash
   );
 
 };
 
-BlockService.prototype.getNetworkTipHash = function() {
-  return this.bitcoind.tiphash;
+BlockService.prototype._reportStatus = function(serviceName) {
+
+  var tip = this.tips[serviceName];
+
+  if ((Date.now() - this._lastReportedTime) > 1000) {
+    this._lastReportedTime = Date.now();
+    log.info(serviceName + ' sync: current height is: ' + tip.height +
+      ' - hash is: ' + tip.hash);
+  }
+
 };
 
-BlockService.prototype.getBlockOperations = function(block, add, type, callback) {
-  var operations = [];
+BlockService.prototype.processBlockOperations = function(opts, callback) {
 
-  async.each(
-    this.node.services,
-    function(mod, next) {
+  if (!_.isArray(opts.operations)) {
+    return callback();
+  }
 
-      var fn = mod.blockHandler;
+  // TODO: when writing to leveldb, a batch size of about 1000 bytes appears to give the
+  // best write performance on most systems. This was not a comprehensive study, however,
+  // and it may not hold for other db types.
+  var self = this;
 
-      if (type === 'concurrent') {
-        fn = mod.concurrentBlockHandler;
-      }
+  self.db.batch(opts.operations, function(err) {
 
-      if (fn) {
-
-        fn.call(mod, block, add, function(err, ops) {
-
-          if (err) {
-            return next(err);
-          }
-
-          if (ops) {
-            operations = operations.concat(ops);
-          }
-
-          next();
-        });
-      } else {
-
-        setImmediate(next);
-
-      }
-    },
-    function(err) {
-
-      if (err) {
-        return callback(err);
-      }
-
-      callback(null, operations);
+    if(err) {
+      return callback(err);
     }
-  );
+
+    if (!opts.serviceName) {
+      opts.serviceName = 'unknown';
+    }
+
+    self.setTip(opts);
+    self._reportStatus(opts.serviceName);
+
+    callback();
+  });
 
 };
 
@@ -168,9 +175,15 @@ BlockService.prototype.getTipOperation = function(block, add, tipType) {
   };
 };
 
-BlockService.prototype.getBlocks = function(blockArgs, callback) {
+BlockService.prototype.getBlocks = function(startHash, endHash, callback) {
+};
+
+BlockService.prototype._getBlocks = function(startHash, endHash, callback) {
 
   var self = this;
 
+  assert(startHash && startHash.length === 64, 'startHash is required to getBlocks');
+
+  self.p2p.getBlocks({ startHash: startHash, endHash: endHash });
 
   async.mapLimit(blockArgs, 8, function(blockArg, next) {
 
@@ -236,29 +249,29 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
 
 };
 
-BlockService.prototype.getBlockHeader = function(blockArg, callback) {
-
-  var self = this;
-  var header = self._blockHeaderQueue.get(blockArg);
-
-  if (header) {
-    return setImmediate(function() {
-      callback(null, header);
-    });
-  }
-
-  self.bitcoind.getBlockHeader(blockArg, function(err, header) {
-
-    if(err) {
-      return callback(err);
-    }
-
-    self._setBlockHeaderQueue(header);
-    callback(null, header);
-
-  });
+BlockService.prototype._checkCache = function(key, cache) {
+  return cache.get(key);
 };
 
+BlockService.prototype.getBlockHeader = function(hash, callback) {
+
+  var self = this;
+  var header = self._checkCache(hash, self._blockHeaderQueue);
+
+  if (header) {
+    return callback(null, header);
+  }
+
+  self.p2p.getBlockHeaders(hash);
+  var timer = setInterval(function() {
+    var header = self._checkCache(hash, self._blockHeaderQueue);
+    if (header) {
+      clearInterval(timer);
+      callback(null, header);
+    }
+  }, 250);
+  timer.unref();
+};
 
 BlockService.prototype.getBlockHash = function(height, callback) {
 
@@ -276,87 +289,45 @@ BlockService.prototype._startSubscriptions = function() {
 
   var self = this;
 
-  if (!self._subscribed) {
-
-    self._subscribed = true;
-    self.bus = self.node.openBus({remoteAddress: 'localhost'});
-
-    self.bus.on('bitcoind/rawblock', function(block) {
-
-      log.info('New block received: ' + block.hash + ' at height: ' + block.height);
-      self._cacheRawBlock(block);
-      var header = self._getHeader(block);
-      self._setBlockHeaderQueue(header);
-
-      self._detectReorg([block], function() {
-        self._blockHandler.sync(block);
-      });
-
-    });
-
-    self.bus.subscribe('bitcoind/rawblock');
+  if (self._subscribed) {
+    return;
   }
 
+  self._subscribed = true;
+  self.bus = self.node.openBus({remoteAddress: 'localhost'});
+
+  self.bus.on('p2p/block', self._onBlock.bind(self));
+  self.bus.on('p2p/headers', self._onHeaders.bind(self));
+
+  self.bus.subscribe('p2p/block');
+  self.bus.subscribe('p2p/headers');
+
 };
 
-BlockService.prototype._cacheRawBlock = function(block) {
-  log.debug('Setting block: ' + block.hash + ' in the raw block cache.');
+BlockService.prototype._onHeaders = function(headers) {
+  log.debug('New headers received: ' + headers.length);
+  this._cacheHeaders(headers);
+};
+
+BlockService.prototype._onBlock = function(block) {
+
+  log.debug('New block received: ' + block.hash);
+
+  this._cacheBlock(block);
+  this._broadcast(this.subscriptions.block, 'block/block', block);
+};
+
+BlockService.prototype._broadcast = function(subscribers, name, entity) {
+  for (var i = 0; i < subscribers.length; i++) {
+    subscribers[i].emit(name, entity);
+  }
+};
+
+BlockService.prototype._cacheBlock = function(block) {
+
+  log.debug('Setting block: ' + block.hash + ' in the block cache.');
   this._blockQueue.set(block.hash, block);
-};
-BlockService.prototype._getBlocks = function(callback) {
-
-  var self = this;
-  var blocksDiff = self.bitcoind.height - self.tip.__height;
-
-  // we will need to wait for new blocks to be pushed to us
-  if (blocksDiff < 0) {
-    log.warn('Peer\'s height is less than our own. The peer may be syncing. ' +
-      'The system is usable, but the chain may have a reorg in future blocks. ' +
-      'You should not rely on query responses for heights greater than ' +
-      self.bitcoind.height + ' until fully synced.');
-
-    return callback(null, blocksDiff);
-  }
-
-  log.info('Syncing: ' + blocksDiff + ' blocks from the network.');
-
-  var operations = [];
-
-  async.timesLimit(blocksDiff, 8, function(n, next) {
-
-    var blockNumber = n + self.tip.__height + 1;
-
-    self.getBlockHeader(blockNumber, function(err, header) {
-
-      if(err) {
-        return next(err);
-      }
-
-      self._getBlockOperations(header).forEach(function(op) {
-        operations.push(op);
-      });
-
-      next();
-    });
-
-  }, function(err) {
-
-    if(err) {
-      return callback(err);
-    }
-
-    self.db.batch(operations, function(err) {
-
-      if (err) {
-        return callback(err);
-      }
-
-      log.info('Completed syncing block headers from the network.');
-      callback(null, blocksDiff);
-    });
-
-  });
 
 };
 
 BlockService.prototype._getHeader = function(block) {
 
@@ -381,52 +352,15 @@ BlockService.prototype._setBlockHeaderQueue = function(header) {
 
 BlockService.prototype._setHandlers = function() {
   var self = this;
 
-  self.node.once('ready', function() {
+  self.p2p.once('bestHeight', function(height) {
 
-    self._sync();
+    self._bestHeight = height;
+    self._loadTip(self._sync.bind(self));
 
   });
 
-  self._blockHandler.on('synced', function() {
-
-    log.info('Synced: ' + self.tip.hash);
-    self._startSubscriptions();
-
-  });
 };
 
-BlockService.prototype._getGenesisBlock = function() {
-
-  this.genesis = {};
-  this.genesis.height = 0;
-  this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
-  return this.genesis;
-
-};
-
-BlockService.prototype._detectStartupReorg = function(callback) {
-
-  var self = this;
-
-  if (self.tip.height === 0) {
-    return callback();
-  }
-
-  self.getBlockHeader(hash, function(err, header) {
-
-    if (err) {
-      return callback(err);
-    }
-
-    if (header.height === height) {
-      return callback();
-    }
-
-    var opts = { preReorgTipHash: hash, preReorgTipHeight: header.height };
-    self._handleReorg(header.hash, opts, callback);
-
-  });
-};
 
 BlockService.prototype._handleReorg = function(hash, callback) {
 
@@ -453,53 +387,6 @@
 
 };
 
-BlockService.prototype._decodeTipData = function(tipDataBuf) {
-  var hash = tipDataBuf.slice(0, 32).toString('hex');
-  var height = tipDataBuf.slice(32).toString('hex').readUInt32BE();
-  return {
-    hash: hash,
-    height: height
-  };
-};
-
-BlockService.prototype._loadTips = function(callback) {
-
-  var self = this;
-
-  var tipStrings = ['tip', 'concurrentTip'];
-
-  async.each(tipStrings, function(tip, next) {
-
-    self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
-
-      if(err && !(err instanceof levelup.errors.NotFoundError)) {
-        return next(err);
-      }
-
-      if (!tipData) {
-        self[tip] = self._getGenesisBlock();
-        return next();
-      }
-
-      self[tip] = self._decodeTipData(tipData);
-      log.info('loaded ' + tip + ' hash: ' + self[tip].hash + ' height: ' + self[tip].height);
-      next();
-
-    });
-
-  }, function(err) {
-
-    if(err) {
-      return callback(err);
-    }
-
-    log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
-    callback();
-
-  });
-
-};
-
 BlockService.prototype._detectReorg = function(blocks, callback) {
 
   var tipHash = this.reorgHash || this.tip.hash;
 
@@ -583,43 +470,6 @@ BlockService.prototype._isGenesisBlock = function(blockArg, callback) {
 
 };
 
-BlockService.prototype._getBlock = function(blockArg, callback) {
-
-  var self = this;
-
-  var cachedBlock = self._blockQueue.get(blockArg);
-  if 
(cachedBlock) { - return setImmediate(function() { - callback(null, cachedBlock); - }); - } - - self.bitcoind.getBlock(blockArg, function(err, block) { - - if(err) { - return callback(err); - } - - if (blockArg.length === 64) { - return self.getBlockHeader(blockArg, function(err, header) { - - if(err) { - return callback(err); - } - - block.__height = header.height; - block.height = header.height; - callback(null, block); - - }); - } - - block.__height = blockArg; - block.height = blockArg; - callback(null, block); - }); -}; - BlockService.prototype._getReorgOperations = function(hash, height) { if (!hash || !height) { diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 7aa0820a..48644390 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -36,13 +36,12 @@ function DB(options) { this.subscriptions = {}; - this.bitcoind = this.node.services.bitcoind; this._operationsQueue = []; } util.inherits(DB, Service); -DB.dependencies = ['bitcoind']; +DB.dependencies = []; DB.prototype._setDataPath = function() { $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); @@ -114,17 +113,37 @@ DB.prototype.start = function(callback) { }; DB.prototype.get = function(key, options, callback) { + var cb = callback; var opts = options; + if (typeof callback !== 'function') { cb = options; opts = {}; } + if (!this._stopping) { - this._store.get(key, opts, cb); + + this._store.get(key, opts, function(err, data) { + + if(err && err instanceof levelup.errors.NotFoundError) { + return cb(); + } + + if (err) { + return cb(err); + } + + cb(null, data); + + }); + } else { - setImmediate(cb); + + // TODO: will this tell the caller the right thing? + cb(new Error('Shutdown sequence underway, not able to complete the query')); } + }; DB.prototype.put = function(key, value, options, callback) { diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 4ceb79ab..c658dcff 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -30,6 +30,7 @@ var P2P = function(options) { this._configPeers = this.options.peers; this.subscriptions = {}; this.subscriptions.block = []; + this.subscriptions.headers = []; this.subscriptions.transaction = []; this.messages = new p2p.Messages({ network: this.node.network }); this._peerHeights = []; @@ -75,10 +76,10 @@ P2P.prototype.getPublishEvents = function() { unsubscribe: this.unsubscribe.bind(this, 'block') }, { - name: 'p2p/header', + name: 'p2p/headers', scope: this, - subscribe: this.subscribe.bind(this, 'header'), - unsubscribe: this.unsubscribe.bind(this, 'header') + subscribe: this.subscribe.bind(this, 'headers'), + unsubscribe: this.unsubscribe.bind(this, 'headers') } ]; }; @@ -166,7 +167,6 @@ P2P.prototype._onPeerInventory = function(peer, message) { var self = this; var newDataNeeded = []; - message.inventory.forEach(function(inv) { if (!self._inv.get(inv.hash)) { @@ -176,6 +176,7 @@ P2P.prototype._onPeerInventory = function(peer, message) { newDataNeeded.push(inv); } + }); if (newDataNeeded.length > 0) { @@ -194,6 +195,10 @@ P2P.prototype._onPeerBlock = function(peer, message) { this._broadcast(this.subscriptions.block, 'p2p/block', message.block); }; +P2P.prototype._onPeerHeaders = function(peer, message) { + this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers); +}; + P2P.prototype._setupListeners = function() { var self = this; @@ -202,6 +207,7 @@ P2P.prototype._setupListeners = function() { self._pool.on('peerinv', self._onPeerInventory.bind(self)); 
self._pool.on('peertx', self._onPeerTx.bind(self)); self._pool.on('peerblock', self._onPeerBlock.bind(self)); + self._pool.on('peerheaders', self._onPeerHeaders.bind(self)); self.node.on('ready', function() { self._pool.connect(); }); @@ -226,7 +232,17 @@ P2P.prototype._applyMempoolFilter = function(message) { return message; }; -P2P.prototype._getResourceFilter = function(filter, resource) { +P2P.prototype._setResourceFilter = function(filter, resource) { + + // startHash is usually the last block or header you have, endHash is a hash that comes after that hash, + // or a block at a greater height + if (resource === 'headers' || resource === 'blocks') { + assert(filter.startHash, 'A "startHash" field is required to retrieve headers or blocks'); + if (!filter.endHash) { + filter.endHash = 0; + } + return { starts: [filter.startHash], stop: filter.endHash }; + } // this function knows about mempool, block and header filters // mempool filters are considered after the tx is delivered to us @@ -236,21 +252,13 @@ P2P.prototype._getResourceFilter = function(filter, resource) { return; } - if (resource === 'blocks' || resource === 'headers') { - assert(filter.newestHash, 'A "newestHash" field is required to retrieve blocks or headers'); - if (!filter.oldestHash) { - filter.oldestHash = filter.newestHash; - } - return { starts: [filter.newestHash], stop: filter.oldestHash }; - } - }; P2P.prototype.getAPIMethods = function() { var methods = [ - ['getHeaders', this, this.getHeaders, 2], - ['getMempool', this, this.getMempool, 1], - ['getBlocks', this, this.getBlocks, 2] + ['getHeaders', this, this.getHeaders, 1], + ['getMempool', this, this.getMempool, 0], + ['getBlocks', this, this.getBlocks, 1] ]; return methods; }; @@ -258,7 +266,7 @@ P2P.prototype.getAPIMethods = function() { P2P.prototype.getHeaders = function(filter) { var peer = this._getPeer(); - var headerFilter = this._getResourceFilter(filter, 'headers'); + var headerFilter = this._setResourceFilter(filter, 'headers'); peer.sendMessage(this.messages.GetHeaders(headerFilter)); }; @@ -271,8 +279,7 @@ P2P.prototype.getMempool = function(filter) { // all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can // grow quite large over time. Care should be taken to limit the size of this filter. // Even still, when a tx is broadcasted, the reference to it is dropped from the filter. 
- this._mempoolFilter = this._getResourceFilter(filter, 'mempool'); - + this._setResourceFilter(filter, 'mempool'); peer.sendMessage(this.messages.MemPool()); @@ -281,9 +288,13 @@ P2P.prototype.getMempool = function(filter) { P2P.prototype.getBlocks = function(filter) { var peer = this._getPeer(); - var blockFilter = this._getResourceFilter(filter, 'blocks'); + var blockFilter = this._setResourceFilter(filter, 'blocks'); peer.sendMessage(this.messages.GetBlocks(blockFilter)); }; +P2P.prototype.clearInventoryCache = function() { + this._inv.reset(); +}; + module.exports = P2P; diff --git a/lib/services/web/index.js b/lib/services/web/index.js index 5e7d59a0..c52b9243 100644 --- a/lib/services/web/index.js +++ b/lib/services/web/index.js @@ -286,8 +286,8 @@ WebService.prototype._endpointGetInfo = function() { result: 'ok', dbheight: self.node.services.block.tip.__height, dbhash: self.node.services.block.tip.hash, - bitcoindheight: self.node.services.bitcoind.height, - bitcoindhash: self.node.services.bitcoind.tiphash + networkHeight: self.node.services.bitcoind.height, + networkHash: self.node.services.bitcoind.tiphash }); }; }; diff --git a/regtest/block.js b/regtest/block.js index 13c43b88..53441f36 100644 --- a/regtest/block.js +++ b/regtest/block.js @@ -8,8 +8,9 @@ var path = require('path'); var utils = require('./utils'); var debug = true; -var bitcoreDataDir = '/tmp/bitcore'; -var bitcoinDataDir = '/tmp/bitcoin'; +var extraDebug = true; +var bitcoreDataDir = '/tmp/testtmpfs/bitcore'; +var bitcoinDataDir = '/tmp/testtmpfs/bitcoin'; var rpcConfig = { protocol: 'http', @@ -45,7 +46,7 @@ var bitcore = { port: 53001, datadir: bitcoreDataDir, services: [ - 'bitcoind', + 'p2p', 'db', 'web', 'block', diff --git a/regtest/p2p.js b/regtest/p2p.js index 27da1c6c..9d783dba 100644 --- a/regtest/p2p.js +++ b/regtest/p2p.js @@ -11,12 +11,16 @@ var zmq = require('zmq'); var bitcore = require('bitcore-lib'); var Transaction = bitcore.Transaction; var PrivateKey = bitcore.PrivateKey; +var Block = bitcore.Block; +var BlockHeader = bitcore.BlockHeader; var Unit = bitcore.Unit; - +var constants = require('../lib/constants'); var debug = true; var extraDebug = true; -var bitcoreDataDir = '/tmp/bitcore'; -var bitcoinDataDir = '/tmp/bitcoin'; +//advisable to use a tmpfs here, much easier on NAND-based disks and good for performance +var bitcoreDataDir = '/tmp/testtmpfs/bitcore'; +// to do this on Linux: sudo mount -t tmpfs -o size=512m tmpfs /tmp/testtmpfs +var bitcoinDataDir = '/tmp/testtmpfs/bitcoin'; var rpcConfig = { protocol: 'http', @@ -101,13 +105,12 @@ var opts = { satoshisSent: 0, walletId: crypto.createHash('sha256').update('test').digest('hex'), satoshisReceived: 0, - initialHeight: 150, + initialHeight: 110, path: '/test/info', errorFilter: function(err, res) { -console.log(err, res); try { var info = JSON.parse(res); - if (res.result) { + if (info.result) { return; } } catch(e) { @@ -121,13 +124,18 @@ var utils = new Utils(opts); var subSocket; var txs = []; var blocks = []; - +var headers = []; +var startingBlockHash; +var count = 0; function processMessages(topic, message) { var topicStr = topic.toString(); if (topicStr === 'transaction') { return txs.push(message); } else if (topicStr === 'block') { + count++; return blocks.push(message); + } else if (topicStr === 'headers') { + return headers.push(message); } } @@ -150,6 +158,7 @@ function setupZmqSubscriber(callback) { subSocket.connect('tcp://127.0.0.1:38332'); subSocket.subscribe('transaction'); subSocket.subscribe('block'); + 
subSocket.subscribe('headers'); subSocket.on('message', processMessages); callback(); } @@ -165,31 +174,18 @@ describe('P2P Operations', function() { before(function(done) { async.series([ utils.startBitcoind.bind(utils), - utils.waitForBitcoinReady.bind(utils) + utils.waitForBitcoinReady.bind(utils), + utils.unlockWallet.bind(utils), + utils.setupInitialTxs.bind(utils), + utils.startBitcoreNode.bind(utils), + utils.waitForBitcoreNode.bind(utils), + setupZmqSubscriber, + utils.sendTxs.bind(utils, false) ], done); }); - - it('should connect to the p2p network and stream the mempool to clients', function(done) { - async.series([ - utils.unlockWallet.bind(utils), - utils.setupInitialTxs.bind(utils), - - function(next) { - async.eachSeries(utils.opts.initialTxs, function(tx, next) { - utils.opts.rpc.sendRawTransaction(tx.serialize(), next); - }, next); - }, - - utils.startBitcoreNode.bind(utils), - setupZmqSubscriber, - utils.waitForBitcoreNode.bind(utils) - - ], function(err) { - - if(err) { - return done(err); - } + describe('Mempool', function() { + it('should send new transactions as they are broadcasted by our trusted peer (unsoliticted)', function(done) { var initialTxs = {}; @@ -205,70 +201,219 @@ describe('P2P Operations', function() { expect(initialTxs[tx.hash]).to.equal(true); } - expect(utils.opts.initialTxs.length).to.equal(i); + expect(txs.length).to.equal(i); done(); + }); - }); - it('should send new transactions as they are broadcasted by our trusted peer (unsoliticted)', function(done) { - var tx; - async.series([ - - function(next) { - opts.rpc.generate(10, next); - }, - - function(next) { - utils.getPrivateKeysWithABalance(function(err, spendables) { + it('should connect to the p2p network and stream the mempool to clients', function(done) { + // this tricky because if the p2p service has already asked for the data + // from a particular peer, it will not ask again until the inv hash is dropped + // from its lru cache. 
So, to fake this out, I will clear this cache manually + txs.length = 0; + utils.queryBitcoreNode(Object.assign({ + path: '/test/mempool', + }, bitcore.httpOpts), function(err) { if(err) { - return next(err); + return done(err); } - tx = new Transaction() - .from(spendables[0].utxo) - .to(new PrivateKey('testnet').toAddress().toString(), - Unit.fromBTC(spendables[0].utxo.amount).satoshis - 100000) - .fee(100000) - .sign(spendables[0].privKey); + setTimeout(function() { + var initialTxs = {}; - utils.sendTx(tx, 0, next); + utils.opts.initialTxs.map(function(tx) { + initialTxs[tx.hash] = true; + return; + }); + + var i = 0; + for(; i < utils.opts.initialTxs.length; i++) { + var tx = new Transaction(txs[i]); + expect(initialTxs[tx.hash]).to.equal(true); + } + + expect(txs.length).to.equal(i); + + done(); + }, 2000); }); - } - ], function(err) { + }); - if(err) { - return done(err); - } + it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) { + var newTx = txs.shift(); + newTx = new Transaction(newTx); + var argTxs = txs.map(function(rawTx) { + var tx = new Transaction(rawTx); + return tx.hash; + }); + txs.length = 0; + utils.queryBitcoreNode(Object.assign({ + path: '/test/mempool?filter=' + JSON.stringify(argTxs) + }, bitcore.httpOpts), function(err) { - setTimeout(function() { - var broadcastedTx = new Transaction(txs[txs.length - 1]); - expect(tx.hash).to.equal(broadcastedTx.hash); - expect(txs.length).to.equal(utils.opts.initialTxs.length + 1); - done(); - }, 1000); + if (err) { + return done(err); + } + + setTimeout(function() { + + var tx = new Transaction(txs[0]); + expect(newTx.hash).to.equal(tx.hash); + expect(txs.length).to.equal(1); + done(); + }, 2000); + }); + }); + }); + + describe('Block', function() { + + it('should get blocks when they are relayed to us', function(done) { + opts.rpc.generate(1, function(err, res) { + if(err) { + return done(err); + } + startingBlockHash = res.result[0]; + setTimeout(function() { + expect(blocks.length).to.equal(1); + var block = new Block(blocks[0]); + expect(startingBlockHash).to.equal(block.hash); + done(); + }, 2000); + }); + }); + + it('should be able to get historical blocks', function(done) { + + blocks.length = 0; + var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest }; + utils.queryBitcoreNode(Object.assign({ + path: '/test/blocks?filter=' + JSON.stringify(filter), + }, bitcore.httpOpts), function(err) { + + if(err) { + return done(err); + } + + setTimeout(function() { + expect(blocks.length).to.equal(utils.opts.blockHeight + 1); + var lastBlock = new Block(blocks[blocks.length - 1]); + expect(startingBlockHash).to.equal(lastBlock.hash); + done(); + }, 2000); + + + }); + + }); + + + }); + + describe('Block Headers', function() { + + it('should be able to get historical block headers', function(done) { + + var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest }; + utils.queryBitcoreNode(Object.assign({ + path: '/test/headers?filter=' + JSON.stringify(filter), + }, bitcore.httpOpts), function(err) { + + if(err) { + return done(err); + } + + setTimeout(function() { + + expect(headers.length).to.equal(utils.opts.blockHeight + 1); + var lastBlockHeader = new BlockHeader(blocks[blocks.length - 1]); + expect(startingBlockHash).to.equal(lastBlockHeader.hash); + done(); + + }, 2000); + + + }); + }); + + it('should return up to 2000 headers in a single call to getHeaders', function(done) { + + // p2p note: when asking for a series of headers, your peer will always follow 
up + // with an additional inventory message after delivering the initial data. + + // after getHeaders: an inv message for the block matching the latest header you received. + // remember: getHeaders message does not respond with an inventory message like getBlocks does, + // instead it responds with the header message, but THEN will respond with a single inventory + // message representing the block of the last header delievered. + + // For example: if there exists 4 blocks with block hashes a,b,c,d: + // getHeaders({ starts: 'a', stop: 0 }) should receive headers for b,c,d and an inv message for block d. + var additionalBlockCount = 2000 - 111; + headers.length = 0; + opts.rpc.generate(additionalBlockCount, function(err) { + + if(err) { + return done(err); + } + + var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest }; + + utils.queryBitcoreNode(Object.assign({ + path: '/test/headers?filter=' + JSON.stringify(filter), + }, bitcore.httpOpts), function(err) { + + if(err) { + return done(err); + } + + setTimeout(function() { + + expect(headers.length).to.equal(2000); + done(); + + }, 2000); + + + }); + + }); + }); + + it('should return up to 500 blocks in a single call to getBlocks', function(done) { + + // p2p note: when asking for a series of headers, your peer will always follow up + // with an additional inventory message after delivering the initial data. + + // after getBlocks: an inv message for the block immediately following the last one you received, if + // there more blocks to retrieve. Since there is a 500 block limit in the initial inventory message response, + // when receiving 500 blocks, an additional inventory message will tell you what the next block is and that + // are more blocks to be retrieved. + + blocks.length = 0; + count = 0; + var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest }; + + utils.queryBitcoreNode(Object.assign({ + path: '/test/blocks?filter=' + JSON.stringify(filter), + }, bitcore.httpOpts), function(err) { + + if(err) { + return done(err); + } + + setTimeout(function() { + + expect(blocks.length).to.equal(501); + done(); + + }, 2000); + + }); }); }); - it('should get blocks from peer that we do not have on startup', function(done) { - expect(blocks.length).to.equal(151); - done(); - }); - - it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) { - var newTx = txs.shift(); - utils.queryBitcoreNode(Object.assign({ - path: '/mempool?filter=' + txs - }), bitcore.httpOpts, function(err, data) { - if (err) { - return done(err); - } - newTxs = JSON.parse(data).transactions; - expect(newTxs.length).to.equal(2); - expect(newTx).to.equal(newTxs[0].hash); - }); - }); }); diff --git a/regtest/test_bus.js b/regtest/test_bus.js index 17b1437a..f575988b 100644 --- a/regtest/test_bus.js +++ b/regtest/test_bus.js @@ -5,8 +5,6 @@ var inherits = require('util').inherits; var zmq = require('zmq'); var index = require('../lib'); var log = index.log; -var constants = require('../lib/constants'); -var assert = require('assert'); var TestBusService = function(options) { BaseService.call(this, options); @@ -31,11 +29,10 @@ TestBusService.prototype.start = function(callback) { self.bus.on('p2p/transaction', function(tx) { self._cache.transaction.push(tx); if (self._ready) { - for(var i = 0; i < self._cache.transaction.length; i++) { + while(self._cache.transaction.length > 0) { var transaction = self._cache.transaction.shift(); self.pubSocket.send([ 'transaction', new 
Buffer(transaction.uncheckedSerialize(), 'hex') ]); } - return; } }); @@ -46,22 +43,33 @@ TestBusService.prototype.start = function(callback) { self.bus.on('p2p/block', function(block) { self._cache.block.push(block); if (self._ready) { - for(var i = 0; i < self._cache.block.length; i++) { + while(self._cache.block.length > 0) { var blk = self._cache.block.shift(); self.pubSocket.send([ 'block', blk.toBuffer() ]); } - return; + } + }); + + self.bus.on('p2p/headers', function(headers) { + headers.forEach(function(header) { + self._cache.headers.push(header); + }); + + if (self._ready) { + while(self._cache.headers.length > 0) { + var hdr = self._cache.headers.shift(); + self.pubSocket.send([ 'headers', hdr.toBuffer() ]); + } } }); self.bus.subscribe('p2p/transaction'); self.bus.subscribe('p2p/block'); + self.bus.subscribe('p2p/headers'); self.node.on('ready', function() { self._ready = true; - self.node.services.p2p.getMempool(); - self.node.services.p2p.getBlocks({ newestHash: constants.BITCOIN_GENESIS_HASH.regtest }); }); @@ -73,17 +81,35 @@ TestBusService.prototype.setupRoutes = function(app) { var self = this; app.get('/mempool', function(req, res) { - self.node.services.p2p.getMempool(req.params.filter); - res.status(200); + self.node.services.p2p.clearInventoryCache(); + var filter; + if (req.query.filter) { + filter = JSON.parse(req.query.filter); + } + self.node.services.p2p.getMempool(filter); + res.status(200).end(); }); app.get('/blocks', function(req, res) { - self.node.services.p2p.getBlocks(req.params.filter); - res.status(200); + self.node.services.p2p.clearInventoryCache(); + var filter; + if (req.query.filter) { + filter = JSON.parse(req.query.filter); + } + self.node.services.p2p.getBlocks(filter); + res.status(200).end(); + }); + + app.get('/headers', function(req, res) { + var filter; + if (req.query.filter) { + filter = JSON.parse(req.query.filter); + } + self.node.services.p2p.getHeaders(filter); + res.status(200).end(); }); app.get('/info', function(req, res) { -console.log('test test test test'); res.status(200).jsonp({ result: (self._ready && (self._bestHeight >= 0))}); }); }; diff --git a/regtest/utils.js b/regtest/utils.js index 6f440851..2206b009 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -11,6 +11,7 @@ var http = require('http'); var Unit = bitcore.Unit; var Transaction = bitcore.Transaction; var PrivateKey = bitcore.PrivateKey; +var assert = require('assert'); var Utils = function(opts) { this.opts = opts; @@ -94,7 +95,7 @@ Utils.prototype.waitForBitcoreNode = function(callback) { } }; } - var httpOpts = self.getHttpOpts({ path: opts.path || '/info', errorFilter: errorFilter }); + var httpOpts = self.getHttpOpts({ path: self.opts.path || '/info', errorFilter: errorFilter }); self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback); }; @@ -281,24 +282,35 @@ Utils.prototype.setupInitialTxs = function(callback) { }; -Utils.prototype.sendTxs = function(callback) { +Utils.prototype.sendTxs = function(generateBlockAfterEach, callback) { + var self = this; + if (typeof generateBlockAfterEach !== 'function') { + return async.eachSeries(this.opts.initialTxs, function(tx, next) { + self.sendTx(tx, generateBlockAfterEach, next); + }, callback); + } async.eachOfSeries(this.opts.initialTxs, this.sendTx.bind(this), callback); }; Utils.prototype.sendTx = function(tx, index, callback) { var self = this; - self.opts.rpc.sendRawTransaction(tx.serialize(), function(err) { + // sending these too quickly will prevent them from being relayed over the + 
// p2p network + self.opts.rpc.sendRawTransaction(tx.serialize(), function(err, res) { if (err) { return callback(err); } + assert(res.result === tx.hash, 'sendTx: provided hash did not match returned hash'); var mod = index % 2; - if (mod === 1) { - self.opts.blockHeight++; - self.opts.rpc.generate(1, callback); - } else { - callback(); - } + setTimeout(function() { + if (mod === 1) { + self.opts.blockHeight++; + self.opts.rpc.generate(1, callback); + } else { + callback(); + } + }, 200); }); };
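
Note on the tip format introduced in lib/service.js: _decodeTipData expects the value stored under the service's tip key to be a 36-byte buffer, a 32-byte block hash followed by a 4-byte big-endian height. The matching encoder is not part of this diff, so the helper below is only a sketch of what writing the tip would look like under that assumption (encodeTipData is a hypothetical name, not a function in this patch):

    // Hypothetical counterpart to Service.prototype._decodeTipData.
    // Assumes the stored tip value is hash (32 bytes) followed by height (UInt32BE, 4 bytes).
    function encodeTipData(tip) {
      var hashBuf = new Buffer(tip.hash, 'hex');   // 32-byte block hash
      var heightBuf = new Buffer(4);
      heightBuf.writeUInt32BE(tip.height, 0);      // 4-byte big-endian height
      return Buffer.concat([hashBuf, heightBuf]);  // 36 bytes total
    }

    // Round-trip with the decoder in this patch:
    //   _decodeTipData(encodeTipData({ hash: h, height: 0 })) -> { hash: h, height: 0 }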
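For context on the new { startHash, endHash } filter shape used by _setResourceFilter, the sketch below shows how a consuming service might request historical blocks or headers and receive them over the bus. It is pieced together from the p2p methods and events added in this diff (getBlocks, getHeaders, clearInventoryCache, 'p2p/block' and 'p2p/headers'); the handler bodies are placeholders, and peers cap a single response at 500 blocks or 2000 headers as noted in the regtest comments:

    var p2p = this.node.services.p2p;
    var bus = this.node.openBus({ remoteAddress: 'localhost' });

    bus.on('p2p/block', function(block) { /* one block per inventory item streamed from the peer */ });
    bus.on('p2p/headers', function(headers) { /* an array of headers */ });
    bus.subscribe('p2p/block');
    bus.subscribe('p2p/headers');

    // previously-seen inventory hashes are ignored, so clear the cache before re-requesting
    p2p.clearInventoryCache();

    // startHash is the last block or header you already have; endHash defaults to 0 (no upper bound)
    p2p.getBlocks({ startHash: constants.BITCOIN_GENESIS_HASH.regtest });
    p2p.getHeaders({ startHash: constants.BITCOIN_GENESIS_HASH.regtest, endHash: 0 });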