diff --git a/lib/constants.js b/lib/constants.js index 0e9cd8bb..c317ff61 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -7,6 +7,7 @@ module.exports = { regtest: '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206', testnet: '000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943', //this is testnet3 testnet5: '000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943' //this is testnet5 - } + }, + DB_PREFIX: new Buffer('ffff', 'hex') }; diff --git a/lib/scaffold/start.js b/lib/scaffold/start.js index 0f31ee96..8dcb2c94 100644 --- a/lib/scaffold/start.js +++ b/lib/scaffold/start.js @@ -10,28 +10,15 @@ var shuttingDown = false; var fs = require('fs'); -/** - * This function will instantiate and start a Node, requiring the necessary service - * modules, and registering event handlers. - * @param {Object} options - * @param {Object} options.servicesPath - The path to the location of service modules - * @param {String} options.path - The absolute path of the configuration file - * @param {Object} options.config - The parsed bitcore-node.json configuration file - * @param {Array} options.config.services - An array of services names. - * @param {Object} options.config.servicesConfig - Parameters to pass to each service - * @param {String} options.config.network - 'livenet', 'testnet' or 'regtest - * @param {Number} options.config.port - The port to use for the web service - */ function start(options) { - /* jshint maxstatements: 20 */ var fullConfig = _.clone(options.config); var servicesPath; if (options.servicesPath) { - servicesPath = options.servicesPath; // services are in a different directory than the config + servicesPath = options.servicesPath; } else { - servicesPath = options.path; // defaults to the same directory + servicesPath = options.path; } fullConfig.path = path.resolve(options.path, './bitcore-node.json'); @@ -201,11 +188,6 @@ function setupServices(req, servicesPath, config) { return services; } -/** - * Will shutdown a node and then the process - * @param {Object} _process - The Node.js process object - * @param {Node} node - The Bitcore Node instance - */ function cleanShutdown(_process, node) { node.stop(function(err) { if(err) { @@ -217,15 +199,6 @@ function cleanShutdown(_process, node) { }); } -/** - * Will handle all the shutdown tasks that need to take place to ensure a safe exit - * @param {Object} options - * @param {String} options.sigint - The signal given was a SIGINT - * @param {Array} options.exit - The signal given was an uncaughtException - * @param {Object} _process - The Node.js process - * @param {Node} node - * @param {Error} error -*/ function exitHandler(options, _process, node, err) { if (err) { log.error('uncaught exception:', err); @@ -247,17 +220,8 @@ function exitHandler(options, _process, node, err) { } } -/** - * Will register event handlers to stop the node for `process` events - * `uncaughtException` and `SIGINT`. - * @param {Object} _process - The Node.js process - * @param {Node} node - */ function registerExitHandlers(_process, node) { - //catches uncaught exceptions _process.on('uncaughtException', exitHandler.bind(null, {exit:true}, _process, node)); - - //catches ctrl+c event _process.on('SIGINT', exitHandler.bind(null, {sigint:true}, _process, node)); } diff --git a/lib/services/block/index.js b/lib/services/block/index.js index ba116bf1..6a55b132 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -171,7 +171,6 @@ BlockService.prototype.isSynced = function(callback) { BlockService.prototype.start = function(callback) { var self = this; - self._setListeners(); async.waterfall([ function(next) { @@ -191,6 +190,7 @@ BlockService.prototype.start = function(callback) { if(err) { return callback(err); } + self._setListeners(); self._startSubscriptions(); callback(); }); @@ -406,7 +406,7 @@ BlockService.prototype._getIncompleteChainIndexes = function(block) { BlockService.prototype._getOldBlocks = function(currentHash, commonAncestorHash) { // the old blocks should be in the meta colection - if (currentHash === commonAncestorHash || !commonAncestor || !currentHash) { + if (currentHash === commonAncestorHash || !commonAncestorHash || !currentHash) { return; } @@ -435,7 +435,7 @@ BlockService.prototype._handleReorg = function(block) { var commonAncestor = this._findCommonAncestor(block); var oldBlocks = this._getOldBlocks(this._tip.hash, block.hash, commonAncestor.hash); - if (!commonAncestor || !oldBlocks || oldBlock.length < 1) { + if (!commonAncestor || !oldBlocks || oldBlocks.length < 1) { log.error('A common ancestor block between hash: ' + this._tip.hash + ' (our current tip) and: ' + block.hash + ' (the forked block) could not be found. Bitcore-node must exit.'); this.node.stop(); @@ -602,7 +602,7 @@ BlockService.prototype._saveMetaData = function(block) { // tip this._tip.hash = block.hash; - this._tip.height = block.height; + this._tip.height = this._meta.length; var tipInfo = utils.encodeTip(this._tip, this.name); @@ -662,18 +662,27 @@ BlockService.prototype._sendDelta = function() { if (++this._blockCount >= BlockService.MAX_BLOCKS) { this._latestBlockHash = this._tip.hash; - this._numCompleted = this._tip.height; this._blockCount = 0; - this._sync(); + // this is where we ask the p2p network for more block + var self = this; + + // we need a pause here before asking for more blocks so that the db work write queue, + // does not get too large and cause a memory issue + // the larger the difference between network speeds and disk i/o speeds should result in + // larger waits, e.g. gigabit internet speeds and a usb 2.0 5400 rpm hard disk might 10-20 + // between each 500 block chunk + setTimeout(function() { + + self._sync(); + + }, 10000); } }; BlockService.prototype._setListeners = function() { - var self = this; - - self._p2p.once('bestHeight', self._onBestHeight.bind(self)); + this._p2p.once('bestHeight', this._onBestHeight.bind(this)); }; @@ -681,14 +690,11 @@ BlockService.prototype._setTip = function(tip) { log.debug('Setting tip to height: ' + tip.height); log.debug('Setting tip to hash: ' + tip.hash); this._tip = tip; - this._db.setServiceTip('block', this._tip); }; BlockService.prototype._startSync = function() { - var currentHeight = this._tip.height; - this._numNeeded = this._bestHeight - currentHeight; - this._numCompleted = currentHeight; + this._numNeeded = this._bestHeight - this._tip.height; if (this._numNeeded <= 0) { return; } @@ -719,8 +725,8 @@ BlockService.prototype._sync = function() { if (--this._p2pBlockCallsNeeded > 0) { - log.info('Blocks download progress: ' + this._numCompleted + '/' + - this._numNeeded + ' (' + (this._numCompleted/this._numNeeded*100).toFixed(2) + '%)'); + log.info('Blocks download progress: ' + this._tip.height + '/' + + this._numNeeded + ' (' + (this._tip.height / this._numNeeded*100).toFixed(2) + '%)'); this._p2p.getBlocks({ startHash: this._latestBlockHash }); return; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index fc454e9a..36863893 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -22,11 +22,10 @@ function DB(options) { Service.call(this, options); + this._dbPrefix = constants.DB_PREFIX; + this.version = 1; - this.dbPrefix = new Buffer('00', 'hex'); - - $.checkState(this.node.network, 'Node is expected to have a "network" property'); this.network = this.node.network; this._setDataPath(); @@ -38,8 +37,10 @@ function DB(options) { this.subscriptions = {}; - this._operationsQueue = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()]; + this.node.on('stopping', function() { + log.warn('Node is stopping, gently closing the database.'); + }); } util.inherits(DB, Service); @@ -75,7 +76,7 @@ DB.prototype._checkVersion = function(callback) { // presupposition is that IF there is a database to open -and- there is a version key // in the form below, it will be related to us and it must be equal to this service's version. - var versionBuf = Buffer.concat([ self.dbPrefix, new Buffer('version', 'utf8') ]); + var versionBuf = Buffer.concat([ self._dbPrefix, new Buffer('version', 'utf8') ]); self.get(versionBuf, self.dbOptions, function(err, buffer) { if (err) { @@ -98,7 +99,7 @@ DB.prototype._checkVersion = function(callback) { DB.prototype._setVersion = function(callback) { var versionBuffer = new Buffer(new Array(4)); versionBuffer.writeUInt32BE(this.version); - this.put(this.dbPrefix + 'version', versionBuffer, callback); + this.put(this._dbPrefix + 'version', versionBuffer, callback); }; DB.prototype.start = function(callback) { @@ -157,11 +158,8 @@ DB.prototype.put = function(key, value, options) { return; } - self._operationsQueue++; self._store.put(key, value, options, function(err) { - self._operationsQueue--; - if (err) { self.emit('error', err); return; @@ -178,11 +176,8 @@ DB.prototype.batch = function(ops, options) { return; } - self._operationsQueue += ops.length; self._store.batch(ops, options, function(err) { - self._operationsQueue -= ops.length; - if (err) { self.emit('error', err); return; @@ -198,7 +193,7 @@ DB.prototype.createReadStream = function(op) { stream = this._store.createReadStream(op); } if (stream) { - stream.on('error', this.onError.bind(this)); + stream.on('error', this._onError.bind(this)); } return stream; }; @@ -217,13 +212,7 @@ DB.prototype.createKeyStream = function(op) { DB.prototype.stop = function(callback) { var self = this; self._stopping = true; - async.whilst(function() { - return self._operationsQueue > 0; - }, function(next) { - setTimeout(next, 3000); - }, function() { - self.close(callback); - }); + self.close(callback); }; DB.prototype.close = function(callback) { @@ -243,7 +232,7 @@ DB.prototype.getPublishEvents = function() { DB.prototype.getServiceTip = function(serviceName, callback) { - var keyBuf = Buffer.concat([ this.dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]); + var keyBuf = Buffer.concat([ this._dbPrefix, new Buffer('tip-' + serviceName, 'utf8') ]); var self = this; self.get(keyBuf, function(err, tipBuf) { @@ -278,8 +267,8 @@ DB.prototype.getServiceTip = function(serviceName, callback) { DB.prototype.getPrefix = function(service, callback) { var self = this; - var keyBuf = Buffer.concat([ self.dbPrefix, new Buffer('prefix-', 'utf8'), new Buffer(service, 'utf8') ]); - var unusedBuf = Buffer.concat([ self.dbPrefix, new Buffer('nextUnused', 'utf8') ]); + var keyBuf = Buffer.concat([ self._dbPrefix, new Buffer('prefix-', 'utf8'), new Buffer(service, 'utf8') ]); + var unusedBuf = Buffer.concat([ self._dbPrefix, new Buffer('nextUnused', 'utf8') ]); function getPrefix(next) { diff --git a/lib/utils.js b/lib/utils.js index 6460d821..08193595 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -5,6 +5,7 @@ var BufferUtil = bitcore.util.buffer; var MAX_SAFE_INTEGER = 0x1fffffffffffff; // 2 ^ 53 - 1 var crypto = require('crypto'); var _ = require('lodash'); +var constants = require('./constants'); var utils = {}; utils.isHash = function(value) { @@ -218,7 +219,7 @@ utils.removeItemsByIndexList = function(indexList, list) { }; utils.encodeTip = function(tip, name) { - var key = Buffer.concat([ new Buffer('00', 'hex'), + var key = Buffer.concat([ constants.DB_PREFIX, new Buffer('tip-' + name, 'utf8') ]); var heightBuf = new Buffer(4);