This commit is contained in:
Chris Kleeschulte 2017-10-19 19:32:20 -04:00
parent f90e0d2ed5
commit b129bc1048
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
4 changed files with 195 additions and 51 deletions

View File

@ -217,7 +217,7 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c
return next(null, []);
}
self._mempool.getTxsByAddress(address, next);
self._mempool.getTxsByAddress(address, 'output', next);
},
// if mempool utxos, then add them first
@ -433,7 +433,7 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
return next(null, []);
}
self._mempool.getTxsByAddress(address, next);
self._mempool.getTxsByAddress(address, 'both', next);
},
// add the meta data such as input values, etc.

View File

@ -4,17 +4,19 @@ var tx = require('bcoin').tx;
function Encoding(servicePrefix) {
this.servicePrefix = servicePrefix;
this.txPrefix = new Buffer('00', 'hex');
this.addressPrefix = new Buffer('01', 'hex');
}
Encoding.prototype.encodeMempoolTransactionKey = function(txid) {
var buffers = [this.servicePrefix];
var buffers = [this.servicePrefix, this.txPrefix];
var txidBuffer = new Buffer(txid, 'hex');
buffers.push(txidBuffer);
return Buffer.concat(buffers);
};
Encoding.prototype.decodeMempoolTransactionKey = function(buffer) {
return buffer.slice(2).toString('hex');
return buffer.slice(3).toString('hex');
};
Encoding.prototype.encodeMempoolTransactionValue = function(transaction) {
@ -25,5 +27,51 @@ Encoding.prototype.decodeMempoolTransactionValue = function(buffer) {
return tx.fromRaw(buffer);
};
Encoding.prototype.encodeMempoolAddressKey = function(address, txid, index, input) {
var buffers = [this.servicePrefix, this.addressPrefix];
var addressSizeBuffer = new Buffer(1);
addressSizeBuffer.writeUInt8(address.length);
var addressBuffer = new Buffer(address, 'utf8');
buffers.push(addressSizeBuffer);
buffers.push(addressBuffer);
var txidBuffer = new Buffer(txid || Array(65).join('0'), 'hex');
buffers.push(txidBuffer);
var indexBuffer = new Buffer(4);
indexBuffer.writeUInt32BE(index || 0);
buffers.push(indexBuffer);
// this is whether the address appears in an input (1) or output (0)
var inputBuffer = new Buffer(1);
inputBuffer.writeUInt8(input || 0);
buffers.push(inputBuffer);
return Buffer.concat(buffers);
};
Encoding.prototype.decodeMempoolAddressKey = function(buffer) {
var addressSize = buffer.readUInt8(3);
var address = buffer.slice(4, addressSize + 4).toString('utf8');
var txid = buffer.slice(addressSize + 4, addressSize + 36).toString('hex');
var index = buffer.readUInt32BE(addressSize + 36);
var input = buffer.readUInt8(addressSize + 40);
return {
address: address,
txid: txid,
index: index,
input: input
};
};
module.exports = Encoding;

View File

@ -11,6 +11,7 @@ var MempoolService = function(options) {
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
this._network = this.node.network;
this._flush = options.flush;
if (this._network === 'livenet') {
this._network = 'main';
@ -27,7 +28,7 @@ MempoolService.dependencies = ['db'];
MempoolService.prototype.getAPIMethods = function() {
var methods = [
['getMempoolTransaction', this, this.getMempoolTransaction, 1],
['getTxidsByAddress', this, this.getTxidsByAddress, 1],
['getTxsByAddress', this, this.getTxsByAddress, 2],
];
return methods;
};
@ -42,10 +43,44 @@ MempoolService.prototype.start = function(callback) {
self._encoding = new Encoding(prefix);
self._startSubscriptions();
if (self._flush) {
return self._flushMempool(callback);
}
callback();
});
};
MempoolService.prototype._flushMempool = function(callback) {
var self = this;
// TODO: just handle the txindex for now, later handle both txindex and addressindex
var ops = [];
var criteria = {
gte: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('0'), 'hex')]),
lt: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('f'), 'hex')])
};
var stream = self._db.createKeyStream(criteria);
stream.on('data', function(key) {
ops.push({
type: 'del',
key: key
});
});
stream.on('end', function() {
ops.batch(ops, function(err) {
if (err) {
return callback(err);
}
callback();
});
});
};
MempoolService.prototype.onReorg = function(args, callback) {
var oldBlockList = args[1];
@ -68,6 +103,8 @@ MempoolService.prototype.onReorg = function(args, callback) {
value: value
});
removalOps = removalOps.concat(this._getAddressOperations(tx, true));
}
}
@ -102,16 +139,66 @@ MempoolService.prototype.onBlock = function(block, callback) {
// remove this block's txs from mempool
var self = this;
var ops = block.txs.map(function(tx) {
return {
var ops = [];
for(var i = 0; i < block.txs.length; i++) {
var tx = block.txs[i];
// tx index
ops.push({
type: 'del',
key: self._encoding.encodeMempoolTransactionKey(tx.txid())
};
});
});
// address index
ops = ops.concat(self._getAddressOperations(tx));
}
callback(null, ops);
};
MempoolService.prototype._getAddressOperations = function(tx, reverse) {
var ops = [];
var action = reverse ? 'put' : 'del';
for(var i = 0; i < tx.outputs.length; i++) {
var output = tx.outputs[i];
var address = utils.getAddress(output, this._network);
if (!address) {
continue;
}
ops.push({
type: action,
key: this.encoding.encodeMempoolAddressKey(address, tx.txid(), i, 0)
});
}
for(i = 0; i < .length; i++) {
var input = tx.inputs[i];
var address = utils.getAddress(input, this._network);
if (!address) {
continue;
}
ops.push({
type: action,
key: this.encoding.encodeMempoolAddressKey(address, tx.txid(), i, 1)
});
}
return ops;
};
MempoolService.prototype._onTransaction = function(tx) {
this._db.put(this._encoding.encodeMempoolTransactionKey(tx.txid()),
this._encoding.encodeMempoolTransactionValue(tx));
@ -137,68 +224,42 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) {
};
// TODO optimize this using another index?
MempoolService.prototype.getTxsByAddress = function(address, callback) {
MempoolService.prototype.getTxsByAddress = function(address, type, callback) {
var self = this;
var results = [];
var start = self._encoding.encodeMempoolTransactionKey(new Array(65).join('0'));
var end = self._encoding.encodeMempoolTransactionKey(new Array(65).join('f'));
var start = self._encoding.encodeMempoolAddressKey(address);
var end = Buffer.concat([ start.slice(0, -37), new Buffer(new Array(75).join('f'), 'hex') ]);
var criteria = {
gte: start,
lte: end
};
var stream = self._db.createReadStream(criteria);
var stream = self._db.createKeyStream(criteria);
stream.on('error', function() {
return [];
});
stream.on('end', function() {
return callback(null, results);
});
stream.on('data', function(data) {
var tx = self._encoding.decodeMempoolTransactionValue(data.value);
tx = self._involvesAddress(tx, address);
if (tx) {
results.push(tx);
var addressInfo = self._encoding.decodeMempoolAddressKey(data);
if (type === 'input') {
type = 1;
} else if (type === 'output') {
type = 0;
}
if (type === 'both' || type === addressInfo.input) {
results.push(addressInfo.txid);
}
});
};
MempoolService.prototype._involvesAddress = function(tx, address) {
function contains(collection, network) {
var _address;
for(var i = 0; i < collection.length; i++) {
var item = collection[i];
_address = item.getAddress();
if (!_address) {
continue;
}
_address.network = network;
_address = _address.toString();
if (address === _address) {
return true;
}
}
}
var collections = [ tx.outputs, tx.inputs ];
for(var i = 0; i < collections.length; i++) {
var hasAddress = contains(collections[i], this._network);
if (hasAddress) {
return tx;
}
}
};
MempoolService.prototype.stop = function(callback) {
callback();
};

View File

@ -8,19 +8,25 @@ var Encoding = require('../../../lib/services/mempool/encoding');
describe('Block service encoding', function() {
var servicePrefix = new Buffer('0000', 'hex');
var txPrefix = new Buffer('00', 'hex');
var addressPrefix = new Buffer('01', 'hex');
var encoding = new Encoding(servicePrefix);
var hash = '25e28f9fb0ada5353b7d98d85af5524b2f8df5b0b0e2d188f05968bceca603eb';
var txString = '0100000004de9b4bb17f627096a9ee0b4528e4eae17df5b5c69edc29704c2e84a7371db29f010000006b483045022100f5b1a0d33b7be291c3953c25f8ae39d98601aa7099a8674daf638a08b86c7173022006ce372da5ad088a1cc6e5c49c2760a1b6f085eb1b51b502211b6bc9508661f9012102ec5e3731e54475dd2902326f43602a03ae3d62753324139163f81f20e787514cffffffff7a1d4e5fc2b8177ec738cd723a16cf2bf493791e55573445fc0df630fe5e2d64010000006b483045022100cf97f6cb8f126703e9768545dfb20ffb10ba78ae3d101aa46775f5a239b075fc02203150c4a89a11eaf5e404f4f96b62efa4455e9525765a025525c7105a7e47b6db012102c01e11b1d331f999bbdb83e8831de503cd52a01e3834a95ccafd615c67703d77ffffffff9e52447116415ca0d0567418a1a4ef8f27be3ff5a96bf87c922f3723d7db5d7c000000006b483045022100f6c117e536701be41a6b0b544d7c3b1091301e4e64a6265b6eb167b15d16959d022076916de4b115e700964194ce36a24cb9105f86482f4abbc63110c3f537cd5770012102ddf84cc7bee2d6a82ac09628a8ad4a26cd449fc528b81e7e6cc615707b8169dfffffffff5815d9750eb3572e30d6fd9df7afb4dbd76e042f3aa4988ac763b3fdf8397f80010000006a473044022028f4402b736066d93d2a32b28ccd3b7a21d84bb58fcd07fe392a611db94cdec5022018902ee0bf2c3c840c1b81ead4e6c87c88c48b2005bf5eea796464e561a620a8012102b6cdd1a6cd129ef796faeedb0b840fcd0ca00c57e16e38e46ee7028d59812ae7ffffffff0220a10700000000001976a914c342bcd1a7784d9842f7386b8b3b8a3d4171a06e88ac59611100000000001976a91449f8c749a9960dc29b5cbe7d2397cea7d26611bb88ac00000000'
var txString = '0100000004de9b4bb17f627096a9ee0b4528e4eae17df5b5c69edc29704c2e84a7371db29f010000006b483045022100f5b1a0d33b7be291c3953c25f8ae39d98601aa7099a8674daf638a08b86c7173022006ce372da5ad088a1cc6e5c49c2760a1b6f085eb1b51b502211b6bc9508661f9012102ec5e3731e54475dd2902326f43602a03ae3d62753324139163f81f20e787514cffffffff7a1d4e5fc2b8177ec738cd723a16cf2bf493791e55573445fc0df630fe5e2d64010000006b483045022100cf97f6cb8f126703e9768545dfb20ffb10ba78ae3d101aa46775f5a239b075fc02203150c4a89a11eaf5e404f4f96b62efa4455e9525765a025525c7105a7e47b6db012102c01e11b1d331f999bbdb83e8831de503cd52a01e3834a95ccafd615c67703d77ffffffff9e52447116415ca0d0567418a1a4ef8f27be3ff5a96bf87c922f3723d7db5d7c000000006b483045022100f6c117e536701be41a6b0b544d7c3b1091301e4e64a6265b6eb167b15d16959d022076916de4b115e700964194ce36a24cb9105f86482f4abbc63110c3f537cd5770012102ddf84cc7bee2d6a82ac09628a8ad4a26cd449fc528b81e7e6cc615707b8169dfffffffff5815d9750eb3572e30d6fd9df7afb4dbd76e042f3aa4988ac763b3fdf8397f80010000006a473044022028f4402b736066d93d2a32b28ccd3b7a21d84bb58fcd07fe392a611db94cdec5022018902ee0bf2c3c840c1b81ead4e6c87c88c48b2005bf5eea796464e561a620a8012102b6cdd1a6cd129ef796faeedb0b840fcd0ca00c57e16e38e46ee7028d59812ae7ffffffff0220a10700000000001976a914c342bcd1a7784d9842f7386b8b3b8a3d4171a06e88ac59611100000000001976a91449f8c749a9960dc29b5cbe7d2397cea7d26611bb88ac00000000';
var address = '1234567';
var now = Math.floor(Date.now() / 1000);
var nowBuf = new Buffer(4);
nowBuf.writeUInt32BE(now);
describe('Mempool', function() {
it('should encode mempool transaction key', function() {
encoding.encodeMempoolTransactionKey(hash).should.deep.equal(Buffer.concat([ servicePrefix, new Buffer(hash, 'hex') ]));
encoding.encodeMempoolTransactionKey(hash).should.deep.equal(Buffer.concat([ servicePrefix, txPrefix, new Buffer(hash, 'hex') ]));
});
it('should decode mempool transaction key', function() {
encoding.decodeMempoolTransactionKey(Buffer.concat([ servicePrefix, new Buffer(hash, 'hex') ])).should.deep.equal(hash);
encoding.decodeMempoolTransactionKey(Buffer.concat([ servicePrefix, txPrefix, new Buffer(hash, 'hex') ])).should.deep.equal(hash);
});
it('should encode mempool transaction value', function() {
@ -34,6 +40,35 @@ describe('Block service encoding', function() {
mytx.should.deep.equal(tx.fromRaw(txString, 'hex'));
});
it('should encode mempool address key', function() {
encoding.encodeMempoolAddressKey(address, hash, 0, 1, now)
.should.deep.equal(Buffer.concat([
servicePrefix,
addressPrefix,
new Buffer('07', 'hex'),
new Buffer(address),
new Buffer(hash, 'hex'),
new Buffer('00000000', 'hex'),
new Buffer('01', 'hex'), nowBuf ]));
});
it('should decode mempool address key', function() {
encoding.decodeMempoolAddressKey(Buffer.concat([
servicePrefix,
addressPrefix,
new Buffer('07', 'hex'),
new Buffer(address),
new Buffer(hash, 'hex'),
new Buffer('00000000', 'hex'),
new Buffer('01', 'hex'), nowBuf ])).should.deep.equal({
address: address,
txid: hash,
index: 0,
input: 1,
timestamp: now});
});
});
});