From b51179274f35aa46fe9b5c1e39602fd5e48542ec Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 23 May 2017 08:53:54 -0400 Subject: [PATCH] wip --- lib/services/block/block_handler.js | 10 +-- lib/services/block/index.js | 75 ++++++++++---------- lib/services/timestamp/index.js | 98 ++++++++++++++------------ lib/services/web/index.js | 1 - regtest/block.js | 5 +- regtest/db.js | 3 +- regtest/test_web.js | 18 ++++- regtest/timestamp.js | 105 +++++++++++++++++++++++----- 8 files changed, 204 insertions(+), 111 deletions(-) diff --git a/lib/services/block/block_handler.js b/lib/services/block/block_handler.js index 94d4cd1f..c6e98afd 100644 --- a/lib/services/block/block_handler.js +++ b/lib/services/block/block_handler.js @@ -111,6 +111,10 @@ BlockHandler.prototype._setupStreams = function() { .pipe(processSerial); processSerial.on('finish', self._onFinish.bind(self)); + + self.block.once('reorg', function() { + blockStream.push(null); + }); }; BlockHandler.prototype._onFinish = function() { @@ -172,10 +176,6 @@ BlockStream.prototype._process = function() { self.block.getBlocks(blockArgs, function(err, blocks) { if(err) { - if (err === 'reorg') { - self.push(null); - return next(); - } return next(err); } @@ -208,7 +208,7 @@ BlockStream.prototype._pushBlocks = function(blocks) { for(var i = 0; i < blocks.length; i++) { self.lastEmittedHash = blocks[i].hash; - log.debug('Pushing block: ' + blocks[i].hash + ' from the blockstream'); + log.info('Pushing block: ' + blocks[i].hash + ' from the blockstream'); self.push(blocks[i]); } diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 4b537fbe..6dfba20d 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -100,36 +100,6 @@ BlockService.prototype.getNetworkTipHash = function() { return this.bitcoind.tiphash; }; -BlockService.prototype._getBlockOperations = function(obj) { - - var self = this; - - if (_.isArray(obj)) { - var ops = []; - _.forEach(obj, function(block) { - ops.push(self._getBlockOperations(block)); - }); - return _.flatten(ops); - } - - var operations = []; - - operations.push({ - type: 'put', - key: self.encoding.encodeBlockHashKey(obj.hash), - value: self.encoding.encodeBlockHeightValue(obj.height) - }); - - operations.push({ - type: 'put', - key: self.encoding.encodeBlockHeightKey(obj.height), - value: self.encoding.encodeBlockHashValue(obj.hash) - }); - - return operations; - -}; - BlockService.prototype.getBlockOperations = function(block, add, type, callback) { var operations = []; @@ -210,7 +180,9 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) { return next(null, self.genesis); } - next(null, self._rawBlockQueue.get(blockArg)); + var cachedBlock = self._rawBlockQueue.get(blockArg); + console.log(cachedBlock); + next(null); }, function(block, next) { @@ -374,7 +346,7 @@ BlockService.prototype._getBlocks = function(callback) { return callback(err); } - log.debug('Completed syncing block headers from the network.'); + log.info('Completed syncing block headers from the network.'); callback(null, blocksDiff); }); @@ -423,7 +395,7 @@ BlockService.prototype._setHandlers = function() { }); self._blockHandler.on('synced', function() { - log.debug('Synced: ' + self.tip.hash); + log.info('Synced: ' + self.tip.hash); self._startSubscriptions(); }); }; @@ -439,9 +411,6 @@ BlockService.prototype._detectStartupReorg = function(callback) { self.getBlockHeader(hash, function(err, header) { if (err) { - if (err.code === -5) { - return callback(); - } return callback(err); } @@ -449,7 +418,7 @@ BlockService.prototype._detectStartupReorg = function(callback) { return callback(); } - return self._handleReorg(header.hash, callback); + self._handleReorg(header.hash, callback); }); }; @@ -460,6 +429,7 @@ BlockService.prototype._handleReorg = function(hash, callback) { self.printTipInfo('Reorg detected!'); self.reorg = true; + self.emit('reorg'); var reorg = new Reorg(self.node, self); @@ -666,4 +636,35 @@ BlockService.prototype._getReorgOperations = function(hash, height) { }; +BlockService.prototype._getBlockOperations = function(obj) { + + var self = this; + + if (_.isArray(obj)) { + var ops = []; + _.forEach(obj, function(block) { + ops.push(self._getBlockOperations(block)); + }); + return _.flatten(ops); + } + + var operations = []; + + operations.push({ + type: 'put', + key: self.encoding.encodeBlockHashKey(obj.hash), + value: self.encoding.encodeBlockHeightValue(obj.height) + }); + + operations.push({ + type: 'put', + key: self.encoding.encodeBlockHeightKey(obj.height), + value: self.encoding.encodeBlockHashValue(obj.hash) + }); + + return operations; + +}; + + module.exports = BlockService; diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index 5233b322..e27b8f3e 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -5,19 +5,20 @@ var BaseService = require('../../service'); var inherits = require('util').inherits; var LRU = require('lru-cache'); var utils = require('../../../lib/utils'); +var bitcore = require('bitcore-lib'); function TimestampService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; this._cache = LRU(50); - var genesis = self.node.services.block.genesis; - this._cache.set(genesis.hash, genesis.__height); + this._cache.set(new Array(65).join('0'), { time: 0 }); + this._setHandlers(); } inherits(TimestampService, BaseService); -TimestampService.dependencies = [ 'db' ]; +TimestampService.dependencies = [ 'db', 'block' ]; TimestampService.prototype.start = function(callback) { var self = this; @@ -34,13 +35,16 @@ TimestampService.prototype.start = function(callback) { callback(); }); - self.counter = 0; }; TimestampService.prototype.stop = function(callback) { setImmediate(callback); }; +TimestampService.prototype._setHandlers = function() { + var self = this; +}; + TimestampService.prototype._processBlockHandlerQueue = function(block) { var self = this; @@ -54,7 +58,7 @@ TimestampService.prototype._processBlockHandlerQueue = function(block) { if (prev && !prev.prevHash) { if (blockTime <= prev.time) { - blockTime++; + blockTime = prev.time + 1; } self._cache.del(prevHash); @@ -89,58 +93,60 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback var operations = []; - if (!queue.length < 1) { - return callback(null, []); + if (queue.length === 0) { + return callback(null, queue); } - operations = operations.concat([ - { - type: action, - key: self.encoding.encodeTimestampBlockKey(timestamp), - value: self.encoding.encodeTimestampBlockValue(block.header.hash) - }, - { - type: action, - key: self.encoding.encodeBlockTimestampKey(block.header.hash), - value: self.encoding.encodeBlockTimestampValue(timestamp) - } - ]); + for(var i = 0; i < queue.length; i++) { + + var item = queue[i]; + operations = operations.concat([ + { + type: action, + key: self.encoding.encodeTimestampBlockKey(item.time), + value: self.encoding.encodeTimestampBlockValue(item.hash) + }, + { + type: action, + key: self.encoding.encodeBlockTimestampKey(item.hash), + value: self.encoding.encodeBlockTimestampValue(item.time) + } + ]); + } callback(null, operations); + }; -TimestampService.prototype.getBlockHeights = function(timestamps, callback) { +TimestampService.prototype.getTimestamp = function(hash, callback) { + this._getValue(hash, callback); +}; + +TimestampService.prototype.getHash = function(timestamp, callback) { + this._getValue(timestamp, callback); +}; + +TimestampService.prototype._getValue = function(key, callback) { + var self = this; - timestamps.sort(); - timestamps = timestamps.map(function(timestamp) { - return timestamp >= MAINNET_BITCOIN_GENESIS_TIME ? timestamp : MAINNET_BITCOIN_GENESIS_TIME; - }); - var start = self.encoding.encodeTimestampBlockKey(timestamps[0]); - var end = self.encoding.encodeTimestampBlockKey(timestamps[1]); - var stream = self.db.createReadStream({ - gte: start, - lte: end - }); + var keyBuf, fn; - var hashes = []; - var hashTuple = []; - var streamErr = null; + if (key.length === 64){ + keyBuf = self.encoding.encodeBlockTimestampKey(key); + fn = self.encoding.decodeBlockTimestampValue; + } else { + keyBuf = self.encoding.encodeTimestampBlockKey(key); + fn = self.encoding.decodeTimestampBlockValue; + } - stream.on('data', function(data) { - hashes.push(self.encoding.decodeTimestampBlockValue(data.value)); - }); + self.db.get(keyBuf, function(err, value) { - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('end', function() { - if (!streamErr && hashes.length > 1) { - hashTuple = [ hashes[0], hashes[hashes.length - 1] ]; + if (err) { + return callback(err); } - callback(streamErr, hashTuple); + + callback(null, fn(value)); + }); - }; - module.exports = TimestampService; diff --git a/lib/services/web/index.js b/lib/services/web/index.js index 99558344..5e7d59a0 100644 --- a/lib/services/web/index.js +++ b/lib/services/web/index.js @@ -282,7 +282,6 @@ WebService.prototype.transformHttpsOptions = function() { WebService.prototype._endpointGetInfo = function() { var self = this; return function(req, res) { -console.log(self.node.services); res.jsonp({ result: 'ok', dbheight: self.node.services.block.tip.__height, diff --git a/regtest/block.js b/regtest/block.js index 31a1d549..13c43b88 100644 --- a/regtest/block.js +++ b/regtest/block.js @@ -49,6 +49,7 @@ var bitcore = { 'db', 'web', 'block', + 'timestamp', 'block-test' ], servicesConfig: { @@ -118,7 +119,7 @@ describe('Block Operations', function() { async.timesLimit(opts.initialHeight, 12, function(n, next) { utils.queryBitcoreNode(Object.assign({ - path: '/test/hash/' + n + path: '/test/block/hash/' + n }, bitcore.httpOpts), function(err, res) { if(err) { @@ -143,7 +144,7 @@ describe('Block Operations', function() { it('should sync block heights as keys and hashes as values', function(done) { async.timesLimit(opts.initialHeight, 12, function(n, next) { utils.queryBitcoreNode(Object.assign({ - path: '/test/height/' + self.hashes[n] + path: '/test/block/height/' + self.hashes[n] }, bitcore.httpOpts), function(err, res) { if(err) { diff --git a/regtest/db.js b/regtest/db.js index ebbd4671..57b16566 100644 --- a/regtest/db.js +++ b/regtest/db.js @@ -43,7 +43,8 @@ var bitcore = { 'db', 'web', 'block', - 'reorg-test' + 'reorg-test', + 'timestamp' ], servicesConfig: { bitcoind: { diff --git a/regtest/test_web.js b/regtest/test_web.js index 0e863b90..f2f28806 100644 --- a/regtest/test_web.js +++ b/regtest/test_web.js @@ -9,7 +9,7 @@ var TestWebService = function(options) { inherits(TestWebService, BaseService); -TestWebService.dependencies = ['web', 'block']; +TestWebService.dependencies = ['web', 'block', 'timestamp']; TestWebService.prototype.start = function(callback) { callback(); @@ -23,18 +23,30 @@ TestWebService.prototype.setupRoutes = function(app) { var self = this; - app.get('/hash/:height', function(req, res) { + app.get('/block/hash/:height', function(req, res) { self.node.services.block.getBlockHash(req.params.height, function(err, hash) { res.status(200).jsonp({ hash: hash, height: parseInt(req.params.height) }); }); }); - app.get('/height/:hash', function(req, res) { + app.get('/block/height/:hash', function(req, res) { self.node.services.block.getBlockHeight(req.params.hash, function(err, height) { res.status(200).jsonp({ hash: req.params.hash, height: height }); }); }); + app.get('/timestamp/time/:hash', function(req, res) { + self.node.services.timestamp.getTimestamp(req.params.hash, function(err, timestamp) { + res.status(200).jsonp({ hash: req.params.hash, timestamp: timestamp }); + }); + }); + + app.get('/timestamp/hash/:time', function(req, res) { + self.node.services.timestamp.getHash(req.params.time, function(err, hash) { + res.status(200).jsonp({ hash: hash, timestamp: parseInt(req.params.time) }); + }); + }); + }; TestWebService.prototype.getRoutePrefix = function() { diff --git a/regtest/timestamp.js b/regtest/timestamp.js index 925f9df5..857fb8d6 100644 --- a/regtest/timestamp.js +++ b/regtest/timestamp.js @@ -1,12 +1,11 @@ 'use strict'; var chai = require('chai'); -var should = chai.should(); +var expect = chai.expect; var async = require('async'); var BitcoinRPC = require('bitcoind-rpc'); var path = require('path'); var utils = require('./utils'); -var crypto = require('crypto'); var debug = true; var bitcoreDataDir = '/tmp/bitcore'; @@ -49,7 +48,9 @@ var bitcore = { 'bitcoind', 'web', 'db', - 'timestamp' + 'timestamp', + 'block', + 'test-timestamp' ], servicesConfig: { bitcoind: { @@ -62,6 +63,9 @@ var bitcore = { zmqpubrawtx: bitcoin.args.zmqpubrawtx } ] + }, + 'test-timestamp': { + requirePath: path.resolve(__dirname + '/test_web.js') } } } @@ -85,16 +89,7 @@ var opts = { bitcoinDataDir: bitcoinDataDir, bitcoreDataDir: bitcoreDataDir, rpc: new BitcoinRPC(rpcConfig), - walletPassphrase: 'test', - txCount: 0, blockHeight: 0, - walletPrivKeys: [], - initialTxs: [], - fee: 100000, - feesReceived: 0, - satoshisSent: 0, - walletId: crypto.createHash('sha256').update('test').digest('hex'), - satoshisReceived: 0, initialHeight: 150 }; @@ -112,15 +107,93 @@ describe('Timestamp Index', function() { async.series([ utils.startBitcoind.bind(utils, self.opts), utils.waitForBitcoinReady.bind(utils, self.opts), - utils.unlockWallet.bind(utils, self.opts), - utils.sendTxs.bind(utils, self.opts), utils.startBitcoreNode.bind(utils, self.opts), utils.waitForBitcoreNode.bind(utils, self.opts), + function(next) { + + async.timesLimit(opts.initialHeight, 12, function(n, next) { + utils.queryBitcoreNode(Object.assign({ + path: '/test/block/hash/' + n + }, bitcore.httpOpts), function(err, res) { + + if(err) { + return done(err); + } + res = JSON.parse(res); + expect(res.height).to.equal(n); + expect(res.hash.length).to.equal(64); + next(null, res.hash); + }); + }, function(err, hashes) { + + if(err) { + return next(err); + } + self.hashes = hashes; + next(); + + }); + + } ], done); }); - it('should sync timestamps', function(done) { - done(); + it('should sync block hashes as keys and timestamps as values', function(done) { + + var lastTimestamp = 0; + async.mapLimit(self.hashes, 12, function(hash, next) { + + utils.queryBitcoreNode(Object.assign({ + path: '/test/timestamp/time/' + hash + }, bitcore.httpOpts), function(err, res) { + + if(err) { + return next(err); + } + + res = JSON.parse(res); + next(null, res.timestamp); + }); + }, function(err, timestamps) { + + if(err) { + return done(err); + } + timestamps.forEach(function(timestamp) { + expect(timestamp).to.be.above(lastTimestamp); + lastTimestamp = timestamp; + }); + self.timestamps = timestamps; + done(); + + }); + }); + + it('should sync block timestamps as keys and block hashes as values', function(done) { + + async.eachOfLimit(self.timestamps, 12, function(timestamp, index, next) { + utils.queryBitcoreNode(Object.assign({ + path: '/test/timestamp/hash/' + timestamp + }, bitcore.httpOpts), function(err, res) { + + if(err) { + return done(err); + } + + res = JSON.parse(res); + expect(res.hash).to.equal(self.hashes[index]); + expect(res.timestamp).to.equal(timestamp); + next(); + }); + }, function(err) { + + if(err) { + return done(err); + } + done(); + + }); + }); });