diff --git a/README.md b/README.md index 4ce25789..9570cc7d 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ the blockchain. One built-in module is the address module which exposes the API ### Writing a Module -A new module can be created by inheriting from `BitcoindJS.Module`, implementing the methods `blockHandler()` and `getAPIMethods()`, and any additional methods for querying the data. Here is an example: +A new module can be created by inheriting from `BitcoindJS.Module`, implementing the methods `blockHandler()`, `getAPIMethods()`, `getPublishEvents()` and any additional methods for querying the data. Here is an example: ```js var inherits = require('util').inherits; @@ -158,6 +158,31 @@ MyModule.prototype.getAPIMethods = function() { ]; }; +/** + * the bus events available for subscription + * @return {Array} array of events + */ +MyModule.prototype.getPublishEvents = function() { + return [ + { + name: 'custom', + scope: this, + subscribe: this.subscribeCustom, + unsubscribe: this.unsubscribeCustom + } + ] +}; + +/** + * Will keep track of event listeners to later publish and emit events. + */ +MyModule.prototype.subscribeCustom = function(emitter, param) { + if(!this.subscriptions[param]) { + this.subscriptions[param] = []; + } + this.subscriptions[param].push(emitter); +} + MyModule.prototype.getData = function(arg1, callback) { // You can query the data by reading from the leveldb store on db this.db.store.get(arg1, callback); diff --git a/lib/bus.js b/lib/bus.js new file mode 100644 index 00000000..aaf1da33 --- /dev/null +++ b/lib/bus.js @@ -0,0 +1,43 @@ +'use strict'; + +var events = require('events'); +var util = require('util'); + +function Bus(params) { + events.EventEmitter.call(this); + this.db = params.db; +} + +util.inherits(Bus, events.EventEmitter); + +Bus.prototype.subscribe = function(name) { + for (var i = 0; i < this.db.modules.length; i++) { + var mod = this.db.modules[i]; + var events = mod.getPublishEvents(); + for (var j = 0; j < events.length; j++) { + var event = events[j]; + var params = Array.prototype.slice.call(arguments).slice(1); + params.unshift(this); + if (name === event.name) { + event.subscribe.apply(event.scope, params); + } + } + } +}; + +Bus.prototype.unsubscribe = function(name) { + for (var i = 0; i < this.db.modules.length; i++) { + var mod = this.db.modules[i]; + var events = mod.getPublishEvents(); + for (var j = 0; j < events.length; j++) { + var event = events[j]; + var params = Array.prototype.slice.call(arguments).slice(1); + params.unshift(this); + if (name === event.name) { + event.unsubscribe.apply(event.scope, params); + } + } + } +}; + +module.exports = Bus; diff --git a/lib/module.js b/lib/module.js index 1acce6c4..b9e39e66 100644 --- a/lib/module.js +++ b/lib/module.js @@ -15,6 +15,18 @@ Module.prototype.blockHandler = function(block, add, callback) { setImmediate(callback); }; +/** + * the bus events available for subscription + * @return {Array} an array of event info + */ +Module.prototype.getPublishEvents = function() { + // Example: + // return [ + // ['eventname', this, this.subscribeEvent, this.unsubscribeEvent], + // ]; + return []; +}; + /** * the API methods to expose * @return {Array} return array of methods @@ -33,4 +45,4 @@ Module.prototype.getAPIMethods = function() { // // }; -module.exports = Module; \ No newline at end of file +module.exports = Module; diff --git a/lib/modules/address.js b/lib/modules/address.js index fd340069..89e62874 100644 --- a/lib/modules/address.js +++ b/lib/modules/address.js @@ -7,11 +7,17 @@ var chainlib = require('chainlib'); var log = chainlib.log; var errors = chainlib.errors; var bitcore = require('bitcore'); +var $ = bitcore.util.preconditions; +var EventEmitter = require('events').EventEmitter; var PublicKey = bitcore.PublicKey; var Address = bitcore.Address; var AddressModule = function(options) { BaseModule.call(this, options); + + this.subscriptions = {}; + this.subscriptions.transaction = {}; + this.subscriptions.balance = {}; }; inherits(AddressModule, BaseModule); @@ -29,6 +35,23 @@ AddressModule.prototype.getAPIMethods = function() { ]; }; +AddressModule.prototype.getPublishEvents = function() { + return [ + { + name: 'transaction', + scope: this, + subscribe: this.subscribe.bind(this, 'transaction'), + unsubscribe: this.unsubscribe.bind(this, 'transaction') + }, + { + name: 'balance', + scope: this, + subscribe: this.subscribe.bind(this, 'balance'), + unsubscribe: this.unsubscribe.bind(this, 'balance') + } + ]; +}; + AddressModule.prototype.blockHandler = function(block, addOutput, callback) { var txs = this.db.getTransactionsFromBlock(block); @@ -82,6 +105,11 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) { key: [AddressModule.PREFIXES.OUTPUTS, address, timestamp, txid, outputIndex].join('-'), value: [output.satoshis, script, height].join(':') }); + + // publish events to any subscribers + this.transactionEventHandler(block, address, tx); + this.balanceEventHandler(block, address); + } if(tx.isCoinbase()) { @@ -95,6 +123,57 @@ AddressModule.prototype.blockHandler = function(block, addOutput, callback) { }); }; +AddressModule.prototype.transactionEventHandler = function(block, address, tx) { + if(this.subscriptions.transaction[address]) { + var emitters = this.subscriptions.transaction[address]; + for(var k = 0; k < emitters.length; k++) { + emitters[k].emit('transaction', address, tx, block); + } + } +}; + +AddressModule.prototype.balanceEventHandler = function(block, address) { + if(this.subscriptions.balance[address]) { + var emitters = this.subscriptions.balance[address]; + this.getBalance(address, true, function(err, balance) { + if(err) { + return this.emit(err); + } + + for(var i = 0; i < emitters.length; i++) { + emitters[i].emit('balance', address, balance, block); + } + }); + } +}; + +AddressModule.prototype.subscribe = function(name, emitter, addresses) { + $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); + $.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses'); + + for(var i = 0; i < addresses.length; i++) { + if(!this.subscriptions[name][addresses[i]]) { + this.subscriptions[name][addresses[i]] = []; + } + this.subscriptions[name][addresses[i]].push(emitter); + } +}; + +AddressModule.prototype.unsubscribe = function(name, emitter, addresses) { + $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); + $.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses'); + + for(var i = 0; i < addresses.length; i++) { + if(this.subscriptions[name][addresses[i]]) { + var emitters = this.subscriptions[name][addresses[i]]; + var index = emitters.indexOf(emitter); + if(index > -1) { + emitters.splice(index, 1); + } + } + } +}; + AddressModule.prototype.getBalance = function(address, queryMempool, callback) { this.getUnspentOutputs(address, queryMempool, function(err, outputs) { if(err) { @@ -202,4 +281,4 @@ AddressModule.prototype.isSpent = function(output, queryMempool, callback) { }); }; -module.exports = AddressModule; \ No newline at end of file +module.exports = AddressModule; diff --git a/lib/node.js b/lib/node.js index 2e219783..00a929bd 100644 --- a/lib/node.js +++ b/lib/node.js @@ -16,6 +16,7 @@ var _ = bitcore.deps._; var $ = bitcore.util.preconditions; var genesis = require('./genesis.json'); var daemon = require('./daemon'); +var Bus = require('./bus'); function Node(config) { BaseNode.call(this, config); @@ -24,6 +25,10 @@ function Node(config) { util.inherits(Node, BaseNode); +Node.prototype.openBus = function() { + return new Bus({db: this.db}); +}; + Node.prototype._loadConfiguration = function(config) { var self = this; this._loadBitcoinConf(config); diff --git a/package.json b/package.json index 1183d515..f6e78ee4 100644 --- a/package.json +++ b/package.json @@ -38,14 +38,14 @@ "bitcoind" ], "dependencies": { + "async": "1.3.0", "bindings": "^1.2.1", - "mkdirp": "0.5.0", - "nan": "1.3.0", - "tiny": "0.0.10", "chainlib": "^0.1.1", "errno": "^0.1.2", - "async": "1.3.0", - "memdown": "^1.0.0" + "memdown": "^1.0.0", + "mkdirp": "0.5.0", + "nan": "1.3.0", + "tiny": "0.0.10" }, "devDependencies": { "benchmark": "1.0.0", diff --git a/test/bus.unit.js b/test/bus.unit.js new file mode 100644 index 00000000..3dc09dc5 --- /dev/null +++ b/test/bus.unit.js @@ -0,0 +1,61 @@ +'use strict'; + +var should = require('chai').should(); +var sinon = require('sinon'); +var Bus = require('../lib/bus'); + +describe('Bus', function() { + + describe('#subscribe', function() { + it('will call modules subscribe function with the correct arguments', function() { + var subscribe = sinon.spy(); + var db = { + modules: [ + { + getPublishEvents: sinon.stub().returns([ + { + name: 'test', + scope: this, + subscribe: subscribe, + } + ]) + } + ] + }; + var bus = new Bus({db: db}); + bus.subscribe('test', 'a', 'b', 'c'); + subscribe.callCount.should.equal(1); + subscribe.args[0][0].should.equal(bus); + subscribe.args[0][1].should.equal('a'); + subscribe.args[0][2].should.equal('b'); + subscribe.args[0][3].should.equal('c'); + }); + }); + + describe('#unsubscribe', function() { + it('will call modules unsubscribe function with the correct arguments', function() { + var unsubscribe = sinon.spy(); + var db = { + modules: [ + { + getPublishEvents: sinon.stub().returns([ + { + name: 'test', + scope: this, + unsubscribe: unsubscribe + } + ]) + } + ] + }; + var bus = new Bus({db: db}); + bus.unsubscribe('test', 'a', 'b', 'c'); + unsubscribe.callCount.should.equal(1); + unsubscribe.args[0][0].should.equal(bus); + unsubscribe.args[0][1].should.equal('a'); + unsubscribe.args[0][2].should.equal('b'); + unsubscribe.args[0][3].should.equal('c'); + }); + }); + +}); diff --git a/test/modules/address.unit.js b/test/modules/address.unit.js index 2741dfbc..1bd55439 100644 --- a/test/modules/address.unit.js +++ b/test/modules/address.unit.js @@ -2,8 +2,6 @@ var should = require('chai').should(); var sinon = require('sinon'); -var chainlib = require('chainlib'); -var levelup = chainlib.deps.levelup; var bitcoindjs = require('../../'); var AddressModule = bitcoindjs.modules.AddressModule; var blockData = require('../data/livenet-345003.json'); @@ -21,6 +19,40 @@ describe('AddressModule', function() { }); }); + describe('#getPublishEvents', function() { + it('will return an array of publish event objects', function() { + var am = new AddressModule({}); + am.subscribe = sinon.spy(); + am.unsubscribe = sinon.spy(); + var events = am.getPublishEvents(); + + var callCount = 0; + function testName(event, name) { + event.name.should.equal(name); + event.scope.should.equal(am); + var emitter = new EventEmitter(); + var addresses = []; + event.subscribe(emitter, addresses); + am.subscribe.callCount.should.equal(callCount + 1); + am.subscribe.args[callCount][0].should.equal(name); + am.subscribe.args[callCount][1].should.equal(emitter); + am.subscribe.args[callCount][2].should.equal(addresses); + am.subscribe.thisValues[callCount].should.equal(am); + event.unsubscribe(emitter, addresses); + am.unsubscribe.callCount.should.equal(callCount + 1); + am.unsubscribe.args[callCount][0].should.equal(name); + am.unsubscribe.args[callCount][1].should.equal(emitter); + am.unsubscribe.args[callCount][2].should.equal(addresses); + am.unsubscribe.thisValues[callCount].should.equal(am); + callCount++; + } + events.forEach(function(event) { + testName(event, event.name); + }); + + }); + }); + describe('#blockHandler', function() { var block = bitcore.Block.fromString(blockData); var db = { @@ -124,6 +156,129 @@ describe('AddressModule', function() { done(); }); }); + it('will call event handlers', function() { + var block = bitcore.Block.fromString(blockData); + var db = { + getTransactionsFromBlock: function() { + return block.transactions.slice(0, 8); + } + }; + var am = new AddressModule({db: db, network: 'livenet'}); + am.transactionEventHandler = sinon.spy(); + am.balanceEventHandler = sinon.spy(); + am.blockHandler( + { + __height: 345003, + timestamp: new Date(1424836934000) + }, + true, + function(err) { + if (err) { + throw err; + } + am.transactionEventHandler.callCount.should.equal(11); + am.balanceEventHandler.callCount.should.equal(11); + } + ); + }); + }); + + describe('#transactionEventHandler', function() { + it('will emit a transaction if there is a subscriber', function(done) { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + am.subscriptions.transaction = { + '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter] + }; + var block = {}; + var tx = {}; + emitter.on('transaction', function(address, t, b) { + address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); + t.should.equal(tx); + b.should.equal(block); + done(); + }); + am.transactionEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N', tx); + }); + }); + + describe('#balanceEventHandler', function() { + it('will emit a balance if there is a subscriber', function(done) { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + am.subscriptions.balance = { + '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N': [emitter] + }; + var block = {}; + var balance = 1000; + am.getBalance = sinon.stub().callsArgWith(2, null, balance); + emitter.on('balance', function(address, bal, b) { + address.should.equal('1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); + bal.should.equal(balance); + b.should.equal(block); + done(); + }); + am.balanceEventHandler(block, '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'); + }); + }); + + describe('#subscribe', function() { + it('will add emitters to the subscribers array (transaction)', function() { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + + var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; + var name = 'transaction'; + am.subscribe(name, emitter, [address]); + am.subscriptions.transaction[address].should.deep.equal([emitter]); + + var address2 = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; + am.subscribe(name, emitter, [address2]); + am.subscriptions.transaction[address2].should.deep.equal([emitter]); + + var emitter2 = new EventEmitter(); + am.subscribe(name, emitter2, [address]); + am.subscriptions.transaction[address].should.deep.equal([emitter, emitter2]); + }); + it('will add an emitter to the subscribers array (balance)', function() { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + var name = 'balance'; + var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; + am.subscribe(name, emitter, [address]); + am.subscriptions.balance[address].should.deep.equal([emitter]); + + var address2 = '1KiW1A4dx1oRgLHtDtBjcunUGkYtFgZ1W'; + am.subscribe(name, emitter, [address2]); + am.subscriptions.balance[address2].should.deep.equal([emitter]); + + var emitter2 = new EventEmitter(); + am.subscribe(name, emitter2, [address]); + am.subscriptions.balance[address].should.deep.equal([emitter, emitter2]); + }); + }); + + describe('#unsubscribe', function() { + it('will remove emitter from subscribers array (transaction)', function() { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + var emitter2 = new EventEmitter(); + var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; + am.subscriptions.transaction[address] = [emitter, emitter2]; + var name = 'transaction'; + am.unsubscribe(name, emitter, [address]); + am.subscriptions.transaction[address].should.deep.equal([emitter2]); + }); + it('will remove emitter from subscribers array (balance)', function() { + var am = new AddressModule({}); + var emitter = new EventEmitter(); + var emitter2 = new EventEmitter(); + var address = '1DzjESe6SLmAKVPLFMj6Sx1sWki3qt5i8N'; + var name = 'balance'; + am.subscriptions.balance[address] = [emitter, emitter2]; + am.unsubscribe(name, emitter, [address]); + am.subscriptions.balance[address].should.deep.equal([emitter2]); + }); }); describe('#getBalance', function() { diff --git a/test/node.unit.js b/test/node.unit.js index 33b447d4..2417f1dc 100644 --- a/test/node.unit.js +++ b/test/node.unit.js @@ -31,6 +31,15 @@ var Node = proxyquire('../lib/node', { chainlib.Node = OriginalNode; describe('Bitcoind Node', function() { + describe('#openBus', function() { + it('will create a new bus', function() { + var node = new Node({}); + var db = {}; + node.db = db; + var bus = node.openBus(); + bus.db.should.equal(db); + }); + }); describe('#_loadConfiguration', function() { it('should call the necessary methods', function() { var node = new Node({});