got it working

This commit is contained in:
Patrick Nagurny 2017-01-26 18:09:36 -05:00
parent 77c9a149dc
commit 26108753db
3 changed files with 311 additions and 543 deletions

View File

@ -134,7 +134,7 @@ AddressService.prototype.getPublishEvents = function() {
* @param {Boolean} addOutput - If the block is being removed or added to the chain
* @param {Function} callback
*/
AddressService.prototype.blockHandler = function(block, connectBlock, callback) {
AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
var self = this;
var txs = block.transactions;

View File

@ -11,11 +11,11 @@ var BufferUtil = bitcore.util.buffer;
var Networks = bitcore.Networks;
var Block = bitcore.Block;
var $ = bitcore.util.preconditions;
var index = require('../');
var index = require('../../');
var errors = index.errors;
var log = index.log;
var Transaction = require('../transaction');
var Service = require('../service');
var Transaction = require('../../transaction');
var Service = require('../../service');
var Sync = require('./sync');
/**
@ -67,7 +67,7 @@ function DB(options) {
block: []
};
this._sync = new Sync(this.node);
this._sync = new Sync(this.node, this);
}
util.inherits(DB, Service);
@ -165,7 +165,7 @@ DB.prototype.start = function(callback) {
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this._sync.on('error', function(err) {
log.err(err);
log.error(err);
});
this._sync.on('reorg', function() {
@ -206,7 +206,7 @@ DB.prototype.start = function(callback) {
return callback(err);
}
self.loadConcurrencySyncHeight(callback);
self.loadConcurrentTip(callback);
});
});
};
@ -336,28 +336,52 @@ DB.prototype.loadTip = function(callback) {
});
};
DB.prototype.loadConcurrencySyncHeight = function(callback) {
DB.prototype.loadConcurrentTip = function(callback) {
var self = this;
console.log('loadConcurrencySyncHeight');
var options = {
keyEncoding: 'string',
valueEncoding: 'binary'
};
self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) {
if(err) {
if(err.notFound) {
console.log('setting to 0');
self.concurrentHeight = 0;
return callback();
}
self.store.get(self.dbPrefix + 'concurrentTip', options, function(err, tipData) {
if(err && err instanceof levelup.errors.NotFoundError) {
self.concurrentTip = self.genesis;
self.concurrentTip.__height = 0;
return;
} else if(err) {
return callback(err);
}
var hash = tipData.slice(0, 32).toString('hex');
var height = tipData.readUInt32BE(32);
self.concurrentHeight = heightBuffer.readUInt32BE();
callback();
var times = 0;
async.retry({times: 3, interval: self.retryInterval}, function(done) {
self.getBlock(hash, function(err, concurrentTip) {
if(err) {
times++;
log.warn('Bitcoind does not have our concurrentTip (' + hash + '). Bitcoind may have crashed and needs to catch up.');
if(times < 3) {
log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.');
}
return done(err);
}
done(null, concurrentTip);
});
}, function(err, concurrentTip) {
if(err) {
log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues');
log.warn('Please reindex your database.');
return callback(err);
}
concurrentTip.__height = height;
self.concurrentTip = concurrentTip;
callback();
});
});
};
@ -496,8 +520,30 @@ DB.prototype.getPrevHash = function(blockHash, callback) {
* @param {Function} callback
*/
DB.prototype.connectBlock = function(block, callback) {
var self = this;
log.debug('DB handling new chain block');
this.runAllBlockHandlers(block, true, callback);
var operations = [];
self.getConcurrentBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = ops;
self.getSerialBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, true));
operations.push(self.getConcurrentTipOperation(block, true));
self.store.batch(operations, callback);
});
});
};
/**
@ -506,94 +552,41 @@ DB.prototype.connectBlock = function(block, callback) {
* @param {Function} callback
*/
DB.prototype.disconnectBlock = function(block, callback) {
log.debug('DB removing chain block');
this.runAllBlockHandlers(block, false, callback);
};
DB.prototype.getTipOperation = function(add) {
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
};
/**
* Will collect all database operations for a block from other services that implement
* `blockHandler` methods and then save operations to the database.
* @param {Block} block - The bitcore block
* @param {Boolean} add - If the block is being added/connected or removed/disconnected
* @param {Function} callback
*/
DB.prototype.runAllBlockHandlers = function(block, add, callback) {
var self = this;
log.debug('DB removing chain block');
var operations = [];
self.getConcurrentBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
// Notify block subscribers
// for (var i = 0; i < this.subscriptions.block.length; i++) {
// this.subscriptions.block[i].emit('db/block', block.hash);
// }
operations = ops;
// Update tip
var tipData = new Buffer(36);
var heightBuffer = new Buffer(4);
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
operations.push({
type: 'put',
key: self.dbPrefix + 'tip',
value: tipData
});
async.eachSeries(
this.node.services,
function(mod, next) {
if(mod.blockHandler) {
$.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function');
mod.blockHandler.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
self.getSerialBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
log.debug('Updating the database with operations', operations);
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, false));
operations.push(self.getConcurrentTipOperation(block, false));
self.store.batch(operations, callback);
}
);
});
});
};
DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) {
DB.prototype.getConcurrentBlockOperations = function(block, add, callback) {
var self = this;
var operations = [];
async.each(
this.node.services,
function(mod, next) {
if(mod.concurrenctBlockHandler) {
$.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function');
if(mod.concurrentBlockHandler) {
$.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function');
mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) {
if (err) {
@ -620,415 +613,81 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) {
);
};
/**
* This function will find the common ancestor between the current chain and a forked block,
* by moving backwards on both chains until there is a meeting point.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.findCommonAncestor = function(block, done) {
DB.prototype.getSerialBlockOperations = function(block, add, callback) {
var self = this;
var operations = [];
var mainPosition = self.tip.hash;
var forkPosition = block.hash;
async.eachSeries(
this.node.services,
function(mod, next) {
if(mod.blockHandler) {
$.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function');
var mainHashesMap = {};
var forkHashesMap = {};
mod.blockHandler.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
mainHashesMap[mainPosition] = true;
forkHashesMap[forkPosition] = true;
var commonAncestor = null;
async.whilst(
function() {
return !commonAncestor;
},
function(next) {
if(mainPosition) {
var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition);
if(mainBlockIndex && mainBlockIndex.prevHash) {
mainHashesMap[mainBlockIndex.prevHash] = true;
mainPosition = mainBlockIndex.prevHash;
} else {
mainPosition = null;
}
next();
});
} else {
setImmediate(next);
}
if(forkPosition) {
var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition);
if(forkBlockIndex && forkBlockIndex.prevHash) {
forkHashesMap[forkBlockIndex.prevHash] = true;
forkPosition = forkBlockIndex.prevHash;
} else {
forkPosition = null;
}
}
if(forkPosition && mainHashesMap[forkPosition]) {
commonAncestor = forkPosition;
}
if(mainPosition && forkHashesMap[mainPosition]) {
commonAncestor = mainPosition;
}
if(!mainPosition && !forkPosition) {
return next(new Error('Unknown common ancestor'));
}
setImmediate(next);
},
function(err) {
done(err, commonAncestor);
if (err) {
return callback(err);
}
callback(null, operations);
}
);
};
/**
* This function will attempt to rewind the chain to the common ancestor
* between the current chain and a forked block.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.syncRewind = function(block, done) {
DB.prototype.getTipOperation = function(block, add) {
var heightBuffer = new Buffer(4);
var tipData;
var self = this;
self.findCommonAncestor(block, function(err, ancestorHash) {
if (err) {
return done(err);
}
log.warn('Reorg common ancestor found:', ancestorHash);
// Rewind the chain to the common ancestor
async.whilst(
function() {
// Wait until the tip equals the ancestor hash
return self.tip.hash !== ancestorHash;
},
function(removeDone) {
var tip = self.tip;
// TODO: expose prevHash as a string from bitcore
var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex');
self.getBlock(prevHash, function(err, previousTip) {
if (err) {
removeDone(err);
}
// Undo the related indexes for this block
self.disconnectBlock(tip, function(err) {
if (err) {
return removeDone(err);
}
// Set the new tip
previousTip.__height = self.tip.__height - 1;
self.tip = previousTip;
self.emit('removeblock', tip);
removeDone();
});
});
}, done
);
});
};
DB.prototype._concurrentSync = function(block, callback) {
var self = this;
console.log('concurrentSync');
console.log('Processing ' + blocks.length + ' blocks');
var operations = [];
async.each(blocks, function(block, next) {
self.runAllConcurrentBlockHandlers(block, true, function(err, ops) {
if(err) {
return next(err);
}
operations = operations.concat(ops);
next();
});
}, function(err) {
if(err) {
return done(err);
}
var newHeight = self.concurrentHeight + blocks.length;
// push concurrent height
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(newHeight);
operations.push({
type: 'put',
key: self.dbPrefix + 'concurrentHeight',
value: heightBuffer
});
log.debug('Updating the database with operations', operations);
self.store.batch(operations, function(err) {
if(err) {
return done(err);
}
self.concurrentHeight += blocks.length;
done();
});
});
};
DB.prototype._serialSync = function(block, callback) {
console.log('serialSync');
var self = this;
// Create indexes
self.connectBlock(block, function(err) {
if (err) {
return callback(err);
}
self.tip = block;
log.debug('Chain added block to main chain');
self.emit('addblock', block);
callback();
});
};
DB.prototype.sync = function() {
var self = this;
if (this.bitcoindSyncing || this.node.stopping || !this.tip) {
return;
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
this.bitcoindSyncing = true;
// keep n blocks in block buffer
// when we run out get more from bitcoind
var lastHeight = self.tip.__height;
var blocks = {};
var syncError = null;
async.whilst(function() {
return lastHeight < self.node.services.bitcoind.height && !self.node.stopping && !syncError;
}, function(done) {
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - Object.keys(blocks).length, 5);
if(!blockCount) {
return setTimeout(done, 1000);
}
async.times(blockCount, function(n, next) {
var height = lastHeight + n + 1;
self.node.services.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
blocks[height] = block;
next();
});
}, function(err) {
if(err) {
return done(err);
}
for(var i = 0; i < blockCount.length; i++) {
var block = blocks[lastHeight + i + 1];
block.__height = lastHeight + i + 1;
self._concurrentSync(block, function(err) {
if(err) {
syncError = err;
return;
}
self._serialSync(block, function(err) {
if(err) {
syncError = err;
return;
}
delete blocks[lastHeight + i + 1];
if(self.tip.__height === self.node.services.bitcoind.height) {
self.bitcoindSyncing = false;
if(self.isSynced()) {
self.node.emit('synced');
}
}
});
});
}
lastHeight += blockCount;
done();
});
}, function(err) {
if (err) {
Error.captureStackTrace(err);
self.node.emit('error', err);
return;
}
if(syncError) {
Error.captureStackTrace(syncError);
self.node.emit('error', syncError);
return;
}
if(self.node.stopping) {
self.bitcoindSyncing = false;
}
});
return {
type: 'put',
key: this.dbPrefix + 'tip',
value: tipData
};
};
DB.prototype.isSynced = function() {
return self.bitcoind.isSynced() && self.tip.__height === self.node.services.bitcoind.height;
};
DB.prototype.getConcurrentTipOperation = function(block, add) {
var heightBuffer = new Buffer(4);
var tipData;
DB.prototype._getBitcoindBlocks = function() {
var self = this;
console.log('getBitcoindBlocks');
// keep n blocks in block buffer
// when we run out get more from bitcoind
var lastHeight = self.tip.__height;
async.whilst(function() {
return lastHeight < self.node.services.bitcoind.height;
}, function(done) {
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length);
if(!blockCount) {
return setTimeout(done, 1000);
}
async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) {
var height = lastHeight + n + 1;
self.node.services.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
self.concurrentSyncBuffer.push(block);
self.syncBuffer[height] = block;
next();
});
}, function(err) {
if(err) {
return done(err);
}
lastHeight += blockCount;
done();
});
}, function(err) {
console.log('completed', err);
if (err) {
Error.captureStackTrace(err);
return self.node.emit('error', err);
}
});
};
/**
* This function will synchronize additional indexes for the chain based on
* the current active chain in the bitcoin daemon. In the event that there is
* a reorganization in the daemon, the chain will rewind to the last common
* ancestor and then resume syncing.
*/
DB.prototype.syncOld = function() {
var self = this;
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
return;
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
self.bitcoindSyncing = true;
var height;
async.whilst(function() {
height = self.tip.__height;
return height < self.node.services.bitcoind.height && !self.node.stopping;
}, function(done) {
console.log('fetching block ' + (height + 1));
self.node.services.bitcoind.getBlock(height + 1, function(err, block) {
if (err) {
return done(err);
}
// TODO: expose prevHash as a string from bitcore
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if (prevHash === self.tip.hash) {
// This block appends to the current chain tip and we can
// immediately add it to the chain and create indexes.
// Populate height
block.__height = self.tip.__height + 1;
// Create indexes
self.connectBlock(block, function(err) {
if (err) {
return done(err);
}
self.tip = block;
log.debug('Chain added block to main chain');
self.emit('addblock', block);
setImmediate(done);
});
} else {
// This block doesn't progress the current tip, so we'll attempt
// to rewind the chain to the common ancestor of the block and
// then we can resume syncing.
log.warn('Beginning reorg! Current tip: ' + self.tip.hash + '; New tip: ' + block.hash);
self.syncRewind(block, function(err) {
if(err) {
return done(err);
}
log.warn('Reorg complete. New tip is ' + self.tip.hash);
done();
});
}
});
}, function(err) {
if (err) {
Error.captureStackTrace(err);
return self.node.emit('error', err);
}
if(self.node.stopping) {
self.bitcoindSyncing = false;
return;
}
if (self.node.services.bitcoind.isSynced()) {
self.bitcoindSyncing = false;
self.node.emit('synced');
} else {
self.bitcoindSyncing = false;
}
});
return {
type: 'put',
key: this.dbPrefix + 'concurrentTip',
value: tipData
};
};
DB.prototype.getPrefix = function(service, callback) {
var self = this;

View File

@ -5,11 +5,14 @@ var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
function Sync(node) {
function Sync(node, db) {
this.node = node;
this.db = node.db;
this.db = db;
this.syncing = false;
this.highWaterMark = 100;
}
inherits(Sync, EventEmitter);
@ -23,11 +26,11 @@ Sync.prototype.initialSync = function() {
}
this.syncing = true;
this.blockStream = new BlockStream(this.node.services.bitcoind, this.db.tip);
var processConcurrent = new ProcessConcurrent(this.db);
var processSerial = new ProcessSerial(this.db);
var writeStream1 = new WriteStream(this.db);
var writeStream2 = new WriteStream(this.db);
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height);
var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db);
var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip);
var writeStream1 = new WriteStream(this.highWaterMark, this.db);
var writeStream2 = new WriteStream(this.highWaterMark, this.db);
var start = Date.now();
@ -90,7 +93,7 @@ Sync.prototype.sync = function() {
Sync.prototype.stop = function() {
if(this.blockStream) {
this.blockStream.destroy();
this.blockStream.stopping = true;
}
};
@ -108,49 +111,132 @@ Sync.prototype._handleErrors = function(stream) {
});
};
function BlockStream(bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: 10});
function BlockStream(highWaterMark, bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.lastHeight = lastHeight;
this.stopping = false;
this.queue = [];
this.processing = false;
}
inherits(BlockStream, Readable);
// BlockStream.prototype._read = function() {
// var self = this;
// // TODO does not work :(
// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5);
// if(blockCount <= 0 || this.stopping) {
// return self.push(null);
// }
// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount));
// async.times(blockCount, function(n, next) {
// var height = self.lastHeight + n + 1;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return next(err);
// }
// block.__height = height;
// next(null, block);
// });
// }, function(err, blocks) {
// if(err) {
// return self.emit('error', err);
// }
// for(var i = 0; i < blocks.length; i++) {
// self.push(blocks[i]);
// }
// self.lastHeight += blocks.length;
// });
// };
// BlockStream.prototype._read = function() {
// var self = this;
// self.lastHeight++;
// //console.log('Fetching block ' + self.lastHeight);
// var height = self.lastHeight;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return self.emit(err);
// }
// block.__height = height;
// //console.log('pushing block ' + block.__height);
// self.push(block);
// });
// };
BlockStream.prototype._read = function() {
var self = this;
this.lastHeight++;
this.queue.push(this.lastHeight);
var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5);
if(blockCount <= 0) {
return self.push(null);
}
console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' (self.lastHeight + blockCount.length));
async.times(blockCount, function(n, next) {
var height = self.lastHeight + n + 1;
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
next(null, block);
});
}, function(err, blocks) {
if(err) {
return self.emit('error', err);
}
for(var i = 0; i < blocks.length; i++) {
self.push(blocks[i]);
}
self.lastHeight += blocks.length;
});
this._process();
};
function ProcessSerial(db, tip) {
Transform.call(this, {objectMode: true, highWaterMark: 10});
BlockStream.prototype._process = function() {
var self = this;
if(this.processing) {
return;
}
this.processing = true;
async.whilst(
function() {
return self.queue.length;
}, function(next) {
var heights = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(heights.length);
//console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]);
async.map(heights, function(height, next) {
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
next(null, block);
});
}, function(err, blocks) {
if(err) {
return next(err);
}
for(var i = 0; i < blocks.length; i++) {
self.push(blocks[i]);
}
next();
});
}, function(err) {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
}
function ProcessSerial(highWaterMark, db, tip) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
}
@ -160,8 +246,10 @@ inherits(ProcessSerial, Transform);
ProcessSerial.prototype._transform = function(block, enc, callback) {
var self = this;
//console.log('serial', block.__height);
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(prevHash !== self.tip) {
if(prevHash !== self.tip.hash) {
var err = new Error('Reorg detected');
err.reorg = true;
return callback(err);
@ -169,30 +257,37 @@ ProcessSerial.prototype._transform = function(block, enc, callback) {
async.whilst(
function() {
return self.db.concurrentHeight < block.__height;
return self.db.concurrentTip.__height < block.__height;
},
function(next) {
setTimeout(next, 1000);
// wait until concurrent handler is ahead of us
setTimeout(next, 10);
},
function() {
var operations = [{index1: block.height}, {index2: block.height}];
setTimeout(function() {
self.db.getSerialBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
}
var obj = {
tipHeight: block.height,
tip: block,
operations: operations
};
self.tip = block;
callback(null, obj);
}, 100);
});
}
);
};
function ProcessConcurrent(db) {
Transform.call(this, {objectMode: true, highWaterMark: 10});
function ProcessConcurrent(highWaterMark, db) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.operations = [];
this.lastHeight = 0;
this.lastBlock = 0;
this.blockCount = 0;
};
inherits(ProcessConcurrent, Transform);
@ -200,22 +295,26 @@ inherits(ProcessConcurrent, Transform);
ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var self = this;
this.lastHeight = block.__height;
//console.log('concurrent', block.__height);
self.db.runAllConcurrentBlockHandlers(block, true, function(err, operations) {
this.lastBlock = block;
self.db.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
}
self.blockCount++;
self.operations = self.operations.concat(operations);
if(self.operations >= 100) {
// TODO add tip operation
if(self.blockCount >= 1) { //self.operations.length >= 100) {
self.operations.push(self.db.getConcurrentTipOperation(block, true));
var obj = {
concurrentTipHeight: block.__height,
concurrentTip: block,
operations: self.operations
}
self.operations = [];
self.blockCount = 0;
return callback(null, obj);
}
@ -226,9 +325,9 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) {
ProcessConcurrent.prototype._flush = function(callback) {
if(this.operations.length) {
// TODO add tip operation
this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true));
var obj = {
concurrentTipHeight: this.lastHeight,
concurrentTipHeight: this.lastBlock,
operations: this.operations
};
@ -237,30 +336,40 @@ ProcessConcurrent.prototype._flush = function(callback) {
}
};
function WriteStream(db) {
Writable.call(this, {objectMode: true, highWaterMark: 10});
function WriteStream(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.writeTime = 0;
this.lastConcurrentOutputHeight = 0;
}
inherits(WriteStream, Writable);
WriteStream.prototype._write = function(obj, enc, callback) {
var self = this;
setTimeout(function() {
console.log('WriteStreamSlow block ', operations.concurrentTipHeight);
self.writeTime += 2000;
self.db.store.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
if(obj.tip) {
self.db.tip = obj.tip;
if(self.db.tip.__height % 100 === 0) {
console.log('Tip:', self.db.tip.__height);
}
}
if(obj.concurrentTip) {
self.db.concurrentTip = obj.concurrentTip;
if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
console.log('Concurrent tip:', self.db.concurrentTip.__height);
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
}
}
callback();
}, 2000);
});
};
module.exports = Sync;