Headers are no longer persisted.

This commit is contained in:
Chris Kleeschulte 2017-08-14 12:04:40 -04:00
parent b4fdfa3d42
commit d0eaedbb01
7 changed files with 494 additions and 282 deletions

View File

@ -46,7 +46,8 @@ BlockService.prototype.getAPIMethods = function() {
}; };
BlockService.prototype.getBestBlockHash = function(callback) { BlockService.prototype.getBestBlockHash = function(callback) {
callback(this._header.getAllHeaders().getLastIndex().hash); var hash = this._header.getLastHeader().hash;
callback(null, hash);
}; };
BlockService.prototype.getTip = function() { BlockService.prototype.getTip = function() {
@ -169,7 +170,8 @@ BlockService.prototype.subscribe = function(name, emitter) {
}; };
BlockService.prototype._syncPercentage = function() { BlockService.prototype._syncPercentage = function() {
var ratio = this._tip.height/this._header.getAllHeaders().size; var height = this._header.getLastHeader().height;
var ratio = this._tip.height/height;
return (ratio*100).toFixed(2); return (ratio*100).toFixed(2);
}; };
@ -262,7 +264,7 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback
} }
var commonAncestorHash = _newTip; var commonAncestorHash = _newTip;
callback(null, hash, commonAncestorHash, oldBlocks); callback(null, commonAncestorHash, oldBlocks);
}); });
}; };
@ -287,15 +289,26 @@ BlockService.prototype._getBlock = function(hash, callback) {
}); });
}; };
BlockService.prototype._getHash = function(blockArg) { BlockService.prototype._getHash = function(blockArg, callback) {
var headers = this._header.getAllHeaders();
if (utils.isHeight(blockArg)) { if (utils.isHeight(blockArg)) {
return headers.getIndex(blockArg).hash;
this._header.getHeaderByHeight(blockArg, function(err, header) {
if(err) {
return callback(err);
}
if (!header) {
return callback();
}
callback(null, header.hash);
});
} }
return blockArg; return callback(null, blockArg);
}; };
@ -310,12 +323,12 @@ BlockService.prototype._handleReorg = function(hash, allHeaders) {
log.warn('Block Service: Chain reorganization detected! Our current block tip is: ' + log.warn('Block Service: Chain reorganization detected! Our current block tip is: ' +
self._tip.hash + ' the current block: ' + hash + '.'); self._tip.hash + ' the current block: ' + hash + '.');
self._findCommonAncestor(hash, allHeaders, function(err, newHash, commonAncestorHash, oldBlocks) { self._findCommonAncestor(hash, allHeaders, function(err, commonAncestorHash, oldBlocks) {
if (err) { if (err) {
log.error('Block Service: A common ancestor block between hash: ' + log.error('Block Service: A common ancestor block between hash: ' +
self._tip.hash + ' (our current tip) and: ' + newHash + self._tip.hash + ' (our current tip) and: ' + hash +
' (the forked block) could not be found. Bitcore-node must exit.'); ' (the forked block) could not be found. Bitcore-node must exit.');
self.node.stop(); self.node.stop();
@ -408,7 +421,6 @@ BlockService.prototype._processBlock = function(block) {
var self = this; var self = this;
var operations = []; var operations = [];
var services = self.node.services; var services = self.node.services;
var headers = self._header.getAllHeaders();
async.eachSeries( async.eachSeries(
services, services,
@ -448,7 +460,7 @@ BlockService.prototype._processBlock = function(block) {
return; return;
} }
self._tip.height = headers.get(block.rhash()).height; self._tip.height = self._tip.height + 1;
self._tip.hash = block.rhash(); self._tip.hash = block.rhash();
var tipOps = utils.encodeTip(self._tip, self.name); var tipOps = utils.encodeTip(self._tip, self.name);
@ -494,6 +506,7 @@ BlockService.prototype._onBlock = function(block) {
} }
log.debug('Block Service: new block: ' + block.rhash()); log.debug('Block Service: new block: ' + block.rhash());
block.height = this._tip.height + 1;
this._processBlock(block); this._processBlock(block);
}; };
@ -519,10 +532,7 @@ BlockService.prototype._setTip = function(tip) {
BlockService.prototype._startSync = function() { BlockService.prototype._startSync = function() {
this._numNeeded = this._header.getAllHeaders().size - this._tip.height; this._numNeeded = this._header.getLastHeader().height - this._tip.height;
if (this._numNeeded <= 0) {
return;
}
log.info('Block Service: Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.'); log.info('Block Service: Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.');
@ -546,37 +556,38 @@ BlockService.prototype._startSubscriptions = function() {
BlockService.prototype._sync = function() { BlockService.prototype._sync = function() {
if (this.node.stopping) { var self = this;
if (self.node.stopping) {
return; return;
} }
var headers = this._header.getAllHeaders(); var lastHeaderIndex = self._header.getLastHeader().height;
var lastHeaderIndex = headers.size - 1;
if (this._tip.height < lastHeaderIndex) { if (self._tip.height < lastHeaderIndex) {
if (this._tip.height % 144 === 0) { if (self._tip.height % 144 === 0) {
log.info('Block Service: Blocks download progress: ' + log.info('Block Service: Blocks download progress: ' +
this._tip.height + '/' + headers.size + self._tip.height + '/' + lastHeaderIndex +
' (' + this._syncPercentage() + '%)'); ' (' + self._syncPercentage() + '%)');
} }
// the end should be our current tip height + 2 blocks, but if we only return self._header.getNextHash(self._tip, function(err, hash) {
// have one block to go, then the end should be 0, so we just get the last block
var endHash; if(err) {
if (this._tip.height === lastHeaderIndex - 1) { log.error(err);
endHash = 0; self.node.stop();
} else { return;
var end = headers.getIndex(this._tip.height + 2); }
endHash = end ? end.hash : null;
} self._p2p.getBlocks({ startHash: self._tip.hash, endHash: hash });
});
this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash });
return;
} }
log.info('Block Service: The best block hash is: ' + this._tip.hash + this._header.blockServiceSyncing = false;
' at height: ' + this._tip.height); log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
}; };

View File

@ -3,24 +3,29 @@
function Encoding(servicePrefix) { function Encoding(servicePrefix) {
this._servicePrefix = servicePrefix; this._servicePrefix = servicePrefix;
this._hashPrefix = new Buffer('00', 'hex');
this._heightPrefix = new Buffer('01', 'hex');
} }
// ---- hash --> header // ---- hash --> header
Encoding.prototype.encodeHeaderKey = function(height, hash) { Encoding.prototype.encodeHeaderHashKey = function(hash) {
var heightBuf = new Buffer(4); var hashBuf = new Buffer(hash, 'hex');
heightBuf.writeUInt32BE(height); return Buffer.concat([ this._servicePrefix, this._hashPrefix, hashBuf ]);
var hashBuf = new Buffer(hash || new Array(65).join('0'), 'hex');
return Buffer.concat([ this._servicePrefix, heightBuf, hashBuf ]);
}; };
Encoding.prototype.decodeHeaderKey = function(buffer) { Encoding.prototype.decodeHeaderHashKey = function(buffer) {
var height = buffer.readUInt32BE(2); return buffer.slice(3).toString('hex');
return { };
height: height,
hash: buffer.slice(6).toString('hex')
};
// ---- height --> header
Encoding.prototype.encodeHeaderHeightKey = function(height) {
var heightBuf = new Buffer(4);
heightBuf.writeUInt32BE(height);
return Buffer.concat([ this._servicePrefix, this._heightPrefix, heightBuf ]);
};
Encoding.prototype.decodeHeaderHeightKey = function(buffer) {
return buffer.readUInt32BE(3);
}; };
Encoding.prototype.encodeHeaderValue = function(header) { Encoding.prototype.encodeHeaderValue = function(header) {

View File

@ -20,13 +20,15 @@ var HeaderService = function(options) {
this._tip = null; this._tip = null;
this._p2p = this.node.services.p2p; this._p2p = this.node.services.p2p;
this._db = this.node.services.db; this._db = this.node.services.db;
this._headers = new utils.SimpleMap(); this._hashes = [];
this.subscriptions = {}; this.subscriptions = {};
this.subscriptions.block = []; this.subscriptions.block = [];
this._checkpoint = options.checkpoint || 2000; this._checkpoint = options.checkpoint || 2000;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initiallySynced = false; this._lastHeader = null;
this.blockServiceSyncing = true;
}; };
inherits(HeaderService, BaseService); inherits(HeaderService, BaseService);
@ -66,13 +68,49 @@ HeaderService.prototype.getAPIMethods = function() {
}; };
HeaderService.prototype.getAllHeaders = function(callback) {
var self = this;
var start = self._encoding.encodeHeaderHeightKey(0);
var end = self._encoding.encodeHeaderHeightKey(self._tip.height + 1);
var allHeaders = new utils.SimpleMap();
var criteria = {
gte: start,
lt: end
};
var stream = self._db.createReadStream(criteria);
var streamErr;
stream.on('error', function(error) {
streamErr = error;
});
stream.on('data', function(data) {
var header = self._encoding.decodeHeaderValue(data.value);
allHeaders.set(header.hash, header, header.height);
});
stream.on('end', function() {
if (streamErr) {
return streamErr;
}
callback(null, allHeaders);
});
};
HeaderService.prototype.getBlockHeader = function(arg, callback) { HeaderService.prototype.getBlockHeader = function(arg, callback) {
if (utils.isHeight(arg)) { if (utils.isHeight(arg)) {
return callback(null, this._headers.getIndex(arg)); return this._getHeader(arg, null, callback);
} }
return callback(null, this._headers.get(arg)); return this._getHeader(null, arg, callback);
}; };
@ -116,18 +154,26 @@ HeaderService.prototype.start = function(callback) {
merkleRoot: '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b' merkleRoot: '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b'
}; };
self._headers.set(self.GENESIS_HASH, genesisHeader, 0); self._lastHeader = genesisHeader;
self._db._store.put(self._encoding.encodeHeaderKey(0, self.GENESIS_HASH), var dbOps = [
self._encoding.encodeHeaderValue(genesisHeader), next); {
return; type: 'put',
key: self._encoding.encodeHeaderHeightKey(0),
value: self._encoding.encodeHeaderValue(genesisHeader)
},
{
type: 'put',
key: self._encoding.encodeHeaderHashKey(self.GENESIS_HASH),
value: self._encoding.encodeHeaderValue(genesisHeader)
}
];
return self._db.batch(dbOps, next);
} }
next(); self._getLastHeader(next);
}, },
function(next) {
self._getPersistedHeaders(next);
}
], function(err) { ], function(err) {
if (err) { if (err) {
@ -170,54 +216,83 @@ HeaderService.prototype.getPublishEvents = function() {
HeaderService.prototype._onBlock = function(block) { HeaderService.prototype._onBlock = function(block) {
var hash = block.rhash(); var self = this;
var header = this._headers.get(hash);
var hash = block.rhash();
var prevHash = bcoin.util.revHex(block.prevBlock);
var newBlock = prevHash === self._lastHeader.hash;
if (newBlock) {
if (!header) {
log.debug('Header Service: new block: ' + hash); log.debug('Header Service: new block: ' + hash);
if (this._detectReorg()) { var header = block.toHeaders().toJSON();
this._handleReorg();
return;
}
header = block.toHeaders().toJSON();
header.timestamp = header.ts; header.timestamp = header.ts;
header.prevHash = header.prevBlock; header.prevHash = header.prevBlock;
this._saveHeaders([this._onHeader(header)]);
self._saveHeaders(self._onHeader(header));
} }
// this is the rare case that a block comes to us out of order or is a reorg'ed block
// in almost all cases, this will be a reorg
if (!newBlock && !self.blockServiceSyncing) {
return self._detectReorg(block, function(err, reorg) {
if (err) {
log.error(err);
self.node.stop();
return;
}
if (reorg) {
self._handleReorg(block);
return;
}
self._broadcast(block);
});
}
setImmediate(function() {
self._broadcast(block);
});
};
HeaderService.prototype._broadcast = function(block) {
for (var i = 0; i < this.subscriptions.block.length; i++) { for (var i = 0; i < this.subscriptions.block.length; i++) {
var prevHeader = this._headers.get(header.prevHash); this.subscriptions.block[i].emit('header/block', block);
assert(prevHeader, 'We must have a previous header in order to calculate this block\'s data.');
block.height = prevHeader.height + 1;
this.subscriptions.block[i].emit('header/block', block, header);
} }
}; };
HeaderService.prototype._onHeader = function(header) { HeaderService.prototype._onHeader = function(header) {
var prevHeader = this._headers.get(header.prevHash); if (!header) {
assert(prevHeader, 'We must have a previous header in order to calculate this header\'s data, current header is: ' + header.hash); return;
}
header.height = prevHeader.height + 1; header.height = this._lastHeader.height + 1;
header.chainwork = this._getChainwork(header, prevHeader).toString(16, 64); header.chainwork = this._getChainwork(header, this._lastHeader).toString(16, 64);
var newHdr = { this._lastHeader = header;
hash: header.hash,
prevHash: header.prevHash,
height: header.height,
chainwork: header.chainwork
};
this._headers.set(header.hash, newHdr, header.height); return [
{
return { type: 'put',
type: 'put', key: this._encoding.encodeHeaderHashKey(header.hash),
key: this._encoding.encodeHeaderKey(header.height, header.hash), value: this._encoding.encodeHeaderValue(header)
value: this._encoding.encodeHeaderValue(header) },
}; {
type: 'put',
key: this._encoding.encodeHeaderHeightKey(header.height),
value: this._encoding.encodeHeaderValue(header)
}
];
}; };
@ -233,7 +308,9 @@ HeaderService.prototype._onHeaders = function(headers) {
header = header.toObject(); header = header.toObject();
dbOps.push(this._onHeader(header)); var ops = this._onHeader(header);
dbOps = dbOps.concat(ops);
this._tip.height = header.height; this._tip.height = header.height;
this._tip.hash = header.hash; this._tip.hash = header.hash;
@ -258,36 +335,42 @@ HeaderService.prototype._saveHeaders = function(dbOps) {
HeaderService.prototype._onHeadersSave = function(err) { HeaderService.prototype._onHeadersSave = function(err) {
if(err) { var self = this;
if (err) {
log.error(err);
self.node.stop();
return;
}
if (!self._syncComplete()) {
self._sync();
return;
}
self._startBlockSubscription();
self._setBestHeader();
self._detectStartupReorg(function(err, reorg) {
if (err) {
log.error(err); log.error(err);
this.node.stop(); self.node.stop();
return; return;
} }
if (!this._syncComplete()) { if (reorg) {
self._handleReorg();
this._sync();
return;
}
assert(!this._headers.hasNullItems(), 'Header list is not complete yet peer has sent all available headers.');
this._startBlockSubscription();
this._setBestHeader();
if (this._detectReorg()) {
this._handleReorg();
return; return;
} }
this._populateNextHashes(); log.info('Header Service: emitting headers to block service.');
log.debug('Header Service: emitting headers to block service.'); self.emit('headers');
this._populateNextHashes(); });
this.emit('headers', this._headers);
}; };
@ -310,67 +393,119 @@ HeaderService.prototype._syncComplete = function() {
}; };
HeaderService.prototype._setBestHeader = function() { HeaderService.prototype._setBestHeader = function() {
var bestHeader = this._headers.getLastIndex(); var bestHeader = this._lastHeader;
this._tip.height = bestHeader.height; this._tip.height = bestHeader.height;
this._tip.hash = bestHeader.hash; this._tip.hash = bestHeader.hash;
log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.'); log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.');
}; };
HeaderService.prototype._populateNextHashes = function() { HeaderService.prototype._getHeader = function(height, hash, callback) {
var count = 0; var self = this;
while (count < this._headers.length) { /*jshint -W018 */
var hdr = this._headers.getIndex(count); if (!hash && !(height >= 0)) {
var nextHdr = this._headers.getIndex(++count); /*jshint +W018 */
if (nextHdr) { return callback(new Error('invalid arguments'));
hdr.nextHash = nextHdr.hash;
}
} }
var key;
if (hash) {
key = self._encoding.encodeHeaderHashKey(hash);
} else {
key = self._encoding.encodeHeaderHeightKey(height);
}
self._db.get(key, function(err, data) {
if (err) {
return callback(err);
}
if (!data) {
return callback();
}
callback(null, self._encoding.decodeHeaderValue(data));
});
}; };
HeaderService.prototype._detectReorg = function(block) { HeaderService.prototype._detectReorg = function(block, callback) {
// this is a new block coming after we are sync'ed. assert(block, 'Block is needed to detect reorg.');
// for this not to be a reorg, this block's prev hash should be our tip
// in rare cases, we don't even have this block's parent in our collection console.log(bcoin.util.revHex(block.prevBlock));
if (block) { var key = this._encoding.encodeHeaderHashKey(bcoin.util.revHex(block.prevBlock));
var prevHash = bcoin.util.revHex(block.prevBlock);
if (prevHash === this._tip.hash) { this._db.get(key, function(err, val) {
return false;
if (err) {
return callback(err);
} }
var parentHeader = this._headers.get(prevHash); // is this block's prevHash already referenced in the database? If so, reorg
if (!parentHeader) { if (val) {
log.warn('Block with hash: ' + block.rhash() + return callback(null, true);
' is not in our header set. This could be a block delievered out of order or this block\'s parent: ' +
prevHash + ' has been orphaned before we synced our headers the last time.');
return false;
} }
return true; callback(null, false);
}
// is our original tip's height and hash the same after we rewound by the checkpoint amount of blocks
// and re-imported? If so, then we've reorg'ed since we've been shut down.
var headerHash = this._headers.getIndex(this._originalTip.height).hash;
assert(headerHash, 'Expected a header to exist at height ' + this._originalTip.height); });
if (this._originalTip.hash !== headerHash) {
return true;
}
return false;
}; };
HeaderService.prototype._handleReorg = function() { HeaderService.prototype._detectStartupReorg = function(callback) {
this.emit('reorg', this._headers.getIndex(this._originalTip.height).hash, this._headers);
var self = this;
self._getHeader(self._originalTip.height, null, function(err, header) {
if (err) {
return callback(err);
}
if (!header) {
return callback(null, true);
}
if (header.hash !== self._originalTip.hash) {
return callback(null, true);
}
callback(null, false);
});
};
HeaderService.prototype._handleReorg = function(block) {
var self = this;
self.getAllHeaders(function(err, headers) {
if (err || !headers) {
log.error(err || new Error('Missing headers'));
self.node.stop();
return;
}
var hash = headers.getIndex(self._originalTip.height).hash;
if (block) {
hash = block.rhash();
}
assert(hash, 'To reorg, we need a hash to reorg to.');
self.emit('reorg', hash, headers);
});
}; };
HeaderService.prototype._setListeners = function() { HeaderService.prototype._setListeners = function() {
@ -399,22 +534,65 @@ HeaderService.prototype._startSync = function() {
HeaderService.prototype._sync = function() { HeaderService.prototype._sync = function() {
log.debug('Header Service: download progress: ' + this._tip.height + '/' + log.info('Header Service: download progress: ' + this._tip.height + '/' +
this._bestHeight + ' (' + (this._tip.height / this._bestHeight*100.00).toFixed(2) + '%)'); this._bestHeight + ' (' + (this._tip.height / this._bestHeight*100.00).toFixed(2) + '%)');
this._p2p.getHeaders({ startHash: this._tip.hash }); this._p2p.getHeaders({ startHash: this._tip.hash });
}; };
HeaderService.prototype.getAllHeaders = function() { // this gets the header that is +2 places from hash or returns 0 if there is no such
return this._headers; HeaderService.prototype.getNextHash = function(tip, callback) {
};
HeaderService.prototype._getPersistedHeaders = function(callback) {
var self = this; var self = this;
var startingHeight = self._tip.height; // if the tip being passed in is the second to last block, then return 0 because there isn't a block
// after the last block
if (tip.height + 1 === self._tip.height) {
return callback(null, 0);
}
var start = self._encoding.encodeHeaderHeightKey(tip.height + 2);
var end = self._encoding.encodeHeaderHeightKey(tip.height + 3);
var result = 0;
var criteria = {
gte: start,
lt: end
};
var stream = self._db.createReadStream(criteria);
var streamErr;
stream.on('error', function(error) {
streamErr = error;
});
stream.on('data', function(data) {
result = self._encoding.decodeHeaderValue(data.value).hash;
});
stream.on('end', function() {
if (streamErr) {
return streamErr;
}
callback(null, result);
});
};
HeaderService.prototype.getLastHeader = function() {
assert(this._lastHeader, 'Last headers should be populated.');
return this._lastHeader;
};
HeaderService.prototype._getLastHeader = function(callback) {
var self = this;
if (self._tip.height > self._checkpoint) { if (self._tip.height > self._checkpoint) {
self._tip.height -= self._checkpoint; self._tip.height -= self._checkpoint;
@ -422,10 +600,10 @@ HeaderService.prototype._getPersistedHeaders = function(callback) {
var removalOps = []; var removalOps = [];
var start = self._encoding.encodeHeaderKey(0); var start = self._encoding.encodeHeaderHeightKey(self._tip.height);
var end = self._encoding.encodeHeaderKey(startingHeight + 1); var end = self._encoding.encodeHeaderHeightKey(0xffffffff);
log.info('Getting persisted headers from genesis block to block ' + self._tip.height); log.info('Getting last header synced at height: ' + self._tip.height);
var criteria = { var criteria = {
gte: start, gte: start,
@ -450,17 +628,10 @@ HeaderService.prototype._getPersistedHeaders = function(callback) {
key: data.key key: data.key
}); });
return; return;
} else if (header.height === self._tip.height) {
self._lastHeader = header;
} }
var newHdr = {
hash: header.hash,
prevHash: header.prevHash,
height: header.height,
chainwork: header.chainwork
};
self._headers.set(self._encoding.decodeHeaderKey(data.key).hash, newHdr, header.height);
}); });
stream.on('end', function() { stream.on('end', function() {
@ -469,8 +640,8 @@ HeaderService.prototype._getPersistedHeaders = function(callback) {
return streamErr; return streamErr;
} }
var tipHeader = self._headers.getIndex(self._tip.height); assert(self._lastHeader, 'The last synced header was not in the database.');
self._tip.hash = tipHeader.hash; self._tip.hash = self._lastHeader.hash;
self._db.batch(removalOps, callback); self._db.batch(removalOps, callback);
}); });

1
lib/services/insight-api Symbolic link
View File

@ -0,0 +1 @@
/home/k/source/insight-api

View File

@ -111,6 +111,7 @@ MempoolService.prototype._startSubscriptions = function() {
}; };
MempoolService.prototype.onBlock = function(block, callback) { MempoolService.prototype.onBlock = function(block, callback) {
// remove this block's txs from mempool // remove this block's txs from mempool
var self = this; var self = this;
var ops = block.txs.map(function(tx) { var ops = block.txs.map(function(tx) {
@ -120,6 +121,7 @@ MempoolService.prototype.onBlock = function(block, callback) {
}; };
}); });
callback(null, ops); callback(null, ops);
}; };
MempoolService.prototype._onTransaction = function(tx) { MempoolService.prototype._onTransaction = function(tx) {
@ -135,6 +137,4 @@ MempoolService.prototype.stop = function(callback) {
callback(); callback();
}; };
module.exports = MempoolService; module.exports = MempoolService;

View File

@ -5,7 +5,6 @@ var inherits = require('util').inherits;
var Encoding = require('./encoding'); var Encoding = require('./encoding');
var utils = require('../../utils'); var utils = require('../../utils');
var _ = require('lodash'); var _ = require('lodash');
var LRU = require('lru-cache');
var Unit = require('bitcore-lib').Unit; var Unit = require('bitcore-lib').Unit;
var log = require('../../index').log; var log = require('../../index').log;
var async = require('async'); var async = require('async');
@ -14,11 +13,11 @@ var assert = require('assert');
function TransactionService(options) { function TransactionService(options) {
BaseService.call(this, options); BaseService.call(this, options);
this._db = this.node.services.db; this._db = this.node.services.db;
this._mempool = this.node.services._mempool; this._mempool = this.node.services.mempool;
this._block = this.node.services.block; this._block = this.node.services.block;
this._header = this.node.services.header;
this._p2p = this.node.services.p2p; this._p2p = this.node.services.p2p;
this._timestamp = this.node.services.timestamp; this._timestamp = this.node.services.timestamp;
this._inputValuesCache = LRU(500000); // this should speed up syncing
} }
inherits(TransactionService, BaseService); inherits(TransactionService, BaseService);
@ -45,75 +44,6 @@ TransactionService.prototype.getDetailedTransaction = function(txid, options, ca
this.getTransaction(txid, options, callback); this.getTransaction(txid, options, callback);
}; };
TransactionService.prototype._getTransaction = function(txid, options, callback) {
var self = this;
var _tx;
var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool;
var key = self._encoding.encodeTransactionKey(txid);
self._db.get(key, function(err, tx) {
if(err) {
return callback(err);
}
if (queryMempool && !tx) {
self._mempool.getTransaction(tx, function(err, memTx) {
if(err) {
return callback(err);
}
if (memTx) {
memTx = self._encoding.decodeTransactionValue(memTx);
memTx.confirmations = 0;
_tx = memTx;
}
});
} else {
if (tx) {
tx = self._encoding.decodeTransactionValue(tx);
tx.confirmations = self._p2p.getBestHeight - tx.__height;
tx.blockHash = self._header.get(tx.__height).hash;
_tx = tx;
}
}
if (!_tx) {
return callback();
}
var outputSatoshis = 0;
_tx.outputs.forEach(function(output) {
outputSatoshis += output.value;
});
_tx.outputs.forEach(function(output) {
outputSatoshis += output.value;
});
_tx.outputSatoshis = outputSatoshis;
if (!_tx.inputs[0].isCoinbase()) {
var inputSatoshis = 0;
_tx.__inputValues.forEach(function(val) {
if (val >- 0) {
inputSatoshis += val;
}
});
var feeSatoshis = inputSatoshis - outputSatoshis;
_tx.inputSatosbis = inputSatoshis;
_tx.feeSatosbis = feeSatoshis;
}
callback(null, _tx);
});
};
TransactionService.prototype.getTransaction = function(txid, options, callback) { TransactionService.prototype.getTransaction = function(txid, options, callback) {
var self = this; var self = this;
@ -138,6 +68,113 @@ TransactionService.prototype.getTransaction = function(txid, options, callback)
}; };
TransactionService.prototype._getTransaction = function(txid, options, callback) {
var self = this;
var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool;
var key = self._encoding.encodeTransactionKey(txid);
async.waterfall([
// main index?
function(next) {
self._db.get(key, function(err, tx) {
if (err) {
return next(err);
}
if (!tx) {
return next(null, tx);
}
tx = self._encoding.decodeTransactionValue(tx);
tx.confirmations = self._header.getBestHeight() - tx.__height;
self._header.getBlockHeader(tx.__height, function(err, header) {
if (err) {
return next(err);
}
if (header) {
tx.blockHash = header.hash;
}
next(null, tx);
});
});
},
// mempool?
function(tx, next) {
if (queryMempool && !tx) {
return self._mempool.getMempoolTransaction(txid, function(err, memTx) {
if (err) {
return next(err);
}
if (memTx) {
memTx = self._encoding.decodeTransactionValue(memTx);
memTx.confirmations = 0;
}
next(null, memTx);
});
}
next(null, tx);
}
], function(err, tx) {
if (err) {
return callback(err);
}
if (!tx) {
return callback();
}
// output values
var outputSatoshis = 0;
tx.outputs.forEach(function(output) {
outputSatoshis += output.value;
});
tx.outputSatoshis = outputSatoshis;
//input values
if (!tx.inputs[0].isCoinbase()) {
var inputSatoshis = 0;
tx.__inputValues.forEach(function(val) {
if (val >- 0) {
inputSatoshis += val;
}
});
var feeSatoshis = inputSatoshis - outputSatoshis;
tx.inputSatosbis = inputSatoshis;
tx.feeSatosbis = feeSatoshis;
}
callback(null, tx);
});
};
TransactionService.prototype._addMissingInputValues = function(tx, options, callback) { TransactionService.prototype._addMissingInputValues = function(tx, options, callback) {
// if we have cache misses from when we populated input values, // if we have cache misses from when we populated input values,
@ -147,11 +184,12 @@ TransactionService.prototype._addMissingInputValues = function(tx, options, call
var inputSatoshis = tx.__inputValues[index]; var inputSatoshis = tx.__inputValues[index];
if (inputSatoshis >= 0) { if (inputSatoshis >= 0 || input.isCoinbase()) {
return next(); return next();
} }
var outputIndex = input.prevout.index; var outputIndex = input.prevout.index;
self._getTransaction(input.prevout.txid(), options, function(err, _tx) { self._getTransaction(input.prevout.txid(), options, function(err, _tx) {
if (err || !_tx) { if (err || !_tx) {
@ -171,7 +209,18 @@ TransactionService.prototype._addMissingInputValues = function(tx, options, call
return callback(err); return callback(err);
} }
callback(null, tx); var key = self._encoding.encodeTransactionKey(tx.txid());
var value = self._encoding.encodeTransactionValue(tx);
self._db.put(key, value, function(err) {
if (err) {
return callback(err);
}
callback(null, tx);
});
}); });
}; };
@ -211,33 +260,10 @@ TransactionService.prototype.stop = function(callback) {
}; };
// --- start private prototype functions // --- start private prototype functions
TransactionService.prototype._cacheOutputValues = function(tx) {
var values = tx.outputs.map(function(output) {
return Unit.fromBTC(output.value).toSatoshis();
});
this._inputValuesCache.set(tx.txid(), values);
};
TransactionService.prototype._getBlockTimestamp = function(hash) { TransactionService.prototype._getBlockTimestamp = function(hash) {
return this._timestamp.getTimestampSync(hash); return this._timestamp.getTimestampSync(hash);
}; };
TransactionService.prototype._getInputValues = function(tx) {
var self = this;
return tx.inputs.map(function(input) {
var value = self._inputValuesCache.get(input.prevout.txid());
if (value) {
return value[input.prevout.index];
}
return null;
});
};
TransactionService.prototype.onBlock = function(block, callback) { TransactionService.prototype.onBlock = function(block, callback) {
var self = this; var self = this;
@ -250,9 +276,7 @@ TransactionService.prototype.onBlock = function(block, callback) {
return self._processTransaction(tx, { block: block }); return self._processTransaction(tx, { block: block });
}); });
setImmediate(function() { callback(null, operations);
callback(null, operations);
});
}; };
@ -296,20 +320,20 @@ TransactionService.prototype._onReorg = function(commonAncestorHeader, oldBlockL
TransactionService.prototype._processTransaction = function(tx, opts) { TransactionService.prototype._processTransaction = function(tx, opts) {
// squirrel away he current outputs
this._cacheOutputValues(tx);
// this index is very simple txid -> tx, but we also need to find each // this index is very simple txid -> tx, but we also need to find each
// input's prev output value, the adjusted timestamp for the block and // input's prev output value, the adjusted timestamp for the block and
// the tx's block height // the tx's block height
// input values // input values
tx.__inputValues = this._getInputValues(tx); //if there are any nulls here, this is a cache miss tx.__inputValues = []; // these are lazy-loaded on the first access of the tx
//timestamp // timestamp
tx.__timestamp = this._getBlockTimestamp(opts.block.rhash()); tx.__timestamp = this._getBlockTimestamp(opts.block.rhash());
//height assert(tx.__timestamp, 'Timestamp is required when saving a transaction.');
// height
tx.__height = opts.block.height; tx.__height = opts.block.height;
assert(tx.__height, 'Block height is required when saving a trasnaction.');
return { return {
key: this._encoding.encodeTransactionKey(tx.txid()), key: this._encoding.encodeTransactionKey(tx.txid()),
@ -330,7 +354,6 @@ TransactionService.prototype._startSubscriptions = function() {
} }
this._bus.on('block/reorg', this._onReorg.bind(this)); this._bus.on('block/reorg', this._onReorg.bind(this));
this._bus.subscribe('block/reorg'); this._bus.subscribe('block/reorg');
}; };

View File

@ -63,6 +63,7 @@
"liftoff": "^2.2.0", "liftoff": "^2.2.0",
"lodash": "^4.17.4", "lodash": "^4.17.4",
"lru-cache": "^4.0.2", "lru-cache": "^4.0.2",
"memwatch-next": "^0.3.0",
"mkdirp": "0.5.0", "mkdirp": "0.5.0",
"path-is-absolute": "^1.0.0", "path-is-absolute": "^1.0.0",
"socket.io": "^1.4.5", "socket.io": "^1.4.5",