This commit is contained in:
Chris Kleeschulte 2017-07-24 20:51:48 -04:00
parent d426169f33
commit c7c9052034
4 changed files with 152 additions and 55 deletions

View File

@ -9,7 +9,6 @@ var log = index.log;
var shuttingDown = false; var shuttingDown = false;
var fs = require('fs'); var fs = require('fs');
function start(options) { function start(options) {
var fullConfig = _.clone(options.config); var fullConfig = _.clone(options.config);

View File

@ -26,6 +26,7 @@ var BlockService = function(options) {
this._subscriptions.block = []; this._subscriptions.block = [];
this._subscriptions.reorg = []; this._subscriptions.reorg = [];
this._unprocessedBlocks = [];
// in-memory full/raw block cache // in-memory full/raw block cache
this._blockQueue = LRU({ this._blockQueue = LRU({
max: 50 * (1 * 1024 * 1024), // 50 MB of blocks, max: 50 * (1 * 1024 * 1024), // 50 MB of blocks,
@ -48,7 +49,7 @@ inherits(BlockService, BaseService);
BlockService.dependencies = [ 'p2p', 'db', 'header' ]; BlockService.dependencies = [ 'p2p', 'db', 'header' ];
BlockService.MAX_BLOCKS = 10; BlockService.MAX_BLOCKS = 1;
// --- public prototype functions // --- public prototype functions
BlockService.prototype.getAPIMethods = function() { BlockService.prototype.getAPIMethods = function() {
@ -201,24 +202,6 @@ BlockService.prototype._primeBlockQueue = function(callback) {
}, callback); }, callback);
}; };
// upon startup, has the chain reorg'ed from where we were when we shutdown?
BlockService.prototype._detectInitialChainState = function(headers) {
if (this._tip.height === 0) {
return;
}
var index = this._tip.height - 1;
var record = headers.getIndex(index);
if (record.hash !== this._tip.hash) {
// reorg! we don't yet have the blocks to reorg to, so we'll rewind the chain back to
// to common ancestor, set the tip to the common ancestor and start the sync
this._chainTips.push(record.hash);
this._handleReorg(record.hash);
}
};
BlockService.prototype.stop = function(callback) { BlockService.prototype.stop = function(callback) {
setImmediate(callback); setImmediate(callback);
}; };
@ -542,27 +525,83 @@ BlockService.prototype._isOutOfOrder = function(block) {
BlockService.prototype._onAllHeaders = function(headers) { BlockService.prototype._onAllHeaders = function(headers) {
this._bestHeight = headers.size; this._bestHeight = headers.size;
this._detectInitialChainState(headers);
this._startSync(); this._startSync();
}; };
BlockService.prototype._processBlock = function() {
var operations = [];
var services = this.node.services;
var block = this._unprocessedBlocks.shift();
var self = this;
async.eachSeries(
services,
function(mod, next) {
if(mod.onBlock) {
mod.onBlock.call(mod, block, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
log.error(err);
self.node.stop();
return;
}
self._tip.height++;
self._tip.hash = block.rhash();
var tipOps = utils.encodeTip(self._tip, self.name);
operations.push({
type: 'put',
key: tipOps.key,
value: tipOps.value
});
self._db._store.batch(operations, function(err) {
if (err) {
log.error(err);
self.node.stop();
return;
}
self._sync();
});
}
);
};
BlockService.prototype._onBlock = function(block) { BlockService.prototype._onBlock = function(block) {
this._unprocessedBlocks.push(block);
this._processBlock();
return;
// 1. have we already seen this block? // 1. have we already seen this block?
if (this._blockAlreadyProcessed(block)) { //if (this._blockAlreadyProcessed(block)) {
return; // return;
} //}
// 2. log the reception // 2. log the reception
log.debug('New block received: ' + block.rhash()); //log.debug('New block received: ' + block.rhash());
// 3. store the block for safe keeping // 3. store the block for safe keeping
this._cacheBlock(block); //this._cacheBlock(block);
// 4. don't process any more blocks if we are currently in a reorg // 4. don't process any more blocks if we are currently in a reorg
if (this._reorging) { //if (this._reorging) {
return; // return;
} //}
// 5. determine block state, reorg, outoforder, normal // 5. determine block state, reorg, outoforder, normal
var blockState = this._determineBlockState(block); var blockState = this._determineBlockState(block);
@ -575,12 +614,14 @@ BlockService.prototype._onBlock = function(block) {
// nothing to do, but wait until ancestor blocks come in // nothing to do, but wait until ancestor blocks come in
break; break;
case 'reorg': case 'reorg':
this._handleReorg(block.hash); //this._handleReorg(block.hash);
break; //break;
default: default:
// send all unsent blocks now that we have a complete chain // send all unsent blocks now that we have a complete chain
this._unprocessedBlocks.push(block);
this._sendDelta(); this._processBlock();
return;
//this._sendDelta();
break; break;
} }
}; };
@ -614,6 +655,7 @@ BlockService.prototype._selectActiveChain = function() {
BlockService.prototype._sendDelta = function() { BlockService.prototype._sendDelta = function() {
// when this function is called, we know, for sure, that we have a complete chain of unsent block(s). // when this function is called, we know, for sure, that we have a complete chain of unsent block(s).
// our task is to send all blocks between active chain's tip and our tip. // our task is to send all blocks between active chain's tip and our tip.
var activeChainTip = this._selectActiveChain(); var activeChainTip = this._selectActiveChain();
@ -665,13 +707,12 @@ BlockService.prototype._startSync = function() {
return; return;
} }
log.info('Gathering: ' + this._numNeeded + ' ' + 'block(s) from the peer-to-peer network.'); log.info('Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.');
this._sync(); this._sync();
}; };
BlockService.prototype._startSubscriptions = function() { BlockService.prototype._startSubscriptions = function() {
if (this._subscribed) { if (this._subscribed) {
return; return;
} }
@ -681,8 +722,8 @@ BlockService.prototype._startSubscriptions = function() {
this._bus = this.node.openBus({remoteAddress: 'localhost-block'}); this._bus = this.node.openBus({remoteAddress: 'localhost-block'});
} }
this._bus.on('p2p/block', this._onBlock.bind(this)); this._bus.on('header/block', this._onBlock.bind(this));
this._bus.subscribe('p2p/block'); this._bus.subscribe('header/block');
}; };
BlockService.prototype._sync = function() { BlockService.prototype._sync = function() {
@ -693,9 +734,10 @@ BlockService.prototype._sync = function() {
if (this._tip.height < size) { if (this._tip.height < size) {
log.info('Blocks download progress: ' + this._tip.height + '/' + log.info('Blocks download progress: ' + this._tip.height + '/' +
this._numNeeded + ' (' + (this._tip.height / this._numNeeded*100).toFixed(2) + '%)'); this._numNeeded + ' (' + (this._tip.height / this._bestHeight*100).toFixed(2) + '%)');
var endHash = headers.getIndex( Math.min(this._tip.height + BlockService.MAX_BLOCKS, size) ).hash; var end = headers.getIndex(Math.min(this._tip.height + BlockService.MAX_BLOCKS, size));
var endHash = end ? end.hash : null;
this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash }); this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash });
return; return;

View File

@ -9,6 +9,7 @@ var utils = require('../../utils');
var async = require('async'); var async = require('async');
var BN = require('bn.js'); var BN = require('bn.js');
var consensus = require('bcoin').consensus; var consensus = require('bcoin').consensus;
var assert = require('assert');
var HeaderService = function(options) { var HeaderService = function(options) {
@ -19,6 +20,9 @@ var HeaderService = function(options) {
this._db = this.node.services.db; this._db = this.node.services.db;
this._checkpoint = options.checkpoint || 0; // the # of header to look back on boot this._checkpoint = options.checkpoint || 0; // the # of header to look back on boot
this._headers = new utils.SimpleMap(); this._headers = new utils.SimpleMap();
this.subscriptions = {};
this.subscriptions.block = [];
}; };
inherits(HeaderService, BaseService); inherits(HeaderService, BaseService);
@ -29,6 +33,26 @@ HeaderService.MAX_CHAINWORK = new BN(1).ushln(256);
HeaderService.STARTING_CHAINWORK = '0000000000000000000000000000000000000000000000000000000100010001'; HeaderService.STARTING_CHAINWORK = '0000000000000000000000000000000000000000000000000000000100010001';
// --- public prototype functions // --- public prototype functions
HeaderService.prototype.subscribe = function(name, emitter) {
console.log(this.subscriptions, name);
this.subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'header/' + name, 'total:', this.subscriptions[name].length);
};
HeaderService.prototype.unsubscribe = function(name, emitter) {
var index = this.subscriptions[name].indexOf(emitter);
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'header/' + name, 'total:', this.subscriptions[name].length);
};
HeaderService.prototype.getAPIMethods = function() { HeaderService.prototype.getAPIMethods = function() {
var methods = [ var methods = [
@ -110,10 +134,32 @@ HeaderService.prototype._startSubscriptions = function() {
}; };
HeaderService.prototype.getPublishEvents = function() {
return [
{
name: 'header/block',
scope: this,
subscribe: this.subscribe.bind(this, 'header'),
unsubscribe: this.unsubscribe.bind(this, 'header')
}
];
};
HeaderService.prototype._onBlock = function(block) { HeaderService.prototype._onBlock = function(block) {
// we just want the header to keep a running list // we just want the header to keep a running list
log.debug('Header Service: new block: ' + block.rhash()); var hash = block.rhash();
log.debug('Header Service: new block: ' + hash);
if (this._headers.get(hash)) {
for (var i = 0; i < this.subscriptions.length; i++) {
this.subscriptions[i].emit('header/block', block);
}
return;
}
log.debug('Header Service: new block has arrived: ' + hash);
var header = block.toHeaders().toJSON(); var header = block.toHeaders().toJSON();
header.timestamp = header.ts; header.timestamp = header.ts;
header.prevHash = header.prevBlock; header.prevHash = header.prevBlock;
@ -122,6 +168,7 @@ HeaderService.prototype._onBlock = function(block) {
HeaderService.prototype._onHeaders = function(headers, convert) { HeaderService.prototype._onHeaders = function(headers, convert) {
var self = this;
if (!headers || headers.length < 1) { if (!headers || headers.length < 1) {
return; return;
} }
@ -136,28 +183,36 @@ HeaderService.prototype._onHeaders = function(headers, convert) {
}); });
} }
var runningHeight = this._tip.height; var runningHeight = self._tip.height;
var prevHeader = this._headers.getLastIndex(); var prevHeader = self._headers.getLastIndex();
var dbOps = []; var dbOps = [];
for(var i = 0; i < headers.length; i++) { for(var i = 0; i < headers.length; i++) {
var header = headers[i]; var header = headers[i];
header.height = ++runningHeight; header.height = ++runningHeight;
header.chainwork = this._getChainwork(header, prevHeader).toString(16, 32); header.chainwork = self._getChainwork(header, prevHeader).toString(16, 32);
dbOps.push({ dbOps.push({
type: 'put', type: 'put',
key: this._encoding.encodeHeaderKey(header.hash), key: self._encoding.encodeHeaderKey(header.height, header.hash),
value: this._encoding.encodeHeaderValue(header) value: self._encoding.encodeHeaderValue(header)
}); });
prevHeader = header; prevHeader = header;
this._headers.set(header.hash, header); self._headers.set(header.hash, header);
} }
this._startingHash = this._tip.hash = newHeaders[newHeaders.length - 1].hash; self._startingHash = self._tip.hash = newHeaders[newHeaders.length - 1].hash;
this._tip.height = this._tip.height + newHeaders.length; self._tip.height = self._tip.height + newHeaders.length;
this._db.batch(dbOps); var tipOps = utils.encodeTip(self._tip, self.name);
this._sync(); dbOps.push({
type: 'put',
key: tipOps.key,
value: tipOps.value
});
self._db._store.batch(dbOps, function() {
self._sync();
});
}; };
@ -168,6 +223,7 @@ HeaderService.prototype._setListeners = function() {
}; };
HeaderService.prototype._onBestHeight = function(height) { HeaderService.prototype._onBestHeight = function(height) {
assert(height >= this._tip.height, 'Our peer does not seem to be fully synced.');
log.debug('Header Service: Best Height is: ' + height); log.debug('Header Service: Best Height is: ' + height);
this._bestHeight = height; this._bestHeight = height;
this._startSync(); this._startSync();

View File

@ -42,17 +42,17 @@ MempoolService.prototype._startSubscriptions = function() {
this._subscribed = true; this._subscribed = true;
if (!this._bus) { if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost'}); this._bus = this.node.openBus({remoteAddress: 'localhost-mempool'});
} }
this._bus.on('p2p/block', this._onBlock.bind(this)); //this._bus.on('p2p/block', this._onBlock.bind(this));
this._bus.on('p2p/transaction', this._onTransaction.bind(this)); this._bus.on('p2p/transaction', this._onTransaction.bind(this));
this._bus.subscribe('p2p/block'); //this._bus.subscribe('p2p/block');
this._bus.subscribe('p2p/transaction'); this._bus.subscribe('p2p/transaction');
}; };
MempoolService.prototype._onBlock = function(block) { MempoolService.prototype.onBlock = function(block, callback) {
// remove this block's txs from mempool // remove this block's txs from mempool
var self = this; var self = this;
var ops = block.txs.map(function(tx) { var ops = block.txs.map(function(tx) {
@ -61,7 +61,7 @@ MempoolService.prototype._onBlock = function(block) {
key: self._encoding.encodeMempoolTransactionKey(tx.txid()) key: self._encoding.encodeMempoolTransactionKey(tx.txid())
}; };
}); });
self._db.batch(ops); setImmediate(callback);
}; };
MempoolService.prototype._onTransaction = function(tx) { MempoolService.prototype._onTransaction = function(tx) {