From 25c14925807aa9e5106157affed1afe58eb55f07 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Tue, 24 Jan 2017 17:44:01 -0500 Subject: [PATCH 01/11] concurrency wip --- lib/services/{timestamp.js => block.js} | 46 +++--- lib/services/db.js | 188 +++++++++++++++++++++++- 2 files changed, 216 insertions(+), 18 deletions(-) rename lib/services/{timestamp.js => block.js} (68%) diff --git a/lib/services/timestamp.js b/lib/services/block.js similarity index 68% rename from lib/services/timestamp.js rename to lib/services/block.js index e69b5b61..6268fdeb 100644 --- a/lib/services/timestamp.js +++ b/lib/services/block.js @@ -2,19 +2,20 @@ var BaseService = require('../service'); var inherits = require('util').inherits; -function TimestampService(options) { +function BlockService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; } -inherits(TimestampService, BaseService); +inherits(BlockService, BaseService); -TimestampService.dependencies = [ - 'db' +BlockService.dependencies = [ + 'db', + 'transaction' ]; -TimestampService.prototype.start = function(callback) { +BlockService.prototype.start = function(callback) { var self = this; this.store = this.node.services.db.store; @@ -30,11 +31,11 @@ TimestampService.prototype.start = function(callback) { }); }; -TimestampService.prototype.stop = function(callback) { +BlockService.prototype.stop = function(callback) { setImmediate(callback); }; -TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { +BlockService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -69,6 +70,8 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback self.currentBlock = block.hash; self.currentTimestamp = timestamp; + // TODO combine with block header + operations = operations.concat( [ { @@ -88,7 +91,16 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback }); }; -TimestampService.prototype.getTimestamp = function(hash, callback) { +BlockService.prototype.getBlock = function(hash, callback) { + // get block header + // get individual transactions +}; + +BlockService.prototype.getBlockByHeight = function(height, callback) { + +}; + +BlockService.prototype.getTimestamp = function(hash, callback) { var self = this; if (hash === self.currentBlock) { @@ -107,40 +119,40 @@ TimestampService.prototype.getTimestamp = function(hash, callback) { }); }; -TimestampService.prototype._encodeBlockTimestampKey = function(hash) { +BlockService.prototype._encodeBlockTimestampKey = function(hash) { return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]); }; -TimestampService.prototype._decodeBlockTimestampKey = function(buffer) { +BlockService.prototype._decodeBlockTimestampKey = function(buffer) { return buffer.slice(2).toString('hex'); }; -TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) { +BlockService.prototype._encodeBlockTimestampValue = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return timestampBuffer; }; -TimestampService.prototype._decodeBlockTimestampValue = function(buffer) { +BlockService.prototype._decodeBlockTimestampValue = function(buffer) { return buffer.readDoubleBE(0); }; -TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) { +BlockService.prototype._encodeTimestampBlockKey = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return Buffer.concat([this.prefix, timestampBuffer]); }; -TimestampService.prototype._decodeTimestampBlockKey = function(buffer) { +BlockService.prototype._decodeTimestampBlockKey = function(buffer) { return buffer.readDoubleBE(2); }; -TimestampService.prototype._encodeTimestampBlockValue = function(hash) { +BlockService.prototype._encodeTimestampBlockValue = function(hash) { return new Buffer(hash, 'hex'); }; -TimestampService.prototype._decodeTimestampBlockValue = function(buffer) { +BlockService.prototype._decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -module.exports = TimestampService; +module.exports = BlockService; diff --git a/lib/services/db.js b/lib/services/db.js index ecd4299d..64461ad4 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -533,6 +533,40 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) { ); }; +DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { + var self = this; + var operations = []; + + async.each( + this.node.services, + function(mod, next) { + if(mod.concurrenctBlockHandler) { + $.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function'); + + mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } + next(); + }); + } else { + setImmediate(next); + } + }, + function(err) { + if (err) { + return callback(err); + } + + callback(null, operations); + } + ); +}; + /** * This function will find the common ancestor between the current chain and a forked block, * by moving backwards on both chains until there is a meeting point. @@ -653,13 +687,165 @@ DB.prototype.syncRewind = function(block, done) { }); }; +DB.prototype.sync = function() { + if (self.bitcoindSyncing || self.node.stopping || !self.tip) { + return; + } + + self.bitcoindSyncing = true; + + self._getBitcoindBlocks(); + self._concurrentSync(); + self._serialSync(); +}; + +DB.prototype._concurrentSync = function() { + var self = this; + + async.whilst(function() { + return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + if(!self.blockBuffer.length) { + // wait to get more blocks from bitcoind + return setTimeout(done, 1000); + } + + // get x blocks off block buffer + var blocks = self.blockBuffer.slice(0, 10); + self.blockBuffer = self.blockBuffer.slice(10); + + var operations = []; + async.each(blocks, function(block, next) { + self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) { + if(err) { + return next(err); + } + + operations = operations.concat(ops); + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + // push concurrency height + self.concurrentHeight += blocks.length; + + operations.push({ + + }); + + log.debug('Updating the database with operations', operations); + self.store.batch(operations, done); + }); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + +DB.prototype._serialSync = function() { + var self = this; + // depends on block and transaction services + + var height = self.tip.__height; + async.whilst(function() { + return height < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) { + // wait + return setTimeout(done, 1000); + } + + // get block from block services + self.node.services.block.getBlockByHeight(height + 1, function(err, block) { + if(err) { + return done(err); + } + + if(block.prevHash.reverse().toString('hex') !== self.tip.hash) { + // TODO need to rewind our serial blocks as well as concurrent blocks! + } + + + // This block appends to the current chain tip and we can + // immediately add it to the chain and create indexes. + + // Populate height + block.__height = self.tip.__height + 1; + + // Create indexes + self.connectBlock(block, function(err) { + if (err) { + return done(err); + } + self.tip = block; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + setImmediate(done); + }); + }); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + + if(self.node.stopping) { + self.bitcoindSyncing = false; + return; + } + + if (self.node.services.bitcoind.isSynced()) { + self.bitcoindSyncing = false; + self.node.emit('synced'); + } else { + self.bitcoindSyncing = false; + } + }); +}; + +DB.prototype._getBitcoindBlocks = function() { + var self = this; + + // keep n blocks in block buffer + // when we run out get more from bitcoind + + var lastHeight = self.tip.__height; + + async.whilst(function() { + return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length); + + async.timesLimit(blockCount, 5, function(n, next) { + self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) { + if(err) { + return next(err); + } + + self.blockBuffer.push(block); + next(); + }); + }, done); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + /** * This function will synchronize additional indexes for the chain based on * the current active chain in the bitcoin daemon. In the event that there is * a reorganization in the daemon, the chain will rewind to the last common * ancestor and then resume syncing. */ -DB.prototype.sync = function() { +DB.prototype.syncOld = function() { var self = this; if (self.bitcoindSyncing || self.node.stopping || !self.tip) { From a220bbc43c3bf3e77f931dd03c3cd5bce0f2564f Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Wed, 25 Jan 2017 16:37:38 -0500 Subject: [PATCH 02/11] wip --- lib/services/db.js | 224 +++++++++++++++++++----- lib/services/{block.js => timestamp.js} | 46 ++--- lib/services/transaction.js | 3 +- 3 files changed, 196 insertions(+), 77 deletions(-) rename lib/services/{block.js => timestamp.js} (68%) diff --git a/lib/services/db.js b/lib/services/db.js index 64461ad4..bba2956d 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -65,6 +65,10 @@ function DB(options) { transaction: [], block: [] }; + + this.concurrentSyncBuffer = []; + this.syncBuffer = {}; + this.concurrentBlocks = 10; } util.inherits(DB, Service); @@ -189,7 +193,7 @@ DB.prototype.start = function(callback) { return callback(err); } - setImmediate(callback); + self.loadConcurrencySyncHeight(callback); }); }); }; @@ -319,6 +323,31 @@ DB.prototype.loadTip = function(callback) { }); }; +DB.prototype.loadConcurrencySyncHeight = function(callback) { + var self = this; + + console.log('loadConcurrencySyncHeight'); + var options = { + keyEncoding: 'string', + valueEncoding: 'binary' + }; + + self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) { + if(err) { + if(err.notFound) { + console.log('setting to 0'); + self.concurrentHeight = 0; + return callback(); + } + return callback(err); + } + + + self.concurrentHeight = heightBuffer.readUInt32BE(); + callback(); + }); +}; + /** * Will get a block from bitcoind and give a Bitcore Block * @param {String|Number} hash - A block hash or block height @@ -551,6 +580,7 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); operations = operations.concat(ops); } + next(); }); } else { @@ -688,35 +718,41 @@ DB.prototype.syncRewind = function(block, done) { }; DB.prototype.sync = function() { - if (self.bitcoindSyncing || self.node.stopping || !self.tip) { + console.log('sync'); + if (this.bitcoindSyncing || this.node.stopping || !this.tip) { return; } - self.bitcoindSyncing = true; + this.bitcoindSyncing = true; - self._getBitcoindBlocks(); - self._concurrentSync(); - self._serialSync(); + this._getBitcoindBlocks(); + this._concurrentSync(); + this._serialSync(); }; -DB.prototype._concurrentSync = function() { +DB.prototype._concurrentSync = function(callback) { var self = this; + console.log('concurrentSync'); + + console.log(self.concurrentHeight, self.node.services.bitcoind.height); async.whilst(function() { return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - if(!self.blockBuffer.length) { + if(self.concurrentSyncBuffer.length < Math.min(self.concurrentBlocks, self.node.services.bitcoind.height - self.concurrentHeight)) { + console.log('cannot concurrentSync... waiting for blocks'); // wait to get more blocks from bitcoind return setTimeout(done, 1000); } // get x blocks off block buffer - var blocks = self.blockBuffer.slice(0, 10); - self.blockBuffer = self.blockBuffer.slice(10); + var blocks = self.concurrentSyncBuffer.slice(0, self.concurrentBlocks); + self.concurrentSyncBuffer = self.concurrentSyncBuffer.slice(self.concurrentBlocks); + console.log('Processing ' + blocks.length + ' blocks'); var operations = []; async.each(blocks, function(block, next) { - self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) { + self.runAllConcurrentBlockHandlers(block, true, function(err, ops) { if(err) { return next(err); } @@ -729,15 +765,28 @@ DB.prototype._concurrentSync = function() { return done(err); } - // push concurrency height - self.concurrentHeight += blocks.length; + var newHeight = self.concurrentHeight + blocks.length; + + // push concurrent height + var heightBuffer = new Buffer(4); + + heightBuffer.writeUInt32BE(newHeight); operations.push({ - + type: 'put', + key: self.dbPrefix + 'concurrentHeight', + value: heightBuffer }); log.debug('Updating the database with operations', operations); - self.store.batch(operations, done); + self.store.batch(operations, function(err) { + if(err) { + return done(err); + } + + self.concurrentHeight += blocks.length; + done(); + }); }); }, function(err) { if (err) { @@ -747,46 +796,45 @@ DB.prototype._concurrentSync = function() { }); }; -DB.prototype._serialSync = function() { +DB.prototype._serialSync = function(callback) { + console.log('serialSync'); var self = this; // depends on block and transaction services var height = self.tip.__height; + console.log('height', height, typeof(height)); async.whilst(function() { return height < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) { + if(height >= self.concurrentHeight || !self.syncBuffer[height + 1]) { // wait + console.log('cannot serialSync... waiting for block ' + (height + 1)); return setTimeout(done, 1000); } - // get block from block services - self.node.services.block.getBlockByHeight(height + 1, function(err, block) { - if(err) { + // get block from memory + var block = self.syncBuffer[height + 1]; + if(block.header.prevHash.reverse().toString('hex') !== self.tip.hash) { + // TODO need to rewind our serial blocks as well as concurrent blocks! + } + + // This block appends to the current chain tip and we can + // immediately add it to the chain and create indexes. + + // Populate height + block.__height = self.tip.__height + 1; + + // Create indexes + self.connectBlock(block, function(err) { + if (err) { return done(err); } - - if(block.prevHash.reverse().toString('hex') !== self.tip.hash) { - // TODO need to rewind our serial blocks as well as concurrent blocks! - } - - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - setImmediate(done); - }); + self.tip = block; + // remove from memory + delete self.syncBuffer[height + 1]; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + done(); }); }, function(err) { if (err) { @@ -819,18 +867,54 @@ DB.prototype._getBitcoindBlocks = function() { async.whilst(function() { return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length); + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 10 - self.blockBuffer.length); - async.timesLimit(blockCount, 5, function(n, next) { - self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) { + async.timesLimit(blockCount, 10, function(n, next) { + var height = lastHeight + n + 1; + self.node.services.bitcoind.getBlock(height, function(err, block) { if(err) { return next(err); } - self.blockBuffer.push(block); + self.blockBuffer[height] = block; + next(); }); - }, done); + }, function(err) { + if(err) { + return done(err); + } + + async.timesLimit(blockCount, 10, function(n, next) { + var block = self.blockBuffer[lastHeight + n + 1]; + + self._concurrentSync(block, function(err) { + if(err) { + return next(err); + } + + self._serialSync(block, function(err) { + if(err) { + log.err(err); + return node.stop(function() { + process.exit(1); + }); + } + + delete self.blockBuffer[lastHeight + i + 1]; + }); + + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + lastHeight += blockCount; + done(); + }); + }); }, function(err) { if (err) { Error.captureStackTrace(err); @@ -839,6 +923,52 @@ DB.prototype._getBitcoindBlocks = function() { }); }; +DB.prototype._getBitcoindBlocks = function() { + var self = this; + console.log('getBitcoindBlocks'); + + // keep n blocks in block buffer + // when we run out get more from bitcoind + + var lastHeight = self.tip.__height; + + async.whilst(function() { + return lastHeight < self.node.services.bitcoind.height; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length); + + if(!blockCount) { + return setTimeout(done, 1000); + } + async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) { + var height = lastHeight + n + 1; + self.node.services.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + self.concurrentSyncBuffer.push(block); + self.syncBuffer[height] = block; + + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + lastHeight += blockCount; + done(); + }); + }, function(err) { + console.log('completed', err); + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + /** * This function will synchronize additional indexes for the chain based on * the current active chain in the bitcoin daemon. In the event that there is diff --git a/lib/services/block.js b/lib/services/timestamp.js similarity index 68% rename from lib/services/block.js rename to lib/services/timestamp.js index 6268fdeb..e69b5b61 100644 --- a/lib/services/block.js +++ b/lib/services/timestamp.js @@ -2,20 +2,19 @@ var BaseService = require('../service'); var inherits = require('util').inherits; -function BlockService(options) { +function TimestampService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; } -inherits(BlockService, BaseService); +inherits(TimestampService, BaseService); -BlockService.dependencies = [ - 'db', - 'transaction' +TimestampService.dependencies = [ + 'db' ]; -BlockService.prototype.start = function(callback) { +TimestampService.prototype.start = function(callback) { var self = this; this.store = this.node.services.db.store; @@ -31,11 +30,11 @@ BlockService.prototype.start = function(callback) { }); }; -BlockService.prototype.stop = function(callback) { +TimestampService.prototype.stop = function(callback) { setImmediate(callback); }; -BlockService.prototype.blockHandler = function(block, connectBlock, callback) { +TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -70,8 +69,6 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) { self.currentBlock = block.hash; self.currentTimestamp = timestamp; - // TODO combine with block header - operations = operations.concat( [ { @@ -91,16 +88,7 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) { }); }; -BlockService.prototype.getBlock = function(hash, callback) { - // get block header - // get individual transactions -}; - -BlockService.prototype.getBlockByHeight = function(height, callback) { - -}; - -BlockService.prototype.getTimestamp = function(hash, callback) { +TimestampService.prototype.getTimestamp = function(hash, callback) { var self = this; if (hash === self.currentBlock) { @@ -119,40 +107,40 @@ BlockService.prototype.getTimestamp = function(hash, callback) { }); }; -BlockService.prototype._encodeBlockTimestampKey = function(hash) { +TimestampService.prototype._encodeBlockTimestampKey = function(hash) { return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]); }; -BlockService.prototype._decodeBlockTimestampKey = function(buffer) { +TimestampService.prototype._decodeBlockTimestampKey = function(buffer) { return buffer.slice(2).toString('hex'); }; -BlockService.prototype._encodeBlockTimestampValue = function(timestamp) { +TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return timestampBuffer; }; -BlockService.prototype._decodeBlockTimestampValue = function(buffer) { +TimestampService.prototype._decodeBlockTimestampValue = function(buffer) { return buffer.readDoubleBE(0); }; -BlockService.prototype._encodeTimestampBlockKey = function(timestamp) { +TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return Buffer.concat([this.prefix, timestampBuffer]); }; -BlockService.prototype._decodeTimestampBlockKey = function(buffer) { +TimestampService.prototype._decodeTimestampBlockKey = function(buffer) { return buffer.readDoubleBE(2); }; -BlockService.prototype._encodeTimestampBlockValue = function(hash) { +TimestampService.prototype._encodeTimestampBlockValue = function(hash) { return new Buffer(hash, 'hex'); }; -BlockService.prototype._decodeTimestampBlockValue = function(buffer) { +TimestampService.prototype._decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -module.exports = BlockService; +module.exports = TimestampService; diff --git a/lib/services/transaction.js b/lib/services/transaction.js index a4d45798..ec293294 100644 --- a/lib/services/transaction.js +++ b/lib/services/transaction.js @@ -18,6 +18,7 @@ var utils = require('./wallet-api/utils'); function TransactionService(options) { BaseService.call(this, options); this.concurrency = options.concurrency || 20; + /* upon initialization of the mempool, only txids are obtained from * a trusted bitcoin node (the bitcoind service's rpchost or p2p option) * Since, the mempool is very temporal, I see no reason to take the @@ -27,7 +28,7 @@ function TransactionService(options) { * then call to the trusted bitcoind will take place and the result set. */ this._mempool = LRU({ - max: utils.parseByteCount(options.maxMemPoolSize) || 100 * 1024 * 1024, //100MB + max: 100 * 1024 * 1024, //100MB length: function(tx) { if (tx) { return tx.toBuffer().length; } } }); this.currentTransactions = {}; From 77c9a149dc487d995bcd44b424facbf298ed6bda Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Thu, 26 Jan 2017 15:47:16 -0500 Subject: [PATCH 03/11] wip --- lib/services/db.js | 213 ++++++++++++++------------------ lib/services/db/sync.js | 266 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 360 insertions(+), 119 deletions(-) create mode 100644 lib/services/db/sync.js diff --git a/lib/services/db.js b/lib/services/db.js index bba2956d..f7dc6a6d 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -16,6 +16,7 @@ var errors = index.errors; var log = index.log; var Transaction = require('../transaction'); var Service = require('../service'); +var Sync = require('./sync'); /** * This service synchronizes a leveldb database with bitcoin block chain by connecting and @@ -66,9 +67,7 @@ function DB(options) { block: [] }; - this.concurrentSyncBuffer = []; - this.syncBuffer = {}; - this.concurrentBlocks = 10; + this._sync = new Sync(this.node); } util.inherits(DB, Service); @@ -165,15 +164,29 @@ DB.prototype.start = function(callback) { this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles, keyEncoding: 'binary', valueEncoding: 'binary'}); this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + this._sync.on('error', function(err) { + log.err(err); + }); + + this._sync.on('reorg', function() { + throw new Error('Reorg detected! Unhandled :(') + }); + + this._sync.on('initialsync', function() { + log.info('Initial sync complete'); + }); + + this.node.on('stopping', function() { + self._sync.stop(); + }); + this.node.once('ready', function() { // start syncing - self.sync(); + self._sync.initialSync(); // Notify that there is a new tip self.node.services.bitcoind.on('tip', function() { - if(!self.node.stopping) { - self.sync(); - } + self._sync.sync(); }); }); @@ -497,6 +510,16 @@ DB.prototype.disconnectBlock = function(block, callback) { this.runAllBlockHandlers(block, false, callback); }; +DB.prototype.getTipOperation = function(add) { + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); + } +}; + /** * Will collect all database operations for a block from other services that implement * `blockHandler` methods and then save operations to the database. @@ -717,38 +740,10 @@ DB.prototype.syncRewind = function(block, done) { }); }; -DB.prototype.sync = function() { - console.log('sync'); - if (this.bitcoindSyncing || this.node.stopping || !this.tip) { - return; - } - - this.bitcoindSyncing = true; - - this._getBitcoindBlocks(); - this._concurrentSync(); - this._serialSync(); -}; - -DB.prototype._concurrentSync = function(callback) { +DB.prototype._concurrentSync = function(block, callback) { var self = this; console.log('concurrentSync'); - console.log(self.concurrentHeight, self.node.services.bitcoind.height); - - async.whilst(function() { - return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - if(self.concurrentSyncBuffer.length < Math.min(self.concurrentBlocks, self.node.services.bitcoind.height - self.concurrentHeight)) { - console.log('cannot concurrentSync... waiting for blocks'); - // wait to get more blocks from bitcoind - return setTimeout(done, 1000); - } - - // get x blocks off block buffer - var blocks = self.concurrentSyncBuffer.slice(0, self.concurrentBlocks); - self.concurrentSyncBuffer = self.concurrentSyncBuffer.slice(self.concurrentBlocks); - console.log('Processing ' + blocks.length + ' blocks'); var operations = []; async.each(blocks, function(block, next) { @@ -788,95 +783,59 @@ DB.prototype._concurrentSync = function(callback) { done(); }); }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - }); }; -DB.prototype._serialSync = function(callback) { +DB.prototype._serialSync = function(block, callback) { console.log('serialSync'); var self = this; - // depends on block and transaction services - var height = self.tip.__height; - console.log('height', height, typeof(height)); - async.whilst(function() { - return height < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - if(height >= self.concurrentHeight || !self.syncBuffer[height + 1]) { - // wait - console.log('cannot serialSync... waiting for block ' + (height + 1)); - return setTimeout(done, 1000); - } - - // get block from memory - var block = self.syncBuffer[height + 1]; - if(block.header.prevHash.reverse().toString('hex') !== self.tip.hash) { - // TODO need to rewind our serial blocks as well as concurrent blocks! - } - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - // remove from memory - delete self.syncBuffer[height + 1]; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - done(); - }); - }, function(err) { + // Create indexes + self.connectBlock(block, function(err) { if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - return; - } - - if (self.node.services.bitcoind.isSynced()) { - self.bitcoindSyncing = false; - self.node.emit('synced'); - } else { - self.bitcoindSyncing = false; + return callback(err); } + self.tip = block; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + callback(); }); }; -DB.prototype._getBitcoindBlocks = function() { +DB.prototype.sync = function() { var self = this; + if (this.bitcoindSyncing || this.node.stopping || !this.tip) { + return; + } + + this.bitcoindSyncing = true; + // keep n blocks in block buffer // when we run out get more from bitcoind var lastHeight = self.tip.__height; - async.whilst(function() { - return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 10 - self.blockBuffer.length); + var blocks = {}; - async.timesLimit(blockCount, 10, function(n, next) { + var syncError = null; + + async.whilst(function() { + return lastHeight < self.node.services.bitcoind.height && !self.node.stopping && !syncError; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - Object.keys(blocks).length, 5); + + if(!blockCount) { + return setTimeout(done, 1000); + } + + async.times(blockCount, function(n, next) { var height = lastHeight + n + 1; self.node.services.bitcoind.getBlock(height, function(err, block) { if(err) { return next(err); } - self.blockBuffer[height] = block; + blocks[height] = block; next(); }); @@ -885,44 +844,60 @@ DB.prototype._getBitcoindBlocks = function() { return done(err); } - async.timesLimit(blockCount, 10, function(n, next) { - var block = self.blockBuffer[lastHeight + n + 1]; + for(var i = 0; i < blockCount.length; i++) { + var block = blocks[lastHeight + i + 1]; + block.__height = lastHeight + i + 1; self._concurrentSync(block, function(err) { if(err) { - return next(err); + syncError = err; + return; } self._serialSync(block, function(err) { if(err) { - log.err(err); - return node.stop(function() { - process.exit(1); - }); + syncError = err; + return; } - delete self.blockBuffer[lastHeight + i + 1]; + delete blocks[lastHeight + i + 1]; + + if(self.tip.__height === self.node.services.bitcoind.height) { + self.bitcoindSyncing = false; + if(self.isSynced()) { + self.node.emit('synced'); + } + } }); - - next(); }); - }, function(err) { - if(err) { - return done(err); - } + } - lastHeight += blockCount; - done(); - }); + lastHeight += blockCount; + done(); }); }, function(err) { if (err) { Error.captureStackTrace(err); - return self.node.emit('error', err); + self.node.emit('error', err); + return; + } + + if(syncError) { + Error.captureStackTrace(syncError); + self.node.emit('error', syncError); + return; + } + + if(self.node.stopping) { + self.bitcoindSyncing = false; } }); }; +DB.prototype.isSynced = function() { + return self.bitcoind.isSynced() && self.tip.__height === self.node.services.bitcoind.height; +}; + DB.prototype._getBitcoindBlocks = function() { var self = this; console.log('getBitcoindBlocks'); diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js new file mode 100644 index 00000000..e5ed75cd --- /dev/null +++ b/lib/services/db/sync.js @@ -0,0 +1,266 @@ +'use strict' +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var EventEmitter = require('events').EventEmitter; +var async = require('async'); + +function Sync(node) { + this.node = node; + this.db = node.db; + this.syncing = false; +} + +inherits(Sync, EventEmitter); + +// Use a concurrent strategy to sync +Sync.prototype.initialSync = function() { + var self = this; + + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(this.node.services.bitcoind, this.db.tip); + var processConcurrent = new ProcessConcurrent(this.db); + var processSerial = new ProcessSerial(this.db); + var writeStream1 = new WriteStream(this.db); + var writeStream2 = new WriteStream(this.db); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processConcurrent); + this._handleErrors(processSerial); + this._handleErrors(writeStream1); + this._handleErrors(writeStream2); + + writeStream2.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + + self.syncing = false; + self.emit('initialsync'); + }); + + this.blockStream + .pipe(processConcurrent) + .pipe(writeStream1); + + this.blockStream + .pipe(processSerial) + .pipe(writeStream2); +}; + +// Get concurrent and serial block operations and write them together block by block +// Useful for when we are fully synced and we want to advance the concurrentTip and +// the tip together +Sync.prototype.sync = function() { + if(this.syncing) { + return + } + + this.syncing = true; + this.blockStream = new BlockStream(); + var processBoth = new ProcessBoth(this.db); + var writeStream = new WriteStream(this.db); + + var start = Date.now(); + + this._handleErrors(this.blockStream); + this._handleErrors(processBoth); + this._handleErrors(writeStream); + + writeStream.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + self.emit('synced'); + }); + + this.blockStream + .pipe(processBoth) + .pipe(writeStream1); +}; + +Sync.prototype.stop = function() { + if(this.blockStream) { + this.blockStream.destroy(); + } +}; + +Sync.prototype._handleErrors = function(stream) { + var self = this; + + stream.on('error', function(err) { + self.syncing = false; + + if(err.reorg) { + return self.emit('reorg'); + } + + self.emit('error', err); + }); +}; + +function BlockStream(bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: 10}); + this.bitcoind = bitcoind; + this.lastHeight = lastHeight; +} + +inherits(BlockStream, Readable); + +BlockStream.prototype._read = function() { + var self = this; + + var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); + + if(blockCount <= 0) { + return self.push(null); + } + + console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' (self.lastHeight + blockCount.length)); + + async.times(blockCount, function(n, next) { + var height = self.lastHeight + n + 1; + self.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + next(null, block); + }); + }, function(err, blocks) { + if(err) { + return self.emit('error', err); + } + + for(var i = 0; i < blocks.length; i++) { + self.push(blocks[i]); + } + + self.lastHeight += blocks.length; + }); +}; + +function ProcessSerial(db, tip) { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.tip = tip; +} + +inherits(ProcessSerial, Transform); + +ProcessSerial.prototype._transform = function(block, enc, callback) { + var self = this; + + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); + if(prevHash !== self.tip) { + var err = new Error('Reorg detected'); + err.reorg = true; + return callback(err); + } + + async.whilst( + function() { + return self.db.concurrentHeight < block.__height; + }, + function(next) { + setTimeout(next, 1000); + }, + function() { + var operations = [{index1: block.height}, {index2: block.height}]; + setTimeout(function() { + var obj = { + tipHeight: block.height, + operations: operations + }; + + callback(null, obj); + }, 100); + } + ); +}; + +function ProcessConcurrent(db) { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.operations = []; + this.lastHeight = 0; +}; + +inherits(ProcessConcurrent, Transform); + +ProcessConcurrent.prototype._transform = function(block, enc, callback) { + var self = this; + + this.lastHeight = block.__height; + + self.db.runAllConcurrentBlockHandlers(block, true, function(err, operations) { + if(err) { + return callback(err); + } + + self.operations = self.operations.concat(operations); + + if(self.operations >= 100) { + // TODO add tip operation + var obj = { + concurrentTipHeight: block.__height, + operations: self.operations + } + self.operations = []; + + return callback(null, obj); + } + + callback(); + }); +}; + +ProcessConcurrent.prototype._flush = function(callback) { + if(this.operations.length) { + // TODO add tip operation + var obj = { + concurrentTipHeight: this.lastHeight, + operations: this.operations + }; + + this.operations = []; + return callback(null, operations); + } +}; + +function WriteStream(db) { + Writable.call(this, {objectMode: true, highWaterMark: 10}); + this.db = db; + this.writeTime = 0; +} + +inherits(WriteStream, Writable); + +WriteStream.prototype._write = function(obj, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamSlow block ', operations.concurrentTipHeight); + self.writeTime += 2000; + + if(obj.tip) { + self.db.tip = obj.tip; + } + + if(obj.concurrentTip) { + self.db.concurrentTip = obj.concurrentTip; + } + + callback(); + }, 2000); +}; + +module.exports = Sync; \ No newline at end of file From 26108753dbb2cc4508ac3ed447ee1fbd9bebc880 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Thu, 26 Jan 2017 18:09:36 -0500 Subject: [PATCH 04/11] got it working --- lib/services/address/index.js | 2 +- lib/services/{db.js => db/index.js} | 615 +++++++--------------------- lib/services/db/sync.js | 237 ++++++++--- 3 files changed, 311 insertions(+), 543 deletions(-) rename lib/services/{db.js => db/index.js} (58%) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index c764bf25..eea2acd5 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -134,7 +134,7 @@ AddressService.prototype.getPublishEvents = function() { * @param {Boolean} addOutput - If the block is being removed or added to the chain * @param {Function} callback */ -AddressService.prototype.blockHandler = function(block, connectBlock, callback) { +AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { var self = this; var txs = block.transactions; diff --git a/lib/services/db.js b/lib/services/db/index.js similarity index 58% rename from lib/services/db.js rename to lib/services/db/index.js index f7dc6a6d..ea34b4e1 100644 --- a/lib/services/db.js +++ b/lib/services/db/index.js @@ -11,11 +11,11 @@ var BufferUtil = bitcore.util.buffer; var Networks = bitcore.Networks; var Block = bitcore.Block; var $ = bitcore.util.preconditions; -var index = require('../'); +var index = require('../../'); var errors = index.errors; var log = index.log; -var Transaction = require('../transaction'); -var Service = require('../service'); +var Transaction = require('../../transaction'); +var Service = require('../../service'); var Sync = require('./sync'); /** @@ -67,7 +67,7 @@ function DB(options) { block: [] }; - this._sync = new Sync(this.node); + this._sync = new Sync(this.node, this); } util.inherits(DB, Service); @@ -165,7 +165,7 @@ DB.prototype.start = function(callback) { this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); this._sync.on('error', function(err) { - log.err(err); + log.error(err); }); this._sync.on('reorg', function() { @@ -206,7 +206,7 @@ DB.prototype.start = function(callback) { return callback(err); } - self.loadConcurrencySyncHeight(callback); + self.loadConcurrentTip(callback); }); }); }; @@ -336,28 +336,52 @@ DB.prototype.loadTip = function(callback) { }); }; -DB.prototype.loadConcurrencySyncHeight = function(callback) { +DB.prototype.loadConcurrentTip = function(callback) { var self = this; - console.log('loadConcurrencySyncHeight'); var options = { keyEncoding: 'string', valueEncoding: 'binary' }; - self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) { - if(err) { - if(err.notFound) { - console.log('setting to 0'); - self.concurrentHeight = 0; - return callback(); - } + self.store.get(self.dbPrefix + 'concurrentTip', options, function(err, tipData) { + if(err && err instanceof levelup.errors.NotFoundError) { + self.concurrentTip = self.genesis; + self.concurrentTip.__height = 0; + return; + } else if(err) { return callback(err); } + var hash = tipData.slice(0, 32).toString('hex'); + var height = tipData.readUInt32BE(32); - self.concurrentHeight = heightBuffer.readUInt32BE(); - callback(); + var times = 0; + async.retry({times: 3, interval: self.retryInterval}, function(done) { + self.getBlock(hash, function(err, concurrentTip) { + if(err) { + times++; + log.warn('Bitcoind does not have our concurrentTip (' + hash + '). Bitcoind may have crashed and needs to catch up.'); + if(times < 3) { + log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); + } + return done(err); + } + + done(null, concurrentTip); + }); + }, function(err, concurrentTip) { + if(err) { + log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); + log.warn('Please reindex your database.'); + return callback(err); + } + + concurrentTip.__height = height; + self.concurrentTip = concurrentTip; + + callback(); + }); }); }; @@ -496,8 +520,30 @@ DB.prototype.getPrevHash = function(blockHash, callback) { * @param {Function} callback */ DB.prototype.connectBlock = function(block, callback) { + var self = this; + log.debug('DB handling new chain block'); - this.runAllBlockHandlers(block, true, callback); + var operations = []; + self.getConcurrentBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = ops; + + self.getSerialBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, true)); + operations.push(self.getConcurrentTipOperation(block, true)); + + self.store.batch(operations, callback); + }); + }); }; /** @@ -506,94 +552,41 @@ DB.prototype.connectBlock = function(block, callback) { * @param {Function} callback */ DB.prototype.disconnectBlock = function(block, callback) { - log.debug('DB removing chain block'); - this.runAllBlockHandlers(block, false, callback); -}; - -DB.prototype.getTipOperation = function(add) { - if(add) { - heightBuffer.writeUInt32BE(block.__height); - tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); - } else { - heightBuffer.writeUInt32BE(block.__height - 1); - tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); - } -}; - -/** - * Will collect all database operations for a block from other services that implement - * `blockHandler` methods and then save operations to the database. - * @param {Block} block - The bitcore block - * @param {Boolean} add - If the block is being added/connected or removed/disconnected - * @param {Function} callback - */ -DB.prototype.runAllBlockHandlers = function(block, add, callback) { var self = this; + + log.debug('DB removing chain block'); var operations = []; + self.getConcurrentBlockOperations(block, false, function(err, ops) { + if(err) { + return callback(err); + } - // Notify block subscribers - // for (var i = 0; i < this.subscriptions.block.length; i++) { - // this.subscriptions.block[i].emit('db/block', block.hash); - // } + operations = ops; - // Update tip - var tipData = new Buffer(36); - var heightBuffer = new Buffer(4); - - if(add) { - heightBuffer.writeUInt32BE(block.__height); - tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); - } else { - heightBuffer.writeUInt32BE(block.__height - 1); - tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); - } - - operations.push({ - type: 'put', - key: self.dbPrefix + 'tip', - value: tipData - }); - - async.eachSeries( - this.node.services, - function(mod, next) { - if(mod.blockHandler) { - $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - - mod.blockHandler.call(mod, block, add, function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); - operations = operations.concat(ops); - } - next(); - }); - } else { - setImmediate(next); - } - }, - function(err) { - if (err) { + self.getSerialBlockOperations(block, false, function(err, ops) { + if(err) { return callback(err); } - log.debug('Updating the database with operations', operations); + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, false)); + operations.push(self.getConcurrentTipOperation(block, false)); + self.store.batch(operations, callback); - } - ); + }); + }); }; -DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { +DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { var self = this; var operations = []; async.each( this.node.services, function(mod, next) { - if(mod.concurrenctBlockHandler) { - $.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function'); + if(mod.concurrentBlockHandler) { + $.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function'); mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { if (err) { @@ -620,415 +613,81 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { ); }; -/** - * This function will find the common ancestor between the current chain and a forked block, - * by moving backwards on both chains until there is a meeting point. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.findCommonAncestor = function(block, done) { - +DB.prototype.getSerialBlockOperations = function(block, add, callback) { var self = this; + var operations = []; - var mainPosition = self.tip.hash; - var forkPosition = block.hash; + async.eachSeries( + this.node.services, + function(mod, next) { + if(mod.blockHandler) { + $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - var mainHashesMap = {}; - var forkHashesMap = {}; + mod.blockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } - mainHashesMap[mainPosition] = true; - forkHashesMap[forkPosition] = true; - - var commonAncestor = null; - - async.whilst( - function() { - return !commonAncestor; - }, - function(next) { - - if(mainPosition) { - var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition); - if(mainBlockIndex && mainBlockIndex.prevHash) { - mainHashesMap[mainBlockIndex.prevHash] = true; - mainPosition = mainBlockIndex.prevHash; - } else { - mainPosition = null; - } + next(); + }); + } else { + setImmediate(next); } - - if(forkPosition) { - var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition); - if(forkBlockIndex && forkBlockIndex.prevHash) { - forkHashesMap[forkBlockIndex.prevHash] = true; - forkPosition = forkBlockIndex.prevHash; - } else { - forkPosition = null; - } - } - - if(forkPosition && mainHashesMap[forkPosition]) { - commonAncestor = forkPosition; - } - - if(mainPosition && forkHashesMap[mainPosition]) { - commonAncestor = mainPosition; - } - - if(!mainPosition && !forkPosition) { - return next(new Error('Unknown common ancestor')); - } - - setImmediate(next); }, function(err) { - done(err, commonAncestor); + if (err) { + return callback(err); + } + + callback(null, operations); } ); }; -/** - * This function will attempt to rewind the chain to the common ancestor - * between the current chain and a forked block. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.syncRewind = function(block, done) { +DB.prototype.getTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; - var self = this; - - self.findCommonAncestor(block, function(err, ancestorHash) { - if (err) { - return done(err); - } - log.warn('Reorg common ancestor found:', ancestorHash); - // Rewind the chain to the common ancestor - async.whilst( - function() { - // Wait until the tip equals the ancestor hash - return self.tip.hash !== ancestorHash; - }, - function(removeDone) { - - var tip = self.tip; - - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex'); - - self.getBlock(prevHash, function(err, previousTip) { - if (err) { - removeDone(err); - } - - // Undo the related indexes for this block - self.disconnectBlock(tip, function(err) { - if (err) { - return removeDone(err); - } - - // Set the new tip - previousTip.__height = self.tip.__height - 1; - self.tip = previousTip; - self.emit('removeblock', tip); - removeDone(); - }); - - }); - - }, done - ); - }); -}; - -DB.prototype._concurrentSync = function(block, callback) { - var self = this; - console.log('concurrentSync'); - - console.log('Processing ' + blocks.length + ' blocks'); - var operations = []; - async.each(blocks, function(block, next) { - self.runAllConcurrentBlockHandlers(block, true, function(err, ops) { - if(err) { - return next(err); - } - - operations = operations.concat(ops); - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - var newHeight = self.concurrentHeight + blocks.length; - - // push concurrent height - var heightBuffer = new Buffer(4); - - heightBuffer.writeUInt32BE(newHeight); - - operations.push({ - type: 'put', - key: self.dbPrefix + 'concurrentHeight', - value: heightBuffer - }); - - log.debug('Updating the database with operations', operations); - self.store.batch(operations, function(err) { - if(err) { - return done(err); - } - - self.concurrentHeight += blocks.length; - done(); - }); - }); -}; - -DB.prototype._serialSync = function(block, callback) { - console.log('serialSync'); - var self = this; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return callback(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - callback(); - }); -}; - -DB.prototype.sync = function() { - var self = this; - - if (this.bitcoindSyncing || this.node.stopping || !this.tip) { - return; + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); } - this.bitcoindSyncing = true; - - // keep n blocks in block buffer - // when we run out get more from bitcoind - - var lastHeight = self.tip.__height; - - var blocks = {}; - - var syncError = null; - - async.whilst(function() { - return lastHeight < self.node.services.bitcoind.height && !self.node.stopping && !syncError; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - Object.keys(blocks).length, 5); - - if(!blockCount) { - return setTimeout(done, 1000); - } - - async.times(blockCount, function(n, next) { - var height = lastHeight + n + 1; - self.node.services.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - blocks[height] = block; - - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - for(var i = 0; i < blockCount.length; i++) { - var block = blocks[lastHeight + i + 1]; - block.__height = lastHeight + i + 1; - - self._concurrentSync(block, function(err) { - if(err) { - syncError = err; - return; - } - - self._serialSync(block, function(err) { - if(err) { - syncError = err; - return; - } - - delete blocks[lastHeight + i + 1]; - - if(self.tip.__height === self.node.services.bitcoind.height) { - self.bitcoindSyncing = false; - if(self.isSynced()) { - self.node.emit('synced'); - } - } - }); - }); - } - - lastHeight += blockCount; - done(); - }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - self.node.emit('error', err); - return; - } - - if(syncError) { - Error.captureStackTrace(syncError); - self.node.emit('error', syncError); - return; - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - } - }); + return { + type: 'put', + key: this.dbPrefix + 'tip', + value: tipData + }; }; -DB.prototype.isSynced = function() { - return self.bitcoind.isSynced() && self.tip.__height === self.node.services.bitcoind.height; -}; +DB.prototype.getConcurrentTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; -DB.prototype._getBitcoindBlocks = function() { - var self = this; - console.log('getBitcoindBlocks'); - - // keep n blocks in block buffer - // when we run out get more from bitcoind - - var lastHeight = self.tip.__height; - - async.whilst(function() { - return lastHeight < self.node.services.bitcoind.height; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length); - - if(!blockCount) { - return setTimeout(done, 1000); - } - async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) { - var height = lastHeight + n + 1; - self.node.services.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - self.concurrentSyncBuffer.push(block); - self.syncBuffer[height] = block; - - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - lastHeight += blockCount; - done(); - }); - }, function(err) { - console.log('completed', err); - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - }); -}; - -/** - * This function will synchronize additional indexes for the chain based on - * the current active chain in the bitcoin daemon. In the event that there is - * a reorganization in the daemon, the chain will rewind to the last common - * ancestor and then resume syncing. - */ -DB.prototype.syncOld = function() { - var self = this; - - if (self.bitcoindSyncing || self.node.stopping || !self.tip) { - return; + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); } - self.bitcoindSyncing = true; - - var height; - - async.whilst(function() { - height = self.tip.__height; - return height < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - console.log('fetching block ' + (height + 1)); - self.node.services.bitcoind.getBlock(height + 1, function(err, block) { - if (err) { - return done(err); - } - - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); - - if (prevHash === self.tip.hash) { - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - setImmediate(done); - }); - } else { - // This block doesn't progress the current tip, so we'll attempt - // to rewind the chain to the common ancestor of the block and - // then we can resume syncing. - log.warn('Beginning reorg! Current tip: ' + self.tip.hash + '; New tip: ' + block.hash); - self.syncRewind(block, function(err) { - if(err) { - return done(err); - } - - log.warn('Reorg complete. New tip is ' + self.tip.hash); - done(); - }); - } - }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - return; - } - - if (self.node.services.bitcoind.isSynced()) { - self.bitcoindSyncing = false; - self.node.emit('synced'); - } else { - self.bitcoindSyncing = false; - } - - }); - + return { + type: 'put', + key: this.dbPrefix + 'concurrentTip', + value: tipData + }; }; + + DB.prototype.getPrefix = function(service, callback) { var self = this; diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index e5ed75cd..5d7c6069 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -5,11 +5,14 @@ var Transform = require('stream').Transform; var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var async = require('async'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; -function Sync(node) { +function Sync(node, db) { this.node = node; - this.db = node.db; + this.db = db; this.syncing = false; + this.highWaterMark = 100; } inherits(Sync, EventEmitter); @@ -23,11 +26,11 @@ Sync.prototype.initialSync = function() { } this.syncing = true; - this.blockStream = new BlockStream(this.node.services.bitcoind, this.db.tip); - var processConcurrent = new ProcessConcurrent(this.db); - var processSerial = new ProcessSerial(this.db); - var writeStream1 = new WriteStream(this.db); - var writeStream2 = new WriteStream(this.db); + this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); + var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); + var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); + var writeStream1 = new WriteStream(this.highWaterMark, this.db); + var writeStream2 = new WriteStream(this.highWaterMark, this.db); var start = Date.now(); @@ -90,7 +93,7 @@ Sync.prototype.sync = function() { Sync.prototype.stop = function() { if(this.blockStream) { - this.blockStream.destroy(); + this.blockStream.stopping = true; } }; @@ -108,49 +111,132 @@ Sync.prototype._handleErrors = function(stream) { }); }; -function BlockStream(bitcoind, lastHeight) { - Readable.call(this, {objectMode: true, highWaterMark: 10}); +function BlockStream(highWaterMark, bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.bitcoind = bitcoind; this.lastHeight = lastHeight; + this.stopping = false; + this.queue = []; + this.processing = false; } inherits(BlockStream, Readable); +// BlockStream.prototype._read = function() { +// var self = this; + +// // TODO does not work :( + +// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); + +// if(blockCount <= 0 || this.stopping) { +// return self.push(null); +// } + +// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount)); + +// async.times(blockCount, function(n, next) { +// var height = self.lastHeight + n + 1; +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return next(err); +// } + +// block.__height = height; + +// next(null, block); +// }); +// }, function(err, blocks) { +// if(err) { +// return self.emit('error', err); +// } + +// for(var i = 0; i < blocks.length; i++) { +// self.push(blocks[i]); +// } + +// self.lastHeight += blocks.length; +// }); +// }; + +// BlockStream.prototype._read = function() { +// var self = this; + +// self.lastHeight++; +// //console.log('Fetching block ' + self.lastHeight); + +// var height = self.lastHeight; + +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return self.emit(err); +// } + +// block.__height = height; + +// //console.log('pushing block ' + block.__height); +// self.push(block); +// }); +// }; + BlockStream.prototype._read = function() { - var self = this; + this.lastHeight++; + this.queue.push(this.lastHeight); - var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); - - if(blockCount <= 0) { - return self.push(null); - } - - console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' (self.lastHeight + blockCount.length)); - - async.times(blockCount, function(n, next) { - var height = self.lastHeight + n + 1; - self.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - next(null, block); - }); - }, function(err, blocks) { - if(err) { - return self.emit('error', err); - } - - for(var i = 0; i < blocks.length; i++) { - self.push(blocks[i]); - } - - self.lastHeight += blocks.length; - }); + this._process(); }; -function ProcessSerial(db, tip) { - Transform.call(this, {objectMode: true, highWaterMark: 10}); +BlockStream.prototype._process = function() { + var self = this; + + if(this.processing) { + return; + } + + this.processing = true; + + async.whilst( + function() { + return self.queue.length; + }, function(next) { + var heights = self.queue.slice(0, Math.min(5, self.queue.length)); + self.queue = self.queue.slice(heights.length); + + //console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]); + + async.map(heights, function(height, next) { + self.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + block.__height = height; + + next(null, block); + }); + }, function(err, blocks) { + if(err) { + return next(err); + } + + for(var i = 0; i < blocks.length; i++) { + self.push(blocks[i]); + } + + next(); + }); + }, function(err) { + if(err) { + return self.emit('error', err); + } + + self.processing = false; + } + ); +} + +function ProcessSerial(highWaterMark, db, tip) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.tip = tip; } @@ -160,8 +246,10 @@ inherits(ProcessSerial, Transform); ProcessSerial.prototype._transform = function(block, enc, callback) { var self = this; + //console.log('serial', block.__height); + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); - if(prevHash !== self.tip) { + if(prevHash !== self.tip.hash) { var err = new Error('Reorg detected'); err.reorg = true; return callback(err); @@ -169,30 +257,37 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { async.whilst( function() { - return self.db.concurrentHeight < block.__height; + return self.db.concurrentTip.__height < block.__height; }, function(next) { - setTimeout(next, 1000); + // wait until concurrent handler is ahead of us + setTimeout(next, 10); }, function() { - var operations = [{index1: block.height}, {index2: block.height}]; - setTimeout(function() { + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + var obj = { - tipHeight: block.height, + tip: block, operations: operations }; + self.tip = block; + callback(null, obj); - }, 100); + }); } ); }; -function ProcessConcurrent(db) { - Transform.call(this, {objectMode: true, highWaterMark: 10}); +function ProcessConcurrent(highWaterMark, db) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.operations = []; - this.lastHeight = 0; + this.lastBlock = 0; + this.blockCount = 0; }; inherits(ProcessConcurrent, Transform); @@ -200,22 +295,26 @@ inherits(ProcessConcurrent, Transform); ProcessConcurrent.prototype._transform = function(block, enc, callback) { var self = this; - this.lastHeight = block.__height; + //console.log('concurrent', block.__height); - self.db.runAllConcurrentBlockHandlers(block, true, function(err, operations) { + this.lastBlock = block; + + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } + self.blockCount++; self.operations = self.operations.concat(operations); - if(self.operations >= 100) { - // TODO add tip operation + if(self.blockCount >= 1) { //self.operations.length >= 100) { + self.operations.push(self.db.getConcurrentTipOperation(block, true)); var obj = { - concurrentTipHeight: block.__height, + concurrentTip: block, operations: self.operations } self.operations = []; + self.blockCount = 0; return callback(null, obj); } @@ -226,9 +325,9 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) { ProcessConcurrent.prototype._flush = function(callback) { if(this.operations.length) { - // TODO add tip operation + this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true)); var obj = { - concurrentTipHeight: this.lastHeight, + concurrentTipHeight: this.lastBlock, operations: this.operations }; @@ -237,30 +336,40 @@ ProcessConcurrent.prototype._flush = function(callback) { } }; -function WriteStream(db) { - Writable.call(this, {objectMode: true, highWaterMark: 10}); +function WriteStream(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.writeTime = 0; + this.lastConcurrentOutputHeight = 0; } inherits(WriteStream, Writable); WriteStream.prototype._write = function(obj, enc, callback) { var self = this; - setTimeout(function() { - console.log('WriteStreamSlow block ', operations.concurrentTipHeight); - self.writeTime += 2000; + + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } if(obj.tip) { self.db.tip = obj.tip; + if(self.db.tip.__height % 100 === 0) { + console.log('Tip:', self.db.tip.__height); + } } if(obj.concurrentTip) { self.db.concurrentTip = obj.concurrentTip; + if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { + console.log('Concurrent tip:', self.db.concurrentTip.__height); + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; + } } callback(); - }, 2000); + }); }; module.exports = Sync; \ No newline at end of file From df6cfeb1646faba30e39baf2632c3514ba8e5b3d Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Thu, 26 Jan 2017 18:17:36 -0500 Subject: [PATCH 05/11] add delay to make bitcoind not pause --- lib/services/db/sync.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 5d7c6069..c1d30b0b 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -212,7 +212,11 @@ BlockStream.prototype._process = function() { block.__height = height; - next(null, block); + setTimeout(function() { + // we're getting blocks from bitcoind too fast? + // it pauses at 8100 + next(null, block); + }, 1); }); }, function(err, blocks) { if(err) { From e26aec9402eb8b7233c510377218d737ead70e6e Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Thu, 26 Jan 2017 18:18:57 -0500 Subject: [PATCH 06/11] stream test --- contrib/streamtest.js | 149 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 contrib/streamtest.js diff --git a/contrib/streamtest.js b/contrib/streamtest.js new file mode 100644 index 00000000..937ab5e3 --- /dev/null +++ b/contrib/streamtest.js @@ -0,0 +1,149 @@ +'use strict' +var Readable = require('stream').Readable; +var Writable = require('stream').Writable; +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var async = require('async'); + +function main() { + var blockStream = new BlockStream(); + var processConcurrent = new ProcessConcurrent(); + var processSerial = new ProcessSerial(); + var writeStreamFast = new WriteStreamFast(); + var writeStreamSlow = new WriteStreamSlow(); + + var start = Date.now(); + + writeStreamFast.on('finish', function() { + var end = Date.now(); + console.log('Total time: ', (end - start) + ' ms'); + console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); + console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + }); + + blockStream + .pipe(processConcurrent) + .pipe(writeStreamSlow); + + blockStream + .pipe(processSerial) + .pipe(writeStreamFast); +} + +function BlockStream() { + Readable.call(this, {objectMode: true, highWaterMark: 10}); + this.height = 0; +} + +inherits(BlockStream, Readable); + +BlockStream.prototype._read = function() { + var self = this; + console.log('_read'); + + setTimeout(function() { + self.height++; + if(self.height > 40) { + self.push(null); + return; + } + + console.log('ReadStream block ', self.height); + console.log(self.push({height: self.height})); + }, 500); +}; + +function ProcessSerial() { + Transform.call(this, {objectMode: true, highWaterMark: 10}); +} + +inherits(ProcessSerial, Transform); + +ProcessSerial.prototype._transform = function(block, enc, callback) { + var operations = [{index1: block.height}, {index2: block.height}]; + setTimeout(function() { + var obj = { + tipHeight: block.height, + operations: operations + }; + + callback(null, obj); + }, 100); +}; + +function ProcessConcurrent() { + Transform.call(this, {objectMode: true, highWaterMark: 10}); + this.operations = []; + this.lastHeight = 0; +}; + +inherits(ProcessConcurrent, Transform); + +ProcessConcurrent.prototype._transform = function(block, enc, callback) { + var self = this; + + self.lastHeight = block.height; + + setTimeout(function() { + self.operations = self.operations.concat([{index3: block.height}, {index4: block.height}]); + + console.log(self.operations.length); + if(self.operations.length >= 10) { + var obj = { + concurrentTipHeight: self.lastHeight, + operations: self.operations + }; + self.operations = []; + + return callback(null, obj); + } + + callback(); + }, 100); +}; + +ProcessConcurrent.prototype._flush = function(callback) { + if(this.operations.length) { + var obj = { + concurrentTipHeight: this.lastHeight, + operations: this.operations + }; + + this.operations = []; + return callback(null, operations); + } +}; + +function WriteStreamSlow() { + Writable.call(this, {objectMode: true, highWaterMark: 10}); + this.writeTime = 0; +} + +inherits(WriteStreamSlow, Writable); + +WriteStreamSlow.prototype._write = function(operations, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamSlow block ', operations.concurrentTipHeight); + self.writeTime += 2000; + callback(); + }, 2000); +}; + +function WriteStreamFast() { + Writable.call(this, {objectMode: true, highWaterMark: 1}); + this.writeTime = 0; +} + +inherits(WriteStreamFast, Writable); + +WriteStreamFast.prototype._write = function(operations, enc, callback) { + var self = this; + setTimeout(function() { + console.log('WriteStreamFast block ', operations.tipHeight); + self.writeTime += 1000; + callback(); + }, 1000); +}; + +main(); \ No newline at end of file From 68973b4c85d9bf58648ff3370091fa713cd19483 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 27 Jan 2017 10:15:34 -0500 Subject: [PATCH 07/11] get tip operation in serial sync --- lib/services/db/sync.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index c1d30b0b..e668393e 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -273,6 +273,8 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { return callback(err); } + operations.push(self.db.getTipOperation(block, true)); + var obj = { tip: block, operations: operations From c5875332d47ca0fe929e6a9050900cd56e3111a0 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 27 Jan 2017 14:16:44 -0500 Subject: [PATCH 08/11] remove serial sync timeout --- lib/services/db/sync.js | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index e668393e..65ea54bb 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -261,11 +261,23 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { async.whilst( function() { - return self.db.concurrentTip.__height < block.__height; + return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; }, function(next) { + var nextCalled = false; // wait until concurrent handler is ahead of us - setTimeout(next, 10); + self.db.once('addblock', function() { + if(!nextCalled) { + next(); + } + nextCalled = true; + }); + self.db.once('concurrentaddblock', function() { + if(!nextCalled) { + next(); + } + nextCalled = true; + }); }, function() { self.db.getSerialBlockOperations(block, true, function(err, operations) { @@ -361,6 +373,7 @@ WriteStream.prototype._write = function(obj, enc, callback) { if(obj.tip) { self.db.tip = obj.tip; + self.db.emit('addblock'); if(self.db.tip.__height % 100 === 0) { console.log('Tip:', self.db.tip.__height); } @@ -368,6 +381,7 @@ WriteStream.prototype._write = function(obj, enc, callback) { if(obj.concurrentTip) { self.db.concurrentTip = obj.concurrentTip; + self.db.emit('concurrentaddblock'); if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { console.log('Concurrent tip:', self.db.concurrentTip.__height); self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; From fae38b1ee8bc09b3b6653ed72a1a48a4f27f8206 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 27 Jan 2017 14:49:03 -0500 Subject: [PATCH 09/11] try this --- lib/services/db/sync.js | 77 ++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 65ea54bb..1a96ca16 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -259,45 +259,50 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { return callback(err); } - async.whilst( - function() { - return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; - }, - function(next) { - var nextCalled = false; - // wait until concurrent handler is ahead of us - self.db.once('addblock', function() { - if(!nextCalled) { - next(); - } - nextCalled = true; - }); - self.db.once('concurrentaddblock', function() { - if(!nextCalled) { - next(); - } - nextCalled = true; - }); - }, - function() { - self.db.getSerialBlockOperations(block, true, function(err, operations) { - if(err) { - return callback(err); - } + if(check) { + return self._process(block, callback); + } - operations.push(self.db.getTipOperation(block, true)); + var processed = false; - var obj = { - tip: block, - operations: operations - }; - - self.tip = block; - - callback(null, obj); - }); + self.db.on('addblock', function() { + if(!processed) { + processed = true; + return self._process(block, callback); } - ); + }); + + self.db.on('concurrentblock', function() { + if(!processed) { + processed = true; + return self._process(block, callback); + } + }); + + function check() { + return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; + } +}; + +ProcessSerial.prototype._process = function(block, callback) { + var self = this; + + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + + operations.push(self.db.getTipOperation(block, true)); + + var obj = { + tip: block, + operations: operations + }; + + self.tip = block; + + callback(null, obj); + }); }; function ProcessConcurrent(highWaterMark, db) { From 4b51bc01436414fb214d6764a1ec7a3e52f92013 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 27 Jan 2017 17:02:38 -0500 Subject: [PATCH 10/11] make ProcessSerial a Writable stream --- lib/services/db/sync.js | 80 +++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 47 deletions(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 1a96ca16..ed9a5348 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -28,35 +28,27 @@ Sync.prototype.initialSync = function() { this.syncing = true; this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); + var writeStream = new WriteStream(this.highWaterMark, this.db); var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); - var writeStream1 = new WriteStream(this.highWaterMark, this.db); - var writeStream2 = new WriteStream(this.highWaterMark, this.db); var start = Date.now(); this._handleErrors(this.blockStream); this._handleErrors(processConcurrent); this._handleErrors(processSerial); - this._handleErrors(writeStream1); - this._handleErrors(writeStream2); - - writeStream2.on('finish', function() { - var end = Date.now(); - console.log('Total time: ', (end - start) + ' ms'); - console.log('Concurrent write time: ', writeStreamSlow.writeTime + ' ms'); - console.log('Serial write time: ', writeStreamFast.writeTime + ' ms'); + this._handleErrors(writeStream); + processSerial.on('finish', function() { self.syncing = false; self.emit('initialsync'); }); this.blockStream .pipe(processConcurrent) - .pipe(writeStream1); + .pipe(writeStream); this.blockStream - .pipe(processSerial) - .pipe(writeStream2); + .pipe(processSerial); }; // Get concurrent and serial block operations and write them together block by block @@ -240,14 +232,14 @@ BlockStream.prototype._process = function() { } function ProcessSerial(highWaterMark, db, tip) { - Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.tip = tip; } -inherits(ProcessSerial, Transform); +inherits(ProcessSerial, Writable); -ProcessSerial.prototype._transform = function(block, enc, callback) { +ProcessSerial.prototype._write = function(block, enc, callback) { var self = this; //console.log('serial', block.__height); @@ -259,28 +251,20 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { return callback(err); } - if(check) { + if(check()) { return self._process(block, callback); } - var processed = false; - - self.db.on('addblock', function() { - if(!processed) { - processed = true; - return self._process(block, callback); - } - }); - - self.db.on('concurrentblock', function() { - if(!processed) { - processed = true; - return self._process(block, callback); + self.db.once('concurrentblock', function() { + if(!check()) { + var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height); + return self.emit('error', err); } + self._process(block, callback); }); function check() { - return self.db.concurrentTip.__height < block.__height || self.db.tip.__height < block.__height - 1; + return self.db.concurrentTip.__height < block.__height; } }; @@ -301,7 +285,19 @@ ProcessSerial.prototype._process = function(block, callback) { self.tip = block; - callback(null, obj); + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } + + self.db.tip = block; + self.db.emit('addblock'); + if(self.db.tip.__height % 100 === 0) { + console.log('Tip:', self.db.tip.__height); + } + + callback(); + }); }); }; @@ -376,21 +372,11 @@ WriteStream.prototype._write = function(obj, enc, callback) { return callback(err); } - if(obj.tip) { - self.db.tip = obj.tip; - self.db.emit('addblock'); - if(self.db.tip.__height % 100 === 0) { - console.log('Tip:', self.db.tip.__height); - } - } - - if(obj.concurrentTip) { - self.db.concurrentTip = obj.concurrentTip; - self.db.emit('concurrentaddblock'); - if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { - console.log('Concurrent tip:', self.db.concurrentTip.__height); - self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; - } + self.db.concurrentTip = obj.concurrentTip; + self.db.emit('concurrentaddblock'); + if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { + console.log('Concurrent tip:', self.db.concurrentTip.__height); + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; } callback(); From 8511e2f31b19f99b91aa3219786cbcec636be0f4 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 30 Jan 2017 14:21:22 -0500 Subject: [PATCH 11/11] Fixed event name. --- lib/services/db/sync.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index ed9a5348..c2a0b24d 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -52,7 +52,7 @@ Sync.prototype.initialSync = function() { }; // Get concurrent and serial block operations and write them together block by block -// Useful for when we are fully synced and we want to advance the concurrentTip and +// Useful for when we are fully synced and we want to advance the concurrentTip and // the tip together Sync.prototype.sync = function() { if(this.syncing) { @@ -255,7 +255,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) { return self._process(block, callback); } - self.db.once('concurrentblock', function() { + self.db.once('concurrentaddblock', function() { if(!check()) { var err = new Error('Concurrent block ' + self.db.concurrentTip.__height + ' is less than ' + block.__height); return self.emit('error', err); @@ -383,4 +383,4 @@ WriteStream.prototype._write = function(obj, enc, callback) { }); }; -module.exports = Sync; \ No newline at end of file +module.exports = Sync;