This commit is contained in:
Chris Kleeschulte 2017-05-26 07:40:26 -04:00
parent 8730ca6148
commit d6f1f26469
4 changed files with 105 additions and 76 deletions

View File

@ -121,7 +121,6 @@ BlockHandler.prototype._onFinish = function() {
var self = this;
self.syncing = false;
console.log('done');
self.emit('synced');
@ -209,7 +208,6 @@ BlockStream.prototype._pushBlocks = function(blocks) {
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
log.info('Pushing block: ' + blocks[i].hash + ' from the blockstream');
self.push(blocks[i]);
}

View File

@ -4,13 +4,10 @@ var BaseService = require('../../service');
var inherits = require('util').inherits;
var Encoding = require('./encoding');
var LRU = require('lru-cache');
const REORG_BUFFER = 6;
var utils = require('../../utils');
function UtxoService(options) {
BaseService.call(this, options);
this._operations = [];
this._createCache({ max: 500000, dispose: this._getUtxoOperations.bind(this) });
this._exclusionIndexes = [];
}
@ -39,6 +36,58 @@ UtxoService.prototype.stop = function(callback) {
}
};
UtxoService.prototype.blockHandler = function(block, connect, callback) {
var self = this;
self._exclusionIndexes.length = 0;
var operations = [];
for(var i = 0; i < block.transactions.length; i++) {
var tx = block.transactions[i];
var inputs = tx.inputs;
var outputs = tx.outputs;
if (!tx.isCoinbase()) {
operations = self._processInputs(tx, inputs, connect).concat(operations);
}
operations = self._processOutputs(tx, outputs, block, connect).concat(operations);
}
callback(null, operations);
};
UtxoService.prototype.getUtxosForAddress = function(address, callback) {
var self = this;
var utxos = [];
var start = self.encoding.encodeUtxoIndexKey(address);
var stream = self.db.createReadStream({
gte: start.slice(0, -36),
lt: Buffer.concat([ start.slice(0, -36), new Buffer('ff', 'hex') ])
});
stream.on('data', function(data) {
var key = self.encoding.decodeUtxoIndexKey(data.key);
var value = self.encoding.decodeUtxoIndexValue(data.value);
utxos.push({
txid: key.txid,
outputIndex: key.outputIndex,
address: address,
height: value.height,
satoshis: value.satoshis,
script: value.scriptBuffer
});
});
stream.on('end', function() {
callback(null, utxos);
});
};
UtxoService.prototype._processInputs = function(tx, inputs, connect) {
var operations = [];
@ -72,91 +121,40 @@ UtxoService.prototype._processOutputs = function(tx, outputs, block, connect) {
continue;
}
if (connect) {
//when the cache is full, we will write out.
return this._setCache(key, block, output);
}
return this._cache.del(key);
var action = connect ? 'put' : 'del';
operations = operations.concat(
{ action: action,
key: this._getOperationsKey(key, output),
value: this._getOperationsValue({
height: block.__height,
satoshis: output.satoshis,
script: output.script
})
}
);
}
};
UtxoService.prototype._setCache = function(key, block, output, value) {
if (!value) {
value = {
output: output,
height: block.__height,
hash: block.hash
};
}
this._cache.set(key, value); // key = 36 bytes, value = (8 + 25ish) + 36 = 69 bytes
return operations;
};
UtxoService.prototype._moveOutput = function(key, connect) {
if (connect) {
self._cache.del(key);
return { action: 'del', key: key };
}
// this should only happen during a reorg, hopefully this is an infrequent occurence
// the ramifications are that comsumers of this data will need to make an additional
// lookup of the tx index. We are ok with trade-off for performance.
return { action: 'put', key: key, value: null };
};
UtxoService.prototype.blockHandler = function(block, connect) {
var self = this;
self._currentBlockHeight = block.__height;
self._exclusionIndexes.length = 0;
var operations = [];
for(var i = 0; i < block.transactions.length; i++) {
var tx = block.transactions[i];
var inputs = tx.inputs;
var outputs = tx.outputs;
if (!tx.isCoinbase()) {
operations = self._processInputs(tx, inputs, connect).concat(operations);
}
self._processOutputs(tx, outputs, block, connect);
}
operations = this._operations.concat(operations);
this._operations.length = 0;
return operations;
};
UtxoService.prototype._getUtxoOperations = function(key, value) {
if (value.height + REORG_BUFFER >= self._currentHeight) {
log.error('Writing utxos to the database before ' + REORG_BUFFER + ' confirmation blocks.' +
' The internal cache might be too small or the system does not have enough memory.');
}
this._operations.push({
action: 'put',
key: this._getOperationsKey(key, value),
value: this._getOperationsValue(value)
});
};
UtxoService.prototype._getOperationsKey = function(key, value) {
var address = utils.getAddressStringFromScript(value.output.script, this.node.network);
return self.encoding.encodeUtxoIndexKey(address, key.slice(0, 32), parseInt(key.slice(32)));
UtxoService.prototype._getOperationsKey = function(key, output) {
var address = utils.getAddressStringFromScript(output.script, this.node.network);
return this.encoding.encodeUtxoIndexKey(address, key.slice(0, 64), parseInt(key.slice(64)));
};
UtxoService.prototype._getOperationsValue = function(value) {
return self.encoding.encodeUtxoIndexValue(value.height, value.output.satoshis, value.output.script);
return this.encoding.encodeUtxoIndexValue(value.height, value.satoshis, value.script.toBuffer());
};
module.exports = UtxoService;

View File

@ -47,6 +47,11 @@ TestWebService.prototype.setupRoutes = function(app) {
});
});
app.get('/utxo/:address', function(req, res) {
self.node.services.utxo.getUtxosForAddress(req.params.address, function(err, utxos) {
res.status(200).jsonp({ address: req.params.address, utxos: utxos });
});
});
};
TestWebService.prototype.getRoutePrefix = function() {

View File

@ -1,7 +1,7 @@
'use strict';
var chai = require('chai');
var should = chai.should();
var expect = chai.expect;
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
var path = require('path');
@ -51,7 +51,8 @@ var bitcore = {
'timestamp',
'web',
'block',
'utxo'
'utxo',
'utxo-test'
],
servicesConfig: {
bitcoind: {
@ -64,6 +65,9 @@ var bitcore = {
zmqpubrawtx: bitcoin.args.zmqpubrawtx
}
]
},
'utxo-test': {
requirePath: path.resolve(__dirname + '/test_web.js')
}
}
}
@ -123,7 +127,31 @@ describe('Utxo Operations', function() {
});
it('should index utxos', function(done) {
done();
async.mapLimit(opts.walletPrivKeys, 12, function(privKey, next) {
var address = privKey.toAddress().toString();
utils.queryBitcoreNode(Object.assign({
path: '/test/utxo/' + address
}, bitcore.httpOpts), function(err, res) {
if(err) {
return next(err);
}
res = JSON.parse(res);
expect(res.address).to.equal(address);
next(null, res.utxos);
});
}, function(err, utxos) {
if(err) {
return done(err);
}
console.log('done');
done();
});
});
});