From 26108753dbb2cc4508ac3ed447ee1fbd9bebc880 Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Thu, 26 Jan 2017 18:09:36 -0500 Subject: [PATCH] got it working --- lib/services/address/index.js | 2 +- lib/services/{db.js => db/index.js} | 615 +++++++--------------------- lib/services/db/sync.js | 237 ++++++++--- 3 files changed, 311 insertions(+), 543 deletions(-) rename lib/services/{db.js => db/index.js} (58%) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index c764bf25..eea2acd5 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -134,7 +134,7 @@ AddressService.prototype.getPublishEvents = function() { * @param {Boolean} addOutput - If the block is being removed or added to the chain * @param {Function} callback */ -AddressService.prototype.blockHandler = function(block, connectBlock, callback) { +AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { var self = this; var txs = block.transactions; diff --git a/lib/services/db.js b/lib/services/db/index.js similarity index 58% rename from lib/services/db.js rename to lib/services/db/index.js index f7dc6a6d..ea34b4e1 100644 --- a/lib/services/db.js +++ b/lib/services/db/index.js @@ -11,11 +11,11 @@ var BufferUtil = bitcore.util.buffer; var Networks = bitcore.Networks; var Block = bitcore.Block; var $ = bitcore.util.preconditions; -var index = require('../'); +var index = require('../../'); var errors = index.errors; var log = index.log; -var Transaction = require('../transaction'); -var Service = require('../service'); +var Transaction = require('../../transaction'); +var Service = require('../../service'); var Sync = require('./sync'); /** @@ -67,7 +67,7 @@ function DB(options) { block: [] }; - this._sync = new Sync(this.node); + this._sync = new Sync(this.node, this); } util.inherits(DB, Service); @@ -165,7 +165,7 @@ DB.prototype.start = function(callback) { this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); this._sync.on('error', function(err) { - log.err(err); + log.error(err); }); this._sync.on('reorg', function() { @@ -206,7 +206,7 @@ DB.prototype.start = function(callback) { return callback(err); } - self.loadConcurrencySyncHeight(callback); + self.loadConcurrentTip(callback); }); }); }; @@ -336,28 +336,52 @@ DB.prototype.loadTip = function(callback) { }); }; -DB.prototype.loadConcurrencySyncHeight = function(callback) { +DB.prototype.loadConcurrentTip = function(callback) { var self = this; - console.log('loadConcurrencySyncHeight'); var options = { keyEncoding: 'string', valueEncoding: 'binary' }; - self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) { - if(err) { - if(err.notFound) { - console.log('setting to 0'); - self.concurrentHeight = 0; - return callback(); - } + self.store.get(self.dbPrefix + 'concurrentTip', options, function(err, tipData) { + if(err && err instanceof levelup.errors.NotFoundError) { + self.concurrentTip = self.genesis; + self.concurrentTip.__height = 0; + return; + } else if(err) { return callback(err); } + var hash = tipData.slice(0, 32).toString('hex'); + var height = tipData.readUInt32BE(32); - self.concurrentHeight = heightBuffer.readUInt32BE(); - callback(); + var times = 0; + async.retry({times: 3, interval: self.retryInterval}, function(done) { + self.getBlock(hash, function(err, concurrentTip) { + if(err) { + times++; + log.warn('Bitcoind does not have our concurrentTip (' + hash + '). Bitcoind may have crashed and needs to catch up.'); + if(times < 3) { + log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); + } + return done(err); + } + + done(null, concurrentTip); + }); + }, function(err, concurrentTip) { + if(err) { + log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); + log.warn('Please reindex your database.'); + return callback(err); + } + + concurrentTip.__height = height; + self.concurrentTip = concurrentTip; + + callback(); + }); }); }; @@ -496,8 +520,30 @@ DB.prototype.getPrevHash = function(blockHash, callback) { * @param {Function} callback */ DB.prototype.connectBlock = function(block, callback) { + var self = this; + log.debug('DB handling new chain block'); - this.runAllBlockHandlers(block, true, callback); + var operations = []; + self.getConcurrentBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = ops; + + self.getSerialBlockOperations(block, true, function(err, ops) { + if(err) { + return callback(err); + } + + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, true)); + operations.push(self.getConcurrentTipOperation(block, true)); + + self.store.batch(operations, callback); + }); + }); }; /** @@ -506,94 +552,41 @@ DB.prototype.connectBlock = function(block, callback) { * @param {Function} callback */ DB.prototype.disconnectBlock = function(block, callback) { - log.debug('DB removing chain block'); - this.runAllBlockHandlers(block, false, callback); -}; - -DB.prototype.getTipOperation = function(add) { - if(add) { - heightBuffer.writeUInt32BE(block.__height); - tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); - } else { - heightBuffer.writeUInt32BE(block.__height - 1); - tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); - } -}; - -/** - * Will collect all database operations for a block from other services that implement - * `blockHandler` methods and then save operations to the database. - * @param {Block} block - The bitcore block - * @param {Boolean} add - If the block is being added/connected or removed/disconnected - * @param {Function} callback - */ -DB.prototype.runAllBlockHandlers = function(block, add, callback) { var self = this; + + log.debug('DB removing chain block'); var operations = []; + self.getConcurrentBlockOperations(block, false, function(err, ops) { + if(err) { + return callback(err); + } - // Notify block subscribers - // for (var i = 0; i < this.subscriptions.block.length; i++) { - // this.subscriptions.block[i].emit('db/block', block.hash); - // } + operations = ops; - // Update tip - var tipData = new Buffer(36); - var heightBuffer = new Buffer(4); - - if(add) { - heightBuffer.writeUInt32BE(block.__height); - tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); - } else { - heightBuffer.writeUInt32BE(block.__height - 1); - tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); - } - - operations.push({ - type: 'put', - key: self.dbPrefix + 'tip', - value: tipData - }); - - async.eachSeries( - this.node.services, - function(mod, next) { - if(mod.blockHandler) { - $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - - mod.blockHandler.call(mod, block, add, function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); - operations = operations.concat(ops); - } - next(); - }); - } else { - setImmediate(next); - } - }, - function(err) { - if (err) { + self.getSerialBlockOperations(block, false, function(err, ops) { + if(err) { return callback(err); } - log.debug('Updating the database with operations', operations); + operations = operations.concat(ops); + + operations.push(self.getTipOperation(block, false)); + operations.push(self.getConcurrentTipOperation(block, false)); + self.store.batch(operations, callback); - } - ); + }); + }); }; -DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { +DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { var self = this; var operations = []; async.each( this.node.services, function(mod, next) { - if(mod.concurrenctBlockHandler) { - $.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function'); + if(mod.concurrentBlockHandler) { + $.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function'); mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { if (err) { @@ -620,415 +613,81 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { ); }; -/** - * This function will find the common ancestor between the current chain and a forked block, - * by moving backwards on both chains until there is a meeting point. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.findCommonAncestor = function(block, done) { - +DB.prototype.getSerialBlockOperations = function(block, add, callback) { var self = this; + var operations = []; - var mainPosition = self.tip.hash; - var forkPosition = block.hash; + async.eachSeries( + this.node.services, + function(mod, next) { + if(mod.blockHandler) { + $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - var mainHashesMap = {}; - var forkHashesMap = {}; + mod.blockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } - mainHashesMap[mainPosition] = true; - forkHashesMap[forkPosition] = true; - - var commonAncestor = null; - - async.whilst( - function() { - return !commonAncestor; - }, - function(next) { - - if(mainPosition) { - var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition); - if(mainBlockIndex && mainBlockIndex.prevHash) { - mainHashesMap[mainBlockIndex.prevHash] = true; - mainPosition = mainBlockIndex.prevHash; - } else { - mainPosition = null; - } + next(); + }); + } else { + setImmediate(next); } - - if(forkPosition) { - var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition); - if(forkBlockIndex && forkBlockIndex.prevHash) { - forkHashesMap[forkBlockIndex.prevHash] = true; - forkPosition = forkBlockIndex.prevHash; - } else { - forkPosition = null; - } - } - - if(forkPosition && mainHashesMap[forkPosition]) { - commonAncestor = forkPosition; - } - - if(mainPosition && forkHashesMap[mainPosition]) { - commonAncestor = mainPosition; - } - - if(!mainPosition && !forkPosition) { - return next(new Error('Unknown common ancestor')); - } - - setImmediate(next); }, function(err) { - done(err, commonAncestor); + if (err) { + return callback(err); + } + + callback(null, operations); } ); }; -/** - * This function will attempt to rewind the chain to the common ancestor - * between the current chain and a forked block. - * @param {Block} block - The new tip that forks the current chain. - * @param {Function} done - A callback function that is called when complete. - */ -DB.prototype.syncRewind = function(block, done) { +DB.prototype.getTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; - var self = this; - - self.findCommonAncestor(block, function(err, ancestorHash) { - if (err) { - return done(err); - } - log.warn('Reorg common ancestor found:', ancestorHash); - // Rewind the chain to the common ancestor - async.whilst( - function() { - // Wait until the tip equals the ancestor hash - return self.tip.hash !== ancestorHash; - }, - function(removeDone) { - - var tip = self.tip; - - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex'); - - self.getBlock(prevHash, function(err, previousTip) { - if (err) { - removeDone(err); - } - - // Undo the related indexes for this block - self.disconnectBlock(tip, function(err) { - if (err) { - return removeDone(err); - } - - // Set the new tip - previousTip.__height = self.tip.__height - 1; - self.tip = previousTip; - self.emit('removeblock', tip); - removeDone(); - }); - - }); - - }, done - ); - }); -}; - -DB.prototype._concurrentSync = function(block, callback) { - var self = this; - console.log('concurrentSync'); - - console.log('Processing ' + blocks.length + ' blocks'); - var operations = []; - async.each(blocks, function(block, next) { - self.runAllConcurrentBlockHandlers(block, true, function(err, ops) { - if(err) { - return next(err); - } - - operations = operations.concat(ops); - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - var newHeight = self.concurrentHeight + blocks.length; - - // push concurrent height - var heightBuffer = new Buffer(4); - - heightBuffer.writeUInt32BE(newHeight); - - operations.push({ - type: 'put', - key: self.dbPrefix + 'concurrentHeight', - value: heightBuffer - }); - - log.debug('Updating the database with operations', operations); - self.store.batch(operations, function(err) { - if(err) { - return done(err); - } - - self.concurrentHeight += blocks.length; - done(); - }); - }); -}; - -DB.prototype._serialSync = function(block, callback) { - console.log('serialSync'); - var self = this; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return callback(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - callback(); - }); -}; - -DB.prototype.sync = function() { - var self = this; - - if (this.bitcoindSyncing || this.node.stopping || !this.tip) { - return; + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); } - this.bitcoindSyncing = true; - - // keep n blocks in block buffer - // when we run out get more from bitcoind - - var lastHeight = self.tip.__height; - - var blocks = {}; - - var syncError = null; - - async.whilst(function() { - return lastHeight < self.node.services.bitcoind.height && !self.node.stopping && !syncError; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - Object.keys(blocks).length, 5); - - if(!blockCount) { - return setTimeout(done, 1000); - } - - async.times(blockCount, function(n, next) { - var height = lastHeight + n + 1; - self.node.services.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - blocks[height] = block; - - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - for(var i = 0; i < blockCount.length; i++) { - var block = blocks[lastHeight + i + 1]; - block.__height = lastHeight + i + 1; - - self._concurrentSync(block, function(err) { - if(err) { - syncError = err; - return; - } - - self._serialSync(block, function(err) { - if(err) { - syncError = err; - return; - } - - delete blocks[lastHeight + i + 1]; - - if(self.tip.__height === self.node.services.bitcoind.height) { - self.bitcoindSyncing = false; - if(self.isSynced()) { - self.node.emit('synced'); - } - } - }); - }); - } - - lastHeight += blockCount; - done(); - }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - self.node.emit('error', err); - return; - } - - if(syncError) { - Error.captureStackTrace(syncError); - self.node.emit('error', syncError); - return; - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - } - }); + return { + type: 'put', + key: this.dbPrefix + 'tip', + value: tipData + }; }; -DB.prototype.isSynced = function() { - return self.bitcoind.isSynced() && self.tip.__height === self.node.services.bitcoind.height; -}; +DB.prototype.getConcurrentTipOperation = function(block, add) { + var heightBuffer = new Buffer(4); + var tipData; -DB.prototype._getBitcoindBlocks = function() { - var self = this; - console.log('getBitcoindBlocks'); - - // keep n blocks in block buffer - // when we run out get more from bitcoind - - var lastHeight = self.tip.__height; - - async.whilst(function() { - return lastHeight < self.node.services.bitcoind.height; - }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length); - - if(!blockCount) { - return setTimeout(done, 1000); - } - async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) { - var height = lastHeight + n + 1; - self.node.services.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - self.concurrentSyncBuffer.push(block); - self.syncBuffer[height] = block; - - next(); - }); - }, function(err) { - if(err) { - return done(err); - } - - lastHeight += blockCount; - done(); - }); - }, function(err) { - console.log('completed', err); - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - }); -}; - -/** - * This function will synchronize additional indexes for the chain based on - * the current active chain in the bitcoin daemon. In the event that there is - * a reorganization in the daemon, the chain will rewind to the last common - * ancestor and then resume syncing. - */ -DB.prototype.syncOld = function() { - var self = this; - - if (self.bitcoindSyncing || self.node.stopping || !self.tip) { - return; + if(add) { + heightBuffer.writeUInt32BE(block.__height); + tipData = Buffer.concat([new Buffer(block.hash, 'hex'), heightBuffer]); + } else { + heightBuffer.writeUInt32BE(block.__height - 1); + tipData = Buffer.concat([BufferUtil.reverse(block.header.prevHash), heightBuffer]); } - self.bitcoindSyncing = true; - - var height; - - async.whilst(function() { - height = self.tip.__height; - return height < self.node.services.bitcoind.height && !self.node.stopping; - }, function(done) { - console.log('fetching block ' + (height + 1)); - self.node.services.bitcoind.getBlock(height + 1, function(err, block) { - if (err) { - return done(err); - } - - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); - - if (prevHash === self.tip.hash) { - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - setImmediate(done); - }); - } else { - // This block doesn't progress the current tip, so we'll attempt - // to rewind the chain to the common ancestor of the block and - // then we can resume syncing. - log.warn('Beginning reorg! Current tip: ' + self.tip.hash + '; New tip: ' + block.hash); - self.syncRewind(block, function(err) { - if(err) { - return done(err); - } - - log.warn('Reorg complete. New tip is ' + self.tip.hash); - done(); - }); - } - }); - }, function(err) { - if (err) { - Error.captureStackTrace(err); - return self.node.emit('error', err); - } - - if(self.node.stopping) { - self.bitcoindSyncing = false; - return; - } - - if (self.node.services.bitcoind.isSynced()) { - self.bitcoindSyncing = false; - self.node.emit('synced'); - } else { - self.bitcoindSyncing = false; - } - - }); - + return { + type: 'put', + key: this.dbPrefix + 'concurrentTip', + value: tipData + }; }; + + DB.prototype.getPrefix = function(service, callback) { var self = this; diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index e5ed75cd..5d7c6069 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -5,11 +5,14 @@ var Transform = require('stream').Transform; var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var async = require('async'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; -function Sync(node) { +function Sync(node, db) { this.node = node; - this.db = node.db; + this.db = db; this.syncing = false; + this.highWaterMark = 100; } inherits(Sync, EventEmitter); @@ -23,11 +26,11 @@ Sync.prototype.initialSync = function() { } this.syncing = true; - this.blockStream = new BlockStream(this.node.services.bitcoind, this.db.tip); - var processConcurrent = new ProcessConcurrent(this.db); - var processSerial = new ProcessSerial(this.db); - var writeStream1 = new WriteStream(this.db); - var writeStream2 = new WriteStream(this.db); + this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height); + var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db); + var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip); + var writeStream1 = new WriteStream(this.highWaterMark, this.db); + var writeStream2 = new WriteStream(this.highWaterMark, this.db); var start = Date.now(); @@ -90,7 +93,7 @@ Sync.prototype.sync = function() { Sync.prototype.stop = function() { if(this.blockStream) { - this.blockStream.destroy(); + this.blockStream.stopping = true; } }; @@ -108,49 +111,132 @@ Sync.prototype._handleErrors = function(stream) { }); }; -function BlockStream(bitcoind, lastHeight) { - Readable.call(this, {objectMode: true, highWaterMark: 10}); +function BlockStream(highWaterMark, bitcoind, lastHeight) { + Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.bitcoind = bitcoind; this.lastHeight = lastHeight; + this.stopping = false; + this.queue = []; + this.processing = false; } inherits(BlockStream, Readable); +// BlockStream.prototype._read = function() { +// var self = this; + +// // TODO does not work :( + +// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); + +// if(blockCount <= 0 || this.stopping) { +// return self.push(null); +// } + +// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount)); + +// async.times(blockCount, function(n, next) { +// var height = self.lastHeight + n + 1; +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return next(err); +// } + +// block.__height = height; + +// next(null, block); +// }); +// }, function(err, blocks) { +// if(err) { +// return self.emit('error', err); +// } + +// for(var i = 0; i < blocks.length; i++) { +// self.push(blocks[i]); +// } + +// self.lastHeight += blocks.length; +// }); +// }; + +// BlockStream.prototype._read = function() { +// var self = this; + +// self.lastHeight++; +// //console.log('Fetching block ' + self.lastHeight); + +// var height = self.lastHeight; + +// self.bitcoind.getBlock(height, function(err, block) { +// if(err) { +// return self.emit(err); +// } + +// block.__height = height; + +// //console.log('pushing block ' + block.__height); +// self.push(block); +// }); +// }; + BlockStream.prototype._read = function() { - var self = this; + this.lastHeight++; + this.queue.push(this.lastHeight); - var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5); - - if(blockCount <= 0) { - return self.push(null); - } - - console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' (self.lastHeight + blockCount.length)); - - async.times(blockCount, function(n, next) { - var height = self.lastHeight + n + 1; - self.bitcoind.getBlock(height, function(err, block) { - if(err) { - return next(err); - } - - next(null, block); - }); - }, function(err, blocks) { - if(err) { - return self.emit('error', err); - } - - for(var i = 0; i < blocks.length; i++) { - self.push(blocks[i]); - } - - self.lastHeight += blocks.length; - }); + this._process(); }; -function ProcessSerial(db, tip) { - Transform.call(this, {objectMode: true, highWaterMark: 10}); +BlockStream.prototype._process = function() { + var self = this; + + if(this.processing) { + return; + } + + this.processing = true; + + async.whilst( + function() { + return self.queue.length; + }, function(next) { + var heights = self.queue.slice(0, Math.min(5, self.queue.length)); + self.queue = self.queue.slice(heights.length); + + //console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]); + + async.map(heights, function(height, next) { + self.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + block.__height = height; + + next(null, block); + }); + }, function(err, blocks) { + if(err) { + return next(err); + } + + for(var i = 0; i < blocks.length; i++) { + self.push(blocks[i]); + } + + next(); + }); + }, function(err) { + if(err) { + return self.emit('error', err); + } + + self.processing = false; + } + ); +} + +function ProcessSerial(highWaterMark, db, tip) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.tip = tip; } @@ -160,8 +246,10 @@ inherits(ProcessSerial, Transform); ProcessSerial.prototype._transform = function(block, enc, callback) { var self = this; + //console.log('serial', block.__height); + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); - if(prevHash !== self.tip) { + if(prevHash !== self.tip.hash) { var err = new Error('Reorg detected'); err.reorg = true; return callback(err); @@ -169,30 +257,37 @@ ProcessSerial.prototype._transform = function(block, enc, callback) { async.whilst( function() { - return self.db.concurrentHeight < block.__height; + return self.db.concurrentTip.__height < block.__height; }, function(next) { - setTimeout(next, 1000); + // wait until concurrent handler is ahead of us + setTimeout(next, 10); }, function() { - var operations = [{index1: block.height}, {index2: block.height}]; - setTimeout(function() { + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return callback(err); + } + var obj = { - tipHeight: block.height, + tip: block, operations: operations }; + self.tip = block; + callback(null, obj); - }, 100); + }); } ); }; -function ProcessConcurrent(db) { - Transform.call(this, {objectMode: true, highWaterMark: 10}); +function ProcessConcurrent(highWaterMark, db) { + Transform.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.operations = []; - this.lastHeight = 0; + this.lastBlock = 0; + this.blockCount = 0; }; inherits(ProcessConcurrent, Transform); @@ -200,22 +295,26 @@ inherits(ProcessConcurrent, Transform); ProcessConcurrent.prototype._transform = function(block, enc, callback) { var self = this; - this.lastHeight = block.__height; + //console.log('concurrent', block.__height); - self.db.runAllConcurrentBlockHandlers(block, true, function(err, operations) { + this.lastBlock = block; + + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { return callback(err); } + self.blockCount++; self.operations = self.operations.concat(operations); - if(self.operations >= 100) { - // TODO add tip operation + if(self.blockCount >= 1) { //self.operations.length >= 100) { + self.operations.push(self.db.getConcurrentTipOperation(block, true)); var obj = { - concurrentTipHeight: block.__height, + concurrentTip: block, operations: self.operations } self.operations = []; + self.blockCount = 0; return callback(null, obj); } @@ -226,9 +325,9 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) { ProcessConcurrent.prototype._flush = function(callback) { if(this.operations.length) { - // TODO add tip operation + this.operations.push(this.db.getConcurrentTipOperation(this.lastBlock, true)); var obj = { - concurrentTipHeight: this.lastHeight, + concurrentTipHeight: this.lastBlock, operations: this.operations }; @@ -237,30 +336,40 @@ ProcessConcurrent.prototype._flush = function(callback) { } }; -function WriteStream(db) { - Writable.call(this, {objectMode: true, highWaterMark: 10}); +function WriteStream(highWaterMark, db) { + Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.writeTime = 0; + this.lastConcurrentOutputHeight = 0; } inherits(WriteStream, Writable); WriteStream.prototype._write = function(obj, enc, callback) { var self = this; - setTimeout(function() { - console.log('WriteStreamSlow block ', operations.concurrentTipHeight); - self.writeTime += 2000; + + self.db.store.batch(obj.operations, function(err) { + if(err) { + return callback(err); + } if(obj.tip) { self.db.tip = obj.tip; + if(self.db.tip.__height % 100 === 0) { + console.log('Tip:', self.db.tip.__height); + } } if(obj.concurrentTip) { self.db.concurrentTip = obj.concurrentTip; + if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { + console.log('Concurrent tip:', self.db.concurrentTip.__height); + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; + } } callback(); - }, 2000); + }); }; module.exports = Sync; \ No newline at end of file