This commit is contained in:
Chris Kleeschulte 2017-05-30 08:01:13 -04:00
parent ec826b4940
commit 87df236e35
8 changed files with 435 additions and 1477 deletions

11
lib/constants.js Normal file
View File

@ -0,0 +1,11 @@
'use strict';
module.exports = {
BITCOIN_GENESIS_HASH: {
livenet: '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f',
regtest: '0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206',
testnet: '000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943' //this is testnet3
}
};

View File

@ -14,6 +14,7 @@ var BlockHandler = require('./block_handler');
var LRU = require('lru-cache');
var utils = require('../../utils');
var _ = require('lodash');
var constants = require('../../constants');
var BlockService = function(options) {
BaseService.call(this, options);
@ -27,8 +28,8 @@ var BlockService = function(options) {
keyEncoding: 'string',
valueEncoding: 'binary'
};
this._blockQueue = LRU(50); //hash -> header, height -> header,
this._rawBlockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's
this._blockHeaderQueue = LRU(50); //hash -> header, height -> header,
this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's
};
inherits(BlockService, BaseService);
@ -177,20 +178,10 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
self._isGenesisBlock.bind(self, blockArg),
function(block, next) {
if (block) {
return next(null, self.genesis);
}
var cachedBlock = self._rawBlockQueue.get(blockArg);
next(null, cachedBlock);
},
function(block, next) {
if (block) {
return next(null, block);
return next(null, self.genesis);
}
self._getBlock(blockArg, next);
@ -209,13 +200,25 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
// index, but we want to mark it as "-REORG"
var reorgOperations = self._getReorgOperations(reorgHash, reorgHeight);
var headers = blocks.map(function(block) {
return {
var headers = [];
var tipIndexHeight = self.tip.__height;
for(var i = 0; i < blocks.length; i++) {
var block = blocks[i];
if (block.__height !== ++tipIndexHeight) {
block.height = tipIndexHeight;
block.__height = tipIndexHeight;
self._blockQueue.set(block.hash, block);
}
headers.push({
hash: block.hash,
prevHash: utils.reverseBufferToString(block.header.prevHash),
height: block.__height
};
});
height: tipIndexHeight
});
}
var operations = self._getBlockOperations(headers);
@ -236,7 +239,8 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
BlockService.prototype.getBlockHeader = function(blockArg, callback) {
var self = this;
var header = self._blockQueue.get(blockArg);
var header = self._blockHeaderQueue.get(blockArg);
if (header) {
return setImmediate(function() {
callback(null, header);
@ -249,7 +253,7 @@ BlockService.prototype.getBlockHeader = function(blockArg, callback) {
return callback(err);
}
self._setBlockQueue(header);
self._setBlockHeaderQueue(header);
callback(null, header);
});
@ -282,10 +286,10 @@ BlockService.prototype._startSubscriptions = function() {
log.info('New block received: ' + block.hash + ' at height: ' + block.height);
self._cacheRawBlock(block);
var header = self._getHeader(block);
self._setBlockQueue(header);
self._setBlockHeaderQueue(header);
self._detectReorg([block], function() {
self._blockHandler.sync(block);
self._blockHandler.sync(block);
});
});
@ -297,7 +301,7 @@ BlockService.prototype._startSubscriptions = function() {
BlockService.prototype._cacheRawBlock = function(block) {
log.debug('Setting block: ' + block.hash + ' in the raw block cache.');
this._rawBlockQueue.set(block.hash, block);
this._blockQueue.set(block.hash, block);
};
BlockService.prototype._getBlocks = function(callback) {
@ -367,48 +371,46 @@ BlockService.prototype._getHeader = function(block) {
};
};
BlockService.prototype._setBlockQueue = function(header) {
BlockService.prototype._setBlockHeaderQueue = function(header) {
this._blockQueue.set(header.height, header);
this._blockQueue.set(header.hash, header);
this._blockHeaderQueue.set(header.height, header);
this._blockHeaderQueue.set(header.hash, header);
};
BlockService.prototype._setHandlers = function() {
var self = this;
self.node.once('ready', function() {
self.genesis = bitcore.Block.fromBuffer(self.bitcoind.genesisBuffer);
self.genesis.__height = 0;
self.genesis.height = 0;
var genesisHeader = self._getHeader(self.genesis);
self._setBlockQueue(genesisHeader);
self.db.batch(self._getBlockOperations(genesisHeader), function(err) {
if (err) {
log.error('Unable to store genesis block in block index.');
return;
}
self._sync();
});
self._sync();
});
self._blockHandler.on('synced', function() {
log.info('Synced: ' + self.tip.hash);
self._startSubscriptions();
});
};
BlockService.prototype._getGenesisBlock = function() {
this.genesis = {};
this.genesis.height = 0;
this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
return this.genesis;
};
BlockService.prototype._detectStartupReorg = function(callback) {
var self = this;
// does our block's header exist on the network. and if, so, is it the correct height?
var height = self.bitcoind.height;
var hash = self.bitcoind.tiphash;
if (self.tip.height === 0) {
return callback();
}
self.getBlockHeader(hash, function(err, header) {
@ -420,7 +422,8 @@ BlockService.prototype._detectStartupReorg = function(callback) {
return callback();
}
self._handleReorg(header.hash, callback);
var opts = { preReorgTipHash: hash, preReorgTipHeight: header.height };
self._handleReorg(header.hash, opts, callback);
});
};
@ -444,12 +447,21 @@ BlockService.prototype._handleReorg = function(hash, callback) {
self.printTipInfo('Reorg successful!');
self.reorg = false;
callback();
self.cleanupAfterReorg(callback);
});
};
BlockService.prototype._decodeTipData = function(tipDataBuf) {
var hash = tipDataBuf.slice(0, 32).toString('hex');
var height = tipDataBuf.slice(32).toString('hex').readUInt32BE();
return {
hash: hash,
height: height
};
};
BlockService.prototype._loadTips = function(callback) {
var self = this;
@ -464,25 +476,15 @@ BlockService.prototype._loadTips = function(callback) {
return next(err);
}
var hash;
if (!tipData) {
self[tip] = self.genesis;
self[tip] = self._getGenesisBlock();
return next();
}
hash = tipData.slice(0, 32).toString('hex');
self[tip] = self._decodeTipData(tipData);
log.info('loaded ' + tip + ' hash: ' + self[tip].hash + ' height: ' + self[tip].height);
next();
self._getBlock(hash, function(err, block) {
if(err) {
return next(err);
}
self[tip] = block;
log.info('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height);
next();
});
});
}, function(err) {
@ -502,7 +504,6 @@ BlockService.prototype._detectReorg = function(blocks, callback) {
var tipHash = this.reorgHash || this.tip.hash;
var tipHeight = this.reorgHeight || this.tip.__height;
var forkedHash;
for(var i = 0; i < blocks.length; i++) {
@ -510,8 +511,10 @@ BlockService.prototype._detectReorg = function(blocks, callback) {
continue;
}
if (utils.reverseBufferToString(blocks[i].header.prevHash) !== tipHash) {
return this._handleReorg(forkedHash, callback.bind(this, tipHash, tipHeight));
var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash);
if (prevHash !== tipHash) {
var opts = { preReorgTipHash: tipHash, preReorgTipHeight: tipHeight };
return this._handleReorg(prevHash, opts, callback);
}
tipHash = blocks[i].hash;
@ -583,6 +586,14 @@ BlockService.prototype._isGenesisBlock = function(blockArg, callback) {
BlockService.prototype._getBlock = function(blockArg, callback) {
var self = this;
var cachedBlock = self._blockQueue.get(blockArg);
if (cachedBlock) {
return setImmediate(function() {
callback(null, cachedBlock);
});
}
self.bitcoind.getBlock(blockArg, function(err, block) {
if(err) {

View File

@ -1,14 +1,14 @@
'use strict';
var p2p = require('bitcore-p2p');
var messages = new p2p.Messages();
var LRU = require('lru-cache');
var util = require('util');
var _ = require('lodash');
var bitcore = require('bitcore-lib');
var index = require('../');
var index = require('../../');
var log = index.log;
var Service = require('../../service');
var BaseService = require('../../service');
var constants = require('../../constants');
/*
Purpose:
@ -19,37 +19,42 @@ var Service = require('../../service');
5. broadcast messages on behalf of subscribers (block headers, blocks, bloom filters)
*/
var P2P = function(options) {
if (!(this instanceof P2P)) {
return new P2P(options);
}
Service.call(this, options);
BaseService.call(this, options);
this.options = options;
this._maxPeers = this.options.maxPeers || 60;
this._peers = this.options.peers;
this.subscriptions = {};
this.subscriptions.block = [];
this.subscriptions.transaction = [];
this.messages = new p2p.Messages({ network: this.node.network });
};
util.inherits(P2P, Service);
util.inherits(P2P, BaseService);
P2P.dependencies = [];
P2P.prototype.start = function(callback) {
var self = this;
self.once('synced', function() {
self._initPool();
self._initCache();
self._pool.connect();
self._synced = true;
});
self._initCache();
self._initPool();
this._setupListeners();
callback();
};
P2P.prototype.stop = function(callback) {
var self = this;
setImmediate(function() {
self._pool.disconnect();
callback();
});
self._pool.disconnect();
callback();
};
P2P.prototype.subscribe = function(name, emitter) {
@ -67,71 +72,83 @@ P2P.prototype.unsubscribe = function(name, emitter) {
P2P.prototype._initCache = function() {
this._inv = LRU(2000);
this._cache = LRU({
max: this._maxMempoolSize,
length: function(tx) { return tx.toBuffer().length; }
});
this._cache = [];
};
P2P.prototype._initPool = function() {
var opts = {};
var _addrs = [];
if (this.addrs && _.isArray(this.addrs)) {
for(var i = 0; i < this.addrs.length; i++) {
_addrs.push({
ip: {
v4: this.addrs[i]
}
});
}
opts.addrs = _addrs;
if (this._peers) {
opts.addrs = this._peers;
opts.dnsSeed = false;
}
opts.maxPeers = this._maxPeers;
opts.network = this.node.getNetworkName();
this._pool = new p2p.Pool(opts);
this._setupListeners();
};
P2P.prototype.validTx = function(tx) {
return tx;
};
P2P.prototype._setupListeners = function() {
var self = this;
self._pool.on('peerready', function(peer, addr) {
log.info('Connected to peer: ' + addr.ip.v4);
peer.sendMessage(messages.MemPool());
log.info('Connected to peer: ' + addr.ip.v4 + ', network: ' +
peer.network.alias + ', version: ' + peer.version + ', subversion: ' +
peer.subversion + ', status: ' + peer.status + ', port: ' +
peer.port + ', best height: ' + peer.bestHeight);
peer.sendMessage(self.messages.MemPool());
});
self._pool.on('peerdisconnect', function(peer, addr) {
log.info('Disconnected from peer: ' + addr.ip.v4);
});
self._pool.on('peerinv', function(peer, message) {
var invList = [];
var newDataNeeded = [];
message.inventory.forEach(function(inv) {
var hash = self._inv.get(inv.hash);
if (inv.type === 1 && !hash) {
if (!self._inv.get(inv.hash)) {
self._inv.set(inv.hash, true);
invList.push(inv);
//self._generateRetrievalMessage()
newDataNeeded.push(inv);
}
});
peer.sendMessage(messages.GetData(invList));
if (newDataNeeded.length > 0) {
peer.sendMessage(self.messages.GetData(newDataNeeded));
}
});
self._pool.on('peertx', function(peer, message) {
var tx = new bitcore.Transaction(message.transaction);
if (self.validTx(tx)) {
return self._cache.set(tx.id, tx);
}
return self._operations.push({
type: 'put',
key: new Buffer(tx.id),
value: tx.toBuffer()
});
self._pool.on('peertx', self._broadcastTx.bind(self));
self._pool.on('peerblock', self._broadcastBlock.bind(self));
self.node.on('ready', function() {
setTimeout(function() {
self._pool.connect();
}, 1000);
});
};
P2P.prototype._broadcastTx = function(peer, message) {
for (var i = 0; i < this.subscriptions.transaction.length; i++) {
this.subscriptions.transaction[i].emit('p2p/transaction', message.transaction);
}
};
P2P.prototype._broadcastBlock = function(peer, message) {
for (var i = 0; i < this.subscriptions.block.length; i++) {
this.subscriptions.block[i].emit('p2p/block', message.block);
}
};
P2P.prototype.getAPIMethods = function() {
@ -152,35 +169,7 @@ P2P.prototype.getPublishEvents = function() {
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
},
{
name: 'bitcoind/rawblock',
scope: this,
subscribe: this.subscribe.bind(this, 'rawblock'),
unsubscribe: this.unsubscribe.bind(this, 'rawblock')
}
];
return [];
};
P2P.prototype.blockHandler = function(block, connected, callback) {
var self = this;
var operations = [];
if (!self._synced) {
return callback(operations);
}
var action = 'put';
var reverseAction = 'del';
if (!connected) {
action = 'del';
reverseAction = 'put';
}
block.transactions.forEach(function(tx) {
self._cache.del(tx.id);
});
};
module.exports = P2P;

View File

@ -72,7 +72,7 @@
},
"devDependencies": {
"benchmark": "1.0.0",
"bitcore-p2p": "^1.1.0",
"bitcore-p2p": "",
"chai": "^3.5.0",
"coveralls": "^2.11.9",
"istanbul": "^0.4.3",

View File

@ -4,7 +4,7 @@ var chai = require('chai');
var expect = chai.expect;
var async = require('async');
var path = require('path');
var utils = require('./utils');
var Utils = require('./utils');
var zmq = require('zmq');
var http = require('http');
var blocks = require('../test/data/blocks.json');
@ -80,6 +80,7 @@ var opts = {
bitcoreDataDir: bitcoreDataDir,
blockHeight: 0
};
var utils = new Utils(opts);
var genesis = new Block(new Buffer(blocks.genesis, 'hex'));
var block1 = new Block(new Buffer(blocks.block1a, 'hex'));
@ -109,7 +110,7 @@ function publishBlockHash(rawBlockHex, callback) {
pubSocket.send([ 'rawblock', new Buffer(rawBlockHex, 'hex') ]);
var httpOpts = utils.getHttpOpts(opts, { path: '/info' });
var httpOpts = utils.getHttpOpts({ path: '/info' });
// we don't know exactly when all the blockhandlers will complete after the "tip" event
// so we must wait an indeterminate time to check on the current tip
@ -184,10 +185,8 @@ describe('DB Operations', function() {
setupFakeZmq();
self.opts = Object.assign({}, opts);
utils.startBitcoreNode(self.opts, function() {
utils.waitForBitcoreNode(self.opts, done);
utils.startBitcoreNode(function() {
utils.waitForBitcoreNode(done);
});
});
@ -211,17 +210,15 @@ describe('DB Operations', function() {
async.series([
publishBlockHash.bind(self, rawBlock1),
publishBlockHash.bind(self, rawBlock2)
publishBlockHash.bind(self, rawBlock2),
function(next) {
utils.opts.blockHeight++;
next();
},
utils.waitForBitcoreNode.bind(utils)
], function(err) {
], done);
if(err) {
return done(err);
}
done();
});
});
});

File diff suppressed because it is too large Load Diff

47
regtest/test_bus.js Normal file
View File

@ -0,0 +1,47 @@
'use strict';
var BaseService = require('../lib/service');
var inherits = require('util').inherits;
var zmq = require('zmq');
var index = require('../lib');
var log = index.log;
var TestBusService = function(options) {
BaseService.call(this, options);
};
inherits(TestBusService, BaseService);
TestBusService.dependencies = ['p2p'];
TestBusService.prototype.start = function(callback) {
var self = this;
self.pubSocket = zmq.socket('pub');
log.info('zmq bound to port: 38332');
self.pubSocket.bind('tcp://127.0.0.1:38332');
self.bus = self.node.openBus({ remoteAddress: 'localhost' });
self.bus.on('p2p/transaction', function(tx) {
self.pubSocket.send([ 'transaction', new Buffer(tx.uncheckedSerialize(), 'hex') ]);
});
self.bus.on('p2p/block', function(block) {
self.pubSocket.send([ 'block', block.toBuffer() ]);
});
self.bus.subscribe('p2p/transaction');
self.bus.subscribe('p2p/block');
callback();
};
TestBusService.prototype.stop = function(callback) {
callback();
};
module.exports = TestBusService;

View File

@ -127,19 +127,25 @@ Utils.prototype.initializeAndStartService = function(opts, callback) {
var self = this;
rimraf(opts.datadir, function(err) {
if(err) {
return callback(err);
}
mkdirp(opts.datadir, function(err) {
if(err) {
return callback(err);
}
if (opts.configFile) {
self.writeConfigFile(opts.configFile.file, opts.configFile.conf);
}
var args = _.isArray(opts.args) ? opts.args : self.toArgs(opts.args);
opts.process = spawn(opts.exec, args, opts.opts);
callback();
});
});
@ -170,7 +176,22 @@ Utils.prototype.startBitcoreNode = function(callback) {
};
Utils.prototype.startBitcoind = function(callback) {
this.initializeAndStartService(this.opts.bitcoin, callback);
var self = this;
self.initializeAndStartService(self.opts.bitcoin, function() {
// in case you choose to -printtoconsole
self.opts.bitcoin.process.stdout.on('data', function(data) {
if (self.opts.debug) {
process.stdout.write(data.toString());
}
});
self.opts.bitcoin.process.stderr.on('data', function(data) {
process.stdout.write(data.toString());
});
callback();
});
};
Utils.prototype.unlockWallet = function(callback) {