restore db and address services

This commit is contained in:
Patrick Nagurny 2017-01-10 16:51:32 -05:00
parent 97683d2ff1
commit 339f56157f
13 changed files with 3202 additions and 5 deletions

View File

@ -49,6 +49,7 @@ function Node(config) {
this.log.formatting = config.formatLogs ? true : false;
}
this.datadir = config.datadir;
this.network = null;
this.services = {};
this._unloadedServices = [];

View File

@ -24,7 +24,7 @@ function getDefaultConfig(options) {
mkdirp.sync(defaultPath);
}
var defaultServices = ['bitcoind', 'web'];
var defaultServices = ['bitcoind', 'db', 'address', 'web'];
if (options.additionalServices) {
defaultServices = defaultServices.concat(options.additionalServices);
}

View File

@ -15,6 +15,9 @@ log.debug = function() {};
* "db" service, or having "datadir" at the root of the config.
*/
function checkConfigVersion2(fullConfig) {
// TODO: Remove
return false;
var datadirUndefined = _.isUndefined(fullConfig.datadir);
var addressDefined = (fullConfig.services.indexOf('address') >= 0);
var dbDefined = (fullConfig.services.indexOf('db') >= 0);
@ -78,10 +81,6 @@ function start(options) {
fullConfig.path = path.resolve(options.path, './bitcore-node.json');
if (checkConfigVersion2(fullConfig)) {
process.exit(1);
}
fullConfig.services = start.setupServices(require, servicesPath, options.config);
var node = new BitcoreNode(fullConfig);
@ -138,6 +137,7 @@ function loadModule(req, service) {
// first try in the built-in bitcore-node services directory
service.module = req(path.resolve(__dirname, '../services/' + service.name));
} catch(e) {
console.log(e);
// check if the package.json specifies a specific file to use
var servicePackage = req(service.name + '/package.json');

View File

@ -0,0 +1,56 @@
'use strict';
var exports = {};
exports.PREFIXES = {
OUTPUTS: new Buffer('02', 'hex'), // Query outputs by address and/or height
SPENTS: new Buffer('03', 'hex'), // Query inputs by address and/or height
SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output
};
exports.MEMPREFIXES = {
OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address
SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address
SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output
};
// To save space, we're only storing the PubKeyHash or ScriptHash in our index.
// To avoid intentional unspendable collisions, which have been seen on the blockchain,
// we must store the hash type (PK or Script) as well.
exports.HASH_TYPES = {
PUBKEY: new Buffer('01', 'hex'),
REDEEMSCRIPT: new Buffer('02', 'hex')
};
// Translates from our enum type back into the hash types returned by
// bitcore-lib/address.
exports.HASH_TYPES_READABLE = {
'01': 'pubkeyhash',
'02': 'scripthash'
};
exports.HASH_TYPES_MAP = {
'pubkeyhash': exports.HASH_TYPES.PUBKEY,
'scripthash': exports.HASH_TYPES.REDEEMSCRIPT
};
exports.SPACER_MIN = new Buffer('00', 'hex');
exports.SPACER_MAX = new Buffer('ff', 'hex');
exports.SPACER_HEIGHT_MIN = new Buffer('0000000000', 'hex');
exports.SPACER_HEIGHT_MAX = new Buffer('ffffffffff', 'hex');
exports.TIMESTAMP_MIN = new Buffer('0000000000000000', 'hex');
exports.TIMESTAMP_MAX = new Buffer('ffffffffffffffff', 'hex');
// The maximum number of inputs that can be queried at once
exports.MAX_INPUTS_QUERY_LENGTH = 50000;
// The maximum number of outputs that can be queried at once
exports.MAX_OUTPUTS_QUERY_LENGTH = 50000;
// The maximum number of transactions that can be queried at once
exports.MAX_HISTORY_QUERY_LENGTH = 100;
// The maximum number of addresses that can be queried at once
exports.MAX_ADDRESSES_QUERY = 10000;
// The maximum number of simultaneous requests
exports.MAX_ADDRESSES_LIMIT = 5;
module.exports = exports;

View File

@ -0,0 +1,307 @@
'use strict';
var bitcore = require('bitcore-lib');
var BufferReader = bitcore.encoding.BufferReader;
var Address = bitcore.Address;
var PublicKey = bitcore.PublicKey;
var constants = require('./constants');
var $ = bitcore.util.preconditions;
var exports = {};
exports.encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
var key = Buffer.concat([
txidBuffer,
outputIndexBuffer
]);
return key.toString('binary');
};
exports.encodeMempoolAddressIndexKey = function(hashBuffer, hashTypeBuffer) {
var key = Buffer.concat([
hashBuffer,
hashTypeBuffer,
]);
return key.toString('binary');
};
exports.encodeOutputKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
var key = Buffer.concat([
constants.PREFIXES.OUTPUTS,
hashBuffer,
hashTypeBuffer,
constants.SPACER_MIN,
heightBuffer,
txidBuffer,
outputIndexBuffer
]);
return key;
};
exports.decodeOutputKey = function(buffer) {
var reader = new BufferReader(buffer);
var prefix = reader.read(1);
var hashBuffer = reader.read(20);
var hashTypeBuffer = reader.read(1);
var spacer = reader.read(1);
var height = reader.readUInt32BE();
var txid = reader.read(32);
var outputIndex = reader.readUInt32BE();
return {
prefix: prefix,
hashBuffer: hashBuffer,
hashTypeBuffer: hashTypeBuffer,
height: height,
txid: txid,
outputIndex: outputIndex
};
};
exports.encodeOutputValue = function(satoshis, scriptBuffer) {
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeDoubleBE(satoshis);
return Buffer.concat([satoshisBuffer, scriptBuffer]);
};
exports.encodeOutputMempoolValue = function(satoshis, timestampBuffer, scriptBuffer) {
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeDoubleBE(satoshis);
return Buffer.concat([satoshisBuffer, timestampBuffer, scriptBuffer]);
};
exports.decodeOutputValue = function(buffer) {
var satoshis = buffer.readDoubleBE(0);
var scriptBuffer = buffer.slice(8, buffer.length);
return {
satoshis: satoshis,
scriptBuffer: scriptBuffer
};
};
exports.decodeOutputMempoolValue = function(buffer) {
var satoshis = buffer.readDoubleBE(0);
var timestamp = buffer.readDoubleBE(8);
var scriptBuffer = buffer.slice(16, buffer.length);
return {
satoshis: satoshis,
timestamp: timestamp,
scriptBuffer: scriptBuffer
};
};
exports.encodeInputKey = function(hashBuffer, hashTypeBuffer, height, prevTxIdBuffer, outputIndex) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
return Buffer.concat([
constants.PREFIXES.SPENTS,
hashBuffer,
hashTypeBuffer,
constants.SPACER_MIN,
heightBuffer,
prevTxIdBuffer,
outputIndexBuffer
]);
};
exports.decodeInputKey = function(buffer) {
var reader = new BufferReader(buffer);
var prefix = reader.read(1);
var hashBuffer = reader.read(20);
var hashTypeBuffer = reader.read(1);
var spacer = reader.read(1);
var height = reader.readUInt32BE();
var prevTxId = reader.read(32);
var outputIndex = reader.readUInt32BE();
return {
prefix: prefix,
hashBuffer: hashBuffer,
hashTypeBuffer: hashTypeBuffer,
height: height,
prevTxId: prevTxId,
outputIndex: outputIndex
};
};
exports.encodeInputValue = function(txidBuffer, inputIndex) {
var inputIndexBuffer = new Buffer(4);
inputIndexBuffer.writeUInt32BE(inputIndex);
return Buffer.concat([
txidBuffer,
inputIndexBuffer
]);
};
exports.decodeInputValue = function(buffer) {
var txid = buffer.slice(0, 32);
var inputIndex = buffer.readUInt32BE(32);
return {
txid: txid,
inputIndex: inputIndex
};
};
exports.encodeInputKeyMap = function(outputTxIdBuffer, outputIndex) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
return Buffer.concat([
constants.PREFIXES.SPENTSMAP,
outputTxIdBuffer,
outputIndexBuffer
]);
};
exports.decodeInputKeyMap = function(buffer) {
var txid = buffer.slice(1, 33);
var outputIndex = buffer.readUInt32BE(33);
return {
outputTxId: txid,
outputIndex: outputIndex
};
};
exports.encodeInputValueMap = function(inputTxIdBuffer, inputIndex) {
var inputIndexBuffer = new Buffer(4);
inputIndexBuffer.writeUInt32BE(inputIndex);
return Buffer.concat([
inputTxIdBuffer,
inputIndexBuffer
]);
};
exports.decodeInputValueMap = function(buffer) {
var txid = buffer.slice(0, 32);
var inputIndex = buffer.readUInt32BE(32);
return {
inputTxId: txid,
inputIndex: inputIndex
};
};
exports.encodeSummaryCacheKey = function(address) {
return Buffer.concat([address.hashBuffer, constants.HASH_TYPES_MAP[address.type]]);
};
exports.decodeSummaryCacheKey = function(buffer, network) {
var hashBuffer = buffer.read(20);
var type = constants.HASH_TYPES_READABLE[buffer.read(20, 2).toString('hex')];
var address = new Address({
hashBuffer: hashBuffer,
type: type,
network: network
});
return address;
};
exports.encodeSummaryCacheValue = function(cache, tipHeight, tipHash) {
var tipHashBuffer = new Buffer(tipHash, 'hex');
var buffer = new Buffer(new Array(20));
buffer.writeUInt32BE(tipHeight);
buffer.writeDoubleBE(cache.result.totalReceived, 4);
buffer.writeDoubleBE(cache.result.balance, 12);
var txidBuffers = [];
for (var i = 0; i < cache.result.txids.length; i++) {
var buf = new Buffer(new Array(36));
var txid = cache.result.txids[i];
buf.write(txid, 'hex');
buf.writeUInt32BE(cache.result.appearanceIds[txid], 32);
txidBuffers.push(buf);
}
var txidsBuffer = Buffer.concat(txidBuffers);
var value = Buffer.concat([tipHashBuffer, buffer, txidsBuffer]);
return value;
};
exports.decodeSummaryCacheValue = function(buffer) {
var hash = buffer.slice(0, 32).toString('hex');
var height = buffer.readUInt32BE(32);
var totalReceived = buffer.readDoubleBE(36);
var balance = buffer.readDoubleBE(44);
// read 32 byte chunks until exhausted
var appearanceIds = {};
var txids = [];
var pos = 52;
while(pos < buffer.length) {
var txid = buffer.slice(pos, pos + 32).toString('hex');
var txidHeight = buffer.readUInt32BE(pos + 32);
txids.push(txid);
appearanceIds[txid] = txidHeight;
pos += 36;
}
var cache = {
height: height,
hash: hash,
result: {
appearanceIds: appearanceIds,
txids: txids,
totalReceived: totalReceived,
balance: balance,
unconfirmedAppearanceIds: {}, // unconfirmed values are never stored in cache
unconfirmedBalance: 0
}
};
return cache;
};
exports.getAddressInfo = function(addressStr) {
var addrObj = bitcore.Address(addressStr);
var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type];
return {
hashBuffer: addrObj.hashBuffer,
hashTypeBuffer: hashTypeBuffer,
hashTypeReadable: addrObj.type
};
};
/**
* This function is optimized to return address information about an output script
* without constructing a Bitcore Address instance.
* @param {Script} - An instance of a Bitcore Script
* @param {Network|String} - The network for the address
*/
exports.extractAddressInfoFromScript = function(script, network) {
$.checkArgument(network, 'Second argument is expected to be a network');
var hashBuffer;
var addressType;
var hashTypeBuffer;
if (script.isPublicKeyHashOut()) {
hashBuffer = script.chunks[2].buf;
hashTypeBuffer = constants.HASH_TYPES.PUBKEY;
addressType = Address.PayToPublicKeyHash;
} else if (script.isScriptHashOut()) {
hashBuffer = script.chunks[1].buf;
hashTypeBuffer = constants.HASH_TYPES.REDEEMSCRIPT;
addressType = Address.PayToScriptHash;
} else if (script.isPublicKeyOut()) {
var pubkey = script.chunks[0].buf;
var address = Address.fromPublicKey(new PublicKey(pubkey), network);
hashBuffer = address.hashBuffer;
hashTypeBuffer = constants.HASH_TYPES.PUBKEY;
// pay-to-publickey doesn't have an address, however for compatibility
// purposes, we can create an address
addressType = Address.PayToPublicKeyHash;
} else {
return false;
}
return {
hashBuffer: hashBuffer,
hashTypeBuffer: hashTypeBuffer,
addressType: addressType
};
};
module.exports = exports;

View File

@ -0,0 +1,266 @@
'use strict';
var bitcore = require('bitcore-lib');
var async = require('async');
var _ = bitcore.deps._;
var constants = require('./constants');
/**
* This represents an instance that keeps track of data over a series of
* asynchronous I/O calls to get the transaction history for a group of
* addresses. History can be queried by start and end block heights to limit large sets
* of results (uses leveldb key streaming).
*/
function AddressHistory(args) {
this.node = args.node;
this.options = args.options;
if(Array.isArray(args.addresses)) {
this.addresses = args.addresses;
} else {
this.addresses = [args.addresses];
}
this.maxHistoryQueryLength = args.options.maxHistoryQueryLength || constants.MAX_HISTORY_QUERY_LENGTH;
this.maxAddressesQuery = args.options.maxAddressesQuery || constants.MAX_ADDRESSES_QUERY;
this.maxAddressesLimit = args.options.maxAddressesLimit || constants.MAX_ADDRESSES_LIMIT;
this.addressStrings = [];
for (var i = 0; i < this.addresses.length; i++) {
var address = this.addresses[i];
if (address instanceof bitcore.Address) {
this.addressStrings.push(address.toString());
} else if (_.isString(address)) {
this.addressStrings.push(address);
} else {
throw new TypeError('Addresses are expected to be strings');
}
}
this.detailedArray = [];
}
AddressHistory.prototype._mergeAndSortTxids = function(summaries) {
var appearanceIds = {};
var unconfirmedAppearanceIds = {};
for (var i = 0; i < summaries.length; i++) {
var summary = summaries[i];
for (var key in summary.appearanceIds) {
appearanceIds[key] = summary.appearanceIds[key];
delete summary.appearanceIds[key];
}
for (var unconfirmedKey in summary.unconfirmedAppearanceIds) {
unconfirmedAppearanceIds[unconfirmedKey] = summary.unconfirmedAppearanceIds[unconfirmedKey];
delete summary.unconfirmedAppearanceIds[key];
}
}
var confirmedTxids = Object.keys(appearanceIds);
confirmedTxids.sort(function(a, b) {
// Confirmed are sorted by height
return appearanceIds[a] - appearanceIds[b];
});
var unconfirmedTxids = Object.keys(unconfirmedAppearanceIds);
unconfirmedTxids.sort(function(a, b) {
// Unconfirmed are sorted by timestamp
return unconfirmedAppearanceIds[a] - unconfirmedAppearanceIds[b];
});
return confirmedTxids.concat(unconfirmedTxids);
};
/**
* This function will give detailed history for the configured
* addresses. See AddressService.prototype.getAddressHistory
* for complete documentation about options and response format.
*/
AddressHistory.prototype.get = function(callback) {
var self = this;
if (this.addresses.length > this.maxAddressesQuery) {
return callback(new TypeError('Maximum number of addresses (' + this.maxAddressesQuery + ') exceeded'));
}
var opts = _.clone(this.options);
opts.noBalance = true;
if (this.addresses.length === 1) {
var address = this.addresses[0];
self.node.services.address.getAddressSummary(address, opts, function(err, summary) {
if (err) {
return callback(err);
}
return self._paginateWithDetails.call(self, summary.txids, callback);
});
} else {
opts.fullTxList = true;
async.mapLimit(
self.addresses,
self.maxAddressesLimit,
function(address, next) {
self.node.services.address.getAddressSummary(address, opts, next);
},
function(err, summaries) {
if (err) {
return callback(err);
}
var txids = self._mergeAndSortTxids(summaries);
return self._paginateWithDetails.call(self, txids, callback);
}
);
}
};
AddressHistory.prototype._paginateWithDetails = function(allTxids, callback) {
var self = this;
var totalCount = allTxids.length;
// Slice the page starting with the most recent
var txids;
if (self.options.from >= 0 && self.options.to >= 0) {
var fromOffset = Math.max(0, totalCount - self.options.from);
var toOffset = Math.max(0, totalCount - self.options.to);
txids = allTxids.slice(toOffset, fromOffset);
} else {
txids = allTxids;
}
// Verify that this query isn't too long
if (txids.length > self.maxHistoryQueryLength) {
return callback(new Error(
'Maximum length query (' + self.maxHistoryQueryLength + ') exceeded for address(es): ' +
self.addresses.join(',')
));
}
// Reverse to include most recent at the top
txids.reverse();
async.eachSeries(
txids,
function(txid, next) {
self.getDetailedInfo(txid, next);
},
function(err) {
if (err) {
return callback(err);
}
callback(null, {
totalCount: totalCount,
items: self.detailedArray
});
}
);
};
/**
* This function will transform items from the combinedArray into
* the detailedArray with the full transaction, satoshis and confirmation.
* @param {Object} txInfo - An item from the `combinedArray`
* @param {Function} next
*/
AddressHistory.prototype.getDetailedInfo = function(txid, next) {
var self = this;
var queryMempool = _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool;
self.node.services.db.getTransactionWithBlockInfo(
txid,
queryMempool,
function(err, transaction) {
if (err) {
return next(err);
}
transaction.populateInputs(self.node.services.db, [], function(err) {
if (err) {
return next(err);
}
var addressDetails = self.getAddressDetailsForTransaction(transaction);
self.detailedArray.push({
addresses: addressDetails.addresses,
satoshis: addressDetails.satoshis,
height: transaction.__height,
confirmations: self.getConfirmationsDetail(transaction),
timestamp: transaction.__timestamp,
// TODO bitcore-lib should return null instead of throwing error on coinbase
fees: !transaction.isCoinbase() ? transaction.getFee() : null,
tx: transaction
});
next();
});
}
);
};
/**
* A helper function for `getDetailedInfo` for getting the confirmations.
* @param {Transaction} transaction - A transaction with a populated __height value.
*/
AddressHistory.prototype.getConfirmationsDetail = function(transaction) {
var confirmations = 0;
if (transaction.__height >= 0) {
confirmations = this.node.services.db.tip.__height - transaction.__height + 1;
}
return confirmations;
};
AddressHistory.prototype.getAddressDetailsForTransaction = function(transaction) {
var result = {
addresses: {},
satoshis: 0
};
for (var inputIndex = 0; inputIndex < transaction.inputs.length; inputIndex++) {
var input = transaction.inputs[inputIndex];
if (!input.script) {
continue;
}
var inputAddress = input.script.toAddress(this.node.network);
if (inputAddress) {
var inputAddressString = inputAddress.toString();
if (this.addressStrings.indexOf(inputAddressString) >= 0) {
if (!result.addresses[inputAddressString]) {
result.addresses[inputAddressString] = {
inputIndexes: [inputIndex],
outputIndexes: []
};
} else {
result.addresses[inputAddressString].inputIndexes.push(inputIndex);
}
result.satoshis -= input.output.satoshis;
}
}
}
for (var outputIndex = 0; outputIndex < transaction.outputs.length; outputIndex++) {
var output = transaction.outputs[outputIndex];
if (!output.script) {
continue;
}
var outputAddress = output.script.toAddress(this.node.network);
if (outputAddress) {
var outputAddressString = outputAddress.toString();
if (this.addressStrings.indexOf(outputAddressString) >= 0) {
if (!result.addresses[outputAddressString]) {
result.addresses[outputAddressString] = {
inputIndexes: [],
outputIndexes: [outputIndex]
};
} else {
result.addresses[outputAddressString].outputIndexes.push(outputIndex);
}
result.satoshis += output.satoshis;
}
}
}
return result;
};
module.exports = AddressHistory;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,40 @@
'use strict';
var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var bitcore = require('bitcore-lib');
var encodingUtil = require('../encoding');
var $ = bitcore.util.preconditions;
function InputsTransformStream(options) {
$.checkArgument(options.address instanceof bitcore.Address);
Transform.call(this, {
objectMode: true
});
this._address = options.address;
this._addressStr = this._address.toString();
this._tipHeight = options.tipHeight;
}
inherits(InputsTransformStream, Transform);
InputsTransformStream.prototype._transform = function(chunk, encoding, callback) {
var self = this;
var key = encodingUtil.decodeInputKey(chunk.key);
var value = encodingUtil.decodeInputValue(chunk.value);
var input = {
address: this._addressStr,
hashType: this._address.type,
txid: value.txid.toString('hex'),
inputIndex: value.inputIndex,
height: key.height,
confirmations: this._tipHeight - key.height + 1
};
self.push(input);
callback();
};
module.exports = InputsTransformStream;

View File

@ -0,0 +1,42 @@
'use strict';
var Transform = require('stream').Transform;
var inherits = require('util').inherits;
var bitcore = require('bitcore-lib');
var encodingUtil = require('../encoding');
var $ = bitcore.util.preconditions;
function OutputsTransformStream(options) {
Transform.call(this, {
objectMode: true
});
$.checkArgument(options.address instanceof bitcore.Address);
this._address = options.address;
this._addressStr = this._address.toString();
this._tipHeight = options.tipHeight;
}
inherits(OutputsTransformStream, Transform);
OutputsTransformStream.prototype._transform = function(chunk, encoding, callback) {
var self = this;
var key = encodingUtil.decodeOutputKey(chunk.key);
var value = encodingUtil.decodeOutputValue(chunk.value);
var output = {
address: this._addressStr,
hashType: this._address.type,
txid: key.txid.toString('hex'), //TODO use a buffer
outputIndex: key.outputIndex,
height: key.height,
satoshis: value.satoshis,
script: value.scriptBuffer.toString('hex'), //TODO use a buffer
confirmations: this._tipHeight - key.height + 1
};
self.push(output);
callback();
};
module.exports = OutputsTransformStream;

View File

@ -156,6 +156,7 @@ Bitcoin.prototype._initClients = function() {
* Called by Node to determine the available API methods.
*/
Bitcoin.prototype.getAPIMethods = function() {
return [];
var methods = [
['getBlock', this, this.getBlock, 1],
['getRawBlock', this, this.getRawBlock, 1],

804
lib/services/db.js Normal file
View File

@ -0,0 +1,804 @@
'use strict';
var util = require('util');
var fs = require('fs');
var async = require('async');
var levelup = require('levelup');
var leveldown = require('leveldown');
var mkdirp = require('mkdirp');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
var Networks = bitcore.Networks;
var Block = bitcore.Block;
var $ = bitcore.util.preconditions;
var index = require('../');
var errors = index.errors;
var log = index.log;
var Transaction = require('../transaction');
var Service = require('../service');
/**
* This service synchronizes a leveldb database with bitcoin block chain by connecting and
* disconnecting blocks to build new indexes that can be queried. Other services can extend
* the data that is indexed by implementing a `blockHandler` method.
*
* @param {Object} options
* @param {Node} options.node - A reference to the node
* @param {Node} options.store - A levelup backend store
*/
function DB(options) {
/* jshint maxstatements: 20 */
if (!(this instanceof DB)) {
return new DB(options);
}
if (!options) {
options = {};
}
Service.call(this, options);
// Used to keep track of the version of the indexes
// to determine during an upgrade if a reindex is required
this.version = 2;
this.tip = null;
this.genesis = null;
$.checkState(this.node.network, 'Node is expected to have a "network" property');
this.network = this.node.network;
this._setDataPath();
this.maxOpenFiles = options.maxOpenFiles || DB.DEFAULT_MAX_OPEN_FILES;
this.maxTransactionLimit = options.maxTransactionLimit || DB.MAX_TRANSACTION_LIMIT;
this.levelupStore = leveldown;
if (options.store) {
this.levelupStore = options.store;
}
this.retryInterval = 60000;
this.subscriptions = {
transaction: [],
block: []
};
}
util.inherits(DB, Service);
DB.dependencies = ['bitcoind'];
DB.PREFIXES = {
VERSION: new Buffer('ff', 'hex'),
BLOCKS: new Buffer('01', 'hex'),
TIP: new Buffer('04', 'hex')
};
// The maximum number of transactions to query at once
// Used for populating previous inputs
DB.MAX_TRANSACTION_LIMIT = 5;
// The default maxiumum number of files open for leveldb
DB.DEFAULT_MAX_OPEN_FILES = 200;
/**
* This function will set `this.dataPath` based on `this.node.network`.
* @private
*/
DB.prototype._setDataPath = function() {
$.checkState(this.node.datadir, 'Node is expected to have a "datadir" property');
if (this.node.network === Networks.livenet) {
this.dataPath = this.node.datadir + '/bitcore-node.db';
} else if (this.node.network === Networks.testnet) {
if (this.node.network.regtestEnabled) {
this.dataPath = this.node.datadir + '/regtest/bitcore-node.db';
} else {
this.dataPath = this.node.datadir + '/testnet3/bitcore-node.db';
}
} else {
throw new Error('Unknown network: ' + this.network);
}
};
DB.prototype._checkVersion = function(callback) {
var self = this;
var options = {
keyEncoding: 'binary',
valueEncoding: 'binary'
};
self.store.get(DB.PREFIXES.TIP, options, function(err) {
if (err instanceof levelup.errors.NotFoundError) {
// The database is brand new and doesn't have a tip stored
// we can skip version checking
return callback();
} else if (err) {
return callback(err);
}
self.store.get(DB.PREFIXES.VERSION, options, function(err, buffer) {
var version;
if (err instanceof levelup.errors.NotFoundError) {
// The initial version (1) of the database didn't store the version number
version = 1;
} else if (err) {
return callback(err);
} else {
version = buffer.readUInt32BE();
}
if (self.version !== version) {
var helpUrl = 'https://github.com/bitpay/bitcore-node/blob/master/docs/services/db.md#how-to-reindex';
return callback(new Error(
'The version of the database "' + version + '" does not match the expected version "' +
self.version + '". A recreation of "' + self.dataPath + '" (can take several hours) is ' +
'required or to switch versions of software to match. Please see ' + helpUrl +
' for more information.'
));
}
callback();
});
});
};
DB.prototype._setVersion = function(callback) {
var versionBuffer = new Buffer(new Array(4));
versionBuffer.writeUInt32BE(this.version);
this.store.put(DB.PREFIXES.VERSION, versionBuffer, callback);
};
/**
* Called by Node to start the service.
* @param {Function} callback
*/
DB.prototype.start = function(callback) {
var self = this;
if (!fs.existsSync(this.dataPath)) {
mkdirp.sync(this.dataPath);
}
this.genesis = Block.fromBuffer(this.node.services.bitcoind.genesisBuffer);
this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles });
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this.once('ready', function() {
log.info('Bitcoin Database Ready');
// Notify that there is a new tip
self.node.services.bitcoind.on('tip', function(height) {
if(!self.node.stopping) {
self.sync();
}
});
});
async.series([
function(next) {
self._checkVersion(next);
},
function(next) {
self._setVersion(next);
}
], function(err) {
if (err) {
return callback(err);
}
self.loadTip(function(err) {
if (err) {
return callback(err);
}
self.sync();
self.emit('ready');
setImmediate(callback);
});
});
};
/**
* Called by Node to stop the service
* @param {Function} callback
*/
DB.prototype.stop = function(callback) {
var self = this;
// Wait until syncing stops and all db operations are completed before closing leveldb
async.whilst(function() {
return self.bitcoindSyncing;
}, function(next) {
setTimeout(next, 10);
}, function() {
self.store.close(callback);
});
};
/**
* Will give information about the database from bitcoin.
* @param {Function} callback
*/
DB.prototype.getInfo = function(callback) {
var self = this;
setImmediate(function() {
var info = self.node.bitcoind.getInfo();
callback(null, info);
});
};
/**
* Closes the underlying store database
* @param {Function} callback
*/
DB.prototype.close = function(callback) {
this.store.close(callback);
};
/**
* This function is responsible for emitting `db/transaction` events.
* @param {Object} txInfo - The data from the bitcoind.on('tx') event
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool
* @param {String} txInfo.hash - The hash of the transaction
*/
DB.prototype.transactionHandler = function(tx) {
for (var i = 0; i < this.subscriptions.transaction.length; i++) {
this.subscriptions.transaction[i].emit('db/transaction', {
rejected: !txInfo.mempool,
tx: tx
});
}
};
/**
* Called by Node to determine the available API methods.
*/
DB.prototype.getAPIMethods = function() {
var methods = [
['getBlock', this, this.getBlock, 1],
['getBlockHashesByTimestamp', this, this.getBlockHashesByTimestamp, 2],
['getTransaction', this, this.getTransaction, 2],
['getTransactionWithBlockInfo', this, this.getTransactionWithBlockInfo, 2],
['sendTransaction', this, this.sendTransaction, 1],
['estimateFee', this, this.estimateFee, 1]
];
return methods;
};
DB.prototype.loadTip = function(callback) {
var self = this;
var options = {
keyEncoding: 'binary',
valueEncoding: 'binary'
};
self.store.get(DB.PREFIXES.TIP, options, function(err, tipData) {
if(err && err instanceof levelup.errors.NotFoundError) {
self.tip = self.genesis;
self.tip.__height = 0;
self.connectBlock(self.genesis, function(err) {
if(err) {
return callback(err);
}
self.emit('addblock', self.genesis);
callback();
});
return;
} else if(err) {
return callback(err);
}
var hash = tipData.toString('hex');
var times = 0;
async.retry({times: 3, interval: self.retryInterval}, function(done) {
self.getBlock(hash, function(err, tip) {
if(err) {
times++;
log.warn('Bitcoind does not have our tip (' + hash + '). Bitcoind may have crashed and needs to catch up.');
if(times < 3) {
log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.');
}
return done(err);
}
done(null, tip);
});
}, function(err, tip) {
if(err) {
log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues');
log.warn('Please reindex your database.');
return callback(err);
}
self.tip = tip;
var blockIndex = self.node.services.bitcoind.getBlockIndex(self.tip.hash);
if(!blockIndex) {
return callback(new Error('Could not get height for tip.'));
}
self.tip.__height = blockIndex.height;
callback();
});
});
};
/**
* Will get a block from bitcoind and give a Bitcore Block
* @param {String|Number} hash - A block hash or block height
*/
DB.prototype.getBlock = function(hash, callback) {
this.node.services.bitcoind.getBlock(hash, callback);
};
/**
* Get block hashes between two timestamps
* @param {Number} high - high timestamp, in seconds, inclusive
* @param {Number} low - low timestamp, in seconds, inclusive
* @param {Function} callback
*/
DB.prototype.getBlockHashesByTimestamp = function(high, low, callback) {
var self = this;
var hashes = [];
var lowKey;
var highKey;
try {
lowKey = this._encodeBlockIndexKey(low);
highKey = this._encodeBlockIndexKey(high);
} catch(e) {
return callback(e);
}
var stream = this.store.createReadStream({
gte: lowKey,
lte: highKey,
reverse: true,
valueEncoding: 'binary',
keyEncoding: 'binary'
});
stream.on('data', function(data) {
hashes.push(self._decodeBlockIndexValue(data.value));
});
var error;
stream.on('error', function(streamError) {
if (streamError) {
error = streamError;
}
});
stream.on('close', function() {
if (error) {
return callback(error);
}
callback(null, hashes);
});
return stream;
};
/**
* Will give a Bitcore Transaction from bitcoind by txid
* @param {String} txid - A transaction hash
* @param {Boolean} queryMempool - Include the mempool
* @param {Function} callback
*/
DB.prototype.getTransaction = function(txid, queryMempool, callback) {
this.node.services.bitcoind.getTransaction(txid, queryMempool, function(err, txBuffer) {
if (err) {
return callback(err);
}
if (!txBuffer) {
return callback(new errors.Transaction.NotFound());
}
callback(null, Transaction().fromBuffer(txBuffer));
});
};
/**
* Will give a Bitcore Transaction and populated information about the block included.
* @param {String} txid - A transaction hash
* @param {Boolean} queryMempool - Include the mempool
* @param {Function} callback
*/
DB.prototype.getTransactionWithBlockInfo = function(txid, queryMempool, callback) {
this.node.services.bitcoind.getTransactionWithBlockInfo(txid, queryMempool, function(err, obj) {
if (err) {
return callback(err);
}
var tx = Transaction().fromBuffer(obj.buffer);
tx.__blockHash = obj.blockHash;
tx.__height = obj.height;
tx.__timestamp = obj.timestamp;
callback(null, tx);
});
};
/**
* Will send a transaction to the Bitcoin network.
* @param {Transaction} tx - An instance of a Bitcore Transaction
* @param {Function} callback
*/
DB.prototype.sendTransaction = function(tx, callback) {
var txString;
if (tx instanceof Transaction) {
txString = tx.serialize();
} else {
txString = tx;
}
try {
var txid = this.node.services.bitcoind.sendTransaction(txString);
return callback(null, txid);
} catch(err) {
return callback(err);
}
};
/**
* Will estimate fees for a transaction and give a result in
* satoshis per kilobyte. Similar to the bitcoind estimateFee method.
* @param {Number} blocks - The number of blocks for the transaction to be included.
* @param {Function} callback
*/
DB.prototype.estimateFee = function(blocks, callback) {
var self = this;
setImmediate(function() {
callback(null, self.node.services.bitcoind.estimateFee(blocks));
});
};
/**
* Called by the Bus to determine the available events.
*/
DB.prototype.getPublishEvents = function() {
return [
{
name: 'db/transaction',
scope: this,
subscribe: this.subscribe.bind(this, 'transaction'),
unsubscribe: this.unsubscribe.bind(this, 'transaction')
},
{
name: 'db/block',
scope: this,
subscribe: this.subscribe.bind(this, 'block'),
unsubscribe: this.unsubscribe.bind(this, 'block')
}
];
};
DB.prototype.subscribe = function(name, emitter) {
this.subscriptions[name].push(emitter);
};
DB.prototype.unsubscribe = function(name, emitter) {
var index = this.subscriptions[name].indexOf(emitter);
if (index > -1) {
this.subscriptions[name].splice(index, 1);
}
};
/**
* Will give the previous hash for a block.
* @param {String} blockHash
* @param {Function} callback
*/
DB.prototype.getPrevHash = function(blockHash, callback) {
var blockIndex = this.node.services.bitcoind.getBlockIndex(blockHash);
setImmediate(function() {
if (blockIndex) {
callback(null, blockIndex.prevHash);
} else {
callback(new Error('Could not get prevHash, block not found'));
}
});
};
/**
* Connects a block to the database and add indexes
* @param {Block} block - The bitcore block
* @param {Function} callback
*/
DB.prototype.connectBlock = function(block, callback) {
log.debug('DB handling new chain block');
this.runAllBlockHandlers(block, true, callback);
};
/**
* Disconnects a block from the database and removes indexes
* @param {Block} block - The bitcore block
* @param {Function} callback
*/
DB.prototype.disconnectBlock = function(block, callback) {
log.debug('DB removing chain block');
this.runAllBlockHandlers(block, false, callback);
};
/**
* Will collect all database operations for a block from other services that implement
* `blockHandler` methods and then save operations to the database.
* @param {Block} block - The bitcore block
* @param {Boolean} add - If the block is being added/connected or removed/disconnected
* @param {Function} callback
*/
DB.prototype.runAllBlockHandlers = function(block, add, callback) {
var self = this;
var operations = [];
// Notify block subscribers
for (var i = 0; i < this.subscriptions.block.length; i++) {
this.subscriptions.block[i].emit('db/block', block.hash);
}
// Update tip
var tipHash = add ? new Buffer(block.hash, 'hex') : BufferUtil.reverse(block.header.prevHash);
operations.push({
type: 'put',
key: DB.PREFIXES.TIP,
value: tipHash
});
// Update block index
operations.push({
type: add ? 'put' : 'del',
key: this._encodeBlockIndexKey(block.header.timestamp),
value: this._encodeBlockIndexValue(block.hash)
});
async.eachSeries(
this.node.services,
function(mod, next) {
if(mod.blockHandler) {
$.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function');
mod.blockHandler.call(mod, block, add, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
$.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array');
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
return callback(err);
}
log.debug('Updating the database with operations', operations);
self.store.batch(operations, callback);
}
);
};
DB.prototype._encodeBlockIndexKey = function(timestamp) {
$.checkArgument(timestamp >= 0 && timestamp <= 4294967295, 'timestamp out of bounds');
var timestampBuffer = new Buffer(4);
timestampBuffer.writeUInt32BE(timestamp);
return Buffer.concat([DB.PREFIXES.BLOCKS, timestampBuffer]);
};
DB.prototype._encodeBlockIndexValue = function(hash) {
return new Buffer(hash, 'hex');
};
DB.prototype._decodeBlockIndexValue = function(value) {
return value.toString('hex');
};
/**
* This function will find the common ancestor between the current chain and a forked block,
* by moving backwards on both chains until there is a meeting point.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.findCommonAncestor = function(block, done) {
var self = this;
var mainPosition = self.tip.hash;
var forkPosition = block.hash;
var mainHashesMap = {};
var forkHashesMap = {};
mainHashesMap[mainPosition] = true;
forkHashesMap[forkPosition] = true;
var commonAncestor = null;
async.whilst(
function() {
return !commonAncestor;
},
function(next) {
if(mainPosition) {
var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition);
if(mainBlockIndex && mainBlockIndex.prevHash) {
mainHashesMap[mainBlockIndex.prevHash] = true;
mainPosition = mainBlockIndex.prevHash;
} else {
mainPosition = null;
}
}
if(forkPosition) {
var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition);
if(forkBlockIndex && forkBlockIndex.prevHash) {
forkHashesMap[forkBlockIndex.prevHash] = true;
forkPosition = forkBlockIndex.prevHash;
} else {
forkPosition = null;
}
}
if(forkPosition && mainHashesMap[forkPosition]) {
commonAncestor = forkPosition;
}
if(mainPosition && forkHashesMap[mainPosition]) {
commonAncestor = mainPosition;
}
if(!mainPosition && !forkPosition) {
return next(new Error('Unknown common ancestor'));
}
setImmediate(next);
},
function(err) {
done(err, commonAncestor);
}
);
};
/**
* This function will attempt to rewind the chain to the common ancestor
* between the current chain and a forked block.
* @param {Block} block - The new tip that forks the current chain.
* @param {Function} done - A callback function that is called when complete.
*/
DB.prototype.syncRewind = function(block, done) {
var self = this;
self.findCommonAncestor(block, function(err, ancestorHash) {
if (err) {
return done(err);
}
log.warn('Reorg common ancestor found:', ancestorHash);
// Rewind the chain to the common ancestor
async.whilst(
function() {
// Wait until the tip equals the ancestor hash
return self.tip.hash !== ancestorHash;
},
function(removeDone) {
var tip = self.tip;
// TODO: expose prevHash as a string from bitcore
var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex');
self.getBlock(prevHash, function(err, previousTip) {
if (err) {
removeDone(err);
}
// Undo the related indexes for this block
self.disconnectBlock(tip, function(err) {
if (err) {
return removeDone(err);
}
// Set the new tip
previousTip.__height = self.tip.__height - 1;
self.tip = previousTip;
self.emit('removeblock', tip);
removeDone();
});
});
}, done
);
});
};
/**
* This function will synchronize additional indexes for the chain based on
* the current active chain in the bitcoin daemon. In the event that there is
* a reorganization in the daemon, the chain will rewind to the last common
* ancestor and then resume syncing.
*/
DB.prototype.sync = function() {
var self = this;
if (self.bitcoindSyncing || self.node.stopping || !self.tip) {
return;
}
self.bitcoindSyncing = true;
var height;
async.whilst(function() {
height = self.tip.__height;
return height < self.node.services.bitcoind.height && !self.node.stopping;
}, function(done) {
self.node.services.bitcoind.getBlock(height + 1, function(err, block) {
if (err) {
return done(err);
}
// TODO: expose prevHash as a string from bitcore
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if (prevHash === self.tip.hash) {
// This block appends to the current chain tip and we can
// immediately add it to the chain and create indexes.
// Populate height
block.__height = self.tip.__height + 1;
// Create indexes
self.connectBlock(block, function(err) {
if (err) {
return done(err);
}
self.tip = block;
log.debug('Chain added block to main chain');
self.emit('addblock', block);
setImmediate(done);
});
} else {
// This block doesn't progress the current tip, so we'll attempt
// to rewind the chain to the common ancestor of the block and
// then we can resume syncing.
log.warn('Beginning reorg! Current tip: ' + self.tip.hash + '; New tip: ' + block.hash);
self.syncRewind(block, function(err) {
if(err) {
return done(err);
}
log.warn('Reorg complete. New tip is ' + self.tip.hash);
done();
});
}
});
}, function(err) {
if (err) {
Error.captureStackTrace(err);
return self.node.emit('error', err);
}
if(self.node.stopping) {
self.bitcoindSyncing = false;
return;
}
if (self.node.services.bitcoind.isSynced()) {
self.bitcoindSyncing = false;
self.node.emit('synced');
} else {
self.bitcoindSyncing = false;
}
});
};
module.exports = DB;

63
lib/transaction.js Normal file
View File

@ -0,0 +1,63 @@
'use strict';
var async = require('async');
var levelup = require('levelup');
var bitcore = require('bitcore-lib');
var Transaction = bitcore.Transaction;
var MAX_TRANSACTION_LIMIT = 5;
Transaction.prototype.populateInputs = function(db, poolTransactions, callback) {
var self = this;
if(this.isCoinbase()) {
return setImmediate(callback);
}
async.eachLimit(
this.inputs,
db.maxTransactionLimit || MAX_TRANSACTION_LIMIT,
function(input, next) {
self._populateInput(db, input, poolTransactions, next);
},
callback
);
};
Transaction.prototype._populateInput = function(db, input, poolTransactions, callback) {
if (!input.prevTxId || !Buffer.isBuffer(input.prevTxId)) {
return callback(new Error('Input is expected to have prevTxId as a buffer'));
}
var txid = input.prevTxId.toString('hex');
db.getTransaction(txid, true, function(err, prevTx) {
if(err instanceof levelup.errors.NotFoundError) {
// Check the pool for transaction
for(var i = 0; i < poolTransactions.length; i++) {
if(txid === poolTransactions[i].hash) {
input.output = poolTransactions[i].outputs[input.outputIndex];
return callback();
}
}
return callback(new Error('Previous tx ' + input.prevTxId.toString('hex') + ' not found'));
} else if(err) {
callback(err);
} else {
input.output = prevTx.outputs[input.outputIndex];
callback();
}
});
};
Transaction.prototype._checkSpent = function(db, input, poolTransactions, callback) {
// TODO check and see if another transaction in the pool spent the output
db.isSpentDB(input, function(spent) {
if(spent) {
return callback(new Error('Input already spent'));
} else {
callback();
}
});
};
module.exports = Transaction;

View File

@ -52,8 +52,11 @@
"commander": "^2.8.1",
"errno": "^0.1.4",
"express": "^4.13.3",
"leveldown": "bitpay/leveldown#bitpay-1.4.4",
"levelup": "^1.3.1",
"liftoff": "^2.2.0",
"lru-cache": "^4.0.1",
"memdown": "^1.0.0",
"mkdirp": "0.5.0",
"path-is-absolute": "^1.0.0",
"semver": "^5.0.1",