diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3765921b..2ccbcb30 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -217,7 +217,7 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c return next(null, []); } - self._mempool.getTxsByAddress(address, next); + self._mempool.getTxsByAddress(address, 'output', next); }, // if mempool utxos, then add them first @@ -433,7 +433,7 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac return next(null, []); } - self._mempool.getTxsByAddress(address, next); + self._mempool.getTxsByAddress(address, 'both', next); }, // add the meta data such as input values, etc. diff --git a/lib/services/mempool/encoding.js b/lib/services/mempool/encoding.js index bd35ef5d..ad327da2 100644 --- a/lib/services/mempool/encoding.js +++ b/lib/services/mempool/encoding.js @@ -4,17 +4,19 @@ var tx = require('bcoin').tx; function Encoding(servicePrefix) { this.servicePrefix = servicePrefix; + this.txPrefix = new Buffer('00', 'hex'); + this.addressPrefix = new Buffer('01', 'hex'); } Encoding.prototype.encodeMempoolTransactionKey = function(txid) { - var buffers = [this.servicePrefix]; + var buffers = [this.servicePrefix, this.txPrefix]; var txidBuffer = new Buffer(txid, 'hex'); buffers.push(txidBuffer); return Buffer.concat(buffers); }; Encoding.prototype.decodeMempoolTransactionKey = function(buffer) { - return buffer.slice(2).toString('hex'); + return buffer.slice(3).toString('hex'); }; Encoding.prototype.encodeMempoolTransactionValue = function(transaction) { @@ -25,5 +27,51 @@ Encoding.prototype.decodeMempoolTransactionValue = function(buffer) { return tx.fromRaw(buffer); }; +Encoding.prototype.encodeMempoolAddressKey = function(address, txid, index, input) { + var buffers = [this.servicePrefix, this.addressPrefix]; + + var addressSizeBuffer = new Buffer(1); + addressSizeBuffer.writeUInt8(address.length); + var addressBuffer = new Buffer(address, 'utf8'); + + buffers.push(addressSizeBuffer); + buffers.push(addressBuffer); + + var txidBuffer = new Buffer(txid || Array(65).join('0'), 'hex'); + buffers.push(txidBuffer); + + var indexBuffer = new Buffer(4); + indexBuffer.writeUInt32BE(index || 0); + buffers.push(indexBuffer); + + // this is whether the address appears in an input (1) or output (0) + var inputBuffer = new Buffer(1); + inputBuffer.writeUInt8(input || 0); + buffers.push(inputBuffer); + + return Buffer.concat(buffers); + +}; + +Encoding.prototype.decodeMempoolAddressKey = function(buffer) { + + var addressSize = buffer.readUInt8(3); + var address = buffer.slice(4, addressSize + 4).toString('utf8'); + + var txid = buffer.slice(addressSize + 4, addressSize + 36).toString('hex'); + + var index = buffer.readUInt32BE(addressSize + 36); + + var input = buffer.readUInt8(addressSize + 40); + + return { + address: address, + txid: txid, + index: index, + input: input + }; + +}; + module.exports = Encoding; diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 3188405a..925b4d38 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -11,6 +11,7 @@ var MempoolService = function(options) { this._db = this.node.services.db; this._p2p = this.node.services.p2p; this._network = this.node.network; + this._flush = options.flush; if (this._network === 'livenet') { this._network = 'main'; @@ -27,7 +28,7 @@ MempoolService.dependencies = ['db']; MempoolService.prototype.getAPIMethods = function() { var methods = [ ['getMempoolTransaction', this, this.getMempoolTransaction, 1], - ['getTxidsByAddress', this, this.getTxidsByAddress, 1], + ['getTxsByAddress', this, this.getTxsByAddress, 2], ]; return methods; }; @@ -42,10 +43,44 @@ MempoolService.prototype.start = function(callback) { self._encoding = new Encoding(prefix); self._startSubscriptions(); + if (self._flush) { + return self._flushMempool(callback); + } callback(); }); }; +MempoolService.prototype._flushMempool = function(callback) { + var self = this; + + // TODO: just handle the txindex for now, later handle both txindex and addressindex + var ops = []; + + var criteria = { + gte: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('0'), 'hex')]), + lt: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('f'), 'hex')]) + }; + + var stream = self._db.createKeyStream(criteria); + + stream.on('data', function(key) { + ops.push({ + type: 'del', + key: key + }); + }); + + stream.on('end', function() { + ops.batch(ops, function(err) { + if (err) { + return callback(err); + } + callback(); + }); + }); + +}; + MempoolService.prototype.onReorg = function(args, callback) { var oldBlockList = args[1]; @@ -68,6 +103,8 @@ MempoolService.prototype.onReorg = function(args, callback) { value: value }); + removalOps = removalOps.concat(this._getAddressOperations(tx, true)); + } } @@ -102,16 +139,66 @@ MempoolService.prototype.onBlock = function(block, callback) { // remove this block's txs from mempool var self = this; - var ops = block.txs.map(function(tx) { - return { + var ops = []; + + for(var i = 0; i < block.txs.length; i++) { + var tx = block.txs[i]; + + // tx index + ops.push({ type: 'del', key: self._encoding.encodeMempoolTransactionKey(tx.txid()) - }; - }); + }); + + // address index + ops = ops.concat(self._getAddressOperations(tx)); + + } + callback(null, ops); }; +MempoolService.prototype._getAddressOperations = function(tx, reverse) { + + var ops = []; + + var action = reverse ? 'put' : 'del'; + + for(var i = 0; i < tx.outputs.length; i++) { + + var output = tx.outputs[i]; + var address = utils.getAddress(output, this._network); + + if (!address) { + continue; + } + + ops.push({ + type: action, + key: this.encoding.encodeMempoolAddressKey(address, tx.txid(), i, 0) + }); + } + + for(i = 0; i < .length; i++) { + var input = tx.inputs[i]; + var address = utils.getAddress(input, this._network); + + if (!address) { + continue; + } + + ops.push({ + type: action, + key: this.encoding.encodeMempoolAddressKey(address, tx.txid(), i, 1) + }); + } + + return ops; + + +}; + MempoolService.prototype._onTransaction = function(tx) { this._db.put(this._encoding.encodeMempoolTransactionKey(tx.txid()), this._encoding.encodeMempoolTransactionValue(tx)); @@ -137,68 +224,42 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) { }; -// TODO optimize this using another index? -MempoolService.prototype.getTxsByAddress = function(address, callback) { +MempoolService.prototype.getTxsByAddress = function(address, type, callback) { var self = this; var results = []; - var start = self._encoding.encodeMempoolTransactionKey(new Array(65).join('0')); - var end = self._encoding.encodeMempoolTransactionKey(new Array(65).join('f')); + var start = self._encoding.encodeMempoolAddressKey(address); + var end = Buffer.concat([ start.slice(0, -37), new Buffer(new Array(75).join('f'), 'hex') ]); var criteria = { gte: start, lte: end }; - var stream = self._db.createReadStream(criteria); + var stream = self._db.createKeyStream(criteria); stream.on('error', function() { return []; }); stream.on('end', function() { - return callback(null, results); + }); stream.on('data', function(data) { - var tx = self._encoding.decodeMempoolTransactionValue(data.value); - tx = self._involvesAddress(tx, address); - if (tx) { - results.push(tx); + var addressInfo = self._encoding.decodeMempoolAddressKey(data); + if (type === 'input') { + type = 1; + } else if (type === 'output') { + type = 0; + } + if (type === 'both' || type === addressInfo.input) { + results.push(addressInfo.txid); } }); }; -MempoolService.prototype._involvesAddress = function(tx, address) { - - function contains(collection, network) { - var _address; - for(var i = 0; i < collection.length; i++) { - var item = collection[i]; - _address = item.getAddress(); - if (!_address) { - continue; - } - _address.network = network; - _address = _address.toString(); - if (address === _address) { - return true; - } - } - } - - var collections = [ tx.outputs, tx.inputs ]; - - for(var i = 0; i < collections.length; i++) { - var hasAddress = contains(collections[i], this._network); - if (hasAddress) { - return tx; - } - } - -}; - MempoolService.prototype.stop = function(callback) { callback(); }; diff --git a/test/services/mempool/encoding.unit.js b/test/services/mempool/encoding.unit.js index 3e1e0a45..744eb0dd 100644 --- a/test/services/mempool/encoding.unit.js +++ b/test/services/mempool/encoding.unit.js @@ -8,19 +8,25 @@ var Encoding = require('../../../lib/services/mempool/encoding'); describe('Block service encoding', function() { var servicePrefix = new Buffer('0000', 'hex'); + var txPrefix = new Buffer('00', 'hex'); + var addressPrefix = new Buffer('01', 'hex'); var encoding = new Encoding(servicePrefix); var hash = '25e28f9fb0ada5353b7d98d85af5524b2f8df5b0b0e2d188f05968bceca603eb'; - var txString = '0100000004de9b4bb17f627096a9ee0b4528e4eae17df5b5c69edc29704c2e84a7371db29f010000006b483045022100f5b1a0d33b7be291c3953c25f8ae39d98601aa7099a8674daf638a08b86c7173022006ce372da5ad088a1cc6e5c49c2760a1b6f085eb1b51b502211b6bc9508661f9012102ec5e3731e54475dd2902326f43602a03ae3d62753324139163f81f20e787514cffffffff7a1d4e5fc2b8177ec738cd723a16cf2bf493791e55573445fc0df630fe5e2d64010000006b483045022100cf97f6cb8f126703e9768545dfb20ffb10ba78ae3d101aa46775f5a239b075fc02203150c4a89a11eaf5e404f4f96b62efa4455e9525765a025525c7105a7e47b6db012102c01e11b1d331f999bbdb83e8831de503cd52a01e3834a95ccafd615c67703d77ffffffff9e52447116415ca0d0567418a1a4ef8f27be3ff5a96bf87c922f3723d7db5d7c000000006b483045022100f6c117e536701be41a6b0b544d7c3b1091301e4e64a6265b6eb167b15d16959d022076916de4b115e700964194ce36a24cb9105f86482f4abbc63110c3f537cd5770012102ddf84cc7bee2d6a82ac09628a8ad4a26cd449fc528b81e7e6cc615707b8169dfffffffff5815d9750eb3572e30d6fd9df7afb4dbd76e042f3aa4988ac763b3fdf8397f80010000006a473044022028f4402b736066d93d2a32b28ccd3b7a21d84bb58fcd07fe392a611db94cdec5022018902ee0bf2c3c840c1b81ead4e6c87c88c48b2005bf5eea796464e561a620a8012102b6cdd1a6cd129ef796faeedb0b840fcd0ca00c57e16e38e46ee7028d59812ae7ffffffff0220a10700000000001976a914c342bcd1a7784d9842f7386b8b3b8a3d4171a06e88ac59611100000000001976a91449f8c749a9960dc29b5cbe7d2397cea7d26611bb88ac00000000' + var txString = '0100000004de9b4bb17f627096a9ee0b4528e4eae17df5b5c69edc29704c2e84a7371db29f010000006b483045022100f5b1a0d33b7be291c3953c25f8ae39d98601aa7099a8674daf638a08b86c7173022006ce372da5ad088a1cc6e5c49c2760a1b6f085eb1b51b502211b6bc9508661f9012102ec5e3731e54475dd2902326f43602a03ae3d62753324139163f81f20e787514cffffffff7a1d4e5fc2b8177ec738cd723a16cf2bf493791e55573445fc0df630fe5e2d64010000006b483045022100cf97f6cb8f126703e9768545dfb20ffb10ba78ae3d101aa46775f5a239b075fc02203150c4a89a11eaf5e404f4f96b62efa4455e9525765a025525c7105a7e47b6db012102c01e11b1d331f999bbdb83e8831de503cd52a01e3834a95ccafd615c67703d77ffffffff9e52447116415ca0d0567418a1a4ef8f27be3ff5a96bf87c922f3723d7db5d7c000000006b483045022100f6c117e536701be41a6b0b544d7c3b1091301e4e64a6265b6eb167b15d16959d022076916de4b115e700964194ce36a24cb9105f86482f4abbc63110c3f537cd5770012102ddf84cc7bee2d6a82ac09628a8ad4a26cd449fc528b81e7e6cc615707b8169dfffffffff5815d9750eb3572e30d6fd9df7afb4dbd76e042f3aa4988ac763b3fdf8397f80010000006a473044022028f4402b736066d93d2a32b28ccd3b7a21d84bb58fcd07fe392a611db94cdec5022018902ee0bf2c3c840c1b81ead4e6c87c88c48b2005bf5eea796464e561a620a8012102b6cdd1a6cd129ef796faeedb0b840fcd0ca00c57e16e38e46ee7028d59812ae7ffffffff0220a10700000000001976a914c342bcd1a7784d9842f7386b8b3b8a3d4171a06e88ac59611100000000001976a91449f8c749a9960dc29b5cbe7d2397cea7d26611bb88ac00000000'; + var address = '1234567'; + var now = Math.floor(Date.now() / 1000); + var nowBuf = new Buffer(4); + nowBuf.writeUInt32BE(now); describe('Mempool', function() { it('should encode mempool transaction key', function() { - encoding.encodeMempoolTransactionKey(hash).should.deep.equal(Buffer.concat([ servicePrefix, new Buffer(hash, 'hex') ])); + encoding.encodeMempoolTransactionKey(hash).should.deep.equal(Buffer.concat([ servicePrefix, txPrefix, new Buffer(hash, 'hex') ])); }); it('should decode mempool transaction key', function() { - encoding.decodeMempoolTransactionKey(Buffer.concat([ servicePrefix, new Buffer(hash, 'hex') ])).should.deep.equal(hash); + encoding.decodeMempoolTransactionKey(Buffer.concat([ servicePrefix, txPrefix, new Buffer(hash, 'hex') ])).should.deep.equal(hash); }); it('should encode mempool transaction value', function() { @@ -34,6 +40,35 @@ describe('Block service encoding', function() { mytx.should.deep.equal(tx.fromRaw(txString, 'hex')); }); + it('should encode mempool address key', function() { + + encoding.encodeMempoolAddressKey(address, hash, 0, 1, now) + .should.deep.equal(Buffer.concat([ + servicePrefix, + addressPrefix, + new Buffer('07', 'hex'), + new Buffer(address), + new Buffer(hash, 'hex'), + new Buffer('00000000', 'hex'), + new Buffer('01', 'hex'), nowBuf ])); + }); + + it('should decode mempool address key', function() { + encoding.decodeMempoolAddressKey(Buffer.concat([ + servicePrefix, + addressPrefix, + new Buffer('07', 'hex'), + new Buffer(address), + new Buffer(hash, 'hex'), + new Buffer('00000000', 'hex'), + new Buffer('01', 'hex'), nowBuf ])).should.deep.equal({ + address: address, + txid: hash, + index: 0, + input: 1, + timestamp: now}); + }); + }); });