This commit is contained in:
Chris Kleeschulte 2017-05-22 15:45:16 -04:00
parent 6cccce833d
commit 6fc170f7da
7 changed files with 1422 additions and 215 deletions

View File

@ -91,7 +91,7 @@ BlockService.prototype.printTipInfo = function(prependedMessage) {
log.info( log.info(
prependedMessage + ' Serial Tip: ' + this.tip.hash + prependedMessage + ' Serial Tip: ' + this.tip.hash +
' Concurrent tip: ' + this.concurrentTip.hash + ' Concurrent tip: ' + this.concurrentTip.hash +
' Bitcoind tip: ' + this.bitcoind.tiphash ' Network tip: ' + this.bitcoind.tiphash
); );
}; };

View File

@ -10,36 +10,38 @@ var index = require('../');
var log = index.log; var log = index.log;
var Service = require('../../service'); var Service = require('../../service');
/*
Purpose:
1. join a P2P
2. relay messages
3. publish new transactions (pub/sub)
4. publish new blocks (pub/sub)
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
*/
var P2P = function(options) { var P2P = function(options) {
if (!(this instanceof P2P)) { if (!(this instanceof P2P)) {
return new P2P(options); return new P2P(options);
} }
Service.call(this, options); Service.call(this, options);
this.options = options; this.options = options;
this._maxPeers = this.options.maxPeers || 60; this._maxPeers = this.options.maxPeers || 60;
this._peers = this.options.peers; this._peers = this.options.peers;
this._maxMempoolSize = this.options.maxMempoolSize || 100 * 1024 * 1024; this.subscriptions = {};
this._synced = false;
this.initialHeight = 0;
}; };
util.inherits(P2P, Service); util.inherits(P2P, Service);
P2P.dependencies = [ 'bitcoind', 'db' ]; P2P.dependencies = [];
P2P.prototype.start = function(callback) { P2P.prototype.start = function(callback) {
//Step 1: connect to peer(s) as per config
//Step 2: memoize the tip
//Step 3: wait for other services to register listeners
var self = this; var self = this;
self.once('synced', function() { self.once('synced', function() {
self._initPrefix(callback);
self._initPool(); self._initPool();
self._initCache(); self._initCache();
self._pool.connect(); self._pool.connect();
self._synced = true; self._synced = true;
}); });
callback();
}; };
P2P.prototype.stop = function(callback) { P2P.prototype.stop = function(callback) {
@ -50,15 +52,17 @@ P2P.prototype.stop = function(callback) {
}); });
}; };
P2P.prototype._initPrefix = function(callback) { P2P.prototype.subscribe = function(name, emitter) {
var self = this; this.subscriptions[name].push(emitter);
self.node.services.db.getPrefix(self.name, function(err, prefix) { log.info(emitter.remoteAddress, 'subscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length);
if(err) { };
return callback(err);
} P2P.prototype.unsubscribe = function(name, emitter) {
self.prefix = prefix; var index = this.subscriptions[name].indexOf(emitter);
callback(); if (index > -1) {
}); this.subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'p2p/' + name, 'total:', this.subscriptions[name].length);
}; };
P2P.prototype._initCache = function() { P2P.prototype._initCache = function() {
@ -135,6 +139,26 @@ P2P.prototype.getAPIMethods = function() {
}; };
P2P.prototype.getPublishEvents = function() { P2P.prototype.getPublishEvents = function() {
return [
{
name: 'p2p/transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
},
{
name: 'p2p/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
},
{
name: 'bitcoind/rawblock',
scope: this,
subscribe: this.subscribe.bind(this, 'rawblock'),
unsubscribe: this.unsubscribe.bind(this, 'rawblock')
}
];
return []; return [];
}; };

View File

@ -10,7 +10,9 @@ function TimestampService(options) {
BaseService.call(this, options); BaseService.call(this, options);
this.currentBlock = null; this.currentBlock = null;
this.currentTimestamp = null; this.currentTimestamp = null;
this._blockHandlerQueue = LRU(50); this._cache = LRU(50);
var genesis = self.node.services.block.genesis;
this._cache.set(genesis.hash, genesis.__height);
} }
inherits(TimestampService, BaseService); inherits(TimestampService, BaseService);
@ -40,28 +42,41 @@ TimestampService.prototype.stop = function(callback) {
}; };
TimestampService.prototype._processBlockHandlerQueue = function(block) { TimestampService.prototype._processBlockHandlerQueue = function(block) {
var self = this; var self = this;
//do we have a record in the queue that is waiting on me to consume it?
var blockTime = block.header.timestamp;
var prevHash = utils.reverseBufferToString(block.header.prevHash); var prevHash = utils.reverseBufferToString(block.header.prevHash);
if (prevHash.length !== 64) {
return;
}
var timestamp = self._blockHandlerQueue.get(prevHash); var prev = self._cache.get(prevHash);
if (timestamp) { if (prev && !prev.prevHash) {
if (block.header.timestamp <= timestamp) {
timestamp = block.header.timestamp + 1; if (blockTime <= prev.time) {
blockTime++;
} }
self.counter++;
timestamp = block.header.timestamp; self._cache.del(prevHash);
self._blockHandlerQueue.del(prevHash); self._cache.set(block.hash, { time: blockTime });
} else { return [{ hash: block.hash, time: blockTime }];
timestamp = block.header.timestamp;
} }
self._blockHandlerQueue.set(block.hash, timestamp); self._cache.set(block.hash, { time: blockTime, prevHash: prevHash });
return timestamp;
var additionalBlocks = [];
var dependentHash = block.hash;
self._cache.rforEach(function(value, key) {
if (dependentHash === value.prevHash) {
additionalBlocks.push({ hash: key, time: value.time });
dependentHash = value.prevHash;
self._cache.del(key);
}
});
return additionalBlocks;
}; };
TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { TimestampService.prototype.blockHandler = function(block, connectBlock, callback) {
@ -70,14 +85,14 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
var action = connectBlock ? 'put' : 'del'; var action = connectBlock ? 'put' : 'del';
var timestamp = self._processBlockHandlerQueue(block); var queue = self._processBlockHandlerQueue(block);
if (!timestamp) {
return callback(null, []);
}
var operations = []; var operations = [];
if (!queue.length < 1) {
return callback(null, []);
}
operations = operations.concat([ operations = operations.concat([
{ {
type: action, type: action,

View File

@ -282,6 +282,7 @@ WebService.prototype.transformHttpsOptions = function() {
WebService.prototype._endpointGetInfo = function() { WebService.prototype._endpointGetInfo = function() {
var self = this; var self = this;
return function(req, res) { return function(req, res) {
console.log(self.node.services);
res.jsonp({ res.jsonp({
result: 'ok', result: 'ok',
dbheight: self.node.services.block.tip.__height, dbheight: self.node.services.block.tip.__height,

View File

@ -82,5 +82,10 @@ utils.reverseBufferToString = function(buf) {
return BufferUtil.reverse(buf).toString('hex'); return BufferUtil.reverse(buf).toString('hex');
}; };
//TODO: write some code here
utils.getIpAddressInfo = function(ipStr) {
//is this ipv4 or ipv6, 4 is 32 bits, 6 is 128 bits
//does this string have colons or periods?
};
module.exports = utils; module.exports = utils;

File diff suppressed because it is too large Load Diff

View File

@ -53,7 +53,8 @@ var bitcore = {
'address', 'address',
'mempool', 'mempool',
'wallet-api', 'wallet-api',
'web' 'web',
'block'
], ],
servicesConfig: { servicesConfig: {
bitcoind: { bitcoind: {