add noBalance options + mempoolAddressIndex

This commit is contained in:
Matias Alejo Garcia 2016-02-08 12:41:49 -05:00
parent 6a899e4b9c
commit e7e33313cf
2 changed files with 106 additions and 77 deletions

View File

@ -44,6 +44,7 @@ function AddressHistory(args) {
AddressHistory.prototype._mergeAndSortTxids = function(summaries) { AddressHistory.prototype._mergeAndSortTxids = function(summaries) {
var appearanceIds = {}; var appearanceIds = {};
var unconfirmedAppearanceIds = {}; var unconfirmedAppearanceIds = {};
for (var i = 0; i < summaries.length; i++) { for (var i = 0; i < summaries.length; i++) {
var summary = summaries[i]; var summary = summaries[i];
for (var key in summary.appearanceIds) { for (var key in summary.appearanceIds) {
@ -79,6 +80,8 @@ AddressHistory.prototype.get = function(callback) {
return callback(new TypeError('Maximum number of addresses (' + this.maxAddressesQuery + ') exceeded')); return callback(new TypeError('Maximum number of addresses (' + this.maxAddressesQuery + ') exceeded'));
} }
this.options.noBalance = true;
if (this.addresses.length === 1) { if (this.addresses.length === 1) {
var address = this.addresses[0]; var address = this.addresses[0];
self.node.services.address.getAddressSummary(address, this.options, function(err, summary) { self.node.services.address.getAddressSummary(address, this.options, function(err, summary) {
@ -89,10 +92,11 @@ AddressHistory.prototype.get = function(callback) {
}); });
} else { } else {
var opts = _.clone(this.options); var opts = _.clone(this.options);
opts.fullTxList = true; opts.fullTxList = true;
async.mapLimit( async.mapLimit(
self.addresses, self.addresses,
self.maxAddressesLimit, self.maxaddressesLimit,
function(address, next) { function(address, next) {
self.node.services.address.getAddressSummary(address, opts, next); self.node.services.address.getAddressSummary(address, opts, next);
}, },

View File

@ -57,6 +57,7 @@ var AddressService = function(options) {
} }
this.mempoolIndex = null; // Used for larger mempool indexes this.mempoolIndex = null; // Used for larger mempool indexes
this.mempoolSpentIndex = {}; // Used for small quick synchronous lookups this.mempoolSpentIndex = {}; // Used for small quick synchronous lookups
this.mempoolAddressIndex = {}; // Used to check if an address is on the spend pool
}; };
inherits(AddressService, BaseService); inherits(AddressService, BaseService);
@ -70,6 +71,7 @@ AddressService.prototype.start = function(callback) {
var self = this; var self = this;
async.series([ async.series([
function(next) { function(next) {
// Flush any existing mempool index // Flush any existing mempool index
if (fs.existsSync(self.mempoolIndexPath)) { if (fs.existsSync(self.mempoolIndexPath)) {
@ -88,8 +90,7 @@ AddressService.prototype.start = function(callback) {
}, },
function(next) { function(next) {
self.mempoolIndex = levelup( self.mempoolIndex = levelup(
self.mempoolIndexPath, self.mempoolIndexPath, {
{
db: self.levelupStore, db: self.levelupStore,
keyEncoding: 'binary', keyEncoding: 'binary',
valueEncoding: 'binary', valueEncoding: 'binary',
@ -154,20 +155,17 @@ AddressService.prototype.getAPIMethods = function() {
* Called by the Bus to get the available events for this service. * Called by the Bus to get the available events for this service.
*/ */
AddressService.prototype.getPublishEvents = function() { AddressService.prototype.getPublishEvents = function() {
return [ return [{
{ name: 'address/transaction',
name: 'address/transaction', scope: this,
scope: this, subscribe: this.subscribe.bind(this, 'address/transaction'),
subscribe: this.subscribe.bind(this, 'address/transaction'), unsubscribe: this.unsubscribe.bind(this, 'address/transaction')
unsubscribe: this.unsubscribe.bind(this, 'address/transaction') }, {
}, name: 'address/balance',
{ scope: this,
name: 'address/balance', subscribe: this.subscribe.bind(this, 'address/balance'),
scope: this, unsubscribe: this.unsubscribe.bind(this, 'address/balance')
subscribe: this.subscribe.bind(this, 'address/balance'), }];
unsubscribe: this.unsubscribe.bind(this, 'address/balance')
}
];
}; };
/** /**
@ -305,6 +303,14 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) {
continue; continue;
} }
var hashBufferHex = addressInfo.hashBuffer.toString('hex');
if (add) {
this.mempoolAddressIndex[hashBufferHex] = true;
} else {
delete this.mempoolAddressIndex[hashBufferHex];
}
// Update output index // Update output index
var outputIndexBuffer = new Buffer(4); var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex); outputIndexBuffer.writeUInt32BE(outputIndex);
@ -397,6 +403,12 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) {
value: inputValue value: inputValue
}); });
var inputHashBufferHex = inputHashBuffer.toString('hex');
if (add) {
this.mempoolAddressIndex[inputHashBufferHex] = true;
} else {
delete this.mempoolAddressIndex[inputHashBufferHex];
}
} }
if (!callback) { if (!callback) {
@ -446,7 +458,7 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
var script = output.script; var script = output.script;
if(!script) { if (!script) {
log.debug('Invalid script'); log.debug('Invalid script');
continue; continue;
} }
@ -462,7 +474,7 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
// less than the mean of the 11 previous blocks) and not greater than 2 // less than the mean of the 11 previous blocks) and not greater than 2
// hours in the future. // hours in the future.
var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer, var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer,
height, txidBuffer, outputIndex); height, txidBuffer, outputIndex);
var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer); var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer);
operations.push({ operations.push({
type: action, type: action,
@ -494,11 +506,11 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
this.transactionEventHandler(txmessages[addressKey]); this.transactionEventHandler(txmessages[addressKey]);
} }
if(tx.isCoinbase()) { if (tx.isCoinbase()) {
continue; continue;
} }
for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { for (var inputIndex = 0; inputIndex < inputs.length; inputIndex++) {
var input = inputs[inputIndex]; var input = inputs[inputIndex];
var inputHash; var inputHash;
@ -560,14 +572,14 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
* @param {Boolean} obj.rejected - If the transaction was not accepted in the mempool * @param {Boolean} obj.rejected - If the transaction was not accepted in the mempool
*/ */
AddressService.prototype.transactionEventHandler = function(obj) { AddressService.prototype.transactionEventHandler = function(obj) {
if(this.subscriptions['address/transaction'][obj.addressInfo.hashHex]) { if (this.subscriptions['address/transaction'][obj.addressInfo.hashHex]) {
var emitters = this.subscriptions['address/transaction'][obj.addressInfo.hashHex]; var emitters = this.subscriptions['address/transaction'][obj.addressInfo.hashHex];
var address = new Address({ var address = new Address({
hashBuffer: obj.addressInfo.hashBuffer, hashBuffer: obj.addressInfo.hashBuffer,
network: this.node.network, network: this.node.network,
type: obj.addressInfo.addressType type: obj.addressInfo.addressType
}); });
for(var i = 0; i < emitters.length; i++) { for (var i = 0; i < emitters.length; i++) {
emitters[i].emit('address/transaction', { emitters[i].emit('address/transaction', {
rejected: obj.rejected, rejected: obj.rejected,
height: obj.height, height: obj.height,
@ -591,7 +603,7 @@ AddressService.prototype.transactionEventHandler = function(obj) {
* @param {String} obj.addressType * @param {String} obj.addressType
*/ */
AddressService.prototype.balanceEventHandler = function(block, obj) { AddressService.prototype.balanceEventHandler = function(block, obj) {
if(this.subscriptions['address/balance'][obj.hashHex]) { if (this.subscriptions['address/balance'][obj.hashHex]) {
var emitters = this.subscriptions['address/balance'][obj.hashHex]; var emitters = this.subscriptions['address/balance'][obj.hashHex];
var address = new Address({ var address = new Address({
hashBuffer: obj.hashBuffer, hashBuffer: obj.hashBuffer,
@ -599,10 +611,10 @@ AddressService.prototype.balanceEventHandler = function(block, obj) {
type: obj.addressType type: obj.addressType
}); });
this.getBalance(address, true, function(err, balance) { this.getBalance(address, true, function(err, balance) {
if(err) { if (err) {
return this.emit(err); return this.emit(err);
} }
for(var i = 0; i < emitters.length; i++) { for (var i = 0; i < emitters.length; i++) {
emitters[i].emit('address/balance', address, balance, block); emitters[i].emit('address/balance', address, balance, block);
} }
}); });
@ -621,9 +633,9 @@ AddressService.prototype.subscribe = function(name, emitter, addresses) {
$.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter');
$.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses'); $.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses');
for(var i = 0; i < addresses.length; i++) { for (var i = 0; i < addresses.length; i++) {
var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex'); var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex');
if(!this.subscriptions[name][hashHex]) { if (!this.subscriptions[name][hashHex]) {
this.subscriptions[name][hashHex] = []; this.subscriptions[name][hashHex] = [];
} }
this.subscriptions[name][hashHex].push(emitter); this.subscriptions[name][hashHex].push(emitter);
@ -641,16 +653,16 @@ AddressService.prototype.unsubscribe = function(name, emitter, addresses) {
$.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter');
$.checkArgument(Array.isArray(addresses) || _.isUndefined(addresses), 'Second argument is expected to be an Array of addresses or undefined'); $.checkArgument(Array.isArray(addresses) || _.isUndefined(addresses), 'Second argument is expected to be an Array of addresses or undefined');
if(!addresses) { if (!addresses) {
return this.unsubscribeAll(name, emitter); return this.unsubscribeAll(name, emitter);
} }
for(var i = 0; i < addresses.length; i++) { for (var i = 0; i < addresses.length; i++) {
var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex'); var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex');
if(this.subscriptions[name][hashHex]) { if (this.subscriptions[name][hashHex]) {
var emitters = this.subscriptions[name][hashHex]; var emitters = this.subscriptions[name][hashHex];
var index = emitters.indexOf(emitter); var index = emitters.indexOf(emitter);
if(index > -1) { if (index > -1) {
emitters.splice(index, 1); emitters.splice(index, 1);
} }
} }
@ -665,10 +677,10 @@ AddressService.prototype.unsubscribe = function(name, emitter, addresses) {
AddressService.prototype.unsubscribeAll = function(name, emitter) { AddressService.prototype.unsubscribeAll = function(name, emitter) {
$.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter');
for(var hashHex in this.subscriptions[name]) { for (var hashHex in this.subscriptions[name]) {
var emitters = this.subscriptions[name][hashHex]; var emitters = this.subscriptions[name][hashHex];
var index = emitters.indexOf(emitter); var index = emitters.indexOf(emitter);
if(index > -1) { if (index > -1) {
emitters.splice(index, 1); emitters.splice(index, 1);
} }
} }
@ -683,7 +695,7 @@ AddressService.prototype.unsubscribeAll = function(name, emitter) {
*/ */
AddressService.prototype.getBalance = function(address, queryMempool, callback) { AddressService.prototype.getBalance = function(address, queryMempool, callback) {
this.getUnspentOutputs(address, queryMempool, function(err, outputs) { this.getUnspentOutputs(address, queryMempool, function(err, outputs) {
if(err) { if (err) {
return callback(err); return callback(err);
} }
@ -852,7 +864,7 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) {
stream.on('data', function(input) { stream.on('data', function(input) {
inputs.push(input); inputs.push(input);
if (inputs.length > self.maxInputsQueryLength) { if (inputs.length > self.maxInputsQueryLength) {
log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for address '+ addressStr); log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for address ' + addressStr);
error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached'); error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached');
stream.end(); stream.end();
} }
@ -871,7 +883,7 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) {
return callback(error); return callback(error);
} }
if(options.queryMempool) { if (options.queryMempool) {
self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) {
if (err) { if (err) {
return callback(err); return callback(err);
@ -1092,7 +1104,7 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) {
return callback(error); return callback(error);
} }
if(options.queryMempool) { if (options.queryMempool) {
self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) {
if (err) { if (err) {
return callback(err); return callback(err);
@ -1176,7 +1188,7 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h
AddressService.prototype.getUnspentOutputs = function(addresses, queryMempool, callback) { AddressService.prototype.getUnspentOutputs = function(addresses, queryMempool, callback) {
var self = this; var self = this;
if(!Array.isArray(addresses)) { if (!Array.isArray(addresses)) {
addresses = [addresses]; addresses = [addresses];
} }
@ -1184,9 +1196,9 @@ AddressService.prototype.getUnspentOutputs = function(addresses, queryMempool, c
async.eachSeries(addresses, function(address, next) { async.eachSeries(addresses, function(address, next) {
self.getUnspentOutputsForAddress(address, queryMempool, function(err, unspents) { self.getUnspentOutputsForAddress(address, queryMempool, function(err, unspents) {
if(err && err instanceof errors.NoOutputs) { if (err && err instanceof errors.NoOutputs) {
return next(); return next();
} else if(err) { } else if (err) {
return next(err); return next(err);
} }
@ -1208,10 +1220,12 @@ AddressService.prototype.getUnspentOutputsForAddress = function(address, queryMe
var self = this; var self = this;
this.getOutputs(address, {queryMempool: queryMempool}, function(err, outputs) { this.getOutputs(address, {
queryMempool: queryMempool
}, function(err, outputs) {
if (err) { if (err) {
return callback(err); return callback(err);
} else if(!outputs.length) { } else if (!outputs.length) {
return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []);
} }
@ -1336,6 +1350,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
} }
async.waterfall([ async.waterfall([
function(next) { function(next) {
self._getAddressConfirmedSummary(address, options, next); self._getAddressConfirmedSummary(address, options, next);
}, },
@ -1357,9 +1372,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
var seconds = Math.round(timeDelta / 1000); var seconds = Math.round(timeDelta / 1000);
log.warn('Slow (' + seconds + 's) getAddressSummary request for address: ' + address.toString()); log.warn('Slow (' + seconds + 's) getAddressSummary request for address: ' + address.toString());
} }
callback(null, summary); callback(null, summary);
}); });
}; };
@ -1375,6 +1388,7 @@ AddressService.prototype._getAddressConfirmedSummary = function(address, options
}; };
async.waterfall([ async.waterfall([
function(next) { function(next) {
self._getAddressConfirmedInputsSummary(address, baseResult, options, next); self._getAddressConfirmedInputsSummary(address, baseResult, options, next);
}, },
@ -1421,8 +1435,8 @@ AddressService.prototype._getAddressConfirmedInputsSummary = function(address, r
AddressService.prototype._getAddressConfirmedOutputsSummary = function(address, result, options, callback) { AddressService.prototype._getAddressConfirmedOutputsSummary = function(address, result, options, callback) {
$.checkArgument(address instanceof Address); $.checkArgument(address instanceof Address);
$.checkArgument(!_.isUndefined(result) && $.checkArgument(!_.isUndefined(result) &&
!_.isUndefined(result.appearanceIds) && !_.isUndefined(result.appearanceIds) &&
!_.isUndefined(result.unconfirmedAppearanceIds)); !_.isUndefined(result.unconfirmedAppearanceIds));
var self = this; var self = this;
var count = 0; var count = 0;
@ -1434,25 +1448,27 @@ AddressService.prototype._getAddressConfirmedOutputsSummary = function(address,
var txid = output.txid; var txid = output.txid;
var outputIndex = output.outputIndex; var outputIndex = output.outputIndex;
// Bitcoind's isSpent only works for confirmed transactions if (!options.noBalance) {
var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); // Bitcoind's isSpent only works for confirmed transactions
result.totalReceived += output.satoshis; var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex);
result.appearanceIds[txid] = output.height; result.totalReceived += output.satoshis;
result.appearanceIds[txid] = output.height;
if (!spentDB) { if (!spentDB) {
result.balance += output.satoshis; result.balance += output.satoshis;
} }
if (options.queryMempool) { if (options.queryMempool) {
// Check to see if this output is spent in the mempool and if so // Check to see if this output is spent in the mempool and if so
// we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta) // we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta)
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(
new Buffer(txid, 'hex'), // TODO: get buffer directly new Buffer(txid, 'hex'), // TODO: get buffer directly
outputIndex outputIndex
); );
var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey];
if (spentMempool) { if (spentMempool) {
result.unconfirmedBalance -= output.satoshis; result.unconfirmedBalance -= output.satoshis;
}
} }
} }
@ -1504,38 +1520,47 @@ AddressService.prototype._getAddressMempoolSummary = function(address, options,
var addressStr = address.toString(); var addressStr = address.toString();
var hashBuffer = address.hashBuffer; var hashBuffer = address.hashBuffer;
var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type];
var hashBufferHex = hashBuffer.toString('hex');
if (!this.mempoolAddressIndex[hashBufferHex]) {
return callback(null, result);
}
async.waterfall([ async.waterfall([
function(next) { function(next) {
self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) {
if (err) { if (err) {
return next(err); return next(err);
} }
for(var i = 0; i < mempoolInputs.length; i++) { for (var i = 0; i < mempoolInputs.length; i++) {
var input = mempoolInputs[i]; var input = mempoolInputs[i];
result.unconfirmedAppearanceIds[input.txid] = input.timestamp; result.unconfirmedAppearanceIds[input.txid] = input.timestamp;
} }
next(null, result); next(null, result);
}); });
}, function(result, next) { },
function(result, next) {
self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) {
if (err) { if (err) {
return next(err); return next(err);
} }
for(var i = 0; i < mempoolOutputs.length; i++) { if (!options.noBalance) {
var output = mempoolOutputs[i]; for (var i = 0; i < mempoolOutputs.length; i++) {
var output = mempoolOutputs[i];
result.unconfirmedAppearanceIds[output.txid] = output.timestamp; result.unconfirmedAppearanceIds[output.txid] = output.timestamp;
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(
new Buffer(output.txid, 'hex'), // TODO: get buffer directly new Buffer(output.txid, 'hex'), // TODO: get buffer directly
output.outputIndex output.outputIndex
); );
var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey];
// Only add this to the balance if it's not spent in the mempool already // Only add this to the balance if it's not spent in the mempool already
if (!spentMempool) { if (!spentMempool) {
result.unconfirmedBalance += output.satoshis; result.unconfirmedBalance += output.satoshis;
}
} }
} }
next(null, result); next(null, result);