From 6cccce833d25d7e6aff9590b17cc9a138948da94 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 22 May 2017 08:21:37 -0400 Subject: [PATCH] Moved closer toward not relying on zmq and rpc. --- lib/node.js | 29 +- lib/scaffold/start.js | 8 +- lib/services/bitcoind/index.js | 51 +- lib/services/block/block_handler.js | 112 ++-- lib/services/block/index.js | 811 +++++++++++++++++----------- lib/services/block/reorg.js | 61 ++- lib/services/timestamp/index.js | 2 +- lib/utils.js | 4 +- regtest/block.js | 6 +- regtest/db.js | 108 ++-- regtest/test_web.js | 2 + 11 files changed, 727 insertions(+), 467 deletions(-) diff --git a/lib/node.js b/lib/node.js index aa46635d..a996cc07 100644 --- a/lib/node.js +++ b/lib/node.js @@ -285,6 +285,7 @@ Node.prototype.getNetworkName = function() { * @param {Function} callback - Called when all services are stopped */ Node.prototype.stop = function(callback) { + log.info('Beginning shutdown'); var self = this; var services = this.getServiceOrder().reverse(); @@ -293,18 +294,22 @@ Node.prototype.stop = function(callback) { this.emit('stopping'); async.eachSeries( - services, - function(service, next) { - if (self.services[service.name]) { - log.info('Stopping ' + service.name); - self.services[service.name].stop(next); - } else { - log.info('Stopping ' + service.name + ' (not started)'); - setImmediate(next); - } - }, - callback - ); + + services, + function(service, next) { + if (self.services[service.name]) { + log.info('Stopping ' + service.name); + self.services[service.name].stop(next); + } else { + log.info('Stopping ' + service.name + ' (not started)'); + setImmediate(next); + } + }, + function(err) { + if (callback) { + callback(); + } + }); }; module.exports = Node; diff --git a/lib/scaffold/start.js b/lib/scaffold/start.js index 3f683608..696c17f6 100644 --- a/lib/scaffold/start.js +++ b/lib/scaffold/start.js @@ -131,8 +131,7 @@ function lookInRequirePathConfig(req, service) { if (!service.config.requirePath) { return; } - //if a file, then remove any possible '.js' extension - //if a directory, then leave alone + try { if (fs.statSync(service.config.requirePath).isDirectory()) { return req(service.config.requirePath); @@ -140,6 +139,7 @@ function lookInRequirePathConfig(req, service) { var serviceFile = service.config.requirePath.replace(/.js$/, ''); return req(serviceFile); } catch(e) { + // we want to throw in all cases expect file not found } } @@ -147,7 +147,7 @@ function lookInCwd(req, service) { try { return req(process.cwd + '/' + service); } catch(e) { - + // throw in all cases expect file not found } } @@ -156,7 +156,6 @@ function lookInBuiltInPath(req, service) { var serviceFile = path.resolve(__dirname, '../services/' + service.name); return req(serviceFile); } catch(e) { - } } @@ -169,7 +168,6 @@ function lookInModuleManifest(req, service) { return req(serviceModule); } } catch(e) { - } } diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 801d5f92..2139ba9b 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -2,7 +2,6 @@ var util = require('util'); var bitcore = require('bitcore-lib'); -var Transaction = bitcore.Transaction; var zmq = require('zmq'); var async = require('async'); var BitcoinRPC = require('bitcoind-rpc'); @@ -26,6 +25,7 @@ function Bitcoin(options) { this.subscriptions = {}; this.subscriptions.rawtransaction = []; this.subscriptions.hashblock = []; + this.subscriptions.rawblock = []; this.startRetryTimes = this.options.startRetryTimes || 120; this.startRetryInterval = this.options.startRetryInterval || 1000; @@ -80,6 +80,12 @@ Bitcoin.prototype.getPublishEvents = function() { scope: this, subscribe: this.subscribe.bind(this, 'hashblock'), unsubscribe: this.unsubscribe.bind(this, 'hashblock') + }, + { + name: 'bitcoind/rawblock', + scope: this, + subscribe: this.subscribe.bind(this, 'rawblock'), + unsubscribe: this.unsubscribe.bind(this, 'rawblock') } ]; }; @@ -118,6 +124,19 @@ Bitcoin.prototype._getGenesisBlock = function(callback) { var self = this; + if (self.height === 0) { + return self.getRawBlock(self.tiphash, function(err, blockBuffer) { + + if(err) { + return callback(err); + } + + self.genesisBuffer = blockBuffer; + callback(); + + }); + } + self.client.getBlockHash(0, function(err, response) { if (err) { @@ -190,6 +209,20 @@ Bitcoin.prototype._initChain = function(callback) { }; +Bitcoin.prototype._zmqRawBlockHandler = function(message) { + + var block = new bitcore.Block(message); + this.tiphash = block.hash; + this.height++; + block.__height = this.height; + block.height = this.height; + + for (var i = 0; i < this.subscriptions.rawblock.length; i++) { + this.subscriptions.rawblock[i].emit('bitcoind/rawblock', block); + } + +}; + Bitcoin.prototype._zmqBlockHandler = function(message) { var self = this; @@ -229,12 +262,15 @@ Bitcoin.prototype._subscribeZmqEvents = function(node) { var self = this; node.zmqSubSocket.subscribe('hashblock'); node.zmqSubSocket.subscribe('rawtx'); + node.zmqSubSocket.subscribe('rawblock'); node.zmqSubSocket.on('message', function(topic, message) { var topicString = topic.toString('utf8'); if (topicString === 'rawtx') { self._zmqTransactionHandler(node, message); } else if (topicString === 'hashblock') { self._zmqBlockHandler(message); + } else if (topicString === 'rawblock') { + self._zmqRawBlockHandler(message); } }); }; @@ -419,17 +455,8 @@ Bitcoin.prototype.getBlock = function(blockArg, callback) { return done(self._wrapRPCError(err)); } - self.getBlockHeader(blockhash, function(err, header) { - - if(err) { - return done(self._wrapRPCError(err)); - } - - var blockObj = bitcore.Block.fromString(response.result); - blockObj.__height = header.height; - done(null, blockObj); - - }); + var blockObj = bitcore.Block.fromString(response.result); + done(null, blockObj); }); }, callback); diff --git a/lib/services/block/block_handler.js b/lib/services/block/block_handler.js index fd9b3198..94d4cd1f 100644 --- a/lib/services/block/block_handler.js +++ b/lib/services/block/block_handler.js @@ -5,17 +5,16 @@ var Transform = require('stream').Transform; var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var async = require('async'); -var bitcore = require('bitcore-lib'); -var Block = bitcore.Block; var index = require('../../index'); var log = index.log; +var _ = require('lodash'); function BlockStream(highWaterMark, sync) { Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.sync = sync; this.block = this.sync.block; this.dbTip = this.block.tip; - this.lastReadHeight = this.dbTip.__height; + this.lastReadHeight = this._getTipHeight(); this.lastEmittedHash = this.dbTip.hash; this.queue = []; this.processing = false; @@ -68,15 +67,20 @@ function BlockHandler(node, block) { this.db = this.node.services.db; this.block = block; this.syncing = false; - this.paused = false; //we can't sync while one of our indexes is reading/writing separate from us + this.paused = false; + this.blockQueue = []; this.highWaterMark = 10; } inherits(BlockHandler, EventEmitter); -BlockHandler.prototype.sync = function() { +BlockHandler.prototype.sync = function(block) { var self = this; + if (block) { + self.blockQueue.push(block); + } + if(this.syncing || this.paused) { log.debug('Sync lock held, not able to sync at the moment'); return; @@ -84,6 +88,12 @@ BlockHandler.prototype.sync = function() { self.syncing = true; + self._setupStreams(); + +}; + +BlockHandler.prototype._setupStreams = function() { + var self = this; var blockStream = new BlockStream(self.highWaterMark, self); var processConcurrent = new ProcessConcurrent(self.highWaterMark, self); var writeStream = new WriteStream(self.highWaterMark, self); @@ -101,7 +111,6 @@ BlockHandler.prototype.sync = function() { .pipe(processSerial); processSerial.on('finish', self._onFinish.bind(self)); - }; BlockHandler.prototype._onFinish = function() { @@ -129,7 +138,16 @@ BlockStream.prototype._read = function() { return this.push(null); } - this.queue.push(++this.lastReadHeight); + if (this.sync.blockQueue.length === 0) { + this.queue.push(++this.lastReadHeight); + } else { + var block = this.sync.blockQueue.shift(); + if (block) { + this.lastReadHeight = block.__height; + this.queue.push(block); + } + } + this._process(); }; @@ -149,59 +167,59 @@ BlockStream.prototype._process = function() { var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length)); self.queue = self.queue.slice(blockArgs.length); - self._getBlocks(blockArgs, next); + + if (_.isNumber(blockArgs[0])) { + self.block.getBlocks(blockArgs, function(err, blocks) { + + if(err) { + if (err === 'reorg') { + self.push(null); + return next(); + } + return next(err); + } + + self._pushBlocks(blocks); + next(); + + }); + } else { + + self._pushBlocks(blockArgs); + next(); + + } }, function(err) { + if(err) { return self.emit('error', err); } self.processing = false; + } ); }; - -BlockStream.prototype._getBlocks = function(heights, callback) { +BlockStream.prototype._pushBlocks = function(blocks) { var self = this; - async.map(heights, function(height, next) { - if (height === 0) { - var block = new Block(self.block.genesis); - block.__height = 0; - return next(null, block); - } + for(var i = 0; i < blocks.length; i++) { - self.block.getBlock(height, function(err, block) { + self.lastEmittedHash = blocks[i].hash; + log.debug('Pushing block: ' + blocks[i].hash + ' from the blockstream'); + self.push(blocks[i]); - if (err === 'reorg') { - return self.push(null); - } + } - if(err) { - return next(err); - } +}; - block.__height = height; - next(null, block); - }); - - }, function(err, blocks) { - - if(err) { - return callback(err); - } - - for(var i = 0; i < blocks.length; i++) { - - self.lastEmittedHash = blocks[i].hash; - self.push(blocks[i]); - - } - - return callback(); - - }); +BlockStream.prototype._getTipHeight = function() { + if (this.dbTip.__height === 0) { + return -1; + } + return this.dbTip.__height; }; ProcessSerial.prototype._reportStatus = function() { @@ -235,7 +253,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) { ProcessSerial.prototype._process = function(block, callback) { var self = this; - self.block.getSerialBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(block, true, 'serial', function(err, operations) { if(err) { return callback(err); } @@ -268,7 +286,7 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) { this.lastBlock = block; - self.block.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(block, true, 'concurrent', function(err, operations) { if(err) { return callback(err); } @@ -323,7 +341,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { var self = this; async.parallel([function(next) { - self.block.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(block, true, 'concurrent', function(err, operations) { if(err) { return callback(err); } @@ -331,7 +349,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { next(null, operations); }); }, function(next) { - self.block.getSerialBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(block, true, 'serial', function(err, operations) { if(err) { return callback(err); } diff --git a/lib/services/block/index.js b/lib/services/block/index.js index c1ab2f6a..4840a2fa 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -3,17 +3,17 @@ var BaseService = require('../../service'); var levelup = require('levelup'); var bitcore = require('bitcore-lib'); -var Block = bitcore.Block; var inherits = require('util').inherits; var Encoding = require('./encoding'); var index = require('../../'); var log = index.log; var BufferUtil = bitcore.util.buffer; -var utils = require('../../utils'); var Reorg = require('./reorg'); -var $ = bitcore.util.preconditions; var async = require('async'); var BlockHandler = require('./block_handler'); +var LRU = require('lru-cache'); +var utils = require('../../utils'); +var _ = require('lodash'); var BlockService = function(options) { BaseService.call(this, options); @@ -27,6 +27,8 @@ var BlockService = function(options) { keyEncoding: 'string', valueEncoding: 'binary' }; + this._blockQueue = LRU(50); //hash -> header, height -> header, + this._rawBlockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's }; inherits(BlockService, BaseService); @@ -36,32 +38,6 @@ BlockService.dependencies = [ 'db' ]; -BlockService.prototype.getNetworkTipHash = function() { - return this.bitcoind.tiphash; -}; - -BlockService.prototype._startSubscriptions = function() { - - var self = this; - - if (!self._subscribed) { - - self._subscribed = true; - self.bus = self.node.openBus({remoteAddress: 'localhost'}); - - self.bus.on('bitcoind/hashblock', function() { - self._blockHandler.sync(); - }); - - self.bus.subscribe('bitcoind/hashblock'); - } - -}; - -BlockService.prototype._log = function(msg) { - return log.info('BlockService: ', msg); -}; - BlockService.prototype.start = function(callback) { var self = this; @@ -79,180 +55,40 @@ BlockService.prototype.start = function(callback) { }); }; +BlockService.prototype.stop = function(callback) { + if (callback) { + setImmediate(callback); + } +}; + BlockService.prototype._sync = function() { var self = this; async.waterfall([ self._loadTips.bind(self), - self._detectReorg.bind(self), + self._detectStartupReorg.bind(self), self._getBlocks.bind(self) - ], function(err) { + ], function(err, blocksDiff) { if(err) { - throw err; + log.error(err); + self.node.stop(); } - self._blockHandler.sync(); - }); - -}; - -BlockService.prototype._getBlocks = function(callback) { - - var self = this; - var blocksDiff = self.bitcoind.height - self.tip.__height; - - if (blocksDiff < 0) { - self._log('Peer\'s height is less than our own. The peer may be syncing.' + - ' The system is usable, but chain may have a reorg in future blocks.' + - ' We may not answer queries about blocks at heights greater than ' + self.bitcoind.height); - self._blockHandler.sync(); - return; - } - - self._log('Syncing: ' + blocksDiff + ' blocks from the network.'); - - var operations = []; - - async.timesLimit(blocksDiff, 8, function(n, next) { - - var blockNumber = n + self.tip.__height + 1; - self.bitcoind.getBlockHeader(blockNumber, function(err, header) { - - if(err) { - return next(err); - } - - operations.push({ - type: 'put', - key: self.encoding.encodeBlockHashKey(header.hash), - value: self.encoding.encodeBlockHeightValue(header.height) - }); - - operations.push({ - type: 'put', - key: self.encoding.encodeBlockHeightKey(header.height), - value: self.encoding.encodeBlockHashValue(header.hash) - }); - - next(); - }); - - }, function(err) { - - if(err) { - return callback(err); + if (blocksDiff > 0) { + return self._blockHandler.sync(); } - self.db.batch(operations, callback); - - }); -}; - -BlockService.prototype._setHandlers = function() { - var self = this; - self.node.once('ready', function() { - self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); - self._sync(); - }); - - self._blockHandler.on('synced', function() { self._startSubscriptions(); }); -}; - -BlockService.prototype.stop = function(callback) { - setImmediate(callback); -}; - -BlockService.prototype.pauseSync = function(callback) { - var self = this; - self._lockTimes.push(process.hrtime()); - if (self._sync.syncing) { - self._sync.once('synced', function() { - self._sync.paused = true; - callback(); - }); - } else { - self._sync.paused = true; - setImmediate(callback); - } -}; - -BlockService.prototype.resumeSync = function() { - this._log('Attempting to resume sync', log.debug); - var time = this._lockTimes.shift(); - if (this._lockTimes.length === 0) { - if (time) { - this._log('sync lock held for: ' + utils.diffTime(time) + ' secs', log.debug); - } - this._sync.paused = false; - this._sync.sync(); - } -}; - -BlockService.prototype._detectReorg = function(callback) { - - var self = this; - - if (self.tip.__height <= 0) { - return callback(); - } - - // all synced - if (self.tip.hash === self.bitcoind.tiphash && self.tip.__height === self.bitcoind.height) { - return callback(); - } - - // check if our tip height has the same hash as the network's - self.bitcoind.getBlockHeader(self.tip.__height, function(err, header) { - - if(err) { - return callback(err); - } - - // we still might have a reorg if our tip is greater than the network's - // we won't know about this until we start syncing - if (header.hash === self.tip.hash) { - return callback(); - } - - //our hash isn't in the network chain anymore, we have reorg'ed - self._handleReorg(header.hash, callback); - - }); -}; - -BlockService.prototype._handleReorg = function(hash, callback) { - - var self = this; - self.printTipInfo('Reorg detected!'); - - self.reorg = true; - - var reorg = new Reorg(self.node, self); - - reorg.handleReorg(hash, function(err) { - - if(err) { - self._log('Reorg failed! ' + err, log.error); - self.node.stop(function() {}); - throw err; - } - - self.printTipInfo('Reorg successful!'); - self.reorg = false; - callback(); - - }); }; BlockService.prototype.printTipInfo = function(prependedMessage) { - this._log( + log.info( prependedMessage + ' Serial Tip: ' + this.tip.hash + ' Concurrent tip: ' + this.concurrentTip.hash + ' Bitcoind tip: ' + this.bitcoind.tiphash @@ -260,74 +96,62 @@ BlockService.prototype.printTipInfo = function(prependedMessage) { }; -BlockService.prototype._loadTips = function(callback) { +BlockService.prototype.getNetworkTipHash = function() { + return this.bitcoind.tiphash; +}; + +BlockService.prototype._getBlockOperations = function(obj) { var self = this; - var tipStrings = ['tip', 'concurrentTip']; - - async.each(tipStrings, function(tip, next) { - - self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) { - - if(err && !(err instanceof levelup.errors.NotFoundError)) { - return next(err); - } - - var hash; - if (!tipData) { - hash = new Array(65).join('0'); - self[tip] = { - height: -1, - hash: hash, - '__height': -1 - }; - return next(); - } - - hash = tipData.slice(0, 32).toString('hex'); - - self.bitcoind.getBlock(hash, function(err, block) { - - if(err) { - return next(err); - } - - self[tip] = block; - self._log('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height); - next(); - - }); + if (_.isArray(obj)) { + var ops = []; + _.forEach(obj, function(block) { + ops.push(self._getBlockOperations(block)); }); + return _.flatten(ops); + } - }, function(err) { - - if(err) { - return callback(err); - } - - self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); - callback(); + var operations = []; + operations.push({ + type: 'put', + key: self.encoding.encodeBlockHashKey(obj.hash), + value: self.encoding.encodeBlockHeightValue(obj.height) }); + operations.push({ + type: 'put', + key: self.encoding.encodeBlockHeightKey(obj.height), + value: self.encoding.encodeBlockHashValue(obj.hash) + }); + + return operations; + }; -BlockService.prototype.getConcurrentBlockOperations = function(block, add, callback) { +BlockService.prototype.getBlockOperations = function(block, add, type, callback) { var operations = []; async.each( this.node.services, function(mod, next) { - if(mod.concurrentBlockHandler) { - $.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function'); - mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { + var fn = mod.blockHandler; + + if (type === 'concurrent') { + fn = mod.concurrentBlockHandler; + } + + if(fn) { + + fn.call(mod, block, add, function(err, ops) { + if (err) { return next(err); } + if (ops) { - $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); operations = operations.concat(ops); } @@ -345,40 +169,7 @@ BlockService.prototype.getConcurrentBlockOperations = function(block, add, callb callback(null, operations); } ); -}; -BlockService.prototype.getSerialBlockOperations = function(block, add, callback) { - var operations = []; - - async.eachSeries( - this.node.services, - function(mod, next) { - if(mod.blockHandler) { - $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - - mod.blockHandler.call(mod, block, add, function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); - operations = operations.concat(ops); - } - - next(); - }); - } else { - setImmediate(next); - } - }, - function(err) { - if (err) { - return callback(err); - } - - callback(null, operations); - } - ); }; BlockService.prototype.getTipOperation = function(block, add, tipType) { @@ -403,72 +194,476 @@ BlockService.prototype.getTipOperation = function(block, add, tipType) { }; }; -BlockService.prototype.getBlock = function(height, callback) { +BlockService.prototype.getBlocks = function(blockArgs, callback) { var self = this; - if (self.tip.__height >= self.bitcoind.height) { - // if our block service's tip is ahead of the network tip, then we need to - // watch for a reorg by getting what we have for the tip hash and comparing it to - // what the network has. - return self.db.get(self.encoding.encodeBlockHeightKey(height), function(err, hash) { + async.mapLimit(blockArgs, 8, function(blockArg, next) { + + async.waterfall([ + + self._isGenesisBlock.bind(self, blockArg), + + function(block, next) { + + if (block) { + return next(null, self.genesis); + } + + next(null, self._rawBlockQueue.get(blockArg)); + }, + + function(block, next) { + + if (block) { + return next(null, block); + } + + self._getBlock(blockArg, next); + } + ], next); + + }, function(err, blocks) { + + if(err) { + return callback(err); + } + + self._detectReorg(blocks, function(reorgHash, reorgHeight) { + + // if we have reorg'ed, we want to retain the block's hash in our + // index, but we want to mark it as "-REORG" + var reorgOperations = self._getReorgOperations(reorgHash, reorgHeight); + + var headers = blocks.map(function(block) { + return { + hash: block.hash, + prevHash: utils.reverseBufferToString(block.header.prevHash), + height: block.__height + }; + }); + + var operations = self._getBlockOperations(headers); + + if (reorgOperations) { + operations = reorgOperations.concat(operations); + } + + self.db.batch(operations, function(err) { + callback(err, blocks); + }); + + }); + + }); + +}; + +BlockService.prototype.getBlockHeader = function(blockArg, callback) { + + var self = this; + var header = self._blockQueue.get(blockArg); + if (header) { + return setImmediate(function() { + callback(null, header); + }); + } + + self.bitcoind.getBlockHeader(blockArg, function(err, header) { + + if(err) { + return callback(err); + } + + self._setBlockQueue(header); + callback(null, header); + + }); +}; + + +BlockService.prototype.getBlockHash = function(height, callback) { + + this._getBlockValue(height, callback); + +}; + +BlockService.prototype.getBlockHeight = function(hash, callback) { + + this._getBlockValue(hash, callback); + +}; + +BlockService.prototype._startSubscriptions = function() { + + var self = this; + + if (!self._subscribed) { + + self._subscribed = true; + self.bus = self.node.openBus({remoteAddress: 'localhost'}); + + self.bus.on('bitcoind/rawblock', function(block) { + + log.info('New block received: ' + block.hash + ' at height: ' + block.height); + self._cacheRawBlock(block); + var header = self._getHeader(block); + self._setBlockQueue(header); + + self._detectReorg([block], function() { + self._blockHandler.sync(block); + }); + + }); + + self.bus.subscribe('bitcoind/rawblock'); + } + +}; + +BlockService.prototype._cacheRawBlock = function(block) { + log.debug('Setting block: ' + block.hash + ' in the raw block cache.'); + this._rawBlockQueue.set(block.hash, block); +}; + +BlockService.prototype._getBlocks = function(callback) { + + var self = this; + var blocksDiff = self.bitcoind.height - self.tip.__height; + + // we will need to wait for new blocks to be pushed to us + if (blocksDiff < 0) { + log.warn('Peer\'s height is less than our own. The peer may be syncing. ' + + 'The system is usable, but the chain may have a reorg in future blocks. ' + + 'You should not rely on query responses for heights greater than ' + + self.bitcoind.height + ' until fully synced.'); + + return callback(null, blocksDiff); + } + + log.info('Syncing: ' + blocksDiff + ' blocks from the network.'); + + var operations = []; + + async.timesLimit(blocksDiff, 8, function(n, next) { + + var blockNumber = n + self.tip.__height + 1; + + self.getBlockHeader(blockNumber, function(err, header) { if(err) { + return next(err); + } + + self._getBlockOperations(header).forEach(function(op) { + operations.push(op); + }); + + next(); + }); + + }, function(err) { + + if(err) { + return callback(err); + } + + self.db.batch(operations, function(err) { + + if (err) { return callback(err); } - self.bitcoind.getBlock(height, function(res, block) { + log.debug('Completed syncing block headers from the network.'); + callback(null, blocksDiff); + }); + + }); +}; + +BlockService.prototype._getHeader = function(block) { + + return { + hash: block.hash, + version: 1, + prevHash: utils.reverseBufferToString(block.header.prevHash), + merkleRoot: utils.reverseBufferToString(block.header.merkleRoot), + time: block.header.time, + height: block.__height + }; +}; + +BlockService.prototype._setBlockQueue = function(header) { + + this._blockQueue.set(header.height, header); + this._blockQueue.set(header.hash, header); + +}; + +BlockService.prototype._setHandlers = function() { + var self = this; + self.node.once('ready', function() { + + self.genesis = bitcore.Block.fromBuffer(self.bitcoind.genesisBuffer); + self.genesis.__height = 0; + self.genesis.height = 0; + + var genesisHeader = self._getHeader(self.genesis); + self._setBlockQueue(genesisHeader); + self.db.batch(self._getBlockOperations(genesisHeader), function(err) { + + if (err) { + log.error('Unable to store genesis block in block index.'); + return; + } + + self._sync(); + }); + + }); + + self._blockHandler.on('synced', function() { + log.debug('Synced: ' + self.tip.hash); + self._startSubscriptions(); + }); +}; + +BlockService.prototype._detectStartupReorg = function(callback) { + + var self = this; + + // does our block's header exist on the network. and if, so, is it the correct height? + var height = self.bitcoind.height; + var hash = self.bitcoind.tiphash; + + self.getBlockHeader(hash, function(err, header) { + + if (err) { + if (err.code === -5) { + return callback(); + } + return callback(err); + } + + if (header.height === height) { + return callback(); + } + + return self._handleReorg(header.hash, callback); + + }); +}; + +BlockService.prototype._handleReorg = function(hash, callback) { + + var self = this; + self.printTipInfo('Reorg detected!'); + + self.reorg = true; + + var reorg = new Reorg(self.node, self); + + reorg.handleReorg(hash, function(err) { + + if(err) { + log.error('Reorg failed! ' + err); + self.node.stop(); + } + + self.printTipInfo('Reorg successful!'); + self.reorg = false; + callback(); + + }); + +}; + +BlockService.prototype._loadTips = function(callback) { + + var self = this; + + var tipStrings = ['tip', 'concurrentTip']; + + async.each(tipStrings, function(tip, next) { + + self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) { + + if(err && !(err instanceof levelup.errors.NotFoundError)) { + return next(err); + } + + var hash; + if (!tipData) { + self[tip] = self.genesis; + return next(); + } + + hash = tipData.slice(0, 32).toString('hex'); + + self.bitcoind.getBlock(hash, function(err, block) { + + if(err) { + return next(err); + } + + self[tip] = block; + log.info('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height); + next(); + + }); + }); + + }, function(err) { + + if(err) { + return callback(err); + } + + log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); + callback(); + + }); + +}; + +BlockService.prototype._detectReorg = function(blocks, callback) { + + var self = this; + var tipHash = self.tip.hash; + var tipHeight = self.tip.__height; + var forkedHash; + + for(var i = 0; i < blocks.length; i++) { + + if (blocks[i].__height === 0) { + continue; + } + + var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash); + if (prevHash !== tipHash) { + forkedHash = blocks[i].hash; + break; + } + + tipHash = prevHash; + } + + if (forkedHash) { + return self._handleReorg(forkedHash, function() { + callback(tipHash, tipHeight); + }); + } + + callback(); + +}; + +BlockService.prototype._getBlockValue = function(hashOrHeight, callback) { + + var self = this; + + var key, valueFn; + + if (hashOrHeight.length < 64) { + key = self.encoding.encodeBlockHeightKey(parseInt(hashOrHeight)); + valueFn = self.encoding.decodeBlockHashValue.bind(self.encoding); + } else { + key = self.encoding.encodeBlockHashKey(hashOrHeight); + valueFn = self.encoding.decodeBlockHeightValue.bind(self.encoding); + } + + self.db.get(key, function(err, buf) { + if (err instanceof levelup.errors.NotFoundError) { + return callback(); + } + if (err) { + return callback(err); + } + callback(null, valueFn(buf)); + }); + +}; + +BlockService.prototype._isGenesisBlock = function(blockArg, callback) { + + if (blockArg.length === 64) { + + return this._getBlockValue(blockArg, function(err, value) { + + if (err) { + return callback(null, false); + } + + if (value === 0) { + return callback(null, true); + } + + callback(null, false); + + }); + + } + + setImmediate(function() { + + if (blockArg === 0) { + return callback(null, true); + } + callback(null, false); + }); + +}; + +BlockService.prototype._getBlock = function(blockArg, callback) { + + var self = this; + self.bitcoind.getBlock(blockArg, function(err, block) { + + if(err) { + return callback(err); + } + + if (blockArg.length === 64) { + return self.getBlockHeader(blockArg, function(err, header) { if(err) { return callback(err); } - //oh noes! reorg sitch - if (hash !== block.hash) { - - callback('reorg'); - return self._handleReorg(block.hash, function() { - self._blockHandler.sync(); - }); - - } - + block.__height = header.height; + block.height = header.height; callback(null, block); + }); + } + block.__height = blockArg; + block.height = blockArg; + callback(null, block); + }); +}; - }); +BlockService.prototype._getReorgOperations = function(hash, height) { + + if (!hash || !height) { + return; } - self.bitcoind.getBlock(height, callback); - -}; - - -BlockService.prototype.getBlockHash = function(height, callback) { var self = this; - self.db.get(this.encoding.encodeBlockHeightKey(height), function(err, hashBuf) { - if (err instanceof levelup.errors.NotFoundError) { - return callback(); - } - if (err) { - return callback(err); - } - callback(null, self.encoding.decodeBlockHashValue(hashBuf)); - }); -}; -BlockService.prototype.getBlockHeight = function(hash, callback) { - var self = this; - self.db.get(this.encoding.encodeBlockHashKey(hash), function(err, heightBuf) { - if (err instanceof levelup.errors.NotFoundError) { - return callback(); - } - if (err) { - return callback(err); - } - callback(null, self.encoding.decodeBlockHeightValue(heightBuf)); - }); + var heightKey = self.encoding.encodeBlockHeightKey(height); + var hashKey = self.encoding.encodeBlockHashKey(hash); + var heightValue = self.encoding.encodeBlockHeightValue(height); + var newHashKey = self.encoding.encodeBlockHashKey(hash + '-REORG'); + var newHashValue = self.encoding.encodeBlockHashValue(hash + '-REORG'); + + return [ + { action: 'del', key: heightKey }, + { action: 'del', key: hashKey }, + { action: 'put', key: newHashKey, value: heightValue }, + { action: 'put', key: heightKey, value: newHashValue } + ]; + }; module.exports = BlockService; diff --git a/lib/services/block/reorg.js b/lib/services/block/reorg.js index da864d61..ced5ece4 100644 --- a/lib/services/block/reorg.js +++ b/lib/services/block/reorg.js @@ -1,13 +1,12 @@ 'use strict'; -var index = require('../../'); var bitcore = require('bitcore-lib'); var BufferUtil = bitcore.util.buffer; -var log = index.log; var async = require('async'); function Reorg(node, block) { this.node = node; this.block = block; + this.db = block.db; } Reorg.prototype.handleReorg = function(newBlockHash, callback) { @@ -40,7 +39,10 @@ Reorg.prototype.handleConcurrentReorg = function(callback) { return callback(); } - self.findCommonAncestorAndNewHashes(self.block.concurrentTip.hash, self.block.tip.hash, function(err, commonAncestor, newHashes) { + self.findCommonAncestorAndNewHashes( + self.block.concurrentTip.hash, + self.block.tip.hash, + function(err, commonAncestor, newHashes) { if(err) { return callback(err); } @@ -63,25 +65,25 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) { return self.block.concurrentTip.hash !== commonAncestor; }, function(next) { - self.block.getConcurrentBlockOperations(self.block.concurrentTip, false, function(err, operations) { + self.block.getBlockOperations(self.block.concurrentTip, false, 'concurrent', function(err, operations) { if(err) { return next(err); } operations.push(self.block.getTipOperation(self.block.concurrentTip, false, 'concurrentTip')); - self.block.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } var prevHash = BufferUtil.reverse(self.block.concurrentTip.header.prevHash).toString('hex'); - self.node.services.bitcoind.getBlock(prevHash, function(err, block) { + self.block.getBlocks([prevHash], function(err, blocks) { if(err) { return next(err); } - self.block.concurrentTip = block; + self.block.concurrentTip = blocks[0]; next(); }); }); @@ -97,23 +99,23 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) { var self = this; async.eachSeries(newHashes, function(hash, next) { - self.node.services.bitcoind.getBlock(hash, function(err, block) { + self.block.getBlocks([hash], function(err, blocks) { if(err) { return next(err); } - self.block.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(blocks[0], true, 'concurrent', function(err, operations) { if(err) { return next(err); } - operations.push(self.block.getTipOperation(block, true, 'concurrentTip')); - self.block.batch(operations, function(err) { + operations.push(self.block.getTipOperation(blocks[0], true, 'concurrentTip')); + self.db.batch(operations, function(err) { if(err) { return next(err); } - self.block.concurrentTip = block; + self.block.concurrentTip = blocks[0]; next(); }); }); @@ -132,7 +134,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { async.parallel( [ function(next) { - self.block.getConcurrentBlockOperations(self.block.concurrentTip, false, function(err, operations) { + self.block.getBlockOperations(self.block.concurrentTip, false, 'concurrent', function(err, operations) { if(err) { return next(err); } @@ -141,7 +143,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { }); }, function(next) { - self.block.getSerialBlockOperations(self.block.tip, false, function(err, operations) { + self.block.getBlockOperations(self.block.tip, false, 'serial', function(err, operations) { if(err) { return next(err); } @@ -157,20 +159,21 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { } var operations = results[0].concat(results[1]); - self.block.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } var prevHash = BufferUtil.reverse(self.block.tip.header.prevHash).toString('hex'); - self.node.services.bitcoind.getBlock(prevHash, function(err, block) { + self.block.getBlocks([prevHash], function(err, blocks) { + if(err) { return next(err); } - self.block.concurrentTip = block; - self.block.tip = block; + self.block.concurrentTip = blocks[0]; + self.block.tip = blocks[0]; next(); }); }); @@ -185,7 +188,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { var self = this; async.eachSeries(newHashes, function(hash, next) { - self.node.services.bitcoind.getBlock(hash, function(err, block) { + self.block.getBlocks([hash], function(err, blocks) { if(err) { return next(err); } @@ -193,22 +196,22 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { async.parallel( [ function(next) { - self.block.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(blocks[0], true, 'concurrent', function(err, operations) { if(err) { return next(err); } - operations.push(self.block.getTipOperation(block, true, 'concurrentTip')); + operations.push(self.block.getTipOperation(blocks[0], true, 'concurrentTip')); next(null, operations); }); }, function(next) { - self.block.getSerialBlockOperations(block, true, function(err, operations) { + self.block.getBlockOperations(blocks[0], true, 'serial', function(err, operations) { if(err) { return next(err); } - operations.push(self.block.getTipOperation(block, true)); + operations.push(self.block.getTipOperation(blocks[0], true)); next(null, operations); }); } @@ -220,13 +223,13 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { var operations = results[0].concat(results[1]); - self.block.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } - self.block.concurrentTip = block; - self.block.tip = block; + self.block.concurrentTip = blocks[0]; + self.block.tip = blocks[0]; next(); }); } @@ -262,7 +265,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash return next(); } - self.node.services.bitcoind.getBlockHeader(mainPosition, function(err, mainBlockHeader) { + self.block.getBlockHeader(mainPosition, function(err, mainBlockHeader) { if(err) { return next(err); } @@ -281,7 +284,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash return next(); } - self.node.services.bitcoind.getBlockHeader(forkPosition, function(err, forkBlockHeader) { + self.block.getBlockHeader(forkPosition, function(err, forkBlockHeader) { if(err) { return next(err); } @@ -317,7 +320,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash next(); } - ) + ); }, function(err) { if(err) { diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index 10affa6c..62527412 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -42,7 +42,7 @@ TimestampService.prototype.stop = function(callback) { TimestampService.prototype._processBlockHandlerQueue = function(block) { var self = this; //do we have a record in the queue that is waiting on me to consume it? - var prevHash = utils.getPrevHashString(block); + var prevHash = utils.reverseBufferToString(block.header.prevHash); if (prevHash.length !== 64) { return; } diff --git a/lib/utils.js b/lib/utils.js index 60e50e80..1ca9fdee 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -78,8 +78,8 @@ utils.diffTime = function(time) { return (diff[0] * 1E9 + diff[1])/(1E9 * 1.0); }; -utils.getPrevHashString = function(block) { - return BufferUtil.reverse(block.header.prevHash).toString('hex'); +utils.reverseBufferToString = function(buf) { + return BufferUtil.reverse(buf).toString('hex'); }; diff --git a/regtest/block.js b/regtest/block.js index 0f088181..31a1d549 100644 --- a/regtest/block.js +++ b/regtest/block.js @@ -30,7 +30,7 @@ var bitcoin = { rpcpassword: rpcConfig.pass, rpcport: rpcConfig.port, zmqpubrawtx: 'tcp://127.0.0.1:38332', - zmqpubhashblock: 'tcp://127.0.0.1:38332' + zmqpubrawblock: 'tcp://127.0.0.1:38332' }, datadir: bitcoinDataDir, exec: 'bitcoind', //if this isn't on your PATH, then provide the absolute path, e.g. /usr/local/bin/bitcoind @@ -116,7 +116,7 @@ describe('Block Operations', function() { it('should sync block hashes as keys and heights as values', function(done) { - async.timesLimit(opts.initialHeight + 1, 12, function(n, next) { + async.timesLimit(opts.initialHeight, 12, function(n, next) { utils.queryBitcoreNode(Object.assign({ path: '/test/hash/' + n }, bitcore.httpOpts), function(err, res) { @@ -141,7 +141,7 @@ describe('Block Operations', function() { }); it('should sync block heights as keys and hashes as values', function(done) { - async.timesLimit(opts.initialHeight + 1, 12, function(n, next) { + async.timesLimit(opts.initialHeight, 12, function(n, next) { utils.queryBitcoreNode(Object.assign({ path: '/test/height/' + self.hashes[n] }, bitcore.httpOpts), function(err, res) { diff --git a/regtest/db.js b/regtest/db.js index 400fff4d..ebbd4671 100644 --- a/regtest/db.js +++ b/regtest/db.js @@ -1,7 +1,7 @@ 'use strict'; var chai = require('chai'); -var should = chai.should(); +var expect = chai.expect; var async = require('async'); var path = require('path'); var utils = require('./utils'); @@ -83,6 +83,51 @@ var opts = { var genesis = new Block(new Buffer(blocks.genesis, 'hex')); var block1 = new Block(new Buffer(blocks.block1a, 'hex')); var block2 = new Block(new Buffer(blocks.block1b, 'hex')); +var rawGenesis = blocks.genesis; +var rawBlock1 = blocks.block1a; +var rawBlock2 = blocks.block1b; +var genesisHash = genesis.hash; +var genesisHeader = { + height: 0, + hash: genesis.hash, + previousblockhash: new Array(65).join('0') +}; +var block1Header = { + height: 1, + hash: block1.header.hash, + previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') +}; +var block2Header = { + height: 1, + hash: block2.header.hash, + previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') +}; + + +function publishBlockHash(rawBlockHex, callback) { + + pubSocket.send([ 'rawblock', new Buffer(rawBlockHex, 'hex') ]); + + var httpOpts = utils.getHttpOpts(opts, { path: '/info' }); + + // we don't know exactly when all the blockhandlers will complete after the "tip" event + // so we must wait an indeterminate time to check on the current tip + setTimeout(function() { + + utils.queryBitcoreNode(httpOpts, function(err, res) { + + if(err) { + return callback(err); + } + + var block = Block.fromString(rawBlockHex); + expect(block.hash).equal(JSON.parse(res).dbhash); + callback(); + + }); + + }, 2000); +} describe('DB Operations', function() { @@ -93,23 +138,11 @@ describe('DB Operations', function() { var self = this; var responses = [ - genesis.hash, - { height: 0, hash: genesis.hash }, - genesis.hash, - blocks.genesis, - genesis.hash, - { height: 0, hash: genesis.hash, previousblockhash: new Array(65).join('0') }, - block1.hash, - blocks.block1a, - { height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') }, - block2.hash, - blocks.block1b, - { height: 1, hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') }, - { height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') }, - { height: 1, hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') }, - blocks.genesis, - blocks.block1b, - { height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') }, + genesisHash, + genesisHeader, + rawGenesis, + block1Header, + block2Header ]; after(function(done) { @@ -135,9 +168,13 @@ describe('DB Operations', function() { req.on('end', function() { var body = JSON.parse(data); - console.log('request', body); + if (debug) { + console.log('request', body); + } var response = JSON.stringify({ result: responses[responseCount++], count: responseCount }); - console.log('response', response, 'id: ', body.id); + if (debug) { + console.log('response', response, 'id: ', body.id); + } res.write(response); res.end(); }); @@ -156,9 +193,6 @@ describe('DB Operations', function() { it('should reorg when needed', function(done) { - var block1a = '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb'; - var block1b = '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4'; - /* _______________________________________________________ | | | | | @@ -175,14 +209,15 @@ describe('DB Operations', function() { async.series([ - publishBlockHash.bind(self, block1a), - publishBlockHash.bind(self, block1b) + publishBlockHash.bind(self, rawBlock1), + publishBlockHash.bind(self, rawBlock2) ], function(err) { if(err) { return done(err); } + done(); }); @@ -191,28 +226,5 @@ describe('DB Operations', function() { }); -function publishBlockHash(blockHash, callback) { - - pubSocket.send([ 'hashblock', new Buffer(blockHash, 'hex') ]); - - var httpOpts = utils.getHttpOpts(opts, { path: '/wallet-api/info' }); - - //we don't know exactly when all the blockhandlers will complete after the "tip" event - //so we must wait an indeterminate time to check on the current tip - setTimeout(function() { - - utils.queryBitcoreNode(httpOpts, function(err, res) { - - if(err) { - return callback(err); - } - - blockHash.should.equal(JSON.parse(res).dbhash); - callback(); - - }); - - }, 2000); -} diff --git a/regtest/test_web.js b/regtest/test_web.js index 5d1acf55..0e863b90 100644 --- a/regtest/test_web.js +++ b/regtest/test_web.js @@ -1,3 +1,5 @@ +'use strict'; + var BaseService = require('../lib/service'); var inherits = require('util').inherits;