From f5ad8b89fb8778b4101e06d6998167bc022cc19a Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 25 Sep 2017 21:21:05 -0400 Subject: [PATCH] Added next hash support in header service. --- : | 135 ++++++++++++++++++++++++++++++ lib/services/address/index.js | 8 +- lib/services/block/index.js | 56 ++----------- lib/services/header/encoding.js | 18 +++- lib/services/header/index.js | 67 ++++++++++++--- lib/services/mempool/index.js | 56 ++++--------- lib/services/p2p/index.js | 29 ++----- lib/services/transaction/index.js | 2 +- 8 files changed, 246 insertions(+), 125 deletions(-) create mode 100644 : diff --git a/: b/: new file mode 100644 index 00000000..be9a51b5 --- /dev/null +++ b/: @@ -0,0 +1,135 @@ +'use strict'; +var BaseService = require('../../service'); +var util = require('util'); +var Encoding = require('./encoding'); +var log = require('../..').log; + +var MempoolService = function(options) { + BaseService.call(this, options); + this._subscriptions = {}; + this._subscriptions.transaction = []; + this._db = this.node.services.db; + this._p2p = this.node.services.p2p; +}; + +util.inherits(MempoolService, BaseService); + +MempoolService.dependencies = ['db']; + +MempoolService.prototype.getAPIMethods = function() { + var methods = [ + ['getMempoolTransaction', this, this.getMempoolTransaction, 1] + ]; + return methods; +}; + +MempoolService.prototype.start = function(callback) { + var self = this; + + self._db.getPrefix(self.name, function(err, prefix) { + if(err) { + return callback(err); + } + self._encoding = new Encoding(prefix); + self._startSubscriptions(); + + callback(); + }); +}; + +MempoolService.prototype.onReorg = function(args, callback) { + + var oldBlockList = args[1]; + + var removalOps = []; + + for(var i = 0; i < oldBlockList.length; i++) { + + var block = oldBlockList[i]; + + for(var j = 0; j < block.txs.length; j++) { + + var tx = block.txs[j]; + var key = this._encoding.encodeMempoolTransactionKey(tx.txid()); + var value = this._encoding.encodeMempoolTransactionValue(tx); + + removalOps.push({ + type: 'put', + key: key, + value: value + }); + + } + } + + setImmediate(function() { + callback(null, removalOps); + }); +}; + +MempoolService.prototype._startSubscriptions = function() { + + var self = this; + if (self._subscribed) { + return; + } + + self._subscribed = true; + if (!self._bus) { + self._bus = self.node.openBus({remoteAddress: 'localhost-mempool'}); + } + + self._bus.on('p2p/transaction', self._onTransaction.bind(self)); + self._bus.subscribe('p2p/transaction'); + + self._p2p.on('bestHeight', function() { + log.info('Mempool Service: Geting mempool from peer.'); + self._p2p.getMempool(); + }); + +}; + +MempoolService.prototype.onBlock = function(block, callback) { + + // remove this block's txs from mempool + var self = this; + var ops = block.txs.map(function(tx) { + return { + type: 'del', + key: self._encoding.encodeMempoolTransactionKey(tx.txid()) + }; + }); + callback(null, ops); + +}; + +MempoolService.prototype._onTransaction = function(tx) { + this._db.put(this._encoding.encodeMempoolTransactionKey(tx.txid()), + this._encoding.encodeMempoolTransactionValue(tx)); +}; + +MempoolService.prototype.getMempoolTransaction = function(txid, callback) { + + var self = this; + + self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) { + + if (err) { + return callback(err); + } + + if (!tx) { + return callback(); + } + + callback(null, self._encoding.decodeMempoolTransactionValue(tx)); + + }); + +}; + +MempoolService.prototype.stop = function(callback) { + callback(); +}; + +module.exports = MempoolService; diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 36e46741..02985431 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -518,7 +518,9 @@ AddressService.prototype._removeOutput = function(output, tx, block, index, call key: self._encoding.encodeUtxoIndexKey(address, tx.txid(), index) }); - callback(null, removalOps); + setImmediate(function() { + callback(null, removalOps); + }); }; AddressService.prototype.onReorg = function(args, callback) { @@ -558,7 +560,9 @@ AddressService.prototype.onBlock = function(block, callback) { operations = _.flattenDeep(operations); - callback(null, operations); + setImmediate(function() { + callback(null, operations); + }); }; AddressService.prototype._processInput = function(tx, input, index, opts) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 79786dbc..a94cdfc0 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -22,14 +22,11 @@ var BlockService = function(options) { this._header = this.node.services.header; this._timestamp = this.node.services.timestamp; - this._subscriptions = {}; - this._subscriptions.block = []; - this._blockCount = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._initialSync = true; this._reorgBackToBlock = null; // use this to rewind your indexes to a specific point by height or hash - this._timeOfLastBestBlockReport = Date.now() - 30000; + this._timeOfLastBlockReport = Date.now() - 30000; }; inherits(BlockService, BaseService); @@ -147,19 +144,6 @@ BlockService.prototype.getBlockOverview = function(hash, callback) { }; -BlockService.prototype.getPublishEvents = function() { - - return [ - { - name: 'block/block', - scope: this, - subscribe: this.subscribe.bind(this, 'block'), - unsubscribe: this.unsubscribe.bind(this, 'block') - } - ]; - -}; - BlockService.prototype.getRawBlock = function(hash, callback) { this.getBlock(hash, function(err, block) { if(err) { @@ -279,7 +263,7 @@ BlockService.prototype._resetTip = function(callback) { } if (!_block) { - log.warn('Block Service: block: ' + header.hash + ' was not found, proceeding to older blocks.'); + log.debug('Block Service: block: ' + header.hash + ' was not found, proceeding to older blocks.'); } block = _block; @@ -287,7 +271,7 @@ BlockService.prototype._resetTip = function(callback) { assert(header, 'Header not found for reset.'); if (!block) { - log.warn('Block Service: trying block: ' + header.hash); + log.debug('Block Service: trying block: ' + header.hash); } next(); @@ -372,11 +356,6 @@ BlockService.prototype.stop = function(callback) { setImmediate(callback); }; -BlockService.prototype.subscribe = function(name, emitter) { - this._subscriptions[name].push(emitter); - log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); -}; - BlockService.prototype._queueBlock = function(block) { var self = this; @@ -387,8 +366,8 @@ BlockService.prototype._queueBlock = function(block) { return self._handleError(err); } - if (Date.now() - self._timeOfLastBestBlockReport > 30000) { // 30 seconds - self._timeOfLastBestBlockReport = Date.now(); + if (Date.now() - self._timeOfLastBlockReport > 30000) { // 30 seconds + self._timeOfLastBlockReport = Date.now(); log.info('Block Service: The best block hash is: ' + self._tip.hash + ' at height: ' + self._tip.height); } @@ -410,26 +389,8 @@ BlockService.prototype.syncPercentage = function(callback) { callback(null, this._syncPercentage()); }; -BlockService.prototype.unsubscribe = function(name, emitter) { - - var index = this._subscriptions[name].indexOf(emitter); - - if (index > -1) { - this._subscriptions[name].splice(index, 1); - } - - log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); - -}; - // --- start private prototype functions -BlockService.prototype._broadcast = function(subscribers, name, entity) { - for (var i = 0; i < subscribers.length; i++) { - subscribers[i].emit(name, entity); - } -}; - BlockService.prototype._detectReorg = function(block) { return bcoin.util.revHex(block.prevBlock) !== this._tip.hash; }; @@ -458,7 +419,7 @@ BlockService.prototype._getHash = function(blockArg, callback) { if (utils.isHeight(blockArg)) { - this._header.getHeaderByHeight(blockArg, function(err, header) { + this._header.getBlockHeader(blockArg, function(err, header) { if(err) { return callback(err); @@ -737,7 +698,7 @@ BlockService.prototype._saveBlock = function(block, callback) { BlockService.prototype._handleError = function(err) { if (!this.node.stopping) { - log.error('Block Service: ' + err); + log.error(err); return this.node.stop(); } }; @@ -807,7 +768,8 @@ BlockService.prototype._sync = function() { return; } - if (self._tip.height % 144 === 0) { + if (self._tip.height % 144 === 0 || Date.now() - self._timeOfLastBlockReport > 10000) { + self._timeOfLastBlockReport = Date.now(); self._logProgress(); } diff --git a/lib/services/header/encoding.js b/lib/services/header/encoding.js index 72878ea7..9a7ca77a 100644 --- a/lib/services/header/encoding.js +++ b/lib/services/header/encoding.js @@ -43,7 +43,19 @@ Encoding.prototype.encodeHeaderValue = function(header) { var heightBuf = new Buffer(4); heightBuf.writeUInt32BE(header.height); var chainworkBuf = new Buffer(header.chainwork, 'hex'); - return Buffer.concat([hashBuf, versionBuf, prevHash, merkleRoot, tsBuf, bitsBuf, nonceBuf, heightBuf, chainworkBuf ]); + var nextHash = new Buffer(header.nextHash || new Array(65).join('0'), 'hex'); + return Buffer.concat([ + hashBuf, + versionBuf, + prevHash, + merkleRoot, + tsBuf, + bitsBuf, + nonceBuf, + heightBuf, + chainworkBuf, + nextHash + ]); }; Encoding.prototype.decodeHeaderValue = function(buffer) { @@ -56,6 +68,7 @@ Encoding.prototype.decodeHeaderValue = function(buffer) { var nonce = buffer.readUInt32BE(108); var height = buffer.readUInt32BE(112); var chainwork = buffer.slice(116).toString('hex'); + var nextHash = buffer.slice(116 + 32).toString('hex'); return { hash: hash, version: version, @@ -65,7 +78,8 @@ Encoding.prototype.decodeHeaderValue = function(buffer) { bits: bits, nonce: nonce, height: height, - chainwork: chainwork + chainwork: chainwork, + nextHash: nextHash }; }; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index de86d4a2..27d64ddc 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -333,7 +333,9 @@ HeaderService.prototype._syncBlock = function(block, callback) { log.debug('Header Service: new block: ' + block.rhash()); - self._saveHeaders(self._onHeader(header), callback); + var dbOps = self._getDBOpForLastHeader(header); + dbOps = dbOps.concat(self._onHeader(header)); + self._saveHeaders(dbOps, callback); }; HeaderService.prototype._broadcast = function(block) { @@ -374,6 +376,49 @@ HeaderService.prototype._onHeader = function(header) { }; +HeaderService.prototype._transformHeaders = function(headers) { + var ret = []; + for(var i = 0; i < headers.length; i++) { + var hdr = headers[i].toObject(); + if (headers[i+1]) { + hdr.nextHash = headers[i+1].hash; + } + ret.push(hdr); + } + return ret; +}; + +HeaderService.prototype._getDBOpForLastHeader = function(nextHeader) { + // we need to apply the next hash value on the last-processed header + + // delete operation for the last header already in the db. + // then put operation for the updated last header (with the next hash) + this._lastHeader.nextHash = nextHeader.hash; + var keyHash = this._encoding.encodeHeaderHashKey(this._lastHeader.hash); + var keyHeight = this._encoding.encodeHeaderHeightKey(this._lastHeader.hash); + var value = this._encoding.encodeHeaderValue(this._lastHeader); + return [ + { + type: 'del', + key: keyHash + }, + { + type: 'del', + key: keyHeight + }, + { + type: 'put', + key: keyHash, + value: value + }, + { + type: 'put', + key: keyHeight, + value: value + } + ]; +}; + HeaderService.prototype._onHeaders = function(headers) { var self = this; @@ -385,13 +430,13 @@ HeaderService.prototype._onHeaders = function(headers) { log.debug('Header Service: Received: ' + headers.length + ' header(s).'); - var dbOps = []; + var dbOps = self._getDBOpForLastHeader(headers[0]); - for(var i = 0; i < headers.length; i++) { + var transformedHeaders = self._transformHeaders(headers); - var header = headers[i]; + for(var i = 0; i < transformedHeaders.length; i++) { - header = header.toObject(); + var header = transformedHeaders[i]; assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height); @@ -411,7 +456,7 @@ HeaderService.prototype._onHeaders = function(headers) { }; HeaderService.prototype._handleError = function(err) { - log.error(err); + log.error('Header Service: ' + err); this.node.stop(); }; @@ -628,14 +673,14 @@ HeaderService.prototype._startSync = function() { var self = this; + // remove all listeners + // ensure the blockProcessor is finished processing blocks (empty queue) + // then proceed with gathering new set(s) of headers + // if our tip height is less than the best height of this peer, then: // 1. the peer is not fully synced. // 2. the peer has reorg'ed and we need to handle this - // unsub from listening for blocks - // ensure the blockProcessor is finished processing blocks (empty queue) - // then proceed with gathering new set(s) of headers - self._initialSync = true; log.debug('Header Service: starting sync routines, ensuring no pre-exiting subscriptions to p2p blocks.'); self._removeAllSubscriptions(); @@ -664,7 +709,7 @@ HeaderService.prototype._startSync = function() { }); } - // this should be very uncommon + // very uncommon! when a peer is not sync'ed or has reorg'ed self._handleLowTipHeight(); }); diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 302affcf..be9a51b5 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -2,14 +2,14 @@ var BaseService = require('../../service'); var util = require('util'); var Encoding = require('./encoding'); -var index = require('../../'); -var log = index.log; +var log = require('../..').log; var MempoolService = function(options) { BaseService.call(this, options); this._subscriptions = {}; this._subscriptions.transaction = []; this._db = this.node.services.db; + this._p2p = this.node.services.p2p; }; util.inherits(MempoolService, BaseService); @@ -23,38 +23,6 @@ MempoolService.prototype.getAPIMethods = function() { return methods; }; -MempoolService.prototype.getPublishEvents = function() { - - return [ - { - name: 'mempool/transaction', - scope: this, - subscribe: this.subscribe.bind(this, 'transaction'), - unsubscribe: this.unsubscribe.bind(this, 'transaction') - } - ]; - -}; - -MempoolService.prototype.subscribe = function(name, emitter) { - - this._subscriptions[name].push(emitter); - log.info(emitter.remoteAddress, 'subscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length); - -}; - -MempoolService.prototype.unsubscribe = function(name, emitter) { - - var index = this._subscriptions[name].indexOf(emitter); - - if (index > -1) { - this._subscriptions[name].splice(index, 1); - } - - log.info(emitter.remoteAddress, 'unsubscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length); - -}; - MempoolService.prototype.start = function(callback) { var self = this; @@ -64,6 +32,7 @@ MempoolService.prototype.start = function(callback) { } self._encoding = new Encoding(prefix); self._startSubscriptions(); + callback(); }); }; @@ -100,17 +69,24 @@ MempoolService.prototype.onReorg = function(args, callback) { MempoolService.prototype._startSubscriptions = function() { - if (this._subscribed) { + var self = this; + if (self._subscribed) { return; } - this._subscribed = true; - if (!this._bus) { - this._bus = this.node.openBus({remoteAddress: 'localhost-mempool'}); + self._subscribed = true; + if (!self._bus) { + self._bus = self.node.openBus({remoteAddress: 'localhost-mempool'}); } - this._bus.on('p2p/transaction', this._onTransaction.bind(this)); - this._bus.subscribe('p2p/transaction'); + self._bus.on('p2p/transaction', self._onTransaction.bind(self)); + self._bus.subscribe('p2p/transaction'); + + self._p2p.on('bestHeight', function() { + log.info('Mempool Service: Geting mempool from peer.'); + self._p2p.getMempool(); + }); + }; MempoolService.prototype.onBlock = function(block, callback) { diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index f780e4bf..8a7647d0 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -68,14 +68,9 @@ P2P.prototype.getHeaders = function(filter) { }; -P2P.prototype.getMempool = function(filter) { - +P2P.prototype.getMempool = function() { var peer = this._getPeer(); - - this._setResourceFilter(filter, 'mempool'); - peer.sendMessage(this.messages.MemPool()); - }; P2P.prototype.getPublishEvents = function() { @@ -347,10 +342,7 @@ P2P.prototype._onPeerReady = function(peer, addr) { P2P.prototype._onPeerTx = function(peer, message) { - var filteredMessage = this._applyMempoolFilter(message); - if (filteredMessage) { - this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction); - } + this._broadcast(this.subscriptions.transaction, 'p2p/transaction', message.transaction); }; P2P.prototype._removePeer = function(peer) { @@ -369,20 +361,13 @@ P2P.prototype._setListeners = function() { self.node.on('ready', self._connect.bind(self)); }; -P2P.prototype._setResourceFilter = function(filter, resource) { +P2P.prototype._setResourceFilter = function(filter) { - if (resource === 'headers' || resource === 'blocks') { - assert(filter && filter.startHash, 'A "startHash" field is required to retrieve headers or blocks'); - if (!filter.endHash) { - filter.endHash = 0; - } - return { starts: [filter.startHash], stop: filter.endHash }; - } - - if (resource === 'mempool') { - this._mempoolFilter = filter; - return; + assert(filter && filter.startHash, 'A "startHash" field is required to retrieve headers or blocks'); + if (!filter.endHash) { + filter.endHash = 0; } + return { starts: [filter.startHash], stop: filter.endHash }; }; diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 43ce75cb..e1efbbac 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -225,7 +225,7 @@ TransactionService.prototype._getInputValues = function(tx, options, callback) { self._getTransaction(input.prevout.txid(), options, function(err, txid, _tx) { if (err || !_tx) { - return next(err || new Error('tx not found for tx id: ' + input.prevout.txid())); + return next(err || new Error('Transaction Service: tx not found for tx id: ' + input.prevout.txid())); } var output = _tx.outputs[outputIndex];