From 45042aee574b706376d32c52078c85bfe772c91a Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Wed, 26 Jul 2017 17:36:41 -0400 Subject: [PATCH] wip --- lib/services/address/index.js | 4 +- lib/services/block/index.js | 494 +++++------------------------- lib/services/mempool/index.js | 12 +- lib/services/timestamp/index.js | 11 +- lib/services/transaction/index.js | 4 +- 5 files changed, 98 insertions(+), 427 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 66b40a23..d86ca036 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -424,7 +424,9 @@ AddressService.prototype.onBlock = function(block, callback) { self._db.batch(operations); } - callback(null, operations); + setImmediate(function() { + callback(null, operations); + }); }; AddressService.prototype._processInput = function(tx, input, opts) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index e89b2183..0b108077 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -6,11 +6,8 @@ var inherits = require('util').inherits; var Encoding = require('./encoding'); var index = require('../../'); var log = index.log; -var LRU = require('lru-cache'); var utils = require('../../utils'); -var _ = require('lodash'); var assert = require('assert'); -var BN = require('bn.js'); var constants = require('../../constants'); var BlockService = function(options) { @@ -27,22 +24,9 @@ var BlockService = function(options) { this._subscriptions.reorg = []; this._unprocessedBlocks = []; - // in-memory full/raw block cache - this._blockQueue = LRU({ - max: 50 * (1 * 1024 * 1024), // 50 MB of blocks, - length: function(n) { - return n.size; - } - }); // hash -> block - // keep track of out-of-order blocks, this is a list of chains (which are lists themselves) - // e.g. [ [ block5, block4 ], [ block8, block7 ] ]; - this._incompleteChains = []; - // list of all chain tips, including main chain and any chains that were orphaned after a reorg - this._chainTips = []; this._blockCount = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()]; - }; inherits(BlockService, BaseService); @@ -76,7 +60,6 @@ BlockService.prototype.getBlock = function(arg, callback) { } this._getBlock(hash, callback); - }; BlockService.prototype.getBlockOverview = function(hash, callback) { @@ -155,18 +138,14 @@ BlockService.prototype.start = function(callback) { self._prefix = prefix; self._encoding = new Encoding(self._prefix); self._db.getServiceTip('block', next); - }, - function(tip, next) { - self._setTip(tip); - self._chainTips.push(self._tip.hash); - self._primeBlockQueue(next); } - ], function(err) { + ], function(err, tip) { if(err) { return callback(err); } + self._setTip(tip); self._setListeners(); self._startSubscriptions(); callback(); @@ -175,33 +154,6 @@ BlockService.prototype.start = function(callback) { }; -BlockService.prototype._primeBlockQueue = function(callback) { - // this will load the last 50 blocks into the block queue to prime the cache - var self = this; - var hash = this._tip.hash; - - async.timesSeries(50, function(index, next) { - - self._db.get(self._encoding.encodeBlockKey(hash), function(err, data) { - - if(err) { - return next(err); - } - - if (!data) { - return next(); - } - - var block = self._encoding.decodeBlockValue(data); - hash = block.toHeaders().toJSON().prevBlock; - self._blockQueue.set(block.rhash(), block); - next(); - - }); - - }, callback); -}; - BlockService.prototype.stop = function(callback) { setImmediate(callback); }; @@ -233,133 +185,18 @@ BlockService.prototype.unsubscribe = function(name, emitter) { // --- start private prototype functions -BlockService.prototype._blockAlreadyProcessed = function(block) { - - return this._blockQueue.get(block.rhash()) ? true : false; - -}; - BlockService.prototype._broadcast = function(subscribers, name, entity) { for (var i = 0; i < subscribers.length; i++) { subscribers[i].emit(name, entity); } }; -BlockService.prototype._cacheBlock = function(block) { - - log.debug('Block Service: Setting block: ' + block.rhash() + ' in the block cache.'); - - // 1. set the block queue, which holds full blocks in memory - this._blockQueue.set(block.rhash(), block); - - // 2. store the block in the database - var operations = this._getBlockOperations(block); - - this._db.batch(operations); -}; - -BlockService.prototype._determineBlockState = function(block) { - - if (this._isOutOfOrder(block)) { - return 'outoforder'; +BlockService.prototype._detectReorg = function(block) { + var prevHash = block.toHeaders().toJSON().prevBlock; + if (this._tip.hash !== prevHash) { + return true; } - - if (this._isChainReorganizing(block)) { - return 'reorg'; - } - - return 'normal'; - -}; - -BlockService.prototype._getBlock = function(hash, callback) { - - var self = this; - - var block = this._blockQueue(hash); - - if (block) { - return callback(null, block); - } - - this._db.get(this._encoding.encodeBlockKey(hash), function(err, data) { - - if(err) { - return callback(err); - } - - if (!data) { - return callback(); - } - - var block = self._encoding.decodeBlockValue(data); - callback(null, block); - - }); -}; - -BlockService.prototype._getBlockOperations = function(block) { - - var self = this; - - // block or [block] - if (_.isArray(block)) { - var ops = []; - _.forEach(block, function(b) { - ops.push(self._getBlockOperations(b)); - }); - return _.flatten(ops); - } - - var operations = []; - - // block - operations.push({ - type: 'put', - key: self._encoding.encodeBlockKey(block.rhash()), - value: self._encoding.encodeBlockValue(block) - }); - - return operations; - -}; - -BlockService.prototype._getDelta = function(tip) { - - var blocks = []; - var _tip = tip; - - while (_tip !== this._tip.hash) { - var blk = this._blockQueue.get(_tip); - _tip = blk.toHeaders().toJSON().prevBlock; - blocks.push(blk); - } - - blocks.reverse(); - return blocks; - -}; - -BlockService.prototype._getHash = function(blockArg) { - - var headers = this._header.getAllHeaders(); - - if (utils.isHeight(blockArg)) { - return headers.getIndex(blockArg).hash; - } - -}; - -BlockService.prototype._getIncompleteChainIndexes = function(block) { - var ret = []; - for(var i = 0; i < this._incompleteChains.length; i++) { - var chain = this._incompleteChains[i]; - var lastEntry = chain[chain.length - 1].toHeaders().toJSON(); - if (lastEntry.prevHash === block.rhash()) { - ret.push(i); - } - } - return ret; + return false; }; BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) { @@ -420,6 +257,36 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback }); }; +BlockService.prototype._getBlock = function(hash, callback) { + + var self = this; + + this._db.get(this._encoding.encodeBlockKey(hash), function(err, data) { + + if(err) { + return callback(err); + } + + if (!data) { + return callback(); + } + + var block = self._encoding.decodeBlockValue(data); + callback(null, block); + + }); +}; + +BlockService.prototype._getHash = function(blockArg) { + + var headers = this._header.getAllHeaders(); + + if (utils.isHeight(blockArg)) { + return headers.getIndex(blockArg).hash; + } + +}; + BlockService.prototype._handleReorg = function(hash, allHeaders) { this._reorging = true; // while this is set, we won't be sending blocks @@ -513,31 +380,6 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { }; -BlockService.prototype._isChainReorganizing = function(block) { - - // if we aren't an out of order block, then is this block's prev hash our tip? - var prevHash = block.toHeaders().toJSON().prevBlock; - - return prevHash !== this._tip.hash; - -}; - -BlockService.prototype._isOutOfOrder = function(block) { - - // is this block the direct child of one of our chain tips? If so, not an out of order block - var prevHash = block.toHeaders().toJSON().prevBlock; - - for(var i = 0; i < this._chainTips.length; i++) { - var chainTip = this._chainTips[i]; - if (chainTip === prevHash) { - return false; - } - } - - return true; - -}; - BlockService.prototype._onAllHeaders = function(headers) { this._bestHeight = headers.size; this._startSync(); @@ -572,109 +414,70 @@ BlockService.prototype._processBlock = function() { function(err) { if (err) { - log.error(err); + log.error('Block Service: Error: ' + err); self.node.stop(); return; } - self._tip.height++; - self._tip.hash = block.rhash(); - var tipOps = utils.encodeTip(self._tip, self.name); - - operations.push({ - type: 'put', - key: tipOps.key, - value: tipOps.value - }); - self._db.batch(operations, function(err) { if (err) { - log.error(err); + log.error('Block Service: Error: ' + err); self.node.stop(); return; } - self._sync(); + + self._tip.height++; + self._tip.hash = block.rhash(); + var tipOps = utils.encodeTip(self._tip, self.name); + + self._db.put(tipOps.key, tipOps.value, function(err) { + + if (err) { + log.error('Block Service: Error: ' + err); + self.node.stop(); + return; + } + + self._sync(); + }); }); } ); }; +BlockService.prototype.onBlock = function(block, callback) { + var self = this; + + + setImmediate(function() { + callback(null, [{ + type: 'put', + key: self._encoding.encodeBlockKey(block.rhash()), + value: self._encoding.encodeBlockValue(block) + }]); + }); +}; + BlockService.prototype._onBlock = function(block) { - if (this.node.stopping) { + if (this.node.stopping || this._reorging) { return; } log.debug('Block Service: new block: ' + block.rhash()); + var reorg = this._detectReorg(block); + if (reorg) { + this._handleReorg(block, this._header.getAllHeaders()); + return; + } + this._unprocessedBlocks.push(block); this._processBlock(); }; -BlockService.prototype._selectActiveChain = function() { - - var chainTip; - var mostChainWork = new BN(0); - var headers = this._header.getAllHeaders(); - - if (this._chainTips.length === 1) { - return this._chainTips[0]; - } - - for(var i = 0; i < this._chainTips.length; i++) { - - var header = headers.get(this._chainTips[i]); - assert(header, 'we must have a header for chain tip.'); - var work = new BN(new Buffer(header.chainwork, 'hex')); - - if (work.gt(mostChainWork)) { - mostChainWork = work; - chainTip = this._chainTips[i]; - } - } - - return chainTip; - -}; - - -BlockService.prototype._sendDelta = function() { - - - // when this function is called, we know, for sure, that we have a complete chain of unsent block(s). - // our task is to send all blocks between active chain's tip and our tip. - var activeChainTip = this._selectActiveChain(); - - var blocks = this._getDelta(activeChainTip); - - for(var i = 0; i < blocks.length; i++) { - this._broadcast(this._subscriptions.block, 'block/block', blocks[i]); - } - - var newTipHeight = this._tip.height + i; - var newTipHash = blocks[i - 1].rhash(); - - this._setTip({ height: newTipHeight, hash: newTipHash }); - var tipOps = utils.encodeTip(this._tip, this.name); - - var ops = [{ - type: 'put', - key: tipOps.key, - value: tipOps.value - }]; - - this._db.batch(ops); - - if (++this._blockCount >= BlockService.MAX_BLOCKS) { - this._blockCount = 0; - log.debug('Block Service: calling sync again for more blocks.'); - this._sync(); - } - -}; - BlockService.prototype._setListeners = function() { this._header.once('headers', this._onAllHeaders.bind(this)); @@ -725,157 +528,18 @@ BlockService.prototype._sync = function() { if (this._tip.height < size) { - log.info('Block Service: Blocks download progress: ' + this._tip.height + '/' + - this._numNeeded + ' (' + (this._tip.height / this._bestHeight*100).toFixed(2) + '%)'); + if (this._tip.height % 100 === 0) { + log.info('Block Service: Blocks download progress: ' + this._tip.height + '/' + + this._bestHeight + ' (' + (this._tip.height / this._bestHeight*100).toFixed(2) + '%)'); + } - var end = headers.getIndex(Math.min(this._tip.height + BlockService.MAX_BLOCKS, size)); + var end = headers.getIndex(Math.min(this._tip.height + BlockService.MAX_BLOCKS + 1, size)); var endHash = end ? end.hash : null; - this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash }); return; } }; -BlockService.prototype._updateChainInfo = function(block, state) { - - var prevHash = block.toHeaders().toJSON().prevBlock; - - if (state === 'normal') { - this._updateNormalStateChainInfo(block, prevHash); - return; - - } - - if (state === 'reorg') { - this._updateReorgStateChainInfo(block); - return; - } - - this._updateOutOfOrderStateChainInfo(block); - -}; - - -BlockService.prototype._updateNormalStateChainInfo = function(block, prevHash) { - - var index = this._chainTips.indexOf(prevHash); - - assert(index > -1, 'Block state is normal, ' + - 'yet the previous block hash is missing from the chain tips collection.'); - - var incompleteChainIndexes = this._getIncompleteChainIndexes(block); - //retrieving more than one incomplete chain for a given block means there is a future reorg in the incomplete chain - if (incompleteChainIndexes.length < 1) { - this._chainTips.push(block.rhash()); - } else { - - incompleteChainIndexes.sort().reverse(); - for(var i = 0; i < incompleteChainIndexes.length; i++) { - var incompleteIndex = incompleteChainIndexes[i]; - this._chainTips.push(this._incompleteChains[incompleteIndex][0].rhash()); - this._incompleteChains.splice(incompleteIndex, 1); - } - - } - - this._chainTips.splice(index, 1); - -}; - -BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) { - - /* - At this point, we know we have a block that arrived out of order - so we need to search through our existing incomplete chains to detect the following criteria: - - 1. one of more of the chains has us as the last block in the chain - (we are the parent to every other block in that chain) - 2. one of more of the chains has us as the first block in the chain - (we are the youngest block in the chain) - 3. there are no chains that reference us as prev hash and our prev hash does not exist in any chain. - (we create a new incomplete chain with us as the only enrtry) - - */ - - var prevHash = block.toHeaders().toJSON().prevBlock; - var possibleJoins = { tip: [], genesis: [] }; - var newChains = []; - var joinedChains = false; - - for(var i = 0; i < this._incompleteChains.length; i++) { - - - // if we see that a block is the tip of one chain and the genesis of another, - // then we can join those chains together. - - var chain = this._incompleteChains[i]; - var firstEntry = chain[0]; - var lastEntry = chain[chain.length - 1]; - var chains; - - // criteria 1 - var lastEntryPrevHash = lastEntry.toHeaders().toJSON().prevBlock; - if (lastEntryPrevHash === block.hash) { - - joinedChains = true; - if (possibleJoins.tip.length > 0) { - - chains = utils.joinListsOnIndex(possibleJoins.tip, chain, this._incompleteChains); - newChains = utils.removeItemsByIndexList(possibleJoins.tip, newChains); - newChains = newChains.concat(chains); - - } else { - - // push the block on the end of chain - chain.push(block); - newChains.push(chain); - possibleJoins.genesis.push(i); - } - continue; - - } - - // criteria 2 - if (firstEntry.hash === prevHash) { - - joinedChains = true; - if (possibleJoins.genesis.length > 0) { - - chains = utils.joinListsOnIndex(possibleJoins.genesis, chain, this._incompleteChains, 'reverse'); - newChains = utils.removeItemsByIndexList(possibleJoins.genesis, newChains); - newChains = newChains.concat(chains); - - } else { - - // have we already put our block as the genesis of some other chain, if so, join the chains - // add the block as the first element on the chainf - chain.unshift(block); - newChains.push(chain); - possibleJoins.tip.push(i); - } - continue; - } - - newChains.push(chain); - - } - - - if (joinedChains) { - - this._incompleteChains = newChains; - return; - } - - // criteria 3 - this._incompleteChains.push([block]); - -}; - -BlockService.prototype._updateReorgStateChainInfo = function(block) { - this._chainTips.push(block.hash); -}; - module.exports = BlockService; diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index acce3bcf..1d75893b 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -12,7 +12,7 @@ var MempoolService = function(options) { util.inherits(MempoolService, BaseService); -MempoolService.dependencies = ['db']; +MempoolService.dependencies = ['db', 'block']; MempoolService.prototype.getAPIMethods = function() { var methods = [ @@ -69,10 +69,10 @@ MempoolService.prototype._startSubscriptions = function() { this._bus = this.node.openBus({remoteAddress: 'localhost-mempool'}); } - //this._bus.on('p2p/block', this._onBlock.bind(this)); - this._bus.on('p2p/transaction', this._onTransaction.bind(this)); + this._bus.on('block/reorg', this._onReorg.bind(this)); + this._bus.subscribe('block/reorg'); - //this._bus.subscribe('p2p/block'); + this._bus.on('p2p/transaction', this._onTransaction.bind(this)); this._bus.subscribe('p2p/transaction'); }; @@ -85,7 +85,9 @@ MempoolService.prototype.onBlock = function(block, callback) { key: self._encoding.encodeMempoolTransactionKey(tx.txid()) }; }); - setImmediate(callback); + setImmediate(function() { + callback(null, ops); + }); }; MempoolService.prototype._onTransaction = function(tx) { diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index 79bab071..b53d34b4 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -119,8 +119,8 @@ TimestampService.prototype._startSubscriptions = function() { this._bus = this.node.openBus({remoteAddress: 'localhost'}); } - this._bus.on('block/block', this._onBlock.bind(this)); - this._bus.subscribe('block/block'); + this._bus.on('block/reorg', this._onReorg.bind(this)); + this._bus.subscribe('block/reorg'); }; TimestampService.prototype._sync = function() { @@ -154,7 +154,7 @@ TimestampService.prototype.stop = function(callback) { setImmediate(callback); }; -TimestampService.prototype._onBlock = function(block) { +TimestampService.prototype.onBlock = function(block, callback) { var operations = []; @@ -191,8 +191,9 @@ TimestampService.prototype._onBlock = function(block) { ]); - this._db.batch(operations); - + setImmediate(function() { + callback(null, operations); + }); }; TimestampService.prototype._onReorg = function(oldBlockList, newBlockList, commonAncestor) { diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index b6b11a2a..e2dadbe2 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -167,7 +167,9 @@ TransactionService.prototype.onBlock = function(block, callback) { } - callback(null, operations); + setImmediate(function() { + callback(null, operations); + }); };