From 7862481df5485afab4b17bd28bcb96d7fa5b0fbe Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Wed, 1 Feb 2017 10:31:48 -0500 Subject: [PATCH] Minor cleanup. --- lib/encoding.js | 3 +- lib/services/db/sync.js | 113 +++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 61 deletions(-) diff --git a/lib/encoding.js b/lib/encoding.js index 64a51d6d..c57ab29e 100644 --- a/lib/encoding.js +++ b/lib/encoding.js @@ -2,7 +2,6 @@ var bitcore = require('bitcore-lib'); var BufferReader = bitcore.encoding.BufferReader; -var utils = require('./utils'); function Encoding(servicePrefix) { this.servicePrefix = servicePrefix; @@ -100,7 +99,7 @@ Encoding.prototype.encodeUtxoIndexValue = function(height, satoshis, scriptBuffe heightBuffer.writeUInt32BE(height); var satoshisBuffer = new Buffer(8); satoshisBuffer.writeDoubleBE(satoshis); - return Buffer.concat([height, satoshisBuffer, scriptBuffer]); + return Buffer.concat([heightBuffer, satoshisBuffer, scriptBuffer]); }; Encoding.prototype.decodeUtxoIndexValue = function(buffer) { diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 5ca6d1e2..c3dde482 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -1,4 +1,4 @@ -'use strict' +'use strict'; var Readable = require('stream').Readable; var Writable = require('stream').Writable; var Transform = require('stream').Transform; @@ -8,6 +8,51 @@ var async = require('async'); var bitcore = require('bitcore-lib'); var BufferUtil = bitcore.util.buffer; +function BlockStream(highWaterMark, bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.bitcoind = bitcoind; + this.lastHeight = lastHeight; + this.stopping = false; + this.queue = []; + this.processing = false; +} + +inherits(BlockStream, Readable); + +function ProcessConcurrent(highWaterMark, db) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.operations = []; + this.lastBlock = 0; + this.blockCount = 0; +} + +inherits(ProcessConcurrent, Transform); + +function ProcessSerial(highWaterMark, db, tip) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.tip = tip; +} + +inherits(ProcessSerial, Writable); + +function ProcessBoth(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; +} + +inherits(ProcessBoth, Writable); + +function WriteStream(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); + this.db = db; + this.writeTime = 0; + this.lastConcurrentOutputHeight = 0; +} + +inherits(WriteStream, Writable); + function Sync(node, db) { this.node = node; this.db = db; @@ -22,7 +67,7 @@ Sync.prototype.initialSync = function() { var self = this; if(this.syncing) { - return + return; } this.syncing = true; @@ -31,8 +76,6 @@ Sync.prototype.initialSync = function() { var writeStream = new WriteStream(this.highWaterMark, this.db); var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); - var start = Date.now(); - this._handleErrors(this.blockStream); this._handleErrors(processConcurrent); this._handleErrors(processSerial); @@ -55,8 +98,9 @@ Sync.prototype.initialSync = function() { // Useful for when we are fully synced and we want to advance the concurrentTip and // the tip together Sync.prototype.sync = function() { + var self = this; if(this.syncing) { - return + return; } this.syncing = true; @@ -95,16 +139,6 @@ Sync.prototype._handleErrors = function(stream) { }); }; -function BlockStream(highWaterMark, bitcoind, lastHeight) { - Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.bitcoind = bitcoind; - this.lastHeight = lastHeight; - this.stopping = false; - this.queue = []; - this.processing = false; -} - -inherits(BlockStream, Readable); // BlockStream.prototype._read = function() { // var self = this; @@ -223,17 +257,13 @@ BlockStream.prototype._process = function() { ); }; -function ProcessSerial(highWaterMark, db, tip) { - Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.db = db; - this.tip = tip; -} - -inherits(ProcessSerial, Writable); ProcessSerial.prototype._write = function(block, enc, callback) { var self = this; + function check() { + return self.db.concurrentTip.__height >= block.__height; + } //console.log('serial', block.__height); var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); @@ -255,9 +285,6 @@ ProcessSerial.prototype._write = function(block, enc, callback) { self._process(block, callback); }); - function check() { - return self.db.concurrentTip.__height >= block.__height; - } }; ProcessSerial.prototype._process = function(block, callback) { @@ -293,15 +320,6 @@ ProcessSerial.prototype._process = function(block, callback) { }); }; -function ProcessConcurrent(highWaterMark, db) { - Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.db = db; - this.operations = []; - this.lastBlock = 0; - this.blockCount = 0; -}; - -inherits(ProcessConcurrent, Transform); ProcessConcurrent.prototype._transform = function(block, enc, callback) { var self = this; @@ -323,7 +341,7 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) { var obj = { concurrentTip: block, operations: self.operations - } + }; self.operations = []; self.blockCount = 0; @@ -337,25 +355,11 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) { ProcessConcurrent.prototype._flush = function(callback) { if(this.operations.length) { this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true)); - var obj = { - concurrentTipHeight: this.lastBlock, - operations: this.operations - }; - this.operations = []; - return callback(null, operations); + return callback(null, this.operations); } }; -function WriteStream(highWaterMark, db) { - Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.db = db; - this.writeTime = 0; - this.lastConcurrentOutputHeight = 0; -} - -inherits(WriteStream, Writable); - WriteStream.prototype._write = function(obj, enc, callback) { var self = this; @@ -375,15 +379,6 @@ WriteStream.prototype._write = function(obj, enc, callback) { }); }; -var ProcessBoth = function(highWaterMark, db) { - Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.db = db; - - -}; - -inherits(ProcessBoth, Writable); - ProcessBoth.prototype._write = function(block, encoding, callback) { var self = this; async.parallel([function(next) {