From 02ff6c680c8bc31aaa26e3e2371481692bd4d202 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 7 Sep 2017 18:55:33 -0400 Subject: [PATCH] Fixed sync resume (memory issues). Input values on tx index are no longer lazy loaded. --- lib/services/address/index.js | 7 +- lib/services/block/index.js | 27 ++++- lib/services/header/index.js | 20 +--- lib/services/p2p/index.js | 36 ++++--- lib/services/transaction/index.js | 168 ++++++++++++++++-------------- package-lock.json | 2 +- package.json | 2 +- 7 files changed, 143 insertions(+), 119 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 5b38271e..8cfaa819 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -96,6 +96,7 @@ AddressService.prototype.getAddressSummary = function(address, options, callback transactions: [] }; + // txid criteria var start = self._encoding.encodeAddressIndexKey(address, options.from); var end = self._encoding.encodeAddressIndexKey(address, options.to); @@ -130,12 +131,6 @@ AddressService.prototype.getAddressSummary = function(address, options, callback txStream._transform = function(chunk, enc, callback) { - // in the case where an address appears in both an input -and- - // an output (sending money to one's self or using the sending - // address as the change address (not recommended), we will get - // duplicates. We don't want to look up the tx again. - // Luckily, due to the way leveldb stores keys, we should get - // txids out in lexigraphical order, so we can use an LRU here var key = self._encoding.decodeAddressIndexKey(chunk); self._tx.getTransaction(key.txid, options, function(err, tx) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index beee0282..0efbf73f 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -37,6 +37,7 @@ BlockService.dependencies = [ 'timestamp', 'p2p', 'db', 'header' ]; // --- public prototype functions BlockService.prototype.getAPIMethods = function() { var methods = [ + ['getInfo', this, this.getInfo, 0], ['getBlock', this, this.getBlock, 1], ['getRawBlock', this, this.getRawBlock, 1], ['getBlockOverview', this, this.getBlockOverview, 1], @@ -47,6 +48,22 @@ BlockService.prototype.getAPIMethods = function() { return methods; }; +BlockService.prototype.getInfo = function(callback) { + callback(null, { + blocks: this.getTip().height, + connections: this._p2p.getNumberOfPeers(), + timeoffset: 0, + proxy: '', + testnet: this.node.network === 'livenet' ? false: true, + errors: '', + network: this.node.network, + relayFee: 0, + version: 'bitcore-1.1.2', + protocolversion: 700001, + difficulty: this._header.getCurrentDifficulty() + }); +}; + BlockService.prototype.isSynced = function(callback) { callback(null, !this._initialSync); }; @@ -447,7 +464,7 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, if (err) { if (!self.node.stopping) { - log.error('Block Service: Error: ' + err); + log.error('Block Service: ' + err); self.node.stop(); } return; @@ -456,7 +473,7 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, self._db.batch(operations, function(err) { if (err && !self.node.stopping) { - log.error('Block Service: Error: ' + err); + log.error('Block Service: ' + err); self.node.stop(); } @@ -496,7 +513,7 @@ BlockService.prototype._processBlock = function(block) { if (err) { if (!self.node.stopping) { - log.error('Block Service: Error: ' + err); + log.error('Block Service: ' + err); self.node.stop(); } return; @@ -506,7 +523,7 @@ BlockService.prototype._processBlock = function(block) { if (err) { if (!self.node.stopping) { - log.error('Block Service: Error: ' + err); + log.error('Block Service: ' + err); self.node.stop(); } return; @@ -520,7 +537,7 @@ BlockService.prototype._processBlock = function(block) { if (err) { if (!self.node.stopping) { - log.error('Block Service: Error: ' + err); + log.error('Block Service: ' + err); self.node.stop(); } return; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 9479a583..e1a184a6 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -62,7 +62,6 @@ HeaderService.prototype.getAPIMethods = function() { var methods = [ ['getAllHeaders', this, this.getAllHeaders, 0], ['getBestHeight', this, this.getBestHeight, 0], - ['getInfo', this, this.getInfo, 0], ['getBlockHeader', this, this.getBlockHeader, 1] ]; @@ -75,22 +74,6 @@ HeaderService.prototype.getCurrentDifficulty = function() { return bcoin.mining.common.getDifficulty(target); }; -HeaderService.prototype.getInfo = function(callback) { - callback(null, { - blocks: this._lastHeader.height, - connections: this._p2p.getNumberOfPeers(), - timeoffset: 0, - proxy: '', - testnet: this.node.network === 'livenet' ? false: true, - errors: '', - network: this.node.network, - relayFee: 0, - version: 'bitcore-1.1.2', - protocolversion: 700001, - difficulty: this.getCurrentDifficulty() - }); -}; - HeaderService.prototype.getAllHeaders = function(callback) { var self = this; @@ -217,10 +200,12 @@ HeaderService.prototype.stop = function(callback) { if (this._headerInterval) { clearInterval(this._headerInterval); + this._headerInterval = null; } if (this._blockProcessor) { clearInterval(this._blockProcessor); + this._blockProcessor = null; } callback(); @@ -419,6 +404,7 @@ HeaderService.prototype._onHeaders = function(headers) { if (self._headerInterval) { clearInterval(self._headerInterval); + self._headerInterval = null; } log.debug('Header Service: Received: ' + headers.length + ' header(s).'); diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 487ee3bc..941a0f5a 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -174,6 +174,18 @@ P2P.prototype._broadcast = function(subscribers, name, entity) { } }; +P2P.prototype._setRetryInterval = function() { + var self = this; + if (!self._retryInterval && !self.node.stopping) { + + self._retryInterval = setInterval(function() { + log.info('Retrying connection to p2p network.'); + self._pool.connect(); + }, 5000); + + } +}; + P2P.prototype._connect = function() { var self = this; @@ -181,14 +193,7 @@ P2P.prototype._connect = function() { log.info('Connecting to p2p network.'); self._pool.connect(); - var retryInterval = setInterval(function() { - log.info('Retrying connection to p2p network.'); - self._pool.connect(); - }, 5000); - - self._pool.once('peerready', function() { - clearInterval(retryInterval); - }); + self._setRetryInterval(); }; @@ -264,14 +269,13 @@ P2P.prototype._onPeerBlock = function(peer, message) { P2P.prototype._onPeerDisconnect = function(peer, addr) { this._removePeer(peer); - log.info('Disconnected from peer: ' + addr.ip.v4); - if (!this.node.stopping) { - log.info('Attempting to reconnect to the p2p network after disconnect signal.'); - this._connect(); - return; + if (this._peers.length < 1) { + this._setRetryInterval(); } + log.info('Disconnected from peer: ' + addr.ip.v4); + }; P2P.prototype._onPeerHeaders = function(peer, message) { @@ -315,6 +319,12 @@ P2P.prototype._matchNetwork = function(network) { P2P.prototype._onPeerReady = function(peer, addr) { + // clear any interval timers that we previously set + if (this._retryInterval) { + clearInterval(this._retryInterval); + this._retryInterval = null; + } + // want to make sure the peer we are connecting to matches our network config. var network = this._matchNetwork(peer.network); diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index dcfa3c17..87012dda 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -11,6 +11,7 @@ function TransactionService(options) { BaseService.call(this, options); this._db = this.node.services.db; this._mempool = this.node.services.mempool; + this._block = this.node.services.block; this._header = this.node.services.header; this._p2p = this.node.services.p2p; this._timestamp = this.node.services.timestamp; @@ -22,7 +23,9 @@ TransactionService.dependencies = [ 'p2p', 'db', 'timestamp', - 'mempool' + 'mempool', + 'block', + 'header' ]; // ---- start public function protorypes @@ -30,8 +33,7 @@ TransactionService.prototype.getAPIMethods = function() { return [ ['getRawTransaction', this, this.getRawTransaction, 1], ['getTransaction', this, this.getTransaction, 1], - ['getDetailedTransaction', this, this.getDetailedTransaction, 1], - ['getInputValues', this, this.getInputValues, 1] + ['getDetailedTransaction', this, this.getDetailedTransaction, 1] ]; }; @@ -74,22 +76,47 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) } async.waterfall([ - function(next) { - self._getTransaction(txid, options, next); - }, + self._getTransaction.bind(self, txid, options), + self._getSupplementaryTransactionInfo.bind(self), self._getMempoolTransaction.bind(self), - self.getInputValues.bind(self), self._setMetaInfo.bind(self) ], callback); }; +TransactionService.prototype._getSupplementaryTransactionInfo = function(txid, tx, options, callback) { + + if (!tx) { + return callback(null, txid, tx, options); + } + + var self = this; + tx.confirmations = self._block.getTip().height - tx.__height + 1; + + // TODO maybe we should index the block hash along with the height on tx, + // so that this extra lookup isn't necessary + self._header.getBlockHeader(tx.__height, function(err, header) { + + if (err) { + return callback(err); + } + + if (header) { + tx.blockHash = header.hash; + } + + callback(null, txid, tx, options); + + }); +}; + TransactionService.prototype._setMetaInfo = function(tx, options, callback) { if (!tx) { return callback(); } + // output values var outputSatoshis = 0; @@ -142,6 +169,7 @@ TransactionService.prototype._getMempoolTransaction = function(txid, tx, options } tx.confirmations = 0; + tx.blockHash = null; callback(null, tx, options); }); @@ -152,8 +180,19 @@ TransactionService.prototype._getTransaction = function(txid, options, callback) var self = this; - var key = self._encoding.encodeTransactionKey(txid); + // txs will be in the index, the current block at LOWER tx indexes + // or they don't exist for the purposes of this function + // inputValues will be on the tx already by this point. + var currentBlockTx = options && options.processedTxs && + options.processedTxs[txid] ? options.processedTxs[txid] : null; + if (currentBlockTx) { + return setImmediate(function() { + callback(null, txid, currentBlockTx, options); + }); + } + + var key = self._encoding.encodeTransactionKey(txid); self._db.get(key, function(err, tx) { if (err) { @@ -165,44 +204,19 @@ TransactionService.prototype._getTransaction = function(txid, options, callback) } tx = self._encoding.decodeTransactionValue(tx); - tx.confirmations = self._header.getBestHeight() - tx.__height; - - self._header.getBlockHeader(tx.__height, function(err, header) { - - if (err) { - return callback(err); - } - - if (header) { - tx.blockHash = header.hash; - } - - callback(null, txid, tx, options); - - }); - + callback(null, txid, tx, options); }); }; -TransactionService.prototype.getInputValues = function(tx, options, callback) { +TransactionService.prototype._getInputValues = function(tx, options, callback) { var self = this; - if (!tx) { - return callback(null, tx, options); - } + async.mapLimit(tx.inputs, 4, function(input, next) { - async.eachOfLimit(tx.inputs, 4, function(input, index, next) { - - if (!tx.__inputValues) { - tx.__inputValues = []; - } - - var inputSatoshis = tx.__inputValues[index]; - - if (inputSatoshis >= 0 || input.isCoinbase()) { - return next(); + if (input.isCoinbase()) { + return next(null, 0); } var outputIndex = input.prevout.index; @@ -215,31 +229,12 @@ TransactionService.prototype.getInputValues = function(tx, options, callback) { var output = _tx.outputs[outputIndex]; assert(output, 'Expected an output, but did not get one for tx: ' + _tx.txid() + ' outputIndex: ' + outputIndex); - tx.__inputValues[index] = output.value; - next(); + next(null, output.value); }); - }, function(err) { + }, callback); - if (err) { - return callback(err); - } - - var key = self._encoding.encodeTransactionKey(tx.txid()); - var value = self._encoding.encodeTransactionValue(tx); - - self._db.put(key, value, function(err) { - - if (err) { - return callback(err); - } - - callback(null, tx, options); - - }); - - }); }; TransactionService.prototype.sendTransaction = function(tx, callback) { @@ -275,16 +270,25 @@ TransactionService.prototype._getBlockTimestamp = function(hash) { TransactionService.prototype.onBlock = function(block, callback) { var self = this; + var processedTxs = {}; if (self.node.stopping) { return callback(); } - var operations = block.txs.map(function(tx) { - return self._processTransaction(tx, { block: block }); - }); + async.mapSeries(block.txs, function(tx, next) { - callback(null, operations); + processedTxs[tx.txid()] = tx; + self._processTransaction(tx, { block: block, processedTxs: processedTxs }, next); + + }, function(err, operations) { + + if (err) { + return callback(err); + } + + callback(null, operations); + }); }; @@ -316,27 +320,39 @@ TransactionService.prototype.onReorg = function(args, callback) { }; -TransactionService.prototype._processTransaction = function(tx, opts) { +TransactionService.prototype._processTransaction = function(tx, opts, callback) { // this index is very simple txid -> tx, but we also need to find each // input's prev output value, the adjusted timestamp for the block and // the tx's block height - // input values - tx.__inputValues = []; // these are lazy-loaded on the first access of the tx + var self = this; - // timestamp - tx.__timestamp = this._getBlockTimestamp(opts.block.rhash()); - assert(tx.__timestamp, 'Timestamp is required when saving a transaction.'); + self._getInputValues(tx, opts, function(err, inputValues) { - // height - tx.__height = opts.block.height; - assert(tx.__height, 'Block height is required when saving a trasnaction.'); + if (err) { + return callback(err); + } - return { - key: this._encoding.encodeTransactionKey(tx.txid()), - value: this._encoding.encodeTransactionValue(tx) - }; + assert(inputValues && inputValues.length === tx.inputs.length, + 'Input values missing from tx.'); + + // inputValues + tx.__inputValues = inputValues; + + // timestamp + tx.__timestamp = self._getBlockTimestamp(opts.block.rhash()); + assert(tx.__timestamp, 'Timestamp is required when saving a transaction.'); + + // height + tx.__height = opts.block.height; + assert(tx.__height, 'Block height is required when saving a trasnaction.'); + + callback(null, { + key: self._encoding.encodeTransactionKey(tx.txid()), + value: self._encoding.encodeTransactionValue(tx) + }); + }); }; diff --git a/package-lock.json b/package-lock.json index bbc98cc2..c015bc19 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "bitcore-node", - "version": "5.0.0-beta.4", + "version": "5.0.0-beta.5", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index f25a3247..d251f7a5 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "node": ">=8.0.0" }, "author": "BitPay ", - "version": "5.0.0-beta.5", + "version": "5.0.0-beta.6", "main": "./index.js", "repository": "git://github.com/bitpay/bitcore-node.git", "homepage": "https://github.com/bitpay/bitcore-node",