Fixed issue with missing inputValues.

This commit is contained in:
Chris Kleeschulte 2017-05-09 16:36:09 -04:00
parent 57a7155265
commit f452d63d75
6 changed files with 179 additions and 205 deletions

View File

@ -202,6 +202,7 @@ Bitcoin.prototype._zmqBlockHandler = function(message) {
self._hashBlockCache.set(hashBlockHex);
self.tiphash = hashBlockHex;
self.height++;
self.emit('block', message);

View File

@ -71,6 +71,23 @@ util.inherits(DB, Service);
DB.dependencies = ['bitcoind'];
DB.prototype.pauseSync = function(callback) {
if (this._sync.syncing) {
this._sync.once('synced', function() {
this._sync.paused = true;
callback();
});
} else {
this._sync.paused = true;
setImmediate(callback);
}
};
DB.prototype.resumeSync = function() {
this._sync.paused = false;
this._sync.sync();
};
DB.prototype._setDataPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
if (this.node.network === Networks.livenet) {
@ -132,7 +149,15 @@ DB.prototype.start = function(callback) {
}
self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer);
self.store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'});
self._store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'});
self.store = {
get: self._store.get.bind(self._store),
put: self._store.put.bind(self._store),
batch: self._store.batch.bind(self._store),
createReadStream: self._store.createReadStream.bind(self._store),
createKeyStream: self._store.createKeyStream.bind(self._store),
};
self.node.once('ready', function() {
@ -236,14 +261,14 @@ DB.prototype.stop = function(callback) {
}, function(next) {
setTimeout(next, 10);
}, function() {
if (self.store) {
self.store.close(callback);
if (self._store) {
self._store.close(callback);
}
});
};
DB.prototype.close = function(callback) {
this.store.close(callback);
this._store.close(callback);
};
DB.prototype.getAPIMethods = function() {
@ -315,60 +340,6 @@ DB.prototype.unsubscribe = function(name, emitter) {
}
};
DB.prototype.connectBlock = function(block, callback) {
var self = this;
log.info('DB handling new chain block');
var operations = [];
self.getConcurrentBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = ops;
self.getSerialBlockOperations(block, true, function(err, ops) {
if(err) {
return callback(err);
}
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, true));
operations.push(self.getConcurrentTipOperation(block, true));
self.store.batch(operations, callback);
});
});
};
DB.prototype.disconnectBlock = function(block, callback) {
var self = this;
log.debug('DB removing chain block');
var operations = [];
self.getConcurrentBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
operations = ops;
self.getSerialBlockOperations(block, false, function(err, ops) {
if(err) {
return callback(err);
}
operations = operations.concat(ops);
operations.push(self.getTipOperation(block, false));
operations.push(self.getConcurrentTipOperation(block, false));
self.store.batch(operations, callback);
});
});
};
DB.prototype.getConcurrentBlockOperations = function(block, add, callback) {
var operations = [];

View File

@ -22,7 +22,6 @@ function BlockStream(highWaterMark, db, sync) {
this.lastEmittedHash = this.dbTip.hash;
this.queue = [];
this.processing = false;
this.syncing = true;
this.bitcoind = this.db.bitcoind;
}
@ -67,6 +66,7 @@ function Sync(node, db) {
this.node = node;
this.db = db;
this.syncing = false;
this.paused = false; //we can't sync while one of our indexes is reading/writing separate from us
this.highWaterMark = 10;
this.progressBar = null;
this.lastReportedBlock = 0;
@ -77,7 +77,7 @@ inherits(Sync, EventEmitter);
Sync.prototype.sync = function() {
var self = this;
if(this.syncing) {
if(this.syncing || this.paused) {
return;
}
@ -135,22 +135,26 @@ Sync.prototype._onFinish = function() {
}
self._startSubscriptions();
log.info('Sync complete');
self.emit('synced');
};
Sync.prototype._startSubscriptions = function() {
var self = this;
if (!self.subscribed) {
self.subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function(hashBlockHex) {
self.sync();
});
self.bus.subscribe('bitcoind/hashblock');
}
};
Sync.prototype.reportStatus = function() {

View File

@ -49,7 +49,8 @@ WalletService.prototype.getAPIMethods = function() {
WalletService.prototype.start = function(callback) {
var self = this;
self.store = self.node.services.db.store;
self.db = self.node.services.db;
self.store = self.db.store;
self.node.services.db.getPrefix(self.name, function(err, servicePrefix) {
@ -126,6 +127,7 @@ WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, c
WalletService.prototype._blockHandler = function(opts, callback) {
var self = this;
if (!self._checkAddresses()) {
return setImmediate(function() {
callback(null, []);
@ -133,13 +135,18 @@ WalletService.prototype._blockHandler = function(opts, callback) {
}
async.mapSeries(opts.block.transactions, function(tx, next) {
self._processTransaction(opts, tx, next);
}, function(err, operations) {
if(err) {
return callback(err);
}
var ret = _.compact(_.flattenDeep(operations));
callback(null, ret);
});
};
@ -410,19 +417,23 @@ WalletService.prototype._loadAllBalances = function(callback) {
WalletService.prototype._endpointUTXOs = function() {
var self = this;
return function(req, res) {
req.setTimeout(600000);
var walletId = req.params.walletId;
var queryMempool = req.query.queryMempool !== false;
var height = self.node.services.db.tip.__height;
var options = {
queryMempool: queryMempool
};
self._getUtxos(walletId, options, function(err, utxos) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
utxos: utxos,
height: height
self.db.pauseSync(function() {
self._getUtxos(walletId, options, function(err, utxos) {
if(err) {
return utils.sendError(err, res);
}
self.db.resumeSync();
res.status(200).jsonp({
utxos: utxos,
height: height
});
});
});
};
@ -441,14 +452,17 @@ WalletService.prototype._endpointGetBalance= function() {
byAddress: byAddress
};
self._getBalance(walletId, options, function(err, result) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
satoshis: result,
height: self.node.services.db.tip.__height,
hash: self.node.services.db.tip.hash
self.db.pauseSync(function() {
self._getBalance(walletId, options, function(err, result) {
if(err) {
return utils.sendError(err, res);
}
self.db.resumeSync();
res.status(200).jsonp({
satoshis: result,
height: self.node.services.db.tip.__height,
hash: self.node.services.db.tip.hash
});
});
});
};
@ -459,13 +473,16 @@ WalletService.prototype._endpointRemoveWallet = function() {
return function(req, res) {
var walletId = req.params.walletId;
self._removeWallet(walletId, function(err, numRecords) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
walletId: walletId,
numberRemoved: numRecords
self.db.pauseSync(function() {
self._removeWallet(walletId, function(err, numRecords) {
if(err) {
return utils.sendError(err, res);
}
self.db.resumeSync();
res.status(200).jsonp({
walletId: walletId,
numberRemoved: numRecords
});
});
});
};
@ -475,12 +492,15 @@ WalletService.prototype._endpointRemoveAllWallets = function() {
var self = this;
return function(req, res) {
self._removeAllWallets(function(err, numRecords) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
numberRemoved: numRecords
self.db.pauseSync(function() {
self._removeAllWallets(function(err, numRecords) {
if(err) {
return utils.sendError(err, res);
}
self.db.resumeSync();
res.status(200).jsonp({
numberRemoved: numRecords
});
});
});
};
@ -491,17 +511,20 @@ WalletService.prototype._endpointGetAddresses = function() {
return function(req, res) {
var walletId = req.params.walletId;
self._getAddresses(walletId, function(err, addresses) {
if(err) {
return utils.sendError(err, res);
}
self.db.pauseSync(function() {
self._getAddresses(walletId, function(err, addresses) {
self.db.resumeSync();
if(err) {
return utils.sendError(err, res);
}
if(!addresses) {
return res.status(404).send('Not found');
}
if(!addresses) {
return res.status(404).send('Not found');
}
res.status(200).jsonp({
addresses: addresses
res.status(200).jsonp({
addresses: addresses
});
});
});
};
@ -610,9 +633,12 @@ WalletService.prototype._endpointPostAddresses = function() {
var jobId = utils.generateJobId();
self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self));
self.db.pauseSync(function() {
res.status(200).jsonp({jobId: jobId});
self._importAddresses(walletId, addresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
});
};
};
@ -623,39 +649,44 @@ WalletService.prototype._endpointGetTransactions = function() {
return function(req, res) {
var walletId = req.params.walletId;
self._processStartEndOptions(req, function(err, heights) {
self.db.pauseSync(function() {
self._processStartEndOptions(req, function(err, heights) {
if(err) {
return utils.sendError(err, res);
}
if(err) {
return utils.sendError(err, res);
}
var options = {
start: heights[0] || 0,
end : heights[1] || 0xffffffff,
self: self,
walletId: walletId
};
var options = {
start: heights[0] || 0,
end : heights[1] || 0xffffffff,
self: self,
walletId: walletId
};
transform._transform = function(chunk, enc, callback) {
//txids are sent in and the actual tx's are found here
transform._transform = function(chunk, enc, callback) {
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid;
self._getTransactionFromDb(options, txid, function(err, tx) {
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid;
self._getTransactionFromDb(options, txid, function(err, tx) {
transform.push(utils.toJSONL(self._formatTransaction(tx)));
transform.push(utils.toJSONL(self._formatTransaction(tx)));
callback();
});
};
transform._flush = function (callback) {
self.db.resumeSync();
callback();
}
});
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
//creates a stream of txids that match the wallet's set of addresses
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
};
transform._flush = function (callback) {
callback();
}
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
stream.pipe(transform).pipe(res);
stream.pipe(transform).pipe(res);
});
});
};
};
@ -693,21 +724,27 @@ WalletService.prototype._endpointPutAddresses = function() {
return utils.sendError(new Error('WalletId must be given.'), res);
}
self._getAddresses(walletId, function(err, oldAddresses) {
if(err) {
return utils.sendError(err, res);
}
self.db.pauseSync(function() {
if(!oldAddresses) {
return res.status(404).send('Not found');
}
self._getAddresses(walletId, function(err, oldAddresses) {
var addAddresses = _.without(newAddresses, oldAddresses);
if(err) {
return utils.sendError(err, res);
}
var jobId = utils.generateJobId();
self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
if(!oldAddresses) {
return res.status(404).send('Not found');
}
var addAddresses = _.without(newAddresses, oldAddresses);
var jobId = utils.generateJobId();
self._importAddresses(walletId, addAddresses, jobId, self._jobCompletionCallback.bind(self));
res.status(200).jsonp({jobId: jobId});
});
});
};
};
@ -767,28 +804,6 @@ WalletService.prototype._getSearchParams = function(fn, options) {
};
};
WalletService.prototype._getTxidsFromDb = function(options, callback) {
var self = this;
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
var streamErr;
stream.on('error', function(err) {
streamErr = err;
});
stream.on('data', function(data) {
txids.push(self._encoding.decodeWalletTransactionKey(data).txid);
});
stream.on('end', function() {
self._getTransactionsFromDb(txids, options, callback);
});
};
WalletService.prototype._getTransactionFromDb = function(options, txid, callback) {
var self = options.self;
@ -799,58 +814,32 @@ WalletService.prototype._getTransactionFromDb = function(options, txid, callback
return callback(err);
}
callback(null, tx);
});
};
WalletService.prototype._getTransactionsFromMempool = function(options, callback) {
var self = this;
self._getAddresses(options.walletId, function(err, addresses) {
if(err) {
return callback(err);
if (tx.__inputValues) {
return callback(null, tx);
}
self.mempool.getTransactionsByAddresses(addresses, function(err, mempoolTxs) {
async.map(tx.inputs, function(input, next) {
self.node.services.transaction.getTransaction(input.prevTxId, options, function(err, tx) {
if(err) {
return next(err);
}
next(null, tx.outputs[input.outputIndex].satoshis);
});
}, function(err, inputValues) {
if(err) {
return callback(err);
}
if (mempoolTxs) {
mempoolTxs.forEach(function(tx) {
options.readable.push(utils.toJSONL(self._formatTransaction(tx)));
});
}
options.readable.push(null);
options.readable.pipe(options.response);
callback();
tx.__inputValues = inputValues;
callback(null, tx);
});
});
};
WalletService.prototype._getTransactions = function(walletId, options, callback) {
var self = this;
var start = options.start || 0;
var end = options.end || 0xffffffff;
var opts = {
start: start,
end: end,
walletId: walletId,
key: walletId + start + end,
readable: options.readable
};
return self._getTxidsFromDb(opts, callback);
};
@ -978,6 +967,9 @@ WalletService.prototype._isJobQueueReady = function() {
};
WalletService.prototype._jobCompletionCallback = function(err, results) {
this.db.resumeSync();
log.info('Completed job: ', results.jobId);
var jobId = results.jobId;
@ -1017,6 +1009,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
this._jobs.set(jobId, job);
self._getAddresses(walletId, function(err, oldAddresses) {
if(err) {
return callback(err, jobResults);
@ -1242,8 +1235,10 @@ WalletService.prototype._endpointGetInfo = function() {
return function(req, res) {
res.jsonp({
result: 'ok',
height: self.node.services.db.tip.__height,
hash: self.node.services.db.tip.hash
dbheight: self.node.services.db.tip.__height,
dbhash: self.node.services.db.tip.hash,
bitcoindheight: self.node.services.bitcoind.height,
bitcoindhash: self.node.services.bitcoind.tiphash
});
};
};

View File

@ -88,7 +88,9 @@ utils.waitForBitcoreNode = function(opts, callback) {
var errorFilter = function(err, res) {
try {
if (JSON.parse(res).height === opts.blockHeight) {
var info = JSON.parse(res);
if (info.dbheight === opts.blockHeight &&
info.bitcoindheight === opts.blockHeight) {
return;
}
return res;

View File

@ -8,7 +8,7 @@ var path = require('path');
var utils = require('./utils');
var crypto = require('crypto');
var debug = false;
var debug = true;
var bitcoreDataDir = '/tmp/bitcore';
var bitcoinDataDir = '/tmp/bitcoin';
@ -149,6 +149,7 @@ describe('Wallet Operations', function() {
it('should get a list of transactions', function(done) {
//the wallet should be fully uploaded and indexed by the time this happens
utils.sendTxs.call(utils, self.opts, function(err) {
if(err) {