This commit is contained in:
Chris Kleeschulte 2017-05-23 08:53:54 -04:00
parent 6fc170f7da
commit b51179274f
8 changed files with 204 additions and 111 deletions

View File

@ -111,6 +111,10 @@ BlockHandler.prototype._setupStreams = function() {
.pipe(processSerial);
processSerial.on('finish', self._onFinish.bind(self));
self.block.once('reorg', function() {
blockStream.push(null);
});
};
BlockHandler.prototype._onFinish = function() {
@ -172,10 +176,6 @@ BlockStream.prototype._process = function() {
self.block.getBlocks(blockArgs, function(err, blocks) {
if(err) {
if (err === 'reorg') {
self.push(null);
return next();
}
return next(err);
}
@ -208,7 +208,7 @@ BlockStream.prototype._pushBlocks = function(blocks) {
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
log.debug('Pushing block: ' + blocks[i].hash + ' from the blockstream');
log.info('Pushing block: ' + blocks[i].hash + ' from the blockstream');
self.push(blocks[i]);
}

View File

@ -100,36 +100,6 @@ BlockService.prototype.getNetworkTipHash = function() {
return this.bitcoind.tiphash;
};
BlockService.prototype._getBlockOperations = function(obj) {
var self = this;
if (_.isArray(obj)) {
var ops = [];
_.forEach(obj, function(block) {
ops.push(self._getBlockOperations(block));
});
return _.flatten(ops);
}
var operations = [];
operations.push({
type: 'put',
key: self.encoding.encodeBlockHashKey(obj.hash),
value: self.encoding.encodeBlockHeightValue(obj.height)
});
operations.push({
type: 'put',
key: self.encoding.encodeBlockHeightKey(obj.height),
value: self.encoding.encodeBlockHashValue(obj.hash)
});
return operations;
};
BlockService.prototype.getBlockOperations = function(block, add, type, callback) {
var operations = [];
@ -210,7 +180,9 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
return next(null, self.genesis);
}
next(null, self._rawBlockQueue.get(blockArg));
var cachedBlock = self._rawBlockQueue.get(blockArg);
console.log(cachedBlock);
next(null);
},
function(block, next) {
@ -374,7 +346,7 @@ BlockService.prototype._getBlocks = function(callback) {
return callback(err);
}
log.debug('Completed syncing block headers from the network.');
log.info('Completed syncing block headers from the network.');
callback(null, blocksDiff);
});
@ -423,7 +395,7 @@ BlockService.prototype._setHandlers = function() {
});
self._blockHandler.on('synced', function() {
log.debug('Synced: ' + self.tip.hash);
log.info('Synced: ' + self.tip.hash);
self._startSubscriptions();
});
};
@ -439,9 +411,6 @@ BlockService.prototype._detectStartupReorg = function(callback) {
self.getBlockHeader(hash, function(err, header) {
if (err) {
if (err.code === -5) {
return callback();
}
return callback(err);
}
@ -449,7 +418,7 @@ BlockService.prototype._detectStartupReorg = function(callback) {
return callback();
}
return self._handleReorg(header.hash, callback);
self._handleReorg(header.hash, callback);
});
};
@ -460,6 +429,7 @@ BlockService.prototype._handleReorg = function(hash, callback) {
self.printTipInfo('Reorg detected!');
self.reorg = true;
self.emit('reorg');
var reorg = new Reorg(self.node, self);
@ -666,4 +636,35 @@ BlockService.prototype._getReorgOperations = function(hash, height) {
};
BlockService.prototype._getBlockOperations = function(obj) {
var self = this;
if (_.isArray(obj)) {
var ops = [];
_.forEach(obj, function(block) {
ops.push(self._getBlockOperations(block));
});
return _.flatten(ops);
}
var operations = [];
operations.push({
type: 'put',
key: self.encoding.encodeBlockHashKey(obj.hash),
value: self.encoding.encodeBlockHeightValue(obj.height)
});
operations.push({
type: 'put',
key: self.encoding.encodeBlockHeightKey(obj.height),
value: self.encoding.encodeBlockHashValue(obj.hash)
});
return operations;
};
module.exports = BlockService;

View File

@ -5,19 +5,20 @@ var BaseService = require('../../service');
var inherits = require('util').inherits;
var LRU = require('lru-cache');
var utils = require('../../../lib/utils');
var bitcore = require('bitcore-lib');
function TimestampService(options) {
BaseService.call(this, options);
this.currentBlock = null;
this.currentTimestamp = null;
this._cache = LRU(50);
var genesis = self.node.services.block.genesis;
this._cache.set(genesis.hash, genesis.__height);
this._cache.set(new Array(65).join('0'), { time: 0 });
this._setHandlers();
}
inherits(TimestampService, BaseService);
TimestampService.dependencies = [ 'db' ];
TimestampService.dependencies = [ 'db', 'block' ];
TimestampService.prototype.start = function(callback) {
var self = this;
@ -34,13 +35,16 @@ TimestampService.prototype.start = function(callback) {
callback();
});
self.counter = 0;
};
TimestampService.prototype.stop = function(callback) {
setImmediate(callback);
};
TimestampService.prototype._setHandlers = function() {
var self = this;
};
TimestampService.prototype._processBlockHandlerQueue = function(block) {
var self = this;
@ -54,7 +58,7 @@ TimestampService.prototype._processBlockHandlerQueue = function(block) {
if (prev && !prev.prevHash) {
if (blockTime <= prev.time) {
blockTime++;
blockTime = prev.time + 1;
}
self._cache.del(prevHash);
@ -89,58 +93,60 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
var operations = [];
if (!queue.length < 1) {
return callback(null, []);
if (queue.length === 0) {
return callback(null, queue);
}
operations = operations.concat([
{
type: action,
key: self.encoding.encodeTimestampBlockKey(timestamp),
value: self.encoding.encodeTimestampBlockValue(block.header.hash)
},
{
type: action,
key: self.encoding.encodeBlockTimestampKey(block.header.hash),
value: self.encoding.encodeBlockTimestampValue(timestamp)
}
]);
for(var i = 0; i < queue.length; i++) {
var item = queue[i];
operations = operations.concat([
{
type: action,
key: self.encoding.encodeTimestampBlockKey(item.time),
value: self.encoding.encodeTimestampBlockValue(item.hash)
},
{
type: action,
key: self.encoding.encodeBlockTimestampKey(item.hash),
value: self.encoding.encodeBlockTimestampValue(item.time)
}
]);
}
callback(null, operations);
};
TimestampService.prototype.getBlockHeights = function(timestamps, callback) {
TimestampService.prototype.getTimestamp = function(hash, callback) {
this._getValue(hash, callback);
};
TimestampService.prototype.getHash = function(timestamp, callback) {
this._getValue(timestamp, callback);
};
TimestampService.prototype._getValue = function(key, callback) {
var self = this;
timestamps.sort();
timestamps = timestamps.map(function(timestamp) {
return timestamp >= MAINNET_BITCOIN_GENESIS_TIME ? timestamp : MAINNET_BITCOIN_GENESIS_TIME;
});
var start = self.encoding.encodeTimestampBlockKey(timestamps[0]);
var end = self.encoding.encodeTimestampBlockKey(timestamps[1]);
var stream = self.db.createReadStream({
gte: start,
lte: end
});
var keyBuf, fn;
var hashes = [];
var hashTuple = [];
var streamErr = null;
if (key.length === 64){
keyBuf = self.encoding.encodeBlockTimestampKey(key);
fn = self.encoding.decodeBlockTimestampValue;
} else {
keyBuf = self.encoding.encodeTimestampBlockKey(key);
fn = self.encoding.decodeTimestampBlockValue;
}
stream.on('data', function(data) {
hashes.push(self.encoding.decodeTimestampBlockValue(data.value));
});
self.db.get(keyBuf, function(err, value) {
stream.on('error', function(err) {
streamErr = err;
});
stream.on('end', function() {
if (!streamErr && hashes.length > 1) {
hashTuple = [ hashes[0], hashes[hashes.length - 1] ];
if (err) {
return callback(err);
}
callback(streamErr, hashTuple);
callback(null, fn(value));
});
};
module.exports = TimestampService;

View File

@ -282,7 +282,6 @@ WebService.prototype.transformHttpsOptions = function() {
WebService.prototype._endpointGetInfo = function() {
var self = this;
return function(req, res) {
console.log(self.node.services);
res.jsonp({
result: 'ok',
dbheight: self.node.services.block.tip.__height,

View File

@ -49,6 +49,7 @@ var bitcore = {
'db',
'web',
'block',
'timestamp',
'block-test'
],
servicesConfig: {
@ -118,7 +119,7 @@ describe('Block Operations', function() {
async.timesLimit(opts.initialHeight, 12, function(n, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/hash/' + n
path: '/test/block/hash/' + n
}, bitcore.httpOpts), function(err, res) {
if(err) {
@ -143,7 +144,7 @@ describe('Block Operations', function() {
it('should sync block heights as keys and hashes as values', function(done) {
async.timesLimit(opts.initialHeight, 12, function(n, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/height/' + self.hashes[n]
path: '/test/block/height/' + self.hashes[n]
}, bitcore.httpOpts), function(err, res) {
if(err) {

View File

@ -43,7 +43,8 @@ var bitcore = {
'db',
'web',
'block',
'reorg-test'
'reorg-test',
'timestamp'
],
servicesConfig: {
bitcoind: {

View File

@ -9,7 +9,7 @@ var TestWebService = function(options) {
inherits(TestWebService, BaseService);
TestWebService.dependencies = ['web', 'block'];
TestWebService.dependencies = ['web', 'block', 'timestamp'];
TestWebService.prototype.start = function(callback) {
callback();
@ -23,18 +23,30 @@ TestWebService.prototype.setupRoutes = function(app) {
var self = this;
app.get('/hash/:height', function(req, res) {
app.get('/block/hash/:height', function(req, res) {
self.node.services.block.getBlockHash(req.params.height, function(err, hash) {
res.status(200).jsonp({ hash: hash, height: parseInt(req.params.height) });
});
});
app.get('/height/:hash', function(req, res) {
app.get('/block/height/:hash', function(req, res) {
self.node.services.block.getBlockHeight(req.params.hash, function(err, height) {
res.status(200).jsonp({ hash: req.params.hash, height: height });
});
});
app.get('/timestamp/time/:hash', function(req, res) {
self.node.services.timestamp.getTimestamp(req.params.hash, function(err, timestamp) {
res.status(200).jsonp({ hash: req.params.hash, timestamp: timestamp });
});
});
app.get('/timestamp/hash/:time', function(req, res) {
self.node.services.timestamp.getHash(req.params.time, function(err, hash) {
res.status(200).jsonp({ hash: hash, timestamp: parseInt(req.params.time) });
});
});
};
TestWebService.prototype.getRoutePrefix = function() {

View File

@ -1,12 +1,11 @@
'use strict';
var chai = require('chai');
var should = chai.should();
var expect = chai.expect;
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
var path = require('path');
var utils = require('./utils');
var crypto = require('crypto');
var debug = true;
var bitcoreDataDir = '/tmp/bitcore';
@ -49,7 +48,9 @@ var bitcore = {
'bitcoind',
'web',
'db',
'timestamp'
'timestamp',
'block',
'test-timestamp'
],
servicesConfig: {
bitcoind: {
@ -62,6 +63,9 @@ var bitcore = {
zmqpubrawtx: bitcoin.args.zmqpubrawtx
}
]
},
'test-timestamp': {
requirePath: path.resolve(__dirname + '/test_web.js')
}
}
}
@ -85,16 +89,7 @@ var opts = {
bitcoinDataDir: bitcoinDataDir,
bitcoreDataDir: bitcoreDataDir,
rpc: new BitcoinRPC(rpcConfig),
walletPassphrase: 'test',
txCount: 0,
blockHeight: 0,
walletPrivKeys: [],
initialTxs: [],
fee: 100000,
feesReceived: 0,
satoshisSent: 0,
walletId: crypto.createHash('sha256').update('test').digest('hex'),
satoshisReceived: 0,
initialHeight: 150
};
@ -112,15 +107,93 @@ describe('Timestamp Index', function() {
async.series([
utils.startBitcoind.bind(utils, self.opts),
utils.waitForBitcoinReady.bind(utils, self.opts),
utils.unlockWallet.bind(utils, self.opts),
utils.sendTxs.bind(utils, self.opts),
utils.startBitcoreNode.bind(utils, self.opts),
utils.waitForBitcoreNode.bind(utils, self.opts),
function(next) {
async.timesLimit(opts.initialHeight, 12, function(n, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/block/hash/' + n
}, bitcore.httpOpts), function(err, res) {
if(err) {
return done(err);
}
res = JSON.parse(res);
expect(res.height).to.equal(n);
expect(res.hash.length).to.equal(64);
next(null, res.hash);
});
}, function(err, hashes) {
if(err) {
return next(err);
}
self.hashes = hashes;
next();
});
}
], done);
});
it('should sync timestamps', function(done) {
done();
it('should sync block hashes as keys and timestamps as values', function(done) {
var lastTimestamp = 0;
async.mapLimit(self.hashes, 12, function(hash, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/timestamp/time/' + hash
}, bitcore.httpOpts), function(err, res) {
if(err) {
return next(err);
}
res = JSON.parse(res);
next(null, res.timestamp);
});
}, function(err, timestamps) {
if(err) {
return done(err);
}
timestamps.forEach(function(timestamp) {
expect(timestamp).to.be.above(lastTimestamp);
lastTimestamp = timestamp;
});
self.timestamps = timestamps;
done();
});
});
it('should sync block timestamps as keys and block hashes as values', function(done) {
async.eachOfLimit(self.timestamps, 12, function(timestamp, index, next) {
utils.queryBitcoreNode(Object.assign({
path: '/test/timestamp/hash/' + timestamp
}, bitcore.httpOpts), function(err, res) {
if(err) {
return done(err);
}
res = JSON.parse(res);
expect(res.hash).to.equal(self.hashes[index]);
expect(res.timestamp).to.equal(timestamp);
next();
});
}, function(err) {
if(err) {
return done(err);
}
done();
});
});
});