From 16f7ffa784b70bd8baae3bf00fe7142cca3b9c71 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Fri, 7 Aug 2015 13:14:30 -0400 Subject: [PATCH] Process incoming transactions for the Address Module - Transaction events are emitted when an address matches subscribers - The Address Module listens for incoming transactions from the daemon --- lib/db.js | 16 ++-- lib/modules/address.js | 118 +++++++++++++++++++++++++--- lib/node.js | 14 +++- test/modules/address.unit.js | 147 ++++++++++++++++++++++++++++------- test/node.unit.js | 4 + 5 files changed, 249 insertions(+), 50 deletions(-) diff --git a/lib/db.js b/lib/db.js index ef68db92..2420533e 100644 --- a/lib/db.js +++ b/lib/db.js @@ -9,9 +9,7 @@ var bitcore = require('bitcore'); var $ = bitcore.util.preconditions; var BufferWriter = bitcore.encoding.BufferWriter; var errors = require('./errors'); -var levelup = chainlib.deps.levelup; var log = chainlib.log; -var Address = bitcore.Address; var BaseModule = require('./module'); var AddressModule = require('./modules/address'); @@ -27,17 +25,13 @@ function DB(options) { this.Transaction = Transaction; this.network = bitcore.Networks.get(options.network) || bitcore.Networks.testnet; + + // Modules to be loaded when ready + this._modules = options.modules || []; + this._modules.push(AddressModule); + this.modules = []; - // Add address module - this.addModule(AddressModule); - - // Add other modules - if(options.modules && options.modules.length) { - for(var i = 0; i < options.modules.length; i++) { - this.addModule(options.modules[i]); - } - } } util.inherits(DB, BaseDB); diff --git a/lib/modules/address.js b/lib/modules/address.js index 7b9464cb..e55a77cb 100644 --- a/lib/modules/address.js +++ b/lib/modules/address.js @@ -20,6 +20,9 @@ var AddressModule = function(options) { this.subscriptions = {}; this.subscriptions.transaction = {}; this.subscriptions.balance = {}; + + this.db.bitcoind.on('tx', this.transactionHandler.bind(this)); + }; inherits(AddressModule, BaseModule); @@ -56,6 +59,74 @@ AddressModule.prototype.getPublishEvents = function() { ]; }; +/** + * Will process each output of a transaction from the daemon "tx" event, and contruct + * an object with the data for the message to be relayed to any subscribers for an address. + * + * @param {Object} messages - An object to collect messages + * @param {Transaction} tx - Instance of the transaction + * @param {Output} output - The output being handled + * @param {Number} outputIndex - The index of the output in the transaction + * @param {Boolean} rejected - If the transaction was rejected by the mempool + */ +AddressModule.prototype.transactionOutputHandler = function(messages, tx, outputIndex, rejected) { + var script = tx.outputs[outputIndex].script; + + // If the script is invalid skip + if (!script) { + return; + } + + // Find the address for the output + var address = script.toAddress(this.db.network); + if (!address && script.isPublicKeyOut()) { + var pubkey = script.chunks[0].buf; + address = Address.fromPublicKey(new PublicKey(pubkey), this.db.network); + } else if (!address){ + return; + } + + // Collect data to publish to address subscribers + if (messages[address]) { + messages[address].outputIndexes.push(outputIndex); + } else { + messages[address] = { + tx: tx, + outputIndexes: [outputIndex], + address: address.toString(), + rejected: rejected + }; + } +}; + +/** + * This will handle data from the daemon "tx" event, go through each of the outputs + * and send messages to any subscribers for a particular address. + * + * @param {Object} txInfo - The data from the daemon.on('tx') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool + * @param {String} txInfo.hash - The hash of the transaction + */ +AddressModule.prototype.transactionHandler = function(txInfo) { + + // Basic transaction format is handled by the daemon + // and we can safely assume the buffer is properly formatted. + var tx = bitcore.Transaction().fromBuffer(txInfo.buffer); + + var messages = {}; + + var outputsLength = tx.outputs.length; + for (var i = 0; i < outputsLength; i++) { + this.transactionOutputHandler(messages, tx, i, !txInfo.mempool); + } + + for (var key in messages) { + this.transactionEventHandler(messages[key]); + } + +}; + AddressModule.prototype.blockHandler = function(block, addOutput, callback) { var txs = this.db.getTransactionsFromBlock(block); @@ -74,6 +145,9 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) { var inputs = tx.inputs; var outputs = tx.outputs; + // Subscription messages + var txmessages = {}; + var outputLength = outputs.length; for (var j = 0; j < outputLength; j++) { var output = outputs[j]; @@ -109,22 +183,38 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) { value: value }); - // publish events to any subscribers - this.transactionEventHandler(block, address, tx); + // Collect data for subscribers + if (txmessages[addressStr]) { + txmessages[addressStr].outputIndexes.push(outputIndex); + } else { + txmessages[addressStr] = { + tx: tx, + height: block.__height, + outputIndexes: [outputIndex], + address: addressStr, + timestamp: block.timestamp + }; + } + this.balanceEventHandler(block, address); } + // Publish events to any subscribers for this transaction + for (var addressKey in txmessages) { + this.transactionEventHandler(txmessages[addressKey]); + } + if(tx.isCoinbase()) { continue; } - for(var j = 0; j < inputs.length; j++) { - var input = inputs[j].toObject(); + for(var k = 0; k < inputs.length; k++) { + var input = inputs[k].toObject(); operations.push({ type: action, key: [AddressModule.PREFIXES.SPENTS, input.prevTxId, input.outputIndex].join('-'), - value: [txid, j].join(':') + value: [txid, k].join(':') }); } } @@ -134,11 +224,21 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) { }); }; -AddressModule.prototype.transactionEventHandler = function(block, address, tx) { - if(this.subscriptions.transaction[address]) { - var emitters = this.subscriptions.transaction[address]; +/** + * @param {Object} obj + * @param {Transaction} obj.tx - The transaction + * @param {String} [obj.address] - The address for the subscription + * @param {Array} [obj.outputIndexes] - Indexes of the inputs that includes the address + * @param {Array} [obj.inputIndexes] - Indexes of the outputs that includes the address + * @param {Date} [obj.timestamp] - The time of the block the transaction was included + * @param {Number} [obj.height] - The height of the block the transaction was included + * @param {Boolean} [obj.rejected] - If the transaction was not accepted in the mempool + */ +AddressModule.prototype.transactionEventHandler = function(obj) { + if(this.subscriptions.transaction[obj.address]) { + var emitters = this.subscriptions.transaction[obj.address]; for(var k = 0; k < emitters.length; k++) { - emitters[k].emit('transaction', address, tx, block); + emitters[k].emit('transaction', obj); } } }; diff --git a/lib/node.js b/lib/node.js index 3d9c429d..6225764d 100644 --- a/lib/node.js +++ b/lib/node.js @@ -400,6 +400,15 @@ Node.prototype._initializeDatabase = function() { // Database this.db.on('ready', function() { + + // Add all db option modules + var modules = self.db._modules; + if(modules && modules.length) { + for(var i = 0; i < modules.length; i++) { + self.db.addModule(modules[i]); + } + } + log.info('Bitcoin Database Ready'); self.chain.initialize(); }); @@ -408,6 +417,7 @@ Node.prototype._initializeDatabase = function() { Error.captureStackTrace(err); self.emit('error', err); }); + }; Node.prototype._initializeChain = function() { @@ -416,7 +426,7 @@ Node.prototype._initializeChain = function() { // Chain this.chain.on('ready', function() { log.info('Bitcoin Chain Ready'); - self._syncBitcoind(); + self._syncBitcoind(); self.emit('ready'); }); @@ -428,8 +438,6 @@ Node.prototype._initializeChain = function() { Node.prototype._initialize = function() { - var self = this; - // DB References this.db.chain = this.chain; this.db.Block = this.Block; diff --git a/test/modules/address.unit.js b/test/modules/address.unit.js index 83fed199..ce6ad882 100644 --- a/test/modules/address.unit.js +++ b/test/modules/address.unit.js @@ -11,11 +11,17 @@ var errors = bitcoindjs.errors; var chainlib = require('chainlib'); var levelup = chainlib.deps.levelup; +var mockdb = { + bitcoind: { + on: sinon.stub() + } +}; + describe('AddressModule', function() { describe('#getAPIMethods', function() { it('should return the correct methods', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var methods = am.getAPIMethods(); methods.length.should.equal(5); }); @@ -23,7 +29,7 @@ describe('AddressModule', function() { describe('#getPublishEvents', function() { it('will return an array of publish event objects', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); am.subscribe = sinon.spy(); am.unsubscribe = sinon.spy(); var events = am.getPublishEvents(); @@ -55,14 +61,52 @@ describe('AddressModule', function() { }); }); + describe('#transactionOutputHandler', function() { + it('create a message for an address', function() { + var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex'); + var tx = bitcore.Transaction().fromBuffer(txBuf); + var am = new AddressModule({db: mockdb}); + var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX'; + var messages = {}; + am.transactionOutputHandler(messages, tx, 0, true); + should.exist(messages[address]); + var message = messages[address]; + message.tx.should.equal(tx); + message.outputIndexes.should.deep.equal([0]); + message.address.should.equal(address); + message.rejected.should.equal(true); + }); + }); + + describe('#transactionHandler', function() { + it('will pass outputs to transactionOutputHandler and call transactionEventHandler', function() { + var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex'); + var am = new AddressModule({db: mockdb}); + var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX'; + var message = {}; + am.transactionOutputHandler = function(messages) { + messages[address] = message; + }; + am.transactionEventHandler = sinon.spy(); + am.transactionHandler({ + buffer: txBuf + }); + am.transactionEventHandler.callCount.should.equal(1); + }); + }); + describe('#blockHandler', function() { - var block = bitcore.Block.fromString(blockData); + var am; var db = { getTransactionsFromBlock: function() { return block.transactions.slice(0, 8); + }, + bitcoind: { + on: sinon.stub() } }; - var am = new AddressModule({db: db, network: 'livenet'}); + + var block = bitcore.Block.fromString(blockData); var data = [ { @@ -110,6 +154,10 @@ describe('AddressModule', function() { var key64 = data[2].key; var value64 = data[2].value; + before(function() { + am = new AddressModule({db: db, network: 'livenet'}); + }); + it('should create the correct operations when updating/adding outputs', function(done) { am.blockHandler({__height: 345003, timestamp: new Date(1424836934000)}, true, function(err, operations) { should.not.exist(err); @@ -161,6 +209,9 @@ describe('AddressModule', function() { var db = { getTransactionsFromBlock: function() { return transactions; + }, + bitcoind: { + on: sinon.stub() } }; @@ -177,6 +228,9 @@ describe('AddressModule', function() { var db = { getTransactionsFromBlock: function() { return block.transactions.slice(0, 8); + }, + bitcoind: { + on: sinon.stub() } }; var am = new AddressModule({db: db, network: 'livenet'}); @@ -201,26 +255,37 @@ describe('AddressModule', function() { describe('#transactionEventHandler', function() { it('will emit a transaction if there is a subscriber', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); am.subscriptions.transaction = { '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter] }; - var block = {}; + var block = { + __height: 0, + timestamp: new Date() + }; var tx = {}; - emitter.on('transaction', function(address, t, b) { - address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); - t.should.equal(tx); - b.should.equal(block); + emitter.on('transaction', function(obj) { + obj.address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); + obj.tx.should.equal(tx); + obj.timestamp.should.equal(block.timestamp); + obj.height.should.equal(block.__height); + obj.outputIndexes.should.deep.equal([1]); done(); }); - am.transactionEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N', tx); + am.transactionEventHandler({ + address: '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N', + height: block.__height, + timestamp: block.timestamp, + outputIndexes: [1], + tx: tx + }); }); }); describe('#balanceEventHandler', function() { it('will emit a balance if there is a subscriber', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); am.subscriptions.balance = { '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter] @@ -240,7 +305,7 @@ describe('AddressModule', function() { describe('#subscribe', function() { it('will add emitters to the subscribers array (transaction)', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; @@ -257,7 +322,7 @@ describe('AddressModule', function() { am.subscriptions.transaction[address].should.deep.equal([emitter, emitter2]); }); it('will add an emitter to the subscribers array (balance)', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); var name = 'balance'; var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; @@ -276,7 +341,7 @@ describe('AddressModule', function() { describe('#unsubscribe', function() { it('will remove emitter from subscribers array (transaction)', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; @@ -286,7 +351,7 @@ describe('AddressModule', function() { am.subscriptions.transaction[address].should.deep.equal([emitter2]); }); it('will remove emitter from subscribers array (balance)', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; @@ -296,7 +361,7 @@ describe('AddressModule', function() { am.subscriptions.balance[address].should.deep.equal([emitter2]); }); it('should unsubscribe from all addresses if no addresses are specified', function() { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var emitter = new EventEmitter(); var emitter2 = new EventEmitter(); am.subscriptions.balance = { @@ -313,7 +378,7 @@ describe('AddressModule', function() { describe('#getBalance', function() { it('should sum up the unspent outputs', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); var outputs = [ {satoshis: 1000}, {satoshis: 2000}, {satoshis: 3000} ]; @@ -326,7 +391,7 @@ describe('AddressModule', function() { }); it('will handle error from unspent outputs', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); am.getUnspentOutputs = sinon.stub().callsArgWith(2, new Error('error')); am.getBalance('someaddress', false, function(err) { should.exist(err); @@ -338,8 +403,17 @@ describe('AddressModule', function() { }); describe('#getOutputs', function() { - var am = new AddressModule({db: {}}); + var am; var address = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; + var db = { + bitcoind: { + on: sinon.stub() + } + }; + + before(function() { + am = new AddressModule({db: db}); + }); it('should get outputs for an address', function(done) { var readStream1 = new EventEmitter(); @@ -433,7 +507,7 @@ describe('AddressModule', function() { ]; var i = 0; - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); am.getOutputs = sinon.stub().callsArgWith(2, null, outputs); am.isUnspent = function(output, queryMempool, callback) { callback(!outputs[i].spent); @@ -449,7 +523,7 @@ describe('AddressModule', function() { }); }); it('should handle an error from getOutputs', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); am.getOutputs = sinon.stub().callsArgWith(2, new Error('error')); am.getUnspentOutputs('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) { should.exist(err); @@ -458,7 +532,7 @@ describe('AddressModule', function() { }); }); it('should handle when there are no outputs', function(done) { - var am = new AddressModule({}); + var am = new AddressModule({db: mockdb}); am.getOutputs = sinon.stub().callsArgWith(2, null, []); am.getUnspentOutputs('1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W', false, function(err, outputs) { should.exist(err); @@ -470,7 +544,11 @@ describe('AddressModule', function() { }); describe('#isUnspent', function() { - var am = new AddressModule({}); + var am; + + before(function() { + am = new AddressModule({db: mockdb}); + }); it('should give true when isSpent() gives false', function(done) { am.isSpent = sinon.stub().callsArgWith(2, false); @@ -498,10 +576,19 @@ describe('AddressModule', function() { }); describe('#isSpent', function() { - var am = new AddressModule({db: {}}); - am.db.bitcoind = { - isSpent: sinon.stub().returns(true) + var am; + var db = { + bitcoind: { + on: sinon.stub() + } }; + before(function() { + am = new AddressModule({db: db}); + am.db.bitcoind = { + isSpent: sinon.stub().returns(true), + on: sinon.stub() + }; + }); it('should give true if bitcoind.isSpent gives true', function(done) { am.isSpent('output', true, function(spent) { @@ -516,6 +603,9 @@ describe('AddressModule', function() { var db = { store: { get: sinon.stub().callsArgWith(1, null, 'spendtxid:1') + }, + bitcoind: { + on: sinon.stub() } }; var am = new AddressModule({db: db}); @@ -616,6 +706,9 @@ describe('AddressModule', function() { } } callback(new Error('tx ' + txid + ' not found')); + }, + bitcoind: { + on: sinon.stub() } }; var am = new AddressModule({db: db}); diff --git a/test/node.unit.js b/test/node.unit.js index 438fae88..29bf6f4c 100644 --- a/test/node.unit.js +++ b/test/node.unit.js @@ -437,6 +437,9 @@ describe('Bitcoind Node', function() { it('will call chain.initialize() on ready event', function(done) { var node = new Node({}); node.db = new EventEmitter(); + node.db.addModule = sinon.spy(); + var module = {}; + node.db._modules = [module]; node.chain = { initialize: sinon.spy() }; @@ -445,6 +448,7 @@ describe('Bitcoind Node', function() { setImmediate(function() { chainlib.log.info.callCount.should.equal(1); chainlib.log.info.restore(); + node.db.addModule.callCount.should.equal(1); node.chain.initialize.callCount.should.equal(1); done(); });