From d06c177a1a3e560776802ccd42894910526fc064 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 3 Mar 2017 13:35:43 -0500 Subject: [PATCH] Fixed small items with syncing. --- lib/logger.js | 2 +- lib/services/bitcoind/index.js | 12 +++++--- lib/services/db/index.js | 11 ++++--- lib/services/db/sync.js | 55 +++++++++++++++++----------------- 4 files changed, 42 insertions(+), 38 deletions(-) diff --git a/lib/logger.js b/lib/logger.js index bbf960e1..0e8576c7 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -59,7 +59,7 @@ Logger.prototype.warn = function() { */ Logger.prototype._log = function(color) { if (!this.permitWrites) { - //return; + return; } var args = Array.prototype.slice.call(arguments); args = args.slice(1); diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 408d4691..e812e272 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -173,6 +173,7 @@ Bitcoin.prototype._updateTip = function(node, message) { var hex = message.toString('hex'); if (hex !== self.tiphash) { self.tiphash = message.toString('hex'); + node.client.getBlock(self.tiphash, function(err, response) { if (err) { var error = self._wrapRPCError(err); @@ -284,7 +285,9 @@ Bitcoin.prototype._initZmqSubSocket = function(node, zmqUrl) { }); node.zmqSubSocket.monitor(100, 0); - node.zmqSubSocket.connect(zmqUrl); + if (_.isString(zmqUrl)) { + node.zmqSubSocket.connect(zmqUrl); + } }; Bitcoin.prototype._loadTipFromNode = function(node, callback) { @@ -300,6 +303,7 @@ Bitcoin.prototype._loadTipFromNode = function(node, callback) { if (err) { return callback(self._wrapRPCError(err)); } + self.height = response.result.height; $.checkState(self.height >= 0); self.emit('tip', self.height); @@ -338,8 +342,8 @@ Bitcoin.prototype._connectProcess = function(config, callback) { return callback(new Error('Stopping while trying to connect to bitcoind.')); } - //self._initZmqSubSocket(node, config.zmqpubrawtx); - //self._subscribeZmqEvents(node); + self._initZmqSubSocket(node, config.zmqpubrawtx); + self._subscribeZmqEvents(node); callback(null, node); }); @@ -418,7 +422,6 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) { return callback(err); } self._tryAllClients(function(client, done) { - self.client.getBlock(blockhash, false, function(err, response) { if (err) { return done(self._wrapRPCError(err)); @@ -449,6 +452,7 @@ Bitcoin.prototype.getBlock = function(blockArg, callback) { }); }, callback); } + self._maybeGetBlockHash(blockArg, queryBlock); }; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index b01a9340..e1ae91ea 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -171,11 +171,10 @@ DB.prototype.start = function(callback) { self.node.once('ready', function() { log.permitWrites = false; - if (!(self.tip.__height === self.node.services.bitcoind.height && - self.tip === self.node.services.bitcoind.tiphash)) { - self._sync.initialSync(); - } - self.node.services.bitcoind.on('tip', function() { + self._sync.initialSync(); + + self.node.services.bitcoind.on('tip', function(height) { + log.info('New tip at height: ' + height + ' hash: ' + self.node.services.bitcoind.tiphash); self._sync.sync(); }); @@ -349,7 +348,7 @@ DB.prototype.unsubscribe = function(name, emitter) { DB.prototype.connectBlock = function(block, callback) { var self = this; - log.debug('DB handling new chain block'); + log.info('DB handling new chain block'); var operations = []; self.getConcurrentBlockOperations(block, true, function(err, ops) { if(err) { diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 9456387e..5be7b0c9 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -11,10 +11,12 @@ var ProgressBar = require('progress'); var green = '\u001b[42m \u001b[0m'; var red = '\u001b[41m \u001b[0m'; -function BlockStream(highWaterMark, bitcoind, lastHeight) { +function BlockStream(highWaterMark, bitcoind, dbTip) { Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.bitcoind = bitcoind; - this.lastHeight = lastHeight; + this.dbTip = dbTip; + this.lastReadHeight = dbTip.__height; + this.lastEmittedHash = dbTip.hash; this.stopping = false; this.queue = []; this.processing = false; @@ -78,7 +80,7 @@ Sync.prototype.initialSync = function() { self.syncing = true; - self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip.__height); + self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip); var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db); var writeStream = new WriteStream(self.highWaterMark, self.db); var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip); @@ -88,15 +90,10 @@ Sync.prototype.initialSync = function() { self._handleErrors(processSerial); self._handleErrors(writeStream); - processSerial.on('finish', function() { - self.syncing = false; - self.emit('synced'); - }); self.blockStream .pipe(processConcurrent) .pipe(writeStream); - self.blockStream .pipe(processSerial); @@ -114,16 +111,20 @@ Sync.prototype.initialSync = function() { var timer = setInterval(function () { var tick = self.db.tip.__height - self.lastReportedBlock; - self.progressBar.tick(tick, { blockspersec: tick }); - self.lastReportedBlock = self.db.tip.__height; + }, 1000); - if (!self.syncing) { + processSerial.on('finish', function() { + self.syncing = false; + if (self.progressBar) { self.progressBar.terminate(); + } + if (timer) { clearInterval(timer); } - }, 1000); + self.emit('synced'); + }); }; @@ -134,7 +135,7 @@ Sync.prototype.sync = function() { } this.syncing = true; - this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); + this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip); var processBoth = new ProcessBoth(this.highWaterMark, this.db); this._handleErrors(this.blockStream); @@ -174,20 +175,19 @@ Sync.prototype._handleErrors = function(stream) { BlockStream.prototype._read = function() { - this.lastHeight++; - this.queue.push(this.lastHeight); + + if (this.lastEmittedHash === this.bitcoind.tiphash) { + return this.push(null); + } + + this.queue.push(++this.lastReadHeight); this._process(); }; BlockStream.prototype._process = function() { var self = this; - if (!self.syncing) { - - return self.push(null); - } - - if(this.processing) { + if(self.processing) { return; } @@ -197,33 +197,34 @@ BlockStream.prototype._process = function() { function() { return self.queue.length; }, function(next) { + var heights = self.queue.slice(0, Math.min(5, self.queue.length)); self.queue = self.queue.slice(heights.length); async.map(heights, function(height, next) { + self.bitcoind.getBlock(height, function(err, block) { + if(err) { return next(err); } + block.__height = height; setTimeout(function() { next(null, block); }, 1); }); + }, function(err, blocks) { if(err) { return next(err); } for(var i = 0; i < blocks.length; i++) { + + self.lastEmittedHash = blocks[i].hash; self.push(blocks[i]); - if (blocks[i].hash === self.bitcoind.tip) { - - self.syncing = false; - self.emit('synced'); - break; - } } next();