From ce15e8ecf30731bd1229182569b15f6952d66cd4 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 19 May 2017 10:48:19 -0400 Subject: [PATCH] wip --- lib/scaffold/start.js | 148 ++++++++++++++++--------- lib/services/block/block_handler.js | 71 +++--------- lib/services/block/index.js | 139 +++++++++++++++-------- lib/services/db/index.js | 15 +-- regtest/{block.js => block.regtest.js} | 9 +- 5 files changed, 209 insertions(+), 173 deletions(-) rename regtest/{block.js => block.regtest.js} (96%) diff --git a/lib/scaffold/start.js b/lib/scaffold/start.js index 7be24f00..21e93f47 100644 --- a/lib/scaffold/start.js +++ b/lib/scaffold/start.js @@ -4,7 +4,6 @@ var path = require('path'); var BitcoreNode = require('../node'); var index = require('../'); var bitcore = require('bitcore-lib'); -var fs = require('fs'); var _ = bitcore.deps._; var log = index.log; var shuttingDown = false; @@ -15,46 +14,46 @@ log.debug = function() {}; * Checks for configuration options from version 2. This includes an "address" and * "db" service, or having "datadir" at the root of the config. */ -function checkConfigVersion2(fullConfig) { - // TODO: Remove - return false; - - var datadirUndefined = _.isUndefined(fullConfig.datadir); - var addressDefined = (fullConfig.services.indexOf('address') >= 0); - var dbDefined = (fullConfig.services.indexOf('db') >= 0); - - if (!datadirUndefined || addressDefined || dbDefined) { - - console.warn('\nConfiguration file is not compatible with this version. \n' + - 'A reindex for bitcoind is necessary for this upgrade with the "reindex=1" bitcoin.conf option. \n' + - 'There are changes necessary in both bitcoin.conf and bitcore-node.json. \n\n' + - 'To upgrade please see the details below and documentation at: \n' + - 'https://github.com/bitpay/bitcore-node/blob/bitcoind/docs/upgrade.md \n'); - - if (!datadirUndefined) { - console.warn('Please remove "datadir" and add it to the config at ' + fullConfig.path + ' with:'); - var missingConfig = { - servicesConfig: { - bitcoind: { - spawn: { - datadir: fullConfig.datadir, - exec: path.resolve(__dirname, '../../bin/bitcoind') - } - } - } - }; - console.warn(JSON.stringify(missingConfig, null, 2) + '\n'); - } - - if (addressDefined || dbDefined) { - console.warn('Please remove "address" and/or "db" from "services" in: ' + fullConfig.path + '\n'); - } - - return true; - } - - return false; -} +//function checkConfigVersion2(fullConfig) { +// // TODO: Remove +// return false; +// +// var datadirUndefined = _.isUndefined(fullConfig.datadir); +// var addressDefined = (fullConfig.services.indexOf('address') >= 0); +// var dbDefined = (fullConfig.services.indexOf('db') >= 0); +// +// if (!datadirUndefined || addressDefined || dbDefined) { +// +// console.warn('\nConfiguration file is not compatible with this version. \n' + +// 'A reindex for bitcoind is necessary for this upgrade with the "reindex=1" bitcoin.conf option. \n' + +// 'There are changes necessary in both bitcoin.conf and bitcore-node.json. \n\n' + +// 'To upgrade please see the details below and documentation at: \n' + +// 'https://github.com/bitpay/bitcore-node/blob/bitcoind/docs/upgrade.md \n'); +// +// if (!datadirUndefined) { +// console.warn('Please remove "datadir" and add it to the config at ' + fullConfig.path + ' with:'); +// var missingConfig = { +// servicesConfig: { +// bitcoind: { +// spawn: { +// datadir: fullConfig.datadir, +// exec: path.resolve(__dirname, '../../bin/bitcoind') +// } +// } +// } +// }; +// console.warn(JSON.stringify(missingConfig, null, 2) + '\n'); +// } +// +// if (addressDefined || dbDefined) { +// console.warn('Please remove "address" and/or "db" from "services" in: ' + fullConfig.path + '\n'); +// } +// +// return true; +// } +// +// return false; +//} /** * This function will instantiate and start a Node, requiring the necessary service @@ -127,6 +126,36 @@ function checkService(service) { } } +function lookInCwd(req, service) { + try { + return req(process.cwd + '/' + service); + } catch(e) { + + } +} + +function lookInBuiltInPath(req, service) { + try { + var serviceFile = path.resolve(__dirname, '../services/' + service.name); + return req(serviceFile); + } catch(e) { + + } +} + +function lookInModuleManifest(req, service) { + try { + var servicePackage = req(service.name + '/package.json'); + var serviceModule = service.name; + if (servicePackage.bitcoreNode) { + serviceModule = service.name + '/' + servicePackage.bitcoreNode; + return req(serviceModule); + } + } catch(e) { + + } +} + /** * Will require a module from local services directory first * and then from available node_modules @@ -134,20 +163,30 @@ function checkService(service) { * @param {Object} service */ function loadModule(req, service) { - try { - // first try in the built-in bitcore-node services directory - var serviceFile = path.resolve(__dirname, '../services/' + service.name); - service.module = req(serviceFile); - } catch(e) { - console.log('Could not load ' + service.name + ' service'); - log.error(e.stack); - var servicePackage = req(service.name + '/package.json'); - var serviceModule = service.name; - if (servicePackage.bitcoreNode) { - serviceModule = service.name + '/' + servicePackage.bitcoreNode; - } - service.module = req(serviceModule); + var serviceCode; + + //first look in the current working directory (of the controlling terminal, if there is one) for the service code + serviceCode = lookInCwd(req, service); + + //second try the built-in services + if(!serviceCode) { + serviceCode = lookInBuiltInPath(req, service); } + + //third see if there is directory in our module search path that has a + //package.json file, if so, then see if there is a bitcoreNode field, if so + //use this as the path to the service module + if(!serviceCode) { + serviceCode = lookInModuleManifest(req, service); + } + + if (!serviceCode) { + throw new Error('Attempted to load the ' + service.name + ' service from: "' + + process.cwd() + '" then from: "' + __dirname + '/../lib/services' + '" finally from: "' + + process.cwd() + '/package.json" - bitcoreNode field. All paths failed to find valid nodeJS code.'); + } + + service.module = serviceCode; } /** @@ -253,4 +292,3 @@ module.exports.registerExitHandlers = registerExitHandlers; module.exports.exitHandler = exitHandler; module.exports.setupServices = setupServices; module.exports.cleanShutdown = cleanShutdown; -module.exports.checkConfigVersion2 = checkConfigVersion2; diff --git a/lib/services/block/block_handler.js b/lib/services/block/block_handler.js index 1ce37e96..d5f121ea 100644 --- a/lib/services/block/block_handler.js +++ b/lib/services/block/block_handler.js @@ -19,7 +19,6 @@ function BlockStream(highWaterMark, sync) { this.lastEmittedHash = this.dbTip.hash; this.queue = []; this.processing = false; - this.bitcoind = this.block.bitcoind; } inherits(BlockStream, Readable); @@ -39,6 +38,7 @@ function ProcessSerial(highWaterMark, sync) { Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.block = sync.block; this.db = sync.db; + this.block = sync.block; this.tip = sync.block.tip; this.processBlockStartTime = []; this._lastReportedTime = Date.now(); @@ -58,6 +58,7 @@ function WriteStream(highWaterMark, sync) { this.db = sync.db; this.writeTime = 0; this.lastConcurrentOutputHeight = 0; + this.block = sync.block; } inherits(WriteStream, Writable); @@ -108,37 +109,10 @@ BlockHandler.prototype._onFinish = function() { var self = this; self.syncing = false; - if (self.forkBlock) { - self.db.handleReorg(self.forkBlock, function() { - self.forkBlock = null; - self.sync(); - }); - return; - } - - self._startSubscriptions(); self.emit('synced'); }; -BlockHandler.prototype._startSubscriptions = function() { - - var self = this; - - if (!self.subscribed) { - - self.subscribed = true; - self.bus = self.node.openBus({remoteAddress: 'localhost'}); - - self.bus.on('bitcoind/hashblock', function() { - self.sync(); - }); - - self.bus.subscribe('bitcoind/hashblock'); - } - -}; - BlockHandler.prototype._handleErrors = function(stream) { var self = this; @@ -151,7 +125,7 @@ BlockHandler.prototype._handleErrors = function(stream) { BlockStream.prototype._read = function() { - if (this.lastEmittedHash === this.bitcoind.tiphash) { + if (this.lastEmittedHash === this.block.getNetworkTipHash()) { return this.push(null); } @@ -193,12 +167,12 @@ BlockStream.prototype._getBlocks = function(heights, callback) { async.map(heights, function(height, next) { if (height === 0) { - var block = new Block(self.bitcoind.genesisBuffer); + var block = new Block(self.block.genesis); block.__height = 0; return next(null, block); } - self.bitcoind.getBlock(height, function(err, block) { + self.block.getBlock(height, function(err, block) { if(err) { return next(err); @@ -208,32 +182,20 @@ BlockStream.prototype._getBlocks = function(heights, callback) { next(null, block); }); - }, function(err, blocks) { if(err) { return callback(err); } - //at this point, we know that all blocks we've sent down the pipe - //have not been reorg'ed, but the new batch here might have been - self.sync.forkBlock = self.db.detectReorg(blocks); + for(var i = 0; i < blocks.length; i++) { - if (!self.sync.forkBlock) { - - for(var i = 0; i < blocks.length; i++) { - - self.lastEmittedHash = blocks[i].hash; - self.push(blocks[i]); - - } - - return callback(); + self.lastEmittedHash = blocks[i].hash; + self.push(blocks[i]); } - self.push(null); - callback(); + return callback(); }); }; @@ -256,7 +218,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) { return self._process(block, callback); } - self.db.once('concurrentaddblock', function() { + self.block.once('concurrentaddblock', function() { if(!check()) { var err = new Error('Concurrent block ' + self.block.concurrentTip.__height + ' is less than ' + block.__height); return self.emit('error', err); @@ -269,7 +231,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) { ProcessSerial.prototype._process = function(block, callback) { var self = this; - self.db.getSerialBlockOperations(block, true, function(err, operations) { + self.block.getSerialBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } @@ -290,20 +252,19 @@ ProcessSerial.prototype._process = function(block, callback) { self.block.tip = block; self._reportStatus(); - self.db.emit('addblock'); + self.block.emit('addblock'); callback(); }); }); }; - ProcessConcurrent.prototype._transform = function(block, enc, callback) { var self = this; this.lastBlock = block; - self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } @@ -348,7 +309,7 @@ WriteStream.prototype._write = function(obj, enc, callback) { } self.block.concurrentTip = obj.concurrentTip; - self.db.emit('concurrentaddblock'); + self.block.emit('concurrentaddblock'); self.lastConcurrentOutputHeight = self.block.concurrentTip.__height; callback(); }); @@ -358,7 +319,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { var self = this; async.parallel([function(next) { - self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + self.block.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } @@ -366,7 +327,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { next(null, operations); }); }, function(next) { - self.db.getSerialBlockOperations(block, true, function(err, operations) { + self.block.getSerialBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } diff --git a/lib/services/block/index.js b/lib/services/block/index.js index bdfef894..2bdd31ad 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -1,6 +1,5 @@ 'use strict'; -var assert = require('assert'); var BaseService = require('../../service'); var levelup = require('levelup'); var bitcore = require('bitcore-lib'); @@ -22,6 +21,12 @@ var BlockService = function(options) { this.db = this.node.services.db; this._blockHandler = new BlockHandler(this.node, this); this._lockTimes = []; + this.tip = null; + this.genesis = null; + this.dbOptions = { + keyEncoding: 'string', + valueEncoding: 'binary' + }; }; inherits(BlockService, BaseService); @@ -31,6 +36,28 @@ BlockService.dependencies = [ 'db' ]; +BlockService.prototype.getNetworkTipHash = function() { + return this.bitcoind.tiphash; +}; + +BlockService.prototype._startSubscriptions = function() { + + var self = this; + + if (!self._subscribed) { + + self._subscribed = true; + self.bus = self.node.openBus({remoteAddress: 'localhost'}); + + self.bus.on('bitcoind/hashblock', function() { + self.sync(); + }); + + self.bus.subscribe('bitcoind/hashblock'); + } + +}; + BlockService.prototype._log = function(msg) { return log.info('BlockService: ', msg); }; @@ -55,51 +82,38 @@ BlockService.prototype.start = function(callback) { BlockService.prototype._sync = function() { var self = this; - self._loadTips(function(err) { + async.waterfall([ + + self._loadTips.bind(self), + self._detectReorg.bind(self), + self._getBlocks.bind(self) + + ], function(err) { if(err) { throw err; } - self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); - - self._detectReorg(function(err, header) { - - if(err) { - throw err; - } - - self._handleReorg(header, function - - - }); - - var blocksDiff = self.bitcoind.height - self.tip.__height - 1; - - if (blocksDiff < 0) { - self._log('Peer\'s height is less than our own. The peer may be syncing. The system is usable, but chain may have reorg in future blocks from our peers. We may not answer queries about blocks at heights greater than ' + self.bitcoind.height); - self._blockHandler.sync(); - return; - } - - self._log('Syncing: ' + blocksDiff + ' blocks from the network.'); - - self._getBlocks(blocksDiff, function(err) { - - if(err) { - throw err; - } - self._blockHandler.sync(); - - }); - + self._blockHandler.sync(); }); }; -BlockService.prototype._getBlocks = function(blocksDiff, callback) { +BlockService.prototype._getBlocks = function(callback) { var self = this; + var blocksDiff = self.bitcoind.height - self.tip.__height - 1; + + if (blocksDiff < 0) { + self._log('Peer\'s height is less than our own. The peer may be syncing.' + + ' The system is usable, but chain may have a reorg in future blocks.' + + ' We may not answer queries about blocks at heights greater than ' + self.bitcoind.height); + self._blockHandler.sync(); + return; + } + + self._log('Syncing: ' + blocksDiff + ' blocks from the network.'); + var operations = []; async.timesLimit(blocksDiff, 8, function(n, next) { @@ -114,19 +128,19 @@ BlockService.prototype._getBlocks = function(blocksDiff, callback) { operations.push({ type: 'put', key: self.encoding.encodeBlockHashKey(header.hash), - value: self.encoding.encodeBlockHashValue(header.height) + value: self.encoding.encodeBlockHeightValue(header.height) }); operations.push({ type: 'put', key: self.encoding.encodeBlockHeightKey(header.height), - value: self.encoding.encodeBlockHeightValue(header.hash) + value: self.encoding.encodeBlockHashValue(header.hash) }); next(); }); - }, function(err, headers) { + }, function(err) { if(err) { return callback(err); @@ -143,6 +157,10 @@ BlockService.prototype._setHandlers = function() { self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); self._sync(); }); + + self._blockHandler.on('synced', function() { + self._startSubscriptions(); + }); }; BlockService.prototype.stop = function(callback) { @@ -177,11 +195,13 @@ BlockService.prototype.resumeSync = function() { BlockService.prototype._detectReorg = function(callback) { - if (this.tip.__height <= 0) { + var self = this; + + if (self.tip.__height <= 0) { return callback(); } - if (this.tip.hash === this.bitcoind.tiphash && this.tip.__height === this.bitcoind.height) { + if (self.tip.hash === self.bitcoind.tiphash && self.tip.__height === self.bitcoind.height) { return callback(); } @@ -191,16 +211,18 @@ BlockService.prototype._detectReorg = function(callback) { return callback(err); } + //we still might have a reorg, but later if (header.hash === self.tip.hash) { return callback(); } - callback(null, header); + //our hash isn't in the network anymore, we have definitely reorg'ed + callback(header, callback); }); }; -BlockService.prototype.handleReorg = function(header, callback) { +BlockService.prototype._handleReorg = function(header, callback) { var self = this; self.printTipInfo('Reorg detected!'); @@ -209,7 +231,7 @@ BlockService.prototype.handleReorg = function(header, callback) { var reorg = new Reorg(self.node, self); - reorg.handleReorg(forkBlock.hash, function(err) { + reorg.handleReorg(header.hash, function(err) { if(err) { self._log('Reorg failed! ' + err, log.error); @@ -275,7 +297,16 @@ BlockService.prototype._loadTips = function(callback) { }); }); - }, callback); + }, function(err) { + + if(err) { + return callback(err); + } + + self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); + callback(); + + }); }; @@ -384,4 +415,24 @@ BlockService.prototype.getConcurrentTipOperation = function(block, add) { }; }; +BlockService.prototype.getBlock = function(height, callback) { + //if our block service's tip is ahead of the network tip, then we need to + //watch for a reorg + this.bitcoind.getBlock(height, callback); +}; + + +BlockService.prototype.getBlockHash = function(height, callback) { + var self = this; + self.db.get(this.encoding.encodeBlockHeightKey(height), function(err, hashBuf) { + if (err instanceof levelup.errors.NotFoundError) { + return callback(); + } + if (err) { + return callback(err); + } + callback(null, self.encoding.decodeBlockHashValue(hashBuf)); + }); +}; + module.exports = BlockService; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 1acb0569..7aa0820a 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -8,10 +8,7 @@ var leveldown = require('leveldown'); var mkdirp = require('mkdirp'); var bitcore = require('bitcore-lib'); var Networks = bitcore.Networks; -var Block = bitcore.Block; var $ = bitcore.util.preconditions; -var index = require('../../'); -var log = index.log; var Service = require('../../service'); function DB(options) { @@ -19,21 +16,13 @@ function DB(options) { if (!(this instanceof DB)) { return new DB(options); } - if (!options) { - options = {}; - } + options = options || {}; Service.call(this, options); this.version = 2; this.dbPrefix = '\u0000\u0000'; - this.tip = null; - this.genesis = null; - this.dbOptions = { - keyEncoding: 'string', - valueEncoding: 'binary' - }; $.checkState(this.node.network, 'Node is expected to have a "network" property'); this.network = this.node.network; @@ -45,8 +34,6 @@ function DB(options) { this.levelupStore = options.store; } - this.retryInterval = 60000; - this.subscriptions = {}; this.bitcoind = this.node.services.bitcoind; diff --git a/regtest/block.js b/regtest/block.regtest.js similarity index 96% rename from regtest/block.js rename to regtest/block.regtest.js index 1b0e166d..6db0b2a2 100644 --- a/regtest/block.js +++ b/regtest/block.regtest.js @@ -1,12 +1,11 @@ 'use strict'; var chai = require('chai'); -var should = chai.should(); +var expect = chai.expect; var async = require('async'); var BitcoinRPC = require('bitcoind-rpc'); var path = require('path'); var utils = require('./utils'); -var crypto = require('crypto'); var debug = true; var bitcoreDataDir = '/tmp/bitcore'; @@ -49,7 +48,8 @@ var bitcore = { 'bitcoind', 'db', 'block', - 'web' + 'web', + 'block.regtest', ], servicesConfig: { bitcoind: { @@ -112,8 +112,7 @@ describe('Block Operations', function() { }); it('should sync block headers', function(done) { - - done(); + done(); }); });