From bde062e744509db6da171a07259358bb5ccfea28 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 9 Nov 2017 11:45:00 -0500 Subject: [PATCH] wip on changing subscriptions endpoints. --- lib/services/block/index.js | 32 ++++++++++++++++++++++++++++++++ lib/services/mempool/index.js | 31 +++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index df033df3..1ffab141 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -17,6 +17,9 @@ var BlockService = function(options) { BaseService.call(this, options); + this._subscriptions = {}; + this._subscriptions.block = []; + this._tip = null; this._db = this.node.services.db; this._p2p = this.node.services.p2p; @@ -39,6 +42,30 @@ inherits(BlockService, BaseService); BlockService.dependencies = [ 'timestamp', 'p2p', 'db', 'header', 'mempool' ]; +BlockService.prototype.subscribe = function(name, emitter) { + this._subscriptions[name].push(emitter); + log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); +}; + +BlockService.prototype.unsubscribe = function(name, emitter) { + var index = this._subscriptions[name].indexOf(emitter); + if (index > -1) { + this._subscriptions[name].splice(index, 1); + } + log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); +}; + +BlockService.prototype.getPublishEvents = function() { + return [ + { + name: 'block/block', + scope: this, + subscribe: this.subscribe.bind(this, 'block'), + unsubscribe: this.unsubscribe.bind(this, 'block') + } + ]; +}; + BlockService.prototype.getAPIMethods = function() { var methods = [ ['getInfo', this, this.getInfo, 0], @@ -930,6 +957,11 @@ BlockService.prototype._saveBlock = function(block, callback) { return callback(err); } self._processingBlock = false; + + for (var i = 0; i < self._subscriptions.block.length; i++) { + self._subscriptions.block[i].emit('block/block', block); + } + callback(); }); diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 150e12ca..a9960185 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -7,8 +7,10 @@ var utils = require('../../utils'); var MempoolService = function(options) { BaseService.call(this, options); + this._subscriptions = {}; this._subscriptions.transaction = []; + this._db = this.node.services.db; this._p2p = this.node.services.p2p; this._network = this.node.network; @@ -28,6 +30,19 @@ util.inherits(MempoolService, BaseService); MempoolService.dependencies = ['db']; MempoolService.prototype.getAPIMethods = function() { + +MempoolService.prototype.subscribe = function(name, emitter) { + this._subscriptions[name].push(emitter); + log.info(emitter.remoteAddress, 'subscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length); +}; + +MempoolService.prototype.unsubscribe = function(name, emitter) { + var index = this._subscriptions[name].indexOf(emitter); + if (index > -1) { + this._subscriptions[name].splice(index, 1); + } + log.info(emitter.remoteAddress, 'unsubscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length); +}; var methods = [ ['getMempoolTransaction', this, this.getMempoolTransaction, 1], ['getTxidsByAddress', this, this.getTxsByAddress, 2], @@ -35,6 +50,17 @@ MempoolService.prototype.getAPIMethods = function() { return methods; }; +MempoolService.prototype.getPublishEvents = function() { + return [ + { + name: 'mempool/transaction', + scope: this, + subscribe: this.subscribe.bind(this, 'transaction'), + unsubscribe: this.unsubscribe.bind(this, 'transaction') + } + ]; +}; + MempoolService.prototype.start = function(callback) { var self = this; @@ -223,6 +249,11 @@ MempoolService.prototype._onTransaction = function(tx) { log.error(err); self.node.stop(); } + + for (var i = 0; i < self._subscriptions.transaction.length; i++) { + self._subscriptions.transaction[i].emit('mempool/transaction', tx); + } + }); };