From 735810919a6ba9c5420eb6c9d05be2d0f62d98af Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 30 Jan 2017 14:59:53 -0500 Subject: [PATCH] wip --- lib/services/db/index.js | 2 +- lib/services/db/sync.js | 58 ++++++++++++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 077e57ce..04592669 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -182,7 +182,7 @@ DB.prototype.start = function(callback) { this.node.once('ready', function() { // start syncing - self._sync.initialSync(); + self._sync.sync(); // Notify that there is a new tip self.node.services.bitcoind.on('tip', function() { diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index c2a0b24d..52404197 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -40,7 +40,7 @@ Sync.prototype.initialSync = function() { processSerial.on('finish', function() { self.syncing = false; - self.emit('initialsync'); + self.emit('synced'); }); this.blockStream @@ -62,20 +62,13 @@ Sync.prototype.sync = function() { this.syncing = true; this.blockStream = new BlockStream(); var processBoth = new ProcessBoth(this.db); - var writeStream = new WriteStream(this.db); - - var start = Date.now(); this._handleErrors(this.blockStream); this._handleErrors(processBoth); - this._handleErrors(writeStream); - writeStream.on('finish', function() { - var end = Date.now(); - console.log('Total time: ', (end - start) + ' ms'); - console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); - console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); - self.emit('synced'); + //TODO what should happen here + processBoth.on('finish', function() { + self.syncing = false; }); this.blockStream @@ -383,4 +376,47 @@ WriteStream.prototype._write = function(obj, enc, callback) { }); }; +var ProcessBoth = function(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + + +}; + +inherits(ProcessBoth, Writable); + +ProcessBoth.prototype._write = function(block, encoding, callback) { + + async.parallel([function(next) { + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + operations.push(self.db.getConcurrentTipOperation(block, true)); + next(null, operations); + }); + }, function(next) { + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + operations.push(self.db.getTipOperation(block, true)); + next(null, operations); + }); + }], function(err, results) { + if(err) { + return callback(err); + } + var operations = results[0].concat(results[1]); + self.db.store.batch(operations, function(err) { + if(err) { + return callback(err); + } + self.db.tip = block; + self.db.concurrentTip = block; + callback(); + }); + }); +}; + module.exports = Sync;