diff --git a/lib/services/timestamp.js b/lib/services/block.js similarity index 68% rename from lib/services/timestamp.js rename to lib/services/block.js index e69b5b61..6268fdeb 100644 --- a/lib/services/timestamp.js +++ b/lib/services/block.js @@ -2,19 +2,20 @@ var BaseService = require('../service'); var inherits = require('util').inherits; -function TimestampService(options) { +function BlockService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; } -inherits(TimestampService, BaseService); +inherits(BlockService, BaseService); -TimestampService.dependencies = [ - 'db' +BlockService.dependencies = [ + 'db', + 'transaction' ]; -TimestampService.prototype.start = function(callback) { +BlockService.prototype.start = function(callback) { var self = this; this.store = this.node.services.db.store; @@ -30,11 +31,11 @@ TimestampService.prototype.start = function(callback) { }); }; -TimestampService.prototype.stop = function(callback) { +BlockService.prototype.stop = function(callback) { setImmediate(callback); }; -TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { +BlockService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -69,6 +70,8 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback self.currentBlock = block.hash; self.currentTimestamp = timestamp; + // TODO combine with block header + operations = operations.concat( [ { @@ -88,7 +91,16 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback }); }; -TimestampService.prototype.getTimestamp = function(hash, callback) { +BlockService.prototype.getBlock = function(hash, callback) { + // get block header + // get individual transactions +}; + +BlockService.prototype.getBlockByHeight = function(height, callback) { + +}; + +BlockService.prototype.getTimestamp = function(hash, callback) { var self = this; if (hash === self.currentBlock) { @@ -107,40 +119,40 @@ TimestampService.prototype.getTimestamp = function(hash, callback) { }); }; -TimestampService.prototype._encodeBlockTimestampKey = function(hash) { +BlockService.prototype._encodeBlockTimestampKey = function(hash) { return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]); }; -TimestampService.prototype._decodeBlockTimestampKey = function(buffer) { +BlockService.prototype._decodeBlockTimestampKey = function(buffer) { return buffer.slice(2).toString('hex'); }; -TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) { +BlockService.prototype._encodeBlockTimestampValue = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return timestampBuffer; }; -TimestampService.prototype._decodeBlockTimestampValue = function(buffer) { +BlockService.prototype._decodeBlockTimestampValue = function(buffer) { return buffer.readDoubleBE(0); }; -TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) { +BlockService.prototype._encodeTimestampBlockKey = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return Buffer.concat([this.prefix, timestampBuffer]); }; -TimestampService.prototype._decodeTimestampBlockKey = function(buffer) { +BlockService.prototype._decodeTimestampBlockKey = function(buffer) { return buffer.readDoubleBE(2); }; -TimestampService.prototype._encodeTimestampBlockValue = function(hash) { +BlockService.prototype._encodeTimestampBlockValue = function(hash) { return new Buffer(hash, 'hex'); }; -TimestampService.prototype._decodeTimestampBlockValue = function(buffer) { +BlockService.prototype._decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -module.exports = TimestampService; +module.exports = BlockService; diff --git a/lib/services/db.js b/lib/services/db.js index ecd4299d..64461ad4 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -533,6 +533,40 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) { ); }; +DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { + var self = this; + var operations = []; + + async.each( + this.node.services, + function(mod, next) { + if(mod.concurrenctBlockHandler) { + $.checkArgument(typeof mod.concurrenctBlockHandler === 'function', 'concurrentBlockHandler must be a function'); + + mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } + next(); + }); + } else { + setImmediate(next); + } + }, + function(err) { + if (err) { + return callback(err); + } + + callback(null, operations); + } + ); +}; + /** * This function will find the common ancestor between the current chain and a forked block, * by moving backwards on both chains until there is a meeting point. @@ -653,13 +687,165 @@ DB.prototype.syncRewind = function(block, done) { }); }; +DB.prototype.sync = function() { + if (self.bitcoindSyncing || self.node.stopping || !self.tip) { + return; + } + + self.bitcoindSyncing = true; + + self._getBitcoindBlocks(); + self._concurrentSync(); + self._serialSync(); +}; + +DB.prototype._concurrentSync = function() { + var self = this; + + async.whilst(function() { + return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + if(!self.blockBuffer.length) { + // wait to get more blocks from bitcoind + return setTimeout(done, 1000); + } + + // get x blocks off block buffer + var blocks = self.blockBuffer.slice(0, 10); + self.blockBuffer = self.blockBuffer.slice(10); + + var operations = []; + async.each(blocks, function(block, next) { + self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) { + if(err) { + return next(err); + } + + operations = operations.concat(ops); + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + // push concurrency height + self.concurrentHeight += blocks.length; + + operations.push({ + + }); + + log.debug('Updating the database with operations', operations); + self.store.batch(operations, done); + }); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + +DB.prototype._serialSync = function() { + var self = this; + // depends on block and transaction services + + var height = self.tip.__height; + async.whilst(function() { + return height < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) { + // wait + return setTimeout(done, 1000); + } + + // get block from block services + self.node.services.block.getBlockByHeight(height + 1, function(err, block) { + if(err) { + return done(err); + } + + if(block.prevHash.reverse().toString('hex') !== self.tip.hash) { + // TODO need to rewind our serial blocks as well as concurrent blocks! + } + + + // This block appends to the current chain tip and we can + // immediately add it to the chain and create indexes. + + // Populate height + block.__height = self.tip.__height + 1; + + // Create indexes + self.connectBlock(block, function(err) { + if (err) { + return done(err); + } + self.tip = block; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + setImmediate(done); + }); + }); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + + if(self.node.stopping) { + self.bitcoindSyncing = false; + return; + } + + if (self.node.services.bitcoind.isSynced()) { + self.bitcoindSyncing = false; + self.node.emit('synced'); + } else { + self.bitcoindSyncing = false; + } + }); +}; + +DB.prototype._getBitcoindBlocks = function() { + var self = this; + + // keep n blocks in block buffer + // when we run out get more from bitcoind + + var lastHeight = self.tip.__height; + + async.whilst(function() { + return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length); + + async.timesLimit(blockCount, 5, function(n, next) { + self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) { + if(err) { + return next(err); + } + + self.blockBuffer.push(block); + next(); + }); + }, done); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + /** * This function will synchronize additional indexes for the chain based on * the current active chain in the bitcoin daemon. In the event that there is * a reorganization in the daemon, the chain will rewind to the last common * ancestor and then resume syncing. */ -DB.prototype.sync = function() { +DB.prototype.syncOld = function() { var self = this; if (self.bitcoindSyncing || self.node.stopping || !self.tip) {