wip on changing subscriptions endpoints.

This commit is contained in:
Chris Kleeschulte 2017-11-09 11:45:00 -05:00
parent b138a558ae
commit bde062e744
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
2 changed files with 63 additions and 0 deletions

View File

@ -17,6 +17,9 @@ var BlockService = function(options) {
BaseService.call(this, options);
this._subscriptions = {};
this._subscriptions.block = [];
this._tip = null;
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
@ -39,6 +42,30 @@ inherits(BlockService, BaseService);
BlockService.dependencies = [ 'timestamp', 'p2p', 'db', 'header', 'mempool' ];
BlockService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length);
};
BlockService.prototype.unsubscribe = function(name, emitter) {
var index = this._subscriptions[name].indexOf(emitter);
if (index > -1) {
this._subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this._subscriptions[name].length);
};
BlockService.prototype.getPublishEvents = function() {
return [
{
name: 'block/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
}
];
};
BlockService.prototype.getAPIMethods = function() {
var methods = [
['getInfo', this, this.getInfo, 0],
@ -930,6 +957,11 @@ BlockService.prototype._saveBlock = function(block, callback) {
return callback(err);
}
self._processingBlock = false;
for (var i = 0; i < self._subscriptions.block.length; i++) {
self._subscriptions.block[i].emit('block/block', block);
}
callback();
});

View File

@ -7,8 +7,10 @@ var utils = require('../../utils');
var MempoolService = function(options) {
BaseService.call(this, options);
this._subscriptions = {};
this._subscriptions.transaction = [];
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
this._network = this.node.network;
@ -28,6 +30,19 @@ util.inherits(MempoolService, BaseService);
MempoolService.dependencies = ['db'];
MempoolService.prototype.getAPIMethods = function() {
MempoolService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length);
};
MempoolService.prototype.unsubscribe = function(name, emitter) {
var index = this._subscriptions[name].indexOf(emitter);
if (index > -1) {
this._subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'mempool/' + name, 'total:', this._subscriptions[name].length);
};
var methods = [
['getMempoolTransaction', this, this.getMempoolTransaction, 1],
['getTxidsByAddress', this, this.getTxsByAddress, 2],
@ -35,6 +50,17 @@ MempoolService.prototype.getAPIMethods = function() {
return methods;
};
MempoolService.prototype.getPublishEvents = function() {
return [
{
name: 'mempool/transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
}
];
};
MempoolService.prototype.start = function(callback) {
var self = this;
@ -223,6 +249,11 @@ MempoolService.prototype._onTransaction = function(tx) {
log.error(err);
self.node.stop();
}
for (var i = 0; i < self._subscriptions.transaction.length; i++) {
self._subscriptions.transaction[i].emit('mempool/transaction', tx);
}
});
};