Fixed sync resume (memory issues).

Input values on tx index are no longer lazy loaded.
This commit is contained in:
Chris Kleeschulte 2017-09-07 18:55:33 -04:00
parent 55ebc03602
commit 02ff6c680c
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
7 changed files with 143 additions and 119 deletions

View File

@ -96,6 +96,7 @@ AddressService.prototype.getAddressSummary = function(address, options, callback
transactions: [] transactions: []
}; };
// txid criteria // txid criteria
var start = self._encoding.encodeAddressIndexKey(address, options.from); var start = self._encoding.encodeAddressIndexKey(address, options.from);
var end = self._encoding.encodeAddressIndexKey(address, options.to); var end = self._encoding.encodeAddressIndexKey(address, options.to);
@ -130,12 +131,6 @@ AddressService.prototype.getAddressSummary = function(address, options, callback
txStream._transform = function(chunk, enc, callback) { txStream._transform = function(chunk, enc, callback) {
// in the case where an address appears in both an input -and-
// an output (sending money to one's self or using the sending
// address as the change address (not recommended), we will get
// duplicates. We don't want to look up the tx again.
// Luckily, due to the way leveldb stores keys, we should get
// txids out in lexigraphical order, so we can use an LRU here
var key = self._encoding.decodeAddressIndexKey(chunk); var key = self._encoding.decodeAddressIndexKey(chunk);
self._tx.getTransaction(key.txid, options, function(err, tx) { self._tx.getTransaction(key.txid, options, function(err, tx) {

View File

@ -37,6 +37,7 @@ BlockService.dependencies = [ 'timestamp', 'p2p', 'db', 'header' ];
// --- public prototype functions // --- public prototype functions
BlockService.prototype.getAPIMethods = function() { BlockService.prototype.getAPIMethods = function() {
var methods = [ var methods = [
['getInfo', this, this.getInfo, 0],
['getBlock', this, this.getBlock, 1], ['getBlock', this, this.getBlock, 1],
['getRawBlock', this, this.getRawBlock, 1], ['getRawBlock', this, this.getRawBlock, 1],
['getBlockOverview', this, this.getBlockOverview, 1], ['getBlockOverview', this, this.getBlockOverview, 1],
@ -47,6 +48,22 @@ BlockService.prototype.getAPIMethods = function() {
return methods; return methods;
}; };
BlockService.prototype.getInfo = function(callback) {
callback(null, {
blocks: this.getTip().height,
connections: this._p2p.getNumberOfPeers(),
timeoffset: 0,
proxy: '',
testnet: this.node.network === 'livenet' ? false: true,
errors: '',
network: this.node.network,
relayFee: 0,
version: 'bitcore-1.1.2',
protocolversion: 700001,
difficulty: this._header.getCurrentDifficulty()
});
};
BlockService.prototype.isSynced = function(callback) { BlockService.prototype.isSynced = function(callback) {
callback(null, !this._initialSync); callback(null, !this._initialSync);
}; };
@ -447,7 +464,7 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks,
if (err) { if (err) {
if (!self.node.stopping) { if (!self.node.stopping) {
log.error('Block Service: Error: ' + err); log.error('Block Service: ' + err);
self.node.stop(); self.node.stop();
} }
return; return;
@ -456,7 +473,7 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks,
self._db.batch(operations, function(err) { self._db.batch(operations, function(err) {
if (err && !self.node.stopping) { if (err && !self.node.stopping) {
log.error('Block Service: Error: ' + err); log.error('Block Service: ' + err);
self.node.stop(); self.node.stop();
} }
@ -496,7 +513,7 @@ BlockService.prototype._processBlock = function(block) {
if (err) { if (err) {
if (!self.node.stopping) { if (!self.node.stopping) {
log.error('Block Service: Error: ' + err); log.error('Block Service: ' + err);
self.node.stop(); self.node.stop();
} }
return; return;
@ -506,7 +523,7 @@ BlockService.prototype._processBlock = function(block) {
if (err) { if (err) {
if (!self.node.stopping) { if (!self.node.stopping) {
log.error('Block Service: Error: ' + err); log.error('Block Service: ' + err);
self.node.stop(); self.node.stop();
} }
return; return;
@ -520,7 +537,7 @@ BlockService.prototype._processBlock = function(block) {
if (err) { if (err) {
if (!self.node.stopping) { if (!self.node.stopping) {
log.error('Block Service: Error: ' + err); log.error('Block Service: ' + err);
self.node.stop(); self.node.stop();
} }
return; return;

View File

@ -62,7 +62,6 @@ HeaderService.prototype.getAPIMethods = function() {
var methods = [ var methods = [
['getAllHeaders', this, this.getAllHeaders, 0], ['getAllHeaders', this, this.getAllHeaders, 0],
['getBestHeight', this, this.getBestHeight, 0], ['getBestHeight', this, this.getBestHeight, 0],
['getInfo', this, this.getInfo, 0],
['getBlockHeader', this, this.getBlockHeader, 1] ['getBlockHeader', this, this.getBlockHeader, 1]
]; ];
@ -75,22 +74,6 @@ HeaderService.prototype.getCurrentDifficulty = function() {
return bcoin.mining.common.getDifficulty(target); return bcoin.mining.common.getDifficulty(target);
}; };
HeaderService.prototype.getInfo = function(callback) {
callback(null, {
blocks: this._lastHeader.height,
connections: this._p2p.getNumberOfPeers(),
timeoffset: 0,
proxy: '',
testnet: this.node.network === 'livenet' ? false: true,
errors: '',
network: this.node.network,
relayFee: 0,
version: 'bitcore-1.1.2',
protocolversion: 700001,
difficulty: this.getCurrentDifficulty()
});
};
HeaderService.prototype.getAllHeaders = function(callback) { HeaderService.prototype.getAllHeaders = function(callback) {
var self = this; var self = this;
@ -217,10 +200,12 @@ HeaderService.prototype.stop = function(callback) {
if (this._headerInterval) { if (this._headerInterval) {
clearInterval(this._headerInterval); clearInterval(this._headerInterval);
this._headerInterval = null;
} }
if (this._blockProcessor) { if (this._blockProcessor) {
clearInterval(this._blockProcessor); clearInterval(this._blockProcessor);
this._blockProcessor = null;
} }
callback(); callback();
@ -419,6 +404,7 @@ HeaderService.prototype._onHeaders = function(headers) {
if (self._headerInterval) { if (self._headerInterval) {
clearInterval(self._headerInterval); clearInterval(self._headerInterval);
self._headerInterval = null;
} }
log.debug('Header Service: Received: ' + headers.length + ' header(s).'); log.debug('Header Service: Received: ' + headers.length + ' header(s).');

View File

@ -174,6 +174,18 @@ P2P.prototype._broadcast = function(subscribers, name, entity) {
} }
}; };
P2P.prototype._setRetryInterval = function() {
var self = this;
if (!self._retryInterval && !self.node.stopping) {
self._retryInterval = setInterval(function() {
log.info('Retrying connection to p2p network.');
self._pool.connect();
}, 5000);
}
};
P2P.prototype._connect = function() { P2P.prototype._connect = function() {
var self = this; var self = this;
@ -181,14 +193,7 @@ P2P.prototype._connect = function() {
log.info('Connecting to p2p network.'); log.info('Connecting to p2p network.');
self._pool.connect(); self._pool.connect();
var retryInterval = setInterval(function() { self._setRetryInterval();
log.info('Retrying connection to p2p network.');
self._pool.connect();
}, 5000);
self._pool.once('peerready', function() {
clearInterval(retryInterval);
});
}; };
@ -264,14 +269,13 @@ P2P.prototype._onPeerBlock = function(peer, message) {
P2P.prototype._onPeerDisconnect = function(peer, addr) { P2P.prototype._onPeerDisconnect = function(peer, addr) {
this._removePeer(peer); this._removePeer(peer);
log.info('Disconnected from peer: ' + addr.ip.v4);
if (!this.node.stopping) { if (this._peers.length < 1) {
log.info('Attempting to reconnect to the p2p network after disconnect signal.'); this._setRetryInterval();
this._connect();
return;
} }
log.info('Disconnected from peer: ' + addr.ip.v4);
}; };
P2P.prototype._onPeerHeaders = function(peer, message) { P2P.prototype._onPeerHeaders = function(peer, message) {
@ -315,6 +319,12 @@ P2P.prototype._matchNetwork = function(network) {
P2P.prototype._onPeerReady = function(peer, addr) { P2P.prototype._onPeerReady = function(peer, addr) {
// clear any interval timers that we previously set
if (this._retryInterval) {
clearInterval(this._retryInterval);
this._retryInterval = null;
}
// want to make sure the peer we are connecting to matches our network config. // want to make sure the peer we are connecting to matches our network config.
var network = this._matchNetwork(peer.network); var network = this._matchNetwork(peer.network);

View File

@ -11,6 +11,7 @@ function TransactionService(options) {
BaseService.call(this, options); BaseService.call(this, options);
this._db = this.node.services.db; this._db = this.node.services.db;
this._mempool = this.node.services.mempool; this._mempool = this.node.services.mempool;
this._block = this.node.services.block;
this._header = this.node.services.header; this._header = this.node.services.header;
this._p2p = this.node.services.p2p; this._p2p = this.node.services.p2p;
this._timestamp = this.node.services.timestamp; this._timestamp = this.node.services.timestamp;
@ -22,7 +23,9 @@ TransactionService.dependencies = [
'p2p', 'p2p',
'db', 'db',
'timestamp', 'timestamp',
'mempool' 'mempool',
'block',
'header'
]; ];
// ---- start public function protorypes // ---- start public function protorypes
@ -30,8 +33,7 @@ TransactionService.prototype.getAPIMethods = function() {
return [ return [
['getRawTransaction', this, this.getRawTransaction, 1], ['getRawTransaction', this, this.getRawTransaction, 1],
['getTransaction', this, this.getTransaction, 1], ['getTransaction', this, this.getTransaction, 1],
['getDetailedTransaction', this, this.getDetailedTransaction, 1], ['getDetailedTransaction', this, this.getDetailedTransaction, 1]
['getInputValues', this, this.getInputValues, 1]
]; ];
}; };
@ -74,22 +76,47 @@ TransactionService.prototype.getTransaction = function(txid, options, callback)
} }
async.waterfall([ async.waterfall([
function(next) { self._getTransaction.bind(self, txid, options),
self._getTransaction(txid, options, next); self._getSupplementaryTransactionInfo.bind(self),
},
self._getMempoolTransaction.bind(self), self._getMempoolTransaction.bind(self),
self.getInputValues.bind(self),
self._setMetaInfo.bind(self) self._setMetaInfo.bind(self)
], callback); ], callback);
}; };
TransactionService.prototype._getSupplementaryTransactionInfo = function(txid, tx, options, callback) {
if (!tx) {
return callback(null, txid, tx, options);
}
var self = this;
tx.confirmations = self._block.getTip().height - tx.__height + 1;
// TODO maybe we should index the block hash along with the height on tx,
// so that this extra lookup isn't necessary
self._header.getBlockHeader(tx.__height, function(err, header) {
if (err) {
return callback(err);
}
if (header) {
tx.blockHash = header.hash;
}
callback(null, txid, tx, options);
});
};
TransactionService.prototype._setMetaInfo = function(tx, options, callback) { TransactionService.prototype._setMetaInfo = function(tx, options, callback) {
if (!tx) { if (!tx) {
return callback(); return callback();
} }
// output values // output values
var outputSatoshis = 0; var outputSatoshis = 0;
@ -142,6 +169,7 @@ TransactionService.prototype._getMempoolTransaction = function(txid, tx, options
} }
tx.confirmations = 0; tx.confirmations = 0;
tx.blockHash = null;
callback(null, tx, options); callback(null, tx, options);
}); });
@ -152,8 +180,19 @@ TransactionService.prototype._getTransaction = function(txid, options, callback)
var self = this; var self = this;
var key = self._encoding.encodeTransactionKey(txid); // txs will be in the index, the current block at LOWER tx indexes
// or they don't exist for the purposes of this function
// inputValues will be on the tx already by this point.
var currentBlockTx = options && options.processedTxs &&
options.processedTxs[txid] ? options.processedTxs[txid] : null;
if (currentBlockTx) {
return setImmediate(function() {
callback(null, txid, currentBlockTx, options);
});
}
var key = self._encoding.encodeTransactionKey(txid);
self._db.get(key, function(err, tx) { self._db.get(key, function(err, tx) {
if (err) { if (err) {
@ -165,44 +204,19 @@ TransactionService.prototype._getTransaction = function(txid, options, callback)
} }
tx = self._encoding.decodeTransactionValue(tx); tx = self._encoding.decodeTransactionValue(tx);
tx.confirmations = self._header.getBestHeight() - tx.__height; callback(null, txid, tx, options);
self._header.getBlockHeader(tx.__height, function(err, header) {
if (err) {
return callback(err);
}
if (header) {
tx.blockHash = header.hash;
}
callback(null, txid, tx, options);
});
}); });
}; };
TransactionService.prototype.getInputValues = function(tx, options, callback) { TransactionService.prototype._getInputValues = function(tx, options, callback) {
var self = this; var self = this;
if (!tx) { async.mapLimit(tx.inputs, 4, function(input, next) {
return callback(null, tx, options);
}
async.eachOfLimit(tx.inputs, 4, function(input, index, next) { if (input.isCoinbase()) {
return next(null, 0);
if (!tx.__inputValues) {
tx.__inputValues = [];
}
var inputSatoshis = tx.__inputValues[index];
if (inputSatoshis >= 0 || input.isCoinbase()) {
return next();
} }
var outputIndex = input.prevout.index; var outputIndex = input.prevout.index;
@ -215,31 +229,12 @@ TransactionService.prototype.getInputValues = function(tx, options, callback) {
var output = _tx.outputs[outputIndex]; var output = _tx.outputs[outputIndex];
assert(output, 'Expected an output, but did not get one for tx: ' + _tx.txid() + ' outputIndex: ' + outputIndex); assert(output, 'Expected an output, but did not get one for tx: ' + _tx.txid() + ' outputIndex: ' + outputIndex);
tx.__inputValues[index] = output.value; next(null, output.value);
next();
}); });
}, function(err) { }, callback);
if (err) {
return callback(err);
}
var key = self._encoding.encodeTransactionKey(tx.txid());
var value = self._encoding.encodeTransactionValue(tx);
self._db.put(key, value, function(err) {
if (err) {
return callback(err);
}
callback(null, tx, options);
});
});
}; };
TransactionService.prototype.sendTransaction = function(tx, callback) { TransactionService.prototype.sendTransaction = function(tx, callback) {
@ -275,16 +270,25 @@ TransactionService.prototype._getBlockTimestamp = function(hash) {
TransactionService.prototype.onBlock = function(block, callback) { TransactionService.prototype.onBlock = function(block, callback) {
var self = this; var self = this;
var processedTxs = {};
if (self.node.stopping) { if (self.node.stopping) {
return callback(); return callback();
} }
var operations = block.txs.map(function(tx) { async.mapSeries(block.txs, function(tx, next) {
return self._processTransaction(tx, { block: block });
});
callback(null, operations); processedTxs[tx.txid()] = tx;
self._processTransaction(tx, { block: block, processedTxs: processedTxs }, next);
}, function(err, operations) {
if (err) {
return callback(err);
}
callback(null, operations);
});
}; };
@ -316,27 +320,39 @@ TransactionService.prototype.onReorg = function(args, callback) {
}; };
TransactionService.prototype._processTransaction = function(tx, opts) { TransactionService.prototype._processTransaction = function(tx, opts, callback) {
// this index is very simple txid -> tx, but we also need to find each // this index is very simple txid -> tx, but we also need to find each
// input's prev output value, the adjusted timestamp for the block and // input's prev output value, the adjusted timestamp for the block and
// the tx's block height // the tx's block height
// input values var self = this;
tx.__inputValues = []; // these are lazy-loaded on the first access of the tx
// timestamp self._getInputValues(tx, opts, function(err, inputValues) {
tx.__timestamp = this._getBlockTimestamp(opts.block.rhash());
assert(tx.__timestamp, 'Timestamp is required when saving a transaction.');
// height if (err) {
tx.__height = opts.block.height; return callback(err);
assert(tx.__height, 'Block height is required when saving a trasnaction.'); }
return { assert(inputValues && inputValues.length === tx.inputs.length,
key: this._encoding.encodeTransactionKey(tx.txid()), 'Input values missing from tx.');
value: this._encoding.encodeTransactionValue(tx)
}; // inputValues
tx.__inputValues = inputValues;
// timestamp
tx.__timestamp = self._getBlockTimestamp(opts.block.rhash());
assert(tx.__timestamp, 'Timestamp is required when saving a transaction.');
// height
tx.__height = opts.block.height;
assert(tx.__height, 'Block height is required when saving a trasnaction.');
callback(null, {
key: self._encoding.encodeTransactionKey(tx.txid()),
value: self._encoding.encodeTransactionValue(tx)
});
});
}; };

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{ {
"name": "bitcore-node", "name": "bitcore-node",
"version": "5.0.0-beta.4", "version": "5.0.0-beta.5",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {

View File

@ -5,7 +5,7 @@
"node": ">=8.0.0" "node": ">=8.0.0"
}, },
"author": "BitPay <dev@bitpay.com>", "author": "BitPay <dev@bitpay.com>",
"version": "5.0.0-beta.5", "version": "5.0.0-beta.6",
"main": "./index.js", "main": "./index.js",
"repository": "git://github.com/bitpay/bitcore-node.git", "repository": "git://github.com/bitpay/bitcore-node.git",
"homepage": "https://github.com/bitpay/bitcore-node", "homepage": "https://github.com/bitpay/bitcore-node",