make ProcessSerial a Writable stream

This commit is contained in:
Patrick Nagurny 2017-01-27 17:02:38 -05:00
parent fae38b1ee8
commit 4b51bc0143

View File

@ -28,35 +28,27 @@ Sync.prototype.initialSync = function() {
this.syncing = true; this.syncing = true;
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height);
var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db);
var writeStream = new WriteStream(this.highWaterMark, this.db);
var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip);
var writeStream1 = new WriteStream(this.highWaterMark, this.db);
var writeStream2 = new WriteStream(this.highWaterMark, this.db);
var start = Date.now(); var start = Date.now();
this._handleErrors(this.blockStream); this._handleErrors(this.blockStream);
this._handleErrors(processConcurrent); this._handleErrors(processConcurrent);
this._handleErrors(processSerial); this._handleErrors(processSerial);
this._handleErrors(writeStream1); this._handleErrors(writeStream);
this._handleErrors(writeStream2);
writeStream2.on('finish', function() {
var end = Date.now();
console.log('Total time: ', (end - start) + ' ms');
console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms');
console.log('Serial write time: ', writeStreamFast.writeTime + ' ms');
processSerial.on('finish', function() {
self.syncing = false; self.syncing = false;
self.emit('initialsync'); self.emit('initialsync');
}); });
this.blockStream this.blockStream
.pipe(processConcurrent) .pipe(processConcurrent)
.pipe(writeStream1); .pipe(writeStream);
this.blockStream this.blockStream
.pipe(processSerial) .pipe(processSerial);
.pipe(writeStream2);
}; };
// Get concurrent and serial block operations and write them together block by block // Get concurrent and serial block operations and write them together block by block
@ -240,14 +232,14 @@ BlockStream.prototype._process = function() {
} }
function ProcessSerial(highWaterMark, db, tip) { function ProcessSerial(highWaterMark, db, tip) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db; this.db = db;
this.tip = tip; this.tip = tip;
} }
inherits(ProcessSerial, Transform); inherits(ProcessSerial, Writable);
ProcessSerial.prototype._transform = function(block, enc, callback) { ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this; var self = this;
//console.log('serial', block.__height); //console.log('serial', block.__height);
@ -259,28 +251,20 @@ ProcessSerial.prototype._transform = function(block, enc, callback) {
return callback(err); return callback(err);
} }
if(check) { if(check()) {
return self._process(block, callback); return self._process(block, callback);
} }
var processed = false; self.db.once('concurrentblock', function() {
if(!check()) {
self.db.on('addblock', function() { var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height);
if(!processed) { return self.emit('error', err);
processed = true;
return self._process(block, callback);
}
});
self.db.on('concurrentblock', function() {
if(!processed) {
processed = true;
return self._process(block, callback);
} }
self._process(block, callback);
}); });
function check() { function check() {
return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; return self.db.concurrentTip.__height < block.__height;
} }
}; };
@ -301,7 +285,19 @@ ProcessSerial.prototype._process = function(block, callback) {
self.tip = block; self.tip = block;
callback(null, obj); self.db.store.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
self.db.tip = block;
self.db.emit('addblock');
if(self.db.tip.__height % 100 === 0) {
console.log('Tip:', self.db.tip.__height);
}
callback();
});
}); });
}; };
@ -376,21 +372,11 @@ WriteStream.prototype._write = function(obj, enc, callback) {
return callback(err); return callback(err);
} }
if(obj.tip) { self.db.concurrentTip = obj.concurrentTip;
self.db.tip = obj.tip; self.db.emit('concurrentaddblock');
self.db.emit('addblock'); if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
if(self.db.tip.__height % 100 === 0) { console.log('Concurrent tip:', self.db.concurrentTip.__height);
console.log('Tip:', self.db.tip.__height); self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
}
}
if(obj.concurrentTip) {
self.db.concurrentTip = obj.concurrentTip;
self.db.emit('concurrentaddblock');
if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
console.log('Concurrent tip:', self.db.concurrentTip.__height);
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
}
} }
callback(); callback();