From 26704e52f727378c6ee8986c3c1ba87091cb4cfb Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 11 May 2017 10:48:55 -0400 Subject: [PATCH] wip --- lib/services/bitcoind/index.js | 15 +++++++++ lib/services/db/index.js | 56 +++++++++++-------------------- lib/services/db/sync.js | 4 +-- lib/services/transaction/index.js | 7 +++- lib/services/wallet-api/index.js | 16 ++++++--- 5 files changed, 53 insertions(+), 45 deletions(-) diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 59fc7436..ed1c260a 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -2,6 +2,7 @@ var util = require('util'); var bitcore = require('bitcore-lib'); +var Transaction = bitcore.Transaction; var zmq = require('zmq'); var async = require('async'); var BitcoinRPC = require('bitcoind-rpc'); @@ -389,6 +390,20 @@ Bitcoin.prototype.getBlockHeader = function(blockArg, callback) { self._maybeGetBlockHash(blockArg, queryHeader); }; +Bitcoin.prototype.getTransaction = function(txid, callback) { + var self = this; + self._tryAllClients(function(client, done) { + //this won't work without a bitcoin node that has a tx index + self.client.getRawTransaction(txid.toString('hex'), 0, function(err, response) { + if (err) { + return done(self._wrapRPCError(err)); + } + var tx = new Transaction(response.result); + done(null, tx); + }); + }, callback); +}; + Bitcoin.prototype.getBlock = function(blockArg, callback) { var self = this; diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 762b2fbe..54f01a5a 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -17,6 +17,7 @@ var Service = require('../../service'); var Sync = require('./sync'); var Reorg = require('./reorg'); var Block = bitcore.Block; +var utils = require('../../utils'); /* * @param {Object} options @@ -57,14 +58,13 @@ function DB(options) { this.retryInterval = 60000; - this.subscriptions = { - transaction: [], - block: [] - }; + this.subscriptions = {}; this._sync = new Sync(this.node, this); this.syncing = true; this.bitcoind = this.node.services.bitcoind; + this._operationsQueue = []; + this._lockTimes = []; } util.inherits(DB, Service); @@ -72,6 +72,7 @@ util.inherits(DB, Service); DB.dependencies = ['bitcoind']; DB.prototype.pauseSync = function(callback) { + this._lockTimes.push(process.hrtime()); if (this._sync.syncing) { this._sync.once('synced', function() { this._sync.paused = true; @@ -84,8 +85,14 @@ DB.prototype.pauseSync = function(callback) { }; DB.prototype.resumeSync = function() { - this._sync.paused = false; - this._sync.sync(); + var time = this._lockTimes.shift(); + if (this._lockTimes.length === 0) { + if (time) { + log.debug('sync lock held for: ' + utils.diffTime(time) + ' secs'); + } + this._sync.paused = false; + this._sync.sync(); + } }; DB.prototype._setDataPath = function() { @@ -256,19 +263,20 @@ DB.prototype.printTipInfo = function(prependedMessage) { DB.prototype.stop = function(callback) { var self = this; + self._stopping = true; async.whilst(function() { - return self.bitcoindSyncing; + return self._operationsQueue > 0; }, function(next) { setTimeout(next, 10); }, function() { - if (self._store) { - self._store.close(callback); - } + self.close(callback); }); }; DB.prototype.close = function(callback) { - this._store.close(callback); + if (this._store) { + this._store.close(callback); + } }; DB.prototype.getAPIMethods = function() { @@ -313,31 +321,7 @@ DB.prototype.loadTips = function(callback) { }; DB.prototype.getPublishEvents = function() { - return [ - { - name: 'db/transaction', - scope: this, - subscribe: this.subscribe.bind(this, 'transaction'), - unsubscribe: this.unsubscribe.bind(this, 'transaction') - }, - { - name: 'db/block', - scope: this, - subscribe: this.subscribe.bind(this, 'block'), - unsubscribe: this.unsubscribe.bind(this, 'block') - } - ]; -}; - -DB.prototype.subscribe = function(name, emitter) { - this.subscriptions[name].push(emitter); -}; - -DB.prototype.unsubscribe = function(name, emitter) { - var index = this.subscriptions[name].indexOf(emitter); - if (index > -1) { - this.subscriptions[name].splice(index, 1); - } + return []; }; DB.prototype.getConcurrentBlockOperations = function(block, add, callback) { diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index ee11c9c1..8b55b4ef 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -10,8 +10,6 @@ var Block = bitcore.Block; var ProgressBar = require('progress'); var index = require('../../index'); var log = index.log; -var green = '\u001b[42m \u001b[0m'; -var red = '\u001b[41m \u001b[0m'; function BlockStream(highWaterMark, db, sync) { Readable.call(this, {objectMode: true, highWaterMark: highWaterMark}); @@ -148,7 +146,7 @@ Sync.prototype._startSubscriptions = function() { self.subscribed = true; self.bus = self.node.openBus({remoteAddress: 'localhost'}); - self.bus.on('bitcoind/hashblock', function(hashBlockHex) { + self.bus.on('bitcoind/hashblock', function() { self.sync(); }); diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 1bc47a0f..6a0c2d13 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -4,6 +4,7 @@ var async = require('async'); var BaseService = require('../../service'); var inherits = require('util').inherits; var Encoding = require('./encoding'); +var levelup = require('levelup'); /** * The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional @@ -164,7 +165,9 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) async.waterfall([ function(next) { self.node.services.db.store.get(key, function(err, buffer) { - if(err) { + if (err instanceof levelup.errors.NotFoundError) { + return next(null, false); + } else if (err) { return callback(err); } var tx = self.encoding.decodeTransactionValue(buffer); @@ -186,6 +189,8 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) } self._getMissingInputValues(tx, next); }); + }, function(tx, next) { + self.node.services.bitcoind.getTransaction(txid, next); }], callback); }; diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 67619a13..ecf9f1ae 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -305,7 +305,7 @@ WalletService.prototype._processSerialInput = function(opts, tx, input, callback walletId = self._addressMap[pubKey]; if (!walletId) { - return next(); + return next(null, []); } } @@ -648,7 +648,6 @@ WalletService.prototype._endpointGetTransactions = function() { return function(req, res) { -console.log('endpoint get transactions'); var walletId = req.params.walletId; self.db.pauseSync(function() { @@ -671,15 +670,23 @@ console.log('endpoint get transactions'); var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex'); self._getTransactionFromDb(options, txid, function(err, tx) { - transform.push(utils.toJSONL(self._formatTransaction(tx))); + if(err) { + log.error(err); + transform.push(null); + return; + } + + var formattedTx = utils.toJSONL(self._formatTransaction(tx)); + transform.push(formattedTx); callback(); }); }; - transform._flush = function (callback) { + transform._flush = function(callback) { self.db.resumeSync(); + transform.push(null); callback(); }; @@ -1235,7 +1242,6 @@ WalletService.prototype._endpointJobStatus = function() { WalletService.prototype._endpointGetInfo = function() { var self = this; return function(req, res) { -console.log('info called'); res.jsonp({ result: 'ok', dbheight: self.node.services.db.tip.__height,