diff --git a/lib/logger.js b/lib/logger.js index 0e8576c7..253a5277 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -13,7 +13,7 @@ function Logger(options) { options = {}; } this.formatting = _.isUndefined(options.formatting) ? Logger.DEFAULT_FORMATTING : options.formatting; - this.permitWrites = true; + this.level = options.logLevel || process.env['LOG_LEVEL'] || 0; // lower numbers means less logs } Logger.DEFAULT_FORMATTING = true; @@ -32,9 +32,7 @@ Logger.prototype.info = function() { */ Logger.prototype.error = function() { var existingPermitWrites = this.permitWrites; - this.permitWrites = true; this._log.apply(this, ['red', 'error'].concat(Array.prototype.slice.call(arguments))); - this.permitWrites = existingPermitWrites; }; /** @@ -42,7 +40,29 @@ Logger.prototype.error = function() { * #debug */ Logger.prototype.debug = function() { - this._log.apply(this, ['magenta', 'debug'].concat(Array.prototype.slice.call(arguments))); + if (this.level >= 1) { + this._log.apply(this, ['magenta', 'debug'].concat(Array.prototype.slice.call(arguments))); + } +}; + +/** + * Prints an debug2 message + * #debug2 + */ +Logger.prototype.debug2 = function() { + if (this.level >= 2) { + this._log.apply(this, ['green', 'debug'].concat(Array.prototype.slice.call(arguments))); + } +}; + +/** + * Prints an debug3 message + * #debug3 + */ +Logger.prototype.debug3 = function() { + if (this.level >= 3) { + this._log.apply(this, ['cyan', 'debug'].concat(Array.prototype.slice.call(arguments))); + } }; /** @@ -58,9 +78,6 @@ Logger.prototype.warn = function() { * #_log */ Logger.prototype._log = function(color) { - if (!this.permitWrites) { - return; - } var args = Array.prototype.slice.call(arguments); args = args.slice(1); var level = args.shift(); diff --git a/lib/service.js b/lib/service.js index 8489b75d..006e8b4d 100644 --- a/lib/service.js +++ b/lib/service.js @@ -89,99 +89,4 @@ Service.prototype.getRoutePrefix = function() { return this.name; }; -Service.prototype._createConcurrencyCache = function(opts) { - this._concurrencyCache = LRU(opts || 500); -}; - -Service.prototype._createCache = function(opts) { - this._cache = LRU(opts || 500); -}; - -Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) { - var self = this; - - var prev = self._concurrencyCache.get(prevKey); - - if (prev && !prev.prevKey) { - - if (fn) { - valueItem = fn.call(self, valueItem, prev.valueItem); - } - - self._concurrencyCache.del(prevKey); - self._concurrencyCache.set(key, { valueItem: valueItem }); - return [{ key: key, value: valueItem }]; - } - - self._concurrencyCache.set(key, { valueItem: valueItem, prevKey: prevKey }); - - var resolvedDeps = []; - var depKey = key; - - self._concurrencyCache.rforEach(function(value, key) { - - if (depKey === value.prevKey) { - - valueItem = fn.call(self, value.valueItem, depKey.valueItem); - resolvedDeps.push({ key: key, value: valueItem }); - depKey = key; - self._concurrencyCache.del(key); - - } - }); - - return resolvedDeps; -}; - -Service.prototype._getGenesisBlock = function() { - - this.genesis = {}; - this.genesis.height = 0; - this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()]; - return this.genesis; - -}; - -Service.prototype._decodeTipData = function(tipBuf) { - - assert(tipBuf && Buffer.isBuffer(tipBuf) && tipBuf.length === 36, - 'tip data: ' + tipBuf.toString('hex') + ' if not in the proper format'); - - var hash = tipBuf.slice(0, 32).toString('hex'); - var height = tipBuf.slice(32).toString('hex').readUInt32BE(); - return { - hash: hash, - height: height - }; -}; - -Service.prototype._loadTip = function(callback) { - - var self = this; - - self.db.get(new Buffer('00', 'hex') + self.name, function(err, tipBuf) { - - if (err) { - return callback(err); - } - - if (!tipBuf) { - self.tip = self._getGenesisBlock(); - self._lastHeight = 0; - return callback(); - } - - self.tip = self._decodeTipData(tipData); - self._lastHeight = self.tip.height; - - log.info('Loaded tip for: ' + self.name + ' service, hash: ' + - self.tip.hash + ' height: ' + self.tip.height); - - callback(); - - }); - - -}; - module.exports = Service; diff --git a/lib/services/block/encoding.js b/lib/services/block/encoding.js index fd9f398c..cbc017e9 100644 --- a/lib/services/block/encoding.js +++ b/lib/services/block/encoding.js @@ -29,13 +29,13 @@ Encoding.prototype.decodeBlockValue = function(buffer) { // ---- height --> hash, chainwork Encoding.prototype.encodeMetaKey = function(height) { - var heigthtBuf = new Buffer(4); + var heightBuf = new Buffer(4); heightBuf.writeUInt32BE(height); return Buffer.concat([ this.blockPrefix, this.metaPrefix, heightBuf ]); }; Encoding.prototype.decodeMetaKey = function(buffer) { - return heightBuf.readUInt32BE(3); + return buffer.readUInt32BE(3); }; Encoding.prototype.encodeMetaValue = function(value) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 114cfd6f..f2c8d429 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -1,5 +1,6 @@ 'use strict'; +var async = require('async'); var BaseService = require('../../service'); var inherits = require('util').inherits; var Encoding = require('./encoding'); @@ -39,7 +40,7 @@ var BlockService = function(options) { }); // hash -> block // keep track of out-of-order blocks, this is a list of chains (which are lists themselves) - // e.g. [ [ block1, block5 ], [ block2, block6 ] ]; + // e.g. [ [ block5, block4 ], [ block8, block7 ] ]; this._incompleteChains = []; this._chainTips = []; // list of all chain tips, including main chain and any chains that were orphaned after a reorg this._blockCount = 0; @@ -55,37 +56,6 @@ BlockService.MAX_CHAINWORK = new BN(1).ushln(256); BlockService.MAX_BLOCKS = 500; // --- public prototype functions -BlockService.prototype.start = function(callback) { - - var self = this; - - self._db.getPrefix(self.name, function(err, prefix) { - - if(err) { - return callback(err); - } - - self._db.getServiceTip('block', function(err, tip) { - - if (err) { - return callback(err); - } - self.prefix = prefix; - self._encoding = new Encoding(self.prefix); - self._loadMeta(); - self._setListeners(); - self._onTipBlock(tip); - callback(); - }); - }); - -}; - -BlockService.prototype.stop = function(callback) { - this._saveMetaData(); - callback(); -}; - BlockService.prototype.getAPIMethods = function() { var methods = [ ['getBlock', this, this.getBlock, 1], @@ -117,6 +87,39 @@ BlockService.prototype.getPublishEvents = function() { }; +BlockService.prototype.start = function(callback) { + + var self = this; + self._setListeners(); + + async.waterfall([ + function(next) { + self._db.getPrefix(self.name, next); + }, + function(prefix, next) { + self._prefix = prefix; + self._encoding = new Encoding(self._prefix); + self._db.getServiceTip('block', next); + }, + function(tip, next) { + self._tip = tip; + self._chainTips.push(self._tip.hash); + self._loadMeta(next); + } + ], function(err) { + if(err) { + return callback(err); + } + self._startSubscriptions(); + callback(); + }); + +}; + +BlockService.prototype.stop = function(callback) { + callback(); +}; + BlockService.prototype.subscribe = function(name, emitter) { this._subscriptions[name].push(emitter); @@ -260,49 +263,12 @@ BlockService.prototype._getBlockOperations = function(block) { }; -BlockService.prototype._getBlocksFromHashes = function(hashes) { - - var self = this; - - var blocks = hashes.map(function(hash) { - - var block = self._blockQueue.get(hash); - - if (!block) { - log.error('block: ' + hash + ' was not found in our in-memory block cache.'); - this.node.stop(); - return; - } - - return block; - - }); - - return blocks; - -}; - BlockService.prototype._getChainwork = function(tipHash) { var block = this._blockQueue.get(tipHash); - assert(block, 'expected to find a block in block queue for hash: ' + tipHash + ', but did not find it.'); - var prevHash = utils.reverseBufferToString(block.header.prevHash); - - var prevChainwork; - // if the previous block is the genesis block, then the chain work is the same for all know chains - if (prevHash === this.GENESIS_HASH) { - prevChainwork = new BN(new Buffer('0000000000000000000000000000000000000000000000000000000100010001', 'hex')); - } else { - var prevBlock = this._blockQueue.get(prevHash); - - assert(prevBlock, 'expected to find a previous block in block queue for hash: ' + - prevHash + ', but did not find any.'); - - // whatevs the latest chainwork in meta, this is the cumulative chainwork - var lastChainwork = this._meta[this._meta.length - 1].chainwork; - prevChainwork = new BN(new Buffer(lastChainwork, 'hex')); - } + var lastChainwork = this._meta[this._meta.length - 1].chainwork; + var prevChainwork = new BN(new Buffer(lastChainwork, 'hex')); return this._computeChainwork(block.header.bits, prevChainwork); }; @@ -318,10 +284,23 @@ BlockService.prototype._getDelta = function(tip) { blocks.push(blk); } + blocks.reverse(); return blocks; }; +BlockService.prototype._getIncompleteChainIndexes = function(block) { + var ret = []; + for(var i = 0; i < this._incompleteChains.length; i++) { + var chain = this._incompleteChains[i]; + var lastEntry = chain[chain.length - 1]; + if (utils.reverseBufferToString(lastEntry.header.prevHash) === block.hash) { + ret.push(i); + } + } + return ret; +}; + BlockService.prototype._handleReorg = function(block) { this._reorging = true; @@ -333,12 +312,12 @@ BlockService.prototype._handleReorg = function(block) { if (!commonAncestor) { log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' + block.hash + ' (the forked block) could not be found. Bitcore-node must exit.'); + log.permitWrites = false; this.node.stop(); return; } log.warn('A common ancestor block was found to at hash: ' + commonAncestor + '.'); - this._setTip(block); this._broadcast(this.subscriptions.reorg, 'block/reorg', [block, commonAncestor]); this._reorging = false; @@ -369,17 +348,31 @@ BlockService.prototype._isOutOfOrder = function(block) { }; -BlockService.prototype._loadMeta = function() { - this._meta = this._db.loadBlockMetaData(); - if (!this._meta || this._meta.length < 1) { - this._rebuildMetaData(); - } +BlockService.prototype._loadMeta = function(callback) { + var self = this; + var criteria = { + gte: self._encoding.encodeMetaKey(0), + lte: self._encoding.encodeMetaKey(0xffffffff) + }; + var stream = this._db.createReadStream(criteria); + stream.on('error', self._onDbError.bind(self)); + stream.on('end', function() { + if (self._meta.length < 1) { + self._meta.push({ + chainwork: '0000000000000000000000000000000000000000000000000000000100010001', + hash: self.GENESIS_HASH + }); + } + callback(); + }); + stream.on('data', function(data) { + self._meta.push(self._encoding.decodeMetaValue(data.value)); + }); }; BlockService.prototype._onBestHeight = function(height) { - - var self = this; - self._bestHeight = height; + this._bestHeight = height; + this._startSync(); }; BlockService.prototype._onBlock = function(block) { @@ -403,7 +396,7 @@ BlockService.prototype._onBlock = function(block) { // 7. react to state of block switch (blockState) { - case 'orphaned': + case 'outoforder': // nothing to do, but wait until ancestor blocks come in break; case 'reorg': @@ -411,7 +404,7 @@ BlockService.prototype._onBlock = function(block) { break; default: // send all unsent blocks now that we have a complete chain - this._updateMeta(block); + this._saveMetaData(block); this._sendDelta(); break; } @@ -424,49 +417,20 @@ BlockService.prototype._onDbError = function(err) { }; -BlockService.prototype._onTipBlock = function(tip) { +BlockService.prototype._saveMetaData = function(block) { + var item = { + chainwork: this._getChainwork(block.hash).toString(16, 64), + hash: block.hash + }; - var self = this; - self._tip = tip; - self._chainTips.push(self._tip.hash); - self._startSubscriptions(); - self._startSync(); - -}; - -BlockService.prototype._rebuildMetaData = function() { - // we have to go through all the blocks in the database and calculate chainwork - // TODO: implement this - this._meta = [{ - hash: this.GENESIS_HASH, - chainwork: '0000000000000000000000000000000000000000000000000000000100010001' - }]; -}; - -BlockService.prototype._reportBootStatus = function() { - - var blockInfoString = utils.getBlockInfoString(this._tip.height, this._bestHeight); - - log.info('Block Service tip is currently height: ' + this._tip.height + ' hash: ' + - this._tip.hash + ' P2P network best height: ' + this._bestHeight + '. Block Service is: ' + - blockInfoString); + this._meta.push(item); + // save meta position in db + this._db.put(this._encoding.encodeMetaKey(this._meta.length - 1), + this._encoding.encodeMetaValue(item)); }; -BlockService.prototype._saveMetaData = function() { - try { - this._db.saveBlockMetaData(this._meta); - } catch(e) { - log.error('Block meta file failed to save, error: ' + e); - } -}; - -/* - Since blocks can arrive out of order from our trusted peer, we can't rely on the latest block - being the tip of the main/active chain. We should, instead, take the chain with the most work completed. - We need not concern ourselves whether or not the block is valid, we trust our peer to do this validation. -*/ BlockService.prototype._selectActiveChain = function() { var chainTip; @@ -523,15 +487,9 @@ BlockService.prototype._setListeners = function() { }; -BlockService.prototype._setMetaData = function(block) { - this._meta.push({ - chainwork: this._getChainwork(block.hash).toString(16, 64), - hash: block.hash - }); -}; - BlockService.prototype._setTip = function(tip) { log.debug('Setting tip to height: ' + tip.height); + log.debug('Setting tip to hash: ' + tip.hash); this._tip = tip; this._db.setServiceTip('block', this._tip); }; @@ -580,44 +538,6 @@ BlockService.prototype._sync = function() { }; -BlockService.prototype._updateNormalStateChainInfo = function(block, prevHash) { - var index = this._chainTips.indexOf(prevHash); - assert(index > -1, 'Block state is normal, ' + - 'yet the previous block hash is missing from the chain tips collection.'); - - this._chainTips.push(block.hash); - this._chainTips.splice(index, 1); -}; - -BlockService.prototype._updateReorgStateChainInfo = function(block) { - this._chainTips.push(block.hash); -}; - -BlockService.prototype._updateOutOfOrderStateChainInfo = function(block, prevHash) { - // add this block to the incomplete chains - // are we the parent of some other block in here? - // are we the child of some other block in here? - // block chains in here go newest to oldest block. - // When the oldest block's prevHash is one of the chainTips, the - // incomplete chain is now complete. - for(var i = 0; i < this._incompleteChains.length; i++) { - var chain = this._incompleteChains[i]; - // does the last block in this chain have a prevHash that equal's our block's hash? - // if so, this block should be pushed on the end - if (chain.length > 0) { - - if (utils.reverseBufferToString(chain[chain.length - 1].header.prevHash) === block.hash) { - chain.push(block); - } - - // is our block's prevHash equal to the first block's hash? - if (chain[0].hash === prevHash) { - chain.unshift(block); - } - } - } -}; - BlockService.prototype._updateChainInfo = function(block, state) { var prevHash = utils.reverseBufferToString(block.header.prevHash); @@ -628,33 +548,134 @@ BlockService.prototype._updateChainInfo = function(block, state) { } - // if this is a reorg state, then a new chain tip will be added to the list if (state === 'reorg') { this._updateReorgStateChainInfo(block); return; } - this._updateOutOfOrderStateChainInfo(block, prevHash); - + this._updateOutOfOrderStateChainInfo(block); }; -BlockService.prototype._updateMeta = function(block) { - // should always have at least one item in here. - var self = this; - var latestMetaHash = self._meta[self._meta.length - 1].hash; - var prevHash = utils.reverseBufferToString(block.header.prevHash); - if (prevHash === latestMetaHash) { - self._setMetaData(block); - latestMetaHash = block.hash; - // check for past orphans that we can now stack on top - self._blockQueue.rforEach(function(v) { - if (latestMetaHash === utils.reverseBufferToString(v.header.prevHash)) { - self._setMetaData(v); - latestMetaHash = v.hash; - } - }); + +BlockService.prototype._updateNormalStateChainInfo = function(block, prevHash) { + + var index = this._chainTips.indexOf(prevHash); + + assert(index > -1, 'Block state is normal, ' + + 'yet the previous block hash is missing from the chain tips collection.'); + + var incompleteChainIndexes = this._getIncompleteChainIndexes(block); + //retrieving more than one incomplete chain for a given block means there is a future reorg in the incomplete chain + if (incompleteChainIndexes.length < 1) { + this._chainTips.push(block.hash); + } else { + + incompleteChainIndexes.sort().reverse(); + for(var i = 0; i < incompleteChainIndexes.length; i++) { + var incompleteIndex = incompleteChainIndexes[i]; + this._chainTips.push(this._incompleteChains[incompleteIndex][0].hash); + this._incompleteChains.splice(incompleteIndex, 1); + } + } + + this._chainTips.splice(index, 1); + +}; + +BlockService.prototype._updateOutOfOrderStateChainInfo = function(block) { + + /* + At this point, we know we have a block that arrived out of order + so we need to search through our existing incomplete chains to detect the following criteria: + + 1. one of more of the chains has us as the last block in the chain + (we are the parent to every other block in that chain) + 2. one of more of the chains has us as the first block in the chain + (we are the youngest block in the chain) + 3. there are no chains that reference us as prev hash and our prev hash does not exist in any chain. + (we create a new incomplete chain with us as the only enrtry) + + */ + + var prevHash = utils.reverseBufferToString(block.header.prevHash); + var possibleJoins = { tip: [], genesis: [] }; + var newChains = []; + var joinedChains = false; + + for(var i = 0; i < this._incompleteChains.length; i++) { + + + // if we see that a block is the tip of one chain and the genesis of another, + // then we can join those chains together. + + var chain = this._incompleteChains[i]; + var firstEntry = chain[0]; + var lastEntry = chain[chain.length - 1]; + var chains; + + // criteria 1 + var lastEntryPrevHash = utils.reverseBufferToString(lastEntry.header.prevHash); + if (lastEntryPrevHash === block.hash) { + + joinedChains = true; + if (possibleJoins.tip.length > 0) { + + chains = utils.joinListsOnIndex(possibleJoins.tip, chain, this._incompleteChains); + newChains = utils.removeItemsByIndexList(possibleJoins.tip, newChains); + newChains = newChains.concat(chains); + + } else { + + // push the block on the end of chain + chain.push(block); + newChains.push(chain); + possibleJoins.genesis.push(i); + } + continue; + + } + + // criteria 2 + if (firstEntry.hash === prevHash) { + + joinedChains = true; + if (possibleJoins.genesis.length > 0) { + + chains = utils.joinListsOnIndex(possibleJoins.genesis, chain, this._incompleteChains, 'reverse'); + newChains = utils.removeItemsByIndexList(possibleJoins.genesis, newChains); + newChains = newChains.concat(chains); + + } else { + + // have we already put our block as the genesis of some other chain, if so, join the chains + // add the block as the first element on the chainf + chain.unshift(block); + newChains.push(chain); + possibleJoins.tip.push(i); + } + continue; + } + + newChains.push(chain); + + } + + + if (joinedChains) { + + this._incompleteChains = newChains; + return; + } + + // criteria 3 + this._incompleteChains.push([block]); + +}; + +BlockService.prototype._updateReorgStateChainInfo = function(block) { + this._chainTips.push(block.hash); }; module.exports = BlockService; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index fc0c28f5..ac458ad6 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -10,6 +10,7 @@ var bitcore = require('bitcore-lib'); var Networks = bitcore.Networks; var $ = bitcore.util.preconditions; var Service = require('../../service'); +var constants = require('../../constants'); function DB(options) { @@ -44,19 +45,6 @@ util.inherits(DB, Service); DB.dependencies = []; -DB.prototype.saveBlockMetaData = function(data) { - fs.writeFileSync(this.node.datadir + '/block-meta-data.json', JSON.stringify(data)); -}; - -DB.prototype.loadBlockMetaData = function() { - var json; - try { - json = require(this.node.datadir + '/block-meta-data.json'); - } catch(e) { - } - return json; -}; - DB.prototype._setDataPath = function() { $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); if (this.node.network === Networks.livenet) { @@ -182,7 +170,9 @@ DB.prototype.put = function(key, value, options, callback) { }); } else { - setImmediate(cb); + if (typeof cb === 'function') { + cb(); + } } }; @@ -275,11 +265,9 @@ DB.prototype.getServiceTip = function(serviceName, callback) { var tip; if (tipBuf) { - var heightBuf = new Buffer(4); - tip = { - height: heightBuf.readUInt32BE(), - hash: tipBuf.slice(4) + height: tipBuf.readUInt32BE(0,4), + hash: tipBuf.slice(4).toString('hex') }; } else { @@ -302,6 +290,7 @@ DB.prototype.setServiceTip = function(serviceName, tip) { var keyBuf = Buffer.concat([ self.dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]); var heightBuf = new Buffer(4); heightBuf.writeUInt32BE(tip.height); + var valBuf = Buffer.concat([ heightBuf, new Buffer(tip.hash, 'hex') ]); self.put(keyBuf, valBuf, function(err) { if (err) { diff --git a/lib/utils.js b/lib/utils.js index 84696978..09a6f4f0 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -188,5 +188,33 @@ utils.getBlockInfoString = function(tip, best) { } }; +utils.joinListsOnIndex = function(indexList, list1, list2, direction) { + + var newChains = []; + + indexList.forEach(function(index) { + var otherList = list2[index]; + if (direction && direction === 'reverse') { + newChains.push(otherList.concat(list1)); + } else { + newChains.push(list1.concat(otherList)); + } + }); + + return newChains; + + +}; + +utils.removeItemsByIndexList = function(indexList, list) { + var ret = []; + for(var i = 0; i < list.length; i++) { + var item = list[i]; + if (indexList.indexOf(i) === -1) { + ret.push(item); + } + } + return ret; +}; module.exports = utils;