Minor cleanup.

This commit is contained in:
Chris Kleeschulte 2017-02-01 10:31:48 -05:00
parent 3051c04147
commit 7862481df5
2 changed files with 55 additions and 61 deletions

View File

@ -2,7 +2,6 @@
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
var BufferReader = bitcore.encoding.BufferReader; var BufferReader = bitcore.encoding.BufferReader;
var utils = require('./utils');
function Encoding(servicePrefix) { function Encoding(servicePrefix) {
this.servicePrefix = servicePrefix; this.servicePrefix = servicePrefix;
@ -100,7 +99,7 @@ Encoding.prototype.encodeUtxoIndexValue = function(height, satoshis, scriptBuffe
heightBuffer.writeUInt32BE(height); heightBuffer.writeUInt32BE(height);
var satoshisBuffer = new Buffer(8); var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeDoubleBE(satoshis); satoshisBuffer.writeDoubleBE(satoshis);
return Buffer.concat([height, satoshisBuffer, scriptBuffer]); return Buffer.concat([heightBuffer, satoshisBuffer, scriptBuffer]);
}; };
Encoding.prototype.decodeUtxoIndexValue = function(buffer) { Encoding.prototype.decodeUtxoIndexValue = function(buffer) {

View File

@ -1,4 +1,4 @@
'use strict' 'use strict';
var Readable = require('stream').Readable; var Readable = require('stream').Readable;
var Writable = require('stream').Writable; var Writable = require('stream').Writable;
var Transform = require('stream').Transform; var Transform = require('stream').Transform;
@ -8,6 +8,51 @@ var async = require('async');
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer; var BufferUtil = bitcore.util.buffer;
function BlockStream(highWaterMark, bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.lastHeight = lastHeight;
this.stopping = false;
this.queue = [];
this.processing = false;
}
inherits(BlockStream, Readable);
function ProcessConcurrent(highWaterMark, db) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.operations = [];
this.lastBlock = 0;
this.blockCount = 0;
}
inherits(ProcessConcurrent, Transform);
function ProcessSerial(highWaterMark, db, tip) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
}
inherits(ProcessSerial, Writable);
function ProcessBoth(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
}
inherits(ProcessBoth, Writable);
function WriteStream(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.writeTime = 0;
this.lastConcurrentOutputHeight = 0;
}
inherits(WriteStream, Writable);
function Sync(node, db) { function Sync(node, db) {
this.node = node; this.node = node;
this.db = db; this.db = db;
@ -22,7 +67,7 @@ Sync.prototype.initialSync = function() {
var self = this; var self = this;
if(this.syncing) { if(this.syncing) {
return return;
} }
this.syncing = true; this.syncing = true;
@ -31,8 +76,6 @@ Sync.prototype.initialSync = function() {
var writeStream = new WriteStream(this.highWaterMark, this.db); var writeStream = new WriteStream(this.highWaterMark, this.db);
var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip);
var start = Date.now();
this._handleErrors(this.blockStream); this._handleErrors(this.blockStream);
this._handleErrors(processConcurrent); this._handleErrors(processConcurrent);
this._handleErrors(processSerial); this._handleErrors(processSerial);
@ -55,8 +98,9 @@ Sync.prototype.initialSync = function() {
// Useful for when we are fully synced and we want to advance the concurrentTip and // Useful for when we are fully synced and we want to advance the concurrentTip and
// the tip together // the tip together
Sync.prototype.sync = function() { Sync.prototype.sync = function() {
var self = this;
if(this.syncing) { if(this.syncing) {
return return;
} }
this.syncing = true; this.syncing = true;
@ -95,16 +139,6 @@ Sync.prototype._handleErrors = function(stream) {
}); });
}; };
function BlockStream(highWaterMark, bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.lastHeight = lastHeight;
this.stopping = false;
this.queue = [];
this.processing = false;
}
inherits(BlockStream, Readable);
// BlockStream.prototype._read = function() { // BlockStream.prototype._read = function() {
// var self = this; // var self = this;
@ -223,17 +257,13 @@ BlockStream.prototype._process = function() {
); );
}; };
function ProcessSerial(highWaterMark, db, tip) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
}
inherits(ProcessSerial, Writable);
ProcessSerial.prototype._write = function(block, enc, callback) { ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this; var self = this;
function check() {
return self.db.concurrentTip.__height >= block.__height;
}
//console.log('serial', block.__height); //console.log('serial', block.__height);
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
@ -255,9 +285,6 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
self._process(block, callback); self._process(block, callback);
}); });
function check() {
return self.db.concurrentTip.__height >= block.__height;
}
}; };
ProcessSerial.prototype._process = function(block, callback) { ProcessSerial.prototype._process = function(block, callback) {
@ -293,15 +320,6 @@ ProcessSerial.prototype._process = function(block, callback) {
}); });
}; };
function ProcessConcurrent(highWaterMark, db) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.operations = [];
this.lastBlock = 0;
this.blockCount = 0;
};
inherits(ProcessConcurrent, Transform);
ProcessConcurrent.prototype._transform = function(block, enc, callback) { ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var self = this; var self = this;
@ -323,7 +341,7 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var obj = { var obj = {
concurrentTip: block, concurrentTip: block,
operations: self.operations operations: self.operations
} };
self.operations = []; self.operations = [];
self.blockCount = 0; self.blockCount = 0;
@ -337,25 +355,11 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) {
ProcessConcurrent.prototype._flush = function(callback) { ProcessConcurrent.prototype._flush = function(callback) {
if(this.operations.length) { if(this.operations.length) {
this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true)); this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true));
var obj = {
concurrentTipHeight: this.lastBlock,
operations: this.operations
};
this.operations = []; this.operations = [];
return callback(null, operations); return callback(null, this.operations);
} }
}; };
function WriteStream(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.writeTime = 0;
this.lastConcurrentOutputHeight = 0;
}
inherits(WriteStream, Writable);
WriteStream.prototype._write = function(obj, enc, callback) { WriteStream.prototype._write = function(obj, enc, callback) {
var self = this; var self = this;
@ -375,15 +379,6 @@ WriteStream.prototype._write = function(obj, enc, callback) {
}); });
}; };
var ProcessBoth = function(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
};
inherits(ProcessBoth, Writable);
ProcessBoth.prototype._write = function(block, encoding, callback) { ProcessBoth.prototype._write = function(block, encoding, callback) {
var self = this; var self = this;
async.parallel([function(next) { async.parallel([function(next) {