more cleanup.

This commit is contained in:
Chris Kleeschulte 2017-02-14 08:05:28 -05:00
parent 8d98abd080
commit d4238225d4
7 changed files with 99 additions and 94 deletions

View File

@ -59,7 +59,7 @@ Logger.prototype.warn = function() {
*/
Logger.prototype._log = function(color) {
if (!this.permitWrites) {
return;
//return;
}
var args = Array.prototype.slice.call(arguments);
args = args.slice(1);

View File

@ -389,24 +389,6 @@ Bitcoin.prototype.start = function(callback) {
};
Bitcoin.prototype._getAddressDetailsForOutput = function(output, outputIndex, result, addressStrings) {
if (!output.address) {
return;
}
var address = output.address;
if (addressStrings.indexOf(address) >= 0) {
if (!result.addresses[address]) {
result.addresses[address] = {
inputIndexes: [],
outputIndexes: [outputIndex]
};
} else {
result.addresses[address].outputIndexes.push(outputIndex);
}
result.satoshis += output.satoshis;
}
};
Bitcoin.prototype._maybeGetBlockHash = function(blockArg, callback) {
var self = this;
if (_.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))) {
@ -432,6 +414,7 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) {
return callback(err);
}
self._tryAllClients(function(client, done) {
self.client.getBlock(blockhash, false, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));

View File

@ -168,11 +168,9 @@ DB.prototype.start = function(callback) {
});
this.node.once('ready', function() {
// start syncing
log.permitWrites = false;
self._sync.initialSync();
// Notify that there is a new tip
self.node.services.bitcoind.on('tip', function() {
self._sync.sync();
});

View File

@ -18,7 +18,6 @@ function BlockStream(highWaterMark, bitcoind, lastHeight) {
this.stopping = false;
this.queue = [];
this.processing = false;
this.latestHeightRetrieved = 0;
}
inherits(BlockStream, Readable);
@ -33,11 +32,10 @@ function ProcessConcurrent(highWaterMark, db) {
inherits(ProcessConcurrent, Transform);
function ProcessSerial(highWaterMark, db, tip, progressBar) {
function ProcessSerial(highWaterMark, db, tip) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
this.progressBar = progressBar;
this.processBlockStartTime = [];
}
@ -70,7 +68,6 @@ function Sync(node, db) {
inherits(Sync, EventEmitter);
// Use a concurrent strategy to sync
Sync.prototype.initialSync = function() {
var self = this;
@ -78,33 +75,12 @@ Sync.prototype.initialSync = function() {
return;
}
self.lastReportedBlock = self.db.tip.__height;
self.progressBar = new ProgressBar('[:bar] :percent :current blks, :blockspersec blks/sec, :elapsed secs', {
complete: green,
incomplete: red,
total: self.node.services.bitcoind.height,
clear: true
});
self.progressBar.tick(self.db.tip.__height, {
blockspersec: 0
});
var timer = setInterval(function () {
var tick = self.db.tip.__height - self.lastReportedBlock;
self.progressBar.tick(tick, { blockspersec: tick });
self.lastReportedBlock = self.db.tip.__height;
if (self.progressBar.complete) {
clearInterval(timer);
}
}, 1000);
self.syncing = true;
self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip.__height);
var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db);
var writeStream = new WriteStream(self.highWaterMark, self.db);
var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip, self.progressBar);
var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip);
self._handleErrors(self.blockStream);
self._handleErrors(processConcurrent);
@ -122,11 +98,34 @@ Sync.prototype.initialSync = function() {
self.blockStream
.pipe(processSerial);
self.lastReportedBlock = self.db.tip.__height;
self.progressBar = new ProgressBar('[:bar] :percent :current blks, :blockspersec blks/sec, :elapsed secs', {
complete: green,
incomplete: red,
total: self.node.services.bitcoind.height,
clear: true
});
self.progressBar.tick(self.db.tip.__height, {
blockspersec: 0
});
var timer = setInterval(function () {
var tick = self.db.tip.__height - self.lastReportedBlock;
self.progressBar.tick(tick, { blockspersec: tick });
self.lastReportedBlock = self.db.tip.__height;
if (!self.syncing) {
self.progressBar.terminate();
clearInterval(timer);
}
}, 1000);
};
// Get concurrent and serial block operations and write them together block by block
// Useful for when we are fully synced and we want to advance the concurrentTip and
// the tip together
Sync.prototype.sync = function() {
var self = this;
if(this.syncing || this.db.reorg) {
@ -140,7 +139,6 @@ Sync.prototype.sync = function() {
this._handleErrors(this.blockStream);
this._handleErrors(processBoth);
//TODO what should happen here
processBoth.on('finish', function() {
self.syncing = false;
});
@ -166,7 +164,6 @@ Sync.prototype._handleErrors = function(stream) {
}
if(err.reorg2) {
// squelch this error
return;
}
@ -184,7 +181,9 @@ BlockStream.prototype._read = function() {
BlockStream.prototype._process = function() {
var self = this;
// TODO push null when we've fully synced
if (!self.syncing) {
return self.push(null);
}
if(this.processing) {
return;
@ -206,8 +205,6 @@ BlockStream.prototype._process = function() {
block.__height = height;
setTimeout(function() {
// we're getting blocks from bitcoind too fast?
// it pauses at 8100
next(null, block);
}, 1);
});
@ -218,6 +215,11 @@ BlockStream.prototype._process = function() {
for(var i = 0; i < blocks.length; i++) {
self.push(blocks[i]);
if (blocks[i].hash === self.bitcoind.tip) {
self.syncing = false;
self.emit('synced');
break;
};
}
next();
@ -234,7 +236,6 @@ BlockStream.prototype._process = function() {
ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this;
self.processBlockStartTime = process.hrtime();
function check() {
return self.db.concurrentTip.__height >= block.__height;
@ -347,9 +348,7 @@ WriteStream.prototype._write = function(obj, enc, callback) {
self.db.concurrentTip = obj.concurrentTip;
self.db.emit('concurrentaddblock');
//if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
//}
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
callback();
});
};
@ -390,7 +389,6 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
}
self.db.tip = block;
self.db.concurrentTip = block;
console.log('Tip:', self.db.tip.__height);
callback();
});
});

View File

@ -1,23 +1,5 @@
'use strict';
/**
* The UntrustedMempool Service builds upon the Database Service. It is also an adjunct to the
* Transaction Service's memory pool to provide block explorers the chance to report on
* transactions that are bouncing around bitcoin's peer network, but are not necessarily going
* to be validated into your trusted node's memory pool or even sent over the zmq socket.
* The strategy is to permanently save txs in the database that are unlikely to ever make it
* into a block. The reason for this is so the block explorer can have a historical record of these
* transactions. There is also an LRU cache that contains all of the transactions from the pool,
* regardless of validation status. When a block comes in, we can prune out the transactions that
* exist in the block because we know the transaction service has already indexed them.
* @param {Object} options
* @param {Node} options.node - An instance of the node
* @param {String} options.name - An optional name of the service
* @param {Integer} options.maxPeers - An optional number of maximum peers to connect to
* @param {Array} options.peers - A list of ip addresses to explicitly connect to (this disables use of seeds)
* @param {Integer} options.maxMempoolSize - Maximum size of the mempool in this service
*/
var p2p = require('bitcore-p2p');
var messages = new p2p.Messages();
var LRU = require('lru-cache');
@ -28,9 +10,9 @@ var index = require('../');
var log = index.log;
var Service = require('../../service');
var UntrustedMempool = function(options) {
if (!(this instanceof UntrustedMempool)) {
return new UntrustedMempool(options);
var P2P = function(options) {
if (!(this instanceof P2P)) {
return new P2P(options);
}
Service.call(this, options);
@ -39,13 +21,17 @@ var UntrustedMempool = function(options) {
this._peers = this.options.peers;
this._maxMempoolSize = this.options.maxMempoolSize || 100 * 1024 * 1024;
this._synced = false;
this.initialHeight = 0;
};
util.inherits(UntrustedMempool, Service);
util.inherits(P2P, Service);
UntrustedMempool.dependencies = [ 'bitcoind', 'db' ];
P2P.dependencies = [ 'bitcoind', 'db' ];
UntrustedMempool.prototype.start = function(callback) {
P2P.prototype.start = function(callback) {
//Step 1: connect to peer(s) as per config
//Step 2: memoize the tip
//Step 3: wait for other services to register listeners
var self = this;
self.once('synced', function() {
self._initPrefix(callback);
@ -56,7 +42,7 @@ UntrustedMempool.prototype.start = function(callback) {
});
};
UntrustedMempool.prototype.stop = function(callback) {
P2P.prototype.stop = function(callback) {
var self = this;
setImmediate(function() {
self._pool.disconnect();
@ -64,7 +50,7 @@ UntrustedMempool.prototype.stop = function(callback) {
});
};
UntrustedMempool.prototype._initPrefix = function(callback) {
P2P.prototype._initPrefix = function(callback) {
var self = this;
self.node.services.db.getPrefix(self.name, function(err, prefix) {
if(err) {
@ -75,7 +61,7 @@ UntrustedMempool.prototype._initPrefix = function(callback) {
});
};
UntrustedMempool.prototype._initCache = function() {
P2P.prototype._initCache = function() {
this._inv = LRU(2000);
this._cache = LRU({
max: this._maxMempoolSize,
@ -83,7 +69,7 @@ UntrustedMempool.prototype._initCache = function() {
});
};
UntrustedMempool.prototype._initPool = function() {
P2P.prototype._initPool = function() {
var opts = {};
var _addrs = [];
if (this.addrs && _.isArray(this.addrs)) {
@ -103,11 +89,11 @@ UntrustedMempool.prototype._initPool = function() {
this._setupListeners();
};
UntrustedMempool.prototype.validTx = function(tx) {
P2P.prototype.validTx = function(tx) {
return tx;
};
UntrustedMempool.prototype._setupListeners = function() {
P2P.prototype._setupListeners = function() {
var self = this;
self._pool.on('peerready', function(peer, addr) {
@ -144,15 +130,15 @@ UntrustedMempool.prototype._setupListeners = function() {
});
};
UntrustedMempool.prototype.getAPIMethods = function() {
P2P.prototype.getAPIMethods = function() {
return [];
};
UntrustedMempool.prototype.getPublishEvents = function() {
P2P.prototype.getPublishEvents = function() {
return [];
};
UntrustedMempool.prototype.blockHandler = function(block, connected, callback) {
P2P.prototype.blockHandler = function(block, connected, callback) {
var self = this;
var operations = [];
@ -173,4 +159,4 @@ UntrustedMempool.prototype.blockHandler = function(block, connected, callback) {
});
};
module.exports = UntrustedMempool;
module.exports = P2P;

View File

@ -0,0 +1,40 @@
'use strict';
var p2p = require('bitcore-p2p');
var messages = new p2p.Messages();
var opts = {};
opts.addrs = [ { ip: { v4: '192.168.3.5' } } ];
opts.dnsSeed = false;
opts.maxPeers = 1;
opts.network = 'livenet';
var pool = new p2p.Pool(opts);
pool.on('peerready', function(peer, addr) {
console.log('Connected to peer: ' + addr.ip.v4);
peer.sendMessage(messages.MemPool());
});
pool.on('peerdisconnect', function(peer, addr) {
console.log('Disconnected from peer: ' + addr.ip.v4);
});
pool.on('peerinv', function(peer, message) {
var invList = [];
message.inventory.forEach(function(inv) {
invList.push(inv);
});
peer.sendMessage(messages.GetData(invList));
});
pool.on('peertx', function(peer, message) {
var tx = new bitcore.Transaction(message.transaction);
if (self.validTx(tx)) {
return self._cache.set(tx.id, tx);
}
return self._operations.push({
type: 'put',
key: new Buffer(tx.id),
value: tx.toBuffer()
});
});