From 58a5c14cdf9efbe2839e531578a72f8d1d2bbea3 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 23 Jun 2017 07:33:54 -0400 Subject: [PATCH] wip --- lib/services/block/encoding.js | 42 +-- lib/services/block/index.js | 372 +++++++++++++----------- lib/services/db/index.js | 32 ++- lib/services/p2p/bcoin.js | 32 ++- lib/services/p2p/index.js | 320 ++++++++++----------- lib/utils.js | 14 + scratch.txt | 512 +++++++++++++++++++++++++++++++++ 7 files changed, 953 insertions(+), 371 deletions(-) create mode 100644 scratch.txt diff --git a/lib/services/block/encoding.js b/lib/services/block/encoding.js index bd17eec7..70c7b9ec 100644 --- a/lib/services/block/encoding.js +++ b/lib/services/block/encoding.js @@ -1,45 +1,27 @@ 'use strict'; +var Block = require('bitcore-lib').Block; +// stores -- block header as key, block itself as value (optionally) + function Encoding(servicePrefix) { this.servicePrefix = servicePrefix; - this.hashPrefix = new Buffer('00', 'hex'); - this.heightPrefix = new Buffer('01', 'hex'); } -Encoding.prototype.encodeBlockHashKey = function(hash) { - return Buffer.concat([ this.servicePrefix, this.hashPrefix, new Buffer(hash, 'hex') ]); +Encoding.prototype.encodeBlockKey = function(header) { + var headerBuf = new Buffer(JSON.stringify(header), 'utf8'); + return Buffer.concat([ this.servicePrefix, headerBuf ]); }; -Encoding.prototype.decodeBlockHashKey = function(buffer) { - return buffer.slice(3).toString('hex'); +Encoding.prototype.decodeBlockKey = function(buffer) { + return JSON.parse(buffer.slice(2).toString('utf8')); }; -Encoding.prototype.encodeBlockHashValue = function(hash) { - return new Buffer(hash, 'hex'); +Encoding.prototype.encodeBlockValue = function(block) { + return block.toBuffer(); }; -Encoding.prototype.decodeBlockHashValue = function(buffer) { - return buffer.toString('hex'); -}; - -Encoding.prototype.encodeBlockHeightKey = function(height) { - var heightBuf = new Buffer(4); - heightBuf.writeUInt32BE(height); - return Buffer.concat([ this.servicePrefix, this.heightPrefix, heightBuf ]); -}; - -Encoding.prototype.decodeBlockHeightKey = function(buffer) { - return buffer.slice(3).readUInt32BE(); -}; - -Encoding.prototype.encodeBlockHeightValue = function(height) { - var heightBuf = new Buffer(4); - heightBuf.writeUInt32BE(height); - return heightBuf; -}; - -Encoding.prototype.decodeBlockHeightValue = function(buffer) { - return buffer.readUInt32BE(); +Encoding.prototype.decodeBlockValue = function(buffer) { + return Block.fromBuffer(buffer); }; module.exports = Encoding; diff --git a/lib/services/block/index.js b/lib/services/block/index.js index d2ed548f..240d2eb0 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -16,30 +16,31 @@ var assert = require('assert'); var BlockService = function(options) { BaseService.call(this, options); - this.p2p = this.node.services.p2p; - this.db = this.node.services.db; - this.subscriptions = {}; - this.subscriptions.block = []; - this.subscriptions.reorg = []; this.tip = null; - this._blockHeaderQueue = LRU(50); //hash -> header, height -> header, - this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's - this._lastReportedTime = Date.now(); - this._lastHeight; + this._p2p = this.node.services.p2p; + this._db = this.node.services.db; + this._subscriptions = {}; + this._subscriptions.block = []; + this._subscriptions.reorg = []; + this._blockHeaderQueue = LRU(20000); // hash -> header, height -> header + this._blockQueue = LRU({ + max: 50, + length: function(n) { + return n.length * (1 * 1024 * 1024); // 50 MB of blocks + } + }); // header -> block + this._orphanedBlockList = []; // this is a list of block hashes for which one or more ancestor blocks has not yet arrived }; inherits(BlockService, BaseService); -BlockService.dependencies = [ - 'p2p', - 'db' -]; +BlockService.dependencies = [ 'p2p', 'db' ]; BlockService.prototype.start = function(callback) { var self = this; - self.db.getPrefix(self.name, function(err, prefix) { + self._db.getPrefix(self.name, function(err, prefix) { if(err) { return callback(err); @@ -65,19 +66,6 @@ BlockService.prototype.getAPIMethods = function() { return methods; }; -BlockService.prototype.subscribe = function(name, emitter) { - this.subscriptions[name].push(emitter); - log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this.subscriptions[name].length); -}; - -BlockService.prototype.unsubscribe = function(name, emitter) { - var index = this.subscriptions[name].indexOf(emitter); - if (index > -1) { - this.subscriptions[name].splice(index, 1); - } - log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this.subscriptions[name].length); -}; - BlockService.prototype.getPublishEvents = function() { return [ { @@ -95,7 +83,21 @@ BlockService.prototype.getPublishEvents = function() { ]; }; -BlockService.prototype.printTipInfo = function(prependedMessage) { + +BlockService.prototype.subscribe = function(name, emitter) { + this._subscriptions[name].push(emitter); + log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); +}; + +BlockService.prototype.unsubscribe = function(name, emitter) { + var index = this._subscriptions[name].indexOf(emitter); + if (index > -1) { + this._subscriptions[name].splice(index, 1); + } + log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); +}; + +BlockService.prototype._printTipInfo = function(prependedMessage) { log.info( prependedMessage + ' Serial Tip: ' + this.tip.hash + @@ -104,21 +106,17 @@ BlockService.prototype.printTipInfo = function(prependedMessage) { }; -BlockService.prototype._reportStatus = function(serviceName) { - - var tip = this.tips[serviceName]; - - if ((Date.now() - this._lastReportedTime) > 1000) { - this._lastReportedTime = Date.now(); - log.info(serviceName + ' sync: current height is: ' + tip.height + - ' - hash is: ' + tip.hash); - } - +BlockService.prototype._reportBootStatus = function() { + var blockInfoString = utils.getBlockInfoString(this.tip.height, this.bestHeight); + log.info('Block Service tip is currently height: ' + this.tip.height + ' hash: ' + + this.tip.hash + ' P2P network best height: ' + this.bestHeight + '. Block Service is: ' + + blockInfoString); }; -BlockService.prototype._setTip = function(opts) { - this.tip.height = opts.block.height; - this.tip.hash = opts.block.hash; +BlockService.prototype._setTip = function(block) { + this.tip.height = block.height; + this.tip.hash = block.hash; + this._db.setServiceTip('block', this.tip); }; BlockService.prototype.processBlockOperations = function(opts, callback) { @@ -127,12 +125,9 @@ BlockService.prototype.processBlockOperations = function(opts, callback) { return; } - // TODO: when writing to leveldb, it turns out that the optimal batch size to write to the - // database is 1000 bytes to achieve optimal write performance on most systems. - // This is not a comprehensive study, however and certainly not for other db types. var self = this; - self.db.batch(opts.operations, function(err) { + self._db.batch(opts.operations, function(err) { if(err) { return callback(err); @@ -172,78 +167,36 @@ BlockService.prototype.getTipOperation = function(block, add, tipType) { }; }; -BlockService.prototype.getBlocks = function(startHash, endHash, callback) { -}; -BlockService.prototype._getBlocks = function(startHash, endHash, callback) { + +BlockService.prototype.getBlocks = function(startHash, endHash) { var self = this; assert(startHash && startHash.length === 64, 'startHash is required to getBlocks'); - self.p2p.getBlocks({ startHash: startHash, endHash: endHash }); + // if start and end hash are the same, the caller is getting, at most, one block + // otherwise, the caller gets all blocks from startHash to endHash, inclusive. - async.mapLimit(blockArgs, 8, function(blockArg, next) { + // check our memory cache first, then db, then go out to p2p network - async.waterfall([ + // LRU in-memory + var results = self._getCachedBlocks(startHash, endHash); - self._isGenesisBlock.bind(self, blockArg), + // in db + if (!results) { + results = self._getBlocksInDb(startHash, endHash); + } - function(block, next) { - - if (block) { - return next(null, self.genesis); - } - - self._getBlock(blockArg, next); - } - ], next); - - }, function(err, blocks) { - - if(err) { - return callback(err); - } - - self._detectReorg(blocks, function(reorgHash, reorgHeight) { - - // if we have reorg'ed, we want to retain the block's hash in our - // index, but we want to mark it as "-REORG" - var reorgOperations = self._getReorgOperations(reorgHash, reorgHeight); - - var headers = []; - var tipIndexHeight = self.tip.__height; - - for(var i = 0; i < blocks.length; i++) { - - var block = blocks[i]; - if (block.__height !== ++tipIndexHeight) { - block.height = tipIndexHeight; - block.__height = tipIndexHeight; - self._blockQueue.set(block.hash, block); - } - - headers.push({ - hash: block.hash, - prevHash: utils.reverseBufferToString(block.header.prevHash), - height: tipIndexHeight - }); - - } - - var operations = self._getBlockOperations(headers); - - if (reorgOperations) { - operations = reorgOperations.concat(operations); - } - - self.db.batch(operations, function(err) { - callback(err, blocks); - }); - - }); - - }); + var lockedOut = self._getBlocksLockedOut(); + if (!results && !lockedOut) { + self._p2p.getBlocks({ startHash: startHash }); + return true; + } + if (lockedOut) { + return false; + log.debug('Block Service: getBlocks called, but service is still in a lock out period.'); + } }; BlockService.prototype._checkCache = function(key, cache) { @@ -259,7 +212,7 @@ BlockService.prototype.getBlockHeader = function(hash, callback) { return callback(null, header); } - self.p2p.getBlockHeaders(hash); + self._p2p.getBlockHeaders(hash); var timer = setInterval(function() { var header = self._checkCache(hash, self._blockHeaderQueue); if (header) { @@ -302,17 +255,109 @@ BlockService.prototype._startSubscriptions = function() { }; BlockService.prototype._onHeaders = function(headers) { - log.info('New header received: ' + block.hash); + log.info('New headers received.'); this._cacheHeaders(headers); }; +BlockService.prototype._blockAlreadyProcessed = function(block) { + // if the block is not in our block header queue, we probably have not seen it, or + // it is older than about 1000 blocks + return this._blockHeaderQueue.get(block.hash); +}; + BlockService.prototype._onBlock = function(block) { + // 1. have we already seen this block? + if (this._blockAlreadyProcessed(block)) { + return; + } + + // 2. log the reception log.info('New block received: ' + block.hash); - block.height = ++this._lastHeight; + // 3. store the block for safe keeping this._cacheBlock(block); - this._broadcast(this.subscriptions.block, 'block/block', block); + + // 4. determine block state -- see the function for details about block state + var blockSteta = this._determineBlockState(block); + + // 5. react to state of block + switch (blockState) { + case 'orphaned': + this._handleOrphanedBlock(block); + break; + case 'reorg': + this.emit('reorg', block); + break; + default: + this.setTip(block); + this._broadcast(this.subscriptions.blocks, 'block/block', block); + break; + } + +}; + +BlockService.prototype._determineBlockState = function(block) { + + /* + + block could be in 1 of 3 possible states. + + 1. normal state: block's prev hssh points to our current tip + + 2. reorg state: block's prev hash points to block in our chain that is not the tip. + + Another way to express this is: New block's prev block ALREADY has another block pointing to it. + + This leads to 2 children, for 1 parent block, which is a fork and not a chain. This is a blockchain + and not a blockfork ;) Thus, this new block should be considered the rightful child of its parent. + Remember, this isn't a validating node. Blocks that are relayed to us are considered to be the authoritative. + New blocks always trump what came before. + + 3. orphaned state: block's prev hash is not in our chain at all. Possible reasons are: + + * blocks were delivered out of order, parent of this block is yet to come + * blocks were, again, delivered out order, but the parent will never come + + In point 1, waiting longer for blocks to arrive is the best action. + + In point 2, waiting won't help. The block's parent block may be in an orphaned chain and this chain may + never become the main chain. Also, your peers may nor may not give you all the parent blocks for this orphan chain. + It is best to not assign this block a height until all of its parents are linked. We should, however, call getBlocks with + startHash of our tip and end hash of the list of orphaned blocks periodically. + + */ + + if (this._hasBlockArrivedOutOfOrder(block)) { + return 'orphaned'; + } + + if (this._isChainReorganizing(block)) { + return 'reorg'; + } + + return 'normal'; + +}; + +BlockService.prototype._hasBlockArrivedOutOfOrder = function(block) { + + var prevHash = utils.reverseBufferToString(block.header.prevHash); + return !this._blockHeaderQueue.get(prevHash); + +}; + +BlockService.prototype._isChainReorganizing = function(block) { + + var prevHash = utils.reverseBufferToString(block.header.prevHash); + return prevHash !== this.tip.hash + +}; + + +BlockService.prototypee._findCommonAncestor = function(tipHash, forkedHash) { + // find the common ancestor between these 2 hashes + // most likely, we will already have the block headers }; BlockService.prototype._broadcast = function(subscribers, name, entity) { @@ -328,10 +373,11 @@ BlockService.prototype._cacheBlock = function(block) { var operations = this._getBlockOperations(block); - this.db.batch(operations, function(err) { + this._db.batch(operations, function(err) { if(err) { log.error('There was an error attempting to save block hash: ' + block.hash); + this._db.emit('error', err); return; } @@ -362,66 +408,45 @@ BlockService.prototype._setBlockHeaderQueue = function(header) { BlockService.prototype._setListeners = function() { var self = this; - self.p2p.once('bestHeight', function(height) { + self._p2p.once('bestHeight', function(height) { self._bestHeight = height; - self._loadTip(self._startSubscriptions.bind(self)); + + // once we have best height, we know the p2p network is ready to go + self.once('tip-block', function(tip) { + self._tip = tip; + + self._startSubscriptions(); + }); + self._loadTip(); }); + self._db.on('error', self._onDbError.bind(self)); + self.on('reorg', self._handleReorg.bind(self)); + }; -BlockService.prototype._handleReorg = function(hash, callback) { - - var self = this; - self.printTipInfo('Reorg detected!'); - - self.reorg = true; - self.emit('reorg'); - - var reorg = new Reorg(self.node, self); - - reorg.handleReorg(hash, function(err) { - - if(err) { - log.error('Reorg failed! ' + err); - self.node.stop(); - } - - self.printTipInfo('Reorg successful!'); - self.reorg = false; - self.cleanupAfterReorg(callback); - - }); - +BlockService.prototype._loadTip = function() { + this._db.getServiceTip('block'); }; -BlockService.prototype._detectReorg = function(blocks, callback) { +BlockService.prototype._onDbError = function(err) { + log.error('Block Service: Error: ' + err.message + ' not recovering.'); + this.node.stop(); +}; - var tipHash = this.reorgHash || this.tip.hash; - var tipHeight = this.reorgHeight || this.tip.__height; - for(var i = 0; i < blocks.length; i++) { +BlockService.prototype._isGetP2PBlocksLockedOut = function() { + return Date.now() < (self._getP2PBlocksLockoutPeriod + (self._previousdGetP2PBlocksLockTime || 0)); +}; - if (blocks[i].__height === 0) { - continue; - } - - var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash); - if (prevHash !== tipHash) { - var opts = { preReorgTipHash: tipHash, preReorgTipHeight: tipHeight }; - return this._handleReorg(prevHash, opts, callback); - } - - tipHash = blocks[i].hash; - tipHeight = blocks[i].__height; +BlockService.prototype._getP2PBlocks = function(block) { + if (!!this._isGetP2PBlocksLockedOut()) { + this._previousdGetP2PBlocksLockTime = Date.now(); + self._p2p.getBlocks({ startHash: startHash }); } - - this.reorgHash = tipHash; - this.reorgHeight = tipHeight; - callback(); - }; BlockService.prototype._getBlockValue = function(hashOrHeight, callback) { @@ -438,8 +463,7 @@ BlockService.prototype._getBlockValue = function(hashOrHeight, callback) { valueFn = self.encoding.decodeBlockHeightValue.bind(self.encoding); } -console.trace('hello'); - self.db.get(key, function(err, buf) { + self._db.get(key, function(err, buf) { if (err) { return callback(err); @@ -532,6 +556,34 @@ BlockService.prototype._getBlockOperations = function(obj) { return operations; }; +BlockService.prototype._handleReorg = function(hash, callback) { + log.error('A common ancestor block between hash: ' + this.tip.hash + ' (our current tip) and: ' + + block.hash + ' (the forked block) could not be found.'); + + this.node.stop(); + + var self = this; + self._printTipInfo('Reorg detected!'); + + self.reorg = true; + self.emit('reorg'); + + var reorg = new Reorg(self.node, self); + + reorg.handleReorg(hash, function(err) { + + if(err) { + log.error('Reorg failed! ' + err); + self.node.stop(); + } + + self._printTipInfo('Reorg successful!'); + self.reorg = false; + self.cleanupAfterReorg(callback); + + }); + +}; module.exports = BlockService; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 5dba482c..c5395c25 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -89,7 +89,7 @@ DB.prototype._checkVersion = function(callback) { DB.prototype._setVersion = function(callback) { var versionBuffer = new Buffer(new Array(4)); versionBuffer.writeUInt32BE(this.version); - this._store.put(this.dbPrefix + 'version', versionBuffer, callback); + this.put(this.dbPrefix + 'version', versionBuffer, callback); }; DB.prototype.start = function(callback) { @@ -136,7 +136,6 @@ DB.prototype.get = function(key, options, callback) { } else { - // TODO: will this tell the caller the right thing? cb(new Error('Shutdown sequence underway, not able to complete the query')); } @@ -213,6 +212,31 @@ DB.prototype.getPublishEvents = function() { return []; }; +DB.prototype.getServiceTip = function(service) { + var keyBuf = Buffer.concat([ this.dbPrefix, new Buffer('tip-', 'utf8'), new Buffer(service, 'utf8') ]); + this.get(keyBuf, function(err, tipBuf) { + if (err) { + this.emit('error', err); + return; + } + var heightBuf = new Buffer(4); + var tip = { + height: heightBuf.readUInt32BE(), + hash: tipBuf.slice(4) + } + this.emit('tip-' + service, tip); + }); +}; + +DB.prototype.setServiceTip = function(service, tip) { + var keyBuf = Buffer.concat([ this.dbPrefix, new Buffer('tip-', 'utf8'), new Buffer(service, 'utf8') ]); + var heightBuf = new Buffer(4); + var valBuf = Buffer.concat([ heightBuf.writeUInt32BE(tip.height), new Buffer(tip.hash, 'hex') ]); + this.put(keyBuf, valBuf, function(err) { + this.emit('error', err); + }); +}; + DB.prototype.getPrefix = function(service, callback) { var self = this; @@ -252,7 +276,7 @@ DB.prototype.getPrefix = function(service, callback) { function putPrefix(buffer, next) { - self._store.put(keyBuf, buffer, function(err) { + self.put(keyBuf, buffer, function(err) { if (err) { return callback(err); @@ -270,7 +294,7 @@ DB.prototype.getPrefix = function(service, callback) { var nextUnused = new Buffer(2); nextUnused.writeUInt16BE(prefix + 1); - self._store.put(unusedBuf, nextUnused, function(err) { + self.put(unusedBuf, nextUnused, function(err) { if (err) { return callback(err); diff --git a/lib/services/p2p/bcoin.js b/lib/services/p2p/bcoin.js index c448a75f..487a539e 100644 --- a/lib/services/p2p/bcoin.js +++ b/lib/services/p2p/bcoin.js @@ -10,21 +10,6 @@ var Bcoin = function(options) { this.emitter = new EE(); }; -Bcoin.prototype._getConfig = function(options) { - var config = { - checkpoints: true, - network: options.network || 'main', - listen: true - }; - if (options.prefix) { - config.prefix = options.prefix; - } - if (options.logLevel) { - config.logLevel = options.logLevel; - } - return config; -}; - Bcoin.prototype.start = function() { var self = this; self._bcoin = bcoin.fullnode(self._config); @@ -45,4 +30,21 @@ Bcoin.prototype.stop = function() { this._bcoin.close(); }; +// --- privates + +Bcoin.prototype._getConfig = function(options) { + var config = { + checkpoints: true, + network: options.network || 'main', + listen: true + }; + if (options.prefix) { + config.prefix = options.prefix; + } + if (options.logLevel) { + config.logLevel = options.logLevel; + } + return config; +}; + module.exports = Bcoin; diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 8bcce89a..4da99c6e 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -29,63 +29,43 @@ util.inherits(P2P, BaseService); P2P.dependencies = []; - -P2P.prototype._hasPeers = function() { - return this._options && - this._options.peers && - this._options.peers.host; +P2P.prototype.clearInventoryCache = function() { + this._inv.reset(); }; -P2P.prototype._startBcoin = function() { - this._bcoin = new Bcoin({ - network: this.node.getNetworkName(), - prefix: this.node.datadir - }); - this._bcoin.start(); +P2P.prototype.getAPIMethods = function() { + var methods = [ + ['getHeaders', this, this.getHeaders, 1], + ['getMempool', this, this.getMempool, 0], + ['getBlocks', this, this.getBlocks, 1] + ['clearInventoryCache', this, this.clearInventoryCache, 0] + ]; + return methods; }; -P2P.prototype._initP2P = function() { - this._maxPeers = this._options.maxPeers || 60; - this._minPeers = this._options.minPeers || 1; - this._configPeers = this._options.peers; - this.messages = new p2p.Messages({ network: this.node.network }); - this._peerHeights = []; - this._peers = []; - this._peerIndex = 0; - this._mempoolFilter = []; -}; +P2P.prototype.getBlocks = function(filter) { -P2P.prototype._initPubSub = function() { - this.subscriptions = {}; - this.subscriptions.block = []; - this.subscriptions.headers = []; - this.subscriptions.transaction = []; -}; - - -P2P.prototype._startBcoinIfNecessary = function() { - if (!this._hasPeers()) { - log.info('Peers not explicitly configured, starting a local bcoin node.'); - this._startBcoin(); - this._options.peers = [{ ip: { v4: '127.0.0.1' }, port: 48444}]; - } -}; - -P2P.prototype.start = function(callback) { - - var self = this; - self._initCache(); - self._initPool(); - this._setListeners(); - callback(); + var peer = this._getPeer(); + var blockFilter = this._setResourceFilter(filter, 'blocks'); + peer.sendMessage(this.messages.GetBlocks(blockFilter)); }; -P2P.prototype.stop = function(callback) { +P2P.prototype.getHeaders = function(filter) { - var self = this; - self._pool.disconnect(); - callback(); + var peer = this._getPeer(); + var headerFilter = this._setResourceFilter(filter, 'headers'); + peer.sendMessage(this.messages.GetHeaders(headerFilter)); + +}; + +P2P.prototype.getMempool = function(filter) { + + var peer = this._getPeer(); + + this._setResourceFilter(filter, 'mempool'); + + peer.sendMessage(this.messages.MemPool()); }; @@ -112,6 +92,25 @@ P2P.prototype.getPublishEvents = function() { ]; }; + +P2P.prototype.start = function(callback) { + + var self = this; + self._initCache(); + self._initPool(); + this._setListeners(); + callback(); + +}; + +P2P.prototype.stop = function(callback) { + + var self = this; + self._pool.disconnect(); + callback(); + +}; + P2P.prototype.subscribe = function(name, emitter) { this.subscriptions[name].push(emitter); log.info(emitter.remoteAddress, 'subscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length); @@ -125,11 +124,80 @@ P2P.prototype.unsubscribe = function(name, emitter) { log.info(emitter.remoteAddress, 'unsubscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length); }; + +// --- privates + +P2P.prototype._addPeer = function(peer) { + this._peers.push(peer); +}; + +P2P.prototype._applyMempoolFilter = function(message) { + if (!this._mempoolFilter) { + return message; + } + var txIndex = this._mempoolFilter.indexOf(message.transaction.hash); + if (txIndex >= 0) { + this._mempoolFilter.splice(txIndex, 1); + return; + } + return message; +}; + +P2P.prototype._broadcast = function(subscribers, name, entity) { + for (var i = 0; i < subscribers.length; i++) { + subscribers[i].emit(name, entity); + } +}; + +P2P.prototype._connect = function() { + this._connectCalled = this._connectCalled > 0 ? 2 : 1; + if (this._connectCalled > 1) { + log.info('Connecting to p2p network.'); + this._pool.connect(); + } +}; + +P2P.prototype._getBestHeight = function(peer) { + + this._peerHeights.push(peer.bestHeight); + + if (this._peerHeights.length >= this._minPeers) { + return Math.max(...this._peerHeights); + } + +}; + +P2P.prototype._getPeer = function() { + + if (this._peers.length === 0) { + return; + } + var index = this._peerIndex++ % this._peers.length; + return this._peers[index]; +}; + +P2P.prototype._hasPeers = function() { + return this._options && + this._options.peers && + this._options.peers.host; +}; + P2P.prototype._initCache = function() { this._inv = LRU(2000); this._cache = []; }; +P2P.prototype._initP2P = function() { + this._maxPeers = this._options.maxPeers || 60; + this._minPeers = this._options.minPeers || 1; + this._configPeers = this._options.peers; + this.messages = new p2p.Messages({ network: this.node.network }); + this._peerHeights = []; + this._peers = []; + this._peerIndex = 0; + this._mempoolFilter = []; +}; + P2P.prototype._initPool = function() { var opts = {}; if (this._configPeers) { @@ -141,47 +209,16 @@ P2P.prototype._initPool = function() { this._pool = new p2p.Pool(opts); }; -P2P.prototype._addPeer = function(peer) { - this._peers.push(peer); +P2P.prototype._initPubSub = function() { + this.subscriptions = {}; + this.subscriptions.block = []; + this.subscriptions.headers = []; + this.subscriptions.transaction = []; }; -P2P.prototype._removePeer = function(peer) { - this._peers.splice(this._peers.indexOf(peer), 1); -}; - -P2P.prototype._getPeer = function() { - - if (this._peers.length === 0) { - return; - } - var index = this._peerIndex++ % this._peers.length; - return this._peers[index]; -}; - -P2P.prototype._getBestHeight = function(peer) { - - this._peerHeights.push(peer.bestHeight); - - if (this._peerHeights.length >= this._minPeers) { - return Math.max(...this._peerHeights); - } - -}; - -P2P.prototype._onPeerReady = function(peer, addr) { - - log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' + - peer.network.alias + ', version: ' + peer.version + ', subversion: ' + - peer.subversion + ', status: ' + peer.status + ', port: ' + - peer.port + ', best height: ' + peer.bestHeight); - - this._addPeer(peer); - var bestHeight = this._getBestHeight(peer); - - if (bestHeight >= 0) { - this.emit('bestHeight', bestHeight); - } +P2P.prototype._onPeerBlock = function(peer, message) { + this._broadcast(this.subscriptions.block, 'p2p/block', message.block); }; P2P.prototype._onPeerDisconnect = function(peer, addr) { @@ -191,6 +228,10 @@ P2P.prototype._onPeerDisconnect = function(peer, addr) { }; +P2P.prototype._onPeerHeaders = function(peer, message) { + this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers); +}; + P2P.prototype._onPeerInventory = function(peer, message) { var self = this; @@ -212,6 +253,23 @@ P2P.prototype._onPeerInventory = function(peer, message) { } }; +P2P.prototype._onPeerReady = function(peer, addr) { + + log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' + + peer.network.alias + ', version: ' + peer.version + ', subversion: ' + + peer.subversion + ', status: ' + peer.status + ', port: ' + + peer.port + ', best height: ' + peer.bestHeight); + + this._addPeer(peer); + var bestHeight = this._getBestHeight(peer); + + if (bestHeight >= 0) { + this.emit('bestHeight', bestHeight); + } + +}; + + P2P.prototype._onPeerTx = function(peer, message) { var filteredMessage = this._applyMempoolFilter(message); if (filteredMessage) { @@ -219,12 +277,8 @@ P2P.prototype._onPeerTx = function(peer, message) { } }; -P2P.prototype._onPeerBlock = function(peer, message) { - this._broadcast(this.subscriptions.block, 'p2p/block', message.block); -}; - -P2P.prototype._onPeerHeaders = function(peer, message) { - this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers); +P2P.prototype._removePeer = function(peer) { + this._peers.splice(this._peers.indexOf(peer), 1); }; P2P.prototype._setListeners = function() { @@ -243,36 +297,8 @@ P2P.prototype._setListeners = function() { }; -P2P.prototype._connect = function() { - this._connectCalled = this._connectCalled > 0 ? 2 : 1; - if (this._connectCalled > 1) { - log.info('Connecting to p2p network.'); - this._pool.connect(); - } -}; - -P2P.prototype._broadcast = function(subscribers, name, entity) { - for (var i = 0; i < subscribers.length; i++) { - subscribers[i].emit(name, entity); - } -}; - -P2P.prototype._applyMempoolFilter = function(message) { - if (!this._mempoolFilter) { - return message; - } - var txIndex = this._mempoolFilter.indexOf(message.transaction.hash); - if (txIndex >= 0) { - this._mempoolFilter.splice(txIndex, 1); - return; - } - return message; -}; - P2P.prototype._setResourceFilter = function(filter, resource) { - // startHash is usually the last block or header you have, endHash is a hash that comes after that hash, - // or a block at a greater height if (resource === 'headers' || resource === 'blocks') { assert(filter && filter.startHash, 'A "startHash" field is required to retrieve headers or blocks'); if (!filter.endHash) { @@ -281,9 +307,6 @@ P2P.prototype._setResourceFilter = function(filter, resource) { return { starts: [filter.startHash], stop: filter.endHash }; } - // this function knows about mempool, block and header filters - // mempool filters are considered after the tx is delivered to us - // because we can't match a tx to the query params. if (resource === 'mempool') { this._mempoolFilter = filter; return; @@ -291,47 +314,20 @@ P2P.prototype._setResourceFilter = function(filter, resource) { }; -P2P.prototype.getAPIMethods = function() { - var methods = [ - ['getHeaders', this, this.getHeaders, 1], - ['getMempool', this, this.getMempool, 0], - ['getBlocks', this, this.getBlocks, 1] - ]; - return methods; +P2P.prototype._startBcoin = function() { + this._bcoin = new Bcoin({ + network: this.node.getNetworkName(), + prefix: this.node.datadir + }); + this._bcoin.start(); }; -P2P.prototype.getHeaders = function(filter) { - - var peer = this._getPeer(); - var headerFilter = this._setResourceFilter(filter, 'headers'); - peer.sendMessage(this.messages.GetHeaders(headerFilter)); - -}; - -P2P.prototype.getMempool = function(filter) { - - var peer = this._getPeer(); - - // mempools can grow quite large, especially if subscribers are liberally accepting - // all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can - // grow quite large over time. Care should be taken to limit the size of this filter. - // Even still, when a tx is broadcasted, the reference to it is dropped from the filter. - this._setResourceFilter(filter, 'mempool'); - - peer.sendMessage(this.messages.MemPool()); - -}; - -P2P.prototype.getBlocks = function(filter) { - - var peer = this._getPeer(); - var blockFilter = this._setResourceFilter(filter, 'blocks'); - peer.sendMessage(this.messages.GetBlocks(blockFilter)); - -}; - -P2P.prototype.clearInventoryCache = function() { - this._inv.reset(); +P2P.prototype._startBcoinIfNecessary = function() { + if (!this._hasPeers()) { + log.info('Peers not explicitly configured, starting a local bcoin node.'); + this._startBcoin(); + this._options.peers = [{ ip: { v4: '127.0.0.1' }, port: 48444}]; + } }; module.exports = P2P; diff --git a/lib/utils.js b/lib/utils.js index 5062faa3..9db49dd9 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -154,5 +154,19 @@ utils.getAddressString = function(script, output) { return null; }; +utils.getBlockInfoString = function(tip, best) { + + var diff = best - tip; + var astr = diff + ' blocks behind.'; + + if (diff === -1) { + astr = Math.abs(diff) + ' block ahead. Peer may be syncing or we may need to reorganize our chain after new blocks arrive.'; + } else if (diff < 1) { + astr = Math.abs(diff) + ' blocks ahead. Peer may be syncing or we may need to reorganize our chain after new blocks arrive.'; + } else if (diff === 1) { + astr = diff + ' block behind.'; + } + +}; module.exports = utils; diff --git a/scratch.txt b/scratch.txt new file mode 100644 index 00000000..90c1e30d --- /dev/null +++ b/scratch.txt @@ -0,0 +1,512 @@ +AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { + var self = this; + + var txs = block.transactions; + var height = block.__height; + + var action = 'put'; + var reverseAction = 'del'; + if (!connectBlock) { + action = 'del'; + reverseAction = 'put'; + } + + var operations = []; + + for(var i = 0; i < txs.length; i++) { + + var tx = txs[i]; + var txid = tx.id; + var inputs = tx.inputs; + var outputs = tx.outputs; + + // Subscription messages + var txmessages = {}; + + var outputLength = outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = outputs[outputIndex]; + + var script = output.script; + + if(!script) { + log.debug('Invalid script'); + continue; + } + + var address = self.getAddressString(script); + if(!address) { + continue; + } + + var key = self._encoding.encodeAddressIndexKey(address, height, txid); + operations.push({ + type: action, + key: key + }); + + // Collect data for subscribers + if (txmessages[address]) { + txmessages[address].outputIndexes.push(outputIndex); + } else { + txmessages[address] = { + tx: tx, + height: height, + outputIndexes: [outputIndex], + address: address, + timestamp: block.header.timestamp + }; + } + } + + if(tx.isCoinbase()) { + continue; + } + + //TODO deal with P2PK + for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { + var input = inputs[inputIndex]; + + if(!input.script) { + log.debug('Invalid script'); + continue; + } + + var inputAddress = self.getAddressString(input.script); + + if(!inputAddress) { + continue; + } + + var inputKey = self._encoding.encodeAddressIndexKey(inputAddress, height, txid); + + operations.push({ + type: action, + key: inputKey + }); + + } + } + setImmediate(function() { + callback(null, operations); + }); +}; + +AddressService.prototype.blockHandler = function(block, connectBlock, callback) { + var self = this; + + var txs = block.transactions; + + var action = 'put'; + var reverseAction = 'del'; + if (!connectBlock) { + action = 'del'; + reverseAction = 'put'; + } + + var operations = []; + + async.eachSeries(txs, function(tx, next) { + var txid = tx.id; + var inputs = tx.inputs; + var outputs = tx.outputs; + + var outputLength = outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = outputs[outputIndex]; + + var script = output.script; + + if(!script) { + log.debug('Invalid script'); + continue; + } + + var address = self.getAddressString(script); + + if(!address) { + continue; + } + + var key = self._encoding.encodeUtxoIndexKey(address, txid, outputIndex); + var value = self._encoding.encodeUtxoIndexValue(block.__height, output.satoshis, output._scriptBuffer); + operations.push({ + type: action, + key: key, + value: value + }); + + } + + if(tx.isCoinbase()) { + return next(); + } + + //TODO deal with P2PK + async.each(inputs, function(input, next) { + if(!input.script) { + log.debug('Invalid script'); + return next(); + } + + var inputAddress = self.getAddressString(input.script); + + if(!inputAddress) { + return next(); + } + + var inputKey = self._encoding.encodeUtxoIndexKey(inputAddress, input.prevTxId, input.outputIndex); + //common case is connecting blocks and deleting outputs spent by these inputs + if (connectBlock) { + operations.push({ + type: 'del', + key: inputKey + }); + next(); + } else { // uncommon and slower, this happens during a reorg + self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) { + var utxo = tx.outputs[input.outputIndex]; + var inputValue = self._encoding.encodeUtxoIndexValue(tx.__height, utxo.satoshis, utxo._scriptBuffer); + operations.push({ + type: 'put', + key: inputKey, + value: inputValue + }); + next(); + }); + } + }, function(err) { + if(err) { + return next(err); + } + next(); + }); + }, function(err) { + //we are aync predicated on reorg sitch + if(err) { + return callback(err); + } + callback(null, operations); + }); +}; +TransactionService.prototype.blockHandler = function(block, connectBlock, callback) { + var self = this; + var action = 'put'; + var reverseAction = 'del'; + if (!connectBlock) { + action = 'del'; + reverseAction = 'put'; + } + + var operations = []; + + this.currentTransactions = {}; + + async.series([ + function(next) { + self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) { + if(err) { + return next(err); + } + block.__timestamp = timestamp; + next(); + }); + }, function(next) { + async.eachSeries(block.transactions, function(tx, next) { + tx.__timestamp = block.__timestamp; + tx.__height = block.__height; + + self._getInputValues(tx, function(err, inputValues) { + if(err) { + return next(err); + } + tx.__inputValues = inputValues; + self.currentTransactions[tx.id] = tx; + + operations.push({ + type: action, + key: self.encoding.encodeTransactionKey(tx.id), + value: self.encoding.encodeTransactionValue(tx) + }); + next(); + }); + }, function(err) { + if(err) { + return next(err); + } + next(); + }); + }], function(err) { + if(err) { + return callback(err); + } + callback(null, operations); + }); + +}; +MempoolService.prototype.blockHandler = function(block, connectBlock, callback) { + var self = this; + + if (!self._handleBlocks) { + return setImmediate(callback); + } + + var txs = block.transactions; + + var action = 'del'; + if (!connectBlock) { + action = 'put'; + } + + for(var i = 0; i < txs.length; i++) { + + var tx = txs[i]; + self._updateMempool(tx, action); + } + setImmediate(callback); +}; +WalletService.prototype.blockHandler = function(block, connectBlock, callback) { + + var opts = { + block: block, + connectBlock: connectBlock, + serial: true + }; + this._blockHandler(opts, callback); + +}; + +WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { + + var opts = { + block: block, + connectBlock: connectBlock + }; + this._blockHandler(opts, callback); + +}; + +WalletService.prototype._blockHandler = function(opts, callback) { + + var self = this; + + if (!self._checkAddresses()) { + return setImmediate(function() { + callback(null, []); + }); + } + + async.mapSeries(opts.block.transactions, function(tx, next) { + + self._processTransaction(opts, tx, next); + + }, function(err, operations) { + + if(err) { + return callback(err); + } + + var ret = _.compact(_.flattenDeep(operations)); + callback(null, ret); + + }); + +}; + +WalletService.prototype._processTransaction = function(opts, tx, callback) { + var self = this; + + tx.outputs.forEach(function(output, index) { + output.index = index; + }); + + var ioData = tx.inputs.concat(tx.outputs); + + async.mapSeries(ioData, function(io, next) { + if (opts.serial) { + self._processSerialIO(opts, tx, io, next); + } else { + self._processConcurrentIO(opts, tx, io, next); + } + }, function(err, operations) { + if(err) { + return callback(err); + } + callback(null, operations); + }); + +}; + +WalletService.prototype._processConcurrentIO = function(opts, tx, io, callback) { + + var self = this; + var walletIds = self._getWalletIdsFromScript(io); + + if (!walletIds) { + return callback(); + } + var actions = self._getActions(opts.connectBlock); + + var operations = walletIds.map(function(walletId) { + return { + type: actions[0], + key: self._encoding.encodeWalletTransactionKey(walletId, opts.block.__height, tx.id) + }; + }); + + setImmediate(function() { + callback(null, operations); + }); + +}; + +WalletService.prototype._processSerialIO = function(opts, tx, io, callback) { + var fn = this._processSerialOutput; + if (io instanceof Input) { + fn = this._processSerialInput; + } + fn.call(this, opts, tx, io, callback); +}; + +WalletService.prototype._getWalletIdsFromScript = function(io) { + + if(!io.script) { + log.debug('Invalid script'); + return; + } + + return this._addressMap[this.getAddressString(io)]; + +}; + +WalletService.prototype._getActions = function(connect) { + var action = 'put'; + var reverseAction = 'del'; + if (!connect) { + action = 'del'; + reverseAction = 'put'; + } + return [action, reverseAction]; +}; + +WalletService.prototype._processSerialOutput = function(opts, tx, output, callback) { + + var self = this; + var walletIds = self._getWalletIdsFromScript(output); + + if (!walletIds) { + return callback(); + } + + var actions = self._getActions(opts.connectBlock); + + async.mapSeries(walletIds, function(walletId, next) { + + self.balances[walletId] = self.balances[walletId] || 0; + self.balances[walletId] += opts.connectBlock ? output.satoshis : (-1 * output.satoshis); + + var operations = [ + { + type: actions[0], + key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, output.index), + value: self._encoding.encodeWalletUtxoValue(opts.block.__height, output.satoshis, output._scriptBuffer) + }, + { + type: actions[0], + key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, output.index), + value: self._encoding.encodeWalletUtxoSatoshisValue(opts.block.__height, output._scriptBuffer) + }, + { + type: 'put', + key: self._encoding.encodeWalletBalanceKey(walletId), + value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) + } + ]; + + next(null, operations); + + }, function(err, operations) { + + if(err) { + return callback(err); + } + + callback(null, operations); + + }); + +}; + +WalletService.prototype._processSerialInput = function(opts, tx, input, callback) { + + var self = this; + + var walletIds = input.script && input.script.isPublicKeyIn() ? + ['p2pk'] : + self._getWalletIdsFromScript(input); + + if (!walletIds) { + return callback(); + } + + var actions = self._getActions(opts.connectBlock); + + async.mapSeries(walletIds, function(walletId, next) { + + self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) { + + if(err) { + return next(err); + } + + var utxo = tx.outputs[input.outputIndex]; + + if (walletId === 'p2pk') { + + var pubKey = utxo.script.getPublicKey().toString('hex'); + walletId = self._addressMap[pubKey]; + + if (!walletId) { + return next(null, []); + } + + } + + self.balances[walletId] = self.balances[walletId] || 0; + self.balances[walletId] += opts.connectBlock ? (-1 * utxo.satoshis) : utxo.satoshis; + + var operations = [ + { + type: actions[1], + key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex), + value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer) + }, + { + type: actions[1], + key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex), + value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer) + }, + { + type: 'put', + key: self._encoding.encodeWalletBalanceKey(walletId), + value: self._encoding.encodeWalletBalanceValue(self.balances[walletId]) + } + ]; + + next(null, operations); + + }); + }, function(err, operations) { + + if(err) { + return callback(err); + } + + callback(null, operations); + + }); + +}; + +WalletService.prototype._loadAllAddresses = function(callback) { +