From 4b51bc01436414fb214d6764a1ec7a3e52f92013 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 27 Jan 2017 17:02:38 -0500 Subject: [PATCH] make ProcessSerial a Writable stream --- lib/services/db/sync.js | 80 +++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 47 deletions(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 1a96ca16..ed9a5348 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -28,35 +28,27 @@ Sync.prototype.initialSync = function() { this.syncing = true; this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); + var writeStream = new WriteStream(this.highWaterMark, this.db); var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); - var writeStream1 = new WriteStream(this.highWaterMark, this.db); - var writeStream2 = new WriteStream(this.highWaterMark, this.db); var start = Date.now(); this._handleErrors(this.blockStream); this._handleErrors(processConcurrent); this._handleErrors(processSerial); - this._handleErrors(writeStream1); - this._handleErrors(writeStream2); - - writeStream2.on('finish', function() { - var end = Date.now(); - console.log('Total time: ', (end - start) + ' ms'); - console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); - console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + this._handleErrors(writeStream); + processSerial.on('finish', function() { self.syncing = false; self.emit('initialsync'); }); this.blockStream .pipe(processConcurrent) - .pipe(writeStream1); + .pipe(writeStream); this.blockStream - .pipe(processSerial) - .pipe(writeStream2); + .pipe(processSerial); }; // Get concurrent and serial block operations and write them together block by block @@ -240,14 +232,14 @@ BlockStream.prototype._process = function() { } function ProcessSerial(highWaterMark, db, tip) { - Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.tip = tip; } -inherits(ProcessSerial, Transform); +inherits(ProcessSerial, Writable); -ProcessSerial.prototype._transform = function(block, enc, callback) { +ProcessSerial.prototype._write = function(block, enc, callback) { var self = this; //console.log('serial', block.__height); @@ -259,28 +251,20 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { return callback(err); } - if(check) { + if(check()) { return self._process(block, callback); } - var processed = false; - - self.db.on('addblock', function() { - if(!processed) { - processed = true; - return self._process(block, callback); - } - }); - - self.db.on('concurrentblock', function() { - if(!processed) { - processed = true; - return self._process(block, callback); + self.db.once('concurrentblock', function() { + if(!check()) { + var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height); + return self.emit('error', err); } + self._process(block, callback); }); function check() { - return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; + return self.db.concurrentTip.__height < block.__height; } }; @@ -301,7 +285,19 @@ ProcessSerial.prototype._process = function(block, callback) { self.tip = block; - callback(null, obj); + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } + + self.db.tip = block; + self.db.emit('addblock'); + if(self.db.tip.__height % 100 === 0) { + console.log('Tip:', self.db.tip.__height); + } + + callback(); + }); }); }; @@ -376,21 +372,11 @@ WriteStream.prototype._write = function(obj, enc, callback) { return callback(err); } - if(obj.tip) { - self.db.tip = obj.tip; - self.db.emit('addblock'); - if(self.db.tip.__height % 100 === 0) { - console.log('Tip:', self.db.tip.__height); - } - } - - if(obj.concurrentTip) { - self.db.concurrentTip = obj.concurrentTip; - self.db.emit('concurrentaddblock'); - if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { - console.log('Concurrent tip:', self.db.concurrentTip.__height); - self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; - } + self.db.concurrentTip = obj.concurrentTip; + self.db.emit('concurrentaddblock'); + if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { + console.log('Concurrent tip:', self.db.concurrentTip.__height); + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; } callback();