Fixed small items with syncing.

This commit is contained in:
Chris Kleeschulte 2017-03-03 13:35:43 -05:00
parent aadd336b4c
commit d06c177a1a
4 changed files with 42 additions and 38 deletions

View File

@ -59,7 +59,7 @@ Logger.prototype.warn = function() {
*/
Logger.prototype._log = function(color) {
if (!this.permitWrites) {
//return;
return;
}
var args = Array.prototype.slice.call(arguments);
args = args.slice(1);

View File

@ -173,6 +173,7 @@ Bitcoin.prototype._updateTip = function(node, message) {
var hex = message.toString('hex');
if (hex !== self.tiphash) {
self.tiphash = message.toString('hex');
node.client.getBlock(self.tiphash, function(err, response) {
if (err) {
var error = self._wrapRPCError(err);
@ -284,7 +285,9 @@ Bitcoin.prototype._initZmqSubSocket = function(node, zmqUrl) {
});
node.zmqSubSocket.monitor(100, 0);
node.zmqSubSocket.connect(zmqUrl);
if (_.isString(zmqUrl)) {
node.zmqSubSocket.connect(zmqUrl);
}
};
Bitcoin.prototype._loadTipFromNode = function(node, callback) {
@ -300,6 +303,7 @@ Bitcoin.prototype._loadTipFromNode = function(node, callback) {
if (err) {
return callback(self._wrapRPCError(err));
}
self.height = response.result.height;
$.checkState(self.height >= 0);
self.emit('tip', self.height);
@ -338,8 +342,8 @@ Bitcoin.prototype._connectProcess = function(config, callback) {
return callback(new Error('Stopping while trying to connect to bitcoind.'));
}
//self._initZmqSubSocket(node, config.zmqpubrawtx);
//self._subscribeZmqEvents(node);
self._initZmqSubSocket(node, config.zmqpubrawtx);
self._subscribeZmqEvents(node);
callback(null, node);
});
@ -418,7 +422,6 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) {
return callback(err);
}
self._tryAllClients(function(client, done) {
self.client.getBlock(blockhash, false, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
@ -449,6 +452,7 @@ Bitcoin.prototype.getBlock = function(blockArg, callback) {
});
}, callback);
}
self._maybeGetBlockHash(blockArg, queryBlock);
};

View File

@ -171,11 +171,10 @@ DB.prototype.start = function(callback) {
self.node.once('ready', function() {
log.permitWrites = false;
if (!(self.tip.__height === self.node.services.bitcoind.height &&
self.tip === self.node.services.bitcoind.tiphash)) {
self._sync.initialSync();
}
self.node.services.bitcoind.on('tip', function() {
self._sync.initialSync();
self.node.services.bitcoind.on('tip', function(height) {
log.info('New tip at height: ' + height + ' hash: ' + self.node.services.bitcoind.tiphash);
self._sync.sync();
});
@ -349,7 +348,7 @@ DB.prototype.unsubscribe = function(name, emitter) {
DB.prototype.connectBlock = function(block, callback) {
var self = this;
log.debug('DB handling new chain block');
log.info('DB handling new chain block');
var operations = [];
self.getConcurrentBlockOperations(block, true, function(err, ops) {
if(err) {

View File

@ -11,10 +11,12 @@ var ProgressBar = require('progress');
var green = '\u001b[42m \u001b[0m';
var red = '\u001b[41m \u001b[0m';
function BlockStream(highWaterMark, bitcoind, lastHeight) {
function BlockStream(highWaterMark, bitcoind, dbTip) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.lastHeight = lastHeight;
this.dbTip = dbTip;
this.lastReadHeight = dbTip.__height;
this.lastEmittedHash = dbTip.hash;
this.stopping = false;
this.queue = [];
this.processing = false;
@ -78,7 +80,7 @@ Sync.prototype.initialSync = function() {
self.syncing = true;
self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip.__height);
self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip);
var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db);
var writeStream = new WriteStream(self.highWaterMark, self.db);
var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip);
@ -88,15 +90,10 @@ Sync.prototype.initialSync = function() {
self._handleErrors(processSerial);
self._handleErrors(writeStream);
processSerial.on('finish', function() {
self.syncing = false;
self.emit('synced');
});
self.blockStream
.pipe(processConcurrent)
.pipe(writeStream);
self.blockStream
.pipe(processSerial);
@ -114,16 +111,20 @@ Sync.prototype.initialSync = function() {
var timer = setInterval(function () {
var tick = self.db.tip.__height - self.lastReportedBlock;
self.progressBar.tick(tick, { blockspersec: tick });
self.lastReportedBlock = self.db.tip.__height;
}, 1000);
if (!self.syncing) {
processSerial.on('finish', function() {
self.syncing = false;
if (self.progressBar) {
self.progressBar.terminate();
}
if (timer) {
clearInterval(timer);
}
}, 1000);
self.emit('synced');
});
};
@ -134,7 +135,7 @@ Sync.prototype.sync = function() {
}
this.syncing = true;
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height);
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip);
var processBoth = new ProcessBoth(this.highWaterMark, this.db);
this._handleErrors(this.blockStream);
@ -174,20 +175,19 @@ Sync.prototype._handleErrors = function(stream) {
BlockStream.prototype._read = function() {
this.lastHeight++;
this.queue.push(this.lastHeight);
if (this.lastEmittedHash === this.bitcoind.tiphash) {
return this.push(null);
}
this.queue.push(++this.lastReadHeight);
this._process();
};
BlockStream.prototype._process = function() {
var self = this;
if (!self.syncing) {
return self.push(null);
}
if(this.processing) {
if(self.processing) {
return;
}
@ -197,33 +197,34 @@ BlockStream.prototype._process = function() {
function() {
return self.queue.length;
}, function(next) {
var heights = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(heights.length);
async.map(heights, function(height, next) {
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
setTimeout(function() {
next(null, block);
}, 1);
});
}, function(err, blocks) {
if(err) {
return next(err);
}
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
if (blocks[i].hash === self.bitcoind.tip) {
self.syncing = false;
self.emit('synced');
break;
}
}
next();