This commit is contained in:
Patrick Nagurny 2017-01-25 16:37:38 -05:00
parent 25c1492580
commit a220bbc43c
3 changed files with 196 additions and 77 deletions

View File

@ -65,6 +65,10 @@ function DB(options) {
transaction: [],
block: []
};
this.concurrentSyncBuffer = [];
this.syncBuffer = {};
this.concurrentBlocks = 10;
}
util.inherits(DB, Service);
@ -189,7 +193,7 @@ DB.prototype.start = function(callback) {
return callback(err);
}
setImmediate(callback);
self.loadConcurrencySyncHeight(callback);
});
});
};
@ -319,6 +323,31 @@ DB.prototype.loadTip = function(callback) {
});
};
DB.prototype.loadConcurrencySyncHeight = function(callback) {
var self = this;
console.log('loadConcurrencySyncHeight');
var options = {
keyEncoding: 'string',
valueEncoding: 'binary'
};
self.store.get(self.dbPrefix + 'concurrentHeight', options, function(err, heightBuffer) {
if(err) {
if(err.notFound) {
console.log('setting to 0');
self.concurrentHeight = 0;
return callback();
}
return callback(err);
}
self.concurrentHeight = heightBuffer.readUInt32BE();
callback();
});
};
/**
* Will get a block from bitcoind and give a Bitcore Block
* @param {String|Number} hash - A block hash or block height
@ -551,6 +580,7 @@ DB.prototype.runAllConcurrentBlockHandlers = function(block, add, callback) {
$.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
next();
});
} else {
@ -688,35 +718,41 @@ DB.prototype.syncRewind = function(block, done) {
};
DB.prototype.sync = function() {
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
console.log('sync');
if (this.bitcoindSyncing || this.node.stopping || !this.tip) {
return;
}
self.bitcoindSyncing = true;
this.bitcoindSyncing = true;
self._getBitcoindBlocks();
self._concurrentSync();
self._serialSync();
this._getBitcoindBlocks();
this._concurrentSync();
this._serialSync();
};
DB.prototype._concurrentSync = function() {
DB.prototype._concurrentSync = function(callback) {
var self = this;
console.log('concurrentSync');
console.log(self.concurrentHeight, self.node.services.bitcoind.height);
async.whilst(function() {
return self.concurrentHeight < self.node.services.bitcoind.height && !self.node.stopping;
}, function(done) {
if(!self.blockBuffer.length) {
if(self.concurrentSyncBuffer.length < Math.min(self.concurrentBlocks, self.node.services.bitcoind.height - self.concurrentHeight)) {
console.log('cannot concurrentSync... waiting for blocks');
// wait to get more blocks from bitcoind
return setTimeout(done, 1000);
}
// get x blocks off block buffer
var blocks = self.blockBuffer.slice(0, 10);
self.blockBuffer = self.blockBuffer.slice(10);
var blocks = self.concurrentSyncBuffer.slice(0, self.concurrentBlocks);
self.concurrentSyncBuffer = self.concurrentSyncBuffer.slice(self.concurrentBlocks);
console.log('Processing ' + blocks.length + ' blocks');
var operations = [];
async.each(blocks, function(block, next) {
self.runAllConcurrentBlockHandlers(block, add, true, function(err, ops) {
self.runAllConcurrentBlockHandlers(block, true, function(err, ops) {
if(err) {
return next(err);
}
@ -729,15 +765,28 @@ DB.prototype._concurrentSync = function() {
return done(err);
}
// push concurrency height
self.concurrentHeight += blocks.length;
var newHeight = self.concurrentHeight + blocks.length;
// push concurrent height
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(newHeight);
operations.push({
type: 'put',
key: self.dbPrefix + 'concurrentHeight',
value: heightBuffer
});
log.debug('Updating the database with operations', operations);
self.store.batch(operations, done);
self.store.batch(operations, function(err) {
if(err) {
return done(err);
}
self.concurrentHeight += blocks.length;
done();
});
});
}, function(err) {
if (err) {
@ -747,46 +796,45 @@ DB.prototype._concurrentSync = function() {
});
};
DB.prototype._serialSync = function() {
DB.prototype._serialSync = function(callback) {
console.log('serialSync');
var self = this;
// depends on block and transaction services
var height = self.tip.__height;
console.log('height', height, typeof(height));
async.whilst(function() {
return height < self.node.services.bitcoind.height && !self.node.stopping;
}, function(done) {
if(!self.node.services.block || !self.node.services.transaction || height >= self.concurrentHeight) {
if(height >= self.concurrentHeight || !self.syncBuffer[height + 1]) {
// wait
console.log('cannot serialSync... waiting for block ' + (height + 1));
return setTimeout(done, 1000);
}
// get block from block services
self.node.services.block.getBlockByHeight(height + 1, function(err, block) {
if(err) {
// get block from memory
var block = self.syncBuffer[height + 1];
if(block.header.prevHash.reverse().toString('hex') !== self.tip.hash) {
// TODO need to rewind our serial blocks as well as concurrent blocks!
}
// This block appends to the current chain tip and we can
// immediately add it to the chain and create indexes.
// Populate height
block.__height = self.tip.__height + 1;
// Create indexes
self.connectBlock(block, function(err) {
if (err) {
return done(err);
}
if(block.prevHash.reverse().toString('hex') !== self.tip.hash) {
// TODO need to rewind our serial blocks as well as concurrent blocks!
}
// This block appends to the current chain tip and we can
// immediately add it to the chain and create indexes.
// Populate height
block.__height = self.tip.__height + 1;
// Create indexes
self.connectBlock(block, function(err) {
if (err) {
return done(err);
}
self.tip = block;
log.debug('Chain added block to main chain');
self.emit('addblock', block);
setImmediate(done);
});
self.tip = block;
// remove from memory
delete self.syncBuffer[height + 1];
log.debug('Chain added block to main chain');
self.emit('addblock', block);
done();
});
}, function(err) {
if (err) {
@ -819,18 +867,54 @@ DB.prototype._getBitcoindBlocks = function() {
async.whilst(function() {
return self.lastHeight < self.node.services.bitcoind.height && !self.node.stopping;
}, function(done) {
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 30 - self.blockBuffer.length);
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 10 - self.blockBuffer.length);
async.timesLimit(blockCount, 5, function(n, next) {
self.node.services.bitcoind.getBlock(lastHeight + n + 1, function(err, block) {
async.timesLimit(blockCount, 10, function(n, next) {
var height = lastHeight + n + 1;
self.node.services.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
self.blockBuffer.push(block);
self.blockBuffer[height] = block;
next();
});
}, done);
}, function(err) {
if(err) {
return done(err);
}
async.timesLimit(blockCount, 10, function(n, next) {
var block = self.blockBuffer[lastHeight + n + 1];
self._concurrentSync(block, function(err) {
if(err) {
return next(err);
}
self._serialSync(block, function(err) {
if(err) {
log.err(err);
return node.stop(function() {
process.exit(1);
});
}
delete self.blockBuffer[lastHeight + i + 1];
});
next();
});
}, function(err) {
if(err) {
return done(err);
}
lastHeight += blockCount;
done();
});
});
}, function(err) {
if (err) {
Error.captureStackTrace(err);
@ -839,6 +923,52 @@ DB.prototype._getBitcoindBlocks = function() {
});
};
DB.prototype._getBitcoindBlocks = function() {
var self = this;
console.log('getBitcoindBlocks');
// keep n blocks in block buffer
// when we run out get more from bitcoind
var lastHeight = self.tip.__height;
async.whilst(function() {
return lastHeight < self.node.services.bitcoind.height;
}, function(done) {
var blockCount = Math.min(self.node.services.bitcoind.height - lastHeight, 20 - Object.keys(self.syncBuffer).length);
if(!blockCount) {
return setTimeout(done, 1000);
}
async.timesLimit(blockCount, self.concurrentBlocks, function(n, next) {
var height = lastHeight + n + 1;
self.node.services.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
self.concurrentSyncBuffer.push(block);
self.syncBuffer[height] = block;
next();
});
}, function(err) {
if(err) {
return done(err);
}
lastHeight += blockCount;
done();
});
}, function(err) {
console.log('completed', err);
if (err) {
Error.captureStackTrace(err);
return self.node.emit('error', err);
}
});
};
/**
* This function will synchronize additional indexes for the chain based on
* the current active chain in the bitcoin daemon. In the event that there is

View File

@ -2,20 +2,19 @@
var BaseService = require('../service');
var inherits = require('util').inherits;
function BlockService(options) {
function TimestampService(options) {
BaseService.call(this, options);
this.currentBlock = null;
this.currentTimestamp = null;
}
inherits(BlockService, BaseService);
inherits(TimestampService, BaseService);
BlockService.dependencies = [
'db',
'transaction'
TimestampService.dependencies = [
'db'
];
BlockService.prototype.start = function(callback) {
TimestampService.prototype.start = function(callback) {
var self = this;
this.store = this.node.services.db.store;
@ -31,11 +30,11 @@ BlockService.prototype.start = function(callback) {
});
};
BlockService.prototype.stop = function(callback) {
TimestampService.prototype.stop = function(callback) {
setImmediate(callback);
};
BlockService.prototype.blockHandler = function(block, connectBlock, callback) {
TimestampService.prototype.blockHandler = function(block, connectBlock, callback) {
var self = this;
var action = 'put';
@ -70,8 +69,6 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) {
self.currentBlock = block.hash;
self.currentTimestamp = timestamp;
// TODO combine with block header
operations = operations.concat(
[
{
@ -91,16 +88,7 @@ BlockService.prototype.blockHandler = function(block, connectBlock, callback) {
});
};
BlockService.prototype.getBlock = function(hash, callback) {
// get block header
// get individual transactions
};
BlockService.prototype.getBlockByHeight = function(height, callback) {
};
BlockService.prototype.getTimestamp = function(hash, callback) {
TimestampService.prototype.getTimestamp = function(hash, callback) {
var self = this;
if (hash === self.currentBlock) {
@ -119,40 +107,40 @@ BlockService.prototype.getTimestamp = function(hash, callback) {
});
};
BlockService.prototype._encodeBlockTimestampKey = function(hash) {
TimestampService.prototype._encodeBlockTimestampKey = function(hash) {
return Buffer.concat([this.prefix, new Buffer(hash, 'hex')]);
};
BlockService.prototype._decodeBlockTimestampKey = function(buffer) {
TimestampService.prototype._decodeBlockTimestampKey = function(buffer) {
return buffer.slice(2).toString('hex');
};
BlockService.prototype._encodeBlockTimestampValue = function(timestamp) {
TimestampService.prototype._encodeBlockTimestampValue = function(timestamp) {
var timestampBuffer = new Buffer(new Array(8));
timestampBuffer.writeDoubleBE(timestamp);
return timestampBuffer;
};
BlockService.prototype._decodeBlockTimestampValue = function(buffer) {
TimestampService.prototype._decodeBlockTimestampValue = function(buffer) {
return buffer.readDoubleBE(0);
};
BlockService.prototype._encodeTimestampBlockKey = function(timestamp) {
TimestampService.prototype._encodeTimestampBlockKey = function(timestamp) {
var timestampBuffer = new Buffer(new Array(8));
timestampBuffer.writeDoubleBE(timestamp);
return Buffer.concat([this.prefix, timestampBuffer]);
};
BlockService.prototype._decodeTimestampBlockKey = function(buffer) {
TimestampService.prototype._decodeTimestampBlockKey = function(buffer) {
return buffer.readDoubleBE(2);
};
BlockService.prototype._encodeTimestampBlockValue = function(hash) {
TimestampService.prototype._encodeTimestampBlockValue = function(hash) {
return new Buffer(hash, 'hex');
};
BlockService.prototype._decodeTimestampBlockValue = function(buffer) {
TimestampService.prototype._decodeTimestampBlockValue = function(buffer) {
return buffer.toString('hex');
};
module.exports = BlockService;
module.exports = TimestampService;

View File

@ -18,6 +18,7 @@ var utils = require('./wallet-api/utils');
function TransactionService(options) {
BaseService.call(this, options);
this.concurrency = options.concurrency || 20;
/* upon initialization of the mempool, only txids are obtained from
* a trusted bitcoin node (the bitcoind service's rpchost or p2p option)
* Since, the mempool is very temporal, I see no reason to take the
@ -27,7 +28,7 @@ function TransactionService(options) {
* then call to the trusted bitcoind will take place and the result set.
*/
this._mempool = LRU({
max: utils.parseByteCount(options.maxMemPoolSize) || 100 * 1024 * 1024, //100MB
max: 100 * 1024 * 1024, //100MB
length: function(tx) { if (tx) { return tx.toBuffer().length; } }
});
this.currentTransactions = {};