Added next hash support in header service.

This commit is contained in:
Chris Kleeschulte 2017-09-25 21:21:05 -04:00
parent b8e73ae238
commit f5ad8b89fb
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
8 changed files with 246 additions and 125 deletions

135
: Normal file
View File

@ -0,0 +1,135 @@
'use strict';
var BaseService = require('../../service');
var util = require('util');
var Encoding = require('./encoding');
var log = require('../..').log;
var MempoolService = function(options) {
BaseService.call(this, options);
this._subscriptions = {};
this._subscriptions.transaction = [];
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
};
util.inherits(MempoolService, BaseService);
MempoolService.dependencies = ['db'];
MempoolService.prototype.getAPIMethods = function() {
var methods = [
['getMempoolTransaction', this, this.getMempoolTransaction, 1]
];
return methods;
};
MempoolService.prototype.start = function(callback) {
var self = this;
self._db.getPrefix(self.name, function(err, prefix) {
if(err) {
return callback(err);
}
self._encoding = new Encoding(prefix);
self._startSubscriptions();
callback();
});
};
MempoolService.prototype.onReorg = function(args, callback) {
var oldBlockList = args[1];
var removalOps = [];
for(var i = 0; i < oldBlockList.length; i++) {
var block = oldBlockList[i];
for(var j = 0; j < block.txs.length; j++) {
var tx = block.txs[j];
var key = this._encoding.encodeMempoolTransactionKey(tx.txid());
var value = this._encoding.encodeMempoolTransactionValue(tx);
removalOps.push({
type: 'put',
key: key,
value: value
});
}
}
setImmediate(function() {
callback(null, removalOps);
});
};
MempoolService.prototype._startSubscriptions = function() {
var self = this;
if (self._subscribed) {
return;
}
self._subscribed = true;
if (!self._bus) {
self._bus = self.node.openBus({remoteAddress: 'localhost-mempool'});
}
self._bus.on('p2p/transaction', self._onTransaction.bind(self));
self._bus.subscribe('p2p/transaction');
self._p2p.on('bestHeight', function() {
log.info('Mempool Service: Geting mempool from peer.');
self._p2p.getMempool();
});
};
MempoolService.prototype.onBlock = function(block, callback) {
// remove this block's txs from mempool
var self = this;
var ops = block.txs.map(function(tx) {
return {
type: 'del',
key: self._encoding.encodeMempoolTransactionKey(tx.txid())
};
});
callback(null, ops);
};
MempoolService.prototype._onTransaction = function(tx) {
this._db.put(this._encoding.encodeMempoolTransactionKey(tx.txid()),
this._encoding.encodeMempoolTransactionValue(tx));
};
MempoolService.prototype.getMempoolTransaction = function(txid, callback) {
var self = this;
self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) {
if (err) {
return callback(err);
}
if (!tx) {
return callback();
}
callback(null, self._encoding.decodeMempoolTransactionValue(tx));
});
};
MempoolService.prototype.stop = function(callback) {
callback();
};
module.exports = MempoolService;

View File

@ -518,7 +518,9 @@ AddressService.prototype._removeOutput = function(output, tx, block, index, call
key: self._encoding.encodeUtxoIndexKey(address, tx.txid(), index)
});
callback(null, removalOps);
setImmediate(function() {
callback(null, removalOps);
});
};
AddressService.prototype.onReorg = function(args, callback) {
@ -558,7 +560,9 @@ AddressService.prototype.onBlock = function(block, callback) {
operations = _.flattenDeep(operations);
callback(null, operations);
setImmediate(function() {
callback(null, operations);
});
};
AddressService.prototype._processInput = function(tx, input, index, opts) {

View File

@ -22,14 +22,11 @@ var BlockService = function(options) {
this._header = this.node.services.header;
this._timestamp = this.node.services.timestamp;
this._subscriptions = {};
this._subscriptions.block = [];
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initialSync = true;
this._reorgBackToBlock = null; // use this to rewind your indexes to a specific point by height or hash
this._timeOfLastBestBlockReport = Date.now() - 30000;
this._timeOfLastBlockReport = Date.now() - 30000;
};
inherits(BlockService, BaseService);
@ -147,19 +144,6 @@ BlockService.prototype.getBlockOverview = function(hash, callback) {
};
BlockService.prototype.getPublishEvents = function() {
return [
{
name: 'block/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
}
];
};
BlockService.prototype.getRawBlock = function(hash, callback) {
this.getBlock(hash, function(err, block) {
if(err) {
@ -279,7 +263,7 @@ BlockService.prototype._resetTip = function(callback) {
}
if (!_block) {
log.warn('Block Service: block: ' + header.hash + ' was not found, proceeding to older blocks.');
log.debug('Block Service: block: ' + header.hash + ' was not found, proceeding to older blocks.');
}
block = _block;
@ -287,7 +271,7 @@ BlockService.prototype._resetTip = function(callback) {
assert(header, 'Header not found for reset.');
if (!block) {
log.warn('Block Service: trying block: ' + header.hash);
log.debug('Block Service: trying block: ' + header.hash);
}
next();
@ -372,11 +356,6 @@ BlockService.prototype.stop = function(callback) {
setImmediate(callback);
};
BlockService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length);
};
BlockService.prototype._queueBlock = function(block) {
var self = this;
@ -387,8 +366,8 @@ BlockService.prototype._queueBlock = function(block) {
return self._handleError(err);
}
if (Date.now() - self._timeOfLastBestBlockReport > 30000) { // 30 seconds
self._timeOfLastBestBlockReport = Date.now();
if (Date.now() - self._timeOfLastBlockReport > 30000) { // 30 seconds
self._timeOfLastBlockReport = Date.now();
log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
}
@ -410,26 +389,8 @@ BlockService.prototype.syncPercentage = function(callback) {
callback(null, this._syncPercentage());
};
BlockService.prototype.unsubscribe = function(name, emitter) {
var index = this._subscriptions[name].indexOf(emitter);
if (index > -1) {
this._subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this._subscriptions[name].length);
};
// --- start private prototype functions
BlockService.prototype._broadcast = function(subscribers, name, entity) {
for (var i = 0; i < subscribers.length; i++) {
subscribers[i].emit(name, entity);
}
};
BlockService.prototype._detectReorg = function(block) {
return bcoin.util.revHex(block.prevBlock) !== this._tip.hash;
};
@ -458,7 +419,7 @@ BlockService.prototype._getHash = function(blockArg, callback) {
if (utils.isHeight(blockArg)) {
this._header.getHeaderByHeight(blockArg, function(err, header) {
this._header.getBlockHeader(blockArg, function(err, header) {
if(err) {
return callback(err);
@ -737,7 +698,7 @@ BlockService.prototype._saveBlock = function(block, callback) {
BlockService.prototype._handleError = function(err) {
if (!this.node.stopping) {
log.error('Block Service: ' + err);
log.error(err);
return this.node.stop();
}
};
@ -807,7 +768,8 @@ BlockService.prototype._sync = function() {
return;
}
if (self._tip.height % 144 === 0) {
if (self._tip.height % 144 === 0 || Date.now() - self._timeOfLastBlockReport > 10000) {
self._timeOfLastBlockReport = Date.now();
self._logProgress();
}

View File

@ -43,7 +43,19 @@ Encoding.prototype.encodeHeaderValue = function(header) {
var heightBuf = new Buffer(4);
heightBuf.writeUInt32BE(header.height);
var chainworkBuf = new Buffer(header.chainwork, 'hex');
return Buffer.concat([hashBuf, versionBuf, prevHash, merkleRoot, tsBuf, bitsBuf, nonceBuf, heightBuf, chainworkBuf ]);
var nextHash = new Buffer(header.nextHash || new Array(65).join('0'), 'hex');
return Buffer.concat([
hashBuf,
versionBuf,
prevHash,
merkleRoot,
tsBuf,
bitsBuf,
nonceBuf,
heightBuf,
chainworkBuf,
nextHash
]);
};
Encoding.prototype.decodeHeaderValue = function(buffer) {
@ -56,6 +68,7 @@ Encoding.prototype.decodeHeaderValue = function(buffer) {
var nonce = buffer.readUInt32BE(108);
var height = buffer.readUInt32BE(112);
var chainwork = buffer.slice(116).toString('hex');
var nextHash = buffer.slice(116 + 32).toString('hex');
return {
hash: hash,
version: version,
@ -65,7 +78,8 @@ Encoding.prototype.decodeHeaderValue = function(buffer) {
bits: bits,
nonce: nonce,
height: height,
chainwork: chainwork
chainwork: chainwork,
nextHash: nextHash
};
};

View File

@ -333,7 +333,9 @@ HeaderService.prototype._syncBlock = function(block, callback) {
log.debug('Header Service: new block: ' + block.rhash());
self._saveHeaders(self._onHeader(header), callback);
var dbOps = self._getDBOpForLastHeader(header);
dbOps = dbOps.concat(self._onHeader(header));
self._saveHeaders(dbOps, callback);
};
HeaderService.prototype._broadcast = function(block) {
@ -374,6 +376,49 @@ HeaderService.prototype._onHeader = function(header) {
};
HeaderService.prototype._transformHeaders = function(headers) {
var ret = [];
for(var i = 0; i < headers.length; i++) {
var hdr = headers[i].toObject();
if (headers[i+1]) {
hdr.nextHash = headers[i+1].hash;
}
ret.push(hdr);
}
return ret;
};
HeaderService.prototype._getDBOpForLastHeader = function(nextHeader) {
// we need to apply the next hash value on the last-processed header
// delete operation for the last header already in the db.
// then put operation for the updated last header (with the next hash)
this._lastHeader.nextHash = nextHeader.hash;
var keyHash = this._encoding.encodeHeaderHashKey(this._lastHeader.hash);
var keyHeight = this._encoding.encodeHeaderHeightKey(this._lastHeader.hash);
var value = this._encoding.encodeHeaderValue(this._lastHeader);
return [
{
type: 'del',
key: keyHash
},
{
type: 'del',
key: keyHeight
},
{
type: 'put',
key: keyHash,
value: value
},
{
type: 'put',
key: keyHeight,
value: value
}
];
};
HeaderService.prototype._onHeaders = function(headers) {
var self = this;
@ -385,13 +430,13 @@ HeaderService.prototype._onHeaders = function(headers) {
log.debug('Header Service: Received: ' + headers.length + ' header(s).');
var dbOps = [];
var dbOps = self._getDBOpForLastHeader(headers[0]);
for(var i = 0; i < headers.length; i++) {
var transformedHeaders = self._transformHeaders(headers);
var header = headers[i];
for(var i = 0; i < transformedHeaders.length; i++) {
header = header.toObject();
var header = transformedHeaders[i];
assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash +
' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height);
@ -411,7 +456,7 @@ HeaderService.prototype._onHeaders = function(headers) {
};
HeaderService.prototype._handleError = function(err) {
log.error(err);
log.error('Header Service: ' + err);
this.node.stop();
};
@ -628,14 +673,14 @@ HeaderService.prototype._startSync = function() {
var self = this;
// remove all listeners
// ensure the blockProcessor is finished processing blocks (empty queue)
// then proceed with gathering new set(s) of headers
// if our tip height is less than the best height of this peer, then:
// 1. the peer is not fully synced.
// 2. the peer has reorg'ed and we need to handle this
// unsub from listening for blocks
// ensure the blockProcessor is finished processing blocks (empty queue)
// then proceed with gathering new set(s) of headers
self._initialSync = true;
log.debug('Header Service: starting sync routines, ensuring no pre-exiting subscriptions to p2p blocks.');
self._removeAllSubscriptions();
@ -664,7 +709,7 @@ HeaderService.prototype._startSync = function() {
});
}
// this should be very uncommon
// very uncommon! when a peer is not sync'ed or has reorg'ed
self._handleLowTipHeight();
});

View File

@ -2,14 +2,14 @@
var BaseService = require('../../service');
var util = require('util');
var Encoding = require('./encoding');
var index = require('../../');
var log = index.log;
var log = require('../..').log;
var MempoolService = function(options) {
BaseService.call(this, options);
this._subscriptions = {};
this._subscriptions.transaction = [];
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
};
util.inherits(MempoolService, BaseService);
@ -23,38 +23,6 @@ MempoolService.prototype.getAPIMethods = function() {
return methods;
};
MempoolService.prototype.getPublishEvents = function() {
return [
{
name: 'mempool/transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
}
];
};
MempoolService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length);
};
MempoolService.prototype.unsubscribe = function(name, emitter) {
var index = this._subscriptions[name].indexOf(emitter);
if (index > -1) {
this._subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length);
};
MempoolService.prototype.start = function(callback) {
var self = this;
@ -64,6 +32,7 @@ MempoolService.prototype.start = function(callback) {
}
self._encoding = new Encoding(prefix);
self._startSubscriptions();
callback();
});
};
@ -100,17 +69,24 @@ MempoolService.prototype.onReorg = function(args, callback) {
MempoolService.prototype._startSubscriptions = function() {
if (this._subscribed) {
var self = this;
if (self._subscribed) {
return;
}
this._subscribed = true;
if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost-mempool'});
self._subscribed = true;
if (!self._bus) {
self._bus = self.node.openBus({remoteAddress: 'localhost-mempool'});
}
this._bus.on('p2p/transaction', this._onTransaction.bind(this));
this._bus.subscribe('p2p/transaction');
self._bus.on('p2p/transaction', self._onTransaction.bind(self));
self._bus.subscribe('p2p/transaction');
self._p2p.on('bestHeight', function() {
log.info('Mempool Service: Geting mempool from peer.');
self._p2p.getMempool();
});
};
MempoolService.prototype.onBlock = function(block, callback) {

View File

@ -68,14 +68,9 @@ P2P.prototype.getHeaders = function(filter) {
};
P2P.prototype.getMempool = function(filter) {
P2P.prototype.getMempool = function() {
var peer = this._getPeer();
this._setResourceFilter(filter, 'mempool');
peer.sendMessage(this.messages.MemPool());
};
P2P.prototype.getPublishEvents = function() {
@ -347,10 +342,7 @@ P2P.prototype._onPeerReady = function(peer, addr) {
P2P.prototype._onPeerTx = function(peer, message) {
var filteredMessage = this._applyMempoolFilter(message);
if (filteredMessage) {
this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction);
}
this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction);
};
P2P.prototype._removePeer = function(peer) {
@ -369,20 +361,13 @@ P2P.prototype._setListeners = function() {
self.node.on('ready', self._connect.bind(self));
};
P2P.prototype._setResourceFilter = function(filter, resource) {
P2P.prototype._setResourceFilter = function(filter) {
if (resource === 'headers' || resource === 'blocks') {
assert(filter && filter.startHash, 'A "startHash" field is required to retrieve headers or blocks');
if (!filter.endHash) {
filter.endHash = 0;
}
return { starts: [filter.startHash], stop: filter.endHash };
}
if (resource === 'mempool') {
this._mempoolFilter = filter;
return;
assert(filter && filter.startHash, 'A "startHash" field is required to retrieve headers or blocks');
if (!filter.endHash) {
filter.endHash = 0;
}
return { starts: [filter.startHash], stop: filter.endHash };
};

View File

@ -225,7 +225,7 @@ TransactionService.prototype._getInputValues = function(tx, options, callback) {
self._getTransaction(input.prevout.txid(), options, function(err, txid, _tx) {
if (err || !_tx) {
return next(err || new Error('tx not found for tx id: ' + input.prevout.txid()));
return next(err || new Error('Transaction Service: tx not found for tx id: ' + input.prevout.txid()));
}
var output = _tx.outputs[outputIndex];