This commit is contained in:
Chris Kleeschulte 2017-05-01 16:01:26 -04:00
parent 3fe2c3ea16
commit d9efd7bc02
13 changed files with 969 additions and 698 deletions

View File

@ -5,13 +5,14 @@ var bitcore = require('bitcore-lib');
var zmq = require('zmq');
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
var $ = bitcore.util.preconditions;
var _ = bitcore.deps._;
var index = require('../../');
var errors = index.errors;
var log = index.log;
var Service = require('../../service');
var LRU = require('lru-cache');
function Bitcoin(options) {
if (!(this instanceof Bitcoin)) {
@ -35,6 +36,8 @@ function Bitcoin(options) {
this.on('error', function(err) {
log.error(err.stack);
});
this._hashBlockCache = LRU(100);
}
util.inherits(Bitcoin, Service);
@ -110,10 +113,39 @@ Bitcoin.prototype._wrapRPCError = function(errObj) {
return err;
};
Bitcoin.prototype._initChain = function(callback) {
Bitcoin.prototype._getGenesisBlock = function(callback) {
var self = this;
self.client.getBlockHash(0, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var blockhash = response.result;
self.getRawBlock(blockhash, function(err, blockBuffer) {
if (err) {
return callback(err);
}
self.genesisBuffer = blockBuffer;
callback();
});
});
};
Bitcoin.prototype._getNetworkTip = function(callback) {
var self = this;
self.client.getBestBlockHash(function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
@ -121,76 +153,68 @@ Bitcoin.prototype._initChain = function(callback) {
self.tiphash = response.result;
self.client.getBlock(response.result, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
self.height = response.result.height;
callback();
self.client.getBlockHash(0, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var blockhash = response.result;
self.getRawBlock(blockhash, function(err, blockBuffer) {
if (err) {
return callback(err);
}
self.genesisBuffer = blockBuffer;
self.emit('ready');
log.info('Bitcoin Daemon Ready');
callback();
});
});
});
});
};
Bitcoin.prototype._zmqBlockHandler = function(node, message) {
Bitcoin.prototype._initChain = function(callback) {
var self = this;
self._rapidProtectedUpdateTip(node, message);
async.series([
self._getNetworkTip.bind(self),
self._getGenesisBlock.bind(self),
], function(err) {
if(err) {
return callback(err);
}
self.emit('ready');
callback();
});
};
Bitcoin.prototype._zmqBlockHandler = function(message) {
var self = this;
var hashBlockHex = message.toString('hex');
if (!self._isSendableHashBlock(hashBlockHex)) {
return;
}
self._hashBlockCache.set(hashBlockHex);
self.tiphash = hashBlockHex;
self.emit('block', message);
for (var i = 0; i < this.subscriptions.hashblock.length; i++) {
this.subscriptions.hashblock[i].emit('bitcoind/hashblock', message.toString('hex'));
this.subscriptions.hashblock[i].emit('bitcoind/hashblock', hashBlockHex);
}
};
Bitcoin.prototype._rapidProtectedUpdateTip = function(node, message) {
var self = this;
if (new Date() - self.lastTip > 1000) {
self.lastTip = new Date();
self._updateTip(node, message);
} else {
clearTimeout(self.lastTipTimeout);
self.lastTipTimeout = setTimeout(function() {
self._updateTip(node, message);
}, 1000);
}
Bitcoin.prototype._isSendableHashBlock = function(hashBlockHex) {
return hashBlockHex.length === 64 && !this._hashBlockCache.get(hashBlockHex);
};
Bitcoin.prototype._updateTip = function(node, message) {
var self = this;
var hex = message.toString('hex');
if (hex !== self.tiphash) {
self.tiphash = message.toString('hex');
node.client.getBlock(self.tiphash, function(err, response) {
if (err) {
var error = self._wrapRPCError(err);
self.emit('error', error);
} else {
self.height = response.result.height;
$.checkState(self.height >= 0);
self.emit('tip', self.height);
}
});
}
};
Bitcoin.prototype._zmqTransactionHandler = function(node, message) {
var self = this;
self.emit('tx', message);
@ -199,55 +223,6 @@ Bitcoin.prototype._zmqTransactionHandler = function(node, message) {
}
};
Bitcoin.prototype._checkSyncedAndSubscribeZmqEvents = function(node) {
var self = this;
var interval;
function checkAndSubscribe(callback) {
node.client.getBestBlockHash(function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var blockhash = new Buffer(response.result, 'hex');
self.emit('block', blockhash);
self._updateTip(node, blockhash);
node.client.getBlockchainInfo(function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
var progress = response.result.verificationprogress;
if (progress >= self.zmqSubscribeProgress) {
self._subscribeZmqEvents(node);
clearInterval(interval);
callback(null, true);
} else {
callback(null, false);
}
});
});
}
checkAndSubscribe(function(err, synced) {
if (err) {
log.error(err);
}
if (!synced) {
interval = setInterval(function() {
if (self.node.stopping) {
return clearInterval(interval);
}
checkAndSubscribe(function(err) {
if (err) {
log.error(err);
}
});
}, node._tipUpdateInterval || Bitcoin.DEFAULT_TIP_UPDATE_INTERVAL);
}
});
};
Bitcoin.prototype._subscribeZmqEvents = function(node) {
var self = this;
node.zmqSubSocket.subscribe('hashblock');
@ -257,7 +232,7 @@ Bitcoin.prototype._subscribeZmqEvents = function(node) {
if (topicString === 'rawtx') {
self._zmqTransactionHandler(node, message);
} else if (topicString === 'hashblock') {
self._zmqBlockHandler(node, message);
self._zmqBlockHandler(message);
}
});
};
@ -293,105 +268,44 @@ Bitcoin.prototype._initZmqSubSocket = function(node, zmqUrl) {
}
};
Bitcoin.prototype._loadTipFromNode = function(node, callback) {
var self = this;
node.client.getBestBlockHash(function(err, response) {
if (err && err.code === -28) {
log.warn(err.message);
return callback(self._wrapRPCError(err));
} else if (err) {
return callback(self._wrapRPCError(err));
}
node.client.getBlock(response.result, function(err, response) {
if (err) {
return callback(self._wrapRPCError(err));
}
self.height = response.result.height;
$.checkState(self.height >= 0);
self.emit('tip', self.height);
callback();
});
});
};
Bitcoin.prototype._connectProcess = function(config, callback) {
Bitcoin.prototype._connectProcess = function(config) {
var self = this;
var node = {};
var exitShutdown = false;
async.retry({times: self.startRetryTimes, interval: self.startRetryInterval}, function(done) {
if (self.node.stopping) {
exitShutdown = true;
return done();
}
node.client = new BitcoinRPC({
protocol: config.rpcprotocol || 'http',
host: config.rpchost || '127.0.0.1',
port: config.rpcport,
user: config.rpcuser,
pass: config.rpcpassword,
rejectUnauthorized: _.isUndefined(config.rpcstrict) ? true : config.rpcstrict
});
self._loadTipFromNode(node, done);
}, function(err) {
if (err) {
return callback(err);
}
if (exitShutdown) {
return callback(new Error('Stopping while trying to connect to bitcoind.'));
}
self._initZmqSubSocket(node, config.zmqpubrawtx);
self._subscribeZmqEvents(node);
callback(null, node);
node.client = new BitcoinRPC({
protocol: config.rpcprotocol || 'http',
host: config.rpchost || '127.0.0.1',
port: config.rpcport,
user: config.rpcuser,
pass: config.rpcpassword,
rejectUnauthorized: _.isUndefined(config.rpcstrict) ? true : config.rpcstrict
});
self._initZmqSubSocket(node, config.zmqpubrawtx);
self._subscribeZmqEvents(node);
return node;
};
Bitcoin.prototype.start = function(callback) {
var self = this;
async.series([
function(next) {
if (self.options.spawn) {
self._spawnChildProcess(function(err, node) {
if (err) {
return next(err);
}
self.nodes.push(node);
next();
});
} else {
next();
}
},
function(next) {
if (self.options.connect) {
async.map(self.options.connect, self._connectProcess.bind(self), function(err, nodes) {
if (err) {
return callback(err);
}
for(var i = 0; i < nodes.length; i++) {
self.nodes.push(nodes[i]);
}
next();
});
} else {
next();
}
}
], function(err) {
if (err) {
return callback(err);
}
if (self.nodes.length === 0) {
return callback(new Error('Bitcoin configuration options "spawn" or "connect" are expected'));
}
self._initChain(callback);
if (!self.options.connect) {
throw new Error('A "connect" array is required in the bitcoind service configuration.');
}
self.nodes = self.options.connect.map(self._connectProcess.bind(self));
if (self.nodes.length === 0) {
throw new Error('Could not connect to any servers in connect array.');
}
self._initChain(function() {
log.info('Bitcoin Daemon Ready');
callback();
});
};
@ -438,6 +352,42 @@ Bitcoin.prototype.getRawBlock = function(blockArg, callback) {
self._maybeGetBlockHash(blockArg, queryBlock);
};
Bitcoin.prototype.getBlockHeader = function(blockArg, callback) {
var self = this;
function queryHeader(err, blockhash) {
if (err) {
return callback(err);
}
self._tryAllClients(function(client, done) {
client.getBlockHeader(blockhash, function(err, response) {
if (err) {
return done(self._wrapRPCError(err));
}
var result = response.result;
var header = {
hash: result.hash,
version: result.version,
confirmations: result.confirmations,
height: result.height,
chainWork: result.chainwork,
prevHash: result.previousblockhash,
nextHash: result.nextblockhash,
merkleRoot: result.merkleroot,
time: result.time,
medianTime: result.mediantime,
nonce: result.nonce,
bits: result.bits,
difficulty: result.difficulty
};
done(null, header);
});
}, callback);
}
self._maybeGetBlockHash(blockArg, queryHeader);
};
Bitcoin.prototype.getBlock = function(blockArg, callback) {
var self = this;

View File

@ -16,6 +16,7 @@ var log = index.log;
var Service = require('../../service');
var Sync = require('./sync');
var Reorg = require('./reorg');
var Block = bitcore.Block;
/*
* @param {Object} options
@ -63,6 +64,7 @@ function DB(options) {
this._sync = new Sync(this.node, this);
this.syncing = true;
this.bitcoind = this.node.services.bitcoind;
}
util.inherits(DB, Service);
@ -129,63 +131,24 @@ DB.prototype.start = function(callback) {
mkdirp.sync(this.dataPath);
}
self.genesis = Block.fromBuffer(self.node.services.bitcoind.genesisBuffer);
self.genesis = Block.fromBuffer(self.bitcoind.genesisBuffer);
self.store = levelup(self.dataPath, { db: self.levelupStore, keyEncoding: 'binary', valueEncoding: 'binary'});
self._sync.on('error', function(err) {
self.syncing = false;
log.error(err);
});
self._sync.on('reorg', function(block) {
log.warn('Reorg detected! Tip: ' + self.tip.hash +
' Concurrent tip: ' + self.concurrentTip.hash +
' Bitcoind tip: ' + self.node.services.bitcoind.tiphash);
self.reorg = true;
var reorg = new Reorg(self.node, self);
reorg.handleReorg(block, function(err) {
if(err) {
log.error('Reorg failed! ' + err);
return self.node.stop(function() {});
}
log.warn('Reorg successful! Tip: ' + self.tip.hash +
' Concurrent tip: ' + self.concurrentTip.hash +
' Bitcoind tip: ' + self.node.services.bitcoind.tiphash
);
self.reorg = false;
self._sync.sync();
});
});
self._sync.once('synced', function() {
self.syncing = false;
self.node.services.bitcoind.on('tip', function(height) {
log.info('New tip at height: ' + height + ' hash: ' + self.node.services.bitcoind.tiphash);
self._sync.sync();
});
log.info('Initial sync complete');
});
self.node.on('stopping', function() {
self._sync.stop();
});
self.node.once('ready', function() {
function finish(err) {
self.loadTips(function(err) {
if(err) {
throw err;
}
self._sync.initialSync();
}
self.loadTip(self.loadConcurrentTip.bind(self, finish));
self._sync.sync();
});
});
setImmediate(function() {
@ -194,6 +157,81 @@ DB.prototype.start = function(callback) {
};
DB.prototype.detectReorg = function(blocks) {
var self = this;
if (!blocks || blocks.length === 0) {
return;
}
var tipHash = self.reorgTipHash || self.tip.hash;
var chainMembers = [];
var loopIndex = 0;
var overallCounter = 0;
while(overallCounter < blocks.length) {
if (loopIndex >= blocks.length) {
overallCounter++;
loopIndex = 0;
}
var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex');
if (prevHash === tipHash) {
tipHash = blocks[loopIndex].hash;
chainMembers.push(blocks[loopIndex]);
}
loopIndex++;
}
for(var i = 0; i < blocks.length; i++) {
if (chainMembers.indexOf(blocks[i]) === -1) {
return blocks[i];
}
self.reorgTipHash = blocks[i].hash;
}
};
DB.prototype.handleReorg = function(forkBlock, callback) {
var self = this;
self.printTipInfo('Reorg detected!');
self.reorg = true;
var reorg = new Reorg(self.node, self);
reorg.handleReorg(forkBlock.hash, function(err) {
if(err) {
log.error('Reorg failed! ' + err);
self.node.stop(function() {});
throw err;
}
self.printTipInfo('Reorg successful!');
self.reorg = false;
callback();
});
};
DB.prototype.printTipInfo = function(prependedMessage) {
log.info(
prependedMessage + ' Serial Tip: ' + this.tip.hash +
' Concurrent tip: ' + this.concurrentTip.hash +
' Bitcoind tip: ' + this.bitcoind.tiphash
);
};
DB.prototype.stop = function(callback) {
var self = this;
async.whilst(function() {
@ -201,7 +239,9 @@ DB.prototype.stop = function(callback) {
}, function(next) {
setTimeout(next, 10);
}, function() {
self.store.close(callback);
if (self.store) {
self.store.close(callback);
}
});
};
@ -213,106 +253,41 @@ DB.prototype.getAPIMethods = function() {
return [];
};
DB.prototype.loadTip = function(callback) {
DB.prototype.loadTips = function(callback) {
var self = this;
self.store.get(self.dbPrefix + 'tip', self.dbOptions, function(err, tipData) {
var tipStrings = ['tip', 'concurrentTip'];
if(err && err instanceof levelup.errors.NotFoundError) {
async.each(tipStrings, function(tip, next) {
self.tip = self.genesis;
self.tip.__height = 0;
self.store.get(self.dbPrefix + tip, self.dbOptions, function(err, tipData) {
self.connectBlock(self.genesis, function(err) {
if(err) {
return callback(err);
}
self.emit('addblock', self.genesis);
callback();
});
} else if(err) {
return callback(err);
} else {
var hash = tipData.slice(0, 32).toString('hex');
var height = tipData.readUInt32BE(32);
var times = 0;
async.retry({times: 3, interval: self.retryInterval}, function(done) {
self.node.services.bitcoind.getBlock(hash, function(err, tip) {
if(err) {
times++;
log.warn('Bitcoind does not have our tip (' + hash + '). Bitcoind may have crashed and needs to catch up.');
if(times < 3) {
log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.');
}
return done(err);
}
done(null, tip);
});
}, function(err, tip) {
if(err) {
log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues');
log.warn('Please reindex your database.');
return callback(err);
}
tip.__height = height;
self.tip = tip;
callback();
});
}
});
};
DB.prototype.loadConcurrentTip = function(callback) {
var self = this;
self.store.get(self.dbPrefix + 'concurrentTip', self.dbOptions, function(err, tipData) {
if (err instanceof levelup.errors.NotFoundError) {
self.concurrentTip = self.genesis;
self.concurrentTip.__height = 0;
return callback();
} else if (err) {
return callback(err);
}
var hash = tipData.slice(0, 32).toString('hex');
var height = tipData.readUInt32BE(32);
var times = 0;
async.retry({times: 3, interval: self.retryInterval}, function(done) {
self.node.services.bitcoind.getBlock(hash, function(err, concurrentTip) {
if(err) {
times++;
log.warn('Bitcoind does not have our concurrentTip (' + hash + ').' +
' Bitcoind may have crashed and needs to catch up.');
if(times < 3) {
log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.');
}
return done(err);
}
done(null, concurrentTip);
});
}, function(err, concurrentTip) {
if(err) {
log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues');
log.warn('Please reindex your database.');
return callback(err);
if(err && !(err instanceof levelup.errors.NotFoundError)) {
return next(err);
}
concurrentTip.__height = height;
self.concurrentTip = concurrentTip;
var height, hash;
//genesis block, set to -1 because we have no yet processed the blocks
if (!tipData) {
height = -1;
hash = new Array(65).join('0');
} else {
height = tipData.readUInt32BE(32);
hash = tipData.slice(0, 32).toString('hex');
}
callback();
self[tip] = {
hash: hash,
height: height,
'__height': height //to be consistent with real blocks
};
next();
});
});
}, callback);
};
DB.prototype.getPublishEvents = function() {
@ -437,10 +412,6 @@ DB.prototype.getSerialBlockOperations = function(block, add, callback) {
async.eachSeries(
this.node.services,
function(mod, next) {
//console.log('s***********************');
//console.log('here');
//console.log(mod.name, block.__height);
//console.log('e***********************');
if(mod.blockHandler) {
$.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function');

View File

@ -10,7 +10,7 @@ function Reorg(node, db) {
this.db = db;
}
Reorg.prototype.handleReorg = function(block, callback) {
Reorg.prototype.handleReorg = function(newBlockHash, callback) {
var self = this;
self.handleConcurrentReorg(function(err) {
@ -18,7 +18,7 @@ Reorg.prototype.handleReorg = function(block, callback) {
return callback(err);
}
self.findCommonAncestorAndNewHashes(self.db.tip, block, function(err, commonAncestor, newHashes) {
self.findCommonAncestorAndNewHashes(self.db.tip.hash, newBlockHash, function(err, commonAncestor, newHashes) {
if(err) {
return callback(err);
}
@ -27,7 +27,6 @@ Reorg.prototype.handleReorg = function(block, callback) {
if(err) {
return callback(err);
}
self.fastForwardBothTips(newHashes, callback);
});
});
@ -41,7 +40,7 @@ Reorg.prototype.handleConcurrentReorg = function(callback) {
return callback();
}
self.findCommonAncestorAndNewHashes(self.db.concurrentTip, self.db.tip, function(err, commonAncestor, newHashes) {
self.findCommonAncestorAndNewHashes(self.db.concurrentTip.hash, self.db.tip.hash, function(err, commonAncestor, newHashes) {
if(err) {
return callback(err);
}
@ -137,7 +136,6 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
if(err) {
return next(err);
}
operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false));
next(null, operations);
});
@ -148,7 +146,7 @@ Reorg.prototype.rewindBothTips = function(commonAncestor, callback) {
return next(err);
}
operations.push(self.db.getTipOperation(self.db.concurrentTip, false));
operations.push(self.db.getTipOperation(self.db.tip, false));
next(null, operations);
});
}
@ -237,11 +235,11 @@ Reorg.prototype.fastForwardBothTips = function(newHashes, callback) {
}, callback);
};
Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callback) {
Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTipHash, newTipHash, callback) {
var self = this;
var mainPosition = oldTip.hash;
var forkPosition = newTip.hash;
var mainPosition = oldTipHash;
var forkPosition = newTipHash;
var mainHashesMap = {};
var forkHashesMap = {};
@ -275,7 +273,6 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callba
} else {
mainPosition = null;
}
next();
});
},
@ -344,4 +341,4 @@ Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callba
);
};
module.exports = Reorg;
module.exports = Reorg;

View File

@ -6,23 +6,25 @@ var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
var Block = bitcore.Block;
var ProgressBar = require('progress');
var index = require('../../index');
var log = index.log;
var green = '\u001b[42m \u001b[0m';
var red = '\u001b[41m \u001b[0m';
function BlockStream(highWaterMark, bitcoind, dbTip) {
function BlockStream(highWaterMark, db, sync) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.bitcoind = bitcoind;
this.dbTip = dbTip;
this.lastReadHeight = dbTip.__height;
this.lastEmittedHash = dbTip.hash;
this.sync = sync;
this.db = db;
this.dbTip = this.db.tip;
this.lastReadHeight = this.dbTip.__height;
this.lastEmittedHash = this.dbTip.hash;
this.stopping = false;
this.queue = [];
this.processing = false;
this.syncing = true;
this.bitcoind = this.db.bitcoind;
}
inherits(BlockStream, Readable);
@ -73,30 +75,29 @@ function Sync(node, db) {
inherits(Sync, EventEmitter);
Sync.prototype.initialSync = function() {
Sync.prototype.sync = function() {
var self = this;
if(this.syncing || this.db.reorg) {
if(this.syncing) {
return;
}
self.syncing = true;
self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip);
var blockStream = new BlockStream(self.highWaterMark, self.db, self);
var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db);
var writeStream = new WriteStream(self.highWaterMark, self.db);
var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip);
self._handleErrors(self.blockStream);
self._handleErrors(blockStream);
self._handleErrors(processConcurrent);
self._handleErrors(processSerial);
self._handleErrors(writeStream);
self.blockStream
blockStream
.pipe(processConcurrent)
.pipe(writeStream);
self.blockStream
blockStream
.pipe(processSerial);
self.lastReportedBlock = self.db.tip.__height;
@ -108,21 +109,52 @@ Sync.prototype.initialSync = function() {
clear: true
});
var timer = setInterval(self.reportStatus.bind(this), 1000);
self.progressBarTimer = setInterval(self.reportStatus.bind(self), 1000);
processSerial.on('finish', function() {
self.syncing = false;
if (self.progressBar) {
self.progressBar.terminate();
}
if (timer) {
clearInterval(timer);
}
self.emit('synced');
});
processSerial.on('finish', self._onFinish.bind(self));
};
Sync.prototype._onFinish = function() {
var self = this;
self.syncing = false;
if (self.progressBar) {
self.progressBar.terminate();
}
if (self.progressBarTimer) {
clearInterval(self.progressBarTimer);
}
if (self.forkBlock) {
self.db.handleReorg(self.forkBlock, function() {
self.forkBlock = null;
self.sync();
});
return;
}
self._startSubscriptions();
log.info('Sync complete');
};
Sync.prototype._startSubscriptions = function() {
var self = this;
if (!self.subscribed) {
self.subscribed = true;
self.bus = self.node.openBus({remoteAddress: 'localhost'});
self.bus.on('bitcoind/hashblock', function(hashBlockHex) {
self.sync();
});
self.bus.subscribe('bitcoind/hashblock');
}
};
Sync.prototype.reportStatus = function() {
if (process.stderr.isTTY) {
var tick = this.db.tip.__height - this.lastReportedBlock;
@ -133,47 +165,11 @@ Sync.prototype.reportStatus = function() {
}
};
Sync.prototype.sync = function() {
var self = this;
if(this.syncing || this.db.reorg) {
return;
}
this.syncing = true;
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip);
var processBoth = new ProcessBoth(this.highWaterMark, this.db);
this._handleErrors(this.blockStream);
this._handleErrors(processBoth);
processBoth.on('finish', function() {
self.syncing = false;
});
this.blockStream
.pipe(processBoth);
};
Sync.prototype.stop = function() {
if(this.blockStream) {
this.blockStream.stopping = true;
}
};
Sync.prototype._handleErrors = function(stream) {
var self = this;
stream.on('error', function(err, block) {
stream.on('error', function(err) {
self.syncing = false;
if(err.reorg) {
return self.emit('reorg', block);
}
if(err.reorg2) {
return;
}
self.emit('error', err);
});
};
@ -203,37 +199,10 @@ BlockStream.prototype._process = function() {
return self.queue.length;
}, function(next) {
var heights = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(heights.length);
async.map(heights, function(height, next) {
var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(blockArgs.length);
self._getBlocks(blockArgs, next);
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
setTimeout(function() {
next(null, block);
}, 1);
});
}, function(err, blocks) {
if(err) {
return next(err);
}
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
}
next();
});
}, function(err) {
if(err) {
return self.emit('error', err);
@ -244,6 +213,56 @@ BlockStream.prototype._process = function() {
};
BlockStream.prototype._getBlocks = function(heights, callback) {
var self = this;
async.map(heights, function(height, next) {
if (height === 0) {
var block = new Block(self.bitcoind.genesisBuffer);
block.__height = 0;
return next(null, block);
}
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
next(null, block);
});
}, function(err, blocks) {
if(err) {
return callback(err);
}
//at this point, we know that all blocks we've sent down the pipe
//have not been reorg'ed, but the new batch here might have been
self.sync.forkBlock = self.db.detectReorg(blocks);
if (!self.sync.forkBlock) {
for(var i = 0; i < blocks.length; i++) {
self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);
}
return callback();
}
self.push(null);
});
};
ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this;
@ -251,13 +270,6 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
return self.db.concurrentTip.__height >= block.__height;
}
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(prevHash !== self.tip.hash) {
var err = new Error('Reorg detected');
err.reorg = true;
return self.emit('error', err, block);
}
if(check()) {
return self._process(block, callback);
}
@ -343,12 +355,6 @@ ProcessConcurrent.prototype._flush = function(callback) {
WriteStream.prototype._write = function(obj, enc, callback) {
var self = this;
if(self.db.reorg) {
var err = new Error('reorg in process');
err.reorg2 = true;
return callback(err);
}
self.db.store.batch(obj.operations, function(err) {
if(err) {
return callback(err);
@ -364,12 +370,6 @@ WriteStream.prototype._write = function(obj, enc, callback) {
ProcessBoth.prototype._write = function(block, encoding, callback) {
var self = this;
if(self.db.reorg) {
var err = new Error('reorg in process');
err.reorg2 = true;
return callback(err);
}
async.parallel([function(next) {
self.db.getConcurrentBlockOperations(block, true, function(err, operations) {
if(err) {

View File

@ -49,7 +49,7 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
return next(null, (block.header.timestamp - 1));
}
self.getTimestamp(block.header.prevHash.reverse().toString('hex'), next);
self.getTimestamp(block.toObject().header.prevHash, next);
}
getLastTimestamp(function(err, lastTimestamp) {

View File

@ -15,8 +15,9 @@ var _ = require('lodash');
var bodyParser = require('body-parser');
var LRU = require('lru-cache');
var Encoding = require('./encoding');
var Readable = require('stream').Readable;
var Input = require('bitcore-lib').Transaction.Input;
var Transform = require('stream').Transform;
var transform = new Transform({ objectMode: true });
var WalletService = function(options) {
BaseService.call(this, options);
@ -27,14 +28,6 @@ var WalletService = function(options) {
maxAge: 86400000 * 3 //3 days
});
this._cache = LRU({
max: 500 * 1024 * 1024,
length: function(n) {
return Buffer.byteLength(n, 'utf8');
},
maxAge: 30 * 60 * 1000
});
this._addressMap = {};
this.balances = {};
};
@ -59,6 +52,7 @@ WalletService.prototype.start = function(callback) {
self.store = self.node.services.db.store;
self.node.services.db.getPrefix(self.name, function(err, servicePrefix) {
if(err) {
return callback(err);
}
@ -67,12 +61,14 @@ WalletService.prototype.start = function(callback) {
self._encoding = new Encoding(self.servicePrefix);
self._loadAllAddresses(function(err) {
if(err) {
return callback(err);
}
self._loadAllBalances(callback);
});
});
};
@ -107,20 +103,24 @@ WalletService.prototype._checkAddresses = function() {
};
WalletService.prototype.blockHandler = function(block, connectBlock, callback) {
var opts = {
block: block,
connectBlock: connectBlock,
serial: true
};
this._blockHandler(opts, callback);
};
WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
var opts = {
block: block,
connectBlock: connectBlock
};
this._blockHandler(opts, callback);
};
WalletService.prototype._blockHandler = function(opts, callback) {
@ -272,8 +272,6 @@ WalletService.prototype._processSerialInput = function(opts, tx, input, callback
var self = this;
//we may not have walletIds to update but this input may be spending a pay-to-pub-key utxo
//so we must get the spending tx to check on that
var walletIds = input.script && input.script.isPublicKeyIn() ?
['p2pk'] :
self._getWalletIdsFromScript(input);
@ -619,7 +617,9 @@ WalletService.prototype._endpointPostAddresses = function() {
};
WalletService.prototype._endpointGetTransactions = function() {
var self = this;
return function(req, res) {
var walletId = req.params.walletId;
@ -632,28 +632,30 @@ WalletService.prototype._endpointGetTransactions = function() {
var options = {
start: heights[0] || 0,
end : heights[1] || 0xffffffff,
from: req.query.from,
to: req.query.to
self: self,
walletId: walletId
};
self._getTransactions(walletId, options, function(err, transactions) {
transform._transform = function(chunk, enc, callback) {
if(err) {
return utils.sendError(err, res);
}
var txid = self._encoding.decodeWalletTransactionKey(chunk).txid;
self._getTransactionFromDb(options, txid, function(err, tx) {
var rs = new Readable();
transform.push(utils.toJSONL(self._formatTransaction(tx)));
callback();
transactions.forEach(function(transaction) {
if (transaction) {
rs.push(utils.toJSONL(self._formatTransaction(transaction)));
}
});
rs.push(null);
rs.pipe(res);
};
});
transform._flush = function (callback) {
callback();
}
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
stream.pipe(transform).pipe(res);
});
};
};
@ -664,7 +666,7 @@ WalletService.prototype._formatTransactions = function(txs) {
WalletService.prototype._formatTransaction = function(tx) {
var obj = tx.toObject();
//jsonl parser will not allow newline characters here
for(var i = 0; i < tx.inputs.length; i++) {
obj.inputs[i].inputSatoshis = tx.__inputValues[i];
}
@ -758,20 +760,6 @@ WalletService.prototype._getBalance = function(walletId, options, callback) {
};
WalletService.prototype._chunkAdresses = function(addresses) {
var maxLength = this.node.services.bitcoind.maxAddressesQuery;
var groups = [];
var groupsCount = Math.ceil(addresses.length / maxLength);
for(var i = 0; i < groupsCount; i++) {
groups.push(addresses.slice(i * maxLength, Math.min(maxLength * (i + 1), addresses.length)));
}
return groups;
};
WalletService.prototype._getSearchParams = function(fn, options) {
return {
gte: fn.call(this, options.walletId, options.start),
@ -782,7 +770,6 @@ WalletService.prototype._getSearchParams = function(fn, options) {
WalletService.prototype._getTxidsFromDb = function(options, callback) {
var self = this;
var txids = [];
var encodingFn = self._encoding.encodeWalletTransactionKey.bind(self._encoding);
var stream = self.store.createKeyStream(self._getSearchParams(encodingFn, options));
@ -802,32 +789,20 @@ WalletService.prototype._getTxidsFromDb = function(options, callback) {
};
WalletService.prototype._getTransactionsFromDb = function(txids, options, callback) {
WalletService.prototype._getTransactionFromDb = function(options, txid, callback) {
var self = this;
async.mapLimit(txids, 10, function(txid, next) {
var self = options.self;
self.node.services.transaction.getTransaction(txid, options, next);
}, function(err, txs) {
self.node.services.transaction.getTransaction(txid, options, function(err, tx) {
if(err) {
return callback(err);
}
self._cache.set(options.key, JSON.stringify(self._formatTransactions(txs)));
if (!options.queryMempool) {
options.to = options.to || txs.length;
return callback(null, txs.slice(options.from, options.to), txs.length);
}
options.txs = txs;
self._getTransactionsFromMempool(options, callback);
callback(null, tx);
});
};
WalletService.prototype._getTransactionsFromMempool = function(options, callback) {
@ -841,12 +816,20 @@ WalletService.prototype._getTransactionsFromMempool = function(options, callback
}
self.mempool.getTransactionsByAddresses(addresses, function(err, mempoolTxs) {
if(err) {
return callback(err);
}
var txs = options.txs.concat(mempoolTxs);
callback(null, txs.slice(options.from, options.to), txs.length);
if (mempoolTxs) {
mempoolTxs.forEach(function(tx) {
options.readable.push(utils.toJSONL(self._formatTransaction(tx)));
});
}
options.readable.push(null);
options.readable.pipe(options.response);
callback();
});
});
@ -862,35 +845,20 @@ WalletService.prototype._getTransactions = function(walletId, options, callback)
var opts = {
start: start,
end: end,
from: options.from || 0,
to: options.to || 0,
walletId: walletId,
key: walletId + start + end
key: walletId + start + end,
readable: options.readable
};
if (!self._cache.peek(opts.key)) {
return self._getTxidsFromDb(opts, callback);
}
try {
opts.txs = JSON.parse(self._cache.get(opts.key));
self._getTransactionsFromMempool(opts, callback);
} catch(e) {
self._cache.del(opts.key);
return callback(e);
}
return self._getTxidsFromDb(opts, callback);
};
WalletService.prototype._removeWallet = function(walletId, callback) {
var self = this;
async.map(Object.keys(self._encoding.subKeyMap), function(prefix, next) {
var keys = [];
var start = self._encoding.subKeyMap[prefix].fn.call(self._encoding, walletId);
@ -1252,16 +1220,6 @@ WalletService.prototype._endpointJobs = function() {
};
WalletService.prototype._endpointIsSynced = function() {
var self = this;
return function(req, res) {
res.status(200).jsonp({ result: !self.node.services.db.syncing });
};
};
WalletService.prototype._endpointJobStatus = function() {
var self = this;
@ -1320,9 +1278,6 @@ WalletService.prototype._setupReadOnlyRoutes = function(app) {
app.get('/jobs',
s._endpointJobs()
);
app.get('/issynced',
s._endpointIsSynced()
);
};
WalletService.prototype._setupWriteRoutes = function(app) {

215
regtest/db.js Normal file
View File

@ -0,0 +1,215 @@
'use strict';
var chai = require('chai');
var should = chai.should();
var async = require('async');
var path = require('path');
var utils = require('./utils');
var zmq = require('zmq');
var http = require('http');
var blocks = require('../test/data/blocks.json');
var bitcore = require('bitcore-lib');
var Block = bitcore.Block;
var BufferUtil = bitcore.util.buffer;
/*
Bitcoind does not need to be started or run
*/
var debug = false;
var bitcoreDataDir = '/tmp/bitcore';
var pubSocket;
var rpcServer;
function setupFakeRpcServer() {
rpcServer = http.createServer();
rpcServer.listen(48332, '127.0.0.1');
}
function setupFakeZmq() {
pubSocket = zmq.socket('pub');
pubSocket.bind('tcp://127.0.0.1:38332');
}
var bitcore = {
configFile: {
file: bitcoreDataDir + '/bitcore-node.json',
conf: {
network: 'regtest',
port: 53001,
datadir: bitcoreDataDir,
services: [
'bitcoind',
'db',
'transaction',
'timestamp',
'address',
'mempool',
'wallet-api',
'web'
],
servicesConfig: {
bitcoind: {
connect: [
{
rpcconnect: '127.0.0.1',
rpcport: 48332,
rpcuser: 'bitcoin',
rpcpassword: 'local321',
zmqpubrawtx: 'tcp://127.0.0.1:38332'
}
]
}
}
}
},
httpOpts: {
protocol: 'http:',
hostname: 'localhost',
port: 53001,
},
opts: { cwd: bitcoreDataDir },
datadir: bitcoreDataDir,
exec: path.resolve(__dirname, '../bin/bitcore-node'),
args: ['start'],
process: null
};
var opts = {
debug: debug,
bitcore: bitcore,
bitcoreDataDir: bitcoreDataDir,
blockHeight: 0
};
var genesis = new Block(new Buffer(blocks.genesis, 'hex'));
var block1 = new Block(new Buffer(blocks.block1a, 'hex'));
var block2 = new Block(new Buffer(blocks.block1b, 'hex'));
describe('DB Operations', function() {
this.timeout(60000);
describe('DB Reorg', function() {
var self = this;
var responses = [
genesis.hash,
{ hash: genesis.hash, height: 0 },
genesis.hash,
blocks.genesis, //end initChain
block1.hash,
blocks.block1a,
block2.hash,
blocks.block1b,
{ hash: block1.header.hash, previousblockhash: BufferUtil.reverse(block1.header.prevHash).toString('hex') },
{ hash: block2.header.hash, previousblockhash: BufferUtil.reverse(block2.header.prevHash).toString('hex') },
blocks.genesis,
blocks.block1b,
];
after(function(done) {
pubSocket.close();
rpcServer.close();
bitcore.process.kill();
setTimeout(done, 1000);
});
before(function(done) {
var responseCount = 0;
setupFakeRpcServer();
rpcServer.on('request', function(req, res) {
var data = '';
req.on('data', function(chunk) {
data += chunk.toString();
});
req.on('end', function() {
var body = JSON.parse(data);
//console.log('request', body);
var response = JSON.stringify({ result: responses[responseCount++] });
//console.log('response', response, 'id: ', body.id);
res.write(response);
res.end();
});
});
setupFakeZmq();
self.opts = Object.assign({}, opts);
utils.startBitcoreNode(self.opts, function() {
utils.waitForBitcoreNode(self.opts, done);
});
});
it('should reorg when needed', function(done) {
var block1a = '77d0b8043d3a1353ffd22ad70e228e30c15fd0f250d51d608b1b7997e6239ffb';
var block1b = '2e516187b1b58467cb138bf68ff00d9bda71b5487cdd7b9b9cfbe7b153cd59d4';
/*
_______________________________________________________
| | | | |
| Genesis | Block 1a | Block 1b | Result |
| _______ ________ |
| | |_____| |___________________ORPHANED |
| |_______| |________| |
| | ________ ________ |
| |______________________| |____| | |
| |________| |________| |
|_______________________________________________________|
*/
async.series([
publishBlockHash.bind(self, block1a),
publishBlockHash.bind(self, block1b)
], function(err) {
if(err) {
return done(err);
}
done();
});
});
});
});
function publishBlockHash(blockHash, callback) {
pubSocket.send([ 'hashblock', new Buffer(blockHash, 'hex') ]);
var httpOpts = utils.getHttpOpts(opts, { path: '/wallet-api/info' });
//we don't know exactly when all the blockhandlers will complete after the "tip" event
//so we must wait an indeterminate time to check on the current tip
setTimeout(function() {
utils.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return callback(err);
}
blockHash.should.equal(JSON.parse(res).hash);
callback();
});
}, 2000);
}

View File

@ -11,7 +11,6 @@ var http = require('http');
var Unit = bitcore.Unit;
var Transaction = bitcore.Transaction;
var PrivateKey = bitcore.PrivateKey;
var crypto = require('crypto');
var utils = {};
@ -73,21 +72,23 @@ utils.queryBitcoreNode = function(httpOpts, callback) {
request.end();
};
utils.waitForBitcoreNode = function(callback) {
utils.waitForBitcoreNode = function(opts, callback) {
bitcore.process.stdout.on('data', function(data) {
if (debug) {
var self = this;
opts.bitcore.process.stdout.on('data', function(data) {
if (opts.debug) {
console.log(data.toString());
}
});
bitcore.process.stderr.on('data', function(data) {
opts.bitcore.process.stderr.on('data', function(data) {
console.log(data.toString());
});
var errorFilter = function(err, res) {
try {
if (JSON.parse(res).height === blockHeight) {
if (JSON.parse(res).height === opts.blockHeight) {
return;
}
return res;
@ -96,29 +97,40 @@ utils.waitForBitcoreNode = function(callback) {
}
};
var httpOpts = getHttpOpts({ path: '/wallet-api/info', errorFilter: errorFilter });
var httpOpts = self.getHttpOpts(opts, { path: '/wallet-api/info', errorFilter: errorFilter });
waitForService(queryBitcoreNode.bind(this, httpOpts), callback);
self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback);
};
utils.waitForBitcoinReady = function(callback) {
waitForService(function(callback) {
rpc.generate(initialHeight, function(err, res) {
utils.waitForBitcoinReady = function(opts, callback) {
var self = this;
self.waitForService(function(callback) {
opts.rpc.generate(opts.initialHeight, function(err, res) {
if (err || (res && res.error)) {
return callback('keep trying');
}
blockHeight += initialHeight;
opts.blockHeight += opts.initialHeight;
callback();
});
}, function(err) {
if(err) {
return callback(err);
}
callback();
}, callback);
}
};
utils.initializeAndStartService = function(opts, callback) {
var self = this;
rimraf(opts.datadir, function(err) {
if(err) {
return callback(err);
@ -128,34 +140,37 @@ utils.initializeAndStartService = function(opts, callback) {
return callback(err);
}
if (opts.configFile) {
writeConfigFile(opts.configFile.file, opts.configFile.conf);
self.writeConfigFile(opts.configFile.file, opts.configFile.conf);
}
var args = _.isArray(opts.args) ? opts.args : toArgs(opts.args);
var args = _.isArray(opts.args) ? opts.args : self.toArgs(opts.args);
opts.process = spawn(opts.exec, args, opts.opts);
callback();
});
});
}
utils.startBitcoreNode = function(callback) {
initializeAndStartService(bitcore, callback);
}
};
utils.startBitcoind = function(callback) {
initializeAndStartService(bitcoin, callback);
}
utils.startBitcoreNode = function(opts, callback) {
this.initializeAndStartService(opts.bitcore, callback);
};
utils.unlockWallet = function(callback) {
rpc.walletPassPhrase(walletPassphrase, 3000, function(err) {
utils.startBitcoind = function(opts, callback) {
this.initializeAndStartService(opts.bitcoin, callback);
};
utils.unlockWallet = function(opts, callback) {
opts.rpc.walletPassPhrase(opts.walletPassphrase, 3000, function(err) {
if(err && err.code !== -15) {
return callback(err);
}
callback();
});
}
};
utils.getPrivateKeysWithABalance = function(opts, callback) {
opts.rpc.listUnspent(function(err, res) {
utils.getPrivateKeysWithABalance = function(callback) {
rpc.listUnspent(function(err, res) {
if(err) {
return callback(err);
}
@ -170,13 +185,15 @@ utils.getPrivateKeysWithABalance = function(callback) {
return callback(new Error('no utxos available'));
}
async.mapLimit(utxos, 8, function(utxo, callback) {
rpc.dumpPrivKey(utxo.address, function(err, res) {
opts.rpc.dumpPrivKey(utxo.address, function(err, res) {
if(err) {
return callback(err);
}
var privKey = res.result;
callback(null, { utxo: utxo, privKey: privKey });
});
}, function(err, utxos) {
if(err) {
return callback(err);
@ -184,11 +201,13 @@ utils.getPrivateKeysWithABalance = function(callback) {
callback(null, utxos);
});
});
}
utils.generateSpendingTxs = function(utxos) {
};
utils.generateSpendingTxs = function(opts, utxos) {
return utxos.map(function(utxo) {
txCount++;
var toPrivKey = new PrivateKey('testnet'); //external addresses
var changePrivKey = new PrivateKey('testnet'); //our wallet keys
var utxoSatoshis = Unit.fromBTC(utxo.utxo.amount).satoshis;
@ -197,77 +216,90 @@ utils.generateSpendingTxs = function(utxos) {
tx.from(utxo.utxo);
tx.to(toPrivKey.toAddress().toString(), satsToPrivKey);
tx.fee(fee);
tx.fee(opts.fee);
tx.change(changePrivKey.toAddress().toString());
tx.sign(utxo.privKey);
walletPrivKeys.push(changePrivKey);
satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + fee);
opts.walletPrivKeys.push(changePrivKey);
opts.satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + opts.fee);
return tx;
});
}
utils.setupInitialTxs = function(callback) {
getPrivateKeysWithABalance(function(err, utxos) {
};
utils.setupInitialTxs = function(opts, callback) {
var self = this;
self.getPrivateKeysWithABalance(opts, function(err, utxos) {
if(err) {
return callback(err);
}
initialTxs = generateSpendingTxs(utxos);
opts.initialTxs = self.generateSpendingTxs(opts, utxos);
callback();
});
}
utils.sendTxs = function(callback) {
async.eachOfSeries(initialTxs, sendTx, callback);
}
};
utils.sendTx = function(tx, index, callback) {
rpc.sendRawTransaction(tx.serialize(), function(err) {
utils.sendTxs = function(opts, callback) {
async.eachOfSeries(opts.initialTxs, this.sendTx.bind(this, opts), callback);
};
utils.sendTx = function(opts, tx, index, callback) {
opts.rpc.sendRawTransaction(tx.serialize(), function(err) {
if (err) {
return callback(err);
}
var mod = index % 2;
if (mod === 1) {
blockHeight++;
rpc.generate(1, callback);
opts.blockHeight++;
opts.rpc.generate(1, callback);
} else {
callback();
}
});
}
utils.getHttpOpts = function(opts) {
};
utils.getHttpOpts = function(opts, httpOpts) {
return Object.assign({
path: opts.path,
method: opts.method || 'GET',
body: opts.body,
path: httpOpts.path,
method: httpOpts.method || 'GET',
body: httpOpts.body,
headers: {
'Content-Type': 'application/json',
'Content-Length': opts.length || 0
'Content-Length': httpOpts.length || 0
},
errorFilter: opts.errorFilter
}, bitcore.httpOpts);
}
errorFilter: httpOpts.errorFilter
}, opts.bitcore.httpOpts);
};
utils.registerWallet = function(callback) {
var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId, method: 'POST' });
queryBitcoreNode(httpOpts, callback);
}
utils.registerWallet = function(opts, callback) {
utils.uploadWallet = function(callback) {
var addresses = JSON.stringify(walletPrivKeys.map(function(privKey) {
var httpOpts = this.getHttpOpts(opts, { path: '/wallet-api/wallets/' + opts.walletId, method: 'POST' });
this.queryBitcoreNode(httpOpts, callback);
};
utils.uploadWallet = function(opts, callback) {
var self = this;
var addresses = JSON.stringify(opts.walletPrivKeys.map(function(privKey) {
if (privKey.privKey) {
return privKey.pubKey.toString();
}
return privKey.toAddress().toString();
}));
var httpOpts = getHttpOpts({
path: '/wallet-api/wallets/' + walletId + '/addresses',
var httpOpts = self.getHttpOpts(opts, {
path: '/wallet-api/wallets/' + opts.walletId + '/addresses',
method: 'POST',
body: addresses,
length: addresses.length
});
async.waterfall([ queryBitcoreNode.bind(this, httpOpts) ], function(err, res) {
async.waterfall([ self.queryBitcoreNode.bind(self, httpOpts) ], function(err, res) {
if (err) {
return callback(err);
}
@ -275,10 +307,10 @@ utils.uploadWallet = function(callback) {
Object.keys(job).should.deep.equal(['jobId']);
var httpOpts = getHttpOpts({ path: '/wallet-api/jobs/' + job.jobId });
var httpOpts = self.getHttpOpts(opts, { path: '/wallet-api/jobs/' + job.jobId });
async.retry({ times: 10, interval: 1000 }, function(next) {
queryBitcoreNode(httpOpts, function(err, res) {
self.queryBitcoreNode(httpOpts, function(err, res) {
if (err) {
return next(err);
}
@ -296,23 +328,30 @@ utils.uploadWallet = function(callback) {
callback();
});
});
}
utils.getListOfTxs = function(callback) {
};
utils.getListOfTxs = function(opts, callback) {
var self = this;
var end = Date.now() + 86400000;
var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/transactions?start=0&end=' + end });
queryBitcoreNode(httpOpts, function(err, res) {
var httpOpts = self.getHttpOpts(opts, {
path: '/wallet-api/wallets/' + opts.walletId + '/transactions?start=0&end=' + end });
self.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return callback(err);
}
var results = [];
res.split('\n').forEach(function(result) {
if (result.length > 0) {
return results.push(JSON.parse(result));
}
});
var map = initialTxs.map(function(tx) {
var map = opts.initialTxs.map(function(tx) {
return tx.serialize();
});
@ -322,28 +361,15 @@ utils.getListOfTxs = function(callback) {
});
map.length.should.equal(0);
results.length.should.equal(initialTxs.length);
results.length.should.equal(opts.initialTxs.length);
callback();
});
}
};
utils.initGlobals = function() {
walletPassphrase = 'test';
txCount = 0;
blockHeight = 0;
walletPrivKeys = [];
initialTxs = [];
fee = 100000;
feesReceived = 0;
satoshisSent = 0;
walletId = crypto.createHash('sha256').update('test').digest('hex');
satoshisReceived = 0;
}
utils.cleanup = function(callback) {
bitcore.process.kill();
bitcoin.process.kill();
utils.cleanup = function(opts, callback) {
opts.bitcore.process.kill();
opts.bitcoin.process.kill();
setTimeout(callback, 2000);
}
};
module.exports = utils;

View File

@ -3,10 +3,10 @@
var chai = require('chai');
var should = chai.should();
var async = require('async');
var bitcore = require('bitcore-lib');
var BitcoinRPC = require('bitcoind-rpc');
var path = require('path');
var utils = require('utils');
var utils = require('./utils');
var crypto = require('crypto');
var debug = false;
var bitcoreDataDir = '/tmp/bitcore';
@ -82,36 +82,58 @@ var bitcore = {
process: null
};
var rpc = new BitcoinRPC(rpcConfig);
var walletPassphrase, txCount, blockHeight, walletPrivKeys,
initialTxs, fee, walletId, satoshisReceived, satoshisSent, feesReceived;
var initialHeight = 150;
var opts = {
debug: debug,
bitcore: bitcore,
bitcoin: bitcoin,
bitcoinDataDir: bitcoinDataDir,
bitcoreDataDir: bitcoreDataDir,
rpc: new BitcoinRPC(rpcConfig),
walletPassphrase: 'test',
txCount: 0,
blockHeight: 0,
walletPrivKeys: [],
initialTxs: [],
fee: 100000,
feesReceived: 0,
satoshisSent: 0,
walletId: crypto.createHash('sha256').update('test').digest('hex'),
satoshisReceived: 0,
initialHeight: 150
};
describe('Wallet Operations', function() {
this.timeout(60000);
after(cleanup);
describe('Register, Upload, GetTransactions', function() {
describe('Register and Upload', function() {
var self = this;
after(function(done) {
utils.cleanup(self.opts, done);
});
before(function(done) {
initGlobals();
self.opts = Object.assign({}, opts);
async.series([
startBitcoind,
waitForBitcoinReady,
unlockWallet,
setupInitialTxs,
startBitcoreNode,
waitForBitcoreNode
utils.startBitcoind.bind(utils, self.opts),
utils.waitForBitcoinReady.bind(utils, self.opts),
utils.unlockWallet.bind(utils, self.opts),
utils.setupInitialTxs.bind(utils, self.opts),
utils.startBitcoreNode.bind(utils, self.opts),
utils.waitForBitcoreNode.bind(utils, self.opts)
], done);
});
it('should register wallet', function(done) {
registerWallet(function(err, res) {
utils.registerWallet.call(utils, self.opts, function(err, res) {
if (err) {
return done(err);
}
res.should.deep.equal(JSON.stringify({
walletId: '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
}));
@ -120,25 +142,26 @@ describe('Wallet Operations', function() {
});
it('should upload a wallet', function(done) {
uploadWallet(done);
});
});
utils.uploadWallet.call(utils, self.opts, done);
describe('Load addresses at genesis block', function() {
before(function(done) {
sendTxs(function(err) {
if(err) {
return done(err);
}
waitForBitcoreNode(done);
});
});
it('should get a list of transactions', function(done) {
getListOfTxs(done);
utils.sendTxs.call(utils, self.opts, function(err) {
if(err) {
return done(err);
}
utils.waitForBitcoreNode.call(utils, self.opts, function(err) {
if(err) {
return done(err);
}
utils.getListOfTxs.call(utils, self.opts, done);
});
});
});
@ -146,62 +169,79 @@ describe('Wallet Operations', function() {
describe('Load addresses after syncing the blockchain', function() {
var self = this;
self.opts = Object.assign({}, opts);
after(utils.cleanup.bind(utils, self.opts));
before(function(done) {
initGlobals();
async.series([
cleanup,
startBitcoind,
waitForBitcoinReady,
unlockWallet,
setupInitialTxs,
sendTxs,
startBitcoreNode,
waitForBitcoreNode,
registerWallet,
uploadWallet
utils.startBitcoind.bind(utils, self.opts),
utils.waitForBitcoinReady.bind(utils, self.opts),
utils.unlockWallet.bind(utils, self.opts),
utils.setupInitialTxs.bind(utils, self.opts),
utils.sendTxs.bind(utils, self.opts),
utils.startBitcoreNode.bind(utils, self.opts),
utils.waitForBitcoreNode.bind(utils, self.opts),
utils.registerWallet.bind(utils, self.opts),
utils.uploadWallet.bind(utils, self.opts)
], done);
});
it('should get list of transactions', function(done) {
getListOfTxs(done);
utils.getListOfTxs.call(utils, self.opts, done);
});
it('should get the balance of a wallet', function(done) {
var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/balance' });
queryBitcoreNode(httpOpts, function(err, res) {
var httpOpts = utils.getHttpOpts.call(
utils,
self.opts,
{ path: '/wallet-api/wallets/' + self.opts.walletId + '/balance' });
utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) {
if(err) {
return done(err);
}
var results = JSON.parse(res);
results.satoshis.should.equal(satoshisReceived);
results.satoshis.should.equal(self.opts.satoshisReceived);
done();
});
});
it('should get the set of utxos for the wallet', function(done) {
var httpOpts = getHttpOpts({ path: '/wallet-api/wallets/' + walletId + '/utxos' });
queryBitcoreNode(httpOpts, function(err, res) {
var httpOpts = utils.getHttpOpts.call(
utils,
self.opts,
{ path: '/wallet-api/wallets/' + opts.walletId + '/utxos' });
utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) {
if(err) {
return done(err);
}
var results = JSON.parse(res);
var balance = 0;
results.utxos.forEach(function(utxo) {
balance += utxo.satoshis;
});
results.height.should.equal(blockHeight);
balance.should.equal(satoshisReceived);
results.height.should.equal(self.opts.blockHeight);
balance.should.equal(self.opts.satoshisReceived);
done();
});
});
it('should get the list of jobs', function(done) {
var httpOpts = getHttpOpts({ path: '/wallet-api/jobs' });
queryBitcoreNode(httpOpts, function(err, res) {
var httpOpts = utils.getHttpOpts.call(utils, self.opts, { path: '/wallet-api/jobs' });
utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) {
if(err) {
return done(err);
}
@ -212,8 +252,8 @@ describe('Wallet Operations', function() {
});
it('should remove all wallets', function(done) {
var httpOpts = getHttpOpts({ path: '/wallet-api/wallets', method: 'DELETE' });
queryBitcoreNode(httpOpts, function(err, res) {
var httpOpts = utils.getHttpOpts.call(utils, self.opts, { path: '/wallet-api/wallets', method: 'DELETE' });
utils.queryBitcoreNode.call(utils, httpOpts, function(err, res) {
if(err) {
return done(err);
}

5
test/data/blocks.json Normal file
View File

@ -0,0 +1,5 @@
{
"genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000",
"block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000",
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000"
}

View File

@ -0,0 +1,56 @@
'use strict';
var expect = require('chai').expect;
var bitcore = require('bitcore-lib');
var DB = require('../../../lib/services/db');
describe('DB', function() {
describe('Reorg', function() {
before(function() {
this.db = new DB({
node: {
network: bitcore.Networks.testnet,
datadir: '/tmp',
services: ''
}
});
this.db.tip = { hash: 'ff', height: 444 };
});
it('should detect a reorg from a common ancenstor that is in our set', function() {
var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } };
var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } };
var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } };
var block4 = { hash: '44', header: { prevHash: new Buffer('22', 'hex') } };
//blocks must be passed in the order that they are received.
var blocks = [ block3, block2, block1, block4 ];
expect(this.db.detectReorg(blocks)).to.deep.equal(block3);
});
it('should detect a reorg from a common ancenstor that is not in our set', function() {
var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } };
var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } };
var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } };
var block4 = { hash: '44', header: { prevHash: new Buffer('ee', 'hex') } };
var blocks = [ block3, block2, block1, block4 ];
expect(this.db.detectReorg(blocks)).to.deep.equal(block4);
});
it('should not detect a reorg', function() {
this.db.reorgTipHash = null;
var block1 = { hash: '11', header: { prevHash: new Buffer('ff', 'hex') } };
var block2 = { hash: '22', header: { prevHash: new Buffer('11', 'hex') } };
var block3 = { hash: '33', header: { prevHash: new Buffer('22', 'hex') } };
var block4 = { hash: '44', header: { prevHash: new Buffer('33', 'hex') } };
var blocks = [ block3, block2, block1, block4 ];
var actual = this.db.detectReorg(blocks);
expect(actual).to.be.undefined;
});
});
});

View File

@ -0,0 +1,56 @@
'use strict';
var should = require('chai').should();
var sinon = require('sinon');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
var DB = require('../../../lib/services/db');
var Networks = bitcore.Networks;
var EventEmitter = require('events').EventEmitter;
var rimraf = require('rimraf');
var mkdirp = require('mkdirp');
var blocks = require('../../data/blocks.json');
describe('DB', function() {
var bitcoind = {
on: function(event, callback) {
},
genesisBuffer: blocks.genesis
};
var node = {
network: Networks.testnet,
datadir: '/tmp/datadir',
services: { bitcoind: bitcoind },
on: sinon.stub(),
once: sinon.stub()
};
before(function(done) {
var self = this;
rimraf(node.datadir, function(err) {
if(err) {
return done(err);
}
mkdirp(node.datadir, done);
});
this.db = new DB({node: node});
this.emitter = new EventEmitter();
});
describe('Reorg', function() {
it('should start db service', function(done) {
this.db.start(done);
});
});
});

View File

@ -14,7 +14,7 @@ describe('Reorg', function() {
toString: function() {
return input;
}
}
};
});
});
@ -96,11 +96,11 @@ describe('Reorg', function() {
}
}
}
}
};
var reorg = new Reorg(node, db);
reorg.handleReorg(bitcoindBlocks[3], function(err) {
reorg.handleReorg(bitcoindBlocks[3].hash, function(err) {
should.not.exist(err);
db.tip.hash.should.equal('main3');
@ -132,4 +132,4 @@ describe('Reorg', function() {
});
});
});
});
});