From d4238225d4a5a2b57d0768a15e56dd1345f24d2c Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 14 Feb 2017 08:05:28 -0500 Subject: [PATCH] more cleanup. --- lib/logger.js | 2 +- lib/services/bitcoind/index.js | 19 +---- lib/services/db/index.js | 2 - lib/services/db/sync.js | 76 +++++++++---------- .../{untrusted-mempool => p2p}/encoding.js | 0 .../{untrusted-mempool => p2p}/index.js | 54 +++++-------- lib/services/p2p/testp2p.js | 40 ++++++++++ 7 files changed, 99 insertions(+), 94 deletions(-) rename lib/services/{untrusted-mempool => p2p}/encoding.js (100%) rename lib/services/{untrusted-mempool => p2p}/index.js (57%) create mode 100644 lib/services/p2p/testp2p.js diff --git a/lib/logger.js b/lib/logger.js index 0e8576c7..bbf960e1 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -59,7 +59,7 @@ Logger.prototype.warn = function() { */ Logger.prototype._log = function(color) { if (!this.permitWrites) { - return; + //return; } var args = Array.prototype.slice.call(arguments); args = args.slice(1); diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 9ceb108d..65623bea 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -389,24 +389,6 @@ Bitcoin.prototype.start = function(callback) { }; -Bitcoin.prototype._getAddressDetailsForOutput = function(output, outputIndex, result, addressStrings) { - if (!output.address) { - return; - } - var address = output.address; - if (addressStrings.indexOf(address) >= 0) { - if (!result.addresses[address]) { - result.addresses[address] = { - inputIndexes: [], - outputIndexes: [outputIndex] - }; - } else { - result.addresses[address].outputIndexes.push(outputIndex); - } - result.satoshis += output.satoshis; - } -}; - Bitcoin.prototype._maybeGetBlockHash = function(blockArg, callback) { var self = this; if (_.isNumber(blockArg) || (blockArg.length < 40 && /^[0-9]+$/.test(blockArg))) { @@ -432,6 +414,7 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) { return callback(err); } self._tryAllClients(function(client, done) { + self.client.getBlock(blockhash, false, function(err, response) { if (err) { return done(self._wrapRPCError(err)); diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 1b866e6a..be814780 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -168,11 +168,9 @@ DB.prototype.start = function(callback) { }); this.node.once('ready', function() { - // start syncing log.permitWrites = false; self._sync.initialSync(); - // Notify that there is a new tip self.node.services.bitcoind.on('tip', function() { self._sync.sync(); }); diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index a8a31f98..bedfe74a 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -18,7 +18,6 @@ function BlockStream(highWaterMark, bitcoind, lastHeight) { this.stopping = false; this.queue = []; this.processing = false; - this.latestHeightRetrieved = 0; } inherits(BlockStream, Readable); @@ -33,11 +32,10 @@ function ProcessConcurrent(highWaterMark, db) { inherits(ProcessConcurrent, Transform); -function ProcessSerial(highWaterMark, db, tip, progressBar) { +function ProcessSerial(highWaterMark, db, tip) { Writable.call(this, {objectMode: true, highWaterMark: highWaterMark}); this.db = db; this.tip = tip; - this.progressBar = progressBar; this.processBlockStartTime = []; } @@ -70,7 +68,6 @@ function Sync(node, db) { inherits(Sync, EventEmitter); -// Use a concurrent strategy to sync Sync.prototype.initialSync = function() { var self = this; @@ -78,33 +75,12 @@ Sync.prototype.initialSync = function() { return; } - self.lastReportedBlock = self.db.tip.__height; - self.progressBar = new ProgressBar('[:bar] :percent :current blks, :blockspersec blks/sec, :elapsed secs', { - complete: green, - incomplete: red, - total: self.node.services.bitcoind.height, - clear: true - }); - - self.progressBar.tick(self.db.tip.__height, { - blockspersec: 0 - }); - - var timer = setInterval(function () { - var tick = self.db.tip.__height - self.lastReportedBlock; - self.progressBar.tick(tick, { blockspersec: tick }); - self.lastReportedBlock = self.db.tip.__height; - if (self.progressBar.complete) { - clearInterval(timer); - } - }, 1000); - self.syncing = true; self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip.__height); var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db); var writeStream = new WriteStream(self.highWaterMark, self.db); - var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip, self.progressBar); + var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip); self._handleErrors(self.blockStream); self._handleErrors(processConcurrent); @@ -122,11 +98,34 @@ Sync.prototype.initialSync = function() { self.blockStream .pipe(processSerial); + + self.lastReportedBlock = self.db.tip.__height; + self.progressBar = new ProgressBar('[:bar] :percent :current blks, :blockspersec blks/sec, :elapsed secs', { + complete: green, + incomplete: red, + total: self.node.services.bitcoind.height, + clear: true + }); + + self.progressBar.tick(self.db.tip.__height, { + blockspersec: 0 + }); + + var timer = setInterval(function () { + var tick = self.db.tip.__height - self.lastReportedBlock; + + self.progressBar.tick(tick, { blockspersec: tick }); + + self.lastReportedBlock = self.db.tip.__height; + + if (!self.syncing) { + self.progressBar.terminate(); + clearInterval(timer); + } + }, 1000); + }; -// Get concurrent and serial block operations and write them together block by block -// Useful for when we are fully synced and we want to advance the concurrentTip and -// the tip together Sync.prototype.sync = function() { var self = this; if(this.syncing || this.db.reorg) { @@ -140,7 +139,6 @@ Sync.prototype.sync = function() { this._handleErrors(this.blockStream); this._handleErrors(processBoth); - //TODO what should happen here processBoth.on('finish', function() { self.syncing = false; }); @@ -166,7 +164,6 @@ Sync.prototype._handleErrors = function(stream) { } if(err.reorg2) { - // squelch this error return; } @@ -184,7 +181,9 @@ BlockStream.prototype._read = function() { BlockStream.prototype._process = function() { var self = this; - // TODO push null when we've fully synced + if (!self.syncing) { + return self.push(null); + } if(this.processing) { return; @@ -206,8 +205,6 @@ BlockStream.prototype._process = function() { block.__height = height; setTimeout(function() { - // we're getting blocks from bitcoind too fast? - // it pauses at 8100 next(null, block); }, 1); }); @@ -218,6 +215,11 @@ BlockStream.prototype._process = function() { for(var i = 0; i < blocks.length; i++) { self.push(blocks[i]); + if (blocks[i].hash === self.bitcoind.tip) { + self.syncing = false; + self.emit('synced'); + break; + }; } next(); @@ -234,7 +236,6 @@ BlockStream.prototype._process = function() { ProcessSerial.prototype._write = function(block, enc, callback) { var self = this; - self.processBlockStartTime = process.hrtime(); function check() { return self.db.concurrentTip.__height >= block.__height; @@ -347,9 +348,7 @@ WriteStream.prototype._write = function(obj, enc, callback) { self.db.concurrentTip = obj.concurrentTip; self.db.emit('concurrentaddblock'); - //if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) { - self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; - //} + self.lastConcurrentOutputHeight = self.db.concurrentTip.__height; callback(); }); }; @@ -390,7 +389,6 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { } self.db.tip = block; self.db.concurrentTip = block; - console.log('Tip:', self.db.tip.__height); callback(); }); }); diff --git a/lib/services/untrusted-mempool/encoding.js b/lib/services/p2p/encoding.js similarity index 100% rename from lib/services/untrusted-mempool/encoding.js rename to lib/services/p2p/encoding.js diff --git a/lib/services/untrusted-mempool/index.js b/lib/services/p2p/index.js similarity index 57% rename from lib/services/untrusted-mempool/index.js rename to lib/services/p2p/index.js index ca82879b..f86d9ed8 100644 --- a/lib/services/untrusted-mempool/index.js +++ b/lib/services/p2p/index.js @@ -1,23 +1,5 @@ 'use strict'; -/** - * The UntrustedMempool Service builds upon the Database Service. It is also an adjunct to the - * Transaction Service's memory pool to provide block explorers the chance to report on - * transactions that are bouncing around bitcoin's peer network, but are not necessarily going - * to be validated into your trusted node's memory pool or even sent over the zmq socket. - * The strategy is to permanently save txs in the database that are unlikely to ever make it - * into a block. The reason for this is so the block explorer can have a historical record of these - * transactions. There is also an LRU cache that contains all of the transactions from the pool, - * regardless of validation status. When a block comes in, we can prune out the transactions that - * exist in the block because we know the transaction service has already indexed them. - * @param {Object} options - * @param {Node} options.node - An instance of the node - * @param {String} options.name - An optional name of the service - * @param {Integer} options.maxPeers - An optional number of maximum peers to connect to - * @param {Array} options.peers - A list of ip addresses to explicitly connect to (this disables use of seeds) - * @param {Integer} options.maxMempoolSize - Maximum size of the mempool in this service - */ - var p2p = require('bitcore-p2p'); var messages = new p2p.Messages(); var LRU = require('lru-cache'); @@ -28,9 +10,9 @@ var index = require('../'); var log = index.log; var Service = require('../../service'); -var UntrustedMempool = function(options) { - if (!(this instanceof UntrustedMempool)) { - return new UntrustedMempool(options); +var P2P = function(options) { + if (!(this instanceof P2P)) { + return new P2P(options); } Service.call(this, options); @@ -39,13 +21,17 @@ var UntrustedMempool = function(options) { this._peers = this.options.peers; this._maxMempoolSize = this.options.maxMempoolSize || 100 * 1024 * 1024; this._synced = false; + this.initialHeight = 0; }; -util.inherits(UntrustedMempool, Service); +util.inherits(P2P, Service); -UntrustedMempool.dependencies = [ 'bitcoind', 'db' ]; +P2P.dependencies = [ 'bitcoind', 'db' ]; -UntrustedMempool.prototype.start = function(callback) { +P2P.prototype.start = function(callback) { + //Step 1: connect to peer(s) as per config + //Step 2: memoize the tip + //Step 3: wait for other services to register listeners var self = this; self.once('synced', function() { self._initPrefix(callback); @@ -56,7 +42,7 @@ UntrustedMempool.prototype.start = function(callback) { }); }; -UntrustedMempool.prototype.stop = function(callback) { +P2P.prototype.stop = function(callback) { var self = this; setImmediate(function() { self._pool.disconnect(); @@ -64,7 +50,7 @@ UntrustedMempool.prototype.stop = function(callback) { }); }; -UntrustedMempool.prototype._initPrefix = function(callback) { +P2P.prototype._initPrefix = function(callback) { var self = this; self.node.services.db.getPrefix(self.name, function(err, prefix) { if(err) { @@ -75,7 +61,7 @@ UntrustedMempool.prototype._initPrefix = function(callback) { }); }; -UntrustedMempool.prototype._initCache = function() { +P2P.prototype._initCache = function() { this._inv = LRU(2000); this._cache = LRU({ max: this._maxMempoolSize, @@ -83,7 +69,7 @@ UntrustedMempool.prototype._initCache = function() { }); }; -UntrustedMempool.prototype._initPool = function() { +P2P.prototype._initPool = function() { var opts = {}; var _addrs = []; if (this.addrs && _.isArray(this.addrs)) { @@ -103,11 +89,11 @@ UntrustedMempool.prototype._initPool = function() { this._setupListeners(); }; -UntrustedMempool.prototype.validTx = function(tx) { +P2P.prototype.validTx = function(tx) { return tx; }; -UntrustedMempool.prototype._setupListeners = function() { +P2P.prototype._setupListeners = function() { var self = this; self._pool.on('peerready', function(peer, addr) { @@ -144,15 +130,15 @@ UntrustedMempool.prototype._setupListeners = function() { }); }; -UntrustedMempool.prototype.getAPIMethods = function() { +P2P.prototype.getAPIMethods = function() { return []; }; -UntrustedMempool.prototype.getPublishEvents = function() { +P2P.prototype.getPublishEvents = function() { return []; }; -UntrustedMempool.prototype.blockHandler = function(block, connected, callback) { +P2P.prototype.blockHandler = function(block, connected, callback) { var self = this; var operations = []; @@ -173,4 +159,4 @@ UntrustedMempool.prototype.blockHandler = function(block, connected, callback) { }); }; -module.exports = UntrustedMempool; +module.exports = P2P; diff --git a/lib/services/p2p/testp2p.js b/lib/services/p2p/testp2p.js new file mode 100644 index 00000000..47186035 --- /dev/null +++ b/lib/services/p2p/testp2p.js @@ -0,0 +1,40 @@ +'use strict'; + +var p2p = require('bitcore-p2p'); +var messages = new p2p.Messages(); +var opts = {}; +opts.addrs = [ { ip: { v4: '192.168.3.5' } } ]; +opts.dnsSeed = false; +opts.maxPeers = 1; +opts.network = 'livenet'; +var pool = new p2p.Pool(opts); + +pool.on('peerready', function(peer, addr) { + console.log('Connected to peer: ' + addr.ip.v4); + peer.sendMessage(messages.MemPool()); +}); + +pool.on('peerdisconnect', function(peer, addr) { + console.log('Disconnected from peer: ' + addr.ip.v4); +}); + +pool.on('peerinv', function(peer, message) { + var invList = []; + message.inventory.forEach(function(inv) { + invList.push(inv); + }); + peer.sendMessage(messages.GetData(invList)); +}); + +pool.on('peertx', function(peer, message) { + var tx = new bitcore.Transaction(message.transaction); + if (self.validTx(tx)) { + return self._cache.set(tx.id, tx); + } + return self._operations.push({ + type: 'put', + key: new Buffer(tx.id), + value: tx.toBuffer() + }); +}); +