From f452d63d75340674aa387001302eefac7a3f9360 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 9 May 2017 16:36:09 -0400 Subject: [PATCH] Fixed issue with missing inputValues. --- lib/services/bitcoind/index.js | 1 + lib/services/db/index.js | 87 ++++------ lib/services/db/sync.js | 10 +- lib/services/wallet-api/index.js | 279 +++++++++++++++---------------- regtest/utils.js | 4 +- regtest/wallet.js | 3 +- 6 files changed, 179 insertions(+), 205 deletions(-) diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 074131e2..59fc7436 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -202,6 +202,7 @@ Bitcoin.prototype._zmqBlockHandler = function(message) { self._hashBlockCache.set(hashBlockHex); self.tiphash = hashBlockHex; + self.height++; self.emit('block', message); diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 1225cec8..762b2fbe 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -71,6 +71,23 @@ util.inherits(DB, Service); DB.dependencies = ['bitcoind']; +DB.prototype.pauseSync = function(callback) { + if (this._sync.syncing) { + this._sync.once('synced', function() { + this._sync.paused = true; + callback(); + }); + } else { + this._sync.paused = true; + setImmediate(callback); + } +}; + +DB.prototype.resumeSync = function() { + this._sync.paused = false; + this._sync.sync(); +}; + DB.prototype._setDataPath = function() { $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); if (this.node.network === Networks.livenet) { @@ -132,7 +149,15 @@ DB.prototype.start = function(callback) { } self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); - self.store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'}); + self._store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'}); + + self.store = { + get: self._store.get.bind(self._store), + put: self._store.put.bind(self._store), + batch: self._store.batch.bind(self._store), + createReadStream: self._store.createReadStream.bind(self._store), + createKeyStream: self._store.createKeyStream.bind(self._store), + }; self.node.once('ready', function() { @@ -236,14 +261,14 @@ DB.prototype.stop = function(callback) { }, function(next) { setTimeout(next, 10); }, function() { - if (self.store) { - self.store.close(callback); + if (self._store) { + self._store.close(callback); } }); }; DB.prototype.close = function(callback) { - this.store.close(callback); + this._store.close(callback); }; DB.prototype.getAPIMethods = function() { @@ -315,60 +340,6 @@ DB.prototype.unsubscribe = function(name, emitter) { } }; -DB.prototype.connectBlock = function(block, callback) { - var self = this; - - log.info('DB handling new chain block'); - var operations = []; - self.getConcurrentBlockOperations(block, true, function(err, ops) { - if(err) { - return callback(err); - } - - operations = ops; - - self.getSerialBlockOperations(block, true, function(err, ops) { - if(err) { - return callback(err); - } - - operations = operations.concat(ops); - - operations.push(self.getTipOperation(block, true)); - operations.push(self.getConcurrentTipOperation(block, true)); - - self.store.batch(operations, callback); - }); - }); -}; - -DB.prototype.disconnectBlock = function(block, callback) { - var self = this; - - log.debug('DB removing chain block'); - var operations = []; - self.getConcurrentBlockOperations(block, false, function(err, ops) { - if(err) { - return callback(err); - } - - operations = ops; - - self.getSerialBlockOperations(block, false, function(err, ops) { - if(err) { - return callback(err); - } - - operations = operations.concat(ops); - - operations.push(self.getTipOperation(block, false)); - operations.push(self.getConcurrentTipOperation(block, false)); - - self.store.batch(operations, callback); - }); - }); -}; - DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { var operations = []; diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index 4bb8915e..ee11c9c1 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -22,7 +22,6 @@ function BlockStream(highWaterMark, db, sync) { this.lastEmittedHash = this.dbTip.hash; this.queue = []; this.processing = false; - this.syncing = true; this.bitcoind = this.db.bitcoind; } @@ -67,6 +66,7 @@ function Sync(node, db) { this.node = node; this.db = db; this.syncing = false; + this.paused = false; //we can't sync while one of our indexes is reading/writing separate from us this.highWaterMark = 10; this.progressBar = null; this.lastReportedBlock = 0; @@ -77,7 +77,7 @@ inherits(Sync, EventEmitter); Sync.prototype.sync = function() { var self = this; - if(this.syncing) { + if(this.syncing || this.paused) { return; } @@ -135,22 +135,26 @@ Sync.prototype._onFinish = function() { } self._startSubscriptions(); - log.info('Sync complete'); + self.emit('synced'); }; Sync.prototype._startSubscriptions = function() { var self = this; + if (!self.subscribed) { + self.subscribed = true; self.bus = self.node.openBus({remoteAddress: 'localhost'}); self.bus.on('bitcoind/hashblock', function(hashBlockHex) { self.sync(); }); + self.bus.subscribe('bitcoind/hashblock'); } + }; Sync.prototype.reportStatus = function() { diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 23971c94..c239497f 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -49,7 +49,8 @@ WalletService.prototype.getAPIMethods = function() { WalletService.prototype.start = function(callback) { var self = this; - self.store = self.node.services.db.store; + self.db = self.node.services.db; + self.store = self.db.store; self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { @@ -126,6 +127,7 @@ WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, c WalletService.prototype._blockHandler = function(opts, callback) { var self = this; + if (!self._checkAddresses()) { return setImmediate(function() { callback(null, []); @@ -133,13 +135,18 @@ WalletService.prototype._blockHandler = function(opts, callback) { } async.mapSeries(opts.block.transactions, function(tx, next) { + self._processTransaction(opts, tx, next); + }, function(err, operations) { + if(err) { return callback(err); } + var ret = _.compact(_.flattenDeep(operations)); callback(null, ret); + }); }; @@ -410,19 +417,23 @@ WalletService.prototype._loadAllBalances = function(callback) { WalletService.prototype._endpointUTXOs = function() { var self = this; return function(req, res) { + req.setTimeout(600000); var walletId = req.params.walletId; var queryMempool = req.query.queryMempool !== false; var height = self.node.services.db.tip.__height; var options = { queryMempool: queryMempool }; - self._getUtxos(walletId, options, function(err, utxos) { - if(err) { - return utils.sendError(err, res); - } - res.status(200).jsonp({ - utxos: utxos, - height: height + self.db.pauseSync(function() { + self._getUtxos(walletId, options, function(err, utxos) { + if(err) { + return utils.sendError(err, res); + } + self.db.resumeSync(); + res.status(200).jsonp({ + utxos: utxos, + height: height + }); }); }); }; @@ -441,14 +452,17 @@ WalletService.prototype._endpointGetBalance= function() { byAddress: byAddress }; - self._getBalance(walletId, options, function(err, result) { - if(err) { - return utils.sendError(err, res); - } - res.status(200).jsonp({ - satoshis: result, - height: self.node.services.db.tip.__height, - hash: self.node.services.db.tip.hash + self.db.pauseSync(function() { + self._getBalance(walletId, options, function(err, result) { + if(err) { + return utils.sendError(err, res); + } + self.db.resumeSync(); + res.status(200).jsonp({ + satoshis: result, + height: self.node.services.db.tip.__height, + hash: self.node.services.db.tip.hash + }); }); }); }; @@ -459,13 +473,16 @@ WalletService.prototype._endpointRemoveWallet = function() { return function(req, res) { var walletId = req.params.walletId; - self._removeWallet(walletId, function(err, numRecords) { - if(err) { - return utils.sendError(err, res); - } - res.status(200).jsonp({ - walletId: walletId, - numberRemoved: numRecords + self.db.pauseSync(function() { + self._removeWallet(walletId, function(err, numRecords) { + if(err) { + return utils.sendError(err, res); + } + self.db.resumeSync(); + res.status(200).jsonp({ + walletId: walletId, + numberRemoved: numRecords + }); }); }); }; @@ -475,12 +492,15 @@ WalletService.prototype._endpointRemoveAllWallets = function() { var self = this; return function(req, res) { - self._removeAllWallets(function(err, numRecords) { - if(err) { - return utils.sendError(err, res); - } - res.status(200).jsonp({ - numberRemoved: numRecords + self.db.pauseSync(function() { + self._removeAllWallets(function(err, numRecords) { + if(err) { + return utils.sendError(err, res); + } + self.db.resumeSync(); + res.status(200).jsonp({ + numberRemoved: numRecords + }); }); }); }; @@ -491,17 +511,20 @@ WalletService.prototype._endpointGetAddresses = function() { return function(req, res) { var walletId = req.params.walletId; - self._getAddresses(walletId, function(err, addresses) { - if(err) { - return utils.sendError(err, res); - } + self.db.pauseSync(function() { + self._getAddresses(walletId, function(err, addresses) { + self.db.resumeSync(); + if(err) { + return utils.sendError(err, res); + } - if(!addresses) { - return res.status(404).send('Not found'); - } + if(!addresses) { + return res.status(404).send('Not found'); + } - res.status(200).jsonp({ - addresses: addresses + res.status(200).jsonp({ + addresses: addresses + }); }); }); }; @@ -610,9 +633,12 @@ WalletService.prototype._endpointPostAddresses = function() { var jobId = utils.generateJobId(); - self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self)); + self.db.pauseSync(function() { - res.status(200).jsonp({jobId: jobId}); + self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self)); + res.status(200).jsonp({jobId: jobId}); + + }); }; }; @@ -623,39 +649,44 @@ WalletService.prototype._endpointGetTransactions = function() { return function(req, res) { var walletId = req.params.walletId; - self._processStartEndOptions(req, function(err, heights) { + self.db.pauseSync(function() { + self._processStartEndOptions(req, function(err, heights) { - if(err) { - return utils.sendError(err, res); - } + if(err) { + return utils.sendError(err, res); + } - var options = { - start: heights[0] || 0, - end : heights[1] || 0xffffffff, - self: self, - walletId: walletId - }; + var options = { + start: heights[0] || 0, + end : heights[1] || 0xffffffff, + self: self, + walletId: walletId + }; - transform._transform = function(chunk, enc, callback) { + //txids are sent in and the actual tx's are found here + transform._transform = function(chunk, enc, callback) { - var txid = self._encoding.decodeWalletTransactionKey(chunk).txid; - self._getTransactionFromDb(options, txid, function(err, tx) { + var txid = self._encoding.decodeWalletTransactionKey(chunk).txid; + self._getTransactionFromDb(options, txid, function(err, tx) { - transform.push(utils.toJSONL(self._formatTransaction(tx))); + transform.push(utils.toJSONL(self._formatTransaction(tx))); + callback(); + + }); + + }; + + transform._flush = function (callback) { + self.db.resumeSync(); callback(); + } - }); + var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); + //creates a stream of txids that match the wallet's set of addresses + var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); - }; - - transform._flush = function (callback) { - callback(); - } - - var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); - var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); - - stream.pipe(transform).pipe(res); + stream.pipe(transform).pipe(res); + }); }); }; }; @@ -693,21 +724,27 @@ WalletService.prototype._endpointPutAddresses = function() { return utils.sendError(new Error('WalletId must be given.'), res); } - self._getAddresses(walletId, function(err, oldAddresses) { - if(err) { - return utils.sendError(err, res); - } + self.db.pauseSync(function() { - if(!oldAddresses) { - return res.status(404).send('Not found'); - } + self._getAddresses(walletId, function(err, oldAddresses) { - var addAddresses = _.without(newAddresses, oldAddresses); + if(err) { + return utils.sendError(err, res); + } - var jobId = utils.generateJobId(); - self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self)); - res.status(200).jsonp({jobId: jobId}); + if(!oldAddresses) { + return res.status(404).send('Not found'); + } + + var addAddresses = _.without(newAddresses, oldAddresses); + + var jobId = utils.generateJobId(); + self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self)); + res.status(200).jsonp({jobId: jobId}); + + }); }); + }; }; @@ -767,28 +804,6 @@ WalletService.prototype._getSearchParams = function(fn, options) { }; }; -WalletService.prototype._getTxidsFromDb = function(options, callback) { - - var self = this; - var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); - - var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); - - var streamErr; - stream.on('error', function(err) { - streamErr = err; - }); - - stream.on('data', function(data) { - txids.push(self._encoding.decodeWalletTransactionKey(data).txid); - }); - - stream.on('end', function() { - self._getTransactionsFromDb(txids, options, callback); - }); - -}; - WalletService.prototype._getTransactionFromDb = function(options, txid, callback) { var self = options.self; @@ -799,58 +814,32 @@ WalletService.prototype._getTransactionFromDb = function(options, txid, callback return callback(err); } - callback(null, tx); - - }); - -}; - -WalletService.prototype._getTransactionsFromMempool = function(options, callback) { - - var self = this; - - self._getAddresses(options.walletId, function(err, addresses) { - - if(err) { - return callback(err); + if (tx.__inputValues) { + return callback(null, tx); } - self.mempool.getTransactionsByAddresses(addresses, function(err, mempoolTxs) { + async.map(tx.inputs, function(input, next) { + + self.node.services.transaction.getTransaction(input.prevTxId, options, function(err, tx) { + + if(err) { + return next(err); + } + + next(null, tx.outputs[input.outputIndex].satoshis); + }); + + }, function(err, inputValues) { if(err) { return callback(err); } - if (mempoolTxs) { - mempoolTxs.forEach(function(tx) { - options.readable.push(utils.toJSONL(self._formatTransaction(tx))); - }); - } - - options.readable.push(null); - options.readable.pipe(options.response); - callback(); + tx.__inputValues = inputValues; + callback(null, tx); }); }); -}; - -WalletService.prototype._getTransactions = function(walletId, options, callback) { - - var self = this; - - var start = options.start || 0; - var end = options.end || 0xffffffff; - - var opts = { - start: start, - end: end, - walletId: walletId, - key: walletId + start + end, - readable: options.readable - }; - - return self._getTxidsFromDb(opts, callback); }; @@ -978,6 +967,9 @@ WalletService.prototype._isJobQueueReady = function() { }; WalletService.prototype._jobCompletionCallback = function(err, results) { + + this.db.resumeSync(); + log.info('Completed job: ', results.jobId); var jobId = results.jobId; @@ -1017,6 +1009,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, this._jobs.set(jobId, job); + self._getAddresses(walletId, function(err, oldAddresses) { if(err) { return callback(err, jobResults); @@ -1242,8 +1235,10 @@ WalletService.prototype._endpointGetInfo = function() { return function(req, res) { res.jsonp({ result: 'ok', - height: self.node.services.db.tip.__height, - hash: self.node.services.db.tip.hash + dbheight: self.node.services.db.tip.__height, + dbhash: self.node.services.db.tip.hash, + bitcoindheight: self.node.services.bitcoind.height, + bitcoindhash: self.node.services.bitcoind.tiphash }); }; }; diff --git a/regtest/utils.js b/regtest/utils.js index 296b7dc2..c7c3f219 100644 --- a/regtest/utils.js +++ b/regtest/utils.js @@ -88,7 +88,9 @@ utils.waitForBitcoreNode = function(opts, callback) { var errorFilter = function(err, res) { try { - if (JSON.parse(res).height === opts.blockHeight) { + var info = JSON.parse(res); + if (info.dbheight === opts.blockHeight && + info.bitcoindheight === opts.blockHeight) { return; } return res; diff --git a/regtest/wallet.js b/regtest/wallet.js index 9684f73d..4bec09d6 100644 --- a/regtest/wallet.js +++ b/regtest/wallet.js @@ -8,7 +8,7 @@ var path = require('path'); var utils = require('./utils'); var crypto = require('crypto'); -var debug = false; +var debug = true; var bitcoreDataDir = '/tmp/bitcore'; var bitcoinDataDir = '/tmp/bitcoin'; @@ -149,6 +149,7 @@ describe('Wallet Operations', function() { it('should get a list of transactions', function(done) { + //the wallet should be fully uploaded and indexed by the time this happens utils.sendTxs.call(utils, self.opts, function(err) { if(err) {