diff --git a/lib/services/db.js b/lib/services/db.js index bba2956d..f7dc6a6d 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -16,6 +16,7 @@ var errors = index.errors; var log = index.log; var Transaction = require('../transaction'); var Service = require('../service'); +var Sync = require('./sync'); /** * This service synchronizes a leveldb database with bitcoin block chain by connecting and @@ -66,9 +67,7 @@ function DB(options) { block: [] }; - this.concurrentSyncBuffer = []; - this.syncBuffer = {}; - this.concurrentBlocks = 10; + this._sync = new Sync(this.node); } util.inherits(DB, Service); @@ -165,15 +164,29 @@ DB.prototype.start = function(callback) { this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles, keyEncoding: 'binary', valueEncoding: 'binary'}); this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + this._sync.on('error', function(err) { + log.err(err); + }); + + this._sync.on('reorg', function() { + throw new Error('Reorg detected! Unhandled :(') + }); + + this._sync.on('initialsync', function() { + log.info('Initial sync complete'); + }); + + this.node.on('stopping', function() { + self._sync.stop(); + }); + this.node.once('ready', function() { // start syncing - self.sync(); + self._sync.initialSync(); // Notify that there is a new tip self.node.services.bitcoind.on('tip', function() { - if(!self.node.stopping) { - self.sync(); - } + self._sync.sync(); }); }); @@ -497,6 +510,16 @@ DB.prototype.disconnectBlock = function(block, callback) { this.runAllBlockHandlers(block, false, callback); }; +DB.prototype.getTipOperation = function(add) { + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); + } +}; + /** * Will collect all database operations for a block from other services that implement * `blockHandler` methods and then save operations to the database. @@ -717,38 +740,10 @@ DB.prototype.syncRewind = function(block, done) { }); }; -DB.prototype.sync = function() { - console.log('sync'); - if (this.bitcoindSyncing || this.node.stopping || !this.tip) { - return; - } - - this.bitcoindSyncing = true; - - this._getBitcoindBlocks(); - this._concurrentSync(); - this._serialSync(); -}; - -DB.prototype._concurrentSync = function(callback) { +DB.prototype._concurrentSync = function(block, callback) { var self = this; console.log('concurrentSync'); - console.log(self.concurrentHeight, self.node.services.bitcoind.height); - - async.whilst(function() { - return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - if(self.concurrentSyncBuffer.length < Math.min(self.concurrentBlocks, self.node.services.bitcoind.height - self.concurrentHeight)) { - console.log('cannot concurrentSync... waiting for blocks'); - // wait to get more blocks from bitcoind - return setTimeout(done, 1000); - } - - // get x blocks off block buffer - var blocks = self.concurrentSyncBuffer.slice(0, self.concurrentBlocks); - self.concurrentSyncBuffer = self.concurrentSyncBuffer.slice(self.concurrentBlocks); - console.log('Processing ' + blocks.length + ' blocks'); var operations = []; async.each(blocks, function(block, next) { @@ -788,95 +783,59 @@ DB.prototype._concurrentSync = function(callback) { done(); }); }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - }); }; -DB.prototype._serialSync = function(callback) { +DB.prototype._serialSync = function(block, callback) { console.log('serialSync'); var self = this; - // depends on block and transaction services - var height = self.tip.__height; - console.log('height', height, typeof(height)); - async.whilst(function() { - return height < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - if(height >= self.concurrentHeight || !self.syncBuffer[height + 1]) { - // wait - console.log('cannot serialSync... waiting for block ' + (height + 1)); - return setTimeout(done, 1000); - } - - // get block from memory - var block = self.syncBuffer[height + 1]; - if(block.header.prevHash.reverse().toString('hex') !== self.tip.hash) { - // TODO need to rewind our serial blocks as well as concurrent blocks! - } - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - // remove from memory - delete self.syncBuffer[height + 1]; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - done(); - }); - }, function(err) { + // Create indexes + self.connectBlock(block, function(err) { if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - return; - } - - if (self.node.services.bitcoind.isSynced()) { - self.bitcoindSyncing = false; - self.node.emit('synced'); - } else { - self.bitcoindSyncing = false; + return callback(err); } + self.tip = block; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + callback(); }); }; -DB.prototype._getBitcoindBlocks = function() { +DB.prototype.sync = function() { var self = this; + if (this.bitcoindSyncing || this.node.stopping || !this.tip) { + return; + } + + this.bitcoindSyncing = true; + // keep n blocks in block buffer // when we run out get more from bitcoind var lastHeight = self.tip.__height; - async.whilst(function() { - return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 10 - self.blockBuffer.length); + var blocks = {}; - async.timesLimit(blockCount, 10, function(n, next) { + var syncError = null; + + async.whilst(function() { + return lastHeight < self.node.services.bitcoind.height && !self.node.stopping && !syncError; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - Object.keys(blocks).length, 5); + + if(!blockCount) { + return setTimeout(done, 1000); + } + + async.times(blockCount, function(n, next) { var height = lastHeight + n + 1; self.node.services.bitcoind.getBlock(height, function(err, block) { if(err) { return next(err); } - self.blockBuffer[height] = block; + blocks[height] = block; next(); }); @@ -885,44 +844,60 @@ DB.prototype._getBitcoindBlocks = function() { return done(err); } - async.timesLimit(blockCount, 10, function(n, next) { - var block = self.blockBuffer[lastHeight + n + 1]; + for(var i = 0; i < blockCount.length; i++) { + var block = blocks[lastHeight + i + 1]; + block.__height = lastHeight + i + 1; self._concurrentSync(block, function(err) { if(err) { - return next(err); + syncError = err; + return; } self._serialSync(block, function(err) { if(err) { - log.err(err); - return node.stop(function() { - process.exit(1); - }); + syncError = err; + return; } - delete self.blockBuffer[lastHeight + i + 1]; + delete blocks[lastHeight + i + 1]; + + if(self.tip.__height === self.node.services.bitcoind.height) { + self.bitcoindSyncing = false; + if(self.isSynced()) { + self.node.emit('synced'); + } + } }); - - next(); }); - }, function(err) { - if(err) { - return done(err); - } + } - lastHeight += blockCount; - done(); - }); + lastHeight += blockCount; + done(); }); }, function(err) { if (err) { Error.captureStackTrace(err); - return self.node.emit('error', err); + self.node.emit('error', err); + return; + } + + if(syncError) { + Error.captureStackTrace(syncError); + self.node.emit('error', syncError); + return; + } + + if(self.node.stopping) { + self.bitcoindSyncing = false; } }); }; +DB.prototype.isSynced = function() { + return self.bitcoind.isSynced() && self.tip.__height === self.node.services.bitcoind.height; +}; + DB.prototype._getBitcoindBlocks = function() { var self = this; console.log('getBitcoindBlocks'); diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js new file mode 100644 index 00000000..e5ed75cd --- /dev/null +++ b/lib/services/db/sync.js @@ -0,0 +1,266 @@ +'use strict' +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var EventEmitter = require('events').EventEmitter; +var async = require('async'); + +function Sync(node) { + this.node = node; + this.db = node.db; + this.syncing = false; +} + +inherits(Sync, EventEmitter); + +// Use a concurrent strategy to sync +Sync.prototype.initialSync = function() { + var self = this; + + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(this.node.services.bitcoind, this.db.tip); + var processConcurrent = new ProcessConcurrent(this.db); + var processSerial = new ProcessSerial(this.db); + var writeStream1 = new WriteStream(this.db); + var writeStream2 = new WriteStream(this.db); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processConcurrent); + this._handleErrors(processSerial); + this._handleErrors(writeStream1); + this._handleErrors(writeStream2); + + writeStream2.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + + self.syncing = false; + self.emit('initialsync'); + }); + + this.blockStream + .pipe(processConcurrent) + .pipe(writeStream1); + + this.blockStream + .pipe(processSerial) + .pipe(writeStream2); +}; + +// Get concurrent and serial block operations and write them together block by block +// Useful for when we are fully synced and we want to advance the concurrentTip and +// the tip together +Sync.prototype.sync = function() { + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(); + var processBoth = new ProcessBoth(this.db); + var writeStream = new WriteStream(this.db); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processBoth); + this._handleErrors(writeStream); + + writeStream.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + self.emit('synced'); + }); + + this.blockStream + .pipe(processBoth) + .pipe(writeStream1); +}; + +Sync.prototype.stop = function() { + if(this.blockStream) { + this.blockStream.destroy(); + } +}; + +Sync.prototype._handleErrors = function(stream) { + var self = this; + + stream.on('error', function(err) { + self.syncing = false; + + if(err.reorg) { + return self.emit('reorg'); + } + + self.emit('error', err); + }); +}; + +function BlockStream(bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: 10}); + this.bitcoind = bitcoind; + this.lastHeight = lastHeight; +} + +inherits(BlockStream, Readable); + +BlockStream.prototype._read = function() { + var self = this; + + var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); + + if(blockCount <= 0) { + return self.push(null); + } + + console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' (self.lastHeight + blockCount.length)); + + async.times(blockCount, function(n, next) { + var height = self.lastHeight + n + 1; + self.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + next(null, block); + }); + }, function(err, blocks) { + if(err) { + return self.emit('error', err); + } + + for(var i = 0; i < blocks.length; i++) { + self.push(blocks[i]); + } + + self.lastHeight += blocks.length; + }); +}; + +function ProcessSerial(db, tip) { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.tip = tip; +} + +inherits(ProcessSerial, Transform); + +ProcessSerial.prototype._transform = function(block, enc, callback) { + var self = this; + + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); + if(prevHash !== self.tip) { + var err = new Error('Reorg detected'); + err.reorg = true; + return callback(err); + } + + async.whilst( + function() { + return self.db.concurrentHeight < block.__height; + }, + function(next) { + setTimeout(next, 1000); + }, + function() { + var operations = [{index1: block.height}, {index2: block.height}]; + setTimeout(function() { + var obj = { + tipHeight: block.height, + operations: operations + }; + + callback(null, obj); + }, 100); + } + ); +}; + +function ProcessConcurrent(db) { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.operations = []; + this.lastHeight = 0; +}; + +inherits(ProcessConcurrent, Transform); + +ProcessConcurrent.prototype._transform = function(block, enc, callback) { + var self = this; + + this.lastHeight = block.__height; + + self.db.runAllConcurrentBlockHandlers(block, true, function(err, operations) { + if(err) { + return callback(err); + } + + self.operations = self.operations.concat(operations); + + if(self.operations >= 100) { + // TODO add tip operation + var obj = { + concurrentTipHeight: block.__height, + operations: self.operations + } + self.operations = []; + + return callback(null, obj); + } + + callback(); + }); +}; + +ProcessConcurrent.prototype._flush = function(callback) { + if(this.operations.length) { + // TODO add tip operation + var obj = { + concurrentTipHeight: this.lastHeight, + operations: this.operations + }; + + this.operations = []; + return callback(null, operations); + } +}; + +function WriteStream(db) { + Writable.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.writeTime = 0; +} + +inherits(WriteStream, Writable); + +WriteStream.prototype._write = function(obj, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamSlow block ', operations.concurrentTipHeight); + self.writeTime += 2000; + + if(obj.tip) { + self.db.tip = obj.tip; + } + + if(obj.concurrentTip) { + self.db.concurrentTip = obj.concurrentTip; + } + + callback(); + }, 2000); +}; + +module.exports = Sync; \ No newline at end of file