Moved closer toward not relying on zmq and rpc.

This commit is contained in:
Chris Kleeschulte 2017-05-22 08:21:37 -04:00
parent 0e1b21b988
commit 6cccce833d
11 changed files with 727 additions and 467 deletions

View File

@ -285,6 +285,7 @@ Node.prototype.getNetworkName = function() {
* @param {Function} callback - Called when all services are stopped
*/
Node.prototype.stop = function(callback) {
log.info('Beginning shutdown');
var self = this;
var services = this.getServiceOrder().reverse();
@ -293,18 +294,22 @@ Node.prototype.stop = function(callback) {
this.emit('stopping');
async.eachSeries(
services,
function(service, next) {
if (self.services[service.name]) {
log.info('Stopping ' + service.name);
self.services[service.name].stop(next);
} else {
log.info('Stopping ' + service.name + ' (not started)');
setImmediate(next);
}
},
callback
);
services,
function(service, next) {
if (self.services[service.name]) {
log.info('Stopping ' + service.name);
self.services[service.name].stop(next);
} else {
log.info('Stopping ' + service.name + ' (not started)');
setImmediate(next);
}
},
function(err) {
if (callback) {
callback();
}
});
};
module.exports = Node;

View File

@ -131,8 +131,7 @@ function lookInRequirePathConfig(req, service) {
if (!service.config.requirePath) {
return;
}
//if a file, then remove any possible '.js' extension
//if a directory, then leave alone
try {
if (fs.statSync(service.config.requirePath).isDirectory()) {
return req(service.config.requirePath);
@ -140,6 +139,7 @@ function lookInRequirePathConfig(req, service) {
var serviceFile = service.config.requirePath.replace(/.js$/, '');
return req(serviceFile);
} catch(e) {
// we want to throw in all cases expect file not found
}
}
@ -147,7 +147,7 @@ function lookInCwd(req, service) {
try {
return req(process.cwd + '/' + service);
} catch(e) {
// throw in all cases expect file not found
}
}
@ -156,7 +156,6 @@ function lookInBuiltInPath(req, service) {
var serviceFile = path.resolve(__dirname, '../services/' + service.name);
return req(serviceFile);
} catch(e) {
}
}
@ -169,7 +168,6 @@ function lookInModuleManifest(req, service) {
return req(serviceModule);
}
} catch(e) {
}
}

View File

@ -2,7 +2,6 @@
var util = require('util');
var bitcore = require('bitcore-lib');
var Transaction = bitcore.Transaction;
var zmq = require('zmq');
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
@ -26,6 +25,7 @@ function Bitcoin(options) {
this.subscriptions = {};
this.subscriptions.rawtransaction = [];
this.subscriptions.hashblock = [];
this.subscriptions.rawblock = [];
this.startRetryTimes = this.options.startRetryTimes || 120;
this.startRetryInterval = this.options.startRetryInterval || 1000;
@ -80,6 +80,12 @@ Bitcoin.prototype.getPublishEvents = function() {
scope: this,
subscribe: this.subscribe.bind(this, 'hashblock'),
unsubscribe: this.unsubscribe.bind(this, 'hashblock')
},
{
name: 'bitcoind/rawblock',
scope: this,
subscribe: this.subscribe.bind(this, 'rawblock'),
unsubscribe: this.unsubscribe.bind(this, 'rawblock')
}
];
};
@ -118,6 +124,19 @@ Bitcoin.prototype._getGenesisBlock = function(callback) {
var self = this;
if (self.height === 0) {
return self.getRawBlock(self.tiphash, function(err, blockBuffer) {
if(err) {
return callback(err);
}
self.genesisBuffer = blockBuffer;
callback();
});
}
self.client.getBlockHash(0, function(err, response) {
if (err) {
@ -190,6 +209,20 @@ Bitcoin.prototype._initChain = function(callback) {
};
Bitcoin.prototype._zmqRawBlockHandler = function(message) {
var block = new bitcore.Block(message);
this.tiphash = block.hash;
this.height++;
block.__height = this.height;
block.height = this.height;
for (var i = 0; i < this.subscriptions.rawblock.length; i++) {
this.subscriptions.rawblock[i].emit('bitcoind/rawblock', block);
}
};
Bitcoin.prototype._zmqBlockHandler = function(message) {
var self = this;
@ -229,12 +262,15 @@ Bitcoin.prototype._subscribeZmqEvents = function(node) {
var self = this;
node.zmqSubSocket.subscribe('hashblock');
node.zmqSubSocket.subscribe('rawtx');
node.zmqSubSocket.subscribe('rawblock');
node.zmqSubSocket.on('message', function(topic, message) {
var topicString = topic.toString('utf8');
if (topicString === 'rawtx') {
self._zmqTransactionHandler(node, message);
} else if (topicString === 'hashblock') {
self._zmqBlockHandler(message);
} else if (topicString === 'rawblock') {
self._zmqRawBlockHandler(message);
}
});
};
@ -419,17 +455,8 @@ Bitcoin.prototype.getBlock = function(blockArg, callback) {
return done(self._wrapRPCError(err));
}
self.getBlockHeader(blockhash, function(err, header) {
if(err) {
return done(self._wrapRPCError(err));
}
var blockObj = bitcore.Block.fromString(response.result);
blockObj.__height = header.height;
done(null, blockObj);
});
var blockObj = bitcore.Block.fromString(response.result);
done(null, blockObj);
});
}, callback);

View File

@ -5,17 +5,16 @@ var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var Block = bitcore.Block;
var index = require('../../index');
var log = index.log;
var _ = require('lodash');
function BlockStream(highWaterMark, sync) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.sync = sync;
this.block = this.sync.block;
this.dbTip = this.block.tip;
this.lastReadHeight = this.dbTip.__height;
this.lastReadHeight = this._getTipHeight();
this.lastEmittedHash = this.dbTip.hash;
this.queue = [];
this.processing = false;
@ -68,15 +67,20 @@ function BlockHandler(node, block) {
this.db = this.node.services.db;
this.block = block;
this.syncing = false;
this.paused = false; //we can't sync while one of our indexes is reading/writing separate from us
this.paused = false;
this.blockQueue = [];
this.highWaterMark = 10;
}
inherits(BlockHandler, EventEmitter);
BlockHandler.prototype.sync = function() {
BlockHandler.prototype.sync = function(block) {
var self = this;
if (block) {
self.blockQueue.push(block);
}
if(this.syncing || this.paused) {
log.debug('Sync lock held, not able to sync at the moment');
return;
@ -84,6 +88,12 @@ BlockHandler.prototype.sync = function() {
self.syncing = true;
self._setupStreams();
};
BlockHandler.prototype._setupStreams = function() {
var self = this;
var blockStream = new BlockStream(self.highWaterMark, self);
var processConcurrent = new ProcessConcurrent(self.highWaterMark, self);
var writeStream = new WriteStream(self.highWaterMark, self);
@ -101,7 +111,6 @@ BlockHandler.prototype.sync = function() {
.pipe(processSerial);
processSerial.on('finish', self._onFinish.bind(self));
};
BlockHandler.prototype._onFinish = function() {
@ -129,7 +138,16 @@ BlockStream.prototype._read = function() {
return this.push(null);
}
this.queue.push(++this.lastReadHeight);
if (this.sync.blockQueue.length === 0) {
this.queue.push(++this.lastReadHeight);
} else {
var block = this.sync.blockQueue.shift();
if (block) {
this.lastReadHeight = block.__height;
this.queue.push(block);
}
}
this._process();
};
@ -149,59 +167,59 @@ BlockStream.prototype._process = function() {
var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(blockArgs.length);
self._getBlocks(blockArgs, next);
if (_.isNumber(blockArgs[0])) {
self.block.getBlocks(blockArgs, function(err, blocks) {
if(err) {
if (err === 'reorg') {
self.push(null);
return next();
}
return next(err);
}
self._pushBlocks(blocks);
next();
});
} else {
self._pushBlocks(blockArgs);
next();
}
}, function(err) {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
};
BlockStream.prototype._getBlocks = function(heights, callback) {
BlockStream.prototype._pushBlocks = function(blocks) {
var self = this;
async.map(heights, function(height, next) {
if (height === 0) {
var block = new Block(self.block.genesis);
block.__height = 0;
return next(null, block);
}
for(var i = 0; i < blocks.length; i++) {
self.block.getBlock(height, function(err, block) {
self.lastEmittedHash = blocks[i].hash;
log.debug('Pushing block: ' + blocks[i].hash + ' from the blockstream');
self.push(blocks[i]);
if (err === 'reorg') {
return self.push(null);
}
}
if(err) {
return next(err);
}
};
block.__height = height;
next(null, block);
});
}, function(err, blocks) {
if(err) {
return callback(err);
}
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
}
return callback();
});
BlockStream.prototype._getTipHeight = function() {
if (this.dbTip.__height === 0) {
return -1;
}
return this.dbTip.__height;
};
ProcessSerial.prototype._reportStatus = function() {
@ -235,7 +253,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
ProcessSerial.prototype._process = function(block, callback) {
var self = this;
self.block.getSerialBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(block, true, 'serial', function(err, operations) {
if(err) {
return callback(err);
}
@ -268,7 +286,7 @@ ProcessConcurrent.prototype._transform = function(block, enc, callback) {
this.lastBlock = block;
self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(block, true, 'concurrent', function(err, operations) {
if(err) {
return callback(err);
}
@ -323,7 +341,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
var self = this;
async.parallel([function(next) {
self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(block, true, 'concurrent', function(err, operations) {
if(err) {
return callback(err);
}
@ -331,7 +349,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
next(null, operations);
});
}, function(next) {
self.block.getSerialBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(block, true, 'serial', function(err, operations) {
if(err) {
return callback(err);
}

View File

@ -3,17 +3,17 @@
var BaseService = require('../../service');
var levelup = require('levelup');
var bitcore = require('bitcore-lib');
var Block = bitcore.Block;
var inherits = require('util').inherits;
var Encoding = require('./encoding');
var index = require('../../');
var log = index.log;
var BufferUtil = bitcore.util.buffer;
var utils = require('../../utils');
var Reorg = require('./reorg');
var $ = bitcore.util.preconditions;
var async = require('async');
var BlockHandler = require('./block_handler');
var LRU = require('lru-cache');
var utils = require('../../utils');
var _ = require('lodash');
var BlockService = function(options) {
BaseService.call(this, options);
@ -27,6 +27,8 @@ var BlockService = function(options) {
keyEncoding: 'string',
valueEncoding: 'binary'
};
this._blockQueue = LRU(50); //hash -> header, height -> header,
this._rawBlockQueue = LRU(10); // keep 10 blocks in the cache in case of reorg's
};
inherits(BlockService, BaseService);
@ -36,32 +38,6 @@ BlockService.dependencies = [
'db'
];
BlockService.prototype.getNetworkTipHash = function() {
return this.bitcoind.tiphash;
};
BlockService.prototype._startSubscriptions = function() {
var self = this;
if (!self._subscribed) {
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function() {
self._blockHandler.sync();
});
self.bus.subscribe('bitcoind/hashblock');
}
};
BlockService.prototype._log = function(msg) {
return log.info('BlockService: ', msg);
};
BlockService.prototype.start = function(callback) {
var self = this;
@ -79,180 +55,40 @@ BlockService.prototype.start = function(callback) {
});
};
BlockService.prototype.stop = function(callback) {
if (callback) {
setImmediate(callback);
}
};
BlockService.prototype._sync = function() {
var self = this;
async.waterfall([
self._loadTips.bind(self),
self._detectReorg.bind(self),
self._detectStartupReorg.bind(self),
self._getBlocks.bind(self)
], function(err) {
], function(err, blocksDiff) {
if(err) {
throw err;
log.error(err);
self.node.stop();
}
self._blockHandler.sync();
});
};
BlockService.prototype._getBlocks = function(callback) {
var self = this;
var blocksDiff = self.bitcoind.height - self.tip.__height;
if (blocksDiff < 0) {
self._log('Peer\'s height is less than our own. The peer may be syncing.' +
' The system is usable, but chain may have a reorg in future blocks.' +
' We may not answer queries about blocks at heights greater than ' + self.bitcoind.height);
self._blockHandler.sync();
return;
}
self._log('Syncing: ' + blocksDiff + ' blocks from the network.');
var operations = [];
async.timesLimit(blocksDiff, 8, function(n, next) {
var blockNumber = n + self.tip.__height + 1;
self.bitcoind.getBlockHeader(blockNumber, function(err, header) {
if(err) {
return next(err);
}
operations.push({
type: 'put',
key: self.encoding.encodeBlockHashKey(header.hash),
value: self.encoding.encodeBlockHeightValue(header.height)
});
operations.push({
type: 'put',
key: self.encoding.encodeBlockHeightKey(header.height),
value: self.encoding.encodeBlockHashValue(header.hash)
});
next();
});
}, function(err) {
if(err) {
return callback(err);
if (blocksDiff > 0) {
return self._blockHandler.sync();
}
self.db.batch(operations, callback);
});
};
BlockService.prototype._setHandlers = function() {
var self = this;
self.node.once('ready', function() {
self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer);
self._sync();
});
self._blockHandler.on('synced', function() {
self._startSubscriptions();
});
};
BlockService.prototype.stop = function(callback) {
setImmediate(callback);
};
BlockService.prototype.pauseSync = function(callback) {
var self = this;
self._lockTimes.push(process.hrtime());
if (self._sync.syncing) {
self._sync.once('synced', function() {
self._sync.paused = true;
callback();
});
} else {
self._sync.paused = true;
setImmediate(callback);
}
};
BlockService.prototype.resumeSync = function() {
this._log('Attempting to resume sync', log.debug);
var time = this._lockTimes.shift();
if (this._lockTimes.length === 0) {
if (time) {
this._log('sync lock held for: ' + utils.diffTime(time) + ' secs', log.debug);
}
this._sync.paused = false;
this._sync.sync();
}
};
BlockService.prototype._detectReorg = function(callback) {
var self = this;
if (self.tip.__height <= 0) {
return callback();
}
// all synced
if (self.tip.hash === self.bitcoind.tiphash && self.tip.__height === self.bitcoind.height) {
return callback();
}
// check if our tip height has the same hash as the network's
self.bitcoind.getBlockHeader(self.tip.__height, function(err, header) {
if(err) {
return callback(err);
}
// we still might have a reorg if our tip is greater than the network's
// we won't know about this until we start syncing
if (header.hash === self.tip.hash) {
return callback();
}
//our hash isn't in the network chain anymore, we have reorg'ed
self._handleReorg(header.hash, callback);
});
};
BlockService.prototype._handleReorg = function(hash, callback) {
var self = this;
self.printTipInfo('Reorg detected!');
self.reorg = true;
var reorg = new Reorg(self.node, self);
reorg.handleReorg(hash, function(err) {
if(err) {
self._log('Reorg failed! ' + err, log.error);
self.node.stop(function() {});
throw err;
}
self.printTipInfo('Reorg successful!');
self.reorg = false;
callback();
});
};
BlockService.prototype.printTipInfo = function(prependedMessage) {
this._log(
log.info(
prependedMessage + ' Serial Tip: ' + this.tip.hash +
' Concurrent tip: ' + this.concurrentTip.hash +
' Bitcoind tip: ' + this.bitcoind.tiphash
@ -260,74 +96,62 @@ BlockService.prototype.printTipInfo = function(prependedMessage) {
};
BlockService.prototype._loadTips = function(callback) {
BlockService.prototype.getNetworkTipHash = function() {
return this.bitcoind.tiphash;
};
BlockService.prototype._getBlockOperations = function(obj) {
var self = this;
var tipStrings = ['tip', 'concurrentTip'];
async.each(tipStrings, function(tip, next) {
self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
if(err && !(err instanceof levelup.errors.NotFoundError)) {
return next(err);
}
var hash;
if (!tipData) {
hash = new Array(65).join('0');
self[tip] = {
height: -1,
hash: hash,
'__height': -1
};
return next();
}
hash = tipData.slice(0, 32).toString('hex');
self.bitcoind.getBlock(hash, function(err, block) {
if(err) {
return next(err);
}
self[tip] = block;
self._log('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height);
next();
});
if (_.isArray(obj)) {
var ops = [];
_.forEach(obj, function(block) {
ops.push(self._getBlockOperations(block));
});
return _.flatten(ops);
}
}, function(err) {
if(err) {
return callback(err);
}
self._log('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
callback();
var operations = [];
operations.push({
type: 'put',
key: self.encoding.encodeBlockHashKey(obj.hash),
value: self.encoding.encodeBlockHeightValue(obj.height)
});
operations.push({
type: 'put',
key: self.encoding.encodeBlockHeightKey(obj.height),
value: self.encoding.encodeBlockHashValue(obj.hash)
});
return operations;
};
BlockService.prototype.getConcurrentBlockOperations = function(block, add, callback) {
BlockService.prototype.getBlockOperations = function(block, add, type, callback) {
var operations = [];
async.each(
this.node.services,
function(mod, next) {
if(mod.concurrentBlockHandler) {
$.checkArgument(typeof mod.concurrentBlockHandler === 'function', 'concurrentBlockHandler must be a function');
mod.concurrentBlockHandler.call(mod, block, add, function(err, ops) {
var fn = mod.blockHandler;
if (type === 'concurrent') {
fn = mod.concurrentBlockHandler;
}
if(fn) {
fn.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'concurrentBlockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
@ -345,40 +169,7 @@ BlockService.prototype.getConcurrentBlockOperations = function(block, add, callb
callback(null, operations);
}
);
};
BlockService.prototype.getSerialBlockOperations = function(block, add, callback) {
var operations = [];
async.eachSeries(
this.node.services,
function(mod, next) {
if(mod.blockHandler) {
$.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function');
mod.blockHandler.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
return callback(err);
}
callback(null, operations);
}
);
};
BlockService.prototype.getTipOperation = function(block, add, tipType) {
@ -403,72 +194,476 @@ BlockService.prototype.getTipOperation = function(block, add, tipType) {
};
};
BlockService.prototype.getBlock = function(height, callback) {
BlockService.prototype.getBlocks = function(blockArgs, callback) {
var self = this;
if (self.tip.__height >= self.bitcoind.height) {
// if our block service's tip is ahead of the network tip, then we need to
// watch for a reorg by getting what we have for the tip hash and comparing it to
// what the network has.
return self.db.get(self.encoding.encodeBlockHeightKey(height), function(err, hash) {
async.mapLimit(blockArgs, 8, function(blockArg, next) {
async.waterfall([
self._isGenesisBlock.bind(self, blockArg),
function(block, next) {
if (block) {
return next(null, self.genesis);
}
next(null, self._rawBlockQueue.get(blockArg));
},
function(block, next) {
if (block) {
return next(null, block);
}
self._getBlock(blockArg, next);
}
], next);
}, function(err, blocks) {
if(err) {
return callback(err);
}
self._detectReorg(blocks, function(reorgHash, reorgHeight) {
// if we have reorg'ed, we want to retain the block's hash in our
// index, but we want to mark it as "-REORG"
var reorgOperations = self._getReorgOperations(reorgHash, reorgHeight);
var headers = blocks.map(function(block) {
return {
hash: block.hash,
prevHash: utils.reverseBufferToString(block.header.prevHash),
height: block.__height
};
});
var operations = self._getBlockOperations(headers);
if (reorgOperations) {
operations = reorgOperations.concat(operations);
}
self.db.batch(operations, function(err) {
callback(err, blocks);
});
});
});
};
BlockService.prototype.getBlockHeader = function(blockArg, callback) {
var self = this;
var header = self._blockQueue.get(blockArg);
if (header) {
return setImmediate(function() {
callback(null, header);
});
}
self.bitcoind.getBlockHeader(blockArg, function(err, header) {
if(err) {
return callback(err);
}
self._setBlockQueue(header);
callback(null, header);
});
};
BlockService.prototype.getBlockHash = function(height, callback) {
this._getBlockValue(height, callback);
};
BlockService.prototype.getBlockHeight = function(hash, callback) {
this._getBlockValue(hash, callback);
};
BlockService.prototype._startSubscriptions = function() {
var self = this;
if (!self._subscribed) {
self._subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/rawblock', function(block) {
log.info('New block received: ' + block.hash + ' at height: ' + block.height);
self._cacheRawBlock(block);
var header = self._getHeader(block);
self._setBlockQueue(header);
self._detectReorg([block], function() {
self._blockHandler.sync(block);
});
});
self.bus.subscribe('bitcoind/rawblock');
}
};
BlockService.prototype._cacheRawBlock = function(block) {
log.debug('Setting block: ' + block.hash + ' in the raw block cache.');
this._rawBlockQueue.set(block.hash, block);
};
BlockService.prototype._getBlocks = function(callback) {
var self = this;
var blocksDiff = self.bitcoind.height - self.tip.__height;
// we will need to wait for new blocks to be pushed to us
if (blocksDiff < 0) {
log.warn('Peer\'s height is less than our own. The peer may be syncing. ' +
'The system is usable, but the chain may have a reorg in future blocks. ' +
'You should not rely on query responses for heights greater than ' +
self.bitcoind.height + ' until fully synced.');
return callback(null, blocksDiff);
}
log.info('Syncing: ' + blocksDiff + ' blocks from the network.');
var operations = [];
async.timesLimit(blocksDiff, 8, function(n, next) {
var blockNumber = n + self.tip.__height + 1;
self.getBlockHeader(blockNumber, function(err, header) {
if(err) {
return next(err);
}
self._getBlockOperations(header).forEach(function(op) {
operations.push(op);
});
next();
});
}, function(err) {
if(err) {
return callback(err);
}
self.db.batch(operations, function(err) {
if (err) {
return callback(err);
}
self.bitcoind.getBlock(height, function(res, block) {
log.debug('Completed syncing block headers from the network.');
callback(null, blocksDiff);
});
});
};
BlockService.prototype._getHeader = function(block) {
return {
hash: block.hash,
version: 1,
prevHash: utils.reverseBufferToString(block.header.prevHash),
merkleRoot: utils.reverseBufferToString(block.header.merkleRoot),
time: block.header.time,
height: block.__height
};
};
BlockService.prototype._setBlockQueue = function(header) {
this._blockQueue.set(header.height, header);
this._blockQueue.set(header.hash, header);
};
BlockService.prototype._setHandlers = function() {
var self = this;
self.node.once('ready', function() {
self.genesis = bitcore.Block.fromBuffer(self.bitcoind.genesisBuffer);
self.genesis.__height = 0;
self.genesis.height = 0;
var genesisHeader = self._getHeader(self.genesis);
self._setBlockQueue(genesisHeader);
self.db.batch(self._getBlockOperations(genesisHeader), function(err) {
if (err) {
log.error('Unable to store genesis block in block index.');
return;
}
self._sync();
});
});
self._blockHandler.on('synced', function() {
log.debug('Synced: ' + self.tip.hash);
self._startSubscriptions();
});
};
BlockService.prototype._detectStartupReorg = function(callback) {
var self = this;
// does our block's header exist on the network. and if, so, is it the correct height?
var height = self.bitcoind.height;
var hash = self.bitcoind.tiphash;
self.getBlockHeader(hash, function(err, header) {
if (err) {
if (err.code === -5) {
return callback();
}
return callback(err);
}
if (header.height === height) {
return callback();
}
return self._handleReorg(header.hash, callback);
});
};
BlockService.prototype._handleReorg = function(hash, callback) {
var self = this;
self.printTipInfo('Reorg detected!');
self.reorg = true;
var reorg = new Reorg(self.node, self);
reorg.handleReorg(hash, function(err) {
if(err) {
log.error('Reorg failed! ' + err);
self.node.stop();
}
self.printTipInfo('Reorg successful!');
self.reorg = false;
callback();
});
};
BlockService.prototype._loadTips = function(callback) {
var self = this;
var tipStrings = ['tip', 'concurrentTip'];
async.each(tipStrings, function(tip, next) {
self.db.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
if(err && !(err instanceof levelup.errors.NotFoundError)) {
return next(err);
}
var hash;
if (!tipData) {
self[tip] = self.genesis;
return next();
}
hash = tipData.slice(0, 32).toString('hex');
self.bitcoind.getBlock(hash, function(err, block) {
if(err) {
return next(err);
}
self[tip] = block;
log.info('loaded ' + tip + ' hash: ' + block.hash + ' height: ' + block.__height);
next();
});
});
}, function(err) {
if(err) {
return callback(err);
}
log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
callback();
});
};
BlockService.prototype._detectReorg = function(blocks, callback) {
var self = this;
var tipHash = self.tip.hash;
var tipHeight = self.tip.__height;
var forkedHash;
for(var i = 0; i < blocks.length; i++) {
if (blocks[i].__height === 0) {
continue;
}
var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash);
if (prevHash !== tipHash) {
forkedHash = blocks[i].hash;
break;
}
tipHash = prevHash;
}
if (forkedHash) {
return self._handleReorg(forkedHash, function() {
callback(tipHash, tipHeight);
});
}
callback();
};
BlockService.prototype._getBlockValue = function(hashOrHeight, callback) {
var self = this;
var key, valueFn;
if (hashOrHeight.length < 64) {
key = self.encoding.encodeBlockHeightKey(parseInt(hashOrHeight));
valueFn = self.encoding.decodeBlockHashValue.bind(self.encoding);
} else {
key = self.encoding.encodeBlockHashKey(hashOrHeight);
valueFn = self.encoding.decodeBlockHeightValue.bind(self.encoding);
}
self.db.get(key, function(err, buf) {
if (err instanceof levelup.errors.NotFoundError) {
return callback();
}
if (err) {
return callback(err);
}
callback(null, valueFn(buf));
});
};
BlockService.prototype._isGenesisBlock = function(blockArg, callback) {
if (blockArg.length === 64) {
return this._getBlockValue(blockArg, function(err, value) {
if (err) {
return callback(null, false);
}
if (value === 0) {
return callback(null, true);
}
callback(null, false);
});
}
setImmediate(function() {
if (blockArg === 0) {
return callback(null, true);
}
callback(null, false);
});
};
BlockService.prototype._getBlock = function(blockArg, callback) {
var self = this;
self.bitcoind.getBlock(blockArg, function(err, block) {
if(err) {
return callback(err);
}
if (blockArg.length === 64) {
return self.getBlockHeader(blockArg, function(err, header) {
if(err) {
return callback(err);
}
//oh noes! reorg sitch
if (hash !== block.hash) {
callback('reorg');
return self._handleReorg(block.hash, function() {
self._blockHandler.sync();
});
}
block.__height = header.height;
block.height = header.height;
callback(null, block);
});
}
block.__height = blockArg;
block.height = blockArg;
callback(null, block);
});
};
});
BlockService.prototype._getReorgOperations = function(hash, height) {
if (!hash || !height) {
return;
}
self.bitcoind.getBlock(height, callback);
};
BlockService.prototype.getBlockHash = function(height, callback) {
var self = this;
self.db.get(this.encoding.encodeBlockHeightKey(height), function(err, hashBuf) {
if (err instanceof levelup.errors.NotFoundError) {
return callback();
}
if (err) {
return callback(err);
}
callback(null, self.encoding.decodeBlockHashValue(hashBuf));
});
};
BlockService.prototype.getBlockHeight = function(hash, callback) {
var self = this;
self.db.get(this.encoding.encodeBlockHashKey(hash), function(err, heightBuf) {
if (err instanceof levelup.errors.NotFoundError) {
return callback();
}
if (err) {
return callback(err);
}
callback(null, self.encoding.decodeBlockHeightValue(heightBuf));
});
var heightKey = self.encoding.encodeBlockHeightKey(height);
var hashKey = self.encoding.encodeBlockHashKey(hash);
var heightValue = self.encoding.encodeBlockHeightValue(height);
var newHashKey = self.encoding.encodeBlockHashKey(hash + '-REORG');
var newHashValue = self.encoding.encodeBlockHashValue(hash + '-REORG');
return [
{ action: 'del', key: heightKey },
{ action: 'del', key: hashKey },
{ action: 'put', key: newHashKey, value: heightValue },
{ action: 'put', key: heightKey, value: newHashValue }
];
};
module.exports = BlockService;

View File

@ -1,13 +1,12 @@
'use strict';
var index = require('../../');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
var log = index.log;
var async = require('async');
function Reorg(node, block) {
this.node = node;
this.block = block;
this.db = block.db;
}
Reorg.prototype.handleReorg = function(newBlockHash, callback) {
@ -40,7 +39,10 @@ Reorg.prototype.handleConcurrentReorg = function(callback) {
return callback();
}
self.findCommonAncestorAndNewHashes(self.block.concurrentTip.hash, self.block.tip.hash, function(err, commonAncestor, newHashes) {
self.findCommonAncestorAndNewHashes(
self.block.concurrentTip.hash,
self.block.tip.hash,
function(err, commonAncestor, newHashes) {
if(err) {
return callback(err);
}
@ -63,25 +65,25 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) {
return self.block.concurrentTip.hash !== commonAncestor;
},
function(next) {
self.block.getConcurrentBlockOperations(self.block.concurrentTip, false, function(err, operations) {
self.block.getBlockOperations(self.block.concurrentTip, false, 'concurrent', function(err, operations) {
if(err) {
return next(err);
}
operations.push(self.block.getTipOperation(self.block.concurrentTip, false, 'concurrentTip'));
self.block.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
var prevHash = BufferUtil.reverse(self.block.concurrentTip.header.prevHash).toString('hex');
self.node.services.bitcoind.getBlock(prevHash, function(err, block) {
self.block.getBlocks([prevHash], function(err, blocks) {
if(err) {
return next(err);
}
self.block.concurrentTip = block;
self.block.concurrentTip = blocks[0];
next();
});
});
@ -97,23 +99,23 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) {
var self = this;
async.eachSeries(newHashes, function(hash, next) {
self.node.services.bitcoind.getBlock(hash, function(err, block) {
self.block.getBlocks([hash], function(err, blocks) {
if(err) {
return next(err);
}
self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(blocks[0], true, 'concurrent', function(err, operations) {
if(err) {
return next(err);
}
operations.push(self.block.getTipOperation(block, true, 'concurrentTip'));
self.block.batch(operations, function(err) {
operations.push(self.block.getTipOperation(blocks[0], true, 'concurrentTip'));
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
self.block.concurrentTip = block;
self.block.concurrentTip = blocks[0];
next();
});
});
@ -132,7 +134,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
async.parallel(
[
function(next) {
self.block.getConcurrentBlockOperations(self.block.concurrentTip, false, function(err, operations) {
self.block.getBlockOperations(self.block.concurrentTip, false, 'concurrent', function(err, operations) {
if(err) {
return next(err);
}
@ -141,7 +143,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
});
},
function(next) {
self.block.getSerialBlockOperations(self.block.tip, false, function(err, operations) {
self.block.getBlockOperations(self.block.tip, false, 'serial', function(err, operations) {
if(err) {
return next(err);
}
@ -157,20 +159,21 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
}
var operations = results[0].concat(results[1]);
self.block.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
var prevHash = BufferUtil.reverse(self.block.tip.header.prevHash).toString('hex');
self.node.services.bitcoind.getBlock(prevHash, function(err, block) {
self.block.getBlocks([prevHash], function(err, blocks) {
if(err) {
return next(err);
}
self.block.concurrentTip = block;
self.block.tip = block;
self.block.concurrentTip = blocks[0];
self.block.tip = blocks[0];
next();
});
});
@ -185,7 +188,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
var self = this;
async.eachSeries(newHashes, function(hash, next) {
self.node.services.bitcoind.getBlock(hash, function(err, block) {
self.block.getBlocks([hash], function(err, blocks) {
if(err) {
return next(err);
}
@ -193,22 +196,22 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
async.parallel(
[
function(next) {
self.block.getConcurrentBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(blocks[0], true, 'concurrent', function(err, operations) {
if(err) {
return next(err);
}
operations.push(self.block.getTipOperation(block, true, 'concurrentTip'));
operations.push(self.block.getTipOperation(blocks[0], true, 'concurrentTip'));
next(null, operations);
});
},
function(next) {
self.block.getSerialBlockOperations(block, true, function(err, operations) {
self.block.getBlockOperations(blocks[0], true, 'serial', function(err, operations) {
if(err) {
return next(err);
}
operations.push(self.block.getTipOperation(block, true));
operations.push(self.block.getTipOperation(blocks[0], true));
next(null, operations);
});
}
@ -220,13 +223,13 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
var operations = results[0].concat(results[1]);
self.block.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
self.block.concurrentTip = block;
self.block.tip = block;
self.block.concurrentTip = blocks[0];
self.block.tip = blocks[0];
next();
});
}
@ -262,7 +265,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
return next();
}
self.node.services.bitcoind.getBlockHeader(mainPosition, function(err, mainBlockHeader) {
self.block.getBlockHeader(mainPosition, function(err, mainBlockHeader) {
if(err) {
return next(err);
}
@ -281,7 +284,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
return next();
}
self.node.services.bitcoind.getBlockHeader(forkPosition, function(err, forkBlockHeader) {
self.block.getBlockHeader(forkPosition, function(err, forkBlockHeader) {
if(err) {
return next(err);
}
@ -317,7 +320,7 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash
next();
}
)
);
},
function(err) {
if(err) {

View File

@ -42,7 +42,7 @@ TimestampService.prototype.stop = function(callback) {
TimestampService.prototype._processBlockHandlerQueue = function(block) {
var self = this;
//do we have a record in the queue that is waiting on me to consume it?
var prevHash = utils.getPrevHashString(block);
var prevHash = utils.reverseBufferToString(block.header.prevHash);
if (prevHash.length !== 64) {
return;
}

View File

@ -78,8 +78,8 @@ utils.diffTime = function(time) {
return (diff[0] * 1E9 + diff[1])/(1E9 * 1.0);
};
utils.getPrevHashString = function(block) {
return BufferUtil.reverse(block.header.prevHash).toString('hex');
utils.reverseBufferToString = function(buf) {
return BufferUtil.reverse(buf).toString('hex');
};

View File

@ -30,7 +30,7 @@ var bitcoin = {
rpcpassword: rpcConfig.pass,
rpcport: rpcConfig.port,
zmqpubrawtx: 'tcp://127.0.0.1:38332',
zmqpubhashblock: 'tcp://127.0.0.1:38332'
zmqpubrawblock: 'tcp://127.0.0.1:38332'
},
datadir: bitcoinDataDir,
exec: 'bitcoind', //if this isn't on your PATH, then provide the absolute path, e.g. /usr/local/bin/bitcoind
@ -116,7 +116,7 @@ describe('Block Operations', function() {
it('should sync block hashes as keys and heights as values', function(done) {
async.timesLimit(opts.initialHeight + 1, 12, function(n, next) {
async.timesLimit(opts.initialHeight, 12, function(n, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/hash/' + n
}, bitcore.httpOpts), function(err, res) {
@ -141,7 +141,7 @@ describe('Block Operations', function() {
});
it('should sync block heights as keys and hashes as values', function(done) {
async.timesLimit(opts.initialHeight + 1, 12, function(n, next) {
async.timesLimit(opts.initialHeight, 12, function(n, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/height/' + self.hashes[n]
}, bitcore.httpOpts), function(err, res) {

View File

@ -1,7 +1,7 @@
'use strict';
var chai = require('chai');
var should = chai.should();
var expect = chai.expect;
var async = require('async');
var path = require('path');
var utils = require('./utils');
@ -83,6 +83,51 @@ var opts = {
var genesis = new Block(new Buffer(blocks.genesis, 'hex'));
var block1 = new Block(new Buffer(blocks.block1a, 'hex'));
var block2 = new Block(new Buffer(blocks.block1b, 'hex'));
var rawGenesis = blocks.genesis;
var rawBlock1 = blocks.block1a;
var rawBlock2 = blocks.block1b;
var genesisHash = genesis.hash;
var genesisHeader = {
height: 0,
hash: genesis.hash,
previousblockhash: new Array(65).join('0')
};
var block1Header = {
height: 1,
hash: block1.header.hash,
previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex')
};
var block2Header = {
height: 1,
hash: block2.header.hash,
previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex')
};
function publishBlockHash(rawBlockHex, callback) {
pubSocket.send([ 'rawblock', new Buffer(rawBlockHex, 'hex') ]);
var httpOpts = utils.getHttpOpts(opts, { path: '/info' });
// we don't know exactly when all the blockhandlers will complete after the "tip" event
// so we must wait an indeterminate time to check on the current tip
setTimeout(function() {
utils.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return callback(err);
}
var block = Block.fromString(rawBlockHex);
expect(block.hash).equal(JSON.parse(res).dbhash);
callback();
});
}, 2000);
}
describe('DB Operations', function() {
@ -93,23 +138,11 @@ describe('DB Operations', function() {
var self = this;
var responses = [
genesis.hash,
{ height: 0, hash: genesis.hash },
genesis.hash,
blocks.genesis,
genesis.hash,
{ height: 0, hash: genesis.hash, previousblockhash: new Array(65).join('0') },
block1.hash,
blocks.block1a,
{ height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') },
block2.hash,
blocks.block1b,
{ height: 1, hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') },
{ height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') },
{ height: 1, hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') },
blocks.genesis,
blocks.block1b,
{ height: 1, hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') },
genesisHash,
genesisHeader,
rawGenesis,
block1Header,
block2Header
];
after(function(done) {
@ -135,9 +168,13 @@ describe('DB Operations', function() {
req.on('end', function() {
var body = JSON.parse(data);
console.log('request', body);
if (debug) {
console.log('request', body);
}
var response = JSON.stringify({ result: responses[responseCount++], count: responseCount });
console.log('response', response, 'id: ', body.id);
if (debug) {
console.log('response', response, 'id: ', body.id);
}
res.write(response);
res.end();
});
@ -156,9 +193,6 @@ describe('DB Operations', function() {
it('should reorg when needed', function(done) {
var block1a = '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb';
var block1b = '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4';
/*
_______________________________________________________
| | | | |
@ -175,14 +209,15 @@ describe('DB Operations', function() {
async.series([
publishBlockHash.bind(self, block1a),
publishBlockHash.bind(self, block1b)
publishBlockHash.bind(self, rawBlock1),
publishBlockHash.bind(self, rawBlock2)
], function(err) {
if(err) {
return done(err);
}
done();
});
@ -191,28 +226,5 @@ describe('DB Operations', function() {
});
function publishBlockHash(blockHash, callback) {
pubSocket.send([ 'hashblock', new Buffer(blockHash, 'hex') ]);
var httpOpts = utils.getHttpOpts(opts, { path: '/wallet-api/info' });
//we don't know exactly when all the blockhandlers will complete after the "tip" event
//so we must wait an indeterminate time to check on the current tip
setTimeout(function() {
utils.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return callback(err);
}
blockHash.should.equal(JSON.parse(res).dbhash);
callback();
});
}, 2000);
}

View File

@ -1,3 +1,5 @@
'use strict';
var BaseService = require('../lib/service');
var inherits = require('util').inherits;