From d9efd7bc02959c7ed73dcca78bb2a937f7f3a7f1 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Mon, 1 May 2017 16:01:26 -0400 Subject: [PATCH] wip --- lib/services/bitcoind/index.js | 336 +++++++++++--------------- lib/services/db/index.js | 257 +++++++++----------- lib/services/db/reorg.js | 19 +- lib/services/db/sync.js | 220 ++++++++--------- lib/services/timestamp/index.js | 2 +- lib/services/wallet-api/index.js | 141 ++++------- regtest/db.js | 215 ++++++++++++++++ regtest/utils.js | 206 +++++++++------- regtest/wallet.js | 144 +++++++---- test/data/blocks.json | 5 + test/services/db/index.unit.js | 56 +++++ test/services/db/reorg.integration.js | 56 +++++ test/services/db/reorg.unit.js | 10 +- 13 files changed, 969 insertions(+), 698 deletions(-) create mode 100644 regtest/db.js create mode 100644 test/data/blocks.json create mode 100644 test/services/db/index.unit.js create mode 100644 test/services/db/reorg.integration.js diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index d5043c51..074131e2 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -5,13 +5,14 @@ var bitcore = require('bitcore-lib'); var zmq = require('zmq'); var async = require('async'); var BitcoinRPC = require('bitcoind-rpc'); -var $ = bitcore.util.preconditions; var _ = bitcore.deps._; var index = require('../../'); var errors = index.errors; var log = index.log; var Service = require('../../service'); +var LRU = require('lru-cache'); + function Bitcoin(options) { if (!(this instanceof Bitcoin)) { @@ -35,6 +36,8 @@ function Bitcoin(options) { this.on('error', function(err) { log.error(err.stack); }); + + this._hashBlockCache = LRU(100); } util.inherits(Bitcoin, Service); @@ -110,10 +113,39 @@ Bitcoin.prototype._wrapRPCError = function(errObj) { return err; }; -Bitcoin.prototype._initChain = function(callback) { +Bitcoin.prototype._getGenesisBlock = function(callback) { + + var self = this; + + self.client.getBlockHash(0, function(err, response) { + + if (err) { + return callback(self._wrapRPCError(err)); + } + + var blockhash = response.result; + + self.getRawBlock(blockhash, function(err, blockBuffer) { + + if (err) { + return callback(err); + } + + self.genesisBuffer = blockBuffer; + callback(); + + }); + + }); + +}; + +Bitcoin.prototype._getNetworkTip = function(callback) { + var self = this; self.client.getBestBlockHash(function(err, response) { + if (err) { return callback(self._wrapRPCError(err)); } @@ -121,76 +153,68 @@ Bitcoin.prototype._initChain = function(callback) { self.tiphash = response.result; self.client.getBlock(response.result, function(err, response) { + if (err) { return callback(self._wrapRPCError(err)); } self.height = response.result.height; + callback(); - self.client.getBlockHash(0, function(err, response) { - if (err) { - return callback(self._wrapRPCError(err)); - } - var blockhash = response.result; - self.getRawBlock(blockhash, function(err, blockBuffer) { - if (err) { - return callback(err); - } - self.genesisBuffer = blockBuffer; - self.emit('ready'); - log.info('Bitcoin Daemon Ready'); - callback(); - }); - }); }); + }); + }; -Bitcoin.prototype._zmqBlockHandler = function(node, message) { +Bitcoin.prototype._initChain = function(callback) { + var self = this; - self._rapidProtectedUpdateTip(node, message); + + async.series([ + + self._getNetworkTip.bind(self), + self._getGenesisBlock.bind(self), + + ], function(err) { + + if(err) { + return callback(err); + } + + self.emit('ready'); + callback(); + + }); + +}; + +Bitcoin.prototype._zmqBlockHandler = function(message) { + + var self = this; + + var hashBlockHex = message.toString('hex'); + + if (!self._isSendableHashBlock(hashBlockHex)) { + return; + } + + self._hashBlockCache.set(hashBlockHex); + + self.tiphash = hashBlockHex; + self.emit('block', message); for (var i = 0; i < this.subscriptions.hashblock.length; i++) { - this.subscriptions.hashblock[i].emit('bitcoind/hashblock', message.toString('hex')); + this.subscriptions.hashblock[i].emit('bitcoind/hashblock', hashBlockHex); } + }; -Bitcoin.prototype._rapidProtectedUpdateTip = function(node, message) { - var self = this; - - if (new Date() - self.lastTip > 1000) { - self.lastTip = new Date(); - self._updateTip(node, message); - } else { - clearTimeout(self.lastTipTimeout); - self.lastTipTimeout = setTimeout(function() { - self._updateTip(node, message); - }, 1000); - } +Bitcoin.prototype._isSendableHashBlock = function(hashBlockHex) { + return hashBlockHex.length === 64 && !this._hashBlockCache.get(hashBlockHex); }; -Bitcoin.prototype._updateTip = function(node, message) { - var self = this; - - var hex = message.toString('hex'); - if (hex !== self.tiphash) { - self.tiphash = message.toString('hex'); - - node.client.getBlock(self.tiphash, function(err, response) { - if (err) { - var error = self._wrapRPCError(err); - self.emit('error', error); - } else { - self.height = response.result.height; - $.checkState(self.height >= 0); - self.emit('tip', self.height); - } - }); - } -}; - - Bitcoin.prototype._zmqTransactionHandler = function(node, message) { var self = this; self.emit('tx', message); @@ -199,55 +223,6 @@ Bitcoin.prototype._zmqTransactionHandler = function(node, message) { } }; -Bitcoin.prototype._checkSyncedAndSubscribeZmqEvents = function(node) { - var self = this; - var interval; - - function checkAndSubscribe(callback) { - node.client.getBestBlockHash(function(err, response) { - if (err) { - return callback(self._wrapRPCError(err)); - } - var blockhash = new Buffer(response.result, 'hex'); - self.emit('block', blockhash); - self._updateTip(node, blockhash); - - node.client.getBlockchainInfo(function(err, response) { - if (err) { - return callback(self._wrapRPCError(err)); - } - var progress = response.result.verificationprogress; - if (progress >= self.zmqSubscribeProgress) { - self._subscribeZmqEvents(node); - clearInterval(interval); - callback(null, true); - } else { - callback(null, false); - } - }); - }); - } - - checkAndSubscribe(function(err, synced) { - if (err) { - log.error(err); - } - if (!synced) { - interval = setInterval(function() { - if (self.node.stopping) { - return clearInterval(interval); - } - checkAndSubscribe(function(err) { - if (err) { - log.error(err); - } - }); - }, node._tipUpdateInterval || Bitcoin.DEFAULT_TIP_UPDATE_INTERVAL); - } - }); - -}; - Bitcoin.prototype._subscribeZmqEvents = function(node) { var self = this; node.zmqSubSocket.subscribe('hashblock'); @@ -257,7 +232,7 @@ Bitcoin.prototype._subscribeZmqEvents = function(node) { if (topicString === 'rawtx') { self._zmqTransactionHandler(node, message); } else if (topicString === 'hashblock') { - self._zmqBlockHandler(node, message); + self._zmqBlockHandler(message); } }); }; @@ -293,105 +268,44 @@ Bitcoin.prototype._initZmqSubSocket = function(node, zmqUrl) { } }; -Bitcoin.prototype._loadTipFromNode = function(node, callback) { - var self = this; - node.client.getBestBlockHash(function(err, response) { - if (err && err.code === -28) { - log.warn(err.message); - return callback(self._wrapRPCError(err)); - } else if (err) { - return callback(self._wrapRPCError(err)); - } - node.client.getBlock(response.result, function(err, response) { - if (err) { - return callback(self._wrapRPCError(err)); - } - - self.height = response.result.height; - $.checkState(self.height >= 0); - self.emit('tip', self.height); - callback(); - }); - }); -}; - -Bitcoin.prototype._connectProcess = function(config, callback) { +Bitcoin.prototype._connectProcess = function(config) { var self = this; var node = {}; - var exitShutdown = false; - async.retry({times: self.startRetryTimes, interval: self.startRetryInterval}, function(done) { - if (self.node.stopping) { - exitShutdown = true; - return done(); - } - - node.client = new BitcoinRPC({ - protocol: config.rpcprotocol || 'http', - host: config.rpchost || '127.0.0.1', - port: config.rpcport, - user: config.rpcuser, - pass: config.rpcpassword, - rejectUnauthorized: _.isUndefined(config.rpcstrict) ? true : config.rpcstrict - }); - - self._loadTipFromNode(node, done); - - }, function(err) { - if (err) { - return callback(err); - } - if (exitShutdown) { - return callback(new Error('Stopping while trying to connect to bitcoind.')); - } - - self._initZmqSubSocket(node, config.zmqpubrawtx); - self._subscribeZmqEvents(node); - - callback(null, node); + node.client = new BitcoinRPC({ + protocol: config.rpcprotocol || 'http', + host: config.rpchost || '127.0.0.1', + port: config.rpcport, + user: config.rpcuser, + pass: config.rpcpassword, + rejectUnauthorized: _.isUndefined(config.rpcstrict) ? true : config.rpcstrict }); + + self._initZmqSubSocket(node, config.zmqpubrawtx); + self._subscribeZmqEvents(node); + + return node; }; Bitcoin.prototype.start = function(callback) { + var self = this; - async.series([ - function(next) { - if (self.options.spawn) { - self._spawnChildProcess(function(err, node) { - if (err) { - return next(err); - } - self.nodes.push(node); - next(); - }); - } else { - next(); - } - }, - function(next) { - if (self.options.connect) { - async.map(self.options.connect, self._connectProcess.bind(self), function(err, nodes) { - if (err) { - return callback(err); - } - for(var i = 0; i < nodes.length; i++) { - self.nodes.push(nodes[i]); - } - next(); - }); - } else { - next(); - } - } - ], function(err) { - if (err) { - return callback(err); - } - if (self.nodes.length === 0) { - return callback(new Error('Bitcoin configuration options "spawn" or "connect" are expected')); - } - self._initChain(callback); + if (!self.options.connect) { + throw new Error('A "connect" array is required in the bitcoind service configuration.'); + } + + self.nodes = self.options.connect.map(self._connectProcess.bind(self)); + + if (self.nodes.length === 0) { + throw new Error('Could not connect to any servers in connect array.'); + } + + self._initChain(function() { + + log.info('Bitcoin Daemon Ready'); + callback(); + }); }; @@ -438,6 +352,42 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) { self._maybeGetBlockHash(blockArg, queryBlock); }; +Bitcoin.prototype.getBlockHeader = function(blockArg, callback) { + var self = this; + + function queryHeader(err, blockhash) { + if (err) { + return callback(err); + } + self._tryAllClients(function(client, done) { + client.getBlockHeader(blockhash, function(err, response) { + if (err) { + return done(self._wrapRPCError(err)); + } + var result = response.result; + var header = { + hash: result.hash, + version: result.version, + confirmations: result.confirmations, + height: result.height, + chainWork: result.chainwork, + prevHash: result.previousblockhash, + nextHash: result.nextblockhash, + merkleRoot: result.merkleroot, + time: result.time, + medianTime: result.mediantime, + nonce: result.nonce, + bits: result.bits, + difficulty: result.difficulty + }; + done(null, header); + }); + }, callback); + } + + self._maybeGetBlockHash(blockArg, queryHeader); +}; + Bitcoin.prototype.getBlock = function(blockArg, callback) { var self = this; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 011df27e..19ee431b 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -16,6 +16,7 @@ var log = index.log; var Service = require('../../service'); var Sync = require('./sync'); var Reorg = require('./reorg'); +var Block = bitcore.Block; /* * @param {Object} options @@ -63,6 +64,7 @@ function DB(options) { this._sync = new Sync(this.node, this); this.syncing = true; + this.bitcoind = this.node.services.bitcoind; } util.inherits(DB, Service); @@ -129,63 +131,24 @@ DB.prototype.start = function(callback) { mkdirp.sync(this.dataPath); } - self.genesis = Block.fromBuffer(self.node.services.bitcoind.genesisBuffer); + self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); self.store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'}); - self._sync.on('error', function(err) { - self.syncing = false; - log.error(err); - }); - - self._sync.on('reorg', function(block) { - log.warn('Reorg detected! Tip: ' + self.tip.hash + - ' Concurrent tip: ' + self.concurrentTip.hash + - ' Bitcoind tip: ' + self.node.services.bitcoind.tiphash); - - self.reorg = true; - - var reorg = new Reorg(self.node, self); - reorg.handleReorg(block, function(err) { - if(err) { - log.error('Reorg failed! ' + err); - return self.node.stop(function() {}); - } - - log.warn('Reorg successful! Tip: ' + self.tip.hash + - ' Concurrent tip: ' + self.concurrentTip.hash + - ' Bitcoind tip: ' + self.node.services.bitcoind.tiphash - ); - - self.reorg = false; - self._sync.sync(); - }); - }); - - self._sync.once('synced', function() { - - self.syncing = false; - - self.node.services.bitcoind.on('tip', function(height) { - log.info('New tip at height: ' + height + ' hash: ' + self.node.services.bitcoind.tiphash); - self._sync.sync(); - }); - - log.info('Initial sync complete'); - }); - self.node.on('stopping', function() { - self._sync.stop(); }); self.node.once('ready', function() { - function finish(err) { + self.loadTips(function(err) { + if(err) { throw err; } - self._sync.initialSync(); - } - self.loadTip(self.loadConcurrentTip.bind(self, finish)); + + self._sync.sync(); + + }); + }); setImmediate(function() { @@ -194,6 +157,81 @@ DB.prototype.start = function(callback) { }; + +DB.prototype.detectReorg = function(blocks) { + + var self = this; + + if (!blocks || blocks.length === 0) { + return; + } + + var tipHash = self.reorgTipHash || self.tip.hash; + var chainMembers = []; + + var loopIndex = 0; + var overallCounter = 0; + + while(overallCounter < blocks.length) { + + if (loopIndex >= blocks.length) { + overallCounter++; + loopIndex = 0; + } + + var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex'); + if (prevHash === tipHash) { + tipHash = blocks[loopIndex].hash; + chainMembers.push(blocks[loopIndex]); + } + loopIndex++; + + } + + for(var i = 0; i < blocks.length; i++) { + if (chainMembers.indexOf(blocks[i]) === -1) { + return blocks[i]; + } + self.reorgTipHash = blocks[i].hash; + } + +}; + +DB.prototype.handleReorg = function(forkBlock, callback) { + + var self = this; + self.printTipInfo('Reorg detected!'); + + self.reorg = true; + + var reorg = new Reorg(self.node, self); + + reorg.handleReorg(forkBlock.hash, function(err) { + + if(err) { + log.error('Reorg failed! ' + err); + self.node.stop(function() {}); + throw err; + } + + self.printTipInfo('Reorg successful!'); + self.reorg = false; + callback(); + + }); + +}; + +DB.prototype.printTipInfo = function(prependedMessage) { + + log.info( + prependedMessage + ' Serial Tip: ' + this.tip.hash + + ' Concurrent tip: ' + this.concurrentTip.hash + + ' Bitcoind tip: ' + this.bitcoind.tiphash + ); + +}; + DB.prototype.stop = function(callback) { var self = this; async.whilst(function() { @@ -201,7 +239,9 @@ DB.prototype.stop = function(callback) { }, function(next) { setTimeout(next, 10); }, function() { - self.store.close(callback); + if (self.store) { + self.store.close(callback); + } }); }; @@ -213,106 +253,41 @@ DB.prototype.getAPIMethods = function() { return []; }; -DB.prototype.loadTip = function(callback) { +DB.prototype.loadTips = function(callback) { + var self = this; - self.store.get(self.dbPrefix + 'tip', self.dbOptions, function(err, tipData) { + var tipStrings = ['tip', 'concurrentTip']; - if(err && err instanceof levelup.errors.NotFoundError) { + async.each(tipStrings, function(tip, next) { - self.tip = self.genesis; - self.tip.__height = 0; + self.store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) { - self.connectBlock(self.genesis, function(err) { - if(err) { - return callback(err); - } - - self.emit('addblock', self.genesis); - callback(); - }); - - } else if(err) { - return callback(err); - } else { - - var hash = tipData.slice(0, 32).toString('hex'); - var height = tipData.readUInt32BE(32); - - var times = 0; - async.retry({times: 3, interval: self.retryInterval}, function(done) { - self.node.services.bitcoind.getBlock(hash, function(err, tip) { - if(err) { - times++; - log.warn('Bitcoind does not have our tip (' + hash + '). Bitcoind may have crashed and needs to catch up.'); - if(times < 3) { - log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); - } - return done(err); - } - - done(null, tip); - }); - }, function(err, tip) { - if(err) { - log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); - log.warn('Please reindex your database.'); - return callback(err); - } - - tip.__height = height; - self.tip = tip; - - callback(); - }); - } - }); -}; - -DB.prototype.loadConcurrentTip = function(callback) { - var self = this; - - self.store.get(self.dbPrefix + 'concurrentTip', self.dbOptions, function(err, tipData) { - - if (err instanceof levelup.errors.NotFoundError) { - self.concurrentTip = self.genesis; - self.concurrentTip.__height = 0; - return callback(); - } else if (err) { - return callback(err); - } - - var hash = tipData.slice(0, 32).toString('hex'); - var height = tipData.readUInt32BE(32); - - var times = 0; - async.retry({times: 3, interval: self.retryInterval}, function(done) { - self.node.services.bitcoind.getBlock(hash, function(err, concurrentTip) { - if(err) { - times++; - log.warn('Bitcoind does not have our concurrentTip (' + hash + ').' + - ' Bitcoind may have crashed and needs to catch up.'); - if(times < 3) { - log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); - } - return done(err); - } - - done(null, concurrentTip); - }); - }, function(err, concurrentTip) { - if(err) { - log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); - log.warn('Please reindex your database.'); - return callback(err); + if(err && !(err instanceof levelup.errors.NotFoundError)) { + return next(err); } - concurrentTip.__height = height; - self.concurrentTip = concurrentTip; + var height, hash; + //genesis block, set to -1 because we have no yet processed the blocks + if (!tipData) { + height = -1; + hash = new Array(65).join('0'); + } else { + height = tipData.readUInt32BE(32); + hash = tipData.slice(0, 32).toString('hex'); + } - callback(); + self[tip] = { + hash: hash, + height: height, + '__height': height //to be consistent with real blocks + }; + + next(); }); - }); + + }, callback); + }; DB.prototype.getPublishEvents = function() { @@ -437,10 +412,6 @@ DB.prototype.getSerialBlockOperations = function(block, add, callback) { async.eachSeries( this.node.services, function(mod, next) { -//console.log('s***********************'); -//console.log('here'); -//console.log(mod.name, block.__height); -//console.log('e***********************'); if(mod.blockHandler) { $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); diff --git a/lib/services/db/reorg.js b/lib/services/db/reorg.js index 68e44d5e..d36151ac 100644 --- a/lib/services/db/reorg.js +++ b/lib/services/db/reorg.js @@ -10,7 +10,7 @@ function Reorg(node, db) { this.db = db; } -Reorg.prototype.handleReorg = function(block, callback) { +Reorg.prototype.handleReorg = function(newBlockHash, callback) { var self = this; self.handleConcurrentReorg(function(err) { @@ -18,7 +18,7 @@ Reorg.prototype.handleReorg = function(block, callback) { return callback(err); } - self.findCommonAncestorAndNewHashes(self.db.tip, block, function(err, commonAncestor, newHashes) { + self.findCommonAncestorAndNewHashes(self.db.tip.hash, newBlockHash, function(err, commonAncestor, newHashes) { if(err) { return callback(err); } @@ -27,7 +27,6 @@ Reorg.prototype.handleReorg = function(block, callback) { if(err) { return callback(err); } - self.fastForwardBothTips(newHashes, callback); }); }); @@ -41,7 +40,7 @@ Reorg.prototype.handleConcurrentReorg = function(callback) { return callback(); } - self.findCommonAncestorAndNewHashes(self.db.concurrentTip, self.db.tip, function(err, commonAncestor, newHashes) { + self.findCommonAncestorAndNewHashes(self.db.concurrentTip.hash, self.db.tip.hash, function(err, commonAncestor, newHashes) { if(err) { return callback(err); } @@ -137,7 +136,6 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { if(err) { return next(err); } - operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false)); next(null, operations); }); @@ -148,7 +146,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { return next(err); } - operations.push(self.db.getTipOperation(self.db.concurrentTip, false)); + operations.push(self.db.getTipOperation(self.db.tip, false)); next(null, operations); }); } @@ -237,11 +235,11 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { }, callback); }; -Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callback) { +Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash, callback) { var self = this; - var mainPosition = oldTip.hash; - var forkPosition = newTip.hash; + var mainPosition = oldTipHash; + var forkPosition = newTipHash; var mainHashesMap = {}; var forkHashesMap = {}; @@ -275,7 +273,6 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callba } else { mainPosition = null; } - next(); }); }, @@ -344,4 +341,4 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callba ); }; -module.exports = Reorg; \ No newline at end of file +module.exports = Reorg; diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 073fee51..7f959ac4 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -6,23 +6,25 @@ var inherits = require('util').inherits; var EventEmitter = require('events').EventEmitter; var async = require('async'); var bitcore = require('bitcore-lib'); -var BufferUtil = bitcore.util.buffer; +var Block = bitcore.Block; var ProgressBar = require('progress'); var index = require('../../index'); var log = index.log; var green = '\u001b[42m \u001b[0m'; var red = '\u001b[41m \u001b[0m'; -function BlockStream(highWaterMark, bitcoind, dbTip) { +function BlockStream(highWaterMark, db, sync) { Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); - this.bitcoind = bitcoind; - this.dbTip = dbTip; - this.lastReadHeight = dbTip.__height; - this.lastEmittedHash = dbTip.hash; + this.sync = sync; + this.db = db; + this.dbTip = this.db.tip; + this.lastReadHeight = this.dbTip.__height; + this.lastEmittedHash = this.dbTip.hash; this.stopping = false; this.queue = []; this.processing = false; this.syncing = true; + this.bitcoind = this.db.bitcoind; } inherits(BlockStream, Readable); @@ -73,30 +75,29 @@ function Sync(node, db) { inherits(Sync, EventEmitter); -Sync.prototype.initialSync = function() { +Sync.prototype.sync = function() { var self = this; - if(this.syncing || this.db.reorg) { + if(this.syncing) { return; } self.syncing = true; - self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip); + var blockStream = new BlockStream(self.highWaterMark, self.db, self); var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db); var writeStream = new WriteStream(self.highWaterMark, self.db); var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip); - self._handleErrors(self.blockStream); + self._handleErrors(blockStream); self._handleErrors(processConcurrent); self._handleErrors(processSerial); self._handleErrors(writeStream); - - self.blockStream + blockStream .pipe(processConcurrent) .pipe(writeStream); - self.blockStream + blockStream .pipe(processSerial); self.lastReportedBlock = self.db.tip.__height; @@ -108,21 +109,52 @@ Sync.prototype.initialSync = function() { clear: true }); - var timer = setInterval(self.reportStatus.bind(this), 1000); + self.progressBarTimer = setInterval(self.reportStatus.bind(self), 1000); - processSerial.on('finish', function() { - self.syncing = false; - if (self.progressBar) { - self.progressBar.terminate(); - } - if (timer) { - clearInterval(timer); - } - self.emit('synced'); - }); + processSerial.on('finish', self._onFinish.bind(self)); }; +Sync.prototype._onFinish = function() { + + var self = this; + self.syncing = false; + + if (self.progressBar) { + self.progressBar.terminate(); + } + + if (self.progressBarTimer) { + clearInterval(self.progressBarTimer); + } + + if (self.forkBlock) { + self.db.handleReorg(self.forkBlock, function() { + self.forkBlock = null; + self.sync(); + }); + return; + } + + self._startSubscriptions(); + log.info('Sync complete'); + +}; + +Sync.prototype._startSubscriptions = function() { + + var self = this; + if (!self.subscribed) { + self.subscribed = true; + self.bus = self.node.openBus({remoteAddress: 'localhost'}); + + self.bus.on('bitcoind/hashblock', function(hashBlockHex) { + self.sync(); + }); + self.bus.subscribe('bitcoind/hashblock'); + } +}; + Sync.prototype.reportStatus = function() { if (process.stderr.isTTY) { var tick = this.db.tip.__height - this.lastReportedBlock; @@ -133,47 +165,11 @@ Sync.prototype.reportStatus = function() { } }; -Sync.prototype.sync = function() { - var self = this; - if(this.syncing || this.db.reorg) { - return; - } - - this.syncing = true; - this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip); - var processBoth = new ProcessBoth(this.highWaterMark, this.db); - - this._handleErrors(this.blockStream); - this._handleErrors(processBoth); - - processBoth.on('finish', function() { - self.syncing = false; - }); - - this.blockStream - .pipe(processBoth); -}; - -Sync.prototype.stop = function() { - if(this.blockStream) { - this.blockStream.stopping = true; - } -}; - Sync.prototype._handleErrors = function(stream) { var self = this; - stream.on('error', function(err, block) { + stream.on('error', function(err) { self.syncing = false; - - if(err.reorg) { - return self.emit('reorg', block); - } - - if(err.reorg2) { - return; - } - self.emit('error', err); }); }; @@ -203,37 +199,10 @@ BlockStream.prototype._process = function() { return self.queue.length; }, function(next) { - var heights = self.queue.slice(0, Math.min(5, self.queue.length)); - self.queue = self.queue.slice(heights.length); - async.map(heights, function(height, next) { + var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length)); + self.queue = self.queue.slice(blockArgs.length); + self._getBlocks(blockArgs, next); - self.bitcoind.getBlock(height, function(err, block) { - - if(err) { - return next(err); - } - - block.__height = height; - - setTimeout(function() { - next(null, block); - }, 1); - }); - - }, function(err, blocks) { - if(err) { - return next(err); - } - - for(var i = 0; i < blocks.length; i++) { - - self.lastEmittedHash = blocks[i].hash; - self.push(blocks[i]); - - } - - next(); - }); }, function(err) { if(err) { return self.emit('error', err); @@ -244,6 +213,56 @@ BlockStream.prototype._process = function() { }; +BlockStream.prototype._getBlocks = function(heights, callback) { + + var self = this; + async.map(heights, function(height, next) { + + if (height === 0) { + var block = new Block(self.bitcoind.genesisBuffer); + block.__height = 0; + return next(null, block); + } + + self.bitcoind.getBlock(height, function(err, block) { + + if(err) { + return next(err); + } + + block.__height = height; + next(null, block); + }); + + + }, function(err, blocks) { + + if(err) { + return callback(err); + } + + //at this point, we know that all blocks we've sent down the pipe + //have not been reorg'ed, but the new batch here might have been + self.sync.forkBlock = self.db.detectReorg(blocks); + + if (!self.sync.forkBlock) { + + for(var i = 0; i < blocks.length; i++) { + + self.lastEmittedHash = blocks[i].hash; + self.push(blocks[i]); + + } + + return callback(); + + } + + self.push(null); + + }); +}; + ProcessSerial.prototype._write = function(block, enc, callback) { var self = this; @@ -251,13 +270,6 @@ ProcessSerial.prototype._write = function(block, enc, callback) { return self.db.concurrentTip.__height >= block.__height; } - var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); - if(prevHash !== self.tip.hash) { - var err = new Error('Reorg detected'); - err.reorg = true; - return self.emit('error', err, block); - } - if(check()) { return self._process(block, callback); } @@ -343,12 +355,6 @@ ProcessConcurrent.prototype._flush = function(callback) { WriteStream.prototype._write = function(obj, enc, callback) { var self = this; - if(self.db.reorg) { - var err = new Error('reorg in process'); - err.reorg2 = true; - return callback(err); - } - self.db.store.batch(obj.operations, function(err) { if(err) { return callback(err); @@ -364,12 +370,6 @@ WriteStream.prototype._write = function(obj, enc, callback) { ProcessBoth.prototype._write = function(block, encoding, callback) { var self = this; - if(self.db.reorg) { - var err = new Error('reorg in process'); - err.reorg2 = true; - return callback(err); - } - async.parallel([function(next) { self.db.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index e5f75b92..19bfa185 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -49,7 +49,7 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback return next(null, (block.header.timestamp - 1)); } - self.getTimestamp(block.header.prevHash.reverse().toString('hex'), next); + self.getTimestamp(block.toObject().header.prevHash, next); } getLastTimestamp(function(err, lastTimestamp) { diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 8e4977f2..23971c94 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -15,8 +15,9 @@ var _ = require('lodash'); var bodyParser = require('body-parser'); var LRU = require('lru-cache'); var Encoding = require('./encoding'); -var Readable = require('stream').Readable; var Input = require('bitcore-lib').Transaction.Input; +var Transform = require('stream').Transform; +var transform = new Transform({ objectMode: true }); var WalletService = function(options) { BaseService.call(this, options); @@ -27,14 +28,6 @@ var WalletService = function(options) { maxAge: 86400000 * 3 //3 days }); - this._cache = LRU({ - max: 500 * 1024 * 1024, - length: function(n) { - return Buffer.byteLength(n, 'utf8'); - }, - maxAge: 30 * 60 * 1000 - }); - this._addressMap = {}; this.balances = {}; }; @@ -59,6 +52,7 @@ WalletService.prototype.start = function(callback) { self.store = self.node.services.db.store; self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { + if(err) { return callback(err); } @@ -67,12 +61,14 @@ WalletService.prototype.start = function(callback) { self._encoding = new Encoding(self.servicePrefix); self._loadAllAddresses(function(err) { + if(err) { return callback(err); } self._loadAllBalances(callback); }); + }); }; @@ -107,20 +103,24 @@ WalletService.prototype._checkAddresses = function() { }; WalletService.prototype.blockHandler = function(block, connectBlock, callback) { + var opts = { block: block, connectBlock: connectBlock, serial: true }; this._blockHandler(opts, callback); + }; WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { + var opts = { block: block, connectBlock: connectBlock }; this._blockHandler(opts, callback); + }; WalletService.prototype._blockHandler = function(opts, callback) { @@ -272,8 +272,6 @@ WalletService.prototype._processSerialInput = function(opts, tx, input, callback var self = this; - //we may not have walletIds to update but this input may be spending a pay-to-pub-key utxo - //so we must get the spending tx to check on that var walletIds = input.script && input.script.isPublicKeyIn() ? ['p2pk'] : self._getWalletIdsFromScript(input); @@ -619,7 +617,9 @@ WalletService.prototype._endpointPostAddresses = function() { }; WalletService.prototype._endpointGetTransactions = function() { + var self = this; + return function(req, res) { var walletId = req.params.walletId; @@ -632,28 +632,30 @@ WalletService.prototype._endpointGetTransactions = function() { var options = { start: heights[0] || 0, end : heights[1] || 0xffffffff, - from: req.query.from, - to: req.query.to + self: self, + walletId: walletId }; - self._getTransactions(walletId, options, function(err, transactions) { + transform._transform = function(chunk, enc, callback) { - if(err) { - return utils.sendError(err, res); - } + var txid = self._encoding.decodeWalletTransactionKey(chunk).txid; + self._getTransactionFromDb(options, txid, function(err, tx) { - var rs = new Readable(); + transform.push(utils.toJSONL(self._formatTransaction(tx))); + callback(); - transactions.forEach(function(transaction) { - if (transaction) { - rs.push(utils.toJSONL(self._formatTransaction(transaction))); - } }); - rs.push(null); - rs.pipe(res); + }; - }); + transform._flush = function (callback) { + callback(); + } + + var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); + var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); + + stream.pipe(transform).pipe(res); }); }; }; @@ -664,7 +666,7 @@ WalletService.prototype._formatTransactions = function(txs) { WalletService.prototype._formatTransaction = function(tx) { var obj = tx.toObject(); - //jsonl parser will not allow newline characters here + for(var i = 0; i < tx.inputs.length; i++) { obj.inputs[i].inputSatoshis = tx.__inputValues[i]; } @@ -758,20 +760,6 @@ WalletService.prototype._getBalance = function(walletId, options, callback) { }; -WalletService.prototype._chunkAdresses = function(addresses) { - - var maxLength = this.node.services.bitcoind.maxAddressesQuery; - var groups = []; - var groupsCount = Math.ceil(addresses.length / maxLength); - - for(var i = 0; i < groupsCount; i++) { - groups.push(addresses.slice(i * maxLength, Math.min(maxLength * (i + 1), addresses.length))); - } - - return groups; - -}; - WalletService.prototype._getSearchParams = function(fn, options) { return { gte: fn.call(this, options.walletId, options.start), @@ -782,7 +770,6 @@ WalletService.prototype._getSearchParams = function(fn, options) { WalletService.prototype._getTxidsFromDb = function(options, callback) { var self = this; - var txids = []; var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); @@ -802,32 +789,20 @@ WalletService.prototype._getTxidsFromDb = function(options, callback) { }; -WalletService.prototype._getTransactionsFromDb = function(txids, options, callback) { +WalletService.prototype._getTransactionFromDb = function(options, txid, callback) { - var self = this; - async.mapLimit(txids, 10, function(txid, next) { + var self = options.self; - self.node.services.transaction.getTransaction(txid, options, next); - - }, function(err, txs) { + self.node.services.transaction.getTransaction(txid, options, function(err, tx) { if(err) { return callback(err); } - self._cache.set(options.key, JSON.stringify(self._formatTransactions(txs))); - - if (!options.queryMempool) { - - options.to = options.to || txs.length; - return callback(null, txs.slice(options.from, options.to), txs.length); - - } - - options.txs = txs; - self._getTransactionsFromMempool(options, callback); + callback(null, tx); }); + }; WalletService.prototype._getTransactionsFromMempool = function(options, callback) { @@ -841,12 +816,20 @@ WalletService.prototype._getTransactionsFromMempool = function(options, callback } self.mempool.getTransactionsByAddresses(addresses, function(err, mempoolTxs) { + if(err) { return callback(err); } - var txs = options.txs.concat(mempoolTxs); - callback(null, txs.slice(options.from, options.to), txs.length); + if (mempoolTxs) { + mempoolTxs.forEach(function(tx) { + options.readable.push(utils.toJSONL(self._formatTransaction(tx))); + }); + } + + options.readable.push(null); + options.readable.pipe(options.response); + callback(); }); }); @@ -862,35 +845,20 @@ WalletService.prototype._getTransactions = function(walletId, options, callback) var opts = { start: start, end: end, - from: options.from || 0, - to: options.to || 0, walletId: walletId, - key: walletId + start + end + key: walletId + start + end, + readable: options.readable }; - if (!self._cache.peek(opts.key)) { - - return self._getTxidsFromDb(opts, callback); - - } - - try { - - opts.txs = JSON.parse(self._cache.get(opts.key)); - self._getTransactionsFromMempool(opts, callback); - - } catch(e) { - - self._cache.del(opts.key); - return callback(e); - - } + return self._getTxidsFromDb(opts, callback); }; WalletService.prototype._removeWallet = function(walletId, callback) { + var self = this; async.map(Object.keys(self._encoding.subKeyMap), function(prefix, next) { + var keys = []; var start = self._encoding.subKeyMap[prefix].fn.call(self._encoding, walletId); @@ -1252,16 +1220,6 @@ WalletService.prototype._endpointJobs = function() { }; -WalletService.prototype._endpointIsSynced = function() { - var self = this; - - return function(req, res) { - - res.status(200).jsonp({ result: !self.node.services.db.syncing }); - - }; -}; - WalletService.prototype._endpointJobStatus = function() { var self = this; @@ -1320,9 +1278,6 @@ WalletService.prototype._setupReadOnlyRoutes = function(app) { app.get('/jobs', s._endpointJobs() ); - app.get('/issynced', - s._endpointIsSynced() - ); }; WalletService.prototype._setupWriteRoutes = function(app) { diff --git a/regtest/db.js b/regtest/db.js new file mode 100644 index 00000000..1c0dae3f --- /dev/null +++ b/regtest/db.js @@ -0,0 +1,215 @@ +'use strict'; + +var chai = require('chai'); +var should = chai.should(); +var async = require('async'); +var path = require('path'); +var utils = require('./utils'); +var zmq = require('zmq'); +var http = require('http'); +var blocks = require('../test/data/blocks.json'); +var bitcore = require('bitcore-lib'); +var Block = bitcore.Block; +var BufferUtil = bitcore.util.buffer; + +/* + Bitcoind does not need to be started or run +*/ + +var debug = false; +var bitcoreDataDir = '/tmp/bitcore'; +var pubSocket; +var rpcServer; + +function setupFakeRpcServer() { + rpcServer = http.createServer(); + rpcServer.listen(48332, '127.0.0.1'); +} + +function setupFakeZmq() { + pubSocket = zmq.socket('pub'); + pubSocket.bind('tcp://127.0.0.1:38332'); +} + +var bitcore = { + configFile: { + file: bitcoreDataDir + '/bitcore-node.json', + conf: { + network: 'regtest', + port: 53001, + datadir: bitcoreDataDir, + services: [ + 'bitcoind', + 'db', + 'transaction', + 'timestamp', + 'address', + 'mempool', + 'wallet-api', + 'web' + ], + servicesConfig: { + bitcoind: { + connect: [ + { + rpcconnect: '127.0.0.1', + rpcport: 48332, + rpcuser: 'bitcoin', + rpcpassword: 'local321', + zmqpubrawtx: 'tcp://127.0.0.1:38332' + } + ] + } + } + } + }, + httpOpts: { + protocol: 'http:', + hostname: 'localhost', + port: 53001, + }, + opts: { cwd: bitcoreDataDir }, + datadir: bitcoreDataDir, + exec: path.resolve(__dirname, '../bin/bitcore-node'), + args: ['start'], + process: null +}; + +var opts = { + debug: debug, + bitcore: bitcore, + bitcoreDataDir: bitcoreDataDir, + blockHeight: 0 +}; + +var genesis = new Block(new Buffer(blocks.genesis, 'hex')); +var block1 = new Block(new Buffer(blocks.block1a, 'hex')); +var block2 = new Block(new Buffer(blocks.block1b, 'hex')); + +describe('DB Operations', function() { + + this.timeout(60000); + + describe('DB Reorg', function() { + + var self = this; + + var responses = [ + genesis.hash, + { hash: genesis.hash, height: 0 }, + genesis.hash, + blocks.genesis, //end initChain + block1.hash, + blocks.block1a, + block2.hash, + blocks.block1b, + { hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') }, + { hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') }, + blocks.genesis, + blocks.block1b, + ]; + + after(function(done) { + pubSocket.close(); + rpcServer.close(); + bitcore.process.kill(); + setTimeout(done, 1000); + }); + + + before(function(done) { + + var responseCount = 0; + + setupFakeRpcServer(); + + rpcServer.on('request', function(req, res) { + var data = ''; + + req.on('data', function(chunk) { + data += chunk.toString(); + }); + + req.on('end', function() { + var body = JSON.parse(data); + //console.log('request', body); + var response = JSON.stringify({ result: responses[responseCount++] }); + //console.log('response', response, 'id: ', body.id); + res.write(response); + res.end(); + }); + + }); + + setupFakeZmq(); + + self.opts = Object.assign({}, opts); + + utils.startBitcoreNode(self.opts, function() { + utils.waitForBitcoreNode(self.opts, done); + }); + + }); + + it('should reorg when needed', function(done) { + + var block1a = '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb'; + var block1b = '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4'; + + /* + _______________________________________________________ + | | | | | + | Genesis | Block 1a | Block 1b | Result | + | _______ ________ | + | | |_____| |___________________ORPHANED | + | |_______| |________| | + | | ________ ________ | + | |______________________| |____| | | + | |________| |________| | + |_______________________________________________________| + + */ + + async.series([ + + publishBlockHash.bind(self, block1a), + publishBlockHash.bind(self, block1b) + + ], function(err) { + + if(err) { + return done(err); + } + done(); + + }); + }); + }); + +}); + +function publishBlockHash(blockHash, callback) { + + pubSocket.send([ 'hashblock', new Buffer(blockHash, 'hex') ]); + + var httpOpts = utils.getHttpOpts(opts, { path: '/wallet-api/info' }); + + //we don't know exactly when all the blockhandlers will complete after the "tip" event + //so we must wait an indeterminate time to check on the current tip + setTimeout(function() { + + utils.queryBitcoreNode(httpOpts, function(err, res) { + + if(err) { + return callback(err); + } + + blockHash.should.equal(JSON.parse(res).hash); + callback(); + + }); + + }, 2000); +} + + diff --git a/regtest/utils.js b/regtest/utils.js index 4c5fefe3..296b7dc2 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -11,7 +11,6 @@ var http = require('http'); var Unit = bitcore.Unit; var Transaction = bitcore.Transaction; var PrivateKey = bitcore.PrivateKey; -var crypto = require('crypto'); var utils = {}; @@ -73,21 +72,23 @@ utils.queryBitcoreNode = function(httpOpts, callback) { request.end(); }; -utils.waitForBitcoreNode = function(callback) { +utils.waitForBitcoreNode = function(opts, callback) { - bitcore.process.stdout.on('data', function(data) { - if (debug) { + var self = this; + + opts.bitcore.process.stdout.on('data', function(data) { + if (opts.debug) { console.log(data.toString()); } }); - bitcore.process.stderr.on('data', function(data) { + opts.bitcore.process.stderr.on('data', function(data) { console.log(data.toString()); }); var errorFilter = function(err, res) { try { - if (JSON.parse(res).height === blockHeight) { + if (JSON.parse(res).height === opts.blockHeight) { return; } return res; @@ -96,29 +97,40 @@ utils.waitForBitcoreNode = function(callback) { } }; - var httpOpts = getHttpOpts({ path: '/wallet-api/info', errorFilter: errorFilter }); + var httpOpts = self.getHttpOpts(opts, { path: '/wallet-api/info', errorFilter: errorFilter }); - waitForService(queryBitcoreNode.bind(this, httpOpts), callback); + self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback); }; -utils.waitForBitcoinReady = function(callback) { - waitForService(function(callback) { - rpc.generate(initialHeight, function(err, res) { +utils.waitForBitcoinReady = function(opts, callback) { + + var self = this; + self.waitForService(function(callback) { + + opts.rpc.generate(opts.initialHeight, function(err, res) { + if (err || (res && res.error)) { return callback('keep trying'); } - blockHeight += initialHeight; + opts.blockHeight += opts.initialHeight; callback(); }); }, function(err) { + if(err) { return callback(err); } + callback(); + }, callback); -} + +}; utils.initializeAndStartService = function(opts, callback) { + + var self = this; + rimraf(opts.datadir, function(err) { if(err) { return callback(err); @@ -128,34 +140,37 @@ utils.initializeAndStartService = function(opts, callback) { return callback(err); } if (opts.configFile) { - writeConfigFile(opts.configFile.file, opts.configFile.conf); + self.writeConfigFile(opts.configFile.file, opts.configFile.conf); } - var args = _.isArray(opts.args) ? opts.args : toArgs(opts.args); + var args = _.isArray(opts.args) ? opts.args : self.toArgs(opts.args); opts.process = spawn(opts.exec, args, opts.opts); callback(); }); }); -} -utils.startBitcoreNode = function(callback) { - initializeAndStartService(bitcore, callback); -} +}; -utils.startBitcoind = function(callback) { - initializeAndStartService(bitcoin, callback); -} +utils.startBitcoreNode = function(opts, callback) { + this.initializeAndStartService(opts.bitcore, callback); +}; -utils.unlockWallet = function(callback) { - rpc.walletPassPhrase(walletPassphrase, 3000, function(err) { +utils.startBitcoind = function(opts, callback) { + this.initializeAndStartService(opts.bitcoin, callback); +}; + +utils.unlockWallet = function(opts, callback) { + opts.rpc.walletPassPhrase(opts.walletPassphrase, 3000, function(err) { if(err && err.code !== -15) { return callback(err); } callback(); }); -} +}; + +utils.getPrivateKeysWithABalance = function(opts, callback) { + + opts.rpc.listUnspent(function(err, res) { -utils.getPrivateKeysWithABalance = function(callback) { - rpc.listUnspent(function(err, res) { if(err) { return callback(err); } @@ -170,13 +185,15 @@ utils.getPrivateKeysWithABalance = function(callback) { return callback(new Error('no utxos available')); } async.mapLimit(utxos, 8, function(utxo, callback) { - rpc.dumpPrivKey(utxo.address, function(err, res) { + + opts.rpc.dumpPrivKey(utxo.address, function(err, res) { if(err) { return callback(err); } var privKey = res.result; callback(null, { utxo: utxo, privKey: privKey }); }); + }, function(err, utxos) { if(err) { return callback(err); @@ -184,11 +201,13 @@ utils.getPrivateKeysWithABalance = function(callback) { callback(null, utxos); }); }); -} -utils.generateSpendingTxs = function(utxos) { +}; + +utils.generateSpendingTxs = function(opts, utxos) { + return utxos.map(function(utxo) { - txCount++; + var toPrivKey = new PrivateKey('testnet'); //external addresses var changePrivKey = new PrivateKey('testnet'); //our wallet keys var utxoSatoshis = Unit.fromBTC(utxo.utxo.amount).satoshis; @@ -197,77 +216,90 @@ utils.generateSpendingTxs = function(utxos) { tx.from(utxo.utxo); tx.to(toPrivKey.toAddress().toString(), satsToPrivKey); - tx.fee(fee); + tx.fee(opts.fee); tx.change(changePrivKey.toAddress().toString()); tx.sign(utxo.privKey); - walletPrivKeys.push(changePrivKey); - satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + fee); + opts.walletPrivKeys.push(changePrivKey); + opts.satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + opts.fee); return tx; }); -} -utils.setupInitialTxs = function(callback) { - getPrivateKeysWithABalance(function(err, utxos) { +}; + +utils.setupInitialTxs = function(opts, callback) { + + var self = this; + self.getPrivateKeysWithABalance(opts, function(err, utxos) { + if(err) { return callback(err); } - initialTxs = generateSpendingTxs(utxos); + opts.initialTxs = self.generateSpendingTxs(opts, utxos); callback(); }); -} -utils.sendTxs = function(callback) { - async.eachOfSeries(initialTxs, sendTx, callback); -} +}; -utils.sendTx = function(tx, index, callback) { - rpc.sendRawTransaction(tx.serialize(), function(err) { +utils.sendTxs = function(opts, callback) { + async.eachOfSeries(opts.initialTxs, this.sendTx.bind(this, opts), callback); +}; + +utils.sendTx = function(opts, tx, index, callback) { + + opts.rpc.sendRawTransaction(tx.serialize(), function(err) { if (err) { return callback(err); } var mod = index % 2; if (mod === 1) { - blockHeight++; - rpc.generate(1, callback); + opts.blockHeight++; + opts.rpc.generate(1, callback); } else { callback(); } }); -} -utils.getHttpOpts = function(opts) { +}; + +utils.getHttpOpts = function(opts, httpOpts) { return Object.assign({ - path: opts.path, - method: opts.method || 'GET', - body: opts.body, + path: httpOpts.path, + method: httpOpts.method || 'GET', + body: httpOpts.body, headers: { 'Content-Type': 'application/json', - 'Content-Length': opts.length || 0 + 'Content-Length': httpOpts.length || 0 }, - errorFilter: opts.errorFilter - }, bitcore.httpOpts); -} + errorFilter: httpOpts.errorFilter + }, opts.bitcore.httpOpts); +}; -utils.registerWallet = function(callback) { - var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId, method: 'POST' }); - queryBitcoreNode(httpOpts, callback); -} +utils.registerWallet = function(opts, callback) { -utils.uploadWallet = function(callback) { - var addresses = JSON.stringify(walletPrivKeys.map(function(privKey) { + var httpOpts = this.getHttpOpts(opts, { path: '/wallet-api/wallets/' + opts.walletId, method: 'POST' }); + this.queryBitcoreNode(httpOpts, callback); + +}; + +utils.uploadWallet = function(opts, callback) { + + var self = this; + var addresses = JSON.stringify(opts.walletPrivKeys.map(function(privKey) { if (privKey.privKey) { return privKey.pubKey.toString(); } return privKey.toAddress().toString(); })); - var httpOpts = getHttpOpts({ - path: '/wallet-api/wallets/' + walletId + '/addresses', + + var httpOpts = self.getHttpOpts(opts, { + path: '/wallet-api/wallets/' + opts.walletId + '/addresses', method: 'POST', body: addresses, length: addresses.length }); - async.waterfall([ queryBitcoreNode.bind(this, httpOpts) ], function(err, res) { + + async.waterfall([ self.queryBitcoreNode.bind(self, httpOpts) ], function(err, res) { if (err) { return callback(err); } @@ -275,10 +307,10 @@ utils.uploadWallet = function(callback) { Object.keys(job).should.deep.equal(['jobId']); - var httpOpts = getHttpOpts({ path: '/wallet-api/jobs/' + job.jobId }); + var httpOpts = self.getHttpOpts(opts, { path: '/wallet-api/jobs/' + job.jobId }); async.retry({ times: 10, interval: 1000 }, function(next) { - queryBitcoreNode(httpOpts, function(err, res) { + self.queryBitcoreNode(httpOpts, function(err, res) { if (err) { return next(err); } @@ -296,23 +328,30 @@ utils.uploadWallet = function(callback) { callback(); }); }); -} -utils.getListOfTxs = function(callback) { +}; + +utils.getListOfTxs = function(opts, callback) { + + var self = this; var end = Date.now() + 86400000; - var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/transactions?start=0&end=' + end }); - queryBitcoreNode(httpOpts, function(err, res) { + var httpOpts = self.getHttpOpts(opts, { + path: '/wallet-api/wallets/' + opts.walletId + '/transactions?start=0&end=' + end }); + + self.queryBitcoreNode(httpOpts, function(err, res) { if(err) { return callback(err); } var results = []; res.split('\n').forEach(function(result) { + if (result.length > 0) { return results.push(JSON.parse(result)); } + }); - var map = initialTxs.map(function(tx) { + var map = opts.initialTxs.map(function(tx) { return tx.serialize(); }); @@ -322,28 +361,15 @@ utils.getListOfTxs = function(callback) { }); map.length.should.equal(0); - results.length.should.equal(initialTxs.length); + results.length.should.equal(opts.initialTxs.length); callback(); }); -} +}; -utils.initGlobals = function() { - walletPassphrase = 'test'; - txCount = 0; - blockHeight = 0; - walletPrivKeys = []; - initialTxs = []; - fee = 100000; - feesReceived = 0; - satoshisSent = 0; - walletId = crypto.createHash('sha256').update('test').digest('hex'); - satoshisReceived = 0; -} - -utils.cleanup = function(callback) { - bitcore.process.kill(); - bitcoin.process.kill(); +utils.cleanup = function(opts, callback) { + opts.bitcore.process.kill(); + opts.bitcoin.process.kill(); setTimeout(callback, 2000); -} +}; module.exports = utils; diff --git a/regtest/wallet.js b/regtest/wallet.js index dffa02c4..9684f73d 100644 --- a/regtest/wallet.js +++ b/regtest/wallet.js @@ -3,10 +3,10 @@ var chai = require('chai'); var should = chai.should(); var async = require('async'); -var bitcore = require('bitcore-lib'); var BitcoinRPC = require('bitcoind-rpc'); var path = require('path'); -var utils = require('utils'); +var utils = require('./utils'); +var crypto = require('crypto'); var debug = false; var bitcoreDataDir = '/tmp/bitcore'; @@ -82,36 +82,58 @@ var bitcore = { process: null }; -var rpc = new BitcoinRPC(rpcConfig); -var walletPassphrase, txCount, blockHeight, walletPrivKeys, -initialTxs, fee, walletId, satoshisReceived, satoshisSent, feesReceived; -var initialHeight = 150; +var opts = { + debug: debug, + bitcore: bitcore, + bitcoin: bitcoin, + bitcoinDataDir: bitcoinDataDir, + bitcoreDataDir: bitcoreDataDir, + rpc: new BitcoinRPC(rpcConfig), + walletPassphrase: 'test', + txCount: 0, + blockHeight: 0, + walletPrivKeys: [], + initialTxs: [], + fee: 100000, + feesReceived: 0, + satoshisSent: 0, + walletId: crypto.createHash('sha256').update('test').digest('hex'), + satoshisReceived: 0, + initialHeight: 150 +}; describe('Wallet Operations', function() { this.timeout(60000); - after(cleanup); + describe('Register, Upload, GetTransactions', function() { - describe('Register and Upload', function() { + var self = this; + + after(function(done) { + utils.cleanup(self.opts, done); + }); before(function(done) { - initGlobals(); + self.opts = Object.assign({}, opts); async.series([ - startBitcoind, - waitForBitcoinReady, - unlockWallet, - setupInitialTxs, - startBitcoreNode, - waitForBitcoreNode + utils.startBitcoind.bind(utils, self.opts), + utils.waitForBitcoinReady.bind(utils, self.opts), + utils.unlockWallet.bind(utils, self.opts), + utils.setupInitialTxs.bind(utils, self.opts), + utils.startBitcoreNode.bind(utils, self.opts), + utils.waitForBitcoreNode.bind(utils, self.opts) ], done); }); it('should register wallet', function(done) { - registerWallet(function(err, res) { + + utils.registerWallet.call(utils, self.opts, function(err, res) { + if (err) { return done(err); } + res.should.deep.equal(JSON.stringify({ walletId: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08' })); @@ -120,25 +142,26 @@ describe('Wallet Operations', function() { }); it('should upload a wallet', function(done) { - uploadWallet(done); - }); - }); + utils.uploadWallet.call(utils, self.opts, done); - describe('Load addresses at genesis block', function() { - - before(function(done) { - sendTxs(function(err) { - if(err) { - return done(err); - } - waitForBitcoreNode(done); - }); }); it('should get a list of transactions', function(done) { - getListOfTxs(done); + utils.sendTxs.call(utils, self.opts, function(err) { + + if(err) { + return done(err); + } + utils.waitForBitcoreNode.call(utils, self.opts, function(err) { + + if(err) { + return done(err); + } + utils.getListOfTxs.call(utils, self.opts, done); + }); + }); }); @@ -146,62 +169,79 @@ describe('Wallet Operations', function() { describe('Load addresses after syncing the blockchain', function() { + var self = this; + + self.opts = Object.assign({}, opts); + + after(utils.cleanup.bind(utils, self.opts)); + before(function(done) { - initGlobals(); async.series([ - cleanup, - startBitcoind, - waitForBitcoinReady, - unlockWallet, - setupInitialTxs, - sendTxs, - startBitcoreNode, - waitForBitcoreNode, - registerWallet, - uploadWallet + utils.startBitcoind.bind(utils, self.opts), + utils.waitForBitcoinReady.bind(utils, self.opts), + utils.unlockWallet.bind(utils, self.opts), + utils.setupInitialTxs.bind(utils, self.opts), + utils.sendTxs.bind(utils, self.opts), + utils.startBitcoreNode.bind(utils, self.opts), + utils.waitForBitcoreNode.bind(utils, self.opts), + utils.registerWallet.bind(utils, self.opts), + utils.uploadWallet.bind(utils, self.opts) ], done); }); it('should get list of transactions', function(done) { - getListOfTxs(done); + utils.getListOfTxs.call(utils, self.opts, done); }); it('should get the balance of a wallet', function(done) { - var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/balance' }); - queryBitcoreNode(httpOpts, function(err, res) { + + var httpOpts = utils.getHttpOpts.call( + utils, + self.opts, + { path: '/wallet-api/wallets/' + self.opts.walletId + '/balance' }); + + utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) { if(err) { return done(err); } var results = JSON.parse(res); - results.satoshis.should.equal(satoshisReceived); + results.satoshis.should.equal(self.opts.satoshisReceived); done(); }); }); it('should get the set of utxos for the wallet', function(done) { - var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/utxos' }); - queryBitcoreNode(httpOpts, function(err, res) { + + var httpOpts = utils.getHttpOpts.call( + utils, + self.opts, + { path: '/wallet-api/wallets/' + opts.walletId + '/utxos' }); + + utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) { + if(err) { return done(err); } + var results = JSON.parse(res); var balance = 0; results.utxos.forEach(function(utxo) { balance += utxo.satoshis; }); - results.height.should.equal(blockHeight); - balance.should.equal(satoshisReceived); + + results.height.should.equal(self.opts.blockHeight); + balance.should.equal(self.opts.satoshisReceived); done(); }); }); it('should get the list of jobs', function(done) { - var httpOpts = getHttpOpts({ path: '/wallet-api/jobs' }); - queryBitcoreNode(httpOpts, function(err, res) { + var httpOpts = utils.getHttpOpts.call(utils, self.opts, { path: '/wallet-api/jobs' }); + utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) { if(err) { return done(err); } @@ -212,8 +252,8 @@ describe('Wallet Operations', function() { }); it('should remove all wallets', function(done) { - var httpOpts = getHttpOpts({ path: '/wallet-api/wallets', method: 'DELETE' }); - queryBitcoreNode(httpOpts, function(err, res) { + var httpOpts = utils.getHttpOpts.call(utils, self.opts, { path: '/wallet-api/wallets', method: 'DELETE' }); + utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) { if(err) { return done(err); } diff --git a/test/data/blocks.json b/test/data/blocks.json new file mode 100644 index 00000000..381bb900 --- /dev/null +++ b/test/data/blocks.json @@ -0,0 +1,5 @@ +{ + "genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000", + "block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000", + "block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000" +} diff --git a/test/services/db/index.unit.js b/test/services/db/index.unit.js new file mode 100644 index 00000000..37731ef7 --- /dev/null +++ b/test/services/db/index.unit.js @@ -0,0 +1,56 @@ +'use strict'; + +var expect = require('chai').expect; +var bitcore = require('bitcore-lib'); +var DB = require('../../../lib/services/db'); + +describe('DB', function() { + + describe('Reorg', function() { + + before(function() { + this.db = new DB({ + node: { + network: bitcore.Networks.testnet, + datadir: '/tmp', + services: '' + } + }); + this.db.tip = { hash: 'ff', height: 444 }; + }); + + it('should detect a reorg from a common ancenstor that is in our set', function() { + var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; + var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; + var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; + var block4 = { hash: '44', header: { prevHash: new Buffer('22', 'hex') } }; + //blocks must be passed in the order that they are received. + var blocks = [ block3, block2, block1, block4 ]; + expect(this.db.detectReorg(blocks)).to.deep.equal(block3); + + }); + + it('should detect a reorg from a common ancenstor that is not in our set', function() { + var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; + var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; + var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; + var block4 = { hash: '44', header: { prevHash: new Buffer('ee', 'hex') } }; + var blocks = [ block3, block2, block1, block4 ]; + expect(this.db.detectReorg(blocks)).to.deep.equal(block4); + + }); + + it('should not detect a reorg', function() { + this.db.reorgTipHash = null; + var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } }; + var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } }; + var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } }; + var block4 = { hash: '44', header: { prevHash: new Buffer('33', 'hex') } }; + var blocks = [ block3, block2, block1, block4 ]; + var actual = this.db.detectReorg(blocks); + expect(actual).to.be.undefined; + }); + + }); +}); + diff --git a/test/services/db/reorg.integration.js b/test/services/db/reorg.integration.js new file mode 100644 index 00000000..8f80a9fc --- /dev/null +++ b/test/services/db/reorg.integration.js @@ -0,0 +1,56 @@ +'use strict'; + +var should = require('chai').should(); +var sinon = require('sinon'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; +var DB = require('../../../lib/services/db'); +var Networks = bitcore.Networks; +var EventEmitter = require('events').EventEmitter; +var rimraf = require('rimraf'); +var mkdirp = require('mkdirp'); +var blocks = require('../../data/blocks.json'); + + +describe('DB', function() { + + var bitcoind = { + on: function(event, callback) { + }, + genesisBuffer: blocks.genesis + }; + + var node = { + network: Networks.testnet, + datadir: '/tmp/datadir', + services: { bitcoind: bitcoind }, + on: sinon.stub(), + once: sinon.stub() + }; + + before(function(done) { + + var self = this; + + rimraf(node.datadir, function(err) { + if(err) { + return done(err); + } + mkdirp(node.datadir, done); + }); + + this.db = new DB({node: node}); + this.emitter = new EventEmitter(); + + }); + + + describe('Reorg', function() { + + it('should start db service', function(done) { + this.db.start(done); + }); + + }); +}); + diff --git a/test/services/db/reorg.unit.js b/test/services/db/reorg.unit.js index 0da7a02f..c49891f3 100644 --- a/test/services/db/reorg.unit.js +++ b/test/services/db/reorg.unit.js @@ -14,7 +14,7 @@ describe('Reorg', function() { toString: function() { return input; } - } + }; }); }); @@ -96,11 +96,11 @@ describe('Reorg', function() { } } } - } + }; var reorg = new Reorg(node, db); - - reorg.handleReorg(bitcoindBlocks[3], function(err) { + + reorg.handleReorg(bitcoindBlocks[3].hash, function(err) { should.not.exist(err); db.tip.hash.should.equal('main3'); @@ -132,4 +132,4 @@ describe('Reorg', function() { }); }); }); -}); \ No newline at end of file +});