From bb73bac027720cdd7ae140b2b0b9e87c5df5a17c Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Wed, 17 May 2017 18:02:00 +0000 Subject: [PATCH] Added more logging. Services should call the db service for db operations. --- lib/services/address/index.js | 20 ++++---- lib/services/db/index.js | 85 ++++++++++++++++++++++++------- lib/services/db/reorg.js | 8 +-- lib/services/db/sync.js | 6 +-- lib/services/mempool/index.js | 8 +-- lib/services/timestamp/index.js | 6 +-- lib/services/transaction/index.js | 4 +- lib/services/wallet-api/index.js | 63 ++++++++++++++--------- 8 files changed, 132 insertions(+), 68 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 9da49bb5..8370fdbe 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -47,8 +47,8 @@ AddressService.dependencies = [ AddressService.prototype.start = function(callback) { var self = this; - this.store = this.node.services.db.store; - this.node.services.db.getPrefix(this.name, function(err, prefix) { + this.db = this.node.services.db; + this.db.getPrefix(this.name, function(err, prefix) { if(err) { return callback(err); } @@ -513,7 +513,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options valueEncoding: 'binary', keyEncoding: 'binary' }; - this.node.services.db.store.get(key, dbOptions, function(err, buffer) { + this.db.get(key, dbOptions, function(err, buffer) { if (err instanceof levelup.errors.NotFoundError) { return callback(null, false); } else if (err) { @@ -572,7 +572,7 @@ AddressService.prototype.createInputsDBStream = function(addressStr, options) { var adjustedStart = options.start + 1; startBuffer.writeUInt32BE(adjustedStart, 0); - stream = this.node.services.db.store.createReadStream({ + stream = this.db.createReadStream({ gt: Buffer.concat([ constants.PREFIXES.SPENTS, hashBuffer, @@ -592,7 +592,7 @@ AddressService.prototype.createInputsDBStream = function(addressStr, options) { }); } else { var allKey = Buffer.concat([constants.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ + stream = this.db.createReadStream({ gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), valueEncoding: 'binary', @@ -786,7 +786,7 @@ AddressService.prototype.createOutputsDBStream = function(addressStr, options) { var startAdjusted = options.start + 1; startBuffer.writeUInt32BE(startAdjusted, 0); - stream = this.node.services.db.store.createReadStream({ + stream = this.db.createReadStream({ gt: Buffer.concat([ constants.PREFIXES.OUTPUTS, hashBuffer, @@ -806,7 +806,7 @@ AddressService.prototype.createOutputsDBStream = function(addressStr, options) { }); } else { var allKey = Buffer.concat([constants.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ + stream = this.db.createReadStream({ gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), valueEncoding: 'binary', @@ -988,7 +988,7 @@ AddressService.prototype.getUtxosForAddress = function(address, queryMempool, ca var self = this; - var stream = self.store.createReadStream({ + var stream = self.db.createReadStream({ gte: self._encoding.encodeUtxoIndexKey(address), lt: self._encoding.encodeUtxoIndexKey(utils.getTerminalKey(new Buffer(address))) }); @@ -1130,7 +1130,7 @@ AddressService.prototype.getAddressTxids = function(address, options, callback) var start = self._encoding.encodeAddressIndexKey(address, opts.start, opts.txid); var end = self._encoding.encodeAddressIndexKey(address, opts.end, opts.txid); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lt: end }); @@ -1162,7 +1162,7 @@ AddressService.prototype.getAddressTxidsWithHeights = function(address, options, var start = self._encoding.encodeAddressIndexKey(address, opts.start || 0); //the start and end must be the same length var end = Buffer.concat([ start.slice(0, -36), new Buffer((opts.end || 'ffffffff'), 'hex') ]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lt: end }); diff --git a/lib/services/db/index.js b/lib/services/db/index.js index 7d6c4919..454aef11 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -115,13 +115,13 @@ DB.prototype._setDataPath = function() { DB.prototype._checkVersion = function(callback) { var self = this; - self.store.get(self.dbPrefix + 'tip', self.dbOptions, function(err) { + self.get(self.dbPrefix + 'tip', self.dbOptions, function(err) { if (err instanceof levelup.errors.NotFoundError) { return callback(); } else if (err) { return callback(err); } - self.store.get(self.dbPrefix + 'version', self.dbOptions, function(err, buffer) { + self.get(self.dbPrefix + 'version', self.dbOptions, function(err, buffer) { var version; if (err instanceof levelup.errors.NotFoundError) { version = 1; @@ -147,7 +147,7 @@ DB.prototype._checkVersion = function(callback) { DB.prototype._setVersion = function(callback) { var versionBuffer = new Buffer(new Array(4)); versionBuffer.writeUInt32BE(this.version); - this.store.put(this.dbPrefix + 'version', versionBuffer, callback); + this._store.put(this.dbPrefix + 'version', versionBuffer, callback); }; DB.prototype.start = function(callback) { @@ -159,14 +159,6 @@ DB.prototype.start = function(callback) { self._store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'}); - self.store = { - get: self._store.get.bind(self._store), - put: self._store.put.bind(self._store), - batch: self._store.batch.bind(self._store), - createReadStream: self._store.createReadStream.bind(self._store), - createKeyStream: self._store.createKeyStream.bind(self._store), - }; - self.node.once('ready', function() { self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer); @@ -189,6 +181,63 @@ DB.prototype.start = function(callback) { }; +DB.prototype.get = function(key, options, callback) { + var cb = callback; + var opts = options; + if (typeof callback !== 'function') { + cb = options; + opts = {}; + } + if (!this._stopping) { + this._store.get(key, opts, cb); + } else { + setImmediate(cb); + } +}; + +DB.prototype.put = function(key, value, options, callback) { + var cb = callback; + var opts = options; + if (typeof callback !== 'function') { + cb = options; + opts = {}; + } + if (!this._stopping) { + this._store.put(key, value, opts, cb); + } else { + setImmediate(cb); + } +}; + +DB.prototype.batch = function(ops, options, callback) { + var cb = callback; + var opts = options; + if (typeof callback !== 'function') { + cb = options; + opts = {}; + } + if (!this._stopping) { + this._store.batch(ops, opts, cb); + } else { + setImmediate(cb); + } +}; + +DB.prototype.createReadStream = function(op) { + var stream; + if (!this._stopping) { + stream = this._store.createReadStream(op) + } + return stream; +}; + +DB.prototype.createKeyStream = function(op) { + var stream; + if (!this._stopping) { + stream = this._store.createKeyStream(op) + } + return stream; +}; DB.prototype.detectReorg = function(blocks) { @@ -270,14 +319,14 @@ DB.prototype.stop = function(callback) { async.whilst(function() { return self._operationsQueue > 0; }, function(next) { - setTimeout(next, 10); + setTimeout(next, 3000); }, function() { self.close(callback); }); }; DB.prototype.close = function(callback) { - if (this._store) { + if (this._store && this._store.isOpen()) { this._store.close(callback); } }; @@ -294,7 +343,7 @@ DB.prototype.loadTips = function(callback) { async.each(tipStrings, function(tip, next) { - self.store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) { + self._store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) { if(err && !(err instanceof levelup.errors.NotFoundError)) { return next(err); @@ -443,7 +492,7 @@ DB.prototype.getPrefix = function(service, callback) { var self = this; function getPrefix(next) { - self.store.get(self.dbPrefix + 'prefix-' + service, function(err, buffer) { + self.get(self.dbPrefix + 'prefix-' + service, function(err, buffer) { if(err) { if(err.notFound) { return next(); @@ -455,7 +504,7 @@ DB.prototype.getPrefix = function(service, callback) { } function getUnused(next) { - self.store.get(self.dbPrefix + 'nextUnused', function(err, buffer) { + self.get(self.dbPrefix + 'nextUnused', function(err, buffer) { if(err) { if(err.notFound) { return next(null, new Buffer('0001', 'hex')); @@ -468,7 +517,7 @@ DB.prototype.getPrefix = function(service, callback) { } function putPrefix(buffer, next) { - self.store.put(self.dbPrefix + 'prefix-' + service, buffer, function(err) { + self._store.put(self.dbPrefix + 'prefix-' + service, buffer, function(err) { if(err) { return next(err); } @@ -482,7 +531,7 @@ DB.prototype.getPrefix = function(service, callback) { var nextUnused = new Buffer(2); nextUnused.writeUInt16BE(prefix + 1); - self.store.put(self.dbPrefix + 'nextUnused', nextUnused, function(err) { + self._store.put(self.dbPrefix + 'nextUnused', nextUnused, function(err) { if(err) { return next(err); } diff --git a/lib/services/db/reorg.js b/lib/services/db/reorg.js index d36151ac..42370c62 100644 --- a/lib/services/db/reorg.js +++ b/lib/services/db/reorg.js @@ -69,7 +69,7 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) { } operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false)); - self.db.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } @@ -108,7 +108,7 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) { } operations.push(self.db.getConcurrentTipOperation(block, true)); - self.db.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } @@ -157,7 +157,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { } var operations = results[0].concat(results[1]); - self.db.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } @@ -220,7 +220,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { var operations = results[0].concat(results[1]); - self.db.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return next(err); } diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index c9e873dd..33728073 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -280,7 +280,7 @@ ProcessSerial.prototype._process = function(block, callback) { self.tip = block; - self.db.store.batch(obj.operations, function(err) { + self.db.batch(obj.operations, function(err) { if(err) { return callback(err); } @@ -339,7 +339,7 @@ WriteStream.prototype._write = function(obj, enc, callback) { return setImmediate(callback); } - self.db.store.batch(obj.operations, function(err) { + self.db.batch(obj.operations, function(err) { if(err) { return callback(err); } @@ -375,7 +375,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { return callback(err); } var operations = results[0].concat(results[1]); - self.db.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return callback(err); } diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 41ff7d0b..007b16ef 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -13,7 +13,7 @@ var MempoolService = function(options) { this.name = options.name; this._txIndex = {}; this._addressIndex = {}; - this.store = this.node.services.db.store; + this.db = this.node.services.db; this._handleBlocks = false; }; @@ -220,7 +220,7 @@ MempoolService.prototype.getTransaction = function(txid, callback) { callback(null, self._encoding.decodeMempoolTransactionValue(txBuffer)); }); } - self.store.get(key, function(err, value) { + self.db.get(key, function(err, value) { if(err) { return callback(err); } @@ -251,7 +251,7 @@ MempoolService.prototype._updateMempool = function(tx, action, callback) { return callback(err); } var operations = [operation].concat(self._getTransactionAddressDetailOperations(tx, action)); - self.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { log.error('batch operation for updating Mempool failed.'); return; @@ -267,7 +267,7 @@ MempoolService.prototype.getTransactionsByAddress = function(address, callback) var maxTxid = new Buffer(new Array(65).join('f'), 'hex'); var start = self._encoding.encodeMempoolAddressKey(address); var end = Buffer.concat([self._encoding.encodeMempoolAddressKey(address), maxTxid]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lte: end }); diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index 19bfa185..fd59591d 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -17,7 +17,7 @@ TimestampService.dependencies = [ 'db' ]; TimestampService.prototype.start = function(callback) { var self = this; - this.store = this.node.services.db.store; + this.db = this.node.services.db; this.node.services.db.getPrefix(this.name, function(err, prefix) { if(err) { @@ -94,7 +94,7 @@ TimestampService.prototype.getTimestamp = function(hash, callback) { } var key = self.encoding.encodeBlockTimestampKey(hash); - self.store.get(key, function(err, buffer) { + self.db.get(key, function(err, buffer) { if(err) { return callback(err); } @@ -113,7 +113,7 @@ TimestampService.prototype.getBlockHeights = function(timestamps, callback) { }); var start = self.encoding.encodeTimestampBlockKey(timestamps[0]); var end = self.encoding.encodeTimestampBlockKey(timestamps[1]); - var stream = self.store.createReadStream({ + var stream = self.db.createReadStream({ gte: start, lte: end }); diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index ac11903b..4ba3da9c 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -34,7 +34,7 @@ TransactionService.dependencies = [ TransactionService.prototype.start = function(callback) { var self = this; - self.store = this.node.services.db.store; + self.db = this.node.services.db; self.node.services.db.getPrefix(self.name, function(err, prefix) { if(err) { @@ -169,7 +169,7 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) async.waterfall([ function(next) { - self.node.services.db.store.get(key, function(err, buffer) { + self.node.services.db.get(key, function(err, buffer) { if (err instanceof levelup.errors.NotFoundError) { return next(null, false); } else if (err) { diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 32acd663..ba3baa6f 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -16,7 +16,9 @@ var _ = require('lodash'); var bodyParser = require('body-parser'); var LRU = require('lru-cache'); var Encoding = require('./encoding'); -var Input = require('bitcore-lib').Transaction.Input; +var bitcore = require('bitcore-lib'); +var Input = bitcore.Transaction.Input; +var Unit = bitcore.Unit; var Transform = require('stream').Transform; var WalletService = function(options) { @@ -30,6 +32,8 @@ var WalletService = function(options) { this._addressMap = {}; this.balances = {}; + + this.db = this.node.services.db; }; inherits(WalletService, BaseService); @@ -49,8 +53,6 @@ WalletService.prototype.getAPIMethods = function() { WalletService.prototype.start = function(callback) { var self = this; - self.db = self.node.services.db; - self.store = self.db.store; self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { @@ -354,7 +356,7 @@ WalletService.prototype._loadAllAddresses = function(callback) { var start = self._encoding.encodeWalletAddressesKey('00'); var end = self._encoding.encodeWalletAddressesKey(Array(65).join('f')); - var stream = self.store.createReadStream({ + var stream = self.db.createReadStream({ gte: start, lt: end }); @@ -390,7 +392,7 @@ WalletService.prototype._loadAllBalances = function(callback) { var start = self._encoding.encodeWalletBalanceKey('00'); var end = self._encoding.encodeWalletBalanceKey(Array(65).join('f')); - var stream = self.store.createReadStream({ + var stream = self.db.createReadStream({ gte: start, lt: end }); @@ -538,7 +540,7 @@ WalletService.prototype._endpointDumpAllWallets = function() { var start = new Buffer(self.servicePrefix); var end = new Buffer.concat([start, new Buffer('ff', 'hex')]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lt: end }); @@ -571,7 +573,7 @@ WalletService.prototype._endpointGetWalletIds = function() { return function(req, res) { var start = new Buffer.concat([self.servicePrefix, new Buffer(self._encoding.subKeyMap.addresses.buffer)]); var end = new Buffer.concat([start, new Buffer('ff', 'hex')]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lt: end }); @@ -711,12 +713,18 @@ WalletService.prototype._endpointGetTransactions = function() { walletId: walletId }; + var missingTxidCount = 0; var transform = new Transform({ objectMode: true, highWaterMark: 1000000 }); //txids are sent in and the actual tx's are found here transform._transform = function(chunk, enc, callback) { var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex'); - assert(txid.length === 64, 'WalletService, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.'); + + if (txid.length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000') { + missingTxidCount++; + log.error('missingTxidCount: ', missingTxidCount); + return callback(); + } self._getTransactionFromDb(options, txid, function(err, tx) { @@ -740,7 +748,11 @@ WalletService.prototype._endpointGetTransactions = function() { }; var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding); - var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options)); + var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options)); + + stream.on('close', function() { + stream.unpipe(); + }); stream.pipe(transform).pipe(res); }); @@ -808,7 +820,7 @@ WalletService.prototype._endpointPutAddresses = function() { WalletService.prototype._getUtxos = function(walletId, options, callback) { var self = this; - var stream = self.store.createReadStream({ + var stream = self.db.createReadStream({ gte: self._encoding.encodeWalletUtxoKey(walletId), lt: self._encoding.encodeWalletUtxoKey(mainUtils.getTerminalKey(new Buffer(walletId))) }); @@ -843,7 +855,7 @@ WalletService.prototype._getBalance = function(walletId, options, callback) { var key = self._encoding.encodeWalletBalanceKey(walletId); - self.store.get(key, function(err, buffer) { + self.db.get(key, function(err, buffer) { if(err) { return callback(err); @@ -913,7 +925,7 @@ WalletService.prototype._removeWallet = function(walletId, callback) { .fn.call(self._encoding, walletId), new Buffer('ff', 'hex')]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lt: end }); @@ -943,7 +955,7 @@ WalletService.prototype._removeWallet = function(walletId, callback) { key: results[i] }); } - self.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return callback(err); } @@ -959,7 +971,7 @@ WalletService.prototype._removeAllWallets = function(callback) { var start = self._encoding.servicePrefix; var end = new Buffer.concat([ start, new Buffer('ff', 'hex') ]); - var stream = self.store.createKeyStream({ + var stream = self.db.createKeyStream({ gte: start, lte: end }); @@ -974,7 +986,7 @@ WalletService.prototype._removeAllWallets = function(callback) { }); stream.on('end', function() { - self.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return callback(err); } @@ -986,7 +998,7 @@ WalletService.prototype._removeAllWallets = function(callback) { WalletService.prototype._getAddresses = function(walletId, callback) { var self = this; var key = self._encoding.encodeWalletAddressesKey(walletId); - self.store.get(key, function(err, value) { + self.db.get(key, function(err, value) { if(err) { return callback(err); } @@ -1000,10 +1012,10 @@ WalletService.prototype._getAddresses = function(walletId, callback) { WalletService.prototype._createWallet = function(walletId, callback) { var self = this; var key = self._encoding.encodeWalletAddressesKey(walletId); - self.store.get(key, function(err) { + self.db.get(key, function(err) { if (err && ((/notfound/i).test(err) || err.notFound)) { var value = self._encoding.encodeWalletAddressesValue([]); - return self.store.put(key, value, callback); + return self.db.put(key, value, callback); } callback(); }); @@ -1072,7 +1084,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, return callback(err, jobResults); } - log.info('loaded addresses, count: ', oldAddresses.length); + log.info('loaded existing addresses, count: ', oldAddresses.length); async.parallel( [ self._getUTXOIndexOperations.bind(self, walletId, addresses, jobId), @@ -1095,7 +1107,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId, value: self._encoding.encodeWalletAddressesValue(oldAddresses.concat(addresses)) }); - self.store.batch(operations, function(err) { + self.db.batch(operations, function(err) { if(err) { return callback(err, jobResults); } @@ -1132,6 +1144,8 @@ WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, balance = initialBalance; } + log.info('Initial balance of walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.'); + log.info('Starting to gather utxos for walletId: ' + walletId); self.node.services.address.getUtxos(addresses, false, function(err, utxos) { if(err) { return callback(err); @@ -1164,6 +1178,7 @@ WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses, value: self._encoding.encodeWalletBalanceValue(balance) }); + log.info('Final balance for walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.'); callback(null, operations); }); }); @@ -1180,7 +1195,7 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, return next(err); } if (logCount++ % 1000 === 0) { - log.info('loaded 10 addresses, total count: ', Object.keys(txids).length); + log.info('loaded address txids, total count: ', Object.keys(txids).length); } txids = _.merge(txids, tmpTxids); return next(); @@ -1190,8 +1205,8 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, return callback(err); } - var operations = Object.keys(txids).map(function(txid) { + assert(txid.length === 64, 'WalletService, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.'); return { type: 'put', key: self._encoding.encodeWalletTransactionKey(walletId, txids[txid], txid) @@ -1205,13 +1220,13 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses, WalletService.prototype._storeAddresses = function(walletId, addresses, callback) { var key = this._encoding.encodeWalletAddressesKey(walletId); var value = this._encoding.encodeWalletValue(addresses); - this.store.put(key, value, callback); + this.db.put(key, value, callback); }; WalletService.prototype._storeBalance = function(walletId, balance, callback) { var key = this._encoding.encodeWalletBalanceKey(walletId); var value = this._encoding.encodeWalletBalanceValue(balance); - this.store.put(key, value, callback); + this.db.put(key, value, callback); }; WalletService.prototype._processStartEndOptions = function(req, callback) {