This commit is contained in:
Chris Kleeschulte 2017-05-19 10:48:19 -04:00
parent 72602ba89f
commit ce15e8ecf3
5 changed files with 209 additions and 173 deletions

View File

@ -4,7 +4,6 @@ var path = require('path');
var BitcoreNode = require('../node'); var BitcoreNode = require('../node');
var index = require('../'); var index = require('../');
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
var fs = require('fs');
var _ = bitcore.deps._; var _ = bitcore.deps._;
var log = index.log; var log = index.log;
var shuttingDown = false; var shuttingDown = false;
@ -15,46 +14,46 @@ log.debug = function() {};
* Checks for configuration options from version 2. This includes an "address" and * Checks for configuration options from version 2. This includes an "address" and
* "db" service, or having "datadir" at the root of the config. * "db" service, or having "datadir" at the root of the config.
*/ */
function checkConfigVersion2(fullConfig) { //function checkConfigVersion2(fullConfig) {
// TODO: Remove // // TODO: Remove
return false; // return false;
//
var datadirUndefined = _.isUndefined(fullConfig.datadir); // var datadirUndefined = _.isUndefined(fullConfig.datadir);
var addressDefined = (fullConfig.services.indexOf('address') >= 0); // var addressDefined = (fullConfig.services.indexOf('address') >= 0);
var dbDefined = (fullConfig.services.indexOf('db') >= 0); // var dbDefined = (fullConfig.services.indexOf('db') >= 0);
//
if (!datadirUndefined || addressDefined || dbDefined) { // if (!datadirUndefined || addressDefined || dbDefined) {
//
console.warn('\nConfiguration file is not compatible with this version. \n' + // console.warn('\nConfiguration file is not compatible with this version. \n' +
'A reindex for bitcoind is necessary for this upgrade with the "reindex=1" bitcoin.conf option. \n' + // 'A reindex for bitcoind is necessary for this upgrade with the "reindex=1" bitcoin.conf option. \n' +
'There are changes necessary in both bitcoin.conf and bitcore-node.json. \n\n' + // 'There are changes necessary in both bitcoin.conf and bitcore-node.json. \n\n' +
'To upgrade please see the details below and documentation at: \n' + // 'To upgrade please see the details below and documentation at: \n' +
'https://github.com/bitpay/bitcore-node/blob/bitcoind/docs/upgrade.md \n'); // 'https://github.com/bitpay/bitcore-node/blob/bitcoind/docs/upgrade.md \n');
//
if (!datadirUndefined) { // if (!datadirUndefined) {
console.warn('Please remove "datadir" and add it to the config at ' + fullConfig.path + ' with:'); // console.warn('Please remove "datadir" and add it to the config at ' + fullConfig.path + ' with:');
var missingConfig = { // var missingConfig = {
servicesConfig: { // servicesConfig: {
bitcoind: { // bitcoind: {
spawn: { // spawn: {
datadir: fullConfig.datadir, // datadir: fullConfig.datadir,
exec: path.resolve(__dirname, '../../bin/bitcoind') // exec: path.resolve(__dirname, '../../bin/bitcoind')
} // }
} // }
} // }
}; // };
console.warn(JSON.stringify(missingConfig, null, 2) + '\n'); // console.warn(JSON.stringify(missingConfig, null, 2) + '\n');
} // }
//
if (addressDefined || dbDefined) { // if (addressDefined || dbDefined) {
console.warn('Please remove "address" and/or "db" from "services" in: ' + fullConfig.path + '\n'); // console.warn('Please remove "address" and/or "db" from "services" in: ' + fullConfig.path + '\n');
} // }
//
return true; // return true;
} // }
//
return false; // return false;
} //}
/** /**
* This function will instantiate and start a Node, requiring the necessary service * This function will instantiate and start a Node, requiring the necessary service
@ -127,6 +126,36 @@ function checkService(service) {
} }
} }
function lookInCwd(req, service) {
try {
return req(process.cwd + '/' + service);
} catch(e) {
}
}
function lookInBuiltInPath(req, service) {
try {
var serviceFile = path.resolve(__dirname, '../services/' + service.name);
return req(serviceFile);
} catch(e) {
}
}
function lookInModuleManifest(req, service) {
try {
var servicePackage = req(service.name + '/package.json');
var serviceModule = service.name;
if (servicePackage.bitcoreNode) {
serviceModule = service.name + '/' + servicePackage.bitcoreNode;
return req(serviceModule);
}
} catch(e) {
}
}
/** /**
* Will require a module from local services directory first * Will require a module from local services directory first
* and then from available node_modules * and then from available node_modules
@ -134,20 +163,30 @@ function checkService(service) {
* @param {Object} service * @param {Object} service
*/ */
function loadModule(req, service) { function loadModule(req, service) {
try { var serviceCode;
// first try in the built-in bitcore-node services directory
var serviceFile = path.resolve(__dirname, '../services/' + service.name); //first look in the current working directory (of the controlling terminal, if there is one) for the service code
service.module = req(serviceFile); serviceCode = lookInCwd(req, service);
} catch(e) {
console.log('Could not load ' + service.name + ' service'); //second try the built-in services
log.error(e.stack); if(!serviceCode) {
var servicePackage = req(service.name + '/package.json'); serviceCode = lookInBuiltInPath(req, service);
var serviceModule = service.name;
if (servicePackage.bitcoreNode) {
serviceModule = service.name + '/' + servicePackage.bitcoreNode;
}
service.module = req(serviceModule);
} }
//third see if there is directory in our module search path that has a
//package.json file, if so, then see if there is a bitcoreNode field, if so
//use this as the path to the service module
if(!serviceCode) {
serviceCode = lookInModuleManifest(req, service);
}
if (!serviceCode) {
throw new Error('Attempted to load the ' + service.name + ' service from: "' +
process.cwd() + '" then from: "' + __dirname + '/../lib/services' + '" finally from: "' +
process.cwd() + '/package.json" - bitcoreNode field. All paths failed to find valid nodeJS code.');
}
service.module = serviceCode;
} }
/** /**
@ -253,4 +292,3 @@ module.exports.registerExitHandlers = registerExitHandlers;
module.exports.exitHandler = exitHandler; module.exports.exitHandler = exitHandler;
module.exports.setupServices = setupServices; module.exports.setupServices = setupServices;
module.exports.cleanShutdown = cleanShutdown; module.exports.cleanShutdown = cleanShutdown;
module.exports.checkConfigVersion2 = checkConfigVersion2;

View File

@ -19,7 +19,6 @@ function BlockStream(highWaterMark, sync) {
this.lastEmittedHash = this.dbTip.hash; this.lastEmittedHash = this.dbTip.hash;
this.queue = []; this.queue = [];
this.processing = false; this.processing = false;
this.bitcoind = this.block.bitcoind;
} }
inherits(BlockStream, Readable); inherits(BlockStream, Readable);
@ -39,6 +38,7 @@ function ProcessSerial(highWaterMark, sync) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.block = sync.block; this.block = sync.block;
this.db = sync.db; this.db = sync.db;
this.block = sync.block;
this.tip = sync.block.tip; this.tip = sync.block.tip;
this.processBlockStartTime = []; this.processBlockStartTime = [];
this._lastReportedTime = Date.now(); this._lastReportedTime = Date.now();
@ -58,6 +58,7 @@ function WriteStream(highWaterMark, sync) {
this.db = sync.db; this.db = sync.db;
this.writeTime = 0; this.writeTime = 0;
this.lastConcurrentOutputHeight = 0; this.lastConcurrentOutputHeight = 0;
this.block = sync.block;
} }
inherits(WriteStream, Writable); inherits(WriteStream, Writable);
@ -108,37 +109,10 @@ BlockHandler.prototype._onFinish = function() {
var self = this; var self = this;
self.syncing = false; self.syncing = false;
if (self.forkBlock) {
self.db.handleReorg(self.forkBlock, function() {
self.forkBlock = null;
self.sync();
});
return;
}
self._startSubscriptions();
self.emit('synced'); self.emit('synced');
}; };
BlockHandler.prototype._startSubscriptions = function() {
var self = this;
if (!self.subscribed) {
self.subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function() {
self.sync();
});
self.bus.subscribe('bitcoind/hashblock');
}
};
BlockHandler.prototype._handleErrors = function(stream) { BlockHandler.prototype._handleErrors = function(stream) {
var self = this; var self = this;
@ -151,7 +125,7 @@ BlockHandler.prototype._handleErrors = function(stream) {
BlockStream.prototype._read = function() { BlockStream.prototype._read = function() {
if (this.lastEmittedHash === this.bitcoind.tiphash) { if (this.lastEmittedHash === this.block.getNetworkTipHash()) {
return this.push(null); return this.push(null);
} }
@ -193,12 +167,12 @@ BlockStream.prototype._getBlocks = function(heights, callback) {
async.map(heights, function(height, next) { async.map(heights, function(height, next) {
if (height === 0) { if (height === 0) {
var block = new Block(self.bitcoind.genesisBuffer); var block = new Block(self.block.genesis);
block.__height = 0; block.__height = 0;
return next(null, block); return next(null, block);
} }
self.bitcoind.getBlock(height, function(err, block) { self.block.getBlock(height, function(err, block) {
if(err) { if(err) {
return next(err); return next(err);
@ -208,32 +182,20 @@ BlockStream.prototype._getBlocks = function(heights, callback) {
next(null, block); next(null, block);
}); });
}, function(err, blocks) { }, function(err, blocks) {
if(err) { if(err) {
return callback(err); return callback(err);
} }
//at this point, we know that all blocks we've sent down the pipe for(var i = 0; i < blocks.length; i++) {
//have not been reorg'ed, but the new batch here might have been
self.sync.forkBlock = self.db.detectReorg(blocks);
if (!self.sync.forkBlock) { self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
}
return callback();
} }
self.push(null); return callback();
callback();
}); });
}; };
@ -256,7 +218,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
return self._process(block, callback); return self._process(block, callback);
} }
self.db.once('concurrentaddblock', function() { self.block.once('concurrentaddblock', function() {
if(!check()) { if(!check()) {
var err = new Error('Concurrent block ' + self.block.concurrentTip.__height + ' is less than ' + block.__height); var err = new Error('Concurrent block ' + self.block.concurrentTip.__height + ' is less than ' + block.__height);
return self.emit('error', err); return self.emit('error', err);
@ -269,7 +231,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
ProcessSerial.prototype._process = function(block, callback) { ProcessSerial.prototype._process = function(block, callback) {
var self = this; var self = this;
self.db.getSerialBlockOperations(block, true, function(err, operations) { self.block.getSerialBlockOperations(block, true, function(err, operations) {
if(err) { if(err) {
return callback(err); return callback(err);
} }
@ -290,20 +252,19 @@ ProcessSerial.prototype._process = function(block, callback) {
self.block.tip = block; self.block.tip = block;
self._reportStatus(); self._reportStatus();
self.db.emit('addblock'); self.block.emit('addblock');
callback(); callback();
}); });
}); });
}; };
ProcessConcurrent.prototype._transform = function(block, enc, callback) { ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var self = this; var self = this;
this.lastBlock = block; this.lastBlock = block;
self.db.getConcurrentBlockOperations(block, true, function(err, operations) { self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) { if(err) {
return callback(err); return callback(err);
} }
@ -348,7 +309,7 @@ WriteStream.prototype._write = function(obj, enc, callback) {
} }
self.block.concurrentTip = obj.concurrentTip; self.block.concurrentTip = obj.concurrentTip;
self.db.emit('concurrentaddblock'); self.block.emit('concurrentaddblock');
self.lastConcurrentOutputHeight = self.block.concurrentTip.__height; self.lastConcurrentOutputHeight = self.block.concurrentTip.__height;
callback(); callback();
}); });
@ -358,7 +319,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
var self = this; var self = this;
async.parallel([function(next) { async.parallel([function(next) {
self.db.getConcurrentBlockOperations(block, true, function(err, operations) { self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) { if(err) {
return callback(err); return callback(err);
} }
@ -366,7 +327,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
next(null, operations); next(null, operations);
}); });
}, function(next) { }, function(next) {
self.db.getSerialBlockOperations(block, true, function(err, operations) { self.block.getSerialBlockOperations(block, true, function(err, operations) {
if(err) { if(err) {
return callback(err); return callback(err);
} }

View File

@ -1,6 +1,5 @@
'use strict'; 'use strict';
var assert = require('assert');
var BaseService = require('../../service'); var BaseService = require('../../service');
var levelup = require('levelup'); var levelup = require('levelup');
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
@ -22,6 +21,12 @@ var BlockService = function(options) {
this.db = this.node.services.db; this.db = this.node.services.db;
this._blockHandler = new BlockHandler(this.node, this); this._blockHandler = new BlockHandler(this.node, this);
this._lockTimes = []; this._lockTimes = [];
this.tip = null;
this.genesis = null;
this.dbOptions = {
keyEncoding: 'string',
valueEncoding: 'binary'
};
}; };
inherits(BlockService, BaseService); inherits(BlockService, BaseService);
@ -31,6 +36,28 @@ BlockService.dependencies = [
'db' 'db'
]; ];
BlockService.prototype.getNetworkTipHash = function() {
return this.bitcoind.tiphash;
};
BlockService.prototype._startSubscriptions = function() {
var self = this;
if (!self._subscribed) {
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function() {
self.sync();
});
self.bus.subscribe('bitcoind/hashblock');
}
};
BlockService.prototype._log = function(msg) { BlockService.prototype._log = function(msg) {
return log.info('BlockService: ', msg); return log.info('BlockService: ', msg);
}; };
@ -55,51 +82,38 @@ BlockService.prototype.start = function(callback) {
BlockService.prototype._sync = function() { BlockService.prototype._sync = function() {
var self = this; var self = this;
self._loadTips(function(err) { async.waterfall([
self._loadTips.bind(self),
self._detectReorg.bind(self),
self._getBlocks.bind(self)
], function(err) {
if(err) { if(err) {
throw err; throw err;
} }
self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height); self._blockHandler.sync();
self._detectReorg(function(err, header) {
if(err) {
throw err;
}
self._handleReorg(header, function
});
var blocksDiff = self.bitcoind.height - self.tip.__height - 1;
if (blocksDiff < 0) {
self._log('Peer\'s height is less than our own. The peer may be syncing. The system is usable, but chain may have reorg in future blocks from our peers. We may not answer queries about blocks at heights greater than ' + self.bitcoind.height);
self._blockHandler.sync();
return;
}
self._log('Syncing: ' + blocksDiff + ' blocks from the network.');
self._getBlocks(blocksDiff, function(err) {
if(err) {
throw err;
}
self._blockHandler.sync();
});
}); });
}; };
BlockService.prototype._getBlocks = function(blocksDiff, callback) { BlockService.prototype._getBlocks = function(callback) {
var self = this; var self = this;
var blocksDiff = self.bitcoind.height - self.tip.__height - 1;
if (blocksDiff < 0) {
self._log('Peer\'s height is less than our own. The peer may be syncing.' +
' The system is usable, but chain may have a reorg in future blocks.' +
' We may not answer queries about blocks at heights greater than ' + self.bitcoind.height);
self._blockHandler.sync();
return;
}
self._log('Syncing: ' + blocksDiff + ' blocks from the network.');
var operations = []; var operations = [];
async.timesLimit(blocksDiff, 8, function(n, next) { async.timesLimit(blocksDiff, 8, function(n, next) {
@ -114,19 +128,19 @@ BlockService.prototype._getBlocks = function(blocksDiff, callback) {
operations.push({ operations.push({
type: 'put', type: 'put',
key: self.encoding.encodeBlockHashKey(header.hash), key: self.encoding.encodeBlockHashKey(header.hash),
value: self.encoding.encodeBlockHashValue(header.height) value: self.encoding.encodeBlockHeightValue(header.height)
}); });
operations.push({ operations.push({
type: 'put', type: 'put',
key: self.encoding.encodeBlockHeightKey(header.height), key: self.encoding.encodeBlockHeightKey(header.height),
value: self.encoding.encodeBlockHeightValue(header.hash) value: self.encoding.encodeBlockHashValue(header.hash)
}); });
next(); next();
}); });
}, function(err, headers) { }, function(err) {
if(err) { if(err) {
return callback(err); return callback(err);
@ -143,6 +157,10 @@ BlockService.prototype._setHandlers = function() {
self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer);
self._sync(); self._sync();
}); });
self._blockHandler.on('synced', function() {
self._startSubscriptions();
});
}; };
BlockService.prototype.stop = function(callback) { BlockService.prototype.stop = function(callback) {
@ -177,11 +195,13 @@ BlockService.prototype.resumeSync = function() {
BlockService.prototype._detectReorg = function(callback) { BlockService.prototype._detectReorg = function(callback) {
if (this.tip.__height <= 0) { var self = this;
if (self.tip.__height <= 0) {
return callback(); return callback();
} }
if (this.tip.hash === this.bitcoind.tiphash && this.tip.__height === this.bitcoind.height) { if (self.tip.hash === self.bitcoind.tiphash && self.tip.__height === self.bitcoind.height) {
return callback(); return callback();
} }
@ -191,16 +211,18 @@ BlockService.prototype._detectReorg = function(callback) {
return callback(err); return callback(err);
} }
//we still might have a reorg, but later
if (header.hash === self.tip.hash) { if (header.hash === self.tip.hash) {
return callback(); return callback();
} }
callback(null, header); //our hash isn't in the network anymore, we have definitely reorg'ed
callback(header, callback);
}); });
}; };
BlockService.prototype.handleReorg = function(header, callback) { BlockService.prototype._handleReorg = function(header, callback) {
var self = this; var self = this;
self.printTipInfo('Reorg detected!'); self.printTipInfo('Reorg detected!');
@ -209,7 +231,7 @@ BlockService.prototype.handleReorg = function(header, callback) {
var reorg = new Reorg(self.node, self); var reorg = new Reorg(self.node, self);
reorg.handleReorg(forkBlock.hash, function(err) { reorg.handleReorg(header.hash, function(err) {
if(err) { if(err) {
self._log('Reorg failed! ' + err, log.error); self._log('Reorg failed! ' + err, log.error);
@ -275,7 +297,16 @@ BlockService.prototype._loadTips = function(callback) {
}); });
}); });
}, callback); }, function(err) {
if(err) {
return callback(err);
}
self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
callback();
});
}; };
@ -384,4 +415,24 @@ BlockService.prototype.getConcurrentTipOperation = function(block, add) {
}; };
}; };
BlockService.prototype.getBlock = function(height, callback) {
//if our block service's tip is ahead of the network tip, then we need to
//watch for a reorg
this.bitcoind.getBlock(height, callback);
};
BlockService.prototype.getBlockHash = function(height, callback) {
var self = this;
self.db.get(this.encoding.encodeBlockHeightKey(height), function(err, hashBuf) {
if (err instanceof levelup.errors.NotFoundError) {
return callback();
}
if (err) {
return callback(err);
}
callback(null, self.encoding.decodeBlockHashValue(hashBuf));
});
};
module.exports = BlockService; module.exports = BlockService;

View File

@ -8,10 +8,7 @@ var leveldown = require('leveldown');
var mkdirp = require('mkdirp'); var mkdirp = require('mkdirp');
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
var Networks = bitcore.Networks; var Networks = bitcore.Networks;
var Block = bitcore.Block;
var $ = bitcore.util.preconditions; var $ = bitcore.util.preconditions;
var index = require('../../');
var log = index.log;
var Service = require('../../service'); var Service = require('../../service');
function DB(options) { function DB(options) {
@ -19,21 +16,13 @@ function DB(options) {
if (!(this instanceof DB)) { if (!(this instanceof DB)) {
return new DB(options); return new DB(options);
} }
if (!options) { options = options || {};
options = {};
}
Service.call(this, options); Service.call(this, options);
this.version = 2; this.version = 2;
this.dbPrefix = '\u0000\u0000'; this.dbPrefix = '\u0000\u0000';
this.tip = null;
this.genesis = null;
this.dbOptions = {
keyEncoding: 'string',
valueEncoding: 'binary'
};
$.checkState(this.node.network, 'Node is expected to have a "network" property'); $.checkState(this.node.network, 'Node is expected to have a "network" property');
this.network = this.node.network; this.network = this.node.network;
@ -45,8 +34,6 @@ function DB(options) {
this.levelupStore = options.store; this.levelupStore = options.store;
} }
this.retryInterval = 60000;
this.subscriptions = {}; this.subscriptions = {};
this.bitcoind = this.node.services.bitcoind; this.bitcoind = this.node.services.bitcoind;

View File

@ -1,12 +1,11 @@
'use strict'; 'use strict';
var chai = require('chai'); var chai = require('chai');
var should = chai.should(); var expect = chai.expect;
var async = require('async'); var async = require('async');
var BitcoinRPC = require('bitcoind-rpc'); var BitcoinRPC = require('bitcoind-rpc');
var path = require('path'); var path = require('path');
var utils = require('./utils'); var utils = require('./utils');
var crypto = require('crypto');
var debug = true; var debug = true;
var bitcoreDataDir = '/tmp/bitcore'; var bitcoreDataDir = '/tmp/bitcore';
@ -49,7 +48,8 @@ var bitcore = {
'bitcoind', 'bitcoind',
'db', 'db',
'block', 'block',
'web' 'web',
'block.regtest',
], ],
servicesConfig: { servicesConfig: {
bitcoind: { bitcoind: {
@ -112,8 +112,7 @@ describe('Block Operations', function() {
}); });
it('should sync block headers', function(done) { it('should sync block headers', function(done) {
done();
done();
}); });
}); });