This commit is contained in:
Chris Kleeschulte 2017-02-05 22:14:00 -05:00
parent 71ca53f8ba
commit f83b8a63e3
9 changed files with 232 additions and 192 deletions

View File

@ -13,6 +13,7 @@ function Logger(options) {
options = {};
}
this.formatting = _.isUndefined(options.formatting) ? Logger.DEFAULT_FORMATTING : options.formatting;
this.permitWrites = true;
}
Logger.DEFAULT_FORMATTING = true;
@ -54,6 +55,9 @@ Logger.prototype.warn = function() {
* #_log
*/
Logger.prototype._log = function(color) {
if (!this.permitWrites) {
return;
}
var args = Array.prototype.slice.call(arguments);
args = args.slice(1);
var level = args.shift();

View File

@ -123,21 +123,21 @@ Bitcoin.prototype._initDefaults = function(options) {
Bitcoin.prototype._initCaches = function() {
// caches valid until there is a new block
this.utxosCache = LRU(50000);
this.txidsCache = LRU(50000);
this.balanceCache = LRU(50000);
this.summaryCache = LRU(50000);
this.blockOverviewCache = LRU(144);
this.transactionDetailedCache = LRU(100000);
this.utxosCache = LRU(500);
this.txidsCache = LRU(500);
this.balanceCache = LRU(500);
this.summaryCache = LRU(500);
this.blockOverviewCache = LRU(12);
this.transactionDetailedCache = LRU(1000);
// caches valid indefinitely
this.transactionCache = LRU(100000);
this.rawTransactionCache = LRU(50000);
this.blockCache = LRU(144);
this.rawBlockCache = LRU(72);
this.blockHeaderCache = LRU(288);
this.zmqKnownTransactions = LRU(5000);
this.zmqKnownBlocks = LRU(50);
this.transactionCache = LRU(1000);
this.rawTransactionCache = LRU(500);
this.blockCache = LRU(12);
this.rawBlockCache = LRU(6);
this.blockHeaderCache = LRU(24);
this.zmqKnownTransactions = LRU(50);
this.zmqKnownBlocks = LRU(1);
this.lastTip = 0;
this.lastTipTimeout = false;
};

View File

@ -203,6 +203,7 @@ DB.prototype.start = function(callback) {
this.node.once('ready', function() {
// start syncing
log.permitWrites = false;
self._sync.initialSync();
// Notify that there is a new tip

View File

@ -7,6 +7,10 @@ var EventEmitter = require('events').EventEmitter;
var async = require('async');
var bitcore = require('bitcore-lib');
var BufferUtil = bitcore.util.buffer;
var ProgressBar = require('progress');
var utils = require('../../utils.js');
var green = '\u001b[42m \u001b[0m';
var red = '\u001b[41m \u001b[0m';
function BlockStream(highWaterMark, bitcoind, lastHeight) {
Readable.call(this, {objectMode: true, highWaterMark: highWaterMark});
@ -15,6 +19,7 @@ function BlockStream(highWaterMark, bitcoind, lastHeight) {
this.stopping = false;
this.queue = [];
this.processing = false;
this.latestHeightRetrieved = 0;
}
inherits(BlockStream, Readable);
@ -29,10 +34,12 @@ function ProcessConcurrent(highWaterMark, db) {
inherits(ProcessConcurrent, Transform);
function ProcessSerial(highWaterMark, db, tip) {
function ProcessSerial(highWaterMark, db, tip, progressBar) {
Writable.call(this, {objectMode: true, highWaterMark: highWaterMark});
this.db = db;
this.tip = tip;
this.progressBar = progressBar;
this.processBlockStartTime = [];
}
inherits(ProcessSerial, Writable);
@ -58,6 +65,7 @@ function Sync(node, db) {
this.db = db;
this.syncing = false;
this.highWaterMark = 100;
this.progressBar = null;
}
inherits(Sync, EventEmitter);
@ -70,27 +78,38 @@ Sync.prototype.initialSync = function() {
return;
}
this.syncing = true;
this.blockStream = new BlockStream(this.highWaterMark, this.node.services.bitcoind, this.db.tip.__height);
var processConcurrent = new ProcessConcurrent(this.highWaterMark, this.db);
var writeStream = new WriteStream(this.highWaterMark, this.db);
var processSerial = new ProcessSerial(this.highWaterMark, this.db, this.db.tip);
self.progressBar = new ProgressBar('[:bar] :percent :current blocks/sec: :blockspersec', {
complete: green,
incomplete: red,
total: self.node.services.bitcoind.height
});
this._handleErrors(this.blockStream);
this._handleErrors(processConcurrent);
this._handleErrors(processSerial);
this._handleErrors(writeStream);
console.log('tip: ', self.db.tip.__height);
self.progressBar.tick(self.db.tip.__height, {
blockspersec: 0.0000
});
self.syncing = true;
self.blockStream = new BlockStream(self.highWaterMark, self.node.services.bitcoind, self.db.tip.__height);
var processConcurrent = new ProcessConcurrent(self.highWaterMark, self.db);
var writeStream = new WriteStream(self.highWaterMark, self.db);
var processSerial = new ProcessSerial(self.highWaterMark, self.db, self.db.tip, self.progressBar);
self._handleErrors(self.blockStream);
self._handleErrors(processConcurrent);
self._handleErrors(processSerial);
self._handleErrors(writeStream);
processSerial.on('finish', function() {
self.syncing = false;
self.emit('synced');
});
this.blockStream
self.blockStream
.pipe(processConcurrent)
.pipe(writeStream);
this.blockStream
self.blockStream
.pipe(processSerial);
};
@ -145,67 +164,9 @@ Sync.prototype._handleErrors = function(stream) {
};
// BlockStream.prototype._read = function() {
// var self = this;
// // TODO does not work :(
// var blockCount = Math.min(self.bitcoind.height - self.lastHeight, 5);
// if(blockCount <= 0 || this.stopping) {
// return self.push(null);
// }
// console.log('Fetching blocks ' + (self.lastHeight + 1) + ' to ' + (self.lastHeight + blockCount));
// async.times(blockCount, function(n, next) {
// var height = self.lastHeight + n + 1;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return next(err);
// }
// block.__height = height;
// next(null, block);
// });
// }, function(err, blocks) {
// if(err) {
// return self.emit('error', err);
// }
// for(var i = 0; i < blocks.length; i++) {
// self.push(blocks[i]);
// }
// self.lastHeight += blocks.length;
// });
// };
// BlockStream.prototype._read = function() {
// var self = this;
// self.lastHeight++;
// //console.log('Fetching block ' + self.lastHeight);
// var height = self.lastHeight;
// self.bitcoind.getBlock(height, function(err, block) {
// if(err) {
// return self.emit(err);
// }
// block.__height = height;
// //console.log('pushing block ' + block.__height);
// self.push(block);
// });
// };
BlockStream.prototype._read = function() {
this.lastHeight++;
this.queue.push(this.lastHeight);
this._process();
};
@ -226,15 +187,11 @@ BlockStream.prototype._process = function() {
}, function(next) {
var heights = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(heights.length);
//console.log('fetching blocks ' + heights[0] + ' to ' + heights[heights.length - 1]);
async.map(heights, function(height, next) {
self.bitcoind.getBlock(height, function(err, block) {
if(err) {
return next(err);
}
block.__height = height;
setTimeout(function() {
@ -258,7 +215,6 @@ BlockStream.prototype._process = function() {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
@ -267,11 +223,11 @@ BlockStream.prototype._process = function() {
ProcessSerial.prototype._write = function(block, enc, callback) {
var self = this;
self.processBlockStartTime = process.hrtime();
function check() {
return self.db.concurrentTip.__height >= block.__height;
}
//console.log('serial', block.__height);
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(prevHash !== self.tip.hash) {
@ -318,9 +274,10 @@ ProcessSerial.prototype._process = function(block, callback) {
self.db.tip = block;
self.db.emit('addblock');
if(self.db.tip.__height % 100 === 0) {
console.log('Tip:', self.db.tip.__height);
}
self.progressBar.tick(1, {
blockspersec: (1 / utils.diffTime(self.processBlockStartTime)).toFixed(4)
});
callback();
});
@ -383,11 +340,9 @@ WriteStream.prototype._write = function(obj, enc, callback) {
self.db.concurrentTip = obj.concurrentTip;
self.db.emit('concurrentaddblock');
if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
console.log('Concurrent tip:', self.db.concurrentTip.__height);
//if(self.db.concurrentTip.__height - self.lastConcurrentOutputHeight >= 100) {
self.lastConcurrentOutputHeight = self.db.concurrentTip.__height;
}
//}
callback();
});
};

View File

@ -28,7 +28,7 @@ Encoding.prototype.encodeTransactionValue = function(transaction) {
}
var inputValuesLengthBuffer = new Buffer(2);
inputValuesLengthBuffer.writeUInt16BE(inputValues.length * 8);
inputValuesLengthBuffer.writeUInt16BE(inputValues.length);
return new Buffer.concat([heightBuffer, timestampBuffer,
inputValuesLengthBuffer, inputValuesBuffer, transaction.toBuffer()]);

View File

@ -142,7 +142,7 @@ TransactionService.prototype._getInputValues = function(tx, callback) {
return next(err);
}
if (!prevTx.outputs[input.outputIndex]) {
return next(new Error('Input did not have utxo.'));
return next(new Error('Input did not have utxo: ' + prevTx.id + ' for tx: ' + tx.id));
}
var satoshis = prevTx.outputs[input.outputIndex].satoshis;
next(null, satoshis);

View File

@ -5,17 +5,32 @@ var BufferReader = bitcore.encoding.BufferReader;
function Encoding(servicePrefix) {
this.servicePrefix = servicePrefix;
this._subKeyMap = {
transaction: new Buffer('00', 'hex'),
addresses: new Buffer('01', 'hex'),
utxo: new Buffer('02', 'hex'),
utxoSat: new Buffer('03', 'hex'),
balance: new Buffer('04', 'hex')
this.subKeyMap = {
transaction: {
fn: this.encodeWalletTransactionKey,
buffer: new Buffer('00', 'hex')
},
addresses: {
fn: this.encodeWalletAddressesKey,
buffer: new Buffer('01', 'hex')
},
utxo: {
fn: this.encodeWalletUtxoKey,
buffer: new Buffer('02', 'hex')
},
utxoSat: {
fn: this.encodeWalletUtxoSatoshisKey,
buffer: new Buffer('03', 'hex')
},
balance: {
fn: this.encodeWalletBalanceKey,
buffer: new Buffer('04', 'hex')
}
};
}
Encoding.prototype.encodeWalletTransactionKey = function(walletId, height) {
var buffers = [this.servicePrefix, this._subKeyMap.transaction];
var buffers = [this.servicePrefix, this.subKeyMap.transaction.buffer];
var walletIdSizeBuffer = new Buffer(1);
walletIdSizeBuffer.writeUInt8(walletId.length);
@ -24,11 +39,9 @@ Encoding.prototype.encodeWalletTransactionKey = function(walletId, height) {
buffers.push(walletIdSizeBuffer);
buffers.push(walletIdBuffer);
if(height !== undefined) {
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height);
buffers.push(heightBuffer);
}
var heightBuffer = new Buffer(4);
heightBuffer.writeUInt32BE(height || 0);
buffers.push(heightBuffer);
return Buffer.concat(buffers);
};
@ -58,7 +71,7 @@ Encoding.prototype.decodeWalletTransactionValue = function(buffer) {
};
Encoding.prototype.encodeWalletUtxoKey = function(walletId, txid, outputIndex) {
var buffers = [this.servicePrefix, this._subKeyMap.utxo];
var buffers = [this.servicePrefix, this.subKeyMap.utxo.buffer];
var walletIdSizeBuffer = new Buffer(1);
walletIdSizeBuffer.writeUInt8(walletId.length);
@ -67,16 +80,12 @@ Encoding.prototype.encodeWalletUtxoKey = function(walletId, txid, outputIndex) {
buffers.push(walletIdSizeBuffer);
buffers.push(walletIdBuffer);
if(txid) {
var txidBuffer = new Buffer(txid, 'hex');
buffers.push(txidBuffer);
}
var txidBuffer = new Buffer(txid || new Array(33).join('0'), 'hex');
buffers.push(txidBuffer);
if(outputIndex !== undefined) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
buffers.push(outputIndexBuffer);
}
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex || 0);
buffers.push(outputIndexBuffer);
return Buffer.concat(buffers);
};
@ -117,7 +126,7 @@ Encoding.prototype.decodeWalletUtxoValue = function(buffer) {
};
Encoding.prototype.encodeWalletUtxoSatoshisKey = function(walletId, satoshis, txid, outputIndex) {
var buffers = [this.servicePrefix, this._subKeyMap.utxoSat];
var buffers = [this.servicePrefix, this.subKeyMap.utxoSat.buffer];
var walletIdSizeBuffer = new Buffer(1);
walletIdSizeBuffer.writeUInt8(walletId.length);
@ -126,22 +135,16 @@ Encoding.prototype.encodeWalletUtxoSatoshisKey = function(walletId, satoshis, tx
buffers.push(walletIdSizeBuffer);
buffers.push(walletIdBuffer);
if(satoshis !== undefined) {
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeUInt32BE(satoshis);
buffers.push(satoshisBuffer);
}
var satoshisBuffer = new Buffer(8);
satoshisBuffer.writeUInt32BE(satoshis || 0);
buffers.push(satoshisBuffer);
if(txid) {
var txidBuffer = new Buffer(txid, 'hex');
buffers.push(txidBuffer);
}
var txidBuffer = new Buffer(txid || new Array(33).join('0'), 'hex');
buffers.push(txidBuffer);
if(outputIndex !== undefined) {
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex);
buffers.push(outputIndexBuffer);
}
var outputIndexBuffer = new Buffer(4);
outputIndexBuffer.writeUInt32BE(outputIndex || 0);
buffers.push(outputIndexBuffer);
return Buffer.concat(buffers);
};
@ -180,7 +183,7 @@ Encoding.prototype.decodeWalletUtxoSatoshisValue = function(buffer) {
};
Encoding.prototype.encodeWalletAddressesKey = function(walletId) {
var prefix = this._subKeyMap.addresses;
var prefix = this.subKeyMap.addresses.buffer;
var walletIdSizeBuffer = new Buffer(1);
walletIdSizeBuffer.writeUInt8(walletId.length);
var walletIdBuffer = new Buffer(walletId, 'utf8');
@ -188,7 +191,10 @@ Encoding.prototype.encodeWalletAddressesKey = function(walletId) {
};
Encoding.prototype.decodeWalletAddressesKey = function(buffer) {
return buffer.slice(3).toString('hex');
var reader = new BufferReader(buffer);
reader.read(3);
var walletSize = reader.readUInt8();
return reader.read(walletSize).toString('utf8');
};
Encoding.prototype.encodeWalletAddressesValue = function(addresses) {
@ -218,7 +224,7 @@ Encoding.prototype.decodeWalletAddressesValue = function(buffer) {
};
Encoding.prototype.encodeWalletBalanceKey = function(walletId) {
var prefix = this._subKeyMap.balance;
var prefix = this.subKeyMap.balance.buffer;
var walletIdSizeBuffer = new Buffer(1);
walletIdSizeBuffer.writeUInt8(walletId.length);
var walletIdBuffer = new Buffer(walletId, 'utf8');
@ -226,7 +232,10 @@ Encoding.prototype.encodeWalletBalanceKey = function(walletId) {
};
Encoding.prototype.decodeWalletBalanceKey = function(buffer) {
return buffer.slice(3).toString('hex');
var reader = new BufferReader(buffer);
reader.read(3);
var walletSize = reader.readUInt8();
return reader.read(walletSize).toString('utf8');
};
Encoding.prototype.encodeWalletBalanceValue = function(balance) {
@ -236,8 +245,7 @@ Encoding.prototype.encodeWalletBalanceValue = function(balance) {
};
Encoding.prototype.decodeWalletBalanceValue = function(buffer) {
var balance = buffer.readDoubleBE();
return balance;
return buffer.readDoubleBE();
};
module.exports = Encoding;

View File

@ -436,13 +436,13 @@ WalletService.prototype._endpointRemoveWallet = function() {
return function(req, res) {
var walletId = req.params.walletId;
self._removeWallet(walletId, function(err) {
self._removeWallet(walletId, function(err, numRecords) {
if(err) {
return utils.sendError(err, res);
}
res.status(200).jsonp({
walletId: walletId,
result: 'removed'
numberRemoved: numRecords
});
});
};
@ -469,6 +469,71 @@ WalletService.prototype._endpointGetAddresses = function() {
};
};
WalletService.prototype._endpointDumpAllWallets = function() {
var self = this;
return function(req, res) {
var keys = [];
var start = new Buffer(self.servicePrefix);
var end = new Buffer.concat([start, new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
gte: start,
lt: end
});
var streamErr = null;
stream.on('error', function(err) {
streamErr = err;
});
stream.on('data', function(data) {
keys.push(data);
});
stream.on('end', function() {
if(streamErr) {
return utils.sendError(streamErr, res);
}
var resultsMap = keys.map(function(key) {
return key.toString('hex');
});
res.status(200).jsonp({
result: resultsMap
});
});
};
};
WalletService.prototype._endpointGetWalletIds = function() {
var self = this;
return function(req, res) {
var start = new Buffer.concat([self.servicePrefix, new Buffer(self._encoding.subKeyMap.addresses.buffer)]);
var end = new Buffer.concat([start, new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
gte: start,
lt: end
});
var walletIds = [];
var streamErr;
stream.on('error', function(err) {
var streamErr = err;
});
stream.on('data', function(data) {
walletIds.push(self._encoding.decodeWalletAddressesKey(data));
});
stream.on('end', function() {
if(streamErr) {
return utils.sendError(streamErr, res);
}
res.status(200).jsonp({
walletIds: walletIds
});
});
};
};
WalletService.prototype._endpointPostAddresses = function() {
var self = this;
return function(req, res) {
@ -481,7 +546,7 @@ WalletService.prototype._endpointPostAddresses = function() {
//if this is a post to /wallets, then it is expected that a new wallet is to be created
async.series([
function(next) {
self._createWallet(walletId, [], function(err) {
self._createWallet(walletId, function(err) {
if(err) {
return next(err);
}
@ -689,57 +754,53 @@ WalletService.prototype._getTransactions = function(walletId, options, callback)
return callback(e);
}
}
//if (!self._cache.peek(key)) {
// self._getAddresses(walletId, function(err, addresses) {
// if(err) {
// return callback(err);
// }
// if (!addresses) {
// return callback(new Error('wallet not found'));
// }
// var addressGroups = self._chunkAdresses(addresses);
// async.eachSeries(addressGroups, function(addresses, next) {
// self.node.services.bitcoind.getAddressHistory(addresses, opts, function(err, history) {
// if(err) {
// return next(err);
// }
// var groupTransactions = history.items.map(function(item) {
// return item.tx;
// });
// transactions = _.union(transactions, groupTransactions);
// next();
// });
// }, function(err) {
// if(err) {
// return callback(err);
// }
// self._cache.set(key, JSON.stringify(transactions));
// finish();
// });
// });
//} else {
// try {
// transactions = JSON.parse(self._cache.get(key));
// finish();
// } catch(e) {
// self._cache.del(key);
// return callback(e);
// }
//}
};
WalletService.prototype._removeWallet = function(walletId, callback) {
var self = this;
var boundary = self._encoding.encodeWalletAddressesKey(walletId);
//wallet tz
// wallet addresses
// utxo
// utxosat
//wallet balance
async.map(Object.keys(self._encoding.subKeyMap), function(prefix, next) {
var keys = [];
var txStream = self.store.createReadStream({
gte: boundary
lte: boundary
var walletIdBuffer = new Buffer(walletId, 'utf8');
var start = self._encoding.subKeyMap[prefix].fn.call(self._encoding, walletId);
var end = new Buffer.concat([self._encoding.subKeyMap[prefix].fn.call(self._encoding, walletId), new Buffer('ff', 'hex')]);
var stream = self.store.createKeyStream({
gte: start,
lt: end
});
var streamErr = null;
stream.on('error', function(err) {
streamErr = err;
});
stream.on('data', function(data) {
keys.push(data);
});
stream.on('end', function() {
next(streamErr, keys);
});
}, function(err, results) {
if(err) {
return callback(err);
}
results = _.flatten(results);
var operations = [];
for(var i = 0; i < results.length; i++) {
operations.push({
type: 'del',
key: results[i]
});
}
self.store.batch(operations, function(err) {
if(err) {
return callback(err);
}
callback(null, operations.length);
});
});
};
@ -757,10 +818,10 @@ WalletService.prototype._getAddresses = function(walletId, callback) {
});
};
WalletService.prototype._createWallet = function(walletId, addresses, callback) {
WalletService.prototype._createWallet = function(walletId, callback) {
var self = this;
var key = self._encoding.encodeWalletAddressesKey(walletId);
var value = self._encoding.encodeWalletAddressesValue(addresses);
var value = self._encoding.encodeWalletAddressesValue([]);
this.store.put(key, value, callback);
};
@ -927,6 +988,9 @@ WalletService.prototype.setupRoutes = function(app) {
app.get('/wallets/:walletId/balance',
s._endpointGetBalance()
);
app.get('/wallets/dump',
s._endpointDumpAllWallets()
);
app.get('/wallets/:walletId',
s._endpointGetAddresses()
);
@ -944,6 +1008,9 @@ WalletService.prototype.setupRoutes = function(app) {
v.checkAddresses,
s._endpointPostAddresses()
);
app.get('/wallets',
s._endpointGetWalletIds()
);
};
WalletService.prototype.getRoutePrefix = function() {

View File

@ -97,4 +97,9 @@ utils.hasRequiredArgsForEncoding = function(args) {
return true;
};
utils.diffTime = function(time) {
var diff = process.hrtime(time);
return (diff[0] * 1E9 + diff[1])/(1E9 * 1.0);
};
module.exports = utils;