This commit is contained in:
Chris Kleeschulte 2017-05-11 10:48:55 -04:00
parent 94445c2073
commit 26704e52f7
5 changed files with 53 additions and 45 deletions

View File

@ -2,6 +2,7 @@
var util = require('util'); var util = require('util');
var bitcore = require('bitcore-lib'); var bitcore = require('bitcore-lib');
var Transaction = bitcore.Transaction;
var zmq = require('zmq'); var zmq = require('zmq');
var async = require('async'); var async = require('async');
var BitcoinRPC = require('bitcoind-rpc'); var BitcoinRPC = require('bitcoind-rpc');
@ -389,6 +390,20 @@ Bitcoin.prototype.getBlockHeader = function(blockArg, callback) {
self._maybeGetBlockHash(blockArg, queryHeader); self._maybeGetBlockHash(blockArg, queryHeader);
}; };
Bitcoin.prototype.getTransaction = function(txid, callback) {
var self = this;
self._tryAllClients(function(client, done) {
//this won't work without a bitcoin node that has a tx index
self.client.getRawTransaction(txid.toString('hex'), 0, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
var tx = new Transaction(response.result);
done(null, tx);
});
}, callback);
};
Bitcoin.prototype.getBlock = function(blockArg, callback) { Bitcoin.prototype.getBlock = function(blockArg, callback) {
var self = this; var self = this;

View File

@ -17,6 +17,7 @@ var Service = require('../../service');
var Sync = require('./sync'); var Sync = require('./sync');
var Reorg = require('./reorg'); var Reorg = require('./reorg');
var Block = bitcore.Block; var Block = bitcore.Block;
var utils = require('../../utils');
/* /*
* @param {Object} options * @param {Object} options
@ -57,14 +58,13 @@ function DB(options) {
this.retryInterval = 60000; this.retryInterval = 60000;
this.subscriptions = { this.subscriptions = {};
transaction: [],
block: []
};
this._sync = new Sync(this.node, this); this._sync = new Sync(this.node, this);
this.syncing = true; this.syncing = true;
this.bitcoind = this.node.services.bitcoind; this.bitcoind = this.node.services.bitcoind;
this._operationsQueue = [];
this._lockTimes = [];
} }
util.inherits(DB, Service); util.inherits(DB, Service);
@ -72,6 +72,7 @@ util.inherits(DB, Service);
DB.dependencies = ['bitcoind']; DB.dependencies = ['bitcoind'];
DB.prototype.pauseSync = function(callback) { DB.prototype.pauseSync = function(callback) {
this._lockTimes.push(process.hrtime());
if (this._sync.syncing) { if (this._sync.syncing) {
this._sync.once('synced', function() { this._sync.once('synced', function() {
this._sync.paused = true; this._sync.paused = true;
@ -84,8 +85,14 @@ DB.prototype.pauseSync = function(callback) {
}; };
DB.prototype.resumeSync = function() { DB.prototype.resumeSync = function() {
this._sync.paused = false; var time = this._lockTimes.shift();
this._sync.sync(); if (this._lockTimes.length === 0) {
if (time) {
log.debug('sync lock held for: ' + utils.diffTime(time) + ' secs');
}
this._sync.paused = false;
this._sync.sync();
}
}; };
DB.prototype._setDataPath = function() { DB.prototype._setDataPath = function() {
@ -256,19 +263,20 @@ DB.prototype.printTipInfo = function(prependedMessage) {
DB.prototype.stop = function(callback) { DB.prototype.stop = function(callback) {
var self = this; var self = this;
self._stopping = true;
async.whilst(function() { async.whilst(function() {
return self.bitcoindSyncing; return self._operationsQueue > 0;
}, function(next) { }, function(next) {
setTimeout(next, 10); setTimeout(next, 10);
}, function() { }, function() {
if (self._store) { self.close(callback);
self._store.close(callback);
}
}); });
}; };
DB.prototype.close = function(callback) { DB.prototype.close = function(callback) {
this._store.close(callback); if (this._store) {
this._store.close(callback);
}
}; };
DB.prototype.getAPIMethods = function() { DB.prototype.getAPIMethods = function() {
@ -313,31 +321,7 @@ DB.prototype.loadTips = function(callback) {
}; };
DB.prototype.getPublishEvents = function() { DB.prototype.getPublishEvents = function() {
return [ return [];
{
name: 'db/transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
},
{
name: 'db/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
}
];
};
DB.prototype.subscribe = function(name, emitter) {
this.subscriptions[name].push(emitter);
};
DB.prototype.unsubscribe = function(name, emitter) {
var index = this.subscriptions[name].indexOf(emitter);
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
}; };
DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { DB.prototype.getConcurrentBlockOperations = function(block, add, callback) {

View File

@ -10,8 +10,6 @@ var Block = bitcore.Block;
var ProgressBar = require('progress'); var ProgressBar = require('progress');
var index = require('../../index'); var index = require('../../index');
var log = index.log; var log = index.log;
var green = '\u001b[42m \u001b[0m';
var red = '\u001b[41m \u001b[0m';
function BlockStream(highWaterMark, db, sync) { function BlockStream(highWaterMark, db, sync) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
@ -148,7 +146,7 @@ Sync.prototype._startSubscriptions = function() {
self.subscribed = true; self.subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'}); self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function(hashBlockHex) { self.bus.on('bitcoind/hashblock', function() {
self.sync(); self.sync();
}); });

View File

@ -4,6 +4,7 @@ var async = require('async');
var BaseService = require('../../service'); var BaseService = require('../../service');
var inherits = require('util').inherits; var inherits = require('util').inherits;
var Encoding = require('./encoding'); var Encoding = require('./encoding');
var levelup = require('levelup');
/** /**
* The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional * The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional
@ -164,7 +165,9 @@ TransactionService.prototype.getTransaction = function(txid, options, callback)
async.waterfall([ async.waterfall([
function(next) { function(next) {
self.node.services.db.store.get(key, function(err, buffer) { self.node.services.db.store.get(key, function(err, buffer) {
if(err) { if (err instanceof levelup.errors.NotFoundError) {
return next(null, false);
} else if (err) {
return callback(err); return callback(err);
} }
var tx = self.encoding.decodeTransactionValue(buffer); var tx = self.encoding.decodeTransactionValue(buffer);
@ -186,6 +189,8 @@ TransactionService.prototype.getTransaction = function(txid, options, callback)
} }
self._getMissingInputValues(tx, next); self._getMissingInputValues(tx, next);
}); });
}, function(tx, next) {
self.node.services.bitcoind.getTransaction(txid, next);
}], callback); }], callback);
}; };

View File

@ -305,7 +305,7 @@ WalletService.prototype._processSerialInput = function(opts, tx, input, callback
walletId = self._addressMap[pubKey]; walletId = self._addressMap[pubKey];
if (!walletId) { if (!walletId) {
return next(); return next(null, []);
} }
} }
@ -648,7 +648,6 @@ WalletService.prototype._endpointGetTransactions = function() {
return function(req, res) { return function(req, res) {
console.log('endpoint get transactions');
var walletId = req.params.walletId; var walletId = req.params.walletId;
self.db.pauseSync(function() { self.db.pauseSync(function() {
@ -671,15 +670,23 @@ console.log('endpoint get transactions');
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex'); var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex');
self._getTransactionFromDb(options, txid, function(err, tx) { self._getTransactionFromDb(options, txid, function(err, tx) {
transform.push(utils.toJSONL(self._formatTransaction(tx))); if(err) {
log.error(err);
transform.push(null);
return;
}
var formattedTx = utils.toJSONL(self._formatTransaction(tx));
transform.push(formattedTx);
callback(); callback();
}); });
}; };
transform._flush = function (callback) { transform._flush = function(callback) {
self.db.resumeSync(); self.db.resumeSync();
transform.push(null);
callback(); callback();
}; };
@ -1235,7 +1242,6 @@ WalletService.prototype._endpointJobStatus = function() {
WalletService.prototype._endpointGetInfo = function() { WalletService.prototype._endpointGetInfo = function() {
var self = this; var self = this;
return function(req, res) { return function(req, res) {
console.log('info called');
res.jsonp({ res.jsonp({
result: 'ok', result: 'ok',
dbheight: self.node.services.db.tip.__height, dbheight: self.node.services.db.tip.__height,