Address Service: Start to cache getAddressSummary based on range of block heights

This commit is contained in:
Braydon Fuller 2015-12-30 00:16:22 -05:00
parent cab25cf397
commit 40eb4f50ae
4 changed files with 197 additions and 39 deletions

View File

@ -37,5 +37,9 @@ exports.HASH_TYPES_MAP = {
exports.SPACER_MIN = new Buffer('00', 'hex'); exports.SPACER_MIN = new Buffer('00', 'hex');
exports.SPACER_MAX = new Buffer('ff', 'hex'); exports.SPACER_MAX = new Buffer('ff', 'hex');
// The total number of transactions that an address can receive before it will start
// to cache the summary to disk.
exports.SUMMARY_CACHE_THRESHOLD = 10000;
module.exports = exports; module.exports = exports;

View File

@ -160,6 +160,65 @@ exports.decodeInputValueMap = function(buffer) {
}; };
}; };
exports.encodeSummaryCacheKey = function(address) {
return Buffer.concat([address.hashBuffer, constants.HASH_TYPES_BUFFER[address.type]]);
};
exports.decodeSummaryCacheKey = function(buffer, network) {
var hashBuffer = buffer.read(20);
var type = constants.HASH_TYPES_READABLE[buffer.read(20, 2).toString('hex')];
var address = new Address({
hashBuffer: hashBuffer,
type: type,
network: network
});
return address;
};
exports.encodeSummaryCacheValue = function(cache, tipHeight) {
var buffer = new Buffer(new Array(20));
buffer.writeUInt32BE(tipHeight);
buffer.writeDoubleBE(cache.result.totalReceived, 4);
buffer.writeDoubleBE(cache.result.balance, 12);
var txidBuffers = [];
for (var key in cache.result.appearanceIds) {
txidBuffers.push(new Buffer(key, 'hex'));
}
var txidsBuffer = Buffer.concat(txidBuffers);
var value = Buffer.concat([buffer, txidsBuffer]);
return value;
};
exports.decodeSummaryCacheValue = function(buffer) {
var height = buffer.readUInt32BE();
var totalReceived = buffer.readDoubleBE(4);
var balance = buffer.readDoubleBE(12);
// read 32 byte chunks until exhausted
var appearanceIds = {};
var pos = 16;
while(pos < buffer.length) {
var txid = buffer.slice(pos, pos + 32).toString('hex');
appearanceIds[txid] = true;
pos += 32;
}
var cache = {
height: height,
result: {
appearanceIds: appearanceIds,
totalReceived: totalReceived,
balance: balance,
unconfirmedAppearanceIds: {}, // unconfirmed values are never stored in cache
unconfirmedBalance: 0
}
};
return cache;
};
exports.getAddressInfo = function(addressStr) { exports.getAddressInfo = function(addressStr) {
var addrObj = bitcore.Address(addressStr); var addrObj = bitcore.Address(addressStr);
var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type]; var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type];

View File

@ -44,7 +44,10 @@ var AddressService = function(options) {
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this)); this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this));
this.summaryCacheThreshold = options.summaryCacheThreshold || constants.SUMMARY_CACHE_THRESHOLD;
this._setMempoolIndexPath(); this._setMempoolIndexPath();
this._setSummaryCachePath();
if (options.mempoolMemoryIndex) { if (options.mempoolMemoryIndex) {
this.levelupStore = memdown; this.levelupStore = memdown;
} else { } else {
@ -74,6 +77,7 @@ AddressService.prototype.start = function(callback) {
} }
}, },
function(next) { function(next) {
// Setup new mempool index
if (!fs.existsSync(self.mempoolIndexPath)) { if (!fs.existsSync(self.mempoolIndexPath)) {
mkdirp(self.mempoolIndexPath, next); mkdirp(self.mempoolIndexPath, next);
} else { } else {
@ -87,7 +91,21 @@ AddressService.prototype.start = function(callback) {
db: self.levelupStore, db: self.levelupStore,
keyEncoding: 'binary', keyEncoding: 'binary',
valueEncoding: 'binary', valueEncoding: 'binary',
fillCache: false fillCache: false,
maxOpenFiles: 200
},
next
);
},
function(next) {
self.summaryCache = levelup(
self.summaryCachePath,
{
db: self.levelupStore,
keyEncoding: 'binary',
valueEncoding: 'binary',
fillCache: false,
maxOpenFiles: 200
}, },
next next
); );
@ -102,21 +120,35 @@ AddressService.prototype.stop = function(callback) {
}; };
/** /**
* This function will set `this.dataPath` based on `this.node.network`. * This function will set `this.summaryCachePath` based on `this.node.network`.
* @private
*/
AddressService.prototype._setSummaryCachePath = function() {
this.summaryCachePath = this._getDBPathFor('bitcore-addresssummary.db');
};
/**
* This function will set `this.mempoolIndexPath` based on `this.node.network`.
* @private * @private
*/ */
AddressService.prototype._setMempoolIndexPath = function() { AddressService.prototype._setMempoolIndexPath = function() {
this.mempoolIndexPath = this._getDBPathFor('bitcore-addressmempool.db');
};
AddressService.prototype._getDBPathFor = function(dbname) {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
var path;
var regtest = Networks.get('regtest'); var regtest = Networks.get('regtest');
if (this.node.network === Networks.livenet) { if (this.node.network === Networks.livenet) {
this.mempoolIndexPath = this.node.datadir + '/bitcore-addressmempool.db'; path = this.node.datadir + '/' + dbname;
} else if (this.node.network === Networks.testnet) { } else if (this.node.network === Networks.testnet) {
this.mempoolIndexPath = this.node.datadir + '/testnet3/bitcore-addressmempool.db'; path = this.node.datadir + '/testnet3/' + dbname;
} else if (this.node.network === regtest) { } else if (this.node.network === regtest) {
this.mempoolIndexPath = this.node.datadir + '/regtest/bitcore-addressmempool.db'; path = this.node.datadir + '/regtest/' + dbname;
} else { } else {
throw new Error('Unknown network: ' + this.network); throw new Error('Unknown network: ' + this.network);
} }
return path;
}; };
/** /**
@ -1270,32 +1302,49 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { AddressService.prototype.getAddressSummary = function(addressArg, options, callback) {
var self = this; var self = this;
var startTime = new Date();
var address = new Address(addressArg); var address = new Address(addressArg);
var tipHeight = this.node.services.db.tip.__height;
async.waterfall([ async.waterfall([
function(next) { function(next) {
self._getAddressInputsSummary(address, options, next); self._getAddressSummaryCache(address, next);
}, },
function(result, next) { function(cache, next) {
self._getAddressOutputsSummary(address, options, result, next); self._getAddressInputsSummary(address, cache, tipHeight, next);
},
function(cache, next) {
self._getAddressOutputsSummary(address, cache, tipHeight, next);
},
function(cache, next) {
self._saveAddressSummaryCache(address, cache, tipHeight, next);
} }
], function(err, result) { ], function(err, cache) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
var result = cache.result;
var confirmedTxids = Object.keys(result.appearanceIds); var confirmedTxids = Object.keys(result.appearanceIds);
var unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds); var unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds);
var summary = { var summary = {
totalReceived: result.totalReceived, totalReceived: result.totalReceived,
totalSpent: result.totalSpent, totalSpent: result.totalReceived - result.balance,
balance: result.balance, balance: result.balance,
unconfirmedBalance: result.unconfirmedBalance,
appearances: confirmedTxids.length, appearances: confirmedTxids.length,
unconfirmedBalance: result.unconfirmedBalance,
unconfirmedAppearances: unconfirmedTxids.length unconfirmedAppearances: unconfirmedTxids.length
}; };
var timeDelta = new Date() - startTime;
if (timeDelta > 5000) {
var seconds = Math.round(timeDelta / 1000);
log.warn('Slow (' + seconds + 's) getAddressSummary request for address: ' + address.toString());
log.warn('Address Summary:', summary);
}
if (!options.noTxList) { if (!options.noTxList) {
var txids = confirmedTxids.concat(unconfirmedTxids); var txids = confirmedTxids.concat(unconfirmedTxids);
@ -1315,20 +1364,64 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb
}; };
AddressService.prototype._getAddressInputsSummary = function(address, options, callback) { AddressService.prototype._saveAddressSummaryCache = function(address, cache, tipHeight, callback) {
var transactionLength = Object.keys(cache.result.appearanceIds).length;
var exceedsCacheThreshold = (transactionLength > this.summaryCacheThreshold);
if (exceedsCacheThreshold) {
log.info('Saving address summary cache for: ' + address.toString() + 'at height: ' + tipHeight);
var key = encoding.encodeSummaryCacheKey(address);
var value = encoding.encodeSummaryCacheValue(cache, tipHeight);
this.summaryCache.put(key, value, function(err) {
if (err) {
return callback(err);
}
callback(null, cache);
});
} else {
callback(null, cache);
}
};
AddressService.prototype._getAddressSummaryCache = function(address, callback) {
var baseCache = {
result: {
appearanceIds: {},
totalReceived: 0,
balance: 0,
unconfirmedAppearanceIds: {},
unconfirmedBalance: 0
}
};
var key = encoding.encodeSummaryCacheKey(address);
this.summaryCache.get(key, {
valueEncoding: 'binary',
keyEncoding: 'binary'
}, function(err, buffer) {
if (err instanceof levelup.errors.NotFoundError) {
return callback(null, baseCache);
} else if (err) {
return callback(err);
}
var cache = encoding.decodeSummaryCacheValue(buffer);
callback(null, cache);
});
};
AddressService.prototype._getAddressInputsSummary = function(address, cache, tipHeight, callback) {
$.checkArgument(address instanceof Address); $.checkArgument(address instanceof Address);
var self = this; var self = this;
var error = null; var error = null;
var result = {
appearanceIds: {}, var opts = {
unconfirmedAppearanceIds: {}, start: _.isUndefined(cache.height) ? 0 : cache.height + 1,
end: tipHeight
}; };
var inputsStream = self.createInputsStream(address, options); var inputsStream = self.createInputsStream(address, opts);
inputsStream.on('data', function(input) { inputsStream.on('data', function(input) {
var txid = input.txid; var txid = input.txid;
result.appearanceIds[txid] = true; cache.result.appearanceIds[txid] = true;
}); });
inputsStream.on('error', function(err) { inputsStream.on('error', function(err) {
@ -1347,27 +1440,27 @@ AddressService.prototype._getAddressInputsSummary = function(address, options, c
} }
for(var i = 0; i < mempoolInputs.length; i++) { for(var i = 0; i < mempoolInputs.length; i++) {
var input = mempoolInputs[i]; var input = mempoolInputs[i];
result.unconfirmedAppearanceIds[input.txid] = true; cache.result.unconfirmedAppearanceIds[input.txid] = true;
} }
callback(error, result); callback(error, cache);
}); });
}); });
}; };
AddressService.prototype._getAddressOutputsSummary = function(address, options, result, callback) { AddressService.prototype._getAddressOutputsSummary = function(address, cache, tipHeight, callback) {
$.checkArgument(address instanceof Address); $.checkArgument(address instanceof Address);
$.checkArgument(!_.isUndefined(result) && $.checkArgument(!_.isUndefined(cache.result) &&
!_.isUndefined(result.appearanceIds) && !_.isUndefined(cache.result.appearanceIds) &&
!_.isUndefined(result.unconfirmedAppearanceIds)); !_.isUndefined(cache.result.unconfirmedAppearanceIds));
var self = this; var self = this;
var outputStream = self.createOutputsStream(address, options); var opts = {
start: _.isUndefined(cache.height) ? 0 : cache.height + 1,
end: tipHeight
};
result.totalReceived = 0; var outputStream = self.createOutputsStream(address, opts);
result.totalSpent = 0;
result.balance = 0;
result.unconfirmedBalance = 0;
outputStream.on('data', function(output) { outputStream.on('data', function(output) {
@ -1376,13 +1469,11 @@ AddressService.prototype._getAddressOutputsSummary = function(address, options,
// Bitcoind's isSpent only works for confirmed transactions // Bitcoind's isSpent only works for confirmed transactions
var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex);
result.totalReceived += output.satoshis; cache.result.totalReceived += output.satoshis;
result.appearanceIds[txid] = true; cache.result.appearanceIds[txid] = true;
if (spentDB) { if (!spentDB) {
result.totalSpent += output.satoshis; cache.result.balance += output.satoshis;
} else {
result.balance += output.satoshis;
} }
// Check to see if this output is spent in the mempool and if so // Check to see if this output is spent in the mempool and if so
@ -1393,7 +1484,7 @@ AddressService.prototype._getAddressOutputsSummary = function(address, options,
); );
var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey];
if (spentMempool) { if (spentMempool) {
result.unconfirmedBalance -= output.satoshis; cache.result.unconfirmedBalance -= output.satoshis;
} }
}); });
@ -1418,7 +1509,7 @@ AddressService.prototype._getAddressOutputsSummary = function(address, options,
for(var i = 0; i < mempoolOutputs.length; i++) { for(var i = 0; i < mempoolOutputs.length; i++) {
var output = mempoolOutputs[i]; var output = mempoolOutputs[i];
result.unconfirmedAppearanceIds[output.txid] = true; cache.result.unconfirmedAppearanceIds[output.txid] = true;
var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(
new Buffer(output.txid, 'hex'), // TODO: get buffer directly new Buffer(output.txid, 'hex'), // TODO: get buffer directly
@ -1427,11 +1518,11 @@ AddressService.prototype._getAddressOutputsSummary = function(address, options,
var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey];
// Only add this to the balance if it's not spent in the mempool already // Only add this to the balance if it's not spent in the mempool already
if (!spentMempool) { if (!spentMempool) {
result.unconfirmedBalance += output.satoshis; cache.result.unconfirmedBalance += output.satoshis;
} }
} }
callback(error, result); callback(error, cache);
}); });

View File

@ -46,6 +46,8 @@ function DB(options) {
this._setDataPath(); this._setDataPath();
this.maxOpenFiles = options.maxOpenFiles || DB.DEFAULT_MAX_OPEN_FILES;
this.levelupStore = leveldown; this.levelupStore = leveldown;
if (options.store) { if (options.store) {
this.levelupStore = options.store; this.levelupStore = options.store;
@ -68,6 +70,8 @@ DB.PREFIXES = {
TIP: new Buffer('04', 'hex') TIP: new Buffer('04', 'hex')
}; };
DB.DEFAULT_MAX_OPEN_FILES = 200;
/** /**
* This function will set `this.dataPath` based on `this.node.network`. * This function will set `this.dataPath` based on `this.node.network`.
* @private * @private
@ -98,7 +102,7 @@ DB.prototype.start = function(callback) {
} }
this.genesis = Block.fromBuffer(this.node.services.bitcoind.genesisBuffer); this.genesis = Block.fromBuffer(this.node.services.bitcoind.genesisBuffer);
this.store = levelup(this.dataPath, { db: this.levelupStore }); this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles });
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this.once('ready', function() { this.once('ready', function() {