diff --git a/lib/scaffold/start.js b/lib/scaffold/start.js index 7a187541..2b78b36e 100644 --- a/lib/scaffold/start.js +++ b/lib/scaffold/start.js @@ -9,7 +9,6 @@ var log = index.log; var shuttingDown = false; var fs = require('fs'); - function start(options) { var fullConfig = _.clone(options.config); diff --git a/lib/services/block/index.js b/lib/services/block/index.js index b174aabf..d147cbf9 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -26,6 +26,7 @@ var BlockService = function(options) { this._subscriptions.block = []; this._subscriptions.reorg = []; + this._unprocessedBlocks = []; // in-memory full/raw block cache this._blockQueue = LRU({ max: 50 * (1 * 1024 * 1024), // 50 MB of blocks, @@ -48,7 +49,7 @@ inherits(BlockService, BaseService); BlockService.dependencies = [ 'p2p', 'db', 'header' ]; -BlockService.MAX_BLOCKS = 10; +BlockService.MAX_BLOCKS = 1; // --- public prototype functions BlockService.prototype.getAPIMethods = function() { @@ -201,24 +202,6 @@ BlockService.prototype._primeBlockQueue = function(callback) { }, callback); }; -// upon startup, has the chain reorg'ed from where we were when we shutdown? -BlockService.prototype._detectInitialChainState = function(headers) { - - if (this._tip.height === 0) { - return; - } - - var index = this._tip.height - 1; - var record = headers.getIndex(index); - - if (record.hash !== this._tip.hash) { - // reorg! we don't yet have the blocks to reorg to, so we'll rewind the chain back to - // to common ancestor, set the tip to the common ancestor and start the sync - this._chainTips.push(record.hash); - this._handleReorg(record.hash); - } -}; - BlockService.prototype.stop = function(callback) { setImmediate(callback); }; @@ -542,27 +525,83 @@ BlockService.prototype._isOutOfOrder = function(block) { BlockService.prototype._onAllHeaders = function(headers) { this._bestHeight = headers.size; - this._detectInitialChainState(headers); this._startSync(); }; + +BlockService.prototype._processBlock = function() { + var operations = []; + var services = this.node.services; + var block = this._unprocessedBlocks.shift(); + var self = this; + async.eachSeries( + services, + function(mod, next) { + if(mod.onBlock) { + mod.onBlock.call(mod, block, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + operations = operations.concat(ops); + } + next(); + }); + } else { + setImmediate(next); + } + }, + function(err) { + + if (err) { + log.error(err); + self.node.stop(); + return; + } + + self._tip.height++; + self._tip.hash = block.rhash(); + var tipOps = utils.encodeTip(self._tip, self.name); + + operations.push({ + type: 'put', + key: tipOps.key, + value: tipOps.value + }); + + self._db._store.batch(operations, function(err) { + + if (err) { + log.error(err); + self.node.stop(); + return; + } + self._sync(); + }); + } + ); +}; + BlockService.prototype._onBlock = function(block) { + this._unprocessedBlocks.push(block); + this._processBlock(); + return; // 1. have we already seen this block? - if (this._blockAlreadyProcessed(block)) { - return; - } + //if (this._blockAlreadyProcessed(block)) { + // return; + //} // 2. log the reception - log.debug('New block received: ' + block.rhash()); + //log.debug('New block received: ' + block.rhash()); // 3. store the block for safe keeping - this._cacheBlock(block); + //this._cacheBlock(block); // 4. don't process any more blocks if we are currently in a reorg - if (this._reorging) { - return; - } + //if (this._reorging) { + // return; + //} // 5. determine block state, reorg, outoforder, normal var blockState = this._determineBlockState(block); @@ -575,12 +614,14 @@ BlockService.prototype._onBlock = function(block) { // nothing to do, but wait until ancestor blocks come in break; case 'reorg': - this._handleReorg(block.hash); - break; + //this._handleReorg(block.hash); + //break; default: // send all unsent blocks now that we have a complete chain - - this._sendDelta(); + this._unprocessedBlocks.push(block); + this._processBlock(); + return; + //this._sendDelta(); break; } }; @@ -614,6 +655,7 @@ BlockService.prototype._selectActiveChain = function() { BlockService.prototype._sendDelta = function() { + // when this function is called, we know, for sure, that we have a complete chain of unsent block(s). // our task is to send all blocks between active chain's tip and our tip. var activeChainTip = this._selectActiveChain(); @@ -665,13 +707,12 @@ BlockService.prototype._startSync = function() { return; } - log.info('Gathering: ' + this._numNeeded + ' ' + 'block(s) from the peer-to-peer network.'); + log.info('Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.'); this._sync(); }; BlockService.prototype._startSubscriptions = function() { - if (this._subscribed) { return; } @@ -681,8 +722,8 @@ BlockService.prototype._startSubscriptions = function() { this._bus = this.node.openBus({remoteAddress: 'localhost-block'}); } - this._bus.on('p2p/block', this._onBlock.bind(this)); - this._bus.subscribe('p2p/block'); + this._bus.on('header/block', this._onBlock.bind(this)); + this._bus.subscribe('header/block'); }; BlockService.prototype._sync = function() { @@ -693,9 +734,10 @@ BlockService.prototype._sync = function() { if (this._tip.height < size) { log.info('Blocks download progress: ' + this._tip.height + '/' + - this._numNeeded + ' (' + (this._tip.height / this._numNeeded*100).toFixed(2) + '%)'); + this._numNeeded + ' (' + (this._tip.height / this._bestHeight*100).toFixed(2) + '%)'); - var endHash = headers.getIndex( Math.min(this._tip.height + BlockService.MAX_BLOCKS, size) ).hash; + var end = headers.getIndex(Math.min(this._tip.height + BlockService.MAX_BLOCKS, size)); + var endHash = end ? end.hash : null; this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash }); return; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 8b16505c..51467ab9 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -9,6 +9,7 @@ var utils = require('../../utils'); var async = require('async'); var BN = require('bn.js'); var consensus = require('bcoin').consensus; +var assert = require('assert'); var HeaderService = function(options) { @@ -19,6 +20,9 @@ var HeaderService = function(options) { this._db = this.node.services.db; this._checkpoint = options.checkpoint || 0; // the # of header to look back on boot this._headers = new utils.SimpleMap(); + + this.subscriptions = {}; + this.subscriptions.block = []; }; inherits(HeaderService, BaseService); @@ -29,6 +33,26 @@ HeaderService.MAX_CHAINWORK = new BN(1).ushln(256); HeaderService.STARTING_CHAINWORK = '0000000000000000000000000000000000000000000000000000000100010001'; // --- public prototype functions +HeaderService.prototype.subscribe = function(name, emitter) { + +console.log(this.subscriptions, name); + this.subscriptions[name].push(emitter); + log.info(emitter.remoteAddress, 'subscribe:', 'header/' + name, 'total:', this.subscriptions[name].length); + +}; + +HeaderService.prototype.unsubscribe = function(name, emitter) { + + var index = this.subscriptions[name].indexOf(emitter); + + if (index > -1) { + this.subscriptions[name].splice(index, 1); + } + + log.info(emitter.remoteAddress, 'unsubscribe:', 'header/' + name, 'total:', this.subscriptions[name].length); + +}; + HeaderService.prototype.getAPIMethods = function() { var methods = [ @@ -110,10 +134,32 @@ HeaderService.prototype._startSubscriptions = function() { }; +HeaderService.prototype.getPublishEvents = function() { + + return [ + { + name: 'header/block', + scope: this, + subscribe: this.subscribe.bind(this, 'header'), + unsubscribe: this.unsubscribe.bind(this, 'header') + } + ]; + +}; + HeaderService.prototype._onBlock = function(block) { // we just want the header to keep a running list - log.debug('Header Service: new block: ' + block.rhash()); + var hash = block.rhash(); + log.debug('Header Service: new block: ' + hash); + + if (this._headers.get(hash)) { + for (var i = 0; i < this.subscriptions.length; i++) { + this.subscriptions[i].emit('header/block', block); + } + return; + } + log.debug('Header Service: new block has arrived: ' + hash); var header = block.toHeaders().toJSON(); header.timestamp = header.ts; header.prevHash = header.prevBlock; @@ -122,6 +168,7 @@ HeaderService.prototype._onBlock = function(block) { HeaderService.prototype._onHeaders = function(headers, convert) { + var self = this; if (!headers || headers.length < 1) { return; } @@ -136,28 +183,36 @@ HeaderService.prototype._onHeaders = function(headers, convert) { }); } - var runningHeight = this._tip.height; - var prevHeader = this._headers.getLastIndex(); + var runningHeight = self._tip.height; + var prevHeader = self._headers.getLastIndex(); var dbOps = []; for(var i = 0; i < headers.length; i++) { var header = headers[i]; header.height = ++runningHeight; - header.chainwork = this._getChainwork(header, prevHeader).toString(16, 32); + header.chainwork = self._getChainwork(header, prevHeader).toString(16, 32); dbOps.push({ type: 'put', - key: this._encoding.encodeHeaderKey(header.hash), - value: this._encoding.encodeHeaderValue(header) + key: self._encoding.encodeHeaderKey(header.height, header.hash), + value: self._encoding.encodeHeaderValue(header) }); prevHeader = header; - this._headers.set(header.hash, header); + self._headers.set(header.hash, header); } - this._startingHash = this._tip.hash = newHeaders[newHeaders.length - 1].hash; - this._tip.height = this._tip.height + newHeaders.length; + self._startingHash = self._tip.hash = newHeaders[newHeaders.length - 1].hash; + self._tip.height = self._tip.height + newHeaders.length; - this._db.batch(dbOps); - this._sync(); + var tipOps = utils.encodeTip(self._tip, self.name); + dbOps.push({ + type: 'put', + key: tipOps.key, + value: tipOps.value + }); + + self._db._store.batch(dbOps, function() { + self._sync(); + }); }; @@ -168,6 +223,7 @@ HeaderService.prototype._setListeners = function() { }; HeaderService.prototype._onBestHeight = function(height) { + assert(height >= this._tip.height, 'Our peer does not seem to be fully synced.'); log.debug('Header Service: Best Height is: ' + height); this._bestHeight = height; this._startSync(); diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 9aa6d180..795266de 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -42,17 +42,17 @@ MempoolService.prototype._startSubscriptions = function() { this._subscribed = true; if (!this._bus) { - this._bus = this.node.openBus({remoteAddress: 'localhost'}); + this._bus = this.node.openBus({remoteAddress: 'localhost-mempool'}); } - this._bus.on('p2p/block', this._onBlock.bind(this)); + //this._bus.on('p2p/block', this._onBlock.bind(this)); this._bus.on('p2p/transaction', this._onTransaction.bind(this)); - this._bus.subscribe('p2p/block'); + //this._bus.subscribe('p2p/block'); this._bus.subscribe('p2p/transaction'); }; -MempoolService.prototype._onBlock = function(block) { +MempoolService.prototype.onBlock = function(block, callback) { // remove this block's txs from mempool var self = this; var ops = block.txs.map(function(tx) { @@ -61,7 +61,7 @@ MempoolService.prototype._onBlock = function(block) { key: self._encoding.encodeMempoolTransactionKey(tx.txid()) }; }); - self._db.batch(ops); + setImmediate(callback); }; MempoolService.prototype._onTransaction = function(tx) {