This commit is contained in:
Chris Kleeschulte 2017-06-05 08:17:24 -04:00
parent 84f4dbf7aa
commit 09ff858e81
9 changed files with 526 additions and 412 deletions

View File

@ -3,6 +3,7 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var LRU = require('lru-cache');
var assert = require('assert');
var Service = function(options) {
EventEmitter.call(this);
@ -131,4 +132,53 @@ Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) {
return resolvedDeps;
};
Service.prototype._getGenesisBlock = function() {
this.genesis = {};
this.genesis.height = 0;
this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
return this.genesis;
};
Service.prototype._decodeTipData = function(tipBuf) {
assert(tipBuf && Buffer.isBuffer(tipBuf) && tipBuf.length === 36,
'tip data: ' + tipBuf.toString('hex') + ' if not in the proper format');
var hash = tipBuf.slice(0, 32).toString('hex');
var height = tipBuf.slice(32).toString('hex').readUInt32BE();
return {
hash: hash,
height: height
};
};
Service.prototype._loadTip = function(callback) {
var self = this;
self.db.get(new Buffer('00', 'hex') + self.name, function(err, tipBuf) {
if (err) {
return callback(err);
}
if (!tipBuf) {
self.tip = self._getGenesisBlock();
return callback();
}
self.tip = self._decodeTipData(tipData);
log.info('Loaded tip for: ' + self.name + ' service, hash: ' +
self.tip.hash + ' height: ' + self.tip.height);
callback();
});
};
module.exports = Service;

View File

@ -18,8 +18,10 @@ var constants = require('../../constants');
var BlockService = function(options) {
BaseService.call(this, options);
this.bitcoind = this.node.services.bitcoind;
this.p2p = this.node.services.p2p;
this.db = this.node.services.db;
this.subscriptions = {};
this.subscriptions.block = [];
this._blockHandler = new BlockHandler(this.node, this);
this._lockTimes = [];
this.tip = null;
@ -30,12 +32,13 @@ var BlockService = function(options) {
};
this._blockHeaderQueue = LRU(50); //hash -> header, height -> header,
this._blockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's
this._lastReportedTime = Date.now();
};
inherits(BlockService, BaseService);
BlockService.dependencies = [
'bitcoind',
'p2p',
'db'
];
@ -62,28 +65,41 @@ BlockService.prototype.stop = function(callback) {
}
};
BlockService.prototype.getAPIMethods = function() {
var methods = [
['processBlockOperations', this, this.processBlockOperations, 1]
];
return methods;
};
BlockService.prototype.subscribe = function(name, emitter) {
this.subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this.subscriptions[name].length);
};
BlockService.prototype.unsubscribe = function(name, emitter) {
var index = this.subscriptions[name].indexOf(emitter);
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
log.info(emitter.remoteAddress, 'unsubscribe:', 'block/' + name, 'total:', this.subscriptions[name].length);
};
BlockService.prototype.getPublishEvents = function() {
return [
{
name: 'block/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
}
];
};
BlockService.prototype._sync = function() {
var self = this;
async.waterfall([
self._loadTips.bind(self),
self._detectStartupReorg.bind(self),
self._getBlocks.bind(self)
], function(err, blocksDiff) {
if(err) {
log.error(err);
self.node.stop();
}
if (blocksDiff > 0) {
return self._blockHandler.sync();
}
self._startSubscriptions();
});
self._startSubscriptions();
};
@ -91,58 +107,49 @@ BlockService.prototype.printTipInfo = function(prependedMessage) {
log.info(
prependedMessage + ' Serial Tip: ' + this.tip.hash +
' Concurrent tip: ' + this.concurrentTip.hash +
' Network tip: ' + this.bitcoind.tiphash
' Concurrent tip: ' + this.concurrentTip.hash
);
};
BlockService.prototype.getNetworkTipHash = function() {
return this.bitcoind.tiphash;
BlockService.prototype._reportStatus = function(serviceName) {
var tip = this.tips[serviceName];
if ((Date.now() - this._lastReportedTime) > 1000) {
this._lastReportedTime = Date.now();
log.info(serviceName + ' sync: current height is: ' + tip.height +
' - hash is: ' + tip.hash);
}
};
BlockService.prototype.getBlockOperations = function(block, add, type, callback) {
var operations = [];
BlockService.prototype.processBlockOperations = function(opts, callback) {
async.each(
this.node.services,
function(mod, next) {
if (!_.isArray(opts.operations)) {
return;
}
var fn = mod.blockHandler;
// TODO: when writing to leveldb, it turns out that the optimal batch size to write to the
// database is 1000 bytes to achieve optimal write performance on most systems.
// This is not a comprehensive study, however and certainly not for other db types.
var self = this;
if (type === 'concurrent') {
fn = mod.concurrentBlockHandler;
}
self.db.batch(opts.operations, function(err) {
if (fn) {
fn.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
return callback(err);
}
callback(null, operations);
if(err) {
return callback(err);
}
);
if (!opts.serviceName) {
opts.serviceName = 'unknown';
}
self.setTip(opts);
self._reportStatus(opts.serviceName);
callback();
});
};
@ -168,9 +175,15 @@ BlockService.prototype.getTipOperation = function(block, add, tipType) {
};
};
BlockService.prototype.getBlocks = function(blockArgs, callback) {
BlockService.prototype.getBlocks = function(startHash, endHash, callback) {
};
BlockService.prototype._getBlocks = function(startHash, endHash, callback) {
var self = this;
assert(startHash && startHash.length === 64, 'startHash is required to getBlocks');
self.p2p.getBlocks({ startHash: startHash, endHash: endHash });
async.mapLimit(blockArgs, 8, function(blockArg, next) {
@ -236,29 +249,29 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
};
BlockService.prototype.getBlockHeader = function(blockArg, callback) {
var self = this;
var header = self._blockHeaderQueue.get(blockArg);
if (header) {
return setImmediate(function() {
callback(null, header);
});
}
self.bitcoind.getBlockHeader(blockArg, function(err, header) {
if(err) {
return callback(err);
}
self._setBlockHeaderQueue(header);
callback(null, header);
});
BlockService.prototype._checkCache = function(key, cache) {
return cache.get(key);
};
BlockService.prototype.getBlockHeader = function(hash, callback) {
var self = this;
var header = self._checkCache(hash, self._blockHeaderQueue);
if (header) {
return callback(null, header);
}
self.p2p.getBlockHeaders(hash);
var timer = setInterval(function() {
var header = self._checkCache(hash, self._blockHeaderQueue);
if (header) {
clearInterval(timer);
callback(null, header);
}
}, 250);
timer.unref();
};
BlockService.prototype.getBlockHash = function(height, callback) {
@ -276,87 +289,45 @@ BlockService.prototype._startSubscriptions = function() {
var self = this;
if (!self._subscribed) {
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/rawblock', function(block) {
log.info('New block received: ' + block.hash + ' at height: ' + block.height);
self._cacheRawBlock(block);
var header = self._getHeader(block);
self._setBlockHeaderQueue(header);
self._detectReorg([block], function() {
self._blockHandler.sync(block);
});
});
self.bus.subscribe('bitcoind/rawblock');
if (self._subscribed) {
return;
}
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('p2p/block', self._onBlock.bind(self));
self.bus.on('p2p/headers', self._onHeaders.bind(self));
self.bus.subscribe('p2p/block');
self.bus.subscribe('p2p/headers');
};
BlockService.prototype._cacheRawBlock = function(block) {
log.debug('Setting block: ' + block.hash + ' in the raw block cache.');
BlockService.prototype._onHeaders = function(headers) {
log.debug('New header received: ' + block.hash);
this._cacheHeaders(headers);
};
BlockService.prototype._onBlock = function(block) {
log.debug('New block received: ' + block.hash);
this._cacheBlock(block);
this._broadcast(this.subscriptions.block, 'block/block', block);
};
BlockService.prototype._broadcast = function(subscribers, name, entity) {
for (var i = 0; i < subscribers.length; i++) {
subscribers[i].emit(name, entity);
}
};
BlockService.prototype._cacheBlock = function(block) {
log.debug('Setting block: ' + block.hash + ' in the block cache.');
this._blockQueue.set(block.hash, block);
};
BlockService.prototype._getBlocks = function(callback) {
var self = this;
var blocksDiff = self.bitcoind.height - self.tip.__height;
// we will need to wait for new blocks to be pushed to us
if (blocksDiff < 0) {
log.warn('Peer\'s height is less than our own. The peer may be syncing. ' +
'The system is usable, but the chain may have a reorg in future blocks. ' +
'You should not rely on query responses for heights greater than ' +
self.bitcoind.height + ' until fully synced.');
return callback(null, blocksDiff);
}
log.info('Syncing: ' + blocksDiff + ' blocks from the network.');
var operations = [];
async.timesLimit(blocksDiff, 8, function(n, next) {
var blockNumber = n + self.tip.__height + 1;
self.getBlockHeader(blockNumber, function(err, header) {
if(err) {
return next(err);
}
self._getBlockOperations(header).forEach(function(op) {
operations.push(op);
});
next();
});
}, function(err) {
if(err) {
return callback(err);
}
self.db.batch(operations, function(err) {
if (err) {
return callback(err);
}
log.info('Completed syncing block headers from the network.');
callback(null, blocksDiff);
});
});
};
BlockService.prototype._getHeader = function(block) {
@ -381,52 +352,15 @@ BlockService.prototype._setBlockHeaderQueue = function(header) {
BlockService.prototype._setHandlers = function() {
var self = this;
self.node.once('ready', function() {
self.p2p.once('bestHeight', function(height) {
self._sync();
self._bestHeight = height;
self._loadTip(self._sync);
});
self._blockHandler.on('synced', function() {
log.info('Synced: ' + self.tip.hash);
self._startSubscriptions();
});
};
BlockService.prototype._getGenesisBlock = function() {
this.genesis = {};
this.genesis.height = 0;
this.genesis.hash = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()];
return this.genesis;
};
BlockService.prototype._detectStartupReorg = function(callback) {
var self = this;
if (self.tip.height === 0) {
return callback();
}
self.getBlockHeader(hash, function(err, header) {
if (err) {
return callback(err);
}
if (header.height === height) {
return callback();
}
var opts = { preReorgTipHash: hash, preReorgTipHeight: header.height };
self._handleReorg(header.hash, opts, callback);
});
};
BlockService.prototype._handleReorg = function(hash, callback) {
@ -453,53 +387,6 @@ BlockService.prototype._handleReorg = function(hash, callback) {
};
BlockService.prototype._decodeTipData = function(tipDataBuf) {
var hash = tipDataBuf.slice(0, 32).toString('hex');
var height = tipDataBuf.slice(32).toString('hex').readUInt32BE();
return {
hash: hash,
height: height
};
};
BlockService.prototype._loadTips = function(callback) {
var self = this;
var tipStrings = ['tip', 'concurrentTip'];
async.each(tipStrings, function(tip, next) {
self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
if(err && !(err instanceof levelup.errors.NotFoundError)) {
return next(err);
}
if (!tipData) {
self[tip] = self._getGenesisBlock();
return next();
}
self[tip] = self._decodeTipData(tipData);
log.info('loaded ' + tip + ' hash: ' + self[tip].hash + ' height: ' + self[tip].height);
next();
});
}, function(err) {
if(err) {
return callback(err);
}
log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
callback();
});
};
BlockService.prototype._detectReorg = function(blocks, callback) {
var tipHash = this.reorgHash || this.tip.hash;
@ -583,43 +470,6 @@ BlockService.prototype._isGenesisBlock = function(blockArg, callback) {
};
BlockService.prototype._getBlock = function(blockArg, callback) {
var self = this;
var cachedBlock = self._blockQueue.get(blockArg);
if (cachedBlock) {
return setImmediate(function() {
callback(null, cachedBlock);
});
}
self.bitcoind.getBlock(blockArg, function(err, block) {
if(err) {
return callback(err);
}
if (blockArg.length === 64) {
return self.getBlockHeader(blockArg, function(err, header) {
if(err) {
return callback(err);
}
block.__height = header.height;
block.height = header.height;
callback(null, block);
});
}
block.__height = blockArg;
block.height = blockArg;
callback(null, block);
});
};
BlockService.prototype._getReorgOperations = function(hash, height) {
if (!hash || !height) {

View File

@ -36,13 +36,12 @@ function DB(options) {
this.subscriptions = {};
this.bitcoind = this.node.services.bitcoind;
this._operationsQueue = [];
}
util.inherits(DB, Service);
DB.dependencies = ['bitcoind'];
DB.dependencies = [];
DB.prototype._setDataPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
@ -114,17 +113,37 @@ DB.prototype.start = function(callback) {
};
DB.prototype.get = function(key, options, callback) {
var cb = callback;
var opts = options;
if (typeof callback !== 'function') {
cb = options;
opts = {};
}
if (!this._stopping) {
this._store.get(key, opts, cb);
this._store.get(key, opts, function(err, data) {
if(err && err instanceof levelup.errors.NotFoundError) {
return cb();
}
if (err) {
return cb(err);
}
cb(null, data);
});
} else {
setImmediate(cb);
// TODO: will this tell the caller the right thing?
cb(new Error('Shutdown sequence underway, not able to complete the query'));
}
};
DB.prototype.put = function(key, value, options, callback) {

View File

@ -30,6 +30,7 @@ var P2P = function(options) {
this._configPeers = this.options.peers;
this.subscriptions = {};
this.subscriptions.block = [];
this.subscriptions.headers = [];
this.subscriptions.transaction = [];
this.messages = new p2p.Messages({ network: this.node.network });
this._peerHeights = [];
@ -75,10 +76,10 @@ P2P.prototype.getPublishEvents = function() {
unsubscribe: this.unsubscribe.bind(this, 'block')
},
{
name: 'p2p/header',
name: 'p2p/headers',
scope: this,
subscribe: this.subscribe.bind(this, 'header'),
unsubscribe: this.unsubscribe.bind(this, 'header')
subscribe: this.subscribe.bind(this, 'headers'),
unsubscribe: this.unsubscribe.bind(this, 'headers')
}
];
};
@ -166,7 +167,6 @@ P2P.prototype._onPeerInventory = function(peer, message) {
var self = this;
var newDataNeeded = [];
message.inventory.forEach(function(inv) {
if (!self._inv.get(inv.hash)) {
@ -176,6 +176,7 @@ P2P.prototype._onPeerInventory = function(peer, message) {
newDataNeeded.push(inv);
}
});
if (newDataNeeded.length > 0) {
@ -194,6 +195,10 @@ P2P.prototype._onPeerBlock = function(peer, message) {
this._broadcast(this.subscriptions.block, 'p2p/block', message.block);
};
P2P.prototype._onPeerHeaders = function(peer, message) {
this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers);
};
P2P.prototype._setupListeners = function() {
var self = this;
@ -202,6 +207,7 @@ P2P.prototype._setupListeners = function() {
self._pool.on('peerinv', self._onPeerInventory.bind(self));
self._pool.on('peertx', self._onPeerTx.bind(self));
self._pool.on('peerblock', self._onPeerBlock.bind(self));
self._pool.on('peerheaders', self._onPeerHeaders.bind(self));
self.node.on('ready', function() {
self._pool.connect();
});
@ -226,7 +232,17 @@ P2P.prototype._applyMempoolFilter = function(message) {
return message;
};
P2P.prototype._getResourceFilter = function(filter, resource) {
P2P.prototype._setResourceFilter = function(filter, resource) {
// startHash is usually the last block or header you have, endHash is a hash that comes after that hash,
// or a block at a greater height
if (resource === 'headers' || resource === 'blocks') {
assert(filter.startHash, 'A "startHash" field is required to retrieve headers or blocks');
if (!filter.endHash) {
filter.endHash = 0;
}
return { starts: [filter.startHash], stop: filter.endHash };
}
// this function knows about mempool, block and header filters
// mempool filters are considered after the tx is delivered to us
@ -236,21 +252,13 @@ P2P.prototype._getResourceFilter = function(filter, resource) {
return;
}
if (resource === 'blocks' || resource === 'headers') {
assert(filter.newestHash, 'A "newestHash" field is required to retrieve blocks or headers');
if (!filter.oldestHash) {
filter.oldestHash = filter.newestHash;
}
return { starts: [filter.newestHash], stop: filter.oldestHash };
}
};
P2P.prototype.getAPIMethods = function() {
var methods = [
['getHeaders', this, this.getHeaders, 2],
['getMempool', this, this.getMempool, 1],
['getBlocks', this, this.getBlocks, 2]
['getHeaders', this, this.getHeaders, 1],
['getMempool', this, this.getMempool, 0],
['getBlocks', this, this.getBlocks, 1]
];
return methods;
};
@ -258,7 +266,7 @@ P2P.prototype.getAPIMethods = function() {
P2P.prototype.getHeaders = function(filter) {
var peer = this._getPeer();
var headerFilter = this._getResourceFilter(filter, 'headers');
var headerFilter = this._setResourceFilter(filter, 'headers');
peer.sendMessage(this.messages.GetHeaders(headerFilter));
};
@ -271,8 +279,7 @@ P2P.prototype.getMempool = function(filter) {
// all manner of txs (low/no fee, "non-standard", etc.). As such, this filter can
// grow quite large over time. Care should be taken to limit the size of this filter.
// Even still, when a tx is broadcasted, the reference to it is dropped from the filter.
this._mempoolFilter = this._getResourceFilter(filter, 'mempool');
this._setResourceFilter(filter, 'mempool');
peer.sendMessage(this.messages.MemPool());
@ -281,9 +288,13 @@ P2P.prototype.getMempool = function(filter) {
P2P.prototype.getBlocks = function(filter) {
var peer = this._getPeer();
var blockFilter = this._getResourceFilter(filter, 'blocks');
var blockFilter = this._setResourceFilter(filter, 'blocks');
peer.sendMessage(this.messages.GetBlocks(blockFilter));
};
P2P.prototype.clearInventoryCache = function() {
this._inv.reset();
};
module.exports = P2P;

View File

@ -286,8 +286,8 @@ WebService.prototype._endpointGetInfo = function() {
result: 'ok',
dbheight: self.node.services.block.tip.__height,
dbhash: self.node.services.block.tip.hash,
bitcoindheight: self.node.services.bitcoind.height,
bitcoindhash: self.node.services.bitcoind.tiphash
networkHeight: self.node.services.bitcoind.height,
networkHash: self.node.services.bitcoind.tiphash
});
};
};

View File

@ -8,8 +8,9 @@ var path = require('path');
var utils = require('./utils');
var debug = true;
var bitcoreDataDir = '/tmp/bitcore';
var bitcoinDataDir = '/tmp/bitcoin';
var extraDebug = true;
var bitcoreDataDir = '/tmp/testtmpfs/bitcore';
var bitcoinDataDir = '/tmp/testtmpfs/bitcoin';
var rpcConfig = {
protocol: 'http',
@ -45,7 +46,7 @@ var bitcore = {
port: 53001,
datadir: bitcoreDataDir,
services: [
'bitcoind',
'p2p',
'db',
'web',
'block',

View File

@ -11,12 +11,16 @@ var zmq = require('zmq');
var bitcore = require('bitcore-lib');
var Transaction = bitcore.Transaction;
var PrivateKey = bitcore.PrivateKey;
var Block = bitcore.Block;
var BlockHeader = bitcore.BlockHeader;
var Unit = bitcore.Unit;
var constants = require('../lib/constants');
var debug = true;
var extraDebug = true;
var bitcoreDataDir = '/tmp/bitcore';
var bitcoinDataDir = '/tmp/bitcoin';
//advisable to use a tmpfs here, much easier on NAND-based disks and good for performance
var bitcoreDataDir = '/tmp/testtmpfs/bitcore';
// to do this on Linux: sudo mount -t tmpfs -o size=512m tmpfs /tmp/testtmpfs
var bitcoinDataDir = '/tmp/testtmpfs/bitcoin';
var rpcConfig = {
protocol: 'http',
@ -101,13 +105,12 @@ var opts = {
satoshisSent: 0,
walletId: crypto.createHash('sha256').update('test').digest('hex'),
satoshisReceived: 0,
initialHeight: 150,
initialHeight: 110,
path: '/test/info',
errorFilter: function(err, res) {
console.log(err, res);
try {
var info = JSON.parse(res);
if (res.result) {
if (info.result) {
return;
}
} catch(e) {
@ -121,13 +124,18 @@ var utils = new Utils(opts);
var subSocket;
var txs = [];
var blocks = [];
var headers = [];
var startingBlockHash;
var count = 0;
function processMessages(topic, message) {
var topicStr = topic.toString();
if (topicStr === 'transaction') {
return txs.push(message);
} else if (topicStr === 'block') {
count++;
return blocks.push(message);
} else if (topicStr === 'headers') {
return headers.push(message);
}
}
@ -150,6 +158,7 @@ function setupZmqSubscriber(callback) {
subSocket.connect('tcp://127.0.0.1:38332');
subSocket.subscribe('transaction');
subSocket.subscribe('block');
subSocket.subscribe('headers');
subSocket.on('message', processMessages);
callback();
}
@ -165,31 +174,18 @@ describe('P2P Operations', function() {
before(function(done) {
async.series([
utils.startBitcoind.bind(utils),
utils.waitForBitcoinReady.bind(utils)
utils.waitForBitcoinReady.bind(utils),
utils.unlockWallet.bind(utils),
utils.setupInitialTxs.bind(utils),
utils.startBitcoreNode.bind(utils),
utils.waitForBitcoreNode.bind(utils),
setupZmqSubscriber,
utils.sendTxs.bind(utils, false)
], done);
});
it('should connect to the p2p network and stream the mempool to clients', function(done) {
async.series([
utils.unlockWallet.bind(utils),
utils.setupInitialTxs.bind(utils),
function(next) {
async.eachSeries(utils.opts.initialTxs, function(tx, next) {
utils.opts.rpc.sendRawTransaction(tx.serialize(), next);
}, next);
},
utils.startBitcoreNode.bind(utils),
setupZmqSubscriber,
utils.waitForBitcoreNode.bind(utils)
], function(err) {
if(err) {
return done(err);
}
describe('Mempool', function() {
it('should send new transactions as they are broadcasted by our trusted peer (unsoliticted)', function(done) {
var initialTxs = {};
@ -205,70 +201,219 @@ describe('P2P Operations', function() {
expect(initialTxs[tx.hash]).to.equal(true);
}
expect(utils.opts.initialTxs.length).to.equal(i);
expect(txs.length).to.equal(i);
done();
});
});
it('should send new transactions as they are broadcasted by our trusted peer (unsoliticted)', function(done) {
var tx;
async.series([
function(next) {
opts.rpc.generate(10, next);
},
function(next) {
utils.getPrivateKeysWithABalance(function(err, spendables) {
it('should connect to the p2p network and stream the mempool to clients', function(done) {
// this tricky because if the p2p service has already asked for the data
// from a particular peer, it will not ask again until the inv hash is dropped
// from its lru cache. So, to fake this out, I will clear this cache manually
txs.length = 0;
utils.queryBitcoreNode(Object.assign({
path: '/test/mempool',
}, bitcore.httpOpts), function(err) {
if(err) {
return next(err);
return done(err);
}
tx = new Transaction()
.from(spendables[0].utxo)
.to(new PrivateKey('testnet').toAddress().toString(),
Unit.fromBTC(spendables[0].utxo.amount).satoshis - 100000)
.fee(100000)
.sign(spendables[0].privKey);
setTimeout(function() {
var initialTxs = {};
utils.sendTx(tx, 0, next);
utils.opts.initialTxs.map(function(tx) {
initialTxs[tx.hash] = true;
return;
});
var i = 0;
for(; i < utils.opts.initialTxs.length; i++) {
var tx = new Transaction(txs[i]);
expect(initialTxs[tx.hash]).to.equal(true);
}
expect(txs.length).to.equal(i);
done();
}, 2000);
});
}
], function(err) {
});
if(err) {
return done(err);
}
it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) {
var newTx = txs.shift();
newTx = new Transaction(newTx);
var argTxs = txs.map(function(rawTx) {
var tx = new Transaction(rawTx);
return tx.hash;
});
txs.length = 0;
utils.queryBitcoreNode(Object.assign({
path: '/test/mempool?filter=' + JSON.stringify(argTxs)
}, bitcore.httpOpts), function(err) {
setTimeout(function() {
var broadcastedTx = new Transaction(txs[txs.length - 1]);
expect(tx.hash).to.equal(broadcastedTx.hash);
expect(txs.length).to.equal(utils.opts.initialTxs.length + 1);
done();
}, 1000);
if (err) {
return done(err);
}
setTimeout(function() {
var tx = new Transaction(txs[0]);
expect(newTx.hash).to.equal(tx.hash);
expect(txs.length).to.equal(1);
done();
}, 2000);
});
});
});
describe('Block', function() {
it('should get blocks when they are relayed to us', function(done) {
opts.rpc.generate(1, function(err, res) {
if(err) {
return done(err);
}
startingBlockHash = res.result[0];
setTimeout(function() {
expect(blocks.length).to.equal(1);
var block = new Block(blocks[0]);
expect(startingBlockHash).to.equal(block.hash);
done();
}, 2000);
});
});
it('should be able to get historical blocks', function(done) {
blocks.length = 0;
var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest };
utils.queryBitcoreNode(Object.assign({
path: '/test/blocks?filter=' + JSON.stringify(filter),
}, bitcore.httpOpts), function(err) {
if(err) {
return done(err);
}
setTimeout(function() {
expect(blocks.length).to.equal(utils.opts.blockHeight + 1);
var lastBlock = new Block(blocks[blocks.length - 1]);
expect(startingBlockHash).to.equal(lastBlock.hash);
done();
}, 2000);
});
});
});
describe('Block Headers', function() {
it('should be able to get historical block headers', function(done) {
var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest };
utils.queryBitcoreNode(Object.assign({
path: '/test/headers?filter=' + JSON.stringify(filter),
}, bitcore.httpOpts), function(err) {
if(err) {
return done(err);
}
setTimeout(function() {
expect(headers.length).to.equal(utils.opts.blockHeight + 1);
var lastBlockHeader = new BlockHeader(blocks[blocks.length - 1]);
expect(startingBlockHash).to.equal(lastBlockHeader.hash);
done();
}, 2000);
});
});
it('should return up to 2000 headers in a single call to getHeaders', function(done) {
// p2p note: when asking for a series of headers, your peer will always follow up
// with an additional inventory message after delivering the initial data.
// after getHeaders: an inv message for the block matching the latest header you received.
// remember: getHeaders message does not respond with an inventory message like getBlocks does,
// instead it responds with the header message, but THEN will respond with a single inventory
// message representing the block of the last header delievered.
// For example: if there exists 4 blocks with block hashes a,b,c,d:
// getHeaders({ starts: 'a', stop: 0 }) should receive headers for b,c,d and an inv message for block d.
var additionalBlockCount = 2000 - 111;
headers.length = 0;
opts.rpc.generate(additionalBlockCount, function(err) {
if(err) {
return done(err);
}
var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest };
utils.queryBitcoreNode(Object.assign({
path: '/test/headers?filter=' + JSON.stringify(filter),
}, bitcore.httpOpts), function(err) {
if(err) {
return done(err);
}
setTimeout(function() {
expect(headers.length).to.equal(2000);
done();
}, 2000);
});
});
});
it('should return up to 500 blocks in a single call to getBlocks', function(done) {
// p2p note: when asking for a series of headers, your peer will always follow up
// with an additional inventory message after delivering the initial data.
// after getBlocks: an inv message for the block immediately following the last one you received, if
// there more blocks to retrieve. Since there is a 500 block limit in the initial inventory message response,
// when receiving 500 blocks, an additional inventory message will tell you what the next block is and that
// are more blocks to be retrieved.
blocks.length = 0;
count = 0;
var filter = { startHash: constants.BITCOIN_GENESIS_HASH.regtest };
utils.queryBitcoreNode(Object.assign({
path: '/test/blocks?filter=' + JSON.stringify(filter),
}, bitcore.httpOpts), function(err) {
if(err) {
return done(err);
}
setTimeout(function() {
expect(blocks.length).to.equal(501);
done();
}, 2000);
});
});
});
it('should get blocks from peer that we do not have on startup', function(done) {
expect(blocks.length).to.equal(151);
done();
});
it('should be able to set a mempool filter and only get back what is NOT in the filter', function(done) {
var newTx = txs.shift();
utils.queryBitcoreNode(Object.assign({
path: '/mempool?filter=' + txs
}), bitcore.httpOpts, function(err, data) {
if (err) {
return done(err);
}
newTxs = JSON.parse(data).transactions;
expect(newTxs.length).to.equal(2);
expect(newTx).to.equal(newTxs[0].hash);
});
});
});

View File

@ -5,8 +5,6 @@ var inherits = require('util').inherits;
var zmq = require('zmq');
var index = require('../lib');
var log = index.log;
var constants = require('../lib/constants');
var assert = require('assert');
var TestBusService = function(options) {
BaseService.call(this, options);
@ -31,11 +29,10 @@ TestBusService.prototype.start = function(callback) {
self.bus.on('p2p/transaction', function(tx) {
self._cache.transaction.push(tx);
if (self._ready) {
for(var i = 0; i < self._cache.transaction.length; i++) {
while(self._cache.transaction.length > 0) {
var transaction = self._cache.transaction.shift();
self.pubSocket.send([ 'transaction', new Buffer(transaction.uncheckedSerialize(), 'hex') ]);
}
return;
}
});
@ -46,22 +43,33 @@ TestBusService.prototype.start = function(callback) {
self.bus.on('p2p/block', function(block) {
self._cache.block.push(block);
if (self._ready) {
for(var i = 0; i < self._cache.block.length; i++) {
while(self._cache.block.length > 0) {
var blk = self._cache.block.shift();
self.pubSocket.send([ 'block', blk.toBuffer() ]);
}
return;
}
});
self.bus.on('p2p/headers', function(headers) {
headers.forEach(function(header) {
self._cache.headers.push(header);
});
if (self._ready) {
while(self._cache.headers.length > 0) {
var hdr = self._cache.headers.shift();
self.pubSocket.send([ 'headers', hdr.toBuffer() ]);
}
}
});
self.bus.subscribe('p2p/transaction');
self.bus.subscribe('p2p/block');
self.bus.subscribe('p2p/headers');
self.node.on('ready', function() {
self._ready = true;
self.node.services.p2p.getMempool();
self.node.services.p2p.getBlocks({ newestHash: constants.BITCOIN_GENESIS_HASH.regtest });
});
@ -73,17 +81,35 @@ TestBusService.prototype.setupRoutes = function(app) {
var self = this;
app.get('/mempool', function(req, res) {
self.node.services.p2p.getMempool(req.params.filter);
res.status(200);
self.node.services.p2p.clearInventoryCache();
var filter;
if (req.query.filter) {
filter = JSON.parse(req.query.filter);
}
self.node.services.p2p.getMempool(filter);
res.status(200).end();
});
app.get('/blocks', function(req, res) {
self.node.services.p2p.getBlocks(req.params.filter);
res.status(200);
self.node.services.p2p.clearInventoryCache();
var filter;
if (req.query.filter) {
filter = JSON.parse(req.query.filter);
}
self.node.services.p2p.getBlocks(filter);
res.status(200).end();
});
app.get('/headers', function(req, res) {
var filter;
if (req.query.filter) {
filter = JSON.parse(req.query.filter);
}
self.node.services.p2p.getHeaders(filter);
res.status(200).end();
});
app.get('/info', function(req, res) {
console.log('test test test test');
res.status(200).jsonp({ result: (self._ready && (self._bestHeight >= 0))});
});
};

View File

@ -11,6 +11,7 @@ var http = require('http');
var Unit = bitcore.Unit;
var Transaction = bitcore.Transaction;
var PrivateKey = bitcore.PrivateKey;
var assert = require('assert');
var Utils = function(opts) {
this.opts = opts;
@ -94,7 +95,7 @@ Utils.prototype.waitForBitcoreNode = function(callback) {
}
};
}
var httpOpts = self.getHttpOpts({ path: opts.path || '/info', errorFilter: errorFilter });
var httpOpts = self.getHttpOpts({ path: self.opts.path || '/info', errorFilter: errorFilter });
self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback);
};
@ -281,24 +282,35 @@ Utils.prototype.setupInitialTxs = function(callback) {
};
Utils.prototype.sendTxs = function(callback) {
Utils.prototype.sendTxs = function(generateBlockAfterEach, callback) {
var self = this;
if (typeof generateBlockAfterEach !== 'function') {
return async.eachSeries(this.opts.initialTxs, function(tx, next) {
self.sendTx(tx, generateBlockAfterEach, next);
}, callback);
}
async.eachOfSeries(this.opts.initialTxs, this.sendTx.bind(this), callback);
};
Utils.prototype.sendTx = function(tx, index, callback) {
var self = this;
self.opts.rpc.sendRawTransaction(tx.serialize(), function(err) {
// sending these too quickly will prevent them from being relayed over the
// p2p network
self.opts.rpc.sendRawTransaction(tx.serialize(), function(err, res) {
if (err) {
return callback(err);
}
assert(res.result === tx.hash, 'sendTx: provided hash did not match returned hash');
var mod = index % 2;
if (mod === 1) {
self.opts.blockHeight++;
self.opts.rpc.generate(1, callback);
} else {
callback();
}
setTimeout(function() {
if (mod === 1) {
self.opts.blockHeight++;
self.opts.rpc.generate(1, callback);
} else {
callback();
}
}, 200);
});
};