Added blockHandler.

This commit is contained in:
Chris Kleeschulte 2017-01-18 18:23:17 -05:00
parent aeeaba0754
commit 43dfeffd5e
2 changed files with 70 additions and 73 deletions

View File

@ -9,19 +9,24 @@ var $ = bitcore.util.preconditions;
var exports = {};
exports.encodeAddressIndexKey = function(hashTypeBuffer, hashBuffer, height, txidBuffer, index, spending) {
exports.encodeAddressIndexKey = function(address, isSpent, height, txidBuffer, index, spending) {
var addressSizeBuffer = new Buffer(1);
addressSizeBuffer.writeUInt8(address.length);
var addressBuffer = new Buffer(address, 'utf8');
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
var indexBuffer = new Buffer(4);
indexBuffer.writeUInt32BE(index);
var spendingBuffer = new Buffer(1);
spendingBuffer.writeUInt8(spending);
var isSpentBuffer = new Buffer(1);
isSpentBuffer.writeUInt8(isSpent);
var key = Buffer.concat({
constants.PREFIXES.ADDRESS,
hashTypeBuffer,
hashBuffer,
constants.SPACER_MIN,
addressSizeBuffer,
addressBuffer,
isSpentBuffer,
heightBuffer,
txidBuffer,
indexBuffer,
@ -32,34 +37,38 @@ exports.encodeAddressIndexKey = function(hashTypeBuffer, hashBuffer, height, txi
exports.decodeAddressIndexKey = function(buffer) {
var reader = new BufferReader(buffer);
var prefix = reader.read(1);
var hashTypeBuffer = reader.read(1);
var hashBuffer = reader.read(20);
var spacer = reader.read(1);
var addressSize = reader.readUInt8();
var address = reader.read(addressSize).toString('utf8');
var isSpent = reader.readUInt8();
var height = reader.readUInt32BE();
var txid = reader.read(32);
var index = reader.readUInt32BE();
var spending = reader.readUInt8();
return {
prefix: prefix,
hashTypeBuffer: hashTypeBuffer,
hashBuffer: hashBuffer,
address: address,
isSpent: isSpent ? true : false,
height: height,
txid: txid,
index: outputIndex,
spending: spending
spending: spending ? true : false
};
};
exports.encodeAddressIndexValue = function(satoshis) {
exports.encodeAddressIndexValue = function(satoshis, scriptBuffer) {
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeDoubleBE(satoshis);
return satoshisBuffer;
return Buffer.concat([satoshisBuffer, scriptBuffer]);
};
exports.decodeAddressIndexValue = function(buffer) {
var satoshis = buffer.readDoubleBE(0);
return satoshis;
var scriptBuffer = buffer.slice(8, buffer.length);
return {
satoshis: satoshis,
script: scriptBuffer
};
};
exports.encodeUnspentIndexKey = function(hashTypeBuffer, hashBuffer, txidBuffer, index) {
@ -367,7 +376,7 @@ exports.decodeSummaryCacheValue = function(buffer) {
exports.getAddressInfo = function(addressStr) {
var addrObj = bitcore.Address(addressStr);
var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type];
return {
hashBuffer: addrObj.hashBuffer,
hashTypeBuffer: hashTypeBuffer,

View File

@ -49,6 +49,8 @@ var AddressService = function(options) {
this.maxInputsQueryLength = options.maxInputsQueryLength || constants.MAX_INPUTS_QUERY_LENGTH;
this.maxOutputsQueryLength = options.maxOutputsQueryLength || constants.MAX_OUTPUTS_QUERY_LENGTH;
this.concurrency = options.concurrency || 20;
this._setMempoolIndexPath();
if (options.mempoolMemoryIndex) {
this.levelupStore = memdown;
@ -450,23 +452,23 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) {
* @param {Boolean} addOutput - If the block is being removed or added to the chain
* @param {Function} callback
*/
AddressService.prototype.blockHandler = function(block, addOutput, callback) {
AddressService.prototype.blockHandler = function(block, connectBlock, callback) {
var txs = block.transactions;
var height = block.__height;
var action = 'put';
if (!addOutput) {
var reverseAction = 'del';
if (!connectBlock) {
action = 'del';
reverseAction = 'put';
}
var operations = [];
var transactionLength = txs.length;
for (var i = 0; i < transactionLength; i++) {
async.eachLimit(txs, self.concurrency, function(tx, next) {
var tx = txs[i];
var txid = tx.id;
var txidBuffer = new Buffer(txid, 'hex');
var inputs = tx.inputs;
var outputs = tx.outputs;
@ -478,24 +480,15 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
var output = outputs[outputIndex];
var script = output.script;
var address = script.toAddress();
if(!script) {
log.debug('Invalid script');
continue;
}
var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network);
if (!addressInfo) {
continue;
}
// We need to use the height for indexes (and not the timestamp) because the
// the timestamp has unreliable sequential ordering. The next block
// can have a time that is previous to the previous block (however not
// less than the mean of the 11 previous blocks) and not greater than 2
// hours in the future.
var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer,
height, txidBuffer, outputIndex);
var key = encoding.encodeAddressIndexKey(address, false, height, txid, outputIndex);
var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer);
operations.push({
type: action,
@ -503,22 +496,20 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
value: value
});
addressInfo.hashHex = addressInfo.hashBuffer.toString('hex');
// Collect data for subscribers
if (txmessages[addressInfo.hashHex]) {
txmessages[addressInfo.hashHex].outputIndexes.push(outputIndex);
if (txmessages[address]) {
txmessages[address].outputIndexes.push(outputIndex);
} else {
txmessages[addressInfo.hashHex] = {
txmessages[address] = {
tx: tx,
height: height,
outputIndexes: [outputIndex],
addressInfo: addressInfo,
address: address,
timestamp: block.header.timestamp
};
}
this.balanceEventHandler(block, addressInfo);
this.balanceEventHandler(block, address);
}
@ -528,53 +519,50 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) {
}
if(tx.isCoinbase()) {
continue;
return next();
}
for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) {
async.eachLimit(inputs, self.concurrency, function(input, next) {
var input = inputs[inputIndex];
var inputHash;
var inputHashType;
if (input.script.isPublicKeyHashIn()) {
inputHash = Hash.sha256ripemd160(input.script.chunks[1].buf);
inputHashType = constants.HASH_TYPES.PUBKEY;
} else if (input.script.isScriptHashIn()) {
inputHash = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf);
inputHashType = constants.HASH_TYPES.REDEEMSCRIPT;
} else {
continue;
}
var prevTxIdBuffer = new Buffer(input.prevTxId, 'hex');
var address = input.script.toAddress();
// To be able to query inputs by address and spent height
var inputKey = encoding.encodeInputKey(inputHash, inputHashType, height, prevTxIdBuffer, input.outputIndex);
var inputValue = encoding.encodeInputValue(txidBuffer, inputIndex);
operations.push({
type: action,
key: inputKey,
value: inputValue
var inputKey = encoding.encodeAddressIndexKey(address, true, height, txid, inputIndex, true);
self.node.services.transaction.getTransaction(input.prevTxId, function(err, tx) {
if(err) {
return next(err);
}
var output = tx.outputs[input.outputIndex];
var outputKey = encoding.encodeAddressIndexKey(address, true, tx.__height, tx.id, input.outputIndex, false);
var outputKeyToDelete = encoding.encodeAddressIndexKey(address, false, tx.__height, tx.id, input.outputIndex, false);
var outputValue = encoding.encodeAdressIndexValue(output.satoshis, output._scriptBuffer);
var inputValue = encoding.encodeAddressIndexValue(output.satoshis, input._scriptBuffer);
operations = operations.concat([{
type: action,
key: inputKey,
value: inputValue
},
{
type: action,
key: outputKey,
value: outputValue
},
{
type: reverseAction,
key: outputKeyToDelete,
value: outputValue
}]);
next();
});
// To be able to search for an input spending an output
var inputKeyMap = encoding.encodeInputKeyMap(prevTxIdBuffer, input.outputIndex);
var inputValueMap = encoding.encodeInputValueMap(txidBuffer, inputIndex);
operations.push({
type: action,
key: inputKeyMap,
value: inputValueMap
});
}
}
setImmediate(function() {
callback(null, operations);
}, function(err) {
callback(err, operations);
});
});
};
/**