Added more logging. Services should call the db service for db operations.

This commit is contained in:
Chris Kleeschulte 2017-05-17 18:02:00 +00:00
parent b4214e6ffc
commit bb73bac027
8 changed files with 132 additions and 68 deletions

View File

@ -47,8 +47,8 @@ AddressService.dependencies = [
AddressService.prototype.start = function(callback) {
var self = this;
this.store = this.node.services.db.store;
this.node.services.db.getPrefix(this.name, function(err, prefix) {
this.db = this.node.services.db;
this.db.getPrefix(this.name, function(err, prefix) {
if(err) {
return callback(err);
}
@ -513,7 +513,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options
valueEncoding: 'binary',
keyEncoding: 'binary'
};
this.node.services.db.store.get(key, dbOptions, function(err, buffer) {
this.db.get(key, dbOptions, function(err, buffer) {
if (err instanceof levelup.errors.NotFoundError) {
return callback(null, false);
} else if (err) {
@ -572,7 +572,7 @@ AddressService.prototype.createInputsDBStream = function(addressStr, options) {
var adjustedStart = options.start + 1;
startBuffer.writeUInt32BE(adjustedStart, 0);
stream = this.node.services.db.store.createReadStream({
stream = this.db.createReadStream({
gt: Buffer.concat([
constants.PREFIXES.SPENTS,
hashBuffer,
@ -592,7 +592,7 @@ AddressService.prototype.createInputsDBStream = function(addressStr, options) {
});
} else {
var allKey = Buffer.concat([constants.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]);
stream = this.node.services.db.store.createReadStream({
stream = this.db.createReadStream({
gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]),
lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]),
valueEncoding: 'binary',
@ -786,7 +786,7 @@ AddressService.prototype.createOutputsDBStream = function(addressStr, options) {
var startAdjusted = options.start + 1;
startBuffer.writeUInt32BE(startAdjusted, 0);
stream = this.node.services.db.store.createReadStream({
stream = this.db.createReadStream({
gt: Buffer.concat([
constants.PREFIXES.OUTPUTS,
hashBuffer,
@ -806,7 +806,7 @@ AddressService.prototype.createOutputsDBStream = function(addressStr, options) {
});
} else {
var allKey = Buffer.concat([constants.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]);
stream = this.node.services.db.store.createReadStream({
stream = this.db.createReadStream({
gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]),
lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]),
valueEncoding: 'binary',
@ -988,7 +988,7 @@ AddressService.prototype.getUtxosForAddress = function(address, queryMempool, ca
var self = this;
var stream = self.store.createReadStream({
var stream = self.db.createReadStream({
gte: self._encoding.encodeUtxoIndexKey(address),
lt: self._encoding.encodeUtxoIndexKey(utils.getTerminalKey(new Buffer(address)))
});
@ -1130,7 +1130,7 @@ AddressService.prototype.getAddressTxids = function(address, options, callback)
var start = self._encoding.encodeAddressIndexKey(address, opts.start, opts.txid);
var end = self._encoding.encodeAddressIndexKey(address, opts.end, opts.txid);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lt: end
});
@ -1162,7 +1162,7 @@ AddressService.prototype.getAddressTxidsWithHeights = function(address, options,
var start = self._encoding.encodeAddressIndexKey(address, opts.start || 0); //the start and end must be the same length
var end = Buffer.concat([ start.slice(0, -36), new Buffer((opts.end || 'ffffffff'), 'hex') ]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lt: end
});

View File

@ -115,13 +115,13 @@ DB.prototype._setDataPath = function() {
DB.prototype._checkVersion = function(callback) {
var self = this;
self.store.get(self.dbPrefix + 'tip', self.dbOptions, function(err) {
self.get(self.dbPrefix + 'tip', self.dbOptions, function(err) {
if (err instanceof levelup.errors.NotFoundError) {
return callback();
} else if (err) {
return callback(err);
}
self.store.get(self.dbPrefix + 'version', self.dbOptions, function(err, buffer) {
self.get(self.dbPrefix + 'version', self.dbOptions, function(err, buffer) {
var version;
if (err instanceof levelup.errors.NotFoundError) {
version = 1;
@ -147,7 +147,7 @@ DB.prototype._checkVersion = function(callback) {
DB.prototype._setVersion = function(callback) {
var versionBuffer = new Buffer(new Array(4));
versionBuffer.writeUInt32BE(this.version);
this.store.put(this.dbPrefix + 'version', versionBuffer, callback);
this._store.put(this.dbPrefix + 'version', versionBuffer, callback);
};
DB.prototype.start = function(callback) {
@ -159,14 +159,6 @@ DB.prototype.start = function(callback) {
self._store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'});
self.store = {
get: self._store.get.bind(self._store),
put: self._store.put.bind(self._store),
batch: self._store.batch.bind(self._store),
createReadStream: self._store.createReadStream.bind(self._store),
createKeyStream: self._store.createKeyStream.bind(self._store),
};
self.node.once('ready', function() {
self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer);
@ -189,6 +181,63 @@ DB.prototype.start = function(callback) {
};
DB.prototype.get = function(key, options, callback) {
var cb = callback;
var opts = options;
if (typeof callback !== 'function') {
cb = options;
opts = {};
}
if (!this._stopping) {
this._store.get(key, opts, cb);
} else {
setImmediate(cb);
}
};
DB.prototype.put = function(key, value, options, callback) {
var cb = callback;
var opts = options;
if (typeof callback !== 'function') {
cb = options;
opts = {};
}
if (!this._stopping) {
this._store.put(key, value, opts, cb);
} else {
setImmediate(cb);
}
};
DB.prototype.batch = function(ops, options, callback) {
var cb = callback;
var opts = options;
if (typeof callback !== 'function') {
cb = options;
opts = {};
}
if (!this._stopping) {
this._store.batch(ops, opts, cb);
} else {
setImmediate(cb);
}
};
DB.prototype.createReadStream = function(op) {
var stream;
if (!this._stopping) {
stream = this._store.createReadStream(op)
}
return stream;
};
DB.prototype.createKeyStream = function(op) {
var stream;
if (!this._stopping) {
stream = this._store.createKeyStream(op)
}
return stream;
};
DB.prototype.detectReorg = function(blocks) {
@ -270,14 +319,14 @@ DB.prototype.stop = function(callback) {
async.whilst(function() {
return self._operationsQueue > 0;
}, function(next) {
setTimeout(next, 10);
setTimeout(next, 3000);
}, function() {
self.close(callback);
});
};
DB.prototype.close = function(callback) {
if (this._store) {
if (this._store && this._store.isOpen()) {
this._store.close(callback);
}
};
@ -294,7 +343,7 @@ DB.prototype.loadTips = function(callback) {
async.each(tipStrings, function(tip, next) {
self.store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
self._store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
if(err && !(err instanceof levelup.errors.NotFoundError)) {
return next(err);
@ -443,7 +492,7 @@ DB.prototype.getPrefix = function(service, callback) {
var self = this;
function getPrefix(next) {
self.store.get(self.dbPrefix + 'prefix-' + service, function(err, buffer) {
self.get(self.dbPrefix + 'prefix-' + service, function(err, buffer) {
if(err) {
if(err.notFound) {
return next();
@ -455,7 +504,7 @@ DB.prototype.getPrefix = function(service, callback) {
}
function getUnused(next) {
self.store.get(self.dbPrefix + 'nextUnused', function(err, buffer) {
self.get(self.dbPrefix + 'nextUnused', function(err, buffer) {
if(err) {
if(err.notFound) {
return next(null, new Buffer('0001', 'hex'));
@ -468,7 +517,7 @@ DB.prototype.getPrefix = function(service, callback) {
}
function putPrefix(buffer, next) {
self.store.put(self.dbPrefix + 'prefix-' + service, buffer, function(err) {
self._store.put(self.dbPrefix + 'prefix-' + service, buffer, function(err) {
if(err) {
return next(err);
}
@ -482,7 +531,7 @@ DB.prototype.getPrefix = function(service, callback) {
var nextUnused = new Buffer(2);
nextUnused.writeUInt16BE(prefix + 1);
self.store.put(self.dbPrefix + 'nextUnused', nextUnused, function(err) {
self._store.put(self.dbPrefix + 'nextUnused', nextUnused, function(err) {
if(err) {
return next(err);
}

View File

@ -69,7 +69,7 @@ Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) {
}
operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false));
self.db.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
@ -108,7 +108,7 @@ Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) {
}
operations.push(self.db.getConcurrentTipOperation(block, true));
self.db.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
@ -157,7 +157,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
}
var operations = results[0].concat(results[1]);
self.db.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}
@ -220,7 +220,7 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
var operations = results[0].concat(results[1]);
self.db.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return next(err);
}

View File

@ -280,7 +280,7 @@ ProcessSerial.prototype._process = function(block, callback) {
self.tip = block;
self.db.store.batch(obj.operations, function(err) {
self.db.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
@ -339,7 +339,7 @@ WriteStream.prototype._write = function(obj, enc, callback) {
return setImmediate(callback);
}
self.db.store.batch(obj.operations, function(err) {
self.db.batch(obj.operations, function(err) {
if(err) {
return callback(err);
}
@ -375,7 +375,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) {
return callback(err);
}
var operations = results[0].concat(results[1]);
self.db.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return callback(err);
}

View File

@ -13,7 +13,7 @@ var MempoolService = function(options) {
this.name = options.name;
this._txIndex = {};
this._addressIndex = {};
this.store = this.node.services.db.store;
this.db = this.node.services.db;
this._handleBlocks = false;
};
@ -220,7 +220,7 @@ MempoolService.prototype.getTransaction = function(txid, callback) {
callback(null, self._encoding.decodeMempoolTransactionValue(txBuffer));
});
}
self.store.get(key, function(err, value) {
self.db.get(key, function(err, value) {
if(err) {
return callback(err);
}
@ -251,7 +251,7 @@ MempoolService.prototype._updateMempool = function(tx, action, callback) {
return callback(err);
}
var operations = [operation].concat(self._getTransactionAddressDetailOperations(tx, action));
self.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
log.error('batch operation for updating Mempool failed.');
return;
@ -267,7 +267,7 @@ MempoolService.prototype.getTransactionsByAddress = function(address, callback)
var maxTxid = new Buffer(new Array(65).join('f'), 'hex');
var start = self._encoding.encodeMempoolAddressKey(address);
var end = Buffer.concat([self._encoding.encodeMempoolAddressKey(address), maxTxid]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lte: end
});

View File

@ -17,7 +17,7 @@ TimestampService.dependencies = [ 'db' ];
TimestampService.prototype.start = function(callback) {
var self = this;
this.store = this.node.services.db.store;
this.db = this.node.services.db;
this.node.services.db.getPrefix(this.name, function(err, prefix) {
if(err) {
@ -94,7 +94,7 @@ TimestampService.prototype.getTimestamp = function(hash, callback) {
}
var key = self.encoding.encodeBlockTimestampKey(hash);
self.store.get(key, function(err, buffer) {
self.db.get(key, function(err, buffer) {
if(err) {
return callback(err);
}
@ -113,7 +113,7 @@ TimestampService.prototype.getBlockHeights = function(timestamps, callback) {
});
var start = self.encoding.encodeTimestampBlockKey(timestamps[0]);
var end = self.encoding.encodeTimestampBlockKey(timestamps[1]);
var stream = self.store.createReadStream({
var stream = self.db.createReadStream({
gte: start,
lte: end
});

View File

@ -34,7 +34,7 @@ TransactionService.dependencies = [
TransactionService.prototype.start = function(callback) {
var self = this;
self.store = this.node.services.db.store;
self.db = this.node.services.db;
self.node.services.db.getPrefix(self.name, function(err, prefix) {
if(err) {
@ -169,7 +169,7 @@ TransactionService.prototype.getTransaction = function(txid, options, callback)
async.waterfall([
function(next) {
self.node.services.db.store.get(key, function(err, buffer) {
self.node.services.db.get(key, function(err, buffer) {
if (err instanceof levelup.errors.NotFoundError) {
return next(null, false);
} else if (err) {

View File

@ -16,7 +16,9 @@ var _ = require('lodash');
var bodyParser = require('body-parser');
var LRU = require('lru-cache');
var Encoding = require('./encoding');
var Input = require('bitcore-lib').Transaction.Input;
var bitcore = require('bitcore-lib');
var Input = bitcore.Transaction.Input;
var Unit = bitcore.Unit;
var Transform = require('stream').Transform;
var WalletService = function(options) {
@ -30,6 +32,8 @@ var WalletService = function(options) {
this._addressMap = {};
this.balances = {};
this.db = this.node.services.db;
};
inherits(WalletService, BaseService);
@ -49,8 +53,6 @@ WalletService.prototype.getAPIMethods = function() {
WalletService.prototype.start = function(callback) {
var self = this;
self.db = self.node.services.db;
self.store = self.db.store;
self.node.services.db.getPrefix(self.name, function(err, servicePrefix) {
@ -354,7 +356,7 @@ WalletService.prototype._loadAllAddresses = function(callback) {
var start = self._encoding.encodeWalletAddressesKey('00');
var end = self._encoding.encodeWalletAddressesKey(Array(65).join('f'));
var stream = self.store.createReadStream({
var stream = self.db.createReadStream({
gte: start,
lt: end
});
@ -390,7 +392,7 @@ WalletService.prototype._loadAllBalances = function(callback) {
var start = self._encoding.encodeWalletBalanceKey('00');
var end = self._encoding.encodeWalletBalanceKey(Array(65).join('f'));
var stream = self.store.createReadStream({
var stream = self.db.createReadStream({
gte: start,
lt: end
});
@ -538,7 +540,7 @@ WalletService.prototype._endpointDumpAllWallets = function() {
var start = new Buffer(self.servicePrefix);
var end = new Buffer.concat([start, new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lt: end
});
@ -571,7 +573,7 @@ WalletService.prototype._endpointGetWalletIds = function() {
return function(req, res) {
var start = new Buffer.concat([self.servicePrefix, new Buffer(self._encoding.subKeyMap.addresses.buffer)]);
var end = new Buffer.concat([start, new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lt: end
});
@ -711,12 +713,18 @@ WalletService.prototype._endpointGetTransactions = function() {
walletId: walletId
};
var missingTxidCount = 0;
var transform = new Transform({ objectMode: true, highWaterMark: 1000000 });
//txids are sent in and the actual tx's are found here
transform._transform = function(chunk, enc, callback) {
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid.toString('hex');
assert(txid.length === 64, 'WalletService, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.');
if (txid.length !== 64 || txid === '0000000000000000000000000000000000000000000000000000000000000000') {
missingTxidCount++;
log.error('missingTxidCount: ', missingTxidCount);
return callback();
}
self._getTransactionFromDb(options, txid, function(err, tx) {
@ -740,7 +748,11 @@ WalletService.prototype._endpointGetTransactions = function() {
};
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
var stream = self.db.createKeyStream(self._getSearchParams(encodingFn, options));
stream.on('close', function() {
stream.unpipe();
});
stream.pipe(transform).pipe(res);
});
@ -808,7 +820,7 @@ WalletService.prototype._endpointPutAddresses = function() {
WalletService.prototype._getUtxos = function(walletId, options, callback) {
var self = this;
var stream = self.store.createReadStream({
var stream = self.db.createReadStream({
gte: self._encoding.encodeWalletUtxoKey(walletId),
lt: self._encoding.encodeWalletUtxoKey(mainUtils.getTerminalKey(new Buffer(walletId)))
});
@ -843,7 +855,7 @@ WalletService.prototype._getBalance = function(walletId, options, callback) {
var key = self._encoding.encodeWalletBalanceKey(walletId);
self.store.get(key, function(err, buffer) {
self.db.get(key, function(err, buffer) {
if(err) {
return callback(err);
@ -913,7 +925,7 @@ WalletService.prototype._removeWallet = function(walletId, callback) {
.fn.call(self._encoding, walletId),
new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lt: end
});
@ -943,7 +955,7 @@ WalletService.prototype._removeWallet = function(walletId, callback) {
key: results[i]
});
}
self.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return callback(err);
}
@ -959,7 +971,7 @@ WalletService.prototype._removeAllWallets = function(callback) {
var start = self._encoding.servicePrefix;
var end = new Buffer.concat([ start, new Buffer('ff', 'hex') ]);
var stream = self.store.createKeyStream({
var stream = self.db.createKeyStream({
gte: start,
lte: end
});
@ -974,7 +986,7 @@ WalletService.prototype._removeAllWallets = function(callback) {
});
stream.on('end', function() {
self.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return callback(err);
}
@ -986,7 +998,7 @@ WalletService.prototype._removeAllWallets = function(callback) {
WalletService.prototype._getAddresses = function(walletId, callback) {
var self = this;
var key = self._encoding.encodeWalletAddressesKey(walletId);
self.store.get(key, function(err, value) {
self.db.get(key, function(err, value) {
if(err) {
return callback(err);
}
@ -1000,10 +1012,10 @@ WalletService.prototype._getAddresses = function(walletId, callback) {
WalletService.prototype._createWallet = function(walletId, callback) {
var self = this;
var key = self._encoding.encodeWalletAddressesKey(walletId);
self.store.get(key, function(err) {
self.db.get(key, function(err) {
if (err && ((/notfound/i).test(err) || err.notFound)) {
var value = self._encoding.encodeWalletAddressesValue([]);
return self.store.put(key, value, callback);
return self.db.put(key, value, callback);
}
callback();
});
@ -1072,7 +1084,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
return callback(err, jobResults);
}
log.info('loaded addresses, count: ', oldAddresses.length);
log.info('loaded existing addresses, count: ', oldAddresses.length);
async.parallel(
[
self._getUTXOIndexOperations.bind(self, walletId, addresses, jobId),
@ -1095,7 +1107,7 @@ WalletService.prototype._importAddresses = function(walletId, addresses, jobId,
value: self._encoding.encodeWalletAddressesValue(oldAddresses.concat(addresses))
});
self.store.batch(operations, function(err) {
self.db.batch(operations, function(err) {
if(err) {
return callback(err, jobResults);
}
@ -1132,6 +1144,8 @@ WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses,
balance = initialBalance;
}
log.info('Initial balance of walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.');
log.info('Starting to gather utxos for walletId: ' + walletId);
self.node.services.address.getUtxos(addresses, false, function(err, utxos) {
if(err) {
return callback(err);
@ -1164,6 +1178,7 @@ WalletService.prototype._getUTXOIndexOperations = function(walletId, addresses,
value: self._encoding.encodeWalletBalanceValue(balance)
});
log.info('Final balance for walletId: ' + walletId + ' is: ' + Unit.fromSatoshis(balance).toBTC() + ' BTC.');
callback(null, operations);
});
});
@ -1180,7 +1195,7 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses,
return next(err);
}
if (logCount++ % 1000 === 0) {
log.info('loaded 10 addresses, total count: ', Object.keys(txids).length);
log.info('loaded address txids, total count: ', Object.keys(txids).length);
}
txids = _.merge(txids, tmpTxids);
return next();
@ -1190,8 +1205,8 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses,
return callback(err);
}
var operations = Object.keys(txids).map(function(txid) {
assert(txid.length === 64, 'WalletService, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.');
return {
type: 'put',
key: self._encoding.encodeWalletTransactionKey(walletId, txids[txid], txid)
@ -1205,13 +1220,13 @@ WalletService.prototype._getTxidIndexOperations = function(walletId, addresses,
WalletService.prototype._storeAddresses = function(walletId, addresses, callback) {
var key = this._encoding.encodeWalletAddressesKey(walletId);
var value = this._encoding.encodeWalletValue(addresses);
this.store.put(key, value, callback);
this.db.put(key, value, callback);
};
WalletService.prototype._storeBalance = function(walletId, balance, callback) {
var key = this._encoding.encodeWalletBalanceKey(walletId);
var value = this._encoding.encodeWalletBalanceValue(balance);
this.store.put(key, value, callback);
this.db.put(key, value, callback);
};
WalletService.prototype._processStartEndOptions = function(req, callback) {