Merge branch 'feature/concurrency' into feature/walletIndex

This commit is contained in:
Chris Kleeschulte 2017-01-30 15:05:05 -05:00
commit fbdafb74f9
4 changed files with 728 additions and 173 deletions

149
contrib/streamtest.js Normal file
View File

@ -0,0 +1,149 @@
'use strict'
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var async = require('async');
function main() {
var blockStream = new BlockStream();
var processConcurrent = new ProcessConcurrent();
var processSerial = new ProcessSerial();
var writeStreamFast = new WriteStreamFast();
var writeStreamSlow = new WriteStreamSlow();
var start = Date.now();
writeStreamFast.on('finish', function() {
var end = Date.now();
console.log('Total time: ', (end - start) + ' ms');
console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms');
console.log('Serial write time: ', writeStreamFast.writeTime + ' ms');
});
blockStream
.pipe(processConcurrent)
.pipe(writeStreamSlow);
blockStream
.pipe(processSerial)
.pipe(writeStreamFast);
}
function BlockStream() {
Readable.call(this, {objectMode: true, highWaterMark: 10});
this.height = 0;
}
inherits(BlockStream, Readable);
BlockStream.prototype._read = function() {
var self = this;
console.log('_read');
setTimeout(function() {
self.height++;
if(self.height > 40) {
self.push(null);
return;
}
console.log('ReadStream block ', self.height);
console.log(self.push({height: self.height}));
}, 500);
};
function ProcessSerial() {
Transform.call(this, {objectMode: true, highWaterMark: 10});
}
inherits(ProcessSerial, Transform);
ProcessSerial.prototype._transform = function(block, enc, callback) {
var operations = [{index1: block.height}, {index2: block.height}];
setTimeout(function() {
var obj = {
tipHeight: block.height,
operations: operations
};
callback(null, obj);
}, 100);
};
function ProcessConcurrent() {
Transform.call(this, {objectMode: true, highWaterMark: 10});
this.operations = [];
this.lastHeight = 0;
};
inherits(ProcessConcurrent, Transform);
ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var self = this;
self.lastHeight = block.height;
setTimeout(function() {
self.operations = self.operations.concat([{index3: block.height}, {index4: block.height}]);
console.log(self.operations.length);
if(self.operations.length >= 10) {
var obj = {
concurrentTipHeight: self.lastHeight,
operations: self.operations
};
self.operations = [];
return callback(null, obj);
}
callback();
}, 100);
};
ProcessConcurrent.prototype._flush = function(callback) {
if(this.operations.length) {
var obj = {
concurrentTipHeight: this.lastHeight,
operations: this.operations
};
this.operations = [];
return callback(null, operations);
}
};
function WriteStreamSlow() {
Writable.call(this, {objectMode: true, highWaterMark: 10});
this.writeTime = 0;
}
inherits(WriteStreamSlow, Writable);
WriteStreamSlow.prototype._write = function(operations, enc, callback) {
var self = this;
setTimeout(function() {
console.log('WriteStreamSlow block ', operations.concurrentTipHeight);
self.writeTime += 2000;
callback();
}, 2000);
};
function WriteStreamFast() {
Writable.call(this, {objectMode: true, highWaterMark: 1});
this.writeTime = 0;
}
inherits(WriteStreamFast, Writable);
WriteStreamFast.prototype._write = function(operations, enc, callback) {
var self = this;
setTimeout(function() {
console.log('WriteStreamFast block ', operations.tipHeight);
self.writeTime += 1000;
callback();
}, 1000);
};
main();

View File

@ -134,7 +134,7 @@ AddressService.prototype.getPublishEvents = function() {
* @param {Boolean} addOutput - If the block is being removed or added to the chain
* @param {Function} callback
*/
AddressService.prototype.blockHandler = function(block, connectBlock, callback) {
AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
var self = this;
var txs = block.transactions;

View File

@ -11,11 +11,12 @@ var BufferUtil = bitcore.util.buffer;
var Networks = bitcore.Networks;
var Block = bitcore.Block;
var $ = bitcore.util.preconditions;
var index = require('../');
var index = require('../../');
var errors = index.errors;
var log = index.log;
var Transaction = require('../transaction');
var Service = require('../service');
var Transaction = require('../../transaction');
var Service = require('../../service');
var Sync = require('./sync');
/**
* This service synchronizes a leveldb database with bitcoin block chain by connecting and
@ -65,6 +66,8 @@ function DB(options) {
transaction: [],
block: []
};
this._sync = new Sync(this.node, this);
}
util.inherits(DB, Service);
@ -161,15 +164,29 @@ DB.prototype.start = function(callback) {
this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles, keyEncoding: 'binary', valueEncoding: 'binary'});
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this._sync.on('error', function(err) {
log.error(err);
});
this._sync.on('reorg', function() {
throw new Error('Reorg detected! Unhandled :(')
});
this._sync.on('initialsync', function() {
log.info('Initial sync complete');
});
this.node.on('stopping', function() {
self._sync.stop();
});
this.node.once('ready', function() {
// start syncing
self.sync();
self._sync.initialSync();
// Notify that there is a new tip
self.node.services.bitcoind.on('tip', function() {
if(!self.node.stopping) {
self.sync();
}
self._sync.sync();
});
});
@ -189,7 +206,7 @@ DB.prototype.start = function(callback) {
return callback(err);
}
setImmediate(callback);
self.loadConcurrentTip(callback);
});
});
};
@ -319,6 +336,55 @@ DB.prototype.loadTip = function(callback) {
});
};
DB.prototype.loadConcurrentTip = function(callback) {
var self = this;
var options = {
keyEncoding: 'string',
valueEncoding: 'binary'
};
self.store.get(self.dbPrefix + 'concurrentTip', options, function(err, tipData) {
if(err && err instanceof levelup.errors.NotFoundError) {
self.concurrentTip = self.genesis;
self.concurrentTip.__height = 0;
return;
} else if(err) {
return callback(err);
}
var hash = tipData.slice(0, 32).toString('hex');
var height = tipData.readUInt32BE(32);
var times = 0;
async.retry({times: 3, interval: self.retryInterval}, function(done) {
self.getBlock(hash, function(err, concurrentTip) {
if(err) {
times++;
log.warn('Bitcoind does not have our concurrentTip (' + hash + '). Bitcoind may have crashed and needs to catch up.');
if(times < 3) {
log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.');
}
return done(err);
}
done(null, concurrentTip);
});
}, function(err, concurrentTip) {
if(err) {
log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues');
log.warn('Please reindex your database.');
return callback(err);
}
concurrentTip.__height = height;
self.concurrentTip = concurrentTip;
callback();
});
});
};
/**
* Will get a block from bitcoind and give a Bitcore Block
* @param {String|Number} hash - A block hash or block height
@ -454,8 +520,30 @@ DB.prototype.getPrevHash = function(blockHash, callback) {
* @param {Function} callback
*/
DB.prototype.connectBlock = function(block, callback) {
var self = this;
log.debug('DB handling new chain block');
this.runAllBlockHandlers(block, true, callback);
var operations = [];
self.getConcurrentBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = ops;
self.getSerialBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, true));
operations.push(self.getConcurrentTipOperation(block, true));
self.store.batch(operations, callback);
});
});
};
/**
@ -464,43 +552,70 @@ DB.prototype.connectBlock = function(block, callback) {
* @param {Function} callback
*/
DB.prototype.disconnectBlock = function(block, callback) {
var self = this;
log.debug('DB removing chain block');
this.runAllBlockHandlers(block, false, callback);
var operations = [];
self.getConcurrentBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
operations = ops;
self.getSerialBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, false));
operations.push(self.getConcurrentTipOperation(block, false));
self.store.batch(operations, callback);
});
});
};
/**
* Will collect all database operations for a block from other services that implement
* `blockHandler` methods and then save operations to the database.
* @param {Block} block - The bitcore block
* @param {Boolean} add - If the block is being added/connected or removed/disconnected
* @param {Function} callback
*/
DB.prototype.runAllBlockHandlers = function(block, add, callback) {
DB.prototype.getConcurrentBlockOperations = function(block, add, callback) {
var self = this;
var operations = [];
// Notify block subscribers
// for (var i = 0; i < this.subscriptions.block.length; i++) {
// this.subscriptions.block[i].emit('db/block', block.hash);
// }
async.each(
this.node.services,
function(mod, next) {
if(mod.concurrentBlockHandler) {
$.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function');
// Update tip
var tipData = new Buffer(36);
var heightBuffer = new Buffer(4);
mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
return callback(err);
}
operations.push({
type: 'put',
key: self.dbPrefix + 'tip',
value: tipData
});
callback(null, operations);
}
);
};
DB.prototype.getSerialBlockOperations = function(block, add, callback) {
var self = this;
var operations = [];
async.eachSeries(
this.node.services,
@ -516,6 +631,7 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) {
$.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
next();
});
} else {
@ -527,145 +643,43 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) {
return callback(err);
}
log.debug('Updating the database with operations', operations);
self.store.batch(operations, callback);
callback(null, operations);
}
);
};
/**
* This function will find the common ancestor between the current chain and a forked block,
* by moving backwards on both chains until there is a meeting point.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.findCommonAncestor = function(block, done) {
DB.prototype.getTipOperation = function(block, add) {
var heightBuffer = new Buffer(4);
var tipData;
var self = this;
var mainPosition = self.tip.hash;
var forkPosition = block.hash;
var mainHashesMap = {};
var forkHashesMap = {};
mainHashesMap[mainPosition] = true;
forkHashesMap[forkPosition] = true;
var commonAncestor = null;
async.whilst(
function() {
return !commonAncestor;
},
function(next) {
if(mainPosition) {
var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition);
if(mainBlockIndex && mainBlockIndex.prevHash) {
mainHashesMap[mainBlockIndex.prevHash] = true;
mainPosition = mainBlockIndex.prevHash;
} else {
mainPosition = null;
}
}
if(forkPosition) {
var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition);
if(forkBlockIndex && forkBlockIndex.prevHash) {
forkHashesMap[forkBlockIndex.prevHash] = true;
forkPosition = forkBlockIndex.prevHash;
} else {
forkPosition = null;
}
}
if(forkPosition && mainHashesMap[forkPosition]) {
commonAncestor = forkPosition;
}
if(mainPosition && forkHashesMap[mainPosition]) {
commonAncestor = mainPosition;
}
if(!mainPosition && !forkPosition) {
return next(new Error('Unknown common ancestor'));
}
setImmediate(next);
},
function(err) {
done(err, commonAncestor);
}
);
};
/**
* This function will attempt to rewind the chain to the common ancestor
* between the current chain and a forked block.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.syncRewind = function(block, done) {
var self = this;
self.findCommonAncestor(block, function(err, ancestorHash) {
if (err) {
return done(err);
}
log.warn('Reorg common ancestor found:', ancestorHash);
// Rewind the chain to the common ancestor
async.whilst(
function() {
// Wait until the tip equals the ancestor hash
return self.tip.hash !== ancestorHash;
},
function(removeDone) {
var tip = self.tip;
// TODO: expose prevHash as a string from bitcore
var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex');
self.getBlock(prevHash, function(err, previousTip) {
if (err) {
removeDone(err);
}
// Undo the related indexes for this block
self.disconnectBlock(tip, function(err) {
if (err) {
return removeDone(err);
}
// Set the new tip
previousTip.__height = self.tip.__height - 1;
self.tip = previousTip;
self.emit('removeblock', tip);
removeDone();
});
});
}, done
);
});
};
/**
* This function will synchronize additional indexes for the chain based on
* the current active chain in the bitcoin daemon. In the event that there is
* a reorganization in the daemon, the chain will rewind to the last common
* ancestor and then resume syncing.
*/
DB.prototype.sync = function() {
var self = this;
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
return;
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
return {
type: 'put',
key: this.dbPrefix + 'tip',
value: tipData
};
};
DB.prototype.getConcurrentTipOperation = function(block, add) {
var heightBuffer = new Buffer(4);
var tipData;
if(add) {
heightBuffer.writeUInt32BE(block.__height);
tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]);
} else {
heightBuffer.writeUInt32BE(block.__height - 1);
tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]);
}
<<<<<<< HEAD:lib/services/db.js
self.bitcoindSyncing = true;
var height;
@ -733,10 +747,16 @@ DB.prototype.sync = function() {
} else {
self.bitcoindSyncing = false;
}
});
=======
return {
type: 'put',
key: this.dbPrefix + 'concurrentTip',
value: tipData
};
};
>>>>>>> feature/concurrency:lib/services/db/index.js
DB.prototype.getPrefix = function(service, callback) {
var self = this;

386
lib/services/db/sync.js Normal file
View File

@ -0,0 +1,386 @@
'use strict'
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
function Sync(node, db) {
this.node = node;
this.db = db;
this.syncing = false;
this.highWaterMark = 100;
}
inherits(Sync, EventEmitter);
// Use a concurrent strategy to sync
Sync.prototype.initialSync = function() {
var self = this;
if(this.syncing) {
return
}
this.syncing = true;
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height);
var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db);
var writeStream = new WriteStream(this.highWaterMark, this.db);
var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip);
var start = Date.now();
this._handleErrors(this.blockStream);
this._handleErrors(processConcurrent);
this._handleErrors(processSerial);
this._handleErrors(writeStream);
processSerial.on('finish', function() {
self.syncing = false;
self.emit('initialsync');
});
this.blockStream
.pipe(processConcurrent)
.pipe(writeStream);
this.blockStream
.pipe(processSerial);
};
// Get concurrent and serial block operations and write them together block by block
// Useful for when we are fully synced and we want to advance the concurrentTip and
// the tip together
Sync.prototype.sync = function() {
if(this.syncing) {
return
}
this.syncing = true;
this.blockStream = new BlockStream();
var processBoth = new ProcessBoth(this.db);
var writeStream = new WriteStream(this.db);
var start = Date.now();
this._handleErrors(this.blockStream);
this._handleErrors(processBoth);
this._handleErrors(writeStream);
writeStream.on('finish', function() {
var end = Date.now();
console.log('Total time: ', (end - start) + ' ms');
console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms');
console.log('Serial write time: ', writeStreamFast.writeTime + ' ms');
self.emit('synced');
});
this.blockStream
.pipe(processBoth)
.pipe(writeStream1);
};
Sync.prototype.stop = function() {
if(this.blockStream) {
this.blockStream.stopping = true;
}
};
Sync.prototype._handleErrors = function(stream) {
var self = this;
stream.on('error', function(err) {
self.syncing = false;
if(err.reorg) {
return self.emit('reorg');
}
self.emit('error', err);
});
};
function BlockStream(highWaterMark, bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.lastHeight = lastHeight;
this.stopping = false;
this.queue = [];
this.processing = false;
}
inherits(BlockStream, Readable);
// BlockStream.prototype._read = function() {
// var self = this;
// // TODO does not work :(
// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5);
// if(blockCount <= 0 || this.stopping) {
// return self.push(null);
// }
// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount));
// async.times(blockCount, function(n, next) {
// var height = self.lastHeight + n + 1;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return next(err);
// }
// block.__height = height;
// next(null, block);
// });
// }, function(err, blocks) {
// if(err) {
// return self.emit('error', err);
// }
// for(var i = 0; i < blocks.length; i++) {
// self.push(blocks[i]);
// }
// self.lastHeight += blocks.length;
// });
// };
// BlockStream.prototype._read = function() {
// var self = this;
// self.lastHeight++;
// //console.log('Fetching block ' + self.lastHeight);
// var height = self.lastHeight;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return self.emit(err);
// }
// block.__height = height;
// //console.log('pushing block ' + block.__height);
// self.push(block);
// });
// };
BlockStream.prototype._read = function() {
this.lastHeight++;
this.queue.push(this.lastHeight);
this._process();
};
BlockStream.prototype._process = function() {
var self = this;
if(this.processing) {
return;
}
this.processing = true;
async.whilst(
function() {
return self.queue.length;
}, function(next) {
var heights = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(heights.length);
//console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]);
async.map(heights, function(height, next) {
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
setTimeout(function() {
// we're getting blocks from bitcoind too fast?
// it pauses at 8100
next(null, block);
}, 1);
});
}, function(err, blocks) {
if(err) {
return next(err);
}
for(var i = 0; i < blocks.length; i++) {
self.push(blocks[i]);
}
next();
});
}, function(err) {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
}
function ProcessSerial(highWaterMark, db, tip) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
}
inherits(ProcessSerial, Writable);
ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this;
//console.log('serial', block.__height);
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(prevHash !== self.tip.hash) {
var err = new Error('Reorg detected');
err.reorg = true;
return callback(err);
}
if(check()) {
return self._process(block, callback);
}
self.db.once('concurrentaddblock', function() {
if(!check()) {
var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height);
return self.emit('error', err);
}
self._process(block, callback);
});
function check() {
return self.db.concurrentTip.__height < block.__height;
}
};
ProcessSerial.prototype._process = function(block, callback) {
var self = this;
self.db.getSerialBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
}
operations.push(self.db.getTipOperation(block, true));
var obj = {
tip: block,
operations: operations
};
self.tip = block;
self.db.store.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
self.db.tip = block;
self.db.emit('addblock');
if(self.db.tip.__height % 100 === 0) {
console.log('Tip:', self.db.tip.__height);
}
callback();
});
});
};
function ProcessConcurrent(highWaterMark, db) {
Transform.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.operations = [];
this.lastBlock = 0;
this.blockCount = 0;
};
inherits(ProcessConcurrent, Transform);
ProcessConcurrent.prototype._transform = function(block, enc, callback) {
var self = this;
//console.log('concurrent', block.__height);
this.lastBlock = block;
self.db.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
}
self.blockCount++;
self.operations = self.operations.concat(operations);
if(self.blockCount >= 1) { //self.operations.length >= 100) {
self.operations.push(self.db.getConcurrentTipOperation(block, true));
var obj = {
concurrentTip: block,
operations: self.operations
}
self.operations = [];
self.blockCount = 0;
return callback(null, obj);
}
callback();
});
};
ProcessConcurrent.prototype._flush = function(callback) {
if(this.operations.length) {
this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true));
var obj = {
concurrentTipHeight: this.lastBlock,
operations: this.operations
};
this.operations = [];
return callback(null, operations);
}
};
function WriteStream(highWaterMark, db) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.writeTime = 0;
this.lastConcurrentOutputHeight = 0;
}
inherits(WriteStream, Writable);
WriteStream.prototype._write = function(obj, enc, callback) {
var self = this;
self.db.store.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
self.db.concurrentTip = obj.concurrentTip;
self.db.emit('concurrentaddblock');
if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
console.log('Concurrent tip:', self.db.concurrentTip.__height);
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
}
callback();
});
};
module.exports = Sync;