diff --git a/lib/services/db.js b/lib/services/db.js index 64461ad4..bba2956d 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -65,6 +65,10 @@ function DB(options) { transaction: [], block: [] }; + + this.concurrentSyncBuffer = []; + this.syncBuffer = {}; + this.concurrentBlocks = 10; } util.inherits(DB, Service); @@ -189,7 +193,7 @@ DB.prototype.start = function(callback) { return callback(err); } - setImmediate(callback); + self.loadConcurrencySyncHeight(callback); }); }); }; @@ -319,6 +323,31 @@ DB.prototype.loadTip = function(callback) { }); }; +DB.prototype.loadConcurrencySyncHeight = function(callback) { + var self = this; + + console.log('loadConcurrencySyncHeight'); + var options = { + keyEncoding: 'string', + valueEncoding: 'binary' + }; + + self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) { + if(err) { + if(err.notFound) { + console.log('setting to 0'); + self.concurrentHeight = 0; + return callback(); + } + return callback(err); + } + + + self.concurrentHeight = heightBuffer.readUInt32BE(); + callback(); + }); +}; + /** * Will get a block from bitcoind and give a Bitcore Block * @param {String|Number} hash - A block hash or block height @@ -551,6 +580,7 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) { $.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array'); operations = operations.concat(ops); } + next(); }); } else { @@ -688,35 +718,41 @@ DB.prototype.syncRewind = function(block, done) { }; DB.prototype.sync = function() { - if (self.bitcoindSyncing || self.node.stopping || !self.tip) { + console.log('sync'); + if (this.bitcoindSyncing || this.node.stopping || !this.tip) { return; } - self.bitcoindSyncing = true; + this.bitcoindSyncing = true; - self._getBitcoindBlocks(); - self._concurrentSync(); - self._serialSync(); + this._getBitcoindBlocks(); + this._concurrentSync(); + this._serialSync(); }; -DB.prototype._concurrentSync = function() { +DB.prototype._concurrentSync = function(callback) { var self = this; + console.log('concurrentSync'); + + console.log(self.concurrentHeight, self.node.services.bitcoind.height); async.whilst(function() { return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - if(!self.blockBuffer.length) { + if(self.concurrentSyncBuffer.length < Math.min(self.concurrentBlocks, self.node.services.bitcoind.height - self.concurrentHeight)) { + console.log('cannot concurrentSync... waiting for blocks'); // wait to get more blocks from bitcoind return setTimeout(done, 1000); } // get x blocks off block buffer - var blocks = self.blockBuffer.slice(0, 10); - self.blockBuffer = self.blockBuffer.slice(10); + var blocks = self.concurrentSyncBuffer.slice(0, self.concurrentBlocks); + self.concurrentSyncBuffer = self.concurrentSyncBuffer.slice(self.concurrentBlocks); + console.log('Processing ' + blocks.length + ' blocks'); var operations = []; async.each(blocks, function(block, next) { - self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) { + self.runAllConcurrentBlockHandlers(block, true, function(err, ops) { if(err) { return next(err); } @@ -729,15 +765,28 @@ DB.prototype._concurrentSync = function() { return done(err); } - // push concurrency height - self.concurrentHeight += blocks.length; + var newHeight = self.concurrentHeight + blocks.length; + + // push concurrent height + var heightBuffer = new Buffer(4); + + heightBuffer.writeUInt32BE(newHeight); operations.push({ - + type: 'put', + key: self.dbPrefix + 'concurrentHeight', + value: heightBuffer }); log.debug('Updating the database with operations', operations); - self.store.batch(operations, done); + self.store.batch(operations, function(err) { + if(err) { + return done(err); + } + + self.concurrentHeight += blocks.length; + done(); + }); }); }, function(err) { if (err) { @@ -747,46 +796,45 @@ DB.prototype._concurrentSync = function() { }); }; -DB.prototype._serialSync = function() { +DB.prototype._serialSync = function(callback) { + console.log('serialSync'); var self = this; // depends on block and transaction services var height = self.tip.__height; + console.log('height', height, typeof(height)); async.whilst(function() { return height < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) { + if(height >= self.concurrentHeight || !self.syncBuffer[height + 1]) { // wait + console.log('cannot serialSync... waiting for block ' + (height + 1)); return setTimeout(done, 1000); } - // get block from block services - self.node.services.block.getBlockByHeight(height + 1, function(err, block) { - if(err) { + // get block from memory + var block = self.syncBuffer[height + 1]; + if(block.header.prevHash.reverse().toString('hex') !== self.tip.hash) { + // TODO need to rewind our serial blocks as well as concurrent blocks! + } + + // This block appends to the current chain tip and we can + // immediately add it to the chain and create indexes. + + // Populate height + block.__height = self.tip.__height + 1; + + // Create indexes + self.connectBlock(block, function(err) { + if (err) { return done(err); } - - if(block.prevHash.reverse().toString('hex') !== self.tip.hash) { - // TODO need to rewind our serial blocks as well as concurrent blocks! - } - - - // This block appends to the current chain tip and we can - // immediately add it to the chain and create indexes. - - // Populate height - block.__height = self.tip.__height + 1; - - // Create indexes - self.connectBlock(block, function(err) { - if (err) { - return done(err); - } - self.tip = block; - log.debug('Chain added block to main chain'); - self.emit('addblock', block); - setImmediate(done); - }); + self.tip = block; + // remove from memory + delete self.syncBuffer[height + 1]; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + done(); }); }, function(err) { if (err) { @@ -819,18 +867,54 @@ DB.prototype._getBitcoindBlocks = function() { async.whilst(function() { return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping; }, function(done) { - var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length); + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 10 - self.blockBuffer.length); - async.timesLimit(blockCount, 5, function(n, next) { - self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) { + async.timesLimit(blockCount, 10, function(n, next) { + var height = lastHeight + n + 1; + self.node.services.bitcoind.getBlock(height, function(err, block) { if(err) { return next(err); } - self.blockBuffer.push(block); + self.blockBuffer[height] = block; + next(); }); - }, done); + }, function(err) { + if(err) { + return done(err); + } + + async.timesLimit(blockCount, 10, function(n, next) { + var block = self.blockBuffer[lastHeight + n + 1]; + + self._concurrentSync(block, function(err) { + if(err) { + return next(err); + } + + self._serialSync(block, function(err) { + if(err) { + log.err(err); + return node.stop(function() { + process.exit(1); + }); + } + + delete self.blockBuffer[lastHeight + i + 1]; + }); + + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + lastHeight += blockCount; + done(); + }); + }); }, function(err) { if (err) { Error.captureStackTrace(err); @@ -839,6 +923,52 @@ DB.prototype._getBitcoindBlocks = function() { }); }; +DB.prototype._getBitcoindBlocks = function() { + var self = this; + console.log('getBitcoindBlocks'); + + // keep n blocks in block buffer + // when we run out get more from bitcoind + + var lastHeight = self.tip.__height; + + async.whilst(function() { + return lastHeight < self.node.services.bitcoind.height; + }, function(done) { + var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length); + + if(!blockCount) { + return setTimeout(done, 1000); + } + async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) { + var height = lastHeight + n + 1; + self.node.services.bitcoind.getBlock(height, function(err, block) { + if(err) { + return next(err); + } + + self.concurrentSyncBuffer.push(block); + self.syncBuffer[height] = block; + + next(); + }); + }, function(err) { + if(err) { + return done(err); + } + + lastHeight += blockCount; + done(); + }); + }, function(err) { + console.log('completed', err); + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + }); +}; + /** * This function will synchronize additional indexes for the chain based on * the current active chain in the bitcoin daemon. In the event that there is diff --git a/lib/services/block.js b/lib/services/timestamp.js similarity index 68% rename from lib/services/block.js rename to lib/services/timestamp.js index 6268fdeb..e69b5b61 100644 --- a/lib/services/block.js +++ b/lib/services/timestamp.js @@ -2,20 +2,19 @@ var BaseService = require('../service'); var inherits = require('util').inherits; -function BlockService(options) { +function TimestampService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; } -inherits(BlockService, BaseService); +inherits(TimestampService, BaseService); -BlockService.dependencies = [ - 'db', - 'transaction' +TimestampService.dependencies = [ + 'db' ]; -BlockService.prototype.start = function(callback) { +TimestampService.prototype.start = function(callback) { var self = this; this.store = this.node.services.db.store; @@ -31,11 +30,11 @@ BlockService.prototype.start = function(callback) { }); }; -BlockService.prototype.stop = function(callback) { +TimestampService.prototype.stop = function(callback) { setImmediate(callback); }; -BlockService.prototype.blockHandler = function(block, connectBlock, callback) { +TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -70,8 +69,6 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) { self.currentBlock = block.hash; self.currentTimestamp = timestamp; - // TODO combine with block header - operations = operations.concat( [ { @@ -91,16 +88,7 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) { }); }; -BlockService.prototype.getBlock = function(hash, callback) { - // get block header - // get individual transactions -}; - -BlockService.prototype.getBlockByHeight = function(height, callback) { - -}; - -BlockService.prototype.getTimestamp = function(hash, callback) { +TimestampService.prototype.getTimestamp = function(hash, callback) { var self = this; if (hash === self.currentBlock) { @@ -119,40 +107,40 @@ BlockService.prototype.getTimestamp = function(hash, callback) { }); }; -BlockService.prototype._encodeBlockTimestampKey = function(hash) { +TimestampService.prototype._encodeBlockTimestampKey = function(hash) { return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]); }; -BlockService.prototype._decodeBlockTimestampKey = function(buffer) { +TimestampService.prototype._decodeBlockTimestampKey = function(buffer) { return buffer.slice(2).toString('hex'); }; -BlockService.prototype._encodeBlockTimestampValue = function(timestamp) { +TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return timestampBuffer; }; -BlockService.prototype._decodeBlockTimestampValue = function(buffer) { +TimestampService.prototype._decodeBlockTimestampValue = function(buffer) { return buffer.readDoubleBE(0); }; -BlockService.prototype._encodeTimestampBlockKey = function(timestamp) { +TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) { var timestampBuffer = new Buffer(new Array(8)); timestampBuffer.writeDoubleBE(timestamp); return Buffer.concat([this.prefix, timestampBuffer]); }; -BlockService.prototype._decodeTimestampBlockKey = function(buffer) { +TimestampService.prototype._decodeTimestampBlockKey = function(buffer) { return buffer.readDoubleBE(2); }; -BlockService.prototype._encodeTimestampBlockValue = function(hash) { +TimestampService.prototype._encodeTimestampBlockValue = function(hash) { return new Buffer(hash, 'hex'); }; -BlockService.prototype._decodeTimestampBlockValue = function(buffer) { +TimestampService.prototype._decodeTimestampBlockValue = function(buffer) { return buffer.toString('hex'); }; -module.exports = BlockService; +module.exports = TimestampService; diff --git a/lib/services/transaction.js b/lib/services/transaction.js index a4d45798..ec293294 100644 --- a/lib/services/transaction.js +++ b/lib/services/transaction.js @@ -18,6 +18,7 @@ var utils = require('./wallet-api/utils'); function TransactionService(options) { BaseService.call(this, options); this.concurrency = options.concurrency || 20; + /* upon initialization of the mempool, only txids are obtained from * a trusted bitcoin node (the bitcoind service's rpchost or p2p option) * Since, the mempool is very temporal, I see no reason to take the @@ -27,7 +28,7 @@ function TransactionService(options) { * then call to the trusted bitcoind will take place and the result set. */ this._mempool = LRU({ - max: utils.parseByteCount(options.maxMemPoolSize) || 100 * 1024 * 1024, //100MB + max: 100 * 1024 * 1024, //100MB length: function(tx) { if (tx) { return tx.toBuffer().length; } } }); this.currentTransactions = {};