added more blocks stuff.

This commit is contained in:
Chris Kleeschulte 2017-05-30 08:00:28 -04:00
parent 443688face
commit ec826b4940
10 changed files with 260 additions and 145 deletions

View File

@ -330,16 +330,23 @@ Bitcoin.prototype.start = function(callback) {
var self = this;
if (!self.options.connect) {
throw new Error('A "connect" array is required in the bitcoind service configuration.');
log.error('A "connect" array is required in the bitcoind service configuration.');
process.exit(-1);
}
self.nodes = self.options.connect.map(self._connectProcess.bind(self));
if (self.nodes.length === 0) {
throw new Error('Could not connect to any servers in connect array.');
log.error('Could not connect to any servers in connect array.');
process.exit(-1);
}
self._initChain(function() {
async.retry({ interval: 2000, times: 30 }, self._initChain.bind(this), function(err) {
if(err) {
log.error(err.message);
process.exit(-1);
}
log.info('Bitcoin Daemon Ready');
callback();

View File

@ -18,6 +18,10 @@ function BlockStream(highWaterMark, sync) {
this.lastEmittedHash = this.dbTip.hash;
this.queue = [];
this.processing = false;
var self = this;
self.block.on('reorg', function() {
self.push(null);
});
}
inherits(BlockStream, Readable);
@ -112,9 +116,6 @@ BlockHandler.prototype._setupStreams = function() {
processSerial.on('finish', self._onFinish.bind(self));
self.block.once('reorg', function() {
blockStream.push(null);
});
};
BlockHandler.prototype._onFinish = function() {

View File

@ -472,7 +472,7 @@ BlockService.prototype._loadTips = function(callback) {
hash = tipData.slice(0, 32).toString('hex');
self.bitcoind.getBlock(hash, function(err, block) {
self._getBlock(hash, function(err, block) {
if(err) {
return next(err);
@ -500,9 +500,8 @@ BlockService.prototype._loadTips = function(callback) {
BlockService.prototype._detectReorg = function(blocks, callback) {
var self = this;
var tipHash = self.reorgHash || self.tip.hash;
var tipHeight = self.reorgHeight || self.tip.__height;
var tipHash = this.reorgHash || this.tip.hash;
var tipHeight = this.reorgHeight || this.tip.__height;
var forkedHash;
for(var i = 0; i < blocks.length; i++) {
@ -511,24 +510,16 @@ BlockService.prototype._detectReorg = function(blocks, callback) {
continue;
}
var prevHash = utils.reverseBufferToString(blocks[i].header.prevHash);
if (prevHash !== tipHash) {
forkedHash = blocks[i].hash;
break;
if (utils.reverseBufferToString(blocks[i].header.prevHash) !== tipHash) {
return this._handleReorg(forkedHash, callback.bind(this, tipHash, tipHeight));
}
tipHash = blocks[i].hash;
tipHeight = blocks[i].__height;
}
if (forkedHash) {
return self._handleReorg(forkedHash, function() {
callback(tipHash, tipHeight);
});
}
self.reorgHash = tipHash;
self.reorgHeight = tipHeight;
this.reorgHash = tipHash;
this.reorgHeight = tipHeight;
callback();
};

View File

@ -11,6 +11,8 @@ function Encoding(servicePrefix) {
}
Encoding.prototype.encodeUtxoIndexKey = function(address, txid, outputIndex) {
assert(address, 'address is required');
var buffers = [this.servicePrefix, this.nonP2PKPrefix];
var addressSizeBuffer = new Buffer(1);
@ -20,17 +22,22 @@ Encoding.prototype.encodeUtxoIndexKey = function(address, txid, outputIndex) {
buffers.push(addressSizeBuffer);
buffers.push(addressBuffer);
var txidBuffer = new Buffer(txid || new Array(65).join('0'), 'hex');
buffers.push(txidBuffer);
if (txid) {
var txidBuffer = new Buffer(txid, 'hex');
buffers.push(txidBuffer);
}
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex || 0);
buffers.push(outputIndexBuffer);
if (outputIndex >= 0) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
buffers.push(outputIndexBuffer);
}
return Buffer.concat(buffers);
};
Encoding.prototype.decodeUtxoIndexKey = function(buffer) {
var reader = new BufferReader(buffer);
reader.read(3);
@ -44,17 +51,21 @@ Encoding.prototype.decodeUtxoIndexKey = function(buffer) {
txid: txid,
outputIndex: outputIndex
};
};
Encoding.prototype.encodeUtxoIndexValue = function(height, satoshis, scriptBuffer) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeDoubleBE(satoshis);
return Buffer.concat([heightBuffer, satoshisBuffer, scriptBuffer]);
};
Encoding.prototype.decodeUtxoIndexValue = function(buffer) {
var height = buffer.readUInt32BE();
var satoshis = buffer.readDoubleBE(4);
var scriptBuffer = buffer.slice(12);
@ -63,13 +74,13 @@ Encoding.prototype.decodeUtxoIndexValue = function(buffer) {
satoshis: satoshis,
script: scriptBuffer
};
};
Encoding.prototype.encodeP2PKUtxoIndexKey = function(txid, outputIndex) {
var buffers = [this.servicePrefix, this.P2PKPrefix];
assert(txid && txid.length === 64, 'Txid required');
assert(outputIndex >= 0, 'outputIndex required');
assert(txid, 'txid is required');
var buffers = [this.servicePrefix, this.P2PKPrefix];
var txidBuffer = new Buffer(txid);
buffers.push(txidBuffer);

View File

@ -57,30 +57,34 @@ UtxoService.prototype.blockHandler = function(block, connect, callback) {
operations = self._processOutputs(tx, outputs, block, connect).concat(operations);
}
callback(null, operations);
setImmediate(function() {
callback(null, operations);
});
};
UtxoService.prototype.getUtxosForAddress = function(address, callback) {
var self = this;
var utxos = [];
var start = self.encoding.encodeUtxoIndexKey(address);
var stream = self.db.createReadStream({
gte: start.slice(0, -36),
lt: Buffer.concat([ start.slice(0, -36), new Buffer('ff', 'hex') ])
gte: start,
lt: utils.getTerminalKey(start)
});
stream.on('data', function(data) {
var key = self.encoding.decodeUtxoIndexKey(data.key);
var value = self.encoding.decodeUtxoIndexValue(data.value);
utxos.push({
txid: key.txid,
outputIndex: key.outputIndex,
address: address,
txId: key.txid,
outputIndex: key.outputIndex,
height: value.height,
satoshis: value.satoshis,
script: value.script
script: value.script.toString('hex')
});
});
@ -149,6 +153,7 @@ UtxoService.prototype._processOutputs = function(tx, outputs, block, connect) {
});
if (key && value) {
//console.log(this.encoding.decodeUtxoIndexKey(key));
var operation = connect ? {
type: 'put',
key: key,

View File

@ -41,32 +41,6 @@ utils.parseParamsWithJSON = function(paramsArg) {
return params;
};
/*
* input: arguments passed into originating function (whoever called us)
* output: bool args are valid for encoding a key to the database
*/
utils.hasRequiredArgsForEncoding = function(args) {
function exists(arg) {
return !(arg === null || arg === undefined);
}
if (!exists(args[0])) {
return false;
}
var pastArgMissing;
for(var i = 1; i < args.length; i++) {
var argMissing = exists(args[i]);
if (argMissing && pastArgMissing) {
return false;
}
pastArgMissing = argMissing;
}
return true;
};
utils.getTerminalKey = function(startKey) {
var endKey = Buffer.from(startKey);
endKey.writeUInt8(startKey.readUInt8(startKey.length - 1) + 1, startKey.length - 1);

View File

@ -78,7 +78,7 @@
"istanbul": "^0.4.3",
"jshint": "^2.9.2",
"jshint-stylish": "^2.1.0",
"mocha": "^2.4.5",
"mocha": "",
"proxyquire": "^1.3.1",
"rimraf": "^2.4.2",
"sinon": "^1.15.4"

View File

@ -49,7 +49,7 @@ TestWebService.prototype.setupRoutes = function(app) {
app.get('/utxo/:address', function(req, res) {
self.node.services.utxo.getUtxosForAddress(req.params.address, function(err, utxos) {
res.status(200).jsonp({ address: req.params.address, utxos: utxos });
res.status(200).jsonp({ utxos: utxos });
});
});
};

View File

@ -12,24 +12,26 @@ var Unit = bitcore.Unit;
var Transaction = bitcore.Transaction;
var PrivateKey = bitcore.PrivateKey;
var utils = {};
var Utils = function(opts) {
this.opts = opts;
};
utils.writeConfigFile = function(fileStr, obj) {
Utils.prototype.writeConfigFile = function(fileStr, obj) {
fs.writeFileSync(fileStr, JSON.stringify(obj));
};
utils.toArgs = function(opts) {
Utils.prototype.toArgs = function(opts) {
return Object.keys(opts).map(function(key) {
return '-' + key + '=' + opts[key];
});
};
utils.waitForService = function(task, callback) {
Utils.prototype.waitForService = function(task, callback) {
var retryOpts = { times: 20, interval: 1000 };
async.retry(retryOpts, task, callback);
};
utils.queryBitcoreNode = function(httpOpts, callback) {
Utils.prototype.queryBitcoreNode = function(httpOpts, callback) {
var error;
var request = http.request(httpOpts, function(res) {
@ -72,25 +74,16 @@ utils.queryBitcoreNode = function(httpOpts, callback) {
request.end();
};
utils.waitForBitcoreNode = function(opts, callback) {
Utils.prototype.waitForBitcoreNode = function(callback) {
var self = this;
opts.bitcore.process.stdout.on('data', function(data) {
if (opts.debug) {
console.log(data.toString());
}
});
opts.bitcore.process.stderr.on('data', function(data) {
console.log(data.toString());
});
var errorFilter = function(err, res) {
try {
var info = JSON.parse(res);
if (info.dbheight === opts.blockHeight &&
info.bitcoindheight === opts.blockHeight) {
if (info.dbheight === self.opts.blockHeight &&
info.dbheight === info.bitcoindheight &&
info.bitcoindhash === info.dbhash) {
return;
}
return res;
@ -99,22 +92,22 @@ utils.waitForBitcoreNode = function(opts, callback) {
}
};
var httpOpts = self.getHttpOpts(opts, { path: '/info', errorFilter: errorFilter });
var httpOpts = self.getHttpOpts({ path: '/info', errorFilter: errorFilter });
self.waitForService(self.queryBitcoreNode.bind(self, httpOpts), callback);
};
utils.waitForBitcoinReady = function(opts, callback) {
Utils.prototype.waitForBitcoinReady = function(callback) {
var self = this;
self.waitForService(function(callback) {
opts.rpc.generate(opts.initialHeight, function(err, res) {
self.opts.rpc.generate(self.opts.initialHeight, function(err, res) {
if (err || (res && res.error)) {
return callback('keep trying');
}
opts.blockHeight += opts.initialHeight;
self.opts.blockHeight += self.opts.initialHeight;
callback();
});
}, function(err) {
@ -129,7 +122,7 @@ utils.waitForBitcoinReady = function(opts, callback) {
};
utils.initializeAndStartService = function(opts, callback) {
Utils.prototype.initializeAndStartService = function(opts, callback) {
var self = this;
@ -152,16 +145,36 @@ utils.initializeAndStartService = function(opts, callback) {
};
utils.startBitcoreNode = function(opts, callback) {
this.initializeAndStartService(opts.bitcore, callback);
Utils.prototype.startBitcoreNode = function(callback) {
var self = this;
this.initializeAndStartService(self.opts.bitcore, function(err) {
if(err) {
return callback(err);
}
self.opts.bitcore.process.stdout.on('data', function(data) {
if (self.opts.debug) {
process.stdout.write(data.toString());
}
});
self.opts.bitcore.process.stderr.on('data', function(data) {
process.stdout.write(data.toString());
});
callback();
});
};
utils.startBitcoind = function(opts, callback) {
this.initializeAndStartService(opts.bitcoin, callback);
Utils.prototype.startBitcoind = function(callback) {
this.initializeAndStartService(this.opts.bitcoin, callback);
};
utils.unlockWallet = function(opts, callback) {
opts.rpc.walletPassPhrase(opts.walletPassphrase, 3000, function(err) {
Utils.prototype.unlockWallet = function(callback) {
this.opts.rpc.walletPassPhrase(this.opts.walletPassphrase, 3000, function(err) {
if(err && err.code !== -15) {
return callback(err);
}
@ -169,9 +182,10 @@ utils.unlockWallet = function(opts, callback) {
});
};
utils.getPrivateKeysWithABalance = function(opts, callback) {
Utils.prototype.getPrivateKeysWithABalance = function(callback) {
opts.rpc.listUnspent(function(err, res) {
var self = this;
self.opts.rpc.listUnspent(function(err, res) {
if(err) {
return callback(err);
@ -188,7 +202,7 @@ utils.getPrivateKeysWithABalance = function(opts, callback) {
}
async.mapLimit(utxos, 8, function(utxo, callback) {
opts.rpc.dumpPrivKey(utxo.address, function(err, res) {
self.opts.rpc.dumpPrivKey(utxo.address, function(err, res) {
if(err) {
return callback(err);
}
@ -206,8 +220,9 @@ utils.getPrivateKeysWithABalance = function(opts, callback) {
};
utils.generateSpendingTxs = function(opts, utxos) {
Utils.prototype.generateSpendingTxs = function(utxos) {
var self = this;
return utxos.map(function(utxo) {
var toPrivKey = new PrivateKey('testnet'); //external addresses
@ -218,45 +233,46 @@ utils.generateSpendingTxs = function(opts, utxos) {
tx.from(utxo.utxo);
tx.to(toPrivKey.toAddress().toString(), satsToPrivKey);
tx.fee(opts.fee);
tx.fee(self.opts.fee);
tx.change(changePrivKey.toAddress().toString());
tx.sign(utxo.privKey);
opts.walletPrivKeys.push(changePrivKey);
opts.satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + opts.fee);
self.opts.walletPrivKeys.push(changePrivKey);
self.opts.satoshisReceived += Unit.fromBTC(utxo.utxo.amount).toSatoshis() - (satsToPrivKey + self.opts.fee);
return tx;
});
};
utils.setupInitialTxs = function(opts, callback) {
Utils.prototype.setupInitialTxs = function(callback) {
var self = this;
self.getPrivateKeysWithABalance(opts, function(err, utxos) {
self.getPrivateKeysWithABalance(function(err, utxos) {
if(err) {
return callback(err);
}
opts.initialTxs = self.generateSpendingTxs(opts, utxos);
self.opts.initialTxs = self.generateSpendingTxs(utxos);
callback();
});
};
utils.sendTxs = function(opts, callback) {
async.eachOfSeries(opts.initialTxs, this.sendTx.bind(this, opts), callback);
Utils.prototype.sendTxs = function(callback) {
async.eachOfSeries(this.opts.initialTxs, this.sendTx.bind(this), callback);
};
utils.sendTx = function(opts, tx, index, callback) {
Utils.prototype.sendTx = function(tx, index, callback) {
opts.rpc.sendRawTransaction(tx.serialize(), function(err) {
var self = this;
self.opts.rpc.sendRawTransaction(tx.serialize(), function(err) {
if (err) {
return callback(err);
}
var mod = index % 2;
if (mod === 1) {
opts.blockHeight++;
opts.rpc.generate(1, callback);
self.opts.blockHeight++;
self.opts.rpc.generate(1, callback);
} else {
callback();
}
@ -264,7 +280,7 @@ utils.sendTx = function(opts, tx, index, callback) {
};
utils.getHttpOpts = function(opts, httpOpts) {
Utils.prototype.getHttpOpts = function(httpOpts) {
return Object.assign({
path: httpOpts.path,
method: httpOpts.method || 'GET',
@ -274,28 +290,28 @@ utils.getHttpOpts = function(opts, httpOpts) {
'Content-Length': httpOpts.length || 0
},
errorFilter: httpOpts.errorFilter
}, opts.bitcore.httpOpts);
}, this.opts.bitcore.httpOpts);
};
utils.registerWallet = function(opts, callback) {
Utils.prototype.registerWallet = function(callback) {
var httpOpts = this.getHttpOpts(opts, { path: '/wallet-api/wallets/' + opts.walletId, method: 'POST' });
var httpOpts = this.getHttpOpts(this.opts, { path: '/wallet-api/wallets/' + this.opts.walletId, method: 'POST' });
this.queryBitcoreNode(httpOpts, callback);
};
utils.uploadWallet = function(opts, callback) {
Utils.prototype.uploadWallet = function(callback) {
var self = this;
var addresses = JSON.stringify(opts.walletPrivKeys.map(function(privKey) {
var addresses = JSON.stringify(self.opts.walletPrivKeys.map(function(privKey) {
if (privKey.privKey) {
return privKey.pubKey.toString();
}
return privKey.toAddress().toString();
}));
var httpOpts = self.getHttpOpts(opts, {
path: '/wallet-api/wallets/' + opts.walletId + '/addresses',
var httpOpts = self.getHttpOpts(self.opts, {
path: '/wallet-api/wallets/' + self.opts.walletId + '/addresses',
method: 'POST',
body: addresses,
length: addresses.length
@ -309,7 +325,7 @@ utils.uploadWallet = function(opts, callback) {
Object.keys(job).should.deep.equal(['jobId']);
var httpOpts = self.getHttpOpts(opts, { path: '/wallet-api/jobs/' + job.jobId });
var httpOpts = self.getHttpOpts(self.opts, { path: '/wallet-api/jobs/' + job.jobId });
async.retry({ times: 10, interval: 1000 }, function(next) {
self.queryBitcoreNode(httpOpts, function(err, res) {
@ -333,12 +349,12 @@ utils.uploadWallet = function(opts, callback) {
};
utils.getListOfTxs = function(opts, callback) {
Utils.prototype.getListOfTxs = function(callback) {
var self = this;
var end = Date.now() + 86400000;
var httpOpts = self.getHttpOpts(opts, {
path: '/wallet-api/wallets/' + opts.walletId + '/transactions?start=0&end=' + end });
var httpOpts = self.getHttpOpts(self.opts, {
path: '/wallet-api/wallets/' + self.opts.walletId + '/transactions?start=0&end=' + end });
self.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
@ -353,7 +369,7 @@ utils.getListOfTxs = function(opts, callback) {
});
var map = opts.initialTxs.map(function(tx) {
var map = self.opts.initialTxs.map(function(tx) {
return tx.serialize();
});
@ -363,15 +379,15 @@ utils.getListOfTxs = function(opts, callback) {
});
map.length.should.equal(0);
results.length.should.equal(opts.initialTxs.length);
results.length.should.equal(self.opts.initialTxs.length);
callback();
});
};
utils.cleanup = function(opts, callback) {
opts.bitcore.process.kill();
opts.bitcoin.process.kill();
Utils.prototype.cleanup = function(callback) {
this.opts.bitcore.process.kill();
this.opts.bitcoin.process.kill();
setTimeout(callback, 2000);
};
module.exports = utils;
module.exports = Utils;

View File

@ -5,10 +5,16 @@ var expect = chai.expect;
var async = require('async');
var BitcoinRPC = require('bitcoind-rpc');
var path = require('path');
var utils = require('./utils');
var Utils = require('./utils');
var crypto = require('crypto');
var bitcore = require('bitcore-lib');
var PrivateKey = bitcore.PrivateKey;
var Transaction = bitcore.Transaction;
var Output = bitcore.Transaction.Output;
var Script = bitcore.Script;
var _ = require('lodash');
var debug = true;
var debug = false;
var bitcoreDataDir = '/tmp/bitcore';
var bitcoinDataDir = '/tmp/bitcoin';
@ -31,7 +37,7 @@ var bitcoin = {
rpcpassword: rpcConfig.pass,
rpcport: rpcConfig.port,
zmqpubrawtx: 'tcp://127.0.0.1:38332',
zmqpubhashblock: 'tcp://127.0.0.1:38332'
zmqpubrawblock: 'tcp://127.0.0.1:38332'
},
datadir: bitcoinDataDir,
exec: 'bitcoind', //if this isn't on your PATH, then provide the absolute path, e.g. /usr/local/bin/bitcoind
@ -104,57 +110,161 @@ var opts = {
initialHeight: 150
};
var utils = new Utils(opts);
describe('Utxo Operations', function() {
this.timeout(60000);
var self = this;
after(function(done) {
utils.cleanup(self.opts, done);
utils.cleanup(done);
});
before(function(done) {
self.opts = Object.assign({}, opts);
async.series([
utils.startBitcoind.bind(utils, self.opts),
utils.waitForBitcoinReady.bind(utils, self.opts),
utils.unlockWallet.bind(utils, self.opts),
utils.setupInitialTxs.bind(utils, self.opts),
utils.sendTxs.bind(utils, self.opts),
utils.startBitcoreNode.bind(utils, self.opts),
utils.waitForBitcoreNode.bind(utils, self.opts)
utils.startBitcoind.bind(utils),
utils.waitForBitcoinReady.bind(utils),
utils.unlockWallet.bind(utils),
utils.setupInitialTxs.bind(utils),
utils.sendTxs.bind(utils),
utils.startBitcoreNode.bind(utils),
utils.waitForBitcoreNode.bind(utils)
], done);
});
it('should index utxos', function(done) {
async.mapLimit(opts.walletPrivKeys, 12, function(privKey, next) {
async.mapSeries(opts.walletPrivKeys, function(privKey, next) {
var address = privKey.toAddress().toString();
utils.queryBitcoreNode(Object.assign({
var httpOpts = Object.assign({
path: '/test/utxo/' + address
}, bitcore.httpOpts), function(err, res) {
}, bitcore.httpOpts);
utils.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return next(err);
}
res = JSON.parse(res);
expect(res.address).to.equal(address);
expect(res.utxos.length).equal(1);
expect(Object.keys(res.utxos[0])).to.deep.equal([ 'txid', 'outputIndex', 'address', 'height', 'satoshis', 'script' ]);
expect(res.utxos[0].address).to.equal(address);
expect(Object.keys(res.utxos[0])).to.deep.equal([
'address',
'txId',
'outputIndex',
'height',
'satoshis',
'script' ]);
next(null, res.utxos);
});
}, function(err, utxos) {
}, function(err, results) {
if(err) {
return done(err);
}
self.utxos = _.flatten(results);
done();
});
});
it('should store p2pk and p2pkh utxos', function(done) {
var pk1 = new PrivateKey('testnet');
var pk2 = new PrivateKey('testnet');
var satoshis = 100000000;
var utxo = self.utxos[0];
var tx = new Transaction();
tx.from(utxo);
tx.addOutput(new Output({
satoshis: satoshis,
script: Script.buildPublicKeyOut(pk1.publicKey)
}));
tx.change(pk2.toAddress().toString());
tx.sign(opts.walletPrivKeys[0]);
async.series([
function(next) {
utils.sendTx(tx, 1, function(err) {
if (err) {
return next(err);
}
next();
});
},
function(next) {
utils.waitForBitcoreNode(function(err) {
if (err) {
return next(err);
}
next();
});
},
function(next) {
var address = pk1.publicKey.toString('hex');
var httpOpts = Object.assign({
path: '/test/utxo/' + address
}, bitcore.httpOpts);
utils.queryBitcoreNode(httpOpts, function(err, res) {
if(err) {
return next(err);
}
res = JSON.parse(res);
expect(res.utxos.length).to.equal(1);
expect(res.utxos[0].address).to.equal(address);
expect(res.utxos[0].satoshis).to.equal(satoshis);
expect(Object.keys(res.utxos[0])).to.deep.equal([
'address',
'txId',
'outputIndex',
'height',
'satoshis',
'script' ]);
next();
});
}
], function(err) {
if(err) {
return done(err);
}
done();
});
});
});